1#![warn(missing_docs)] #[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_executor;
78mod workflow_future;
79pub mod workflows;
80
#[macro_export]
#[doc(hidden)]
// Internal: forwards to `futures_util::select_biased!` so that macro-generated
// workflow code does not need its own `futures_util` dependency.
macro_rules! __temporal_select {
    ($($tokens:tt)*) => {
        ::futures_util::select_biased! { $($tokens)* }
    };
}
88
#[macro_export]
#[doc(hidden)]
// Internal: forwards to `futures_util::join!` so that macro-generated workflow
// code does not need its own `futures_util` dependency.
macro_rules! __temporal_join {
    ($($tokens:tt)*) => {
        ::futures_util::join!($($tokens)*)
    };
}
96
97use workflow_future::WorkflowFunction;
98
99pub use temporalio_client::Namespace;
100pub use workflow_context::{
101 ActivityCloseTimeouts, ActivityExecutionError, ActivityOptions, BaseWorkflowContext,
102 CancellableFuture, ChildWorkflowExecutionError, ChildWorkflowOptions, ChildWorkflowSignalError,
103 ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions,
104 ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData,
105 StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext,
106 TimerOptions, WorkflowContext, WorkflowContextView,
107};
108
109use crate::{
110 activities::{
111 ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
112 ExecutableActivity,
113 },
114 interceptors::WorkerInterceptor,
115 workflow_context::{
116 ChildWfCommon, NexusUnblockData, PendingChildWorkflow, StartedNexusOperation,
117 },
118 workflow_executor::WorkflowExecutor,
119 workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
120};
121use anyhow::{Context, anyhow, bail};
122use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
123use std::{
124 any::{Any, TypeId},
125 cell::RefCell,
126 collections::{HashMap, HashSet},
127 fmt::{Debug, Display, Formatter},
128 future::Future,
129 marker::PhantomData,
130 panic::AssertUnwindSafe,
131 sync::Arc,
132 time::Duration,
133};
134use temporalio_client::{Client, NamespacedClient};
135use temporalio_common::{
136 ActivityDefinition, WorkflowDefinition,
137 data_converters::{DataConverter, SerializationContextData},
138 payload_visitor::{decode_payloads, encode_payloads},
139 protos::{
140 TaskToken,
141 coresdk::{
142 ActivityTaskCompletion, AsJsonPayloadExt,
143 activity_result::{ActivityExecutionResult, ActivityResolution},
144 activity_task::{ActivityTask, activity_task},
145 child_workflow::ChildWorkflowResult,
146 nexus::NexusOperationResult,
147 workflow_activation::{
148 WorkflowActivation,
149 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
150 resolve_nexus_operation_start, workflow_activation_job::Variant,
151 },
152 workflow_commands::{
153 ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
154 },
155 workflow_completion::WorkflowActivationCompletion,
156 },
157 temporal::api::{
158 common::v1::Payload,
159 enums::v1::WorkflowTaskFailedCause,
160 failure::v1::{Failure, failure},
161 },
162 },
163 worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
164};
165use temporalio_sdk_core::{
166 CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
167 WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
168};
169use tokio::sync::{
170 Notify,
171 mpsc::{UnboundedSender, unbounded_channel},
172 oneshot,
173};
174use tokio_stream::wrappers::UnboundedReceiverStream;
175use tokio_util::sync::CancellationToken;
176use tracing::{Instrument, Span, field};
177use uuid::Uuid;
178
/// Options for creating a [Worker]. Construct via [`WorkerOptions::new`],
/// which takes the task queue name; every other field has a default.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// The task queue this worker will poll.
    #[builder(start_fn)]
    pub task_queue: String,

    // Registered activity implementations. Populated through the `register_*`
    // methods on the builder / options rather than a builder setter.
    #[builder(field)]
    activities: ActivityDefinitions,

    // Registered workflow implementations. Populated through the `register_*`
    // methods on the builder / options rather than a builder setter.
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// Worker deployment options. Defaults to a build id derived from the
    /// currently running executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// If set, overrides the client identity for this worker.
    pub client_identity_override: Option<String>,
    /// Maximum number of workflows kept cached by the worker (default 1000).
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Tuner governing task slot usage (defaults to [TunerBuilder] defaults).
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Poller behavior for workflow tasks (default: simple maximum of 5).
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Ratio of non-sticky to sticky polling (default 0.2).
    // NOTE(review): this field is not forwarded in `to_core_options` —
    // confirm it is consumed elsewhere or wire it through.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Poller behavior for activity tasks (default: simple maximum of 5).
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Poller behavior for nexus tasks (default: simple maximum of 5).
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    /// Which kinds of tasks this worker will poll for (default: all).
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// Schedule-to-start timeout for the sticky task queue (default 10s).
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Longest interval activity heartbeats may be throttled to (default 60s).
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Heartbeat throttle interval used by default (default 30s).
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// If set, rate limit on activity starts across the whole task queue.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// If set, rate limit on activity starts for this worker alone.
    pub max_worker_activities_per_second: Option<f64>,
    /// Error types that should fail the workflow execution rather than the
    /// workflow task.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Per-workflow-type overrides of [`Self::workflow_failure_errors`].
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// If set, how long to allow graceful shutdown before outstanding work is
    /// cancelled.
    pub graceful_shutdown_period: Option<Duration>,
    /// Whether workflow futures should check for nondeterministic future
    /// creation (default true).
    #[builder(default = true)]
    pub detect_nondeterministic_futures: bool,
}
283
impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
    /// Registers every activity defined on the [ActivityImplementer] instance.
    pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a single activity definition backed by `instance`.
    pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }

    /// Registers a workflow implementation type.
    pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Registers a workflow whose instances are produced by `factory`.
    pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }
}
334
335fn def_build_id() -> WorkerDeploymentOptions {
337 WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
338}
339
impl WorkerOptions {
    /// Registers every activity defined on the [ActivityImplementer] instance.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a single activity definition backed by `instance`.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }
    /// Returns a clone of the registered activity definitions.
    pub fn activities(&self) -> ActivityDefinitions {
        self.activities.clone()
    }

