Skip to main content

temporalio_workflow/
workflow_context.rs

1mod options;
2
3pub use options::{
4    ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
5    ContinueAsNewVersioningBehavior, LocalActivityOptions, NexusOperationOptions, Signal,
6    SignalData, TimerOptions,
7};
8pub use temporalio_common_wasm::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
9
10use crate::runtime::{
11    SdkGuardedFuture, SdkWakeGuard,
12    entry::WorkflowImplementation,
13    host::WorkflowHost,
14    model::{
15        CancelExternalWfResult, CancellableID, NexusStartResult, SignalExternalWfResult,
16        TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
17    },
18};
19use futures_channel::oneshot;
20use futures_util::{
21    FutureExt,
22    future::{FusedFuture, Shared},
23    task::Context,
24};
25use std::{
26    cell::{Cell, Ref, RefCell},
27    collections::HashMap,
28    future::{self, Future},
29    marker::PhantomData,
30    ops::Deref,
31    pin::Pin,
32    rc::Rc,
33    sync::atomic::{AtomicBool, Ordering},
34    task::{Poll, Waker},
35    time::{Duration, SystemTime},
36};
37use temporalio_common_wasm::{
38    ActivityDefinition, SignalDefinition, WorkflowDefinition,
39    data_converters::{
40        ActivityExecutionDecodeHint, ChildWorkflowExecutionDecodeHint,
41        ChildWorkflowStartDecodeHint, DataConverter, GenericPayloadConverter, PayloadConverter,
42        SerializationContext, SerializationContextData, TemporalDeserializable,
43        WorkflowSignalDecodeHint,
44    },
45    error::{
46        ActivityExecutionError, ChildWorkflowExecutionError, ChildWorkflowStartError,
47        WorkflowSignalError,
48    },
49    protos::{
50        coresdk::{
51            activity_result::{ActivityResolution, Cancellation, activity_resolution},
52            child_workflow::{ChildWorkflowResult, child_workflow_result},
53            common::NamespacedWorkflowExecution,
54            nexus::NexusOperationResult,
55            workflow_activation::{
56                InitializeWorkflow, WorkflowActivation as CoreWorkflowActivation,
57                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
58                workflow_activation_job::Variant as ActivationVariant,
59            },
60            workflow_commands::{
61                CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer,
62                ModifyWorkflowProperties, RequestCancelActivity,
63                RequestCancelExternalWorkflowExecution, RequestCancelLocalActivity,
64                RequestCancelNexusOperation, SetPatchMarker, SignalExternalWorkflowExecution,
65                UpsertWorkflowSearchAttributes, signal_external_workflow_execution,
66                workflow_command,
67            },
68        },
69        temporal::api::{
70            common::v1::{Memo, Payload, SearchAttributes},
71            failure::v1::{CanceledFailureInfo, Failure, failure::FailureInfo},
72        },
73        utilities::TryIntoOrNone,
74    },
75    worker::WorkerDeploymentVersion,
76};
77
78/// Non-generic base context containing all workflow execution infrastructure.
79///
80/// This is used internally by futures and commands that don't need typed workflow state.
81#[derive(Clone)]
82pub struct BaseWorkflowContext {
83    inner: Rc<WorkflowContextInner>,
84}
85impl BaseWorkflowContext {
86    pub(crate) fn apply_activation_context(&self, activation: &CoreWorkflowActivation) {
87        let mut shared = self.inner.shared.borrow_mut();
88        shared.activation = activation.clone();
89        if let Some(seed) = activation.jobs.iter().find_map(|job| match &job.variant {
90            Some(ActivationVariant::UpdateRandomSeed(attrs)) => Some(attrs.randomness_seed),
91            _ => None,
92        }) {
93            shared.random_seed = seed;
94        }
95    }
96
97    /// Returns the [`DataConverter`] associated with this workflow's worker.
98    pub fn data_converter(&self) -> &DataConverter {
99        &self.inner.data_converter
100    }
101
102    pub(crate) fn record_patch(&self, patch_id: String, present: bool) {
103        self.inner
104            .shared
105            .borrow_mut()
106            .changes
107            .insert(patch_id, present);
108    }
109
110    /// Create a read-only view of this context.
111    pub(crate) fn view(&self) -> WorkflowContextView {
112        WorkflowContextView::new(
113            self.inner.namespace.clone(),
114            self.inner.task_queue.clone(),
115            self.inner.run_id.clone(),
116            &self.inner.inital_information,
117        )
118    }
119}
120
121#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
122enum PendingCommandId {
123    Timer(u32),
124    Activity(u32),
125    ChildWorkflowStart(u32),
126    ChildWorkflowComplete(u32),
127    SignalExternal(u32),
128    CancelExternal(u32),
129    NexusOpStart(u32),
130    NexusOpComplete(u32),
131}
132
133impl PendingCommandId {
134    fn from_unblock_event(event: &UnblockEvent) -> Self {
135        match event {
136            UnblockEvent::Timer(seq, _) => Self::Timer(*seq),
137            UnblockEvent::Activity(seq, _) => Self::Activity(*seq),
138            UnblockEvent::WorkflowStart(seq, _) => Self::ChildWorkflowStart(*seq),
139            UnblockEvent::WorkflowComplete(seq, _) => Self::ChildWorkflowComplete(*seq),
140            UnblockEvent::SignalExternal(seq, _) => Self::SignalExternal(*seq),
141            UnblockEvent::CancelExternal(seq, _) => Self::CancelExternal(*seq),
142            UnblockEvent::NexusOperationStart(seq, _) => Self::NexusOpStart(*seq),
143            UnblockEvent::NexusOperationComplete(seq, _) => Self::NexusOpComplete(*seq),
144        }
145    }
146}
147
148struct WorkflowRuntimeState {
149    host: Rc<dyn WorkflowHost>,
150    pending_unblocks: RefCell<HashMap<PendingCommandId, oneshot::Sender<UnblockEvent>>>,
151    forced_wft_failure: RefCell<Option<Box<dyn std::error::Error + Send + Sync>>>,
152    progress_made: Cell<bool>,
153}
154
155impl WorkflowRuntimeState {
156    fn new(host: Rc<dyn WorkflowHost>) -> Self {
157        Self {
158            host,
159            pending_unblocks: RefCell::new(HashMap::new()),
160            forced_wft_failure: RefCell::new(None),
161            progress_made: Cell::new(false),
162        }
163    }
164
165    fn register_unblocker(&self, id: PendingCommandId, unblocker: oneshot::Sender<UnblockEvent>) {
166        self.pending_unblocks.borrow_mut().insert(id, unblocker);
167    }
168
169    fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
170        let id = PendingCommandId::from_unblock_event(&event);
171        let unblocker = self
172            .pending_unblocks
173            .borrow_mut()
174            .remove(&id)
175            .ok_or_else(|| anyhow::anyhow!("Command {id:?} not found to unblock"))?;
176        self.progress_made.set(true);
177        let _guard = SdkWakeGuard::new();
178        let _ = unblocker.send(event);
179        Ok(())
180    }
181
182    fn maybe_unblock(&self, event: UnblockEvent) -> bool {
183        let id = PendingCommandId::from_unblock_event(&event);
184        let Some(unblocker) = self.pending_unblocks.borrow_mut().remove(&id) else {
185            return false;
186        };
187        self.progress_made.set(true);
188        let _guard = SdkWakeGuard::new();
189        let _ = unblocker.send(event);
190        true
191    }
192
193    fn set_forced_wft_failure(&self, err: Box<dyn std::error::Error + Send + Sync>) {
194        *self.forced_wft_failure.borrow_mut() = Some(err);
195        self.progress_made.set(true);
196    }
197
198    fn take_forced_wft_failure(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> {
199        self.forced_wft_failure.borrow_mut().take()
200    }
201
202    fn mark_progress(&self) {
203        self.progress_made.set(true);
204    }
205
206    fn take_progress(&self) -> bool {
207        self.progress_made.replace(false)
208    }
209}
210
211struct WorkflowContextInner {
212    namespace: String,
213    task_queue: String,
214    run_id: String,
215    inital_information: InitializeWorkflow,
216    runtime: WorkflowRuntimeState,
217    cancelled_reason: RefCell<Option<String>>,
218    cancel_wakers: RefCell<Vec<Waker>>,
219    shared: RefCell<WorkflowContextSharedData>,
220    seq_nums: RefCell<WfCtxProtectedDat>,
221    data_converter: DataConverter,
222    state_mutated: Cell<bool>,
223}
224
225/// Context provided to synchronous signal and update handlers.
226///
227/// This type provides all workflow context capabilities except `state()`, `state_mut()`,
228/// and `wait_condition()`. Those methods are not applicable in sync handler contexts.
229///
230/// Sync handlers receive `&mut self` directly, so they can reference and mutate workflow state without
231/// needing `state()`/`state_mut()`.
232pub struct SyncWorkflowContext<W> {
233    base: BaseWorkflowContext,
234    /// Headers from the current handler invocation (signal, update, etc.)
235    headers: Rc<HashMap<String, Payload>>,
236    _phantom: PhantomData<W>,
237}
238
239impl<W> Clone for SyncWorkflowContext<W> {
240    fn clone(&self) -> Self {
241        Self {
242            base: self.base.clone(),
243            headers: self.headers.clone(),
244            _phantom: PhantomData,
245        }
246    }
247}
248
249/// Used within workflows to issue commands, get info, etc.
250///
251/// The type parameter `W` represents the workflow type. This enables type-safe
252/// access to workflow state via `state_mut()` for mutations.
253pub struct WorkflowContext<W> {
254    sync: SyncWorkflowContext<W>,
255    /// The workflow instance
256    workflow_state: Rc<RefCell<W>>,
257    /// Wakers registered by `wait_condition` futures. Drained and woken on
258    /// every `state_mut` call so that waker-based combinators (e.g.
259    /// `FuturesOrdered`) re-poll the condition after state changes.
260    condition_wakers: Rc<RefCell<Vec<Waker>>>,
261}
262
263impl<W> Clone for WorkflowContext<W> {
264    fn clone(&self) -> Self {
265        Self {
266            sync: self.sync.clone(),
267            workflow_state: self.workflow_state.clone(),
268            condition_wakers: self.condition_wakers.clone(),
269        }
270    }
271}
272
273/// Read-only view of workflow context for use in init and query handlers.
274///
275/// This provides access to workflow information but cannot issue commands.
276#[derive(Clone, Debug)]
277#[non_exhaustive]
278pub struct WorkflowContextView {
279    /// The workflow's unique identifier
280    pub workflow_id: String,
281    /// The run id of this workflow execution
282    pub run_id: String,
283    /// The workflow type name
284    pub workflow_type: String,
285    /// The task queue this workflow is executing on
286    pub task_queue: String,
287    /// The namespace this workflow is executing in
288    pub namespace: String,
289
290    /// The current attempt number (starting from 1)
291    pub attempt: u32,
292    /// The run id of the very first execution in the chain
293    pub first_execution_run_id: String,
294    /// The run id of the previous execution if this is a continuation
295    pub continued_from_run_id: Option<String>,
296
297    /// When the workflow execution started
298    pub start_time: Option<SystemTime>,
299    /// Total workflow execution timeout including retries and continue as new
300    pub execution_timeout: Option<Duration>,
301    /// Timeout of a single workflow run
302    pub run_timeout: Option<Duration>,
303    /// Timeout of a single workflow task
304    pub task_timeout: Option<Duration>,
305
306    /// Information about the parent workflow, if this is a child workflow
307    pub parent: Option<ParentWorkflowInfo>,
308    /// Information about the root workflow in the execution chain
309    pub root: Option<RootWorkflowInfo>,
310
311    /// The workflow's retry policy
312    pub retry_policy:
313        Option<temporalio_common_wasm::protos::temporal::api::common::v1::RetryPolicy>,
314    /// If this workflow runs on a cron schedule
315    pub cron_schedule: Option<String>,
316    /// User-defined memo
317    pub memo: Option<Memo>,
318    /// Initial search attributes
319    pub search_attributes: Option<SearchAttributes>,
320}
321
322/// Information about a parent workflow.
323#[derive(Clone, Debug)]
324#[non_exhaustive]
325pub struct ParentWorkflowInfo {
326    /// The parent workflow's unique identifier
327    pub workflow_id: String,
328    /// The parent workflow's run id
329    pub run_id: String,
330    /// The parent workflow's namespace
331    pub namespace: String,
332}
333
334/// Information about the root workflow in an execution chain.
335#[derive(Clone, Debug)]
336#[non_exhaustive]
337pub struct RootWorkflowInfo {
338    /// The root workflow's unique identifier
339    pub workflow_id: String,
340    /// The root workflow's run id
341    pub run_id: String,
342}
343
344impl WorkflowContextView {
345    /// Create a new view from workflow initialization data.
346    pub(crate) fn new(
347        namespace: String,
348        task_queue: String,
349        run_id: String,
350        init: &InitializeWorkflow,
351    ) -> Self {
352        let parent = init
353            .parent_workflow_info
354            .as_ref()
355            .map(|p| ParentWorkflowInfo {
356                workflow_id: p.workflow_id.clone(),
357                run_id: p.run_id.clone(),
358                namespace: p.namespace.clone(),
359            });
360
361        let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
362            workflow_id: r.workflow_id.clone(),
363            run_id: r.run_id.clone(),
364        });
365
366        let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
367            None
368        } else {
369            Some(init.continued_from_execution_run_id.clone())
370        };
371
372        let cron_schedule = if init.cron_schedule.is_empty() {
373            None
374        } else {
375            Some(init.cron_schedule.clone())
376        };
377
378        Self {
379            workflow_id: init.workflow_id.clone(),
380            run_id,
381            workflow_type: init.workflow_type.clone(),
382            task_queue,
383            namespace,
384            attempt: init.attempt as u32,
385            first_execution_run_id: init.first_execution_run_id.clone(),
386            continued_from_run_id,
387            start_time: init.start_time.and_then(|t| t.try_into().ok()),
388            execution_timeout: init
389                .workflow_execution_timeout
390                .and_then(|d| d.try_into().ok()),
391            run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
392            task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
393            parent,
394            root,
395            retry_policy: init.retry_policy.clone(),
396            cron_schedule,
397            memo: init.memo.clone(),
398            search_attributes: init.search_attributes.clone(),
399        }
400    }
401}
402
403impl BaseWorkflowContext {
404    /// Create a new base context backed by the provided runtime host.
405    #[doc(hidden)]
406    pub fn new(
407        namespace: String,
408        task_queue: String,
409        run_id: String,
410        init_workflow_job: InitializeWorkflow,
411        data_converter: DataConverter,
412        host: Rc<dyn WorkflowHost>,
413    ) -> Self {
414        Self {
415            inner: Rc::new(WorkflowContextInner {
416                namespace,
417                task_queue,
418                run_id,
419                shared: RefCell::new(WorkflowContextSharedData {
420                    random_seed: init_workflow_job.randomness_seed,
421                    search_attributes: init_workflow_job
422                        .search_attributes
423                        .clone()
424                        .unwrap_or_default(),
425                    ..Default::default()
426                }),
427                inital_information: init_workflow_job,
428                runtime: WorkflowRuntimeState::new(host),
429                cancelled_reason: RefCell::new(None),
430                cancel_wakers: RefCell::new(Vec::new()),
431                seq_nums: RefCell::new(WfCtxProtectedDat {
432                    next_timer_sequence_number: 1,
433                    next_activity_sequence_number: 1,
434                    next_child_workflow_sequence_number: 1,
435                    next_cancel_external_wf_sequence_number: 1,
436                    next_signal_external_wf_sequence_number: 1,
437                    next_nexus_op_sequence_number: 1,
438                }),
439                data_converter,
440                state_mutated: Cell::new(false),
441            }),
442        }
443    }
444
445    /// Check and clear the state_mutated flag. Returns `true` if `state_mut`
446    /// was called since the last time this method was invoked.
447    pub(crate) fn take_state_mutated(&self) -> bool {
448        self.inner.state_mutated.replace(false)
449    }
450
451    /// Mark that workflow state has been mutated.
452    pub(crate) fn set_state_mutated(&self) {
453        self.inner.state_mutated.set(true);
454    }
455
456    pub(crate) fn take_runtime_progress(&self) -> bool {
457        self.inner.runtime.take_progress()
458    }
459
460    pub(crate) fn take_forced_wft_failure(
461        &self,
462    ) -> Option<Box<dyn std::error::Error + Send + Sync>> {
463        self.inner.runtime.take_forced_wft_failure()
464    }
465
466    pub(crate) fn notify_cancel(&self, reason: String) {
467        let _guard = SdkWakeGuard::new();
468        *self.inner.cancelled_reason.borrow_mut() = Some(reason);
469        for waker in self.inner.cancel_wakers.borrow_mut().drain(..) {
470            waker.wake();
471        }
472        self.inner.runtime.mark_progress();
473    }
474
475    pub(crate) fn unblock(&self, event: UnblockEvent) -> Result<(), anyhow::Error> {
476        self.inner.runtime.unblock(event)
477    }
478
479    /// Cancel any cancellable operation by ID
480    fn cancel(&self, cancellable_id: CancellableID) {
481        match cancellable_id {
482            CancellableID::Timer(seq) => {
483                if self
484                    .inner
485                    .runtime
486                    .maybe_unblock(UnblockEvent::Timer(seq, TimerResult::Cancelled))
487                {
488                    self.inner.runtime.host.push_command(
489                        workflow_command::Variant::CancelTimer(CancelTimer { seq }).into(),
490                    );
491                }
492            }
493            CancellableID::Activity(seq) => {
494                self.inner.runtime.host.push_command(
495                    workflow_command::Variant::RequestCancelActivity(RequestCancelActivity { seq })
496                        .into(),
497                );
498            }
499            CancellableID::LocalActivity(seq) => {
500                self.inner.runtime.host.push_command(
501                    workflow_command::Variant::RequestCancelLocalActivity(
502                        RequestCancelLocalActivity { seq },
503                    )
504                    .into(),
505                );
506            }
507            CancellableID::ChildWorkflow { seqnum, reason } => {
508                self.inner.runtime.host.push_command(
509                    workflow_command::Variant::CancelChildWorkflowExecution(
510                        CancelChildWorkflowExecution {
511                            child_workflow_seq: seqnum,
512                            reason,
513                        },
514                    )
515                    .into(),
516                );
517            }
518            CancellableID::SignalExternalWorkflow(seq) => {
519                self.inner.runtime.host.push_command(
520                    workflow_command::Variant::CancelSignalWorkflow(CancelSignalWorkflow { seq })
521                        .into(),
522                );
523            }
524            CancellableID::NexusOp(seq) => {
525                self.inner.runtime.host.push_command(
526                    workflow_command::Variant::RequestCancelNexusOperation(
527                        RequestCancelNexusOperation { seq },
528                    )
529                    .into(),
530                );
531            }
532        }
533    }
534
535    /// Return the current value of current_details.
536    pub fn current_details(&self) -> String {
537        self.inner.shared.borrow().current_details.clone()
538    }
539
540    /// Request to create a timer
541    pub fn timer<T: Into<TimerOptions>>(
542        &self,
543        opts: T,
544    ) -> impl CancellableFuture<TimerResult> + use<T> {
545        let opts: TimerOptions = opts.into();
546        let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
547        let (cmd, unblocker) =
548            CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
549        self.inner
550            .runtime
551            .register_unblocker(PendingCommandId::Timer(seq), unblocker);
552        self.inner.runtime.host.push_command(opts.into_command(seq));
553        cmd
554    }
555
556    /// Request to run an activity
557    pub fn start_activity<AD: ActivityDefinition>(
558        &self,
559        _activity: AD,
560        input: impl Into<AD::Input>,
561        mut opts: ActivityOptions,
562    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
563    where
564        AD::Output: TemporalDeserializable,
565    {
566        let input = input.into();
567        let payload_converter = self.inner.data_converter.payload_converter();
568        let ctx = SerializationContext {
569            data: &SerializationContextData::Workflow,
570            converter: payload_converter,
571        };
572        let payloads = match payload_converter.to_payloads(&ctx, &input) {
573            Ok(p) => p,
574            Err(e) => {
575                return ActivityFut::eager(e.into());
576            }
577        };
578        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
579        let (cmd, unblocker) =
580            CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
581        self.inner
582            .runtime
583            .register_unblocker(PendingCommandId::Activity(seq), unblocker);
584        if opts.task_queue.is_none() {
585            opts.task_queue = Some(self.inner.task_queue.clone());
586        }
587        self.inner.runtime.host.push_command(opts.into_command(
588            seq,
589            AD::name().to_string(),
590            payloads,
591        ));
592        ActivityFut::running(cmd, self.inner.data_converter.clone())
593    }
594
595    /// Request to run a local activity
596    pub fn start_local_activity<AD: ActivityDefinition>(
597        &self,
598        _activity: AD,
599        input: impl Into<AD::Input>,
600        opts: LocalActivityOptions,
601    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
602    where
603        AD::Output: TemporalDeserializable,
604    {
605        let input = input.into();
606        let payload_converter = self.inner.data_converter.payload_converter();
607        let ctx = SerializationContext {
608            data: &SerializationContextData::Workflow,
609            converter: payload_converter,
610        };
611        let payloads = match payload_converter.to_payloads(&ctx, &input) {
612            Ok(p) => p,
613            Err(e) => {
614                return ActivityFut::eager(e.into());
615            }
616        };
617        ActivityFut::running(
618            LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
619            self.inner.data_converter.clone(),
620        )
621    }
622
623    /// Start a child workflow with typed input/output.
624    fn start_child_workflow<WD: WorkflowDefinition>(
625        &self,
626        workflow: WD,
627        input: impl Into<WD::Input>,
628        opts: ChildWorkflowOptions,
629    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
630    where
631        WD::Output: TemporalDeserializable,
632    {
633        let input = input.into();
634        let payload_converter = self.inner.data_converter.payload_converter();
635        let ctx = SerializationContext {
636            data: &SerializationContextData::Workflow,
637            converter: payload_converter,
638        };
639        let payloads = match payload_converter.to_payloads(&ctx, &input) {
640            Ok(p) => p,
641            Err(e) => {
642                return ChildWorkflowStartFut::eager(e.into());
643            }
644        };
645        let workflow_type = workflow.name().to_string();
646
647        let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
648        // Immediately create the command/future for the result, otherwise if the user does
649        // not await the result until *after* we receive an activation for it, there will be nothing
650        // to match when unblocking.
651        let (result_cmd, unblocker) = CancellableWFCommandFut::new(
652            CancellableID::ChildWorkflow {
653                seqnum: child_seq,
654                reason: String::new(),
655            },
656            self.clone(),
657        );
658        self.inner.runtime.register_unblocker(
659            PendingCommandId::ChildWorkflowComplete(child_seq),
660            unblocker,
661        );
662
663        let common = ChildWfCommon {
664            workflow_id: opts.workflow_id.clone(),
665            child_seq,
666            result_future: result_cmd,
667            base_ctx: self.clone(),
668            data_converter: self.inner.data_converter.clone(),
669        };
670
671        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
672            CancellableID::ChildWorkflow {
673                seqnum: child_seq,
674                reason: String::new(),
675            },
676            common,
677            self.clone(),
678        );
679        self.inner
680            .runtime
681            .register_unblocker(PendingCommandId::ChildWorkflowStart(child_seq), unblocker);
682        self.inner
683            .runtime
684            .host
685            .push_command(opts.into_command(child_seq, workflow_type, payloads));
686
687        ChildWorkflowStartFut::Running(cmd)
688    }
689
690    /// Request to run a local activity with no implementation of timer-backoff based retrying.
691    fn local_activity_no_timer_retry(
692        self,
693        activity_type: String,
694        arguments: Vec<Payload>,
695        opts: LocalActivityOptions,
696    ) -> impl CancellableFuture<ActivityResolution> {
697        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
698        let (cmd, unblocker) =
699            CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
700        self.inner
701            .runtime
702            .register_unblocker(PendingCommandId::Activity(seq), unblocker);
703        self.inner
704            .runtime
705            .host
706            .push_command(opts.into_command(seq, activity_type, arguments));
707        cmd
708    }
709
710    fn send_signal_wf(
711        self,
712        target: signal_external_workflow_execution::Target,
713        signal: Signal,
714    ) -> impl CancellableFuture<SignalExternalWfResult> {
715        let seq = self
716            .inner
717            .seq_nums
718            .borrow_mut()
719            .next_signal_external_wf_seq();
720        let (cmd, unblocker) =
721            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
722        self.inner
723            .runtime
724            .register_unblocker(PendingCommandId::SignalExternal(seq), unblocker);
725        let signal = signal.into_invocation();
726        self.inner.runtime.host.push_command(
727            workflow_command::Variant::SignalExternalWorkflowExecution(
728                SignalExternalWorkflowExecution {
729                    seq,
730                    signal_name: signal.signal_name,
731                    args: signal.input,
732                    target: Some(target),
733                    headers: signal.headers,
734                },
735            )
736            .into(),
737        );
738        cmd
739    }
740}
741
742impl<W> SyncWorkflowContext<W> {
743    /// Return the workflow's unique identifier
744    pub fn workflow_id(&self) -> &str {
745        &self.base.inner.inital_information.workflow_id
746    }
747
748    /// Return the run id of this workflow execution
749    pub fn run_id(&self) -> &str {
750        &self.base.inner.run_id
751    }
752
753    /// Return the namespace the workflow is executing in
754    pub fn namespace(&self) -> &str {
755        &self.base.inner.namespace
756    }
757
758    /// Return the task queue the workflow is executing in
759    pub fn task_queue(&self) -> &str {
760        &self.base.inner.task_queue
761    }
762
763    /// Return the current time according to the workflow (which is not wall-clock time).
764    pub fn workflow_time(&self) -> Option<SystemTime> {
765        self.base
766            .inner
767            .shared
768            .borrow()
769            .activation
770            .timestamp
771            .try_into_or_none()
772    }
773
774    /// Return the length of history so far at this point in the workflow
775    pub fn history_length(&self) -> u32 {
776        self.base.inner.shared.borrow().activation.history_length
777    }
778
779    /// Return the deployment version, if any,  as it was when this point in the workflow was first
780    /// reached. If this code is being executed for the first time, return this Worker's deployment
781    /// version if it has one.
782    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
783        self.base
784            .inner
785            .shared
786            .borrow()
787            .activation
788            .clone()
789            .deployment_version_for_current_task
790            .map(Into::into)
791    }
792
793    /// Return current values for workflow search attributes
794    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
795        Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
796    }
797
798    /// Return the workflow's randomness seed
799    pub fn random_seed(&self) -> u64 {
800        self.base.inner.shared.borrow().random_seed
801    }
802
803    /// Returns true if the current workflow task is happening under replay
804    pub fn is_replaying(&self) -> bool {
805        self.base.inner.shared.borrow().activation.is_replaying
806    }
807
808    /// Returns true if the server suggests this workflow should continue-as-new
809    pub fn continue_as_new_suggested(&self) -> bool {
810        self.base
811            .inner
812            .shared
813            .borrow()
814            .activation
815            .continue_as_new_suggested
816    }
817
818    /// Returns true if the workflow's target worker deployment version changed.
819    ///
820    /// This experimental signal is intended for workers using worker deployment versioning.
821    pub fn target_worker_deployment_version_changed(&self) -> bool {
822        self.base
823            .inner
824            .shared
825            .borrow()
826            .activation
827            .target_worker_deployment_version_changed
828    }
829
830    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
831    ///
832    /// When called from within a signal handler, returns the headers that were sent with that
833    /// signal. When called from the main workflow run method, returns an empty map.
834    pub fn headers(&self) -> &HashMap<String, Payload> {
835        &self.headers
836    }
837
838    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
839    pub fn payload_converter(&self) -> &PayloadConverter {
840        self.base.inner.data_converter.payload_converter()
841    }
842
843    /// Return various information that the workflow was initialized with. Will eventually become
844    /// a proper non-proto workflow info struct.
845    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
846        &self.base.inner.inital_information
847    }
848
849    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
850    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
851        let inner = self.base.inner.clone();
852        future::poll_fn(move |cx| {
853            if let Some(reason) = inner.cancelled_reason.borrow().as_ref() {
854                Poll::Ready(reason.clone())
855            } else {
856                inner.cancel_wakers.borrow_mut().push(cx.waker().clone());
857                Poll::Pending
858            }
859        })
860        .fuse()
861    }
862
863    /// Signal that this workflow should continue as a new workflow execution with the given input and
864    /// options.
865    ///
866    /// This always returns an `Err` which should be propigated.
867    pub fn continue_as_new(
868        &self,
869        input: &<W::Run as WorkflowDefinition>::Input,
870        opts: ContinueAsNewOptions,
871    ) -> Result<std::convert::Infallible, WorkflowTermination>
872    where
873        W: WorkflowImplementation,
874    {
875        let pc = self.base.inner.data_converter.payload_converter();
876        let ctx = SerializationContext {
877            data: &SerializationContextData::Workflow,
878            converter: pc,
879        };
880        let arguments = pc
881            .to_payloads(&ctx, input)
882            .map_err(WorkflowTermination::from)?;
883        let workflow_type = self.workflow_initial_info().workflow_type.clone();
884        let request = opts.into_request(workflow_type, arguments);
885        Err(WorkflowTermination::continue_as_new(request))
886    }
887
888    /// Request to create a timer
889    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
890        self.base.timer(opts)
891    }
892
893    /// Request to run an activity
894    pub fn start_activity<AD: ActivityDefinition>(
895        &self,
896        activity: AD,
897        input: impl Into<AD::Input>,
898        opts: ActivityOptions,
899    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
900    where
901        AD::Output: TemporalDeserializable,
902    {
903        self.base.start_activity(activity, input, opts)
904    }
905
906    /// Request to run a local activity
907    pub fn start_local_activity<AD: ActivityDefinition>(
908        &self,
909        activity: AD,
910        input: impl Into<AD::Input>,
911        opts: LocalActivityOptions,
912    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
913    where
914        AD::Output: TemporalDeserializable,
915    {
916        self.base.start_local_activity(activity, input, opts)
917    }
918
919    /// Start a child workflow. Returns a future that resolves to a [StartedChildWorkflow]
920    /// which can be used to await the result, send signals, or cancel the child.
921    pub fn start_child_workflow<WD: WorkflowDefinition>(
922        &self,
923        workflow: WD,
924        input: impl Into<WD::Input>,
925        opts: ChildWorkflowOptions,
926    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
927    where
928        WD::Output: TemporalDeserializable,
929    {
930        self.base.start_child_workflow(workflow, input, opts)
931    }
932
933    /// Deprecated alias for [`SyncWorkflowContext::start_child_workflow`].
934    #[deprecated(note = "use `start_child_workflow` instead")]
935    pub fn child_workflow<WD: WorkflowDefinition>(
936        &self,
937        workflow: WD,
938        input: impl Into<WD::Input>,
939        opts: ChildWorkflowOptions,
940    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
941    where
942        WD::Output: TemporalDeserializable,
943    {
944        self.start_child_workflow(workflow, input, opts)
945    }
946
947    /// Check (or record) that this workflow history was created with the provided patch
948    pub fn patched(&self, patch_id: &str) -> bool {
949        self.patch_impl(patch_id, false)
950    }
951
952    /// Record that this workflow history was created with the provided patch, and it is being
953    /// phased out.
954    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
955        self.patch_impl(patch_id, true)
956    }
957
958    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
959        self.base.inner.runtime.host.push_command(
960            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
961                patch_id: patch_id.to_string(),
962                deprecated,
963            })
964            .into(),
965        );
966        // See if we already know about the status of this change
967        if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
968            return *present;
969        }
970
971        // If we don't already know about the change, that means there is no marker in history,
972        // and we should return false if we are replaying
973        let res = !self.base.inner.shared.borrow().activation.is_replaying;
974
975        self.base
976            .inner
977            .shared
978            .borrow_mut()
979            .changes
980            .insert(patch_id.to_string(), res);
981
982        res
983    }
984
985    /// Get a handle to an external workflow for sending signals or requesting cancellation.
986    pub fn external_workflow(
987        &self,
988        workflow_id: impl Into<String>,
989        run_id: Option<String>,
990    ) -> ExternalWorkflowHandle {
991        ExternalWorkflowHandle {
992            workflow_id: workflow_id.into(),
993            run_id,
994            namespace: self.base.inner.namespace.clone(),
995            base_ctx: self.base.clone(),
996        }
997    }
998
999    /// Add or create a set of search attributes
1000    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1001        self.base.inner.runtime.host.push_command(
1002            workflow_command::Variant::UpsertWorkflowSearchAttributes(
1003                UpsertWorkflowSearchAttributes {
1004                    search_attributes: Some(SearchAttributes {
1005                        indexed_fields: attr_iter.into_iter().collect(),
1006                    }),
1007                },
1008            )
1009            .into(),
1010        );
1011    }
1012
1013    /// Add or create a set of search attributes
1014    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1015        self.base.inner.runtime.host.push_command(
1016            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
1017                upserted_memo: Some(Memo {
1018                    fields: attr_iter.into_iter().collect(),
1019                }),
1020            })
1021            .into(),
1022        );
1023    }
1024
1025    /// Set the current details string for this workflow execution.
1026    ///
1027    /// The value is surfaced to the Temporal server UI in real time via the
1028    /// the workflow metadata query.
1029    pub fn set_current_details(&self, details: impl Into<String>) {
1030        let details = details.into();
1031        self.base.inner.shared.borrow_mut().current_details = details.clone();
1032        self.base.inner.runtime.host.set_current_details(details);
1033    }
1034
1035    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
1036    pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
1037        self.base.inner.runtime.set_forced_wft_failure(with.into());
1038    }
1039
1040    /// Start a nexus operation
1041    pub fn start_nexus_operation(
1042        &self,
1043        opts: NexusOperationOptions,
1044    ) -> impl CancellableFuture<NexusStartResult> {
1045        let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
1046        let (result_future, unblocker) = WFCommandFut::new();
1047        self.base
1048            .inner
1049            .runtime
1050            .register_unblocker(PendingCommandId::NexusOpComplete(seq), unblocker);
1051        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1052            CancellableID::NexusOp(seq),
1053            NexusUnblockData {
1054                result_future: result_future.shared(),
1055                schedule_seq: seq,
1056                base_ctx: self.base.clone(),
1057            },
1058            self.base.clone(),
1059        );
1060        self.base
1061            .inner
1062            .runtime
1063            .register_unblocker(PendingCommandId::NexusOpStart(seq), unblocker);
1064        self.base
1065            .inner
1066            .runtime
1067            .host
1068            .push_command(opts.into_command(seq));
1069        cmd
1070    }
1071
1072    /// Create a read-only view of this context.
1073    pub(crate) fn view(&self) -> WorkflowContextView {
1074        self.base.view()
1075    }
1076}
1077
1078impl<W> WorkflowContext<W> {
1079    /// Create a new wf context from a base context and workflow state.
1080    pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
1081        Self {
1082            sync: SyncWorkflowContext {
1083                base,
1084                headers: Rc::new(HashMap::new()),
1085                _phantom: PhantomData,
1086            },
1087            workflow_state,
1088            condition_wakers: Rc::new(RefCell::new(Vec::new())),
1089        }
1090    }
1091
1092    /// Returns a new context with the specified headers set.
1093    pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
1094        Self {
1095            sync: SyncWorkflowContext {
1096                base: self.sync.base.clone(),
1097                headers: Rc::new(headers),
1098                _phantom: PhantomData,
1099            },
1100            workflow_state: self.workflow_state.clone(),
1101            condition_wakers: self.condition_wakers.clone(),
1102        }
1103    }
1104
1105    /// Returns a [`SyncWorkflowContext`] extracted from this context.
1106    pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
1107        self.sync.clone()
1108    }
1109
1110    /// Create a read-only view of this context.
1111    pub(crate) fn view(&self) -> WorkflowContextView {
1112        self.sync.view()
1113    }
1114
1115    // --- Delegated methods from SyncWorkflowContext ---
1116
1117    /// Return the workflow's unique identifier
1118    pub fn workflow_id(&self) -> &str {
1119        self.sync.workflow_id()
1120    }
1121
1122    /// Return the run id of this workflow execution
1123    pub fn run_id(&self) -> &str {
1124        self.sync.run_id()
1125    }
1126
1127    /// Return the namespace the workflow is executing in
1128    pub fn namespace(&self) -> &str {
1129        self.sync.namespace()
1130    }
1131
1132    /// Return the task queue the workflow is executing in
1133    pub fn task_queue(&self) -> &str {
1134        self.sync.task_queue()
1135    }
1136
1137    /// Return the current time according to the workflow (which is not wall-clock time).
1138    pub fn workflow_time(&self) -> Option<SystemTime> {
1139        self.sync.workflow_time()
1140    }
1141
1142    /// Return the length of history so far at this point in the workflow
1143    pub fn history_length(&self) -> u32 {
1144        self.sync.history_length()
1145    }
1146
1147    /// Return the deployment version, if any, as it was when this point in the workflow was first
1148    /// reached. If this code is being executed for the first time, return this Worker's deployment
1149    /// version if it has one.
1150    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
1151        self.sync.current_deployment_version()
1152    }
1153
1154    /// Return current values for workflow search attributes
1155    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
1156        self.sync.search_attributes()
1157    }
1158
1159    /// Return the workflow's randomness seed
1160    pub fn random_seed(&self) -> u64 {
1161        self.sync.random_seed()
1162    }
1163
1164    /// Returns true if the current workflow task is happening under replay
1165    pub fn is_replaying(&self) -> bool {
1166        self.sync.is_replaying()
1167    }
1168
1169    /// Returns true if the server suggests this workflow should continue-as-new
1170    pub fn continue_as_new_suggested(&self) -> bool {
1171        self.sync.continue_as_new_suggested()
1172    }
1173
1174    /// Returns true if the workflow's target worker deployment version changed.
1175    ///
1176    /// This experimental signal is intended for workers using worker deployment versioning.
1177    pub fn target_worker_deployment_version_changed(&self) -> bool {
1178        self.sync.target_worker_deployment_version_changed()
1179    }
1180
1181    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
1182    pub fn headers(&self) -> &HashMap<String, Payload> {
1183        self.sync.headers()
1184    }
1185
1186    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
1187    pub fn payload_converter(&self) -> &PayloadConverter {
1188        self.sync.payload_converter()
1189    }
1190
1191    /// Return various information that the workflow was initialized with.
1192    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
1193        self.sync.workflow_initial_info()
1194    }
1195
1196    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
1197    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
1198        self.sync.cancelled()
1199    }
1200
1201    /// Request to create a timer
1202    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
1203        self.sync.timer(opts)
1204    }
1205
1206    /// Request to run an activity
1207    pub fn start_activity<AD: ActivityDefinition>(
1208        &self,
1209        activity: AD,
1210        input: impl Into<AD::Input>,
1211        opts: ActivityOptions,
1212    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1213    where
1214        AD::Output: TemporalDeserializable,
1215    {
1216        self.sync.start_activity(activity, input, opts)
1217    }
1218
1219    /// Request to run a local activity
1220    pub fn start_local_activity<AD: ActivityDefinition>(
1221        &self,
1222        activity: AD,
1223        input: impl Into<AD::Input>,
1224        opts: LocalActivityOptions,
1225    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1226    where
1227        AD::Output: TemporalDeserializable,
1228    {
1229        self.sync.start_local_activity(activity, input, opts)
1230    }
1231
1232    /// Start a child workflow. See [SyncWorkflowContext::start_child_workflow] for details.
1233    pub fn start_child_workflow<WD: WorkflowDefinition>(
1234        &self,
1235        workflow: WD,
1236        input: impl Into<WD::Input>,
1237        opts: ChildWorkflowOptions,
1238    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
1239    where
1240        WD::Output: TemporalDeserializable,
1241    {
1242        self.sync.start_child_workflow(workflow, input, opts)
1243    }
1244
1245    /// Deprecated alias for [`WorkflowContext::start_child_workflow`].
1246    #[deprecated(note = "use `start_child_workflow` instead")]
1247    pub fn child_workflow<WD: WorkflowDefinition>(
1248        &self,
1249        workflow: WD,
1250        input: impl Into<WD::Input>,
1251        opts: ChildWorkflowOptions,
1252    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
1253    where
1254        WD::Output: TemporalDeserializable,
1255    {
1256        self.start_child_workflow(workflow, input, opts)
1257    }
1258
1259    /// Check (or record) that this workflow history was created with the provided patch
1260    pub fn patched(&self, patch_id: &str) -> bool {
1261        self.sync.patched(patch_id)
1262    }
1263
1264    /// Record that this workflow history was created with the provided patch, and it is being
1265    /// phased out.
1266    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
1267        self.sync.deprecate_patch(patch_id)
1268    }
1269
1270    /// Get a handle to an external workflow. See [SyncWorkflowContext::external_workflow].
1271    pub fn external_workflow(
1272        &self,
1273        workflow_id: impl Into<String>,
1274        run_id: Option<String>,
1275    ) -> ExternalWorkflowHandle {
1276        self.sync.external_workflow(workflow_id, run_id)
1277    }
1278
1279    /// Add or create a set of search attributes
1280    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1281        self.sync.upsert_search_attributes(attr_iter)
1282    }
1283
1284    /// Add or create a set of memo fields
1285    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1286        self.sync.upsert_memo(attr_iter)
1287    }
1288
1289    /// Set the current details string for this workflow execution.
1290    ///
1291    /// See [`SyncWorkflowContext::set_current_details`].
1292    pub fn set_current_details(&self, details: impl Into<String>) {
1293        self.sync.set_current_details(details)
1294    }
1295
1296    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
1297    pub fn force_task_fail(&self, with: impl Into<Box<dyn std::error::Error + Send + Sync>>) {
1298        self.sync.force_task_fail(with)
1299    }
1300
1301    /// Start a nexus operation
1302    pub fn start_nexus_operation(
1303        &self,
1304        opts: NexusOperationOptions,
1305    ) -> impl CancellableFuture<NexusStartResult> {
1306        self.sync.start_nexus_operation(opts)
1307    }
1308
1309    /// Access workflow state immutably via closure.
1310    ///
1311    /// The borrow is scoped to the closure and cannot escape, preventing
1312    /// borrows from being held across await points.
1313    pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1314        f(&*self.workflow_state.borrow())
1315    }
1316
1317    /// Access workflow state mutably via closure.
1318    ///
1319    /// The borrow is scoped to the closure and cannot escape, preventing
1320    /// borrows from being held across await points.
1321    ///
1322    /// After the mutation, all wakers registered by pending `wait_condition`
1323    /// futures are woken so that waker-based combinators (e.g.
1324    /// `FuturesOrdered`) re-poll them on the next pass.
1325    pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1326        let result = f(&mut *self.workflow_state.borrow_mut());
1327        let _guard = SdkWakeGuard::new();
1328        for waker in self.condition_wakers.borrow_mut().drain(..) {
1329            waker.wake();
1330        }
1331        self.sync.base.set_state_mutated();
1332        result
1333    }
1334
1335    /// Signal that this workflow should continue as a new workflow execution with the given input and
1336    /// options.
1337    ///
1338    /// This always returns an `Err` which should be propigated
1339    pub fn continue_as_new(
1340        &self,
1341        input: &<W::Run as WorkflowDefinition>::Input,
1342        opts: ContinueAsNewOptions,
1343    ) -> Result<std::convert::Infallible, WorkflowTermination>
1344    where
1345        W: WorkflowImplementation,
1346    {
1347        self.sync.continue_as_new(input, opts)
1348    }
1349
1350    /// Wait for some condition on workflow state to become true, yielding the workflow if not.
1351    ///
1352    /// The condition closure receives an immutable reference to the workflow state,
1353    /// which is borrowed only for the duration of each poll (not across await points).
1354    pub fn wait_condition<'a>(
1355        &'a self,
1356        mut condition: impl FnMut(&W) -> bool + 'a,
1357    ) -> impl FusedFuture<Output = ()> + 'a {
1358        future::poll_fn(move |cx: &mut Context<'_>| {
1359            if condition(&*self.workflow_state.borrow()) {
1360                Poll::Ready(())
1361            } else {
1362                self.condition_wakers.borrow_mut().push(cx.waker().clone());
1363                Poll::Pending
1364            }
1365        })
1366        .fuse()
1367    }
1368}
1369
1370struct WfCtxProtectedDat {
1371    next_timer_sequence_number: u32,
1372    next_activity_sequence_number: u32,
1373    next_child_workflow_sequence_number: u32,
1374    next_cancel_external_wf_sequence_number: u32,
1375    next_signal_external_wf_sequence_number: u32,
1376    next_nexus_op_sequence_number: u32,
1377}
1378
1379impl WfCtxProtectedDat {
1380    fn next_timer_seq(&mut self) -> u32 {
1381        let seq = self.next_timer_sequence_number;
1382        self.next_timer_sequence_number += 1;
1383        seq
1384    }
1385    fn next_activity_seq(&mut self) -> u32 {
1386        let seq = self.next_activity_sequence_number;
1387        self.next_activity_sequence_number += 1;
1388        seq
1389    }
1390    fn next_child_workflow_seq(&mut self) -> u32 {
1391        let seq = self.next_child_workflow_sequence_number;
1392        self.next_child_workflow_sequence_number += 1;
1393        seq
1394    }
1395    fn next_cancel_external_wf_seq(&mut self) -> u32 {
1396        let seq = self.next_cancel_external_wf_sequence_number;
1397        self.next_cancel_external_wf_sequence_number += 1;
1398        seq
1399    }
1400    fn next_signal_external_wf_seq(&mut self) -> u32 {
1401        let seq = self.next_signal_external_wf_sequence_number;
1402        self.next_signal_external_wf_sequence_number += 1;
1403        seq
1404    }
1405    fn next_nexus_op_seq(&mut self) -> u32 {
1406        let seq = self.next_nexus_op_sequence_number;
1407        self.next_nexus_op_sequence_number += 1;
1408        seq
1409    }
1410}
1411
1412#[derive(Clone, Debug, Default)]
1413struct WorkflowContextSharedData {
1414    /// Maps change ids -> resolved status
1415    changes: HashMap<String, bool>,
1416    activation: CoreWorkflowActivation,
1417    search_attributes: SearchAttributes,
1418    random_seed: u64,
1419    /// Current details string, surfaced via the workflow metadata query.
1420    current_details: String,
1421}
1422
1423/// A Future that can be cancelled.
1424/// Used in the prototype SDK for cancelling operations like timers and activities.
1425pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1426    /// Cancel this Future
1427    fn cancel(&self);
1428}
1429
1430/// A Future that can be cancelled with a reason
1431pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1432    /// Cancel this Future with a reason
1433    fn cancel_with_reason(&self, reason: String);
1434}
1435
1436struct WFCommandFut<T, D> {
1437    _unused: PhantomData<T>,
1438    result_rx: oneshot::Receiver<UnblockEvent>,
1439    other_dat: Option<D>,
1440}
1441impl<T> WFCommandFut<T, ()> {
1442    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1443        Self::new_with_dat(())
1444    }
1445}
1446
1447impl<T, D> WFCommandFut<T, D> {
1448    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1449        let (tx, rx) = oneshot::channel();
1450        (
1451            Self {
1452                _unused: PhantomData,
1453                result_rx: rx,
1454                other_dat: Some(other_dat),
1455            },
1456            tx,
1457        )
1458    }
1459}
1460
1461impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1462impl<T, D> Future for WFCommandFut<T, D>
1463where
1464    T: Unblockable<OtherDat = D>,
1465{
1466    type Output = T;
1467
1468    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1469        self.result_rx.poll_unpin(cx).map(|x| {
1470            let od = self
1471                .other_dat
1472                .take()
1473                .expect("Other data must exist when resolving command future");
1474            Unblockable::unblock(x.unwrap(), od)
1475        })
1476    }
1477}
1478impl<T, D> FusedFuture for WFCommandFut<T, D>
1479where
1480    T: Unblockable<OtherDat = D>,
1481{
1482    fn is_terminated(&self) -> bool {
1483        self.other_dat.is_none()
1484    }
1485}
1486
1487struct CancellableWFCommandFut<T, D> {
1488    cmd_fut: WFCommandFut<T, D>,
1489    cancellable_id: CancellableID,
1490    base_ctx: BaseWorkflowContext,
1491}
1492impl<T> CancellableWFCommandFut<T, ()> {
1493    fn new(
1494        cancellable_id: CancellableID,
1495        base_ctx: BaseWorkflowContext,
1496    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1497        Self::new_with_dat(cancellable_id, (), base_ctx)
1498    }
1499}
1500impl<T, D> CancellableWFCommandFut<T, D> {
1501    fn new_with_dat(
1502        cancellable_id: CancellableID,
1503        other_dat: D,
1504        base_ctx: BaseWorkflowContext,
1505    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1506        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1507        (
1508            Self {
1509                cmd_fut,
1510                cancellable_id,
1511                base_ctx,
1512            },
1513            sender,
1514        )
1515    }
1516}
1517impl<T, D> Unpin for CancellableWFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1518impl<T, D> Future for CancellableWFCommandFut<T, D>
1519where
1520    T: Unblockable<OtherDat = D>,
1521{
1522    type Output = T;
1523
1524    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1525        self.cmd_fut.poll_unpin(cx)
1526    }
1527}
1528impl<T, D> FusedFuture for CancellableWFCommandFut<T, D>
1529where
1530    T: Unblockable<OtherDat = D>,
1531{
1532    fn is_terminated(&self) -> bool {
1533        self.cmd_fut.is_terminated()
1534    }
1535}
1536
1537impl<T, D> CancellableFuture<T> for CancellableWFCommandFut<T, D>
1538where
1539    T: Unblockable<OtherDat = D>,
1540{
1541    fn cancel(&self) {
1542        self.base_ctx.cancel(self.cancellable_id.clone());
1543    }
1544}
1545impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D>
1546where
1547    T: Unblockable<OtherDat = D>,
1548{
1549    fn cancel_with_reason(&self, reason: String) {
1550        self.base_ctx
1551            .cancel(self.cancellable_id.clone().with_reason(reason));
1552    }
1553}
1554
1555struct LATimerBackoffFut {
1556    la_opts: LocalActivityOptions,
1557    activity_type: String,
1558    arguments: Vec<Payload>,
1559    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1560    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1561    base_ctx: BaseWorkflowContext,
1562    next_attempt: u32,
1563    next_sched_time: Option<prost_types::Timestamp>,
1564    did_cancel: AtomicBool,
1565    terminated: bool,
1566}
1567impl LATimerBackoffFut {
1568    fn new(
1569        activity_type: String,
1570        arguments: Vec<Payload>,
1571        opts: LocalActivityOptions,
1572        base_ctx: BaseWorkflowContext,
1573    ) -> Self {
1574        let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1575            activity_type.clone(),
1576            arguments.clone(),
1577            opts.clone(),
1578        ));
1579        Self {
1580            la_opts: opts,
1581            activity_type,
1582            arguments,
1583            current_fut,
1584            timer_fut: None,
1585            base_ctx,
1586            next_attempt: 1,
1587            next_sched_time: None,
1588            did_cancel: AtomicBool::new(false),
1589            terminated: false,
1590        }
1591    }
1592}
1593impl Unpin for LATimerBackoffFut {}
1594impl Future for LATimerBackoffFut {
1595    type Output = ActivityResolution;
1596
1597    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1598        // If the timer exists, wait for it first
1599        if let Some(tf) = self.timer_fut.as_mut() {
1600            return match tf.poll_unpin(cx) {
1601                Poll::Ready(tr) => {
1602                    self.timer_fut = None;
1603                    // Schedule next LA if this timer wasn't cancelled
1604                    if let TimerResult::Fired = tr {
1605                        let mut opts = self.la_opts.clone();
1606                        opts.attempt = Some(self.next_attempt);
1607                        opts.original_schedule_time
1608                            .clone_from(&self.next_sched_time);
1609                        self.current_fut =
1610                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1611                                self.activity_type.clone(),
1612                                self.arguments.clone(),
1613                                opts,
1614                            ));
1615                        Poll::Pending
1616                    } else {
1617                        self.terminated = true;
1618                        Poll::Ready(ActivityResolution {
1619                            status: Some(activity_resolution::Status::Cancelled(Cancellation {
1620                                failure: Some(Failure {
1621                                    message: "Activity cancelled".to_owned(),
1622                                    failure_info: Some(FailureInfo::CanceledFailureInfo(
1623                                        CanceledFailureInfo::default(),
1624                                    )),
1625                                    ..Default::default()
1626                                }),
1627                            })),
1628                        })
1629                    }
1630                }
1631                Poll::Pending => Poll::Pending,
1632            };
1633        }
1634        let poll_res = self.current_fut.poll_unpin(cx);
1635        if let Poll::Ready(ref r) = poll_res
1636            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1637        {
1638            // If we've already said we want to cancel, don't schedule the backoff timer. Just
1639            // return cancel status. This can happen if cancel comes after the LA says it wants
1640            // to back off but before we have scheduled the timer.
1641            if self.did_cancel.load(Ordering::Acquire) {
1642                self.terminated = true;
1643                return Poll::Ready(ActivityResolution {
1644                    status: Some(activity_resolution::Status::Cancelled(Cancellation {
1645                        failure: Some(Failure {
1646                            message: "Activity cancelled".to_owned(),
1647                            failure_info: Some(FailureInfo::CanceledFailureInfo(
1648                                CanceledFailureInfo::default(),
1649                            )),
1650                            ..Default::default()
1651                        }),
1652                    })),
1653                });
1654            }
1655
1656            let timer_f = self.base_ctx.timer::<Duration>(
1657                b.backoff_duration
1658                    .expect("Duration is set")
1659                    .try_into()
1660                    .expect("duration converts ok"),
1661            );
1662            self.timer_fut = Some(Box::pin(timer_f));
1663            self.next_attempt = b.attempt;
1664            self.next_sched_time.clone_from(&b.original_schedule_time);
1665            return Poll::Pending;
1666        }
1667        if poll_res.is_ready() {
1668            self.terminated = true;
1669        }
1670        poll_res
1671    }
1672}
1673impl FusedFuture for LATimerBackoffFut {
1674    fn is_terminated(&self) -> bool {
1675        self.terminated
1676    }
1677}
1678impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1679    fn cancel(&self) {
1680        self.did_cancel.store(true, Ordering::Release);
1681        if let Some(tf) = self.timer_fut.as_ref() {
1682            tf.cancel();
1683        }
1684        self.current_fut.cancel();
1685    }
1686}
1687
1688/// Future for activity results. Either an immediate error or a running activity.
1689enum ActivityFut<F, Output> {
1690    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
1691    Errored {
1692        error: Option<Box<ActivityExecutionError>>,
1693        _phantom: PhantomData<Output>,
1694    },
1695    /// Running activity that will deserialize output on completion.
1696    Running {
1697        inner: F,
1698        data_converter: DataConverter,
1699        _phantom: PhantomData<Output>,
1700    },
1701    Terminated,
1702}
1703
1704impl<F, Output> ActivityFut<F, Output> {
1705    fn eager(err: ActivityExecutionError) -> Self {
1706        Self::Errored {
1707            error: Some(Box::new(err)),
1708            _phantom: PhantomData,
1709        }
1710    }
1711
1712    fn running(inner: F, data_converter: DataConverter) -> Self {
1713        Self::Running {
1714            inner,
1715            data_converter,
1716            _phantom: PhantomData,
1717        }
1718    }
1719}
1720
1721impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1722
1723impl<F, Output> Future for ActivityFut<F, Output>
1724where
1725    F: Future<Output = ActivityResolution> + Unpin,
1726    Output: TemporalDeserializable + 'static,
1727{
1728    type Output = Result<Output, ActivityExecutionError>;
1729
1730    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1731        let this = self.get_mut();
1732        let poll = match this {
1733            ActivityFut::Errored { error, .. } => {
1734                Poll::Ready(Err(*error.take().expect("polled after completion")))
1735            }
1736            ActivityFut::Running {
1737                inner,
1738                data_converter,
1739                ..
1740            } => match Pin::new(inner).poll(cx) {
1741                Poll::Pending => Poll::Pending,
1742                Poll::Ready(resolution) => Poll::Ready({
1743                    let status = resolution.status.ok_or_else(|| {
1744                        data_converter
1745                            .to_error(
1746                                &SerializationContextData::Workflow,
1747                                Failure {
1748                                    message: "Activity completed without a status".to_string(),
1749                                    ..Default::default()
1750                                },
1751                                ActivityExecutionDecodeHint { cancelled: false },
1752                            )
1753                            .expect("synthetic activity failure should decode")
1754                    })?;
1755
1756                    match status {
1757                        activity_resolution::Status::Completed(success) => {
1758                            let payload = success.result.unwrap_or_default();
1759                            let ctx = SerializationContext {
1760                                data: &SerializationContextData::Workflow,
1761                                converter: data_converter.payload_converter(),
1762                            };
1763                            data_converter
1764                                .payload_converter()
1765                                .from_payload::<Output>(&ctx, payload)
1766                                .map_err(ActivityExecutionError::Serialization)
1767                        }
1768                        activity_resolution::Status::Failed(f) => Err(data_converter.to_error(
1769                            &SerializationContextData::Workflow,
1770                            f.failure.unwrap_or_default(),
1771                            ActivityExecutionDecodeHint { cancelled: false },
1772                        )?),
1773                        activity_resolution::Status::Cancelled(c) => Err(data_converter.to_error(
1774                            &SerializationContextData::Workflow,
1775                            c.failure.unwrap_or_default(),
1776                            ActivityExecutionDecodeHint { cancelled: true },
1777                        )?),
1778                        activity_resolution::Status::Backoff(_) => {
1779                            panic!("DoBackoff should be handled by LATimerBackoffFut")
1780                        }
1781                    }
1782                }),
1783            },
1784            ActivityFut::Terminated => panic!("polled after termination"),
1785        };
1786        if poll.is_ready() {
1787            *this = ActivityFut::Terminated;
1788        }
1789        poll
1790    }
1791}
1792
1793impl<F, Output> FusedFuture for ActivityFut<F, Output>
1794where
1795    F: Future<Output = ActivityResolution> + Unpin,
1796    Output: TemporalDeserializable + 'static,
1797{
1798    fn is_terminated(&self) -> bool {
1799        matches!(self, ActivityFut::Terminated)
1800    }
1801}
1802
1803impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1804where
1805    F: CancellableFuture<ActivityResolution> + Unpin,
1806    Output: TemporalDeserializable + 'static,
1807{
1808    fn cancel(&self) {
1809        if let ActivityFut::Running { inner, .. } = self {
1810            inner.cancel()
1811        }
1812    }
1813}
1814
1815pub(crate) struct ChildWfCommon {
1816    workflow_id: String,
1817    child_seq: u32,
1818    result_future: CancellableWFCommandFut<ChildWorkflowResult, ()>,
1819    base_ctx: BaseWorkflowContext,
1820    data_converter: DataConverter,
1821}
1822
1823/// Child workflow in pending state. Internal type used during the start handshake;
1824/// `ChildWorkflowStartFut` converts this into `Result<StartedChildWorkflow, _>` before
1825/// the caller sees it.
1826#[derive(derive_more::Debug)]
1827pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
1828    pub(crate) status: ChildWorkflowStartStatus,
1829    #[debug(skip)]
1830    pub(crate) common: ChildWfCommon,
1831    pub(crate) _phantom: PhantomData<WD>,
1832}
1833
1834/// Child workflow in started state.
1835#[derive(derive_more::Debug)]
1836pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
1837    /// Run ID of the child workflow
1838    pub run_id: String,
1839    #[debug(skip)]
1840    common: ChildWfCommon,
1841    _phantom: PhantomData<WD>,
1842}
1843
1844/// Future for child workflow results. Wraps the raw result future and deserializes
1845/// the output on completion.
1846enum ChildWorkflowFut<F, Output> {
1847    Running {
1848        inner: F,
1849        data_converter: DataConverter,
1850        _phantom: PhantomData<Output>,
1851    },
1852    Terminated,
1853}
1854
1855impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
1856
1857impl<F, Output> Future for ChildWorkflowFut<F, Output>
1858where
1859    F: Future<Output = ChildWorkflowResult> + Unpin,
1860    Output: TemporalDeserializable + 'static,
1861{
1862    type Output = Result<Output, ChildWorkflowExecutionError>;
1863
1864    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1865        let this = self.get_mut();
1866        let poll = match this {
1867            ChildWorkflowFut::Running {
1868                inner,
1869                data_converter,
1870                ..
1871            } => match Pin::new(inner).poll(cx) {
1872                Poll::Pending => Poll::Pending,
1873                Poll::Ready(result) => Poll::Ready({
1874                    let status = result.status.ok_or_else(|| {
1875                        data_converter
1876                            .to_error(
1877                                &SerializationContextData::Workflow,
1878                                Failure {
1879                                    message: "Child workflow completed without a status"
1880                                        .to_string(),
1881                                    ..Default::default()
1882                                },
1883                                ChildWorkflowExecutionDecodeHint,
1884                            )
1885                            .expect("synthetic child workflow failure should decode")
1886                    })?;
1887                    match status {
1888                        child_workflow_result::Status::Completed(success) => {
1889                            let payloads = success.result.into_iter().collect();
1890                            let ctx = SerializationContext {
1891                                data: &SerializationContextData::Workflow,
1892                                converter: data_converter.payload_converter(),
1893                            };
1894                            data_converter
1895                                .payload_converter()
1896                                .from_payloads::<Output>(&ctx, payloads)
1897                                .map_err(ChildWorkflowExecutionError::Serialization)
1898                        }
1899                        child_workflow_result::Status::Failed(f) => Err(data_converter.to_error(
1900                            &SerializationContextData::Workflow,
1901                            f.failure.unwrap_or_default(),
1902                            ChildWorkflowExecutionDecodeHint,
1903                        )?),
1904                        child_workflow_result::Status::Cancelled(c) => Err(data_converter
1905                            .to_error(
1906                                &SerializationContextData::Workflow,
1907                                c.failure.unwrap_or_default(),
1908                                ChildWorkflowExecutionDecodeHint,
1909                            )?),
1910                    }
1911                }),
1912            },
1913            ChildWorkflowFut::Terminated => panic!("polled after termination"),
1914        };
1915        if poll.is_ready() {
1916            *this = ChildWorkflowFut::Terminated;
1917        }
1918        poll
1919    }
1920}
1921
1922impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
1923where
1924    F: Future<Output = ChildWorkflowResult> + Unpin,
1925    Output: TemporalDeserializable + 'static,
1926{
1927    fn is_terminated(&self) -> bool {
1928        matches!(self, ChildWorkflowFut::Terminated)
1929    }
1930}
1931
1932impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
1933    for ChildWorkflowFut<F, Output>
1934where
1935    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1936    Output: TemporalDeserializable + 'static,
1937{
1938    fn cancel_with_reason(&self, reason: String) {
1939        if let ChildWorkflowFut::Running { inner, .. } = self {
1940            inner.cancel_with_reason(reason)
1941        }
1942    }
1943}
1944
1945impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
1946    for ChildWorkflowFut<F, Output>
1947where
1948    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1949    Output: TemporalDeserializable + 'static,
1950{
1951    fn cancel(&self) {
1952        if let ChildWorkflowFut::Running { inner, .. } = self {
1953            inner.cancel()
1954        }
1955    }
1956}
1957
1958/// Wrapper future for starting a child workflow. Mirrors `ActivityFut` to allow returning
1959/// serialization errors eagerly.
1960enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
1961    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
1962    Errored {
1963        error: Option<Box<ChildWorkflowStartError>>,
1964        _phantom: PhantomData<WD>,
1965    },
1966    Running(F),
1967    Terminated,
1968}
1969
1970impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
1971    fn eager(err: ChildWorkflowStartError) -> Self {
1972        Self::Errored {
1973            error: Some(Box::new(err)),
1974            _phantom: PhantomData,
1975        }
1976    }
1977}
1978
1979impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
1980
1981impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
1982where
1983    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1984    WD: WorkflowDefinition,
1985{
1986    type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>;
1987
1988    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1989        let this = self.get_mut();
1990        let poll = match this {
1991            ChildWorkflowStartFut::Errored { error, .. } => {
1992                Poll::Ready(Err(*error.take().expect("polled after completion")))
1993            }
1994            ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
1995                Poll::Pending => Poll::Pending,
1996                Poll::Ready(pending) => Poll::Ready(match pending.status {
1997                    ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
1998                        run_id: s.run_id,
1999                        common: pending.common,
2000                        _phantom: PhantomData,
2001                    }),
2002                    ChildWorkflowStartStatus::Failed(f) => {
2003                        Err(ChildWorkflowStartError::StartFailed {
2004                            workflow_id: f.workflow_id,
2005                            workflow_type: f.workflow_type,
2006                            cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
2007                                .unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
2008                        })
2009                    }
2010                    ChildWorkflowStartStatus::Cancelled(c) => {
2011                        Err(pending.common.data_converter.to_error(
2012                            &SerializationContextData::Workflow,
2013                            c.failure.unwrap_or_default(),
2014                            ChildWorkflowStartDecodeHint,
2015                        )?)
2016                    }
2017                }),
2018            },
2019            ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
2020        };
2021        if poll.is_ready() {
2022            *this = ChildWorkflowStartFut::Terminated;
2023        }
2024        poll
2025    }
2026}
2027
2028impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
2029where
2030    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
2031    WD: WorkflowDefinition,
2032{
2033    fn is_terminated(&self) -> bool {
2034        matches!(self, ChildWorkflowStartFut::Terminated)
2035    }
2036}
2037
2038impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
2039    for ChildWorkflowStartFut<F, WD>
2040where
2041    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
2042    WD: WorkflowDefinition,
2043{
2044    fn cancel(&self) {
2045        if let ChildWorkflowStartFut::Running(inner) = self {
2046            inner.cancel()
2047        }
2048    }
2049}
2050
2051impl<F, WD> CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowStartError>>
2052    for ChildWorkflowStartFut<F, WD>
2053where
2054    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
2055    WD: WorkflowDefinition,
2056{
2057    fn cancel_with_reason(&self, reason: String) {
2058        if let ChildWorkflowStartFut::Running(inner) = self {
2059            inner.cancel_with_reason(reason)
2060        }
2061    }
2062}
2063
2064/// Wrapper future for signaling a child workflow. Allows returning serialization errors
2065/// eagerly instead of panicking.
2066enum SignalChildFut<F> {
2067    /// Immediate error (e.g., signal input serialization failure). Resolves on first poll.
2068    Errored {
2069        error: Option<WorkflowSignalError>,
2070    },
2071    Running {
2072        inner: F,
2073        data_converter: DataConverter,
2074    },
2075    Terminated,
2076}
2077
2078impl<F> SignalChildFut<F> {
2079    fn eager(err: WorkflowSignalError) -> Self {
2080        Self::Errored { error: Some(err) }
2081    }
2082}
2083
2084impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
2085
2086impl<F> Future for SignalChildFut<F>
2087where
2088    F: Future<Output = SignalExternalWfResult> + Unpin,
2089{
2090    type Output = Result<(), WorkflowSignalError>;
2091
2092    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2093        let this = self.get_mut();
2094        let poll = match this {
2095            SignalChildFut::Errored { error } => {
2096                Poll::Ready(Err(error.take().expect("polled after completion")))
2097            }
2098            SignalChildFut::Running {
2099                inner,
2100                data_converter,
2101            } => match Pin::new(inner).poll(cx) {
2102                Poll::Pending => Poll::Pending,
2103                Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
2104                Poll::Ready(Err(failure)) => Poll::Ready(Err(data_converter.to_error(
2105                    &SerializationContextData::Workflow,
2106                    failure,
2107                    WorkflowSignalDecodeHint,
2108                )?)),
2109            },
2110            SignalChildFut::Terminated => panic!("polled after termination"),
2111        };
2112        if poll.is_ready() {
2113            *this = SignalChildFut::Terminated;
2114        }
2115        poll
2116    }
2117}
2118
2119impl<F> FusedFuture for SignalChildFut<F>
2120where
2121    F: Future<Output = SignalExternalWfResult> + Unpin,
2122{
2123    fn is_terminated(&self) -> bool {
2124        matches!(self, SignalChildFut::Terminated)
2125    }
2126}
2127
2128impl<F> CancellableFuture<Result<(), WorkflowSignalError>> for SignalChildFut<F>
2129where
2130    F: CancellableFuture<SignalExternalWfResult> + Unpin,
2131{
2132    fn cancel(&self) {
2133        if let SignalChildFut::Running { inner, .. } = self {
2134            inner.cancel()
2135        }
2136    }
2137}
2138
2139impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
2140where
2141    WD::Output: TemporalDeserializable + 'static,
2142{
2143    /// Consumes self and returns a future that deserializes the child workflow result
2144    /// into `WD::Output`.
2145    pub fn result(
2146        self,
2147    ) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
2148        ChildWorkflowFut::Running {
2149            inner: self.common.result_future,
2150            data_converter: self.common.data_converter,
2151            _phantom: PhantomData,
2152        }
2153    }
2154
2155    /// Cancel the child workflow
2156    pub fn cancel(&self, reason: String) {
2157        self.common.base_ctx.inner.runtime.host.push_command(
2158            workflow_command::Variant::CancelChildWorkflowExecution(CancelChildWorkflowExecution {
2159                child_workflow_seq: self.common.child_seq,
2160                reason,
2161            })
2162            .into(),
2163        );
2164    }
2165
2166    /// Send a typed signal to the child workflow.
2167    pub fn signal<S: SignalDefinition<Workflow = WD>>(
2168        &self,
2169        signal: S,
2170        input: S::Input,
2171    ) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
2172        let payload_converter = self.common.data_converter.payload_converter();
2173        let ctx = SerializationContext {
2174            data: &SerializationContextData::Workflow,
2175            converter: payload_converter,
2176        };
2177        let payloads = match payload_converter.to_payloads(&ctx, &input) {
2178            Ok(p) => p,
2179            Err(e) => {
2180                return SignalChildFut::eager(e.into());
2181            }
2182        };
2183        let signal = Signal::new(S::name(&signal), payloads);
2184        let target = signal_external_workflow_execution::Target::ChildWorkflowId(
2185            self.common.workflow_id.clone(),
2186        );
2187        SignalChildFut::Running {
2188            inner: self.common.base_ctx.clone().send_signal_wf(target, signal),
2189            data_converter: self.common.data_converter.clone(),
2190        }
2191    }
2192}
2193
2194/// Handle to an external workflow for sending signals or requesting cancellation.
2195///
2196/// Obtained via [`SyncWorkflowContext::external_workflow`] or
2197/// [`WorkflowContext::external_workflow`].
2198#[derive(derive_more::Debug)]
2199pub struct ExternalWorkflowHandle {
2200    workflow_id: String,
2201    run_id: Option<String>,
2202    namespace: String,
2203    #[debug(skip)]
2204    base_ctx: BaseWorkflowContext,
2205}
2206
2207impl ExternalWorkflowHandle {
2208    /// The workflow ID of the external workflow.
2209    pub fn workflow_id(&self) -> &str {
2210        &self.workflow_id
2211    }
2212
2213    /// The run ID of the external workflow, or `None` if targeting the latest run.
2214    pub fn run_id(&self) -> Option<&str> {
2215        self.run_id.as_deref()
2216    }
2217
2218    /// Send a signal to the external workflow.
2219    pub fn signal<S: SignalDefinition>(
2220        &self,
2221        signal: S,
2222        input: S::Input,
2223    ) -> impl CancellableFuture<Result<(), WorkflowSignalError>> + 'static {
2224        let payload_converter = self.base_ctx.data_converter().payload_converter();
2225        let ctx = SerializationContext {
2226            data: &SerializationContextData::Workflow,
2227            converter: payload_converter,
2228        };
2229        let payloads = match payload_converter.to_payloads(&ctx, &input) {
2230            Ok(p) => p,
2231            Err(e) => {
2232                return SignalChildFut::eager(e.into());
2233            }
2234        };
2235        let signal = Signal::new(S::name(&signal), payloads);
2236        let target = signal_external_workflow_execution::Target::WorkflowExecution(
2237            NamespacedWorkflowExecution {
2238                namespace: self.namespace.clone(),
2239                workflow_id: self.workflow_id.clone(),
2240                run_id: self.run_id.clone().unwrap_or_default(),
2241            },
2242        );
2243        SignalChildFut::Running {
2244            inner: self.base_ctx.clone().send_signal_wf(target, signal),
2245            data_converter: self.base_ctx.data_converter().clone(),
2246        }
2247    }
2248
2249    /// Request cancellation of the external workflow.
2250    pub fn cancel(
2251        &self,
2252        reason: Option<String>,
2253    ) -> impl FusedFuture<Output = CancelExternalWfResult> {
2254        let seq = self
2255            .base_ctx
2256            .inner
2257            .seq_nums
2258            .borrow_mut()
2259            .next_cancel_external_wf_seq();
2260        let (cmd, unblocker) = WFCommandFut::new();
2261        self.base_ctx
2262            .inner
2263            .runtime
2264            .register_unblocker(PendingCommandId::CancelExternal(seq), unblocker);
2265        self.base_ctx.inner.runtime.host.push_command(
2266            workflow_command::Variant::RequestCancelExternalWorkflowExecution(
2267                RequestCancelExternalWorkflowExecution {
2268                    seq,
2269                    workflow_execution: Some(NamespacedWorkflowExecution {
2270                        namespace: self.namespace.clone(),
2271                        workflow_id: self.workflow_id.clone(),
2272                        run_id: self.run_id.clone().unwrap_or_default(),
2273                    }),
2274                    reason: reason.unwrap_or_default(),
2275                },
2276            )
2277            .into(),
2278        );
2279        cmd
2280    }
2281}
2282
2283#[derive(derive_more::Debug)]
2284#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
2285pub struct StartedNexusOperation {
2286    /// The operation token, if the operation started asynchronously
2287    pub operation_token: Option<String>,
2288    pub(crate) unblock_dat: NexusUnblockData,
2289}
2290
2291pub(crate) struct NexusUnblockData {
2292    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
2293    schedule_seq: u32,
2294    base_ctx: BaseWorkflowContext,
2295}
2296
2297impl StartedNexusOperation {
2298    pub async fn result(&self) -> NexusOperationResult {
2299        // The result future is a `Shared`; poll it inside an `SdkWakeGuard` (via
2300        // `SdkGuardedFuture`) so its internal waker machinery isn't mistaken for a non-SDK wake on
2301        // replay (which would fail the workflow task with TMPRL1100).
2302        SdkGuardedFuture(self.unblock_dat.result_future.clone()).await
2303    }
2304
2305    pub fn cancel(&self) {
2306        self.unblock_dat
2307            .base_ctx
2308            .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
2309    }
2310}
2311
2312#[cfg(test)]
2313mod tests {
2314    use super::*;
2315    use std::collections::HashMap;
2316    use temporalio_common_wasm::{
2317        data_converters::{TemporalDeserializable, TemporalSerializable},
2318        protos::{
2319            coresdk::{
2320                AsJsonPayloadExt, common::VersioningIntent as ProtoVersioningIntent,
2321                workflow_commands::WorkflowCommand,
2322            },
2323            temporal::api::{
2324                common::v1::{Payload, RetryPolicy},
2325                enums::v1::ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior,
2326            },
2327        },
2328    };
2329    use temporalio_macros::{workflow, workflow_methods};
2330
2331    #[derive(Default)]
2332    struct NoopHost;
2333
2334    impl WorkflowHost for NoopHost {
2335        fn set_current_details(&self, _details: String) {}
2336        fn push_command(&self, _command: WorkflowCommand) {}
2337    }
2338
2339    #[workflow]
2340    #[derive(Default)]
2341    struct TestWorkflow;
2342
2343    #[workflow_methods]
2344    impl TestWorkflow {
2345        #[run]
2346        async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
2347            unreachable!("test workflow run should not be polled")
2348        }
2349    }
2350
2351    fn test_context() -> WorkflowContext<TestWorkflow> {
2352        let init = InitializeWorkflow {
2353            workflow_type: TestWorkflow.name().to_string(),
2354            ..Default::default()
2355        };
2356        let base = BaseWorkflowContext::new(
2357            "default".to_string(),
2358            "orig-task-queue".to_string(),
2359            "run-id".to_string(),
2360            init,
2361            DataConverter::default(),
2362            Rc::new(NoopHost),
2363        );
2364        WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
2365    }
2366
2367    #[test]
2368    fn workflow_context_continue_as_new_serializes_input_and_defaults() {
2369        let ctx = test_context();
2370
2371        let termination = ctx
2372            .continue_as_new(&7, ContinueAsNewOptions::default())
2373            .expect_err("continue_as_new should terminate the workflow");
2374        assert!(
2375            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2376            "expected continue-as-new termination, got {termination:?}"
2377        );
2378        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2379            unreachable!()
2380        };
2381
2382        assert_eq!(
2383            *cmd,
2384            crate::runtime::types::ContinueAsNewRequest {
2385                workflow_type: TestWorkflow.name().to_string(),
2386                task_queue: String::new(),
2387                arguments: vec![7u8.as_json_payload().unwrap()],
2388                workflow_run_timeout: None,
2389                workflow_task_timeout: None,
2390                backoff_start_interval: None,
2391                memo: HashMap::new(),
2392                headers: HashMap::new(),
2393                search_attributes: None,
2394                retry_policy: None,
2395                versioning_intent: ProtoVersioningIntent::Unspecified.into(),
2396                initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::Unspecified
2397                    .into(),
2398            }
2399        );
2400    }
2401
2402    #[test]
2403    fn sync_workflow_context_continue_as_new_applies_options() {
2404        let ctx = test_context();
2405        let sync = ctx.sync_context();
2406        let mut memo = HashMap::new();
2407        memo.insert(
2408            "memo-key".to_string(),
2409            Payload::from(b"memo-value".as_slice()),
2410        );
2411        let mut headers = HashMap::new();
2412        headers.insert(
2413            "header-key".to_string(),
2414            Payload::from(b"header-value".as_slice()),
2415        );
2416        let mut search_attributes = SearchAttributes::default();
2417        search_attributes.indexed_fields.insert(
2418            "CustomKeywordField".to_string(),
2419            Payload::from(b"value".as_slice()),
2420        );
2421
2422        let termination = sync
2423            .continue_as_new(
2424                &11,
2425                ContinueAsNewOptions {
2426                    workflow_type: Some("next-workflow".to_string()),
2427                    task_queue: Some("next-task-queue".to_string()),
2428                    run_timeout: Some(Duration::from_secs(10)),
2429                    task_timeout: Some(Duration::from_secs(3)),
2430                    backoff_start_interval: Some(Duration::from_secs(4)),
2431                    memo: Some(memo.clone()),
2432                    headers: Some(headers.clone()),
2433                    search_attributes: Some(search_attributes.clone()),
2434                    retry_policy: Some(RetryPolicy {
2435                        maximum_attempts: 5,
2436                        ..Default::default()
2437                    }),
2438                    versioning_intent: Some(ProtoVersioningIntent::Compatible),
2439                    initial_versioning_behavior: Some(
2440                        ContinueAsNewVersioningBehavior::UseRampingVersion,
2441                    ),
2442                },
2443            )
2444            .expect_err("continue_as_new should terminate the workflow");
2445        assert!(
2446            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2447            "expected continue-as-new termination, got {termination:?}"
2448        );
2449        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2450            unreachable!()
2451        };
2452
2453        assert_eq!(
2454            *cmd,
2455            crate::runtime::types::ContinueAsNewRequest {
2456                workflow_type: "next-workflow".to_string(),
2457                task_queue: "next-task-queue".to_string(),
2458                arguments: vec![11u8.as_json_payload().unwrap()],
2459                workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
2460                workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
2461                backoff_start_interval: Some(Duration::from_secs(4).try_into().unwrap()),
2462                memo,
2463                headers,
2464                search_attributes: Some(search_attributes),
2465                retry_policy: Some(RetryPolicy {
2466                    maximum_attempts: 5,
2467                    ..Default::default()
2468                }),
2469                versioning_intent: ProtoVersioningIntent::Compatible.into(),
2470                initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::UseRampingVersion
2471                    as i32,
2472            }
2473        );
2474    }
2475
2476    #[test]
2477    fn continue_as_new_preserves_explicit_empty_search_attributes() {
2478        let ctx = test_context();
2479        let sync = ctx.sync_context();
2480
2481        let termination = sync
2482            .continue_as_new(
2483                &11,
2484                ContinueAsNewOptions {
2485                    search_attributes: Some(SearchAttributes::default()),
2486                    ..Default::default()
2487                },
2488            )
2489            .expect_err("continue_as_new should terminate the workflow");
2490        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2491            unreachable!()
2492        };
2493
2494        assert_eq!(cmd.search_attributes, Some(SearchAttributes::default()));
2495    }
2496
2497    #[test]
2498    fn workflow_context_continue_as_new_applies_auto_upgrade_versioning_behavior() {
2499        let ctx = test_context();
2500
2501        let termination = ctx
2502            .continue_as_new(
2503                &13,
2504                ContinueAsNewOptions {
2505                    initial_versioning_behavior: Some(ContinueAsNewVersioningBehavior::AutoUpgrade),
2506                    ..Default::default()
2507                },
2508            )
2509            .expect_err("continue_as_new should terminate the workflow");
2510        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2511            unreachable!()
2512        };
2513
2514        assert_eq!(
2515            cmd.initial_versioning_behavior,
2516            ProtoContinueAsNewVersioningBehavior::AutoUpgrade as i32
2517        );
2518    }
2519
2520    #[test]
2521    fn continue_as_new_reports_serialization_errors() {
2522        #[derive(Debug)]
2523        struct FailingInput;
2524
2525        impl TemporalSerializable for FailingInput {
2526            fn to_payload(
2527                &self,
2528                _ctx: &temporalio_common_wasm::data_converters::SerializationContext<'_>,
2529            ) -> Result<Payload, temporalio_common_wasm::data_converters::PayloadConversionError>
2530            {
2531                Err(
2532                    temporalio_common_wasm::data_converters::PayloadConversionError::EncodingError(
2533                        std::io::Error::other("serialization failure").into(),
2534                    ),
2535                )
2536            }
2537        }
2538
2539        impl TemporalDeserializable for FailingInput {
2540            fn from_payload(
2541                _ctx: &temporalio_common_wasm::data_converters::SerializationContext<'_>,
2542                _payload: Payload,
2543            ) -> Result<Self, temporalio_common_wasm::data_converters::PayloadConversionError>
2544            {
2545                unreachable!("test input is only serialized")
2546            }
2547        }
2548
2549        #[workflow]
2550        #[derive(Default)]
2551        struct FailingWorkflow;
2552
2553        #[workflow_methods]
2554        impl FailingWorkflow {
2555            #[run]
2556            async fn run(
2557                _ctx: &mut WorkflowContext<Self>,
2558                _input: FailingInput,
2559            ) -> crate::WorkflowResult<()> {
2560                unreachable!("test workflow run should not be polled")
2561            }
2562        }
2563
2564        let init = InitializeWorkflow {
2565            workflow_type: "failing-workflow".to_string(),
2566            ..Default::default()
2567        };
2568        let base = BaseWorkflowContext::new(
2569            "default".to_string(),
2570            "orig-task-queue".to_string(),
2571            "run-id".to_string(),
2572            init,
2573            DataConverter::default(),
2574            Rc::new(NoopHost),
2575        );
2576        let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));
2577
2578        let err = ctx
2579            .continue_as_new(&FailingInput, ContinueAsNewOptions::default())
2580            .expect_err("serialization errors should be surfaced");
2581
2582        let WorkflowTermination::Failed(err) = err else {
2583            panic!("expected failed termination, got {err:?}");
2584        };
2585        assert_eq!(err.to_string(), "Encoding error: serialization failure");
2586    }
2587}