temporalio_sdk/workflow_context/options.rs
1use std::{collections::HashMap, time::Duration};
2
3use temporalio_client::Priority;
4use temporalio_common::protos::{
5 coresdk::{
6 AsJsonPayloadExt,
7 child_workflow::ChildWorkflowCancellationType,
8 nexus::NexusOperationCancellationType,
9 workflow_commands::{
10 ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity,
11 ScheduleNexusOperation, StartChildWorkflowExecution, WorkflowCommand,
12 },
13 },
14 temporal::api::{
15 common::v1::{Payload, RetryPolicy, SearchAttributes},
16 enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
17 sdk::v1::UserMetadata,
18 },
19};
20// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
21
22pub(crate) trait IntoWorkflowCommand {
23 /// Produces a workflow command from some options
24 fn into_command(self, seq: u32) -> WorkflowCommand;
25}
26
27/// Options for scheduling an activity
28#[derive(Default, Debug)]
29pub struct ActivityOptions {
30 /// Identifier to use for tracking the activity in Workflow history.
31 /// The `activityId` can be accessed by the activity function.
32 /// Does not need to be unique.
33 ///
34 /// If `None` use the context's sequence number
35 pub activity_id: Option<String>,
36 /// Task queue to schedule the activity in
37 ///
38 /// If `None`, use the same task queue as the parent workflow.
39 pub task_queue: Option<String>,
40 /// Time that the Activity Task can stay in the Task Queue before it is picked up by a Worker.
41 /// Do not specify this timeout unless using host specific Task Queues for Activity Tasks are
42 /// being used for routing.
43 /// `schedule_to_start_timeout` is always non-retryable.
44 /// Retrying after this timeout doesn't make sense as it would just put the Activity Task back
45 /// into the same Task Queue.
46 pub schedule_to_start_timeout: Option<Duration>,
47 /// Maximum time of a single Activity execution attempt.
48 /// Note that the Temporal Server doesn't detect Worker process failures directly.
49 /// It relies on this timeout to detect that an Activity that didn't complete on time.
50 /// So this timeout should be as short as the longest possible execution of the Activity body.
51 /// Potentially long running Activities must specify `heartbeat_timeout` and heartbeat from the
52 /// activity periodically for timely failure detection.
53 /// Either this option or `schedule_to_close_timeout` is required.
54 pub start_to_close_timeout: Option<Duration>,
55 /// Total time that a workflow is willing to wait for Activity to complete.
56 /// `schedule_to_close_timeout` limits the total time of an Activity's execution including
57 /// retries (use `start_to_close_timeout` to limit the time of a single attempt).
58 /// Either this option or `start_to_close_timeout` is required.
59 pub schedule_to_close_timeout: Option<Duration>,
60 /// Heartbeat interval. Activity must heartbeat before this interval passes after a last
61 /// heartbeat or activity start.
62 pub heartbeat_timeout: Option<Duration>,
63 /// Determines what the SDK does when the Activity is cancelled.
64 pub cancellation_type: ActivityCancellationType,
65 /// Activity retry policy
66 pub retry_policy: Option<RetryPolicy>,
67 /// Summary of the activity
68 pub summary: Option<String>,
69 /// Priority for the activity
70 pub priority: Option<Priority>,
71 /// If true, disable eager execution for this activity
72 pub do_not_eagerly_execute: bool,
73}
74
75impl ActivityOptions {
76 pub(crate) fn into_command(
77 self,
78 activity_type: String,
79 arguments: Vec<Payload>,
80 seq: u32,
81 ) -> WorkflowCommand {
82 WorkflowCommand {
83 variant: Some(
84 ScheduleActivity {
85 seq,
86 activity_id: match self.activity_id {
87 None => seq.to_string(),
88 Some(aid) => aid,
89 },
90 activity_type,
91 task_queue: self.task_queue.unwrap_or_default(),
92 schedule_to_close_timeout: self
93 .schedule_to_close_timeout
94 .and_then(|d| d.try_into().ok()),
95 schedule_to_start_timeout: self
96 .schedule_to_start_timeout
97 .and_then(|d| d.try_into().ok()),
98 start_to_close_timeout: self
99 .start_to_close_timeout
100 .and_then(|d| d.try_into().ok()),
101 heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()),
102 cancellation_type: self.cancellation_type as i32,
103 arguments,
104 retry_policy: self.retry_policy,
105 priority: self.priority.map(Into::into),
106 do_not_eagerly_execute: self.do_not_eagerly_execute,
107 ..Default::default()
108 }
109 .into(),
110 ),
111 user_metadata: self.summary.map(|s| UserMetadata {
112 summary: Some(s.into()),
113 details: None,
114 }),
115 }
116 }
117}
118
119/// Options for scheduling a local activity
120#[derive(Default, Debug, Clone)]
121pub struct LocalActivityOptions {
122 /// Identifier to use for tracking the activity in Workflow history.
123 /// The `activityId` can be accessed by the activity function.
124 /// Does not need to be unique.
125 ///
126 /// If `None` use the context's sequence number
127 pub activity_id: Option<String>,
128 /// Retry policy
129 pub retry_policy: RetryPolicy,
130 /// Override attempt number rather than using 1.
131 /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
132 pub attempt: Option<u32>,
133 /// Override schedule time when doing timer backoff.
134 /// Ideally we would not expose this in a released Rust SDK, but it's needed for test.
135 pub original_schedule_time: Option<prost_types::Timestamp>,
136 /// Retry backoffs over this amount will use a timer rather than a local retry
137 pub timer_backoff_threshold: Option<Duration>,
138 /// How the activity will cancel
139 pub cancel_type: ActivityCancellationType,
140 /// Indicates how long the caller is willing to wait for local activity completion. Limits how
141 /// long retries will be attempted. When not specified defaults to the workflow execution
142 /// timeout (which may be unset).
143 pub schedule_to_close_timeout: Option<Duration>,
144 /// Limits time the local activity can idle internally before being executed. That can happen if
145 /// the worker is currently at max concurrent local activity executions. This timeout is always
146 /// non retryable as all a retry would achieve is to put it back into the same queue. Defaults
147 /// to `schedule_to_close_timeout` if not specified and that is set. Must be <=
148 /// `schedule_to_close_timeout` when set, if not, it will be clamped down.
149 pub schedule_to_start_timeout: Option<Duration>,
150 /// Maximum time the local activity is allowed to execute after the task is dispatched. This
151 /// timeout is always retryable. Either or both of `schedule_to_close_timeout` and this must be
152 /// specified. If set, this must be <= `schedule_to_close_timeout`, if not, it will be clamped
153 /// down.
154 pub start_to_close_timeout: Option<Duration>,
155 /// Single-line summary for this activity that will appear in UI/CLI.
156 pub summary: Option<String>,
157}
158
159impl LocalActivityOptions {
160 pub(crate) fn into_command(
161 mut self,
162 activity_type: String,
163 arguments: Vec<Payload>,
164 seq: u32,
165 ) -> WorkflowCommand {
166 // Allow tests to avoid extra verbosity when they don't care about timeouts
167 // TODO: Builderize LA options
168 self.schedule_to_close_timeout
169 .get_or_insert(Duration::from_secs(100));
170
171 WorkflowCommand {
172 variant: Some(
173 ScheduleLocalActivity {
174 seq,
175 attempt: self.attempt.unwrap_or(1),
176 original_schedule_time: self.original_schedule_time,
177 activity_id: match self.activity_id {
178 None => seq.to_string(),
179 Some(aid) => aid,
180 },
181 activity_type,
182 arguments,
183 retry_policy: Some(self.retry_policy),
184 local_retry_threshold: self
185 .timer_backoff_threshold
186 .and_then(|d| d.try_into().ok()),
187 cancellation_type: self.cancel_type.into(),
188 schedule_to_close_timeout: self
189 .schedule_to_close_timeout
190 .and_then(|d| d.try_into().ok()),
191 schedule_to_start_timeout: self
192 .schedule_to_start_timeout
193 .and_then(|d| d.try_into().ok()),
194 start_to_close_timeout: self
195 .start_to_close_timeout
196 .and_then(|d| d.try_into().ok()),
197 ..Default::default()
198 }
199 .into(),
200 ),
201 user_metadata: self
202 .summary
203 .and_then(|summary| summary.as_json_payload().ok())
204 .map(|summary| UserMetadata {
205 summary: Some(summary),
206 details: None,
207 }),
208 }
209 }
210}
211
212/// Options for scheduling a child workflow
213#[derive(Default, Debug, Clone)]
214pub struct ChildWorkflowOptions {
215 /// Workflow ID
216 pub workflow_id: String,
217 /// Type of workflow to schedule
218 pub workflow_type: String,
219 /// Task queue to schedule the workflow in
220 ///
221 /// If `None`, use the same task queue as the parent workflow.
222 pub task_queue: Option<String>,
223 /// Input to send the child Workflow
224 pub input: Vec<Payload>,
225 /// Cancellation strategy for the child workflow
226 pub cancel_type: ChildWorkflowCancellationType,
227 /// How to respond to parent workflow ending
228 pub parent_close_policy: ParentClosePolicy,
229 /// Static summary of the child workflow
230 pub static_summary: Option<String>,
231 /// Static details of the child workflow
232 pub static_details: Option<String>,
233 /// Set the policy for reusing the workflow id
234 pub id_reuse_policy: WorkflowIdReusePolicy,
235 /// Optionally set the execution timeout for the workflow
236 pub execution_timeout: Option<Duration>,
237 /// Optionally indicates the default run timeout for a workflow run
238 pub run_timeout: Option<Duration>,
239 /// Optionally indicates the default task timeout for a workflow run
240 pub task_timeout: Option<Duration>,
241 /// Optionally set a cron schedule for the workflow
242 pub cron_schedule: Option<String>,
243 /// Optionally associate extra search attributes with a workflow
244 pub search_attributes: Option<HashMap<String, Payload>>,
245 /// Priority for the workflow
246 pub priority: Option<Priority>,
247}
248
249impl IntoWorkflowCommand for ChildWorkflowOptions {
250 fn into_command(self, seq: u32) -> WorkflowCommand {
251 let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
252 Some(UserMetadata {
253 summary: self.static_summary.map(Into::into),
254 details: self.static_details.map(Into::into),
255 })
256 } else {
257 None
258 };
259 WorkflowCommand {
260 variant: Some(
261 StartChildWorkflowExecution {
262 seq,
263 workflow_id: self.workflow_id,
264 workflow_type: self.workflow_type,
265 task_queue: self.task_queue.unwrap_or_default(),
266 input: self.input,
267 cancellation_type: self.cancel_type as i32,
268 workflow_id_reuse_policy: self.id_reuse_policy as i32,
269 workflow_execution_timeout: self
270 .execution_timeout
271 .and_then(|d| d.try_into().ok()),
272 workflow_run_timeout: self.execution_timeout.and_then(|d| d.try_into().ok()),
273 workflow_task_timeout: self.task_timeout.and_then(|d| d.try_into().ok()),
274 search_attributes: self
275 .search_attributes
276 .map(|sa| SearchAttributes { indexed_fields: sa }),
277 cron_schedule: self.cron_schedule.unwrap_or_default(),
278 parent_close_policy: self.parent_close_policy as i32,
279 priority: self.priority.map(Into::into),
280 ..Default::default()
281 }
282 .into(),
283 ),
284 user_metadata,
285 }
286 }
287}
288
289/// Options for sending a signal to an external workflow
290#[derive(Debug)]
291pub struct SignalWorkflowOptions {
292 /// The workflow's id
293 pub workflow_id: String,
294 /// The particular run to target, or latest if `None`
295 pub run_id: Option<String>,
296 /// The details of the signal to send
297 pub signal: Signal,
298}
299
300impl SignalWorkflowOptions {
301 /// Create options for sending a signal to another workflow
302 pub fn new(
303 workflow_id: impl Into<String>,
304 run_id: impl Into<String>,
305 name: impl Into<String>,
306 input: impl IntoIterator<Item = impl Into<Payload>>,
307 ) -> Self {
308 Self {
309 workflow_id: workflow_id.into(),
310 run_id: Some(run_id.into()),
311 signal: Signal::new(name, input),
312 }
313 }
314
315 /// Set a header k/v pair attached to the signal
316 pub fn with_header(
317 &mut self,
318 key: impl Into<String>,
319 payload: impl Into<Payload>,
320 ) -> &mut Self {
321 self.signal.data.with_header(key.into(), payload.into());
322 self
323 }
324}
325
326/// Information needed to send a specific signal
327#[derive(Debug)]
328pub struct Signal {
329 /// The signal name
330 pub signal_name: String,
331 /// The data the signal carries
332 pub data: SignalData,
333}
334
335impl Signal {
336 /// Create a new signal
337 pub fn new(
338 name: impl Into<String>,
339 input: impl IntoIterator<Item = impl Into<Payload>>,
340 ) -> Self {
341 Self {
342 signal_name: name.into(),
343 data: SignalData::new(input),
344 }
345 }
346}
347
348/// Data contained within a signal
349#[derive(Default, Debug)]
350pub struct SignalData {
351 /// The arguments the signal will receive
352 pub input: Vec<Payload>,
353 /// Metadata attached to the signal
354 pub headers: HashMap<String, Payload>,
355}
356
357impl SignalData {
358 /// Create data for a signal
359 pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
360 Self {
361 input: input.into_iter().map(Into::into).collect(),
362 headers: HashMap::new(),
363 }
364 }
365
366 /// Set a header k/v pair attached to the signal
367 pub fn with_header(
368 &mut self,
369 key: impl Into<String>,
370 payload: impl Into<Payload>,
371 ) -> &mut Self {
372 self.headers.insert(key.into(), payload.into());
373 self
374 }
375}
376
/// Configuration for a timer.
#[derive(Debug, Default, Clone)]
pub struct TimerOptions {
    /// How long the timer runs.
    pub duration: Duration,
    /// Summary text describing the timer.
    pub summary: Option<String>,
}

