1mod options;
2
3pub use options::{
4 ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
5 LocalActivityOptions, NexusOperationOptions, Signal, SignalData, TimerOptions,
6};
7pub use temporalio_common::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
8
9use crate::{
10 CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
11 CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
12 SupportsCancelReason, TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
13 workflow_context::options::IntoWorkflowCommand, workflow_executor::SdkWakeGuard,
14};
15use futures_util::{
16 FutureExt,
17 future::{FusedFuture, Shared},
18 task::Context,
19};
20use std::{
21 cell::{Cell, Ref, RefCell},
22 collections::HashMap,
23 future::{self, Future},
24 marker::PhantomData,
25 ops::{Deref, DerefMut},
26 pin::Pin,
27 rc::Rc,
28 sync::{
29 atomic::{AtomicBool, Ordering},
30 mpsc::{Receiver, Sender},
31 },
32 task::{Poll, Waker},
33 time::{Duration, SystemTime},
34};
35use temporalio_common::{
36 ActivityDefinition, SignalDefinition, WorkflowDefinition,
37 data_converters::{
38 GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
39 SerializationContextData, TemporalDeserializable,
40 },
41 protos::{
42 coresdk::{
43 activity_result::{ActivityResolution, activity_resolution},
44 child_workflow::ChildWorkflowResult,
45 common::NamespacedWorkflowExecution,
46 nexus::NexusOperationResult,
47 workflow_activation::{
48 InitializeWorkflow,
49 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
50 },
51 workflow_commands::{
52 CancelChildWorkflowExecution, ModifyWorkflowProperties,
53 RequestCancelExternalWorkflowExecution, SetPatchMarker,
54 SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
55 WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
56 },
57 },
58 temporal::api::{
59 common::v1::{Memo, Payload, SearchAttributes},
60 failure::v1::Failure,
61 sdk::v1::UserMetadata,
62 },
63 },
64 worker::WorkerDeploymentVersion,
65};
66use tokio::sync::{oneshot, watch};
67
68#[derive(Clone)]
72pub struct BaseWorkflowContext {
73 inner: Rc<WorkflowContextInner>,
74}
75impl BaseWorkflowContext {
76 pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
77 self.inner.shared.borrow_mut()
78 }
79
80 pub(crate) fn view(&self) -> WorkflowContextView {
82 WorkflowContextView::new(
83 self.inner.namespace.clone(),
84 self.inner.task_queue.clone(),
85 self.inner.run_id.clone(),
86 &self.inner.inital_information,
87 )
88 }
89}
90
91struct WorkflowContextInner {
92 namespace: String,
93 task_queue: String,
94 run_id: String,
95 inital_information: InitializeWorkflow,
96 chan: Sender<RustWfCmd>,
97 am_cancelled: watch::Receiver<Option<String>>,
98 shared: RefCell<WorkflowContextSharedData>,
99 seq_nums: RefCell<WfCtxProtectedDat>,
100 payload_converter: PayloadConverter,
101 state_mutated: Cell<bool>,
102}
103
104pub struct SyncWorkflowContext<W> {
112 base: BaseWorkflowContext,
113 headers: Rc<HashMap<String, Payload>>,
115 _phantom: PhantomData<W>,
116}
117
118impl<W> Clone for SyncWorkflowContext<W> {
119 fn clone(&self) -> Self {
120 Self {
121 base: self.base.clone(),
122 headers: self.headers.clone(),
123 _phantom: PhantomData,
124 }
125 }
126}
127
128pub struct WorkflowContext<W> {
133 sync: SyncWorkflowContext<W>,
134 workflow_state: Rc<RefCell<W>>,
136 condition_wakers: Rc<RefCell<Vec<Waker>>>,
140}
141
142impl<W> Clone for WorkflowContext<W> {
143 fn clone(&self) -> Self {
144 Self {
145 sync: self.sync.clone(),
146 workflow_state: self.workflow_state.clone(),
147 condition_wakers: self.condition_wakers.clone(),
148 }
149 }
150}
151
152#[derive(Clone, Debug)]
156#[non_exhaustive]
157pub struct WorkflowContextView {
158 pub workflow_id: String,
160 pub run_id: String,
162 pub workflow_type: String,
164 pub task_queue: String,
166 pub namespace: String,
168
169 pub attempt: u32,
171 pub first_execution_run_id: String,
173 pub continued_from_run_id: Option<String>,
175
176 pub start_time: Option<SystemTime>,
178 pub execution_timeout: Option<Duration>,
180 pub run_timeout: Option<Duration>,
182 pub task_timeout: Option<Duration>,
184
185 pub parent: Option<ParentWorkflowInfo>,
187 pub root: Option<RootWorkflowInfo>,
189
190 pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
192 pub cron_schedule: Option<String>,
194 pub memo: Option<Memo>,
196 pub search_attributes: Option<SearchAttributes>,
198}
199
/// Identifies the parent workflow of a workflow execution.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
    /// Workflow id of the parent.
    pub workflow_id: String,
    /// Run id of the parent.
    pub run_id: String,
    /// Namespace of the parent.
    pub namespace: String,
}
211
/// Identifies the root workflow of a workflow execution.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
    /// Workflow id of the root workflow.
    pub workflow_id: String,
    /// Run id of the root workflow.
    pub run_id: String,
}
221
222impl WorkflowContextView {
223 pub(crate) fn new(
225 namespace: String,
226 task_queue: String,
227 run_id: String,
228 init: &InitializeWorkflow,
229 ) -> Self {
230 let parent = init
231 .parent_workflow_info
232 .as_ref()
233 .map(|p| ParentWorkflowInfo {
234 workflow_id: p.workflow_id.clone(),
235 run_id: p.run_id.clone(),
236 namespace: p.namespace.clone(),
237 });
238
239 let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
240 workflow_id: r.workflow_id.clone(),
241 run_id: r.run_id.clone(),
242 });
243
244 let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
245 None
246 } else {
247 Some(init.continued_from_execution_run_id.clone())
248 };
249
250 let cron_schedule = if init.cron_schedule.is_empty() {
251 None
252 } else {
253 Some(init.cron_schedule.clone())
254 };
255
256 Self {
257 workflow_id: init.workflow_id.clone(),
258 run_id,
259 workflow_type: init.workflow_type.clone(),
260 task_queue,
261 namespace,
262 attempt: init.attempt as u32,
263 first_execution_run_id: init.first_execution_run_id.clone(),
264 continued_from_run_id,
265 start_time: init.start_time.and_then(|t| t.try_into().ok()),
266 execution_timeout: init
267 .workflow_execution_timeout
268 .and_then(|d| d.try_into().ok()),
269 run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
270 task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
271 parent,
272 root,
273 retry_policy: init.retry_policy.clone(),
274 cron_schedule,
275 memo: init.memo.clone(),
276 search_attributes: init.search_attributes.clone(),
277 }
278 }
279}
280
281#[derive(Debug, thiserror::Error)]
283pub enum ActivityExecutionError {
284 #[error("Activity failed: {}", .0.message)]
286 Failed(Box<Failure>),
287 #[error("Activity cancelled: {}", .0.message)]
289 Cancelled(Box<Failure>),
290 #[error("Payload conversion failed: {0}")]
293 Serialization(#[from] PayloadConversionError),
294}
295
296impl ActivityExecutionError {
297 pub fn is_timeout(&self) -> bool {
299 match self {
300 ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
301 _ => false,
302 }
303 }
304}
305
306#[derive(Debug, thiserror::Error)]
308pub enum ChildWorkflowExecutionError {
309 #[error("Child workflow failed: {}", .0.message)]
311 Failed(Box<Failure>),
312 #[error("Child workflow cancelled: {}", .0.message)]
314 Cancelled(Box<Failure>),
315 #[error(
317 "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
318 )]
319 StartFailed {
320 workflow_id: String,
322 workflow_type: String,
324 cause: StartChildWorkflowExecutionFailedCause,
326 },
327 #[error("Payload conversion failed: {0}")]
329 Serialization(#[from] PayloadConversionError),
330}
331
332#[derive(Debug, thiserror::Error)]
334pub enum ChildWorkflowSignalError {
335 #[error("Child workflow signal failed: {}", .0.message)]
337 Failed(Box<Failure>),
338 #[error("Signal payload conversion failed: {0}")]
340 Serialization(#[from] PayloadConversionError),
341}
342
343impl BaseWorkflowContext {
344 pub(crate) fn new(
347 namespace: String,
348 task_queue: String,
349 run_id: String,
350 init_workflow_job: InitializeWorkflow,
351 am_cancelled: watch::Receiver<Option<String>>,
352 payload_converter: PayloadConverter,
353 ) -> (Self, Receiver<RustWfCmd>) {
354 let (chan, rx) = std::sync::mpsc::channel();
356 (
357 Self {
358 inner: Rc::new(WorkflowContextInner {
359 namespace,
360 task_queue,
361 run_id,
362 shared: RefCell::new(WorkflowContextSharedData {
363 random_seed: init_workflow_job.randomness_seed,
364 search_attributes: init_workflow_job
365 .search_attributes
366 .clone()
367 .unwrap_or_default(),
368 ..Default::default()
369 }),
370 inital_information: init_workflow_job,
371 chan,
372 am_cancelled,
373 seq_nums: RefCell::new(WfCtxProtectedDat {
374 next_timer_sequence_number: 1,
375 next_activity_sequence_number: 1,
376 next_child_workflow_sequence_number: 1,
377 next_cancel_external_wf_sequence_number: 1,
378 next_signal_external_wf_sequence_number: 1,
379 next_nexus_op_sequence_number: 1,
380 }),
381 payload_converter,
382 state_mutated: Cell::new(false),
383 }),
384 },
385 rx,
386 )
387 }
388
389 pub(crate) fn send(&self, c: RustWfCmd) {
391 self.inner.chan.send(c).expect("command channel intact");
392 }
393
394 pub(crate) fn take_state_mutated(&self) -> bool {
397 self.inner.state_mutated.replace(false)
398 }
399
400 pub(crate) fn set_state_mutated(&self) {
402 self.inner.state_mutated.set(true);
403 }
404
405 pub(crate) fn current_details(&self) -> String {
407 self.inner.shared.borrow().current_details.clone()
408 }
409
410 fn cancel(&self, cancellable_id: CancellableID) {
412 self.send(RustWfCmd::Cancel(cancellable_id));
413 }
414
415 pub fn timer<T: Into<TimerOptions>>(
417 &self,
418 opts: T,
419 ) -> impl CancellableFuture<TimerResult> + use<T> {
420 let opts: TimerOptions = opts.into();
421 let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
422 let (cmd, unblocker) =
423 CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
424 let payload_converter = PayloadConverter::default();
425 let context = SerializationContext {
426 data: &SerializationContextData::Workflow,
427 converter: &payload_converter,
428 };
429 self.send(
430 CommandCreateRequest {
431 cmd: WorkflowCommand {
432 variant: Some(
433 StartTimer {
434 seq,
435 start_to_fire_timeout: Some(
436 opts.duration
437 .try_into()
438 .expect("Durations must fit into 64 bits"),
439 ),
440 }
441 .into(),
442 ),
443 user_metadata: Some(UserMetadata {
444 summary: opts.summary.map(|summary| {
445 payload_converter
446 .to_payload(&context, &summary)
447 .expect("String-to-JSON payload serialization is infallible")
448 }),
449 details: None,
450 }),
451 },
452 unblocker,
453 }
454 .into(),
455 );
456 cmd
457 }
458
459 pub fn start_activity<AD: ActivityDefinition>(
461 &self,
462 _activity: AD,
463 input: impl Into<AD::Input>,
464 mut opts: ActivityOptions,
465 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
466 where
467 AD::Output: TemporalDeserializable,
468 {
469 let input = input.into();
470 let ctx = SerializationContext {
471 data: &SerializationContextData::Workflow,
472 converter: &self.inner.payload_converter,
473 };
474 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
475 Ok(p) => p,
476 Err(e) => {
477 return ActivityFut::eager(e.into());
478 }
479 };
480 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
481 let (cmd, unblocker) =
482 CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
483 if opts.task_queue.is_none() {
484 opts.task_queue = Some(self.inner.task_queue.clone());
485 }
486 self.send(
487 CommandCreateRequest {
488 cmd: opts.into_command(AD::name().to_string(), payloads, seq),
489 unblocker,
490 }
491 .into(),
492 );
493 ActivityFut::running(cmd, self.inner.payload_converter.clone())
494 }
495
496 pub fn start_local_activity<AD: ActivityDefinition>(
498 &self,
499 _activity: AD,
500 input: impl Into<AD::Input>,
501 opts: LocalActivityOptions,
502 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
503 where
504 AD::Output: TemporalDeserializable,
505 {
506 let input = input.into();
507 let ctx = SerializationContext {
508 data: &SerializationContextData::Workflow,
509 converter: &self.inner.payload_converter,
510 };
511 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
512 Ok(p) => p,
513 Err(e) => {
514 return ActivityFut::eager(e.into());
515 }
516 };
517 ActivityFut::running(
518 LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
519 self.inner.payload_converter.clone(),
520 )
521 }
522
523 fn child_workflow<WD: WorkflowDefinition>(
525 &self,
526 workflow: WD,
527 input: impl Into<WD::Input>,
528 opts: ChildWorkflowOptions,
529 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
530 where
531 WD::Output: TemporalDeserializable,
532 {
533 let input = input.into();
534 let ctx = SerializationContext {
535 data: &SerializationContextData::Workflow,
536 converter: &self.inner.payload_converter,
537 };
538 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
539 Ok(p) => p,
540 Err(e) => {
541 return ChildWorkflowStartFut::eager(e.into());
542 }
543 };
544 let workflow_type = workflow.name().to_string();
545
546 let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
547 let (result_cmd, unblocker) = CancellableWFCommandFut::new(
551 CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
552 self.clone(),
553 );
554 self.send(
555 CommandSubscribeChildWorkflowCompletion {
556 seq: child_seq,
557 unblocker,
558 }
559 .into(),
560 );
561
562 let common = ChildWfCommon {
563 workflow_id: opts.workflow_id.clone(),
564 child_seq,
565 result_future: result_cmd,
566 base_ctx: self.clone(),
567 payload_converter: self.inner.payload_converter.clone(),
568 };
569
570 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
571 CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
572 common,
573 self.clone(),
574 );
575 self.send(
576 CommandCreateRequest {
577 cmd: opts.into_command(workflow_type, payloads, child_seq),
578 unblocker,
579 }
580 .into(),
581 );
582
583 ChildWorkflowStartFut::Running(cmd)
584 }
585
586 fn local_activity_no_timer_retry(
588 self,
589 activity_type: String,
590 arguments: Vec<Payload>,
591 opts: LocalActivityOptions,
592 ) -> impl CancellableFuture<ActivityResolution> {
593 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
594 let (cmd, unblocker) =
595 CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
596 self.inner
597 .chan
598 .send(
599 CommandCreateRequest {
600 cmd: opts.into_command(activity_type, arguments, seq),
601 unblocker,
602 }
603 .into(),
604 )
605 .expect("command channel intact");
606 cmd
607 }
608
609 fn send_signal_wf(
610 self,
611 target: sig_we::Target,
612 signal: Signal,
613 ) -> impl CancellableFuture<SignalExternalWfResult> {
614 let seq = self
615 .inner
616 .seq_nums
617 .borrow_mut()
618 .next_signal_external_wf_seq();
619 let (cmd, unblocker) =
620 CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
621 self.send(
622 CommandCreateRequest {
623 cmd: WorkflowCommand {
624 variant: Some(
625 SignalExternalWorkflowExecution {
626 seq,
627 signal_name: signal.signal_name,
628 args: signal.data.input,
629 target: Some(target),
630 headers: signal.data.headers,
631 }
632 .into(),
633 ),
634 user_metadata: None,
635 },
636 unblocker,
637 }
638 .into(),
639 );
640 cmd
641 }
642}
643
644impl<W> SyncWorkflowContext<W> {
645 pub fn workflow_id(&self) -> &str {
647 &self.base.inner.inital_information.workflow_id
648 }
649
650 pub fn run_id(&self) -> &str {
652 &self.base.inner.run_id
653 }
654
655 pub fn namespace(&self) -> &str {
657 &self.base.inner.namespace
658 }
659
660 pub fn task_queue(&self) -> &str {
662 &self.base.inner.task_queue
663 }
664
665 pub fn workflow_time(&self) -> Option<SystemTime> {
667 self.base.inner.shared.borrow().wf_time
668 }
669
670 pub fn history_length(&self) -> u32 {
672 self.base.inner.shared.borrow().history_length
673 }
674
675 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
679 self.base
680 .inner
681 .shared
682 .borrow()
683 .current_deployment_version
684 .clone()
685 }
686
687 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
689 Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
690 }
691
692 pub fn random_seed(&self) -> u64 {
694 self.base.inner.shared.borrow().random_seed
695 }
696
697 pub fn is_replaying(&self) -> bool {
699 self.base.inner.shared.borrow().is_replaying
700 }
701
702 pub fn continue_as_new_suggested(&self) -> bool {
704 self.base.inner.shared.borrow().continue_as_new_suggested
705 }
706
707 pub fn headers(&self) -> &HashMap<String, Payload> {
712 &self.headers
713 }
714
715 pub fn payload_converter(&self) -> &PayloadConverter {
717 &self.base.inner.payload_converter
718 }
719
720 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
723 &self.base.inner.inital_information
724 }
725
726 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
728 let am_cancelled = self.base.inner.am_cancelled.clone();
729 async move {
730 if let Some(s) = am_cancelled.borrow().as_ref() {
731 return s.clone();
732 }
733 am_cancelled
734 .clone()
735 .changed()
736 .await
737 .expect("Cancelled send half not dropped");
738 am_cancelled.borrow().as_ref().cloned().unwrap_or_default()
739 }
740 .fuse()
741 }
742
743 pub fn continue_as_new(
748 &self,
749 input: &<W::Run as WorkflowDefinition>::Input,
750 opts: ContinueAsNewOptions,
751 ) -> Result<std::convert::Infallible, WorkflowTermination>
752 where
753 W: crate::workflows::WorkflowImplementation,
754 {
755 let pc = &self.base.inner.payload_converter;
756 let ctx = SerializationContext {
757 data: &SerializationContextData::Workflow,
758 converter: pc,
759 };
760 let arguments = pc
761 .to_payloads(&ctx, input)
762 .map_err(WorkflowTermination::failed)?;
763 let workflow_type = self.workflow_initial_info().workflow_type.clone();
764 let proto = opts.into_proto(workflow_type, arguments);
765 Err(WorkflowTermination::continue_as_new(proto))
766 }
767
768 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
770 self.base.timer(opts)
771 }
772
773 pub fn start_activity<AD: ActivityDefinition>(
775 &self,
776 activity: AD,
777 input: impl Into<AD::Input>,
778 opts: ActivityOptions,
779 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
780 where
781 AD::Output: TemporalDeserializable,
782 {
783 self.base.start_activity(activity, input, opts)
784 }
785
786 pub fn start_local_activity<AD: ActivityDefinition>(
788 &self,
789 activity: AD,
790 input: impl Into<AD::Input>,
791 opts: LocalActivityOptions,
792 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
793 where
794 AD::Output: TemporalDeserializable,
795 {
796 self.base.start_local_activity(activity, input, opts)
797 }
798
799 pub fn child_workflow<WD: WorkflowDefinition>(
802 &self,
803 workflow: WD,
804 input: impl Into<WD::Input>,
805 opts: ChildWorkflowOptions,
806 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
807 where
808 WD::Output: TemporalDeserializable,
809 {
810 self.base.child_workflow(workflow, input, opts)
811 }
812
813 pub fn patched(&self, patch_id: &str) -> bool {
815 self.patch_impl(patch_id, false)
816 }
817
818 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
821 self.patch_impl(patch_id, true)
822 }
823
824 fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
825 self.base.send(
826 workflow_command::Variant::SetPatchMarker(SetPatchMarker {
827 patch_id: patch_id.to_string(),
828 deprecated,
829 })
830 .into(),
831 );
832 if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
834 return *present;
835 }
836
837 let res = !self.base.inner.shared.borrow().is_replaying;
840
841 self.base
842 .inner
843 .shared
844 .borrow_mut()
845 .changes
846 .insert(patch_id.to_string(), res);
847
848 res
849 }
850
851 pub fn external_workflow(
853 &self,
854 workflow_id: impl Into<String>,
855 run_id: Option<String>,
856 ) -> ExternalWorkflowHandle {
857 ExternalWorkflowHandle {
858 workflow_id: workflow_id.into(),
859 run_id,
860 namespace: self.base.inner.namespace.clone(),
861 base_ctx: self.base.clone(),
862 }
863 }
864
865 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
867 self.base.send(RustWfCmd::NewNonblockingCmd(
868 workflow_command::Variant::UpsertWorkflowSearchAttributes(
869 UpsertWorkflowSearchAttributes {
870 search_attributes: Some(SearchAttributes {
871 indexed_fields: HashMap::from_iter(attr_iter),
872 }),
873 },
874 ),
875 ))
876 }
877
878 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
880 self.base.send(RustWfCmd::NewNonblockingCmd(
881 workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
882 upserted_memo: Some(Memo {
883 fields: HashMap::from_iter(attr_iter),
884 }),
885 }),
886 ))
887 }
888
889 pub fn set_current_details(&self, details: impl Into<String>) {
894 self.base.inner.shared.borrow_mut().current_details = details.into();
895 }
896
897 pub fn force_task_fail(&self, with: anyhow::Error) {
899 self.base.send(with.into());
900 }
901
902 pub fn start_nexus_operation(
904 &self,
905 opts: NexusOperationOptions,
906 ) -> impl CancellableFuture<NexusStartResult> {
907 let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
908 let (result_future, unblocker) = WFCommandFut::new();
909 self.base
910 .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
911 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
912 CancellableID::NexusOp(seq),
913 NexusUnblockData {
914 result_future: result_future.shared(),
915 schedule_seq: seq,
916 base_ctx: self.base.clone(),
917 },
918 self.base.clone(),
919 );
920 self.base.send(
921 CommandCreateRequest {
922 cmd: opts.into_command(seq),
923 unblocker,
924 }
925 .into(),
926 );
927 cmd
928 }
929
930 pub(crate) fn view(&self) -> WorkflowContextView {
932 self.base.view()
933 }
934}
935
936impl<W> WorkflowContext<W> {
937 pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
939 Self {
940 sync: SyncWorkflowContext {
941 base,
942 headers: Rc::new(HashMap::new()),
943 _phantom: PhantomData,
944 },
945 workflow_state,
946 condition_wakers: Rc::new(RefCell::new(Vec::new())),
947 }
948 }
949
950 pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
952 Self {
953 sync: SyncWorkflowContext {
954 base: self.sync.base.clone(),
955 headers: Rc::new(headers),
956 _phantom: PhantomData,
957 },
958 workflow_state: self.workflow_state.clone(),
959 condition_wakers: self.condition_wakers.clone(),
960 }
961 }
962
963 pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
965 self.sync.clone()
966 }
967
968 pub fn workflow_id(&self) -> &str {
972 self.sync.workflow_id()
973 }
974
975 pub fn run_id(&self) -> &str {
977 self.sync.run_id()
978 }
979
980 pub fn namespace(&self) -> &str {
982 self.sync.namespace()
983 }
984
985 pub fn task_queue(&self) -> &str {
987 self.sync.task_queue()
988 }
989
990 pub fn workflow_time(&self) -> Option<SystemTime> {
992 self.sync.workflow_time()
993 }
994
995 pub fn history_length(&self) -> u32 {
997 self.sync.history_length()
998 }
999
1000 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
1004 self.sync.current_deployment_version()
1005 }
1006
1007 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
1009 self.sync.search_attributes()
1010 }
1011
1012 pub fn random_seed(&self) -> u64 {
1014 self.sync.random_seed()
1015 }
1016
1017 pub fn is_replaying(&self) -> bool {
1019 self.sync.is_replaying()
1020 }
1021
1022 pub fn continue_as_new_suggested(&self) -> bool {
1024 self.sync.continue_as_new_suggested()
1025 }
1026
1027 pub fn headers(&self) -> &HashMap<String, Payload> {
1029 self.sync.headers()
1030 }
1031
1032 pub fn payload_converter(&self) -> &PayloadConverter {
1034 self.sync.payload_converter()
1035 }
1036
1037 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
1039 self.sync.workflow_initial_info()
1040 }
1041
1042 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
1044 self.sync.cancelled()
1045 }
1046
1047 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
1049 self.sync.timer(opts)
1050 }
1051
1052 pub fn start_activity<AD: ActivityDefinition>(
1054 &self,
1055 activity: AD,
1056 input: impl Into<AD::Input>,
1057 opts: ActivityOptions,
1058 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1059 where
1060 AD::Output: TemporalDeserializable,
1061 {
1062 self.sync.start_activity(activity, input, opts)
1063 }
1064
1065 pub fn start_local_activity<AD: ActivityDefinition>(
1067 &self,
1068 activity: AD,
1069 input: impl Into<AD::Input>,
1070 opts: LocalActivityOptions,
1071 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1072 where
1073 AD::Output: TemporalDeserializable,
1074 {
1075 self.sync.start_local_activity(activity, input, opts)
1076 }
1077
1078 pub fn child_workflow<WD: WorkflowDefinition>(
1080 &self,
1081 workflow: WD,
1082 input: impl Into<WD::Input>,
1083 opts: ChildWorkflowOptions,
1084 ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1085 where
1086 WD::Output: TemporalDeserializable,
1087 {
1088 self.sync.child_workflow(workflow, input, opts)
1089 }
1090
1091 pub fn patched(&self, patch_id: &str) -> bool {
1093 self.sync.patched(patch_id)
1094 }
1095
1096 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
1099 self.sync.deprecate_patch(patch_id)
1100 }
1101
1102 pub fn external_workflow(
1104 &self,
1105 workflow_id: impl Into<String>,
1106 run_id: Option<String>,
1107 ) -> ExternalWorkflowHandle {
1108 self.sync.external_workflow(workflow_id, run_id)
1109 }
1110
1111 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1113 self.sync.upsert_search_attributes(attr_iter)
1114 }
1115
1116 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1118 self.sync.upsert_memo(attr_iter)
1119 }
1120
1121 pub fn set_current_details(&self, details: impl Into<String>) {
1125 self.sync.set_current_details(details)
1126 }
1127
1128 pub fn force_task_fail(&self, with: anyhow::Error) {
1130 self.sync.force_task_fail(with)
1131 }
1132
1133 pub fn start_nexus_operation(
1135 &self,
1136 opts: NexusOperationOptions,
1137 ) -> impl CancellableFuture<NexusStartResult> {
1138 self.sync.start_nexus_operation(opts)
1139 }
1140
1141 pub(crate) fn view(&self) -> WorkflowContextView {
1143 self.sync.view()
1144 }
1145
1146 pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1151 f(&*self.workflow_state.borrow())
1152 }
1153
1154 pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1163 let result = f(&mut *self.workflow_state.borrow_mut());
1164 let _guard = SdkWakeGuard::new();
1165 for waker in self.condition_wakers.borrow_mut().drain(..) {
1166 waker.wake();
1167 }
1168 self.sync.base.set_state_mutated();
1169 result
1170 }
1171
1172 pub fn continue_as_new(
1177 &self,
1178 input: &<W::Run as WorkflowDefinition>::Input,
1179 opts: ContinueAsNewOptions,
1180 ) -> Result<std::convert::Infallible, WorkflowTermination>
1181 where
1182 W: crate::workflows::WorkflowImplementation,
1183 {
1184 self.sync.continue_as_new(input, opts)
1185 }
1186
1187 pub fn wait_condition<'a>(
1192 &'a self,
1193 mut condition: impl FnMut(&W) -> bool + 'a,
1194 ) -> impl FusedFuture<Output = ()> + 'a {
1195 future::poll_fn(move |cx: &mut Context<'_>| {
1196 if condition(&*self.workflow_state.borrow()) {
1197 Poll::Ready(())
1198 } else {
1199 self.condition_wakers.borrow_mut().push(cx.waker().clone());
1200 Poll::Pending
1201 }
1202 })
1203 .fuse()
1204 }
1205}
1206
/// Independent sequence-number counters, one per kind of command that needs a sequence number.
///
/// Each field holds the value the next call to the matching `next_*_seq` method will return.
struct WfCtxProtectedDat {
    next_timer_sequence_number: u32,
    next_activity_sequence_number: u32,
    next_child_workflow_sequence_number: u32,
    next_cancel_external_wf_sequence_number: u32,
    next_signal_external_wf_sequence_number: u32,
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the counter's current value and then advances it by one.
    fn post_increment(counter: &mut u32) -> u32 {
        let current = *counter;
        *counter += 1;
        current
    }

