Skip to main content

restate_sdk_shared_core/
lib.rs

1pub mod error;
2pub mod fmt;
3mod headers;
4#[cfg(feature = "request_identity")]
5mod request_identity;
6mod retries;
7mod service_protocol;
8mod vm;
9
10use bytes::Bytes;
11use std::borrow::Cow;
12use std::time::Duration;
13
14pub use crate::retries::RetryPolicy;
15pub use error::Error;
16pub use headers::HeaderMap;
17#[cfg(feature = "request_identity")]
18pub use request_identity::*;
19pub use service_protocol::Version;
20pub use vm::CoreVM;
21
/// Options for syscalls that involve payload serialization.
///
/// Use this to indicate when payload bytes may differ between executions
/// (e.g., when using non-deterministic serialization like protojson).
#[derive(Debug, Clone, Copy, Default)]
pub struct PayloadOptions {
    /// If true, skip payload byte equality checks during replay.
    /// Use this when the serialization format is non-deterministic.
    pub unstable_serialization: bool,
}

impl PayloadOptions {
    /// Create options indicating stable (deterministic) serialization.
    ///
    /// This yields the same value as [`PayloadOptions::default`].
    pub const fn stable() -> Self {
        Self {
            unstable_serialization: false,
        }
    }

    /// Create options indicating unstable (non-deterministic) serialization.
    ///
    /// Payload byte equality will be skipped during replay.
    pub const fn unstable() -> Self {
        Self {
            unstable_serialization: true,
        }
    }
}
48
/// A single header, stored as a key/value pair of strings.
///
/// Both parts are [`Cow<'static, str>`], so they can hold either a static string slice
/// or an owned `String`.
#[derive(Debug, Eq, PartialEq)]
pub struct Header {
    /// Header key.
    pub key: Cow<'static, str>,
    /// Header value.
    pub value: Cow<'static, str>,
}
54
55#[derive(Debug)]
56pub struct ResponseHead {
57    pub status_code: u16,
58    pub headers: Vec<Header>,
59    pub version: Version,
60}
61
62#[derive(Debug, Eq, PartialEq)]
63pub struct Input {
64    pub invocation_id: String,
65    pub random_seed: u64,
66    pub key: String,
67    pub headers: Vec<Header>,
68    pub input: Bytes,
69}
70
/// The kinds of command known to this crate.
///
/// Used by [`CommandRelationship`] to identify which command an error relates to.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum CommandType {
    Input,
    Output,
    GetState,
    GetStateKeys,
    SetState,
    ClearState,
    ClearAllState,
    GetPromise,
    PeekPromise,
    CompletePromise,
    Sleep,
    Call,
    OneWayCall,
    SendSignal,
    Run,
    AttachInvocation,
    GetInvocationOutput,
    CompleteAwakeable,
    CancelInvocation,
}
93
94impl std::fmt::Display for CommandType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        write!(f, "{}", fmt::format_command_ty(*self))
97    }
98}
99
100/// Used in `notify_error` to specify which command this error relates to.
101#[derive(Debug, Clone, Eq, PartialEq)]
102pub enum CommandRelationship {
103    Last,
104    Next {
105        ty: CommandType,
106        name: Option<Cow<'static, str>>,
107    },
108    Specific {
109        command_index: u32,
110        ty: CommandType,
111        name: Option<Cow<'static, str>>,
112    },
113}
114
115#[derive(Debug, Eq, PartialEq)]
116pub struct Target {
117    pub service: String,
118    pub handler: String,
119    pub key: Option<String>,
120    pub idempotency_key: Option<String>,
121    pub headers: Vec<Header>,
122}
123
/// The [`NotificationHandle`] with the fixed raw value `1`.
///
/// NOTE(review): presumably the handle on which the cancellation signal is notified — confirm.
pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);

/// Handle wrapping a raw `u32`.
///
/// The inner value is private; convert with `From<u32>` and `From<NotificationHandle> for u32`.
#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub struct NotificationHandle(u32);

