1mod options;
2
3pub use options::{
4 ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5 SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9 CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10 CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
11 SupportsCancelReason, TimerResult, UnblockEvent, Unblockable,
12 workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{
15 FutureExt,
16 future::{FusedFuture, Shared},
17 task::Context,
18};
19use std::{
20 cell::{Ref, RefCell},
21 collections::HashMap,
22 future::{self, Future},
23 marker::PhantomData,
24 ops::{Deref, DerefMut},
25 pin::Pin,
26 rc::Rc,
27 sync::{
28 atomic::{AtomicBool, Ordering},
29 mpsc::{Receiver, Sender},
30 },
31 task::{Poll, Waker},
32 time::{Duration, SystemTime},
33};
34use temporalio_common::{
35 ActivityDefinition,
36 data_converters::{
37 GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
38 SerializationContextData, TemporalDeserializable,
39 },
40 protos::{
41 coresdk::{
42 activity_result::{ActivityResolution, activity_resolution},
43 child_workflow::ChildWorkflowResult,
44 common::NamespacedWorkflowExecution,
45 nexus::NexusOperationResult,
46 workflow_activation::{
47 InitializeWorkflow,
48 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
49 },
50 workflow_commands::{
51 CancelChildWorkflowExecution, ModifyWorkflowProperties,
52 RequestCancelExternalWorkflowExecution, SetPatchMarker,
53 SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
54 WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
55 },
56 },
57 temporal::api::{
58 common::v1::{Memo, Payload, SearchAttributes},
59 failure::v1::Failure,
60 sdk::v1::UserMetadata,
61 },
62 },
63 worker::WorkerDeploymentVersion,
64};
65use tokio::sync::{oneshot, watch};
66
67#[derive(Clone)]
71pub struct BaseWorkflowContext {
72 inner: Rc<WorkflowContextInner>,
73}
74impl BaseWorkflowContext {
75 pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
76 self.inner.shared.borrow_mut()
77 }
78
79 pub(crate) fn view(&self) -> WorkflowContextView {
81 WorkflowContextView::new(
82 self.inner.namespace.clone(),
83 self.inner.task_queue.clone(),
84 self.inner.run_id.clone(),
85 &self.inner.inital_information,
86 )
87 }
88}
89
90struct WorkflowContextInner {
91 namespace: String,
92 task_queue: String,
93 run_id: String,
94 inital_information: InitializeWorkflow,
95 chan: Sender<RustWfCmd>,
96 am_cancelled: watch::Receiver<Option<String>>,
97 shared: RefCell<WorkflowContextSharedData>,
98 seq_nums: RefCell<WfCtxProtectedDat>,
99 payload_converter: PayloadConverter,
100}
101
102pub struct SyncWorkflowContext<W> {
110 base: BaseWorkflowContext,
111 headers: Rc<HashMap<String, Payload>>,
113 _phantom: PhantomData<W>,
114}
115
116impl<W> Clone for SyncWorkflowContext<W> {
117 fn clone(&self) -> Self {
118 Self {
119 base: self.base.clone(),
120 headers: self.headers.clone(),
121 _phantom: PhantomData,
122 }
123 }
124}
125
126pub struct WorkflowContext<W> {
131 sync: SyncWorkflowContext<W>,
132 workflow_state: Rc<RefCell<W>>,
134 condition_wakers: Rc<RefCell<Vec<Waker>>>,
138}
139
140impl<W> Clone for WorkflowContext<W> {
141 fn clone(&self) -> Self {
142 Self {
143 sync: self.sync.clone(),
144 workflow_state: self.workflow_state.clone(),
145 condition_wakers: self.condition_wakers.clone(),
146 }
147 }
148}
149
150#[derive(Clone, Debug)]
154#[non_exhaustive]
155pub struct WorkflowContextView {
156 pub workflow_id: String,
158 pub run_id: String,
160 pub workflow_type: String,
162 pub task_queue: String,
164 pub namespace: String,
166
167 pub attempt: u32,
169 pub first_execution_run_id: String,
171 pub continued_from_run_id: Option<String>,
173
174 pub start_time: Option<SystemTime>,
176 pub execution_timeout: Option<Duration>,
178 pub run_timeout: Option<Duration>,
180 pub task_timeout: Option<Duration>,
182
183 pub parent: Option<ParentWorkflowInfo>,
185 pub root: Option<RootWorkflowInfo>,
187
188 pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
190 pub cron_schedule: Option<String>,
192 pub memo: Option<Memo>,
194 pub search_attributes: Option<SearchAttributes>,
196}
197
198#[derive(Clone, Debug)]
200#[non_exhaustive]
201pub struct ParentWorkflowInfo {
202 pub workflow_id: String,
204 pub run_id: String,
206 pub namespace: String,
208}
209
210#[derive(Clone, Debug)]
212#[non_exhaustive]
213pub struct RootWorkflowInfo {
214 pub workflow_id: String,
216 pub run_id: String,
218}
219
220impl WorkflowContextView {
221 pub(crate) fn new(
223 namespace: String,
224 task_queue: String,
225 run_id: String,
226 init: &InitializeWorkflow,
227 ) -> Self {
228 let parent = init
229 .parent_workflow_info
230 .as_ref()
231 .map(|p| ParentWorkflowInfo {
232 workflow_id: p.workflow_id.clone(),
233 run_id: p.run_id.clone(),
234 namespace: p.namespace.clone(),
235 });
236
237 let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
238 workflow_id: r.workflow_id.clone(),
239 run_id: r.run_id.clone(),
240 });
241
242 let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
243 None
244 } else {
245 Some(init.continued_from_execution_run_id.clone())
246 };
247
248 let cron_schedule = if init.cron_schedule.is_empty() {
249 None
250 } else {
251 Some(init.cron_schedule.clone())
252 };
253
254 Self {
255 workflow_id: init.workflow_id.clone(),
256 run_id,
257 workflow_type: init.workflow_type.clone(),
258 task_queue,
259 namespace,
260 attempt: init.attempt as u32,
261 first_execution_run_id: init.first_execution_run_id.clone(),
262 continued_from_run_id,
263 start_time: init.start_time.and_then(|t| t.try_into().ok()),
264 execution_timeout: init
265 .workflow_execution_timeout
266 .and_then(|d| d.try_into().ok()),
267 run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
268 task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
269 parent,
270 root,
271 retry_policy: init.retry_policy.clone(),
272 cron_schedule,
273 memo: init.memo.clone(),
274 search_attributes: init.search_attributes.clone(),
275 }
276 }
277}
278
279#[derive(Debug, thiserror::Error)]
281pub enum ActivityExecutionError {
282 #[error("Activity failed: {}", .0.message)]
284 Failed(Box<Failure>),
285 #[error("Activity cancelled: {}", .0.message)]
287 Cancelled(Box<Failure>),
288 #[error("Payload conversion failed: {0}")]
291 Serialization(#[from] PayloadConversionError),
292}
293
294impl ActivityExecutionError {
295 pub fn is_timeout(&self) -> bool {
297 match self {
298 ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
299 _ => false,
300 }
301 }
302}
303
304impl BaseWorkflowContext {
305 pub(crate) fn new(
308 namespace: String,
309 task_queue: String,
310 run_id: String,
311 init_workflow_job: InitializeWorkflow,
312 am_cancelled: watch::Receiver<Option<String>>,
313 payload_converter: PayloadConverter,
314 ) -> (Self, Receiver<RustWfCmd>) {
315 let (chan, rx) = std::sync::mpsc::channel();
317 (
318 Self {
319 inner: Rc::new(WorkflowContextInner {
320 namespace,
321 task_queue,
322 run_id,
323 shared: RefCell::new(WorkflowContextSharedData {
324 random_seed: init_workflow_job.randomness_seed,
325 search_attributes: init_workflow_job
326 .search_attributes
327 .clone()
328 .unwrap_or_default(),
329 ..Default::default()
330 }),
331 inital_information: init_workflow_job,
332 chan,
333 am_cancelled,
334 seq_nums: RefCell::new(WfCtxProtectedDat {
335 next_timer_sequence_number: 1,
336 next_activity_sequence_number: 1,
337 next_child_workflow_sequence_number: 1,
338 next_cancel_external_wf_sequence_number: 1,
339 next_signal_external_wf_sequence_number: 1,
340 next_nexus_op_sequence_number: 1,
341 }),
342 payload_converter,
343 }),
344 },
345 rx,
346 )
347 }
348
349 pub(crate) fn send(&self, c: RustWfCmd) {
351 self.inner.chan.send(c).expect("command channel intact");
352 }
353
354 fn cancel(&self, cancellable_id: CancellableID) {
356 self.send(RustWfCmd::Cancel(cancellable_id));
357 }
358
359 pub fn timer<T: Into<TimerOptions>>(
361 &self,
362 opts: T,
363 ) -> impl CancellableFuture<TimerResult> + use<T> {
364 let opts: TimerOptions = opts.into();
365 let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
366 let (cmd, unblocker) =
367 CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
368 self.send(
369 CommandCreateRequest {
370 cmd: WorkflowCommand {
371 variant: Some(
372 StartTimer {
373 seq,
374 start_to_fire_timeout: Some(
375 opts.duration
376 .try_into()
377 .expect("Durations must fit into 64 bits"),
378 ),
379 }
380 .into(),
381 ),
382 user_metadata: Some(UserMetadata {
383 summary: opts.summary.map(|x| x.as_bytes().into()),
384 details: None,
385 }),
386 },
387 unblocker,
388 }
389 .into(),
390 );
391 cmd
392 }
393
394 pub fn start_activity<AD: ActivityDefinition>(
396 &self,
397 _activity: AD,
398 input: impl Into<AD::Input>,
399 mut opts: ActivityOptions,
400 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
401 where
402 AD::Output: TemporalDeserializable,
403 {
404 let input = input.into();
405 let ctx = SerializationContext {
406 data: &SerializationContextData::Workflow,
407 converter: &self.inner.payload_converter,
408 };
409 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
410 Ok(p) => p,
411 Err(e) => {
412 return ActivityFut::eager(e.into());
413 }
414 };
415 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
416 let (cmd, unblocker) =
417 CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
418 if opts.task_queue.is_none() {
419 opts.task_queue = Some(self.inner.task_queue.clone());
420 }
421 self.send(
422 CommandCreateRequest {
423 cmd: opts.into_command(AD::name().to_string(), payloads, seq),
424 unblocker,
425 }
426 .into(),
427 );
428 ActivityFut::running(cmd, self.inner.payload_converter.clone())
429 }
430
431 pub fn start_local_activity<AD: ActivityDefinition>(
433 &self,
434 _activity: AD,
435 input: impl Into<AD::Input>,
436 opts: LocalActivityOptions,
437 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
438 where
439 AD::Output: TemporalDeserializable,
440 {
441 let input = input.into();
442 let ctx = SerializationContext {
443 data: &SerializationContextData::Workflow,
444 converter: &self.inner.payload_converter,
445 };
446 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
447 Ok(p) => p,
448 Err(e) => {
449 return ActivityFut::eager(e.into());
450 }
451 };
452 ActivityFut::running(
453 LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
454 self.inner.payload_converter.clone(),
455 )
456 }
457
458 fn local_activity_no_timer_retry(
460 self,
461 activity_type: String,
462 arguments: Vec<Payload>,
463 opts: LocalActivityOptions,
464 ) -> impl CancellableFuture<ActivityResolution> {
465 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
466 let (cmd, unblocker) =
467 CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
468 self.inner
469 .chan
470 .send(
471 CommandCreateRequest {
472 cmd: opts.into_command(activity_type, arguments, seq),
473 unblocker,
474 }
475 .into(),
476 )
477 .expect("command channel intact");
478 cmd
479 }
480
481 fn send_signal_wf(
482 self,
483 target: sig_we::Target,
484 signal: Signal,
485 ) -> impl CancellableFuture<SignalExternalWfResult> {
486 let seq = self
487 .inner
488 .seq_nums
489 .borrow_mut()
490 .next_signal_external_wf_seq();
491 let (cmd, unblocker) =
492 CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
493 self.send(
494 CommandCreateRequest {
495 cmd: WorkflowCommand {
496 variant: Some(
497 SignalExternalWorkflowExecution {
498 seq,
499 signal_name: signal.signal_name,
500 args: signal.data.input,
501 target: Some(target),
502 headers: signal.data.headers,
503 }
504 .into(),
505 ),
506 user_metadata: None,
507 },
508 unblocker,
509 }
510 .into(),
511 );
512 cmd
513 }
514}
515
516impl<W> SyncWorkflowContext<W> {
517 pub fn workflow_id(&self) -> &str {
519 &self.base.inner.inital_information.workflow_id
520 }
521
522 pub fn run_id(&self) -> &str {
524 &self.base.inner.run_id
525 }
526
527 pub fn namespace(&self) -> &str {
529 &self.base.inner.namespace
530 }
531
532 pub fn task_queue(&self) -> &str {
534 &self.base.inner.task_queue
535 }
536
537 pub fn workflow_time(&self) -> Option<SystemTime> {
539 self.base.inner.shared.borrow().wf_time
540 }
541
542 pub fn history_length(&self) -> u32 {
544 self.base.inner.shared.borrow().history_length
545 }
546
547 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
551 self.base
552 .inner
553 .shared
554 .borrow()
555 .current_deployment_version
556 .clone()
557 }
558
559 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
561 Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
562 }
563
564 pub fn random_seed(&self) -> u64 {
566 self.base.inner.shared.borrow().random_seed
567 }
568
569 pub fn is_replaying(&self) -> bool {
571 self.base.inner.shared.borrow().is_replaying
572 }
573
574 pub fn continue_as_new_suggested(&self) -> bool {
576 self.base.inner.shared.borrow().continue_as_new_suggested
577 }
578
579 pub fn headers(&self) -> &HashMap<String, Payload> {
584 &self.headers
585 }
586
587 pub fn payload_converter(&self) -> &PayloadConverter {
589 &self.base.inner.payload_converter
590 }
591
592 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
595 &self.base.inner.inital_information
596 }
597
598 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
600 let am_cancelled = self.base.inner.am_cancelled.clone();
601 async move {
602 if let Some(s) = am_cancelled.borrow().as_ref() {
603 return s.clone();
604 }
605 am_cancelled
606 .clone()
607 .changed()
608 .await
609 .expect("Cancelled send half not dropped");
610 am_cancelled.borrow().as_ref().cloned().unwrap_or_default()
611 }
612 .fuse()
613 }
614
615 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
617 self.base.timer(opts)
618 }
619
620 pub fn start_activity<AD: ActivityDefinition>(
622 &self,
623 activity: AD,
624 input: impl Into<AD::Input>,
625 opts: ActivityOptions,
626 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
627 where
628 AD::Output: TemporalDeserializable,
629 {
630 self.base.start_activity(activity, input, opts)
631 }
632
633 pub fn start_local_activity<AD: ActivityDefinition>(
635 &self,
636 activity: AD,
637 input: impl Into<AD::Input>,
638 opts: LocalActivityOptions,
639 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
640 where
641 AD::Output: TemporalDeserializable,
642 {
643 self.base.start_local_activity(activity, input, opts)
644 }
645
646 pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
648 ChildWorkflow {
649 opts,
650 base_ctx: self.base.clone(),
651 }
652 }
653
654 pub fn patched(&self, patch_id: &str) -> bool {
656 self.patch_impl(patch_id, false)
657 }
658
659 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
662 self.patch_impl(patch_id, true)
663 }
664
665 fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
666 self.base.send(
667 workflow_command::Variant::SetPatchMarker(SetPatchMarker {
668 patch_id: patch_id.to_string(),
669 deprecated,
670 })
671 .into(),
672 );
673 if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
675 return *present;
676 }
677
678 let res = !self.base.inner.shared.borrow().is_replaying;
681
682 self.base
683 .inner
684 .shared
685 .borrow_mut()
686 .changes
687 .insert(patch_id.to_string(), res);
688
689 res
690 }
691
692 pub fn signal_workflow(
695 &self,
696 opts: impl Into<SignalWorkflowOptions>,
697 ) -> impl CancellableFuture<SignalExternalWfResult> {
698 let options: SignalWorkflowOptions = opts.into();
699 let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
700 namespace: self.base.inner.namespace.clone(),
701 workflow_id: options.workflow_id,
702 run_id: options.run_id.unwrap_or_default(),
703 });
704 self.base.clone().send_signal_wf(target, options.signal)
705 }
706
707 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
709 self.base.send(RustWfCmd::NewNonblockingCmd(
710 workflow_command::Variant::UpsertWorkflowSearchAttributes(
711 UpsertWorkflowSearchAttributes {
712 search_attributes: Some(SearchAttributes {
713 indexed_fields: HashMap::from_iter(attr_iter),
714 }),
715 },
716 ),
717 ))
718 }
719
720 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
722 self.base.send(RustWfCmd::NewNonblockingCmd(
723 workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
724 upserted_memo: Some(Memo {
725 fields: HashMap::from_iter(attr_iter),
726 }),
727 }),
728 ))
729 }
730
731 pub fn force_task_fail(&self, with: anyhow::Error) {
733 self.base.send(with.into());
734 }
735
736 pub fn cancel_external(
739 &self,
740 target: NamespacedWorkflowExecution,
741 reason: String,
742 ) -> impl FusedFuture<Output = CancelExternalWfResult> {
743 let seq = self
744 .base
745 .inner
746 .seq_nums
747 .borrow_mut()
748 .next_cancel_external_wf_seq();
749 let (cmd, unblocker) = WFCommandFut::new();
750 self.base.send(
751 CommandCreateRequest {
752 cmd: WorkflowCommand {
753 variant: Some(
754 RequestCancelExternalWorkflowExecution {
755 seq,
756 workflow_execution: Some(target),
757 reason,
758 }
759 .into(),
760 ),
761 user_metadata: None,
762 },
763 unblocker,
764 }
765 .into(),
766 );
767 cmd
768 }
769
770 pub fn start_nexus_operation(
772 &self,
773 opts: NexusOperationOptions,
774 ) -> impl CancellableFuture<NexusStartResult> {
775 let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
776 let (result_future, unblocker) = WFCommandFut::new();
777 self.base
778 .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
779 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
780 CancellableID::NexusOp(seq),
781 NexusUnblockData {
782 result_future: result_future.shared(),
783 schedule_seq: seq,
784 base_ctx: self.base.clone(),
785 },
786 self.base.clone(),
787 );
788 self.base.send(
789 CommandCreateRequest {
790 cmd: opts.into_command(seq),
791 unblocker,
792 }
793 .into(),
794 );
795 cmd
796 }
797
798 pub(crate) fn view(&self) -> WorkflowContextView {
800 self.base.view()
801 }
802}
803
804impl<W> WorkflowContext<W> {
805 pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
807 Self {
808 sync: SyncWorkflowContext {
809 base,
810 headers: Rc::new(HashMap::new()),
811 _phantom: PhantomData,
812 },
813 workflow_state,
814 condition_wakers: Rc::new(RefCell::new(Vec::new())),
815 }
816 }
817
818 pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
820 Self {
821 sync: SyncWorkflowContext {
822 base: self.sync.base.clone(),
823 headers: Rc::new(headers),
824 _phantom: PhantomData,
825 },
826 workflow_state: self.workflow_state.clone(),
827 condition_wakers: self.condition_wakers.clone(),
828 }
829 }
830
831 pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
833 self.sync.clone()
834 }
835
836 pub fn workflow_id(&self) -> &str {
840 self.sync.workflow_id()
841 }
842
843 pub fn run_id(&self) -> &str {
845 self.sync.run_id()
846 }
847
848 pub fn namespace(&self) -> &str {
850 self.sync.namespace()
851 }
852
853 pub fn task_queue(&self) -> &str {
855 self.sync.task_queue()
856 }
857
858 pub fn workflow_time(&self) -> Option<SystemTime> {
860 self.sync.workflow_time()
861 }
862
863 pub fn history_length(&self) -> u32 {
865 self.sync.history_length()
866 }
867
868 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
872 self.sync.current_deployment_version()
873 }
874
875 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
877 self.sync.search_attributes()
878 }
879
880 pub fn random_seed(&self) -> u64 {
882 self.sync.random_seed()
883 }
884
885 pub fn is_replaying(&self) -> bool {
887 self.sync.is_replaying()
888 }
889
890 pub fn continue_as_new_suggested(&self) -> bool {
892 self.sync.continue_as_new_suggested()
893 }
894
895 pub fn headers(&self) -> &HashMap<String, Payload> {
897 self.sync.headers()
898 }
899
900 pub fn payload_converter(&self) -> &PayloadConverter {
902 self.sync.payload_converter()
903 }
904
905 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
907 self.sync.workflow_initial_info()
908 }
909
910 pub fn cancelled(&self) -> impl FusedFuture<Output = String> + '_ {
912 self.sync.cancelled()
913 }
914
915 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
917 self.sync.timer(opts)
918 }
919
920 pub fn start_activity<AD: ActivityDefinition>(
922 &self,
923 activity: AD,
924 input: impl Into<AD::Input>,
925 opts: ActivityOptions,
926 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
927 where
928 AD::Output: TemporalDeserializable,
929 {
930 self.sync.start_activity(activity, input, opts)
931 }
932
933 pub fn start_local_activity<AD: ActivityDefinition>(
935 &self,
936 activity: AD,
937 input: impl Into<AD::Input>,
938 opts: LocalActivityOptions,
939 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
940 where
941 AD::Output: TemporalDeserializable,
942 {
943 self.sync.start_local_activity(activity, input, opts)
944 }
945
946 pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
948 self.sync.child_workflow(opts)
949 }
950
951 pub fn patched(&self, patch_id: &str) -> bool {
953 self.sync.patched(patch_id)
954 }
955
956 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
959 self.sync.deprecate_patch(patch_id)
960 }
961
962 pub fn signal_workflow(
964 &self,
965 opts: impl Into<SignalWorkflowOptions>,
966 ) -> impl CancellableFuture<SignalExternalWfResult> {
967 self.sync.signal_workflow(opts)
968 }
969
970 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
972 self.sync.upsert_search_attributes(attr_iter)
973 }
974
975 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
977 self.sync.upsert_memo(attr_iter)
978 }
979
980 pub fn force_task_fail(&self, with: anyhow::Error) {
982 self.sync.force_task_fail(with)
983 }
984
985 pub fn cancel_external(
987 &self,
988 target: NamespacedWorkflowExecution,
989 reason: String,
990 ) -> impl FusedFuture<Output = CancelExternalWfResult> {
991 self.sync.cancel_external(target, reason)
992 }
993
994 pub fn start_nexus_operation(
996 &self,
997 opts: NexusOperationOptions,
998 ) -> impl CancellableFuture<NexusStartResult> {
999 self.sync.start_nexus_operation(opts)
1000 }
1001
1002 pub(crate) fn view(&self) -> WorkflowContextView {
1004 self.sync.view()
1005 }
1006
1007 pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
1012 f(&*self.workflow_state.borrow())
1013 }
1014
1015 pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
1024 let result = f(&mut *self.workflow_state.borrow_mut());
1025 for waker in self.condition_wakers.borrow_mut().drain(..) {
1026 waker.wake();
1027 }
1028 result
1029 }
1030
1031 pub fn wait_condition<'a>(
1036 &'a self,
1037 mut condition: impl FnMut(&W) -> bool + 'a,
1038 ) -> impl Future<Output = ()> + 'a {
1039 future::poll_fn(move |cx: &mut Context<'_>| {
1040 if condition(&*self.workflow_state.borrow()) {
1041 Poll::Ready(())
1042 } else {
1043 self.condition_wakers.borrow_mut().push(cx.waker().clone());
1044 Poll::Pending
1045 }
1046 })
1047 }
1048}
1049
1050struct WfCtxProtectedDat {
1051 next_timer_sequence_number: u32,
1052 next_activity_sequence_number: u32,
1053 next_child_workflow_sequence_number: u32,
1054 next_cancel_external_wf_sequence_number: u32,
1055 next_signal_external_wf_sequence_number: u32,
1056 next_nexus_op_sequence_number: u32,
1057}
1058
1059impl WfCtxProtectedDat {
1060 fn next_timer_seq(&mut self) -> u32 {
1061 let seq = self.next_timer_sequence_number;
1062 self.next_timer_sequence_number += 1;
1063 seq
1064 }
1065 fn next_activity_seq(&mut self) -> u32 {
1066 let seq = self.next_activity_sequence_number;
1067 self.next_activity_sequence_number += 1;
1068 seq
1069 }
1070 fn next_child_workflow_seq(&mut self) -> u32 {
1071 let seq = self.next_child_workflow_sequence_number;
1072 self.next_child_workflow_sequence_number += 1;
1073 seq
1074 }
1075 fn next_cancel_external_wf_seq(&mut self) -> u32 {
1076 let seq = self.next_cancel_external_wf_sequence_number;
1077 self.next_cancel_external_wf_sequence_number += 1;
1078 seq
1079 }
1080 fn next_signal_external_wf_seq(&mut self) -> u32 {
1081 let seq = self.next_signal_external_wf_sequence_number;
1082 self.next_signal_external_wf_sequence_number += 1;
1083 seq
1084 }
1085 fn next_nexus_op_seq(&mut self) -> u32 {
1086 let seq = self.next_nexus_op_sequence_number;
1087 self.next_nexus_op_sequence_number += 1;
1088 seq
1089 }
1090}
1091
1092#[derive(Clone, Debug, Default)]
1093pub(crate) struct WorkflowContextSharedData {
1094 pub(crate) changes: HashMap<String, bool>,
1096 pub(crate) is_replaying: bool,
1097 pub(crate) wf_time: Option<SystemTime>,
1098 pub(crate) history_length: u32,
1099 pub(crate) continue_as_new_suggested: bool,
1100 pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
1101 pub(crate) search_attributes: SearchAttributes,
1102 pub(crate) random_seed: u64,
1103}
1104
1105pub trait CancellableFuture<T>: Future<Output = T> + FusedFuture {
1108 fn cancel(&self);
1110}
1111
1112pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1114 fn cancel_with_reason(&self, reason: String);
1116}
1117
1118struct WFCommandFut<T, D> {
1119 _unused: PhantomData<T>,
1120 result_rx: oneshot::Receiver<UnblockEvent>,
1121 other_dat: Option<D>,
1122}
1123impl<T> WFCommandFut<T, ()> {
1124 fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1125 Self::new_with_dat(())
1126 }
1127}
1128
1129impl<T, D> WFCommandFut<T, D> {
1130 fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1131 let (tx, rx) = oneshot::channel();
1132 (
1133 Self {
1134 _unused: PhantomData,
1135 result_rx: rx,
1136 other_dat: Some(other_dat),
1137 },
1138 tx,
1139 )
1140 }
1141}
1142
1143impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1144impl<T, D> Future for WFCommandFut<T, D>
1145where
1146 T: Unblockable<OtherDat = D>,
1147{
1148 type Output = T;
1149
1150 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1151 self.result_rx.poll_unpin(cx).map(|x| {
1152 let od = self
1153 .other_dat
1154 .take()
1155 .expect("Other data must exist when resolving command future");
1156 Unblockable::unblock(x.unwrap(), od)
1157 })
1158 }
1159}
1160impl<T, D> FusedFuture for WFCommandFut<T, D>
1161where
1162 T: Unblockable<OtherDat = D>,
1163{
1164 fn is_terminated(&self) -> bool {
1165 self.other_dat.is_none()
1166 }
1167}
1168
1169struct CancellableWFCommandFut<T, D, ID = CancellableID> {
1170 cmd_fut: WFCommandFut<T, D>,
1171 cancellable_id: ID,
1172 base_ctx: BaseWorkflowContext,
1173}
1174impl<T, ID> CancellableWFCommandFut<T, (), ID> {
1175 fn new(
1176 cancellable_id: ID,
1177 base_ctx: BaseWorkflowContext,
1178 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1179 Self::new_with_dat(cancellable_id, (), base_ctx)
1180 }
1181}
1182impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1183 fn new_with_dat(
1184 cancellable_id: ID,
1185 other_dat: D,
1186 base_ctx: BaseWorkflowContext,
1187 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1188 let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1189 (
1190 Self {
1191 cmd_fut,
1192 cancellable_id,
1193 base_ctx,
1194 },
1195 sender,
1196 )
1197 }
1198}
1199impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1200impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1201where
1202 T: Unblockable<OtherDat = D>,
1203{
1204 type Output = T;
1205
1206 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1207 self.cmd_fut.poll_unpin(cx)
1208 }
1209}
1210impl<T, D, ID> FusedFuture for CancellableWFCommandFut<T, D, ID>
1211where
1212 T: Unblockable<OtherDat = D>,
1213{
1214 fn is_terminated(&self) -> bool {
1215 self.cmd_fut.is_terminated()
1216 }
1217}
1218
1219impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1220where
1221 T: Unblockable<OtherDat = D>,
1222 ID: Clone + Into<CancellableID>,
1223{
1224 fn cancel(&self) {
1225 self.base_ctx.cancel(self.cancellable_id.clone().into());
1226 }
1227}
1228impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1229where
1230 T: Unblockable<OtherDat = D>,
1231{
1232 fn cancel_with_reason(&self, reason: String) {
1233 let new_id = self.cancellable_id.clone().with_reason(reason);
1234 self.base_ctx.cancel(new_id);
1235 }
1236}
1237
1238struct LATimerBackoffFut {
1239 la_opts: LocalActivityOptions,
1240 activity_type: String,
1241 arguments: Vec<Payload>,
1242 current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1243 timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1244 base_ctx: BaseWorkflowContext,
1245 next_attempt: u32,
1246 next_sched_time: Option<prost_types::Timestamp>,
1247 did_cancel: AtomicBool,
1248 terminated: bool,
1249}
1250impl LATimerBackoffFut {
1251 pub(crate) fn new(
1252 activity_type: String,
1253 arguments: Vec<Payload>,
1254 opts: LocalActivityOptions,
1255 base_ctx: BaseWorkflowContext,
1256 ) -> Self {
1257 let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1258 activity_type.clone(),
1259 arguments.clone(),
1260 opts.clone(),
1261 ));
1262 Self {
1263 la_opts: opts,
1264 activity_type,
1265 arguments,
1266 current_fut,
1267 timer_fut: None,
1268 base_ctx,
1269 next_attempt: 1,
1270 next_sched_time: None,
1271 did_cancel: AtomicBool::new(false),
1272 terminated: false,
1273 }
1274 }
1275}
1276impl Unpin for LATimerBackoffFut {}
1277impl Future for LATimerBackoffFut {
1278 type Output = ActivityResolution;
1279
1280 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1281 if let Some(tf) = self.timer_fut.as_mut() {
1283 return match tf.poll_unpin(cx) {
1284 Poll::Ready(tr) => {
1285 self.timer_fut = None;
1286 if let TimerResult::Fired = tr {
1288 let mut opts = self.la_opts.clone();
1289 opts.attempt = Some(self.next_attempt);
1290 opts.original_schedule_time
1291 .clone_from(&self.next_sched_time);
1292 self.current_fut =
1293 Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1294 self.activity_type.clone(),
1295 self.arguments.clone(),
1296 opts,
1297 ));
1298 Poll::Pending
1299 } else {
1300 self.terminated = true;
1301 Poll::Ready(ActivityResolution {
1302 status: Some(
1303 activity_resolution::Status::Cancelled(Default::default()),
1304 ),
1305 })
1306 }
1307 }
1308 Poll::Pending => Poll::Pending,
1309 };
1310 }
1311 let poll_res = self.current_fut.poll_unpin(cx);
1312 if let Poll::Ready(ref r) = poll_res
1313 && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1314 {
1315 if self.did_cancel.load(Ordering::Acquire) {
1319 self.terminated = true;
1320 return Poll::Ready(ActivityResolution {
1321 status: Some(activity_resolution::Status::Cancelled(Default::default())),
1322 });
1323 }
1324
1325 let timer_f = self.base_ctx.timer::<Duration>(
1326 b.backoff_duration
1327 .expect("Duration is set")
1328 .try_into()
1329 .expect("duration converts ok"),
1330 );
1331 self.timer_fut = Some(Box::pin(timer_f));
1332 self.next_attempt = b.attempt;
1333 self.next_sched_time.clone_from(&b.original_schedule_time);
1334 return Poll::Pending;
1335 }
1336 if poll_res.is_ready() {
1337 self.terminated = true;
1338 }
1339 poll_res
1340 }
1341}
1342impl FusedFuture for LATimerBackoffFut {
1343 fn is_terminated(&self) -> bool {
1344 self.terminated
1345 }
1346}
1347impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1348 fn cancel(&self) {
1349 self.did_cancel.store(true, Ordering::Release);
1350 if let Some(tf) = self.timer_fut.as_ref() {
1351 tf.cancel();
1352 }
1353 self.current_fut.cancel();
1354 }
1355}
1356
1357enum ActivityFut<F, Output> {
1359 Errored {
1361 error: Option<ActivityExecutionError>,
1362 _phantom: PhantomData<Output>,
1363 },
1364 Running {
1366 inner: F,
1367 payload_converter: PayloadConverter,
1368 _phantom: PhantomData<Output>,
1369 },
1370 Terminated,
1371}
1372
1373impl<F, Output> ActivityFut<F, Output> {
1374 fn eager(err: ActivityExecutionError) -> Self {
1375 Self::Errored {
1376 error: Some(err),
1377 _phantom: PhantomData,
1378 }
1379 }
1380
1381 fn running(inner: F, payload_converter: PayloadConverter) -> Self {
1382 Self::Running {
1383 inner,
1384 payload_converter,
1385 _phantom: PhantomData,
1386 }
1387 }
1388}
1389
1390impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1391
1392impl<F, Output> Future for ActivityFut<F, Output>
1393where
1394 F: Future<Output = ActivityResolution> + Unpin,
1395 Output: TemporalDeserializable + 'static,
1396{
1397 type Output = Result<Output, ActivityExecutionError>;
1398
1399 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1400 let this = self.get_mut();
1401 let poll = match this {
1402 ActivityFut::Errored { error, .. } => {
1403 Poll::Ready(Err(error.take().expect("polled after completion")))
1404 }
1405 ActivityFut::Running {
1406 inner,
1407 payload_converter,
1408 ..
1409 } => match Pin::new(inner).poll(cx) {
1410 Poll::Pending => Poll::Pending,
1411 Poll::Ready(resolution) => Poll::Ready({
1412 let status = resolution.status.ok_or_else(|| {
1413 ActivityExecutionError::Failed(Box::new(Failure {
1414 message: "Activity completed without a status".to_string(),
1415 ..Default::default()
1416 }))
1417 })?;
1418
1419 match status {
1420 activity_resolution::Status::Completed(success) => {
1421 let payload = success.result.unwrap_or_default();
1422 let ctx = SerializationContext {
1423 data: &SerializationContextData::Workflow,
1424 converter: payload_converter,
1425 };
1426 payload_converter
1427 .from_payload::<Output>(&ctx, payload)
1428 .map_err(ActivityExecutionError::Serialization)
1429 }
1430 activity_resolution::Status::Failed(f) => Err(
1431 ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1432 ),
1433 activity_resolution::Status::Cancelled(c) => {
1434 Err(ActivityExecutionError::Cancelled(Box::new(
1435 c.failure.unwrap_or_default(),
1436 )))
1437 }
1438 activity_resolution::Status::Backoff(_) => {
1439 panic!("DoBackoff should be handled by LATimerBackoffFut")
1440 }
1441 }
1442 }),
1443 },
1444 ActivityFut::Terminated => panic!("polled after termination"),
1445 };
1446 if poll.is_ready() {
1447 *this = ActivityFut::Terminated;
1448 }
1449 poll
1450 }
1451}
1452
1453impl<F, Output> FusedFuture for ActivityFut<F, Output>
1454where
1455 F: Future<Output = ActivityResolution> + Unpin,
1456 Output: TemporalDeserializable + 'static,
1457{
1458 fn is_terminated(&self) -> bool {
1459 matches!(self, ActivityFut::Terminated)
1460 }
1461}
1462
1463impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1464where
1465 F: CancellableFuture<ActivityResolution> + Unpin,
1466 Output: TemporalDeserializable + 'static,
1467{
1468 fn cancel(&self) {
1469 if let ActivityFut::Running { inner, .. } = self {
1470 inner.cancel()
1471 }
1472 }
1473}
1474
1475#[derive(Clone, derive_more::Debug)]
1477pub struct ChildWorkflow {
1478 opts: ChildWorkflowOptions,
1479 #[debug(skip)]
1480 base_ctx: BaseWorkflowContext,
1481}
1482
1483pub(crate) struct ChildWfCommon {
1484 workflow_id: String,
1485 result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
1486 base_ctx: BaseWorkflowContext,
1487}
1488
1489#[derive(derive_more::Debug)]
1491pub struct PendingChildWorkflow {
1492 pub status: ChildWorkflowStartStatus,
1494 #[debug(skip)]
1495 pub(crate) common: ChildWfCommon,
1496}
1497
1498impl PendingChildWorkflow {
1499 pub fn into_started(self) -> Option<StartedChildWorkflow> {
1502 match self.status {
1503 ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
1504 run_id: s.run_id,
1505 common: self.common,
1506 }),
1507 _ => None,
1508 }
1509 }
1510}
1511
1512#[derive(derive_more::Debug)]
1514pub struct StartedChildWorkflow {
1515 pub run_id: String,
1517 #[debug(skip)]
1518 common: ChildWfCommon,
1519}
1520
1521impl ChildWorkflow {
1522 pub fn start(self) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
1524 let child_seq = self
1525 .base_ctx
1526 .inner
1527 .seq_nums
1528 .borrow_mut()
1529 .next_child_workflow_seq();
1530 let cancel_seq = self
1534 .base_ctx
1535 .inner
1536 .seq_nums
1537 .borrow_mut()
1538 .next_cancel_external_wf_seq();
1539 let (result_cmd, unblocker) = CancellableWFCommandFut::new(
1540 CancellableIDWithReason::ExternalWorkflow {
1541 seqnum: cancel_seq,
1542 execution: NamespacedWorkflowExecution {
1543 workflow_id: self.opts.workflow_id.clone(),
1544 ..Default::default()
1545 },
1546 },
1547 self.base_ctx.clone(),
1548 );
1549 self.base_ctx.send(
1550 CommandSubscribeChildWorkflowCompletion {
1551 seq: child_seq,
1552 unblocker,
1553 }
1554 .into(),
1555 );
1556
1557 let common = ChildWfCommon {
1558 workflow_id: self.opts.workflow_id.clone(),
1559 result_future: result_cmd,
1560 base_ctx: self.base_ctx.clone(),
1561 };
1562
1563 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1564 CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
1565 common,
1566 self.base_ctx.clone(),
1567 );
1568 self.base_ctx.send(
1569 CommandCreateRequest {
1570 cmd: self.opts.into_command(child_seq),
1571 unblocker,
1572 }
1573 .into(),
1574 );
1575
1576 cmd
1577 }
1578}
1579
1580impl StartedChildWorkflow {
1581 pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
1584 self.common.result_future
1585 }
1586
1587 pub fn cancel(&self, reason: String) {
1589 self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1590 CancelChildWorkflowExecution {
1591 child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
1592 reason,
1593 }
1594 .into(),
1595 ));
1596 }
1597
1598 pub fn signal<S: Into<Signal>>(
1600 &self,
1601 data: S,
1602 ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
1603 let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1604 self.common
1605 .base_ctx
1606 .clone()
1607 .send_signal_wf(target, data.into())
1608 }
1609}
1610
1611#[derive(derive_more::Debug)]
1612#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
1613pub struct StartedNexusOperation {
1614 pub operation_token: Option<String>,
1616 pub(crate) unblock_dat: NexusUnblockData,
1617}
1618
1619pub(crate) struct NexusUnblockData {
1620 result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
1621 schedule_seq: u32,
1622 base_ctx: BaseWorkflowContext,
1623}
1624
1625impl StartedNexusOperation {
1626 pub async fn result(&self) -> NexusOperationResult {
1627 self.unblock_dat.result_future.clone().await
1628 }
1629
1630 pub fn cancel(&self) {
1631 self.unblock_dat
1632 .base_ctx
1633 .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
1634 }
1635}