squads_temporal_sdk/
workflow_context.rs

1mod options;
2
3pub use options::{
4    ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5    SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9    CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10    CommandSubscribeChildWorkflowCompletion, IntoUpdateHandlerFunc, IntoUpdateValidatorFunc,
11    NexusStartResult, RustWfCmd, SignalExternalWfResult, SupportsCancelReason, TimerResult,
12    UnblockEvent, Unblockable, UpdateFunctions, workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{FutureExt, Stream, StreamExt, future::Shared, task::Context};
15use parking_lot::{RwLock, RwLockReadGuard};
16use std::{
17    collections::HashMap,
18    future,
19    future::Future,
20    marker::PhantomData,
21    ops::Deref,
22    pin::Pin,
23    sync::{
24        Arc,
25        atomic::{AtomicBool, Ordering},
26        mpsc::{Receiver, Sender},
27    },
28    task::Poll,
29    time::{Duration, SystemTime},
30};
31use squads_temporal_sdk_core_api::worker::WorkerDeploymentVersion;
32use squads_temporal_sdk_core_protos::{
33    coresdk::{
34        activity_result::{ActivityResolution, activity_resolution},
35        child_workflow::ChildWorkflowResult,
36        common::NamespacedWorkflowExecution,
37        nexus::NexusOperationResult,
38        workflow_activation::{
39            InitializeWorkflow,
40            resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
41        },
42        workflow_commands::{
43            CancelChildWorkflowExecution, ModifyWorkflowProperties,
44            RequestCancelExternalWorkflowExecution, SetPatchMarker,
45            SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
46            WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
47        },
48    },
49    temporal::api::{
50        common::v1::{Memo, Payload, SearchAttributes},
51        sdk::v1::UserMetadata,
52    },
53};
54use tokio::sync::{mpsc, oneshot, watch};
55use tokio_stream::wrappers::UnboundedReceiverStream;
56
57/// Used within workflows to issue commands, get info, etc.
58#[derive(Clone)]
59pub struct WfContext {
60    namespace: String,
61    task_queue: String,
62    inital_information: Arc<InitializeWorkflow>,
63
64    chan: Sender<RustWfCmd>,
65    am_cancelled: watch::Receiver<Option<String>>,
66    pub(crate) shared: Arc<RwLock<WfContextSharedData>>,
67
68    seq_nums: Arc<RwLock<WfCtxProtectedDat>>,
69}
70
71// TODO: Dataconverter type interface to replace Payloads here. Possibly just use serde
72//    traits.
73impl WfContext {
74    /// Create a new wf context, returning the context itself and a receiver which outputs commands
75    /// sent from the workflow.
76    pub(super) fn new(
77        namespace: String,
78        task_queue: String,
79        init_workflow_job: InitializeWorkflow,
80        am_cancelled: watch::Receiver<Option<String>>,
81    ) -> (Self, Receiver<RustWfCmd>) {
82        // The receiving side is non-async
83        let (chan, rx) = std::sync::mpsc::channel();
84        (
85            Self {
86                namespace,
87                task_queue,
88                shared: Arc::new(RwLock::new(WfContextSharedData {
89                    random_seed: init_workflow_job.randomness_seed,
90                    search_attributes: init_workflow_job
91                        .search_attributes
92                        .clone()
93                        .unwrap_or_default(),
94                    ..Default::default()
95                })),
96                inital_information: Arc::new(init_workflow_job),
97                chan,
98                am_cancelled,
99                seq_nums: Arc::new(RwLock::new(WfCtxProtectedDat {
100                    next_timer_sequence_number: 1,
101                    next_activity_sequence_number: 1,
102                    next_child_workflow_sequence_number: 1,
103                    next_cancel_external_wf_sequence_number: 1,
104                    next_signal_external_wf_sequence_number: 1,
105                    next_nexus_op_sequence_number: 1,
106                })),
107            },
108            rx,
109        )
110    }
111
112    /// Return the namespace the workflow is executing in
113    pub fn namespace(&self) -> &str {
114        &self.namespace
115    }
116
117    /// Return the task queue the workflow is executing in
118    pub fn task_queue(&self) -> &str {
119        &self.task_queue
120    }
121
122    /// Get the arguments provided to the workflow upon execution start
123    pub fn get_args(&self) -> &[Payload] {
124        self.inital_information.arguments.as_slice()
125    }
126
127    /// Return the current time according to the workflow (which is not wall-clock time).
128    pub fn workflow_time(&self) -> Option<SystemTime> {
129        self.shared.read().wf_time
130    }
131
132    /// Return the length of history so far at this point in the workflow
133    pub fn history_length(&self) -> u32 {
134        self.shared.read().history_length
135    }
136
137    /// Return the deployment version, if any,  as it was when this point in the workflow was first
138    /// reached. If this code is being executed for the first time, return this Worker's deployment
139    /// version if it has one.
140    pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
141        self.shared.read().current_deployment_version.clone()
142    }
143
144    /// Return current values for workflow search attributes
145    pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
146        RwLockReadGuard::map(self.shared.read(), |s| &s.search_attributes)
147    }
148
149    /// Return the workflow's randomness seed
150    pub fn random_seed(&self) -> u64 {
151        self.shared.read().random_seed
152    }
153
154    /// Returns true if the current workflow task is happening under replay
155    pub fn is_replaying(&self) -> bool {
156        self.shared.read().is_replaying
157    }
158
159    /// Return various information that the workflow was initialized with. Will eventually become
160    /// a proper non-proto workflow info struct.
161    pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
162        &self.inital_information
163    }
164
165    /// A future that resolves if/when the workflow is cancelled, with the user provided cause
166    pub async fn cancelled(&self) -> String {
167        if let Some(s) = self.am_cancelled.borrow().as_ref() {
168            return s.clone();
169        }
170        self.am_cancelled
171            .clone()
172            .changed()
173            .await
174            .expect("Cancelled send half not dropped");
175        self.am_cancelled
176            .borrow()
177            .as_ref()
178            .cloned()
179            .unwrap_or_default()
180    }
181
182    /// Request to create a timer
183    pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
184        let opts: TimerOptions = opts.into();
185        let seq = self.seq_nums.write().next_timer_seq();
186        let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq));
187        self.send(
188            CommandCreateRequest {
189                cmd: WorkflowCommand {
190                    variant: Some(
191                        StartTimer {
192                            seq,
193                            start_to_fire_timeout: Some(
194                                opts.duration
195                                    .try_into()
196                                    .expect("Durations must fit into 64 bits"),
197                            ),
198                        }
199                        .into(),
200                    ),
201                    user_metadata: Some(UserMetadata {
202                        summary: opts.summary.map(|x| x.as_bytes().into()),
203                        details: None,
204                    }),
205                },
206                unblocker,
207            }
208            .into(),
209        );
210        cmd
211    }
212
213    /// Request to run an activity
214    pub fn activity(
215        &self,
216        mut opts: ActivityOptions,
217    ) -> impl CancellableFuture<ActivityResolution> {
218        if opts.task_queue.is_none() {
219            opts.task_queue = Some(self.task_queue.clone());
220        }
221        let seq = self.seq_nums.write().next_activity_seq();
222        let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Activity(seq));
223        self.send(
224            CommandCreateRequest {
225                cmd: opts.into_command(seq),
226                unblocker,
227            }
228            .into(),
229        );
230        cmd
231    }
232
233    /// Request to run a local activity
234    pub fn local_activity(
235        &self,
236        opts: LocalActivityOptions,
237    ) -> impl CancellableFuture<ActivityResolution> + '_ {
238        LATimerBackoffFut::new(opts, self)
239    }
240
241    /// Request to run a local activity with no implementation of timer-backoff based retrying.
242    fn local_activity_no_timer_retry(
243        &self,
244        opts: LocalActivityOptions,
245    ) -> impl CancellableFuture<ActivityResolution> {
246        let seq = self.seq_nums.write().next_activity_seq();
247        let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::LocalActivity(seq));
248        self.send(
249            CommandCreateRequest {
250                cmd: opts.into_command(seq),
251                unblocker,
252            }
253            .into(),
254        );
255        cmd
256    }
257
258    /// Creates a child workflow stub with the provided options
259    pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
260        ChildWorkflow { opts }
261    }
262
263    /// Check (or record) that this workflow history was created with the provided patch
264    pub fn patched(&self, patch_id: &str) -> bool {
265        self.patch_impl(patch_id, false)
266    }
267
268    /// Record that this workflow history was created with the provided patch, and it is being
269    /// phased out.
270    pub fn deprecate_patch(&self, patch_id: &str) -> bool {
271        self.patch_impl(patch_id, true)
272    }
273
274    fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
275        self.send(
276            workflow_command::Variant::SetPatchMarker(SetPatchMarker {
277                patch_id: patch_id.to_string(),
278                deprecated,
279            })
280            .into(),
281        );
282        // See if we already know about the status of this change
283        if let Some(present) = self.shared.read().changes.get(patch_id) {
284            return *present;
285        }
286
287        // If we don't already know about the change, that means there is no marker in history,
288        // and we should return false if we are replaying
289        let res = !self.shared.read().is_replaying;
290
291        self.shared
292            .write()
293            .changes
294            .insert(patch_id.to_string(), res);
295
296        res
297    }
298
299    /// Send a signal to an external workflow. May resolve as a failure if the signal didn't work
300    /// or was cancelled.
301    pub fn signal_workflow(
302        &self,
303        opts: impl Into<SignalWorkflowOptions>,
304    ) -> impl CancellableFuture<SignalExternalWfResult> {
305        let options: SignalWorkflowOptions = opts.into();
306        let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
307            namespace: self.namespace.clone(),
308            workflow_id: options.workflow_id,
309            run_id: options.run_id.unwrap_or_default(),
310        });
311        self.send_signal_wf(target, options.signal)
312    }
313
314    /// Add or create a set of search attributes
315    pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
316        self.send(RustWfCmd::NewNonblockingCmd(
317            workflow_command::Variant::UpsertWorkflowSearchAttributes(
318                UpsertWorkflowSearchAttributes {
319                    search_attributes: HashMap::from_iter(attr_iter),
320                },
321            ),
322        ))
323    }
324
325    /// Add or create a set of search attributes
326    pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
327        self.send(RustWfCmd::NewNonblockingCmd(
328            workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
329                upserted_memo: Some(Memo {
330                    fields: HashMap::from_iter(attr_iter),
331                }),
332            }),
333        ))
334    }
335
336    /// Return a stream that produces values when the named signal is sent to this workflow
337    pub fn make_signal_channel(&self, signal_name: impl Into<String>) -> DrainableSignalStream {
338        let (tx, rx) = mpsc::unbounded_channel();
339        self.send(RustWfCmd::SubscribeSignal(signal_name.into(), tx));
340        DrainableSignalStream(UnboundedReceiverStream::new(rx))
341    }
342
343    /// Force a workflow task failure (EX: in order to retry on non-sticky queue)
344    pub fn force_task_fail(&self, with: anyhow::Error) {
345        self.send(with.into());
346    }
347
348    /// Request the cancellation of an external workflow. May resolve as a failure if the workflow
349    /// was not found or the cancel was otherwise unsendable.
350    pub fn cancel_external(
351        &self,
352        target: NamespacedWorkflowExecution,
353        reason: String,
354    ) -> impl Future<Output = CancelExternalWfResult> {
355        let seq = self.seq_nums.write().next_cancel_external_wf_seq();
356        let (cmd, unblocker) = WFCommandFut::new();
357        self.send(
358            CommandCreateRequest {
359                cmd: WorkflowCommand {
360                    variant: Some(
361                        RequestCancelExternalWorkflowExecution {
362                            seq,
363                            workflow_execution: Some(target),
364                            reason,
365                        }
366                        .into(),
367                    ),
368                    user_metadata: None,
369                },
370                unblocker,
371            }
372            .into(),
373        );
374        cmd
375    }
376
377    /// Register an update handler by providing the handler name, a validator function, and an
378    /// update handler. The validator must not mutate workflow state and is synchronous. The handler
379    /// may mutate workflow state (though, that's annoying right now in the prototype) and is async.
380    ///
381    /// Note that if you want a validator that always passes, you will likely need to provide type
382    /// annotations to make the compiler happy, like: `|_: &_, _: T| Ok(())`
383    pub fn update_handler<Arg, Res>(
384        &self,
385        name: impl Into<String>,
386        validator: impl IntoUpdateValidatorFunc<Arg>,
387        handler: impl IntoUpdateHandlerFunc<Arg, Res>,
388    ) {
389        self.send(RustWfCmd::RegisterUpdate(
390            name.into(),
391            UpdateFunctions::new(validator, handler),
392        ))
393    }
394
395    /// Start a nexus operation
396    pub fn start_nexus_operation(
397        &self,
398        opts: NexusOperationOptions,
399    ) -> impl CancellableFuture<NexusStartResult> {
400        let seq = self.seq_nums.write().next_nexus_op_seq();
401        let (result_future, unblocker) = WFCommandFut::new();
402        self.send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
403        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
404            CancellableID::NexusOp(seq),
405            NexusUnblockData {
406                result_future: result_future.shared(),
407                schedule_seq: seq,
408            },
409        );
410        self.send(
411            CommandCreateRequest {
412                cmd: opts.into_command(seq),
413                unblocker,
414            }
415            .into(),
416        );
417        cmd
418    }
419
420    /// Wait for some condition to become true, yielding the workflow if it is not.
421    pub fn wait_condition(&self, mut condition: impl FnMut() -> bool) -> impl Future<Output = ()> {
422        future::poll_fn(move |_cx: &mut Context<'_>| {
423            if condition() {
424                Poll::Ready(())
425            } else {
426                Poll::Pending
427            }
428        })
429    }
430
431    /// Buffer a command to be sent in the activation reply
432    pub(crate) fn send(&self, c: RustWfCmd) {
433        self.chan.send(c).expect("command channel intact");
434    }
435
436    fn send_signal_wf(
437        &self,
438        target: sig_we::Target,
439        signal: Signal,
440    ) -> impl CancellableFuture<SignalExternalWfResult> {
441        let seq = self.seq_nums.write().next_signal_external_wf_seq();
442        let (cmd, unblocker) =
443            CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq));
444        self.send(
445            CommandCreateRequest {
446                cmd: WorkflowCommand {
447                    variant: Some(
448                        SignalExternalWorkflowExecution {
449                            seq,
450                            signal_name: signal.signal_name,
451                            args: signal.data.input,
452                            target: Some(target),
453                            headers: signal.data.headers,
454                        }
455                        .into(),
456                    ),
457                    user_metadata: None,
458                },
459                unblocker,
460            }
461            .into(),
462        );
463        cmd
464    }
465
466    /// Cancel any cancellable operation by ID
467    fn cancel(&self, cancellable_id: CancellableID) {
468        self.send(RustWfCmd::Cancel(cancellable_id));
469    }
470}
471
/// Sequence counters for each kind of workflow command that needs a sequence number.
///
/// Every field holds the value that the matching `next_*_seq` method will hand out next.
struct WfCtxProtectedDat {
    next_timer_sequence_number: u32,
    next_activity_sequence_number: u32,
    next_child_workflow_sequence_number: u32,
    next_cancel_external_wf_sequence_number: u32,
    next_signal_external_wf_sequence_number: u32,
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the current value of `counter` and advances it by one.
    ///
    /// # Panics
    ///
    /// Panics if advancing would overflow `u32`. A plain `+= 1` wraps silently in release
    /// builds, which would hand out sequence numbers that are already in use.
    fn take_next(counter: &mut u32) -> u32 {
        let seq = *counter;
        *counter = seq
            .checked_add(1)
            .expect("workflow command sequence number overflowed u32");
        seq
    }