    /// Registers a workflow implementation type.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Registers a workflow whose instances are produced by `factory`.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Returns a clone of the registered workflow definitions.
    pub fn workflows(&self) -> WorkflowDefinitions {
        self.workflows.clone()
    }

    // Converts these options into a core `WorkerConfig` for the given
    // namespace / connection identity. Returns the builder's error string on
    // invalid configuration.
    #[doc(hidden)]
    pub fn to_core_options(
        &self,
        namespace: String,
        connection_identity: String,
    ) -> Result<WorkerConfig, String> {
        WorkerConfig::builder()
            .namespace(namespace)
            .task_queue(self.task_queue.clone())
            // Prefer the explicit override; otherwise, when the connection has
            // no identity of its own, synthesize one as "pid@hostname".
            .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
                connection_identity.is_empty().then(|| {
                    format!(
                        "{}@{}",
                        std::process::id(),
                        gethostname::gethostname().to_string_lossy()
                    )
                })
            }))
            .max_cached_workflows(self.max_cached_workflows)
            .tuner(self.tuner.clone())
            .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
            .activity_task_poller_behavior(self.activity_task_poller_behavior)
            .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
            .task_types(self.task_types)
            .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
            .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
            .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
            .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
            .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
            .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
            .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
                self.deployment_options.clone(),
            ))
            .workflow_failure_errors(self.workflow_failure_errors.clone())
            .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
            .build()
    }
}
424
/// A Temporal worker. Polls the server for workflow activations and activity
/// tasks and dispatches them to the registered implementations.
pub struct Worker {
    /// State shared by the workflow and activity halves.
    common: CommonWorker,
    /// Workflow-processing state.
    workflow_half: WorkflowHalf,
    /// Activity-processing state.
    activity_half: ActivityHalf,
}
433
/// State shared between the workflow and activity halves of a [Worker].
struct CommonWorker {
    /// The underlying core worker implementation.
    worker: Arc<CoreWorker>,
    /// The task queue being polled.
    task_queue: String,
    /// Optional interceptor invoked around activations and completions.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Converts payloads to/from user types and applies the payload codec.
    data_converter: DataConverter,
}
440
/// Workflow-processing state for a [Worker].
struct WorkflowHalf {
    /// Maps run id -> per-run data (notably the activation channel).
    /// `RefCell` because workflow processing runs single-threaded on a
    /// `LocalSet` in [`Worker::run`].
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// Registered workflow definitions, keyed by workflow type.
    workflow_definitions: WorkflowDefinitions,
    /// Signalled whenever an entry is removed from `workflows`, allowing a
    /// new run reusing the same run id to wait for the old entry to drain.
    workflow_removed_from_map: Notify,
    /// Whether workflow futures should check for nondeterministic future use.
    detect_nondeterministic_futures: bool,
}
/// Per-run bookkeeping for a running workflow.
struct WorkflowData {
    /// Sends incoming activations into the workflow's future.
    activation_chan: UnboundedSender<WorkflowActivation>,
}
452
/// Pairs a spawned workflow future's join handle with its run id, so the
/// joiner task can remove the cache entry when the future completes.
struct WorkflowFutureHandle<F: Future> {
    join_handle: F,
    run_id: String,
}
457
/// Activity-processing state for a [Worker].
#[derive(Default)]
struct ActivityHalf {
    /// Registered activity implementations, keyed by activity type.
    activities: ActivityDefinitions,
    /// Cancellation tokens for in-flight activities, keyed by task token.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
464
impl Worker {
    /// Creates a worker from `options`, bound to `client` and `runtime`.
    /// Registered activities/workflows are moved out of `options` into the
    /// new worker.
    pub fn new(
        runtime: &CoreRuntime,
        client: Client,
        mut options: WorkerOptions,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        // Take the registries out first; `to_core_options` only needs the
        // plain config fields.
        let acts = std::mem::take(&mut options.activities);
        let wfs = std::mem::take(&mut options.workflows);
        let wc = options
            .to_core_options(client.namespace(), client.identity())
            .map_err(|s| anyhow::anyhow!("{s}"))?;
        let core = init_worker(runtime, wc, client.connection().clone())?;
        let mut me = Self::new_from_core_definitions(
            Arc::new(core),
            client.data_converter().clone(),
            Default::default(),
            Default::default(),
        );
        me.set_detect_nondeterministic_futures(options.detect_nondeterministic_futures);
        me.activity_half.activities = acts;
        me.workflow_half.workflow_definitions = wfs;
        Ok(me)
    }

