restate_sdk_shared_core/
lib.rs

1pub mod error;
2pub mod fmt;
3mod headers;
4#[cfg(feature = "request_identity")]
5mod request_identity;
6mod retries;
7mod service_protocol;
8mod vm;
9
10use bytes::Bytes;
11use std::borrow::Cow;
12use std::time::Duration;
13
14pub use crate::retries::RetryPolicy;
15pub use error::Error;
16pub use headers::HeaderMap;
17#[cfg(feature = "request_identity")]
18pub use request_identity::*;
19pub use service_protocol::Version;
20pub use vm::CoreVM;
21
/// A single header entry, stored as a key/value pair of strings.
///
/// Both parts are `Cow<'static, str>`, so each can hold either a `&'static str`
/// or an owned `String`.
#[derive(Debug, Eq, PartialEq)]
pub struct Header {
    /// Name of the header.
    pub key: Cow<'static, str>,
    /// Value of the header.
    pub value: Cow<'static, str>,
}
27
28#[derive(Debug)]
29pub struct ResponseHead {
30    pub status_code: u16,
31    pub headers: Vec<Header>,
32    pub version: Version,
33}
34
35#[derive(Debug, Eq, PartialEq)]
36pub struct Input {
37    pub invocation_id: String,
38    pub random_seed: u64,
39    pub key: String,
40    pub headers: Vec<Header>,
41    pub input: Bytes,
42}
43
/// Kind of a command.
///
/// Used by [`CommandRelationship`] to identify which command an error relates to.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CommandType {
    Input,
    Output,
    GetState,
    GetStateKeys,
    SetState,
    ClearState,
    ClearAllState,
    GetPromise,
    PeekPromise,
    CompletePromise,
    Sleep,
    Call,
    OneWayCall,
    SendSignal,
    Run,
    AttachInvocation,
    GetInvocationOutput,
    CompleteAwakeable,
    CancelInvocation,
}
66
67/// Used in `notify_error` to specify which command this error relates to.
68#[derive(Debug, Clone, Eq, PartialEq)]
69pub enum CommandRelationship {
70    Last,
71    Next {
72        ty: CommandType,
73        name: Option<Cow<'static, str>>,
74    },
75    Specific {
76        command_index: u32,
77        ty: CommandType,
78        name: Option<Cow<'static, str>>,
79    },
80}
81
82#[derive(Debug, Eq, PartialEq)]
83pub struct Target {
84    pub service: String,
85    pub handler: String,
86    pub key: Option<String>,
87    pub idempotency_key: Option<String>,
88    pub headers: Vec<Header>,
89}
90
/// The [`NotificationHandle`] whose inner value is `1`.
// NOTE(review): presumably the handle on which cancellation is notified — confirm in `vm`.
pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);

/// Opaque handle wrapping a `u32`.
///
/// Returned by many `sys_*` methods of [`VM`] and accepted by `is_completed`,
/// `do_progress` and `take_notification`. Convertible from and into `u32`.
#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub struct NotificationHandle(u32);

/// Wraps a raw `u32` into a handle.
impl From<u32> for NotificationHandle {
    fn from(raw: u32) -> Self {
        Self(raw)
    }
}

