Skip to main content

temporalio_sdk/
workflow_context.rs

1mod options;
2
3pub use options::{
4    ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5    SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9    CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10    CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
11    SupportsCancelReason, TimerResult, UnblockEvent, Unblockable,
12    workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{
15    FutureExt,
16    future::{FusedFuture, Shared},
17    task::Context,
18};
19use std::{
20    cell::{Ref, RefCell},
21    collections::HashMap,
22    future::{self, Future},
23    marker::PhantomData,
24    ops::{Deref, DerefMut},
25    pin::Pin,
26    rc::Rc,
27    sync::{
28        atomic::{AtomicBool, Ordering},
29        mpsc::{Receiver, Sender},
30    },
31    task::{Poll, Waker},
32    time::{Duration, SystemTime},
33};
34use temporalio_common::{
35    ActivityDefinition,
36    data_converters::{
37        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
38        SerializationContextData, TemporalDeserializable,
39    },
40    protos::{
41        coresdk::{
42            activity_result::{ActivityResolution, activity_resolution},
43            child_workflow::ChildWorkflowResult,
44            common::NamespacedWorkflowExecution,
45            nexus::NexusOperationResult,
46            workflow_activation::{
47                InitializeWorkflow,
48                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
49            },
50            workflow_commands::{
51                CancelChildWorkflowExecution, ModifyWorkflowProperties,
52                RequestCancelExternalWorkflowExecution, SetPatchMarker,
53                SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
54                WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
55            },
56        },
57        temporal::api::{
58            common::v1::{Memo, Payload, SearchAttributes},
59            failure::v1::Failure,
60            sdk::v1::UserMetadata,
61        },
62    },
63    worker::WorkerDeploymentVersion,
64};
65use tokio::sync::{oneshot, watch};
66
/// Non-generic base context containing all workflow execution infrastructure.
///
/// This is used internally by futures and commands that don't need typed workflow state.
/// Clones share the same underlying data, since the only field is a reference-counted pointer.
#[derive(Clone)]
pub struct BaseWorkflowContext {
    /// Reference-counted handle to the data shared by every clone of this context.
    inner: Rc<WorkflowContextInner>,
}
74impl BaseWorkflowContext {
75    pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
76        self.inner.shared.borrow_mut()
77    }
78
79    /// Create a read-only view of this context.
80    pub(crate) fn view(&self) -> WorkflowContextView {
81        WorkflowContextView::new(
82            self.inner.namespace.clone(),
83            self.inner.task_queue.clone(),
84            self.inner.run_id.clone(),
85            &self.inner.inital_information,
86        )
87    }
88}
89
/// Data held behind the `Rc` inside [`BaseWorkflowContext`], shared by all of its clones.
struct WorkflowContextInner {
    /// Namespace the workflow is executing in
    namespace: String,
    /// Task queue the workflow is executing on
    task_queue: String,
    /// Run id of this workflow execution
    run_id: String,
    /// The `InitializeWorkflow` job the context was created with.
    // NOTE(review): `inital` is a misspelling of `initial`; renaming it means touching every use
    // site in this file at once.
    inital_information: InitializeWorkflow,
    /// Sending half of the channel that workflow commands are pushed into
    chan: Sender<RustWfCmd>,
    /// Watch channel holding the cancellation reason; `None` until a value is published
    am_cancelled: watch::Receiver<Option<String>>,
    /// Mutable workflow data read by the context's accessor methods
    shared: RefCell<WorkflowContextSharedData>,
    /// Counters that hand out the next sequence number for each kind of command
    seq_nums: RefCell<WfCtxProtectedDat>,
    /// Converter used to serialize inputs into payloads and deserialize results
    payload_converter: PayloadConverter,
}
101
/// Context provided to synchronous signal and update handlers.
///
/// This type provides all workflow context capabilities except `state()`, `state_mut()`,
/// and `wait_condition()`. Those methods are not applicable in sync handler contexts.
///
/// Sync handlers receive `&mut self` directly, so they can reference and mutate workflow state without
/// needing `state()`/`state_mut()`.
pub struct SyncWorkflowContext<W> {
    /// The non-generic context that all commands and lookups are forwarded to
    base: BaseWorkflowContext,
    /// Headers from the current handler invocation (signal, update, etc.)
    headers: Rc<HashMap<String, Payload>>,
    /// Ties the context to the workflow type `W` without storing a value of that type
    _phantom: PhantomData<W>,
}
115
116impl<W> Clone for SyncWorkflowContext<W> {
117    fn clone(&self) -> Self {
118        Self {
119            base: self.base.clone(),
120            headers: self.headers.clone(),
121            _phantom: PhantomData,
122        }
123    }
124}
125
/// Used within workflows to issue commands, get info, etc.
///
/// The type parameter `W` represents the workflow type. This enables type-safe
/// access to workflow state via `state_mut()` for mutations.
pub struct WorkflowContext<W> {
    /// The handler-level context that the delegated accessor methods forward to
    sync: SyncWorkflowContext<W>,
    /// The workflow instance
    workflow_state: Rc<RefCell<W>>,
    /// Wakers registered by `wait_condition` futures. Drained and woken on
    /// every `state_mut` call so that waker-based combinators (e.g.
    /// `FuturesOrdered`) re-poll the condition after state changes.
    condition_wakers: Rc<RefCell<Vec<Waker>>>,
}
139
140impl<W> Clone for WorkflowContext<W> {
141    fn clone(&self) -> Self {
142        Self {
143            sync: self.sync.clone(),
144            workflow_state: self.workflow_state.clone(),
145            condition_wakers: self.condition_wakers.clone(),
146        }
147    }
148}
149
/// Read-only view of workflow context for use in init and query handlers.
///
/// This provides access to workflow information but cannot issue commands.
/// All fields are owned copies taken when the view is created.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct WorkflowContextView {
    /// The workflow's unique identifier
    pub workflow_id: String,
    /// The run id of this workflow execution
    pub run_id: String,
    /// The workflow type name
    pub workflow_type: String,
    /// The task queue this workflow is executing on
    pub task_queue: String,
    /// The namespace this workflow is executing in
    pub namespace: String,

    /// The current attempt number (starting from 1)
    pub attempt: u32,
    /// The run id of the very first execution in the chain
    pub first_execution_run_id: String,
    /// The run id of the previous execution if this is a continuation.
    /// `None` when the initialization data carries an empty run id.
    pub continued_from_run_id: Option<String>,

    /// When the workflow execution started.
    /// `None` if absent or not convertible to a `SystemTime`.
    pub start_time: Option<SystemTime>,
    /// Total workflow execution timeout including retries and continue as new.
    /// `None` if absent or not convertible to a `Duration`.
    pub execution_timeout: Option<Duration>,
    /// Timeout of a single workflow run.
    /// `None` if absent or not convertible to a `Duration`.
    pub run_timeout: Option<Duration>,
    /// Timeout of a single workflow task.
    /// `None` if absent or not convertible to a `Duration`.
    pub task_timeout: Option<Duration>,

    /// Information about the parent workflow, if this is a child workflow
    pub parent: Option<ParentWorkflowInfo>,
    /// Information about the root workflow in the execution chain
    pub root: Option<RootWorkflowInfo>,

    /// The workflow's retry policy
    pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
    /// If this workflow runs on a cron schedule.
    /// `None` when the initialization data carries an empty schedule string.
    pub cron_schedule: Option<String>,
    /// User-defined memo
    pub memo: Option<Memo>,
    /// Initial search attributes
    pub search_attributes: Option<SearchAttributes>,
}
197
/// Information about a parent workflow.
///
/// Copied from the `parent_workflow_info` of the workflow's initialization data.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ParentWorkflowInfo {
    /// The parent workflow's unique identifier
    pub workflow_id: String,
    /// The parent workflow's run id
    pub run_id: String,
    /// The parent workflow's namespace
    pub namespace: String,
}
209
/// Information about the root workflow in an execution chain.
///
/// Copied from the `root_workflow` of the workflow's initialization data.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct RootWorkflowInfo {
    /// The root workflow's unique identifier
    pub workflow_id: String,
    /// The root workflow's run id
    pub run_id: String,
}
219
220impl WorkflowContextView {
221    /// Create a new view from workflow initialization data.
222    pub(crate) fn new(
223        namespace: String,
224        task_queue: String,
225        run_id: String,
226        init: &InitializeWorkflow,
227    ) -> Self {
228        let parent = init
229            .parent_workflow_info
230            .as_ref()
231            .map(|p| ParentWorkflowInfo {
232                workflow_id: p.workflow_id.clone(),
233                run_id: p.run_id.clone(),
234                namespace: p.namespace.clone(),
235            });
236
237        let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
238            workflow_id: r.workflow_id.clone(),
239            run_id: r.run_id.clone(),
240        });
241
242        let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
243            None
244        } else {
245            Some(init.continued_from_execution_run_id.clone())
246        };
247
248        let cron_schedule = if init.cron_schedule.is_empty() {
249            None
250        } else {
251            Some(init.cron_schedule.clone())
252        };
253
254        Self {
255            workflow_id: init.workflow_id.clone(),
256            run_id,
257            workflow_type: init.workflow_type.clone(),
258            task_queue,
259            namespace,
260            attempt: init.attempt as u32,
261            first_execution_run_id: init.first_execution_run_id.clone(),
262            continued_from_run_id,
263            start_time: init.start_time.and_then(|t| t.try_into().ok()),
264            execution_timeout: init
265                .workflow_execution_timeout
266                .and_then(|d| d.try_into().ok()),
267            run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
268            task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
269            parent,
270            root,
271            retry_policy: init.retry_policy.clone(),
272            cron_schedule,
273            memo: init.memo.clone(),
274            search_attributes: init.search_attributes.clone(),
275        }
276    }
277}
278
/// Error type for activity execution outcomes.
///
/// A `PayloadConversionError` converts into the `Serialization` variant via `From`, so it can
/// be propagated with `?` / `.into()`.
#[derive(Debug, thiserror::Error)]
pub enum ActivityExecutionError {
    /// The activity failed with the given failure details.
    #[error("Activity failed: {}", .0.message)]
    Failed(Box<Failure>),
    /// The activity was cancelled.
    #[error("Activity cancelled: {}", .0.message)]
    Cancelled(Box<Failure>),
    // TODO: Timed out variant
    /// Failed to serialize input or deserialize result payload.
    #[error("Payload conversion failed: {0}")]
    Serialization(#[from] PayloadConversionError),
}
293
294impl ActivityExecutionError {
295    /// Returns true if this error represents a timeout.
296    pub fn is_timeout(&self) -> bool {
297        match self {
298            ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
299            _ => false,
300        }
301    }
302}
303
304impl BaseWorkflowContext {
305    /// Create a new base context, returning the context itself and a receiver which outputs commands
306    /// sent from the workflow.
307    pub(crate) fn new(
308        namespace: String,
309        task_queue: String,
310        run_id: String,
311        init_workflow_job: InitializeWorkflow,
312        am_cancelled: watch::Receiver<Option<String>>,
313        payload_converter: PayloadConverter,
314    ) -> (Self, Receiver<RustWfCmd>) {
315        // The receiving side is non-async
316        let (chan, rx) = std::sync::mpsc::channel();
317        (
318            Self {
319                inner: Rc::new(WorkflowContextInner {
320                    namespace,
321                    task_queue,
322                    run_id,
323                    shared: RefCell::new(WorkflowContextSharedData {
324                        random_seed: init_workflow_job.randomness_seed,
325                        search_attributes: init_workflow_job
326                            .search_attributes
327                            .clone()
328                            .unwrap_or_default(),
329                        ..Default::default()
330                    }),
331                    inital_information: init_workflow_job,
332                    chan,
333                    am_cancelled,
334                    seq_nums: RefCell::new(WfCtxProtectedDat {
335                        next_timer_sequence_number: 1,
336                        next_activity_sequence_number: 1,
337                        next_child_workflow_sequence_number: 1,
338                        next_cancel_external_wf_sequence_number: 1,
339                        next_signal_external_wf_sequence_number: 1,
340                        next_nexus_op_sequence_number: 1,
341                    }),
342                    payload_converter,
343                }),
344            },
345            rx,
346        )
347    }
348
    /// Buffer a command to be sent in the activation reply
    ///
    /// # Panics
    /// Panics if the command channel's receiving half has been dropped.
    pub(crate) fn send(&self, c: RustWfCmd) {
        self.inner.chan.send(c).expect("command channel intact");
    }
353
    /// Cancel any cancellable operation by ID
    ///
    /// This only buffers a `RustWfCmd::Cancel` command via [`Self::send`].
    fn cancel(&self, cancellable_id: CancellableID) {
        self.send(RustWfCmd::Cancel(cancellable_id));
    }
358
359    /// Request to create a timer
360    pub fn timer<T: Into<TimerOptions>>(
361        &self,
362        opts: T,
363    ) -> impl CancellableFuture<TimerResult> + use<T> {
364        let opts: TimerOptions = opts.into();
365        let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
366        let (cmd, unblocker) =
367            CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
368        self.send(
369            CommandCreateRequest {
370                cmd: WorkflowCommand {
371                    variant: Some(
372                        StartTimer {
373                            seq,
374                            start_to_fire_timeout: Some(
375                                opts.duration
376                                    .try_into()
377                                    .expect("Durations must fit into 64 bits"),
378                            ),
379                        }
380                        .into(),
381                    ),
382                    user_metadata: Some(UserMetadata {
383                        summary: opts.summary.map(|x| x.as_bytes().into()),
384                        details: None,
385                    }),
386                },
387                unblocker,
388            }
389            .into(),
390        );
391        cmd
392    }
393
394    /// Request to run an activity
395    pub fn start_activity<AD: ActivityDefinition>(
396        &self,
397        _activity: AD,
398        input: impl Into<AD::Input>,
399        mut opts: ActivityOptions,
400    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
401    where
402        AD::Output: TemporalDeserializable,
403    {
404        let input = input.into();
405        let ctx = SerializationContext {
406            data: &SerializationContextData::Workflow,
407            converter: &self.inner.payload_converter,
408        };
409        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
410            Ok(p) => p,
411            Err(e) => {
412                return ActivityFut::eager(e.into());
413            }
414        };
415        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
416        let (cmd, unblocker) =
417            CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
418        if opts.task_queue.is_none() {
419            opts.task_queue = Some(self.inner.task_queue.clone());
420        }
421        self.send(
422            CommandCreateRequest {
423                cmd: opts.into_command(AD::name().to_string(), payloads, seq),
424                unblocker,
425            }
426            .into(),
427        );
428        ActivityFut::running(cmd, self.inner.payload_converter.clone())
429    }
430
431    /// Request to run a local activity
432    pub fn start_local_activity<AD: ActivityDefinition>(
433        &self,
434        _activity: AD,
435        input: impl Into<AD::Input>,
436        opts: LocalActivityOptions,
437    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
438    where
439        AD::Output: TemporalDeserializable,
440    {
441        let input = input.into();
442        let ctx = SerializationContext {
443            data: &SerializationContextData::Workflow,
444            converter: &self.inner.payload_converter,
445        };
446        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
447            Ok(p) => p,
448            Err(e) => {
449                return ActivityFut::eager(e.into());
450            }
451        };
452        ActivityFut::running(
453            LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
454            self.inner.payload_converter.clone(),
455        )
456    }
457
458    /// Request to run a local activity with no implementation of timer-backoff based retrying.
459    fn local_activity_no_timer_retry(
460        self,
461        activity_type: String,
462        arguments: Vec<Payload>,
463        opts: LocalActivityOptions,
464    ) -> impl CancellableFuture<ActivityResolution> {
465        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
466        let (cmd, unblocker) =
467            CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
468        self.inner
469            .chan
470            .send(
471                CommandCreateRequest {
472                    cmd: opts.into_command(activity_type, arguments, seq),
473                    unblocker,
474                }
475                .into(),
476            )
477            .expect("command channel intact");
478        cmd
479    }
480
481    fn send_signal_wf(
482        self,
483        target: sig_we::Target,
484        signal: Signal,
485    ) -> impl CancellableFuture<SignalExternalWfResult> {
486        let seq = self
487            .inner
488            .seq_nums
489            .borrow_mut()
490            .next_signal_external_wf_seq();
491        let (cmd, unblocker) =
492            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
493        self.send(
494            CommandCreateRequest {
495                cmd: WorkflowCommand {
496                    variant: Some(
497                        SignalExternalWorkflowExecution {
498                            seq,
499                            signal_name: signal.signal_name,
500                            args: signal.data.input,
501                            target: Some(target),
502                            headers: signal.data.headers,
503                        }
504                        .into(),
505                    ),
506                    user_metadata: None,
507                },
508                unblocker,
509            }
510            .into(),
511        );
512        cmd
513    }
514}
515
516impl<W> SyncWorkflowContext<W> {
517    /// Return the workflow's unique identifier
518    pub fn workflow_id(&self) -> &str {
519        &self.base.inner.inital_information.workflow_id
520    }
521
522    /// Return the run id of this workflow execution
523    pub fn run_id(&self) -> &str {
524        &self.base.inner.run_id
525    }
526
527    /// Return the namespace the workflow is executing in
528    pub fn namespace(&self) -> &str {
529        &self.base.inner.namespace
530    }
531
532    /// Return the task queue the workflow is executing in
533    pub fn task_queue(&self) -> &str {
534        &self.base.inner.task_queue
535    }
536
537    /// Return the current time according to the workflow (which is not wall-clock time).
538    pub fn workflow_time(&self) -> Option<SystemTime> {
539        self.base.inner.shared.borrow().wf_time
540    }
541
542    /// Return the length of history so far at this point in the workflow
543    pub fn history_length(&self) -> u32 {
544        self.base.inner.shared.borrow().history_length
545    }
546
547    /// Return the deployment version, if any,  as it was when this point in the workflow was first
548    /// reached. If this code is being executed for the first time, return this Worker's deployment
549    /// version if it has one.
550    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
551        self.base
552            .inner
553            .shared
554            .borrow()
555            .current_deployment_version
556            .clone()
557    }
558
559    /// Return current values for workflow search attributes
560    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
561        Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
562    }
563
564    /// Return the workflow's randomness seed
565    pub fn random_seed(&self) -> u64 {
566        self.base.inner.shared.borrow().random_seed
567    }
568
569    /// Returns true if the current workflow task is happening under replay
570    pub fn is_replaying(&self) -> bool {
571        self.base.inner.shared.borrow().is_replaying
572    }
573
574    /// Returns true if the server suggests this workflow should continue-as-new
575    pub fn continue_as_new_suggested(&self) -> bool {
576        self.base.inner.shared.borrow().continue_as_new_suggested
577    }
578
579    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
580    ///
581    /// When called from within a signal handler, returns the headers that were sent with that
582    /// signal. When called from the main workflow run method, returns an empty map.
583    pub fn headers(&self) -> &HashMap<String, Payload> {
584        &self.headers
585    }
586
587    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
588    pub fn payload_converter(&self) -> &PayloadConverter {
589        &self.base.inner.payload_converter
590    }
591
592    /// Return various information that the workflow was initialized with. Will eventually become
593    /// a proper non-proto workflow info struct.
594    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
595        &self.base.inner.inital_information
596    }
597
598    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
599    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
600        let am_cancelled = self.base.inner.am_cancelled.clone();
601        async move {
602            if let Some(s) = am_cancelled.borrow().as_ref() {
603                return s.clone();
604            }
605            am_cancelled
606                .clone()
607                .changed()
608                .await
609                .expect("Cancelled send half not dropped");
610            am_cancelled.borrow().as_ref().cloned().unwrap_or_default()
611        }
612        .fuse()
613    }
614
    /// Request to create a timer
    ///
    /// Forwards to the base context's `timer`.
    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
        self.base.timer(opts)
    }
