1mod options;
2
3pub use options::{
4 ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
5 ContinueAsNewVersioningBehavior, LocalActivityOptions, NexusOperationOptions, Signal,
6 SignalData, TimerOptions,
7};
8pub use temporalio_common_wasm::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
9
10use crate::runtime::{
11 SdkGuardedFuture, SdkWakeGuard,
12 entry::WorkflowImplementation,
13 host::WorkflowHost,
14 model::{
15 CancelExternalWfResult, CancellableID, NexusStartResult, SignalExternalWfResult,
16 TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
17 },
18};
19use futures_channel::oneshot;
20use futures_util::{
21 FutureExt,
22 future::{FusedFuture, Shared},
23 task::Context,
24};
25use std::{
26 cell::{Cell, Ref, RefCell},
27 collections::HashMap,
28 future::{self, Future},
29 marker::PhantomData,
30 ops::Deref,
31 pin::Pin,
32 rc::Rc,
33 sync::atomic::{AtomicBool, Ordering},
34 task::{Poll, Waker},
35 time::{Duration, SystemTime},
36};
37use temporalio_common_wasm::{
38 ActivityDefinition, SignalDefinition, WorkflowDefinition,
39 data_converters::{
40 ActivityExecutionDecodeHint, ChildWorkflowExecutionDecodeHint,
41 ChildWorkflowStartDecodeHint, DataConverter, GenericPayloadConverter, PayloadConverter,
42 SerializationContext, SerializationContextData, TemporalDeserializable,
43 WorkflowSignalDecodeHint,
44 },
45 error::{
46 ActivityExecutionError, ChildWorkflowExecutionError, ChildWorkflowStartError,
47 WorkflowSignalError,
48 },
49 protos::{
50 coresdk::{
51 activity_result::{ActivityResolution, Cancellation, activity_resolution},
52 child_workflow::{ChildWorkflowResult, child_workflow_result},
53 common::NamespacedWorkflowExecution,
54 nexus::NexusOperationResult,
55 workflow_activation::{
56 InitializeWorkflow, WorkflowActivation as CoreWorkflowActivation,
57 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
58 workflow_activation_job::Variant as ActivationVariant,
59 },
60 workflow_commands::{
61 CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer,
62 ModifyWorkflowProperties, RequestCancelActivity,
63 RequestCancelExternalWorkflowExecution, RequestCancelLocalActivity,
64 RequestCancelNexusOperation, SetPatchMarker, SignalExternalWorkflowExecution,
65 UpsertWorkflowSearchAttributes, signal_external_workflow_execution,
66 workflow_command,
67 },
68 },
69 temporal::api::{
70 common::v1::{Memo, Payload, SearchAttributes},
71 failure::v1::{CanceledFailureInfo, Failure, failure::FailureInfo},
72 },
73 utilities::TryIntoOrNone,
74 },
75 worker::WorkerDeploymentVersion,
76};
77
/// Handle to the per-workflow context state that is not tied to a workflow type.
///
/// The state lives behind an [`Rc`], so cloning this handle is cheap and every clone refers to
/// the same underlying `WorkflowContextInner`.
#[derive(Clone)]
pub struct BaseWorkflowContext {
    /// Shared, reference-counted context state.
    inner: Rc<WorkflowContextInner>,
}
85impl BaseWorkflowContext {
86 pub(crate) fn apply_activation_context(&self, activation: &CoreWorkflowActivation) {
87 let mut shared = self.inner.shared.borrow_mut();
88 shared.activation = activation.clone();
89 if let Some(seed) = activation.jobs.iter().find_map(|job| match &job.variant {
90 Some(ActivationVariant::UpdateRandomSeed(attrs)) => Some(attrs.randomness_seed),
91 _ => None,
92 }) {
93 shared.random_seed = seed;
94 }
95 }
96
97 pub fn data_converter(&self) -> &DataConverter {
99 &self.inner.data_converter
100 }
101
102 pub(crate) fn record_patch(&self, patch_id: String, present: bool) {
103 self.inner
104 .shared
105 .borrow_mut()
106 .changes
107 .insert(patch_id, present);
108 }
109
110 pub(crate) fn view(&self) -> WorkflowContextView {
112 WorkflowContextView::new(
113 self.inner.namespace.clone(),
114 self.inner.task_queue.clone(),
115 self.inner.run_id.clone(),
116 &self.inner.inital_information,
117 )
118 }
119}
120
/// Key identifying a command that is waiting to be unblocked.
///
/// Each variant names the kind of pending command and carries its `u32` sequence number, so
/// equal sequence numbers of different kinds do not collide.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
enum PendingCommandId {
    /// A timer.
    Timer(u32),
    /// An activity (also used for local activities in this file).
    Activity(u32),
    /// The start of a child workflow.
    ChildWorkflowStart(u32),
    /// The completion of a child workflow.
    ChildWorkflowComplete(u32),
    /// A signal sent to an external workflow.
    SignalExternal(u32),
    /// A cancellation request sent to an external workflow.
    CancelExternal(u32),
    /// The start of a nexus operation.
    NexusOpStart(u32),
    /// The completion of a nexus operation.
    NexusOpComplete(u32),
}
132
133impl PendingCommandId {
134 fn from_unblock_event(event: &UnblockEvent) -> Self {
135 match event {
136 UnblockEvent::Timer(seq, _) => Self::Timer(*seq),
137 UnblockEvent::Activity(seq, _) => Self::Activity(*seq),
138 UnblockEvent::WorkflowStart(seq, _) => Self::ChildWorkflowStart(*seq),
139 UnblockEvent::WorkflowComplete(seq, _) => Self::ChildWorkflowComplete(*seq),
140 UnblockEvent::SignalExternal(seq, _) => Self::SignalExternal(*seq),
141 UnblockEvent::CancelExternal(seq, _) => Self::CancelExternal(*seq),
142 UnblockEvent::NexusOperationStart(seq, _) => Self::NexusOpStart(*seq),
143 UnblockEvent::NexusOperationComplete(seq, _) => Self::NexusOpComplete(*seq),
144 }
145 }
146}
147
/// Runtime bookkeeping shared by all handles to one workflow context.
struct WorkflowRuntimeState {
    /// Host that receives the commands pushed by the context.
    host: Rc<dyn WorkflowHost>,
    /// Senders for commands that are still waiting for their [`UnblockEvent`], keyed by command
    /// id. An entry is removed when its event is delivered.
    pending_unblocks: RefCell<HashMap<PendingCommandId, oneshot::Sender<UnblockEvent>>>,
    /// Error stored by `set_forced_wft_failure` until it is taken.
    forced_wft_failure: RefCell<Option<Box<dyn std::error::Error + Send + Sync>>>,
    /// Set whenever progress is recorded; read and cleared by `take_progress`.
    progress_made: Cell<bool>,
}
154
155impl WorkflowRuntimeState {
156 fn new(host: Rc<dyn WorkflowHost>) -> Self {
157 Self {
158 host,
159 pending_unblocks: RefCell::new(HashMap::new()),
160 forced_wft_failure: RefCell::new(None),
161 progress_made: Cell::new(false),
162 }
163 }
164
165 fn register_unblocker(&self, id: PendingCommandId, unblocker: oneshot::Sender<UnblockEvent>) {
166 self.pending_unblocks.borrow_mut().insert(id, unblocker);
167 }
168
169 fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
170 let id = PendingCommandId::from_unblock_event(&event);
171 let unblocker = self
172 .pending_unblocks
173 .borrow_mut()
174 .remove(&id)
175 .ok_or_else(|| anyhow::anyhow!("Command {id:?} not found to unblock"))?;
176 self.progress_made.set(true);
177 let _guard = SdkWakeGuard::new();
178 let _ = unblocker.send(event);
179 Ok(())
180 }
181
182 fn maybe_unblock(&self, event: UnblockEvent) -> bool {
183 let id = PendingCommandId::from_unblock_event(&event);
184 let Some(unblocker) = self.pending_unblocks.borrow_mut().remove(&id) else {
185 return false;
186 };
187 self.progress_made.set(true);
188 let _guard = SdkWakeGuard::new();
189 let _ = unblocker.send(event);
190 true
191 }
192
193 fn set_forced_wft_failure(&self, err: Box<dyn std::error::Error + Send + Sync>) {
194 *self.forced_wft_failure.borrow_mut() = Some(err);
195 self.progress_made.set(true);
196 }
197
198 fn take_forced_wft_failure(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
199 self.forced_wft_failure.borrow_mut().take()
200 }
201
202 fn mark_progress(&self) {
203 self.progress_made.set(true);
204 }
205
206 fn take_progress(&self) -> bool {
207 self.progress_made.replace(false)
208 }
209}
210
/// State shared by every clone of a [`BaseWorkflowContext`].
struct WorkflowContextInner {
    /// Namespace supplied at construction.
    namespace: String,
    /// Task queue supplied at construction; used as the default task queue for activities.
    task_queue: String,
    /// Run id supplied at construction.
    run_id: String,
    /// The `InitializeWorkflow` data the context was constructed with.
    inital_information: InitializeWorkflow,
    /// Host handle, pending unblockers, forced failure and progress flag.
    runtime: WorkflowRuntimeState,
    /// Cancellation reason; `None` until `notify_cancel` is called.
    cancelled_reason: RefCell<Option<String>>,
    /// Wakers registered by pending `cancelled()` futures; woken by `notify_cancel`.
    cancel_wakers: RefCell<Vec<Waker>>,
    /// Mutable data such as the current activation, random seed and known patches.
    shared: RefCell<WorkflowContextSharedData>,
    /// Sequence-number counters, one per kind of command.
    seq_nums: RefCell<WfCtxProtectedDat>,
    /// Data converter supplied at construction.
    data_converter: DataConverter,
    /// Flag set by `set_state_mutated` and read and cleared by `take_state_mutated`.
    state_mutated: Cell<bool>,
}
224
/// Workflow context typed by the workflow `W`, without access to the workflow's own state.
///
/// Wraps a [`BaseWorkflowContext`] together with a set of headers.
pub struct SyncWorkflowContext<W> {
    /// Shared, workflow-type-independent context.
    base: BaseWorkflowContext,
    /// Headers returned by `headers()`; shared between clones.
    headers: Rc<HashMap<String, Payload>>,
    /// Ties the context to the workflow type without storing a `W`.
    _phantom: PhantomData<W>,
}
238
239impl<W> Clone for SyncWorkflowContext<W> {
240 fn clone(&self) -> Self {
241 Self {
242 base: self.base.clone(),
243 headers: self.headers.clone(),
244 _phantom: PhantomData,
245 }
246 }
247}
248
/// Workflow context that additionally gives access to the workflow's own state `W`.
pub struct WorkflowContext<W> {
    /// The state-less context that most methods delegate to.
    sync: SyncWorkflowContext<W>,
    /// The workflow's state, accessed through `state` and `state_mut`.
    workflow_state: Rc<RefCell<W>>,
    /// Wakers that `state_mut` wakes after each mutation.
    condition_wakers: Rc<RefCell<Vec<Waker>>>,
}
262
263impl<W> Clone for WorkflowContext<W> {
264 fn clone(&self) -> Self {
265 Self {
266 sync: self.sync.clone(),
267 workflow_state: self.workflow_state.clone(),
268 condition_wakers: self.condition_wakers.clone(),
269 }
270 }
271}
272
/// Owned snapshot of identifying and configuration information about a workflow run.
///
/// Built by [`WorkflowContextView::new`] from the namespace, task queue, run id and the
/// workflow's `InitializeWorkflow` data.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct WorkflowContextView {
    /// Workflow id from the initialization data.
    pub workflow_id: String,
    /// Run id of this execution.
    pub run_id: String,
    /// Workflow type name from the initialization data.
    pub workflow_type: String,
    /// Task queue of this execution.
    pub task_queue: String,
    /// Namespace of this execution.
    pub namespace: String,

    /// Attempt number from the initialization data.
    pub attempt: u32,
    /// Run id of the first execution, from the initialization data.
    pub first_execution_run_id: String,
    /// Run id this execution was continued from; `None` when the initialization data holds an
    /// empty string.
    pub continued_from_run_id: Option<String>,

    /// Start time; `None` when absent or not convertible to a `SystemTime`.
    pub start_time: Option<SystemTime>,
    /// Workflow execution timeout; `None` when absent or not convertible to a `Duration`.
    pub execution_timeout: Option<Duration>,
    /// Workflow run timeout; `None` when absent or not convertible to a `Duration`.
    pub run_timeout: Option<Duration>,
    /// Workflow task timeout; `None` when absent or not convertible to a `Duration`.
    pub task_timeout: Option<Duration>,

    /// Parent workflow, when the initialization data names one.
    pub parent: Option<ParentWorkflowInfo>,
    /// Root workflow, when the initialization data names one.
    pub root: Option<RootWorkflowInfo>,

    /// Retry policy copied from the initialization data.
    pub retry_policy:
        Option<temporalio_common_wasm::protos::temporal::api::common::v1::RetryPolicy>,
    /// Cron schedule; `None` when the initialization data holds an empty string.
    pub cron_schedule: Option<String>,
    /// Memo copied from the initialization data.
    pub memo: Option<Memo>,
    /// Search attributes copied from the initialization data.
    pub search_attributes: Option<SearchAttributes>,
}
321
/// Identifies the parent of a workflow.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
    /// Workflow id of the parent.
    pub workflow_id: String,
    /// Run id of the parent.
    pub run_id: String,
    /// Namespace of the parent.
    pub namespace: String,
}
333
/// Identifies the root workflow of a workflow.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
    /// Workflow id of the root workflow.
    pub workflow_id: String,
    /// Run id of the root workflow.
    pub run_id: String,
}
343
344impl WorkflowContextView {
345 pub(crate) fn new(
347 namespace: String,
348 task_queue: String,
349 run_id: String,
350 init: &InitializeWorkflow,
351 ) -> Self {
352 let parent = init
353 .parent_workflow_info
354 .as_ref()
355 .map(|p| ParentWorkflowInfo {
356 workflow_id: p.workflow_id.clone(),
357 run_id: p.run_id.clone(),
358 namespace: p.namespace.clone(),
359 });
360
361 let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
362 workflow_id: r.workflow_id.clone(),
363 run_id: r.run_id.clone(),
364 });
365
366 let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
367 None
368 } else {
369 Some(init.continued_from_execution_run_id.clone())
370 };
371
372 let cron_schedule = if init.cron_schedule.is_empty() {
373 None
374 } else {
375 Some(init.cron_schedule.clone())
376 };
377
378 Self {
379 workflow_id: init.workflow_id.clone(),
380 run_id,
381 workflow_type: init.workflow_type.clone(),
382 task_queue,
383 namespace,
384 attempt: init.attempt as u32,
385 first_execution_run_id: init.first_execution_run_id.clone(),
386 continued_from_run_id,
387 start_time: init.start_time.and_then(|t| t.try_into().ok()),
388 execution_timeout: init
389 .workflow_execution_timeout
390 .and_then(|d| d.try_into().ok()),
391 run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
392 task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
393 parent,
394 root,
395 retry_policy: init.retry_policy.clone(),
396 cron_schedule,
397 memo: init.memo.clone(),
398 search_attributes: init.search_attributes.clone(),
399 }
400 }
401}
402
403impl BaseWorkflowContext {
404 #[doc(hidden)]
406 pub fn new(
407 namespace: String,
408 task_queue: String,
409 run_id: String,
410 init_workflow_job: InitializeWorkflow,
411 data_converter: DataConverter,
412 host: Rc<dyn WorkflowHost>,
413 ) -> Self {
414 Self {
415 inner: Rc::new(WorkflowContextInner {
416 namespace,
417 task_queue,
418 run_id,
419 shared: RefCell::new(WorkflowContextSharedData {
420 random_seed: init_workflow_job.randomness_seed,
421 search_attributes: init_workflow_job
422 .search_attributes
423 .clone()
424 .unwrap_or_default(),
425 ..Default::default()
426 }),
427 inital_information: init_workflow_job,
428 runtime: WorkflowRuntimeState::new(host),
429 cancelled_reason: RefCell::new(None),
430 cancel_wakers: RefCell::new(Vec::new()),
431 seq_nums: RefCell::new(WfCtxProtectedDat {
432 next_timer_sequence_number: 1,
433 next_activity_sequence_number: 1,
434 next_child_workflow_sequence_number: 1,
435 next_cancel_external_wf_sequence_number: 1,
436 next_signal_external_wf_sequence_number: 1,
437 next_nexus_op_sequence_number: 1,
438 }),
439 data_converter,
440 state_mutated: Cell::new(false),
441 }),
442 }
443 }
444
445 pub(crate) fn take_state_mutated(&self) -> bool {
448 self.inner.state_mutated.replace(false)
449 }
450
451 pub(crate) fn set_state_mutated(&self) {
453 self.inner.state_mutated.set(true);
454 }
455
456 pub(crate) fn take_runtime_progress(&self) -> bool {
457 self.inner.runtime.take_progress()
458 }
459
460 pub(crate) fn take_forced_wft_failure(
461 &self,
462 ) -> Option<Box<dyn std::error::Error + Send + Sync>> {
463 self.inner.runtime.take_forced_wft_failure()
464 }
465
466 pub(crate) fn notify_cancel(&self, reason: String) {
467 let _guard = SdkWakeGuard::new();
468 *self.inner.cancelled_reason.borrow_mut() = Some(reason);
469 for waker in self.inner.cancel_wakers.borrow_mut().drain(..) {
470 waker.wake();
471 }
472 self.inner.runtime.mark_progress();
473 }
474
475 pub(crate) fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
476 self.inner.runtime.unblock(event)
477 }
478
479 fn cancel(&self, cancellable_id: CancellableID) {
481 match cancellable_id {
482 CancellableID::Timer(seq) => {
483 if self
484 .inner
485 .runtime
486 .maybe_unblock(UnblockEvent::Timer(seq, TimerResult::Cancelled))
487 {
488 self.inner.runtime.host.push_command(
489 workflow_command::Variant::CancelTimer(CancelTimer { seq }).into(),
490 );
491 }
492 }
493 CancellableID::Activity(seq) => {
494 self.inner.runtime.host.push_command(
495 workflow_command::Variant::RequestCancelActivity(RequestCancelActivity { seq })
496 .into(),
497 );
498 }
499 CancellableID::LocalActivity(seq) => {
500 self.inner.runtime.host.push_command(
501 workflow_command::Variant::RequestCancelLocalActivity(
502 RequestCancelLocalActivity { seq },
503 )
504 .into(),
505 );
506 }
507 CancellableID::ChildWorkflow { seqnum, reason } => {
508 self.inner.runtime.host.push_command(
509 workflow_command::Variant::CancelChildWorkflowExecution(
510 CancelChildWorkflowExecution {
511 child_workflow_seq: seqnum,
512 reason,
513 },
514 )
515 .into(),
516 );
517 }
518 CancellableID::SignalExternalWorkflow(seq) => {
519 self.inner.runtime.host.push_command(
520 workflow_command::Variant::CancelSignalWorkflow(CancelSignalWorkflow { seq })
521 .into(),
522 );
523 }
524 CancellableID::NexusOp(seq) => {
525 self.inner.runtime.host.push_command(
526 workflow_command::Variant::RequestCancelNexusOperation(
527 RequestCancelNexusOperation { seq },
528 )
529 .into(),
530 );
531 }
532 }
533 }
534
535 pub fn current_details(&self) -> String {
537 self.inner.shared.borrow().current_details.clone()
538 }
539
540 pub fn timer<T: Into<TimerOptions>>(
542 &self,
543 opts: T,
544 ) -> impl CancellableFuture<TimerResult> + use<T> {
545 let opts: TimerOptions = opts.into();
546 let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
547 let (cmd, unblocker) =
548 CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
549 self.inner
550 .runtime
551 .register_unblocker(PendingCommandId::Timer(seq), unblocker);
552 self.inner.runtime.host.push_command(opts.into_command(seq));
553 cmd
554 }
555
556 pub fn start_activity<AD: ActivityDefinition>(
558 &self,
559 _activity: AD,
560 input: impl Into<AD::Input>,
561 mut opts: ActivityOptions,
562 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
563 where
564 AD::Output: TemporalDeserializable,
565 {
566 let input = input.into();
567 let payload_converter = self.inner.data_converter.payload_converter();
568 let ctx = SerializationContext {
569 data: &SerializationContextData::Workflow,
570 converter: payload_converter,
571 };
572 let payloads = match payload_converter.to_payloads(&ctx, &input) {
573 Ok(p) => p,
574 Err(e) => {
575 return ActivityFut::eager(e.into());
576 }
577 };
578 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
579 let (cmd, unblocker) =
580 CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
581 self.inner
582 .runtime
583 .register_unblocker(PendingCommandId::Activity(seq), unblocker);
584 if opts.task_queue.is_none() {
585 opts.task_queue = Some(self.inner.task_queue.clone());
586 }
587 self.inner.runtime.host.push_command(opts.into_command(
588 seq,
589 AD::name().to_string(),
590 payloads,
591 ));
592 ActivityFut::running(cmd, self.inner.data_converter.clone())
593 }
594
595 pub fn start_local_activity<AD: ActivityDefinition>(
597 &self,
598 _activity: AD,
599 input: impl Into<AD::Input>,
600 opts: LocalActivityOptions,
601 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
602 where
603 AD::Output: TemporalDeserializable,
604 {
605 let input = input.into();
606 let payload_converter = self.inner.data_converter.payload_converter();
607 let ctx = SerializationContext {
608 data: &SerializationContextData::Workflow,
609 converter: payload_converter,
610 };
611 let payloads = match payload_converter.to_payloads(&ctx, &input) {
612 Ok(p) => p,
613 Err(e) => {
614 return ActivityFut::eager(e.into());
615 }
616 };
617 ActivityFut::running(
618 LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
619 self.inner.data_converter.clone(),
620 )
621 }
622
623 fn start_child_workflow<WD: WorkflowDefinition>(
625 &self,
626 workflow: WD,
627 input: impl Into<WD::Input>,
628 opts: ChildWorkflowOptions,
629 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
630 where
631 WD::Output: TemporalDeserializable,
632 {
633 let input = input.into();
634 let payload_converter = self.inner.data_converter.payload_converter();
635 let ctx = SerializationContext {
636 data: &SerializationContextData::Workflow,
637 converter: payload_converter,
638 };
639 let payloads = match payload_converter.to_payloads(&ctx, &input) {
640 Ok(p) => p,
641 Err(e) => {
642 return ChildWorkflowStartFut::eager(e.into());
643 }
644 };
645 let workflow_type = workflow.name().to_string();
646
647 let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
648 let (result_cmd, unblocker) = CancellableWFCommandFut::new(
652 CancellableID::ChildWorkflow {
653 seqnum: child_seq,
654 reason: String::new(),
655 },
656 self.clone(),
657 );
658 self.inner.runtime.register_unblocker(
659 PendingCommandId::ChildWorkflowComplete(child_seq),
660 unblocker,
661 );
662
663 let common = ChildWfCommon {
664 workflow_id: opts.workflow_id.clone(),
665 child_seq,
666 result_future: result_cmd,
667 base_ctx: self.clone(),
668 data_converter: self.inner.data_converter.clone(),
669 };
670
671 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
672 CancellableID::ChildWorkflow {
673 seqnum: child_seq,
674 reason: String::new(),
675 },
676 common,
677 self.clone(),
678 );
679 self.inner
680 .runtime
681 .register_unblocker(PendingCommandId::ChildWorkflowStart(child_seq), unblocker);
682 self.inner
683 .runtime
684 .host
685 .push_command(opts.into_command(child_seq, workflow_type, payloads));
686
687 ChildWorkflowStartFut::Running(cmd)
688 }
689
690 fn local_activity_no_timer_retry(
692 self,
693 activity_type: String,
694 arguments: Vec<Payload>,
695 opts: LocalActivityOptions,
696 ) -> impl CancellableFuture<ActivityResolution> {
697 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
698 let (cmd, unblocker) =
699 CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
700 self.inner
701 .runtime
702 .register_unblocker(PendingCommandId::Activity(seq), unblocker);
703 self.inner
704 .runtime
705 .host
706 .push_command(opts.into_command(seq, activity_type, arguments));
707 cmd
708 }
709
710 fn send_signal_wf(
711 self,
712 target: signal_external_workflow_execution::Target,
713 signal: Signal,
714 ) -> impl CancellableFuture<SignalExternalWfResult> {
715 let seq = self
716 .inner
717 .seq_nums
718 .borrow_mut()
719 .next_signal_external_wf_seq();
720 let (cmd, unblocker) =
721 CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
722 self.inner
723 .runtime
724 .register_unblocker(PendingCommandId::SignalExternal(seq), unblocker);
725 let signal = signal.into_invocation();
726 self.inner.runtime.host.push_command(
727 workflow_command::Variant::SignalExternalWorkflowExecution(
728 SignalExternalWorkflowExecution {
729 seq,
730 signal_name: signal.signal_name,
731 args: signal.input,
732 target: Some(target),
733 headers: signal.headers,
734 },
735 )
736 .into(),
737 );
738 cmd
739 }
740}
741
742impl<W> SyncWorkflowContext<W> {
743 pub fn workflow_id(&self) -> &str {
745 &self.base.inner.inital_information.workflow_id
746 }
747
748 pub fn run_id(&self) -> &str {
750 &self.base.inner.run_id
751 }
752
753 pub fn namespace(&self) -> &str {
755 &self.base.inner.namespace
756 }
757
758 pub fn task_queue(&self) -> &str {
760 &self.base.inner.task_queue
761 }
762
763 pub fn workflow_time(&self) -> Option<SystemTime> {
765 self.base
766 .inner
767 .shared
768 .borrow()
769 .activation
770 .timestamp
771 .try_into_or_none()
772 }
773
774 pub fn history_length(&self) -> u32 {
776 self.base.inner.shared.borrow().activation.history_length
777 }
778
779 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
783 self.base
784 .inner
785 .shared
786 .borrow()
787 .activation
788 .clone()
789 .deployment_version_for_current_task
790 .map(Into::into)
791 }
792
793 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
795 Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
796 }
797
798 pub fn random_seed(&self) -> u64 {
800 self.base.inner.shared.borrow().random_seed
801 }
802
803 pub fn is_replaying(&self) -> bool {
805 self.base.inner.shared.borrow().activation.is_replaying
806 }
807
808 pub fn continue_as_new_suggested(&self) -> bool {
810 self.base
811 .inner
812 .shared
813 .borrow()
814 .activation
815 .continue_as_new_suggested
816 }
817
818 pub fn target_worker_deployment_version_changed(&self) -> bool {
822 self.base
823 .inner
824 .shared
825 .borrow()
826 .activation
827 .target_worker_deployment_version_changed
828 }
829
830 pub fn headers(&self) -> &HashMap<String, Payload> {
835 &self.headers
836 }
837
838 pub fn payload_converter(&self) -> &PayloadConverter {
840 self.base.inner.data_converter.payload_converter()
841 }
842
843 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
846 &self.base.inner.inital_information
847 }
848
849 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
851 let inner = self.base.inner.clone();
852 future::poll_fn(move |cx| {
853 if let Some(reason) = inner.cancelled_reason.borrow().as_ref() {
854 Poll::Ready(reason.clone())
855 } else {
856 inner.cancel_wakers.borrow_mut().push(cx.waker().clone());
857 Poll::Pending
858 }
859 })
860 .fuse()
861 }
862
863 pub fn continue_as_new(
868 &self,
869 input: &<W::Run as WorkflowDefinition>::Input,
870 opts: ContinueAsNewOptions,
871 ) -> Result<std::convert::Infallible, WorkflowTermination>
872 where
873 W: WorkflowImplementation,
874 {
875 let pc = self.base.inner.data_converter.payload_converter();
876 let ctx = SerializationContext {
877 data: &SerializationContextData::Workflow,
878 converter: pc,
879 };
880 let arguments = pc
881 .to_payloads(&ctx, input)
882 .map_err(WorkflowTermination::from)?;
883 let workflow_type = self.workflow_initial_info().workflow_type.clone();
884 let request = opts.into_request(workflow_type, arguments);
885 Err(WorkflowTermination::continue_as_new(request))
886 }
887
888 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
890 self.base.timer(opts)
891 }
892
893 pub fn start_activity<AD: ActivityDefinition>(
895 &self,
896 activity: AD,
897 input: impl Into<AD::Input>,
898 opts: ActivityOptions,
899 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
900 where
901 AD::Output: TemporalDeserializable,
902 {
903 self.base.start_activity(activity, input, opts)
904 }
905
906 pub fn start_local_activity<AD: ActivityDefinition>(
908 &self,
909 activity: AD,
910 input: impl Into<AD::Input>,
911 opts: LocalActivityOptions,
912 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
913 where
914 AD::Output: TemporalDeserializable,
915 {
916 self.base.start_local_activity(activity, input, opts)
917 }
918
919 pub fn start_child_workflow<WD: WorkflowDefinition>(
922 &self,
923 workflow: WD,
924 input: impl Into<WD::Input>,
925 opts: ChildWorkflowOptions,
926 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
927 where
928 WD::Output: TemporalDeserializable,
929 {
930 self.base.start_child_workflow(workflow, input, opts)
931 }
932
933 #[deprecated(note = "use `start_child_workflow` instead")]
935 pub fn child_workflow<WD: WorkflowDefinition>(
936 &self,
937 workflow: WD,
938 input: impl Into<WD::Input>,
939 opts: ChildWorkflowOptions,
940 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
941 where
942 WD::Output: TemporalDeserializable,
943 {
944 self.start_child_workflow(workflow, input, opts)
945 }
946
947 pub fn patched(&self, patch_id: &str) -> bool {
949 self.patch_impl(patch_id, false)
950 }
951
952 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
955 self.patch_impl(patch_id, true)
956 }
957
958 fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
959 self.base.inner.runtime.host.push_command(
960 workflow_command::Variant::SetPatchMarker(SetPatchMarker {
961 patch_id: patch_id.to_string(),
962 deprecated,
963 })
964 .into(),
965 );
966 if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
968 return *present;
969 }
970
971 let res = !self.base.inner.shared.borrow().activation.is_replaying;
974
975 self.base
976 .inner
977 .shared
978 .borrow_mut()
979 .changes
980 .insert(patch_id.to_string(), res);
981
982 res
983 }
984
985 pub fn external_workflow(
987 &self,
988 workflow_id: impl Into<String>,
989 run_id: Option<String>,
990 ) -> ExternalWorkflowHandle {
991 ExternalWorkflowHandle {
992 workflow_id: workflow_id.into(),
993 run_id,
994 namespace: self.base.inner.namespace.clone(),
995 base_ctx: self.base.clone(),
996 }
997 }
998
999 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1001 self.base.inner.runtime.host.push_command(
1002 workflow_command::Variant::UpsertWorkflowSearchAttributes(
1003 UpsertWorkflowSearchAttributes {
1004 search_attributes: Some(SearchAttributes {
1005 indexed_fields: attr_iter.into_iter().collect(),
1006 }),
1007 },
1008 )
1009 .into(),
1010 );
1011 }
1012
1013 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1015 self.base.inner.runtime.host.push_command(
1016 workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
1017 upserted_memo: Some(Memo {
1018 fields: attr_iter.into_iter().collect(),
1019 }),
1020 })
1021 .into(),
1022 );
1023 }
1024
1025 pub fn set_current_details(&self, details: impl Into<String>) {
1030 let details = details.into();
1031 self.base.inner.shared.borrow_mut().current_details = details.clone();
1032 self.base.inner.runtime.host.set_current_details(details);
1033 }
1034
1035 pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
1037 self.base.inner.runtime.set_forced_wft_failure(with.into());
1038 }
1039
1040 pub fn start_nexus_operation(
1042 &self,
1043 opts: NexusOperationOptions,
1044 ) -> impl CancellableFuture<NexusStartResult> {
1045 let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
1046 let (result_future, unblocker) = WFCommandFut::new();
1047 self.base
1048 .inner
1049 .runtime
1050 .register_unblocker(PendingCommandId::NexusOpComplete(seq), unblocker);
1051 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1052 CancellableID::NexusOp(seq),
1053 NexusUnblockData {
1054 result_future: result_future.shared(),
1055 schedule_seq: seq,
1056 base_ctx: self.base.clone(),
1057 },
1058 self.base.clone(),
1059 );
1060 self.base
1061 .inner
1062 .runtime
1063 .register_unblocker(PendingCommandId::NexusOpStart(seq), unblocker);
1064 self.base
1065 .inner
1066 .runtime
1067 .host
1068 .push_command(opts.into_command(seq));
1069 cmd
1070 }
1071
1072 pub(crate) fn view(&self) -> WorkflowContextView {
1074 self.base.view()
1075 }
1076}
1077
1078impl<W> WorkflowContext<W> {
1079 pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
1081 Self {
1082 sync: SyncWorkflowContext {
1083 base,
1084 headers: Rc::new(HashMap::new()),
1085 _phantom: PhantomData,
1086 },
1087 workflow_state,
1088 condition_wakers: Rc::new(RefCell::new(Vec::new())),
1089 }
1090 }
1091
1092 pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
1094 Self {
1095 sync: SyncWorkflowContext {
1096 base: self.sync.base.clone(),
1097 headers: Rc::new(headers),
1098 _phantom: PhantomData,
1099 },
1100 workflow_state: self.workflow_state.clone(),
1101 condition_wakers: self.condition_wakers.clone(),
1102 }
1103 }
1104
1105 pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
1107 self.sync.clone()
1108 }
1109
1110 pub(crate) fn view(&self) -> WorkflowContextView {
1112 self.sync.view()
1113 }
1114
1115 pub fn workflow_id(&self) -> &str {
1119 self.sync.workflow_id()
1120 }
1121
1122 pub fn run_id(&self) -> &str {
1124 self.sync.run_id()
1125 }
1126
1127 pub fn namespace(&self) -> &str {
1129 self.sync.namespace()
1130 }
1131
1132 pub fn task_queue(&self) -> &str {
1134 self.sync.task_queue()
1135 }
1136
1137 pub fn workflow_time(&self) -> Option<SystemTime> {
1139 self.sync.workflow_time()
1140 }
1141
1142 pub fn history_length(&self) -> u32 {
1144 self.sync.history_length()
1145 }
1146
1147 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
1151 self.sync.current_deployment_version()
1152 }
1153
1154 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
1156 self.sync.search_attributes()
1157 }
1158
1159 pub fn random_seed(&self) -> u64 {
1161 self.sync.random_seed()
1162 }
1163
1164 pub fn is_replaying(&self) -> bool {
1166 self.sync.is_replaying()
1167 }
1168
1169 pub fn continue_as_new_suggested(&self) -> bool {
1171 self.sync.continue_as_new_suggested()
1172 }
1173
1174 pub fn target_worker_deployment_version_changed(&self) -> bool {
1178 self.sync.target_worker_deployment_version_changed()
1179 }
1180
1181 pub fn headers(&self) -> &HashMap<String, Payload> {
1183 self.sync.headers()
1184 }
1185
1186 pub fn payload_converter(&self) -> &PayloadConverter {
1188 self.sync.payload_converter()
1189 }
1190
1191 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
1193 self.sync.workflow_initial_info()
1194 }
1195
1196 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
1198 self.sync.cancelled()
1199 }
1200
1201 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
1203 self.sync.timer(opts)
1204 }
1205
1206 pub fn start_activity<AD: ActivityDefinition>(
1208 &self,
1209 activity: AD,
1210 input: impl Into<AD::Input>,
1211 opts: ActivityOptions,
1212 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1213 where
1214 AD::Output: TemporalDeserializable,
1215 {
1216 self.sync.start_activity(activity, input, opts)
1217 }
1218
1219 pub fn start_local_activity<AD: ActivityDefinition>(
1221 &self,
1222 activity: AD,
1223 input: impl Into<AD::Input>,
1224 opts: LocalActivityOptions,
1225 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1226 where
1227 AD::Output: TemporalDeserializable,
1228 {
1229 self.sync.start_local_activity(activity, input, opts)
1230 }
1231
1232 pub fn start_child_workflow<WD: WorkflowDefinition>(
1234 &self,
1235 workflow: WD,
1236 input: impl Into<WD::Input>,
1237 opts: ChildWorkflowOptions,
1238 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
1239 where
1240 WD::Output: TemporalDeserializable,
1241 {
1242 self.sync.start_child_workflow(workflow, input, opts)
1243 }
1244
1245 #[deprecated(note = "use `start_child_workflow` instead")]
1247 pub fn child_workflow<WD: WorkflowDefinition>(
1248 &self,
1249 workflow: WD,
1250 input: impl Into<WD::Input>,
1251 opts: ChildWorkflowOptions,
1252 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
1253 where
1254 WD::Output: TemporalDeserializable,
1255 {
1256 self.start_child_workflow(workflow, input, opts)
1257 }
1258
1259 pub fn patched(&self, patch_id: &str) -> bool {
1261 self.sync.patched(patch_id)
1262 }
1263
1264 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
1267 self.sync.deprecate_patch(patch_id)
1268 }
1269
1270 pub fn external_workflow(
1272 &self,
1273 workflow_id: impl Into<String>,
1274 run_id: Option<String>,
1275 ) -> ExternalWorkflowHandle {
1276 self.sync.external_workflow(workflow_id, run_id)
1277 }
1278
1279 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1281 self.sync.upsert_search_attributes(attr_iter)
1282 }
1283
1284 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1286 self.sync.upsert_memo(attr_iter)
1287 }
1288
1289 pub fn set_current_details(&self, details: impl Into<String>) {
1293 self.sync.set_current_details(details)
1294 }
1295
1296 pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
1298 self.sync.force_task_fail(with)
1299 }
1300
1301 pub fn start_nexus_operation(
1303 &self,
1304 opts: NexusOperationOptions,
1305 ) -> impl CancellableFuture<NexusStartResult> {
1306 self.sync.start_nexus_operation(opts)
1307 }
1308
1309 pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1314 f(&*self.workflow_state.borrow())
1315 }
1316
1317 pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1326 let result = f(&mut *self.workflow_state.borrow_mut());
1327 let _guard = SdkWakeGuard::new();
1328 for waker in self.condition_wakers.borrow_mut().drain(..) {
1329 waker.wake();
1330 }
1331 self.sync.base.set_state_mutated();
1332 result
1333 }
1334
1335 pub fn continue_as_new(
1340 &self,
1341 input: &<W::Run as WorkflowDefinition>::Input,
1342 opts: ContinueAsNewOptions,
1343 ) -> Result<std::convert::Infallible, WorkflowTermination>
1344 where
1345 W: WorkflowImplementation,
1346 {
1347 self.sync.continue_as_new(input, opts)
1348 }
1349
1350 pub fn wait_condition<'a>(
1355 &'a self,
1356 mut condition: impl FnMut(&W) -> bool + 'a,
1357 ) -> impl FusedFuture<Output = ()> + 'a {
1358 future::poll_fn(move |cx: &mut Context<'_>| {
1359 if condition(&*self.workflow_state.borrow()) {
1360 Poll::Ready(())
1361 } else {
1362 self.condition_wakers.borrow_mut().push(cx.waker().clone());
1363 Poll::Pending
1364 }
1365 })
1366 .fuse()
1367 }
1368}
1369
/// Independent `u32` sequence counters, one per kind of command.
struct WfCtxProtectedDat {
    next_timer_sequence_number: u32,
    next_activity_sequence_number: u32,
    next_child_workflow_sequence_number: u32,
    next_cancel_external_wf_sequence_number: u32,
    next_signal_external_wf_sequence_number: u32,
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the current value of `counter`, then advances it by one.
    fn post_increment(counter: &mut u32) -> u32 {
        let issued = *counter;
        *counter += 1;
        issued
    }

