1mod options;
2
3pub use options::{
4 ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal,
5 SignalData, SignalWorkflowOptions, TimerOptions,
6};
7
8use crate::{
9 CancelExternalWfResult, CancellableID, CancellableIDWithReason, CommandCreateRequest,
10 CommandSubscribeChildWorkflowCompletion, NexusStartResult, RustWfCmd, SignalExternalWfResult,
11 SupportsCancelReason, TimerResult, UnblockEvent, Unblockable,
12 workflow_context::options::IntoWorkflowCommand,
13};
14use futures_util::{FutureExt, future::Shared, task::Context};
15use std::{
16 cell::{Ref, RefCell},
17 collections::HashMap,
18 future::{self, Future},
19 marker::PhantomData,
20 ops::{Deref, DerefMut},
21 pin::Pin,
22 rc::Rc,
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 mpsc::{Receiver, Sender},
26 },
27 task::{Poll, Waker},
28 time::{Duration, SystemTime},
29};
30use temporalio_common::{
31 ActivityDefinition,
32 data_converters::{
33 GenericPayloadConverter, PayloadConversionError, PayloadConverter, SerializationContext,
34 SerializationContextData, TemporalDeserializable,
35 },
36 protos::{
37 coresdk::{
38 activity_result::{ActivityResolution, activity_resolution},
39 child_workflow::ChildWorkflowResult,
40 common::NamespacedWorkflowExecution,
41 nexus::NexusOperationResult,
42 workflow_activation::{
43 InitializeWorkflow,
44 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
45 },
46 workflow_commands::{
47 CancelChildWorkflowExecution, ModifyWorkflowProperties,
48 RequestCancelExternalWorkflowExecution, SetPatchMarker,
49 SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
50 WorkflowCommand, signal_external_workflow_execution as sig_we, workflow_command,
51 },
52 },
53 temporal::api::{
54 common::v1::{Memo, Payload, SearchAttributes},
55 failure::v1::Failure,
56 sdk::v1::UserMetadata,
57 },
58 },
59 worker::WorkerDeploymentVersion,
60};
61use tokio::sync::{oneshot, watch};
62
63#[derive(Clone)]
67pub struct BaseWorkflowContext {
68 inner: Rc<WorkflowContextInner>,
69}
70impl BaseWorkflowContext {
71 pub(crate) fn shared_mut(&self) -> impl DerefMut<Target = WorkflowContextSharedData> {
72 self.inner.shared.borrow_mut()
73 }
74
75 pub(crate) fn view(&self) -> WorkflowContextView {
77 WorkflowContextView::new(
78 self.inner.namespace.clone(),
79 self.inner.task_queue.clone(),
80 self.inner.run_id.clone(),
81 &self.inner.inital_information,
82 )
83 }
84}
85
86struct WorkflowContextInner {
87 namespace: String,
88 task_queue: String,
89 run_id: String,
90 inital_information: InitializeWorkflow,
91 chan: Sender<RustWfCmd>,
92 am_cancelled: watch::Receiver<Option<String>>,
93 shared: RefCell<WorkflowContextSharedData>,
94 seq_nums: RefCell<WfCtxProtectedDat>,
95 payload_converter: PayloadConverter,
96}
97
98pub struct SyncWorkflowContext<W> {
106 base: BaseWorkflowContext,
107 headers: Rc<HashMap<String, Payload>>,
109 _phantom: PhantomData<W>,
110}
111
112impl<W> Clone for SyncWorkflowContext<W> {
113 fn clone(&self) -> Self {
114 Self {
115 base: self.base.clone(),
116 headers: self.headers.clone(),
117 _phantom: PhantomData,
118 }
119 }
120}
121
122pub struct WorkflowContext<W> {
127 sync: SyncWorkflowContext<W>,
128 workflow_state: Rc<RefCell<W>>,
130 condition_wakers: Rc<RefCell<Vec<Waker>>>,
134}
135
136impl<W> Clone for WorkflowContext<W> {
137 fn clone(&self) -> Self {
138 Self {
139 sync: self.sync.clone(),
140 workflow_state: self.workflow_state.clone(),
141 condition_wakers: self.condition_wakers.clone(),
142 }
143 }
144}
145
146#[derive(Clone, Debug)]
150#[non_exhaustive]
151pub struct WorkflowContextView {
152 pub workflow_id: String,
154 pub run_id: String,
156 pub workflow_type: String,
158 pub task_queue: String,
160 pub namespace: String,
162
163 pub attempt: u32,
165 pub first_execution_run_id: String,
167 pub continued_from_run_id: Option<String>,
169
170 pub start_time: Option<SystemTime>,
172 pub execution_timeout: Option<Duration>,
174 pub run_timeout: Option<Duration>,
176 pub task_timeout: Option<Duration>,
178
179 pub parent: Option<ParentWorkflowInfo>,
181 pub root: Option<RootWorkflowInfo>,
183
184 pub retry_policy: Option<temporalio_common::protos::temporal::api::common::v1::RetryPolicy>,
186 pub cron_schedule: Option<String>,
188 pub memo: Option<Memo>,
190 pub search_attributes: Option<SearchAttributes>,
192}
193
194#[derive(Clone, Debug)]
196#[non_exhaustive]
197pub struct ParentWorkflowInfo {
198 pub workflow_id: String,
200 pub run_id: String,
202 pub namespace: String,
204}
205
206#[derive(Clone, Debug)]
208#[non_exhaustive]
209pub struct RootWorkflowInfo {
210 pub workflow_id: String,
212 pub run_id: String,
214}
215
216impl WorkflowContextView {
217 pub(crate) fn new(
219 namespace: String,
220 task_queue: String,
221 run_id: String,
222 init: &InitializeWorkflow,
223 ) -> Self {
224 let parent = init
225 .parent_workflow_info
226 .as_ref()
227 .map(|p| ParentWorkflowInfo {
228 workflow_id: p.workflow_id.clone(),
229 run_id: p.run_id.clone(),
230 namespace: p.namespace.clone(),
231 });
232
233 let root = init.root_workflow.as_ref().map(|r| RootWorkflowInfo {
234 workflow_id: r.workflow_id.clone(),
235 run_id: r.run_id.clone(),
236 });
237
238 let continued_from_run_id = if init.continued_from_execution_run_id.is_empty() {
239 None
240 } else {
241 Some(init.continued_from_execution_run_id.clone())
242 };
243
244 let cron_schedule = if init.cron_schedule.is_empty() {
245 None
246 } else {
247 Some(init.cron_schedule.clone())
248 };
249
250 Self {
251 workflow_id: init.workflow_id.clone(),
252 run_id,
253 workflow_type: init.workflow_type.clone(),
254 task_queue,
255 namespace,
256 attempt: init.attempt as u32,
257 first_execution_run_id: init.first_execution_run_id.clone(),
258 continued_from_run_id,
259 start_time: init.start_time.and_then(|t| t.try_into().ok()),
260 execution_timeout: init
261 .workflow_execution_timeout
262 .and_then(|d| d.try_into().ok()),
263 run_timeout: init.workflow_run_timeout.and_then(|d| d.try_into().ok()),
264 task_timeout: init.workflow_task_timeout.and_then(|d| d.try_into().ok()),
265 parent,
266 root,
267 retry_policy: init.retry_policy.clone(),
268 cron_schedule,
269 memo: init.memo.clone(),
270 search_attributes: init.search_attributes.clone(),
271 }
272 }
273}
274
275#[derive(Debug, thiserror::Error)]
277pub enum ActivityExecutionError {
278 #[error("Activity failed: {}", .0.message)]
280 Failed(Box<Failure>),
281 #[error("Activity cancelled: {}", .0.message)]
283 Cancelled(Box<Failure>),
284 #[error("Payload conversion failed: {0}")]
287 Serialization(#[from] PayloadConversionError),
288}
289
290impl ActivityExecutionError {
291 pub fn is_timeout(&self) -> bool {
293 match self {
294 ActivityExecutionError::Failed(f) => f.is_timeout().is_some(),
295 _ => false,
296 }
297 }
298}
299
300impl BaseWorkflowContext {
301 pub(crate) fn new(
304 namespace: String,
305 task_queue: String,
306 run_id: String,
307 init_workflow_job: InitializeWorkflow,
308 am_cancelled: watch::Receiver<Option<String>>,
309 payload_converter: PayloadConverter,
310 ) -> (Self, Receiver<RustWfCmd>) {
311 let (chan, rx) = std::sync::mpsc::channel();
313 (
314 Self {
315 inner: Rc::new(WorkflowContextInner {
316 namespace,
317 task_queue,
318 run_id,
319 shared: RefCell::new(WorkflowContextSharedData {
320 random_seed: init_workflow_job.randomness_seed,
321 search_attributes: init_workflow_job
322 .search_attributes
323 .clone()
324 .unwrap_or_default(),
325 ..Default::default()
326 }),
327 inital_information: init_workflow_job,
328 chan,
329 am_cancelled,
330 seq_nums: RefCell::new(WfCtxProtectedDat {
331 next_timer_sequence_number: 1,
332 next_activity_sequence_number: 1,
333 next_child_workflow_sequence_number: 1,
334 next_cancel_external_wf_sequence_number: 1,
335 next_signal_external_wf_sequence_number: 1,
336 next_nexus_op_sequence_number: 1,
337 }),
338 payload_converter,
339 }),
340 },
341 rx,
342 )
343 }
344
345 pub(crate) fn send(&self, c: RustWfCmd) {
347 self.inner.chan.send(c).expect("command channel intact");
348 }
349
350 fn cancel(&self, cancellable_id: CancellableID) {
352 self.send(RustWfCmd::Cancel(cancellable_id));
353 }
354
355 pub fn timer<T: Into<TimerOptions>>(
357 &self,
358 opts: T,
359 ) -> impl CancellableFuture<TimerResult> + use<T> {
360 let opts: TimerOptions = opts.into();
361 let seq = self.inner.seq_nums.borrow_mut().next_timer_seq();
362 let (cmd, unblocker) =
363 CancellableWFCommandFut::new(CancellableID::Timer(seq), self.clone());
364 self.send(
365 CommandCreateRequest {
366 cmd: WorkflowCommand {
367 variant: Some(
368 StartTimer {
369 seq,
370 start_to_fire_timeout: Some(
371 opts.duration
372 .try_into()
373 .expect("Durations must fit into 64 bits"),
374 ),
375 }
376 .into(),
377 ),
378 user_metadata: Some(UserMetadata {
379 summary: opts.summary.map(|x| x.as_bytes().into()),
380 details: None,
381 }),
382 },
383 unblocker,
384 }
385 .into(),
386 );
387 cmd
388 }
389
390 pub fn start_activity<AD: ActivityDefinition>(
392 &self,
393 _activity: AD,
394 input: impl Into<AD::Input>,
395 mut opts: ActivityOptions,
396 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
397 where
398 AD::Output: TemporalDeserializable,
399 {
400 let input = input.into();
401 let ctx = SerializationContext {
402 data: &SerializationContextData::Workflow,
403 converter: &self.inner.payload_converter,
404 };
405 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
406 Ok(p) => p,
407 Err(e) => {
408 return ActivityFut::eager(e.into());
409 }
410 };
411 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
412 let (cmd, unblocker) =
413 CancellableWFCommandFut::new(CancellableID::Activity(seq), self.clone());
414 if opts.task_queue.is_none() {
415 opts.task_queue = Some(self.inner.task_queue.clone());
416 }
417 self.send(
418 CommandCreateRequest {
419 cmd: opts.into_command(AD::name().to_string(), payloads, seq),
420 unblocker,
421 }
422 .into(),
423 );
424 ActivityFut::running(cmd, self.inner.payload_converter.clone())
425 }
426
427 pub fn start_local_activity<AD: ActivityDefinition>(
429 &self,
430 _activity: AD,
431 input: impl Into<AD::Input>,
432 opts: LocalActivityOptions,
433 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
434 where
435 AD::Output: TemporalDeserializable,
436 {
437 let input = input.into();
438 let ctx = SerializationContext {
439 data: &SerializationContextData::Workflow,
440 converter: &self.inner.payload_converter,
441 };
442 let payloads = match self.inner.payload_converter.to_payloads(&ctx, &input) {
443 Ok(p) => p,
444 Err(e) => {
445 return ActivityFut::eager(e.into());
446 }
447 };
448 ActivityFut::running(
449 LATimerBackoffFut::new(AD::name().to_string(), payloads, opts, self.clone()),
450 self.inner.payload_converter.clone(),
451 )
452 }
453
454 fn local_activity_no_timer_retry(
456 self,
457 activity_type: String,
458 arguments: Vec<Payload>,
459 opts: LocalActivityOptions,
460 ) -> impl CancellableFuture<ActivityResolution> {
461 let seq = self.inner.seq_nums.borrow_mut().next_activity_seq();
462 let (cmd, unblocker) =
463 CancellableWFCommandFut::new(CancellableID::LocalActivity(seq), self.clone());
464 self.inner
465 .chan
466 .send(
467 CommandCreateRequest {
468 cmd: opts.into_command(activity_type, arguments, seq),
469 unblocker,
470 }
471 .into(),
472 )
473 .expect("command channel intact");
474 cmd
475 }
476
477 fn send_signal_wf(
478 self,
479 target: sig_we::Target,
480 signal: Signal,
481 ) -> impl CancellableFuture<SignalExternalWfResult> {
482 let seq = self
483 .inner
484 .seq_nums
485 .borrow_mut()
486 .next_signal_external_wf_seq();
487 let (cmd, unblocker) =
488 CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq), self.clone());
489 self.send(
490 CommandCreateRequest {
491 cmd: WorkflowCommand {
492 variant: Some(
493 SignalExternalWorkflowExecution {
494 seq,
495 signal_name: signal.signal_name,
496 args: signal.data.input,
497 target: Some(target),
498 headers: signal.data.headers,
499 }
500 .into(),
501 ),
502 user_metadata: None,
503 },
504 unblocker,
505 }
506 .into(),
507 );
508 cmd
509 }
510}
511
512impl<W> SyncWorkflowContext<W> {
513 pub fn namespace(&self) -> &str {
515 &self.base.inner.namespace
516 }
517
518 pub fn task_queue(&self) -> &str {
520 &self.base.inner.task_queue
521 }
522
523 pub fn workflow_time(&self) -> Option<SystemTime> {
525 self.base.inner.shared.borrow().wf_time
526 }
527
528 pub fn history_length(&self) -> u32 {
530 self.base.inner.shared.borrow().history_length
531 }
532
533 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
537 self.base
538 .inner
539 .shared
540 .borrow()
541 .current_deployment_version
542 .clone()
543 }
544
545 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
547 Ref::map(self.base.inner.shared.borrow(), |s| &s.search_attributes)
548 }
549
550 pub fn random_seed(&self) -> u64 {
552 self.base.inner.shared.borrow().random_seed
553 }
554
555 pub fn is_replaying(&self) -> bool {
557 self.base.inner.shared.borrow().is_replaying
558 }
559
560 pub fn headers(&self) -> &HashMap<String, Payload> {
565 &self.headers
566 }
567
568 pub fn payload_converter(&self) -> &PayloadConverter {
570 &self.base.inner.payload_converter
571 }
572
573 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
576 &self.base.inner.inital_information
577 }
578
579 pub async fn cancelled(&self) -> String {
581 if let Some(s) = self.base.inner.am_cancelled.borrow().as_ref() {
582 return s.clone();
583 }
584 self.base
585 .inner
586 .am_cancelled
587 .clone()
588 .changed()
589 .await
590 .expect("Cancelled send half not dropped");
591 self.base
592 .inner
593 .am_cancelled
594 .borrow()
595 .as_ref()
596 .cloned()
597 .unwrap_or_default()
598 }
599
600 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
602 self.base.timer(opts)
603 }
604
605 pub fn start_activity<AD: ActivityDefinition>(
607 &self,
608 activity: AD,
609 input: impl Into<AD::Input>,
610 opts: ActivityOptions,
611 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
612 where
613 AD::Output: TemporalDeserializable,
614 {
615 self.base.start_activity(activity, input, opts)
616 }
617
618 pub fn start_local_activity<AD: ActivityDefinition>(
620 &self,
621 activity: AD,
622 input: impl Into<AD::Input>,
623 opts: LocalActivityOptions,
624 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
625 where
626 AD::Output: TemporalDeserializable,
627 {
628 self.base.start_local_activity(activity, input, opts)
629 }
630
631 pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
633 ChildWorkflow {
634 opts,
635 base_ctx: self.base.clone(),
636 }
637 }
638
639 pub fn patched(&self, patch_id: &str) -> bool {
641 self.patch_impl(patch_id, false)
642 }
643
644 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
647 self.patch_impl(patch_id, true)
648 }
649
650 fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
651 self.base.send(
652 workflow_command::Variant::SetPatchMarker(SetPatchMarker {
653 patch_id: patch_id.to_string(),
654 deprecated,
655 })
656 .into(),
657 );
658 if let Some(present) = self.base.inner.shared.borrow().changes.get(patch_id) {
660 return *present;
661 }
662
663 let res = !self.base.inner.shared.borrow().is_replaying;
666
667 self.base
668 .inner
669 .shared
670 .borrow_mut()
671 .changes
672 .insert(patch_id.to_string(), res);
673
674 res
675 }
676
677 pub fn signal_workflow(
680 &self,
681 opts: impl Into<SignalWorkflowOptions>,
682 ) -> impl CancellableFuture<SignalExternalWfResult> {
683 let options: SignalWorkflowOptions = opts.into();
684 let target = sig_we::Target::WorkflowExecution(NamespacedWorkflowExecution {
685 namespace: self.base.inner.namespace.clone(),
686 workflow_id: options.workflow_id,
687 run_id: options.run_id.unwrap_or_default(),
688 });
689 self.base.clone().send_signal_wf(target, options.signal)
690 }
691
692 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
694 self.base.send(RustWfCmd::NewNonblockingCmd(
695 workflow_command::Variant::UpsertWorkflowSearchAttributes(
696 UpsertWorkflowSearchAttributes {
697 search_attributes: Some(SearchAttributes {
698 indexed_fields: HashMap::from_iter(attr_iter),
699 }),
700 },
701 ),
702 ))
703 }
704
705 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
707 self.base.send(RustWfCmd::NewNonblockingCmd(
708 workflow_command::Variant::ModifyWorkflowProperties(ModifyWorkflowProperties {
709 upserted_memo: Some(Memo {
710 fields: HashMap::from_iter(attr_iter),
711 }),
712 }),
713 ))
714 }
715
716 pub fn force_task_fail(&self, with: anyhow::Error) {
718 self.base.send(with.into());
719 }
720
721 pub fn cancel_external(
724 &self,
725 target: NamespacedWorkflowExecution,
726 reason: String,
727 ) -> impl Future<Output = CancelExternalWfResult> {
728 let seq = self
729 .base
730 .inner
731 .seq_nums
732 .borrow_mut()
733 .next_cancel_external_wf_seq();
734 let (cmd, unblocker) = WFCommandFut::new();
735 self.base.send(
736 CommandCreateRequest {
737 cmd: WorkflowCommand {
738 variant: Some(
739 RequestCancelExternalWorkflowExecution {
740 seq,
741 workflow_execution: Some(target),
742 reason,
743 }
744 .into(),
745 ),
746 user_metadata: None,
747 },
748 unblocker,
749 }
750 .into(),
751 );
752 cmd
753 }
754
755 pub fn start_nexus_operation(
757 &self,
758 opts: NexusOperationOptions,
759 ) -> impl CancellableFuture<NexusStartResult> {
760 let seq = self.base.inner.seq_nums.borrow_mut().next_nexus_op_seq();
761 let (result_future, unblocker) = WFCommandFut::new();
762 self.base
763 .send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker });
764 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
765 CancellableID::NexusOp(seq),
766 NexusUnblockData {
767 result_future: result_future.shared(),
768 schedule_seq: seq,
769 base_ctx: self.base.clone(),
770 },
771 self.base.clone(),
772 );
773 self.base.send(
774 CommandCreateRequest {
775 cmd: opts.into_command(seq),
776 unblocker,
777 }
778 .into(),
779 );
780 cmd
781 }
782
783 pub(crate) fn view(&self) -> WorkflowContextView {
785 self.base.view()
786 }
787}
788
789impl<W> WorkflowContext<W> {
790 pub(crate) fn from_base(base: BaseWorkflowContext, workflow_state: Rc<RefCell<W>>) -> Self {
792 Self {
793 sync: SyncWorkflowContext {
794 base,
795 headers: Rc::new(HashMap::new()),
796 _phantom: PhantomData,
797 },
798 workflow_state,
799 condition_wakers: Rc::new(RefCell::new(Vec::new())),
800 }
801 }
802
803 pub(crate) fn with_headers(&self, headers: HashMap<String, Payload>) -> Self {
805 Self {
806 sync: SyncWorkflowContext {
807 base: self.sync.base.clone(),
808 headers: Rc::new(headers),
809 _phantom: PhantomData,
810 },
811 workflow_state: self.workflow_state.clone(),
812 condition_wakers: self.condition_wakers.clone(),
813 }
814 }
815
816 pub(crate) fn sync_context(&self) -> SyncWorkflowContext<W> {
818 self.sync.clone()
819 }
820
821 pub fn namespace(&self) -> &str {
825 self.sync.namespace()
826 }
827
828 pub fn task_queue(&self) -> &str {
830 self.sync.task_queue()
831 }
832
833 pub fn workflow_time(&self) -> Option<SystemTime> {
835 self.sync.workflow_time()
836 }
837
838 pub fn history_length(&self) -> u32 {
840 self.sync.history_length()
841 }
842
843 pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
847 self.sync.current_deployment_version()
848 }
849
850 pub fn search_attributes(&self) -> impl Deref<Target = SearchAttributes> + '_ {
852 self.sync.search_attributes()
853 }
854
855 pub fn random_seed(&self) -> u64 {
857 self.sync.random_seed()
858 }
859
860 pub fn is_replaying(&self) -> bool {
862 self.sync.is_replaying()
863 }
864
865 pub fn headers(&self) -> &HashMap<String, Payload> {
867 self.sync.headers()
868 }
869
870 pub fn payload_converter(&self) -> &PayloadConverter {
872 self.sync.payload_converter()
873 }
874
875 pub fn workflow_initial_info(&self) -> &InitializeWorkflow {
877 self.sync.workflow_initial_info()
878 }
879
880 pub async fn cancelled(&self) -> String {
882 self.sync.cancelled().await
883 }
884
885 pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
887 self.sync.timer(opts)
888 }
889
890 pub fn start_activity<AD: ActivityDefinition>(
892 &self,
893 activity: AD,
894 input: impl Into<AD::Input>,
895 opts: ActivityOptions,
896 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
897 where
898 AD::Output: TemporalDeserializable,
899 {
900 self.sync.start_activity(activity, input, opts)
901 }
902
903 pub fn start_local_activity<AD: ActivityDefinition>(
905 &self,
906 activity: AD,
907 input: impl Into<AD::Input>,
908 opts: LocalActivityOptions,
909 ) -> impl CancellableFuture<Result<AD::Output, ActivityExecutionError>>
910 where
911 AD::Output: TemporalDeserializable,
912 {
913 self.sync.start_local_activity(activity, input, opts)
914 }
915
916 pub fn child_workflow(&self, opts: ChildWorkflowOptions) -> ChildWorkflow {
918 self.sync.child_workflow(opts)
919 }
920
921 pub fn patched(&self, patch_id: &str) -> bool {
923 self.sync.patched(patch_id)
924 }
925
926 pub fn deprecate_patch(&self, patch_id: &str) -> bool {
929 self.sync.deprecate_patch(patch_id)
930 }
931
932 pub fn signal_workflow(
934 &self,
935 opts: impl Into<SignalWorkflowOptions>,
936 ) -> impl CancellableFuture<SignalExternalWfResult> {
937 self.sync.signal_workflow(opts)
938 }
939
940 pub fn upsert_search_attributes(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
942 self.sync.upsert_search_attributes(attr_iter)
943 }
944
945 pub fn upsert_memo(&self, attr_iter: impl IntoIterator<Item = (String, Payload)>) {
947 self.sync.upsert_memo(attr_iter)
948 }
949
950 pub fn force_task_fail(&self, with: anyhow::Error) {
952 self.sync.force_task_fail(with)
953 }
954
955 pub fn cancel_external(
957 &self,
958 target: NamespacedWorkflowExecution,
959 reason: String,
960 ) -> impl Future<Output = CancelExternalWfResult> {
961 self.sync.cancel_external(target, reason)
962 }
963
964 pub fn start_nexus_operation(
966 &self,
967 opts: NexusOperationOptions,
968 ) -> impl CancellableFuture<NexusStartResult> {
969 self.sync.start_nexus_operation(opts)
970 }
971
972 pub(crate) fn view(&self) -> WorkflowContextView {
974 self.sync.view()
975 }
976
977 pub fn state<R>(&self, f: impl FnOnce(&W) -> R) -> R {
982 f(&*self.workflow_state.borrow())
983 }
984
985 pub fn state_mut<R>(&self, f: impl FnOnce(&mut W) -> R) -> R {
994 let result = f(&mut *self.workflow_state.borrow_mut());
995 for waker in self.condition_wakers.borrow_mut().drain(..) {
996 waker.wake();
997 }
998 result
999 }
1000
1001 pub fn wait_condition<'a>(
1006 &'a self,
1007 mut condition: impl FnMut(&W) -> bool + 'a,
1008 ) -> impl Future<Output = ()> + 'a {
1009 future::poll_fn(move |cx: &mut Context<'_>| {
1010 if condition(&*self.workflow_state.borrow()) {
1011 Poll::Ready(())
1012 } else {
1013 self.condition_wakers.borrow_mut().push(cx.waker().clone());
1014 Poll::Pending
1015 }
1016 })
1017 }
1018}
1019
1020struct WfCtxProtectedDat {
1021 next_timer_sequence_number: u32,
1022 next_activity_sequence_number: u32,
1023 next_child_workflow_sequence_number: u32,
1024 next_cancel_external_wf_sequence_number: u32,
1025 next_signal_external_wf_sequence_number: u32,
1026 next_nexus_op_sequence_number: u32,
1027}
1028
1029impl WfCtxProtectedDat {
1030 fn next_timer_seq(&mut self) -> u32 {
1031 let seq = self.next_timer_sequence_number;
1032 self.next_timer_sequence_number += 1;
1033 seq
1034 }
1035 fn next_activity_seq(&mut self) -> u32 {
1036 let seq = self.next_activity_sequence_number;
1037 self.next_activity_sequence_number += 1;
1038 seq
1039 }
1040 fn next_child_workflow_seq(&mut self) -> u32 {
1041 let seq = self.next_child_workflow_sequence_number;
1042 self.next_child_workflow_sequence_number += 1;
1043 seq
1044 }
1045 fn next_cancel_external_wf_seq(&mut self) -> u32 {
1046 let seq = self.next_cancel_external_wf_sequence_number;
1047 self.next_cancel_external_wf_sequence_number += 1;
1048 seq
1049 }
1050 fn next_signal_external_wf_seq(&mut self) -> u32 {
1051 let seq = self.next_signal_external_wf_sequence_number;
1052 self.next_signal_external_wf_sequence_number += 1;
1053 seq
1054 }
1055 fn next_nexus_op_seq(&mut self) -> u32 {
1056 let seq = self.next_nexus_op_sequence_number;
1057 self.next_nexus_op_sequence_number += 1;
1058 seq
1059 }
1060}
1061
1062#[derive(Clone, Debug, Default)]
1063pub(crate) struct WorkflowContextSharedData {
1064 pub(crate) changes: HashMap<String, bool>,
1066 pub(crate) is_replaying: bool,
1067 pub(crate) wf_time: Option<SystemTime>,
1068 pub(crate) history_length: u32,
1069 pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
1070 pub(crate) search_attributes: SearchAttributes,
1071 pub(crate) random_seed: u64,
1072}
1073
1074pub trait CancellableFuture<T>: Future<Output = T> {
1077 fn cancel(&self);
1079}
1080
1081pub trait CancellableFutureWithReason<T>: CancellableFuture<T> {
1083 fn cancel_with_reason(&self, reason: String);
1085}
1086
1087struct WFCommandFut<T, D> {
1088 _unused: PhantomData<T>,
1089 result_rx: oneshot::Receiver<UnblockEvent>,
1090 other_dat: Option<D>,
1091}
1092impl<T> WFCommandFut<T, ()> {
1093 fn new() -> (Self, oneshot::Sender<UnblockEvent>) {
1094 Self::new_with_dat(())
1095 }
1096}
1097
1098impl<T, D> WFCommandFut<T, D> {
1099 fn new_with_dat(other_dat: D) -> (Self, oneshot::Sender<UnblockEvent>) {
1100 let (tx, rx) = oneshot::channel();
1101 (
1102 Self {
1103 _unused: PhantomData,
1104 result_rx: rx,
1105 other_dat: Some(other_dat),
1106 },
1107 tx,
1108 )
1109 }
1110}
1111
1112impl<T, D> Unpin for WFCommandFut<T, D> where T: Unblockable<OtherDat = D> {}
1113impl<T, D> Future for WFCommandFut<T, D>
1114where
1115 T: Unblockable<OtherDat = D>,
1116{
1117 type Output = T;
1118
1119 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1120 self.result_rx.poll_unpin(cx).map(|x| {
1121 let od = self
1124 .other_dat
1125 .take()
1126 .expect("Other data must exist when resolving command future");
1127 Unblockable::unblock(x.unwrap(), od)
1128 })
1129 }
1130}
1131
1132struct CancellableWFCommandFut<T, D, ID = CancellableID> {
1133 cmd_fut: WFCommandFut<T, D>,
1134 cancellable_id: ID,
1135 base_ctx: BaseWorkflowContext,
1136}
1137impl<T, ID> CancellableWFCommandFut<T, (), ID> {
1138 fn new(
1139 cancellable_id: ID,
1140 base_ctx: BaseWorkflowContext,
1141 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1142 Self::new_with_dat(cancellable_id, (), base_ctx)
1143 }
1144}
1145impl<T, D, ID> CancellableWFCommandFut<T, D, ID> {
1146 fn new_with_dat(
1147 cancellable_id: ID,
1148 other_dat: D,
1149 base_ctx: BaseWorkflowContext,
1150 ) -> (Self, oneshot::Sender<UnblockEvent>) {
1151 let (cmd_fut, sender) = WFCommandFut::new_with_dat(other_dat);
1152 (
1153 Self {
1154 cmd_fut,
1155 cancellable_id,
1156 base_ctx,
1157 },
1158 sender,
1159 )
1160 }
1161}
1162impl<T, D, ID> Unpin for CancellableWFCommandFut<T, D, ID> where T: Unblockable<OtherDat = D> {}
1163impl<T, D, ID> Future for CancellableWFCommandFut<T, D, ID>
1164where
1165 T: Unblockable<OtherDat = D>,
1166{
1167 type Output = T;
1168
1169 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1170 self.cmd_fut.poll_unpin(cx)
1171 }
1172}
1173
1174impl<T, D, ID> CancellableFuture<T> for CancellableWFCommandFut<T, D, ID>
1175where
1176 T: Unblockable<OtherDat = D>,
1177 ID: Clone + Into<CancellableID>,
1178{
1179 fn cancel(&self) {
1180 self.base_ctx.cancel(self.cancellable_id.clone().into());
1181 }
1182}
1183impl<T, D> CancellableFutureWithReason<T> for CancellableWFCommandFut<T, D, CancellableIDWithReason>
1184where
1185 T: Unblockable<OtherDat = D>,
1186{
1187 fn cancel_with_reason(&self, reason: String) {
1188 let new_id = self.cancellable_id.clone().with_reason(reason);
1189 self.base_ctx.cancel(new_id);
1190 }
1191}
1192
1193struct LATimerBackoffFut {
1194 la_opts: LocalActivityOptions,
1195 activity_type: String,
1196 arguments: Vec<Payload>,
1197 current_fut: Pin<Box<dyn CancellableFuture<ActivityResolution> + Unpin>>,
1198 timer_fut: Option<Pin<Box<dyn CancellableFuture<TimerResult> + Unpin>>>,
1199 base_ctx: BaseWorkflowContext,
1200 next_attempt: u32,
1201 next_sched_time: Option<prost_types::Timestamp>,
1202 did_cancel: AtomicBool,
1203}
1204impl LATimerBackoffFut {
1205 pub(crate) fn new(
1206 activity_type: String,
1207 arguments: Vec<Payload>,
1208 opts: LocalActivityOptions,
1209 base_ctx: BaseWorkflowContext,
1210 ) -> Self {
1211 let current_fut = Box::pin(base_ctx.clone().local_activity_no_timer_retry(
1212 activity_type.clone(),
1213 arguments.clone(),
1214 opts.clone(),
1215 ));
1216 Self {
1217 la_opts: opts,
1218 activity_type,
1219 arguments,
1220 current_fut,
1221 timer_fut: None,
1222 base_ctx,
1223 next_attempt: 1,
1224 next_sched_time: None,
1225 did_cancel: AtomicBool::new(false),
1226 }
1227 }
1228}
1229impl Unpin for LATimerBackoffFut {}
1230impl Future for LATimerBackoffFut {
1231 type Output = ActivityResolution;
1232
1233 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1234 if let Some(tf) = self.timer_fut.as_mut() {
1236 return match tf.poll_unpin(cx) {
1237 Poll::Ready(tr) => {
1238 self.timer_fut = None;
1239 if let TimerResult::Fired = tr {
1241 let mut opts = self.la_opts.clone();
1242 opts.attempt = Some(self.next_attempt);
1243 opts.original_schedule_time
1244 .clone_from(&self.next_sched_time);
1245 self.current_fut =
1246 Box::pin(self.base_ctx.clone().local_activity_no_timer_retry(
1247 self.activity_type.clone(),
1248 self.arguments.clone(),
1249 opts,
1250 ));
1251 Poll::Pending
1252 } else {
1253 Poll::Ready(ActivityResolution {
1254 status: Some(
1255 activity_resolution::Status::Cancelled(Default::default()),
1256 ),
1257 })
1258 }
1259 }
1260 Poll::Pending => Poll::Pending,
1261 };
1262 }
1263 let poll_res = self.current_fut.poll_unpin(cx);
1264 if let Poll::Ready(ref r) = poll_res
1265 && let Some(activity_resolution::Status::Backoff(b)) = r.status.as_ref()
1266 {
1267 if self.did_cancel.load(Ordering::Acquire) {
1271 return Poll::Ready(ActivityResolution {
1272 status: Some(activity_resolution::Status::Cancelled(Default::default())),
1273 });
1274 }
1275
1276 let timer_f = self.base_ctx.timer::<Duration>(
1277 b.backoff_duration
1278 .expect("Duration is set")
1279 .try_into()
1280 .expect("duration converts ok"),
1281 );
1282 self.timer_fut = Some(Box::pin(timer_f));
1283 self.next_attempt = b.attempt;
1284 self.next_sched_time.clone_from(&b.original_schedule_time);
1285 return Poll::Pending;
1286 }
1287 poll_res
1288 }
1289}
1290impl CancellableFuture<ActivityResolution> for LATimerBackoffFut {
1291 fn cancel(&self) {
1292 self.did_cancel.store(true, Ordering::Release);
1293 if let Some(tf) = self.timer_fut.as_ref() {
1294 tf.cancel();
1295 }
1296 self.current_fut.cancel();
1297 }
1298}
1299
1300enum ActivityFut<F, Output> {
1302 Errored {
1304 error: Option<ActivityExecutionError>,
1305 _phantom: PhantomData<Output>,
1306 },
1307 Running {
1309 inner: F,
1310 payload_converter: PayloadConverter,
1311 _phantom: PhantomData<Output>,
1312 },
1313}
1314
1315impl<F, Output> ActivityFut<F, Output> {
1316 fn eager(err: ActivityExecutionError) -> Self {
1317 Self::Errored {
1318 error: Some(err),
1319 _phantom: PhantomData,
1320 }
1321 }
1322
1323 fn running(inner: F, payload_converter: PayloadConverter) -> Self {
1324 Self::Running {
1325 inner,
1326 payload_converter,
1327 _phantom: PhantomData,
1328 }
1329 }
1330}
1331
1332impl<F, Output> Unpin for ActivityFut<F, Output> where F: Unpin {}
1333
1334impl<F, Output> Future for ActivityFut<F, Output>
1335where
1336 F: Future<Output = ActivityResolution> + Unpin,
1337 Output: TemporalDeserializable + 'static,
1338{
1339 type Output = Result<Output, ActivityExecutionError>;
1340
1341 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1342 match self.get_mut() {
1343 ActivityFut::Errored { error, .. } => {
1344 Poll::Ready(Err(error.take().expect("polled after completion")))
1345 }
1346 ActivityFut::Running {
1347 inner,
1348 payload_converter,
1349 ..
1350 } => match Pin::new(inner).poll(cx) {
1351 Poll::Pending => Poll::Pending,
1352 Poll::Ready(resolution) => Poll::Ready({
1353 let status = resolution.status.ok_or_else(|| {
1354 ActivityExecutionError::Failed(Box::new(Failure {
1355 message: "Activity completed without a status".to_string(),
1356 ..Default::default()
1357 }))
1358 })?;
1359
1360 match status {
1361 activity_resolution::Status::Completed(success) => {
1362 let payload = success.result.unwrap_or_default();
1363 let ctx = SerializationContext {
1364 data: &SerializationContextData::Workflow,
1365 converter: payload_converter,
1366 };
1367 payload_converter
1368 .from_payload::<Output>(&ctx, payload)
1369 .map_err(ActivityExecutionError::Serialization)
1370 }
1371 activity_resolution::Status::Failed(f) => Err(
1372 ActivityExecutionError::Failed(Box::new(f.failure.unwrap_or_default())),
1373 ),
1374 activity_resolution::Status::Cancelled(c) => {
1375 Err(ActivityExecutionError::Cancelled(Box::new(
1376 c.failure.unwrap_or_default(),
1377 )))
1378 }
1379 activity_resolution::Status::Backoff(_) => {
1380 panic!("DoBackoff should be handled by LATimerBackoffFut")
1381 }
1382 }
1383 }),
1384 },
1385 }
1386 }
1387}
1388
1389impl<F, Output> CancellableFuture<Result<Output, ActivityExecutionError>> for ActivityFut<F, Output>
1390where
1391 F: CancellableFuture<ActivityResolution> + Unpin,
1392 Output: TemporalDeserializable + 'static,
1393{
1394 fn cancel(&self) {
1395 match self {
1396 ActivityFut::Errored { .. } => {}
1397 ActivityFut::Running { inner, .. } => inner.cancel(),
1398 }
1399 }
1400}
1401
1402#[derive(Clone, derive_more::Debug)]
1404pub struct ChildWorkflow {
1405 opts: ChildWorkflowOptions,
1406 #[debug(skip)]
1407 base_ctx: BaseWorkflowContext,
1408}
1409
1410pub(crate) struct ChildWfCommon {
1411 workflow_id: String,
1412 result_future: CancellableWFCommandFut<ChildWorkflowResult, (), CancellableIDWithReason>,
1413 base_ctx: BaseWorkflowContext,
1414}
1415
1416#[derive(derive_more::Debug)]
1418pub struct PendingChildWorkflow {
1419 pub status: ChildWorkflowStartStatus,
1421 #[debug(skip)]
1422 pub(crate) common: ChildWfCommon,
1423}
1424
1425impl PendingChildWorkflow {
1426 pub fn into_started(self) -> Option<StartedChildWorkflow> {
1429 match self.status {
1430 ChildWorkflowStartStatus::Succeeded(s) => Some(StartedChildWorkflow {
1431 run_id: s.run_id,
1432 common: self.common,
1433 }),
1434 _ => None,
1435 }
1436 }
1437}
1438
1439#[derive(derive_more::Debug)]
1441pub struct StartedChildWorkflow {
1442 pub run_id: String,
1444 #[debug(skip)]
1445 common: ChildWfCommon,
1446}
1447
1448impl ChildWorkflow {
1449 pub fn start(self) -> impl CancellableFutureWithReason<PendingChildWorkflow> {
1451 let child_seq = self
1452 .base_ctx
1453 .inner
1454 .seq_nums
1455 .borrow_mut()
1456 .next_child_workflow_seq();
1457 let cancel_seq = self
1461 .base_ctx
1462 .inner
1463 .seq_nums
1464 .borrow_mut()
1465 .next_cancel_external_wf_seq();
1466 let (result_cmd, unblocker) = CancellableWFCommandFut::new(
1467 CancellableIDWithReason::ExternalWorkflow {
1468 seqnum: cancel_seq,
1469 execution: NamespacedWorkflowExecution {
1470 workflow_id: self.opts.workflow_id.clone(),
1471 ..Default::default()
1472 },
1473 },
1474 self.base_ctx.clone(),
1475 );
1476 self.base_ctx.send(
1477 CommandSubscribeChildWorkflowCompletion {
1478 seq: child_seq,
1479 unblocker,
1480 }
1481 .into(),
1482 );
1483
1484 let common = ChildWfCommon {
1485 workflow_id: self.opts.workflow_id.clone(),
1486 result_future: result_cmd,
1487 base_ctx: self.base_ctx.clone(),
1488 };
1489
1490 let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat(
1491 CancellableIDWithReason::ChildWorkflow { seqnum: child_seq },
1492 common,
1493 self.base_ctx.clone(),
1494 );
1495 self.base_ctx.send(
1496 CommandCreateRequest {
1497 cmd: self.opts.into_command(child_seq),
1498 unblocker,
1499 }
1500 .into(),
1501 );
1502
1503 cmd
1504 }
1505}
1506
1507impl StartedChildWorkflow {
1508 pub fn result(self) -> impl CancellableFutureWithReason<ChildWorkflowResult> {
1511 self.common.result_future
1512 }
1513
1514 pub fn cancel(&self, reason: String) {
1516 self.common.base_ctx.send(RustWfCmd::NewNonblockingCmd(
1517 CancelChildWorkflowExecution {
1518 child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
1519 reason,
1520 }
1521 .into(),
1522 ));
1523 }
1524
1525 pub fn signal<S: Into<Signal>>(
1527 &self,
1528 data: S,
1529 ) -> impl CancellableFuture<SignalExternalWfResult> + 'static {
1530 let target = sig_we::Target::ChildWorkflowId(self.common.workflow_id.clone());
1531 self.common
1532 .base_ctx
1533 .clone()
1534 .send_signal_wf(target, data.into())
1535 }
1536}
1537
1538#[derive(derive_more::Debug)]
1539#[debug("StartedNexusOperation{{ operation_token: {operation_token:?} }}")]
1540pub struct StartedNexusOperation {
1541 pub operation_token: Option<String>,
1543 pub(crate) unblock_dat: NexusUnblockData,
1544}
1545
1546pub(crate) struct NexusUnblockData {
1547 result_future: Shared<WFCommandFut<NexusOperationResult, ()>>,
1548 schedule_seq: u32,
1549 base_ctx: BaseWorkflowContext,
1550}
1551
1552impl StartedNexusOperation {
1553 pub async fn result(&self) -> NexusOperationResult {
1554 self.unblock_dat.result_future.clone().await
1555 }
1556
1557 pub fn cancel(&self) {
1558 self.unblock_dat
1559 .base_ctx
1560 .cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq));
1561 }
1562}