    /// Hand out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_timer_sequence_number)
    }

    /// Hand out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_activity_sequence_number)
    }

    /// Hand out the next child workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_child_workflow_sequence_number)
    }

    /// Hand out the next external-workflow-cancel sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hand out the next external-workflow-signal sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hand out the next nexus operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::take_next(&mut self.next_nexus_op_sequence_number)
    }
}
513
514#[derive(Clone, Debug, Default)]
515pub(crate) struct WfContextSharedData {
516    /// Maps change ids -> resolved status
517    pub(crate) changes: HashMap<String, bool>,
518    pub(crate) is_replaying: bool,
519    pub(crate) wf_time: Option<SystemTime>,
520    pub(crate) history_length: u32,
521    pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
522    pub(crate) search_attributes: SearchAttributes,
523    pub(crate) random_seed: u64,
524}
525
526/// Helper Wrapper that can drain the channel into a Vec<SignalData> in a blocking way.  Useful
527/// for making sure channels are empty before ContinueAsNew-ing a workflow
528pub struct DrainableSignalStream(UnboundedReceiverStream<SignalData>);
529
530impl DrainableSignalStream {
531    pub fn drain_all(self) -> Vec<SignalData> {
532        let mut receiver = self.0.into_inner();
533        let mut signals = vec![];
534        while let Ok(s) = receiver.try_recv() {
535            signals.push(s);
536        }
537        signals
538    }
539
540    pub fn drain_ready(&mut self) -> Vec<SignalData> {
541        let mut signals = vec![];
542        while let Some(s) = self.0.next().now_or_never().flatten() {
543            signals.push(s);
544        }
545        signals
546    }
547}
548
549impl Stream for DrainableSignalStream {
550    type Item = SignalData;
551
552    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
553        Pin::new(&mut self.0).poll_next(cx)
554    }
555}
556
557/// A Future that can be cancelled.
558/// Used in the prototype SDK for cancelling operations like timers and activities.
559pub trait CancellableFuture<T>: Future<Output = T> {
560    /// Cancel this Future
561    fn cancel(&self, cx: &WfContext);
562}
563
564/// A Future that can be cancelled with a reason
565pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
566    /// Cancel this Future with a reason
567    fn cancel_with_reason(&self, cx: &WfContext, reason: String);
568}
569
570struct WFCommandFut<T, D> {
571    _unused: PhantomData<T>,
572    result_rx: oneshot::Receiver<UnblockEvent>,
573    other_dat: Option<D>,
574}
575impl<T> WFCommandFut<T, ()> {
576    fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
577        Self::new_with_dat(())
578    }
579}
580
581impl<T, D> WFCommandFut<T, D> {
582    fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
583        let (tx, rx) = oneshot::channel();
584        (
585            Self {
586                _unused: PhantomData,
587                result_rx: rx,
588                other_dat: Some(other_dat),
589            },
590            tx,
591        )
592    }
593}
594
595impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
596impl<T, D> Future for WFCommandFut<T, D>
597where
598    T: Unblockable<OtherDat = D>,
599{
600    type Output = T;
601
602    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
603        self.result_rx.poll_unpin(cx).map(|x| {
604            // SAFETY: Because we can only enter this section once the future has resolved, we
605            // know it will never be polled again, therefore consuming the option is OK.
606            let od = self
607                .other_dat
608                .take()
609                .expect("Other data must exist when resolving command future");
610            Unblockable::unblock(x.unwrap(), od)
611        })
612    }
613}
614
615struct CancellableWFCommandFut<T, D, ID = CancellableID> {
616    cmd_fut: WFCommandFut<T, D>,
617    cancellable_id: ID,
618}
619impl<T, ID> CancellableWFCommandFut<T, (), ID> {
620    fn new(cancellable_id: ID) -> (Self, oneshot::Sender<UnblockEvent>) {
621        Self::new_with_dat(cancellable_id, ())
622    }
623}
624impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
625    fn new_with_dat(cancellable_id: ID, other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
626        let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
627        (
628            Self {
629                cmd_fut,
630                cancellable_id,
631            },
632            sender,
633        )
634    }
635}
636impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
637impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
638where
639    T: Unblockable<OtherDat = D>,
640{
641    type Output = T;
642
643    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
644        self.cmd_fut.poll_unpin(cx)
645    }
646}
647
648impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
649where
650    T: Unblockable<OtherDat = D>,
651    ID: Clone + Into<CancellableID>,
652{
653    fn cancel(&self, cx: &WfContext) {
654        cx.cancel(self.cancellable_id.clone().into());
655    }
656}
657impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
658where
659    T: Unblockable<OtherDat = D>,
660{
661    fn cancel_with_reason(&self, cx: &WfContext, reason: String) {
662        let new_id = self.cancellable_id.clone().with_reason(reason);
663        cx.cancel(new_id);
664    }
665}
666
667struct LATimerBackoffFut<'a> {
668    la_opts: LocalActivityOptions,
669    current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Send + Unpin + 'a>>,
670    timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Send + Unpin + 'a>>>,
671    ctx: &'a WfContext,
672    next_attempt: u32,
673    next_sched_time: Option<prost_types::Timestamp>,
674    did_cancel: AtomicBool,
675}
676impl<'a> LATimerBackoffFut<'a> {
677    pub(crate) fn new(opts: LocalActivityOptions, ctx: &'a WfContext) -> Self {
678        Self {
679            la_opts: opts.clone(),
680            current_fut: Box::pin(ctx.local_activity_no_timer_retry(opts)),
681            timer_fut: None,
682            ctx,
683            next_attempt: 1,
684            next_sched_time: None,
685            did_cancel: AtomicBool::new(false),
686        }
687    }
688}
689impl Unpin for LATimerBackoffFut<'_> {}
690impl Future for LATimerBackoffFut<'_> {
691    type Output = ActivityResolution;
692
693    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
694        // If the timer exists, wait for it first
695        if let Some(tf) = self.timer_fut.as_mut() {
696            return match tf.poll_unpin(cx) {
697                Poll::Ready(tr) => {
698                    self.timer_fut = None;
699                    // Schedule next LA if this timer wasn't cancelled
700                    if let TimerResult::Fired = tr {
701                        let mut opts = self.la_opts.clone();
702                        opts.attempt = Some(self.next_attempt);
703                        opts.original_schedule_time
704                            .clone_from(&self.next_sched_time);
705                        self.current_fut = Box::pin(self.ctx.local_activity_no_timer_retry(opts));
706                        Poll::Pending
707                    } else {
708                        Poll::Ready(ActivityResolution {
709                            status: Some(
710                                activity_resolution::Status::Cancelled(Default::default()),
711                            ),
712                        })
713                    }
714                }
715                Poll::Pending => Poll::Pending,
716            };
717        }
718        let poll_res = self.current_fut.poll_unpin(cx);
719        if let Poll::Ready(ref r) = poll_res
720            && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
721        {
722            // If we've already said we want to cancel, don't schedule the backoff timer. Just
723            // return cancel status. This can happen if cancel comes after the LA says it wants
724            // to back off but before we have scheduled the timer.
725            if self.did_cancel.load(Ordering::Acquire) {
726                return Poll::Ready(ActivityResolution {
727                    status: Some(activity_resolution::Status::Cancelled(Default::default())),
728                });
729            }
730
731            let timer_f = self.ctx.timer::<Duration>(
732                b.backoff_duration
733                    .expect("Duration is set")
734                    .try_into()
735                    .expect("duration converts ok"),
736            );
737            self.timer_fut = Some(Box::pin(timer_f));
738            self.next_attempt = b.attempt;
739            self.next_sched_time.clone_from(&b.original_schedule_time);
740            return Poll::Pending;
741        }
742        poll_res
743    }
744}
745impl CancellableFuture<ActivityResolution> for LATimerBackoffFut<'_> {
746    fn cancel(&self, ctx: &WfContext) {
747        self.did_cancel.store(true, Ordering::Release);
748        if let Some(tf) = self.timer_fut.as_ref() {
749            tf.cancel(ctx);
750        }
751        self.current_fut.cancel(ctx);
752    }
753}
754
755/// A stub representing an unstarted child workflow.
756#[derive(Default, Debug, Clone)]
757pub struct ChildWorkflow {
758    opts: ChildWorkflowOptions,
759}
760
761pub(crate) struct ChildWfCommon {
762    workflow_id: String,
763    result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
764}
765
766/// Child workflow in pending state
767pub struct PendingChildWorkflow {
768    /// The status of the child workflow start
769    pub status: ChildWorkflowStartStatus,
770    pub(crate) common: ChildWfCommon,
771}
772
773impl PendingChildWorkflow {
774    /// Returns `None` if the child did not start successfully. The returned [StartedChildWorkflow]
775    /// can be used to wait on, signal, or cancel the child workflow.
776    pub fn into_started(self) -> Option<StartedChildWorkflow> {
777        match self.status {
778            ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
779                run_id: s.run_id,
780                common: self.common,
781            }),
782            _ => None,
783        }
784    }
785}
786
787/// Child workflow in started state
788pub struct StartedChildWorkflow {
789    /// Run ID of the child workflow
790    pub run_id: String,
791    common: ChildWfCommon,
792}
793
794impl ChildWorkflow {
795    /// Start the child workflow, the returned Future is cancellable.
796    pub fn start(self, cx: &WfContext) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
797        let child_seq = cx.seq_nums.write().next_child_workflow_seq();
798        // Immediately create the command/future for the result, otherwise if the user does
799        // not await the result until *after* we receive an activation for it, there will be nothing
800        // to match when unblocking.
801        let cancel_seq = cx.seq_nums.write().next_cancel_external_wf_seq();
802        let (result_cmd, unblocker) =
803            CancellableWFCommandFut::new(CancellableIDWithReason::ExternalWorkflow {
804                seqnum: cancel_seq,
805                execution: NamespacedWorkflowExecution {
806                    workflow_id: self.opts.workflow_id.clone(),
807                    ..Default::default()
808                },
809            });
810        cx.send(
811            CommandSubscribeChildWorkflowCompletion {
812                seq: child_seq,
813                unblocker,
814            }
815            .into(),
816        );
817
818        let common = ChildWfCommon {
819            workflow_id: self.opts.workflow_id.clone(),
820            result_future: result_cmd,
821        };
822
823        let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
824            CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
825            common,
826        );
827        cx.send(
828            CommandCreateRequest {
829                cmd: self.opts.into_command(child_seq),
830                unblocker,
831            }
832            .into(),
833        );
834
835        cmd
836    }
837}
838
839impl StartedChildWorkflow {
840    /// Consumes self and returns a future that will wait until completion of this child workflow
841    /// execution
842    pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
843        self.common.result_future
844    }
845
846    /// Cancel the child workflow
847    pub fn cancel(&self, cx: &WfContext, reason: String) {
848        cx.send(RustWfCmd::NewNonblockingCmd(
849            CancelChildWorkflowExecution {
850                child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
851                reason,
852            }
853            .into(),
854        ));
855    }
856
857    /// Signal the child workflow
858    pub fn signal<'a, S: Into<Signal>>(
859        &self,
860        cx: &'a WfContext,
861        data: S,
862    ) -> impl CancellableFuture<SignalExternalWfResult> + use<'a, S> {
863        let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
864        cx.send_signal_wf(target, data.into())
865    }
866}
867
868#[derive(derive_more::Debug)]
869#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
870pub struct StartedNexusOperation {
871    /// The operation token, if the operation started asynchronously
872    pub operation_token: Option<String>,
873    pub(crate) unblock_dat: NexusUnblockData,
874}
875
876pub(crate) struct NexusUnblockData {
877    result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
878    schedule_seq: u32,
879}
880
881impl StartedNexusOperation {
882    pub async fn result(&self) -> NexusOperationResult {
883        self.unblock_dat.result_future.clone().await
884    }
885
886    pub fn cancel(&self, cx: &WfContext) {
887        cx.cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
888    }
889}