impl From<u32> for NotificationHandle {
    /// Wraps a raw `u32` into a handle.
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<NotificationHandle> for u32 {
    /// Extracts the raw `u32` from a handle.
    fn from(value: NotificationHandle) -> Self {
        value.0
    }
}
140
141#[derive(Debug, Clone, Copy, Eq, PartialEq)]
142pub struct CallHandle {
143    pub invocation_id_notification_handle: NotificationHandle,
144    pub call_notification_handle: NotificationHandle,
145}
146
147#[derive(Debug, Clone, Copy, Eq, PartialEq)]
148pub struct SendHandle {
149    pub invocation_id_notification_handle: NotificationHandle,
150}
151
152#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
153pub enum Value {
154    /// a void/None/undefined success
155    Void,
156    Success(Bytes),
157    Failure(TerminalFailure),
158    /// Only returned for get_state_keys
159    StateKeys(Vec<String>),
160    /// Only returned for get_call_invocation_id
161    InvocationId(String),
162}
163
/// Terminal failure
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TerminalFailure {
    /// Failure code.
    pub code: u16,
    /// Failure message.
    pub message: String,
    /// Additional metadata, as a list of string pairs.
    pub metadata: Vec<(String, String)>,
}
171
/// Retry information for an entry.
///
/// The [`Default`] value has a retry count of `0` and a zero retry loop duration.
#[derive(Debug, Default)]
pub struct EntryRetryInfo {
    /// Number of retries that happened so far for this entry.
    pub retry_count: u32,
    /// Time spent in the current retry loop.
    pub retry_loop_duration: Duration,
}
179
180#[derive(Debug, Clone)]
181pub enum RunExitResult {
182    Success(Bytes),
183    TerminalFailure(TerminalFailure),
184    RetryableFailure {
185        attempt_duration: Duration,
186        error: Error,
187    },
188}
189
190#[derive(Debug, Clone)]
191pub enum NonEmptyValue {
192    Success(Bytes),
193    Failure(TerminalFailure),
194}
195
196impl From<NonEmptyValue> for Value {
197    fn from(value: NonEmptyValue) -> Self {
198        match value {
199            NonEmptyValue::Success(s) => Value::Success(s),
200            NonEmptyValue::Failure(f) => Value::Failure(f),
201        }
202    }
203}
204
/// Identifies the invocation targeted by [`VM::sys_attach_invocation`] and
/// [`VM::sys_get_invocation_output`].
#[derive(Debug, Eq, PartialEq)]
pub enum AttachInvocationTarget {
    /// Target identified by invocation id.
    InvocationId(String),
    /// Target identified by workflow name and key.
    WorkflowId {
        name: String,
        key: String,
    },
    /// Target identified by service, optional service key, handler and idempotency key.
    IdempotencyId {
        service_name: String,
        service_key: Option<String>,
        handler_name: String,
        idempotency_key: String,
    },
}
219
220#[derive(Debug, Eq, PartialEq)]
221pub enum TakeOutputResult {
222    Buffer(Bytes),
223    EOF,
224}
225
226pub type VMResult<T> = Result<T, Error>;
227
/// Implicit cancellation setting, configured through [`VMOptions::implicit_cancellation`].
#[derive(Debug)]
pub enum ImplicitCancellationOption {
    /// Implicit cancellation is disabled.
    Disabled,
    /// Implicit cancellation is enabled.
    Enabled {
        /// Whether children calls are cancelled.
        cancel_children_calls: bool,
        /// Whether children one-way calls are cancelled.
        cancel_children_one_way_calls: bool,
    },
}
236
/// Non-determinism checks setting, configured through [`VMOptions::non_determinism_checks`].
///
/// Defaults to [`NonDeterministicChecksOption::Enabled`].
#[derive(Debug, Default)]
pub enum NonDeterministicChecksOption {
    /// This will disable checking payloads (state values, rpc request, complete awakeable value),
    /// but will still check all the other commands parameters.
    PayloadChecksDisabled,
    /// All checks are enabled.
    #[default]
    Enabled,
}
245
246#[derive(Debug)]
247pub struct VMOptions {
248    pub implicit_cancellation: ImplicitCancellationOption,
249    pub non_determinism_checks: NonDeterministicChecksOption,
250}
251
252impl Default for VMOptions {
253    fn default() -> Self {
254        Self {
255            implicit_cancellation: ImplicitCancellationOption::Enabled {
256                cancel_children_calls: true,
257                cancel_children_one_way_calls: false,
258            },
259            non_determinism_checks: NonDeterministicChecksOption::Enabled,
260        }
261    }
262}
263
264#[derive(Debug, PartialEq, Eq)]
265pub enum DoProgressResponse {
266    /// Any of the given AsyncResultHandle completed
267    AnyCompleted,
268    /// The SDK should read from input at this point
269    ReadFromInput,
270    /// The SDK should execute a pending run
271    ExecuteRun(NotificationHandle),
272    /// Any of the run given before with ExecuteRun is waiting for completion
273    WaitingPendingRun,
274    /// Returned only when [ImplicitCancellationOption::Enabled].
275    CancelSignalReceived,
276}
277
278pub trait VM: Sized {
279    fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
280
281    fn get_response_head(&self) -> ResponseHead;
282
283    // --- Input stream
284
285    fn notify_input(&mut self, buffer: Bytes);
286
287    fn notify_input_closed(&mut self);
288
289    // --- Errors
290
291    fn notify_error(&mut self, error: Error, related_command: Option<CommandRelationship>);
292
293    // --- Output stream
294
295    fn take_output(&mut self) -> TakeOutputResult;
296
297    // --- Execution start waiting point
298
299    fn is_ready_to_execute(&self) -> VMResult<bool>;
300
301    // --- Async results
302
303    fn is_completed(&self, handle: NotificationHandle) -> bool;
304
305    fn do_progress(&mut self, any_handle: Vec<NotificationHandle>) -> VMResult<DoProgressResponse>;
306
307    fn take_notification(&mut self, handle: NotificationHandle) -> VMResult<Option<Value>>;
308
309    // --- Syscall(s)
310
311    fn sys_input(&mut self) -> VMResult<Input>;
312
313    fn sys_state_get(
314        &mut self,
315        key: String,
316        options: PayloadOptions,
317    ) -> VMResult<NotificationHandle>;
318
319    fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
320
321    fn sys_state_set(&mut self, key: String, value: Bytes, options: PayloadOptions)
322        -> VMResult<()>;
323
324    fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
325
326    fn sys_state_clear_all(&mut self) -> VMResult<()>;
327
328    /// Note: `now_since_unix_epoch` is only used for debugging purposes
329    fn sys_sleep(
330        &mut self,
331        name: String,
332        wake_up_time_since_unix_epoch: Duration,
333        now_since_unix_epoch: Option<Duration>,
334    ) -> VMResult<NotificationHandle>;
335
336    fn sys_call(
337        &mut self,
338        target: Target,
339        input: Bytes,
340        name: Option<String>,
341        options: PayloadOptions,
342    ) -> VMResult<CallHandle>;
343
344    fn sys_send(
345        &mut self,
346        target: Target,
347        input: Bytes,
348        execution_time_since_unix_epoch: Option<Duration>,
349        name: Option<String>,
350        options: PayloadOptions,
351    ) -> VMResult<SendHandle>;
352
353    fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
354
355    fn sys_complete_awakeable(
356        &mut self,
357        id: String,
358        value: NonEmptyValue,
359        options: PayloadOptions,
360    ) -> VMResult<()>;
361
362    fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
363
364    fn sys_complete_signal(
365        &mut self,
366        target_invocation_id: String,
367        signal_name: String,
368        value: NonEmptyValue,
369    ) -> VMResult<()>;
370
371    fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
372
373    fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
374
375    fn sys_complete_promise(
376        &mut self,
377        key: String,
378        value: NonEmptyValue,
379        options: PayloadOptions,
380    ) -> VMResult<NotificationHandle>;
381
382    fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
383
384    fn propose_run_completion(
385        &mut self,
386        notification_handle: NotificationHandle,
387        value: RunExitResult,
388        retry_policy: RetryPolicy,
389    ) -> VMResult<()>;
390
391    fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
392
393    fn sys_attach_invocation(
394        &mut self,
395        target: AttachInvocationTarget,
396    ) -> VMResult<NotificationHandle>;
397
398    fn sys_get_invocation_output(
399        &mut self,
400        target: AttachInvocationTarget,
401    ) -> VMResult<NotificationHandle>;
402
403    fn sys_write_output(&mut self, value: NonEmptyValue, options: PayloadOptions) -> VMResult<()>;
404
405    fn sys_end(&mut self) -> VMResult<()>;
406
407    // Returns true if the state machine is waiting pre-flight to complete
408    fn is_waiting_preflight(&self) -> bool;
409
410    // Returns true if the state machine is replaying
411    fn is_replaying(&self) -> bool;
412
413    /// Returns true if the state machine is in processing state
414    fn is_processing(&self) -> bool;
415
416    /// Returns last command index. Returns `-1` if there was no progress in the journal.
417    fn last_command_index(&self) -> i64;
418}
419
// HOW TO USE THIS API
//
// NOTE(review): the `await_restate_future` sketch below calls `vm.notify_await_point` and
// `vm.take_async_result`, which are not methods of the `VM` trait declared in this file; the
// trait exposes `do_progress` and `take_notification` instead. Confirm and update the sketch.
//
// pre_user_code:
//     while !vm.is_ready_to_execute() {
//         match io.read_input() {
//             buffer => vm.notify_input(buffer),
//             EOF => vm.notify_input_closed()
//         }
//     }
//
// sys_[something]:
//     try {
//         vm.sys_[something]()
//         io.write_out(vm.take_output())
//     } catch (e) {
//         log(e)
//         io.write_out(vm.take_output())
//         throw e
//     }
//
// await_restate_future:
//     vm.notify_await_point(handle);
//     loop {
//         // Result here can be value, not_ready, suspended, vm error
//         let result = vm.take_async_result(handle);
//         if result.is_not_ready() {
//             match await io.read_input() {
//                buffer => vm.notify_input(buffer),
//                EOF => vm.notify_input_closed()
//             }
//         }
//         return result
//     }
//
// post_user_code:
//     // Consume vm.take_output() until EOF
//     while buffer = vm.take_output() {
//         io.write_out(buffer)
//     }
//     io.close()
460
461#[cfg(test)]
462mod tests;