Skip to main content

temporalio_sdk/
workflows.rs

1//! Functionality related to defining and interacting with workflows
2//!
3//! This module contains traits and types for implementing workflows using the
4//! `#[workflow]` and `#[workflow_methods]` macros.
5//!
6//! Example usage:
7//! ```
8//! use temporalio_macros::{workflow, workflow_methods};
9//! use temporalio_sdk::{
10//!     SyncWorkflowContext, WorkflowContext, WorkflowContextView, WorkflowResult,
11//! };
12//!
13//! #[workflow]
14//! pub struct MyWorkflow {
15//!     counter: u32,
16//! }
17//!
18//! #[workflow_methods]
19//! impl MyWorkflow {
20//!     #[init]
21//!     pub fn new(ctx: &WorkflowContextView, input: String) -> Self {
22//!         Self { counter: 0 }
23//!     }
24//!
25//!     // Async run method uses ctx.state() for reading
26//!     #[run]
27//!     pub async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
28//!         let counter = ctx.state(|s| s.counter);
29//!         Ok(format!("Done with counter: {}", counter))
30//!     }
31//!
32//!     // Sync signals use &mut self for direct mutations
33//!     #[signal]
34//!     pub fn increment(&mut self, ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
35//!         self.counter += amount;
36//!     }
37//!
38//!     // Queries use &self with read-only context
39//!     #[query]
40//!     pub fn get_counter(&self, ctx: &WorkflowContextView) -> u32 {
41//!         self.counter
42//!     }
43//! }
44//! ```
45
46/// Deterministic `select!` for use in Temporal workflows.
47///
48/// Polls branches in declaration order (top to bottom), ensuring deterministic
49/// behavior across workflow replays. Delegates to [`futures_util::select_biased!`].
50///
51/// All workflow futures (timers, activities, child workflows, etc.) implement
52/// `FusedFuture`, so they can be stored in variables and passed to `select!`
53/// without needing `.fuse()`.
54///
55/// # Example
56///
57/// ```ignore
58/// use temporalio_sdk::workflows::select;
59/// use temporalio_sdk::WorkflowContext;
60/// use std::time::Duration;
61///
62/// # async fn hidden(ctx: &mut WorkflowContext<()>) {
63/// select! {
64///     _ = ctx.timer(Duration::from_secs(60)) => { /* timer fired */ }
65///     reason = ctx.cancelled() => { /* cancelled */ }
66/// };
67/// # }
68/// ```
69#[doc(inline)]
70pub use crate::__temporal_select as select;
71
72/// Deterministic `join!` for use in Temporal workflows.
73///
74/// Polls all futures concurrently to completion in declaration order,
75/// ensuring deterministic behavior across workflow replays. Delegates
76/// to [`futures_util::join!`].
77///
78/// # Example
79///
80/// ```ignore
81/// use temporalio_sdk::workflows::join;
82///
83/// # async fn hidden() {
84/// let future_a = async { 1 };
85/// let future_b = async { 2 };
86/// let (a, b) = join!(future_a, future_b);
87/// # }
88/// ```
89#[doc(inline)]
90pub use crate::__temporal_join as join;
91
92use crate::{
93    BaseWorkflowContext, SyncWorkflowContext, WorkflowContext, WorkflowContextView,
94    WorkflowTermination, workflow_executor::SdkGuardedFuture,
95};
96use futures_util::future::{Fuse, FutureExt, LocalBoxFuture};
97use std::{
98    cell::RefCell,
99    collections::HashMap,
100    fmt::Debug,
101    pin::Pin,
102    rc::Rc,
103    sync::Arc,
104    task::{Context as TaskContext, Poll},
105};
106use temporalio_common::{
107    QueryDefinition, SignalDefinition, UpdateDefinition, WorkflowDefinition,
108    data_converters::{
109        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
110        SerializationContextData, TemporalDeserializable, TemporalSerializable,
111    },
112    protos::temporal::api::{
113        common::v1::{Payload, Payloads},
114        failure::v1::Failure,
115    },
116};
117
118/// Error type for workflow operations
119#[derive(Debug, thiserror::Error)]
120pub enum WorkflowError {
121    /// Error during payload conversion
122    #[error("Payload conversion error: {0}")]
123    PayloadConversion(#[from] PayloadConversionError),
124
125    /// Workflow execution error
126    #[error("Workflow execution error: {0}")]
127    Execution(#[from] anyhow::Error),
128}
129
130impl From<WorkflowError> for Failure {
131    fn from(err: WorkflowError) -> Self {
132        Failure {
133            message: err.to_string(),
134            ..Default::default()
135        }
136    }
137}
138
139/// Trait implemented by workflow structs to enable execution by the worker.
140///
141/// This trait is typically generated by the `#[workflow_methods]` macro and should not
142/// be implemented manually in most cases.
143#[doc(hidden)]
144pub trait WorkflowImplementation: Sized + 'static {
145    /// The marker struct for the run method that implements `WorkflowDefinition`
146    type Run: WorkflowDefinition;
147
148    /// Whether this workflow has a user-defined `#[init]` method.
149    /// Set to `true` by the macro when `#[init]` is present, `false` otherwise.
150    const HAS_INIT: bool;
151
152    /// Whether the init method accepts the workflow input.
153    /// If true, input goes to init. If false, input goes to run.
154    const INIT_TAKES_INPUT: bool;
155
156    /// Returns the workflow type name.
157    fn name() -> &'static str;
158
159    /// Initialize the workflow instance.
160    ///
161    /// This is called when a new workflow execution starts. If `INIT_TAKES_INPUT` is true,
162    /// `input` will be `Some`. Otherwise it's `None`.
163    fn init(
164        ctx: WorkflowContextView,
165        input: Option<<Self::Run as WorkflowDefinition>::Input>,
166    ) -> Self;
167
168    /// Execute the workflow's main run function.
169    ///
170    /// If `INIT_TAKES_INPUT` is false, `input` will be `Some`. Otherwise it's `None`.
171    fn run(
172        ctx: WorkflowContext<Self>,
173        input: Option<<Self::Run as WorkflowDefinition>::Input>,
174    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>;
175
176    /// Dispatch an update request by name. Returns `None` if no handler for that name.
177    fn dispatch_update(
178        _ctx: WorkflowContext<Self>,
179        _name: &str,
180        _payloads: Payloads,
181        _converter: &PayloadConverter,
182    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
183        None
184    }
185
186    /// Validate an update request by name.
187    ///
188    /// Returns `None` if no handler for that name, `Some(Ok(()))` if valid,
189    /// `Some(Err(...))` if validation failed.
190    fn validate_update(
191        &self,
192        _ctx: WorkflowContextView,
193        _name: &str,
194        _payloads: &Payloads,
195        _converter: &PayloadConverter,
196    ) -> Option<Result<(), WorkflowError>> {
197        None
198    }
199
200    /// Dispatch a signal by name.
201    ///
202    /// Returns `None` if no handler for that name. For sync signals, the mutation happens
203    /// immediately and returns a completed future. For async signals, returns a future
204    /// that must be polled to completion.
205    fn dispatch_signal(
206        _ctx: WorkflowContext<Self>,
207        _name: &str,
208        _payloads: Payloads,
209        _converter: &PayloadConverter,
210    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
211        None
212    }
213
214    /// Dispatch a query by name.
215    ///
216    /// Returns `None` if no handler for that name, `Some(Ok(payload))` on success,
217    /// `Some(Err(...))` on failure. Queries are synchronous and read-only.
218    fn dispatch_query(
219        &self,
220        _ctx: WorkflowContextView,
221        _name: &str,
222        _payloads: &Payloads,
223        _converter: &PayloadConverter,
224    ) -> Option<Result<Payload, WorkflowError>> {
225        None
226    }
227}
228
229// NOTE: In the below traits, the dispatch functions take context by ownership while the handle
230// methods take them by ref when sync and by ownership when async. They must be owned by async
231// handlers since the returned futures must be 'static.
232
233/// Trait for executing synchronous signal handlers on a workflow.
234#[doc(hidden)]
235pub trait ExecutableSyncSignal<S: SignalDefinition>: WorkflowImplementation {
236    /// Handle an incoming signal with the given input.
237    fn handle(&mut self, ctx: &mut SyncWorkflowContext<Self>, input: S::Input);
238
239    /// Dispatch the signal with payload deserialization.
240    fn dispatch(
241        ctx: WorkflowContext<Self>,
242        payloads: Payloads,
243        converter: &PayloadConverter,
244    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
245        match deserialize_input::<S::Input>(payloads.payloads, converter) {
246            Ok(input) => {
247                let mut sync_ctx = ctx.sync_context();
248                ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
249                std::future::ready(Ok(())).boxed_local()
250            }
251            Err(e) => std::future::ready(Err(e)).boxed_local(),
252        }
253    }
254}
255
256/// Trait for executing asynchronous signal handlers on a workflow.
257#[doc(hidden)]
258pub trait ExecutableAsyncSignal<S: SignalDefinition>: WorkflowImplementation {
259    /// Handle an incoming signal with the given input.
260    fn handle(ctx: WorkflowContext<Self>, input: S::Input) -> LocalBoxFuture<'static, ()>;
261
262    /// Dispatch the signal with payload deserialization.
263    fn dispatch(
264        ctx: WorkflowContext<Self>,
265        payloads: Payloads,
266        converter: &PayloadConverter,
267    ) -> LocalBoxFuture<'static, Result<(), WorkflowError>> {
268        match deserialize_input::<S::Input>(payloads.payloads, converter) {
269            Ok(input) => Self::handle(ctx, input).map(|()| Ok(())).boxed_local(),
270            Err(e) => std::future::ready(Err(e)).boxed_local(),
271        }
272    }
273}
274
275/// Trait for executing query handlers on a workflow.
276///
277/// Queries are read-only operations that do not mutate workflow state.
278/// They must be synchronous.
279#[doc(hidden)]
280pub trait ExecutableQuery<Q: QueryDefinition>: WorkflowImplementation {
281    /// Handle a query with the given input and return the result.
282    ///
283    /// Queries take `&self` (immutable) and cannot modify workflow state.
284    /// Returning an error will cause the query to fail with that error message.
285    fn handle(
286        &self,
287        ctx: &WorkflowContextView,
288        input: Q::Input,
289    ) -> Result<Q::Output, Box<dyn std::error::Error + Send + Sync>>;
290
291    /// Dispatch the query with payload deserialization and output serialization.
292    fn dispatch(
293        &self,
294        ctx: &WorkflowContextView,
295        payloads: &Payloads,
296        converter: &PayloadConverter,
297    ) -> Result<Payload, WorkflowError> {
298        let input = deserialize_input::<Q::Input>(payloads.payloads.clone(), converter)?;
299        let output = self.handle(ctx, input).map_err(wrap_handler_error)?;
300        serialize_output(&output, converter)
301    }
302}
303
304/// Trait for executing synchronous update handlers on a workflow.
305#[doc(hidden)]
306pub trait ExecutableSyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
307    /// Handle an update with the given input and return the result.
308    /// Returning an error will cause the update to fail with that error message.
309    fn handle(
310        &mut self,
311        ctx: &mut SyncWorkflowContext<Self>,
312        input: U::Input,
313    ) -> Result<U::Output, Box<dyn std::error::Error + Send + Sync>>;
314
315    /// Validate an update before it is applied.
316    fn validate(
317        &self,
318        _ctx: &WorkflowContextView,
319        _input: &U::Input,
320    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
321        Ok(())
322    }
323
324    /// Dispatch the update with payload deserialization and output serialization.
325    fn dispatch(
326        ctx: WorkflowContext<Self>,
327        payloads: Payloads,
328        converter: &PayloadConverter,
329    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
330        let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
331            Ok(v) => v,
332            Err(e) => return std::future::ready(Err(e)).boxed_local(),
333        };
334        let converter = converter.clone();
335        let mut sync_ctx = ctx.sync_context();
336        let result = ctx.state_mut(|wf| Self::handle(wf, &mut sync_ctx, input));
337        match result {
338            Ok(output) => match serialize_output(&output, &converter) {
339                Ok(payload) => std::future::ready(Ok(payload)).boxed_local(),
340                Err(e) => std::future::ready(Err(e)).boxed_local(),
341            },
342            Err(e) => std::future::ready(Err(wrap_handler_error(e))).boxed_local(),
343        }
344    }
345
346    /// Dispatch validation with payload deserialization.
347    fn dispatch_validate(
348        &self,
349        ctx: &WorkflowContextView,
350        payloads: &Payloads,
351        converter: &PayloadConverter,
352    ) -> Result<(), WorkflowError> {
353        let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
354        self.validate(ctx, &input).map_err(wrap_handler_error)
355    }
356}
357
358/// Trait for executing asynchronous update handlers on a workflow.
359#[doc(hidden)]
360pub trait ExecutableAsyncUpdate<U: UpdateDefinition>: WorkflowImplementation {
361    /// Handle an update with the given input and return the result.
362    /// Returning an error will cause the update to fail with that error message.
363    fn handle(
364        ctx: WorkflowContext<Self>,
365        input: U::Input,
366    ) -> LocalBoxFuture<'static, Result<U::Output, Box<dyn std::error::Error + Send + Sync>>>;
367
368    /// Validate an update before it is applied.
369    fn validate(
370        &self,
371        _ctx: &WorkflowContextView,
372        _input: &U::Input,
373    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
374        Ok(())
375    }
376
377    /// Dispatch the update with payload deserialization and output serialization.
378    fn dispatch(
379        ctx: WorkflowContext<Self>,
380        payloads: Payloads,
381        converter: &PayloadConverter,
382    ) -> LocalBoxFuture<'static, Result<Payload, WorkflowError>> {
383        let input = match deserialize_input::<U::Input>(payloads.payloads, converter) {
384            Ok(v) => v,
385            Err(e) => return std::future::ready(Err(e)).boxed_local(),
386        };
387        let converter = converter.clone();
388        async move {
389            let output = Self::handle(ctx, input).await.map_err(wrap_handler_error)?;
390            serialize_output(&output, &converter)
391        }
392        .boxed_local()
393    }
394
395    /// Dispatch validation with payload deserialization.
396    fn dispatch_validate(
397        &self,
398        ctx: &WorkflowContextView,
399        payloads: &Payloads,
400        converter: &PayloadConverter,
401    ) -> Result<(), WorkflowError> {
402        let input = deserialize_input::<U::Input>(payloads.payloads.clone(), converter)?;
403        self.validate(ctx, &input).map_err(wrap_handler_error)
404    }
405}
406
407/// Data passed to handler dispatch methods (signals, updates, queries).
408pub(crate) struct DispatchData<'a> {
409    pub(crate) payloads: Payloads,
410    pub(crate) headers: HashMap<String, Payload>,
411    pub(crate) converter: &'a PayloadConverter,
412}
413
414/// Trait implemented by workflow types to enable registration with workers.
415///
416/// This trait is automatically generated by the `#[workflow_methods]` macro.
417#[doc(hidden)]
418pub trait WorkflowImplementer: WorkflowImplementation {
419    /// Register this workflow and all its handlers with the given definitions container.
420    fn register_all(defs: &mut WorkflowDefinitions);
421}
422
423/// Type-erased trait for workflow execution instances.
424pub(crate) trait DynWorkflowExecution {
425    /// Poll the run future.
426    fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>>;
427
428    /// Validate an update request. Returns `None` if no handler.
429    fn validate_update(&self, name: &str, data: &DispatchData)
430    -> Option<Result<(), WorkflowError>>;
431
432    /// Start an update handler. Returns `None` if no handler for that name.
433    fn start_update(
434        &mut self,
435        name: &str,
436        data: DispatchData,
437    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>>;
438
439    /// Dispatch a signal by name. Returns `None` if no handler.
440    fn dispatch_signal(
441        &mut self,
442        name: &str,
443        data: DispatchData,
444    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>>;
445
446    /// Dispatch a query by name. Returns `None` if no handler.
447    fn dispatch_query(
448        &self,
449        name: &str,
450        data: DispatchData,
451    ) -> Option<Result<Payload, WorkflowError>>;
452}
453
454/// Manages a workflow execution, holding the context and run future.
455pub(crate) struct WorkflowExecution<W: WorkflowImplementation> {
456    ctx: WorkflowContext<W>,
457    run_future: Fuse<LocalBoxFuture<'static, Result<Payload, WorkflowTermination>>>,
458}
459
460impl<W: WorkflowImplementation> WorkflowExecution<W>
461where
462    <W::Run as WorkflowDefinition>::Input: Send,
463{
464    /// Create a new workflow execution using the workflow's `init` method.
465    pub(crate) fn new(
466        base_ctx: BaseWorkflowContext,
467        init_input: Option<<W::Run as WorkflowDefinition>::Input>,
468        run_input: Option<<W::Run as WorkflowDefinition>::Input>,
469    ) -> Self {
470        let view = base_ctx.view();
471        let workflow = W::init(view, init_input);
472        Self::new_with_workflow(workflow, base_ctx, run_input)
473    }
474
475    /// Create a new workflow execution from an already-created workflow instance.
476    pub(crate) fn new_with_workflow(
477        workflow: W,
478        base_ctx: BaseWorkflowContext,
479        run_input: Option<<W::Run as WorkflowDefinition>::Input>,
480    ) -> Self {
481        let workflow = Rc::new(RefCell::new(workflow));
482        let ctx = WorkflowContext::from_base(base_ctx, workflow);
483        let run_future = W::run(ctx.clone(), run_input).fuse();
484
485        Self { ctx, run_future }
486    }
487}
488
489impl<W: WorkflowImplementation> DynWorkflowExecution for WorkflowExecution<W> {
490    fn poll_run(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<Payload, WorkflowTermination>> {
491        Pin::new(&mut self.run_future).poll(cx)
492    }
493
494    fn validate_update(
495        &self,
496        name: &str,
497        data: &DispatchData,
498    ) -> Option<Result<(), WorkflowError>> {
499        let view = self.ctx.view();
500        self.ctx
501            .state(|wf| wf.validate_update(view, name, &data.payloads, data.converter))
502    }
503
504    fn start_update(
505        &mut self,
506        name: &str,
507        data: DispatchData,
508    ) -> Option<LocalBoxFuture<'static, Result<Payload, WorkflowError>>> {
509        let ctx = self.ctx.with_headers(data.headers);
510        W::dispatch_update(ctx, name, data.payloads, data.converter)
511    }
512
513    fn dispatch_signal(
514        &mut self,
515        name: &str,
516        data: DispatchData,
517    ) -> Option<LocalBoxFuture<'static, Result<(), WorkflowError>>> {
518        let ctx = self.ctx.with_headers(data.headers);
519        W::dispatch_signal(ctx, name, data.payloads, data.converter)
520    }
521
522    fn dispatch_query(
523        &self,
524        name: &str,
525        data: DispatchData,
526    ) -> Option<Result<Payload, WorkflowError>> {
527        let view = self.ctx.view();
528        self.ctx
529            .state(|wf| wf.dispatch_query(view, name, &data.payloads, data.converter))
530    }
531}
532
533/// Type alias for workflow execution factory functions.
534///
535/// Creates a new `WorkflowExecution` instance from the input payloads and context.
536pub(crate) type WorkflowExecutionFactory = Arc<
537    dyn Fn(
538            Vec<Payload>,
539            PayloadConverter,
540            BaseWorkflowContext,
541        ) -> Result<Box<dyn DynWorkflowExecution>, PayloadConversionError>
542        + Send
543        + Sync,
544>;
545
546/// Contains workflow registrations in a form ready for execution by workers.
547#[derive(Default, Clone)]
548pub struct WorkflowDefinitions {
549    /// Maps workflow type name to execution factories
550    workflows: HashMap<&'static str, WorkflowExecutionFactory>,
551}
552
553impl WorkflowDefinitions {
554    /// Creates a new empty `WorkflowDefinitions`.
555    pub fn new() -> Self {
556        Self::default()
557    }
558
559    /// Register a workflow implementation.
560    pub fn register_workflow<W: WorkflowImplementer>(&mut self) -> &mut Self {
561        W::register_all(self);
562        self
563    }
564
565    /// Register a specific workflow's run method.
566    #[doc(hidden)]
567    pub fn register_workflow_run<W: WorkflowImplementation>(&mut self) -> &mut Self
568    where
569        <W::Run as WorkflowDefinition>::Input: Send,
570    {
571        let workflow_name = W::name();
572        let factory: WorkflowExecutionFactory =
573            Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
574                let ser_ctx = SerializationContext {
575                    data: &SerializationContextData::Workflow,
576                    converter: &converter,
577                };
578                let input = converter.from_payloads(&ser_ctx, payloads)?;
579                let (init_input, run_input) = if W::INIT_TAKES_INPUT {
580                    (Some(input), None)
581                } else {
582                    (None, Some(input))
583                };
584                Ok(
585                    Box::new(WorkflowExecution::<W>::new(base_ctx, init_input, run_input))
586                        as Box<dyn DynWorkflowExecution>,
587                )
588            });
589        self.workflows.insert(workflow_name, factory);
590        self
591    }
592
593    /// Register a workflow with a custom factory for instance creation.
594    pub fn register_workflow_run_with_factory<W, F>(&mut self, user_factory: F) -> &mut Self
595    where
596        W: WorkflowImplementation,
597        <W::Run as WorkflowDefinition>::Input: Send,
598        F: Fn() -> W + Send + Sync + 'static,
599    {
600        assert!(
601            !W::HAS_INIT,
602            "Workflows registered with a factory must not define an #[init] method. \
603             The factory replaces init for instance creation."
604        );
605
606        let workflow_name = W::name();
607        let user_factory = Arc::new(user_factory);
608        let factory: WorkflowExecutionFactory =
609            Arc::new(move |payloads, converter: PayloadConverter, base_ctx| {
610                let ser_ctx = SerializationContext {
611                    data: &SerializationContextData::Workflow,
612                    converter: &converter,
613                };
614                let input: <W::Run as WorkflowDefinition>::Input =
615                    converter.from_payloads(&ser_ctx, payloads)?;
616
617                // User factory creates the instance - input always goes to run()
618                let workflow = user_factory();
619                Ok(Box::new(WorkflowExecution::<W>::new_with_workflow(
620                    workflow,
621                    base_ctx,
622                    Some(input),
623                )) as Box<dyn DynWorkflowExecution>)
624            });
625
626        self.workflows.insert(workflow_name, factory);
627        self
628    }
629
630    /// Check if any workflows are registered.
631    pub fn is_empty(&self) -> bool {
632        self.workflows.is_empty()
633    }
634
635    /// Get the workflow execution factory for a given workflow type.
636    pub(crate) fn get_workflow(&self, workflow_type: &str) -> Option<WorkflowExecutionFactory> {
637        self.workflows.get(workflow_type).cloned()
638    }
639
640    /// Returns an iterator over registered workflow type names.
641    pub fn workflow_types(&self) -> impl Iterator<Item = &'static str> + '_ {
642        self.workflows.keys().copied()
643    }
644}
645
646impl Debug for WorkflowDefinitions {
647    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
648        f.debug_struct("WorkflowDefinitions")
649            .field("workflows", &self.workflows.keys().collect::<Vec<_>>())
650            .finish()
651    }
652}
653
654/// Deserialize handler input from payloads.
655pub fn deserialize_input<I: TemporalDeserializable + 'static>(
656    payloads: Vec<Payload>,
657    converter: &PayloadConverter,
658) -> Result<I, WorkflowError> {
659    let ctx = SerializationContext {
660        data: &SerializationContextData::Workflow,
661        converter,
662    };
663    converter.from_payloads(&ctx, payloads).map_err(Into::into)
664}
665
666/// Serialize handler output to a payload.
667pub fn serialize_output<O: TemporalSerializable + 'static>(
668    output: &O,
669    converter: &PayloadConverter,
670) -> Result<Payload, WorkflowError> {
671    let ctx = SerializationContext {
672        data: &SerializationContextData::Workflow,
673        converter,
674    };
675    converter.to_payload(&ctx, output).map_err(Into::into)
676}
677
678/// Wrap a handler error into WorkflowError.
679pub fn wrap_handler_error(e: Box<dyn std::error::Error + Send + Sync>) -> WorkflowError {
680    WorkflowError::Execution(anyhow::anyhow!(e))
681}
682
683/// Serialize a workflow result value to a payload.
684pub fn serialize_result<T: TemporalSerializable + 'static>(
685    result: T,
686    converter: &PayloadConverter,
687) -> Result<Payload, WorkflowError> {
688    serialize_output(&result, converter)
689}
690
691/// Deterministic `join_all` for use in Temporal workflows.
692///
693/// Polls a collection of futures concurrently to completion in declaration order,
694/// returning a `Vec` of their results.
695///
696/// # Example
697///
698/// ```ignore
699/// use temporalio_sdk::workflows::join_all;
700/// use temporalio_sdk::WorkflowContext;
701/// use std::time::Duration;
702///
703/// # async fn hidden(ctx: &mut WorkflowContext<()>) {
704/// let timers = vec![
705///     ctx.timer(Duration::from_secs(1)),
706///     ctx.timer(Duration::from_secs(2)),
707/// ];
708/// let results = join_all(timers).await;
709/// # }
710/// ```
711pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
712where
713    I: IntoIterator,
714    I::Item: std::future::Future,
715{
716    JoinAll(SdkGuardedFuture(futures_util::future::join_all(iter)))
717}
718
719/// Future returned by [`join_all`].
720pub struct JoinAll<F: std::future::Future>(SdkGuardedFuture<futures_util::future::JoinAll<F>>);
721
722impl<F: std::future::Future> std::future::Future for JoinAll<F> {
723    type Output = Vec<F::Output>;
724
725    fn poll(
726        mut self: std::pin::Pin<&mut Self>,
727        cx: &mut std::task::Context<'_>,
728    ) -> std::task::Poll<Self::Output> {
729        self.0.poll_unpin(cx)
730    }
731}