619
    /// Request to run an activity
    ///
    /// Forwards to the base context's `start_activity`. The returned future resolves to the
    /// activity's output or an [`ActivityExecutionError`].
    pub fn start_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: ActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.base.start_activity(activity, input, opts)
    }
632
    /// Request to run a local activity
    ///
    /// Forwards to the base context's `start_local_activity`. The returned future resolves to
    /// the activity's output or an [`ActivityExecutionError`].
    pub fn start_local_activity<AD: ActivityDefinition>(
        &self,
        activity: AD,
        input: impl Into<AD::Input>,
        opts: LocalActivityOptions,
    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
    where
        AD::Output: TemporalDeserializable,
    {
        self.base.start_local_activity(activity, input, opts)
    }
645
646    /// Creates a child workflow stub with the provided options
647    pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
648        ChildWorkflow {
649            opts,
650            base_ctx: self.base.clone(),
651        }
652    }
653
    /// Check (or record) that this workflow history was created with the provided patch
    ///
    /// Calls `patch_impl` with `deprecated` set to `false` and returns its result.
    pub fn patched(&self, patch_id: &str) -> bool {
        self.patch_impl(patch_id, false)
    }
658
    /// Record that this workflow history was created with the provided patch, and it is being
    /// phased out.
    ///
    /// Calls `patch_impl` with `deprecated` set to `true` and returns its result.
    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
        self.patch_impl(patch_id, true)
    }
