1use std::{collections::HashMap, time::Duration};
2
3use temporalio_client::Priority;
4use temporalio_common::{
5 data_converters::{
6 GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
7 },
8 protos::{
9 coresdk::{
10 child_workflow::ChildWorkflowCancellationType,
11 common::VersioningIntent,
12 nexus::NexusOperationCancellationType,
13 workflow_commands::{
14 ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
15 ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
16 WorkflowCommand,
17 },
18 },
19 temporal::api::{
20 common::v1::{Payload, RetryPolicy, SearchAttributes},
21 enums::v1::{ParentClosePolicy, WorkflowIdReusePolicy},
22 sdk::v1::UserMetadata,
23 },
24 },
25};
/// Conversion of an options value into a [`WorkflowCommand`].
///
/// In this file it is implemented by `NexusOperationOptions`.
pub(crate) trait IntoWorkflowCommand {
    /// Consumes `self` and produces the command; `seq` is the sequence number the
    /// implementation in this file stores in the command's `seq` field.
    fn into_command(self, seq: u32) -> WorkflowCommand;
}
32
/// Options that `ActivityOptions::into_command` turns into a `ScheduleActivity` workflow command.
///
/// Constructed through a `bon` builder whose start function is `with_close_timeouts`, which
/// takes the required [`ActivityCloseTimeouts`]. Setters for `String` fields accept any
/// `Into<String>` (per `on(String, into)`).
///
/// Optional durations that cannot be converted into the command's duration type are sent as
/// unset by `into_command`.
#[derive(Debug, bon::Builder, Clone)]
#[non_exhaustive]
#[builder(start_fn = with_close_timeouts, on(String, into), state_mod(vis = "pub"))]
pub struct ActivityOptions {
    /// Close timeout(s) for the activity; passed to the builder's start function.
    #[builder(start_fn)]
    pub close_timeouts: ActivityCloseTimeouts,
    /// Activity id. When `None`, `into_command` uses the command sequence number as the id.
    pub activity_id: Option<String>,
    /// Task queue name. When `None`, `into_command` sends an empty string.
    pub task_queue: Option<String>,
    /// Schedule-to-start timeout, forwarded to the command when set.
    pub schedule_to_start_timeout: Option<Duration>,
    /// Heartbeat timeout, forwarded to the command when set.
    pub heartbeat_timeout: Option<Duration>,
    /// Cancellation type; the builder falls back to the type's `Default` when not set.
    #[builder(default)]
    pub cancellation_type: ActivityCancellationType,
    /// Retry policy, forwarded to the command unchanged.
    pub retry_policy: Option<RetryPolicy>,
    /// Summary text; `into_command` serializes it into the command's user metadata.
    pub summary: Option<String>,
    /// Priority, converted with `Into` and forwarded to the command when set.
    pub priority: Option<Priority>,
    /// Forwarded to the command field of the same name; the builder defaults it to `false`.
    #[builder(default)]
    pub do_not_eagerly_execute: bool,
}
76
77impl ActivityOptions {
78 pub fn with_start_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
80 Self::with_close_timeouts(ActivityCloseTimeouts::StartToClose(duration))
81 }
82
83 pub fn with_schedule_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
85 Self::with_close_timeouts(ActivityCloseTimeouts::ScheduleToClose(duration))
86 }
87
88 pub fn start_to_close_timeout(duration: Duration) -> Self {
92 Self::with_start_to_close_timeout(duration).build()
93 }
94
95 pub fn schedule_to_close_timeout(duration: Duration) -> Self {
99 Self::with_schedule_to_close_timeout(duration).build()
100 }
101
102 pub(crate) fn into_command(
103 self,
104 activity_type: String,
105 arguments: Vec<Payload>,
106 seq: u32,
107 ) -> WorkflowCommand {
108 let payload_converter = PayloadConverter::default();
109 let context = SerializationContext {
110 data: &SerializationContextData::Workflow,
111 converter: &payload_converter,
112 };
113 let (start_to_close_timeout, schedule_to_close_timeout) =
114 self.close_timeouts.into_durations();
115 WorkflowCommand {
116 variant: Some(
117 ScheduleActivity {
118 seq,
119 activity_id: match self.activity_id {
120 None => seq.to_string(),
121 Some(aid) => aid,
122 },
123 activity_type,
124 task_queue: self.task_queue.unwrap_or_default(),
125 schedule_to_close_timeout: schedule_to_close_timeout
126 .and_then(|d| d.try_into().ok()),
127 schedule_to_start_timeout: self
128 .schedule_to_start_timeout
129 .and_then(|d| d.try_into().ok()),
130 start_to_close_timeout: start_to_close_timeout.and_then(|d| d.try_into().ok()),
131 heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()),
132 cancellation_type: self.cancellation_type as i32,
133 arguments,
134 retry_policy: self.retry_policy,
135 priority: self.priority.map(Into::into),
136 do_not_eagerly_execute: self.do_not_eagerly_execute,
137 ..Default::default()
138 }
139 .into(),
140 ),
141 user_metadata: self
142 .summary
143 .map(|s| {
144 payload_converter
145 .to_payload(&context, &s)
146 .expect("String-to-JSON payload serialization is infallible")
147 })
148 .map(|summary| UserMetadata {
149 summary: Some(summary),
150 details: None,
151 }),
152 }
153 }
154}
155
/// The close timeout(s) configured for an activity: schedule-to-close, start-to-close, or both.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActivityCloseTimeouts {
    /// Only a schedule-to-close timeout is set.
    ScheduleToClose(Duration),
    /// Only a start-to-close timeout is set.
    StartToClose(Duration),
    /// Both timeouts are set.
    Both {
        /// The start-to-close timeout.
        start_to_close: Duration,
        /// The schedule-to-close timeout.
        schedule_to_close: Duration,
    },
}

