restate_sdk_shared_core/
lib.rs

1pub mod error;
2pub mod fmt;
3mod headers;
4#[cfg(feature = "request_identity")]
5mod request_identity;
6mod retries;
7mod service_protocol;
8mod vm;
9
10use bytes::Bytes;
11use std::borrow::Cow;
12use std::time::Duration;
13
14pub use crate::retries::RetryPolicy;
15pub use error::Error;
16pub use headers::HeaderMap;
17#[cfg(feature = "request_identity")]
18pub use request_identity::*;
19pub use service_protocol::Version;
20pub use vm::CoreVM;
21
22#[derive(Debug, Eq, PartialEq)]
23pub struct Header {
24    pub key: Cow<'static, str>,
25    pub value: Cow<'static, str>,
26}
27
28#[derive(Debug)]
29pub struct ResponseHead {
30    pub status_code: u16,
31    pub headers: Vec<Header>,
32    pub version: Version,
33}
34
35#[derive(Debug, Eq, PartialEq)]
36pub struct Input {
37    pub invocation_id: String,
38    pub random_seed: u64,
39    pub key: String,
40    pub headers: Vec<Header>,
41    pub input: Bytes,
42}
43
44#[derive(Debug, Clone, Eq, PartialEq)]
45pub enum CommandType {
46    Input,
47    Output,
48    GetState,
49    GetStateKeys,
50    SetState,
51    ClearState,
52    ClearAllState,
53    GetPromise,
54    PeekPromise,
55    CompletePromise,
56    Sleep,
57    Call,
58    OneWayCall,
59    SendSignal,
60    Run,
61    AttachInvocation,
62    GetInvocationOutput,
63    CompleteAwakeable,
64    CancelInvocation,
65}
66
67/// Used in `notify_error` to specify which command this error relates to.
68#[derive(Debug, Clone, Eq, PartialEq)]
69pub enum CommandRelationship {
70    Last,
71    Next {
72        ty: CommandType,
73        name: Option<Cow<'static, str>>,
74    },
75    Specific {
76        command_index: u32,
77        ty: CommandType,
78        name: Option<Cow<'static, str>>,
79    },
80}
81
82#[derive(Debug, Eq, PartialEq)]
83pub struct Target {
84    pub service: String,
85    pub handler: String,
86    pub key: Option<String>,
87    pub idempotency_key: Option<String>,
88    pub headers: Vec<Header>,
89}
90
91pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);
92
93#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
94pub struct NotificationHandle(u32);
95
96impl From<u32> for NotificationHandle {
97    fn from(value: u32) -> Self {
98        NotificationHandle(value)
99    }
100}
101
102impl From<NotificationHandle> for u32 {
103    fn from(value: NotificationHandle) -> Self {
104        value.0
105    }
106}
107
108#[derive(Debug, Clone, Copy, Eq, PartialEq)]
109pub struct CallHandle {
110    pub invocation_id_notification_handle: NotificationHandle,
111    pub call_notification_handle: NotificationHandle,
112}
113
114#[derive(Debug, Clone, Copy, Eq, PartialEq)]
115pub struct SendHandle {
116    pub invocation_id_notification_handle: NotificationHandle,
117}
118
119#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
120pub enum Value {
121    /// a void/None/undefined success
122    Void,
123    Success(Bytes),
124    Failure(TerminalFailure),
125    /// Only returned for get_state_keys
126    StateKeys(Vec<String>),
127    /// Only returned for get_call_invocation_id
128    InvocationId(String),
129}
130
131/// Terminal failure
132#[derive(Debug, Clone, Eq, PartialEq)]
133pub struct TerminalFailure {
134    pub code: u16,
135    pub message: String,
136}
137
138#[derive(Debug, Default)]
139pub struct EntryRetryInfo {
140    /// Number of retries that happened so far for this entry.
141    pub retry_count: u32,
142    /// Time spent in the current retry loop.
143    pub retry_loop_duration: Duration,
144}
145
146#[derive(Debug, Clone)]
147pub enum RunExitResult {
148    Success(Bytes),
149    TerminalFailure(TerminalFailure),
150    RetryableFailure {
151        attempt_duration: Duration,
152        error: Error,
153    },
154}
155
156#[derive(Debug, Clone)]
157pub enum NonEmptyValue {
158    Success(Bytes),
159    Failure(TerminalFailure),
160}
161
162impl From<NonEmptyValue> for Value {
163    fn from(value: NonEmptyValue) -> Self {
164        match value {
165            NonEmptyValue::Success(s) => Value::Success(s),
166            NonEmptyValue::Failure(f) => Value::Failure(f),
167        }
168    }
169}
170
171#[derive(Debug, Eq, PartialEq)]
172pub enum AttachInvocationTarget {
173    InvocationId(String),
174    WorkflowId {
175        name: String,
176        key: String,
177    },
178    IdempotencyId {
179        service_name: String,
180        service_key: Option<String>,
181        handler_name: String,
182        idempotency_key: String,
183    },
184}
185
186#[derive(Debug, Eq, PartialEq)]
187pub enum TakeOutputResult {
188    Buffer(Bytes),
189    EOF,
190}
191
192pub type VMResult<T> = Result<T, Error>;
193
194#[derive(Debug)]
195pub enum ImplicitCancellationOption {
196    Disabled,
197    Enabled {
198        cancel_children_calls: bool,
199        cancel_children_one_way_calls: bool,
200    },
201}
202
203#[derive(Debug)]
204pub struct VMOptions {
205    pub implicit_cancellation: ImplicitCancellationOption,
206}
207
208impl Default for VMOptions {
209    fn default() -> Self {
210        Self {
211            implicit_cancellation: ImplicitCancellationOption::Enabled {
212                cancel_children_calls: true,
213                cancel_children_one_way_calls: false,
214            },
215        }
216    }
217}
218
219#[derive(Debug, PartialEq, Eq)]
220pub enum DoProgressResponse {
221    /// Any of the given AsyncResultHandle completed
222    AnyCompleted,
223    /// The SDK should read from input at this point
224    ReadFromInput,
225    /// The SDK should execute a pending run
226    ExecuteRun(NotificationHandle),
227    /// Any of the run given before with ExecuteRun is waiting for completion
228    WaitingPendingRun,
229    /// Returned only when [ImplicitCancellationOption::Enabled].
230    CancelSignalReceived,
231}
232
233pub trait VM: Sized {
234    fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
235
236    fn get_response_head(&self) -> ResponseHead;
237
238    // --- Input stream
239
240    fn notify_input(&mut self, buffer: Bytes);
241
242    fn notify_input_closed(&mut self);
243
244    // --- Errors
245
246    fn notify_error(&mut self, error: Error, related_command: Option<CommandRelationship>);
247
248    // --- Output stream
249
250    fn take_output(&mut self) -> TakeOutputResult;
251
252    // --- Execution start waiting point
253
254    fn is_ready_to_execute(&self) -> VMResult<bool>;
255
256    // --- Async results
257
258    fn is_completed(&self, handle: NotificationHandle) -> bool;
259
260    fn do_progress(&mut self, any_handle: Vec<NotificationHandle>) -> VMResult<DoProgressResponse>;
261
262    fn take_notification(&mut self, handle: NotificationHandle) -> VMResult<Option<Value>>;
263
264    // --- Syscall(s)
265
266    fn sys_input(&mut self) -> VMResult<Input>;
267
268    fn sys_state_get(&mut self, key: String) -> VMResult<NotificationHandle>;
269
270    fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
271
272    fn sys_state_set(&mut self, key: String, value: Bytes) -> VMResult<()>;
273
274    fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
275
276    fn sys_state_clear_all(&mut self) -> VMResult<()>;
277
278    /// Note: `now_since_unix_epoch` is only used for debugging purposes
279    fn sys_sleep(
280        &mut self,
281        name: String,
282        wake_up_time_since_unix_epoch: Duration,
283        now_since_unix_epoch: Option<Duration>,
284    ) -> VMResult<NotificationHandle>;
285
286    fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult<CallHandle>;
287
288    fn sys_send(
289        &mut self,
290        target: Target,
291        input: Bytes,
292        execution_time_since_unix_epoch: Option<Duration>,
293    ) -> VMResult<SendHandle>;
294
295    fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
296
297    fn sys_complete_awakeable(&mut self, id: String, value: NonEmptyValue) -> VMResult<()>;
298
299    fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
300
301    fn sys_complete_signal(
302        &mut self,
303        target_invocation_id: String,
304        signal_name: String,
305        value: NonEmptyValue,
306    ) -> VMResult<()>;
307
308    fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
309
310    fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
311
312    fn sys_complete_promise(
313        &mut self,
314        key: String,
315        value: NonEmptyValue,
316    ) -> VMResult<NotificationHandle>;
317
318    fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
319
320    fn propose_run_completion(
321        &mut self,
322        notification_handle: NotificationHandle,
323        value: RunExitResult,
324        retry_policy: RetryPolicy,
325    ) -> VMResult<()>;
326
327    fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
328
329    fn sys_attach_invocation(
330        &mut self,
331        target: AttachInvocationTarget,
332    ) -> VMResult<NotificationHandle>;
333
334    fn sys_get_invocation_output(
335        &mut self,
336        target: AttachInvocationTarget,
337    ) -> VMResult<NotificationHandle>;
338
339    fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;
340
341    fn sys_end(&mut self) -> VMResult<()>;
342
343    // Returns true if the state machine is waiting pre-flight to complete
344    fn is_waiting_preflight(&self) -> bool;
345
346    // Returns true if the state machine is replaying
347    fn is_replaying(&self) -> bool;
348
349    /// Returns true if the state machine is in processing state
350    fn is_processing(&self) -> bool;
351
352    /// Returns last command index. Returns `-1` if there was no progress in the journal.
353    fn last_command_index(&self) -> i64;
354}
355
356// HOW TO USE THIS API
357//
358// pre_user_code:
359//     while !vm.is_ready_to_execute() {
360//         match io.read_input() {
361//             buffer => vm.notify_input(buffer),
362//             EOF => vm.notify_input_closed()
363//         }
364//     }
365//
366// sys_[something]:
367//     try {
368//         vm.sys_[something]()
369//         io.write_out(vm.take_output())
370//     } catch (e) {
371//         log(e)
372//         io.write_out(vm.take_output())
373//         throw e
374//     }
375//
376// await_restate_future:
377//     vm.notify_await_point(handle);
378//     loop {
379//         // Result here can be value, not_ready, suspended, vm error
380//         let result = vm.take_async_result(handle);
381//         if result.is_not_ready() {
382//             match await io.read_input() {
383//                buffer => vm.notify_input(buffer),
384//                EOF => vm.notify_input_closed()
385//             }
386//         }
387//         return result
388//     }
389//
390// post_user_code:
391//     // Consume vm.take_output() until EOF
392//     while buffer = vm.take_output() {
393//         io.write_out(buffer)
394//     }
395//     io.close()
396
397#[cfg(test)]
398mod tests;