    // Creates a worker around an existing core worker with empty registries.
    #[doc(hidden)]
    pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
        Self::new_from_core_definitions(
            worker,
            data_converter,
            Default::default(),
            Default::default(),
        )
    }

    // Creates a worker around an existing core worker with the given
    // activity/workflow registries.
    #[doc(hidden)]
    pub fn new_from_core_definitions(
        worker: Arc<CoreWorker>,
        data_converter: DataConverter,
        activities: ActivityDefinitions,
        workflows: WorkflowDefinitions,
    ) -> Self {
        Self {
            common: CommonWorker {
                task_queue: worker.get_config().task_queue.clone(),
                worker,
                worker_interceptor: None,
                data_converter,
            },
            workflow_half: WorkflowHalf {
                workflows: Default::default(),
                workflow_definitions: workflows,
                workflow_removed_from_map: Default::default(),
                detect_nondeterministic_futures: false,
            },
            activity_half: ActivityHalf {
                activities,
                ..Default::default()
            },
        }
    }

    /// Returns the name of the task queue this worker polls.
    pub fn task_queue(&self) -> &str {
        &self.common.task_queue
    }

    #[doc(hidden)]
    pub fn set_detect_nondeterministic_futures(&mut self, enabled: bool) {
        self.workflow_half.detect_nondeterministic_futures = enabled;
    }

    /// Returns a closure that initiates shutdown of this worker when called.
    /// Useful for triggering shutdown from another task or a signal handler.
    pub fn shutdown_handle(&self) -> impl Fn() + use<> {
        let w = self.common.worker.clone();
        move || w.initiate_shutdown()
    }

