Skip to main content

temporalio_sdk/
workflow_context.rs

1mod options;
2
3pub use options::{
4    ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions,
5    LocalActivityOptions, NexusOperationOptions, Signal, SignalData, TimerOptions,
6};
7pub use temporalio_common::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause;
8
9use crate::{
10    CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
11    CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
12    SupportsCancelReason, TimerResult, UnblockEvent, Unblockable, WorkflowTermination,
13    workflow_context::options::IntoWorkflowCommand, workflow_executor::SdkWakeGuard,
14};
15use futures_util::{
16    FutureExt,
17    future::{FusedFuture, Shared},
18    task::Context,
19};
20use std::{
21    cell::{Cell, Ref, RefCell},
22    collections::HashMap,
23    future::{self, Future},
24    marker::PhantomData,
25    ops::{Deref, DerefMut},
26    pin::Pin,
27    rc::Rc,
28    sync::{
29        atomic::{AtomicBool, Ordering},
30        mpsc::{Receiver, Sender},
31    },
32    task::{Poll, Waker},
33    time::{Duration, SystemTime},
34};
35use temporalio_common::{
36    ActivityDefinition, SignalDefinition, WorkflowDefinition,
37    data_converters::{
38        GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
39        SerializationContextData, TemporalDeserializable,
40    },
41    protos::{
42        coresdk::{
43            activity_result::{ActivityResolution, activity_resolution},
44            child_workflow::ChildWorkflowResult,
45            common::NamespacedWorkflowExecution,
46            nexus::NexusOperationResult,
47            workflow_activation::{
48                InitializeWorkflow,
49                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
50            },
51            workflow_commands::{
52                CancelChildWorkflowExecution, ModifyWorkflowProperties,
53                RequestCancelExternalWorkflowExecution, SetPatchMarker,
54                SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
55                WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
56            },
57        },
58        temporal::api::{
59            common::v1::{Memo, Payload, SearchAttributes},
60            failure::v1::Failure,
61            sdk::v1::UserMetadata,
62        },
63    },
64    worker::WorkerDeploymentVersion,
65};
66use tokio::sync::{oneshot, watch};
67
68/// Non-generic base context containing all workflow execution infrastructure.
69///
70/// This is used internally by futures and commands that don't need typed workflow state.
71#[derive(Clone)]
72pub struct BaseWorkflowContext {
73    inner: Rc<WorkflowContextInner>,
74}
75impl BaseWorkflowContext {
76    pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
77        self.inner.shared.borrow_mut()
78    }
79
80    /// Create a read-only view of this context.
81    pub(crate) fn view(&self) -> WorkflowContextView {
82        WorkflowContextView::new(
83            self.inner.namespace.clone(),
84            self.inner.task_queue.clone(),
85            self.inner.run_id.clone(),
86            &self.inner.inital_information,
87        )
88    }
89}
90
91struct WorkflowContextInner {
92    namespace: String,
93    task_queue: String,
94    run_id: String,
95    inital_information: InitializeWorkflow,
96    chan: Sender<RustWfCmd>,
97    am_cancelled: watch::Receiver<Option<String>>,
98    shared: RefCell<WorkflowContextSharedData>,
99    seq_nums: RefCell<WfCtxProtectedDat>,
100    payload_converter: PayloadConverter,
101    state_mutated: Cell<bool>,
102}
103
104/// Context provided to synchronous signal and update handlers.
105///
106/// This type provides all workflow context capabilities except `state()`, `state_mut()`,
107/// and `wait_condition()`. Those methods are not applicable in sync handler contexts.
108///
109/// Sync handlers receive `&mut self` directly, so they can reference and mutate workflow state without
110/// needing `state()`/`state_mut()`.
111pub struct SyncWorkflowContext<W> {
112    base: BaseWorkflowContext,
113    /// Headers from the current handler invocation (signal, update, etc.)
114    headers: Rc<HashMap<String, Payload>>,
115    _phantom: PhantomData<W>,
116}
117
118impl<W> Clone for SyncWorkflowContext<W> {
119    fn clone(&self) -> Self {
120        Self {
121            base: self.base.clone(),
122            headers: self.headers.clone(),
123            _phantom: PhantomData,
124        }
125    }
126}
127
128/// Used within workflows to issue commands, get info, etc.
129///
130/// The type parameter `W` represents the workflow type. This enables type-safe
131/// access to workflow state via `state_mut()` for mutations.
132pub struct WorkflowContext<W> {
133    sync: SyncWorkflowContext<W>,
134    /// The workflow instance
135    workflow_state: Rc<RefCell<W>>,
136    /// Wakers registered by `wait_condition` futures. Drained and woken on
137    /// every `state_mut` call so that waker-based combinators (e.g.
138    /// `FuturesOrdered`) re-poll the condition after state changes.
139    condition_wakers: Rc<RefCell<Vec<Waker>>>,
140}
141
142impl<W> Clone for WorkflowContext<W> {
143    fn clone(&self) -> Self {
144        Self {
145            sync: self.sync.clone(),
146            workflow_state: self.workflow_state.clone(),
147            condition_wakers: self.condition_wakers.clone(),
148        }
149    }
150}
151
152/// Read-only view of workflow context for use in init and query handlers.
153///
154/// This provides access to workflow information but cannot issue commands.
155#[derive(Clone, Debug)]
156#[non_exhaustive]
157pub struct WorkflowContextView {
158    /// The workflow's unique identifier
159    pub workflow_id: String,
160    /// The run id of this workflow execution
161    pub run_id: String,
162    /// The workflow type name
163    pub workflow_type: String,
164    /// The task queue this workflow is executing on
165    pub task_queue: String,
166    /// The namespace this workflow is executing in
167    pub namespace: String,
168
169    /// The current attempt number (starting from 1)
170    pub attempt: u32,
171    /// The run id of the very first execution in the chain
172    pub first_execution_run_id: String,
173    /// The run id of the previous execution if this is a continuation
174    pub continued_from_run_id: Option<String>,
175
176    /// When the workflow execution started
177    pub start_time: Option<SystemTime>,
178    /// Total workflow execution timeout including retries and continue as new
179    pub execution_timeout: Option<Duration>,
180    /// Timeout of a single workflow run
181    pub run_timeout: Option<Duration>,
182    /// Timeout of a single workflow task
183    pub task_timeout: Option<Duration>,
184
185    /// Information about the parent workflow, if this is a child workflow
186    pub parent: Option<ParentWorkflowInfo>,
187    /// Information about the root workflow in the execution chain
188    pub root: Option<RootWorkflowInfo>,
189
190    /// The workflow's retry policy
191    pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
192    /// If this workflow runs on a cron schedule
193    pub cron_schedule: Option<String>,
194    /// User-defined memo
195    pub memo: Option<Memo>,
196    /// Initial search attributes
197    pub search_attributes: Option<SearchAttributes>,
198}
199
200/// Information about a parent workflow.
201#[derive(Clone, Debug)]
202#[non_exhaustive]
203pub struct ParentWorkflowInfo {
204    /// The parent workflow's unique identifier
205    pub workflow_id: String,
206    /// The parent workflow's run id
207    pub run_id: String,
208    /// The parent workflow's namespace
209    pub namespace: String,
210}
211
212/// Information about the root workflow in an execution chain.
213#[derive(Clone, Debug)]
214#[non_exhaustive]
215pub struct RootWorkflowInfo {
216    /// The root workflow's unique identifier
217    pub workflow_id: String,
218    /// The root workflow's run id
219    pub run_id: String,
220}
221
222impl WorkflowContextView {
223    /// Create a new view from workflow initialization data.
224    pub(crate) fn new(
225        namespace: String,
226        task_queue: String,
227        run_id: String,
228        init: &InitializeWorkflow,
229    ) -> Self {
230        let parent = init
231            .parent_workflow_info
232            .as_ref()
233            .map(|p| ParentWorkflowInfo {
234                workflow_id: p.workflow_id.clone(),
235                run_id: p.run_id.clone(),
236                namespace: p.namespace.clone(),
237            });
238
239        let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
240            workflow_id: r.workflow_id.clone(),
241            run_id: r.run_id.clone(),
242        });
243
244        let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
245            None
246        } else {
247            Some(init.continued_from_execution_run_id.clone())
248        };
249
250        let cron_schedule = if init.cron_schedule.is_empty() {
251            None
252        } else {
253            Some(init.cron_schedule.clone())
254        };
255
256        Self {
257            workflow_id: init.workflow_id.clone(),
258            run_id,
259            workflow_type: init.workflow_type.clone(),
260            task_queue,
261            namespace,
262            attempt: init.attempt as u32,
263            first_execution_run_id: init.first_execution_run_id.clone(),
264            continued_from_run_id,
265            start_time: init.start_time.and_then(|t| t.try_into().ok()),
266            execution_timeout: init
267                .workflow_execution_timeout
268                .and_then(|d| d.try_into().ok()),
269            run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
270            task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
271            parent,
272            root,
273            retry_policy: init.retry_policy.clone(),
274            cron_schedule,
275            memo: init.memo.clone(),
276            search_attributes: init.search_attributes.clone(),
277        }
278    }
279}
280
281/// Error type for activity execution outcomes.
282#[derive(Debug, thiserror::Error)]
283pub enum ActivityExecutionError {
284    /// The activity failed with the given failure details.
285    #[error("Activity failed: {}", .0.message)]
286    Failed(Box<Failure>),
287    /// The activity was cancelled.
288    #[error("Activity cancelled: {}", .0.message)]
289    Cancelled(Box<Failure>),
290    // TODO: Timed out variant
291    /// Failed to serialize input or deserialize result payload.
292    #[error("Payload conversion failed: {0}")]
293    Serialization(#[from] PayloadConversionError),
294}
295
296impl ActivityExecutionError {
297    /// Returns true if this error represents a timeout.
298    pub fn is_timeout(&self) -> bool {
299        match self {
300            ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
301            _ => false,
302        }
303    }
304}
305
306/// Error returned when a child workflow execution fails.
307#[derive(Debug, thiserror::Error)]
308pub enum ChildWorkflowExecutionError {
309    /// The child workflow failed.
310    #[error("Child workflow failed: {}", .0.message)]
311    Failed(Box<Failure>),
312    /// The child workflow was cancelled.
313    #[error("Child workflow cancelled: {}", .0.message)]
314    Cancelled(Box<Failure>),
315    /// The child workflow failed to start (e.g., workflow ID already exists).
316    #[error(
317        "Child workflow start failed: workflow_id={workflow_id}, workflow_type={workflow_type}, cause={cause:?}"
318    )]
319    StartFailed {
320        /// The workflow ID that was requested.
321        workflow_id: String,
322        /// The workflow type that was requested.
323        workflow_type: String,
324        /// The cause of the start failure.
325        cause: StartChildWorkflowExecutionFailedCause,
326    },
327    /// Failed to serialize input or deserialize the child workflow result payload.
328    #[error("Payload conversion failed: {0}")]
329    Serialization(#[from] PayloadConversionError),
330}
331
332/// Error returned when signaling a child workflow fails.
333#[derive(Debug, thiserror::Error)]
334pub enum ChildWorkflowSignalError {
335    /// The signal delivery failed.
336    #[error("Child workflow signal failed: {}", .0.message)]
337    Failed(Box<Failure>),
338    /// Failed to serialize the signal input payload.
339    #[error("Signal payload conversion failed: {0}")]
340    Serialization(#[from] PayloadConversionError),
341}
342
343impl BaseWorkflowContext {
344    /// Create a new base context, returning the context itself and a receiver which outputs commands
345    /// sent from the workflow.
346    pub(crate) fn new(
347        namespace: String,
348        task_queue: String,
349        run_id: String,
350        init_workflow_job: InitializeWorkflow,
351        am_cancelled: watch::Receiver<Option<String>>,
352        payload_converter: PayloadConverter,
353    ) -> (Self, Receiver<RustWfCmd>) {
354        // The receiving side is non-async
355        let (chan, rx) = std::sync::mpsc::channel();
356        (
357            Self {
358                inner: Rc::new(WorkflowContextInner {
359                    namespace,
360                    task_queue,
361                    run_id,
362                    shared: RefCell::new(WorkflowContextSharedData {
363                        random_seed: init_workflow_job.randomness_seed,
364                        search_attributes: init_workflow_job
365                            .search_attributes
366                            .clone()
367                            .unwrap_or_default(),
368                        ..Default::default()
369                    }),
370                    inital_information: init_workflow_job,
371                    chan,
372                    am_cancelled,
373                    seq_nums: RefCell::new(WfCtxProtectedDat {
374                        next_timer_sequence_number: 1,
375                        next_activity_sequence_number: 1,
376                        next_child_workflow_sequence_number: 1,
377                        next_cancel_external_wf_sequence_number: 1,
378                        next_signal_external_wf_sequence_number: 1,
379                        next_nexus_op_sequence_number: 1,
380                    }),
381                    payload_converter,
382                    state_mutated: Cell::new(false),
383                }),
384            },
385            rx,
386        )
387    }
388
389    /// Buffer a command to be sent in the activation reply
390    pub(crate) fn send(&self, c: RustWfCmd) {
391        self.inner.chan.send(c).expect("command channel intact");
392    }
393
394    /// Check and clear the state_mutated flag. Returns `true` if `state_mut`
395    /// was called since the last time this method was invoked.
396    pub(crate) fn take_state_mutated(&self) -> bool {
397        self.inner.state_mutated.replace(false)
398    }
399
400    /// Mark that workflow state has been mutated.
401    pub(crate) fn set_state_mutated(&self) {
402        self.inner.state_mutated.set(true);
403    }
404
405    /// Return the current value of current_details.
406    pub(crate) fn current_details(&self) -> String {
407        self.inner.shared.borrow().current_details.clone()
408    }
409
410    /// Cancel any cancellable operation by ID
411    fn cancel(&self, cancellable_id: CancellableID) {
412        self.send(RustWfCmd::Cancel(cancellable_id));
413    }
414
415    /// Request to create a timer
416    pub fn timer<T: Into<TimerOptions>>(
417        &self,
418        opts: T,
419    ) -> impl CancellableFuture<TimerResult> + use<T> {
420        let opts: TimerOptions = opts.into();
421        let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
422        let (cmd, unblocker) =
423            CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
424        let payload_converter = PayloadConverter::default();
425        let context = SerializationContext {
426            data: &SerializationContextData::Workflow,
427            converter: &payload_converter,
428        };
429        self.send(
430            CommandCreateRequest {
431                cmd: WorkflowCommand {
432                    variant: Some(
433                        StartTimer {
434                            seq,
435                            start_to_fire_timeout: Some(
436                                opts.duration
437                                    .try_into()
438                                    .expect("Durations must fit into 64 bits"),
439                            ),
440                        }
441                        .into(),
442                    ),
443                    user_metadata: Some(UserMetadata {
444                        summary: opts.summary.map(|summary| {
445                            payload_converter
446                                .to_payload(&context, &summary)
447                                .expect("String-to-JSON payload serialization is infallible")
448                        }),
449                        details: None,
450                    }),
451                },
452                unblocker,
453            }
454            .into(),
455        );
456        cmd
457    }
458
459    /// Request to run an activity
460    pub fn start_activity<AD: ActivityDefinition>(
461        &self,
462        _activity: AD,
463        input: impl Into<AD::Input>,
464        mut opts: ActivityOptions,
465    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
466    where
467        AD::Output: TemporalDeserializable,
468    {
469        let input = input.into();
470        let ctx = SerializationContext {
471            data: &SerializationContextData::Workflow,
472            converter: &self.inner.payload_converter,
473        };
474        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
475            Ok(p) => p,
476            Err(e) => {
477                return ActivityFut::eager(e.into());
478            }
479        };
480        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
481        let (cmd, unblocker) =
482            CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
483        if opts.task_queue.is_none() {
484            opts.task_queue = Some(self.inner.task_queue.clone());
485        }
486        self.send(
487            CommandCreateRequest {
488                cmd: opts.into_command(AD::name().to_string(), payloads, seq),
489                unblocker,
490            }
491            .into(),
492        );
493        ActivityFut::running(cmd, self.inner.payload_converter.clone())
494    }
495
496    /// Request to run a local activity
497    pub fn start_local_activity<AD: ActivityDefinition>(
498        &self,
499        _activity: AD,
500        input: impl Into<AD::Input>,
501        opts: LocalActivityOptions,
502    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
503    where
504        AD::Output: TemporalDeserializable,
505    {
506        let input = input.into();
507        let ctx = SerializationContext {
508            data: &SerializationContextData::Workflow,
509            converter: &self.inner.payload_converter,
510        };
511        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
512            Ok(p) => p,
513            Err(e) => {
514                return ActivityFut::eager(e.into());
515            }
516        };
517        ActivityFut::running(
518            LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
519            self.inner.payload_converter.clone(),
520        )
521    }
522
523    /// Start a child workflow with typed input/output.
524    fn child_workflow<WD: WorkflowDefinition>(
525        &self,
526        workflow: WD,
527        input: impl Into<WD::Input>,
528        opts: ChildWorkflowOptions,
529    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
530    where
531        WD::Output: TemporalDeserializable,
532    {
533        let input = input.into();
534        let ctx = SerializationContext {
535            data: &SerializationContextData::Workflow,
536            converter: &self.inner.payload_converter,
537        };
538        let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
539            Ok(p) => p,
540            Err(e) => {
541                return ChildWorkflowStartFut::eager(e.into());
542            }
543        };
544        let workflow_type = workflow.name().to_string();
545
546        let child_seq = self.inner.seq_nums.borrow_mut().next_child_workflow_seq();
547        // Immediately create the command/future for the result, otherwise if the user does
548        // not await the result until *after* we receive an activation for it, there will be nothing
549        // to match when unblocking.
550        let (result_cmd, unblocker) = CancellableWFCommandFut::new(
551            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
552            self.clone(),
553        );
554        self.send(
555            CommandSubscribeChildWorkflowCompletion {
556                seq: child_seq,
557                unblocker,
558            }
559            .into(),
560        );
561
562        let common = ChildWfCommon {
563            workflow_id: opts.workflow_id.clone(),
564            child_seq,
565            result_future: result_cmd,
566            base_ctx: self.clone(),
567            payload_converter: self.inner.payload_converter.clone(),
568        };
569
570        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
571            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
572            common,
573            self.clone(),
574        );
575        self.send(
576            CommandCreateRequest {
577                cmd: opts.into_command(workflow_type, payloads, child_seq),
578                unblocker,
579            }
580            .into(),
581        );
582
583        ChildWorkflowStartFut::Running(cmd)
584    }
585
586    /// Request to run a local activity with no implementation of timer-backoff based retrying.
587    fn local_activity_no_timer_retry(
588        self,
589        activity_type: String,
590        arguments: Vec<Payload>,
591        opts: LocalActivityOptions,
592    ) -> impl CancellableFuture<ActivityResolution> {
593        let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
594        let (cmd, unblocker) =
595            CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
596        self.inner
597            .chan
598            .send(
599                CommandCreateRequest {
600                    cmd: opts.into_command(activity_type, arguments, seq),
601                    unblocker,
602                }
603                .into(),
604            )
605            .expect("command channel intact");
606        cmd
607    }
608
609    fn send_signal_wf(
610        self,
611        target: sig_we::Target,
612        signal: Signal,
613    ) -> impl CancellableFuture<SignalExternalWfResult> {
614        let seq = self
615            .inner
616            .seq_nums
617            .borrow_mut()
618            .next_signal_external_wf_seq();
619        let (cmd, unblocker) =
620            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
621        self.send(
622            CommandCreateRequest {
623                cmd: WorkflowCommand {
624                    variant: Some(
625                        SignalExternalWorkflowExecution {
626                            seq,
627                            signal_name: signal.signal_name,
628                            args: signal.data.input,
629                            target: Some(target),
630                            headers: signal.data.headers,
631                        }
632                        .into(),
633                    ),
634                    user_metadata: None,
635                },
636                unblocker,
637            }
638            .into(),
639        );
640        cmd
641    }
642}
643
644impl<W> SyncWorkflowContext<W> {
645    /// Return the workflow's unique identifier
646    pub fn workflow_id(&self) -> &str {
647        &self.base.inner.inital_information.workflow_id
648    }
649
650    /// Return the run id of this workflow execution
651    pub fn run_id(&self) -> &str {
652        &self.base.inner.run_id
653    }
654
655    /// Return the namespace the workflow is executing in
656    pub fn namespace(&self) -> &str {
657        &self.base.inner.namespace
658    }
659
660    /// Return the task queue the workflow is executing in
661    pub fn task_queue(&self) -> &str {
662        &self.base.inner.task_queue
663    }
664
665    /// Return the current time according to the workflow (which is not wall-clock time).
666    pub fn workflow_time(&self) -> Option<SystemTime> {
667        self.base.inner.shared.borrow().wf_time
668    }
669
670    /// Return the length of history so far at this point in the workflow
671    pub fn history_length(&self) -> u32 {
672        self.base.inner.shared.borrow().history_length
673    }
674
675    /// Return the deployment version, if any,  as it was when this point in the workflow was first
676    /// reached. If this code is being executed for the first time, return this Worker's deployment
677    /// version if it has one.
678    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
679        self.base
680            .inner
681            .shared
682            .borrow()
683            .current_deployment_version
684            .clone()
685    }
686
687    /// Return current values for workflow search attributes
688    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
689        Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
690    }
691
692    /// Return the workflow's randomness seed
693    pub fn random_seed(&self) -> u64 {
694        self.base.inner.shared.borrow().random_seed
695    }
696
697    /// Returns true if the current workflow task is happening under replay
698    pub fn is_replaying(&self) -> bool {
699        self.base.inner.shared.borrow().is_replaying
700    }
701
702    /// Returns true if the server suggests this workflow should continue-as-new
703    pub fn continue_as_new_suggested(&self) -> bool {
704        self.base.inner.shared.borrow().continue_as_new_suggested
705    }
706
707    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
708    ///
709    /// When called from within a signal handler, returns the headers that were sent with that
710    /// signal. When called from the main workflow run method, returns an empty map.
711    pub fn headers(&self) -> &HashMap<String, Payload> {
712        &self.headers
713    }
714
715    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
716    pub fn payload_converter(&self) -> &PayloadConverter {
717        &self.base.inner.payload_converter
718    }
719
720    /// Return various information that the workflow was initialized with. Will eventually become
721    /// a proper non-proto workflow info struct.
722    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
723        &self.base.inner.inital_information
724    }
725
726    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
727    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
728        let am_cancelled = self.base.inner.am_cancelled.clone();
729        async move {
730            if let Some(s) = am_cancelled.borrow().as_ref() {
731                return s.clone();
732            }
733            am_cancelled
734                .clone()
735                .changed()
736                .await
737                .expect("Cancelled send half not dropped");
738            am_cancelled.borrow().as_ref().cloned().unwrap_or_default()
739        }
740        .fuse()
741    }
742
743    /// Signal that this workflow should continue as a new workflow execution with the given input and
744    /// options.
745    ///
746    /// This always returns an `Err` which should be propigated.
747    pub fn continue_as_new(
748        &self,
749        input: &<W::Run as WorkflowDefinition>::Input,
750        opts: ContinueAsNewOptions,
751    ) -> Result<std::convert::Infallible, WorkflowTermination>
752    where
753        W: crate::workflows::WorkflowImplementation,
754    {
755        let pc = &self.base.inner.payload_converter;
756        let ctx = SerializationContext {
757            data: &SerializationContextData::Workflow,
758            converter: pc,
759        };
760        let arguments = pc
761            .to_payloads(&ctx, input)
762            .map_err(WorkflowTermination::failed)?;
763        let workflow_type = self.workflow_initial_info().workflow_type.clone();
764        let proto = opts.into_proto(workflow_type, arguments);
765        Err(WorkflowTermination::continue_as_new(proto))
766    }
767
768    /// Request to create a timer
769    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
770        self.base.timer(opts)
771    }
772
773    /// Request to run an activity
774    pub fn start_activity<AD: ActivityDefinition>(
775        &self,
776        activity: AD,
777        input: impl Into<AD::Input>,
778        opts: ActivityOptions,
779    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
780    where
781        AD::Output: TemporalDeserializable,
782    {
783        self.base.start_activity(activity, input, opts)
784    }
785
786    /// Request to run a local activity
787    pub fn start_local_activity<AD: ActivityDefinition>(
788        &self,
789        activity: AD,
790        input: impl Into<AD::Input>,
791        opts: LocalActivityOptions,
792    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
793    where
794        AD::Output: TemporalDeserializable,
795    {
796        self.base.start_local_activity(activity, input, opts)
797    }
798
799    /// Start a child workflow. Returns a future that resolves to a [StartedChildWorkflow]
800    /// which can be used to await the result, send signals, or cancel the child.
801    pub fn child_workflow<WD: WorkflowDefinition>(
802        &self,
803        workflow: WD,
804        input: impl Into<WD::Input>,
805        opts: ChildWorkflowOptions,
806    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
807    where
808        WD::Output: TemporalDeserializable,
809    {
810        self.base.child_workflow(workflow, input, opts)
811    }
812
813    /// Check (or record) that this workflow history was created with the provided patch
814    pub fn patched(&self, patch_id: &str) -> bool {
815        self.patch_impl(patch_id, false)
816    }
817
818    /// Record that this workflow history was created with the provided patch, and it is being
819    /// phased out.
820    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
821        self.patch_impl(patch_id, true)
822    }
823
824    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
825        self.base.send(
826            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
827                patch_id: patch_id.to_string(),
828                deprecated,
829            })
830            .into(),
831        );
832        // See if we already know about the status of this change
833        if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
834            return *present;
835        }
836
837        // If we don't already know about the change, that means there is no marker in history,
838        // and we should return false if we are replaying
839        let res = !self.base.inner.shared.borrow().is_replaying;
840
841        self.base
842            .inner
843            .shared
844            .borrow_mut()
845            .changes
846            .insert(patch_id.to_string(), res);
847
848        res
849    }
850
851    /// Get a handle to an external workflow for sending signals or requesting cancellation.
852    pub fn external_workflow(
853        &self,
854        workflow_id: impl Into<String>,
855        run_id: Option<String>,
856    ) -> ExternalWorkflowHandle {
857        ExternalWorkflowHandle {
858            workflow_id: workflow_id.into(),
859            run_id,
860            namespace: self.base.inner.namespace.clone(),
861            base_ctx: self.base.clone(),
862        }
863    }
864
865    /// Add or create a set of search attributes
866    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
867        self.base.send(RustWfCmd::NewNonblockingCmd(
868            workflow_command::Variant::UpsertWorkflowSearchAttributes(
869                UpsertWorkflowSearchAttributes {
870                    search_attributes: Some(SearchAttributes {
871                        indexed_fields: HashMap::from_iter(attr_iter),
872                    }),
873                },
874            ),
875        ))
876    }
877
878    /// Add or create a set of search attributes
879    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
880        self.base.send(RustWfCmd::NewNonblockingCmd(
881            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
882                upserted_memo: Some(Memo {
883                    fields: HashMap::from_iter(attr_iter),
884                }),
885            }),
886        ))
887    }
888
889    /// Set the current details string for this workflow execution.
890    ///
891    /// The value is surfaced to the Temporal server UI in real time via the
892    /// the workflow metadata query.
893    pub fn set_current_details(&self, details: impl Into<String>) {
894        self.base.inner.shared.borrow_mut().current_details = details.into();
895    }
896
897    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
898    pub fn force_task_fail(&self, with: anyhow::Error) {
899        self.base.send(with.into());
900    }
901
902    /// Start a nexus operation
903    pub fn start_nexus_operation(
904        &self,
905        opts: NexusOperationOptions,
906    ) -> impl CancellableFuture<NexusStartResult> {
907        let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
908        let (result_future, unblocker) = WFCommandFut::new();
909        self.base
910            .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
911        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
912            CancellableID::NexusOp(seq),
913            NexusUnblockData {
914                result_future: result_future.shared(),
915                schedule_seq: seq,
916                base_ctx: self.base.clone(),
917            },
918            self.base.clone(),
919        );
920        self.base.send(
921            CommandCreateRequest {
922                cmd: opts.into_command(seq),
923                unblocker,
924            }
925            .into(),
926        );
927        cmd
928    }
929
930    /// Create a read-only view of this context.
931    pub(crate) fn view(&self) -> WorkflowContextView {
932        self.base.view()
933    }
934}
935
936impl<W> WorkflowContext<W> {
937    /// Create a new wf context from a base context and workflow state.
938    pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
939        Self {
940            sync: SyncWorkflowContext {
941                base,
942                headers: Rc::new(HashMap::new()),
943                _phantom: PhantomData,
944            },
945            workflow_state,
946            condition_wakers: Rc::new(RefCell::new(Vec::new())),
947        }
948    }
949
950    /// Returns a new context with the specified headers set.
951    pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
952        Self {
953            sync: SyncWorkflowContext {
954                base: self.sync.base.clone(),
955                headers: Rc::new(headers),
956                _phantom: PhantomData,
957            },
958            workflow_state: self.workflow_state.clone(),
959            condition_wakers: self.condition_wakers.clone(),
960        }
961    }
962
963    /// Returns a [`SyncWorkflowContext`] extracted from this context.
964    pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
965        self.sync.clone()
966    }
967
968    // --- Delegated methods from SyncWorkflowContext ---
969
970    /// Return the workflow's unique identifier
971    pub fn workflow_id(&self) -> &str {
972        self.sync.workflow_id()
973    }
974
975    /// Return the run id of this workflow execution
976    pub fn run_id(&self) -> &str {
977        self.sync.run_id()
978    }
979
980    /// Return the namespace the workflow is executing in
981    pub fn namespace(&self) -> &str {
982        self.sync.namespace()
983    }
984
985    /// Return the task queue the workflow is executing in
986    pub fn task_queue(&self) -> &str {
987        self.sync.task_queue()
988    }
989
990    /// Return the current time according to the workflow (which is not wall-clock time).
991    pub fn workflow_time(&self) -> Option<SystemTime> {
992        self.sync.workflow_time()
993    }
994
995    /// Return the length of history so far at this point in the workflow
996    pub fn history_length(&self) -> u32 {
997        self.sync.history_length()
998    }
999
1000    /// Return the deployment version, if any, as it was when this point in the workflow was first
1001    /// reached. If this code is being executed for the first time, return this Worker's deployment
1002    /// version if it has one.
1003    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
1004        self.sync.current_deployment_version()
1005    }
1006
1007    /// Return current values for workflow search attributes
1008    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
1009        self.sync.search_attributes()
1010    }
1011
1012    /// Return the workflow's randomness seed
1013    pub fn random_seed(&self) -> u64 {
1014        self.sync.random_seed()
1015    }
1016
1017    /// Returns true if the current workflow task is happening under replay
1018    pub fn is_replaying(&self) -> bool {
1019        self.sync.is_replaying()
1020    }
1021
1022    /// Returns true if the server suggests this workflow should continue-as-new
1023    pub fn continue_as_new_suggested(&self) -> bool {
1024        self.sync.continue_as_new_suggested()
1025    }
1026
1027    /// Returns the headers for the current handler invocation (signal, update, query, etc.).
1028    pub fn headers(&self) -> &HashMap<String, Payload> {
1029        self.sync.headers()
1030    }
1031
1032    /// Returns the [PayloadConverter] currently used by the worker running this workflow.
1033    pub fn payload_converter(&self) -> &PayloadConverter {
1034        self.sync.payload_converter()
1035    }
1036
1037    /// Return various information that the workflow was initialized with.
1038    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
1039        self.sync.workflow_initial_info()
1040    }
1041
1042    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
1043    pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
1044        self.sync.cancelled()
1045    }
1046
1047    /// Request to create a timer
1048    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
1049        self.sync.timer(opts)
1050    }
1051
1052    /// Request to run an activity
1053    pub fn start_activity<AD: ActivityDefinition>(
1054        &self,
1055        activity: AD,
1056        input: impl Into<AD::Input>,
1057        opts: ActivityOptions,
1058    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1059    where
1060        AD::Output: TemporalDeserializable,
1061    {
1062        self.sync.start_activity(activity, input, opts)
1063    }
1064
1065    /// Request to run a local activity
1066    pub fn start_local_activity<AD: ActivityDefinition>(
1067        &self,
1068        activity: AD,
1069        input: impl Into<AD::Input>,
1070        opts: LocalActivityOptions,
1071    ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
1072    where
1073        AD::Output: TemporalDeserializable,
1074    {
1075        self.sync.start_local_activity(activity, input, opts)
1076    }
1077
1078    /// Start a child workflow. See [SyncWorkflowContext::child_workflow] for details.
1079    pub fn child_workflow<WD: WorkflowDefinition>(
1080        &self,
1081        workflow: WD,
1082        input: impl Into<WD::Input>,
1083        opts: ChildWorkflowOptions,
1084    ) -> impl CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1085    where
1086        WD::Output: TemporalDeserializable,
1087    {
1088        self.sync.child_workflow(workflow, input, opts)
1089    }
1090
1091    /// Check (or record) that this workflow history was created with the provided patch
1092    pub fn patched(&self, patch_id: &str) -> bool {
1093        self.sync.patched(patch_id)
1094    }
1095
1096    /// Record that this workflow history was created with the provided patch, and it is being
1097    /// phased out.
1098    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
1099        self.sync.deprecate_patch(patch_id)
1100    }
1101
1102    /// Get a handle to an external workflow. See [SyncWorkflowContext::external_workflow].
1103    pub fn external_workflow(
1104        &self,
1105        workflow_id: impl Into<String>,
1106        run_id: Option<String>,
1107    ) -> ExternalWorkflowHandle {
1108        self.sync.external_workflow(workflow_id, run_id)
1109    }
1110
1111    /// Add or create a set of search attributes
1112    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1113        self.sync.upsert_search_attributes(attr_iter)
1114    }
1115
1116    /// Add or create a set of memo fields
1117    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
1118        self.sync.upsert_memo(attr_iter)
1119    }
1120
1121    /// Set the current details string for this workflow execution.
1122    ///
1123    /// See [`SyncWorkflowContext::set_current_details`].
1124    pub fn set_current_details(&self, details: impl Into<String>) {
1125        self.sync.set_current_details(details)
1126    }
1127
1128    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
1129    pub fn force_task_fail(&self, with: anyhow::Error) {
1130        self.sync.force_task_fail(with)
1131    }
1132
1133    /// Start a nexus operation
1134    pub fn start_nexus_operation(
1135        &self,
1136        opts: NexusOperationOptions,
1137    ) -> impl CancellableFuture<NexusStartResult> {
1138        self.sync.start_nexus_operation(opts)
1139    }
1140
1141    /// Create a read-only view of this context.
1142    pub(crate) fn view(&self) -> WorkflowContextView {
1143        self.sync.view()
1144    }
1145
1146    /// Access workflow state immutably via closure.
1147    ///
1148    /// The borrow is scoped to the closure and cannot escape, preventing
1149    /// borrows from being held across await points.
1150    pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1151        f(&*self.workflow_state.borrow())
1152    }
1153
1154    /// Access workflow state mutably via closure.
1155    ///
1156    /// The borrow is scoped to the closure and cannot escape, preventing
1157    /// borrows from being held across await points.
1158    ///
1159    /// After the mutation, all wakers registered by pending `wait_condition`
1160    /// futures are woken so that waker-based combinators (e.g.
1161    /// `FuturesOrdered`) re-poll them on the next pass.
1162    pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1163        let result = f(&mut *self.workflow_state.borrow_mut());
1164        let _guard = SdkWakeGuard::new();
1165        for waker in self.condition_wakers.borrow_mut().drain(..) {
1166            waker.wake();
1167        }
1168        self.sync.base.set_state_mutated();
1169        result
1170    }
1171
1172    /// Signal that this workflow should continue as a new workflow execution with the given input and
1173    /// options.
1174    ///
1175    /// This always returns an `Err` which should be propigated
1176    pub fn continue_as_new(
1177        &self,
1178        input: &<W::Run as WorkflowDefinition>::Input,
1179        opts: ContinueAsNewOptions,
1180    ) -> Result<std::convert::Infallible, WorkflowTermination>
1181    where
1182        W: crate::workflows::WorkflowImplementation,
1183    {
1184        self.sync.continue_as_new(input, opts)
1185    }
1186
1187    /// Wait for some condition on workflow state to become true, yielding the workflow if not.
1188    ///
1189    /// The condition closure receives an immutable reference to the workflow state,
1190    /// which is borrowed only for the duration of each poll (not across await points).
1191    pub fn wait_condition<'a>(
1192        &'a self,
1193        mut condition: impl FnMut(&W) -> bool + 'a,
1194    ) -> impl FusedFuture<Output = ()> + 'a {
1195        future::poll_fn(move |cx: &mut Context<'_>| {
1196            if condition(&*self.workflow_state.borrow()) {
1197                Poll::Ready(())
1198            } else {
1199                self.condition_wakers.borrow_mut().push(cx.waker().clone());
1200                Poll::Pending
1201            }
1202        })
1203        .fuse()
1204    }
1205}
1206
1207struct WfCtxProtectedDat {
1208    next_timer_sequence_number: u32,
1209    next_activity_sequence_number: u32,
1210    next_child_workflow_sequence_number: u32,
1211    next_cancel_external_wf_sequence_number: u32,
1212    next_signal_external_wf_sequence_number: u32,
1213    next_nexus_op_sequence_number: u32,
1214}
1215
1216impl WfCtxProtectedDat {
1217    fn next_timer_seq(&mut self) -> u32 {
1218        let seq = self.next_timer_sequence_number;
1219        self.next_timer_sequence_number += 1;
1220        seq
1221    }
1222    fn next_activity_seq(&mut self) -> u32 {
1223        let seq = self.next_activity_sequence_number;
1224        self.next_activity_sequence_number += 1;
1225        seq
1226    }
1227    fn next_child_workflow_seq(&mut self) -> u32 {
1228        let seq = self.next_child_workflow_sequence_number;
1229        self.next_child_workflow_sequence_number += 1;
1230        seq
1231    }
1232    fn next_cancel_external_wf_seq(&mut self) -> u32 {
1233        let seq = self.next_cancel_external_wf_sequence_number;
1234        self.next_cancel_external_wf_sequence_number += 1;
1235        seq
1236    }
1237    fn next_signal_external_wf_seq(&mut self) -> u32 {
1238        let seq = self.next_signal_external_wf_sequence_number;
1239        self.next_signal_external_wf_sequence_number += 1;
1240        seq
1241    }
1242    fn next_nexus_op_seq(&mut self) -> u32 {
1243        let seq = self.next_nexus_op_sequence_number;
1244        self.next_nexus_op_sequence_number += 1;
1245        seq
1246    }
1247}
1248
1249#[derive(Clone, Debug, Default)]
1250pub(crate) struct WorkflowContextSharedData {
1251    /// Maps change ids -> resolved status
1252    pub(crate) changes: HashMap<String, bool>,
1253    pub(crate) is_replaying: bool,
1254    pub(crate) wf_time: Option<SystemTime>,
1255    pub(crate) history_length: u32,
1256    pub(crate) continue_as_new_suggested: bool,
1257    pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
1258    pub(crate) search_attributes: SearchAttributes,
1259    pub(crate) random_seed: u64,
1260    /// Current details string, surfaced via the workflow metadata query.
1261    pub(crate) current_details: String,
1262}
1263
1264/// A Future that can be cancelled.
1265/// Used in the prototype SDK for cancelling operations like timers and activities.
1266pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1267    /// Cancel this Future
1268    fn cancel(&self);
1269}
1270
1271/// A Future that can be cancelled with a reason
1272pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1273    /// Cancel this Future with a reason
1274    fn cancel_with_reason(&self, reason: String);
1275}
1276
1277struct WFCommandFut<T, D> {
1278    _unused: PhantomData<T>,
1279    result_rx: oneshot::Receiver<UnblockEvent>,
1280    other_dat: Option<D>,
1281}
1282impl<T> WFCommandFut<T, ()> {
1283    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1284        Self::new_with_dat(())
1285    }
1286}
1287
1288impl<T, D> WFCommandFut<T, D> {
1289    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1290        let (tx, rx) = oneshot::channel();
1291        (
1292            Self {
1293                _unused: PhantomData,
1294                result_rx: rx,
1295                other_dat: Some(other_dat),
1296            },
1297            tx,
1298        )
1299    }
1300}
1301
1302impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1303impl<T, D> Future for WFCommandFut<T, D>
1304where
1305    T: Unblockable<OtherDat = D>,
1306{
1307    type Output = T;
1308
1309    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1310        self.result_rx.poll_unpin(cx).map(|x| {
1311            let od = self
1312                .other_dat
1313                .take()
1314                .expect("Other data must exist when resolving command future");
1315            Unblockable::unblock(x.unwrap(), od)
1316        })
1317    }
1318}
1319impl<T, D> FusedFuture for WFCommandFut<T, D>
1320where
1321    T: Unblockable<OtherDat = D>,
1322{
1323    fn is_terminated(&self) -> bool {
1324        self.other_dat.is_none()
1325    }
1326}
1327
1328struct CancellableWFCommandFut<T, D, ID = CancellableID> {
1329    cmd_fut: WFCommandFut<T, D>,
1330    cancellable_id: ID,
1331    base_ctx: BaseWorkflowContext,
1332}
1333impl<T, ID> CancellableWFCommandFut<T, (), ID> {
1334    fn new(
1335        cancellable_id: ID,
1336        base_ctx: BaseWorkflowContext,
1337    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1338        Self::new_with_dat(cancellable_id, (), base_ctx)
1339    }
1340}
1341impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1342    fn new_with_dat(
1343        cancellable_id: ID,
1344        other_dat: D,
1345        base_ctx: BaseWorkflowContext,
1346    ) -> (Self, oneshot::Sender<UnblockEvent>) {
1347        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1348        (
1349            Self {
1350                cmd_fut,
1351                cancellable_id,
1352                base_ctx,
1353            },
1354            sender,
1355        )
1356    }
1357}
1358impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1359impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1360where
1361    T: Unblockable<OtherDat = D>,
1362{
1363    type Output = T;
1364
1365    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1366        self.cmd_fut.poll_unpin(cx)
1367    }
1368}
1369impl<T, D, ID> FusedFuture for CancellableWFCommandFut<T, D, ID>
1370where
1371    T: Unblockable<OtherDat = D>,
1372{
1373    fn is_terminated(&self) -> bool {
1374        self.cmd_fut.is_terminated()
1375    }
1376}
1377
1378impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1379where
1380    T: Unblockable<OtherDat = D>,
1381    ID: Clone + Into<CancellableID>,
1382{
1383    fn cancel(&self) {
1384        self.base_ctx.cancel(self.cancellable_id.clone().into());
1385    }
1386}
1387impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1388where
1389    T: Unblockable<OtherDat = D>,
1390{
1391    fn cancel_with_reason(&self, reason: String) {
1392        let new_id = self.cancellable_id.clone().with_reason(reason);
1393        self.base_ctx.cancel(new_id);
1394    }
1395}
1396
1397struct LATimerBackoffFut {
1398    la_opts: LocalActivityOptions,
1399    activity_type: String,
1400    arguments: Vec<Payload>,
1401    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1402    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1403    base_ctx: BaseWorkflowContext,
1404    next_attempt: u32,
1405    next_sched_time: Option<prost_types::Timestamp>,
1406    did_cancel: AtomicBool,
1407    terminated: bool,
1408}
1409impl LATimerBackoffFut {
1410    pub(crate) fn new(
1411        activity_type: String,
1412        arguments: Vec<Payload>,
1413        opts: LocalActivityOptions,
1414        base_ctx: BaseWorkflowContext,
1415    ) -> Self {
1416        let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1417            activity_type.clone(),
1418            arguments.clone(),
1419            opts.clone(),
1420        ));
1421        Self {
1422            la_opts: opts,
1423            activity_type,
1424            arguments,
1425            current_fut,
1426            timer_fut: None,
1427            base_ctx,
1428            next_attempt: 1,
1429            next_sched_time: None,
1430            did_cancel: AtomicBool::new(false),
1431            terminated: false,
1432        }
1433    }
1434}
1435impl Unpin for LATimerBackoffFut {}
1436impl Future for LATimerBackoffFut {
1437    type Output = ActivityResolution;
1438
1439    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1440        // If the timer exists, wait for it first
1441        if let Some(tf) = self.timer_fut.as_mut() {
1442            return match tf.poll_unpin(cx) {
1443                Poll::Ready(tr) => {
1444                    self.timer_fut = None;
1445                    // Schedule next LA if this timer wasn't cancelled
1446                    if let TimerResult::Fired = tr {
1447                        let mut opts = self.la_opts.clone();
1448                        opts.attempt = Some(self.next_attempt);
1449                        opts.original_schedule_time
1450                            .clone_from(&self.next_sched_time);
1451                        self.current_fut =
1452                            Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1453                                self.activity_type.clone(),
1454                                self.arguments.clone(),
1455                                opts,
1456                            ));
1457                        Poll::Pending
1458                    } else {
1459                        self.terminated = true;
1460                        Poll::Ready(ActivityResolution {
1461                            status: Some(
1462                                activity_resolution::Status::Cancelled(Default::default()),
1463                            ),
1464                        })
1465                    }
1466                }
1467                Poll::Pending => Poll::Pending,
1468            };
1469        }
1470        let poll_res = self.current_fut.poll_unpin(cx);
1471        if let Poll::Ready(ref r) = poll_res
1472            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1473        {
1474            // If we've already said we want to cancel, don't schedule the backoff timer. Just
1475            // return cancel status. This can happen if cancel comes after the LA says it wants
1476            // to back off but before we have scheduled the timer.
1477            if self.did_cancel.load(Ordering::Acquire) {
1478                self.terminated = true;
1479                return Poll::Ready(ActivityResolution {
1480                    status: Some(activity_resolution::Status::Cancelled(Default::default())),
1481                });
1482            }
1483
1484            let timer_f = self.base_ctx.timer::<Duration>(
1485                b.backoff_duration
1486                    .expect("Duration is set")
1487                    .try_into()
1488                    .expect("duration converts ok"),
1489            );
1490            self.timer_fut = Some(Box::pin(timer_f));
1491            self.next_attempt = b.attempt;
1492            self.next_sched_time.clone_from(&b.original_schedule_time);
1493            return Poll::Pending;
1494        }
1495        if poll_res.is_ready() {
1496            self.terminated = true;
1497        }
1498        poll_res
1499    }
1500}
1501impl FusedFuture for LATimerBackoffFut {
1502    fn is_terminated(&self) -> bool {
1503        self.terminated
1504    }
1505}
1506impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1507    fn cancel(&self) {
1508        self.did_cancel.store(true, Ordering::Release);
1509        if let Some(tf) = self.timer_fut.as_ref() {
1510            tf.cancel();
1511        }
1512        self.current_fut.cancel();
1513    }
1514}
1515
1516/// Future for activity results. Either an immediate error or a running activity.
1517enum ActivityFut<F, Output> {
1518    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
1519    Errored {
1520        error: Option<ActivityExecutionError>,
1521        _phantom: PhantomData<Output>,
1522    },
1523    /// Running activity that will deserialize output on completion.
1524    Running {
1525        inner: F,
1526        payload_converter: PayloadConverter,
1527        _phantom: PhantomData<Output>,
1528    },
1529    Terminated,
1530}
1531
1532impl<F, Output> ActivityFut<F, Output> {
1533    fn eager(err: ActivityExecutionError) -> Self {
1534        Self::Errored {
1535            error: Some(err),
1536            _phantom: PhantomData,
1537        }
1538    }
1539
1540    fn running(inner: F, payload_converter: PayloadConverter) -> Self {
1541        Self::Running {
1542            inner,
1543            payload_converter,
1544            _phantom: PhantomData,
1545        }
1546    }
1547}
1548
1549impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1550
1551impl<F, Output> Future for ActivityFut<F, Output>
1552where
1553    F: Future<Output = ActivityResolution> + Unpin,
1554    Output: TemporalDeserializable + 'static,
1555{
1556    type Output = Result<Output, ActivityExecutionError>;
1557
1558    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1559        let this = self.get_mut();
1560        let poll = match this {
1561            ActivityFut::Errored { error, .. } => {
1562                Poll::Ready(Err(error.take().expect("polled after completion")))
1563            }
1564            ActivityFut::Running {
1565                inner,
1566                payload_converter,
1567                ..
1568            } => match Pin::new(inner).poll(cx) {
1569                Poll::Pending => Poll::Pending,
1570                Poll::Ready(resolution) => Poll::Ready({
1571                    let status = resolution.status.ok_or_else(|| {
1572                        ActivityExecutionError::Failed(Box::new(Failure {
1573                            message: "Activity completed without a status".to_string(),
1574                            ..Default::default()
1575                        }))
1576                    })?;
1577
1578                    match status {
1579                        activity_resolution::Status::Completed(success) => {
1580                            let payload = success.result.unwrap_or_default();
1581                            let ctx = SerializationContext {
1582                                data: &SerializationContextData::Workflow,
1583                                converter: payload_converter,
1584                            };
1585                            payload_converter
1586                                .from_payload::<Output>(&ctx, payload)
1587                                .map_err(ActivityExecutionError::Serialization)
1588                        }
1589                        activity_resolution::Status::Failed(f) => Err(
1590                            ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1591                        ),
1592                        activity_resolution::Status::Cancelled(c) => {
1593                            Err(ActivityExecutionError::Cancelled(Box::new(
1594                                c.failure.unwrap_or_default(),
1595                            )))
1596                        }
1597                        activity_resolution::Status::Backoff(_) => {
1598                            panic!("DoBackoff should be handled by LATimerBackoffFut")
1599                        }
1600                    }
1601                }),
1602            },
1603            ActivityFut::Terminated => panic!("polled after termination"),
1604        };
1605        if poll.is_ready() {
1606            *this = ActivityFut::Terminated;
1607        }
1608        poll
1609    }
1610}
1611
1612impl<F, Output> FusedFuture for ActivityFut<F, Output>
1613where
1614    F: Future<Output = ActivityResolution> + Unpin,
1615    Output: TemporalDeserializable + 'static,
1616{
1617    fn is_terminated(&self) -> bool {
1618        matches!(self, ActivityFut::Terminated)
1619    }
1620}
1621
1622impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1623where
1624    F: CancellableFuture<ActivityResolution> + Unpin,
1625    Output: TemporalDeserializable + 'static,
1626{
1627    fn cancel(&self) {
1628        if let ActivityFut::Running { inner, .. } = self {
1629            inner.cancel()
1630        }
1631    }
1632}
1633
1634pub(crate) struct ChildWfCommon {
1635    workflow_id: String,
1636    child_seq: u32,
1637    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
1638    base_ctx: BaseWorkflowContext,
1639    payload_converter: PayloadConverter,
1640}
1641
1642/// Child workflow in pending state. Internal type used during the start handshake;
1643/// `ChildWorkflowStartFut` converts this into `Result<StartedChildWorkflow, _>` before
1644/// the caller sees it.
1645#[derive(derive_more::Debug)]
1646pub(crate) struct PendingChildWorkflow<WD: WorkflowDefinition> {
1647    pub(crate) status: ChildWorkflowStartStatus,
1648    #[debug(skip)]
1649    pub(crate) common: ChildWfCommon,
1650    pub(crate) _phantom: PhantomData<WD>,
1651}
1652
1653/// Child workflow in started state.
1654#[derive(derive_more::Debug)]
1655pub struct StartedChildWorkflow<WD: WorkflowDefinition> {
1656    /// Run ID of the child workflow
1657    pub run_id: String,
1658    #[debug(skip)]
1659    common: ChildWfCommon,
1660    _phantom: PhantomData<WD>,
1661}
1662
1663/// Future for child workflow results. Wraps the raw result future and deserializes
1664/// the output on completion.
1665enum ChildWorkflowFut<F, Output> {
1666    Running {
1667        inner: F,
1668        payload_converter: PayloadConverter,
1669        _phantom: PhantomData<Output>,
1670    },
1671    Terminated,
1672}
1673
1674impl<F, Output> Unpin for ChildWorkflowFut<F, Output> where F: Unpin {}
1675
1676impl<F, Output> Future for ChildWorkflowFut<F, Output>
1677where
1678    F: Future<Output = ChildWorkflowResult> + Unpin,
1679    Output: TemporalDeserializable + 'static,
1680{
1681    type Output = Result<Output, ChildWorkflowExecutionError>;
1682
1683    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1684        let this = self.get_mut();
1685        let poll = match this {
1686            ChildWorkflowFut::Running {
1687                inner,
1688                payload_converter,
1689                ..
1690            } => match Pin::new(inner).poll(cx) {
1691                Poll::Pending => Poll::Pending,
1692                Poll::Ready(result) => Poll::Ready({
1693                    use temporalio_common::protos::coresdk::child_workflow::child_workflow_result;
1694                    let status = result.status.ok_or_else(|| {
1695                        ChildWorkflowExecutionError::Failed(Box::new(Failure {
1696                            message: "Child workflow completed without a status".to_string(),
1697                            ..Default::default()
1698                        }))
1699                    })?;
1700                    match status {
1701                        child_workflow_result::Status::Completed(success) => {
1702                            let payloads = success.result.into_iter().collect();
1703                            let ctx = SerializationContext {
1704                                data: &SerializationContextData::Workflow,
1705                                converter: payload_converter,
1706                            };
1707                            payload_converter
1708                                .from_payloads::<Output>(&ctx, payloads)
1709                                .map_err(ChildWorkflowExecutionError::Serialization)
1710                        }
1711                        child_workflow_result::Status::Failed(f) => {
1712                            Err(ChildWorkflowExecutionError::Failed(Box::new(
1713                                f.failure.unwrap_or_default(),
1714                            )))
1715                        }
1716                        child_workflow_result::Status::Cancelled(c) => {
1717                            Err(ChildWorkflowExecutionError::Cancelled(Box::new(
1718                                c.failure.unwrap_or_default(),
1719                            )))
1720                        }
1721                    }
1722                }),
1723            },
1724            ChildWorkflowFut::Terminated => panic!("polled after termination"),
1725        };
1726        if poll.is_ready() {
1727            *this = ChildWorkflowFut::Terminated;
1728        }
1729        poll
1730    }
1731}
1732
1733impl<F, Output> FusedFuture for ChildWorkflowFut<F, Output>
1734where
1735    F: Future<Output = ChildWorkflowResult> + Unpin,
1736    Output: TemporalDeserializable + 'static,
1737{
1738    fn is_terminated(&self) -> bool {
1739        matches!(self, ChildWorkflowFut::Terminated)
1740    }
1741}
1742
1743impl<F, Output> CancellableFutureWithReason<Result<Output, ChildWorkflowExecutionError>>
1744    for ChildWorkflowFut<F, Output>
1745where
1746    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1747    Output: TemporalDeserializable + 'static,
1748{
1749    fn cancel_with_reason(&self, reason: String) {
1750        if let ChildWorkflowFut::Running { inner, .. } = self {
1751            inner.cancel_with_reason(reason)
1752        }
1753    }
1754}
1755
1756impl<F, Output> CancellableFuture<Result<Output, ChildWorkflowExecutionError>>
1757    for ChildWorkflowFut<F, Output>
1758where
1759    F: CancellableFutureWithReason<ChildWorkflowResult> + Unpin,
1760    Output: TemporalDeserializable + 'static,
1761{
1762    fn cancel(&self) {
1763        if let ChildWorkflowFut::Running { inner, .. } = self {
1764            inner.cancel()
1765        }
1766    }
1767}
1768
1769/// Wrapper future for starting a child workflow. Mirrors `ActivityFut` to allow returning
1770/// serialization errors eagerly.
1771enum ChildWorkflowStartFut<F, WD: WorkflowDefinition> {
1772    /// Immediate error (e.g., input serialization failure). Resolves on first poll.
1773    Errored {
1774        error: Option<ChildWorkflowExecutionError>,
1775        _phantom: PhantomData<WD>,
1776    },
1777    Running(F),
1778    Terminated,
1779}
1780
1781impl<F, WD: WorkflowDefinition> ChildWorkflowStartFut<F, WD> {
1782    fn eager(err: ChildWorkflowExecutionError) -> Self {
1783        Self::Errored {
1784            error: Some(err),
1785            _phantom: PhantomData,
1786        }
1787    }
1788}
1789
1790impl<F, WD: WorkflowDefinition> Unpin for ChildWorkflowStartFut<F, WD> where F: Unpin {}
1791
1792impl<F, WD> Future for ChildWorkflowStartFut<F, WD>
1793where
1794    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1795    WD: WorkflowDefinition,
1796{
1797    type Output = Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>;
1798
1799    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1800        let this = self.get_mut();
1801        let poll = match this {
1802            ChildWorkflowStartFut::Errored { error, .. } => {
1803                Poll::Ready(Err(error.take().expect("polled after completion")))
1804            }
1805            ChildWorkflowStartFut::Running(inner) => match Pin::new(inner).poll(cx) {
1806                Poll::Pending => Poll::Pending,
1807                Poll::Ready(pending) => Poll::Ready(match pending.status {
1808                    ChildWorkflowStartStatus::Succeeded(s) => Ok(StartedChildWorkflow {
1809                        run_id: s.run_id,
1810                        common: pending.common,
1811                        _phantom: PhantomData,
1812                    }),
1813                    ChildWorkflowStartStatus::Failed(f) => {
1814                        Err(ChildWorkflowExecutionError::StartFailed {
1815                            workflow_id: f.workflow_id,
1816                            workflow_type: f.workflow_type,
1817                            cause: StartChildWorkflowExecutionFailedCause::try_from(f.cause)
1818                                .unwrap_or(StartChildWorkflowExecutionFailedCause::Unspecified),
1819                        })
1820                    }
1821                    ChildWorkflowStartStatus::Cancelled(c) => {
1822                        Err(ChildWorkflowExecutionError::Cancelled(Box::new(
1823                            c.failure.unwrap_or_default(),
1824                        )))
1825                    }
1826                }),
1827            },
1828            ChildWorkflowStartFut::Terminated => panic!("polled after termination"),
1829        };
1830        if poll.is_ready() {
1831            *this = ChildWorkflowStartFut::Terminated;
1832        }
1833        poll
1834    }
1835}
1836
1837impl<F, WD> FusedFuture for ChildWorkflowStartFut<F, WD>
1838where
1839    F: Future<Output = PendingChildWorkflow<WD>> + Unpin,
1840    WD: WorkflowDefinition,
1841{
1842    fn is_terminated(&self) -> bool {
1843        matches!(self, ChildWorkflowStartFut::Terminated)
1844    }
1845}
1846
1847impl<F, WD> CancellableFuture<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1848    for ChildWorkflowStartFut<F, WD>
1849where
1850    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
1851    WD: WorkflowDefinition,
1852{
1853    fn cancel(&self) {
1854        if let ChildWorkflowStartFut::Running(inner) = self {
1855            inner.cancel()
1856        }
1857    }
1858}
1859
1860impl<F, WD>
1861    CancellableFutureWithReason<Result<StartedChildWorkflow<WD>, ChildWorkflowExecutionError>>
1862    for ChildWorkflowStartFut<F, WD>
1863where
1864    F: CancellableFutureWithReason<PendingChildWorkflow<WD>> + Unpin,
1865    WD: WorkflowDefinition,
1866{
1867    fn cancel_with_reason(&self, reason: String) {
1868        if let ChildWorkflowStartFut::Running(inner) = self {
1869            inner.cancel_with_reason(reason)
1870        }
1871    }
1872}
1873
1874/// Wrapper future for signaling a child workflow. Allows returning serialization errors
1875/// eagerly instead of panicking.
1876enum SignalChildFut<F> {
1877    /// Immediate error (e.g., signal input serialization failure). Resolves on first poll.
1878    Errored {
1879        error: Option<ChildWorkflowSignalError>,
1880    },
1881    Running(F),
1882    Terminated,
1883}
1884
1885impl<F> SignalChildFut<F> {
1886    fn eager(err: ChildWorkflowSignalError) -> Self {
1887        Self::Errored { error: Some(err) }
1888    }
1889}
1890
1891impl<F> Unpin for SignalChildFut<F> where F: Unpin {}
1892
1893impl<F> Future for SignalChildFut<F>
1894where
1895    F: Future<Output = SignalExternalWfResult> + Unpin,
1896{
1897    type Output = Result<(), ChildWorkflowSignalError>;
1898
1899    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1900        let this = self.get_mut();
1901        let poll = match this {
1902            SignalChildFut::Errored { error } => {
1903                Poll::Ready(Err(error.take().expect("polled after completion")))
1904            }
1905            SignalChildFut::Running(inner) => match Pin::new(inner).poll(cx) {
1906                Poll::Pending => Poll::Pending,
1907                Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
1908                Poll::Ready(Err(failure)) => {
1909                    Poll::Ready(Err(ChildWorkflowSignalError::Failed(Box::new(failure))))
1910                }
1911            },
1912            SignalChildFut::Terminated => panic!("polled after termination"),
1913        };
1914        if poll.is_ready() {
1915            *this = SignalChildFut::Terminated;
1916        }
1917        poll
1918    }
1919}
1920
1921impl<F> FusedFuture for SignalChildFut<F>
1922where
1923    F: Future<Output = SignalExternalWfResult> + Unpin,
1924{
1925    fn is_terminated(&self) -> bool {
1926        matches!(self, SignalChildFut::Terminated)
1927    }
1928}
1929
1930impl<F> CancellableFuture<Result<(), ChildWorkflowSignalError>> for SignalChildFut<F>
1931where
1932    F: CancellableFuture<SignalExternalWfResult> + Unpin,
1933{
1934    fn cancel(&self) {
1935        if let SignalChildFut::Running(inner) = self {
1936            inner.cancel()
1937        }
1938    }
1939}
1940
1941impl<WD: WorkflowDefinition> StartedChildWorkflow<WD>
1942where
1943    WD::Output: TemporalDeserializable + 'static,
1944{
1945    /// Consumes self and returns a future that deserializes the child workflow result
1946    /// into `WD::Output`.
1947    pub fn result(
1948        self,
1949    ) -> impl CancellableFutureWithReason<Result<WD::Output, ChildWorkflowExecutionError>> {
1950        ChildWorkflowFut::Running {
1951            inner: self.common.result_future,
1952            payload_converter: self.common.payload_converter,
1953            _phantom: PhantomData,
1954        }
1955    }
1956
1957    /// Cancel the child workflow
1958    pub fn cancel(&self, reason: String) {
1959        self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1960            CancelChildWorkflowExecution {
1961                child_workflow_seq: self.common.child_seq,
1962                reason,
1963            }
1964            .into(),
1965        ));
1966    }
1967
1968    /// Send a typed signal to the child workflow.
1969    pub fn signal<S: SignalDefinition<Workflow = WD>>(
1970        &self,
1971        signal: S,
1972        input: S::Input,
1973    ) -> impl CancellableFuture<Result<(), ChildWorkflowSignalError>> + 'static {
1974        let ctx = SerializationContext {
1975            data: &SerializationContextData::Workflow,
1976            converter: &self.common.payload_converter,
1977        };
1978        let payloads = match self.common.payload_converter.to_payloads(&ctx, &input) {
1979            Ok(p) => p,
1980            Err(e) => {
1981                return SignalChildFut::eager(e.into());
1982            }
1983        };
1984        let signal = Signal::new(S::name(&signal), payloads);
1985        let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1986        SignalChildFut::Running(self.common.base_ctx.clone().send_signal_wf(target, signal))
1987    }
1988}
1989
1990/// Handle to an external workflow for sending signals or requesting cancellation.
1991///
1992/// Obtained via [`SyncWorkflowContext::external_workflow`] or
1993/// [`WorkflowContext::external_workflow`].
1994#[derive(derive_more::Debug)]
1995pub struct ExternalWorkflowHandle {
1996    workflow_id: String,
1997    run_id: Option<String>,
1998    namespace: String,
1999    #[debug(skip)]
2000    base_ctx: BaseWorkflowContext,
2001}
2002
2003impl ExternalWorkflowHandle {
2004    /// The workflow ID of the external workflow.
2005    pub fn workflow_id(&self) -> &str {
2006        &self.workflow_id
2007    }
2008
2009    /// The run ID of the external workflow, or `None` if targeting the latest run.
2010    pub fn run_id(&self) -> Option<&str> {
2011        self.run_id.as_deref()
2012    }
2013
2014    /// Send a signal to the external workflow.
2015    pub fn signal<S: SignalDefinition>(
2016        &self,
2017        signal: S,
2018        input: S::Input,
2019    ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
2020        let ctx = SerializationContext {
2021            data: &SerializationContextData::Workflow,
2022            converter: &self.base_ctx.inner.payload_converter,
2023        };
2024        let payloads = match self
2025            .base_ctx
2026            .inner
2027            .payload_converter
2028            .to_payloads(&ctx, &input)
2029        {
2030            Ok(p) => p,
2031            Err(e) => {
2032                return SignalExternalFut::SerializationError(Some(e));
2033            }
2034        };
2035        let signal = Signal::new(S::name(&signal), payloads);
2036        let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
2037            namespace: self.namespace.clone(),
2038            workflow_id: self.workflow_id.clone(),
2039            run_id: self.run_id.clone().unwrap_or_default(),
2040        });
2041        SignalExternalFut::Running(self.base_ctx.clone().send_signal_wf(target, signal))
2042    }
2043
2044    /// Request cancellation of the external workflow.
2045    pub fn cancel(
2046        &self,
2047        reason: Option<String>,
2048    ) -> impl FusedFuture<Output = CancelExternalWfResult> {
2049        let seq = self
2050            .base_ctx
2051            .inner
2052            .seq_nums
2053            .borrow_mut()
2054            .next_cancel_external_wf_seq();
2055        let (cmd, unblocker) = WFCommandFut::new();
2056        self.base_ctx.send(
2057            CommandCreateRequest {
2058                cmd: WorkflowCommand {
2059                    variant: Some(
2060                        RequestCancelExternalWorkflowExecution {
2061                            seq,
2062                            workflow_execution: Some(NamespacedWorkflowExecution {
2063                                namespace: self.namespace.clone(),
2064                                workflow_id: self.workflow_id.clone(),
2065                                run_id: self.run_id.clone().unwrap_or_default(),
2066                            }),
2067                            reason: reason.unwrap_or_default(),
2068                        }
2069                        .into(),
2070                    ),
2071                    user_metadata: None,
2072                },
2073                unblocker,
2074            }
2075            .into(),
2076        );
2077        cmd
2078    }
2079}
2080
2081enum SignalExternalFut<F> {
2082    Running(F),
2083    SerializationError(Option<PayloadConversionError>),
2084    Done,
2085}
2086
2087impl<F: Unpin> Unpin for SignalExternalFut<F> {}
2088
2089impl<F> Future for SignalExternalFut<F>
2090where
2091    F: Future<Output = SignalExternalWfResult> + Unpin,
2092{
2093    type Output = SignalExternalWfResult;
2094
2095    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2096        let this = self.get_mut();
2097        match this {
2098            SignalExternalFut::Running(inner) => {
2099                let result = std::task::ready!(Pin::new(inner).poll(cx));
2100                *this = SignalExternalFut::Done;
2101                Poll::Ready(result)
2102            }
2103            SignalExternalFut::SerializationError(e) => {
2104                let err = e.take().expect("polled after completion");
2105                *this = SignalExternalFut::Done;
2106                Poll::Ready(Err(Failure {
2107                    message: format!("Failed to serialize signal input: {err}"),
2108                    ..Default::default()
2109                }))
2110            }
2111            SignalExternalFut::Done => panic!("polled after completion"),
2112        }
2113    }
2114}
2115
2116impl<F> FusedFuture for SignalExternalFut<F>
2117where
2118    F: Future<Output = SignalExternalWfResult> + Unpin,
2119{
2120    fn is_terminated(&self) -> bool {
2121        matches!(self, SignalExternalFut::Done)
2122    }
2123}
2124
2125impl<F> CancellableFuture<SignalExternalWfResult> for SignalExternalFut<F>
2126where
2127    F: CancellableFuture<SignalExternalWfResult> + Unpin,
2128{
2129    fn cancel(&self) {
2130        if let SignalExternalFut::Running(inner) = self {
2131            inner.cancel()
2132        }
2133    }
2134}
2135
2136#[derive(derive_more::Debug)]
2137#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
2138pub struct StartedNexusOperation {
2139    /// The operation token, if the operation started asynchronously
2140    pub operation_token: Option<String>,
2141    pub(crate) unblock_dat: NexusUnblockData,
2142}
2143
2144pub(crate) struct NexusUnblockData {
2145    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
2146    schedule_seq: u32,
2147    base_ctx: BaseWorkflowContext,
2148}
2149
2150impl StartedNexusOperation {
2151    pub async fn result(&self) -> NexusOperationResult {
2152        self.unblock_dat.result_future.clone().await
2153    }
2154
2155    pub fn cancel(&self) {
2156        self.unblock_dat
2157            .base_ctx
2158            .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
2159    }
2160}
2161
2162#[cfg(test)]
2163mod tests {
2164    use super::*;
2165    use std::collections::HashMap;
2166    use temporalio_common::{
2167        data_converters::{TemporalDeserializable, TemporalSerializable},
2168        protos::{
2169            coresdk::{AsJsonPayloadExt, common::VersioningIntent},
2170            temporal::api::common::v1::{Payload, RetryPolicy},
2171        },
2172    };
2173    use temporalio_macros::{workflow, workflow_methods};
2174
2175    #[workflow]
2176    #[derive(Default)]
2177    struct TestWorkflow;
2178
2179    #[workflow_methods]
2180    impl TestWorkflow {
2181        #[run]
2182        async fn run(_ctx: &mut WorkflowContext<Self>, _input: u8) -> crate::WorkflowResult<()> {
2183            unreachable!("test workflow run should not be polled")
2184        }
2185    }
2186
2187    fn test_context() -> WorkflowContext<TestWorkflow> {
2188        let init = InitializeWorkflow {
2189            workflow_type: TestWorkflow.name().to_string(),
2190            ..Default::default()
2191        };
2192        let (_, cancelled_rx) = watch::channel(None);
2193        let (base, _cmd_rx) = BaseWorkflowContext::new(
2194            "default".to_string(),
2195            "orig-task-queue".to_string(),
2196            "run-id".to_string(),
2197            init,
2198            cancelled_rx,
2199            PayloadConverter::default(),
2200        );
2201        WorkflowContext::from_base(base, Rc::new(RefCell::new(TestWorkflow)))
2202    }
2203
2204    #[test]
2205    fn workflow_context_continue_as_new_serializes_input_and_defaults() {
2206        let ctx = test_context();
2207
2208        let termination = ctx
2209            .continue_as_new(&7, ContinueAsNewOptions::default())
2210            .expect_err("continue_as_new should terminate the workflow");
2211        assert!(
2212            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2213            "expected continue-as-new termination, got {termination:?}"
2214        );
2215        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2216            unreachable!()
2217        };
2218
2219        assert_eq!(
2220            *cmd,
2221            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
2222                workflow_type: TestWorkflow.name().to_string(),
2223                arguments: vec![7u8.as_json_payload().unwrap()],
2224                versioning_intent: VersioningIntent::Unspecified as i32,
2225                ..Default::default()
2226            }
2227        );
2228    }
2229
2230    #[test]
2231    fn sync_workflow_context_continue_as_new_applies_options() {
2232        let ctx = test_context();
2233        let sync = ctx.sync_context();
2234        let mut memo = HashMap::new();
2235        memo.insert(
2236            "memo-key".to_string(),
2237            Payload::from(b"memo-value".as_slice()),
2238        );
2239        let mut headers = HashMap::new();
2240        headers.insert(
2241            "header-key".to_string(),
2242            Payload::from(b"header-value".as_slice()),
2243        );
2244        let mut search_attributes = SearchAttributes::default();
2245        search_attributes.indexed_fields.insert(
2246            "CustomKeywordField".to_string(),
2247            Payload::from(b"value".as_slice()),
2248        );
2249
2250        let termination = sync
2251            .continue_as_new(
2252                &11,
2253                ContinueAsNewOptions {
2254                    workflow_type: Some("next-workflow".to_string()),
2255                    task_queue: Some("next-task-queue".to_string()),
2256                    run_timeout: Some(Duration::from_secs(10)),
2257                    task_timeout: Some(Duration::from_secs(3)),
2258                    memo: Some(memo.clone()),
2259                    headers: Some(headers.clone()),
2260                    search_attributes: Some(search_attributes.clone()),
2261                    retry_policy: Some(RetryPolicy {
2262                        maximum_attempts: 5,
2263                        ..Default::default()
2264                    }),
2265                    versioning_intent: Some(VersioningIntent::Compatible),
2266                },
2267            )
2268            .expect_err("continue_as_new should terminate the workflow");
2269        assert!(
2270            matches!(termination, WorkflowTermination::ContinueAsNew(_)),
2271            "expected continue-as-new termination, got {termination:?}"
2272        );
2273        let WorkflowTermination::ContinueAsNew(cmd) = termination else {
2274            unreachable!()
2275        };
2276
2277        assert_eq!(
2278            *cmd,
2279            temporalio_common::protos::coresdk::workflow_commands::ContinueAsNewWorkflowExecution {
2280                workflow_type: "next-workflow".to_string(),
2281                task_queue: "next-task-queue".to_string(),
2282                arguments: vec![11u8.as_json_payload().unwrap()],
2283                workflow_run_timeout: Some(Duration::from_secs(10).try_into().unwrap()),
2284                workflow_task_timeout: Some(Duration::from_secs(3).try_into().unwrap()),
2285                memo,
2286                headers,
2287                search_attributes: Some(search_attributes),
2288                retry_policy: Some(RetryPolicy {
2289                    maximum_attempts: 5,
2290                    ..Default::default()
2291                }),
2292                versioning_intent: VersioningIntent::Compatible as i32,
2293                ..Default::default()
2294            }
2295        );
2296    }
2297
2298    #[test]
2299    fn continue_as_new_reports_serialization_errors() {
2300        #[derive(Debug)]
2301        struct FailingInput;
2302
2303        impl TemporalSerializable for FailingInput {
2304            fn to_payload(
2305                &self,
2306                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
2307            ) -> Result<Payload, temporalio_common::data_converters::PayloadConversionError>
2308            {
2309                Err(
2310                    temporalio_common::data_converters::PayloadConversionError::EncodingError(
2311                        std::io::Error::other("serialization failure").into(),
2312                    ),
2313                )
2314            }
2315        }
2316
2317        impl TemporalDeserializable for FailingInput {
2318            fn from_payload(
2319                _ctx: &temporalio_common::data_converters::SerializationContext<'_>,
2320                _payload: Payload,
2321            ) -> Result<Self, temporalio_common::data_converters::PayloadConversionError>
2322            {
2323                unreachable!("test input is only serialized")
2324            }
2325        }
2326
2327        #[workflow]
2328        #[derive(Default)]
2329        struct FailingWorkflow;
2330
2331        #[workflow_methods]
2332        impl FailingWorkflow {
2333            #[run]
2334            async fn run(
2335                _ctx: &mut WorkflowContext<Self>,
2336                _input: FailingInput,
2337            ) -> crate::WorkflowResult<()> {
2338                unreachable!("test workflow run should not be polled")
2339            }
2340        }
2341
2342        let init = InitializeWorkflow {
2343            workflow_type: "failing-workflow".to_string(),
2344            ..Default::default()
2345        };
2346        let (_, cancelled_rx) = watch::channel(None);
2347        let (base, _cmd_rx) = BaseWorkflowContext::new(
2348            "default".to_string(),
2349            "orig-task-queue".to_string(),
2350            "run-id".to_string(),
2351            init,
2352            cancelled_rx,
2353            PayloadConverter::default(),
2354        );
2355        let ctx = WorkflowContext::from_base(base, Rc::new(RefCell::new(FailingWorkflow)));
2356
2357        let err = ctx
2358            .continue_as_new(&FailingInput, ContinueAsNewOptions::default())
2359            .expect_err("serialization errors should be surfaced");
2360
2361        let WorkflowTermination::Failed(err) = err else {
2362            panic!("expected failed termination, got {err:?}");
2363        };
2364        assert_eq!(err.to_string(), "Encoding error: serialization failure");
2365    }
2366}