1pub mod error;
2pub mod fmt;
3mod headers;
4#[cfg(feature = "request_identity")]
5mod request_identity;
6mod retries;
7mod service_protocol;
8mod vm;
9
10use bytes::Bytes;
11use std::borrow::Cow;
12use std::time::Duration;
13
14pub use crate::retries::RetryPolicy;
15pub use error::Error;
16pub use headers::HeaderMap;
17#[cfg(feature = "request_identity")]
18pub use request_identity::*;
19pub use service_protocol::Version;
20pub use vm::CoreVM;
21
/// Per-call payload options accepted by the payload-handling `sys_*` methods of [`VM`].
///
/// The derived [`Default`] value has `unstable_serialization == false`, which is the same value
/// that [`PayloadOptions::stable`] returns.
#[derive(Debug, Clone, Copy, Default)]
pub struct PayloadOptions {
    /// Flag marking the serialization of the payload as unstable.
    ///
    /// NOTE(review): how the VM reacts to this flag is implemented outside this file — confirm
    /// there before relying on specific semantics.
    pub unstable_serialization: bool,
}

impl PayloadOptions {
    /// Returns options with `unstable_serialization` set to `false`.
    ///
    /// This is a `const fn`, so it can be used to initialize constants and statics.
    #[must_use]
    pub const fn stable() -> Self {
        Self {
            unstable_serialization: false,
        }
    }

    /// Returns options with `unstable_serialization` set to `true`.
    ///
    /// This is a `const fn`, so it can be used to initialize constants and statics.
    #[must_use]
    pub const fn unstable() -> Self {
        Self {
            unstable_serialization: true,
        }
    }
}
48
/// A single header, stored as a key/value pair of strings.
///
/// Both fields are `Cow<'static, str>`, so a header can hold either a borrowed `'static` string
/// (e.g. a literal) or an owned `String`. Equality compares the string contents of both fields.
#[derive(Debug, PartialEq, Eq)]
pub struct Header {
    /// Header name.
    pub key: Cow<'static, str>,
    /// Header value.
    pub value: Cow<'static, str>,
}
54
55#[derive(Debug)]
56pub struct ResponseHead {
57 pub status_code: u16,
58 pub headers: Vec<Header>,
59 pub version: Version,
60}
61
62#[derive(Debug, Eq, PartialEq)]
63pub struct Input {
64 pub invocation_id: String,
65 pub random_seed: u64,
66 pub key: String,
67 pub headers: Vec<Header>,
68 pub input: Bytes,
69}
70
/// Kind of a command.
///
/// The type is a plain fieldless enum and is `Copy`. Its [`std::fmt::Display`] implementation
/// delegates to `fmt::format_command_ty`. The variant order is part of the type (it fixes the
/// discriminants), so new variants should be appended rather than inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandType {
    Input,
    Output,
    GetState,
    GetStateKeys,
    SetState,
    ClearState,
    ClearAllState,
    GetPromise,
    PeekPromise,
    CompletePromise,
    Sleep,
    Call,
    OneWayCall,
    SendSignal,
    Run,
    AttachInvocation,
    GetInvocationOutput,
    CompleteAwakeable,
    CancelInvocation,
}
93
94impl std::fmt::Display for CommandType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 write!(f, "{}", fmt::format_command_ty(*self))
97 }
98}
99
100#[derive(Debug, Clone, Eq, PartialEq)]
102pub enum CommandRelationship {
103 Last,
104 Next {
105 ty: CommandType,
106 name: Option<Cow<'static, str>>,
107 },
108 Specific {
109 command_index: u32,
110 ty: CommandType,
111 name: Option<Cow<'static, str>>,
112 },
113}
114
115#[derive(Debug, Eq, PartialEq)]
116pub struct Target {
117 pub service: String,
118 pub handler: String,
119 pub key: Option<String>,
120 pub idempotency_key: Option<String>,
121 pub headers: Vec<Header>,
122}
123
/// The [`NotificationHandle`] whose raw value is `1`.
///
/// NOTE(review): by its name this is the handle reserved for the cancel notification — confirm
/// against the VM implementation.
pub const CANCEL_NOTIFICATION_HANDLE: NotificationHandle = NotificationHandle(1);

/// Handle wrapping a raw `u32`.
///
/// The wrapped value is private; convert with `NotificationHandle::from(u32)` and
/// `u32::from(NotificationHandle)`. Comparison, ordering and hashing are those of the wrapped
/// `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NotificationHandle(u32);