    /// Registers every activity defined on the [ActivityImplementer] instance.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activity_half
            .activities
            .register_activities::<AI>(instance);
        self
    }
    /// Registers a single activity definition backed by `instance`.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activity_half
            .activities
            .register_activity::<AD>(instance);
        self
    }

    /// Registers a workflow implementation type.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflow_half
            .workflow_definitions
            .register_workflow::<WI>();
        self
    }

    /// Registers a workflow whose instances are produced by `factory`.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflow_half
            .workflow_definitions
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Runs the worker until shutdown is initiated and outstanding work has
    /// drained. Drives three concurrent arms: workflow activation polling
    /// (plus joining finished workflow futures), activity task polling, and
    /// workflow completion processing.
    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
        let shutdown_token = CancellationToken::new();
        let (common, wf_half, act_half) = self.split_apart();
        let (wf_future_tx, wf_future_rx) = unbounded_channel::<
            WorkflowFutureHandle<workflow_executor::TaskHandle<WorkflowResult<Payload>>>,
        >();
        let (completions_tx, completions_rx) = unbounded_channel();

        // Workflow futures are !Send, so they run on a LocalSet.
        let workflow_local_set = tokio::task::LocalSet::new();
        let executor = WorkflowExecutor::new();

        // Joins workflow futures as they finish, removing each from the cache
        // map and notifying waiters so the run id can be reused.
        let wf_future_joiner = async {
            UnboundedReceiverStream::new(wf_future_rx)
                .map(Result::<_, anyhow::Error>::Ok)
                .try_for_each_concurrent(
                    None,
                    |WorkflowFutureHandle {
                         join_handle,
                         run_id,
                     }| {
                        let wf_half = &*wf_half;
                        async move {
                            let result = join_handle.await.map_err(|e| anyhow::anyhow!("{e}"))?;
                            // Evictions are normal terminations; everything
                            // else propagates as an error.
                            if let Err(e) = result
                                && !matches!(e, WorkflowTermination::Evicted)
                            {
                                return Err(e.into());
                            }
                            debug!(run_id=%run_id, "Removing workflow from cache");
                            wf_half.workflows.borrow_mut().remove(&run_id);
                            wf_half.workflow_removed_from_map.notify_one();
                            Ok(())
                        }
                    },
                )
                .await
                .context("Workflow futures encountered an error")
        };
        // Encodes and submits workflow activation completions to core.
        let wf_completion_processor = async {
            UnboundedReceiverStream::new(completions_rx)
                .map(Ok)
                .try_for_each_concurrent(None, |mut completion| async {
                    encode_payloads(
                        &mut completion,
                        common.data_converter.codec(),
                        &SerializationContextData::Workflow,
                    )
                    .await;
                    if let Some(ref i) = common.worker_interceptor {
                        i.on_workflow_activation_completion(&completion).await;
                    }
                    common.worker.complete_workflow_activation(completion).await
                })
                .map_err(anyhow::Error::from)
                .await
                .context("Workflow completions processor encountered an error")
        };
        tokio::try_join!(
            // Arm 1: workflow activation polling + future joining.
            async {
                workflow_local_set.run_until(async {
                    tokio::try_join!(
                        async {
                            loop {
                                let mut activation =
                                    match common.worker.poll_workflow_activation().await {
                                        Err(PollError::ShutDown) => {
                                            break;
                                        }
                                        o => o?,
                                    };
                                decode_payloads(
                                    &mut activation,
                                    common.data_converter.codec(),
                                    &SerializationContextData::Workflow,
                                )
                                .await;
                                if let Some(ref i) = common.worker_interceptor {
                                    i.on_workflow_activation(&activation).await?;
                                }
                                if let Some(wf_fut) = wf_half
                                    .workflow_activation_handler(
                                        common,
                                        shutdown_token.clone(),
                                        activation,
                                        &completions_tx,
                                        &executor,
                                    )
                                    .await?
                                    && wf_future_tx.send(wf_fut).is_err()
                                {
                                    panic!(
                                        "Receive half of completion processor channel cannot be dropped"
                                    );
                                }
                                executor.process_tasks();
                            }
                            // Poller shut down: cancel remaining workflow
                            // futures and close both channels so the joiner
                            // and completion processor can terminate.
                            shutdown_token.cancel();
                            drop(wf_future_tx);
                            drop(completions_tx);
                            executor.shutdown().await;
                            Result::<_, anyhow::Error>::Ok(())
                        },
                        wf_future_joiner,
                    )
                }).await
            },
            // Arm 2: activity task polling (skipped entirely when no
            // activities are registered).
            async {
                if !act_half.activities.is_empty() {
                    loop {
                        let activity = common.worker.poll_activity_task().await;
                        if matches!(activity, Err(PollError::ShutDown)) {
                            break;
                        }
                        let mut activity = activity?;
                        decode_payloads(
                            &mut activity,
                            common.data_converter.codec(),
                            &SerializationContextData::Activity,
                        )
                        .await;
                        act_half.activity_task_handler(
                            common.worker.clone(),
                            common.task_queue.clone(),
                            common.data_converter.clone(),
                            activity,
                        )?;
                    }
                };
                Result::<_, anyhow::Error>::Ok(())
            },
            // Arm 3: completion submission.
            wf_completion_processor,
        )?;

        if let Some(i) = self.common.worker_interceptor.as_ref() {
            i.on_shutdown(self);
        }
        self.common.worker.shutdown().await;
        Ok(())
    }

    /// Installs an interceptor to observe this worker's operation.
    pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
        self.common.worker_interceptor = Some(Box::new(interceptor));
    }

    /// Replaces the underlying core worker.
    pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
        self.common.worker = new_core_worker;
    }

    /// Returns the number of workflows currently cached by this worker.
    pub fn cached_workflows(&self) -> usize {
        self.workflow_half.workflows.borrow().len()
    }

    /// Returns the unique identifier of this worker instance.
    pub fn worker_instance_key(&self) -> Uuid {
        self.common.worker.worker_instance_key()
    }

    #[doc(hidden)]
    pub fn core_worker(&self) -> Arc<CoreWorker> {
        self.common.worker.clone()
    }

    // Splits self into disjoint mutable borrows so `run` can drive the
    // halves concurrently.
    fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
        (
            &mut self.common,
            &mut self.workflow_half,
            &mut self.activity_half,
        )
    }
}
783
impl WorkflowHalf {
    /// Handles one workflow activation. If the activation carries an
    /// `InitializeWorkflow` job, instantiates the matching workflow function,
    /// spawns its future on `executor`, and returns a handle for the joiner;
    /// otherwise forwards the activation to the already-running workflow.
    #[allow(clippy::type_complexity)]
    async fn workflow_activation_handler(
        &self,
        common: &CommonWorker,
        shutdown_token: CancellationToken,
        mut activation: WorkflowActivation,
        completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
        executor: &WorkflowExecutor,
    ) -> Result<
        Option<WorkflowFutureHandle<workflow_executor::TaskHandle<WorkflowResult<Payload>>>>,
        anyhow::Error,
    > {
        let mut res = None;
        let run_id = activation.run_id.clone();

        // An InitializeWorkflow job means a new workflow future must be
        // created before the activation can be delivered.
        if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
            Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
            _ => None,
        }) {
            let workflow_type = sw.workflow_type.clone();
            let payload_converter = common.data_converter.payload_converter().clone();
            let (wff, activations) = {
                if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
                    match WorkflowFunction::from_invocation(factory).start_workflow(
                        common.worker.get_config().namespace.clone(),
                        common.task_queue.clone(),
                        run_id.clone(),
                        std::mem::take(sw),
                        completions_tx.clone(),
                        payload_converter,
                        self.detect_nondeterministic_futures,
                    ) {
                        Ok(result) => result,
                        Err(e) => {
                            // Construction failed — fail this workflow task
                            // rather than the whole worker.
                            warn!("Failed to create workflow {workflow_type}: {e}");
                            completions_tx
                                .send(WorkflowActivationCompletion::fail(
                                    run_id,
                                    format!("Failed to create workflow: {e}").into(),
                                    Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                                ))
                                .expect("Completion channel intact");
                            return Ok(None);
                        }
                    }
                } else {
                    // No registered workflow for this type — fail the task.
                    warn!("Workflow type {workflow_type} not found");
                    completions_tx
                        .send(WorkflowActivationCompletion::fail(
                            run_id,
                            format!("Workflow type {workflow_type} not found").into(),
                            Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                        ))
                        .expect("Completion channel intact");
                    return Ok(None);
                }
            };
            // Race the workflow future against worker shutdown; shutdown is
            // treated as an eviction.
            let jh = executor.spawn(async move {
                tokio::select! {
                    r = wff.fuse() => r,
                    _ = shutdown_token.cancelled() => {
                        Err(WorkflowTermination::Evicted)
                    }
                }
            });
            res = Some(WorkflowFutureHandle {
                join_handle: jh,
                run_id: run_id.clone(),
            });
            // A previous future for the same run id may still be draining out
            // of the map; wait for its removal before inserting ours.
            loop {
                if self.workflows.borrow_mut().contains_key(&run_id) {
                    self.workflow_removed_from_map.notified().await;
                } else {
                    break;
                }
            }
            self.workflows.borrow_mut().insert(
                run_id.clone(),
                WorkflowData {
                    activation_chan: activations,
                },
            );
        }

        // Deliver the activation to the (now-)running workflow future.
        if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
            dat.activation_chan
                .send(activation)
                .expect("Workflow should exist if we're sending it an activation");
        } else {
            // A lone RemoveFromCache job for an unknown workflow can simply
            // be acknowledged with an empty completion.
            if activation.jobs.len() == 1
                && matches!(
                    activation.jobs.first().map(|j| &j.variant),
                    Some(Some(Variant::RemoveFromCache(_)))
                )
            {
                completions_tx
                    .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
                    .expect("Completion channel intact");
                return Ok(None);
            }

            bail!("Got activation {activation:?} for unknown workflow {run_id}");
        };

        Ok(res)
    }
}
910
impl ActivityHalf {
    /// Handles one activity task: spawns a tokio task to run a starting
    /// activity (reporting its result back to core), or cancels an in-flight
    /// activity for a cancel task.
    fn activity_task_handler(
        &mut self,
        worker: Arc<CoreWorker>,
        task_queue: String,
        data_converter: DataConverter,
        activity: ActivityTask,
    ) -> Result<(), anyhow::Error> {
        match activity.variant {
            Some(activity_task::Variant::Start(start)) => {
                let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
                    anyhow!(
                        "No function registered for activity type {}",
                        start.activity_type
                    )
                })?;
                // Tracing span for the run; workflow/run ids are recorded once
                // the activity context is available inside the task.
                let span = info_span!(
                    "RunActivity",
                    "otel.name" = format!("RunActivity:{}", start.activity_type),
                    "otel.kind" = "server",
                    "temporalActivityID" = start.activity_id,
                    "temporalWorkflowID" = field::Empty,
                    "temporalRunID" = field::Empty,
                );
                let ct = CancellationToken::new();
                let task_token = activity.task_token;
                // Remember the token so a later Cancel task can reach this run.
                // NOTE(review): entries appear never to be removed from this
                // map on completion — confirm cleanup happens elsewhere.
                self.task_tokens_to_cancels
                    .insert(task_token.clone().into(), ct.clone());

                let (ctx, args) =
                    ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
                let codec_data_converter = data_converter.clone();

                tokio::spawn(async move {
                    let act_fut = async move {
                        if let Some(info) = &ctx.info().workflow_execution {
                            Span::current()
                                .record("temporalWorkflowID", &info.workflow_id)
                                .record("temporalRunID", &info.run_id);
                        }
                        (act_fn)(args, data_converter, ctx).await
                    }
                    .instrument(span);
                    // Catch panics so a panicking activity fails that
                    // activity rather than taking down the worker.
                    let output = AssertUnwindSafe(act_fut).catch_unwind().await;
                    let result = match output {
                        Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
                            format!("Activity function panicked: {}", panic_formatter(e)),
                            true,
                        )),
                        Ok(Ok(p)) => ActivityExecutionResult::ok(p),
                        Ok(Err(err)) => match err {
                            ActivityError::Retryable {
                                source,
                                explicit_delay,
                            } => ActivityExecutionResult::fail({
                                let mut f = Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(source),
                                    false,
                                );
                                // Attach an explicitly requested retry delay,
                                // if one was provided.
                                if let Some(d) = explicit_delay
                                    && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
                                        f.failure_info.as_mut()
                                {
                                    fi.next_retry_delay = d.try_into().ok();
                                }
                                f
                            }),
                            ActivityError::Cancelled { details } => {
                                ActivityExecutionResult::cancel_from_details(details)
                            }
                            ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
                                Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(nre),
                                    true,
                                ),
                            ),
                            ActivityError::WillCompleteAsync => {
                                ActivityExecutionResult::will_complete_async()
                            }
                        },
                    };
                    let mut completion = ActivityTaskCompletion {
                        task_token,
                        result: Some(result),
                    };
                    encode_payloads(
                        &mut completion,
                        codec_data_converter.codec(),
                        &SerializationContextData::Activity,
                    )
                    .await;
                    worker.complete_activity_task(completion).await?;
                    Ok::<_, anyhow::Error>(())
                });
            }
            Some(activity_task::Variant::Cancel(_)) => {
                // Cancel the matching in-flight activity, if still known.
                if let Some(ct) = self
                    .task_tokens_to_cancels
                    .get(activity.task_token.as_slice())
                {
                    ct.cancel();
                }
            }
            None => bail!("Undefined activity task variant"),
        }
        Ok(())
    }
}
1020
/// Events that resolve a pending workflow command future. The `u32` in each
/// variant is the command's sequence number.
#[derive(Debug)]
enum UnblockEvent {
    Timer(u32, TimerResult),
    Activity(u32, Box<ActivityResolution>),
    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
    WorkflowComplete(u32, Box<ChildWorkflowResult>),
    SignalExternal(u32, Option<Failure>),
    CancelExternal(u32, Option<Failure>),
    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
    NexusOperationComplete(u32, Box<NexusOperationResult>),
}
1032
/// Result of a workflow timer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimerResult {
    /// The timer was cancelled before it fired.
    Cancelled,
    /// The timer fired.
    Fired,
}
1041
/// Success marker for signalling an external workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalExternalOk;
/// Result of signalling an external workflow.
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
1047
/// Success marker for requesting cancellation of an external workflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelExternalOk;
/// Result of requesting cancellation of an external workflow.
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
1053
/// Implemented by types that can be produced from an [UnblockEvent] when the
/// corresponding pending workflow command resolves.
trait Unblockable {
    /// Extra data carried alongside the unblock event.
    type OtherDat;

    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
}
1059
1060impl Unblockable for TimerResult {
1061 type OtherDat = ();
1062 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1063 match ue {
1064 UnblockEvent::Timer(_, result) => result,
1065 _ => panic!("Invalid unblock event for timer"),
1066 }
1067 }
1068}
1069
1070impl Unblockable for ActivityResolution {
1071 type OtherDat = ();
1072 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1073 match ue {
1074 UnblockEvent::Activity(_, result) => *result,
1075 _ => panic!("Invalid unblock event for activity"),
1076 }
1077 }
1078}
1079
1080impl<WD: WorkflowDefinition> Unblockable for PendingChildWorkflow<WD> {
1081 type OtherDat = ChildWfCommon;
1082 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1083 match ue {
1084 UnblockEvent::WorkflowStart(_, result) => Self {
1085 status: *result,
1086 common: od,
1087 _phantom: PhantomData,
1088 },
1089 _ => panic!("Invalid unblock event for child workflow start"),
1090 }
1091 }
1092}
1093
1094impl Unblockable for ChildWorkflowResult {
1095 type OtherDat = ();
1096 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1097 match ue {
1098 UnblockEvent::WorkflowComplete(_, result) => *result,
1099 _ => panic!("Invalid unblock event for child workflow complete"),
1100 }
1101 }
1102}
1103
1104impl Unblockable for SignalExternalWfResult {
1105 type OtherDat = ();
1106 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1107 match ue {
1108 UnblockEvent::SignalExternal(_, maybefail) => {
1109 maybefail.map_or(Ok(SignalExternalOk), Err)
1110 }
1111 _ => panic!("Invalid unblock event for signal external workflow result"),
1112 }
1113 }
1114}
1115
1116impl Unblockable for CancelExternalWfResult {
1117 type OtherDat = ();
1118 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1119 match ue {
1120 UnblockEvent::CancelExternal(_, maybefail) => {
1121 maybefail.map_or(Ok(CancelExternalOk), Err)
1122 }
1123 _ => panic!("Invalid unblock event for signal external workflow result"),
1124 }
1125 }
1126}
1127
1128type NexusStartResult = Result<StartedNexusOperation, Failure>;
1129impl Unblockable for NexusStartResult {
1130 type OtherDat = NexusUnblockData;
1131 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1132 match ue {
1133 UnblockEvent::NexusOperationStart(_, result) => match *result {
1134 resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1135 Ok(StartedNexusOperation {
1136 operation_token: Some(op_token),
1137 unblock_dat: od,
1138 })
1139 }
1140 resolve_nexus_operation_start::Status::StartedSync(_) => {
1141 Ok(StartedNexusOperation {
1142 operation_token: None,
1143 unblock_dat: od,
1144 })
1145 }
1146 resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1147 },
1148 _ => panic!("Invalid unblock event for nexus operation"),
1149 }
1150 }
1151}
1152
1153impl Unblockable for NexusOperationResult {
1154 type OtherDat = ();
1155
1156 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1157 match ue {
1158 UnblockEvent::NexusOperationComplete(_, result) => *result,
1159 _ => panic!("Invalid unblock event for nexus operation complete"),
1160 }
1161 }
1162}
1163
/// Identifies a cancellable workflow command by its sequence number.
#[derive(Debug, Clone)]
pub(crate) enum CancellableID {
    Timer(u32),
    Activity(u32),
    LocalActivity(u32),
    ChildWorkflow {
        seqnum: u32,
        /// Human-readable reason recorded with the cancellation request.
        reason: String,
    },
    SignalExternalWorkflow(u32),
    NexusOp(u32),
}
1178
/// Cancellable command ids that can carry a cancellation reason.
pub(crate) trait SupportsCancelReason {
    /// Attaches `reason` and converts into a [CancellableID].
    fn with_reason(self, reason: String) -> CancellableID;
}
/// The subset of [CancellableID] variants which support a cancellation reason.
#[derive(Debug, Clone)]
pub(crate) enum CancellableIDWithReason {
    ChildWorkflow { seqnum: u32 },
}
1188impl SupportsCancelReason for CancellableIDWithReason {
1189 fn with_reason(self, reason: String) -> CancellableID {
1190 match self {
1191 CancellableIDWithReason::ChildWorkflow { seqnum } => {
1192 CancellableID::ChildWorkflow { seqnum, reason }
1193 }
1194 }
1195 }
1196}
1197impl From<CancellableIDWithReason> for CancellableID {
1198 fn from(v: CancellableIDWithReason) -> Self {
1199 v.with_reason("".to_string())
1200 }
1201}
1202
/// Commands flowing from workflow code to the machinery driving the workflow.
#[derive(derive_more::From)]
#[allow(clippy::large_enum_variant)]
enum RustWfCmd {
    /// Cancel a previously-issued command.
    #[from(ignore)]
    Cancel(CancellableID),
    /// Force the current workflow task to fail with the given error.
    ForceWFTFailure(anyhow::Error),
    /// Issue a new command, unblocking the sender when it resolves.
    NewCmd(CommandCreateRequest),
    /// Issue a new command that needs no resolution.
    NewNonblockingCmd(workflow_command::Variant),
    /// Subscribe to the completion of a started child workflow.
    SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
    /// Subscribe to the completion of a started nexus operation.
    SubscribeNexusOperationCompletion {
        seq: u32,
        unblocker: oneshot::Sender<UnblockEvent>,
    },
}
1217
/// A new workflow command paired with the channel used to unblock its issuer
/// once the command resolves.
struct CommandCreateRequest {
    cmd: WorkflowCommand,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1222
/// Subscription to the completion of the child workflow whose start command
/// had sequence number `seq`.
struct CommandSubscribeChildWorkflowCompletion {
    seq: u32,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1227
/// Result of a workflow future: the success payload, or the reason the
/// workflow stopped without producing one.
pub type WorkflowResult<T> = Result<T, WorkflowTermination>;

/// Ways a workflow can finish without producing a success value.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowTermination {
    /// The workflow was cancelled.
    #[error("Workflow cancelled")]
    Cancelled,