664
665    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
666        self.base.send(
667            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
668                patch_id: patch_id.to_string(),
669                deprecated,
670            })
671            .into(),
672        );
673        // See if we already know about the status of this change
674        if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
675            return *present;
676        }
677
678        // If we don't already know about the change, that means there is no marker in history,
679        // and we should return false if we are replaying
680        let res = !self.base.inner.shared.borrow().is_replaying;
681
682        self.base
683            .inner
684            .shared
685            .borrow_mut()
686            .changes
687            .insert(patch_id.to_string(), res);
688
689        res
690    }
691
692    /// Send a signal to an external workflow. May resolve as a failure if the signal didn't work
693    /// or was cancelled.
694    pub fn signal_workflow(
695        &self,
696        opts: impl Into<SignalWorkflowOptions>,
697    ) -> impl CancellableFuture<SignalExternalWfResult> {
698        let options: SignalWorkflowOptions = opts.into();
699        let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
700            namespace: self.base.inner.namespace.clone(),
701            workflow_id: options.workflow_id,
702            run_id: options.run_id.unwrap_or_default(),
703        });
704        self.base.clone().send_signal_wf(target, options.signal)
705    }
706
707    /// Add or create a set of search attributes
708    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
709        self.base.send(RustWfCmd::NewNonblockingCmd(
710            workflow_command::Variant::UpsertWorkflowSearchAttributes(
711                UpsertWorkflowSearchAttributes {
712                    search_attributes: Some(SearchAttributes {
713                        indexed_fields: HashMap::from_iter(attr_iter),
714                    }),
715                },
716            ),
717        ))
718    }
719
720    /// Add or create a set of search attributes
721    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
722        self.base.send(RustWfCmd::NewNonblockingCmd(
723            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
724                upserted_memo: Some(Memo {
725                    fields: HashMap::from_iter(attr_iter),
726                }),
727            }),
728        ))
729    }
730
    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
    ///
    /// The error is converted into a command and buffered like any other command.
    pub fn force_task_fail(&self, with: anyhow::Error) {
        self.base.send(with.into());
    }
