1mod options;
2
3pub use options::{
4 ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5 SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9 CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10 CommandSubscribeChildWorkflowCompletion, IntoUpdateHandlerFunc, IntoUpdateValidatorFunc,
11 NexusStartResult, RustWfCmd, SignalExternalWfResult, SupportsCancelReason, TimerResult,
12 UnblockEvent, Unblockable, UpdateFunctions, workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{FutureExt, Stream, StreamExt, future::Shared, task::Context};
15use parking_lot::{RwLock, RwLockReadGuard};
16use std::{
17 collections::HashMap,
18 future,
19 future::Future,
20 marker::PhantomData,
21 ops::Deref,
22 pin::Pin,
23 sync::{
24 Arc,
25 atomic::{AtomicBool, Ordering},
26 mpsc::{Receiver, Sender},
27 },
28 task::Poll,
29 time::{Duration, SystemTime},
30};
31use squads_temporal_sdk_core_api::worker::WorkerDeploymentVersion;
32use squads_temporal_sdk_core_protos::{
33 coresdk::{
34 activity_result::{ActivityResolution, activity_resolution},
35 child_workflow::ChildWorkflowResult,
36 common::NamespacedWorkflowExecution,
37 nexus::NexusOperationResult,
38 workflow_activation::{
39 InitializeWorkflow,
40 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
41 },
42 workflow_commands::{
43 CancelChildWorkflowExecution, ModifyWorkflowProperties,
44 RequestCancelExternalWorkflowExecution, SetPatchMarker,
45 SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
46 WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
47 },
48 },
49 temporal::api::{
50 common::v1::{Memo, Payload, SearchAttributes},
51 sdk::v1::UserMetadata,
52 },
53};
54use tokio::sync::{mpsc, oneshot, watch};
55use tokio_stream::wrappers::UnboundedReceiverStream;
56
57#[derive(Clone)]
59pub struct WfContext {
60 namespace: String,
61 task_queue: String,
62 inital_information: Arc<InitializeWorkflow>,
63
64 chan: Sender<RustWfCmd>,
65 am_cancelled: watch::Receiver<Option<String>>,
66 pub(crate) shared: Arc<RwLock<WfContextSharedData>>,
67
68 seq_nums: Arc<RwLock<WfCtxProtectedDat>>,
69}
70
71impl WfContext {
74 pub(super) fn new(
77 namespace: String,
78 task_queue: String,
79 init_workflow_job: InitializeWorkflow,
80 am_cancelled: watch::Receiver<Option<String>>,
81 ) -> (Self, Receiver<RustWfCmd>) {
82 let (chan, rx) = std::sync::mpsc::channel();
84 (
85 Self {
86 namespace,
87 task_queue,
88 shared: Arc::new(RwLock::new(WfContextSharedData {
89 random_seed: init_workflow_job.randomness_seed,
90 search_attributes: init_workflow_job
91 .search_attributes
92 .clone()
93 .unwrap_or_default(),
94 ..Default::default()
95 })),
96 inital_information: Arc::new(init_workflow_job),
97 chan,
98 am_cancelled,
99 seq_nums: Arc::new(RwLock::new(WfCtxProtectedDat {
100 next_timer_sequence_number: 1,
101 next_activity_sequence_number: 1,
102 next_child_workflow_sequence_number: 1,
103 next_cancel_external_wf_sequence_number: 1,
104 next_signal_external_wf_sequence_number: 1,
105 next_nexus_op_sequence_number: 1,
106 })),
107 },
108 rx,
109 )
110 }
111
112 pub fn namespace(&self) -> &str {
114 &self.namespace
115 }
116
117 pub fn task_queue(&self) -> &str {
119 &self.task_queue
120 }
121
122 pub fn get_args(&self) -> &[Payload] {
124 self.inital_information.arguments.as_slice()
125 }
126
127 pub fn workflow_time(&self) -> Option<SystemTime> {
129 self.shared.read().wf_time
130 }
131
132 pub fn history_length(&self) -> u32 {
134 self.shared.read().history_length
135 }
136
137 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
141 self.shared.read().current_deployment_version.clone()
142 }
143
144 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
146 RwLockReadGuard::map(self.shared.read(), |s| &s.search_attributes)
147 }
148
149 pub fn random_seed(&self) -> u64 {
151 self.shared.read().random_seed
152 }
153
154 pub fn is_replaying(&self) -> bool {
156 self.shared.read().is_replaying
157 }
158
159 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
162 &self.inital_information
163 }
164
165 pub async fn cancelled(&self) -> String {
167 if let Some(s) = self.am_cancelled.borrow().as_ref() {
168 return s.clone();
169 }
170 self.am_cancelled
171 .clone()
172 .changed()
173 .await
174 .expect("Cancelled send half not dropped");
175 self.am_cancelled
176 .borrow()
177 .as_ref()
178 .cloned()
179 .unwrap_or_default()
180 }
181
182 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
184 let opts: TimerOptions = opts.into();
185 let seq = self.seq_nums.write().next_timer_seq();
186 let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq));
187 self.send(
188 CommandCreateRequest {
189 cmd: WorkflowCommand {
190 variant: Some(
191 StartTimer {
192 seq,
193 start_to_fire_timeout: Some(
194 opts.duration
195 .try_into()
196 .expect("Durations must fit into 64 bits"),
197 ),
198 }
199 .into(),
200 ),
201 user_metadata: Some(UserMetadata {
202 summary: opts.summary.map(|x| x.as_bytes().into()),
203 details: None,
204 }),
205 },
206 unblocker,
207 }
208 .into(),
209 );
210 cmd
211 }
212
213 pub fn activity(
215 &self,
216 mut opts: ActivityOptions,
217 ) -> impl CancellableFuture<ActivityResolution> {
218 if opts.task_queue.is_none() {
219 opts.task_queue = Some(self.task_queue.clone());
220 }
221 let seq = self.seq_nums.write().next_activity_seq();
222 let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Activity(seq));
223 self.send(
224 CommandCreateRequest {
225 cmd: opts.into_command(seq),
226 unblocker,
227 }
228 .into(),
229 );
230 cmd
231 }
232
233 pub fn local_activity(
235 &self,
236 opts: LocalActivityOptions,
237 ) -> impl CancellableFuture<ActivityResolution> + '_ {
238 LATimerBackoffFut::new(opts, self)
239 }
240
241 fn local_activity_no_timer_retry(
243 &self,
244 opts: LocalActivityOptions,
245 ) -> impl CancellableFuture<ActivityResolution> {
246 let seq = self.seq_nums.write().next_activity_seq();
247 let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::LocalActivity(seq));
248 self.send(
249 CommandCreateRequest {
250 cmd: opts.into_command(seq),
251 unblocker,
252 }
253 .into(),
254 );
255 cmd
256 }
257
258 pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
260 ChildWorkflow { opts }
261 }
262
263 pub fn patched(&self, patch_id: &str) -> bool {
265 self.patch_impl(patch_id, false)
266 }
267
268 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
271 self.patch_impl(patch_id, true)
272 }
273
274 fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
275 self.send(
276 workflow_command::Variant::SetPatchMarker(SetPatchMarker {
277 patch_id: patch_id.to_string(),
278 deprecated,
279 })
280 .into(),
281 );
282 if let Some(present) = self.shared.read().changes.get(patch_id) {
284 return *present;
285 }
286
287 let res = !self.shared.read().is_replaying;
290
291 self.shared
292 .write()
293 .changes
294 .insert(patch_id.to_string(), res);
295
296 res
297 }
298
299 pub fn signal_workflow(
302 &self,
303 opts: impl Into<SignalWorkflowOptions>,
304 ) -> impl CancellableFuture<SignalExternalWfResult> {
305 let options: SignalWorkflowOptions = opts.into();
306 let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
307 namespace: self.namespace.clone(),
308 workflow_id: options.workflow_id,
309 run_id: options.run_id.unwrap_or_default(),
310 });
311 self.send_signal_wf(target, options.signal)
312 }
313
314 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
316 self.send(RustWfCmd::NewNonblockingCmd(
317 workflow_command::Variant::UpsertWorkflowSearchAttributes(
318 UpsertWorkflowSearchAttributes {
319 search_attributes: HashMap::from_iter(attr_iter),
320 },
321 ),
322 ))
323 }
324
325 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
327 self.send(RustWfCmd::NewNonblockingCmd(
328 workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
329 upserted_memo: Some(Memo {
330 fields: HashMap::from_iter(attr_iter),
331 }),
332 }),
333 ))
334 }
335
336 pub fn make_signal_channel(&self, signal_name: impl Into<String>) -> DrainableSignalStream {
338 let (tx, rx) = mpsc::unbounded_channel();
339 self.send(RustWfCmd::SubscribeSignal(signal_name.into(), tx));
340 DrainableSignalStream(UnboundedReceiverStream::new(rx))
341 }
342
343 pub fn force_task_fail(&self, with: anyhow::Error) {
345 self.send(with.into());
346 }
347
348 pub fn cancel_external(
351 &self,
352 target: NamespacedWorkflowExecution,
353 reason: String,
354 ) -> impl Future<Output = CancelExternalWfResult> {
355 let seq = self.seq_nums.write().next_cancel_external_wf_seq();
356 let (cmd, unblocker) = WFCommandFut::new();
357 self.send(
358 CommandCreateRequest {
359 cmd: WorkflowCommand {
360 variant: Some(
361 RequestCancelExternalWorkflowExecution {
362 seq,
363 workflow_execution: Some(target),
364 reason,
365 }
366 .into(),
367 ),
368 user_metadata: None,
369 },
370 unblocker,
371 }
372 .into(),
373 );
374 cmd
375 }
376
377 pub fn update_handler<Arg, Res>(
384 &self,
385 name: impl Into<String>,
386 validator: impl IntoUpdateValidatorFunc<Arg>,
387 handler: impl IntoUpdateHandlerFunc<Arg, Res>,
388 ) {
389 self.send(RustWfCmd::RegisterUpdate(
390 name.into(),
391 UpdateFunctions::new(validator, handler),
392 ))
393 }
394
395 pub fn start_nexus_operation(
397 &self,
398 opts: NexusOperationOptions,
399 ) -> impl CancellableFuture<NexusStartResult> {
400 let seq = self.seq_nums.write().next_nexus_op_seq();
401 let (result_future, unblocker) = WFCommandFut::new();
402 self.send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
403 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
404 CancellableID::NexusOp(seq),
405 NexusUnblockData {
406 result_future: result_future.shared(),
407 schedule_seq: seq,
408 },
409 );
410 self.send(
411 CommandCreateRequest {
412 cmd: opts.into_command(seq),
413 unblocker,
414 }
415 .into(),
416 );
417 cmd
418 }
419
420 pub fn wait_condition(&self, mut condition: impl FnMut() -> bool) -> impl Future<Output = ()> {
422 future::poll_fn(move |_cx: &mut Context<'_>| {
423 if condition() {
424 Poll::Ready(())
425 } else {
426 Poll::Pending
427 }
428 })
429 }
430
431 pub(crate) fn send(&self, c: RustWfCmd) {
433 self.chan.send(c).expect("command channel intact");
434 }
435
436 fn send_signal_wf(
437 &self,
438 target: sig_we::Target,
439 signal: Signal,
440 ) -> impl CancellableFuture<SignalExternalWfResult> {
441 let seq = self.seq_nums.write().next_signal_external_wf_seq();
442 let (cmd, unblocker) =
443 CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq));
444 self.send(
445 CommandCreateRequest {
446 cmd: WorkflowCommand {
447 variant: Some(
448 SignalExternalWorkflowExecution {
449 seq,
450 signal_name: signal.signal_name,
451 args: signal.data.input,
452 target: Some(target),
453 headers: signal.data.headers,
454 }
455 .into(),
456 ),
457 user_metadata: None,
458 },
459 unblocker,
460 }
461 .into(),
462 );
463 cmd
464 }
465
466 fn cancel(&self, cancellable_id: CancellableID) {
468 self.send(RustWfCmd::Cancel(cancellable_id));
469 }
470}
471
/// Sequence number counters, one per kind of command that needs one.
///
/// Each field holds the number that will be handed out next for its kind.
struct WfCtxProtectedDat {
    next_timer_sequence_number: u32,
    next_activity_sequence_number: u32,
    next_child_workflow_sequence_number: u32,
    next_cancel_external_wf_sequence_number: u32,
    next_signal_external_wf_sequence_number: u32,
    next_nexus_op_sequence_number: u32,
}

