Skip to main content

temporalio_workflow/workflow_context/
options.rs

1use std::{collections::HashMap, time::Duration};
2
3use crate::runtime::types::ContinueAsNewRequest;
4use temporalio_common_wasm::{
5    Priority,
6    data_converters::{
7        GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
8    },
9    protos::{
10        coresdk::{
11            child_workflow::{ChildWorkflowCancellationType, ParentClosePolicy},
12            common::VersioningIntent,
13            nexus::NexusOperationCancellationType,
14            workflow_activation::SignalWorkflow,
15            workflow_commands::{
16                ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
17                ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
18                StartTimer, WorkflowCommand, workflow_command,
19            },
20        },
21        temporal::api::{
22            common::v1::{Payload, RetryPolicy, SearchAttributes},
23            enums::v1::{
24                ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior,
25                WorkflowIdReusePolicy,
26            },
27            sdk::v1::UserMetadata,
28        },
29    },
30};
31/// Options for scheduling an activity
32#[derive(Debug, bon::Builder, Clone)]
33#[non_exhaustive]
34#[builder(start_fn = with_close_timeouts, on(String, into), state_mod(vis = "pub"))]
35pub struct ActivityOptions {
36    /// Timeouts for activity completion.
37    ///
38    /// See [`ActivityCloseTimeouts`] for the meaning of each timeout variant.
39    #[builder(start_fn)]
40    pub close_timeouts: ActivityCloseTimeouts,
41    /// Identifier to use for tracking the activity in Workflow history.
42    /// The `activityId` can be accessed by the activity function.
43    /// Does not need to be unique.
44    ///
45    /// If `None` use the context's sequence number
46    pub activity_id: Option<String>,
47    /// Task queue to schedule the activity in
48    ///
49    /// If `None`, use the same task queue as the parent workflow.
50    pub task_queue: Option<String>,
51    /// Time that the Activity Task can stay in the Task Queue before it is picked up by a Worker.
52    /// Do not specify this timeout unless using host specific Task Queues for Activity Tasks are
53    /// being used for routing.
54    /// `schedule_to_start_timeout` is always non-retryable.
55    /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back
56    /// into the same Task Queue.
57    pub schedule_to_start_timeout: Option<Duration>,
58    /// Heartbeat interval. Activity must heartbeat before this interval passes after a last
59    /// heartbeat or activity start.
60    pub heartbeat_timeout: Option<Duration>,
61    /// Determines what the SDK does when the Activity is cancelled.
62    #[builder(default)]
63    pub cancellation_type: ActivityCancellationType,
64    /// Activity retry policy
65    pub retry_policy: Option<RetryPolicy>,
66    /// Summary of the activity
67    pub summary: Option<String>,
68    /// Priority for the activity
69    pub priority: Option<Priority>,
70    /// If true, disable eager execution for this activity
71    #[builder(default)]
72    pub do_not_eagerly_execute: bool,
73}
74
75impl ActivityOptions {
76    /// Returns a builder with `close_timeout` set to [`ActivityCloseTimeouts::StartToClose`].
77    pub fn with_start_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
78        Self::with_close_timeouts(ActivityCloseTimeouts::StartToClose(duration))
79    }
80
81    /// Returns a builder with `close_timeout` set to [`ActivityCloseTimeouts::ScheduleToClose`].
82    pub fn with_schedule_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
83        Self::with_close_timeouts(ActivityCloseTimeouts::ScheduleToClose(duration))
84    }
85
86    /// Creates activity options with only `start_to_close_timeout` set.
87    ///
88    /// If you need additional fields set, use [`Self::with_start_to_close_timeout`].
89    pub fn start_to_close_timeout(duration: Duration) -> Self {
90        Self::with_start_to_close_timeout(duration).build()
91    }
92
93    /// Creates activity options with only `schedule_to_close_timeout` set.
94    ///
95    /// If you need additional fields set, use [`Self::with_schedule_to_close_timeout`].
96    pub fn schedule_to_close_timeout(duration: Duration) -> Self {
97        Self::with_schedule_to_close_timeout(duration).build()
98    }
99}
100
/// Timeouts that bound how long an activity may take to complete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActivityCloseTimeouts {
    /// Total time a workflow is willing to wait for the Activity to complete.
    /// `ActivityCloseTimeouts::ScheduleToClose` bounds the whole execution of an Activity,
    /// retries included; use `ActivityCloseTimeouts::StartToClose` to bound a single attempt.
    ScheduleToClose(Duration),
    /// Maximum time of a single Activity execution attempt. The Temporal Server does not
    /// detect Worker process failures directly; it relies on this timeout to notice an
    /// Activity that did not complete on time. The timeout should therefore be as short as
    /// the longest possible execution of the Activity body. Potentially long-running
    /// Activities must specify `ActivityOptions::heartbeat_timeout` and heartbeat from the
    /// activity periodically so that failures are detected promptly.
    StartToClose(Duration),
    /// Bounds both a single execution attempt and the overall completion time.
    Both {
        /// Maximum time of a single Activity execution attempt.
        start_to_close: Duration,
        /// Total time a workflow is willing to wait for the Activity to complete.
        schedule_to_close: Duration,
    },
}