735
736    /// Request the cancellation of an external workflow. May resolve as a failure if the workflow
737    /// was not found or the cancel was otherwise unsendable.
738    pub fn cancel_external(
739        &self,
740        target: NamespacedWorkflowExecution,
741        reason: String,
742    ) -> impl FusedFuture<Output = CancelExternalWfResult> {
743        let seq = self
744            .base
745            .inner
746            .seq_nums
747            .borrow_mut()
748            .next_cancel_external_wf_seq();
749        let (cmd, unblocker) = WFCommandFut::new();
750        self.base.send(
751            CommandCreateRequest {
752                cmd: WorkflowCommand {
753                    variant: Some(
754                        RequestCancelExternalWorkflowExecution {
755                            seq,
756                            workflow_execution: Some(target),
757                            reason,
758                        }
759                        .into(),
760                    ),
761                    user_metadata: None,
762                },
763                unblocker,
764            }
765            .into(),
766        );
767        cmd
768    }
769
770    /// Start a nexus operation
771    pub fn start_nexus_operation(
772        &self,
773        opts: NexusOperationOptions,
774    ) -> impl CancellableFuture<NexusStartResult> {
775        let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
776        let (result_future, unblocker) = WFCommandFut::new();
777        self.base
778            .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
779        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
780            CancellableID::NexusOp(seq),
781            NexusUnblockData {
782                result_future: result_future.shared(),
783                schedule_seq: seq,
784                base_ctx: self.base.clone(),
785            },
786            self.base.clone(),
787        );
788        self.base.send(
789            CommandCreateRequest {
790                cmd: opts.into_command(seq),
791                unblocker,
792            }
793            .into(),
794        );
795        cmd
796    }
797
    /// Create a read-only view of this context.
    ///
    /// Forwards to the base context's `view`.
    pub(crate) fn view(&self) -> WorkflowContextView {
        self.base.view()
    }