    /// The workflow was evicted from the worker's cache.
    #[error("Workflow evicted from cache")]
    Evicted,

    /// The workflow is continuing as new.
    #[error("Continue as new")]
    ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),

    /// The workflow failed with the contained error.
    #[error("Workflow failed: {0}")]
    Failed(#[source] anyhow::Error),
}
1256
1257impl WorkflowTermination {
1258 pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1260 Self::ContinueAsNew(Box::new(can))
1261 }
1262
1263 pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1265 Self::Failed(err.into())
1266 }
1267}
1268
1269impl From<anyhow::Error> for WorkflowTermination {
1270 fn from(err: anyhow::Error) -> Self {
1271 Self::Failed(err)
1272 }
1273}
1274
1275impl From<ActivityExecutionError> for WorkflowTermination {
1276 fn from(value: ActivityExecutionError) -> Self {
1277 Self::failed(value)
1278 }
1279}
1280
1281impl From<ChildWorkflowExecutionError> for WorkflowTermination {
1282 fn from(value: ChildWorkflowExecutionError) -> Self {
1283 Self::failed(value)
1284 }
1285}
1286
1287impl From<ChildWorkflowSignalError> for WorkflowTermination {
1288 fn from(value: ChildWorkflowSignalError) -> Self {
1289 Self::failed(value)
1290 }
1291}
1292
/// Value produced by an activity implementation.
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// The activity will be completed asynchronously, out-of-band.
    WillCompleteAsync,
    /// The activity completed with this value.
    Normal(T),
}
1301
1302impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1303 fn from(t: T) -> Self {
1304 Self::Normal(t)
1305 }
1306}
1307
/// Best-effort conversion of a panic payload into something printable.
///
/// Panic payloads are `Box<dyn Any>`; in practice they are almost always a
/// `&str` or a `String`, so those are tried in turn, falling back to a fixed
/// message when neither matches.
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    _panic_formatter::<&str>(panic)
}
/// Recursive helper: attempt to downcast to `T`; on failure, move on to
/// `T::NextType`. `EndPrintingAttempts` terminates the chain.
fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
    match panic.downcast::<T>() {
        Ok(printable) => printable,
        // Chain exhausted — give up with a fixed message.
        Err(_) if TypeId::of::<T::NextType>() == TypeId::of::<EndPrintingAttempts>() => {
            Box::new("Couldn't turn panic into a string")
        }
        Err(original) => _panic_formatter::<T::NextType>(original),
    }
}
/// Compile-time chain of types we attempt to downcast panic payloads to.
trait PrintablePanicType: Display {
    type NextType: PrintablePanicType;
}
impl PrintablePanicType for &str {
    type NextType = String;
}
impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}
/// Sentinel terminating the downcast chain; its `Display` is never reached.
struct EndPrintingAttempts {}
impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Will never be printed")
    }
}
impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1343
#[cfg(test)]
mod tests {
    // These tests are primarily compile-time checks that the `activities`,
    // `workflow`, and `workflow_methods` proc-macros accept the documented
    // method shapes and that the generated registration APIs line up with
    // `WorkerOptions`. Only a few of them execute meaningful logic at runtime.
    use super::*;
    use temporalio_macros::{activities, workflow, workflow_methods};

