squads_temporal_sdk/workflow_context/options.rs
1use std::{collections::HashMap, time::Duration};
2
3use squads_temporal_client::{Priority, WorkflowOptions};
4use squads_temporal_sdk_core_protos::{
5 coresdk::{
6 child_workflow::ChildWorkflowCancellationType,
7 nexus::NexusOperationCancellationType,
8 workflow_commands::{
9 ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity,
10 ScheduleNexusOperation, StartChildWorkflowExecution, WorkflowCommand,
11 },
12 },
13 temporal::api::{
14 common::v1::{Payload, RetryPolicy},
15 enums::v1::ParentClosePolicy,
16 sdk::v1::UserMetadata,
17 },
18};
19// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
20
21pub(crate) trait IntoWorkflowCommand {
22 /// Produces a workflow command from some options
23 fn into_command(self, seq: u32) -> WorkflowCommand;
24}
25
26/// Options for scheduling an activity
27#[derive(Default, Debug)]
28pub struct ActivityOptions {
29 /// Identifier to use for tracking the activity in Workflow history.
30 /// The `activityId` can be accessed by the activity function.
31 /// Does not need to be unique.
32 ///
33 /// If `None` use the context's sequence number
34 pub activity_id: Option<String>,
35 /// Type of activity to schedule
36 pub activity_type: String,
37 /// Input to the activity
38 pub input: Payload,
39 /// Task queue to schedule the activity in
40 ///
41 /// If `None`, use the same task queue as the parent workflow.
42 pub task_queue: Option<String>,
43 /// Time that the Activity Task can stay in the Task Queue before it is picked up by a Worker.
44 /// Do not specify this timeout unless using host specific Task Queues for Activity Tasks are
45 /// being used for routing.
46 /// `schedule_to_start_timeout` is always non-retryable.
47 /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back
48 /// into the same Task Queue.
49 pub schedule_to_start_timeout: Option<Duration>,
50 /// Maximum time of a single Activity execution attempt.
51 /// Note that the Temporal Server doesn't detect Worker process failures directly.
52 /// It relies on this timeout to detect that an Activity that didn't complete on time.
53 /// So this timeout should be as short as the longest possible execution of the Activity body.
54 /// Potentially long running Activities must specify `heartbeat_timeout` and heartbeat from the
55 /// activity periodically for timely failure detection.
56 /// Either this option or `schedule_to_close_timeout` is required.
57 pub start_to_close_timeout: Option<Duration>,
58 /// Total time that a workflow is willing to wait for Activity to complete.
59 /// `schedule_to_close_timeout` limits the total time of an Activity's execution including
60 /// retries (use `start_to_close_timeout` to limit the time of a single attempt).
61 /// Either this option or `start_to_close_timeout` is required.
62 pub schedule_to_close_timeout: Option<Duration>,
63 /// Heartbeat interval. Activity must heartbeat before this interval passes after a last
64 /// heartbeat or activity start.
65 pub heartbeat_timeout: Option<Duration>,
66 /// Determines what the SDK does when the Activity is cancelled.
67 pub cancellation_type: ActivityCancellationType,
68 /// Activity retry policy
69 pub retry_policy: Option<RetryPolicy>,
70 /// Summary of the activity
71 pub summary: Option<String>,
72 /// Priority for the activity
73 pub priority: Option<Priority>,
74 /// If true, disable eager execution for this activity
75 pub do_not_eagerly_execute: bool,
76}
77
78impl IntoWorkflowCommand for ActivityOptions {
79 fn into_command(self, seq: u32) -> WorkflowCommand {
80 WorkflowCommand {
81 variant: Some(
82 ScheduleActivity {
83 seq,
84 activity_id: match self.activity_id {
85 None => seq.to_string(),
86 Some(aid) => aid,
87 },
88 activity_type: self.activity_type,
89 task_queue: self.task_queue.unwrap_or_default(),
90 schedule_to_close_timeout: self
91 .schedule_to_close_timeout
92 .and_then(|d| d.try_into().ok()),
93 schedule_to_start_timeout: self
94 .schedule_to_start_timeout
95 .and_then(|d| d.try_into().ok()),
96 start_to_close_timeout: self
97 .start_to_close_timeout
98 .and_then(|d| d.try_into().ok()),
99 heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()),
100 cancellation_type: self.cancellation_type as i32,
101 arguments: vec![self.input],
102 retry_policy: self.retry_policy,
103 priority: self.priority.map(Into::into),
104 do_not_eagerly_execute: self.do_not_eagerly_execute,
105 ..Default::default()
106 }
107 .into(),
108 ),
109 user_metadata: self.summary.map(|s| UserMetadata {
110 summary: Some(s.into()),
111 details: None,
112 }),
113 }
114 }
115}
116
117/// Options for scheduling a local activity
118#[derive(Default, Debug, Clone)]
119pub struct LocalActivityOptions {
120 /// Identifier to use for tracking the activity in Workflow history.
121 /// The `activityId` can be accessed by the activity function.
122 /// Does not need to be unique.
123 ///
124 /// If `None` use the context's sequence number
125 pub activity_id: Option<String>,
126 /// Type of activity to schedule
127 pub activity_type: String,
128 /// Input to the activity
129 // TODO: Make optional
130 pub input: Payload,
131 /// Retry policy
132 pub retry_policy: RetryPolicy,
133 /// Override attempt number rather than using 1.
134 /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
135 pub attempt: Option<u32>,
136 /// Override schedule time when doing timer backoff.
137 /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
138 pub original_schedule_time: Option<prost_types::Timestamp>,
139 /// Retry backoffs over this amount will use a timer rather than a local retry
140 pub timer_backoff_threshold: Option<Duration>,
141 /// How the activity will cancel
142 pub cancel_type: ActivityCancellationType,
143 /// Indicates how long the caller is willing to wait for local activity completion. Limits how
144 /// long retries will be attempted. When not specified defaults to the workflow execution
145 /// timeout (which may be unset).
146 pub schedule_to_close_timeout: Option<Duration>,
147 /// Limits time the local activity can idle internally before being executed. That can happen if
148 /// the worker is currently at max concurrent local activity executions. This timeout is always
149 /// non retryable as all a retry would achieve is to put it back into the same queue. Defaults
150 /// to `schedule_to_close_timeout` if not specified and that is set. Must be <=
151 /// `schedule_to_close_timeout` when set, if not, it will be clamped down.
152 pub schedule_to_start_timeout: Option<Duration>,
153 /// Maximum time the local activity is allowed to execute after the task is dispatched. This
154 /// timeout is always retryable. Either or both of `schedule_to_close_timeout` and this must be
155 /// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
156 /// down.
157 pub start_to_close_timeout: Option<Duration>,
158}
159
160impl IntoWorkflowCommand for LocalActivityOptions {
161 fn into_command(mut self, seq: u32) -> WorkflowCommand {
162 // Allow tests to avoid extra verbosity when they don't care about timeouts
163 // TODO: Builderize LA options
164 self.schedule_to_close_timeout
165 .get_or_insert(Duration::from_secs(100));
166
167 WorkflowCommand {
168 variant: Some(
169 ScheduleLocalActivity {
170 seq,
171 attempt: self.attempt.unwrap_or(1),
172 original_schedule_time: self.original_schedule_time,
173 activity_id: match self.activity_id {
174 None => seq.to_string(),
175 Some(aid) => aid,
176 },
177 activity_type: self.activity_type,
178 arguments: vec![self.input],
179 retry_policy: Some(self.retry_policy),
180 local_retry_threshold: self
181 .timer_backoff_threshold
182 .and_then(|d| d.try_into().ok()),
183 cancellation_type: self.cancel_type.into(),
184 schedule_to_close_timeout: self
185 .schedule_to_close_timeout
186 .and_then(|d| d.try_into().ok()),
187 schedule_to_start_timeout: self
188 .schedule_to_start_timeout
189 .and_then(|d| d.try_into().ok()),
190 start_to_close_timeout: self
191 .start_to_close_timeout
192 .and_then(|d| d.try_into().ok()),
193 ..Default::default()
194 }
195 .into(),
196 ),
197 user_metadata: None,
198 }
199 }
200}
201
202/// Options for scheduling a child workflow
203#[derive(Default, Debug, Clone)]
204pub struct ChildWorkflowOptions {
205 /// Workflow ID
206 pub workflow_id: String,
207 /// Type of workflow to schedule
208 pub workflow_type: String,
209 /// Task queue to schedule the workflow in
210 ///
211 /// If `None`, use the same task queue as the parent workflow.
212 pub task_queue: Option<String>,
213 /// Input to send the child Workflow
214 pub input: Vec<Payload>,
215 /// Cancellation strategy for the child workflow
216 pub cancel_type: ChildWorkflowCancellationType,
217 /// Common options
218 pub options: WorkflowOptions,
219 /// How to respond to parent workflow ending
220 pub parent_close_policy: ParentClosePolicy,
221 /// Static summary of the child workflow
222 pub static_summary: Option<String>,
223 /// Static details of the child workflow
224 pub static_details: Option<String>,
225}
226
227impl IntoWorkflowCommand for ChildWorkflowOptions {
228 fn into_command(self, seq: u32) -> WorkflowCommand {
229 let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
230 Some(UserMetadata {
231 summary: self.static_summary.map(Into::into),
232 details: self.static_details.map(Into::into),
233 })
234 } else {
235 None
236 };
237 WorkflowCommand {
238 variant: Some(
239 StartChildWorkflowExecution {
240 seq,
241 workflow_id: self.workflow_id,
242 workflow_type: self.workflow_type,
243 task_queue: self.task_queue.unwrap_or_default(),
244 input: self.input,
245 cancellation_type: self.cancel_type as i32,
246 workflow_id_reuse_policy: self.options.id_reuse_policy as i32,
247 workflow_execution_timeout: self
248 .options
249 .execution_timeout
250 .and_then(|d| d.try_into().ok()),
251 workflow_run_timeout: self
252 .options
253 .execution_timeout
254 .and_then(|d| d.try_into().ok()),
255 workflow_task_timeout: self
256 .options
257 .task_timeout
258 .and_then(|d| d.try_into().ok()),
259 search_attributes: self.options.search_attributes.unwrap_or_default(),
260 cron_schedule: self.options.cron_schedule.unwrap_or_default(),
261 parent_close_policy: self.parent_close_policy as i32,
262 priority: self.options.priority.map(Into::into),
263 ..Default::default()
264 }
265 .into(),
266 ),
267 user_metadata,
268 }
269 }
270}
271
272/// Options for sending a signal to an external workflow
273pub struct SignalWorkflowOptions {
274 /// The workflow's id
275 pub workflow_id: String,
276 /// The particular run to target, or latest if `None`
277 pub run_id: Option<String>,
278 /// The details of the signal to send
279 pub signal: Signal,
280}
281
282impl SignalWorkflowOptions {
283 /// Create options for sending a signal to another workflow
284 pub fn new(
285 workflow_id: impl Into<String>,
286 run_id: impl Into<String>,
287 name: impl Into<String>,
288 input: impl IntoIterator<Item = impl Into<Payload>>,
289 ) -> Self {
290 Self {
291 workflow_id: workflow_id.into(),
292 run_id: Some(run_id.into()),
293 signal: Signal::new(name, input),
294 }
295 }
296
297 /// Set a header k/v pair attached to the signal
298 pub fn with_header(
299 &mut self,
300 key: impl Into<String>,
301 payload: impl Into<Payload>,
302 ) -> &mut Self {
303 self.signal.data.with_header(key.into(), payload.into());
304 self
305 }
306}
307
308/// Information needed to send a specific signal
309pub struct Signal {
310 /// The signal name
311 pub signal_name: String,
312 /// The data the signal carries
313 pub data: SignalData,
314}
315
316impl Signal {
317 /// Create a new signal
318 pub fn new(
319 name: impl Into<String>,
320 input: impl IntoIterator<Item = impl Into<Payload>>,
321 ) -> Self {
322 Self {
323 signal_name: name.into(),
324 data: SignalData::new(input),
325 }
326 }
327}
328
329/// Data contained within a signal
330#[derive(Default, Debug)]
331pub struct SignalData {
332 /// The arguments the signal will receive
333 pub input: Vec<Payload>,
334 /// Metadata attached to the signal
335 pub headers: HashMap<String, Payload>,
336}
337
338impl SignalData {
339 /// Create data for a signal
340 pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
341 Self {
342 input: input.into_iter().map(Into::into).collect(),
343 headers: HashMap::new(),
344 }
345 }
346
347 /// Set a header k/v pair attached to the signal
348 pub fn with_header(
349 &mut self,
350 key: impl Into<String>,
351 payload: impl Into<Payload>,
352 ) -> &mut Self {
353 self.headers.insert(key.into(), payload.into());
354 self
355 }
356}
357
/// Options used when starting a timer
#[derive(Default, Debug, Clone)]
pub struct TimerOptions {
    /// How long the timer runs
    pub duration: Duration,
    /// Optional summary describing the timer
    pub summary: Option<String>,
}
366
367impl From<Duration> for TimerOptions {
368 fn from(duration: Duration) -> Self {
369 TimerOptions {
370 duration,
371 ..Default::default()
372 }
373 }
374}
375
376/// Options for Nexus Operations
377#[derive(Default, Debug, Clone)]
378pub struct NexusOperationOptions {
379 /// Endpoint name, must exist in the endpoint registry or this command will fail.
380 pub endpoint: String,
381 /// Service name.
382 pub service: String,
383 /// Operation name.
384 pub operation: String,
385 /// Input for the operation. The server converts this into Nexus request content and the
386 /// appropriate content headers internally when sending the StartOperation request. On the
387 /// handler side, if it is also backed by Temporal, the content is transformed back to the
388 /// original Payload sent in this command.
389 pub input: Option<Payload>,
390 /// Schedule-to-close timeout for this operation.
391 /// Indicates how long the caller is willing to wait for operation completion.
392 /// Calls are retried internally by the server.
393 pub schedule_to_close_timeout: Option<Duration>,
394 /// Header to attach to the Nexus request.
395 /// Users are responsible for encrypting sensitive data in this header as it is stored in
396 /// workflow history and transmitted to external services as-is. This is useful for propagating
397 /// tracing information. Note these headers are not the same as Temporal headers on internal
398 /// activities and child workflows, these are transmitted to Nexus operations that may be
399 /// external and are not traditional payloads.
400 pub nexus_header: HashMap<String, String>,
401 /// Cancellation type for the operation
402 pub cancellation_type: Option<NexusOperationCancellationType>,
403}
404
405impl IntoWorkflowCommand for NexusOperationOptions {
406 fn into_command(self, seq: u32) -> WorkflowCommand {
407 WorkflowCommand {
408 user_metadata: None,
409 variant: Some(
410 ScheduleNexusOperation {
411 seq,
412 endpoint: self.endpoint,
413 service: self.service,
414 operation: self.operation,
415 input: self.input,
416 schedule_to_close_timeout: self
417 .schedule_to_close_timeout
418 .and_then(|t| t.try_into().ok()),
419 nexus_header: self.nexus_header,
420 cancellation_type: self
421 .cancellation_type
422 .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
423 .into(),
424 }
425 .into(),
426 ),
427 }
428 }
429}