802}
803
804impl<W> WorkflowContext<W> {
805    /// Create a new wf context from a base context and workflow state.
806    pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
807        Self {
808            sync: SyncWorkflowContext {
809                base,
810                headers: Rc::new(HashMap::new()),
811                _phantom: PhantomData,
812            },
813            workflow_state,
814            condition_wakers: Rc::new(RefCell::new(Vec::new())),
815        }
816    }
817
818    /// Returns a new context with the specified headers set.
819    pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
820        Self {
821            sync: SyncWorkflowContext {
822                base: self.sync.base.clone(),
823                headers: Rc::new(headers),
824                _phantom: PhantomData,
825            },
826            workflow_state: self.workflow_state.clone(),
827            condition_wakers: self.condition_wakers.clone(),
828        }
829    }
830
    /// Returns a [`SyncWorkflowContext`] extracted from this context.
    ///
    /// The result is a clone of the inner sync context, so it shares the same base context and
    /// headers.
    pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
        self.sync.clone()
    }
835
836    // --- Delegated methods from SyncWorkflowContext ---
837
    /// Return the workflow's unique identifier
    ///
    /// Delegates to [`SyncWorkflowContext::workflow_id`].
    pub fn workflow_id(&self) -> &str {
        self.sync.workflow_id()
    }
842
    /// Return the run id of this workflow execution
    ///
    /// Delegates to [`SyncWorkflowContext::run_id`].
    pub fn run_id(&self) -> &str {
        self.sync.run_id()
    }
847
    /// Return the namespace the workflow is executing in
    ///
    /// Delegates to [`SyncWorkflowContext::namespace`].
    pub fn namespace(&self) -> &str {
        self.sync.namespace()
    }
852
853    /// Return the task queue the workflow is executing in
854    pub fn task_queue(&self) -> &str {
855        self.sync.task_queue()
856    }
857
858    /// Return the current time according to the workflow (which is not wall-clock time).
859    pub fn workflow_time(&self) -> Option<SystemTime> {
860        self.sync.workflow_time()
861    }
862
863    /// Return the length of history so far at this point in the workflow
864    pub fn history_length(&self) -> u32 {
865        self.sync.history_length()
866    }
867
868    /// Return the deployment version, if any, as it was when this point in the workflow was first
869    /// reached. If this code is being executed for the first time, return this Worker's deployment
870    /// version if it has one.
871    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
872        self.sync.current_deployment_version()
873    }
874
875    /// Return current values for workflow search attributes
876    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
877        self.sync.search_attributes()
878    }
879
880    /// Return the workflow's randomness seed
881    pub fn random_seed(&self) -> u64 {
882        self.sync.random_seed()
883    }
884
885    /// Returns true if the current workflow task is happening under replay
886    pub fn is_replaying(&self) -> bool {
887        self.sync.is_replaying()
888    }
889
890    /// Returns true if the server suggests this workflow should continue-as-new
891    pub fn continue_as_new_suggested(&self) -> bool {
892        self.sync.continue_as_new_suggested()
893    }
894
895    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
896    pub fn headers(&self) -> &HashMap<String, Payload> {
897        self.sync.headers()
898    }
899
900    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
901    pub fn payload_converter(&self) -> &PayloadConverter {
902        self.sync.payload_converter()
903    }
904
905    /// Return various information that the workflow was initialized with.
906    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
907        self.sync.workflow_initial_info()
908    }
909
910    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
911    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
912        self.sync.cancelled()
913    }
914
915    /// Request to create a timer
916    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
917        self.sync.timer(opts)
918    }
919
920    /// Request to run an activity
921    pub fn start_activity<AD: ActivityDefinition>(
922        &self,
923        activity: AD,
924        input: impl Into<AD::Input>,
925        opts: ActivityOptions,
926    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
927    where
928        AD::Output: TemporalDeserializable,
929    {
930        self.sync.start_activity(activity, input, opts)
931    }
932
933    /// Request to run a local activity
934    pub fn start_local_activity<AD: ActivityDefinition>(
935        &self,
936        activity: AD,
937        input: impl Into<AD::Input>,
938        opts: LocalActivityOptions,
939    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
940    where
941        AD::Output: TemporalDeserializable,
942    {
943        self.sync.start_local_activity(activity, input, opts)
944    }
945
946    /// Creates a child workflow stub with the provided options
947    pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
948        self.sync.child_workflow(opts)
949    }
950
951    /// Check (or record) that this workflow history was created with the provided patch
952    pub fn patched(&self, patch_id: &str) -> bool {
953        self.sync.patched(patch_id)
954    }
955
956    /// Record that this workflow history was created with the provided patch, and it is being
957    /// phased out.
958    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
959        self.sync.deprecate_patch(patch_id)
960    }
961
962    /// Send a signal to an external workflow.
963    pub fn signal_workflow(
964        &self,
965        opts: impl Into<SignalWorkflowOptions>,
966    ) -> impl CancellableFuture<SignalExternalWfResult> {
967        self.sync.signal_workflow(opts)
968    }
969
970    /// Add or create a set of search attributes
971    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
972        self.sync.upsert_search_attributes(attr_iter)
973    }
974
975    /// Add or create a set of memo fields
976    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
977        self.sync.upsert_memo(attr_iter)
978    }
979
980    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
981    pub fn force_task_fail(&self, with: anyhow::Error) {
982        self.sync.force_task_fail(with)
983    }
984
985    /// Request the cancellation of an external workflow.
986    pub fn cancel_external(
987        &self,
988        target: NamespacedWorkflowExecution,
989        reason: String,
990    ) -> impl FusedFuture<Output = CancelExternalWfResult> {
991        self.sync.cancel_external(target, reason)
992    }
993
994    /// Start a nexus operation
995    pub fn start_nexus_operation(
996        &self,
997        opts: NexusOperationOptions,
998    ) -> impl CancellableFuture<NexusStartResult> {
999        self.sync.start_nexus_operation(opts)
1000    }
1001
1002    /// Create a read-only view of this context.
1003    pub(crate) fn view(&self) -> WorkflowContextView {
1004        self.sync.view()
1005    }
1006
1007    /// Access workflow state immutably via closure.
1008    ///
1009    /// The borrow is scoped to the closure and cannot escape, preventing
1010    /// borrows from being held across await points.
1011    pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1012        f(&*self.workflow_state.borrow())
1013    }
1014
1015    /// Access workflow state mutably via closure.
1016    ///
1017    /// The borrow is scoped to the closure and cannot escape, preventing
1018    /// borrows from being held across await points.
1019    ///
1020    /// After the mutation, all wakers registered by pending `wait_condition`
1021    /// futures are woken so that waker-based combinators (e.g.
1022    /// `FuturesOrdered`) re-poll them on the next pass.
1023    pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1024        let result = f(&mut *self.workflow_state.borrow_mut());
1025        for waker in self.condition_wakers.borrow_mut().drain(..) {
1026            waker.wake();
1027        }
1028        result
1029    }
1030
1031    /// Wait for some condition on workflow state to become true, yielding the workflow if not.
1032    ///
1033    /// The condition closure receives an immutable reference to the workflow state,
1034    /// which is borrowed only for the duration of each poll (not across await points).
1035    pub fn wait_condition<'a>(
1036        &'a self,
1037        mut condition: impl FnMut(&W) -> bool + 'a,
1038    ) -> impl Future<Output = ()> + 'a {
1039        future::poll_fn(move |cx: &mut Context<'_>| {
1040            if condition(&*self.workflow_state.borrow()) {
1041                Poll::Ready(())
1042            } else {
1043                self.condition_wakers.borrow_mut().push(cx.waker().clone());
1044                Poll::Pending
1045            }
1046        })
1047    }
1048}
1049
/// Counters that hand out sequence numbers, one independent counter per kind of command.
struct WfCtxProtectedDat {
    /// Value the next call to `next_timer_seq` returns.
    next_timer_sequence_number: u32,
    /// Value the next call to `next_activity_seq` returns.
    next_activity_sequence_number: u32,
    /// Value the next call to `next_child_workflow_seq` returns.
    next_child_workflow_sequence_number: u32,
    /// Value the next call to `next_cancel_external_wf_seq` returns.
    next_cancel_external_wf_sequence_number: u32,
    /// Value the next call to `next_signal_external_wf_seq` returns.
    next_signal_external_wf_sequence_number: u32,
    /// Value the next call to `next_nexus_op_seq` returns.
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the current value of `counter`, then advances it by one.
    fn take_and_advance(counter: &mut u32) -> u32 {
        let current = *counter;
        *counter = current + 1;
        current
    }

