1#![warn(missing_docs)] #[macro_use]
72extern crate tracing;
73extern crate self as temporalio_sdk;
74
75pub mod activities;
76pub mod error;
77pub mod interceptors;
78mod workflow_executor;
79mod workflow_future;
80mod workflow_registry;
81#[cfg(feature = "wasm-workflows")]
82mod workflow_wasm;
83pub mod workflows;
84
85pub use crate::error::{
86 ActivityExecutionError, ApplicationFailure, ChildWorkflowExecutionError,
87 ChildWorkflowStartError, OutgoingActivityError, OutgoingError, OutgoingWorkflowError,
88 WorkflowRegistrationError, WorkflowSignalError,
89};
90pub use temporalio_client::Namespace;
91pub use temporalio_workflow::{
92 ActivityCloseTimeouts, ActivityOptions, BaseWorkflowContext, CancellableFuture,
93 ChildWorkflowOptions, ContinueAsNewOptions, ContinueAsNewVersioningBehavior,
94 ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
95 RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause,
96 StartedChildWorkflow, SyncWorkflowContext, TimerOptions, TimerResult, WorkflowContext,
97 WorkflowContextView, WorkflowResult, WorkflowTermination,
98};
99#[cfg(feature = "wasm-workflows")]
100pub use workflow_wasm::WasmWorkflowComponent;
101
102use crate::{
103 activities::{
104 ActivityContext, ActivityDefinitions, ActivityImplementer, ExecutableActivity,
105 activity_error_to_core_result,
106 },
107 interceptors::{ActivityInboundInterceptor, WorkerInterceptor},
108 workflow_executor::{TaskHandle, WorkflowExecutor},
109 workflow_future::start_workflow,
110 workflow_registry::WorkflowDefinitions,
111};
112use anyhow::{Context, anyhow, bail};
113use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
114use std::{
115 any::{Any, TypeId},
116 cell::RefCell,
117 collections::{HashMap, HashSet},
118 fmt::{Debug, Display, Formatter},
119 future::Future,
120 sync::Arc,
121 time::Duration,
122};
123use temporalio_client::{Client, ClientOptions, NamespacedClient};
124use temporalio_common::{
125 ActivityDefinition, WorkflowDefinition,
126 data_converters::{DataConverter, SerializationContext, SerializationContextData},
127 payload_visitor::{decode_payloads, encode_payloads},
128 protos::{
129 TaskToken,
130 coresdk::{
131 ActivityTaskCompletion, AsJsonPayloadExt,
132 activity_result::ActivityExecutionResult,
133 activity_task::{ActivityTask, activity_task},
134 workflow_activation::{WorkflowActivation, workflow_activation_job::Variant},
135 workflow_completion::WorkflowActivationCompletion,
136 },
137 temporal::api::{common::v1::Payload, enums::v1::WorkflowTaskFailedCause},
138 },
139 worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
140};
141use temporalio_sdk_core::{
142 CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
143 WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
144};
145use temporalio_workflow::runtime::entry::WorkflowImplementation;
146use tokio::sync::{
147 Notify,
148 mpsc::{UnboundedSender, unbounded_channel},
149};
150use tokio_stream::wrappers::UnboundedReceiverStream;
151use tokio_util::sync::CancellationToken;
152use tracing::{Instrument, Span, field};
153use uuid::Uuid;
154
/// Options used to configure and build a [`Worker`].
///
/// Start with [`WorkerOptions::new`], passing the task queue name, chain the generated
/// builder setters and the `register_*` methods, then finish with `build()`.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// Name of the task queue. Supplied as the argument of `WorkerOptions::new` and forwarded
    /// to the core worker configuration.
    #[builder(start_fn)]
    pub task_queue: String,

    /// Activity definitions accumulated through the `register_activities` /
    /// `register_activity` methods.
    #[builder(field)]
    activities: ActivityDefinitions,

    /// Workflow definitions accumulated through the `register_workflow*` methods.
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// WASM workflow components queued by `register_wasm_workflow`. Only present when the
    /// `wasm-workflows` feature is enabled.
    #[cfg(feature = "wasm-workflows")]
    #[builder(field)]
    wasm_workflow_components: Vec<WasmWorkflowComponent>,

    /// Deployment options used for the deployment-based versioning strategy handed to core.
    /// Defaults to options derived from the build id of the current executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// Optional identity override. When unset, `to_core_options` only substitutes a
    /// `<pid>@<hostname>` identity if the connection identity it is given is empty.
    pub client_identity_override: Option<String>,
    /// Forwarded to core as `max_cached_workflows`. Defaults to 1000.
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Tuner handed to the core worker. Defaults to `TunerBuilder::default().build()`.
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Poller behavior for workflow tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Defaults to 0.2.
    ///
    /// NOTE(review): presumably the share of workflow pollers assigned to the non-sticky
    /// queue; confirm against the core worker documentation.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Poller behavior for activity tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Poller behavior for nexus tasks. Defaults to `PollerBehavior::SimpleMaximum(5)`.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    /// Task types forwarded to core as `task_types`. Defaults to `WorkerTaskTypes::all()`.
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// Forwarded to core as `sticky_queue_schedule_to_start_timeout`. Defaults to 10 seconds.
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Forwarded to core as `max_heartbeat_throttle_interval`. Defaults to 60 seconds.
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Forwarded to core as `default_heartbeat_throttle_interval`. Defaults to 30 seconds.
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// Optional value forwarded to core as `max_task_queue_activities_per_second`.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// Optional value forwarded to core as `max_worker_activities_per_second`.
    pub max_worker_activities_per_second: Option<f64>,
    /// Workflow error types forwarded to core as `workflow_failure_errors`. Empty by default.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Map from workflow type name to workflow error types, forwarded to core as
    /// `workflow_types_to_failure_errors`. Empty by default.
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// Optional value forwarded to core as `graceful_shutdown_period`.
    pub graceful_shutdown_period: Option<Duration>,
    /// Flag applied to the worker when it is built from these options and then passed to
    /// `start_workflow` for every workflow it starts. Defaults to `true`.
    #[builder(default = true)]
    pub detect_nondeterministic_futures: bool,
}
263
264impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
265 pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
267 self.activities.register_activities::<AI>(instance);
268 self
269 }
270 pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
272 where
273 AD: ActivityDefinition + ExecutableActivity,
274 AD::Input: Send + Sync,
275 AD::Output: Send + Sync,
276 {
277 self.activities.register_activity::<AD>(instance);
278 self
279 }
280
281 pub fn register_workflow<W>(mut self) -> Result<Self, WorkflowRegistrationError>
283 where
284 W: WorkflowImplementation,
285 <W::Run as WorkflowDefinition>::Input: Send,
286 {
287 self.workflows.register_workflow::<W>()?;
288 Ok(self)
289 }
290
291 pub fn register_workflow_with_factory<W, F>(
309 mut self,
310 factory: F,
311 ) -> Result<Self, WorkflowRegistrationError>
312 where
313 W: WorkflowImplementation,
314 <W::Run as WorkflowDefinition>::Input: Send,
315 F: Fn() -> W + Send + Sync + 'static,
316 {
317 self.workflows
318 .register_workflow_run_with_factory::<W, F>(factory)?;
319 Ok(self)
320 }
321
322 #[cfg(feature = "wasm-workflows")]
324 pub fn register_wasm_workflow(mut self, component: WasmWorkflowComponent) -> Self {
325 self.wasm_workflow_components.push(component);
326 self
327 }
328}
329
330fn def_build_id() -> WorkerDeploymentOptions {
332 WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
333}
334
335impl WorkerOptions {
336 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
338 self.activities.register_activities::<AI>(instance);
339 self
340 }
341 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
343 where
344 AD: ActivityDefinition + ExecutableActivity,
345 AD::Input: Send + Sync,
346 AD::Output: Send + Sync,
347 {
348 self.activities.register_activity::<AD>(instance);
349 self
350 }
351 pub fn activities(&self) -> ActivityDefinitions {
353 self.activities.clone()
354 }
355
356 pub fn register_workflow<W>(&mut self) -> Result<&mut Self, WorkflowRegistrationError>
358 where
359 W: WorkflowImplementation,
360 <W::Run as WorkflowDefinition>::Input: Send,
361 {
362 self.workflows.register_workflow::<W>()?;
363 Ok(self)
364 }
365
366 pub fn register_workflow_with_factory<W, F>(
371 &mut self,
372 factory: F,
373 ) -> Result<&mut Self, WorkflowRegistrationError>
374 where
375 W: WorkflowImplementation,
376 <W::Run as WorkflowDefinition>::Input: Send,
377 F: Fn() -> W + Send + Sync + 'static,
378 {
379 self.workflows
380 .register_workflow_run_with_factory::<W, F>(factory)?;
381 Ok(self)
382 }
383
384 #[cfg(feature = "wasm-workflows")]
386 pub fn register_wasm_workflow(&mut self, component: WasmWorkflowComponent) -> &mut Self {
387 self.wasm_workflow_components.push(component);
388 self
389 }
390
391 pub fn workflows(&self) -> WorkflowDefinitions {
393 self.workflows.clone()
394 }
395
396 #[doc(hidden)]
397 pub fn to_core_options(
398 &self,
399 namespace: String,
400 connection_identity: String,
401 ) -> Result<WorkerConfig, String> {
402 WorkerConfig::builder()
403 .namespace(namespace)
404 .task_queue(self.task_queue.clone())
405 .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
406 connection_identity.is_empty().then(|| {
407 format!(
408 "{}@{}",
409 std::process::id(),
410 gethostname::gethostname().to_string_lossy()
411 )
412 })
413 }))
414 .max_cached_workflows(self.max_cached_workflows)
415 .tuner(self.tuner.clone())
416 .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
417 .activity_task_poller_behavior(self.activity_task_poller_behavior)
418 .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
419 .task_types(self.task_types)
420 .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
421 .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
422 .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
423 .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
424 .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
425 .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
426 .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
427 self.deployment_options.clone(),
428 ))
429 .workflow_failure_errors(self.workflow_failure_errors.clone())
430 .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
431 .build()
432 }
433}
434
/// A worker layered on a core worker. It bundles the state shared by both task kinds with a
/// workflow-specific half and an activity-specific half, all of which [`Worker::run`] drives.
pub struct Worker {
    /// State shared by the workflow and activity sides (core worker, converters, ...).
    common: CommonWorker,
    /// Workflow definitions plus the map of currently running workflows.
    workflow_half: WorkflowHalf,
    /// Activity definitions plus cancellation tokens of started activities.
    activity_half: ActivityHalf,
}
443
/// State used by both the workflow and the activity side of a [`Worker`].
struct CommonWorker {
    /// Core worker used to poll for tasks and to report their completions.
    worker: Arc<CoreWorker>,
    /// Task queue name, copied from the core worker's config at construction time.
    task_queue: String,
    /// Optional interceptor notified about workflow activations, their completions and
    /// worker shutdown.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Interceptors handed to every activity invocation.
    activity_inbound_interceptors: Vec<Arc<dyn ActivityInboundInterceptor>>,
    /// Client options, cloned into the context of every started activity.
    client_options: ClientOptions,
    /// Data converter (cloned from `client_options` at construction) whose codec is used to
    /// decode incoming and encode outgoing payloads.
    data_converter: DataConverter,
}
452
/// Workflow-side state of a [`Worker`].
struct WorkflowHalf {
    /// Running workflows, keyed by run id.
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// Registry used to look up a workflow by its type name when it is initialized.
    workflow_definitions: WorkflowDefinitions,
    /// Notified every time an entry is removed from `workflows`.
    workflow_removed_from_map: Notify,
    /// Passed to `start_workflow` for every workflow that gets started.
    detect_nondeterministic_futures: bool,
}
/// Per-workflow entry stored in `WorkflowHalf::workflows`.
struct WorkflowData {
    /// Channel used to deliver activations to the running workflow.
    activation_chan: UnboundedSender<WorkflowActivation>,
}
464
/// Pairs the handle of a spawned workflow future with the run id it belongs to, so the entry
/// can be removed from the workflow map once the future resolves.
struct WorkflowFutureHandle<F: Future> {
    /// Future that resolves when the spawned workflow task finishes.
    join_handle: F,
    /// Run id of the workflow the future drives.
    run_id: String,
}
469
/// Activity-side state of a [`Worker`].
#[derive(Default)]
struct ActivityHalf {
    /// Registered activities, looked up by activity type name.
    activities: ActivityDefinitions,
    /// Cancellation token of each started activity, keyed by its task token.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
476
/// Errors returned by `ActivityHalf::activity_task_handler`.
#[derive(Debug, thiserror::Error)]
enum ActivityTaskHandlerError {
    /// The task named an activity type that is not registered. The run loop answers this by
    /// failing the task identified by `task_token` instead of aborting.
    #[error("{source}")]
    UnregisteredActivity {
        source: ActivityNotRegisteredError,
        task_token: Vec<u8>,
    },
    /// Any other error; the run loop returns it to the caller.
    #[error(transparent)]
    Fatal(#[from] anyhow::Error),
}
487
/// Describes a request for an activity type that this worker has not registered. The two
/// variants differ only in the message they render.
#[derive(Debug, thiserror::Error)]
enum ActivityNotRegisteredError {
    /// At least one activity is registered; the message lists the registered names.
    #[error(
        "Activity {activity_type} is not registered on this worker, available activities: {}",
        .available_activities.join(", ")
    )]
    HasAvailable {
        activity_type: String,
        available_activities: Vec<&'static str>,
    },
    /// No activity is registered at all.
    #[error("Activity {activity_type} is not registered on this worker, no available activities.")]
    NoAvailable { activity_type: String },
}
501
502impl ActivityNotRegisteredError {
503 fn new(activity_type: String, available_activities: Vec<&'static str>) -> Self {
504 if available_activities.is_empty() {
505 Self::NoAvailable { activity_type }
506 } else {
507 Self::HasAvailable {
508 activity_type,
509 available_activities,
510 }
511 }
512 }
513}
514
515impl Worker {
516 pub fn new(
518 runtime: &CoreRuntime,
519 client: Client,
520 options: WorkerOptions,
521 ) -> Result<Self, Box<dyn std::error::Error>> {
522 let wc = options
523 .to_core_options(client.namespace(), client.identity())
524 .map_err(|s| anyhow::anyhow!("{s}"))?;
525 let core = init_worker(runtime, wc, client.connection().clone())?;
526 Self::new_from_core_options(Arc::new(core), client.options().clone(), options)
527 }
528
529 #[doc(hidden)]
531 pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
532 let client_options = ClientOptions::new(worker.get_config().namespace.clone())
533 .data_converter(data_converter)
534 .build();
535 Self::new_from_core_definitions(
536 worker,
537 client_options,
538 Default::default(),
539 Default::default(),
540 )
541 }
542
543 #[doc(hidden)]
545 pub fn new_from_core_options(
546 worker: Arc<CoreWorker>,
547 client_options: ClientOptions,
548 mut options: WorkerOptions,
549 ) -> Result<Self, Box<dyn std::error::Error>> {
550 let acts = std::mem::take(&mut options.activities);
551 let wfs = std::mem::take(&mut options.workflows);
552 #[cfg(feature = "wasm-workflows")]
553 let wasm_components = std::mem::take(&mut options.wasm_workflow_components);
554 let mut me = Self::new_from_core_definitions(worker, client_options, acts, wfs);
555 me.set_detect_nondeterministic_futures(options.detect_nondeterministic_futures);
556 #[cfg(feature = "wasm-workflows")]
557 me.workflow_half
558 .workflow_definitions
559 .register_wasm_workflows(wasm_components)?;
560 Ok(me)
561 }
562
563 fn new_from_core_definitions(
564 worker: Arc<CoreWorker>,
565 client_options: ClientOptions,
566 activities: ActivityDefinitions,
567 workflows: WorkflowDefinitions,
568 ) -> Self {
569 let data_converter = client_options.data_converter.clone();
570 Self {
571 common: CommonWorker {
572 task_queue: worker.get_config().task_queue.clone(),
573 worker,
574 worker_interceptor: None,
575 activity_inbound_interceptors: Vec::new(),
576 client_options,
577 data_converter,
578 },
579 workflow_half: WorkflowHalf {
580 workflows: Default::default(),
581 workflow_definitions: workflows,
582 workflow_removed_from_map: Default::default(),
583 detect_nondeterministic_futures: false,
584 },
585 activity_half: ActivityHalf {
586 activities,
587 ..Default::default()
588 },
589 }
590 }
591
592 pub fn task_queue(&self) -> &str {
594 &self.common.task_queue
595 }
596
597 #[doc(hidden)]
598 pub fn set_detect_nondeterministic_futures(&mut self, enabled: bool) {
601 self.workflow_half.detect_nondeterministic_futures = enabled;
602 }
603
604 pub fn shutdown_handle(&self) -> impl Fn() + use<> {
607 let w = self.common.worker.clone();
608 move || w.initiate_shutdown()
609 }
610
611 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
613 self.activity_half
614 .activities
615 .register_activities::<AI>(instance);
616 self
617 }
618 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
620 where
621 AD: ActivityDefinition + ExecutableActivity,
622 AD::Input: Send + Sync,
623 AD::Output: Send + Sync,
624 {
625 self.activity_half
626 .activities
627 .register_activity::<AD>(instance);
628 self
629 }
630
631 pub fn register_workflow<W>(&mut self) -> Result<&mut Self, WorkflowRegistrationError>
633 where
634 W: WorkflowImplementation,
635 <W::Run as WorkflowDefinition>::Input: Send,
636 {
637 self.workflow_half
638 .workflow_definitions
639 .register_workflow::<W>()?;
640 Ok(self)
641 }
642
643 pub fn register_workflow_with_factory<W, F>(
647 &mut self,
648 factory: F,
649 ) -> Result<&mut Self, WorkflowRegistrationError>
650 where
651 W: WorkflowImplementation,
652 <W::Run as WorkflowDefinition>::Input: Send,
653 F: Fn() -> W + Send + Sync + 'static,
654 {
655 self.workflow_half
656 .workflow_definitions
657 .register_workflow_run_with_factory::<W, F>(factory)?;
658 Ok(self)
659 }
660
    /// Drives the worker until the core worker reports shutdown or the first error occurs.
    ///
    /// Three pieces are joined with `tokio::try_join!`:
    /// - the workflow side, run inside a `tokio::task::LocalSet`: a loop polling workflow
    ///   activations, joined with a task that awaits every spawned workflow future and
    ///   removes finished workflows from the map,
    /// - an activity polling loop (skipped entirely when no activities are registered),
    /// - a processor that encodes workflow activation completions and reports them to core.
    ///
    /// After all of them finish, the worker interceptor's `on_shutdown` hook (if any) runs
    /// and the core worker's `shutdown` is awaited.
    ///
    /// # Errors
    /// Returns the first error produced by any joined piece, e.g. a poll error other than
    /// `PollError::ShutDown`, a workflow future that ended with a termination other than
    /// `WorkflowTermination::Evicted`, or a failure reported while completing a task.
    ///
    /// # Panics
    /// Panics if the receiving end of the workflow-future channel is gone while activations
    /// are still being handled.
    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
        // Cancelled once workflow polling stops, so still-running workflow tasks end.
        let shutdown_token = CancellationToken::new();
        let (common, wf_half, act_half) = self.split_apart();
        // Carries the handle of every newly spawned workflow future to the joiner below.
        let (wf_future_tx, wf_future_rx) =
            unbounded_channel::<WorkflowFutureHandle<TaskHandle<WorkflowResult<Payload>>>>();
        // Carries workflow activation completions to the completion processor below.
        let (completions_tx, completions_rx) = unbounded_channel();

        let workflow_local_set = tokio::task::LocalSet::new();
        let executor = WorkflowExecutor::new();

        // Awaits each workflow future; on completion removes the workflow from the map and
        // wakes anyone waiting for that removal.
        let wf_future_joiner = async {
            UnboundedReceiverStream::new(wf_future_rx)
                .map(Result::<_, anyhow::Error>::Ok)
                .try_for_each_concurrent(
                    None,
                    |WorkflowFutureHandle {
                         join_handle,
                         run_id,
                     }| {
                        let wf_half = &*wf_half;
                        async move {
                            let result = join_handle.await.map_err(anyhow::Error::new)?;
                            // An eviction is an expected way for a workflow future to end;
                            // any other termination is surfaced as an error.
                            if let Err(e) = result
                                && !matches!(e, WorkflowTermination::Evicted)
                            {
                                return Err(anyhow::Error::new(e));
                            }
                            debug!(run_id=%run_id, "Removing workflow from cache");
                            wf_half.workflows.borrow_mut().remove(&run_id);
                            wf_half.workflow_removed_from_map.notify_one();
                            Ok(())
                        }
                    },
                )
                .await
                .context("Workflow futures encountered an error")
        };
        // Encodes each completion's payloads, shows it to the interceptor, then reports it
        // to core.
        let wf_completion_processor = async {
            UnboundedReceiverStream::new(completions_rx)
                .map(Ok)
                .try_for_each_concurrent(None, |mut completion| async {
                    encode_payloads(
                        &mut completion,
                        common.data_converter.codec(),
                        &SerializationContextData::Workflow,
                    )
                    .await;
                    if let Some(ref i) = common.worker_interceptor {
                        i.on_workflow_activation_completion(&completion).await;
                    }
                    common.worker.complete_workflow_activation(completion).await
                })
                .map_err(anyhow::Error::from)
                .await
                .context("Workflow completions processor encountered an error")
        };
        tokio::try_join!(
            // Workflow polling loop
            async {
                workflow_local_set.run_until(async {
                    tokio::try_join!(
                        async {
                            loop {
                                // A shutdown poll error ends the loop; others propagate.
                                let mut activation =
                                    match common.worker.poll_workflow_activation().await {
                                        Err(PollError::ShutDown) => {
                                            break;
                                        }
                                        o => o?,
                                    };
                                decode_payloads(
                                    &mut activation,
                                    common.data_converter.codec(),
                                    &SerializationContextData::Workflow,
                                )
                                .await;
                                if let Some(ref i) = common.worker_interceptor {
                                    i.on_workflow_activation(&activation).await?;
                                }
                                // A handle is returned only when the activation started a
                                // new workflow; pass it on to the joiner.
                                if let Some(wf_fut) = wf_half
                                    .workflow_activation_handler(
                                        common,
                                        shutdown_token.clone(),
                                        activation,
                                        &completions_tx,
                                        &executor,
                                    )
                                    .await?
                                    && wf_future_tx.send(wf_fut).is_err()
                                {
                                    panic!(
                                        "Receive half of completion processor channel cannot be dropped"
                                    );
                                }
                                executor.process_tasks();
                            }
                            // Polling is over: end the remaining workflow tasks.
                            shutdown_token.cancel();
                            // Dropping the senders closes both channels, which lets the
                            // joiner and the completion processor streams terminate.
                            drop(wf_future_tx);
                            drop(completions_tx);
                            executor.shutdown().await;
                            Result::<_, anyhow::Error>::Ok(())
                        },
                        wf_future_joiner,
                    )
                }).await
            },
            // Activity polling loop
            async {
                // Activity tasks are not polled at all when nothing is registered.
                if !act_half.activities.is_empty() {
                    loop {
                        let activity = common.worker.poll_activity_task().await;
                        if matches!(activity, Err(PollError::ShutDown)) {
                            break;
                        }
                        let mut activity = activity?;
                        decode_payloads(
                            &mut activity,
                            common.data_converter.codec(),
                            &SerializationContextData::Activity,
                        )
                        .await;
                        match act_half.activity_task_handler(
                            common.worker.clone(),
                            common.client_options.clone(),
                            common.task_queue.clone(),
                            common.data_converter.clone(),
                            common.activity_inbound_interceptors.clone(),
                            activity,
                        ) {
                            Ok(()) => {}
                            // An unknown activity type fails that one task with a
                            // "NotFoundError" application failure; the loop keeps going.
                            Err(ActivityTaskHandlerError::UnregisteredActivity {
                                source,
                                task_token,
                            }) => {
                                let failure = common.data_converter.to_failure(
                                    &SerializationContextData::Activity,
                                    OutgoingError::Activity(OutgoingActivityError::Application(
                                        ApplicationFailure::builder(source)
                                            .type_name("NotFoundError".to_owned())
                                            .build()
                                            .into(),
                                    )),
                                );
                                let mut completion = ActivityTaskCompletion {
                                    task_token,
                                    result: Some(ActivityExecutionResult::fail(failure)),
                                };
                                encode_payloads(
                                    &mut completion,
                                    common.data_converter.codec(),
                                    &SerializationContextData::Activity,
                                )
                                .await;
                                common.worker.complete_activity_task(completion).await?;
                            }
                            Err(ActivityTaskHandlerError::Fatal(err)) => return Err(err),
                        };
                    }
                };
                Result::<_, anyhow::Error>::Ok(())
            },
            wf_completion_processor,
        )?;

        if let Some(i) = self.common.worker_interceptor.as_ref() {
            i.on_shutdown(self);
        }
        self.common.worker.shutdown().await;
        Ok(())
    }
