1use std::{collections::HashMap, time::Duration};
2
3use crate::runtime::types::ContinueAsNewRequest;
4use temporalio_common_wasm::{
5 Priority,
6 data_converters::{
7 GenericPayloadConverter, PayloadConverter, SerializationContext, SerializationContextData,
8 },
9 protos::{
10 coresdk::{
11 child_workflow::{ChildWorkflowCancellationType, ParentClosePolicy},
12 common::VersioningIntent,
13 nexus::NexusOperationCancellationType,
14 workflow_activation::SignalWorkflow,
15 workflow_commands::{
16 ActivityCancellationType, ContinueAsNewWorkflowExecution, ScheduleActivity,
17 ScheduleLocalActivity, ScheduleNexusOperation, StartChildWorkflowExecution,
18 StartTimer, WorkflowCommand, workflow_command,
19 },
20 },
21 temporal::api::{
22 common::v1::{Payload, RetryPolicy, SearchAttributes},
23 enums::v1::{
24 ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior,
25 WorkflowIdReusePolicy,
26 },
27 sdk::v1::UserMetadata,
28 },
29 },
30};
31#[derive(Debug, bon::Builder, Clone)]
33#[non_exhaustive]
34#[builder(start_fn = with_close_timeouts, on(String, into), state_mod(vis = "pub"))]
35pub struct ActivityOptions {
36 #[builder(start_fn)]
40 pub close_timeouts: ActivityCloseTimeouts,
41 pub activity_id: Option<String>,
47 pub task_queue: Option<String>,
51 pub schedule_to_start_timeout: Option<Duration>,
58 pub heartbeat_timeout: Option<Duration>,
61 #[builder(default)]
63 pub cancellation_type: ActivityCancellationType,
64 pub retry_policy: Option<RetryPolicy>,
66 pub summary: Option<String>,
68 pub priority: Option<Priority>,
70 #[builder(default)]
72 pub do_not_eagerly_execute: bool,
73}
74
75impl ActivityOptions {
76 pub fn with_start_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
78 Self::with_close_timeouts(ActivityCloseTimeouts::StartToClose(duration))
79 }
80
81 pub fn with_schedule_to_close_timeout(duration: Duration) -> ActivityOptionsBuilder {
83 Self::with_close_timeouts(ActivityCloseTimeouts::ScheduleToClose(duration))
84 }
85
86 pub fn start_to_close_timeout(duration: Duration) -> Self {
90 Self::with_start_to_close_timeout(duration).build()
91 }
92
93 pub fn schedule_to_close_timeout(duration: Duration) -> Self {
97 Self::with_schedule_to_close_timeout(duration).build()
98 }
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum ActivityCloseTimeouts {
104 ScheduleToClose(Duration),
109 StartToClose(Duration),
116 Both {
118 start_to_close: Duration,
120 schedule_to_close: Duration,
122 },
123}
124
125impl ActivityCloseTimeouts {
126 fn into_durations(self) -> (Option<Duration>, Option<Duration>) {
127 match self {
128 Self::ScheduleToClose(schedule_to_close) => (None, Some(schedule_to_close)),
129 Self::StartToClose(start_to_close) => (Some(start_to_close), None),
130 Self::Both {
131 start_to_close,
132 schedule_to_close,
133 } => (Some(start_to_close), Some(schedule_to_close)),
134 }
135 }
136}
137
138impl ActivityOptions {
139 pub(crate) fn into_command(
140 self,
141 seq: u32,
142 activity_type: String,
143 args: Vec<Payload>,
144 ) -> WorkflowCommand {
145 let (start_to_close_timeout, schedule_to_close_timeout) =
146 self.close_timeouts.into_durations();
147 command_with_metadata(
148 workflow_command::Variant::ScheduleActivity(ScheduleActivity {
149 seq,
150 activity_type,
151 activity_id: self.activity_id.unwrap_or_else(|| seq.to_string()),
152 task_queue: self.task_queue.unwrap_or_default(),
153 arguments: args,
154 schedule_to_close_timeout: schedule_to_close_timeout
155 .and_then(|duration| duration.try_into().ok()),
156 schedule_to_start_timeout: self
157 .schedule_to_start_timeout
158 .and_then(|duration| duration.try_into().ok()),
159 start_to_close_timeout: start_to_close_timeout
160 .and_then(|duration| duration.try_into().ok()),
161 heartbeat_timeout: self
162 .heartbeat_timeout
163 .and_then(|duration| duration.try_into().ok()),
164 cancellation_type: self.cancellation_type.into(),
165 retry_policy: self.retry_policy,
166 priority: self.priority.map(Into::into),
167 do_not_eagerly_execute: self.do_not_eagerly_execute,
168 ..Default::default()
169 }),
170 self.summary,
171 None,
172 )
173 }
174}
175
176#[derive(Default, Debug, Clone)]
178pub struct LocalActivityOptions {
179 pub activity_id: Option<String>,
185 pub retry_policy: RetryPolicy,
187 pub attempt: Option<u32>,
190 pub original_schedule_time: Option<prost_types::Timestamp>,
193 pub timer_backoff_threshold: Option<Duration>,
195 pub cancel_type: ActivityCancellationType,
197 pub schedule_to_close_timeout: Option<Duration>,
201 pub schedule_to_start_timeout: Option<Duration>,
207 pub start_to_close_timeout: Option<Duration>,
212 pub summary: Option<String>,
214}
215
216impl LocalActivityOptions {
217 pub(crate) fn into_command(
218 mut self,
219 seq: u32,
220 activity_type: String,
221 args: Vec<Payload>,
222 ) -> WorkflowCommand {
223 self.schedule_to_close_timeout
226 .get_or_insert(Duration::from_secs(100));
227 command_with_metadata(
228 workflow_command::Variant::ScheduleLocalActivity(ScheduleLocalActivity {
229 seq,
230 activity_type,
231 activity_id: self.activity_id.unwrap_or_else(|| seq.to_string()),
232 arguments: args,
233 retry_policy: Some(self.retry_policy),
234 attempt: self.attempt.unwrap_or(1),
235 original_schedule_time: self.original_schedule_time,
236 local_retry_threshold: self
237 .timer_backoff_threshold
238 .and_then(|duration| duration.try_into().ok()),
239 cancellation_type: self.cancel_type.into(),
240 schedule_to_close_timeout: self
241 .schedule_to_close_timeout
242 .and_then(|duration| duration.try_into().ok()),
243 schedule_to_start_timeout: self
244 .schedule_to_start_timeout
245 .and_then(|duration| duration.try_into().ok()),
246 start_to_close_timeout: self
247 .start_to_close_timeout
248 .and_then(|duration| duration.try_into().ok()),
249 ..Default::default()
250 }),
251 self.summary,
252 None,
253 )
254 }
255}
256
257#[derive(Default, Debug, Clone)]
259pub struct ChildWorkflowOptions {
260 pub workflow_id: String,
262 pub task_queue: Option<String>,
266 pub cancel_type: ChildWorkflowCancellationType,
268 pub parent_close_policy: ParentClosePolicy,
270 pub static_summary: Option<String>,
272 pub static_details: Option<String>,
274 pub id_reuse_policy: WorkflowIdReusePolicy,
276 pub execution_timeout: Option<Duration>,
278 pub run_timeout: Option<Duration>,
280 pub task_timeout: Option<Duration>,
282 pub cron_schedule: Option<String>,
284 pub search_attributes: Option<HashMap<String, Payload>>,
286 pub priority: Option<Priority>,
288}
289
290impl ChildWorkflowOptions {
291 pub(crate) fn into_command(
292 self,
293 seq: u32,
294 workflow_type: String,
295 args: Vec<Payload>,
296 ) -> WorkflowCommand {
297 command_with_metadata(
298 workflow_command::Variant::StartChildWorkflowExecution(StartChildWorkflowExecution {
299 seq,
300 workflow_type,
301 workflow_id: self.workflow_id,
302 task_queue: self.task_queue.unwrap_or_default(),
303 input: args,
304 cancellation_type: self.cancel_type.into(),
305 parent_close_policy: self.parent_close_policy.into(),
306 workflow_id_reuse_policy: match self.id_reuse_policy {
307 WorkflowIdReusePolicy::Unspecified => WorkflowIdReusePolicy::AllowDuplicate,
308 policy => policy,
309 }
310 .into(),
311 workflow_execution_timeout: self
312 .execution_timeout
313 .and_then(|duration| duration.try_into().ok()),
314 workflow_run_timeout: self
315 .run_timeout
316 .and_then(|duration| duration.try_into().ok()),
317 workflow_task_timeout: self
318 .task_timeout
319 .and_then(|duration| duration.try_into().ok()),
320 cron_schedule: self.cron_schedule.unwrap_or_default(),
321 search_attributes: self.search_attributes.and_then(|attrs| {
322 (!attrs.is_empty()).then_some(SearchAttributes {
323 indexed_fields: attrs,
324 })
325 }),
326 priority: self.priority.map(Into::into),
327 ..Default::default()
328 }),
329 self.static_summary,
330 self.static_details,
331 )
332 }
333}
334
335#[derive(Debug)]
337pub struct Signal {
338 pub signal_name: String,
340 pub data: SignalData,
342}
343
344impl Signal {
345 pub fn new(
347 name: impl Into<String>,
348 input: impl IntoIterator<Item = impl Into<Payload>>,
349 ) -> Self {
350 Self {
351 signal_name: name.into(),
352 data: SignalData::new(input),
353 }
354 }
355
356 pub(crate) fn into_invocation(self) -> SignalWorkflow {
357 SignalWorkflow {
358 signal_name: self.signal_name,
359 input: self.data.input,
360 identity: String::new(),
361 headers: self.data.headers,
362 }
363 }
364}
365
366#[derive(Default, Debug)]
368pub struct SignalData {
369 pub input: Vec<Payload>,
371 pub headers: HashMap<String, Payload>,
373}
374
375impl SignalData {
376 pub fn new(input: impl IntoIterator<Item = impl Into<Payload>>) -> Self {
378 Self {
379 input: input.into_iter().map(Into::into).collect(),
380 headers: HashMap::new(),
381 }
382 }
383
384 pub fn with_header(
386 &mut self,
387 key: impl Into<String>,
388 payload: impl Into<Payload>,
389 ) -> &mut Self {
390 self.headers.insert(key.into(), payload.into());
391 self
392 }
393}
394
395#[derive(Default, Debug, Clone)]
397pub struct TimerOptions {
398 pub duration: Duration,
400 pub summary: Option<String>,
402}
403
404impl From<Duration> for TimerOptions {
405 fn from(duration: Duration) -> Self {
406 TimerOptions {
407 duration,
408 ..Default::default()
409 }
410 }
411}
412
413impl TimerOptions {
414 pub(crate) fn into_command(self, seq: u32) -> WorkflowCommand {
415 command_with_metadata(
416 workflow_command::Variant::StartTimer(StartTimer {
417 seq,
418 start_to_fire_timeout: Some(
419 self.duration
420 .try_into()
421 .expect("workflow timer timeout must fit into protobuf duration"),
422 ),
423 }),
424 self.summary,
425 None,
426 )
427 }
428}
429
430#[derive(Default, Debug, Clone)]
432pub struct NexusOperationOptions {
433 pub endpoint: String,
435 pub service: String,
437 pub operation: String,
439 pub input: Option<Payload>,
444 pub schedule_to_close_timeout: Option<Duration>,
448 pub nexus_header: HashMap<String, String>,
455 pub cancellation_type: Option<NexusOperationCancellationType>,
457 pub schedule_to_start_timeout: Option<Duration>,
463 pub start_to_close_timeout: Option<Duration>,
470}
471
472impl NexusOperationOptions {
473 pub(crate) fn into_command(self, seq: u32) -> WorkflowCommand {
474 workflow_command::Variant::ScheduleNexusOperation(ScheduleNexusOperation {
475 seq,
476 endpoint: self.endpoint,
477 service: self.service,
478 operation: self.operation,
479 input: self.input,
480 schedule_to_close_timeout: self
481 .schedule_to_close_timeout
482 .and_then(|duration| duration.try_into().ok()),
483 schedule_to_start_timeout: self
484 .schedule_to_start_timeout
485 .and_then(|duration| duration.try_into().ok()),
486 start_to_close_timeout: self
487 .start_to_close_timeout
488 .and_then(|duration| duration.try_into().ok()),
489 nexus_header: self.nexus_header,
490 cancellation_type: self
491 .cancellation_type
492 .unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
493 .into(),
494 })
495 .into()
496 }
497}
498
499#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
501#[non_exhaustive]
502pub enum ContinueAsNewVersioningBehavior {
503 #[default]
505 Unspecified,
506 AutoUpgrade,
508 UseRampingVersion,
510}
511
512impl From<ContinueAsNewVersioningBehavior> for ProtoContinueAsNewVersioningBehavior {
513 fn from(value: ContinueAsNewVersioningBehavior) -> Self {
514 match value {
515 ContinueAsNewVersioningBehavior::Unspecified => {
516 ProtoContinueAsNewVersioningBehavior::Unspecified
517 }
518 ContinueAsNewVersioningBehavior::AutoUpgrade => {
519 ProtoContinueAsNewVersioningBehavior::AutoUpgrade
520 }
521 ContinueAsNewVersioningBehavior::UseRampingVersion => {
522 ProtoContinueAsNewVersioningBehavior::UseRampingVersion
523 }
524 }
525 }
526}
527
528impl From<ProtoContinueAsNewVersioningBehavior> for ContinueAsNewVersioningBehavior {
529 fn from(value: ProtoContinueAsNewVersioningBehavior) -> Self {
530 match value {
531 ProtoContinueAsNewVersioningBehavior::Unspecified => {
532 ContinueAsNewVersioningBehavior::Unspecified
533 }
534 ProtoContinueAsNewVersioningBehavior::AutoUpgrade => {
535 ContinueAsNewVersioningBehavior::AutoUpgrade
536 }
537 ProtoContinueAsNewVersioningBehavior::UseRampingVersion => {
538 ContinueAsNewVersioningBehavior::UseRampingVersion
539 }
540 }
541 }
542}
543
544#[derive(Default, Debug, bon::Builder)]
548#[non_exhaustive]
549pub struct ContinueAsNewOptions {
550 pub workflow_type: Option<String>,
552 pub task_queue: Option<String>,
554 pub run_timeout: Option<Duration>,
556 pub task_timeout: Option<Duration>,
558 pub backoff_start_interval: Option<Duration>,
560 pub memo: Option<HashMap<String, Payload>>,
562 pub headers: Option<HashMap<String, Payload>>,
564 pub search_attributes: Option<SearchAttributes>,
567 pub retry_policy: Option<RetryPolicy>,
569 pub versioning_intent: Option<VersioningIntent>,
571 pub initial_versioning_behavior: Option<ContinueAsNewVersioningBehavior>,
577}
578
579impl ContinueAsNewOptions {
580 pub(crate) fn into_request(
581 self,
582 workflow_type: String,
583 arguments: Vec<Payload>,
584 ) -> ContinueAsNewRequest {
585 ContinueAsNewWorkflowExecution {
586 workflow_type: self.workflow_type.unwrap_or(workflow_type),
587 task_queue: self.task_queue.unwrap_or_default(),
588 arguments,
589 workflow_run_timeout: self
590 .run_timeout
591 .and_then(|duration| duration.try_into().ok()),
592 workflow_task_timeout: self
593 .task_timeout
594 .and_then(|duration| duration.try_into().ok()),
595 backoff_start_interval: self
596 .backoff_start_interval
597 .and_then(|duration| duration.try_into().ok()),
598 memo: self.memo.unwrap_or_default(),
599 headers: self.headers.unwrap_or_default(),
600 search_attributes: self.search_attributes,
601 retry_policy: self.retry_policy,
602 versioning_intent: self
603 .versioning_intent
604 .unwrap_or(VersioningIntent::Unspecified)
605 .into(),
606 initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::from(
607 self.initial_versioning_behavior
608 .unwrap_or(ContinueAsNewVersioningBehavior::Unspecified),
609 )
610 .into(),
611 }
612 }
613}
614
615fn command_with_metadata(
616 variant: workflow_command::Variant,
617 summary: Option<String>,
618 details: Option<String>,
619) -> WorkflowCommand {
620 WorkflowCommand {
621 variant: Some(variant),
622 user_metadata: string_user_metadata(summary, details),
623 }
624}
625
626fn string_user_metadata(summary: Option<String>, details: Option<String>) -> Option<UserMetadata> {
627 if summary.is_none() && details.is_none() {
628 return None;
629 }
630 let converter = PayloadConverter::default();
631 let context = SerializationContext {
632 data: &SerializationContextData::Workflow,
633 converter: &converter,
634 };
635 Some(UserMetadata {
636 summary: summary.map(|value| {
637 converter
638 .to_payload(&context, &value)
639 .expect("String-to-JSON payload serialization is infallible")
640 }),
641 details: details.map(|value| {
642 converter
643 .to_payload(&context, &value)
644 .expect("String-to-JSON payload serialization is infallible")
645 }),
646 })
647}
648
649#[cfg(test)]
650mod tests {
651 use super::*;
652
653 #[test]
654 fn continue_as_new_options_maps_backoff_start_interval_to_request() {
655 let req = ContinueAsNewOptions {
656 backoff_start_interval: Some(Duration::from_secs(7)),
657 ..Default::default()
658 }
659 .into_request("test-workflow".to_string(), vec![]);
660
661 let backoff = req
662 .backoff_start_interval
663 .expect("backoff_start_interval should be set");
664 assert_eq!(backoff.seconds, 7);
665 assert_eq!(backoff.nanos, 0);
666 }
667
668 #[test]
669 fn activity_options_with_start_to_close_timeout_wrapper_supports_builder_chaining() {
670 let opts = ActivityOptions::with_start_to_close_timeout(Duration::from_secs(5))
671 .heartbeat_timeout(Duration::from_secs(2))
672 .build();
673
674 assert_eq!(
675 opts.close_timeouts,
676 ActivityCloseTimeouts::StartToClose(Duration::from_secs(5))
677 );
678 assert_eq!(opts.heartbeat_timeout, Some(Duration::from_secs(2)));
679 }
680
681 #[test]
682 fn activity_options_with_schedule_to_close_timeout_wrapper_supports_builder_chaining() {
683 let opts = ActivityOptions::with_schedule_to_close_timeout(Duration::from_secs(5))
684 .heartbeat_timeout(Duration::from_secs(2))
685 .build();
686
687 assert_eq!(
688 opts.close_timeouts,
689 ActivityCloseTimeouts::ScheduleToClose(Duration::from_secs(5))
690 );
691 assert_eq!(opts.heartbeat_timeout, Some(Duration::from_secs(2)));
692 }
693
694 #[test]
695 fn activity_options_both_close_timeouts_map_to_command() {
696 let req = ActivityOptions::with_close_timeouts(ActivityCloseTimeouts::Both {
697 start_to_close: Duration::from_secs(3),
698 schedule_to_close: Duration::from_secs(8),
699 })
700 .build()
701 .into_command(7, "test".to_string(), vec![]);
702 let Some(workflow_command::Variant::ScheduleActivity(req)) = req.variant else {
703 panic!("expected ScheduleActivity command");
704 };
705 assert_eq!(req.start_to_close_timeout.unwrap().seconds, 3);
706 assert_eq!(req.schedule_to_close_timeout.unwrap().seconds, 8);
707 }
708
709 #[test]
710 fn child_workflow_run_timeout_uses_run_timeout_field() {
711 let opts = ChildWorkflowOptions {
712 workflow_id: "test-wf".to_string(),
713 execution_timeout: Some(Duration::from_secs(60)),
714 run_timeout: Some(Duration::from_secs(10)),
715 ..Default::default()
716 };
717 let command = opts.into_command(1, "TestWorkflow".to_string(), vec![]);
718 let Some(workflow_command::Variant::StartChildWorkflowExecution(req)) = command.variant
719 else {
720 panic!("expected StartChildWorkflowExecution command");
721 };
722 let exec_timeout = req.workflow_execution_timeout.unwrap();
723 let run_timeout = req.workflow_run_timeout.unwrap();
724 assert_eq!(exec_timeout.seconds, 60);
725 assert_eq!(run_timeout.seconds, 10);
726 }
727
728 #[test]
729 fn child_workflow_run_timeout_none_when_unset() {
730 let opts = ChildWorkflowOptions {
731 workflow_id: "test-wf".to_string(),
732 execution_timeout: Some(Duration::from_secs(60)),
733 ..Default::default()
734 };
735 let command = opts.into_command(1, "TestWorkflow".to_string(), vec![]);
736 let Some(workflow_command::Variant::StartChildWorkflowExecution(req)) = command.variant
737 else {
738 panic!("expected StartChildWorkflowExecution command");
739 };
740 let exec_timeout = req.workflow_execution_timeout.unwrap();
741 assert_eq!(exec_timeout.seconds, 60);
742 assert!(req.workflow_run_timeout.is_none());
743 }
744}