    /// Hands out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_timer_sequence_number)
    }

    /// Hands out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_activity_sequence_number)
    }

    /// Hands out the next child-workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_child_workflow_sequence_number)
    }

    /// Hands out the next cancel-external-workflow sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hands out the next signal-external-workflow sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hands out the next nexus-operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::take_and_advance(&mut self.next_nexus_op_sequence_number)
    }
}
1091
1092#[derive(Clone, Debug, Default)]
1093pub(crate) struct WorkflowContextSharedData {
1094    /// Maps change ids -> resolved status
1095    pub(crate) changes: HashMap<String, bool>,
1096    pub(crate) is_replaying: bool,
1097    pub(crate) wf_time: Option<SystemTime>,
1098    pub(crate) history_length: u32,
1099    pub(crate) continue_as_new_suggested: bool,
1100    pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
1101    pub(crate) search_attributes: SearchAttributes,
1102    pub(crate) random_seed: u64,
1103}
1104
1105/// A Future that can be cancelled.
1106/// Used in the prototype SDK for cancelling operations like timers and activities.
1107pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1108    /// Cancel this Future
1109    fn cancel(&self);
1110}
1111
1112/// A Future that can be cancelled with a reason
1113pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1114    /// Cancel this Future with a reason
1115    fn cancel_with_reason(&self, reason: String);
1116}
1117
1118struct WFCommandFut<T, D> {
1119    _unused: PhantomData<T>,
1120    result_rx: oneshot::Receiver<UnblockEvent>,
1121    other_dat: Option<D>,
1122}
1123impl<T> WFCommandFut<T, ()> {
1124    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1125        Self::new_with_dat(())
1126    }
1127}
1128
1129impl<T, D> WFCommandFut<T, D> {
1130    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1131        let (tx, rx) = oneshot::channel();
1132        (
1133            Self {
1134                _unused: PhantomData,
1135                result_rx: rx,
1136                other_dat: Some(other_dat),
1137            },
1138            tx,
1139        )
1140    }
1141}
1142
1143impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1144impl<T, D> Future for WFCommandFut<T, D>
1145where
1146    T: Unblockable<OtherDat = D>,
1147{
1148    type Output = T;
1149
1150    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1151        self.result_rx.poll_unpin(cx).map(|x| {
1152            let od = self
1153                .other_dat
1154                .take()
1155                .expect("Other data must exist when resolving command future");
1156            Unblockable::unblock(x.unwrap(), od)
1157        })
1158    }
1159}
1160impl<T, D> FusedFuture for WFCommandFut<T, D>
1161where
1162    T: Unblockable<OtherDat = D>,
1163{
1164    fn is_terminated(&self) -> bool {
1165        self.other_dat.is_none()
1166    }
1167}
1168
1169struct CancellableWFCommandFut<T, D, ID = CancellableID> {
1170    cmd_fut: WFCommandFut<T, D>,
1171    cancellable_id: ID,
1172    base_ctx: BaseWorkflowContext,
1173}
1174impl<T, ID> CancellableWFCommandFut<T, (), ID> {
1175    fn new(
1176        cancellable_id: ID,
1177        base_ctx: BaseWorkflowContext,
1178    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1179        Self::new_with_dat(cancellable_id, (), base_ctx)
1180    }
1181}
1182impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1183    fn new_with_dat(
1184        cancellable_id: ID,
1185        other_dat: D,
1186        base_ctx: BaseWorkflowContext,
1187    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1188        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1189        (
1190            Self {
1191                cmd_fut,
1192                cancellable_id,
1193                base_ctx,
1194            },
1195            sender,
1196        )
1197    }
1198}
1199impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1200impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1201where
1202    T: Unblockable<OtherDat = D>,
1203{
1204    type Output = T;
1205
1206    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1207        self.cmd_fut.poll_unpin(cx)
1208    }
1209}
1210impl<T, D, ID> FusedFuture for CancellableWFCommandFut<T, D, ID>
1211where
1212    T: Unblockable<OtherDat = D>,
1213{
1214    fn is_terminated(&self) -> bool {
1215        self.cmd_fut.is_terminated()
1216    }
1217}
1218
1219impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1220where
1221    T: Unblockable<OtherDat = D>,
1222    ID: Clone + Into<CancellableID>,
1223{
1224    fn cancel(&self) {
1225        self.base_ctx.cancel(self.cancellable_id.clone().into());
1226    }
1227}
1228impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1229where
1230    T: Unblockable<OtherDat = D>,
1231{
1232    fn cancel_with_reason(&self, reason: String) {
1233        let new_id = self.cancellable_id.clone().with_reason(reason);
1234        self.base_ctx.cancel(new_id);
1235    }
1236}
1237
1238struct LATimerBackoffFut {
1239    la_opts: LocalActivityOptions,
1240    activity_type: String,
1241    arguments: Vec<Payload>,
1242    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1243    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1244    base_ctx: BaseWorkflowContext,
1245    next_attempt: u32,
1246    next_sched_time: Option<prost_types::Timestamp>,
1247    did_cancel: AtomicBool,
1248    terminated: bool,
1249}
1250impl LATimerBackoffFut {
1251    pub(crate) fn new(
1252        activity_type: String,
1253        arguments: Vec<Payload>,
1254        opts: LocalActivityOptions,
1255        base_ctx: BaseWorkflowContext,
1256    ) -> Self {
1257        let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1258            activity_type.clone(),
1259            arguments.clone(),
1260            opts.clone(),
1261        ));
1262        Self {
1263            la_opts: opts,
1264            activity_type,
1265            arguments,
1266            current_fut,
1267            timer_fut: None,
1268            base_ctx,
1269            next_attempt: 1,
1270            next_sched_time: None,
1271            did_cancel: AtomicBool::new(false),
1272            terminated: false,
1273        }
1274    }
1275}
1276impl Unpin for LATimerBackoffFut {}
1277impl Future for LATimerBackoffFut {
1278    type Output = ActivityResolution;
1279
1280    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1281        // If the timer exists, wait for it first
1282        if let Some(tf) = self.timer_fut.as_mut() {
1283            return match tf.poll_unpin(cx) {
1284                Poll::Ready(tr) => {
1285                    self.timer_fut = None;
1286                    // Schedule next LA if this timer wasn't cancelled
1287                    if let TimerResult::Fired = tr {
1288                        let mut opts = self.la_opts.clone();
1289                        opts.attempt = Some(self.next_attempt);
1290                        opts.original_schedule_time
1291                            .clone_from(&self.next_sched_time);
1292                        self.current_fut =
1293                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1294                                self.activity_type.clone(),
1295                                self.arguments.clone(),
1296                                opts,
1297                            ));
1298                        Poll::Pending
1299                    } else {
1300                        self.terminated = true;
1301                        Poll::Ready(ActivityResolution {
1302                            status: Some(
1303                                activity_resolution::Status::Cancelled(Default::default()),
1304                            ),
1305                        })
1306                    }
1307                }
1308                Poll::Pending => Poll::Pending,
1309            };
1310        }
1311        let poll_res = self.current_fut.poll_unpin(cx);
1312        if let Poll::Ready(ref r) = poll_res
1313            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1314        {
1315            // If we've already said we want to cancel, don't schedule the backoff timer. Just
1316            // return cancel status. This can happen if cancel comes after the LA says it wants
1317            // to back off but before we have scheduled the timer.
1318            if self.did_cancel.load(Ordering::Acquire) {
1319                self.terminated = true;
1320                return Poll::Ready(ActivityResolution {
1321                    status: Some(activity_resolution::Status::Cancelled(Default::default())),
1322                });
1323            }
1324
1325            let timer_f = self.base_ctx.timer::<Duration>(
1326                b.backoff_duration
1327                    .expect("Duration is set")
1328                    .try_into()
1329                    .expect("duration converts ok"),
1330            );
1331            self.timer_fut = Some(Box::pin(timer_f));
1332            self.next_attempt = b.attempt;
1333            self.next_sched_time.clone_from(&b.original_schedule_time);
1334            return Poll::Pending;
1335        }
1336        if poll_res.is_ready() {
1337            self.terminated = true;
1338        }
1339        poll_res
1340    }
1341}
1342impl FusedFuture for LATimerBackoffFut {
1343    fn is_terminated(&self) -> bool {
1344        self.terminated
1345    }
1346}
1347impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1348    fn cancel(&self) {
1349        self.did_cancel.store(true, Ordering::Release);
1350        if let Some(tf) = self.timer_fut.as_ref() {
1351            tf.cancel();
1352        }
1353        self.current_fut.cancel();
1354    }
1355}
1356
1357/// Future for activity results. Either an immediate error or a running activity.
1358enum ActivityFut<F, Output> {
1359    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
1360    Errored {
1361        error: Option<ActivityExecutionError>,
1362        _phantom: PhantomData<Output>,
1363    },
1364    /// Running activity that will deserialize output on completion.
1365    Running {
1366        inner: F,
1367        payload_converter: PayloadConverter,
1368        _phantom: PhantomData<Output>,
1369    },
1370    Terminated,
1371}
1372
1373impl<F, Output> ActivityFut<F, Output> {
1374    fn eager(err: ActivityExecutionError) -> Self {
1375        Self::Errored {
1376            error: Some(err),
1377            _phantom: PhantomData,
1378        }
1379    }
1380
1381    fn running(inner: F, payload_converter: PayloadConverter) -> Self {
1382        Self::Running {
1383            inner,
1384            payload_converter,
1385            _phantom: PhantomData,
1386        }
1387    }
1388}
1389
1390impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1391
1392impl<F, Output> Future for ActivityFut<F, Output>
1393where
1394    F: Future<Output = ActivityResolution> + Unpin,
1395    Output: TemporalDeserializable + 'static,
1396{
1397    type Output = Result<Output, ActivityExecutionError>;
1398
1399    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1400        let this = self.get_mut();
1401        let poll = match this {
1402            ActivityFut::Errored { error, .. } => {
1403                Poll::Ready(Err(error.take().expect("polled after completion")))
1404            }
1405            ActivityFut::Running {
1406                inner,
1407                payload_converter,
1408                ..
1409            } => match Pin::new(inner).poll(cx) {
1410                Poll::Pending => Poll::Pending,
1411                Poll::Ready(resolution) => Poll::Ready({
1412                    let status = resolution.status.ok_or_else(|| {
1413                        ActivityExecutionError::Failed(Box::new(Failure {
1414                            message: "Activity completed without a status".to_string(),
1415                            ..Default::default()
1416                        }))
1417                    })?;
1418
1419                    match status {
1420                        activity_resolution::Status::Completed(success) => {
1421                            let payload = success.result.unwrap_or_default();
1422                            let ctx = SerializationContext {
1423                                data: &SerializationContextData::Workflow,
1424                                converter: payload_converter,
1425                            };
1426                            payload_converter
1427                                .from_payload::<Output>(&ctx, payload)
1428                                .map_err(ActivityExecutionError::Serialization)
1429                        }
1430                        activity_resolution::Status::Failed(f) => Err(
1431                            ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1432                        ),
1433                        activity_resolution::Status::Cancelled(c) => {
1434                            Err(ActivityExecutionError::Cancelled(Box::new(
1435                                c.failure.unwrap_or_default(),
1436                            )))
1437                        }
1438                        activity_resolution::Status::Backoff(_) => {
1439                            panic!("DoBackoff should be handled by LATimerBackoffFut")
1440                        }
1441                    }
1442                }),
1443            },
1444            ActivityFut::Terminated => panic!("polled after termination"),
1445        };
1446        if poll.is_ready() {
1447            *this = ActivityFut::Terminated;
1448        }
1449        poll
1450    }
1451}
1452
1453impl<F, Output> FusedFuture for ActivityFut<F, Output>
1454where
1455    F: Future<Output = ActivityResolution> + Unpin,
1456    Output: TemporalDeserializable + 'static,
1457{
1458    fn is_terminated(&self) -> bool {
1459        matches!(self, ActivityFut::Terminated)
1460    }
1461}
1462
1463impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1464where
1465    F: CancellableFuture<ActivityResolution> + Unpin,
1466    Output: TemporalDeserializable + 'static,
1467{
1468    fn cancel(&self) {
1469        if let ActivityFut::Running { inner, .. } = self {
1470            inner.cancel()
1471        }
1472    }
1473}
1474
1475/// A stub representing an unstarted child workflow.
1476#[derive(Clone, derive_more::Debug)]
1477pub struct ChildWorkflow {
1478    opts: ChildWorkflowOptions,
1479    #[debug(skip)]
1480    base_ctx: BaseWorkflowContext,
1481}
1482
1483pub(crate) struct ChildWfCommon {
1484    workflow_id: String,
1485    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
1486    base_ctx: BaseWorkflowContext,
1487}
1488
1489/// Child workflow in pending state
1490#[derive(derive_more::Debug)]
1491pub struct PendingChildWorkflow {
1492    /// The status of the child workflow start
1493    pub status: ChildWorkflowStartStatus,
1494    #[debug(skip)]
1495    pub(crate) common: ChildWfCommon,
1496}
1497
1498impl PendingChildWorkflow {
1499    /// Returns `None` if the child did not start successfully. The returned [StartedChildWorkflow]
1500    /// can be used to wait on, signal, or cancel the child workflow.
1501    pub fn into_started(self) -> Option<StartedChildWorkflow> {
1502        match self.status {
1503            ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
1504                run_id: s.run_id,
1505                common: self.common,
1506            }),
1507            _ => None,
1508        }
1509    }
1510}
1511
1512/// Child workflow in started state
1513#[derive(derive_more::Debug)]
1514pub struct StartedChildWorkflow {
1515    /// Run ID of the child workflow
1516    pub run_id: String,
1517    #[debug(skip)]
1518    common: ChildWfCommon,
1519}
1520
1521impl ChildWorkflow {
1522    /// Start the child workflow, the returned Future is cancellable.
1523    pub fn start(self) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
1524        let child_seq = self
1525            .base_ctx
1526            .inner
1527            .seq_nums
1528            .borrow_mut()
1529            .next_child_workflow_seq();
1530        // Immediately create the command/future for the result, otherwise if the user does
1531        // not await the result until *after* we receive an activation for it, there will be nothing
1532        // to match when unblocking.
1533        let cancel_seq = self
1534            .base_ctx
1535            .inner
1536            .seq_nums
1537            .borrow_mut()
1538            .next_cancel_external_wf_seq();
1539        let (result_cmd, unblocker) = CancellableWFCommandFut::new(
1540            CancellableIDWithReason::ExternalWorkflow {
1541                seqnum: cancel_seq,
1542                execution: NamespacedWorkflowExecution {
1543                    workflow_id: self.opts.workflow_id.clone(),
1544                    ..Default::default()
1545                },
1546            },
1547            self.base_ctx.clone(),
1548        );
1549        self.base_ctx.send(
1550            CommandSubscribeChildWorkflowCompletion {
1551                seq: child_seq,
1552                unblocker,
1553            }
1554            .into(),
1555        );
1556
1557        let common = ChildWfCommon {
1558            workflow_id: self.opts.workflow_id.clone(),
1559            result_future: result_cmd,
1560            base_ctx: self.base_ctx.clone(),
1561        };
1562
1563        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1564            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
1565            common,
1566            self.base_ctx.clone(),
1567        );
1568        self.base_ctx.send(
1569            CommandCreateRequest {
1570                cmd: self.opts.into_command(child_seq),
1571                unblocker,
1572            }
1573            .into(),
1574        );
1575
1576        cmd
1577    }
1578}
1579
1580impl StartedChildWorkflow {
1581    /// Consumes self and returns a future that will wait until completion of this child workflow
1582    /// execution
1583    pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
1584        self.common.result_future
1585    }
1586
1587    /// Cancel the child workflow
1588    pub fn cancel(&self, reason: String) {
1589        self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1590            CancelChildWorkflowExecution {
1591                child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
1592                reason,
1593            }
1594            .into(),
1595        ));
1596    }
1597
1598    /// Signal the child workflow
1599    pub fn signal<S: Into<Signal>>(
1600        &self,
1601        data: S,
1602    ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
1603        let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1604        self.common
1605            .base_ctx
1606            .clone()
1607            .send_signal_wf(target, data.into())
1608    }
1609}
1610
1611#[derive(derive_more::Debug)]
1612#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
1613pub struct StartedNexusOperation {
1614    /// The operation token, if the operation started asynchronously
1615    pub operation_token: Option<String>,
1616    pub(crate) unblock_dat: NexusUnblockData,
1617}
1618
1619pub(crate) struct NexusUnblockData {
1620    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
1621    schedule_seq: u32,
1622    base_ctx: BaseWorkflowContext,
1623}
1624
1625impl StartedNexusOperation {
1626    pub async fn result(&self) -> NexusOperationResult {
1627        self.unblock_dat.result_future.clone().await
1628    }
1629
1630    pub fn cancel(&self) {
1631        self.unblock_dat
1632            .base_ctx
1633            .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
1634    }
1635}