Skip to main content

temporalio_sdk/
workflow_context.rs

1mod options;
2
3pub use options::{
4    ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5    SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9    CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10    CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
11    SupportsCancelReason, TimerResult, UnblockEvent, Unblockable,
12    workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{FutureExt, future::Shared, task::Context};
15use std::{
16    cell::{Ref, RefCell},
17    collections::HashMap,
18    future::{self, Future},
19    marker::PhantomData,
20    ops::{Deref, DerefMut},
21    pin::Pin,
22    rc::Rc,
23    sync::{
24        atomic::{AtomicBool, Ordering},
25        mpsc::{Receiver, Sender},
26    },
27    task::{Poll, Waker},
28    time::{Duration, SystemTime},
29};
30use temporalio_common::{
31    ActivityDefinition,
32    data_converters::{
33        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
34        SerializationContextData, TemporalDeserializable,
35    },
36    protos::{
37        coresdk::{
38            activity_result::{ActivityResolution, activity_resolution},
39            child_workflow::ChildWorkflowResult,
40            common::NamespacedWorkflowExecution,
41            nexus::NexusOperationResult,
42            workflow_activation::{
43                InitializeWorkflow,
44                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
45            },
46            workflow_commands::{
47                CancelChildWorkflowExecution, ModifyWorkflowProperties,
48                RequestCancelExternalWorkflowExecution, SetPatchMarker,
49                SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
50                WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
51            },
52        },
53        temporal::api::{
54            common::v1::{Memo, Payload, SearchAttributes},
55            failure::v1::Failure,
56            sdk::v1::UserMetadata,
57        },
58    },
59    worker::WorkerDeploymentVersion,
60};
61use tokio::sync::{oneshot, watch};
62
63/// Non-generic base context containing all workflow execution infrastructure.
64///
65/// This is used internally by futures and commands that don't need typed workflow state.
66#[derive(Clone)]
67pub struct BaseWorkflowContext {
68    inner: Rc<WorkflowContextInner>,
69}
70impl BaseWorkflowContext {
71    pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
72        self.inner.shared.borrow_mut()
73    }
74
75    /// Create a read-only view of this context.
76    pub(crate) fn view(&self) -> WorkflowContextView {
77        WorkflowContextView::new(
78            self.inner.namespace.clone(),
79            self.inner.task_queue.clone(),
80            self.inner.run_id.clone(),
81            &self.inner.inital_information,
82        )
83    }
84}
85
86struct WorkflowContextInner {
87    namespace: String,
88    task_queue: String,
89    run_id: String,
90    inital_information: InitializeWorkflow,
91    chan: Sender<RustWfCmd>,
92    am_cancelled: watch::Receiver<Option<String>>,
93    shared: RefCell<WorkflowContextSharedData>,
94    seq_nums: RefCell<WfCtxProtectedDat>,
95    payload_converter: PayloadConverter,
96}
97
98/// Context provided to synchronous signal and update handlers.
99///
100/// This type provides all workflow context capabilities except `state()`, `state_mut()`,
101/// and `wait_condition()`. Those methods are not applicable in sync handler contexts.
102///
103/// Sync handlers receive `&mut self` directly, so they can reference and mutate workflow state without
104/// needing `state()`/`state_mut()`.
105pub struct SyncWorkflowContext<W> {
106    base: BaseWorkflowContext,
107    /// Headers from the current handler invocation (signal, update, etc.)
108    headers: Rc<HashMap<String, Payload>>,
109    _phantom: PhantomData<W>,
110}
111
112impl<W> Clone for SyncWorkflowContext<W> {
113    fn clone(&self) -> Self {
114        Self {
115            base: self.base.clone(),
116            headers: self.headers.clone(),
117            _phantom: PhantomData,
118        }
119    }
120}
121
122/// Used within workflows to issue commands, get info, etc.
123///
124/// The type parameter `W` represents the workflow type. This enables type-safe
125/// access to workflow state via `state_mut()` for mutations.
126pub struct WorkflowContext<W> {
127    sync: SyncWorkflowContext<W>,
128    /// The workflow instance
129    workflow_state: Rc<RefCell<W>>,
130    /// Wakers registered by `wait_condition` futures. Drained and woken on
131    /// every `state_mut` call so that waker-based combinators (e.g.
132    /// `FuturesOrdered`) re-poll the condition after state changes.
133    condition_wakers: Rc<RefCell<Vec<Waker>>>,
134}
135
136impl<W> Clone for WorkflowContext<W> {
137    fn clone(&self) -> Self {
138        Self {
139            sync: self.sync.clone(),
140            workflow_state: self.workflow_state.clone(),
141            condition_wakers: self.condition_wakers.clone(),
142        }
143    }
144}
145
146/// Read-only view of workflow context for use in init and query handlers.
147///
148/// This provides access to workflow information but cannot issue commands.
149#[derive(Clone, Debug)]
150#[non_exhaustive]
151pub struct WorkflowContextView {
152    /// The workflow's unique identifier
153    pub workflow_id: String,
154    /// The run id of this workflow execution
155    pub run_id: String,
156    /// The workflow type name
157    pub workflow_type: String,
158    /// The task queue this workflow is executing on
159    pub task_queue: String,
160    /// The namespace this workflow is executing in
161    pub namespace: String,
162
163    /// The current attempt number (starting from 1)
164    pub attempt: u32,
165    /// The run id of the very first execution in the chain
166    pub first_execution_run_id: String,
167    /// The run id of the previous execution if this is a continuation
168    pub continued_from_run_id: Option<String>,
169
170    /// When the workflow execution started
171    pub start_time: Option<SystemTime>,
172    /// Total workflow execution timeout including retries and continue as new
173    pub execution_timeout: Option<Duration>,
174    /// Timeout of a single workflow run
175    pub run_timeout: Option<Duration>,
176    /// Timeout of a single workflow task
177    pub task_timeout: Option<Duration>,
178
179    /// Information about the parent workflow, if this is a child workflow
180    pub parent: Option<ParentWorkflowInfo>,
181    /// Information about the root workflow in the execution chain
182    pub root: Option<RootWorkflowInfo>,
183
184    /// The workflow's retry policy
185    pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
186    /// If this workflow runs on a cron schedule
187    pub cron_schedule: Option<String>,
188    /// User-defined memo
189    pub memo: Option<Memo>,
190    /// Initial search attributes
191    pub search_attributes: Option<SearchAttributes>,
192}
193
/// Identifies the parent of a child workflow.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
    /// Unique identifier of the parent workflow.
    pub workflow_id: String,
    /// Run id of the parent workflow.
    pub run_id: String,
    /// Namespace of the parent workflow.
    pub namespace: String,
}
205
/// Identifies the root workflow of an execution chain.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
    /// Unique identifier of the root workflow.
    pub workflow_id: String,
    /// Run id of the root workflow.
    pub run_id: String,
}
215
216impl WorkflowContextView {
217    /// Create a new view from workflow initialization data.
218    pub(crate) fn new(
219        namespace: String,
220        task_queue: String,
221        run_id: String,
222        init: &InitializeWorkflow,
223    ) -> Self {
224        let parent = init
225            .parent_workflow_info
226            .as_ref()
227            .map(|p| ParentWorkflowInfo {
228                workflow_id: p.workflow_id.clone(),
229                run_id: p.run_id.clone(),
230                namespace: p.namespace.clone(),
231            });
232
233        let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
234            workflow_id: r.workflow_id.clone(),
235            run_id: r.run_id.clone(),
236        });
237
238        let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
239            None
240        } else {
241            Some(init.continued_from_execution_run_id.clone())
242        };
243
244        let cron_schedule = if init.cron_schedule.is_empty() {
245            None
246        } else {
247            Some(init.cron_schedule.clone())
248        };
249
250        Self {
251            workflow_id: init.workflow_id.clone(),
252            run_id,
253            workflow_type: init.workflow_type.clone(),
254            task_queue,
255            namespace,
256            attempt: init.attempt as u32,
257            first_execution_run_id: init.first_execution_run_id.clone(),
258            continued_from_run_id,
259            start_time: init.start_time.and_then(|t| t.try_into().ok()),
260            execution_timeout: init
261                .workflow_execution_timeout
262                .and_then(|d| d.try_into().ok()),
263            run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
264            task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
265            parent,
266            root,
267            retry_policy: init.retry_policy.clone(),
268            cron_schedule,
269            memo: init.memo.clone(),
270            search_attributes: init.search_attributes.clone(),
271        }
272    }
273}
274
275/// Error type for activity execution outcomes.
276#[derive(Debug, thiserror::Error)]
277pub enum ActivityExecutionError {
278    /// The activity failed with the given failure details.
279    #[error("Activity failed: {}", .0.message)]
280    Failed(Box<Failure>),
281    /// The activity was cancelled.
282    #[error("Activity cancelled: {}", .0.message)]
283    Cancelled(Box<Failure>),
284    // TODO: Timed out variant
285    /// Failed to serialize input or deserialize result payload.
286    #[error("Payload conversion failed: {0}")]
287    Serialization(#[from] PayloadConversionError),
288}
289
290impl ActivityExecutionError {
291    /// Returns true if this error represents a timeout.
292    pub fn is_timeout(&self) -> bool {
293        match self {
294            ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
295            _ => false,
296        }
297    }
298}
299
300impl BaseWorkflowContext {
301    /// Create a new base context, returning the context itself and a receiver which outputs commands
302    /// sent from the workflow.
303    pub(crate) fn new(
304        namespace: String,
305        task_queue: String,
306        run_id: String,
307        init_workflow_job: InitializeWorkflow,
308        am_cancelled: watch::Receiver<Option<String>>,
309        payload_converter: PayloadConverter,
310    ) -> (Self, Receiver<RustWfCmd>) {
311        // The receiving side is non-async
312        let (chan, rx) = std::sync::mpsc::channel();
313        (
314            Self {
315                inner: Rc::new(WorkflowContextInner {
316                    namespace,
317                    task_queue,
318                    run_id,
319                    shared: RefCell::new(WorkflowContextSharedData {
320                        random_seed: init_workflow_job.randomness_seed,
321                        search_attributes: init_workflow_job
322                            .search_attributes
323                            .clone()
324                            .unwrap_or_default(),
325                        ..Default::default()
326                    }),
327                    inital_information: init_workflow_job,
328                    chan,
329                    am_cancelled,
330                    seq_nums: RefCell::new(WfCtxProtectedDat {
331                        next_timer_sequence_number: 1,
332                        next_activity_sequence_number: 1,
333                        next_child_workflow_sequence_number: 1,
334                        next_cancel_external_wf_sequence_number: 1,
335                        next_signal_external_wf_sequence_number: 1,
336                        next_nexus_op_sequence_number: 1,
337                    }),
338                    payload_converter,
339                }),
340            },
341            rx,
342        )
343    }
344
345    /// Buffer a command to be sent in the activation reply
346    pub(crate) fn send(&self, c: RustWfCmd) {
347        self.inner.chan.send(c).expect("command channel intact");
348    }
349
350    /// Cancel any cancellable operation by ID
351    fn cancel(&self, cancellable_id: CancellableID) {
352        self.send(RustWfCmd::Cancel(cancellable_id));
353    }
354
355    /// Request to create a timer
356    pub fn timer<T: Into<TimerOptions>>(
357        &self,
358        opts: T,
359    ) -> impl CancellableFuture<TimerResult> + use<T> {
360        let opts: TimerOptions = opts.into();
361        let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
362        let (cmd, unblocker) =
363            CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
364        self.send(
365            CommandCreateRequest {
366                cmd: WorkflowCommand {
367                    variant: Some(
368                        StartTimer {
369                            seq,
370                            start_to_fire_timeout: Some(
371                                opts.duration
372                                    .try_into()
373                                    .expect("Durations must fit into 64 bits"),
374                            ),
375                        }
376                        .into(),
377                    ),
378                    user_metadata: Some(UserMetadata {
379                        summary: opts.summary.map(|x| x.as_bytes().into()),
380                        details: None,
381                    }),
382                },
383                unblocker,
384            }
385            .into(),
386        );
387        cmd
388    }
389
390    /// Request to run an activity
391    pub fn start_activity<AD: ActivityDefinition>(
392        &self,
393        _activity: AD,
394        input: impl Into<AD::Input>,
395        mut opts: ActivityOptions,
396    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
397    where
398        AD::Output: TemporalDeserializable,
399    {
400        let input = input.into();
401        let ctx = SerializationContext {
402            data: &SerializationContextData::Workflow,
403            converter: &self.inner.payload_converter,
404        };
405        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
406            Ok(p) => p,
407            Err(e) => {
408                return ActivityFut::eager(e.into());
409            }
410        };
411        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
412        let (cmd, unblocker) =
413            CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
414        if opts.task_queue.is_none() {
415            opts.task_queue = Some(self.inner.task_queue.clone());
416        }
417        self.send(
418            CommandCreateRequest {
419                cmd: opts.into_command(AD::name().to_string(), payloads, seq),
420                unblocker,
421            }
422            .into(),
423        );
424        ActivityFut::running(cmd, self.inner.payload_converter.clone())
425    }
426
427    /// Request to run a local activity
428    pub fn start_local_activity<AD: ActivityDefinition>(
429        &self,
430        _activity: AD,
431        input: impl Into<AD::Input>,
432        opts: LocalActivityOptions,
433    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
434    where
435        AD::Output: TemporalDeserializable,
436    {
437        let input = input.into();
438        let ctx = SerializationContext {
439            data: &SerializationContextData::Workflow,
440            converter: &self.inner.payload_converter,
441        };
442        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
443            Ok(p) => p,
444            Err(e) => {
445                return ActivityFut::eager(e.into());
446            }
447        };
448        ActivityFut::running(
449            LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
450            self.inner.payload_converter.clone(),
451        )
452    }
453
454    /// Request to run a local activity with no implementation of timer-backoff based retrying.
455    fn local_activity_no_timer_retry(
456        self,
457        activity_type: String,
458        arguments: Vec<Payload>,
459        opts: LocalActivityOptions,
460    ) -> impl CancellableFuture<ActivityResolution> {
461        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
462        let (cmd, unblocker) =
463            CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
464        self.inner
465            .chan
466            .send(
467                CommandCreateRequest {
468                    cmd: opts.into_command(activity_type, arguments, seq),
469                    unblocker,
470                }
471                .into(),
472            )
473            .expect("command channel intact");
474        cmd
475    }
476
477    fn send_signal_wf(
478        self,
479        target: sig_we::Target,
480        signal: Signal,
481    ) -> impl CancellableFuture<SignalExternalWfResult> {
482        let seq = self
483            .inner
484            .seq_nums
485            .borrow_mut()
486            .next_signal_external_wf_seq();
487        let (cmd, unblocker) =
488            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
489        self.send(
490            CommandCreateRequest {
491                cmd: WorkflowCommand {
492                    variant: Some(
493                        SignalExternalWorkflowExecution {
494                            seq,
495                            signal_name: signal.signal_name,
496                            args: signal.data.input,
497                            target: Some(target),
498                            headers: signal.data.headers,
499                        }
500                        .into(),
501                    ),
502                    user_metadata: None,
503                },
504                unblocker,
505            }
506            .into(),
507        );
508        cmd
509    }
510}
511
512impl<W> SyncWorkflowContext<W> {
513    /// Return the namespace the workflow is executing in
514    pub fn namespace(&self) -> &str {
515        &self.base.inner.namespace
516    }
517
518    /// Return the task queue the workflow is executing in
519    pub fn task_queue(&self) -> &str {
520        &self.base.inner.task_queue
521    }
522
523    /// Return the current time according to the workflow (which is not wall-clock time).
524    pub fn workflow_time(&self) -> Option<SystemTime> {
525        self.base.inner.shared.borrow().wf_time
526    }
527
528    /// Return the length of history so far at this point in the workflow
529    pub fn history_length(&self) -> u32 {
530        self.base.inner.shared.borrow().history_length
531    }
532
533    /// Return the deployment version, if any,  as it was when this point in the workflow was first
534    /// reached. If this code is being executed for the first time, return this Worker's deployment
535    /// version if it has one.
536    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
537        self.base
538            .inner
539            .shared
540            .borrow()
541            .current_deployment_version
542            .clone()
543    }
544
545    /// Return current values for workflow search attributes
546    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
547        Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
548    }
549
550    /// Return the workflow's randomness seed
551    pub fn random_seed(&self) -> u64 {
552        self.base.inner.shared.borrow().random_seed
553    }
554
555    /// Returns true if the current workflow task is happening under replay
556    pub fn is_replaying(&self) -> bool {
557        self.base.inner.shared.borrow().is_replaying
558    }
559
560    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
561    ///
562    /// When called from within a signal handler, returns the headers that were sent with that
563    /// signal. When called from the main workflow run method, returns an empty map.
564    pub fn headers(&self) -> &HashMap<String, Payload> {
565        &self.headers
566    }
567
568    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
569    pub fn payload_converter(&self) -> &PayloadConverter {
570        &self.base.inner.payload_converter
571    }
572
573    /// Return various information that the workflow was initialized with. Will eventually become
574    /// a proper non-proto workflow info struct.
575    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
576        &self.base.inner.inital_information
577    }
578
579    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
580    pub async fn cancelled(&self) -> String {
581        if let Some(s) = self.base.inner.am_cancelled.borrow().as_ref() {
582            return s.clone();
583        }
584        self.base
585            .inner
586            .am_cancelled
587            .clone()
588            .changed()
589            .await
590            .expect("Cancelled send half not dropped");
591        self.base
592            .inner
593            .am_cancelled
594            .borrow()
595            .as_ref()
596            .cloned()
597            .unwrap_or_default()
598    }
599
600    /// Request to create a timer
601    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
602        self.base.timer(opts)
603    }
604
605    /// Request to run an activity
606    pub fn start_activity<AD: ActivityDefinition>(
607        &self,
608        activity: AD,
609        input: impl Into<AD::Input>,
610        opts: ActivityOptions,
611    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
612    where
613        AD::Output: TemporalDeserializable,
614    {
615        self.base.start_activity(activity, input, opts)
616    }
617
618    /// Request to run a local activity
619    pub fn start_local_activity<AD: ActivityDefinition>(
620        &self,
621        activity: AD,
622        input: impl Into<AD::Input>,
623        opts: LocalActivityOptions,
624    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
625    where
626        AD::Output: TemporalDeserializable,
627    {
628        self.base.start_local_activity(activity, input, opts)
629    }
630
631    /// Creates a child workflow stub with the provided options
632    pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
633        ChildWorkflow {
634            opts,
635            base_ctx: self.base.clone(),
636        }
637    }
638
639    /// Check (or record) that this workflow history was created with the provided patch
640    pub fn patched(&self, patch_id: &str) -> bool {
641        self.patch_impl(patch_id, false)
642    }
643
644    /// Record that this workflow history was created with the provided patch, and it is being
645    /// phased out.
646    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
647        self.patch_impl(patch_id, true)
648    }
649
650    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
651        self.base.send(
652            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
653                patch_id: patch_id.to_string(),
654                deprecated,
655            })
656            .into(),
657        );
658        // See if we already know about the status of this change
659        if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
660            return *present;
661        }
662
663        // If we don't already know about the change, that means there is no marker in history,
664        // and we should return false if we are replaying
665        let res = !self.base.inner.shared.borrow().is_replaying;
666
667        self.base
668            .inner
669            .shared
670            .borrow_mut()
671            .changes
672            .insert(patch_id.to_string(), res);
673
674        res
675    }
676
677    /// Send a signal to an external workflow. May resolve as a failure if the signal didn't work
678    /// or was cancelled.
679    pub fn signal_workflow(
680        &self,
681        opts: impl Into<SignalWorkflowOptions>,
682    ) -> impl CancellableFuture<SignalExternalWfResult> {
683        let options: SignalWorkflowOptions = opts.into();
684        let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
685            namespace: self.base.inner.namespace.clone(),
686            workflow_id: options.workflow_id,
687            run_id: options.run_id.unwrap_or_default(),
688        });
689        self.base.clone().send_signal_wf(target, options.signal)
690    }
691
692    /// Add or create a set of search attributes
693    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
694        self.base.send(RustWfCmd::NewNonblockingCmd(
695            workflow_command::Variant::UpsertWorkflowSearchAttributes(
696                UpsertWorkflowSearchAttributes {
697                    search_attributes: Some(SearchAttributes {
698                        indexed_fields: HashMap::from_iter(attr_iter),
699                    }),
700                },
701            ),
702        ))
703    }
704
705    /// Add or create a set of search attributes
706    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
707        self.base.send(RustWfCmd::NewNonblockingCmd(
708            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
709                upserted_memo: Some(Memo {
710                    fields: HashMap::from_iter(attr_iter),
711                }),
712            }),
713        ))
714    }
715
716    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
717    pub fn force_task_fail(&self, with: anyhow::Error) {
718        self.base.send(with.into());
719    }
720
721    /// Request the cancellation of an external workflow. May resolve as a failure if the workflow
722    /// was not found or the cancel was otherwise unsendable.
723    pub fn cancel_external(
724        &self,
725        target: NamespacedWorkflowExecution,
726        reason: String,
727    ) -> impl Future<Output = CancelExternalWfResult> {
728        let seq = self
729            .base
730            .inner
731            .seq_nums
732            .borrow_mut()
733            .next_cancel_external_wf_seq();
734        let (cmd, unblocker) = WFCommandFut::new();
735        self.base.send(
736            CommandCreateRequest {
737                cmd: WorkflowCommand {
738                    variant: Some(
739                        RequestCancelExternalWorkflowExecution {
740                            seq,
741                            workflow_execution: Some(target),
742                            reason,
743                        }
744                        .into(),
745                    ),
746                    user_metadata: None,
747                },
748                unblocker,
749            }
750            .into(),
751        );
752        cmd
753    }
754
755    /// Start a nexus operation
756    pub fn start_nexus_operation(
757        &self,
758        opts: NexusOperationOptions,
759    ) -> impl CancellableFuture<NexusStartResult> {
760        let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
761        let (result_future, unblocker) = WFCommandFut::new();
762        self.base
763            .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
764        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
765            CancellableID::NexusOp(seq),
766            NexusUnblockData {
767                result_future: result_future.shared(),
768                schedule_seq: seq,
769                base_ctx: self.base.clone(),
770            },
771            self.base.clone(),
772        );
773        self.base.send(
774            CommandCreateRequest {
775                cmd: opts.into_command(seq),
776                unblocker,
777            }
778            .into(),
779        );
780        cmd
781    }
782
783    /// Create a read-only view of this context.
784    pub(crate) fn view(&self) -> WorkflowContextView {
785        self.base.view()
786    }
787}
788
789impl<W> WorkflowContext<W> {
790    /// Create a new wf context from a base context and workflow state.
791    pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
792        Self {
793            sync: SyncWorkflowContext {
794                base,
795                headers: Rc::new(HashMap::new()),
796                _phantom: PhantomData,
797            },
798            workflow_state,
799            condition_wakers: Rc::new(RefCell::new(Vec::new())),
800        }
801    }
802
803    /// Returns a new context with the specified headers set.
804    pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
805        Self {
806            sync: SyncWorkflowContext {
807                base: self.sync.base.clone(),
808                headers: Rc::new(headers),
809                _phantom: PhantomData,
810            },
811            workflow_state: self.workflow_state.clone(),
812            condition_wakers: self.condition_wakers.clone(),
813        }
814    }
815
816    /// Returns a [`SyncWorkflowContext`] extracted from this context.
817    pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
818        self.sync.clone()
819    }
820
821    // --- Delegated methods from SyncWorkflowContext ---
822
823    /// Return the namespace the workflow is executing in
824    pub fn namespace(&self) -> &str {
825        self.sync.namespace()
826    }
827
828    /// Return the task queue the workflow is executing in
829    pub fn task_queue(&self) -> &str {
830        self.sync.task_queue()
831    }
832
833    /// Return the current time according to the workflow (which is not wall-clock time).
834    pub fn workflow_time(&self) -> Option<SystemTime> {
835        self.sync.workflow_time()
836    }
837
838    /// Return the length of history so far at this point in the workflow
839    pub fn history_length(&self) -> u32 {
840        self.sync.history_length()
841    }
842
    /// Return the deployment version, if any, as it was when this point in the workflow was first
    /// reached. If this code is being executed for the first time, return this Worker's deployment
    /// version if it has one.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
        self.sync.current_deployment_version()
    }