    /// Hands out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_timer_sequence_number)
    }

    /// Hands out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_activity_sequence_number)
    }

    /// Hands out the next child workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_child_workflow_sequence_number)
    }

    /// Hands out the next cancel-external-workflow sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hands out the next signal-external-workflow sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hands out the next Nexus operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_nexus_op_sequence_number)
    }
}
1248
1249#[derive(Clone, Debug, Default)]
1250pub(crate) struct WorkflowContextSharedData {
1251 pub(crate) changes: HashMap<String, bool>,
1253 pub(crate) is_replaying: bool,
1254 pub(crate) wf_time: Option<SystemTime>,
1255 pub(crate) history_length: u32,
1256 pub(crate) continue_as_new_suggested: bool,
1257 pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
1258 pub(crate) search_attributes: SearchAttributes,
1259 pub(crate) random_seed: u64,
1260 pub(crate) current_details: String,
1262}
1263
1264pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1267 fn cancel(&self);
1269}
1270
1271pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1273 fn cancel_with_reason(&self, reason: String);
1275}
1276
1277struct WFCommandFut<T, D> {
1278 _unused: PhantomData<T>,
1279 result_rx: oneshot::Receiver<UnblockEvent>,
1280 other_dat: Option<D>,
1281}
1282impl<T> WFCommandFut<T, ()> {
1283 fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1284 Self::new_with_dat(())
1285 }
1286}
1287
1288impl<T, D> WFCommandFut<T, D> {
1289 fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1290 let (tx, rx) = oneshot::channel();
1291 (
1292 Self {
1293 _unused: PhantomData,
1294 result_rx: rx,
1295 other_dat: Some(other_dat),
1296 },
1297 tx,
1298 )
1299 }
1300}
1301
1302impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1303impl<T, D> Future for WFCommandFut<T, D>
1304where
1305 T: Unblockable<OtherDat = D>,
1306{
1307 type Output = T;
1308
1309 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1310 self.result_rx.poll_unpin(cx).map(|x| {
1311 let od = self
1312 .other_dat
1313 .take()
1314 .expect("Other data must exist when resolving command future");
1315 Unblockable::unblock(x.unwrap(), od)
1316 })
1317 }
1318}
1319impl<T, D> FusedFuture for WFCommandFut<T, D>
1320where
1321 T: Unblockable<OtherDat = D>,
1322{
1323 fn is_terminated(&self) -> bool {
1324 self.other_dat.is_none()
1325 }
1326}
1327
1328struct CancellableWFCommandFut<T, D, ID = CancellableID> {
1329 cmd_fut: WFCommandFut<T, D>,
1330 cancellable_id: ID,
1331 base_ctx: BaseWorkflowContext,
1332}
1333impl<T, ID> CancellableWFCommandFut<T, (), ID> {
1334 fn new(
1335 cancellable_id: ID,
1336 base_ctx: BaseWorkflowContext,
1337 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1338 Self::new_with_dat(cancellable_id, (), base_ctx)
1339 }
1340}
1341impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1342 fn new_with_dat(
1343 cancellable_id: ID,
1344 other_dat: D,
1345 base_ctx: BaseWorkflowContext,
1346 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1347 let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1348 (
1349 Self {
1350 cmd_fut,
1351 cancellable_id,
1352 base_ctx,
1353 },
1354 sender,
1355 )
1356 }
1357}
1358impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1359impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1360where
1361 T: Unblockable<OtherDat = D>,
1362{
1363 type Output = T;
1364
1365 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1366 self.cmd_fut.poll_unpin(cx)
1367 }
1368}
1369impl<T, D, ID> FusedFuture for CancellableWFCommandFut<T, D, ID>
1370where
1371 T: Unblockable<OtherDat = D>,
1372{
1373 fn is_terminated(&self) -> bool {
1374 self.cmd_fut.is_terminated()
1375 }
1376}
1377
1378impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1379where
1380 T: Unblockable<OtherDat = D>,
1381 ID: Clone + Into<CancellableID>,
1382{
1383 fn cancel(&self) {
1384 self.base_ctx.cancel(self.cancellable_id.clone().into());
1385 }
1386}
1387impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1388where
1389 T: Unblockable<OtherDat = D>,
1390{
1391 fn cancel_with_reason(&self, reason: String) {
1392 let new_id = self.cancellable_id.clone().with_reason(reason);
1393 self.base_ctx.cancel(new_id);
1394 }
1395}
1396
/// Future driving a local activity that may ask to be retried after a backoff period.
///
/// When the activity resolves with a `Backoff` status, a timer is started and, once it fires, the
/// activity is scheduled again with the attempt number and schedule time carried by the backoff.
struct LATimerBackoffFut {
    /// Options used for every (re)scheduling of the activity.
    la_opts: LocalActivityOptions,
    /// Activity type passed to each scheduling call.
    activity_type: String,
    /// Arguments passed to each scheduling call.
    arguments: Vec<Payload>,
    /// The currently scheduled activity execution.
    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
    /// Backoff timer, present only while waiting between attempts.
    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
    /// Context used to schedule activities and timers.
    base_ctx: BaseWorkflowContext,
    /// Attempt number to use for the next scheduling, taken from the last backoff.
    next_attempt: u32,
    /// Original schedule time to use for the next scheduling, taken from the last backoff.
    next_sched_time: Option<prost_types::Timestamp>,
    /// Set once `cancel` has been called; checked before starting a backoff timer.
    did_cancel: AtomicBool,
    /// Set once the future has produced its final result.
    terminated: bool,
}
1409impl LATimerBackoffFut {
1410 pub(crate) fn new(
1411 activity_type: String,
1412 arguments: Vec<Payload>,
1413 opts: LocalActivityOptions,
1414 base_ctx: BaseWorkflowContext,
1415 ) -> Self {
1416 let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1417 activity_type.clone(),
1418 arguments.clone(),
1419 opts.clone(),
1420 ));
1421 Self {
1422 la_opts: opts,
1423 activity_type,
1424 arguments,
1425 current_fut,
1426 timer_fut: None,
1427 base_ctx,
1428 next_attempt: 1,
1429 next_sched_time: None,
1430 did_cancel: AtomicBool::new(false),
1431 terminated: false,
1432 }
1433 }
1434}
impl Unpin for LATimerBackoffFut {}
impl Future for LATimerBackoffFut {
    type Output = ActivityResolution;