impl ActivityCloseTimeouts {
    /// Splits the variant into `(start_to_close, schedule_to_close)`.
    ///
    /// A timeout the variant does not carry is reported as `None`.
    fn into_durations(self) -> (Option<Duration>, Option<Duration>) {
        let per_attempt = match self {
            Self::StartToClose(limit)
            | Self::Both {
                start_to_close: limit,
                ..
            } => Some(limit),
            Self::ScheduleToClose(_) => None,
        };
        let overall = match self {
            Self::ScheduleToClose(limit)
            | Self::Both {
                schedule_to_close: limit,
                ..
            } => Some(limit),
            Self::StartToClose(_) => None,
        };
        (per_attempt, overall)
    }
}
137
138impl ActivityOptions {
139    pub(crate) fn into_command(
140        self,
141        seq: u32,
142        activity_type: String,
143        args: Vec<Payload>,
144    ) -> WorkflowCommand {
145        let (start_to_close_timeout, schedule_to_close_timeout) =
146            self.close_timeouts.into_durations();
147        command_with_metadata(
148            workflow_command::Variant::ScheduleActivity(ScheduleActivity {
149                seq,
150                activity_type,
151                activity_id: self.activity_id.unwrap_or_else(|| seq.to_string()),
152                task_queue: self.task_queue.unwrap_or_default(),
153                arguments: args,
154                schedule_to_close_timeout: schedule_to_close_timeout
155                    .and_then(|duration| duration.try_into().ok()),
156                schedule_to_start_timeout: self
157                    .schedule_to_start_timeout
158                    .and_then(|duration| duration.try_into().ok()),
159                start_to_close_timeout: start_to_close_timeout
160                    .and_then(|duration| duration.try_into().ok()),
161                heartbeat_timeout: self
162                    .heartbeat_timeout
163                    .and_then(|duration| duration.try_into().ok()),
164                cancellation_type: self.cancellation_type.into(),
165                retry_policy: self.retry_policy,
166                priority: self.priority.map(Into::into),
167                do_not_eagerly_execute: self.do_not_eagerly_execute,
168                ..Default::default()
169            }),
170            self.summary,
171            None,
172        )
173    }
174}
175
176/// Options for scheduling a local activity
177#[derive(Default, Debug, Clone)]
178pub struct LocalActivityOptions {
179    /// Identifier to use for tracking the activity in Workflow history.
180    /// The `activityId` can be accessed by the activity function.
181    /// Does not need to be unique.
182    ///
183    /// If `None` use the context's sequence number
184    pub activity_id: Option<String>,
185    /// Retry policy
186    pub retry_policy: RetryPolicy,
187    /// Override attempt number rather than using 1.
188    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
189    pub attempt: Option<u32>,
190    /// Override schedule time when doing timer backoff.
191    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
192    pub original_schedule_time: Option<prost_types::Timestamp>,
193    /// Retry backoffs over this amount will use a timer rather than a local retry
194    pub timer_backoff_threshold: Option<Duration>,
195    /// How the activity will cancel
196    pub cancel_type: ActivityCancellationType,
197    /// Indicates how long the caller is willing to wait for local activity completion. Limits how
198    /// long retries will be attempted. When not specified defaults to the workflow execution
199    /// timeout (which may be unset).
200    pub schedule_to_close_timeout: Option<Duration>,
201    /// Limits time the local activity can idle internally before being executed. That can happen if
202    /// the worker is currently at max concurrent local activity executions. This timeout is always
203    /// non retryable as all a retry would achieve is to put it back into the same queue. Defaults
204    /// to `schedule_to_close_timeout` if not specified and that is set. Must be <=
205    /// `schedule_to_close_timeout` when set, if not, it will be clamped down.
206    pub schedule_to_start_timeout: Option<Duration>,
207    /// Maximum time the local activity is allowed to execute after the task is dispatched. This
208    /// timeout is always retryable. Either or both of `schedule_to_close_timeout` and this must be
209    /// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
210    /// down.
211    pub start_to_close_timeout: Option<Duration>,
212    /// Single-line summary for this activity that will appear in UI/CLI.
213    pub summary: Option<String>,
214}
215
216impl LocalActivityOptions {
217    pub(crate) fn into_command(
218        mut self,
219        seq: u32,
220        activity_type: String,
221        args: Vec<Payload>,
222    ) -> WorkflowCommand {
223        // Tests and some workflow code rely on the historical SDK behavior where omitted local
224        // activity timeouts are normalized before the command is emitted.
225        self.schedule_to_close_timeout
226            .get_or_insert(Duration::from_secs(100));
227        command_with_metadata(
228            workflow_command::Variant::ScheduleLocalActivity(ScheduleLocalActivity {
229                seq,
230                activity_type,
231                activity_id: self.activity_id.unwrap_or_else(|| seq.to_string()),
232                arguments: args,
233                retry_policy: Some(self.retry_policy),
234                attempt: self.attempt.unwrap_or(1),
235                original_schedule_time: self.original_schedule_time,
236                local_retry_threshold: self
237                    .timer_backoff_threshold
238                    .and_then(|duration| duration.try_into().ok()),
239                cancellation_type: self.cancel_type.into(),
240                schedule_to_close_timeout: self
241                    .schedule_to_close_timeout
242                    .and_then(|duration| duration.try_into().ok()),
243                schedule_to_start_timeout: self
244                    .schedule_to_start_timeout
245                    .and_then(|duration| duration.try_into().ok()),
246                start_to_close_timeout: self
247                    .start_to_close_timeout
248                    .and_then(|duration| duration.try_into().ok()),
249                ..Default::default()
250            }),
251            self.summary,
252            None,
253        )
254    }
255}
256
257/// Options for scheduling a child workflow
258#[derive(Default, Debug, Clone)]
259pub struct ChildWorkflowOptions {
260    /// Workflow ID
261    pub workflow_id: String,
262    /// Task queue to schedule the workflow in
263    ///
264    /// If `None`, use the same task queue as the parent workflow.
265    pub task_queue: Option<String>,
266    /// Cancellation strategy for the child workflow
267    pub cancel_type: ChildWorkflowCancellationType,
268    /// How to respond to parent workflow ending
269    pub parent_close_policy: ParentClosePolicy,
270    /// Static summary of the child workflow
271    pub static_summary: Option<String>,
272    /// Static details of the child workflow
273    pub static_details: Option<String>,
274    /// Set the policy for reusing the workflow id
275    pub id_reuse_policy: WorkflowIdReusePolicy,
276    /// Optionally set the execution timeout for the workflow
277    pub execution_timeout: Option<Duration>,
278    /// Optionally indicates the default run timeout for a workflow run
279    pub run_timeout: Option<Duration>,
280    /// Optionally indicates the default task timeout for a workflow run
281    pub task_timeout: Option<Duration>,
282    /// Optionally set a cron schedule for the workflow
283    pub cron_schedule: Option<String>,
284    /// Optionally associate extra search attributes with a workflow
285    pub search_attributes: Option<HashMap<String, Payload>>,
286    /// Priority for the workflow
287    pub priority: Option<Priority>,
288}
289
290impl ChildWorkflowOptions {
291    pub(crate) fn into_command(
292        self,
293        seq: u32,
294        workflow_type: String,
295        args: Vec<Payload>,
296    ) -> WorkflowCommand {
297        command_with_metadata(
298            workflow_command::Variant::StartChildWorkflowExecution(StartChildWorkflowExecution {
299                seq,
300                workflow_type,
301                workflow_id: self.workflow_id,
302                task_queue: self.task_queue.unwrap_or_default(),
303                input: args,
304                cancellation_type: self.cancel_type.into(),
305                parent_close_policy: self.parent_close_policy.into(),
306                workflow_id_reuse_policy: match self.id_reuse_policy {
307                    WorkflowIdReusePolicy::Unspecified => WorkflowIdReusePolicy::AllowDuplicate,
308                    policy => policy,
309                }
310                .into(),
311                workflow_execution_timeout: self
312                    .execution_timeout
313                    .and_then(|duration| duration.try_into().ok()),
314                workflow_run_timeout: self
315                    .run_timeout
316                    .and_then(|duration| duration.try_into().ok()),
317                workflow_task_timeout: self
318                    .task_timeout
319                    .and_then(|duration| duration.try_into().ok()),
320                cron_schedule: self.cron_schedule.unwrap_or_default(),
321                search_attributes: self.search_attributes.and_then(|attrs| {
322                    (!attrs.is_empty()).then_some(SearchAttributes {
323                        indexed_fields: attrs,
324                    })
325                }),
326                priority: self.priority.map(Into::into),
327                ..Default::default()
328            }),
329            self.static_summary,
330            self.static_details,
331        )
332    }
333}
334
335/// Information needed to send a specific signal
336#[derive(Debug)]
337pub struct Signal {
338    /// The signal name
339    pub signal_name: String,
340    /// The data the signal carries
341    pub data: SignalData,
342}
343
344impl Signal {
345    /// Create a new signal
346    pub fn new(
347        name: impl Into<String>,
348        input: impl IntoIterator<Item = impl Into<Payload>>,
349    ) -> Self {
350        Self {
351            signal_name: name.into(),
352            data: SignalData::new(input),
353        }
354    }
355
356    pub(crate) fn into_invocation(self) -> SignalWorkflow {
357        SignalWorkflow {
358            signal_name: self.signal_name,
359            input: self.data.input,
360            identity: String::new(),
361            headers: self.data.headers,
362        }
363    }
364}
365
366/// Data contained within a signal
367#[derive(Default, Debug)]
368pub struct SignalData {
369    /// The arguments the signal will receive
370    pub input: Vec<Payload>,
371    /// Metadata attached to the signal
372    pub headers: HashMap<String, Payload>,
373}
374
375impl SignalData {
376    /// Create data for a signal
377    pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
378        Self {
379            input: input.into_iter().map(Into::into).collect(),
380            headers: HashMap::new(),
381        }
382    }
383
384    /// Set a header k/v pair attached to the signal
385    pub fn with_header(
386        &mut self,
387        key: impl Into<String>,
388        payload: impl Into<Payload>,
389    ) -> &mut Self {
390        self.headers.insert(key.into(), payload.into());
391        self
392    }
393}
394
/// Settings for a workflow timer.
#[derive(Default, Debug, Clone)]
pub struct TimerOptions {
    /// Length of the timer.
    pub duration: Duration,
    /// Summary text for the timer.
    pub summary: Option<String>,
}