849
    /// Return current values for workflow search attributes.
    ///
    /// The result is a guard-like value that dereferences to [`SearchAttributes`] and borrows
    /// from `self`. Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
        self.sync.search_attributes()
    }
854
    /// Return the workflow's randomness seed.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn random_seed(&self) -> u64 {
        self.sync.random_seed()
    }
859
    /// Returns true if the current workflow task is happening under replay.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn is_replaying(&self) -> bool {
        self.sync.is_replaying()
    }
864
    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn headers(&self) -> &HashMap<String, Payload> {
        self.sync.headers()
    }
869
    /// Returns the [`PayloadConverter`] currently used by the worker running this workflow.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn payload_converter(&self) -> &PayloadConverter {
        self.sync.payload_converter()
    }
874
    /// Return various information that the workflow was initialized with, as an
    /// [`InitializeWorkflow`] message.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
        self.sync.workflow_initial_info()
    }
879
    /// A future that resolves if/when the workflow is cancelled, yielding the user-provided
    /// cause.
    ///
    /// Awaits the same-named method on the wrapped [`SyncWorkflowContext`].
    pub async fn cancelled(&self) -> String {
        self.sync.cancelled().await
    }
884
    /// Request to create a timer.
    ///
    /// `opts` may be any value convertible into [`TimerOptions`]. The returned future resolves
    /// to a [`TimerResult`] and can be cancelled through [`CancellableFuture::cancel`].
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
        self.sync.timer(opts)
    }
