Skip to main content

temporalio_sdk/
workflows.rs

1//! Functionality related to defining and interacting with workflows
2//!
3//! This module contains traits and types for implementing workflows using the
4//! `#[workflow]` and `#[workflow_methods]` macros.
5//!
6//! Example usage:
7//! ```
8//! use temporalio_macros::{workflow, workflow_methods};
9//! use temporalio_sdk::{
10//!     SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowResult,
11//! };
12//!
13//! #[workflow]
14//! pub struct MyWorkflow {
15//!     counter: u32,
16//! }
17//!
18//! #[workflow_methods]
19//! impl MyWorkflow {
20//!     #[init]
21//!     pub fn new(ctx: &WorkflowContextView, input: String) -> Self {
22//!         Self { counter: 0 }
23//!     }
24//!
25//!     // Async run method uses ctx.state() for reading
26//!     #[run]
27//!     pub async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
28//!         let counter = ctx.state(|s| s.counter);
29//!         Ok(format!("Done with counter: {}", counter))
30//!     }
31//!
32//!     // Sync signals use &mut self for direct mutations
33//!     #[signal]
34//!     pub fn increment(&mut self, ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
35//!         self.counter += amount;
36//!     }
37//!
38//!     // Queries use &self with read-only context
39//!     #[query]
40//!     pub fn get_counter(&self, ctx: &WorkflowContextView) -> u32 {
41//!         self.counter
42//!     }
43//! }
44//! ```
45
46/// Deterministic `select!` for use in Temporal workflows.
47///
48/// Polls branches in declaration order (top to bottom), ensuring deterministic
49/// behavior across workflow replays. Delegates to [`futures_util::select_biased!`].
50///
51/// All workflow futures (timers, activities, child workflows, etc.) implement
52/// `FusedFuture`, so they can be stored in variables and passed to `select!`
53/// without needing `.fuse()`.
54///
55/// # Example
56///
57/// ```ignore
58/// use temporalio_sdk::workflows::select;
59/// use temporalio_sdk::WorkflowContext;
60/// use std::time::Duration;
61///
62/// # async fn hidden(ctx: &mut WorkflowContext<()>) {
63/// select! {
64///     _ = ctx.timer(Duration::from_secs(60)) => { /* timer fired */ }
65///     reason = ctx.cancelled() => { /* cancelled */ }
66/// };
67/// # }
68/// ```
69#[doc(inline)]
70pub use crate::__temporal_select as select;
71
72/// Deterministic `join!` for use in Temporal workflows.
73///
74/// Polls all futures concurrently to completion in declaration order,
75/// ensuring deterministic behavior across workflow replays. Delegates
76/// to [`futures_util::join!`].
77///
78/// # Example
79///
80/// ```ignore
81/// use temporalio_sdk::workflows::join;
82///
83/// # async fn hidden() {
84/// let future_a = async { 1 };
85/// let future_b = async { 2 };
86/// let (a, b) = join!(future_a, future_b);
87/// # }
88/// ```
89#[doc(inline)]
90pub use crate::__temporal_join as join;
91
92/// Deterministic `join_all` for use in Temporal workflows.
93///
94/// Polls a collection of futures concurrently to completion in declaration order,
95/// returning a `Vec` of their results. Delegates to [`futures_util::future::join_all`].
96///
97/// # Example
98///
99/// ```ignore
100/// use temporalio_sdk::workflows::join_all;
101/// use temporalio_sdk::WorkflowContext;
102/// use std::time::Duration;
103///
104/// # async fn hidden(ctx: &mut WorkflowContext<()>) {
105/// let timers = vec![
106///     ctx.timer(Duration::from_secs(1)),
107///     ctx.timer(Duration::from_secs(2)),
108/// ];
109/// let results = join_all(timers).await;
110/// # }
111/// ```
112pub use futures_util::future::join_all;
113
114use crate::{
115    BaseWorkflowContext, SyncWorkflowContext, WorkflowContext, WorkflowContextView,
116    WorkflowTermination,
117};
118use futures_util::future::{Fuse, FutureExt, LocalBoxFuture};
119use std::{
120    cell::RefCell,
121    collections::HashMap,
122    fmt::Debug,
123    pin::Pin,
124    rc::Rc,
125    sync::Arc,
126    task::{Context as TaskContext, Poll},
127};
128use temporalio_common::{
129    QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
130    data_converters::{
131        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
132        SerializationContextData, TemporalDeserializable, TemporalSerializable,
133    },
134    protos::temporal::api::{
135        common::v1::{Payload, Payloads},
136        failure::v1::Failure,
137    },
138};
139
/// Error type for workflow operations.
///
/// Returned by the handler dispatch methods and the (de)serialization helpers in this
/// module. Both variants are `#[from]` conversions, so `?` lifts a
/// [`PayloadConversionError`] or an [`anyhow::Error`] into this type.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowError {
    /// Error during payload conversion
    #[error("Payload conversion error: {0}")]
    PayloadConversion(#[from] PayloadConversionError),

    /// Workflow execution error
    #[error("Workflow execution error: {0}")]
    Execution(#[from] anyhow::Error),
}
151
152impl From<WorkflowError> for Failure {
153    fn from(err: WorkflowError) -> Self {
154        Failure {
155            message: err.to_string(),
156            ..Default::default()
157        }
158    }
159}
160
/// Trait implemented by workflow structs to enable execution by the worker.
///
/// This trait is typically generated by the `#[workflow_methods]` macro and should not
/// be implemented manually in most cases.
///
/// `name`, `init` and `run` (plus the associated items) are required. The `dispatch_*`
/// and `validate_update` methods have default bodies that return `None`, meaning
/// "no handler with that name".
#[doc(hidden)]
pub trait WorkflowImplementation: Sized + 'static {
    /// The marker struct for the run method that implements `WorkflowDefinition`
    type Run: WorkflowDefinition;

    /// Whether this workflow has a user-defined `#[init]` method.
    /// Set to `true` by the macro when `#[init]` is present, `false` otherwise.
    const HAS_INIT: bool;

    /// Whether the init method accepts the workflow input.
    /// If true, input goes to init. If false, input goes to run.
    const INIT_TAKES_INPUT: bool;

    /// Returns the workflow type name.
    fn name() -> &'static str;

    /// Initialize the workflow instance.
    ///
    /// This is called when a new workflow execution starts. If `INIT_TAKES_INPUT` is true,
    /// `input` will be `Some`. Otherwise it's `None`.
    fn init(
        ctx: WorkflowContextView,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> Self;

    /// Execute the workflow's main run function.
    ///
    /// If `INIT_TAKES_INPUT` is false, `input` will be `Some`. Otherwise it's `None`.
    ///
    /// The returned future resolves to the serialized result payload, or to a
    /// [`WorkflowTermination`].
    fn run(
        ctx: WorkflowContext<Self>,
        input: Option<<Self::Run as WorkflowDefinition>::Input>,
    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;

    /// Dispatch an update request by name. Returns `None` if no handler for that name.
    ///
    /// The default implementation has no handlers and always returns `None`.
    fn dispatch_update(
        _ctx: WorkflowContext<Self>,
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
        None
    }

    /// Validate an update request by name.
    ///
    /// Returns `None` if no handler for that name, `Some(Ok(()))` if valid,
    /// `Some(Err(...))` if validation failed.
    ///
    /// The default implementation has no handlers and always returns `None`.
    fn validate_update(
        &self,
        _ctx: WorkflowContextView,
        _name: &str,
        _payloads: &Payloads,
        _converter: &PayloadConverter,
    ) -> Option<Result<(), WorkflowError>> {
        None
    }

    /// Dispatch a signal by name.
    ///
    /// Returns `None` if no handler for that name. For sync signals, the mutation happens
    /// immediately and returns a completed future. For async signals, returns a future
    /// that must be polled to completion.
    ///
    /// The default implementation has no handlers and always returns `None`.
    fn dispatch_signal(
        _ctx: WorkflowContext<Self>,
        _name: &str,
        _payloads: Payloads,
        _converter: &PayloadConverter,
    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
        None
    }

    /// Dispatch a query by name.
    ///
    /// Returns `None` if no handler for that name, `Some(Ok(payload))` on success,
    /// `Some(Err(...))` on failure. Queries are synchronous and read-only.
    ///
    /// The default implementation has no handlers and always returns `None`.
    fn dispatch_query(
        &self,
        _ctx: WorkflowContextView,
        _name: &str,
        _payloads: &Payloads,
        _converter: &PayloadConverter,
    ) -> Option<Result<Payload, WorkflowError>> {
        None
    }
}
250
// NOTE: In the traits below, the signal and update `dispatch` functions take the context by
// ownership, while the `handle` methods take it by reference when sync and by ownership when
// async. Async handlers must own the context because the futures they return must be 'static.
254
255/// Trait for executing synchronous signal handlers on a workflow.
256#[doc(hidden)]
257pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
258    /// Handle an incoming signal with the given input.
259    fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);
260
261    /// Dispatch the signal with payload deserialization.
262    fn dispatch(
263        ctx: WorkflowContext<Self>,
264        payloads: Payloads,
265        converter: &PayloadConverter,
266    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
267        match deserialize_input::<S::Input>(payloads.payloads, converter) {
268            Ok(input) => {
269                let mut sync_ctx = ctx.sync_context();
270                ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
271                std::future::ready(Ok(())).boxed_local()
272            }
273            Err(e) => std::future::ready(Err(e)).boxed_local(),
274        }
275    }
276}
277
278/// Trait for executing asynchronous signal handlers on a workflow.
279#[doc(hidden)]
280pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
281    /// Handle an incoming signal with the given input.
282    fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;
283
284    /// Dispatch the signal with payload deserialization.
285    fn dispatch(
286        ctx: WorkflowContext<Self>,
287        payloads: Payloads,
288        converter: &PayloadConverter,
289    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
290        match deserialize_input::<S::Input>(payloads.payloads, converter) {
291            Ok(input) => Self::handle(ctx, input).map(|()| Ok(())).boxed_local(),
292            Err(e) => std::future::ready(Err(e)).boxed_local(),
293        }
294    }
295}
296
297/// Trait for executing query handlers on a workflow.
298///
299/// Queries are read-only operations that do not mutate workflow state.
300/// They must be synchronous.
301#[doc(hidden)]
302pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
303    /// Handle a query with the given input and return the result.
304    ///
305    /// Queries take `&self` (immutable) and cannot modify workflow state.
306    /// Returning an error will cause the query to fail with that error message.
307    fn handle(
308        &self,
309        ctx: &WorkflowContextView,
310        input: Q::Input,
311    ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;
312
313    /// Dispatch the query with payload deserialization and output serialization.
314    fn dispatch(
315        &self,
316        ctx: &WorkflowContextView,
317        payloads: &Payloads,
318        converter: &PayloadConverter,
319    ) -> Result<Payload, WorkflowError> {
320        let input = deserialize_input::<Q::Input>(payloads.payloads.clone(), converter)?;
321        let output = self.handle(ctx, input).map_err(wrap_handler_error)?;
322        serialize_output(&output, converter)
323    }
324}
325
326/// Trait for executing synchronous update handlers on a workflow.
327#[doc(hidden)]
328pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
329    /// Handle an update with the given input and return the result.
330    /// Returning an error will cause the update to fail with that error message.
331    fn handle(
332        &mut self,
333        ctx: &mut SyncWorkflowContext<Self>,
334        input: U::Input,
335    ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;
336
337    /// Validate an update before it is applied.
338    fn validate(
339        &self,
340        _ctx: &WorkflowContextView,
341        _input: &U::Input,
342    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
343        Ok(())
344    }
345
346    /// Dispatch the update with payload deserialization and output serialization.
347    fn dispatch(
348        ctx: WorkflowContext<Self>,
349        payloads: Payloads,
350        converter: &PayloadConverter,
351    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
352        let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
353            Ok(v) => v,
354            Err(e) => return std::future::ready(Err(e)).boxed_local(),
355        };
356        let converter = converter.clone();
357        let mut sync_ctx = ctx.sync_context();
358        let result = ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
359        match result {
360            Ok(output) => match serialize_output(&output, &converter) {
361                Ok(payload) => std::future::ready(Ok(payload)).boxed_local(),
362                Err(e) => std::future::ready(Err(e)).boxed_local(),
363            },
364            Err(e) => std::future::ready(Err(wrap_handler_error(e))).boxed_local(),
365        }
366    }
367
368    /// Dispatch validation with payload deserialization.
369    fn dispatch_validate(
370        &self,
371        ctx: &WorkflowContextView,
372        payloads: &Payloads,
373        converter: &PayloadConverter,
374    ) -> Result<(), WorkflowError> {
375        let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
376        self.validate(ctx, &input).map_err(wrap_handler_error)
377    }
378}
379
380/// Trait for executing asynchronous update handlers on a workflow.
381#[doc(hidden)]
382pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
383    /// Handle an update with the given input and return the result.
384    /// Returning an error will cause the update to fail with that error message.
385    fn handle(
386        ctx: WorkflowContext<Self>,
387        input: U::Input,
388    ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;
389
390    /// Validate an update before it is applied.
391    fn validate(
392        &self,
393        _ctx: &WorkflowContextView,
394        _input: &U::Input,
395    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
396        Ok(())
397    }
398
399    /// Dispatch the update with payload deserialization and output serialization.
400    fn dispatch(
401        ctx: WorkflowContext<Self>,
402        payloads: Payloads,
403        converter: &PayloadConverter,
404    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
405        let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
406            Ok(v) => v,
407            Err(e) => return std::future::ready(Err(e)).boxed_local(),
408        };
409        let converter = converter.clone();
410        async move {
411            let output = Self::handle(ctx, input).await.map_err(wrap_handler_error)?;
412            serialize_output(&output, &converter)
413        }
414        .boxed_local()
415    }
416
417    /// Dispatch validation with payload deserialization.
418    fn dispatch_validate(
419        &self,
420        ctx: &WorkflowContextView,
421        payloads: &Payloads,
422        converter: &PayloadConverter,
423    ) -> Result<(), WorkflowError> {
424        let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
425        self.validate(ctx, &input).map_err(wrap_handler_error)
426    }
427}
428
/// Data passed to handler dispatch methods (signals, updates, queries).
pub(crate) struct DispatchData<'a> {
    /// Serialized handler arguments.
    pub(crate) payloads: Payloads,
    /// Headers accompanying the request; signal and update dispatch attach them to the
    /// handler's context via `with_headers`.
    pub(crate) headers: HashMap<String, Payload>,
    /// Converter handed to the handler dispatch for (de)serializing its input and output.
    pub(crate) converter: &'a PayloadConverter,
}
435
/// Trait implemented by workflow types to enable registration with workers.
///
/// This trait is automatically generated by the `#[workflow_methods]` macro.
/// [`WorkflowDefinitions::register_workflow`] calls [`register_all`](Self::register_all)
/// on the implementing type.
#[doc(hidden)]
pub trait WorkflowImplementer: WorkflowImplementation {
    /// Register this workflow and all its handlers with the given definitions container.
    fn register_all(defs: &mut WorkflowDefinitions);
}
444
/// Type-erased trait for workflow execution instances.
///
/// Lets a running workflow be driven through `Box<dyn DynWorkflowExecution>` without
/// knowing its concrete workflow type.
pub(crate) trait DynWorkflowExecution {
    /// Poll the run future.
    fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>>;