impl ActivityCloseTimeouts {
    /// Splits the value into `(start_to_close, schedule_to_close)`, with `None` for whichever
    /// timeout the variant does not carry.
    fn into_durations(self) -> (Option<Duration>, Option<Duration>) {
        // `Self` is `Copy`, so it can be inspected once per output component.
        let start_to_close = match self {
            Self::StartToClose(d)
            | Self::Both {
                start_to_close: d, ..
            } => Some(d),
            Self::ScheduleToClose(_) => None,
        };
        let schedule_to_close = match self {
            Self::ScheduleToClose(d)
            | Self::Both {
                schedule_to_close: d,
                ..
            } => Some(d),
            Self::StartToClose(_) => None,
        };
        (start_to_close, schedule_to_close)
    }
}
191
/// Options that `LocalActivityOptions::into_command` turns into a `ScheduleLocalActivity`
/// workflow command.
///
/// Optional durations that cannot be converted into the command's duration type are sent as
/// unset by `into_command`.
#[derive(Default, Debug, Clone)]
pub struct LocalActivityOptions {
    /// Activity id. When `None`, `into_command` uses the command sequence number as the id.
    pub activity_id: Option<String>,
    /// Retry policy; always sent with the command.
    pub retry_policy: RetryPolicy,
    /// Attempt number. When `None`, `into_command` sends `1`.
    pub attempt: Option<u32>,
    /// Original schedule time, forwarded to the command unchanged.
    pub original_schedule_time: Option<prost_types::Timestamp>,
    /// Sent as the command's `local_retry_threshold` when set.
    pub timer_backoff_threshold: Option<Duration>,
    /// Cancellation type, converted with `Into` for the command's `cancellation_type` field.
    pub cancel_type: ActivityCancellationType,
    /// Schedule-to-close timeout. When `None`, `into_command` substitutes 100 seconds.
    pub schedule_to_close_timeout: Option<Duration>,
    /// Schedule-to-start timeout, forwarded to the command when set.
    pub schedule_to_start_timeout: Option<Duration>,
    /// Start-to-close timeout, forwarded to the command when set.
    pub start_to_close_timeout: Option<Duration>,
    /// Summary text; `into_command` serializes it into the command's user metadata.
    pub summary: Option<String>,
}
231
232impl LocalActivityOptions {
233 pub(crate) fn into_command(
234 mut self,
235 activity_type: String,
236 arguments: Vec<Payload>,
237 seq: u32,
238 ) -> WorkflowCommand {
239 let payload_converter = PayloadConverter::default();
240 let context = SerializationContext {
241 data: &SerializationContextData::Workflow,
242 converter: &payload_converter,
243 };
244 self.schedule_to_close_timeout
247 .get_or_insert(Duration::from_secs(100));
248
249 WorkflowCommand {
250 variant: Some(
251 ScheduleLocalActivity {
252 seq,
253 attempt: self.attempt.unwrap_or(1),
254 original_schedule_time: self.original_schedule_time,
255 activity_id: match self.activity_id {
256 None => seq.to_string(),
257 Some(aid) => aid,
258 },
259 activity_type,
260 arguments,
261 retry_policy: Some(self.retry_policy),
262 local_retry_threshold: self
263 .timer_backoff_threshold
264 .and_then(|d| d.try_into().ok()),
265 cancellation_type: self.cancel_type.into(),
266 schedule_to_close_timeout: self
267 .schedule_to_close_timeout
268 .and_then(|d| d.try_into().ok()),
269 schedule_to_start_timeout: self
270 .schedule_to_start_timeout
271 .and_then(|d| d.try_into().ok()),
272 start_to_close_timeout: self
273 .start_to_close_timeout
274 .and_then(|d| d.try_into().ok()),
275 ..Default::default()
276 }
277 .into(),
278 ),
279 user_metadata: self
280 .summary
281 .map(|summary| {
282 payload_converter
283 .to_payload(&context, &summary)
284 .expect("String-to-JSON payload serialization is infallible")
285 })
286 .map(|summary| UserMetadata {
287 summary: Some(summary),
288 details: None,
289 }),
290 }
291 }
292}
293
/// Options that `ChildWorkflowOptions::into_command` turns into a
/// `StartChildWorkflowExecution` workflow command.
///
/// Optional durations that cannot be converted into the command's duration type are sent as
/// unset by `into_command`.
#[derive(Default, Debug, Clone)]
pub struct ChildWorkflowOptions {
    /// Workflow id of the child, forwarded to the command unchanged.
    pub workflow_id: String,
    /// Task queue name. When `None`, `into_command` sends an empty string.
    pub task_queue: Option<String>,
    /// Cancellation type, sent as its `i32` value.
    pub cancel_type: ChildWorkflowCancellationType,
    /// Parent close policy, sent as its `i32` value.
    pub parent_close_policy: ParentClosePolicy,
    /// Summary text; serialized into the `summary` of the command's user metadata when set.
    pub static_summary: Option<String>,
    /// Details text; serialized into the `details` of the command's user metadata when set.
    pub static_details: Option<String>,
    /// Workflow id reuse policy, sent as its `i32` value.
    pub id_reuse_policy: WorkflowIdReusePolicy,
    /// Sent as the command's `workflow_execution_timeout` when set.
    pub execution_timeout: Option<Duration>,
    /// Sent as the command's `workflow_run_timeout` when set.
    pub run_timeout: Option<Duration>,
    /// Sent as the command's `workflow_task_timeout` when set.
    pub task_timeout: Option<Duration>,
    /// Cron schedule string. When `None`, `into_command` sends an empty string.
    pub cron_schedule: Option<String>,
    /// Search attributes; when set they become the `indexed_fields` of a `SearchAttributes`.
    pub search_attributes: Option<HashMap<String, Payload>>,
    /// Priority, converted with `Into` and forwarded to the command when set.
    pub priority: Option<Priority>,
}
326
327impl ChildWorkflowOptions {
328 pub(crate) fn into_command(
329 self,
330 workflow_type: String,
331 input: Vec<Payload>,
332 seq: u32,
333 ) -> WorkflowCommand {
334 let payload_converter = PayloadConverter::default();
335 let context = SerializationContext {
336 data: &SerializationContextData::Workflow,
337 converter: &payload_converter,
338 };
339 let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() {
340 Some(UserMetadata {
341 summary: self.static_summary.map(|s| {
342 payload_converter
343 .to_payload(&context, &s)
344 .expect("String-to-JSON payload serialization is infallible")
345 }),
346 details: self.static_details.map(|s| {
347 payload_converter
348 .to_payload(&context, &s)
349 .expect("String-to-JSON payload serialization is infallible")
350 }),
351 })
352 } else {
353 None
354 };
355 WorkflowCommand {
356 variant: Some(
357 StartChildWorkflowExecution {
358 seq,
359 workflow_id: self.workflow_id,
360 workflow_type,
361 task_queue: self.task_queue.unwrap_or_default(),
362 input,
363 cancellation_type: self.cancel_type as i32,
364 workflow_id_reuse_policy: self.id_reuse_policy as i32,
365 workflow_execution_timeout: self
366 .execution_timeout
367 .and_then(|d| d.try_into().ok()),
368 workflow_run_timeout: self.run_timeout.and_then(|d| d.try_into().ok()),
369 workflow_task_timeout: self.task_timeout.and_then(|d| d.try_into().ok()),
370 search_attributes: self
371 .search_attributes
372 .map(|sa| SearchAttributes { indexed_fields: sa }),
373 cron_schedule: self.cron_schedule.unwrap_or_default(),
374 parent_close_policy: self.parent_close_policy as i32,
375 priority: self.priority.map(Into::into),
376 ..Default::default()
377 }
378 .into(),
379 ),
380 user_metadata,
381 }
382 }
383}
384
385#[derive(Debug)]
387pub struct Signal {
388 pub signal_name: String,
390 pub data: SignalData,
392}
393
394impl Signal {
395 pub fn new(
397 name: impl Into<String>,
398 input: impl IntoIterator<Item = impl Into<Payload>>,
399 ) -> Self {
400 Self {
401 signal_name: name.into(),
402 data: SignalData::new(input),
403 }
404 }
405}
406
407#[derive(Default, Debug)]
409pub struct SignalData {
410 pub input: Vec<Payload>,
412 pub headers: HashMap<String, Payload>,
414}
415
416impl SignalData {
417 pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
419 Self {
420 input: input.into_iter().map(Into::into).collect(),
421 headers: HashMap::new(),
422 }
423 }
424
425 pub fn with_header(
427 &mut self,
428 key: impl Into<String>,
429 payload: impl Into<Payload>,
430 ) -> &mut Self {
431 self.headers.insert(key.into(), payload.into());
432 self
433 }
434}
435
/// Options for a timer: how long it lasts and an optional summary.
#[derive(Default, Debug, Clone)]
pub struct TimerOptions {
    /// Length of the timer.
    pub duration: Duration,
    /// Optional summary text.
    pub summary: Option<String>,
}