889
    /// Request to run an activity.
    ///
    /// `input` may be any value convertible into the activity definition's `AD::Input`. The
    /// returned future is cancellable and resolves to the activity's `AD::Output`, or to an
    /// [`ActivityExecutionError`]. Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn start_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: ActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.sync.start_activity(activity, input, opts)
    }
902
    /// Request to run a local activity.
    ///
    /// Mirrors [`Self::start_activity`] but takes [`LocalActivityOptions`]. The returned future
    /// is cancellable and resolves to the activity's `AD::Output`, or to an
    /// [`ActivityExecutionError`]. Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn start_local_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.sync.start_local_activity(activity, input, opts)
    }
915
    /// Creates a child workflow stub with the provided options.
    ///
    /// The stub is an unstarted [`ChildWorkflow`]; call [`ChildWorkflow::start`] to start it.
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
        self.sync.child_workflow(opts)
    }
920
    /// Check (or record) that this workflow history was created with the provided patch.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn patched(&self, patch_id: &str) -> bool {
        self.sync.patched(patch_id)
    }
925
    /// Record that this workflow history was created with the provided patch, and it is being
    /// phased out.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`] and returns its boolean result unchanged.
    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
        self.sync.deprecate_patch(patch_id)
    }
931
    /// Send a signal to an external workflow.
    ///
    /// `opts` may be any value convertible into [`SignalWorkflowOptions`]. The returned future
    /// is cancellable and resolves to a [`SignalExternalWfResult`]. Forwards to the wrapped
    /// [`SyncWorkflowContext`].
    pub fn signal_workflow(
        &self,
        opts: impl Into<SignalWorkflowOptions>,
    ) -> impl CancellableFuture<SignalExternalWfResult> {
        self.sync.signal_workflow(opts)
    }