/// Builds timer options from a bare duration, leaving the summary unset.
impl From<Duration> for TimerOptions {
    fn from(duration: Duration) -> Self {
        Self {
            duration,
            summary: None,
        }
    }
}
394
395/// Options for Nexus Operations
396#[derive(Default, Debug, Clone)]
397pub struct NexusOperationOptions {
398 /// Endpoint name, must exist in the endpoint registry or this command will fail.
399 pub endpoint: String,
400 /// Service name.
401 pub service: String,
402 /// Operation name.
403 pub operation: String,
404 /// Input for the operation. The server converts this into Nexus request content and the
405 /// appropriate content headers internally when sending the StartOperation request. On the
406 /// handler side, if it is also backed by Temporal, the content is transformed back to the
407 /// original Payload sent in this command.
408 pub input: Option<Payload>,
409 /// Schedule-to-close timeout for this operation.
410 /// Indicates how long the caller is willing to wait for operation completion.
411 /// Calls are retried internally by the server.
412 pub schedule_to_close_timeout: Option<Duration>,
413 /// Header to attach to the Nexus request.
414 /// Users are responsible for encrypting sensitive data in this header as it is stored in
415 /// workflow history and transmitted to external services as-is. This is useful for propagating
416 /// tracing information. Note these headers are not the same as Temporal headers on internal
417 /// activities and child workflows, these are transmitted to Nexus operations that may be
418 /// external and are not traditional payloads.
419 pub nexus_header: HashMap<String, String>,
420 /// Cancellation type for the operation
421 pub cancellation_type: Option<NexusOperationCancellationType>,
422 /// Schedule-to-start timeout for this operation.
423 /// Indicates how long the caller is willing to wait for the operation to be started (or completed if synchronous)
424 /// by the handler. If the operation is not started within this timeout, it will fail with
425 /// TIMEOUT_TYPE_SCHEDULE_TO_START.
426 /// If not set or zero, no schedule-to-start timeout is enforced.
427 pub schedule_to_start_timeout: Option<Duration>,
428 /// Start-to-close timeout for this operation.
429 /// Indicates how long the caller is willing to wait for an asynchronous operation to complete after it has been
430 /// started. If the operation does not complete within this timeout after starting, it will fail with
431 /// TIMEOUT_TYPE_START_TO_CLOSE.
432 /// Only applies to asynchronous operations. Synchronous operations ignore this timeout.
433 /// If not set or zero, no start-to-close timeout is enforced.
434 pub start_to_close_timeout: Option<Duration>,
435}
436
437impl IntoWorkflowCommand for NexusOperationOptions {
438 fn into_command(self, seq: u32) -> WorkflowCommand {
439 WorkflowCommand {
440 user_metadata: None,
441 variant: Some(
442 ScheduleNexusOperation {
443 seq,
444 endpoint: self.endpoint,
445 service: self.service,
446 operation: self.operation,
447 input: self.input,
448 schedule_to_close_timeout: self
449 .schedule_to_close_timeout
450 .and_then(|t| t.try_into().ok()),
451 schedule_to_start_timeout: self
452 .schedule_to_start_timeout
453 .and_then(|t| t.try_into().ok()),
454 start_to_close_timeout: self
455 .start_to_close_timeout
456 .and_then(|t| t.try_into().ok()),
457 nexus_header: self.nexus_header,
458 cancellation_type: self
459 .cancellation_type
460 .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
461 .into(),
462 }
463 .into(),
464 ),
465 }
466 }
467}