846
847 pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
849 self.common.worker_interceptor = Some(Box::new(interceptor));
850 }
851
852 pub fn add_activity_inbound_interceptor(
855 &mut self,
856 interceptor: impl ActivityInboundInterceptor,
857 ) {
858 self.common
859 .activity_inbound_interceptors
860 .push(Arc::new(interceptor));
861 }
862
863 pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
867 self.common.worker = new_core_worker;
868 }
869
870 pub fn cached_workflows(&self) -> usize {
873 self.workflow_half.workflows.borrow().len()
874 }
875
876 pub fn worker_instance_key(&self) -> Uuid {
878 self.common.worker.worker_instance_key()
879 }
880
881 #[doc(hidden)]
882 pub fn core_worker(&self) -> Arc<CoreWorker> {
883 self.common.worker.clone()
884 }
885
886 fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
887 (
888 &mut self.common,
889 &mut self.workflow_half,
890 &mut self.activity_half,
891 )
892 }
893}
894
895impl WorkflowHalf {
896 #[allow(clippy::type_complexity)]
897 async fn workflow_activation_handler(
898 &self,
899 common: &CommonWorker,
900 shutdown_token: CancellationToken,
901 mut activation: WorkflowActivation,
902 completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
903 executor: &WorkflowExecutor,
904 ) -> Result<Option<WorkflowFutureHandle<TaskHandle<WorkflowResult<Payload>>>>, anyhow::Error>
905 {
906 let mut res = None;
907 let run_id = activation.run_id.clone();
908
909 if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
912 Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
913 _ => None,
914 }) {
915 let workflow_type = sw.workflow_type.clone();
916 let (wff, activations) = {
917 if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
918 match start_workflow(
919 factory,
920 common.worker.get_config().namespace.clone(),
921 common.task_queue.clone(),
922 run_id.clone(),
923 std::mem::take(sw),
924 completions_tx.clone(),
925 common.data_converter.clone(),
926 self.detect_nondeterministic_futures,
927 ) {
928 Ok(result) => result,
929 Err(e) => {
930 warn!("Failed to create workflow {workflow_type}: {e}");
931 completions_tx
932 .send(WorkflowActivationCompletion::fail(
933 run_id,
934 format!("Failed to create workflow: {e}").into(),
935 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
936 ))
937 .expect("Completion channel intact");
938 return Ok(None);
939 }
940 }
941 } else {
942 warn!("Workflow type {workflow_type} not found");
943 completions_tx
944 .send(WorkflowActivationCompletion::fail(
945 run_id,
946 format!("Workflow type {workflow_type} not found").into(),
947 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
948 ))
949 .expect("Completion channel intact");
950 return Ok(None);
951 }
952 };
953 let jh = executor.spawn(async move {
955 tokio::select! {
956 r = wff.fuse() => r,
957 _ = shutdown_token.cancelled() => {
960 Err(WorkflowTermination::Evicted)
961 }
962 }
963 });
964 res = Some(WorkflowFutureHandle {
965 join_handle: jh,
966 run_id: run_id.clone(),
967 });
968 loop {
969 if self.workflows.borrow_mut().contains_key(&run_id) {
974 self.workflow_removed_from_map.notified().await;
975 } else {
976 break;
977 }
978 }
979 self.workflows.borrow_mut().insert(
980 run_id.clone(),
981 WorkflowData {
982 activation_chan: activations,
983 },
984 );
985 }
986
987 if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
990 dat.activation_chan
991 .send(activation)
992 .expect("Workflow should exist if we're sending it an activation");
993 } else {
994 if activation.jobs.len() == 1
1000 && matches!(
1001 activation.jobs.first().map(|j| &j.variant),
1002 Some(Some(Variant::RemoveFromCache(_)))
1003 )
1004 {
1005 completions_tx
1006 .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
1007 .expect("Completion channel intact");
1008 return Ok(None);
1009 }
1010
1011 bail!("Got activation {activation:?} for unknown workflow {run_id}");
1014 };
1015
1016 Ok(res)
1017 }
1018}
1019
1020impl ActivityHalf {
1021 fn activity_task_handler(
1023 &mut self,
1024 worker: Arc<CoreWorker>,
1025 client_options: ClientOptions,
1026 task_queue: String,
1027 data_converter: DataConverter,
1028 activity_inbound_interceptors: Vec<Arc<dyn ActivityInboundInterceptor>>,
1029 activity: ActivityTask,
1030 ) -> Result<(), ActivityTaskHandlerError> {
1031 match activity.variant {
1032 Some(activity_task::Variant::Start(start)) => {
1033 let Some(act_fn) = self.activities.get(&start.activity_type) else {
1034 let activity_type = start.activity_type.clone();
1035 let source =
1036 ActivityNotRegisteredError::new(activity_type, self.activities.names());
1037 return Err(ActivityTaskHandlerError::UnregisteredActivity {
1038 source,
1039 task_token: activity.task_token,
1040 });
1041 };
1042 let span = info_span!(
1043 "RunActivity",
1044 "otel.name" = format!("RunActivity:{}", start.activity_type),
1045 "otel.kind" = "server",
1046 "temporalActivityID" = start.activity_id,
1047 "temporalWorkflowID" = field::Empty,
1048 "temporalRunID" = field::Empty,
1049 );
1050 let ct = CancellationToken::new();
1051 let task_token = activity.task_token;
1052 self.task_tokens_to_cancels
1053 .insert(task_token.clone().into(), ct.clone());
1054
1055 let (ctx, args) = ActivityContext::new(
1056 worker.clone(),
1057 client_options,
1058 ct,
1059 task_queue,
1060 task_token.clone(),
1061 start,
1062 );
1063 let codec_data_converter = data_converter.clone();
1064
1065 tokio::spawn(async move {
1066 let act_fut = async move {
1067 if let Some(info) = &ctx.info().workflow_execution {
1068 Span::current()
1069 .record("temporalWorkflowID", &info.workflow_id)
1070 .record("temporalRunID", &info.run_id);
1071 }
1072 (act_fn)(args, data_converter, ctx, activity_inbound_interceptors).await
1073 }
1074 .instrument(span);
1075 let result = act_fut.await;
1076 let result = match result {
1077 Ok(output) => {
1078 let pc = codec_data_converter.payload_converter();
1081 let ctx = SerializationContext {
1082 data: &SerializationContextData::Activity,
1083 converter: pc,
1084 };
1085 match output.serialize_payload(&ctx) {
1086 Ok(payload) => ActivityExecutionResult::ok(payload),
1087 Err(err) => {
1088 activity_error_to_core_result(&codec_data_converter, err.into())
1089 }
1090 }
1091 }
1092 Err(err) => activity_error_to_core_result(&codec_data_converter, err),
1093 };
1094 let mut completion = ActivityTaskCompletion {
1095 task_token,
1096 result: Some(result),
1097 };
1098 encode_payloads(
1099 &mut completion,
1100 codec_data_converter.codec(),
1101 &SerializationContextData::Activity,
1102 )
1103 .await;
1104 worker.complete_activity_task(completion).await?;
1105 Ok::<_, anyhow::Error>(())
1106 });
1107 }
1108 Some(activity_task::Variant::Cancel(_)) => {
1109 if let Some(ct) = self
1110 .task_tokens_to_cancels
1111 .get(activity.task_token.as_slice())
1112 {
1113 ct.cancel();
1114 }
1115 }
1116 None => {
1117 return Err(anyhow!("Undefined activity task variant").into());
1118 }
1119 }
1120 Ok(())
1121 }
1122}
1123
/// Exit value of an activity function.
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// The activity does not produce its result by returning.
    ///
    /// NOTE(review): presumably the activity is completed later through a separate
    /// asynchronous-completion call; confirm against the activity documentation.
    WillCompleteAsync,
    /// The activity finished and produced the contained value.
    Normal(T),
}
1132
1133impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1134 fn from(t: T) -> Self {
1135 Self::Normal(t)
1136 }
1137}
1138
/// Turns a panic payload into something printable.
///
/// Payloads that are a `&str` or a `String` are returned as-is; anything else yields a
/// fixed fallback message.
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    _panic_formatter::<&str>(panic)
}