939
    /// Add or create a set of search attributes.
    ///
    /// `attr_iter` yields `(name, payload)` pairs. Forwards to the wrapped
    /// [`SyncWorkflowContext`].
    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.sync.upsert_search_attributes(attr_iter)
    }
944
    /// Add or create a set of memo fields.
    ///
    /// `attr_iter` yields `(name, payload)` pairs. Forwards to the wrapped
    /// [`SyncWorkflowContext`].
    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
        self.sync.upsert_memo(attr_iter)
    }
949
    /// Force a workflow task failure (e.g. in order to retry on a non-sticky queue).
    ///
    /// `with` is the error handed on as the failure. Forwards to the wrapped
    /// [`SyncWorkflowContext`].
    pub fn force_task_fail(&self, with: anyhow::Error) {
        self.sync.force_task_fail(with)
    }
954
    /// Request the cancellation of an external workflow.
    ///
    /// `target` identifies the workflow execution and `reason` is passed along with the
    /// request. The returned future resolves to a [`CancelExternalWfResult`]; unlike most other
    /// operations on this context it is a plain [`Future`], not a [`CancellableFuture`].
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub fn cancel_external(
        &self,
        target: NamespacedWorkflowExecution,
        reason: String,
    ) -> impl Future<Output = CancelExternalWfResult> {
        self.sync.cancel_external(target, reason)
    }