impl From<Duration> for TimerOptions {
    /// Wraps a bare duration as timer options, leaving `summary` unset.
    fn from(duration: Duration) -> Self {
        Self {
            duration,
            summary: None,
        }
    }
}
412
413impl TimerOptions {
414    pub(crate) fn into_command(self, seq: u32) -> WorkflowCommand {
415        command_with_metadata(
416            workflow_command::Variant::StartTimer(StartTimer {
417                seq,
418                start_to_fire_timeout: Some(
419                    self.duration
420                        .try_into()
421                        .expect("workflow timer timeout must fit into protobuf duration"),
422                ),
423            }),
424            self.summary,
425            None,
426        )
427    }
428}
429
430/// Options for Nexus Operations
431#[derive(Default, Debug, Clone)]
432pub struct NexusOperationOptions {
433    /// Endpoint name, must exist in the endpoint registry or this command will fail.
434    pub endpoint: String,
435    /// Service name.
436    pub service: String,
437    /// Operation name.
438    pub operation: String,
439    /// Input for the operation. The server converts this into Nexus request content and the
440    /// appropriate content headers internally when sending the StartOperation request. On the
441    /// handler side, if it is also backed by Temporal, the content is transformed back to the
442    /// original Payload sent in this command.
443    pub input: Option<Payload>,
444    /// Schedule-to-close timeout for this operation.
445    /// Indicates how long the caller is willing to wait for operation completion.
446    /// Calls are retried internally by the server.
447    pub schedule_to_close_timeout: Option<Duration>,
448    /// Header to attach to the Nexus request.
449    /// Users are responsible for encrypting sensitive data in this header as it is stored in
450    /// workflow history and transmitted to external services as-is. This is useful for propagating
451    /// tracing information. Note these headers are not the same as Temporal headers on internal
452    /// activities and child workflows, these are transmitted to Nexus operations that may be
453    /// external and are not traditional payloads.
454    pub nexus_header: HashMap<String, String>,
455    /// Cancellation type for the operation
456    pub cancellation_type: Option<NexusOperationCancellationType>,
457    /// Schedule-to-start timeout for this operation.
458    /// Indicates how long the caller is willing to wait for the operation to be started (or completed if synchronous)
459    /// by the handler. If the operation is not started within this timeout, it will fail with
460    /// TIMEOUT_TYPE_SCHEDULE_TO_START.
461    /// If not set or zero, no schedule-to-start timeout is enforced.
462    pub schedule_to_start_timeout: Option<Duration>,
463    /// Start-to-close timeout for this operation.
464    /// Indicates how long the caller is willing to wait for an asynchronous operation to complete after it has been
465    /// started. If the operation does not complete within this timeout after starting, it will fail with
466    /// TIMEOUT_TYPE_START_TO_CLOSE.
467    /// Only applies to asynchronous operations. Synchronous operations ignore this timeout.
468    /// If not set or zero, no start-to-close timeout is enforced.
469    pub start_to_close_timeout: Option<Duration>,
470}
471
472impl NexusOperationOptions {
473    pub(crate) fn into_command(self, seq: u32) -> WorkflowCommand {
474        workflow_command::Variant::ScheduleNexusOperation(ScheduleNexusOperation {
475            seq,
476            endpoint: self.endpoint,
477            service: self.service,
478            operation: self.operation,
479            input: self.input,
480            schedule_to_close_timeout: self
481                .schedule_to_close_timeout
482                .and_then(|duration| duration.try_into().ok()),
483            schedule_to_start_timeout: self
484                .schedule_to_start_timeout
485                .and_then(|duration| duration.try_into().ok()),
486            start_to_close_timeout: self
487                .start_to_close_timeout
488                .and_then(|duration| duration.try_into().ok()),
489            nexus_header: self.nexus_header,
490            cancellation_type: self
491                .cancellation_type
492                .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
493                .into(),
494        })
495        .into()
496    }
497}
498
/// Versioning behavior applied to the first workflow task of a new continue-as-new run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum ContinueAsNewVersioningBehavior {
    /// No initial versioning behavior was chosen. This is the default.
    #[default]
    Unspecified,
    /// The new run starts with AutoUpgrade behavior.
    AutoUpgrade,
    /// The new run starts on the task queue's ramping deployment version.
    UseRampingVersion,
}
511
512impl From<ContinueAsNewVersioningBehavior> for ProtoContinueAsNewVersioningBehavior {
513    fn from(value: ContinueAsNewVersioningBehavior) -> Self {
514        match value {
515            ContinueAsNewVersioningBehavior::Unspecified => {
516                ProtoContinueAsNewVersioningBehavior::Unspecified
517            }
518            ContinueAsNewVersioningBehavior::AutoUpgrade => {
519                ProtoContinueAsNewVersioningBehavior::AutoUpgrade
520            }
521            ContinueAsNewVersioningBehavior::UseRampingVersion => {
522                ProtoContinueAsNewVersioningBehavior::UseRampingVersion
523            }
524        }
525    }
526}
527
528impl From<ProtoContinueAsNewVersioningBehavior> for ContinueAsNewVersioningBehavior {
529    fn from(value: ProtoContinueAsNewVersioningBehavior) -> Self {
530        match value {
531            ProtoContinueAsNewVersioningBehavior::Unspecified => {
532                ContinueAsNewVersioningBehavior::Unspecified
533            }
534            ProtoContinueAsNewVersioningBehavior::AutoUpgrade => {
535                ContinueAsNewVersioningBehavior::AutoUpgrade
536            }
537            ProtoContinueAsNewVersioningBehavior::UseRampingVersion => {
538                ContinueAsNewVersioningBehavior::UseRampingVersion
539            }
540        }
541    }
542}
543
544/// Options for continuing a workflow as a new execution.
545///
546/// Unset fields inherit the current workflow's values where applicable.
547#[derive(Default, Debug, bon::Builder)]
548#[non_exhaustive]
549pub struct ContinueAsNewOptions {
550    /// Override the workflow type for the new execution. If `None`, reuses the current type.
551    pub workflow_type: Option<String>,
552    /// Task queue for the new execution. If `None`, reuses the current task queue.
553    pub task_queue: Option<String>,
554    /// Timeout for a single run of the new workflow.
555    pub run_timeout: Option<Duration>,
556    /// Timeout of a single workflow task.
557    pub task_timeout: Option<Duration>,
558    /// Delay before the first workflow task of the continued run is scheduled.
559    pub backoff_start_interval: Option<Duration>,
560    /// If set, the new workflow will have this memo. If `None`, reuses the current memo.
561    pub memo: Option<HashMap<String, Payload>>,
562    /// If set, the new workflow will have these headers.
563    pub headers: Option<HashMap<String, Payload>>,
564    /// If set, the new workflow will have these search attributes. If `None`, reuses the current
565    /// search attributes.
566    pub search_attributes: Option<SearchAttributes>,
567    /// If set, the new workflow will have this retry policy. If `None`, reuses the current policy.
568    pub retry_policy: Option<RetryPolicy>,
569    /// Whether the new workflow should run on a worker with a compatible build id.
570    pub versioning_intent: Option<VersioningIntent>,
571    /// Versioning behavior to use for the first workflow task of the new run.
572    ///
573    /// This experimental option is only meaningful for workers using worker deployment
574    /// versioning. `AutoUpgrade` routes the new run to the current deployment version;
575    /// `UseRampingVersion` routes it to the ramping deployment version when one is configured.
576    pub initial_versioning_behavior: Option<ContinueAsNewVersioningBehavior>,
577}
578
579impl ContinueAsNewOptions {
580    pub(crate) fn into_request(
581        self,
582        workflow_type: String,
583        arguments: Vec<Payload>,
584    ) -> ContinueAsNewRequest {
585        ContinueAsNewWorkflowExecution {
586            workflow_type: self.workflow_type.unwrap_or(workflow_type),
587            task_queue: self.task_queue.unwrap_or_default(),
588            arguments,
589            workflow_run_timeout: self
590                .run_timeout
591                .and_then(|duration| duration.try_into().ok()),
592            workflow_task_timeout: self
593                .task_timeout
594                .and_then(|duration| duration.try_into().ok()),
595            backoff_start_interval: self
596                .backoff_start_interval
597                .and_then(|duration| duration.try_into().ok()),
598            memo: self.memo.unwrap_or_default(),
599            headers: self.headers.unwrap_or_default(),
600            search_attributes: self.search_attributes,
601            retry_policy: self.retry_policy,
602            versioning_intent: self
603                .versioning_intent
604                .unwrap_or(VersioningIntent::Unspecified)
605                .into(),
606            initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::from(
607                self.initial_versioning_behavior
608                    .unwrap_or(ContinueAsNewVersioningBehavior::Unspecified),
609            )
610            .into(),
611        }
612    }
613}
614
615fn command_with_metadata(
616    variant: workflow_command::Variant,
617    summary: Option<String>,
618    details: Option<String>,
619) -> WorkflowCommand {
620    WorkflowCommand {
621        variant: Some(variant),
622        user_metadata: string_user_metadata(summary, details),
623    }
624}
625
626fn string_user_metadata(summary: Option<String>, details: Option<String>) -> Option<UserMetadata> {
627    if summary.is_none() && details.is_none() {
628        return None;
629    }
630    let converter = PayloadConverter::default();
631    let context = SerializationContext {
632        data: &SerializationContextData::Workflow,
633        converter: &converter,
634    };
635    Some(UserMetadata {
636        summary: summary.map(|value| {
637            converter
638                .to_payload(&context, &value)
639                .expect("String-to-JSON payload serialization is infallible")
640        }),
641        details: details.map(|value| {
642            converter
643                .to_payload(&context, &value)
644                .expect("String-to-JSON payload serialization is infallible")
645        }),
646    })
647}
648
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a whole-second [`Duration`].
    fn secs(count: u64) -> Duration {
        Duration::from_secs(count)
    }

    /// Converts child-workflow options into a command and unwraps its start-child payload.
    fn start_child_request(options: ChildWorkflowOptions) -> StartChildWorkflowExecution {
        let command = options.into_command(1, "TestWorkflow".to_string(), vec![]);
        match command.variant {
            Some(workflow_command::Variant::StartChildWorkflowExecution(request)) => request,
            _ => panic!("expected StartChildWorkflowExecution command"),
        }
    }

    // The backoff interval set on the options must reach the request unchanged.
    #[test]
    fn continue_as_new_options_maps_backoff_start_interval_to_request() {
        let options = ContinueAsNewOptions {
            backoff_start_interval: Some(secs(7)),
            ..Default::default()
        };
        let request = options.into_request("test-workflow".to_string(), vec![]);

        let interval = request
            .backoff_start_interval
            .expect("backoff_start_interval should be set");
        assert_eq!(interval.seconds, 7);
        assert_eq!(interval.nanos, 0);
    }

    // The start-to-close wrapper must hand back a builder that accepts further fields.
    #[test]
    fn activity_options_with_start_to_close_timeout_wrapper_supports_builder_chaining() {
        let built = ActivityOptions::with_start_to_close_timeout(secs(5))
            .heartbeat_timeout(secs(2))
            .build();

        assert_eq!(
            built.close_timeouts,
            ActivityCloseTimeouts::StartToClose(secs(5))
        );
        assert_eq!(built.heartbeat_timeout, Some(secs(2)));
    }

    // The schedule-to-close wrapper must hand back a builder that accepts further fields.
    #[test]
    fn activity_options_with_schedule_to_close_timeout_wrapper_supports_builder_chaining() {
        let built = ActivityOptions::with_schedule_to_close_timeout(secs(5))
            .heartbeat_timeout(secs(2))
            .build();

        assert_eq!(
            built.close_timeouts,
            ActivityCloseTimeouts::ScheduleToClose(secs(5))
        );
        assert_eq!(built.heartbeat_timeout, Some(secs(2)));
    }

    // With `Both`, each timeout must land in its own command field.
    #[test]
    fn activity_options_both_close_timeouts_map_to_command() {
        let close_timeouts = ActivityCloseTimeouts::Both {
            start_to_close: secs(3),
            schedule_to_close: secs(8),
        };
        let command = ActivityOptions::with_close_timeouts(close_timeouts)
            .build()
            .into_command(7, "test".to_string(), vec![]);
        let schedule = match command.variant {
            Some(workflow_command::Variant::ScheduleActivity(schedule)) => schedule,
            _ => panic!("expected ScheduleActivity command"),
        };
        assert_eq!(schedule.start_to_close_timeout.unwrap().seconds, 3);
        assert_eq!(schedule.schedule_to_close_timeout.unwrap().seconds, 8);
    }

    // The run timeout must come from `run_timeout`, not from `execution_timeout`.
    #[test]
    fn child_workflow_run_timeout_uses_run_timeout_field() {
        let request = start_child_request(ChildWorkflowOptions {
            workflow_id: "test-wf".to_string(),
            execution_timeout: Some(secs(60)),
            run_timeout: Some(secs(10)),
            ..Default::default()
        });
        let execution_timeout = request.workflow_execution_timeout.unwrap();
        let run_timeout = request.workflow_run_timeout.unwrap();
        assert_eq!(execution_timeout.seconds, 60);
        assert_eq!(run_timeout.seconds, 10);
    }

    // Leaving `run_timeout` unset must not borrow the execution timeout.
    #[test]
    fn child_workflow_run_timeout_none_when_unset() {
        let request = start_child_request(ChildWorkflowOptions {
            workflow_id: "test-wf".to_string(),
            execution_timeout: Some(secs(60)),
            ..Default::default()
        });
        let execution_timeout = request.workflow_execution_timeout.unwrap();
        assert_eq!(execution_timeout.seconds, 60);
        assert!(request.workflow_run_timeout.is_none());
    }
}