    /// Hands out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_timer_sequence_number)
    }

    /// Hands out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_activity_sequence_number)
    }

    /// Hands out the next child-workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_child_workflow_sequence_number)
    }

    /// Hands out the next cancel-external-workflow sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hands out the next signal-external-workflow sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hands out the next Nexus-operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_nexus_op_sequence_number)
    }
}
1411
1412#[derive(Clone, Debug, Default)]
1413struct WorkflowContextSharedData {
1414 changes: HashMap<String, bool>,
1416 activation: CoreWorkflowActivation,
1417 search_attributes: SearchAttributes,
1418 random_seed: u64,
1419 current_details: String,
1421}
1422
1423pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1426 fn cancel(&self);
1428}
1429
1430pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1432 fn cancel_with_reason(&self, reason: String);
1434}
1435
1436struct WFCommandFut<T, D> {
1437 _unused: PhantomData<T>,
1438 result_rx: oneshot::Receiver<UnblockEvent>,
1439 other_dat: Option<D>,
1440}
1441impl<T> WFCommandFut<T, ()> {
1442 fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1443 Self::new_with_dat(())
1444 }
1445}
1446
1447impl<T, D> WFCommandFut<T, D> {
1448 fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1449 let (tx, rx) = oneshot::channel();
1450 (
1451 Self {
1452 _unused: PhantomData,
1453 result_rx: rx,
1454 other_dat: Some(other_dat),
1455 },
1456 tx,
1457 )
1458 }
1459}
1460
1461impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1462impl<T, D> Future for WFCommandFut<T, D>
1463where
1464 T: Unblockable<OtherDat = D>,
1465{
1466 type Output = T;
1467
1468 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1469 self.result_rx.poll_unpin(cx).map(|x| {
1470 let od = self
1471 .other_dat
1472 .take()
1473 .expect("Other data must exist when resolving command future");
1474 Unblockable::unblock(x.unwrap(), od)
1475 })
1476 }
1477}
1478impl<T, D> FusedFuture for WFCommandFut<T, D>
1479where
1480 T: Unblockable<OtherDat = D>,
1481{
1482 fn is_terminated(&self) -> bool {
1483 self.other_dat.is_none()
1484 }
1485}
1486
1487struct CancellableWFCommandFut<T, D> {
1488 cmd_fut: WFCommandFut<T, D>,
1489 cancellable_id: CancellableID,
1490 base_ctx: BaseWorkflowContext,
1491}
1492impl<T> CancellableWFCommandFut<T, ()> {
1493 fn new(
1494 cancellable_id: CancellableID,
1495 base_ctx: BaseWorkflowContext,
1496 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1497 Self::new_with_dat(cancellable_id, (), base_ctx)
1498 }
1499}
1500impl<T, D> CancellableWFCommandFut<T, D> {
1501 fn new_with_dat(
1502 cancellable_id: CancellableID,
1503 other_dat: D,
1504 base_ctx: BaseWorkflowContext,
1505 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1506 let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1507 (
1508 Self {
1509 cmd_fut,
1510 cancellable_id,
1511 base_ctx,
1512 },
1513 sender,
1514 )
1515 }
1516}
1517impl<T, D> Unpin for CancellableWFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1518impl<T, D> Future for CancellableWFCommandFut<T, D>
1519where
1520 T: Unblockable<OtherDat = D>,
1521{
1522 type Output = T;
1523
1524 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1525 self.cmd_fut.poll_unpin(cx)
1526 }
1527}
1528impl<T, D> FusedFuture for CancellableWFCommandFut<T, D>
1529where
1530 T: Unblockable<OtherDat = D>,
1531{
1532 fn is_terminated(&self) -> bool {
1533 self.cmd_fut.is_terminated()
1534 }
1535}
1536
1537impl<T, D> CancellableFuture<T> for CancellableWFCommandFut<T, D>
1538where
1539 T: Unblockable<OtherDat = D>,
1540{
1541 fn cancel(&self) {
1542 self.base_ctx.cancel(self.cancellable_id.clone());
1543 }
1544}
1545impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D>
1546where
1547 T: Unblockable<OtherDat = D>,
1548{
1549 fn cancel_with_reason(&self, reason: String) {
1550 self.base_ctx
1551 .cancel(self.cancellable_id.clone().with_reason(reason));
1552 }
1553}
1554
1555struct LATimerBackoffFut {
1556 la_opts: LocalActivityOptions,
1557 activity_type: String,
1558 arguments: Vec<Payload>,
1559 current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1560 timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1561 base_ctx: BaseWorkflowContext,
1562 next_attempt: u32,
1563 next_sched_time: Option<prost_types::Timestamp>,
1564 did_cancel: AtomicBool,
1565 terminated: bool,
1566}
1567impl LATimerBackoffFut {
1568 fn new(
1569 activity_type: String,
1570 arguments: Vec<Payload>,
1571 opts: LocalActivityOptions,
1572 base_ctx: BaseWorkflowContext,
1573 ) -> Self {
1574 let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1575 activity_type.clone(),
1576 arguments.clone(),
1577 opts.clone(),
1578 ));
1579 Self {
1580 la_opts: opts,
1581 activity_type,
1582 arguments,
1583 current_fut,
1584 timer_fut: None,
1585 base_ctx,
1586 next_attempt: 1,
1587 next_sched_time: None,
1588 did_cancel: AtomicBool::new(false),
1589 terminated: false,
1590 }
1591 }
1592}
1593impl Unpin for LATimerBackoffFut {}
1594impl Future for LATimerBackoffFut {
1595 type Output = ActivityResolution;
1596
1597 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1598 if let Some(tf) = self.timer_fut.as_mut() {
1600 return match tf.poll_unpin(cx) {
1601 Poll::Ready(tr) => {
1602 self.timer_fut = None;
1603 if let TimerResult::Fired = tr {
1605 let mut opts = self.la_opts.clone();
1606 opts.attempt = Some(self.next_attempt);
1607 opts.original_schedule_time
1608 .clone_from(&self.next_sched_time);
1609 self.current_fut =
1610 Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1611 self.activity_type.clone(),
1612 self.arguments.clone(),
1613 opts,
1614 ));
1615 Poll::Pending
1616 } else {
1617 self.terminated = true;
1618 Poll::Ready(ActivityResolution {
1619 status: Some(activity_resolution::Status::Cancelled(Cancellation {
1620 failure: Some(Failure {
1621 message: "Activity cancelled".to_owned(),
1622 failure_info: Some(FailureInfo::CanceledFailureInfo(
1623 CanceledFailureInfo::default(),
1624 )),
1625 ..Default::default()
1626 }),
1627 })),
1628 })
1629 }
1630 }
1631 Poll::Pending => Poll::Pending,
1632 };
1633 }
1634 let poll_res = self.current_fut.poll_unpin(cx);
1635 if let Poll::Ready(ref r) = poll_res
1636 && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1637 {
1638 if self.did_cancel.load(Ordering::Acquire) {
1642 self.terminated = true;
1643 return Poll::Ready(ActivityResolution {
1644 status: Some(activity_resolution::Status::Cancelled(Cancellation {
1645 failure: Some(Failure {
1646 message: "Activity cancelled".to_owned(),
1647 failure_info: Some(FailureInfo::CanceledFailureInfo(
1648 CanceledFailureInfo::default(),
1649 )),
1650 ..Default::default()
1651 }),
1652 })),
1653 });
1654 }
1655
1656 let timer_f = self.base_ctx.timer::<Duration>(
1657 b.backoff_duration
1658 .expect("Duration is set")
1659 .try_into()
1660 .expect("duration converts ok"),
1661 );
1662 self.timer_fut = Some(Box::pin(timer_f));
1663 self.next_attempt = b.attempt;
1664 self.next_sched_time.clone_from(&b.original_schedule_time);
1665 return Poll::Pending;
1666 }
1667 if poll_res.is_ready() {
1668 self.terminated = true;
1669 }
1670 poll_res
1671 }
1672}
1673impl FusedFuture for LATimerBackoffFut {
1674 fn is_terminated(&self) -> bool {
1675 self.terminated
1676 }
1677}
1678impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1679 fn cancel(&self) {
1680 self.did_cancel.store(true, Ordering::Release);
1681 if let Some(tf) = self.timer_fut.as_ref() {
1682 tf.cancel();
1683 }
1684 self.current_fut.cancel();
1685 }
1686}
1687
1688enum ActivityFut<F, Output> {
1690 Errored {
1692 error: Option<Box<ActivityExecutionError>>,
1693 _phantom: PhantomData<Output>,
1694 },
1695 Running {
1697 inner: F,
1698 data_converter: DataConverter,
1699 _phantom: PhantomData<Output>,
1700 },
1701 Terminated,
1702}
1703
1704impl<F, Output> ActivityFut<F, Output> {
1705 fn eager(err: ActivityExecutionError) -> Self {
1706 Self::Errored {
1707 error: Some(Box::new(err)),
1708 _phantom: PhantomData,
1709 }
1710 }
1711
1712 fn running(inner: F, data_converter: DataConverter) -> Self {
1713 Self::Running {
1714 inner,
1715 data_converter,
1716 _phantom: PhantomData,
1717 }
1718 }
1719}
1720
1721impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1722
1723impl<F, Output> Future for ActivityFut<F, Output>
1724where
1725 F: Future<Output = ActivityResolution> + Unpin,
1726 Output: TemporalDeserializable + 'static,
1727{
1728 type Output = Result<Output, ActivityExecutionError>;
1729
1730 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1731 let this = self.get_mut();
1732 let poll = match this {
1733 ActivityFut::Errored { error, .. } => {
1734 Poll::Ready(Err(*error.take().expect("polled after completion")))
1735 }
1736 ActivityFut::Running {
1737 inner,
1738 data_converter,
1739 ..
1740 } => match Pin::new(inner).poll(cx) {
1741 Poll::Pending => Poll::Pending,
1742 Poll::Ready(resolution) => Poll::Ready({
1743 let status = resolution.status.ok_or_else(|| {
1744 data_converter
1745 .to_error(
1746 &SerializationContextData::Workflow,
1747 Failure {
1748 message: "Activity completed without a status".to_string(),
1749 ..Default::default()
1750 },
1751 ActivityExecutionDecodeHint { cancelled: false },
1752 )
1753 .expect("synthetic activity failure should decode")
1754 })?;
1755
1756 match status {
1757 activity_resolution::Status::Completed(success) => {
1758 let payload = success.result.unwrap_or_default();
1759 let ctx = SerializationContext {
1760 data: &SerializationContextData::Workflow,
1761 converter: data_converter.payload_converter(),
1762 };
1763 data_converter
1764 .payload_converter()
1765 .from_payload::<Output>(&ctx, payload)
1766 .map_err(ActivityExecutionError::Serialization)
1767 }
1768 activity_resolution::Status::Failed(f) => Err(data_converter.to_error(
1769 &SerializationContextData::Workflow,
1770 f.failure.unwrap_or_default(),
1771 ActivityExecutionDecodeHint { cancelled: false },
1772 )?),
1773 activity_resolution::Status::Cancelled(c) => Err(data_converter.to_error(
1774 &SerializationContextData::Workflow,
1775 c.failure.unwrap_or_default(),
1776 ActivityExecutionDecodeHint { cancelled: true },
1777 )?),
1778 activity_resolution::Status::Backoff(_) => {
1779 panic!("DoBackoff should be handled by LATimerBackoffFut")
1780 }
1781 }
1782 }),
1783 },
1784 ActivityFut::Terminated => panic!("polled after termination"),
1785 };
1786 if poll.is_ready() {
1787 *this = ActivityFut::Terminated;
1788 }
1789 poll
1790 }
1791}
1792
1793impl<F, Output> FusedFuture for ActivityFut<F, Output>
1794where
1795 F: Future<Output = ActivityResolution> + Unpin,
1796 Output: TemporalDeserializable + 'static,
1797{
1798 fn is_terminated(&self) -> bool {
1799 matches!(self, ActivityFut::Terminated)
1800 }
1801}
1802
1803impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1804where
1805 F: CancellableFuture<ActivityResolution> + Unpin,
1806 Output: TemporalDeserializable + 'static,
1807{
1808 fn cancel(&self) {
1809 if let ActivityFut::Running { inner, .. } = self {
1810 inner.cancel()
1811 }
1812 }
1813}
1814
1815pub(crate) struct ChildWfCommon {
1816 workflow_id: String,
1817 child_seq: u32,
1818 result_future: CancellableWFCommandFut<ChildWorkflowResult, ()>,
1819 base_ctx: BaseWorkflowContext,
1820 data_converter: DataConverter,
1821}
1822
1823#[derive(derive_more::Debug)]
1827pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
1828 pub(crate) status: ChildWorkflowStartStatus,
1829 #[debug(skip)]
1830 pub(crate) common: ChildWfCommon,
1831 pub(crate) _phantom: PhantomData<WD>,
1832}
1833
1834#[derive(derive_more::Debug)]
1836pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
1837 pub run_id: String,
1839 #[debug(skip)]
1840 common: ChildWfCommon,
1841 _phantom: PhantomData<WD>,
1842}
1843
1844enum ChildWorkflowFut<F, Output> {
1847 Running {
1848 inner: F,
1849 data_converter: DataConverter,
1850 _phantom: PhantomData<Output>,
1851 },
1852 Terminated,
1853}
1854
1855impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
1856
1857impl<F, Output> Future for ChildWorkflowFut<F, Output>
1858where
1859 F: Future<Output = ChildWorkflowResult> + Unpin,
1860 Output: TemporalDeserializable + 'static,
1861{
1862 type Output = Result<Output, ChildWorkflowExecutionError>;
1863
1864 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1865 let this = self.get_mut();
1866 let poll = match this {
1867 ChildWorkflowFut::Running {
1868 inner,
1869 data_converter,
1870 ..
1871 } => match Pin::new(inner).poll(cx) {
1872 Poll::Pending => Poll::Pending,
1873 Poll::Ready(result) => Poll::Ready({
1874 let status = result.status.ok_or_else(|| {
1875 data_converter
1876 .to_error(
1877 &SerializationContextData::Workflow,
1878 Failure {
1879 message: "Child workflow completed without a status"
1880 .to_string(),
1881 ..Default::default()
1882 },
1883 ChildWorkflowExecutionDecodeHint,
1884 )
1885 .expect("synthetic child workflow failure should decode")
1886 })?;
1887 match status {
1888 child_workflow_result::Status::Completed(success) => {
1889 let payloads = success.result.into_iter().collect();
1890 let ctx = SerializationContext {
1891 data: &SerializationContextData::Workflow,
1892 converter: data_converter.payload_converter(),
1893 };
1894 data_converter
1895 .payload_converter()
1896 .from_payloads::<Output>(&ctx, payloads)
1897 .map_err(ChildWorkflowExecutionError::Serialization)
1898 }
1899 child_workflow_result::Status::Failed(f) => Err(data_converter.to_error(
1900 &SerializationContextData::Workflow,
1901 f.failure.unwrap_or_default(),
1902 ChildWorkflowExecutionDecodeHint,
1903 )?),
1904 child_workflow_result::Status::Cancelled(c) => Err(data_converter
1905 .to_error(
1906 &SerializationContextData::Workflow,
1907 c.failure.unwrap_or_default(),
1908 ChildWorkflowExecutionDecodeHint,
1909 )?),
1910 }
1911 }),
1912 },
1913 ChildWorkflowFut::Terminated => panic!("polled after termination"),
1914 };
1915 if poll.is_ready() {
1916 *this = ChildWorkflowFut::Terminated;
1917 }
1918 poll
1919 }
1920}
1921
1922impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
1923where
1924 F: Future<Output = ChildWorkflowResult> + Unpin,
1925 Output: TemporalDeserializable + 'static,
1926{
1927 fn is_terminated(&self) -> bool {
1928 matches!(self, ChildWorkflowFut::Terminated)
1929 }
1930}
1931
1932impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
1933 for ChildWorkflowFut<F, Output>
1934where
1935 F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1936 Output: TemporalDeserializable + 'static,
1937{
1938 fn cancel_with_reason(&self, reason: String) {
1939 if let ChildWorkflowFut::Running { inner, .. } = self {
1940 inner.cancel_with_reason(reason)
1941 }
1942 }
1943}
1944
1945impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
1946 for ChildWorkflowFut<F, Output>
1947where
1948 F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1949 Output: TemporalDeserializable + 'static,
1950{
1951 fn cancel(&self) {
1952 if let ChildWorkflowFut::Running { inner, .. } = self {
1953 inner.cancel()
1954 }
1955 }
1956}
1957
1958enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
1961 Errored {
1963 error: Option<Box<ChildWorkflowStartError>>,
1964 _phantom: PhantomData<WD>,
1965 },
1966 Running(F),
1967 Terminated,
1968}
1969
1970impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
1971 fn eager(err: ChildWorkflowStartError) -> Self {
1972 Self::Errored {
1973 error: Some(Box::new(err)),
1974 _phantom: PhantomData,
1975 }
1976 }
1977}
1978
1979impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
1980
1981impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
1982where
1983 F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1984 WD: WorkflowDefinition,
1985{
1986 type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>;
1987
1988 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1989 let this = self.get_mut();
1990 let poll = match this {
1991 ChildWorkflowStartFut::Errored { error, .. } => {
1992 Poll::Ready(Err(*error.take().expect("polled after completion")))
1993 }
1994 ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
1995 Poll::Pending => Poll::Pending,
1996 Poll::Ready(pending) => Poll::Ready(match pending.status {
1997 ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
1998 run_id: s.run_id,
1999 common: pending.common,
2000 _phantom: PhantomData,
2001 }),
2002 ChildWorkflowStartStatus::Failed(f) => {
2003 Err(ChildWorkflowStartError::StartFailed {
2004 workflow_id: f.workflow_id,
2005 workflow_type: f.workflow_type,
2006 cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
2007 .unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
2008 })
2009 }
2010 ChildWorkflowStartStatus::Cancelled(c) => {
2011 Err(pending.common.data_converter.to_error(
2012 &SerializationContextData::Workflow,
2013 c.failure.unwrap_or_default(),
2014 ChildWorkflowStartDecodeHint,
2015 )?)
2016 }
2017 }),
2018 },
2019 ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
2020 };
2021 if poll.is_ready() {
2022 *this = ChildWorkflowStartFut::Terminated;
2023 }
2024 poll
2025 }
2026}
2027
2028impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
2029where
2030 F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
2031 WD: WorkflowDefinition,
2032{
2033 fn is_terminated(&self) -> bool {
2034 matches!(self, ChildWorkflowStartFut::Terminated)
2035 }
2036}
2037
2038impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
2039 for ChildWorkflowStartFut<F, WD>
2040where
2041 F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
2042 WD: WorkflowDefinition,
2043{
2044 fn cancel(&self) {
2045 if let ChildWorkflowStartFut::Running(inner) = self {
2046 inner.cancel()
2047 }
2048 }
2049}
2050
2051impl<F, WD> CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
2052 for ChildWorkflowStartFut<F, WD>
2053where
2054 F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
2055 WD: WorkflowDefinition,
2056{
2057 fn cancel_with_reason(&self, reason: String) {
2058 if let ChildWorkflowStartFut::Running(inner) = self {
2059 inner.cancel_with_reason(reason)
2060 }
2061 }
2062}
2063
2064enum SignalChildFut<F> {
2067 Errored {
2069 error: Option<WorkflowSignalError>,
2070 },
2071 Running {
2072 inner: F,
2073 data_converter: DataConverter,
2074 },
2075 Terminated,
2076}
2077
2078impl<F> SignalChildFut<F> {
2079 fn eager(err: WorkflowSignalError) -> Self {
2080 Self::Errored { error: Some(err) }
2081 }
2082}
2083
2084impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
2085
2086impl<F> Future for SignalChildFut<F>
2087where
2088 F: Future<Output = SignalExternalWfResult> + Unpin,
2089{
2090 type Output = Result<(), WorkflowSignalError>;
2091
2092 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2093 let this = self.get_mut();
2094 let poll = match this {
2095 SignalChildFut::Errored { error } => {
2096 Poll::Ready(Err(error.take().expect("polled after completion")))
2097 }
2098 SignalChildFut::Running {
2099 inner,
2100 data_converter,
2101 } => match Pin::new(inner).poll(cx) {
2102 Poll::Pending => Poll::Pending,
2103 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
2104 Poll::Ready(Err(failure)) => Poll::Ready(Err(data_converter.to_error(
2105 &SerializationContextData::Workflow,
2106 failure,
2107 WorkflowSignalDecodeHint,
2108 )?)),
2109 },
2110 SignalChildFut::Terminated => panic!("polled after termination"),
2111 };
2112 if poll.is_ready() {
2113 *this = SignalChildFut::Terminated;
2114 }
2115 poll
2116 }
2117}
2118
2119impl<F> FusedFuture for SignalChildFut<F>
2120where
2121 F: Future<Output = SignalExternalWfResult> + Unpin,
2122{
2123 fn is_terminated(&self) -> bool {
2124 matches!(self, SignalChildFut::Terminated)
2125 }
2126}
2127
2128impl<F> CancellableFuture<Result<(), WorkflowSignalError>> for SignalChildFut<F>
2129where
2130 F: CancellableFuture<SignalExternalWfResult> + Unpin,
2131{
2132 fn cancel(&self) {
2133 if let SignalChildFut::Running { inner, .. } = self {
2134 inner.cancel()
2135 }
2136 }
2137}
2138
2139impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
2140where
2141 WD::Output: TemporalDeserializable + 'static,
2142{
2143 pub fn result(
2146 self,
2147 ) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
2148 ChildWorkflowFut::Running {
2149 inner: self.common.result_future,
2150 data_converter: self.common.data_converter,
2151 _phantom: PhantomData,
2152 }
2153 }
2154
2155 pub fn cancel(&self, reason: String) {
2157 self.common.base_ctx.inner.runtime.host.push_command(
2158 workflow_command::Variant::CancelChildWorkflowExecution(CancelChildWorkflowExecution {
2159 child_workflow_seq: self.common.child_seq,
2160 reason,
2161 })
2162 .into(),
2163 );
2164 }
2165
2166 pub fn signal<S: SignalDefinition<Workflow = WD>>(
2168 &self,
2169 signal: S,
2170 input: S::Input,
2171 ) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
2172 let payload_converter = self.common.data_converter.payload_converter();
2173 let ctx = SerializationContext {
2174 data: &SerializationContextData::Workflow,
2175 converter: payload_converter,
2176 };
2177 let payloads = match payload_converter.to_payloads(&ctx, &input) {
2178 Ok(p) => p,
2179 Err(e) => {
2180 return SignalChildFut::eager(e.into());
2181 }
2182 };
2183 let signal = Signal::new(S::name(&signal), payloads);
2184 let target = signal_external_workflow_execution::Target::ChildWorkflowId(
2185 self.common.workflow_id.clone(),
2186 );
2187 SignalChildFut::Running {
2188 inner: self.common.base_ctx.clone().send_signal_wf(target, signal),
2189 data_converter: self.common.data_converter.clone(),
2190 }
2191 }
2192}
2193
2194#[derive(derive_more::Debug)]
2199pub struct ExternalWorkflowHandle {
2200 workflow_id: String,
2201 run_id: Option<String>,
2202 namespace: String,
2203 #[debug(skip)]
2204 base_ctx: BaseWorkflowContext,
2205}
2206
2207impl ExternalWorkflowHandle {
2208 pub fn workflow_id(&self) -> &str {
2210 &self.workflow_id
2211 }
2212
2213 pub fn run_id(&self) -> Option<&str> {
2215 self.run_id.as_deref()
2216 }
2217
2218 pub fn signal<S: SignalDefinition>(
2220 &self,
2221 signal: S,
2222 input: S::Input,
2223 ) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
2224 let payload_converter = self.base_ctx.data_converter().payload_converter();
2225 let ctx = SerializationContext {
2226 data: &SerializationContextData::Workflow,
2227 converter: payload_converter,
2228 };
2229 let payloads = match payload_converter.to_payloads(&ctx, &input) {
2230 Ok(p) => p,
2231 Err(e) => {
2232 return SignalChildFut::eager(e.into());
2233 }
2234 };
2235 let signal = Signal::new(S::name(&signal), payloads);
2236 let target = signal_external_workflow_execution::Target::WorkflowExecution(
2237 NamespacedWorkflowExecution {
2238 namespace: self.namespace.clone(),
2239 workflow_id: self.workflow_id.clone(),
2240 run_id: self.run_id.clone().unwrap_or_default(),
2241 },
2242 );
2243 SignalChildFut::Running {
2244 inner: self.base_ctx.clone().send_signal_wf(target, signal),
2245 data_converter: self.base_ctx.data_converter().clone(),
2246 }
2247 }
2248
2249 pub fn cancel(
2251 &self,
2252 reason: Option<String>,
2253 ) -> impl FusedFuture<Output = CancelExternalWfResult> {
2254 let seq = self
2255 .base_ctx
2256 .inner
2257 .seq_nums
2258 .borrow_mut()
2259 .next_cancel_external_wf_seq();
2260 let (cmd, unblocker) = WFCommandFut::new();
2261 self.base_ctx
2262 .inner
2263 .runtime
2264 .register_unblocker(PendingCommandId::CancelExternal(seq), unblocker);
2265 self.base_ctx.inner.runtime.host.push_command(
2266 workflow_command::Variant::RequestCancelExternalWorkflowExecution(
2267 RequestCancelExternalWorkflowExecution {
2268 seq,
2269 workflow_execution: Some(NamespacedWorkflowExecution {
2270 namespace: self.namespace.clone(),
2271 workflow_id: self.workflow_id.clone(),
2272 run_id: self.run_id.clone().unwrap_or_default(),
2273 }),
2274 reason: reason.unwrap_or_default(),
2275 },
2276 )
2277 .into(),
2278 );
2279 cmd
2280 }
2281}
2282
2283#[derive(derive_more::Debug)]
2284#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
2285pub struct StartedNexusOperation {
2286 pub operation_token: Option<String>,
2288 pub(crate) unblock_dat: NexusUnblockData,
2289}
2290
2291pub(crate) struct NexusUnblockData {
2292 result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
2293 schedule_seq: u32,
2294 base_ctx: BaseWorkflowContext,
2295}
2296
2297impl StartedNexusOperation {
2298 pub async fn result(&self) -> NexusOperationResult {
2299 SdkGuardedFuture(self.unblock_dat.result_future.clone()).await
2303 }
2304
2305 pub fn cancel(&self) {
2306 self.unblock_dat
2307 .base_ctx
2308 .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
2309 }
2310}
2311
2312#[cfg(test)]
2313mod tests {
2314 use super::*;
2315 use std::collections::HashMap;
2316 use temporalio_common_wasm::{
2317 data_converters::{TemporalDeserializable, TemporalSerializable},
2318 protos::{
2319 coresdk::{
2320 AsJsonPayloadExt, common::VersioningIntent as ProtoVersioningIntent,
2321 workflow_commands::WorkflowCommand,
2322 },
2323 temporal::api::{
2324 common::v1::{Payload, RetryPolicy},
2325 enums::v1::ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior,
2326 },
2327 },
2328 };
2329 use temporalio_macros::{workflow, workflow_methods};
2330
2331 #[derive(Default)]
2332 struct NoopHost;
2333
2334 impl WorkflowHost for NoopHost {
2335 fn set_current_details(&self, _details: String) {}
2336 fn push_command(&self, _command: WorkflowCommand) {}
2337 }
2338
2339 #[workflow]
2340 #[derive(Default)]
2341 struct TestWorkflow;
2342
2343 #[workflow_methods]
2344 impl TestWorkflow {
2345 #[run]
2346 async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
2347 unreachable!("test workflow run should not be polled")
2348 }
2349 }
2350
2351 fn test_context() -> WorkflowContext<TestWorkflow> {
2352 let init = InitializeWorkflow {
2353 workflow_type: TestWorkflow.name().to_string(),
2354 ..Default::default()
2355 };
2356 let base = BaseWorkflowContext::new(
2357 "default".to_string(),
2358 "orig-task-queue".to_string(),
2359 "run-id".to_string(),
2360 init,
2361 DataConverter::default(),
2362 Rc::new(NoopHost),
2363 );
2364 WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
2365 }
2366
2367 #[test]
2368 fn workflow_context_continue_as_new_serializes_input_and_defaults() {
2369 let ctx = test_context();
2370
2371 let termination = ctx
2372 .continue_as_new(&7, ContinueAsNewOptions::default())
2373 .expect_err("continue_as_new should terminate the workflow");
2374 assert!(
2375 matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2376 "expected continue-as-new termination, got {termination:?}"
2377 );
2378 let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2379 unreachable!()
2380 };
2381
2382 assert_eq!(
2383 *cmd,
2384 crate::runtime::types::ContinueAsNewRequest {
2385 workflow_type: TestWorkflow.name().to_string(),
2386 task_queue: String::new(),
2387 arguments: vec![7u8.as_json_payload().unwrap()],
2388 workflow_run_timeout: None,
2389 workflow_task_timeout: None,
2390 backoff_start_interval: None,
2391 memo: HashMap::new(),
2392 headers: HashMap::new(),
2393 search_attributes: None,
2394 retry_policy: None,
2395 versioning_intent: ProtoVersioningIntent::Unspecified.into(),
2396 initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::Unspecified
2397 .into(),
2398 }
2399 );
2400 }
2401
/// Every field set on `ContinueAsNewOptions` must be carried over verbatim into
/// the continue-as-new request produced through the sync context.
#[test]
fn sync_workflow_context_continue_as_new_applies_options() {
    let ctx = test_context();
    let sync = ctx.sync_context();

    // Single-entry fixtures for each of the map-valued options.
    let memo = HashMap::from([(
        "memo-key".to_string(),
        Payload::from(b"memo-value".as_slice()),
    )]);
    let headers = HashMap::from([(
        "header-key".to_string(),
        Payload::from(b"header-value".as_slice()),
    )]);
    let mut search_attributes = SearchAttributes::default();
    search_attributes.indexed_fields.insert(
        "CustomKeywordField".to_string(),
        Payload::from(b"value".as_slice()),
    );

    let options = ContinueAsNewOptions {
        workflow_type: Some("next-workflow".to_string()),
        task_queue: Some("next-task-queue".to_string()),
        run_timeout: Some(Duration::from_secs(10)),
        task_timeout: Some(Duration::from_secs(3)),
        backoff_start_interval: Some(Duration::from_secs(4)),
        memo: Some(memo.clone()),
        headers: Some(headers.clone()),
        search_attributes: Some(search_attributes.clone()),
        retry_policy: Some(RetryPolicy {
            maximum_attempts: 5,
            ..Default::default()
        }),
        versioning_intent: Some(ProtoVersioningIntent::Compatible),
        initial_versioning_behavior: Some(ContinueAsNewVersioningBehavior::UseRampingVersion),
    };

    let termination = sync
        .continue_as_new(&11, options)
        .expect_err("continue_as_new should terminate the workflow");

    // Any other termination variant is a test failure; show what we actually got.
    let WorkflowTermination::ContinueAsNew(request) = termination else {
        panic!("expected continue-as-new termination, got {termination:?}");
    };

    let expected = crate::runtime::types::ContinueAsNewRequest {
        workflow_type: "next-workflow".to_string(),
        task_queue: "next-task-queue".to_string(),
        arguments: vec![11u8.as_json_payload().unwrap()],
        workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
        workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
        backoff_start_interval: Some(Duration::from_secs(4).try_into().unwrap()),
        memo,
        headers,
        search_attributes: Some(search_attributes),
        retry_policy: Some(RetryPolicy {
            maximum_attempts: 5,
            ..Default::default()
        }),
        versioning_intent: ProtoVersioningIntent::Compatible.into(),
        initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::UseRampingVersion
            .into(),
    };
    assert_eq!(*request, expected);
}
2475
/// An explicitly supplied but empty `SearchAttributes` value must reach the
/// continue-as-new request as `Some(SearchAttributes::default())`.
#[test]
fn continue_as_new_preserves_explicit_empty_search_attributes() {
    let ctx = test_context();
    let sync = ctx.sync_context();

    let termination = sync
        .continue_as_new(
            &11,
            ContinueAsNewOptions {
                search_attributes: Some(SearchAttributes::default()),
                ..Default::default()
            },
        )
        .expect_err("continue_as_new should terminate the workflow");

    // Fail with the actual termination value rather than a bare `unreachable!()`,
    // so a regression here is diagnosable from the test output alone.
    let WorkflowTermination::ContinueAsNew(cmd) = termination else {
        panic!("expected continue-as-new termination, got {termination:?}");
    };

    assert_eq!(cmd.search_attributes, Some(SearchAttributes::default()));
}
2496
/// Requesting `ContinueAsNewVersioningBehavior::AutoUpgrade` must set the
/// request's `initial_versioning_behavior` to the proto `AutoUpgrade` value.
#[test]
fn workflow_context_continue_as_new_applies_auto_upgrade_versioning_behavior() {
    let ctx = test_context();

    let termination = ctx
        .continue_as_new(
            &13,
            ContinueAsNewOptions {
                initial_versioning_behavior: Some(ContinueAsNewVersioningBehavior::AutoUpgrade),
                ..Default::default()
            },
        )
        .expect_err("continue_as_new should terminate the workflow");

    // Fail with the actual termination value rather than a bare `unreachable!()`,
    // so a regression here is diagnosable from the test output alone.
    let WorkflowTermination::ContinueAsNew(cmd) = termination else {
        panic!("expected continue-as-new termination, got {termination:?}");
    };

    assert_eq!(
        cmd.initial_versioning_behavior,
        ProtoContinueAsNewVersioningBehavior::AutoUpgrade as i32
    );
}
2519
/// When the input cannot be serialized, `continue_as_new` must yield a
/// `WorkflowTermination::Failed` whose message is the conversion error's text.
#[test]
fn continue_as_new_reports_serialization_errors() {
    // Local shorthand for the converter module's otherwise very long paths.
    use temporalio_common_wasm::data_converters as dc;

    /// Input type whose serialization always fails.
    #[derive(Debug)]
    struct FailingInput;

    impl TemporalSerializable for FailingInput {
        fn to_payload(
            &self,
            _ctx: &dc::SerializationContext<'_>,
        ) -> Result<Payload, dc::PayloadConversionError> {
            let cause = std::io::Error::other("serialization failure");
            Err(dc::PayloadConversionError::EncodingError(cause.into()))
        }
    }

    impl TemporalDeserializable for FailingInput {
        fn from_payload(
            _ctx: &dc::SerializationContext<'_>,
            _payload: Payload,
        ) -> Result<Self, dc::PayloadConversionError> {
            unreachable!("test input is only serialized")
        }
    }

    /// Workflow that takes `FailingInput`; its run method panics if reached.
    #[workflow]
    #[derive(Default)]
    struct FailingWorkflow;

    #[workflow_methods]
    impl FailingWorkflow {
        #[run]
        async fn run(
            _ctx: &mut WorkflowContext<Self>,
            _input: FailingInput,
        ) -> crate::WorkflowResult<()> {
            unreachable!("test workflow run should not be polled")
        }
    }

    let base = BaseWorkflowContext::new(
        String::from("default"),
        String::from("orig-task-queue"),
        String::from("run-id"),
        InitializeWorkflow {
            workflow_type: String::from("failing-workflow"),
            ..Default::default()
        },
        DataConverter::default(),
        Rc::new(NoopHost),
    );
    let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));

    let outcome = ctx
        .continue_as_new(&FailingInput, ContinueAsNewOptions::default())
        .expect_err("serialization errors should be surfaced");

    match outcome {
        WorkflowTermination::Failed(failure) => {
            assert_eq!(
                failure.to_string(),
                "Encoding error: serialization failure"
            );
        }
        other => panic!("expected failed termination, got {other:?}"),
    }
}
2587}