963
    /// Start a nexus operation.
    ///
    /// The returned future is cancellable and resolves to a [`NexusStartResult`]. Forwards to
    /// the wrapped [`SyncWorkflowContext`].
    pub fn start_nexus_operation(
        &self,
        opts: NexusOperationOptions,
    ) -> impl CancellableFuture<NexusStartResult> {
        self.sync.start_nexus_operation(opts)
    }
971
    /// Create a read-only view of this context.
    ///
    /// Forwards to the wrapped [`SyncWorkflowContext`].
    pub(crate) fn view(&self) -> WorkflowContextView {
        self.sync.view()
    }
976
977    /// Access workflow state immutably via closure.
978    ///
979    /// The borrow is scoped to the closure and cannot escape, preventing
980    /// borrows from being held across await points.
981    pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
982        f(&*self.workflow_state.borrow())
983    }
984
985    /// Access workflow state mutably via closure.
986    ///
987    /// The borrow is scoped to the closure and cannot escape, preventing
988    /// borrows from being held across await points.
989    ///
990    /// After the mutation, all wakers registered by pending `wait_condition`
991    /// futures are woken so that waker-based combinators (e.g.
992    /// `FuturesOrdered`) re-poll them on the next pass.
993    pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
994        let result = f(&mut *self.workflow_state.borrow_mut());
995        for waker in self.condition_wakers.borrow_mut().drain(..) {
996            waker.wake();
997        }
998        result
999    }
1000
1001    /// Wait for some condition on workflow state to become true, yielding the workflow if not.
1002    ///
1003    /// The condition closure receives an immutable reference to the workflow state,
1004    /// which is borrowed only for the duration of each poll (not across await points).
1005    pub fn wait_condition<'a>(
1006        &'a self,
1007        mut condition: impl FnMut(&W) -> bool + 'a,
1008    ) -> impl Future<Output = ()> + 'a {
1009        future::poll_fn(move |cx: &mut Context<'_>| {
1010            if condition(&*self.workflow_state.borrow()) {
1011                Poll::Ready(())
1012            } else {
1013                self.condition_wakers.borrow_mut().push(cx.waker().clone());
1014                Poll::Pending
1015            }
1016        })
1017    }
1018}
1019
/// Sequence-number counters, one per kind of command.
///
/// Every `next_*_seq` method returns the counter's current value and then advances it by one.
struct WfCtxProtectedDat {
    /// Value the next `next_timer_seq` call returns.
    next_timer_sequence_number: u32,
    /// Value the next `next_activity_seq` call returns.
    next_activity_sequence_number: u32,
    /// Value the next `next_child_workflow_seq` call returns.
    next_child_workflow_sequence_number: u32,
    /// Value the next `next_cancel_external_wf_seq` call returns.
    next_cancel_external_wf_sequence_number: u32,
    /// Value the next `next_signal_external_wf_seq` call returns.
    next_signal_external_wf_sequence_number: u32,
    /// Value the next `next_nexus_op_seq` call returns.
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the current value of `counter`, then increments it.
    fn take_and_advance(counter: &mut u32) -> u32 {
        let current = *counter;
        *counter += 1;
        current
    }