impl From<u32> for NotificationHandle {
    /// Wraps a raw `u32` into a handle.
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl From<NotificationHandle> for u32 {
    /// Extracts the raw `u32` from a handle.
    fn from(value: NotificationHandle) -> Self {
        let NotificationHandle(raw) = value;
        raw
    }
}
140
141#[derive(Debug, Clone, Copy, Eq, PartialEq)]
142pub struct CallHandle {
143 pub invocation_id_notification_handle: NotificationHandle,
144 pub call_notification_handle: NotificationHandle,
145}
146
147#[derive(Debug, Clone, Copy, Eq, PartialEq)]
148pub struct SendHandle {
149 pub invocation_id_notification_handle: NotificationHandle,
150}
151
152#[derive(Debug, Eq, PartialEq, strum::IntoStaticStr)]
153pub enum Value {
154 Void,
156 Success(Bytes),
157 Failure(TerminalFailure),
158 StateKeys(Vec<String>),
160 InvocationId(String),
162}
163
/// A terminal failure: a numeric code, a message and a list of metadata entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TerminalFailure {
    /// Failure code.
    pub code: u16,
    /// Failure message.
    pub message: String,
    /// Metadata entries as pairs of strings.
    ///
    /// Stored as a `Vec`, so order is preserved, duplicates are allowed, and equality is
    /// order-sensitive. NOTE(review): presumably `(key, value)` pairs — confirm.
    pub metadata: Vec<(String, String)>,
}
171
/// Retry bookkeeping for an entry.
///
/// The derived [`Default`] value has a `retry_count` of `0` and a zero `retry_loop_duration`.
#[derive(Debug, Default)]
pub struct EntryRetryInfo {
    /// Number of retries.
    pub retry_count: u32,
    /// Duration of the retry loop.
    ///
    /// NOTE(review): presumably the time spent retrying so far — confirm where this is filled in.
    pub retry_loop_duration: Duration,
}
179
180#[derive(Debug, Clone)]
181pub enum RunExitResult {
182 Success(Bytes),
183 TerminalFailure(TerminalFailure),
184 RetryableFailure {
185 attempt_duration: Duration,
186 error: Error,
187 },
188}
189
190#[derive(Debug, Clone)]
191pub enum NonEmptyValue {
192 Success(Bytes),
193 Failure(TerminalFailure),
194}
195
196impl From<NonEmptyValue> for Value {
197 fn from(value: NonEmptyValue) -> Self {
198 match value {
199 NonEmptyValue::Success(s) => Value::Success(s),
200 NonEmptyValue::Failure(f) => Value::Failure(f),
201 }
202 }
203}
204
/// Ways of addressing an invocation; accepted by [`VM::sys_attach_invocation`] and
/// [`VM::sys_get_invocation_output`].
#[derive(Debug, PartialEq, Eq)]
pub enum AttachInvocationTarget {
    /// Address the invocation by its invocation id.
    InvocationId(String),
    /// Address the invocation by workflow name and key.
    WorkflowId {
        /// Workflow name.
        name: String,
        /// Workflow key.
        key: String,
    },
    /// Address the invocation by service, handler and idempotency key.
    IdempotencyId {
        /// Service name.
        service_name: String,
        /// Optional service key.
        service_key: Option<String>,
        /// Handler name.
        handler_name: String,
        /// Idempotency key.
        idempotency_key: String,
    },
}
219
220#[derive(Debug, Eq, PartialEq)]
221pub enum TakeOutputResult {
222 Buffer(Bytes),
223 EOF,
224}
225
226pub type VMResult<T> = Result<T, Error>;
227
/// Configuration of implicit cancellation; stored in [`VMOptions::implicit_cancellation`].
///
/// NOTE(review): what the VM does when cancellation is enabled is implemented outside this file;
/// the flag docs below follow the field names — confirm there.
#[derive(Debug)]
pub enum ImplicitCancellationOption {
    /// Implicit cancellation is turned off.
    Disabled,
    /// Implicit cancellation is turned on, with per-kind switches.
    Enabled {
        /// Whether children calls are cancelled.
        cancel_children_calls: bool,
        /// Whether children one-way calls are cancelled.
        cancel_children_one_way_calls: bool,
    },
}
236
/// Configuration of the non-determinism checks; stored in
/// [`VMOptions::non_determinism_checks`].
///
/// The [`Default`] value is [`NonDeterministicChecksOption::Enabled`].
#[derive(Debug, Default)]
pub enum NonDeterministicChecksOption {
    /// Payload checks are disabled.
    ///
    /// NOTE(review): by its name only the payload part of the checks is skipped — confirm
    /// against the VM implementation.
    PayloadChecksDisabled,
    /// Checks are enabled (the default).
    #[default]
    Enabled,
}
245
246#[derive(Debug)]
247pub struct VMOptions {
248 pub implicit_cancellation: ImplicitCancellationOption,
249 pub non_determinism_checks: NonDeterministicChecksOption,
250}
251
252impl Default for VMOptions {
253 fn default() -> Self {
254 Self {
255 implicit_cancellation: ImplicitCancellationOption::Enabled {
256 cancel_children_calls: true,
257 cancel_children_one_way_calls: false,
258 },
259 non_determinism_checks: NonDeterministicChecksOption::Enabled,
260 }
261 }
262}
263
264#[derive(Debug, PartialEq, Eq)]
265pub enum DoProgressResponse {
266 AnyCompleted,
268 ReadFromInput,
270 ExecuteRun(NotificationHandle),
272 WaitingPendingRun,
274 CancelSignalReceived,
276}
277
278pub trait VM: Sized {
279 fn new(request_headers: impl HeaderMap, options: VMOptions) -> VMResult<Self>;
280
281 fn get_response_head(&self) -> ResponseHead;
282
283 fn notify_input(&mut self, buffer: Bytes);
286
287 fn notify_input_closed(&mut self);
288
289 fn notify_error(&mut self, error: Error, related_command: Option<CommandRelationship>);
292
293 fn take_output(&mut self) -> TakeOutputResult;
296
297 fn is_ready_to_execute(&self) -> VMResult<bool>;
300
301 fn is_completed(&self, handle: NotificationHandle) -> bool;
304
305 fn do_progress(&mut self, any_handle: Vec<NotificationHandle>) -> VMResult<DoProgressResponse>;
306
307 fn take_notification(&mut self, handle: NotificationHandle) -> VMResult<Option<Value>>;
308
309 fn sys_input(&mut self) -> VMResult<Input>;
312
313 fn sys_state_get(
314 &mut self,
315 key: String,
316 options: PayloadOptions,
317 ) -> VMResult<NotificationHandle>;
318
319 fn sys_state_get_keys(&mut self) -> VMResult<NotificationHandle>;
320
321 fn sys_state_set(&mut self, key: String, value: Bytes, options: PayloadOptions)
322 -> VMResult<()>;
323
324 fn sys_state_clear(&mut self, key: String) -> VMResult<()>;
325
326 fn sys_state_clear_all(&mut self) -> VMResult<()>;
327
328 fn sys_sleep(
330 &mut self,
331 name: String,
332 wake_up_time_since_unix_epoch: Duration,
333 now_since_unix_epoch: Option<Duration>,
334 ) -> VMResult<NotificationHandle>;
335
336 fn sys_call(
337 &mut self,
338 target: Target,
339 input: Bytes,
340 name: Option<String>,
341 options: PayloadOptions,
342 ) -> VMResult<CallHandle>;
343
344 fn sys_send(
345 &mut self,
346 target: Target,
347 input: Bytes,
348 execution_time_since_unix_epoch: Option<Duration>,
349 name: Option<String>,
350 options: PayloadOptions,
351 ) -> VMResult<SendHandle>;
352
353 fn sys_awakeable(&mut self) -> VMResult<(String, NotificationHandle)>;
354
355 fn sys_complete_awakeable(
356 &mut self,
357 id: String,
358 value: NonEmptyValue,
359 options: PayloadOptions,
360 ) -> VMResult<()>;
361
362 fn create_signal_handle(&mut self, signal_name: String) -> VMResult<NotificationHandle>;
363
364 fn sys_complete_signal(
365 &mut self,
366 target_invocation_id: String,
367 signal_name: String,
368 value: NonEmptyValue,
369 ) -> VMResult<()>;
370
371 fn sys_get_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
372
373 fn sys_peek_promise(&mut self, key: String) -> VMResult<NotificationHandle>;
374
375 fn sys_complete_promise(
376 &mut self,
377 key: String,
378 value: NonEmptyValue,
379 options: PayloadOptions,
380 ) -> VMResult<NotificationHandle>;
381
382 fn sys_run(&mut self, name: String) -> VMResult<NotificationHandle>;
383
384 fn propose_run_completion(
385 &mut self,
386 notification_handle: NotificationHandle,
387 value: RunExitResult,
388 retry_policy: RetryPolicy,
389 ) -> VMResult<()>;
390
391 fn sys_cancel_invocation(&mut self, target_invocation_id: String) -> VMResult<()>;
392
393 fn sys_attach_invocation(
394 &mut self,
395 target: AttachInvocationTarget,
396 ) -> VMResult<NotificationHandle>;
397
398 fn sys_get_invocation_output(
399 &mut self,
400 target: AttachInvocationTarget,
401 ) -> VMResult<NotificationHandle>;
402
403 fn sys_write_output(&mut self, value: NonEmptyValue, options: PayloadOptions) -> VMResult<()>;
404
405 fn sys_end(&mut self) -> VMResult<()>;
406
407 fn is_waiting_preflight(&self) -> bool;
409
410 fn is_replaying(&self) -> bool;
412
413 fn is_processing(&self) -> bool;
415
416 fn last_command_index(&self) -> i64;
418}
419
420#[cfg(test)]
462mod tests;