squads_temporal_sdk/workflow_context/
options.rs

1use std::{collections::HashMap, time::Duration};
2
3use squads_temporal_client::{Priority, WorkflowOptions};
4use squads_temporal_sdk_core_protos::{
5    coresdk::{
6        child_workflow::ChildWorkflowCancellationType,
7        nexus::NexusOperationCancellationType,
8        workflow_commands::{
9            ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity,
10            ScheduleNexusOperation, StartChildWorkflowExecution, WorkflowCommand,
11        },
12    },
13    temporal::api::{
14        common::v1::{Payload, RetryPolicy},
15        enums::v1::ParentClosePolicy,
16        sdk::v1::UserMetadata,
17    },
18};
19// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
20
21pub(crate) trait IntoWorkflowCommand {
22    /// Produces a workflow command from some options
23    fn into_command(self, seq: u32) -> WorkflowCommand;
24}
25
26/// Options for scheduling an activity
27#[derive(Default, Debug)]
28pub struct ActivityOptions {
29    /// Identifier to use for tracking the activity in Workflow history.
30    /// The `activityId` can be accessed by the activity function.
31    /// Does not need to be unique.
32    ///
33    /// If `None` use the context's sequence number
34    pub activity_id: Option<String>,
35    /// Type of activity to schedule
36    pub activity_type: String,
37    /// Input to the activity
38    pub input: Payload,
39    /// Task queue to schedule the activity in
40    ///
41    /// If `None`, use the same task queue as the parent workflow.
42    pub task_queue: Option<String>,
43    /// Time that the Activity Task can stay in the Task Queue before it is picked up by a Worker.
44    /// Do not specify this timeout unless using host specific Task Queues for Activity Tasks are
45    /// being used for routing.
46    /// `schedule_to_start_timeout` is always non-retryable.
47    /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back
48    /// into the same Task Queue.
49    pub schedule_to_start_timeout: Option<Duration>,
50    /// Maximum time of a single Activity execution attempt.
51    /// Note that the Temporal Server doesn't detect Worker process failures directly.
52    /// It relies on this timeout to detect that an Activity that didn't complete on time.
53    /// So this timeout should be as short as the longest possible execution of the Activity body.
54    /// Potentially long running Activities must specify `heartbeat_timeout` and heartbeat from the
55    /// activity periodically for timely failure detection.
56    /// Either this option or `schedule_to_close_timeout` is required.
57    pub start_to_close_timeout: Option<Duration>,
58    /// Total time that a workflow is willing to wait for Activity to complete.
59    /// `schedule_to_close_timeout` limits the total time of an Activity's execution including
60    /// retries (use `start_to_close_timeout` to limit the time of a single attempt).
61    /// Either this option or `start_to_close_timeout` is required.
62    pub schedule_to_close_timeout: Option<Duration>,
63    /// Heartbeat interval. Activity must heartbeat before this interval passes after a last
64    /// heartbeat or activity start.
65    pub heartbeat_timeout: Option<Duration>,
66    /// Determines what the SDK does when the Activity is cancelled.
67    pub cancellation_type: ActivityCancellationType,
68    /// Activity retry policy
69    pub retry_policy: Option<RetryPolicy>,
70    /// Summary of the activity
71    pub summary: Option<String>,
72    /// Priority for the activity
73    pub priority: Option<Priority>,
74    /// If true, disable eager execution for this activity
75    pub do_not_eagerly_execute: bool,
76}
77
78impl IntoWorkflowCommand for ActivityOptions {
79    fn into_command(self, seq: u32) -> WorkflowCommand {
80        WorkflowCommand {
81            variant: Some(
82                ScheduleActivity {
83                    seq,
84                    activity_id: match self.activity_id {
85                        None => seq.to_string(),
86                        Some(aid) => aid,
87                    },
88                    activity_type: self.activity_type,
89                    task_queue: self.task_queue.unwrap_or_default(),
90                    schedule_to_close_timeout: self
91                        .schedule_to_close_timeout
92                        .and_then(|d| d.try_into().ok()),
93                    schedule_to_start_timeout: self
94                        .schedule_to_start_timeout
95                        .and_then(|d| d.try_into().ok()),
96                    start_to_close_timeout: self
97                        .start_to_close_timeout
98                        .and_then(|d| d.try_into().ok()),
99                    heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()),
100                    cancellation_type: self.cancellation_type as i32,
101                    arguments: vec![self.input],
102                    retry_policy: self.retry_policy,
103                    priority: self.priority.map(Into::into),
104                    do_not_eagerly_execute: self.do_not_eagerly_execute,
105                    ..Default::default()
106                }
107                .into(),
108            ),
109            user_metadata: self.summary.map(|s| UserMetadata {
110                summary: Some(s.into()),
111                details: None,
112            }),
113        }
114    }
115}
116
117/// Options for scheduling a local activity
118#[derive(Default, Debug, Clone)]
119pub struct LocalActivityOptions {
120    /// Identifier to use for tracking the activity in Workflow history.
121    /// The `activityId` can be accessed by the activity function.
122    /// Does not need to be unique.
123    ///
124    /// If `None` use the context's sequence number
125    pub activity_id: Option<String>,
126    /// Type of activity to schedule
127    pub activity_type: String,
128    /// Input to the activity
129    // TODO: Make optional
130    pub input: Payload,
131    /// Retry policy
132    pub retry_policy: RetryPolicy,
133    /// Override attempt number rather than using 1.
134    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
135    pub attempt: Option<u32>,
136    /// Override schedule time when doing timer backoff.
137    /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
138    pub original_schedule_time: Option<prost_types::Timestamp>,
139    /// Retry backoffs over this amount will use a timer rather than a local retry
140    pub timer_backoff_threshold: Option<Duration>,
141    /// How the activity will cancel
142    pub cancel_type: ActivityCancellationType,
143    /// Indicates how long the caller is willing to wait for local activity completion. Limits how
144    /// long retries will be attempted. When not specified defaults to the workflow execution
145    /// timeout (which may be unset).
146    pub schedule_to_close_timeout: Option<Duration>,
147    /// Limits time the local activity can idle internally before being executed. That can happen if
148    /// the worker is currently at max concurrent local activity executions. This timeout is always
149    /// non retryable as all a retry would achieve is to put it back into the same queue. Defaults
150    /// to `schedule_to_close_timeout` if not specified and that is set. Must be <=
151    /// `schedule_to_close_timeout` when set, if not, it will be clamped down.
152    pub schedule_to_start_timeout: Option<Duration>,
153    /// Maximum time the local activity is allowed to execute after the task is dispatched. This
154    /// timeout is always retryable. Either or both of `schedule_to_close_timeout` and this must be
155    /// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
156    /// down.
157    pub start_to_close_timeout: Option<Duration>,
158}
159
160impl IntoWorkflowCommand for LocalActivityOptions {
161    fn into_command(mut self, seq: u32) -> WorkflowCommand {
162        // Allow tests to avoid extra verbosity when they don't care about timeouts
163        // TODO: Builderize LA options
164        self.schedule_to_close_timeout
165            .get_or_insert(Duration::from_secs(100));
166
167        WorkflowCommand {
168            variant: Some(
169                ScheduleLocalActivity {
170                    seq,
171                    attempt: self.attempt.unwrap_or(1),
172                    original_schedule_time: self.original_schedule_time,
173                    activity_id: match self.activity_id {
174                        None => seq.to_string(),
175                        Some(aid) => aid,
176                    },
177                    activity_type: self.activity_type,
178                    arguments: vec![self.input],
179                    retry_policy: Some(self.retry_policy),
180                    local_retry_threshold: self
181                        .timer_backoff_threshold
182                        .and_then(|d| d.try_into().ok()),
183                    cancellation_type: self.cancel_type.into(),
184                    schedule_to_close_timeout: self
185                        .schedule_to_close_timeout
186                        .and_then(|d| d.try_into().ok()),
187                    schedule_to_start_timeout: self
188                        .schedule_to_start_timeout
189                        .and_then(|d| d.try_into().ok()),
190                    start_to_close_timeout: self
191                        .start_to_close_timeout
192                        .and_then(|d| d.try_into().ok()),
193                    ..Default::default()
194                }
195                .into(),
196            ),
197            user_metadata: None,
198        }
199    }
200}
201
202/// Options for scheduling a child workflow
203#[derive(Default, Debug, Clone)]
204pub struct ChildWorkflowOptions {
205    /// Workflow ID
206    pub workflow_id: String,
207    /// Type of workflow to schedule
208    pub workflow_type: String,
209    /// Task queue to schedule the workflow in
210    ///
211    /// If `None`, use the same task queue as the parent workflow.
212    pub task_queue: Option<String>,
213    /// Input to send the child Workflow
214    pub input: Vec<Payload>,
215    /// Cancellation strategy for the child workflow
216    pub cancel_type: ChildWorkflowCancellationType,
217    /// Common options
218    pub options: WorkflowOptions,
219    /// How to respond to parent workflow ending
220    pub parent_close_policy: ParentClosePolicy,
221    /// Static summary of the child workflow
222    pub static_summary: Option<String>,
223    /// Static details of the child workflow
224    pub static_details: Option<String>,
225}
226
227impl IntoWorkflowCommand for ChildWorkflowOptions {
228    fn into_command(self, seq: u32) -> WorkflowCommand {
229        let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
230            Some(UserMetadata {
231                summary: self.static_summary.map(Into::into),
232                details: self.static_details.map(Into::into),
233            })
234        } else {
235            None
236        };
237        WorkflowCommand {
238            variant: Some(
239                StartChildWorkflowExecution {
240                    seq,
241                    workflow_id: self.workflow_id,
242                    workflow_type: self.workflow_type,
243                    task_queue: self.task_queue.unwrap_or_default(),
244                    input: self.input,
245                    cancellation_type: self.cancel_type as i32,
246                    workflow_id_reuse_policy: self.options.id_reuse_policy as i32,
247                    workflow_execution_timeout: self
248                        .options
249                        .execution_timeout
250                        .and_then(|d| d.try_into().ok()),
251                    workflow_run_timeout: self
252                        .options
253                        .execution_timeout
254                        .and_then(|d| d.try_into().ok()),
255                    workflow_task_timeout: self
256                        .options
257                        .task_timeout
258                        .and_then(|d| d.try_into().ok()),
259                    search_attributes: self.options.search_attributes.unwrap_or_default(),
260                    cron_schedule: self.options.cron_schedule.unwrap_or_default(),
261                    parent_close_policy: self.parent_close_policy as i32,
262                    priority: self.options.priority.map(Into::into),
263                    ..Default::default()
264                }
265                .into(),
266            ),
267            user_metadata,
268        }
269    }
270}
271
272/// Options for sending a signal to an external workflow
273pub struct SignalWorkflowOptions {
274    /// The workflow's id
275    pub workflow_id: String,
276    /// The particular run to target, or latest if `None`
277    pub run_id: Option<String>,
278    /// The details of the signal to send
279    pub signal: Signal,
280}
281
282impl SignalWorkflowOptions {
283    /// Create options for sending a signal to another workflow
284    pub fn new(
285        workflow_id: impl Into<String>,
286        run_id: impl Into<String>,
287        name: impl Into<String>,
288        input: impl IntoIterator<Item = impl Into<Payload>>,
289    ) -> Self {
290        Self {
291            workflow_id: workflow_id.into(),
292            run_id: Some(run_id.into()),
293            signal: Signal::new(name, input),
294        }
295    }
296
297    /// Set a header k/v pair attached to the signal
298    pub fn with_header(
299        &mut self,
300        key: impl Into<String>,
301        payload: impl Into<Payload>,
302    ) -> &mut Self {
303        self.signal.data.with_header(key.into(), payload.into());
304        self
305    }
306}
307
308/// Information needed to send a specific signal
309pub struct Signal {
310    /// The signal name
311    pub signal_name: String,
312    /// The data the signal carries
313    pub data: SignalData,
314}
315
316impl Signal {
317    /// Create a new signal
318    pub fn new(
319        name: impl Into<String>,
320        input: impl IntoIterator<Item = impl Into<Payload>>,
321    ) -> Self {
322        Self {
323            signal_name: name.into(),
324            data: SignalData::new(input),
325        }
326    }
327}
328
329/// Data contained within a signal
330#[derive(Default, Debug)]
331pub struct SignalData {
332    /// The arguments the signal will receive
333    pub input: Vec<Payload>,
334    /// Metadata attached to the signal
335    pub headers: HashMap<String, Payload>,
336}
337
338impl SignalData {
339    /// Create data for a signal
340    pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
341        Self {
342            input: input.into_iter().map(Into::into).collect(),
343            headers: HashMap::new(),
344        }
345    }
346
347    /// Set a header k/v pair attached to the signal
348    pub fn with_header(
349        &mut self,
350        key: impl Into<String>,
351        payload: impl Into<Payload>,
352    ) -> &mut Self {
353        self.headers.insert(key.into(), payload.into());
354        self
355    }
356}
357
/// Options used when starting a timer.
#[derive(Clone, Debug, Default)]
pub struct TimerOptions {
    /// How long the timer runs.
    pub duration: Duration,
    /// Summary of the timer.
    pub summary: Option<String>,
}
366
367impl From<Duration> for TimerOptions {
368    fn from(duration: Duration) -> Self {
369        TimerOptions {
370            duration,
371            ..Default::default()
372        }
373    }
374}
375
376/// Options for Nexus Operations
377#[derive(Default, Debug, Clone)]
378pub struct NexusOperationOptions {
379    /// Endpoint name, must exist in the endpoint registry or this command will fail.
380    pub endpoint: String,
381    /// Service name.
382    pub service: String,
383    /// Operation name.
384    pub operation: String,
385    /// Input for the operation. The server converts this into Nexus request content and the
386    /// appropriate content headers internally when sending the StartOperation request. On the
387    /// handler side, if it is also backed by Temporal, the content is transformed back to the
388    /// original Payload sent in this command.
389    pub input: Option<Payload>,
390    /// Schedule-to-close timeout for this operation.
391    /// Indicates how long the caller is willing to wait for operation completion.
392    /// Calls are retried internally by the server.
393    pub schedule_to_close_timeout: Option<Duration>,
394    /// Header to attach to the Nexus request.
395    /// Users are responsible for encrypting sensitive data in this header as it is stored in
396    /// workflow history and transmitted to external services as-is. This is useful for propagating
397    /// tracing information. Note these headers are not the same as Temporal headers on internal
398    /// activities and child workflows, these are transmitted to Nexus operations that may be
399    /// external and are not traditional payloads.
400    pub nexus_header: HashMap<String, String>,
401    /// Cancellation type for the operation
402    pub cancellation_type: Option<NexusOperationCancellationType>,
403}
404
405impl IntoWorkflowCommand for NexusOperationOptions {
406    fn into_command(self, seq: u32) -> WorkflowCommand {
407        WorkflowCommand {
408            user_metadata: None,
409            variant: Some(
410                ScheduleNexusOperation {
411                    seq,
412                    endpoint: self.endpoint,
413                    service: self.service,
414                    operation: self.operation,
415                    input: self.input,
416                    schedule_to_close_timeout: self
417                        .schedule_to_close_timeout
418                        .and_then(|t| t.try_into().ok()),
419                    nexus_header: self.nexus_header,
420                    cancellation_type: self
421                        .cancellation_type
422                        .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
423                        .into(),
424                }
425                .into(),
426            ),
427        }
428    }
429}