Skip to main content

temporalio_workflow/runtime/
model.rs

1//! Runtime protocol and execution model types shared by workflow code and native hosts.
2
3use crate::{
4    runtime::types::ContinueAsNewRequest,
5    workflow_context::{
6        ChildWfCommon, NexusUnblockData, PendingChildWorkflow, StartedNexusOperation,
7    },
8};
9use temporalio_common_wasm::{
10    WorkflowDefinition,
11    error::{
12        ActivityExecutionError, ApplicationFailure, ChildWorkflowExecutionError,
13        WorkflowSignalError,
14    },
15    protos::{
16        coresdk::{
17            activity_result::ActivityResolution,
18            child_workflow::ChildWorkflowResult,
19            nexus::NexusOperationResult,
20            workflow_activation::{
21                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
22                resolve_nexus_operation_start,
23            },
24        },
25        temporal::api::failure::v1::Failure,
26    },
27};
28
29#[derive(Debug)]
30pub enum UnblockEvent {
31    Timer(u32, TimerResult),
32    Activity(u32, Box<ActivityResolution>),
33    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
34    WorkflowComplete(u32, Box<ChildWorkflowResult>),
35    SignalExternal(u32, Option<Failure>),
36    CancelExternal(u32, Option<Failure>),
37    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
38    NexusOperationComplete(u32, Box<NexusOperationResult>),
39}
40
41/// Result of awaiting on a timer
42#[derive(Debug, Copy, Clone, PartialEq, Eq)]
43pub enum TimerResult {
44    /// The timer was cancelled
45    Cancelled,
46    /// The timer elapsed and fired
47    Fired,
48}
49
50/// Successful result of sending a signal to an external workflow
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub struct SignalExternalOk;
53/// Result of awaiting on sending a signal to an external workflow
54pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
55
56/// Successful result of sending a cancel request to an external workflow
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct CancelExternalOk;
59/// Result of awaiting on sending a cancel request to an external workflow
60pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
61
62pub(crate) trait Unblockable {
63    type OtherDat;
64
65    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
66}
67
68impl Unblockable for TimerResult {
69    type OtherDat = ();
70
71    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
72        match ue {
73            UnblockEvent::Timer(_, result) => result,
74            _ => panic!("Invalid unblock event for timer"),
75        }
76    }
77}
78
79impl Unblockable for ActivityResolution {
80    type OtherDat = ();
81
82    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
83        match ue {
84            UnblockEvent::Activity(_, result) => *result,
85            _ => panic!("Invalid unblock event for activity"),
86        }
87    }
88}
89
90impl<WD: WorkflowDefinition> Unblockable for PendingChildWorkflow<WD> {
91    type OtherDat = ChildWfCommon;
92
93    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
94        match ue {
95            UnblockEvent::WorkflowStart(_, result) => Self {
96                status: *result,
97                common: od,
98                _phantom: std::marker::PhantomData,
99            },
100            _ => panic!("Invalid unblock event for child workflow start"),
101        }
102    }
103}
104
105impl Unblockable for ChildWorkflowResult {
106    type OtherDat = ();
107
108    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
109        match ue {
110            UnblockEvent::WorkflowComplete(_, result) => *result,
111            _ => panic!("Invalid unblock event for child workflow complete"),
112        }
113    }
114}
115
116impl Unblockable for SignalExternalWfResult {
117    type OtherDat = ();
118
119    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
120        match ue {
121            UnblockEvent::SignalExternal(_, maybefail) => {
122                maybefail.map_or(Ok(SignalExternalOk), Err)
123            }
124            _ => panic!("Invalid unblock event for signal external workflow result"),
125        }
126    }
127}
128
129impl Unblockable for CancelExternalWfResult {
130    type OtherDat = ();
131
132    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
133        match ue {
134            UnblockEvent::CancelExternal(_, maybefail) => {
135                maybefail.map_or(Ok(CancelExternalOk), Err)
136            }
137            _ => panic!("Invalid unblock event for cancel external workflow result"),
138        }
139    }
140}
141
142pub(crate) type NexusStartResult = Result<StartedNexusOperation, Failure>;
143
144impl Unblockable for NexusStartResult {
145    type OtherDat = NexusUnblockData;
146
147    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
148        match ue {
149            UnblockEvent::NexusOperationStart(_, result) => match *result {
150                resolve_nexus_operation_start::Status::OperationToken(op_token) => {
151                    Ok(StartedNexusOperation {
152                        operation_token: Some(op_token),
153                        unblock_dat: od,
154                    })
155                }
156                resolve_nexus_operation_start::Status::StartedSync(_) => {
157                    Ok(StartedNexusOperation {
158                        operation_token: None,
159                        unblock_dat: od,
160                    })
161                }
162                resolve_nexus_operation_start::Status::Failed(f) => Err(f),
163            },
164            _ => panic!("Invalid unblock event for nexus operation"),
165        }
166    }
167}
168
169impl Unblockable for NexusOperationResult {
170    type OtherDat = ();
171
172    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
173        match ue {
174            UnblockEvent::NexusOperationComplete(_, result) => *result,
175            _ => panic!("Invalid unblock event for nexus operation complete"),
176        }
177    }
178}
179
180#[derive(Debug, Clone)]
181pub enum CancellableID {
182    Timer(u32),
183    Activity(u32),
184    LocalActivity(u32),
185    ChildWorkflow { seqnum: u32, reason: String },
186    SignalExternalWorkflow(u32),
187    NexusOp(u32),
188}
189
190impl CancellableID {
191    pub(crate) fn with_reason(self, reason: String) -> Self {
192        match self {
193            CancellableID::ChildWorkflow { seqnum, .. } => {
194                CancellableID::ChildWorkflow { seqnum, reason }
195            }
196            other => other,
197        }
198    }
199}
200
201/// The result of running a workflow.
202pub type WorkflowResult<T> = Result<T, WorkflowTermination>;
203
204/// Represents ways a workflow can terminate without producing a normal result.
205#[derive(Debug, thiserror::Error)]
206pub enum WorkflowTermination {
207    #[error("Workflow cancelled")]
208    Cancelled,
209    #[error("Workflow evicted from cache")]
210    Evicted,
211    #[error("Continue as new")]
212    ContinueAsNew(Box<ContinueAsNewRequest>),
213    #[error("Workflow failed: {0}")]
214    Failed(#[source] temporalio_common_wasm::error::OutgoingWorkflowError),
215}
216
217impl WorkflowTermination {
218    pub fn continue_as_new(can: ContinueAsNewRequest) -> Self {
219        Self::ContinueAsNew(Box::new(can))
220    }
221
222    /// Construct a [`WorkflowTermination::Failed`] from an [`ApplicationFailure`].
223    pub fn failed_application(err: ApplicationFailure) -> Self {
224        Self::Failed(err.into())
225    }
226}
227
228impl From<anyhow::Error> for WorkflowTermination {
229    fn from(err: anyhow::Error) -> Self {
230        Self::Failed(err.into())
231    }
232}
233
234impl From<ApplicationFailure> for WorkflowTermination {
235    fn from(value: ApplicationFailure) -> Self {
236        Self::Failed(value.into())
237    }
238}
239
240impl From<temporalio_common_wasm::data_converters::PayloadConversionError> for WorkflowTermination {
241    fn from(value: temporalio_common_wasm::data_converters::PayloadConversionError) -> Self {
242        Self::Failed(value.into())
243    }
244}
245
246impl From<crate::runtime::entry::WorkflowError> for WorkflowTermination {
247    fn from(value: crate::runtime::entry::WorkflowError) -> Self {
248        match value {
249            crate::runtime::entry::WorkflowError::PayloadConversion(err) => Self::from(err),
250            crate::runtime::entry::WorkflowError::Execution(err) => Self::Failed(
251                temporalio_common_wasm::error::OutgoingWorkflowError::Application(Box::new(
252                    ApplicationFailure::new(err),
253                )),
254            ),
255        }
256    }
257}
258
259impl From<ActivityExecutionError> for WorkflowTermination {
260    fn from(value: ActivityExecutionError) -> Self {
261        Self::Failed(value.into())
262    }
263}
264
265impl From<ChildWorkflowExecutionError> for WorkflowTermination {
266    fn from(value: ChildWorkflowExecutionError) -> Self {
267        Self::Failed(value.into())
268    }
269}
270
271impl From<WorkflowSignalError> for WorkflowTermination {
272    fn from(value: WorkflowSignalError) -> Self {
273        Self::Failed(value.into())
274    }
275}
276
277impl From<temporalio_common_wasm::error::ChildWorkflowStartError> for WorkflowTermination {
278    fn from(value: temporalio_common_wasm::error::ChildWorkflowStartError) -> Self {
279        Self::Failed(value.into())
280    }
281}