    struct MyActivities {}

    #[activities]
    impl MyActivities {
        // Simplest accepted shape: no `self`, context only.
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        // Shape with `self: Arc<Self>`, a context, and an input argument.
        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        // Verifies the macro-generated activity set can be registered on a worker.
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Not a `#[test]`: never run. Exists only to type-check that
    // `start_activity` accepts both activity method shapes from workflow code.
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(
            MyActivities::my_activity,
            (),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
    }

    // Not a `#[test]`: never run. Type-checks that an activity method can
    // also be invoked directly via its `run` adapter.
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    // Exercises every handler kind the macro supports: init, run, sync/async
    // signals (with and without renaming), query, and sync/async updates.
    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        // Exposed under the name "increment" rather than the method name.
        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        // Exposed under the name "double" rather than the method name.
        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        // Verifies the macro-generated workflow type can be registered on a worker.
        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
    }

    // Expected default identity: "<pid>@<hostname>" — must mirror whatever
    // `to_core_options` computes when no override is supplied.
    fn default_identity() -> String {
        format!(
            "{}@{}",
            std::process::id(),
            gethostname::gethostname().to_string_lossy()
        )
    }

    // Precedence rules under test: an explicit worker override always wins;
    // otherwise a non-empty connection identity is kept (no override emitted);
    // otherwise the pid@hostname default is used.
    #[rstest::rstest]
    #[case::default_when_none_provided(None, "", Some(default_identity()))]
    #[case::connection_identity_preserved(None, "conn-identity", None)]
    #[case::worker_override_takes_precedence(
        Some("worker-identity"),
        "conn-identity",
        Some("worker-identity".into())
    )]
    #[case::worker_override_with_empty_connection(
        Some("worker-identity"),
        "",
        Some("worker-identity".into())
    )]
    #[test]
    fn client_identity_resolution(
        #[case] worker_override: Option<&str>,
        #[case] connection_identity: &str,
        #[case] expected: Option<String>,
    ) {
        let opts = WorkerOptions::new("task_q")
            .task_types(WorkerTaskTypes::activity_only())
            .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
            .build();
        let config = opts
            .to_core_options("ns".into(), connection_identity.into())
            .unwrap();
        assert_eq!(config.client_identity_override, expected);
    }
}