Skip to main content

temporalio_workflow/runtime/
entry.rs

1//! Runtime entry traits implemented by workflow definitions and message handlers.
2
3use crate::{
4    SyncWorkflowContext, WorkflowContext, WorkflowContextView,
5    runtime::{model::WorkflowTermination, types::WorkflowDefinitionDescriptor},
6};
7use futures_util::future::{FutureExt, LocalBoxFuture};
8use std::any::Any;
9use temporalio_common_wasm::{
10    QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
11    data_converters::{
12        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
13        SerializationContextData, TemporalSerializable,
14    },
15    protos::temporal::api::{
16        common::v1::{Payload, Payloads},
17        failure::v1::Failure,
18    },
19};
20
/// Error type for workflow operations.
///
/// Each variant wraps its source error through thiserror's `#[from]`, so the `?`
/// operator converts a [`PayloadConversionError`] or a boxed
/// `std::error::Error + Send + Sync` into a `WorkflowError` automatically.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowError {
    /// Error during payload conversion.
    ///
    /// Displayed as `Payload conversion error: <source>`.
    #[error("Payload conversion error: {0}")]
    PayloadConversion(#[from] PayloadConversionError),

    /// Workflow execution error.
    ///
    /// The handler traits in this module wrap errors returned by user handlers
    /// in this variant. Displayed as `Workflow execution error: <source>`.
    #[error("Workflow execution error: {0}")]
    Execution(#[from] Box<dyn std::error::Error + Send + Sync>),
}
32
33impl From<WorkflowError> for Failure {
34    fn from(err: WorkflowError) -> Self {
35        Failure {
36            message: err.to_string(),
37            ..Default::default()
38        }
39    }
40}
41
/// Recover the concrete handler input of type `T` from a type-erased box.
///
/// `handler_kind` is only used to label the panic message (for example
/// `"signal"`, `"query"` or `"update"`).
///
/// # Panics
///
/// Panics if the boxed value is not a `T`.
fn downcast_handler_input<T: Any>(input: Box<dyn Any>, handler_kind: &'static str) -> T {
    match input.downcast::<T>() {
        Ok(typed) => *typed,
        Err(_) => {
            panic!("typed {handler_kind} dispatch received input with wrong concrete type")
        }
    }
}
47
/// Trait implemented by workflow structs to enable execution by the worker.
///
/// This trait is typically generated by the `#[workflow_methods]` macro and should not
/// be implemented manually in most cases.
///
/// Message handling is split into two steps per message kind: a `decode_*_input`
/// function turns raw payloads into a type-erased `Box<dyn Any>`, and the matching
/// `dispatch_*` / `validate_update` function consumes that boxed value. The
/// `Executable*` helper traits in this module downcast the box to the handler's
/// concrete input type and panic if the type does not match.
pub trait WorkflowImplementation: Sized + 'static {
    /// The marker struct for the run method that implements `WorkflowDefinition`
    type Run: WorkflowDefinition;

    /// Whether this workflow has a user-defined `#[init]` method.
    /// Set to `true` by the macro when `#[init]` is present, `false` otherwise.
    const HAS_INIT: bool;

    /// Whether the init method accepts the workflow input.
    /// If true, input goes to init. If false, input goes to run.
    const INIT_TAKES_INPUT: bool;

    /// Returns the workflow type name.
    fn name() -> &'static str;

    /// Returns the exported workflow definition metadata for this workflow.
    fn definition() -> WorkflowDefinitionDescriptor;

    /// Initialize the workflow instance.
    ///
    /// `input` is optional; see [`Self::INIT_TAKES_INPUT`] for whether the
    /// workflow input is routed here or to [`Self::run`].
    fn init(
        ctx: WorkflowContextView,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> Self;

    /// Execute the workflow's main run function.
    ///
    /// Returns a `'static` local (non-`Send`) boxed future that resolves to the
    /// result payload, or to a [`WorkflowTermination`] on the error side.
    fn run(
        ctx: WorkflowContext<Self>,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;

    /// Decode a signal's payloads into that signal handler's concrete input type.
    ///
    /// Takes the payloads by value. The default implementation ignores all
    /// arguments and returns `Ok(None)`.
    // NOTE(review): `None` presumably means "no signal handler with this name" —
    // confirm against the caller.
    fn decode_signal_input(
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch a signal using an already decoded input value.
    fn dispatch_signal(
        ctx: WorkflowContext<Self>,
        name: &str,
        input: Box<dyn Any>,
    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>>;

    /// Decode a query's payloads into that query handler's concrete input type.
    ///
    /// Unlike the signal and update variants, the payloads are borrowed here.
    /// The default implementation ignores all arguments and returns `Ok(None)`.
    // NOTE(review): `None` presumably means "no query handler with this name" —
    // confirm against the caller.
    fn decode_query_input(
        _name: &str,
        _payloads: &Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch a query using an already decoded input value.
    ///
    /// Runs synchronously against `&self` and returns the result as a payload.
    fn dispatch_query(
        &self,
        ctx: WorkflowContextView,
        name: &str,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> Result<Payload, WorkflowError>;

    /// Decode an update's payloads into that update handler's concrete input type.
    ///
    /// Takes the payloads by value. The default implementation ignores all
    /// arguments and returns `Ok(None)`.
    // NOTE(review): `None` presumably means "no update handler with this name" —
    // confirm against the caller.
    fn decode_update_input(
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Result<Option<Box<dyn Any>>, WorkflowError> {
        Ok(None)
    }

    /// Dispatch an update using an already decoded input value.
    ///
    /// The returned future resolves to the update result as a payload.
    fn dispatch_update(
        ctx: WorkflowContext<Self>,
        name: &str,
        input: Box<dyn Any>,
        converter: &PayloadConverter,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>>;

    /// Validate an update using an already decoded input value.
    ///
    /// Runs synchronously against `&self`; an `Err` reports a failed validation.
    fn validate_update(
        &self,
        ctx: WorkflowContextView,
        name: &str,
        input: Box<dyn Any>,
    ) -> Result<(), WorkflowError>;
}
141
142/// Trait for executing synchronous signal handlers on a workflow.
143pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
144    /// Handle an incoming signal with the given input.
145    fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);
146
147    /// Dispatch the signal with an already decoded input.
148    fn dispatch(
149        ctx: WorkflowContext<Self>,
150        input: Box<dyn Any>,
151    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
152        let input = downcast_handler_input::<S::Input>(input, "signal");
153        let mut sync_ctx = ctx.sync_context();
154        ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
155        std::future::ready(Ok(())).boxed_local()
156    }
157}
158
159/// Trait for executing asynchronous signal handlers on a workflow.
160pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
161    /// Handle an incoming signal with the given input.
162    fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;
163
164    /// Dispatch the signal with an already decoded input.
165    fn dispatch(
166        ctx: WorkflowContext<Self>,
167        input: Box<dyn Any>,
168    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
169        let input = downcast_handler_input::<S::Input>(input, "signal");
170        Self::handle(ctx, input).map(|()| Ok(())).boxed_local()
171    }
172}
173
174/// Trait for executing query handlers on a workflow.
175pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
176    /// Handle a query with the given input and return the result.
177    fn handle(
178        &self,
179        ctx: &WorkflowContextView,
180        input: Q::Input,
181    ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;
182
183    /// Dispatch the query with an already decoded input.
184    fn dispatch(
185        &self,
186        ctx: &WorkflowContextView,
187        input: Box<dyn Any>,
188        converter: &PayloadConverter,
189    ) -> Result<Payload, WorkflowError> {
190        let input = downcast_handler_input::<Q::Input>(input, "query");
191        let output = self.handle(ctx, input).map_err(WorkflowError::Execution)?;
192        serialize_output(&output, converter)
193    }
194}
195
196/// Trait for executing synchronous update handlers on a workflow.
197pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
198    /// Handle an update with the given input and return the result.
199    fn handle(
200        &mut self,
201        ctx: &mut SyncWorkflowContext<Self>,
202        input: U::Input,
203    ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;
204
205    /// Validate an update before it is applied.
206    fn validate(
207        &self,
208        _ctx: &WorkflowContextView,
209        _input: &U::Input,
210    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
211        Ok(())
212    }
213
214    /// Dispatch the update with an already decoded input.
215    fn dispatch(
216        ctx: WorkflowContext<Self>,
217        input: Box<dyn Any>,
218        converter: &PayloadConverter,
219    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
220        let input = downcast_handler_input::<U::Input>(input, "update");
221        let mut sync_ctx = ctx.sync_context();
222        let result = ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
223        match result {
224            Ok(output) => match serialize_output(&output, converter) {
225                Ok(payload) => std::future::ready(Ok(payload)).boxed_local(),
226                Err(e) => std::future::ready(Err(e)).boxed_local(),
227            },
228            Err(e) => std::future::ready(Err(WorkflowError::Execution(e))).boxed_local(),
229        }
230    }
231
232    /// Dispatch validation with an already decoded input.
233    fn dispatch_validate(
234        &self,
235        ctx: &WorkflowContextView,
236        input: Box<dyn Any>,
237    ) -> Result<(), WorkflowError> {
238        let input = downcast_handler_input::<U::Input>(input, "update validation");
239        self.validate(ctx, &input).map_err(WorkflowError::Execution)
240    }
241}
242
243/// Trait for executing asynchronous update handlers on a workflow.
244pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
245    /// Handle an update with the given input and return the result.
246    fn handle(
247        ctx: WorkflowContext<Self>,
248        input: U::Input,
249    ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;
250
251    /// Validate an update before it is applied.
252    fn validate(
253        &self,
254        _ctx: &WorkflowContextView,
255        _input: &U::Input,
256    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
257        Ok(())
258    }
259
260    /// Dispatch the update with an already decoded input.
261    fn dispatch(
262        ctx: WorkflowContext<Self>,
263        input: Box<dyn Any>,
264        converter: &PayloadConverter,
265    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
266        let input = downcast_handler_input::<U::Input>(input, "update");
267        let converter = converter.clone();
268        async move {
269            let output = Self::handle(ctx, input)
270                .await
271                .map_err(WorkflowError::Execution)?;
272            serialize_output(&output, &converter)
273        }
274        .boxed_local()
275    }
276
277    /// Dispatch validation with an already decoded input.
278    fn dispatch_validate(
279        &self,
280        ctx: &WorkflowContextView,
281        input: Box<dyn Any>,
282    ) -> Result<(), WorkflowError> {
283        let input = downcast_handler_input::<U::Input>(input, "update validation");
284        self.validate(ctx, &input).map_err(WorkflowError::Execution)
285    }
286}
287
288/// Serialize handler output to a payload.
289pub(crate) fn serialize_output<O: TemporalSerializable + 'static>(
290    output: &O,
291    converter: &PayloadConverter,
292) -> Result<Payload, WorkflowError> {
293    let ctx = SerializationContext {
294        data: &SerializationContextData::Workflow,
295        converter,
296    };
297    converter.to_payload(&ctx, output).map_err(Into::into)
298}
299
300/// Serialize a workflow result value to a payload.
301pub fn serialize_result<T: TemporalSerializable + 'static>(
302    result: T,
303    converter: &PayloadConverter,
304) -> Result<Payload, WorkflowError> {
305    serialize_output(&result, converter)
306}