    /// Hands out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_timer_sequence_number)
    }

    /// Hands out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_activity_sequence_number)
    }

    /// Hands out the next child workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_child_workflow_sequence_number)
    }

    /// Hands out the next cancel-external-workflow sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hands out the next signal-external-workflow sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hands out the next nexus operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_nexus_op_sequence_number)
    }
}
1061
/// Plain-data bundle of workflow values: patch resolutions, replay flag, workflow time,
/// history length, deployment version, search attributes, and random seed.
///
/// NOTE(review): presumably this backs the same-named accessors on the workflow context
/// (`is_replaying`, `workflow_time`, `history_length`, ...) — confirm against the readers.
#[derive(Clone, Debug, Default)]
pub(crate) struct WorkflowContextSharedData {
    /// Maps change ids -> resolved status
    pub(crate) changes: HashMap<String, bool>,
    /// Replay flag. NOTE(review): presumably the value reported by `is_replaying` — confirm.
    pub(crate) is_replaying: bool,
    /// Workflow time, if one is set. NOTE(review): presumably the value reported by
    /// `workflow_time` — confirm.
    pub(crate) wf_time: Option<SystemTime>,
    /// History length. NOTE(review): presumably the value reported by `history_length` —
    /// confirm.
    pub(crate) history_length: u32,
    /// Deployment version, if any.
    pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
    /// Search attribute values.
    pub(crate) search_attributes: SearchAttributes,
    /// Randomness seed.
    pub(crate) random_seed: u64,
}
1073
/// A [`Future`] that can be cancelled.
///
/// Used in the prototype SDK for cancelling operations like timers and activities.
/// Cancellation is requested through a shared reference, so the future does not have to be
/// consumed or mutably borrowed to cancel it.
pub trait CancellableFuture<T>: Future<Output = T> {
    /// Cancel this Future
    fn cancel(&self);
}
1080
/// A [`CancellableFuture`] whose cancellation can additionally carry a reason string.
pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
    /// Cancel this Future, attaching `reason` to the cancellation request.
    fn cancel_with_reason(&self, reason: String);
}
1086
/// Future that resolves to a `T` once an [`UnblockEvent`] arrives on its oneshot channel.
///
/// `D` is extra data that is handed to [`Unblockable::unblock`] together with the event.
struct WFCommandFut<T, D> {
    /// Ties the future to its output type; no `T` is stored.
    _unused: PhantomData<T>,
    /// Receiving half of the channel on which the unblock event is delivered.
    result_rx: oneshot::Receiver<UnblockEvent>,
    /// Extra data for `unblock`; taken (leaving `None`) when the future resolves.
    other_dat: Option<D>,
}
impl<T> WFCommandFut<T, ()> {
    /// Creates a command future that carries no extra data, together with the sender used to
    /// unblock it.
    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
        Self::new_with_dat(())
    }
}
1097
1098impl<T, D> WFCommandFut<T, D> {
1099    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1100        let (tx, rx) = oneshot::channel();
1101        (
1102            Self {
1103                _unused: PhantomData,
1104                result_rx: rx,
1105                other_dat: Some(other_dat),
1106            },
1107            tx,
1108        )
1109    }
1110}
1111
1112impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1113impl<T, D> Future for WFCommandFut<T, D>
1114where
1115    T: Unblockable<OtherDat = D>,
1116{
1117    type Output = T;
1118
1119    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1120        self.result_rx.poll_unpin(cx).map(|x| {
1121            // SAFETY: Because we can only enter this section once the future has resolved, we
1122            // know it will never be polled again, therefore consuming the option is OK.
1123            let od = self
1124                .other_dat
1125                .take()
1126                .expect("Other data must exist when resolving command future");
1127            Unblockable::unblock(x.unwrap(), od)
1128        })
1129    }
1130}
1131
/// A [`WFCommandFut`] paired with what is needed to cancel the command behind it.
///
/// `ID` is the kind of cancellable id stored; it defaults to [`CancellableID`].
struct CancellableWFCommandFut<T, D, ID = CancellableID> {
    /// The underlying future that resolves with the command's result.
    cmd_fut: WFCommandFut<T, D>,
    /// Identifies the command to cancel.
    cancellable_id: ID,
    /// Context through which cancellation requests are issued.
    base_ctx: BaseWorkflowContext,
}
impl<T, ID> CancellableWFCommandFut<T, (), ID> {
    /// Creates a cancellable command future that carries no extra data, together with the
    /// sender used to unblock it.
    fn new(
        cancellable_id: ID,
        base_ctx: BaseWorkflowContext,
    ) -> (Self, oneshot::Sender<UnblockEvent>) {
        Self::new_with_dat(cancellable_id, (), base_ctx)
    }
}
1145impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1146    fn new_with_dat(
1147        cancellable_id: ID,
1148        other_dat: D,
1149        base_ctx: BaseWorkflowContext,
1150    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1151        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1152        (
1153            Self {
1154                cmd_fut,
1155                cancellable_id,
1156                base_ctx,
1157            },
1158            sender,
1159        )
1160    }
1161}
1162impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1163impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1164where
1165    T: Unblockable<OtherDat = D>,
1166{
1167    type Output = T;
1168
1169    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1170        self.cmd_fut.poll_unpin(cx)
1171    }
1172}
1173
1174impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1175where
1176    T: Unblockable<OtherDat = D>,
1177    ID: Clone + Into<CancellableID>,
1178{
1179    fn cancel(&self) {
1180        self.base_ctx.cancel(self.cancellable_id.clone().into());
1181    }
1182}
1183impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1184where
1185    T: Unblockable<OtherDat = D>,
1186{
1187    fn cancel_with_reason(&self, reason: String) {
1188        let new_id = self.cancellable_id.clone().with_reason(reason);
1189        self.base_ctx.cancel(new_id);
1190    }
1191}
1192
/// Future for a local activity that, when the activity resolves with a backoff request, waits
/// on a timer and then schedules the activity again.
struct LATimerBackoffFut {
    /// Options used for every scheduling of the activity (attempt and original schedule time
    /// are overridden on retries).
    la_opts: LocalActivityOptions,
    /// Activity type passed on each scheduling.
    activity_type: String,
    /// Arguments passed on each scheduling.
    arguments: Vec<Payload>,
    /// The most recently scheduled run of the activity.
    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
    /// The backoff timer, present only while waiting between runs.
    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
    /// Context used to create timers and schedule activity runs.
    base_ctx: BaseWorkflowContext,
    /// Attempt number to use for the next scheduling; taken from the last backoff request.
    next_attempt: u32,
    /// Original schedule time to use for the next scheduling; taken from the last backoff
    /// request.
    next_sched_time: Option<prost_types::Timestamp>,
    /// Set once `cancel` has been called.
    did_cancel: AtomicBool,
}
1204impl LATimerBackoffFut {
1205    pub(crate) fn new(
1206        activity_type: String,
1207        arguments: Vec<Payload>,
1208        opts: LocalActivityOptions,
1209        base_ctx: BaseWorkflowContext,
1210    ) -> Self {
1211        let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1212            activity_type.clone(),
1213            arguments.clone(),
1214            opts.clone(),
1215        ));
1216        Self {
1217            la_opts: opts,
1218            activity_type,
1219            arguments,
1220            current_fut,
1221            timer_fut: None,
1222            base_ctx,
1223            next_attempt: 1,
1224            next_sched_time: None,
1225            did_cancel: AtomicBool::new(false),
1226        }
1227    }
1228}
1229impl Unpin for LATimerBackoffFut {}
1230impl Future for LATimerBackoffFut {
1231    type Output = ActivityResolution;
1232
1233    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1234        // If the timer exists, wait for it first
1235        if let Some(tf) = self.timer_fut.as_mut() {
1236            return match tf.poll_unpin(cx) {
1237                Poll::Ready(tr) => {
1238                    self.timer_fut = None;
1239                    // Schedule next LA if this timer wasn't cancelled
1240                    if let TimerResult::Fired = tr {
1241                        let mut opts = self.la_opts.clone();
1242                        opts.attempt = Some(self.next_attempt);
1243                        opts.original_schedule_time
1244                            .clone_from(&self.next_sched_time);
1245                        self.current_fut =
1246                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1247                                self.activity_type.clone(),
1248                                self.arguments.clone(),
1249                                opts,
1250                            ));
1251                        Poll::Pending
1252                    } else {
1253                        Poll::Ready(ActivityResolution {
1254                            status: Some(
1255                                activity_resolution::Status::Cancelled(Default::default()),
1256                            ),
1257                        })
1258                    }
1259                }
1260                Poll::Pending => Poll::Pending,
1261            };
1262        }
1263        let poll_res = self.current_fut.poll_unpin(cx);
1264        if let Poll::Ready(ref r) = poll_res
1265            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1266        {
1267            // If we've already said we want to cancel, don't schedule the backoff timer. Just
1268            // return cancel status. This can happen if cancel comes after the LA says it wants
1269            // to back off but before we have scheduled the timer.
1270            if self.did_cancel.load(Ordering::Acquire) {
1271                return Poll::Ready(ActivityResolution {
1272                    status: Some(activity_resolution::Status::Cancelled(Default::default())),
1273                });
1274            }
1275
1276            let timer_f = self.base_ctx.timer::<Duration>(
1277                b.backoff_duration
1278                    .expect("Duration is set")
1279                    .try_into()
1280                    .expect("duration converts ok"),
1281            );
1282            self.timer_fut = Some(Box::pin(timer_f));
1283            self.next_attempt = b.attempt;
1284            self.next_sched_time.clone_from(&b.original_schedule_time);
1285            return Poll::Pending;
1286        }
1287        poll_res
1288    }
1289}
impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
    /// Records the cancel request, then cancels the backoff timer (if one exists) and the
    /// current activity run.
    fn cancel(&self) {
        // Set the flag first: `poll` checks it before scheduling a backoff timer.
        self.did_cancel.store(true, Ordering::Release);
        if let Some(tf) = self.timer_fut.as_ref() {
            tf.cancel();
        }
        self.current_fut.cancel();
    }
}
1299
/// Future for activity results. Either an immediate error or a running activity.
///
/// `F` is the inner future yielding the raw [`ActivityResolution`]; `Output` is the type the
/// successful result payload is deserialized into.
enum ActivityFut<F, Output> {
    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
    Errored {
        /// The error to return; taken out when the future is polled.
        error: Option<ActivityExecutionError>,
        _phantom: PhantomData<Output>,
    },
    /// Running activity that will deserialize output on completion.
    Running {
        /// Future producing the activity's resolution.
        inner: F,
        /// Converter used to deserialize the result payload.
        payload_converter: PayloadConverter,
        _phantom: PhantomData<Output>,
    },
}
1314
impl<F, Output> ActivityFut<F, Output> {
    /// Builds a future in the `Errored` state that yields `err` when polled.
    fn eager(err: ActivityExecutionError) -> Self {
        Self::Errored {
            error: Some(err),
            _phantom: PhantomData,
        }
    }

