Skip to main content

temporalio_sdk/workflow_context/
options.rs

1use std::{collections::HashMap, time::Duration};
2
3use temporalio_client::Priority;
4use temporalio_common::protos::{
5    coresdk::{
6        AsJsonPayloadExt,
7        child_workflow::ChildWorkflowCancellationType,
8        nexus::NexusOperationCancellationType,
9        workflow_commands::{
10            ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity,
11            ScheduleNexusOperation, StartChildWorkflowExecution, WorkflowCommand,
12        },
13    },
14    temporal::api::{
15        common::v1::{Payload, RetryPolicy, SearchAttributes},
16        enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
17        sdk::v1::UserMetadata,
18    },
19};
20// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
21
22pub(crate) trait IntoWorkflowCommand {
23    /// Produces a workflow command from some options
24    fn into_command(self, seq: u32) -> WorkflowCommand;
25}
26
27/// Options for scheduling an activity
28#[derive(Default, Debug)]
29pub struct ActivityOptions {
30    /// Identifier to use for tracking the activity in Workflow history.
31    /// The `activityId` can be accessed by the activity function.
32    /// Does not need to be unique.
33    ///
34    /// If `None` use the context's sequence number
35    pub activity_id: Option<String>,
36    /// Task queue to schedule the activity in
37    ///
38    /// If `None`, use the same task queue as the parent workflow.
39    pub task_queue: Option<String>,
40    /// Time that the Activity Task can stay in the Task Queue before it is picked up by a Worker.
41    /// Do not specify this timeout unless using host specific Task Queues for Activity Tasks are
42    /// being used for routing.
43    /// `schedule_to_start_timeout` is always non-retryable.
44    /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back
45    /// into the same Task Queue.
46    pub schedule_to_start_timeout: Option<Duration>,
47    /// Maximum time of a single Activity execution attempt.
48    /// Note that the Temporal Server doesn't detect Worker process failures directly.
49    /// It relies on this timeout to detect that an Activity that didn't complete on time.
50    /// So this timeout should be as short as the longest possible execution of the Activity body.
51    /// Potentially long running Activities must specify `heartbeat_timeout` and heartbeat from the
52    /// activity periodically for timely failure detection.
53    /// Either this option or `schedule_to_close_timeout` is required.
54    pub start_to_close_timeout: Option<Duration>,
55    /// Total time that a workflow is willing to wait for Activity to complete.
56    /// `schedule_to_close_timeout` limits the total time of an Activity's execution including
57    /// retries (use `start_to_close_timeout` to limit the time of a single attempt).
58    /// Either this option or `start_to_close_timeout` is required.
59    pub schedule_to_close_timeout: Option<Duration>,
60    /// Heartbeat interval. Activity must heartbeat before this interval passes after a last
61    /// heartbeat or activity start.
62    pub heartbeat_timeout: Option<Duration>,
63    /// Determines what the SDK does when the Activity is cancelled.
64    pub cancellation_type: ActivityCancellationType,
65    /// Activity retry policy
66    pub retry_policy: Option<RetryPolicy>,
67    /// Summary of the activity
68    pub summary: Option<String>,
69    /// Priority for the activity
70    pub priority: Option<Priority>,
71    /// If true, disable eager execution for this activity
72    pub do_not_eagerly_execute: bool,
73}
74
75impl ActivityOptions {
76    pub(crate) fn into_command(
77        self,
78        activity_type: String,
79        arguments: Vec<Payload>,
80        seq: u32,
81    ) -> WorkflowCommand {
82        WorkflowCommand {
83            variant: Some(
84                ScheduleActivity {
85                    seq,
86                    activity_id: match self.activity_id {
87                        None => seq.to_string(),
88                        Some(aid) => aid,
89                    },
90                    activity_type,
91                    task_queue: self.task_queue.unwrap_or_default(),
92                    schedule_to_close_timeout: self
93                        .schedule_to_close_timeout
94                        .and_then(|d| d.try_into().ok()),
95                    schedule_to_start_timeout: self
96                        .schedule_to_start_timeout
97                        .and_then(|d| d.try_into().ok()),
98                    start_to_close_timeout: self
99                        .start_to_close_timeout
100                        .and_then(|d| d.try_into().ok()),
101                    heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()),
102                    cancellation_type: self.cancellation_type as i32,
103                    arguments,
104                    retry_policy: self.retry_policy,
105                    priority: self.priority.map(Into::into),
106                    do_not_eagerly_execute: self.do_not_eagerly_execute,
107                    ..Default::default()
108                }
109                .into(),
110            ),
111            user_metadata: self.summary.map(|s| UserMetadata {
112                summary: Some(s.into()),
113                details: None,
114            }),
115        }
116    }
117}
118
119/// Options for scheduling a local activity
120#[derive(Default, Debug, Clone)]
121pub struct LocalActivityOptions {
122    /// Identifier to use for tracking the activity in Workflow history.
123    /// The `activityId` can be accessed by the activity function.
124    /// Does not need to be unique.
125    ///
126    /// If `None` use the context's sequence number
127    pub activity_id: Option<String>,
128    /// Retry policy
129    pub retry_policy: RetryPolicy,
130    /// Override attempt number rather than using 1.
131    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
132    pub attempt: Option<u32>,
133    /// Override schedule time when doing timer backoff.
134    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
135    pub original_schedule_time: Option<prost_types::Timestamp>,
136    /// Retry backoffs over this amount will use a timer rather than a local retry
137    pub timer_backoff_threshold: Option<Duration>,
138    /// How the activity will cancel
139    pub cancel_type: ActivityCancellationType,
140    /// Indicates how long the caller is willing to wait for local activity completion. Limits how
141    /// long retries will be attempted. When not specified defaults to the workflow execution
142    /// timeout (which may be unset).
143    pub schedule_to_close_timeout: Option<Duration>,
144    /// Limits time the local activity can idle internally before being executed. That can happen if
145    /// the worker is currently at max concurrent local activity executions. This timeout is always
146    /// non retryable as all a retry would achieve is to put it back into the same queue. Defaults
147    /// to `schedule_to_close_timeout` if not specified and that is set. Must be <=
148    /// `schedule_to_close_timeout` when set, if not, it will be clamped down.
149    pub schedule_to_start_timeout: Option<Duration>,
150    /// Maximum time the local activity is allowed to execute after the task is dispatched. This
151    /// timeout is always retryable. Either or both of `schedule_to_close_timeout` and this must be
152    /// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
153    /// down.
154    pub start_to_close_timeout: Option<Duration>,
155    /// Single-line summary for this activity that will appear in UI/CLI.
156    pub summary: Option<String>,
157}
158
159impl LocalActivityOptions {
160    pub(crate) fn into_command(
161        mut self,
162        activity_type: String,
163        arguments: Vec<Payload>,
164        seq: u32,
165    ) -> WorkflowCommand {
166        // Allow tests to avoid extra verbosity when they don't care about timeouts
167        // TODO: Builderize LA options
168        self.schedule_to_close_timeout
169            .get_or_insert(Duration::from_secs(100));
170
171        WorkflowCommand {
172            variant: Some(
173                ScheduleLocalActivity {
174                    seq,
175                    attempt: self.attempt.unwrap_or(1),
176                    original_schedule_time: self.original_schedule_time,
177                    activity_id: match self.activity_id {
178                        None => seq.to_string(),
179                        Some(aid) => aid,
180                    },
181                    activity_type,
182                    arguments,
183                    retry_policy: Some(self.retry_policy),
184                    local_retry_threshold: self
185                        .timer_backoff_threshold
186                        .and_then(|d| d.try_into().ok()),
187                    cancellation_type: self.cancel_type.into(),
188                    schedule_to_close_timeout: self
189                        .schedule_to_close_timeout
190                        .and_then(|d| d.try_into().ok()),
191                    schedule_to_start_timeout: self
192                        .schedule_to_start_timeout
193                        .and_then(|d| d.try_into().ok()),
194                    start_to_close_timeout: self
195                        .start_to_close_timeout
196                        .and_then(|d| d.try_into().ok()),
197                    ..Default::default()
198                }
199                .into(),
200            ),
201            user_metadata: self
202                .summary
203                .and_then(|summary| summary.as_json_payload().ok())
204                .map(|summary| UserMetadata {
205                    summary: Some(summary),
206                    details: None,
207                }),
208        }
209    }
210}
211
212/// Options for scheduling a child workflow
213#[derive(Default, Debug, Clone)]
214pub struct ChildWorkflowOptions {
215    /// Workflow ID
216    pub workflow_id: String,
217    /// Type of workflow to schedule
218    pub workflow_type: String,
219    /// Task queue to schedule the workflow in
220    ///
221    /// If `None`, use the same task queue as the parent workflow.
222    pub task_queue: Option<String>,
223    /// Input to send the child Workflow
224    pub input: Vec<Payload>,
225    /// Cancellation strategy for the child workflow
226    pub cancel_type: ChildWorkflowCancellationType,
227    /// How to respond to parent workflow ending
228    pub parent_close_policy: ParentClosePolicy,
229    /// Static summary of the child workflow
230    pub static_summary: Option<String>,
231    /// Static details of the child workflow
232    pub static_details: Option<String>,
233    /// Set the policy for reusing the workflow id
234    pub id_reuse_policy: WorkflowIdReusePolicy,
235    /// Optionally set the execution timeout for the workflow
236    pub execution_timeout: Option<Duration>,
237    /// Optionally indicates the default run timeout for a workflow run
238    pub run_timeout: Option<Duration>,
239    /// Optionally indicates the default task timeout for a workflow run
240    pub task_timeout: Option<Duration>,
241    /// Optionally set a cron schedule for the workflow
242    pub cron_schedule: Option<String>,
243    /// Optionally associate extra search attributes with a workflow
244    pub search_attributes: Option<HashMap<String, Payload>>,
245    /// Priority for the workflow
246    pub priority: Option<Priority>,
247}
248
249impl IntoWorkflowCommand for ChildWorkflowOptions {
250    fn into_command(self, seq: u32) -> WorkflowCommand {
251        let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
252            Some(UserMetadata {
253                summary: self.static_summary.map(Into::into),
254                details: self.static_details.map(Into::into),
255            })
256        } else {
257            None
258        };
259        WorkflowCommand {
260            variant: Some(
261                StartChildWorkflowExecution {
262                    seq,
263                    workflow_id: self.workflow_id,
264                    workflow_type: self.workflow_type,
265                    task_queue: self.task_queue.unwrap_or_default(),
266                    input: self.input,
267                    cancellation_type: self.cancel_type as i32,
268                    workflow_id_reuse_policy: self.id_reuse_policy as i32,
269                    workflow_execution_timeout: self
270                        .execution_timeout
271                        .and_then(|d| d.try_into().ok()),
272                    workflow_run_timeout: self.execution_timeout.and_then(|d| d.try_into().ok()),
273                    workflow_task_timeout: self.task_timeout.and_then(|d| d.try_into().ok()),
274                    search_attributes: self
275                        .search_attributes
276                        .map(|sa| SearchAttributes { indexed_fields: sa }),
277                    cron_schedule: self.cron_schedule.unwrap_or_default(),
278                    parent_close_policy: self.parent_close_policy as i32,
279                    priority: self.priority.map(Into::into),
280                    ..Default::default()
281                }
282                .into(),
283            ),
284            user_metadata,
285        }
286    }
287}
288
289/// Options for sending a signal to an external workflow
290#[derive(Debug)]
291pub struct SignalWorkflowOptions {
292    /// The workflow's id
293    pub workflow_id: String,
294    /// The particular run to target, or latest if `None`
295    pub run_id: Option<String>,
296    /// The details of the signal to send
297    pub signal: Signal,
298}
299
300impl SignalWorkflowOptions {
301    /// Create options for sending a signal to another workflow
302    pub fn new(
303        workflow_id: impl Into<String>,
304        run_id: impl Into<String>,
305        name: impl Into<String>,
306        input: impl IntoIterator<Item = impl Into<Payload>>,
307    ) -> Self {
308        Self {
309            workflow_id: workflow_id.into(),
310            run_id: Some(run_id.into()),
311            signal: Signal::new(name, input),
312        }
313    }
314
315    /// Set a header k/v pair attached to the signal
316    pub fn with_header(
317        &mut self,
318        key: impl Into<String>,
319        payload: impl Into<Payload>,
320    ) -> &mut Self {
321        self.signal.data.with_header(key.into(), payload.into());
322        self
323    }
324}
325
326/// Information needed to send a specific signal
327#[derive(Debug)]
328pub struct Signal {
329    /// The signal name
330    pub signal_name: String,
331    /// The data the signal carries
332    pub data: SignalData,
333}
334
335impl Signal {
336    /// Create a new signal
337    pub fn new(
338        name: impl Into<String>,
339        input: impl IntoIterator<Item = impl Into<Payload>>,
340    ) -> Self {
341        Self {
342            signal_name: name.into(),
343            data: SignalData::new(input),
344        }
345    }
346}
347
348/// Data contained within a signal
349#[derive(Default, Debug)]
350pub struct SignalData {
351    /// The arguments the signal will receive
352    pub input: Vec<Payload>,
353    /// Metadata attached to the signal
354    pub headers: HashMap<String, Payload>,
355}
356
357impl SignalData {
358    /// Create data for a signal
359    pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
360        Self {
361            input: input.into_iter().map(Into::into).collect(),
362            headers: HashMap::new(),
363        }
364    }
365
366    /// Set a header k/v pair attached to the signal
367    pub fn with_header(
368        &mut self,
369        key: impl Into<String>,
370        payload: impl Into<Payload>,
371    ) -> &mut Self {
372        self.headers.insert(key.into(), payload.into());
373        self
374    }
375}
376
/// Configuration for a workflow timer.
#[derive(Debug, Clone, Default)]
pub struct TimerOptions {
    /// Duration of the timer.
    pub duration: Duration,
    /// Optional summary describing the timer.
    pub summary: Option<String>,
}