    /// Drives the backoff timer (if one is active) or the current activity execution.
    ///
    /// # Panics
    /// Panics if a `Backoff` status arrives without a backoff duration, or if that duration
    /// cannot be converted.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // While a backoff timer exists it takes priority over the activity future.
        if let Some(tf) = self.timer_fut.as_mut() {
            return match tf.poll_unpin(cx) {
                Poll::Ready(tr) => {
                    self.timer_fut = None;
                    if let TimerResult::Fired = tr {
                        // Timer fired: schedule the next attempt using the attempt number and
                        // schedule time recorded from the backoff.
                        let mut opts = self.la_opts.clone();
                        opts.attempt = Some(self.next_attempt);
                        opts.original_schedule_time
                            .clone_from(&self.next_sched_time);
                        self.current_fut =
                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
                                self.activity_type.clone(),
                                self.arguments.clone(),
                                opts,
                            ));
                        // The new execution is not polled here; it is picked up on a later poll.
                        Poll::Pending
                    } else {
                        // Any other timer result resolves the activity as cancelled.
                        self.terminated = true;
                        Poll::Ready(ActivityResolution {
                            status: Some(
                                activity_resolution::Status::Cancelled(Default::default()),
                            ),
                        })
                    }
                }
                Poll::Pending => Poll::Pending,
            };
        }
        let poll_res = self.current_fut.poll_unpin(cx);
        if let Poll::Ready(ref r) = poll_res
            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
        {
            // If `cancel` was already called, resolve as cancelled instead of starting a timer.
            if self.did_cancel.load(Ordering::Acquire) {
                self.terminated = true;
                return Poll::Ready(ActivityResolution {
                    status: Some(activity_resolution::Status::Cancelled(Default::default())),
                });
            }

            // Start the backoff timer and remember what the next attempt should look like.
            let timer_f = self.base_ctx.timer::<Duration>(
                b.backoff_duration
                    .expect("Duration is set")
                    .try_into()
                    .expect("duration converts ok"),
            );
            self.timer_fut = Some(Box::pin(timer_f));
            self.next_attempt = b.attempt;
            self.next_sched_time.clone_from(&b.original_schedule_time);
            return Poll::Pending;
        }
        // Any non-backoff resolution is final.
        if poll_res.is_ready() {
            self.terminated = true;
        }
        poll_res
    }
}
impl FusedFuture for LATimerBackoffFut {
    /// Returns `true` once `poll` has produced a final (non-backoff) resolution.
    fn is_terminated(&self) -> bool {
        self.terminated
    }
}
1506impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1507 fn cancel(&self) {
1508 self.did_cancel.store(true, Ordering::Release);
1509 if let Some(tf) = self.timer_fut.as_ref() {
1510 tf.cancel();
1511 }
1512 self.current_fut.cancel();
1513 }
1514}
1515
/// Future adapting a raw [`ActivityResolution`] into a typed `Result<Output, ActivityExecutionError>`.
enum ActivityFut<F, Output> {
    /// The activity failed before being scheduled; the stored error is returned on first poll.
    Errored {
        /// The error to return; taken when the future is polled.
        error: Option<ActivityExecutionError>,
        _phantom: PhantomData<Output>,
    },
    /// The activity is in flight.
    Running {
        /// Future yielding the raw activity resolution.
        inner: F,
        /// Converter used to deserialize a successful result into `Output`.
        payload_converter: PayloadConverter,
        _phantom: PhantomData<Output>,
    },
    /// The future has already produced its result and must not be polled again.
    Terminated,
}
1531
1532impl<F, Output> ActivityFut<F, Output> {
1533 fn eager(err: ActivityExecutionError) -> Self {
1534 Self::Errored {
1535 error: Some(err),
1536 _phantom: PhantomData,
1537 }
1538 }
1539
1540 fn running(inner: F, payload_converter: PayloadConverter) -> Self {
1541 Self::Running {
1542 inner,
1543 payload_converter,
1544 _phantom: PhantomData,
1545 }
1546 }
1547}
1548
1549impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1550
1551impl<F, Output> Future for ActivityFut<F, Output>
1552where
1553 F: Future<Output = ActivityResolution> + Unpin,
1554 Output: TemporalDeserializable + 'static,
1555{
1556 type Output = Result<Output, ActivityExecutionError>;
1557
1558 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1559 let this = self.get_mut();
1560 let poll = match this {
1561 ActivityFut::Errored { error, .. } => {
1562 Poll::Ready(Err(error.take().expect("polled after completion")))
1563 }
1564 ActivityFut::Running {
1565 inner,
1566 payload_converter,
1567 ..
1568 } => match Pin::new(inner).poll(cx) {
1569 Poll::Pending => Poll::Pending,
1570 Poll::Ready(resolution) => Poll::Ready({
1571 let status = resolution.status.ok_or_else(|| {
1572 ActivityExecutionError::Failed(Box::new(Failure {
1573 message: "Activity completed without a status".to_string(),
1574 ..Default::default()
1575 }))
1576 })?;
1577
1578 match status {
1579 activity_resolution::Status::Completed(success) => {
1580 let payload = success.result.unwrap_or_default();
1581 let ctx = SerializationContext {
1582 data: &SerializationContextData::Workflow,
1583 converter: payload_converter,
1584 };
1585 payload_converter
1586 .from_payload::<Output>(&ctx, payload)
1587 .map_err(ActivityExecutionError::Serialization)
1588 }
1589 activity_resolution::Status::Failed(f) => Err(
1590 ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1591 ),
1592 activity_resolution::Status::Cancelled(c) => {
1593 Err(ActivityExecutionError::Cancelled(Box::new(
1594 c.failure.unwrap_or_default(),
1595 )))
1596 }
1597 activity_resolution::Status::Backoff(_) => {
1598 panic!("DoBackoff should be handled by LATimerBackoffFut")
1599 }
1600 }
1601 }),
1602 },
1603 ActivityFut::Terminated => panic!("polled after termination"),
1604 };
1605 if poll.is_ready() {
1606 *this = ActivityFut::Terminated;
1607 }
1608 poll
1609 }
1610}
1611
1612impl<F, Output> FusedFuture for ActivityFut<F, Output>
1613where
1614 F: Future<Output = ActivityResolution> + Unpin,
1615 Output: TemporalDeserializable + 'static,
1616{
1617 fn is_terminated(&self) -> bool {
1618 matches!(self, ActivityFut::Terminated)
1619 }
1620}
1621
1622impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1623where
1624 F: CancellableFuture<ActivityResolution> + Unpin,
1625 Output: TemporalDeserializable + 'static,
1626{
1627 fn cancel(&self) {
1628 if let ActivityFut::Running { inner, .. } = self {
1629 inner.cancel()
1630 }
1631 }
1632}
1633
/// State shared by the pending and started child workflow handles.
pub(crate) struct ChildWfCommon {
    /// Workflow id of the child; used as the target when signalling it.
    workflow_id: String,
    /// Sequence number of the child; used when sending a cancel command for it.
    child_seq: u32,
    /// Future resolving with the child's final result.
    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
    /// Context used to send commands concerning this child.
    base_ctx: BaseWorkflowContext,
    /// Converter used for signal inputs and for decoding the child's result.
    payload_converter: PayloadConverter,
}
1641
/// A child workflow whose start has been resolved but not yet interpreted as success or failure.
#[derive(derive_more::Debug)]
pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
    /// Outcome of the start attempt (succeeded, failed, or cancelled).
    pub(crate) status: ChildWorkflowStartStatus,
    /// Shared child workflow state, carried over into the started handle on success.
    #[debug(skip)]
    pub(crate) common: ChildWfCommon,
    pub(crate) _phantom: PhantomData<WD>,
}
1652
/// Handle to a child workflow that has started successfully.
#[derive(derive_more::Debug)]
pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
    /// Run id reported when the child workflow started.
    pub run_id: String,
    /// Shared child workflow state used to await, cancel, and signal the child.
    #[debug(skip)]
    common: ChildWfCommon,
    _phantom: PhantomData<WD>,
}
1662
/// Future adapting a raw [`ChildWorkflowResult`] into a typed
/// `Result<Output, ChildWorkflowExecutionError>`.
enum ChildWorkflowFut<F, Output> {
    /// The child workflow result is still being awaited.
    Running {
        /// Future yielding the raw child workflow result.
        inner: F,
        /// Converter used to deserialize a successful result into `Output`.
        payload_converter: PayloadConverter,
        _phantom: PhantomData<Output>,
    },
    /// The future has already produced its result and must not be polled again.
    Terminated,
}
1673
1674impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
1675
1676impl<F, Output> Future for ChildWorkflowFut<F, Output>
1677where
1678 F: Future<Output = ChildWorkflowResult> + Unpin,
1679 Output: TemporalDeserializable + 'static,
1680{
1681 type Output = Result<Output, ChildWorkflowExecutionError>;
1682
1683 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1684 let this = self.get_mut();
1685 let poll = match this {
1686 ChildWorkflowFut::Running {
1687 inner,
1688 payload_converter,
1689 ..
1690 } => match Pin::new(inner).poll(cx) {
1691 Poll::Pending => Poll::Pending,
1692 Poll::Ready(result) => Poll::Ready({
1693 use temporalio_common::protos::coresdk::child_workflow::child_workflow_result;
1694 let status = result.status.ok_or_else(|| {
1695 ChildWorkflowExecutionError::Failed(Box::new(Failure {
1696 message: "Child workflow completed without a status".to_string(),
1697 ..Default::default()
1698 }))
1699 })?;
1700 match status {
1701 child_workflow_result::Status::Completed(success) => {
1702 let payloads = success.result.into_iter().collect();
1703 let ctx = SerializationContext {
1704 data: &SerializationContextData::Workflow,
1705 converter: payload_converter,
1706 };
1707 payload_converter
1708 .from_payloads::<Output>(&ctx, payloads)
1709 .map_err(ChildWorkflowExecutionError::Serialization)
1710 }
1711 child_workflow_result::Status::Failed(f) => {
1712 Err(ChildWorkflowExecutionError::Failed(Box::new(
1713 f.failure.unwrap_or_default(),
1714 )))
1715 }
1716 child_workflow_result::Status::Cancelled(c) => {
1717 Err(ChildWorkflowExecutionError::Cancelled(Box::new(
1718 c.failure.unwrap_or_default(),
1719 )))
1720 }
1721 }
1722 }),
1723 },
1724 ChildWorkflowFut::Terminated => panic!("polled after termination"),
1725 };
1726 if poll.is_ready() {
1727 *this = ChildWorkflowFut::Terminated;
1728 }
1729 poll
1730 }
1731}
1732
1733impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
1734where
1735 F: Future<Output = ChildWorkflowResult> + Unpin,
1736 Output: TemporalDeserializable + 'static,
1737{
1738 fn is_terminated(&self) -> bool {
1739 matches!(self, ChildWorkflowFut::Terminated)
1740 }
1741}
1742
1743impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
1744 for ChildWorkflowFut<F, Output>
1745where
1746 F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1747 Output: TemporalDeserializable + 'static,
1748{
1749 fn cancel_with_reason(&self, reason: String) {
1750 if let ChildWorkflowFut::Running { inner, .. } = self {
1751 inner.cancel_with_reason(reason)
1752 }
1753 }
1754}
1755
1756impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
1757 for ChildWorkflowFut<F, Output>
1758where
1759 F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1760 Output: TemporalDeserializable + 'static,
1761{
1762 fn cancel(&self) {
1763 if let ChildWorkflowFut::Running { inner, .. } = self {
1764 inner.cancel()
1765 }
1766 }
1767}
1768
/// Future resolving to a [`StartedChildWorkflow`] handle or a start error.
enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
    /// Starting failed before a command was issued; the stored error is returned on first poll.
    Errored {
        /// The error to return; taken when the future is polled.
        error: Option<ChildWorkflowExecutionError>,
        _phantom: PhantomData<WD>,
    },
    /// The start is in flight; the inner future yields the pending child workflow.
    Running(F),
    /// The future has already produced its result and must not be polled again.
    Terminated,
}
1780
1781impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
1782 fn eager(err: ChildWorkflowExecutionError) -> Self {
1783 Self::Errored {
1784 error: Some(err),
1785 _phantom: PhantomData,
1786 }
1787 }
1788}
1789
1790impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
1791
1792impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
1793where
1794 F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1795 WD: WorkflowDefinition,
1796{
1797 type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>;
1798
1799 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1800 let this = self.get_mut();
1801 let poll = match this {
1802 ChildWorkflowStartFut::Errored { error, .. } => {
1803 Poll::Ready(Err(error.take().expect("polled after completion")))
1804 }
1805 ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
1806 Poll::Pending => Poll::Pending,
1807 Poll::Ready(pending) => Poll::Ready(match pending.status {
1808 ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
1809 run_id: s.run_id,
1810 common: pending.common,
1811 _phantom: PhantomData,
1812 }),
1813 ChildWorkflowStartStatus::Failed(f) => {
1814 Err(ChildWorkflowExecutionError::StartFailed {
1815 workflow_id: f.workflow_id,
1816 workflow_type: f.workflow_type,
1817 cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
1818 .unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
1819 })
1820 }
1821 ChildWorkflowStartStatus::Cancelled(c) => {
1822 Err(ChildWorkflowExecutionError::Cancelled(Box::new(
1823 c.failure.unwrap_or_default(),
1824 )))
1825 }
1826 }),
1827 },
1828 ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
1829 };
1830 if poll.is_ready() {
1831 *this = ChildWorkflowStartFut::Terminated;
1832 }
1833 poll
1834 }
1835}
1836
1837impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
1838where
1839 F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1840 WD: WorkflowDefinition,
1841{
1842 fn is_terminated(&self) -> bool {
1843 matches!(self, ChildWorkflowStartFut::Terminated)
1844 }
1845}
1846
1847impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1848 for ChildWorkflowStartFut<F, WD>
1849where
1850 F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
1851 WD: WorkflowDefinition,
1852{
1853 fn cancel(&self) {
1854 if let ChildWorkflowStartFut::Running(inner) = self {
1855 inner.cancel()
1856 }
1857 }
1858}
1859
1860impl<F, WD>
1861 CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1862 for ChildWorkflowStartFut<F, WD>
1863where
1864 F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
1865 WD: WorkflowDefinition,
1866{
1867 fn cancel_with_reason(&self, reason: String) {
1868 if let ChildWorkflowStartFut::Running(inner) = self {
1869 inner.cancel_with_reason(reason)
1870 }
1871 }
1872}
1873
/// Future resolving once a signal sent to a child workflow has been delivered or has failed.
enum SignalChildFut<F> {
    /// Sending failed before a command was issued; the stored error is returned on first poll.
    Errored {
        /// The error to return; taken when the future is polled.
        error: Option<ChildWorkflowSignalError>,
    },
    /// The signal is in flight.
    Running(F),
    /// The future has already produced its result and must not be polled again.
    Terminated,
}
1884
1885impl<F> SignalChildFut<F> {
1886 fn eager(err: ChildWorkflowSignalError) -> Self {
1887 Self::Errored { error: Some(err) }
1888 }
1889}
1890
1891impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
1892
1893impl<F> Future for SignalChildFut<F>
1894where
1895 F: Future<Output = SignalExternalWfResult> + Unpin,
1896{
1897 type Output = Result<(), ChildWorkflowSignalError>;
1898
1899 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1900 let this = self.get_mut();
1901 let poll = match this {
1902 SignalChildFut::Errored { error } => {
1903 Poll::Ready(Err(error.take().expect("polled after completion")))
1904 }
1905 SignalChildFut::Running(inner) => match Pin::new(inner).poll(cx) {
1906 Poll::Pending => Poll::Pending,
1907 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
1908 Poll::Ready(Err(failure)) => {
1909 Poll::Ready(Err(ChildWorkflowSignalError::Failed(Box::new(failure))))
1910 }
1911 },
1912 SignalChildFut::Terminated => panic!("polled after termination"),
1913 };
1914 if poll.is_ready() {
1915 *this = SignalChildFut::Terminated;
1916 }
1917 poll
1918 }
1919}
1920
1921impl<F> FusedFuture for SignalChildFut<F>
1922where
1923 F: Future<Output = SignalExternalWfResult> + Unpin,
1924{
1925 fn is_terminated(&self) -> bool {
1926 matches!(self, SignalChildFut::Terminated)
1927 }
1928}
1929
1930impl<F> CancellableFuture<Result<(), ChildWorkflowSignalError>> for SignalChildFut<F>
1931where
1932 F: CancellableFuture<SignalExternalWfResult> + Unpin,
1933{
1934 fn cancel(&self) {
1935 if let SignalChildFut::Running(inner) = self {
1936 inner.cancel()
1937 }
1938 }
1939}
1940
1941impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
1942where
1943 WD::Output: TemporalDeserializable + 'static,
1944{
1945 pub fn result(
1948 self,
1949 ) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
1950 ChildWorkflowFut::Running {
1951 inner: self.common.result_future,
1952 payload_converter: self.common.payload_converter,
1953 _phantom: PhantomData,
1954 }
1955 }
1956
1957 pub fn cancel(&self, reason: String) {
1959 self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1960 CancelChildWorkflowExecution {
1961 child_workflow_seq: self.common.child_seq,
1962 reason,
1963 }
1964 .into(),
1965 ));
1966 }
1967
1968 pub fn signal<S: SignalDefinition<Workflow = WD>>(
1970 &self,
1971 signal: S,
1972 input: S::Input,
1973 ) -> impl CancellableFuture<Result<(), ChildWorkflowSignalError>> + 'static {
1974 let ctx = SerializationContext {
1975 data: &SerializationContextData::Workflow,
1976 converter: &self.common.payload_converter,
1977 };
1978 let payloads = match self.common.payload_converter.to_payloads(&ctx, &input) {
1979 Ok(p) => p,
1980 Err(e) => {
1981 return SignalChildFut::eager(e.into());
1982 }
1983 };
1984 let signal = Signal::new(S::name(&signal), payloads);
1985 let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1986 SignalChildFut::Running(self.common.base_ctx.clone().send_signal_wf(target, signal))
1987 }
1988}
1989
/// Handle used to signal or request cancellation of a workflow other than the current one.
#[derive(derive_more::Debug)]
pub struct ExternalWorkflowHandle {
    /// Workflow id of the target workflow.
    workflow_id: String,
    /// Run id of the target workflow; when `None`, an empty run id is sent in commands.
    run_id: Option<String>,
    /// Namespace sent along with the target execution.
    namespace: String,
    /// Context used to send commands.
    #[debug(skip)]
    base_ctx: BaseWorkflowContext,
}
2002
2003impl ExternalWorkflowHandle {
2004 pub fn workflow_id(&self) -> &str {
2006 &self.workflow_id
2007 }
2008
2009 pub fn run_id(&self) -> Option<&str> {
2011 self.run_id.as_deref()
2012 }
2013
2014 pub fn signal<S: SignalDefinition>(
2016 &self,
2017 signal: S,
2018 input: S::Input,
2019 ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
2020 let ctx = SerializationContext {
2021 data: &SerializationContextData::Workflow,
2022 converter: &self.base_ctx.inner.payload_converter,
2023 };
2024 let payloads = match self
2025 .base_ctx
2026 .inner
2027 .payload_converter
2028 .to_payloads(&ctx, &input)
2029 {
2030 Ok(p) => p,
2031 Err(e) => {
2032 return SignalExternalFut::SerializationError(Some(e));
2033 }
2034 };
2035 let signal = Signal::new(S::name(&signal), payloads);
2036 let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
2037 namespace: self.namespace.clone(),
2038 workflow_id: self.workflow_id.clone(),
2039 run_id: self.run_id.clone().unwrap_or_default(),
2040 });
2041 SignalExternalFut::Running(self.base_ctx.clone().send_signal_wf(target, signal))
2042 }
2043
2044 pub fn cancel(
2046 &self,
2047 reason: Option<String>,
2048 ) -> impl FusedFuture<Output = CancelExternalWfResult> {
2049 let seq = self
2050 .base_ctx
2051 .inner
2052 .seq_nums
2053 .borrow_mut()
2054 .next_cancel_external_wf_seq();
2055 let (cmd, unblocker) = WFCommandFut::new();
2056 self.base_ctx.send(
2057 CommandCreateRequest {
2058 cmd: WorkflowCommand {
2059 variant: Some(
2060 RequestCancelExternalWorkflowExecution {
2061 seq,
2062 workflow_execution: Some(NamespacedWorkflowExecution {
2063 namespace: self.namespace.clone(),
2064 workflow_id: self.workflow_id.clone(),
2065 run_id: self.run_id.clone().unwrap_or_default(),
2066 }),
2067 reason: reason.unwrap_or_default(),
2068 }
2069 .into(),
2070 ),
2071 user_metadata: None,
2072 },
2073 unblocker,
2074 }
2075 .into(),
2076 );
2077 cmd
2078 }
2079}
2080
/// Future resolving once a signal sent to an external workflow has been delivered or has failed.
enum SignalExternalFut<F> {
    /// The signal is in flight.
    Running(F),
    /// The signal input could not be serialized; the error is reported as a failure on first
    /// poll.
    SerializationError(Option<PayloadConversionError>),
    /// The future has already produced its result and must not be polled again.
    Done,
}
2086
2087impl<F: Unpin> Unpin for SignalExternalFut<F> {}
2088
2089impl<F> Future for SignalExternalFut<F>
2090where
2091 F: Future<Output = SignalExternalWfResult> + Unpin,
2092{
2093 type Output = SignalExternalWfResult;
2094
2095 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2096 let this = self.get_mut();
2097 match this {
2098 SignalExternalFut::Running(inner) => {
2099 let result = std::task::ready!(Pin::new(inner).poll(cx));
2100 *this = SignalExternalFut::Done;
2101 Poll::Ready(result)
2102 }
2103 SignalExternalFut::SerializationError(e) => {
2104 let err = e.take().expect("polled after completion");
2105 *this = SignalExternalFut::Done;
2106 Poll::Ready(Err(Failure {
2107 message: format!("Failed to serialize signal input: {err}"),
2108 ..Default::default()
2109 }))
2110 }
2111 SignalExternalFut::Done => panic!("polled after completion"),
2112 }
2113 }
2114}
2115
2116impl<F> FusedFuture for SignalExternalFut<F>
2117where
2118 F: Future<Output = SignalExternalWfResult> + Unpin,
2119{
2120 fn is_terminated(&self) -> bool {
2121 matches!(self, SignalExternalFut::Done)
2122 }
2123}
2124
2125impl<F> CancellableFuture<SignalExternalWfResult> for SignalExternalFut<F>
2126where
2127 F: CancellableFuture<SignalExternalWfResult> + Unpin,
2128{
2129 fn cancel(&self) {
2130 if let SignalExternalFut::Running(inner) = self {
2131 inner.cancel()
2132 }
2133 }
2134}
2135
/// Handle to a Nexus operation that has been started.
///
/// The `Debug` output only includes `operation_token`.
#[derive(derive_more::Debug)]
#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
pub struct StartedNexusOperation {
    /// Token associated with the operation, when one is present.
    pub operation_token: Option<String>,
    /// Data used to await the operation's result and to cancel it.
    pub(crate) unblock_dat: NexusUnblockData,
}
2143
/// Internal state backing a [`StartedNexusOperation`].
pub(crate) struct NexusUnblockData {
    /// Shared future for the operation result; cloned for every call to `result`.
    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
    /// Sequence number identifying the operation when requesting cancellation.
    schedule_seq: u32,
    /// Context used to issue the cancellation.
    base_ctx: BaseWorkflowContext,
}
2149
2150impl StartedNexusOperation {
2151 pub async fn result(&self) -> NexusOperationResult {
2152 self.unblock_dat.result_future.clone().await
2153 }
2154
2155 pub fn cancel(&self) {
2156 self.unblock_dat
2157 .base_ctx
2158 .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
2159 }
2160}
2161
// Unit tests for the continue-as-new behavior of the workflow contexts.
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use temporalio_common::{
        data_converters::{TemporalDeserializable, TemporalSerializable},
        protos::{
            coresdk::{AsJsonPayloadExt, common::VersioningIntent},
            temporal::api::common::v1::{Payload, RetryPolicy},
        },
    };
    use temporalio_macros::{workflow, workflow_methods};

    // Minimal workflow definition; its run method is never actually polled by these tests.
    #[workflow]
    #[derive(Default)]
    struct TestWorkflow;

    #[workflow_methods]
    impl TestWorkflow {
        #[run]
        async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
            unreachable!("test workflow run should not be polled")
        }
    }

    // Builds a `WorkflowContext<TestWorkflow>` backed by a fresh base context with default
    // payload conversion.
    fn test_context() -> WorkflowContext<TestWorkflow> {
        let init = InitializeWorkflow {
            workflow_type: TestWorkflow.name().to_string(),
            ..Default::default()
        };
        let (_, cancelled_rx) = watch::channel(None);
        let (base, _cmd_rx) = BaseWorkflowContext::new(
            "default".to_string(),
            "orig-task-queue".to_string(),
            "run-id".to_string(),
            init,
            cancelled_rx,
            PayloadConverter::default(),
        );
        WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
    }

    // With default options, the command carries the current workflow type and the serialized
    // input, leaving every other field at its default.
    #[test]
    fn workflow_context_continue_as_new_serializes_input_and_defaults() {
        let ctx = test_context();

        let termination = ctx
            .continue_as_new(&7, ContinueAsNewOptions::default())
            .expect_err("continue_as_new should terminate the workflow");
        assert!(
            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
            "expected continue-as-new termination, got {termination:?}"
        );
        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
            unreachable!()
        };

        assert_eq!(
            *cmd,
            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
                workflow_type: TestWorkflow.name().to_string(),
                arguments: vec![7u8.as_json_payload().unwrap()],
                versioning_intent: VersioningIntent::Unspecified as i32,
                ..Default::default()
            }
        );
    }

    // Every option supplied through `ContinueAsNewOptions` must show up in the resulting command
    // when issued through the sync context.
    #[test]
    fn sync_workflow_context_continue_as_new_applies_options() {
        let ctx = test_context();
        let sync = ctx.sync_context();
        let mut memo = HashMap::new();
        memo.insert(
            "memo-key".to_string(),
            Payload::from(b"memo-value".as_slice()),
        );
        let mut headers = HashMap::new();
        headers.insert(
            "header-key".to_string(),
            Payload::from(b"header-value".as_slice()),
        );
        let mut search_attributes = SearchAttributes::default();
        search_attributes.indexed_fields.insert(
            "CustomKeywordField".to_string(),
            Payload::from(b"value".as_slice()),
        );

        let termination = sync
            .continue_as_new(
                &11,
                ContinueAsNewOptions {
                    workflow_type: Some("next-workflow".to_string()),
                    task_queue: Some("next-task-queue".to_string()),
                    run_timeout: Some(Duration::from_secs(10)),
                    task_timeout: Some(Duration::from_secs(3)),
                    memo: Some(memo.clone()),
                    headers: Some(headers.clone()),
                    search_attributes: Some(search_attributes.clone()),
                    retry_policy: Some(RetryPolicy {
                        maximum_attempts: 5,
                        ..Default::default()
                    }),
                    versioning_intent: Some(VersioningIntent::Compatible),
                },
            )
            .expect_err("continue_as_new should terminate the workflow");
        assert!(
            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
            "expected continue-as-new termination, got {termination:?}"
        );
        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
            unreachable!()
        };

        assert_eq!(
            *cmd,
            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
                workflow_type: "next-workflow".to_string(),
                task_queue: "next-task-queue".to_string(),
                arguments: vec![11u8.as_json_payload().unwrap()],
                workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
                workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
                memo,
                headers,
                search_attributes: Some(search_attributes),
                retry_policy: Some(RetryPolicy {
                    maximum_attempts: 5,
                    ..Default::default()
                }),
                versioning_intent: VersioningIntent::Compatible as i32,
                ..Default::default()
            }
        );
    }

    // An input whose serialization fails must surface as a `Failed` termination carrying the
    // encoding error, rather than a continue-as-new command.
    #[test]
    fn continue_as_new_reports_serialization_errors() {
        // Input type whose serialization always fails.
        #[derive(Debug)]
        struct FailingInput;

        impl TemporalSerializable for FailingInput {
            fn to_payload(
                &self,
                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
            ) -> Result<Payload, temporalio_common::data_converters::PayloadConversionError>
            {
                Err(
                    temporalio_common::data_converters::PayloadConversionError::EncodingError(
                        std::io::Error::other("serialization failure").into(),
                    ),
                )
            }
        }

        impl TemporalDeserializable for FailingInput {
            fn from_payload(
                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
                _payload: Payload,
            ) -> Result<Self, temporalio_common::data_converters::PayloadConversionError>
            {
                unreachable!("test input is only serialized")
            }
        }

        // Workflow taking the failing input type; its run method is never polled.
        #[workflow]
        #[derive(Default)]
        struct FailingWorkflow;

        #[workflow_methods]
        impl FailingWorkflow {
            #[run]
            async fn run(
                _ctx: &mut WorkflowContext<Self>,
                _input: FailingInput,
            ) -> crate::WorkflowResult<()> {
                unreachable!("test workflow run should not be polled")
            }
        }

        // Same setup as `test_context`, but for `FailingWorkflow`.
        let init = InitializeWorkflow {
            workflow_type: "failing-workflow".to_string(),
            ..Default::default()
        };
        let (_, cancelled_rx) = watch::channel(None);
        let (base, _cmd_rx) = BaseWorkflowContext::new(
            "default".to_string(),
            "orig-task-queue".to_string(),
            "run-id".to_string(),
            init,
            cancelled_rx,
            PayloadConverter::default(),
        );
        let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));

        let err = ctx
            .continue_as_new(&FailingInput, ContinueAsNewOptions::default())
            .expect_err("serialization errors should be surfaced");

        let WorkflowTermination::Failed(err) = err else {
            panic!("expected failed termination, got {err:?}");
        };
        assert_eq!(err.to_string(), "Encoding error: serialization failure");
    }
}
2366}