restate_sdk_shared_core/
lib.rs

1mod headers;
2#[cfg(feature = "request_identity")]
3mod request_identity;
4mod retries;
5mod service_protocol;
6mod vm;
7
8use bytes::Bytes;
9use std::borrow::Cow;
10use std::time::Duration;
11
12pub use crate::retries::RetryPolicy;
13pub use headers::HeaderMap;
14#[cfg(feature = "request_identity")]
15pub use request_identity::*;
16pub use service_protocol::Version;
17pub use vm::CoreVM;
18
19// Re-export only some stuff from vm::errors
20pub mod error {
21    pub use crate::vm::errors::codes;
22    pub use crate::vm::errors::InvocationErrorCode;
23}
24
25#[derive(Debug, Eq, PartialEq)]
26pub struct Header {
27    pub key: Cow<'static, str>,
28    pub value: Cow<'static, str>,
29}
30
31#[derive(Debug)]
32pub struct ResponseHead {
33    pub status_code: u16,
34    pub headers: Vec<Header>,
35    pub version: Version,
36}
37
38#[derive(Debug, Clone, Copy, thiserror::Error)]
39#[error("Suspended execution")]
40pub struct SuspendedError;
41
42#[derive(Debug, Clone, thiserror::Error)]
43#[error("State machine error [{code}]: {message}. Stacktrace: {stacktrace}")]
44pub struct Error {
45    code: u16,
46    message: Cow<'static, str>,
47    stacktrace: Cow<'static, str>,
48}
49
50impl Error {
51    pub fn new(code: impl Into<u16>, message: impl Into<Cow<'static, str>>) -> Self {
52        Error {
53            code: code.into(),
54            message: message.into(),
55            stacktrace: Default::default(),
56        }
57    }
58
59    pub fn internal(message: impl Into<Cow<'static, str>>) -> Self {
60        Self::new(error::codes::INTERNAL, message)
61    }
62
63    pub fn code(&self) -> u16 {
64        self.code
65    }
66
67    pub fn message(&self) -> &str {
68        &self.message
69    }
70
71    pub fn description(&self) -> &str {
72        &self.stacktrace
73    }
74
75    pub fn with_stacktrace(mut self, stacktrace: impl Into<Cow<'static, str>>) -> Self {
76        self.stacktrace = stacktrace.into();
77        self
78    }
79
80    /// Append the given description to the original one, in case the code is the same
81    pub fn append_description_for_code(
82        mut self,
83        code: impl Into<u16>,
84        description: impl Into<Cow<'static, str>>,
85    ) -> Self {
86        let c = code.into();
87        if self.code == c {
88            if self.stacktrace.is_empty() {
89                self.stacktrace = description.into();
90            } else {
91                self.stacktrace = format!("{}. {}", self.stacktrace, description.into()).into();
92            }
93            self
94        } else {
95            self
96        }
97    }
98}
99
100#[derive(Debug, Clone, thiserror::Error)]
101pub enum SuspendedOrVMError {
102    #[error(transparent)]
103    Suspended(SuspendedError),
104    #[error(transparent)]
105    VM(Error),
106}
107
108#[derive(Debug, Eq, PartialEq)]
109pub struct Input {
110    pub invocation_id: String,
111    pub random_seed: u64,
112    pub key: String,
113    pub headers: Vec<Header>,
114    pub input: Bytes,
115}
116
117#[derive(Debug, Eq, PartialEq)]
118pub struct Target {
119    pub service: String,
120    pub handler: String,
121    pub key: Option<String>,
122    pub idempotency_key: Option<String>,
123    pub headers: Vec<Header>,
124}
125
126pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);
127
128#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
129pub struct NotificationHandle(u32);
130
131impl From<u32> for NotificationHandle {
132    fn from(value: u32) -> Self {
133        NotificationHandle(value)
134    }
135}
136
137impl From<NotificationHandle> for u32 {
138    fn from(value: NotificationHandle) -> Self {
139        value.0
140    }
141}
142
143#[derive(Debug, Clone, Copy, Eq, PartialEq)]
144pub struct CallHandle {
145    pub invocation_id_notification_handle: NotificationHandle,
146    pub call_notification_handle: NotificationHandle,
147}
148
149#[derive(Debug, Clone, Copy, Eq, PartialEq)]
150pub struct SendHandle {
151    pub invocation_id_notification_handle: NotificationHandle,
152}
153
154#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
155pub enum Value {
156    /// a void/None/undefined success
157    Void,
158    Success(Bytes),
159    Failure(TerminalFailure),
160    /// Only returned for get_state_keys
161    StateKeys(Vec<String>),
162    /// Only returned for get_call_invocation_id
163    InvocationId(String),
164}
165
166/// Terminal failure
167#[derive(Debug, Clone, Eq, PartialEq)]
168pub struct TerminalFailure {
169    pub code: u16,
170    pub message: String,
171}
172
173#[derive(Debug, Default)]
174pub struct EntryRetryInfo {
175    /// Number of retries that happened so far for this entry.
176    pub retry_count: u32,
177    /// Time spent in the current retry loop.
178    pub retry_loop_duration: Duration,
179}
180
181#[derive(Debug, Clone)]
182pub enum RunExitResult {
183    Success(Bytes),
184    TerminalFailure(TerminalFailure),
185    RetryableFailure {
186        attempt_duration: Duration,
187        error: Error,
188    },
189}
190
191#[derive(Debug, Clone)]
192pub enum NonEmptyValue {
193    Success(Bytes),
194    Failure(TerminalFailure),
195}
196
197impl From<NonEmptyValue> for Value {
198    fn from(value: NonEmptyValue) -> Self {
199        match value {
200            NonEmptyValue::Success(s) => Value::Success(s),
201            NonEmptyValue::Failure(f) => Value::Failure(f),
202        }
203    }
204}
205
206#[derive(Debug, Eq, PartialEq)]
207pub enum AttachInvocationTarget {
208    InvocationId(String),
209    WorkflowId {
210        name: String,
211        key: String,
212    },
213    IdempotencyId {
214        service_name: String,
215        service_key: Option<String>,
216        handler_name: String,
217        idempotency_key: String,
218    },
219}
220
221#[derive(Debug, Eq, PartialEq)]
222pub enum TakeOutputResult {
223    Buffer(Bytes),
224    EOF,
225}
226
227pub type VMResult<T> = Result<T, Error>;
228
229#[derive(Debug)]
230pub enum ImplicitCancellationOption {
231    Disabled,
232    Enabled {
233        cancel_children_calls: bool,
234        cancel_children_one_way_calls: bool,
235    },
236}
237
238#[derive(Debug)]
239pub struct VMOptions {
240    pub implicit_cancellation: ImplicitCancellationOption,
241}
242
243impl Default for VMOptions {
244    fn default() -> Self {
245        Self {
246            implicit_cancellation: ImplicitCancellationOption::Enabled {
247                cancel_children_calls: true,
248                cancel_children_one_way_calls: false,
249            },
250        }
251    }
252}
253
254#[derive(Debug, PartialEq, Eq)]
255pub enum DoProgressResponse {
256    /// Any of the given AsyncResultHandle completed
257    AnyCompleted,
258    /// The SDK should read from input at this point
259    ReadFromInput,
260    /// The SDK should execute a pending run
261    ExecuteRun(NotificationHandle),
262    /// Any of the run given before with ExecuteRun is waiting for completion
263    WaitingPendingRun,
264    /// Returned only when [ImplicitCancellationOption::Enabled].
265    CancelSignalReceived,
266}
267
268pub trait VM: Sized {
269    fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
270
271    fn get_response_head(&self) -> ResponseHead;
272
273    // --- Input stream
274
275    fn notify_input(&mut self, buffer: Bytes);
276
277    fn notify_input_closed(&mut self);
278
279    // --- Errors
280
281    fn notify_error(&mut self, error: Error, next_retry_delay: Option<Duration>);
282
283    // --- Output stream
284
285    fn take_output(&mut self) -> TakeOutputResult;
286
287    // --- Execution start waiting point
288
289    fn is_ready_to_execute(&self) -> VMResult<bool>;
290
291    // --- Async results
292
293    fn is_completed(&self, handle: NotificationHandle) -> bool;
294
295    fn do_progress(
296        &mut self,
297        any_handle: Vec<NotificationHandle>,
298    ) -> Result<DoProgressResponse, SuspendedOrVMError>;
299
300    fn take_notification(
301        &mut self,
302        handle: NotificationHandle,
303    ) -> Result<Option<Value>, SuspendedOrVMError>;
304
305    // --- Syscall(s)
306
307    fn sys_input(&mut self) -> VMResult<Input>;
308
309    fn sys_state_get(&mut self, key: String) -> VMResult<NotificationHandle>;
310
311    fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
312
313    fn sys_state_set(&mut self, key: String, value: Bytes) -> VMResult<()>;
314
315    fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
316
317    fn sys_state_clear_all(&mut self) -> VMResult<()>;
318
319    /// Note: `now_since_unix_epoch` is only used for debugging purposes
320    fn sys_sleep(
321        &mut self,
322        name: String,
323        wake_up_time_since_unix_epoch: Duration,
324        now_since_unix_epoch: Option<Duration>,
325    ) -> VMResult<NotificationHandle>;
326
327    fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult<CallHandle>;
328
329    fn sys_send(
330        &mut self,
331        target: Target,
332        input: Bytes,
333        execution_time_since_unix_epoch: Option<Duration>,
334    ) -> VMResult<SendHandle>;
335
336    fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
337
338    fn sys_complete_awakeable(&mut self, id: String, value: NonEmptyValue) -> VMResult<()>;
339
340    fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
341
342    fn sys_complete_signal(
343        &mut self,
344        target_invocation_id: String,
345        signal_name: String,
346        value: NonEmptyValue,
347    ) -> VMResult<()>;
348
349    fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
350
351    fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
352
353    fn sys_complete_promise(
354        &mut self,
355        key: String,
356        value: NonEmptyValue,
357    ) -> VMResult<NotificationHandle>;
358
359    fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
360
361    fn propose_run_completion(
362        &mut self,
363        notification_handle: NotificationHandle,
364        value: RunExitResult,
365        retry_policy: RetryPolicy,
366    ) -> VMResult<()>;
367
368    fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
369
370    fn sys_attach_invocation(
371        &mut self,
372        target: AttachInvocationTarget,
373    ) -> VMResult<NotificationHandle>;
374
375    fn sys_get_invocation_output(
376        &mut self,
377        target: AttachInvocationTarget,
378    ) -> VMResult<NotificationHandle>;
379
380    fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;
381
382    fn sys_end(&mut self) -> VMResult<()>;
383
384    // Returns true if the state machine is waiting pre-flight to complete
385    fn is_waiting_preflight(&self) -> bool;
386
387    // Returns true if the state machine is replaying
388    fn is_replaying(&self) -> bool;
389
390    /// Returns true if the state machine is in processing state
391    fn is_processing(&self) -> bool;
392}
393
394// HOW TO USE THIS API
395//
396// pre_user_code:
397//     while !vm.is_ready_to_execute() {
398//         match io.read_input() {
399//             buffer => vm.notify_input(buffer),
400//             EOF => vm.notify_input_closed()
401//         }
402//     }
403//
404// sys_[something]:
405//     try {
406//         vm.sys_[something]()
407//         io.write_out(vm.take_output())
408//     } catch (e) {
409//         log(e)
410//         io.write_out(vm.take_output())
411//         throw e
412//     }
413//
414// await_restate_future:
415//     vm.notify_await_point(handle);
416//     loop {
417//         // Result here can be value, not_ready, suspended, vm error
418//         let result = vm.take_async_result(handle);
419//         if result.is_not_ready() {
420//             match await io.read_input() {
421//                buffer => vm.notify_input(buffer),
422//                EOF => vm.notify_input_closed()
423//             }
424//         }
425//         return result
426//     }
427//
428// post_user_code:
429//     // Consume vm.take_output() until EOF
430//     while buffer = vm.take_output() {
431//         io.write_out(buffer)
432//     }
433//     io.close()
434
435#[cfg(test)]
436mod tests;