impl From<Duration> for TimerOptions {
    /// Builds timer options for `duration` with no summary.
    fn from(duration: Duration) -> Self {
        Self {
            duration,
            summary: None,
        }
    }
}
453
/// Options that the `IntoWorkflowCommand` implementation turns into a
/// `ScheduleNexusOperation` workflow command.
///
/// Optional durations that cannot be converted into the command's duration type are sent as
/// unset.
#[derive(Default, Debug, Clone)]
pub struct NexusOperationOptions {
    /// Endpoint name, forwarded to the command unchanged.
    pub endpoint: String,
    /// Service name, forwarded to the command unchanged.
    pub service: String,
    /// Operation name, forwarded to the command unchanged.
    pub operation: String,
    /// Optional input payload, forwarded to the command unchanged.
    pub input: Option<Payload>,
    /// Schedule-to-close timeout, forwarded to the command when set.
    pub schedule_to_close_timeout: Option<Duration>,
    /// Header key/value pairs, forwarded to the command unchanged.
    pub nexus_header: HashMap<String, String>,
    /// Cancellation type. When `None`, the command uses
    /// `NexusOperationCancellationType::WaitCancellationCompleted`.
    pub cancellation_type: Option<NexusOperationCancellationType>,
    /// Schedule-to-start timeout, forwarded to the command when set.
    pub schedule_to_start_timeout: Option<Duration>,
    /// Start-to-close timeout, forwarded to the command when set.
    pub start_to_close_timeout: Option<Duration>,
}
495
496impl IntoWorkflowCommand for NexusOperationOptions {
497 fn into_command(self, seq: u32) -> WorkflowCommand {
498 WorkflowCommand {
499 user_metadata: None,
500 variant: Some(
501 ScheduleNexusOperation {
502 seq,
503 endpoint: self.endpoint,
504 service: self.service,
505 operation: self.operation,
506 input: self.input,
507 schedule_to_close_timeout: self
508 .schedule_to_close_timeout
509 .and_then(|t| t.try_into().ok()),
510 schedule_to_start_timeout: self
511 .schedule_to_start_timeout
512 .and_then(|t| t.try_into().ok()),
513 start_to_close_timeout: self
514 .start_to_close_timeout
515 .and_then(|t| t.try_into().ok()),
516 nexus_header: self.nexus_header,
517 cancellation_type: self
518 .cancellation_type
519 .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
520 .into(),
521 }
522 .into(),
523 ),
524 }
525 }
526}
527
/// Options that `ContinueAsNewOptions::into_proto` turns into a
/// `ContinueAsNewWorkflowExecution` message.
///
/// Constructed through the derived `bon` builder or via `Default`. Optional durations that
/// cannot be converted into the message's duration type are sent as unset by `into_proto`.
#[derive(Default, Debug, bon::Builder)]
#[non_exhaustive]
pub struct ContinueAsNewOptions {
    /// Workflow type override. When `None`, `into_proto` uses the type passed to it.
    pub workflow_type: Option<String>,
    /// Task queue name. When `None`, `into_proto` sends an empty string.
    pub task_queue: Option<String>,
    /// Sent as the message's `workflow_run_timeout` when set.
    pub run_timeout: Option<Duration>,
    /// Sent as the message's `workflow_task_timeout` when set.
    pub task_timeout: Option<Duration>,
    /// Memo fields. When `None`, `into_proto` sends an empty map.
    pub memo: Option<HashMap<String, Payload>>,
    /// Headers. When `None`, `into_proto` sends an empty map.
    pub headers: Option<HashMap<String, Payload>>,
    /// Search attributes, forwarded to the message unchanged.
    pub search_attributes: Option<SearchAttributes>,
    /// Retry policy, forwarded to the message unchanged.
    pub retry_policy: Option<RetryPolicy>,
    /// Versioning intent. When `None`, `into_proto` sends `VersioningIntent::Unspecified`.
    pub versioning_intent: Option<VersioningIntent>,
}
554
555impl ContinueAsNewOptions {
556 pub(crate) fn into_proto(
557 self,
558 workflow_type: String,
559 arguments: Vec<Payload>,
560 ) -> ContinueAsNewWorkflowExecution {
561 ContinueAsNewWorkflowExecution {
562 workflow_type: self.workflow_type.unwrap_or(workflow_type),
563 task_queue: self.task_queue.unwrap_or_default(),
564 arguments,
565 workflow_run_timeout: self.run_timeout.and_then(|t| t.try_into().ok()),
566 workflow_task_timeout: self.task_timeout.and_then(|t| t.try_into().ok()),
567 memo: self.memo.unwrap_or_default(),
568 headers: self.headers.unwrap_or_default(),
569 search_attributes: self.search_attributes,
570 retry_policy: self.retry_policy,
571 versioning_intent: self
572 .versioning_intent
573 .unwrap_or(VersioningIntent::Unspecified)
574 .into(),
575 ..Default::default()
576 }
577 }
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use temporalio_common::protos::coresdk::workflow_commands::workflow_command::Variant;

    /// Unwraps the `StartChildWorkflowExecution` variant of `cmd`, panicking on anything else.
    fn expect_start_child(cmd: WorkflowCommand) -> StartChildWorkflowExecution {
        match cmd.variant.unwrap() {
            Variant::StartChildWorkflowExecution(start) => start,
            other => panic!("Expected StartChildWorkflowExecution, got {other:?}"),
        }
    }

    // The start-to-close wrapper must return a builder that still accepts further setters.
    #[test]
    fn activity_options_with_start_to_close_timeout_wrapper_supports_builder_chaining() {
        let close = Duration::from_secs(5);
        let heartbeat = Duration::from_secs(2);

        let opts = ActivityOptions::with_start_to_close_timeout(close)
            .heartbeat_timeout(heartbeat)
            .build();

        assert_eq!(
            opts.close_timeouts,
            ActivityCloseTimeouts::StartToClose(close)
        );
        assert_eq!(opts.heartbeat_timeout, Some(heartbeat));
    }

    // The schedule-to-close wrapper must return a builder that still accepts further setters.
    #[test]
    fn activity_options_with_schedule_to_close_timeout_wrapper_supports_builder_chaining() {
        let close = Duration::from_secs(5);
        let heartbeat = Duration::from_secs(2);

        let opts = ActivityOptions::with_schedule_to_close_timeout(close)
            .heartbeat_timeout(heartbeat)
            .build();

        assert_eq!(
            opts.close_timeouts,
            ActivityCloseTimeouts::ScheduleToClose(close)
        );
        assert_eq!(opts.heartbeat_timeout, Some(heartbeat));
    }

    // With `Both`, each timeout must land in its own field of the command.
    #[test]
    fn activity_options_both_close_timeouts_map_to_command() {
        let timeouts = ActivityCloseTimeouts::Both {
            start_to_close: Duration::from_secs(3),
            schedule_to_close: Duration::from_secs(8),
        };
        let cmd = ActivityOptions::with_close_timeouts(timeouts)
            .build()
            .into_command("test".to_string(), vec![], 7);

        let schedule_cmd = match cmd.variant.unwrap() {
            Variant::ScheduleActivity(schedule) => schedule,
            other => panic!("Expected ScheduleActivity, got {other:?}"),
        };

        assert_eq!(schedule_cmd.start_to_close_timeout.unwrap().seconds, 3);
        assert_eq!(schedule_cmd.schedule_to_close_timeout.unwrap().seconds, 8);
    }

    // The run timeout must come from `run_timeout`, not from `execution_timeout`.
    #[test]
    fn child_workflow_run_timeout_uses_run_timeout_field() {
        let opts = ChildWorkflowOptions {
            workflow_id: "test-wf".to_string(),
            execution_timeout: Some(Duration::from_secs(60)),
            run_timeout: Some(Duration::from_secs(10)),
            ..Default::default()
        };

        let start_cmd = expect_start_child(opts.into_command("TestWorkflow".to_string(), vec![], 1));

        let exec_timeout = start_cmd.workflow_execution_timeout.unwrap();
        let run_timeout = start_cmd.workflow_run_timeout.unwrap();
        assert_eq!(exec_timeout.seconds, 60);
        assert_eq!(run_timeout.seconds, 10);
    }

    // An unset `run_timeout` must stay unset even when an execution timeout is given.
    #[test]
    fn child_workflow_run_timeout_none_when_unset() {
        let opts = ChildWorkflowOptions {
            workflow_id: "test-wf".to_string(),
            execution_timeout: Some(Duration::from_secs(60)),
            ..Default::default()
        };

        let start_cmd = expect_start_child(opts.into_command("TestWorkflow".to_string(), vec![], 1));

        assert_eq!(start_cmd.workflow_execution_timeout.unwrap().seconds, 60);
        assert!(start_cmd.workflow_run_timeout.is_none());
    }
}