/// Unwraps a handle back into its raw `u32`.
impl From<NotificationHandle> for u32 {
    fn from(handle: NotificationHandle) -> Self {
        let NotificationHandle(raw) = handle;
        raw
    }
}
107
108#[derive(Debug, Clone, Copy, Eq, PartialEq)]
109pub struct CallHandle {
110    pub invocation_id_notification_handle: NotificationHandle,
111    pub call_notification_handle: NotificationHandle,
112}
113
114#[derive(Debug, Clone, Copy, Eq, PartialEq)]
115pub struct SendHandle {
116    pub invocation_id_notification_handle: NotificationHandle,
117}
118
119#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
120pub enum Value {
121    /// a void/None/undefined success
122    Void,
123    Success(Bytes),
124    Failure(TerminalFailure),
125    /// Only returned for get_state_keys
126    StateKeys(Vec<String>),
127    /// Only returned for get_call_invocation_id
128    InvocationId(String),
129}
130
/// A terminal failure, described by a numeric code and a message.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TerminalFailure {
    /// Numeric failure code.
    pub code: u16,
    /// Human-readable failure message.
    pub message: String,
}
137
/// Retry bookkeeping for a single entry.
///
/// The `Default` value has a zero retry count and a zero duration.
#[derive(Debug, Default)]
pub struct EntryRetryInfo {
    /// How many retries have already happened for this entry.
    pub retry_count: u32,
    /// How much time has been spent in the current retry loop.
    pub retry_loop_duration: Duration,
}
145
146#[derive(Debug, Clone)]
147pub enum RunExitResult {
148    Success(Bytes),
149    TerminalFailure(TerminalFailure),
150    RetryableFailure {
151        attempt_duration: Duration,
152        error: Error,
153    },
154}
155
156#[derive(Debug, Clone)]
157pub enum NonEmptyValue {
158    Success(Bytes),
159    Failure(TerminalFailure),
160}
161
162impl From<NonEmptyValue> for Value {
163    fn from(value: NonEmptyValue) -> Self {
164        match value {
165            NonEmptyValue::Success(s) => Value::Success(s),
166            NonEmptyValue::Failure(f) => Value::Failure(f),
167        }
168    }
169}
170
/// Identifies the invocation targeted by [`VM::sys_attach_invocation`] and
/// [`VM::sys_get_invocation_output`].
#[derive(Debug, Eq, PartialEq)]
pub enum AttachInvocationTarget {
    /// Target identified directly by its invocation id.
    InvocationId(String),
    /// Target identified by a workflow name and key.
    WorkflowId {
        /// Name of the workflow.
        name: String,
        /// Key of the workflow.
        key: String,
    },
    /// Target identified by service, optional service key, handler and idempotency key.
    IdempotencyId {
        /// Name of the service.
        service_name: String,
        /// Key of the service, if any.
        service_key: Option<String>,
        /// Name of the handler.
        handler_name: String,
        /// Idempotency key of the request.
        idempotency_key: String,
    },
}
185
186#[derive(Debug, Eq, PartialEq)]
187pub enum TakeOutputResult {
188    Buffer(Bytes),
189    EOF,
190}
191
192pub type VMResult<T> = Result<T, Error>;
193
/// Configures implicit cancellation; set through [`VMOptions::implicit_cancellation`].
///
/// [`DoProgressResponse::CancelSignalReceived`] is only returned when this is `Enabled`.
#[derive(Debug)]
pub enum ImplicitCancellationOption {
    /// Implicit cancellation is turned off.
    Disabled,
    /// Implicit cancellation is turned on.
    Enabled {
        // NOTE(review): presumably controls whether child calls get cancelled too — confirm.
        cancel_children_calls: bool,
        // NOTE(review): presumably the same, for child one-way calls — confirm.
        cancel_children_one_way_calls: bool,
    },
}
202
/// Configures the non-determinism checks; set through
/// [`VMOptions::non_determinism_checks`]. Defaults to `Enabled`.
#[derive(Debug, Default)]
pub enum NonDeterministicChecksOption {
    /// Payloads (state values, rpc request, complete awakeable value) are not checked,
    /// while all the other command parameters still are.
    PayloadChecksDisabled,
    /// All checks are performed.
    #[default]
    Enabled,
}
211
212#[derive(Debug)]
213pub struct VMOptions {
214    pub implicit_cancellation: ImplicitCancellationOption,
215    pub non_determinism_checks: NonDeterministicChecksOption,
216}
217
218impl Default for VMOptions {
219    fn default() -> Self {
220        Self {
221            implicit_cancellation: ImplicitCancellationOption::Enabled {
222                cancel_children_calls: true,
223                cancel_children_one_way_calls: false,
224            },
225            non_determinism_checks: NonDeterministicChecksOption::Enabled,
226        }
227    }
228}
229
230#[derive(Debug, PartialEq, Eq)]
231pub enum DoProgressResponse {
232    /// Any of the given AsyncResultHandle completed
233    AnyCompleted,
234    /// The SDK should read from input at this point
235    ReadFromInput,
236    /// The SDK should execute a pending run
237    ExecuteRun(NotificationHandle),
238    /// Any of the run given before with ExecuteRun is waiting for completion
239    WaitingPendingRun,
240    /// Returned only when [ImplicitCancellationOption::Enabled].
241    CancelSignalReceived,
242}
243
244pub trait VM: Sized {
245    fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
246
247    fn get_response_head(&self) -> ResponseHead;
248
249    // --- Input stream
250
251    fn notify_input(&mut self, buffer: Bytes);
252
253    fn notify_input_closed(&mut self);
254
255    // --- Errors
256
257    fn notify_error(&mut self, error: Error, related_command: Option<CommandRelationship>);
258
259    // --- Output stream
260
261    fn take_output(&mut self) -> TakeOutputResult;
262
263    // --- Execution start waiting point
264
265    fn is_ready_to_execute(&self) -> VMResult<bool>;
266
267    // --- Async results
268
269    fn is_completed(&self, handle: NotificationHandle) -> bool;
270
271    fn do_progress(&mut self, any_handle: Vec<NotificationHandle>) -> VMResult<DoProgressResponse>;
272
273    fn take_notification(&mut self, handle: NotificationHandle) -> VMResult<Option<Value>>;
274
275    // --- Syscall(s)
276
277    fn sys_input(&mut self) -> VMResult<Input>;
278
279    fn sys_state_get(&mut self, key: String) -> VMResult<NotificationHandle>;
280
281    fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
282
283    fn sys_state_set(&mut self, key: String, value: Bytes) -> VMResult<()>;
284
285    fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
286
287    fn sys_state_clear_all(&mut self) -> VMResult<()>;
288
289    /// Note: `now_since_unix_epoch` is only used for debugging purposes
290    fn sys_sleep(
291        &mut self,
292        name: String,
293        wake_up_time_since_unix_epoch: Duration,
294        now_since_unix_epoch: Option<Duration>,
295    ) -> VMResult<NotificationHandle>;
296
297    fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult<CallHandle>;
298
299    fn sys_send(
300        &mut self,
301        target: Target,
302        input: Bytes,
303        execution_time_since_unix_epoch: Option<Duration>,
304    ) -> VMResult<SendHandle>;
305
306    fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
307
308    fn sys_complete_awakeable(&mut self, id: String, value: NonEmptyValue) -> VMResult<()>;
309
310    fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
311
312    fn sys_complete_signal(
313        &mut self,
314        target_invocation_id: String,
315        signal_name: String,
316        value: NonEmptyValue,
317    ) -> VMResult<()>;
318
319    fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
320
321    fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
322
323    fn sys_complete_promise(
324        &mut self,
325        key: String,
326        value: NonEmptyValue,
327    ) -> VMResult<NotificationHandle>;
328
329    fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
330
331    fn propose_run_completion(
332        &mut self,
333        notification_handle: NotificationHandle,
334        value: RunExitResult,
335        retry_policy: RetryPolicy,
336    ) -> VMResult<()>;
337
338    fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
339
340    fn sys_attach_invocation(
341        &mut self,
342        target: AttachInvocationTarget,
343    ) -> VMResult<NotificationHandle>;
344
345    fn sys_get_invocation_output(
346        &mut self,
347        target: AttachInvocationTarget,
348    ) -> VMResult<NotificationHandle>;
349
350    fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;
351
352    fn sys_end(&mut self) -> VMResult<()>;
353
354    // Returns true if the state machine is waiting pre-flight to complete
355    fn is_waiting_preflight(&self) -> bool;
356
357    // Returns true if the state machine is replaying
358    fn is_replaying(&self) -> bool;
359
360    /// Returns true if the state machine is in processing state
361    fn is_processing(&self) -> bool;
362
363    /// Returns last command index. Returns `-1` if there was no progress in the journal.
364    fn last_command_index(&self) -> i64;
365}
366
// HOW TO USE THIS API
//
// pre_user_code:
//     while !vm.is_ready_to_execute() {
//         match io.read_input() {
//             buffer => vm.notify_input(buffer),
//             EOF => vm.notify_input_closed()
//         }
//     }
//
// sys_[something]:
//     try {
//         vm.sys_[something]()
//         io.write_out(vm.take_output())
//     } catch (e) {
//         log(e)
//         io.write_out(vm.take_output())
//         throw e
//     }
//
// await_restate_future:
//     loop {
//         // Result here can be any DoProgressResponse variant, or a vm error
//         // (TODO confirm: suspension is presumably reported as a vm error)
//         match vm.do_progress([handle]) {
//             AnyCompleted => return vm.take_notification(handle),
//             ReadFromInput => match await io.read_input() {
//                 buffer => vm.notify_input(buffer),
//                 EOF => vm.notify_input_closed()
//             },
//             ExecuteRun(run_handle) => {
//                 // Execute the pending run, then report its result with
//                 // vm.propose_run_completion(run_handle, result, retry_policy)
//             },
//             WaitingPendingRun => await the completion of a run given before with ExecuteRun,
//             CancelSignalReceived => handle the cancellation
//         }
//     }
//
// post_user_code:
//     // Consume vm.take_output() until EOF
//     while buffer = vm.take_output() {
//         io.write_out(buffer)
//     }
//     io.close()
407
408#[cfg(test)]
409mod tests;