impl WfCtxProtectedDat {
    /// Returns the current value of `counter` and advances it by one.
    fn post_increment(counter: &mut u32) -> u32 {
        let issued = *counter;
        *counter += 1;
        issued
    }

    /// Hands out the next timer sequence number.
    fn next_timer_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_timer_sequence_number)
    }

    /// Hands out the next activity sequence number.
    fn next_activity_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_activity_sequence_number)
    }

    /// Hands out the next child workflow sequence number.
    fn next_child_workflow_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_child_workflow_sequence_number)
    }

    /// Hands out the next cancel-external-workflow sequence number.
    fn next_cancel_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_cancel_external_wf_sequence_number)
    }

    /// Hands out the next signal-external-workflow sequence number.
    fn next_signal_external_wf_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_signal_external_wf_sequence_number)
    }

    /// Hands out the next Nexus operation sequence number.
    fn next_nexus_op_seq(&mut self) -> u32 {
        Self::post_increment(&mut self.next_nexus_op_sequence_number)
    }
}
513
514#[derive(Clone, Debug, Default)]
515pub(crate) struct WfContextSharedData {
516 pub(crate) changes: HashMap<String, bool>,
518 pub(crate) is_replaying: bool,
519 pub(crate) wf_time: Option<SystemTime>,
520 pub(crate) history_length: u32,
521 pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
522 pub(crate) search_attributes: SearchAttributes,
523 pub(crate) random_seed: u64,
524}
525
526pub struct DrainableSignalStream(UnboundedReceiverStream<SignalData>);
529
530impl DrainableSignalStream {
531 pub fn drain_all(self) -> Vec<SignalData> {
532 let mut receiver = self.0.into_inner();
533 let mut signals = vec![];
534 while let Ok(s) = receiver.try_recv() {
535 signals.push(s);
536 }
537 signals
538 }
539
540 pub fn drain_ready(&mut self) -> Vec<SignalData> {
541 let mut signals = vec![];
542 while let Some(s) = self.0.next().now_or_never().flatten() {
543 signals.push(s);
544 }
545 signals
546 }
547}
548
549impl Stream for DrainableSignalStream {
550 type Item = SignalData;
551
552 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
553 Pin::new(&mut self.0).poll_next(cx)
554 }
555}
556
557pub trait CancellableFuture<T>: Future<Output = T> {
560 fn cancel(&self, cx: &WfContext);
562}
563
564pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
566 fn cancel_with_reason(&self, cx: &WfContext, reason: String);
568}
569
570struct WFCommandFut<T, D> {
571 _unused: PhantomData<T>,
572 result_rx: oneshot::Receiver<UnblockEvent>,
573 other_dat: Option<D>,
574}
575impl<T> WFCommandFut<T, ()> {
576 fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
577 Self::new_with_dat(())
578 }
579}
580
581impl<T, D> WFCommandFut<T, D> {
582 fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
583 let (tx, rx) = oneshot::channel();
584 (
585 Self {
586 _unused: PhantomData,
587 result_rx: rx,
588 other_dat: Some(other_dat),
589 },
590 tx,
591 )
592 }
593}
594
595impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
596impl<T, D> Future for WFCommandFut<T, D>
597where
598 T: Unblockable<OtherDat = D>,
599{
600 type Output = T;
601
602 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
603 self.result_rx.poll_unpin(cx).map(|x| {
604 let od = self
607 .other_dat
608 .take()
609 .expect("Other data must exist when resolving command future");
610 Unblockable::unblock(x.unwrap(), od)
611 })
612 }
613}
614
615struct CancellableWFCommandFut<T, D, ID = CancellableID> {
616 cmd_fut: WFCommandFut<T, D>,
617 cancellable_id: ID,
618}
619impl<T, ID> CancellableWFCommandFut<T, (), ID> {
620 fn new(cancellable_id: ID) -> (Self, oneshot::Sender<UnblockEvent>) {
621 Self::new_with_dat(cancellable_id, ())
622 }
623}
624impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
625 fn new_with_dat(cancellable_id: ID, other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
626 let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
627 (
628 Self {
629 cmd_fut,
630 cancellable_id,
631 },
632 sender,
633 )
634 }
635}
636impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
637impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
638where
639 T: Unblockable<OtherDat = D>,
640{
641 type Output = T;
642
643 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
644 self.cmd_fut.poll_unpin(cx)
645 }
646}
647
648impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
649where
650 T: Unblockable<OtherDat = D>,
651 ID: Clone + Into<CancellableID>,
652{
653 fn cancel(&self, cx: &WfContext) {
654 cx.cancel(self.cancellable_id.clone().into());
655 }
656}
657impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
658where
659 T: Unblockable<OtherDat = D>,
660{
661 fn cancel_with_reason(&self, cx: &WfContext, reason: String) {
662 let new_id = self.cancellable_id.clone().with_reason(reason);
663 cx.cancel(new_id);
664 }
665}
666
667struct LATimerBackoffFut<'a> {
668 la_opts: LocalActivityOptions,
669 current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Send + Unpin + 'a>>,
670 timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Send + Unpin + 'a>>>,
671 ctx: &'a WfContext,
672 next_attempt: u32,
673 next_sched_time: Option<prost_types::Timestamp>,
674 did_cancel: AtomicBool,
675}
676impl<'a> LATimerBackoffFut<'a> {
677 pub(crate) fn new(opts: LocalActivityOptions, ctx: &'a WfContext) -> Self {
678 Self {
679 la_opts: opts.clone(),
680 current_fut: Box::pin(ctx.local_activity_no_timer_retry(opts)),
681 timer_fut: None,
682 ctx,
683 next_attempt: 1,
684 next_sched_time: None,
685 did_cancel: AtomicBool::new(false),
686 }
687 }
688}
689impl Unpin for LATimerBackoffFut<'_> {}
690impl Future for LATimerBackoffFut<'_> {
691 type Output = ActivityResolution;
692
693 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
694 if let Some(tf) = self.timer_fut.as_mut() {
696 return match tf.poll_unpin(cx) {
697 Poll::Ready(tr) => {
698 self.timer_fut = None;
699 if let TimerResult::Fired = tr {
701 let mut opts = self.la_opts.clone();
702 opts.attempt = Some(self.next_attempt);
703 opts.original_schedule_time
704 .clone_from(&self.next_sched_time);
705 self.current_fut = Box::pin(self.ctx.local_activity_no_timer_retry(opts));
706 Poll::Pending
707 } else {
708 Poll::Ready(ActivityResolution {
709 status: Some(
710 activity_resolution::Status::Cancelled(Default::default()),
711 ),
712 })
713 }
714 }
715 Poll::Pending => Poll::Pending,
716 };
717 }
718 let poll_res = self.current_fut.poll_unpin(cx);
719 if let Poll::Ready(ref r) = poll_res
720 && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
721 {
722 if self.did_cancel.load(Ordering::Acquire) {
726 return Poll::Ready(ActivityResolution {
727 status: Some(activity_resolution::Status::Cancelled(Default::default())),
728 });
729 }
730
731 let timer_f = self.ctx.timer::<Duration>(
732 b.backoff_duration
733 .expect("Duration is set")
734 .try_into()
735 .expect("duration converts ok"),
736 );
737 self.timer_fut = Some(Box::pin(timer_f));
738 self.next_attempt = b.attempt;
739 self.next_sched_time.clone_from(&b.original_schedule_time);
740 return Poll::Pending;
741 }
742 poll_res
743 }
744}
745impl CancellableFuture<ActivityResolution> for LATimerBackoffFut<'_> {
746 fn cancel(&self, ctx: &WfContext) {
747 self.did_cancel.store(true, Ordering::Release);
748 if let Some(tf) = self.timer_fut.as_ref() {
749 tf.cancel(ctx);
750 }
751 self.current_fut.cancel(ctx);
752 }
753}
754
755#[derive(Default, Debug, Clone)]
757pub struct ChildWorkflow {
758 opts: ChildWorkflowOptions,
759}
760
761pub(crate) struct ChildWfCommon {
762 workflow_id: String,
763 result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
764}
765
766pub struct PendingChildWorkflow {
768 pub status: ChildWorkflowStartStatus,
770 pub(crate) common: ChildWfCommon,
771}
772
773impl PendingChildWorkflow {
774 pub fn into_started(self) -> Option<StartedChildWorkflow> {
777 match self.status {
778 ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
779 run_id: s.run_id,
780 common: self.common,
781 }),
782 _ => None,
783 }
784 }
785}
786
787pub struct StartedChildWorkflow {
789 pub run_id: String,
791 common: ChildWfCommon,
792}
793
794impl ChildWorkflow {
795 pub fn start(self, cx: &WfContext) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
797 let child_seq = cx.seq_nums.write().next_child_workflow_seq();
798 let cancel_seq = cx.seq_nums.write().next_cancel_external_wf_seq();
802 let (result_cmd, unblocker) =
803 CancellableWFCommandFut::new(CancellableIDWithReason::ExternalWorkflow {
804 seqnum: cancel_seq,
805 execution: NamespacedWorkflowExecution {
806 workflow_id: self.opts.workflow_id.clone(),
807 ..Default::default()
808 },
809 });
810 cx.send(
811 CommandSubscribeChildWorkflowCompletion {
812 seq: child_seq,
813 unblocker,
814 }
815 .into(),
816 );
817
818 let common = ChildWfCommon {
819 workflow_id: self.opts.workflow_id.clone(),
820 result_future: result_cmd,
821 };
822
823 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
824 CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
825 common,
826 );
827 cx.send(
828 CommandCreateRequest {
829 cmd: self.opts.into_command(child_seq),
830 unblocker,
831 }
832 .into(),
833 );
834
835 cmd
836 }
837}
838
839impl StartedChildWorkflow {
840 pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
843 self.common.result_future
844 }
845
846 pub fn cancel(&self, cx: &WfContext, reason: String) {
848 cx.send(RustWfCmd::NewNonblockingCmd(
849 CancelChildWorkflowExecution {
850 child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
851 reason,
852 }
853 .into(),
854 ));
855 }
856
857 pub fn signal<'a, S: Into<Signal>>(
859 &self,
860 cx: &'a WfContext,
861 data: S,
862 ) -> impl CancellableFuture<SignalExternalWfResult> + use<'a, S> {
863 let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
864 cx.send_signal_wf(target, data.into())
865 }
866}
867
868#[derive(derive_more::Debug)]
869#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
870pub struct StartedNexusOperation {
871 pub operation_token: Option<String>,
873 pub(crate) unblock_dat: NexusUnblockData,
874}
875
876pub(crate) struct NexusUnblockData {
877 result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
878 schedule_seq: u32,
879}
880
881impl StartedNexusOperation {
882 pub async fn result(&self) -> NexusOperationResult {
883 self.unblock_dat.result_future.clone().await
884 }
885
886 pub fn cancel(&self, cx: &WfContext) {
887 cx.cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
888 }
889}