1mod headers;
2#[cfg(feature = "request_identity")]
3mod request_identity;
4mod retries;
5mod service_protocol;
6mod vm;
7
8use bytes::Bytes;
9use std::borrow::Cow;
10use std::time::Duration;
11
12pub use crate::retries::RetryPolicy;
13pub use headers::HeaderMap;
14#[cfg(feature = "request_identity")]
15pub use request_identity::*;
16pub use service_protocol::Version;
17pub use vm::CoreVM;
18
19pub mod error {
21 pub use crate::vm::errors::codes;
22 pub use crate::vm::errors::InvocationErrorCode;
23}
24
25#[derive(Debug, Eq, PartialEq)]
26pub struct Header {
27 pub key: Cow<'static, str>,
28 pub value: Cow<'static, str>,
29}
30
31#[derive(Debug)]
32pub struct ResponseHead {
33 pub status_code: u16,
34 pub headers: Vec<Header>,
35 pub version: Version,
36}
37
38#[derive(Debug, Clone, Copy, thiserror::Error)]
39#[error("Suspended execution")]
40pub struct SuspendedError;
41
42#[derive(Debug, Clone, thiserror::Error)]
43#[error("State machine error [{code}]: {message}. Stacktrace: {stacktrace}")]
44pub struct Error {
45 code: u16,
46 message: Cow<'static, str>,
47 stacktrace: Cow<'static, str>,
48}
49
50impl Error {
51 pub fn new(code: impl Into<u16>, message: impl Into<Cow<'static, str>>) -> Self {
52 Error {
53 code: code.into(),
54 message: message.into(),
55 stacktrace: Default::default(),
56 }
57 }
58
59 pub fn internal(message: impl Into<Cow<'static, str>>) -> Self {
60 Self::new(error::codes::INTERNAL, message)
61 }
62
63 pub fn code(&self) -> u16 {
64 self.code
65 }
66
67 pub fn message(&self) -> &str {
68 &self.message
69 }
70
71 pub fn description(&self) -> &str {
72 &self.stacktrace
73 }
74
75 pub fn with_stacktrace(mut self, stacktrace: impl Into<Cow<'static, str>>) -> Self {
76 self.stacktrace = stacktrace.into();
77 self
78 }
79
80 pub fn append_description_for_code(
82 mut self,
83 code: impl Into<u16>,
84 description: impl Into<Cow<'static, str>>,
85 ) -> Self {
86 let c = code.into();
87 if self.code == c {
88 if self.stacktrace.is_empty() {
89 self.stacktrace = description.into();
90 } else {
91 self.stacktrace = format!("{}. {}", self.stacktrace, description.into()).into();
92 }
93 self
94 } else {
95 self
96 }
97 }
98}
99
100#[derive(Debug, Clone, thiserror::Error)]
101pub enum SuspendedOrVMError {
102 #[error(transparent)]
103 Suspended(SuspendedError),
104 #[error(transparent)]
105 VM(Error),
106}
107
108#[derive(Debug, Eq, PartialEq)]
109pub struct Input {
110 pub invocation_id: String,
111 pub random_seed: u64,
112 pub key: String,
113 pub headers: Vec<Header>,
114 pub input: Bytes,
115}
116
117#[derive(Debug, Eq, PartialEq)]
118pub struct Target {
119 pub service: String,
120 pub handler: String,
121 pub key: Option<String>,
122 pub idempotency_key: Option<String>,
123 pub headers: Vec<Header>,
124}
125
126pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);
127
128#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
129pub struct NotificationHandle(u32);
130
131impl From<u32> for NotificationHandle {
132 fn from(value: u32) -> Self {
133 NotificationHandle(value)
134 }
135}
136
137impl From<NotificationHandle> for u32 {
138 fn from(value: NotificationHandle) -> Self {
139 value.0
140 }
141}
142
143#[derive(Debug, Clone, Copy, Eq, PartialEq)]
144pub struct CallHandle {
145 pub invocation_id_notification_handle: NotificationHandle,
146 pub call_notification_handle: NotificationHandle,
147}
148
149#[derive(Debug, Clone, Copy, Eq, PartialEq)]
150pub struct SendHandle {
151 pub invocation_id_notification_handle: NotificationHandle,
152}
153
154#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
155pub enum Value {
156 Void,
158 Success(Bytes),
159 Failure(TerminalFailure),
160 StateKeys(Vec<String>),
162 InvocationId(String),
164}
165
166#[derive(Debug, Clone, Eq, PartialEq)]
168pub struct TerminalFailure {
169 pub code: u16,
170 pub message: String,
171}
172
173#[derive(Debug, Default)]
174pub struct EntryRetryInfo {
175 pub retry_count: u32,
177 pub retry_loop_duration: Duration,
179}
180
181#[derive(Debug, Clone)]
182pub enum RunExitResult {
183 Success(Bytes),
184 TerminalFailure(TerminalFailure),
185 RetryableFailure {
186 attempt_duration: Duration,
187 error: Error,
188 },
189}
190
191#[derive(Debug, Clone)]
192pub enum NonEmptyValue {
193 Success(Bytes),
194 Failure(TerminalFailure),
195}
196
197impl From<NonEmptyValue> for Value {
198 fn from(value: NonEmptyValue) -> Self {
199 match value {
200 NonEmptyValue::Success(s) => Value::Success(s),
201 NonEmptyValue::Failure(f) => Value::Failure(f),
202 }
203 }
204}
205
206#[derive(Debug, Eq, PartialEq)]
207pub enum AttachInvocationTarget {
208 InvocationId(String),
209 WorkflowId {
210 name: String,
211 key: String,
212 },
213 IdempotencyId {
214 service_name: String,
215 service_key: Option<String>,
216 handler_name: String,
217 idempotency_key: String,
218 },
219}
220
221#[derive(Debug, Eq, PartialEq)]
222pub enum TakeOutputResult {
223 Buffer(Bytes),
224 EOF,
225}
226
227pub type VMResult<T> = Result<T, Error>;
228
229#[derive(Debug)]
230pub enum ImplicitCancellationOption {
231 Disabled,
232 Enabled {
233 cancel_children_calls: bool,
234 cancel_children_one_way_calls: bool,
235 },
236}
237
238#[derive(Debug)]
239pub struct VMOptions {
240 pub implicit_cancellation: ImplicitCancellationOption,
241}
242
243impl Default for VMOptions {
244 fn default() -> Self {
245 Self {
246 implicit_cancellation: ImplicitCancellationOption::Enabled {
247 cancel_children_calls: true,
248 cancel_children_one_way_calls: false,
249 },
250 }
251 }
252}
253
254#[derive(Debug, PartialEq, Eq)]
255pub enum DoProgressResponse {
256 AnyCompleted,
258 ReadFromInput,
260 ExecuteRun(NotificationHandle),
262 WaitingPendingRun,
264 CancelSignalReceived,
266}
267
268pub trait VM: Sized {
269 fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
270
271 fn get_response_head(&self) -> ResponseHead;
272
273 fn notify_input(&mut self, buffer: Bytes);
276
277 fn notify_input_closed(&mut self);
278
279 fn notify_error(&mut self, error: Error, next_retry_delay: Option<Duration>);
282
283 fn take_output(&mut self) -> TakeOutputResult;
286
287 fn is_ready_to_execute(&self) -> VMResult<bool>;
290
291 fn is_completed(&self, handle: NotificationHandle) -> bool;
294
295 fn do_progress(
296 &mut self,
297 any_handle: Vec<NotificationHandle>,
298 ) -> Result<DoProgressResponse, SuspendedOrVMError>;
299
300 fn take_notification(
301 &mut self,
302 handle: NotificationHandle,
303 ) -> Result<Option<Value>, SuspendedOrVMError>;
304
305 fn sys_input(&mut self) -> VMResult<Input>;
308
309 fn sys_state_get(&mut self, key: String) -> VMResult<NotificationHandle>;
310
311 fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
312
313 fn sys_state_set(&mut self, key: String, value: Bytes) -> VMResult<()>;
314
315 fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
316
317 fn sys_state_clear_all(&mut self) -> VMResult<()>;
318
319 fn sys_sleep(
321 &mut self,
322 name: String,
323 wake_up_time_since_unix_epoch: Duration,
324 now_since_unix_epoch: Option<Duration>,
325 ) -> VMResult<NotificationHandle>;
326
327 fn sys_call(&mut self, target: Target, input: Bytes) -> VMResult<CallHandle>;
328
329 fn sys_send(
330 &mut self,
331 target: Target,
332 input: Bytes,
333 execution_time_since_unix_epoch: Option<Duration>,
334 ) -> VMResult<SendHandle>;
335
336 fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
337
338 fn sys_complete_awakeable(&mut self, id: String, value: NonEmptyValue) -> VMResult<()>;
339
340 fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
341
342 fn sys_complete_signal(
343 &mut self,
344 target_invocation_id: String,
345 signal_name: String,
346 value: NonEmptyValue,
347 ) -> VMResult<()>;
348
349 fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
350
351 fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
352
353 fn sys_complete_promise(
354 &mut self,
355 key: String,
356 value: NonEmptyValue,
357 ) -> VMResult<NotificationHandle>;
358
359 fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
360
361 fn propose_run_completion(
362 &mut self,
363 notification_handle: NotificationHandle,
364 value: RunExitResult,
365 retry_policy: RetryPolicy,
366 ) -> VMResult<()>;
367
368 fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
369
370 fn sys_attach_invocation(
371 &mut self,
372 target: AttachInvocationTarget,
373 ) -> VMResult<NotificationHandle>;
374
375 fn sys_get_invocation_output(
376 &mut self,
377 target: AttachInvocationTarget,
378 ) -> VMResult<NotificationHandle>;
379
380 fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;
381
382 fn sys_end(&mut self) -> VMResult<()>;
383
384 fn is_waiting_preflight(&self) -> bool;
386
387 fn is_replaying(&self) -> bool;
389
390 fn is_processing(&self) -> bool;
392}
393
394#[cfg(test)]
436mod tests;