    /// Builds a future in the `Running` state around `inner`, using `payload_converter` to
    /// deserialize the eventual result.
    fn running(inner: F, payload_converter: PayloadConverter) -> Self {
        Self::Running {
            inner,
            payload_converter,
            _phantom: PhantomData,
        }
    }
}
1331
1332impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1333
1334impl<F, Output> Future for ActivityFut<F, Output>
1335where
1336    F: Future<Output = ActivityResolution> + Unpin,
1337    Output: TemporalDeserializable + 'static,
1338{
1339    type Output = Result<Output, ActivityExecutionError>;
1340
1341    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1342        match self.get_mut() {
1343            ActivityFut::Errored { error, .. } => {
1344                Poll::Ready(Err(error.take().expect("polled after completion")))
1345            }
1346            ActivityFut::Running {
1347                inner,
1348                payload_converter,
1349                ..
1350            } => match Pin::new(inner).poll(cx) {
1351                Poll::Pending => Poll::Pending,
1352                Poll::Ready(resolution) => Poll::Ready({
1353                    let status = resolution.status.ok_or_else(|| {
1354                        ActivityExecutionError::Failed(Box::new(Failure {
1355                            message: "Activity completed without a status".to_string(),
1356                            ..Default::default()
1357                        }))
1358                    })?;
1359
1360                    match status {
1361                        activity_resolution::Status::Completed(success) => {
1362                            let payload = success.result.unwrap_or_default();
1363                            let ctx = SerializationContext {
1364                                data: &SerializationContextData::Workflow,
1365                                converter: payload_converter,
1366                            };
1367                            payload_converter
1368                                .from_payload::<Output>(&ctx, payload)
1369                                .map_err(ActivityExecutionError::Serialization)
1370                        }
1371                        activity_resolution::Status::Failed(f) => Err(
1372                            ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1373                        ),
1374                        activity_resolution::Status::Cancelled(c) => {
1375                            Err(ActivityExecutionError::Cancelled(Box::new(
1376                                c.failure.unwrap_or_default(),
1377                            )))
1378                        }
1379                        activity_resolution::Status::Backoff(_) => {
1380                            panic!("DoBackoff should be handled by LATimerBackoffFut")
1381                        }
1382                    }
1383                }),
1384            },
1385        }
1386    }
1387}
1388
1389impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1390where
1391    F: CancellableFuture<ActivityResolution> + Unpin,
1392    Output: TemporalDeserializable + 'static,
1393{
1394    fn cancel(&self) {
1395        match self {
1396            ActivityFut::Errored { .. } => {}
1397            ActivityFut::Running { inner, .. } => inner.cancel(),
1398        }
1399    }
1400}
1401
/// A stub representing an unstarted child workflow.
///
/// Call [`ChildWorkflow::start`] to issue the start command.
#[derive(Clone, derive_more::Debug)]
pub struct ChildWorkflow {
    /// Options the child workflow will be started with.
    opts: ChildWorkflowOptions,
    /// Context used to allocate sequence numbers and send commands; omitted from `Debug`.
    #[debug(skip)]
    base_ctx: BaseWorkflowContext,
}
1409
1410pub(crate) struct ChildWfCommon {
1411    workflow_id: String,
1412    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
1413    base_ctx: BaseWorkflowContext,
1414}
1415
1416/// Child workflow in pending state
1417#[derive(derive_more::Debug)]
1418pub struct PendingChildWorkflow {
1419    /// The status of the child workflow start
1420    pub status: ChildWorkflowStartStatus,
1421    #[debug(skip)]
1422    pub(crate) common: ChildWfCommon,
1423}
1424
1425impl PendingChildWorkflow {
1426    /// Returns `None` if the child did not start successfully. The returned [StartedChildWorkflow]
1427    /// can be used to wait on, signal, or cancel the child workflow.
1428    pub fn into_started(self) -> Option<StartedChildWorkflow> {
1429        match self.status {
1430            ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
1431                run_id: s.run_id,
1432                common: self.common,
1433            }),
1434            _ => None,
1435        }
1436    }
1437}
1438
1439/// Child workflow in started state
1440#[derive(derive_more::Debug)]
1441pub struct StartedChildWorkflow {
1442    /// Run ID of the child workflow
1443    pub run_id: String,
1444    #[debug(skip)]
1445    common: ChildWfCommon,
1446}
1447
1448impl ChildWorkflow {
1449    /// Start the child workflow, the returned Future is cancellable.
1450    pub fn start(self) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
1451        let child_seq = self
1452            .base_ctx
1453            .inner
1454            .seq_nums
1455            .borrow_mut()
1456            .next_child_workflow_seq();
1457        // Immediately create the command/future for the result, otherwise if the user does
1458        // not await the result until *after* we receive an activation for it, there will be nothing
1459        // to match when unblocking.
1460        let cancel_seq = self
1461            .base_ctx
1462            .inner
1463            .seq_nums
1464            .borrow_mut()
1465            .next_cancel_external_wf_seq();
1466        let (result_cmd, unblocker) = CancellableWFCommandFut::new(
1467            CancellableIDWithReason::ExternalWorkflow {
1468                seqnum: cancel_seq,
1469                execution: NamespacedWorkflowExecution {
1470                    workflow_id: self.opts.workflow_id.clone(),
1471                    ..Default::default()
1472                },
1473            },
1474            self.base_ctx.clone(),
1475        );
1476        self.base_ctx.send(
1477            CommandSubscribeChildWorkflowCompletion {
1478                seq: child_seq,
1479                unblocker,
1480            }
1481            .into(),
1482        );
1483
1484        let common = ChildWfCommon {
1485            workflow_id: self.opts.workflow_id.clone(),
1486            result_future: result_cmd,
1487            base_ctx: self.base_ctx.clone(),
1488        };
1489
1490        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1491            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
1492            common,
1493            self.base_ctx.clone(),
1494        );
1495        self.base_ctx.send(
1496            CommandCreateRequest {
1497                cmd: self.opts.into_command(child_seq),
1498                unblocker,
1499            }
1500            .into(),
1501        );
1502
1503        cmd
1504    }
1505}
1506
1507impl StartedChildWorkflow {
1508    /// Consumes self and returns a future that will wait until completion of this child workflow
1509    /// execution
1510    pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
1511        self.common.result_future
1512    }
1513
1514    /// Cancel the child workflow
1515    pub fn cancel(&self, reason: String) {
1516        self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1517            CancelChildWorkflowExecution {
1518                child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
1519                reason,
1520            }
1521            .into(),
1522        ));
1523    }
1524
1525    /// Signal the child workflow
1526    pub fn signal<S: Into<Signal>>(
1527        &self,
1528        data: S,
1529    ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
1530        let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1531        self.common
1532            .base_ctx
1533            .clone()
1534            .send_signal_wf(target, data.into())
1535    }
1536}
1537
/// Handle to a Nexus operation, used to await its result or request its cancellation.
///
/// The `Debug` output shows only `operation_token`.
#[derive(derive_more::Debug)]
#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
pub struct StartedNexusOperation {
    /// The operation token, if the operation started asynchronously
    pub operation_token: Option<String>,
    /// Data needed to await the result and to cancel the operation.
    pub(crate) unblock_dat: NexusUnblockData,
}
1545
/// Data behind a [`StartedNexusOperation`]: the shared result future plus what is needed to
/// cancel the operation.
pub(crate) struct NexusUnblockData {
    /// Clonable future resolving with the operation's result; a clone is awaited on each
    /// `result` call.
    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
    /// Sequence number used to identify the operation when cancelling it.
    schedule_seq: u32,
    /// Context through which the cancellation request is issued.
    base_ctx: BaseWorkflowContext,
}
1551
1552impl StartedNexusOperation {
1553    pub async fn result(&self) -> NexusOperationResult {
1554        self.unblock_dat.result_future.clone().await
1555    }
1556
1557    pub fn cancel(&self) {
1558        self.unblock_dat
1559            .base_ctx
1560            .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
1561    }
1562}