/// Builds timer options from a bare duration, leaving the summary unset.
impl From<Duration> for TimerOptions {
    fn from(duration: Duration) -> Self {
        Self {
            duration,
            summary: None,
        }
    }
}
394
395/// Options for Nexus Operations
396#[derive(Default, Debug, Clone)]
397pub struct NexusOperationOptions {
398    /// Endpoint name, must exist in the endpoint registry or this command will fail.
399    pub endpoint: String,
400    /// Service name.
401    pub service: String,
402    /// Operation name.
403    pub operation: String,
404    /// Input for the operation. The server converts this into Nexus request content and the
405    /// appropriate content headers internally when sending the StartOperation request. On the
406    /// handler side, if it is also backed by Temporal, the content is transformed back to the
407    /// original Payload sent in this command.
408    pub input: Option<Payload>,
409    /// Schedule-to-close timeout for this operation.
410    /// Indicates how long the caller is willing to wait for operation completion.
411    /// Calls are retried internally by the server.
412    pub schedule_to_close_timeout: Option<Duration>,
413    /// Header to attach to the Nexus request.
414    /// Users are responsible for encrypting sensitive data in this header as it is stored in
415    /// workflow history and transmitted to external services as-is. This is useful for propagating
416    /// tracing information. Note these headers are not the same as Temporal headers on internal
417    /// activities and child workflows, these are transmitted to Nexus operations that may be
418    /// external and are not traditional payloads.
419    pub nexus_header: HashMap<String, String>,
420    /// Cancellation type for the operation
421    pub cancellation_type: Option<NexusOperationCancellationType>,
422    /// Schedule-to-start timeout for this operation.
423    /// Indicates how long the caller is willing to wait for the operation to be started (or completed if synchronous)
424    /// by the handler. If the operation is not started within this timeout, it will fail with
425    /// TIMEOUT_TYPE_SCHEDULE_TO_START.
426    /// If not set or zero, no schedule-to-start timeout is enforced.
427    pub schedule_to_start_timeout: Option<Duration>,
428    /// Start-to-close timeout for this operation.
429    /// Indicates how long the caller is willing to wait for an asynchronous operation to complete after it has been
430    /// started. If the operation does not complete within this timeout after starting, it will fail with
431    /// TIMEOUT_TYPE_START_TO_CLOSE.
432    /// Only applies to asynchronous operations. Synchronous operations ignore this timeout.
433    /// If not set or zero, no start-to-close timeout is enforced.
434    pub start_to_close_timeout: Option<Duration>,
435}
436
437impl IntoWorkflowCommand for NexusOperationOptions {
438    fn into_command(self, seq: u32) -> WorkflowCommand {
439        WorkflowCommand {
440            user_metadata: None,
441            variant: Some(
442                ScheduleNexusOperation {
443                    seq,
444                    endpoint: self.endpoint,
445                    service: self.service,
446                    operation: self.operation,
447                    input: self.input,
448                    schedule_to_close_timeout: self
449                        .schedule_to_close_timeout
450                        .and_then(|t| t.try_into().ok()),
451                    schedule_to_start_timeout: self
452                        .schedule_to_start_timeout
453                        .and_then(|t| t.try_into().ok()),
454                    start_to_close_timeout: self
455                        .start_to_close_timeout
456                        .and_then(|t| t.try_into().ok()),
457                    nexus_header: self.nexus_header,
458                    cancellation_type: self
459                        .cancellation_type
460                        .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
461                        .into(),
462                }
463                .into(),
464            ),
465        }
466    }
467}