    /// Validate an update request. Returns `None` if no handler.
    fn validate_update(&self, name: &str, data: &DispatchData)
    -> Option<Result<(), WorkflowError>>;

    /// Start an update handler. Returns `None` if no handler for that name.
    ///
    /// On `Some`, the returned future resolves to the update's serialized result.
    fn start_update(
        &mut self,
        name: &str,
        data: DispatchData,
    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>>;

    /// Dispatch a signal by name. Returns `None` if no handler.
    fn dispatch_signal(
        &mut self,
        name: &str,
        data: DispatchData,
    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>>;

    /// Dispatch a query by name. Returns `None` if no handler.
    ///
    /// Queries are answered synchronously, so the result is returned directly rather
    /// than as a future.
    fn dispatch_query(
        &self,
        name: &str,
        data: DispatchData,
    ) -> Option<Result<Payload, WorkflowError>>;
}
475
/// Manages a workflow execution, holding the context and run future.
pub(crate) struct WorkflowExecution<W: WorkflowImplementation> {
    /// Context for this execution; used to obtain views and per-handler contexts.
    ctx: WorkflowContext<W>,
    /// The workflow's `run` future, fused.
    run_future: Fuse<LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>>,
}
481
482impl<W: WorkflowImplementation> WorkflowExecution<W>
483where
484    <W::Run as WorkflowDefinition>::Input: Send,
485{
486    /// Create a new workflow execution using the workflow's `init` method.
487    pub(crate) fn new(
488        base_ctx: BaseWorkflowContext,
489        init_input: Option<<W::Run as WorkflowDefinition>::Input>,
490        run_input: Option<<W::Run as WorkflowDefinition>::Input>,
491    ) -> Self {
492        let view = base_ctx.view();
493        let workflow = W::init(view, init_input);
494        Self::new_with_workflow(workflow, base_ctx, run_input)
495    }
496
497    /// Create a new workflow execution from an already-created workflow instance.
498    pub(crate) fn new_with_workflow(
499        workflow: W,
500        base_ctx: BaseWorkflowContext,
501        run_input: Option<<W::Run as WorkflowDefinition>::Input>,
502    ) -> Self {
503        let workflow = Rc::new(RefCell::new(workflow));
504        let ctx = WorkflowContext::from_base(base_ctx, workflow);
505        let run_future = W::run(ctx.clone(), run_input).fuse();
506
507        Self { ctx, run_future }
508    }
509}
510
511impl<W: WorkflowImplementation> DynWorkflowExecution for WorkflowExecution<W> {
512    fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>> {
513        Pin::new(&mut self.run_future).poll(cx)
514    }
515
516    fn validate_update(
517        &self,
518        name: &str,
519        data: &DispatchData,
520    ) -> Option<Result<(), WorkflowError>> {
521        let view = self.ctx.view();
522        self.ctx
523            .state(|wf| wf.validate_update(view, name, &data.payloads, data.converter))
524    }
525
526    fn start_update(
527        &mut self,
528        name: &str,
529        data: DispatchData,
530    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
531        let ctx = self.ctx.with_headers(data.headers);
532        W::dispatch_update(ctx, name, data.payloads, data.converter)
533    }
534
535    fn dispatch_signal(
536        &mut self,
537        name: &str,
538        data: DispatchData,
539    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
540        let ctx = self.ctx.with_headers(data.headers);
541        W::dispatch_signal(ctx, name, data.payloads, data.converter)
542    }
543
544    fn dispatch_query(
545        &self,
546        name: &str,
547        data: DispatchData,
548    ) -> Option<Result<Payload, WorkflowError>> {
549        let view = self.ctx.view();
550        self.ctx
551            .state(|wf| wf.dispatch_query(view, name, &data.payloads, data.converter))
552    }
553}
554
/// Type alias for workflow execution factory functions.
///
/// Creates a new `WorkflowExecution` instance from the input payloads and context.
///
/// The arguments are, in order: the raw input payloads, the payload converter used to
/// deserialize them, and the base workflow context. Deserialization failure is reported
/// as a [`PayloadConversionError`].
pub(crate) type WorkflowExecutionFactory = Arc<
    dyn Fn(
            Vec<Payload>,
            PayloadConverter,
            BaseWorkflowContext,
        ) -> Result<Box<dyn DynWorkflowExecution>, PayloadConversionError>
        + Send
        + Sync,
>;
567
/// Contains workflow registrations in a form ready for execution by workers.
///
/// Cloning copies the map; the factories themselves are `Arc`s and are shared.
#[derive(Default, Clone)]
pub struct WorkflowDefinitions {
    /// Maps workflow type name to execution factories
    workflows: HashMap<&'static str, WorkflowExecutionFactory>,
}
574
575impl WorkflowDefinitions {
576    /// Creates a new empty `WorkflowDefinitions`.
577    pub fn new() -> Self {
578        Self::default()
579    }
580
581    /// Register a workflow implementation.
582    pub fn register_workflow<W: WorkflowImplementer>(&mut self) -> &mut Self {
583        W::register_all(self);
584        self
585    }
586
587    /// Register a specific workflow's run method.
588    #[doc(hidden)]
589    pub fn register_workflow_run<W: WorkflowImplementation>(&mut self) -> &mut Self
590    where
591        <W::Run as WorkflowDefinition>::Input: Send,
592    {
593        let workflow_name = W::name();
594        let factory: WorkflowExecutionFactory =
595            Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
596                let ser_ctx = SerializationContext {
597                    data: &SerializationContextData::Workflow,
598                    converter: &converter,
599                };
600                let input = converter.from_payloads(&ser_ctx, payloads)?;
601                let (init_input, run_input) = if W::INIT_TAKES_INPUT {
602                    (Some(input), None)
603                } else {
604                    (None, Some(input))
605                };
606                Ok(
607                    Box::new(WorkflowExecution::<W>::new(base_ctx, init_input, run_input))
608                        as Box<dyn DynWorkflowExecution>,
609                )
610            });
611        self.workflows.insert(workflow_name, factory);
612        self
613    }
614
615    /// Register a workflow with a custom factory for instance creation.
616    pub fn register_workflow_run_with_factory<W, F>(&mut self, user_factory: F) -> &mut Self
617    where
618        W: WorkflowImplementation,
619        <W::Run as WorkflowDefinition>::Input: Send,
620        F: Fn() -> W + Send + Sync + 'static,
621    {
622        assert!(
623            !W::HAS_INIT,
624            "Workflows registered with a factory must not define an #[init] method. \
625             The factory replaces init for instance creation."
626        );
627
628        let workflow_name = W::name();
629        let user_factory = Arc::new(user_factory);
630        let factory: WorkflowExecutionFactory =
631            Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
632                let ser_ctx = SerializationContext {
633                    data: &SerializationContextData::Workflow,
634                    converter: &converter,
635                };
636                let input: <W::Run as WorkflowDefinition>::Input =
637                    converter.from_payloads(&ser_ctx, payloads)?;
638
639                // User factory creates the instance - input always goes to run()
640                let workflow = user_factory();
641                Ok(Box::new(WorkflowExecution::<W>::new_with_workflow(
642                    workflow,
643                    base_ctx,
644                    Some(input),
645                )) as Box<dyn DynWorkflowExecution>)
646            });
647
648        self.workflows.insert(workflow_name, factory);
649        self
650    }
651
652    /// Check if any workflows are registered.
653    pub fn is_empty(&self) -> bool {
654        self.workflows.is_empty()
655    }
656
657    /// Get the workflow execution factory for a given workflow type.
658    pub(crate) fn get_workflow(&self, workflow_type: &str) -> Option<WorkflowExecutionFactory> {
659        self.workflows.get(workflow_type).cloned()
660    }
661
662    /// Returns an iterator over registered workflow type names.
663    pub fn workflow_types(&self) -> impl Iterator<Item = &'static str> + '_ {
664        self.workflows.keys().copied()
665    }
666}
667
668impl Debug for WorkflowDefinitions {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        f.debug_struct("WorkflowDefinitions")
671            .field("workflows", &self.workflows.keys().collect::<Vec<_>>())
672            .finish()
673    }
674}
675
676/// Deserialize handler input from payloads.
677pub fn deserialize_input<I: TemporalDeserializable + 'static>(
678    payloads: Vec<Payload>,
679    converter: &PayloadConverter,
680) -> Result<I, WorkflowError> {
681    let ctx = SerializationContext {
682        data: &SerializationContextData::Workflow,
683        converter,
684    };
685    converter.from_payloads(&ctx, payloads).map_err(Into::into)
686}
687
688/// Serialize handler output to a payload.
689pub fn serialize_output<O: TemporalSerializable + 'static>(
690    output: &O,
691    converter: &PayloadConverter,
692) -> Result<Payload, WorkflowError> {
693    let ctx = SerializationContext {
694        data: &SerializationContextData::Workflow,
695        converter,
696    };
697    converter.to_payload(&ctx, output).map_err(Into::into)
698}
699
700/// Wrap a handler error into WorkflowError.
701pub fn wrap_handler_error(e: Box<dyn std::error::Error + Send + Sync>) -> WorkflowError {
702    WorkflowError::Execution(anyhow::anyhow!(e))
703}
704
705/// Serialize a workflow result value to a payload.
706pub fn serialize_result<T: TemporalSerializable + 'static>(
707    result: T,
708    converter: &PayloadConverter,
709) -> Result<Payload, WorkflowError> {
710    serialize_output(&result, converter)
711}