/// Tries to downcast `panic` to `T`; on failure moves on to `T::NextType`, stopping with the
/// fallback message once the next type in the chain is the `EndPrintingAttempts` sentinel.
fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
    let unmatched = match panic.downcast::<T>() {
        Ok(printable) => return printable,
        Err(unmatched) => unmatched,
    };
    let chain_exhausted = TypeId::of::<T::NextType>() == TypeId::of::<EndPrintingAttempts>();
    if chain_exhausted {
        Box::new("Couldn't turn panic into a string")
    } else {
        _panic_formatter::<T::NextType>(unmatched)
    }
}

/// A type a panic payload may be downcast to, linked to the next candidate to try.
trait PrintablePanicType: Display {
    /// Candidate tried when the payload is not `Self`.
    type NextType: PrintablePanicType;
}

// Chain of candidates: `&str`, then `String`, then the sentinel.
impl PrintablePanicType for &str {
    type NextType = String;
}
impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}

/// Sentinel that terminates the candidate chain; it is its own successor so the associated
/// type recursion is finite.
struct EndPrintingAttempts {}
impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Will never be printed")
    }
}
impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1175
// Unit tests for registration through `WorkerOptions` and for identity resolution in
// `to_core_options`. Several items exist only so that macro-generated code is type-checked.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::activities::ActivityError;
    use temporalio_macros::{activities, activity_definitions, workflow, workflow_methods};

    struct MyActivities {}

    // Declares an activity definition without an implementation (the body is never run);
    // `MyActivities::greet` below implements it via `definition = ...`.
    struct SharedActivities;
    #[activity_definitions]
    impl SharedActivities {
        #[activity(name = "shared-greet")]
        fn greet(name: String) -> Result<String, ActivityError> {
            unimplemented!()
        }
    }

    #[activities]
    impl MyActivities {
        // Activity taking only the context.
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        // Implements the shared definition declared on `SharedActivities`.
        #[activity(definition = shared_activities::Greet)]
        async fn greet(_ctx: ActivityContext, name: String) -> Result<String, ActivityError> {
            Ok(name)
        }

        // Activity that takes the implementer instance as `Arc<Self>`.
        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Not annotated with `#[test]`, so it is never executed (the body starts with
    // `unimplemented!()`). NOTE(review): presumably kept only so these `start_activity`
    // call shapes are type-checked; confirm.
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(
            MyActivities::my_activity,
            (),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
        wf_ctx.start_activity(
            SharedActivities::greet,
            "Hi".to_owned(),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
        wf_ctx.start_activity(
            MyActivities::greet,
            "Hi".to_owned(),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
    }

    // Also not a `#[test]`: it is never called, and only has to compile.
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    // Exercises the workflow method attributes: init, run, sync and async signals, a query,
    // and sync and async updates.
    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        let _ = WorkerOptions::new("task_q")
            .register_workflow::<MyWorkflow>()
            .unwrap();
    }

    // Registering the same workflow type twice must be rejected.
    #[test]
    fn duplicate_workflow_registration_errors() {
        let result = WorkerOptions::new("task_q")
            .register_workflow::<MyWorkflow>()
            .unwrap()
            .register_workflow::<MyWorkflow>();

        let err = match result {
            Ok(_) => panic!("duplicate workflow registration should error"),
            Err(err) => err,
        };
        assert_eq!(
            err,
            WorkflowRegistrationError::DuplicateWorkflowType {
                workflow_type: "MyWorkflow".to_string()
            }
        );
    }

    // `MyWorkflow` declares an `#[init]` method, so registering it with a factory must be
    // rejected.
    #[test]
    fn factory_registration_with_init_errors() {
        let result = WorkerOptions::new("task_q")
            .register_workflow_with_factory(|| MyWorkflow { counter: 0 });

        let err = match result {
            Ok(_) => panic!("factory registration with #[init] should error"),
            Err(err) => err,
        };
        assert_eq!(
            err,
            WorkflowRegistrationError::FactoryRegistrationWithInit {
                workflow_type: "MyWorkflow".to_string()
            }
        );
    }

    // Mirrors the `<pid>@<hostname>` fallback identity built by `to_core_options`.
    fn default_identity() -> String {
        format!(
            "{}@{}",
            std::process::id(),
            gethostname::gethostname().to_string_lossy()
        )
    }

    // Cases: (worker override, connection identity, expected override in the core config).
    // A non-empty connection identity without a worker override must produce no override.
    #[rstest::rstest]
    #[case::default_when_none_provided(None, "", Some(default_identity()))]
    #[case::connection_identity_preserved(None, "conn-identity", None)]
    #[case::worker_override_takes_precedence(
        Some("worker-identity"),
        "conn-identity",
        Some("worker-identity".into())
    )]
    #[case::worker_override_with_empty_connection(
        Some("worker-identity"),
        "",
        Some("worker-identity".into())
    )]
    #[test]
    fn client_identity_resolution(
        #[case] worker_override: Option<&str>,
        #[case] connection_identity: &str,
        #[case] expected: Option<String>,
    ) {
        let opts = WorkerOptions::new("task_q")
            .task_types(WorkerTaskTypes::activity_only())
            .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
            .build();
        let config = opts
            .to_core_options("ns".into(), connection_identity.into())
            .unwrap();
        assert_eq!(config.client_identity_override, expected);
    }
}