1#![warn(missing_docs)] #[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_future;
78pub mod workflows;
79
80#[macro_export]
81#[doc(hidden)]
82macro_rules! __temporal_select {
83 ($($tokens:tt)*) => {
84 ::futures_util::select_biased! { $($tokens)* }
85 };
86}
87
88#[macro_export]
89#[doc(hidden)]
90macro_rules! __temporal_join {
91 ($($tokens:tt)*) => {
92 ::futures_util::join!($($tokens)*)
93 };
94}
95
96use workflow_future::WorkflowFunction;
97
98pub use temporalio_client::Namespace;
99pub use workflow_context::{
100 ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture, ChildWorkflow,
101 ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
102 PendingChildWorkflow, RootWorkflowInfo, Signal, SignalData, SignalWorkflowOptions,
103 StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView,
104};
105
106use crate::{
107 activities::{
108 ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
109 ExecutableActivity,
110 },
111 interceptors::WorkerInterceptor,
112 workflow_context::{ChildWfCommon, NexusUnblockData, StartedNexusOperation},
113 workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
114};
115use anyhow::{Context, anyhow, bail};
116use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
117use std::{
118 any::{Any, TypeId},
119 cell::RefCell,
120 collections::{HashMap, HashSet},
121 fmt::{Debug, Display, Formatter},
122 future::Future,
123 panic::AssertUnwindSafe,
124 sync::Arc,
125 time::Duration,
126};
127use temporalio_client::{Client, NamespacedClient};
128use temporalio_common::{
129 ActivityDefinition, WorkflowDefinition,
130 data_converters::{DataConverter, SerializationContextData},
131 payload_visitor::{decode_payloads, encode_payloads},
132 protos::{
133 TaskToken,
134 coresdk::{
135 ActivityTaskCompletion, AsJsonPayloadExt,
136 activity_result::{ActivityExecutionResult, ActivityResolution},
137 activity_task::{ActivityTask, activity_task},
138 child_workflow::ChildWorkflowResult,
139 common::NamespacedWorkflowExecution,
140 nexus::NexusOperationResult,
141 workflow_activation::{
142 WorkflowActivation,
143 resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
144 resolve_nexus_operation_start, workflow_activation_job::Variant,
145 },
146 workflow_commands::{
147 ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
148 },
149 workflow_completion::WorkflowActivationCompletion,
150 },
151 temporal::api::{
152 common::v1::Payload,
153 enums::v1::WorkflowTaskFailedCause,
154 failure::v1::{Failure, failure},
155 },
156 },
157 worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
158};
159use temporalio_sdk_core::{
160 CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
161 WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
162};
163use tokio::{
164 sync::{
165 Notify,
166 mpsc::{UnboundedSender, unbounded_channel},
167 oneshot,
168 },
169 task::JoinError,
170};
171use tokio_stream::wrappers::UnboundedReceiverStream;
172use tokio_util::sync::CancellationToken;
173use tracing::{Instrument, Span, field};
174use uuid::Uuid;
175
176#[derive(bon::Builder, Clone)]
178#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
179#[non_exhaustive]
180pub struct WorkerOptions {
181 #[builder(start_fn)]
184 pub task_queue: String,
185
186 #[builder(field)]
187 activities: ActivityDefinitions,
188
189 #[builder(field)]
190 workflows: WorkflowDefinitions,
191
192 #[builder(default = def_build_id())]
195 pub deployment_options: WorkerDeploymentOptions,
196 pub client_identity_override: Option<String>,
200 #[builder(default = 1000)]
206 pub max_cached_workflows: usize,
207 #[builder(default = Arc::new(TunerBuilder::default().build()))]
210 pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
211 #[builder(default = PollerBehavior::SimpleMaximum(5))]
215 pub workflow_task_poller_behavior: PollerBehavior,
216 #[builder(default = 0.2)]
224 pub nonsticky_to_sticky_poll_ratio: f32,
225 #[builder(default = PollerBehavior::SimpleMaximum(5))]
227 pub activity_task_poller_behavior: PollerBehavior,
228 #[builder(default = PollerBehavior::SimpleMaximum(5))]
230 pub nexus_task_poller_behavior: PollerBehavior,
231 #[builder(default = WorkerTaskTypes::all())]
237 pub task_types: WorkerTaskTypes,
238 #[builder(default = Duration::from_secs(10))]
241 pub sticky_queue_schedule_to_start_timeout: Duration,
242 #[builder(default = Duration::from_secs(60))]
244 pub max_heartbeat_throttle_interval: Duration,
245 #[builder(default = Duration::from_secs(30))]
250 pub default_heartbeat_throttle_interval: Duration,
251 pub max_task_queue_activities_per_second: Option<f64>,
258 pub max_worker_activities_per_second: Option<f64>,
263 #[builder(default)]
266 pub workflow_failure_errors: HashSet<WorkflowErrorType>,
267 #[builder(default)]
270 pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
271 pub graceful_shutdown_period: Option<Duration>,
274}
275
276impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
277 pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
279 self.activities.register_activities::<AI>(instance);
280 self
281 }
282 pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
284 where
285 AD: ActivityDefinition + ExecutableActivity,
286 AD::Output: Send + Sync,
287 {
288 self.activities.register_activity::<AD>(instance);
289 self
290 }
291
292 pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
294 self.workflows.register_workflow::<WI>();
295 self
296 }
297
298 pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
316 where
317 W: WorkflowImplementation,
318 <W::Run as WorkflowDefinition>::Input: Send,
319 F: Fn() -> W + Send + Sync + 'static,
320 {
321 self.workflows
322 .register_workflow_run_with_factory::<W, F>(factory);
323 self
324 }
325}
326
327fn def_build_id() -> WorkerDeploymentOptions {
329 WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
330}
331
332impl WorkerOptions {
333 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
335 self.activities.register_activities::<AI>(instance);
336 self
337 }
338 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
340 where
341 AD: ActivityDefinition + ExecutableActivity,
342 AD::Output: Send + Sync,
343 {
344 self.activities.register_activity::<AD>(instance);
345 self
346 }
347 pub fn activities(&self) -> ActivityDefinitions {
349 self.activities.clone()
350 }
351
352 pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
354 self.workflows.register_workflow::<WI>();
355 self
356 }
357
358 pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
363 where
364 W: WorkflowImplementation,
365 <W::Run as WorkflowDefinition>::Input: Send,
366 F: Fn() -> W + Send + Sync + 'static,
367 {
368 self.workflows
369 .register_workflow_run_with_factory::<W, F>(factory);
370 self
371 }
372
373 pub fn workflows(&self) -> WorkflowDefinitions {
375 self.workflows.clone()
376 }
377
378 #[doc(hidden)]
379 pub fn to_core_options(
380 &self,
381 namespace: String,
382 connection_identity: String,
383 ) -> Result<WorkerConfig, String> {
384 WorkerConfig::builder()
385 .namespace(namespace)
386 .task_queue(self.task_queue.clone())
387 .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
388 connection_identity.is_empty().then(|| {
389 format!(
390 "{}@{}",
391 std::process::id(),
392 gethostname::gethostname().to_string_lossy()
393 )
394 })
395 }))
396 .max_cached_workflows(self.max_cached_workflows)
397 .tuner(self.tuner.clone())
398 .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
399 .activity_task_poller_behavior(self.activity_task_poller_behavior)
400 .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
401 .task_types(self.task_types)
402 .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
403 .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
404 .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
405 .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
406 .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
407 .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
408 .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
409 self.deployment_options.clone(),
410 ))
411 .workflow_failure_errors(self.workflow_failure_errors.clone())
412 .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
413 .build()
414 }
415}
416
417pub struct Worker {
421 common: CommonWorker,
422 workflow_half: WorkflowHalf,
423 activity_half: ActivityHalf,
424}
425
426struct CommonWorker {
427 worker: Arc<CoreWorker>,
428 task_queue: String,
429 worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
430 data_converter: DataConverter,
431}
432
433#[derive(Default)]
434struct WorkflowHalf {
435 workflows: RefCell<HashMap<String, WorkflowData>>,
437 workflow_definitions: WorkflowDefinitions,
438 workflow_removed_from_map: Notify,
439}
440struct WorkflowData {
441 activation_chan: UnboundedSender<WorkflowActivation>,
443}
444
445struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
446 join_handle: F,
447 run_id: String,
448}
449
450#[derive(Default)]
451struct ActivityHalf {
452 activities: ActivityDefinitions,
454 task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
455}
456
457impl Worker {
458 pub fn new(
460 runtime: &CoreRuntime,
461 client: Client,
462 mut options: WorkerOptions,
463 ) -> Result<Self, Box<dyn std::error::Error>> {
464 let acts = std::mem::take(&mut options.activities);
465 let wfs = std::mem::take(&mut options.workflows);
466 let wc = options
467 .to_core_options(client.namespace(), client.identity())
468 .map_err(|s| anyhow::anyhow!("{s}"))?;
469 let core = init_worker(runtime, wc, client.connection().clone())?;
470 let mut me = Self::new_from_core_definitions(
471 Arc::new(core),
472 client.data_converter().clone(),
473 Default::default(),
474 Default::default(),
475 );
476 me.activity_half.activities = acts;
477 me.workflow_half.workflow_definitions = wfs;
478 Ok(me)
479 }
480
481 #[doc(hidden)]
483 pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
484 Self::new_from_core_definitions(
485 worker,
486 data_converter,
487 Default::default(),
488 Default::default(),
489 )
490 }
491
492 #[doc(hidden)]
494 pub fn new_from_core_definitions(
495 worker: Arc<CoreWorker>,
496 data_converter: DataConverter,
497 activities: ActivityDefinitions,
498 workflows: WorkflowDefinitions,
499 ) -> Self {
500 Self {
501 common: CommonWorker {
502 task_queue: worker.get_config().task_queue.clone(),
503 worker,
504 worker_interceptor: None,
505 data_converter,
506 },
507 workflow_half: WorkflowHalf {
508 workflow_definitions: workflows,
509 ..Default::default()
510 },
511 activity_half: ActivityHalf {
512 activities,
513 ..Default::default()
514 },
515 }
516 }
517
518 pub fn task_queue(&self) -> &str {
520 &self.common.task_queue
521 }
522
523 pub fn shutdown_handle(&self) -> impl Fn() + use<> {
526 let w = self.common.worker.clone();
527 move || w.initiate_shutdown()
528 }
529
530 pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
532 self.activity_half
533 .activities
534 .register_activities::<AI>(instance);
535 self
536 }
537 pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
539 where
540 AD: ActivityDefinition + ExecutableActivity,
541 AD::Output: Send + Sync,
542 {
543 self.activity_half
544 .activities
545 .register_activity::<AD>(instance);
546 self
547 }
548
549 pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
551 self.workflow_half
552 .workflow_definitions
553 .register_workflow::<WI>();
554 self
555 }
556
557 pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
561 where
562 W: WorkflowImplementation,
563 <W::Run as WorkflowDefinition>::Input: Send,
564 F: Fn() -> W + Send + Sync + 'static,
565 {
566 self.workflow_half
567 .workflow_definitions
568 .register_workflow_run_with_factory::<W, F>(factory);
569 self
570 }
571
572 pub async fn run(&mut self) -> Result<(), anyhow::Error> {
575 let shutdown_token = CancellationToken::new();
576 let (common, wf_half, act_half) = self.split_apart();
577 let (wf_future_tx, wf_future_rx) = unbounded_channel();
578 let (completions_tx, completions_rx) = unbounded_channel();
579
580 let workflow_local_set = tokio::task::LocalSet::new();
583
584 let wf_future_joiner = async {
585 UnboundedReceiverStream::new(wf_future_rx)
586 .map(Result::<_, anyhow::Error>::Ok)
587 .try_for_each_concurrent(
588 None,
589 |WorkflowFutureHandle {
590 join_handle,
591 run_id,
592 }| {
593 let wf_half = &*wf_half;
594 async move {
595 let result = join_handle.await?;
596 if let Err(e) = result
599 && !matches!(e, WorkflowTermination::Evicted)
600 {
601 return Err(e.into());
602 }
603 debug!(run_id=%run_id, "Removing workflow from cache");
604 wf_half.workflows.borrow_mut().remove(&run_id);
605 wf_half.workflow_removed_from_map.notify_one();
606 Ok(())
607 }
608 },
609 )
610 .await
611 .context("Workflow futures encountered an error")
612 };
613 let wf_completion_processor = async {
614 UnboundedReceiverStream::new(completions_rx)
615 .map(Ok)
616 .try_for_each_concurrent(None, |mut completion| async {
617 encode_payloads(
618 &mut completion,
619 common.data_converter.codec(),
620 &SerializationContextData::Workflow,
621 )
622 .await;
623 if let Some(ref i) = common.worker_interceptor {
624 i.on_workflow_activation_completion(&completion).await;
625 }
626 common.worker.complete_workflow_activation(completion).await
627 })
628 .map_err(anyhow::Error::from)
629 .await
630 .context("Workflow completions processor encountered an error")
631 };
632 tokio::try_join!(
633 async {
635 workflow_local_set.run_until(async {
636 tokio::try_join!(
637 async {
639 loop {
640 let mut activation =
641 match common.worker.poll_workflow_activation().await {
642 Err(PollError::ShutDown) => {
643 break;
644 }
645 o => o?,
646 };
647 decode_payloads(
648 &mut activation,
649 common.data_converter.codec(),
650 &SerializationContextData::Workflow,
651 )
652 .await;
653 if let Some(ref i) = common.worker_interceptor {
654 i.on_workflow_activation(&activation).await?;
655 }
656 if let Some(wf_fut) = wf_half
657 .workflow_activation_handler(
658 common,
659 shutdown_token.clone(),
660 activation,
661 &completions_tx,
662 )
663 .await?
664 && wf_future_tx.send(wf_fut).is_err()
665 {
666 panic!(
667 "Receive half of completion processor channel cannot be dropped"
668 );
669 }
670 }
671 shutdown_token.cancel();
673 drop(wf_future_tx);
676 drop(completions_tx);
677 Result::<_, anyhow::Error>::Ok(())
678 },
679 wf_future_joiner,
680 )
681 }).await
682 },
683 async {
686 if !act_half.activities.is_empty() {
687 loop {
688 let activity = common.worker.poll_activity_task().await;
689 if matches!(activity, Err(PollError::ShutDown)) {
690 break;
691 }
692 let mut activity = activity?;
693 decode_payloads(
694 &mut activity,
695 common.data_converter.codec(),
696 &SerializationContextData::Activity,
697 )
698 .await;
699 act_half.activity_task_handler(
700 common.worker.clone(),
701 common.task_queue.clone(),
702 common.data_converter.clone(),
703 activity,
704 )?;
705 }
706 };
707 Result::<_, anyhow::Error>::Ok(())
708 },
709 wf_completion_processor,
710 )?;
711
712 if let Some(i) = self.common.worker_interceptor.as_ref() {
713 i.on_shutdown(self);
714 }
715 self.common.worker.shutdown().await;
716 Ok(())
717 }
718
719 pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
721 self.common.worker_interceptor = Some(Box::new(interceptor));
722 }
723
724 pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
728 self.common.worker = new_core_worker;
729 }
730
731 pub fn cached_workflows(&self) -> usize {
734 self.workflow_half.workflows.borrow().len()
735 }
736
737 pub fn worker_instance_key(&self) -> Uuid {
739 self.common.worker.worker_instance_key()
740 }
741
742 #[doc(hidden)]
743 pub fn core_worker(&self) -> Arc<CoreWorker> {
744 self.common.worker.clone()
745 }
746
747 fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
748 (
749 &mut self.common,
750 &mut self.workflow_half,
751 &mut self.activity_half,
752 )
753 }
754}
755
756impl WorkflowHalf {
757 #[allow(clippy::type_complexity)]
758 async fn workflow_activation_handler(
759 &self,
760 common: &CommonWorker,
761 shutdown_token: CancellationToken,
762 mut activation: WorkflowActivation,
763 completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
764 ) -> Result<
765 Option<
766 WorkflowFutureHandle<
767 impl Future<Output = Result<WorkflowResult<Payload>, JoinError>> + use<>,
768 >,
769 >,
770 anyhow::Error,
771 > {
772 let mut res = None;
773 let run_id = activation.run_id.clone();
774
775 if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
778 Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
779 _ => None,
780 }) {
781 let workflow_type = sw.workflow_type.clone();
782 let payload_converter = common.data_converter.payload_converter().clone();
783 let (wff, activations) = {
784 if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
785 match WorkflowFunction::from_invocation(factory).start_workflow(
786 common.worker.get_config().namespace.clone(),
787 common.task_queue.clone(),
788 run_id.clone(),
789 std::mem::take(sw),
790 completions_tx.clone(),
791 payload_converter,
792 ) {
793 Ok(result) => result,
794 Err(e) => {
795 warn!("Failed to create workflow {workflow_type}: {e}");
796 completions_tx
797 .send(WorkflowActivationCompletion::fail(
798 run_id,
799 format!("Failed to create workflow: {e}").into(),
800 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
801 ))
802 .expect("Completion channel intact");
803 return Ok(None);
804 }
805 }
806 } else {
807 warn!("Workflow type {workflow_type} not found");
808 completions_tx
809 .send(WorkflowActivationCompletion::fail(
810 run_id,
811 format!("Workflow type {workflow_type} not found").into(),
812 Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
813 ))
814 .expect("Completion channel intact");
815 return Ok(None);
816 }
817 };
818 let wff = tokio::task::unconstrained(wff);
821 let jh = tokio::task::spawn_local(async move {
823 tokio::select! {
824 r = wff.fuse() => r,
825 _ = shutdown_token.cancelled() => {
828 Err(WorkflowTermination::Evicted)
829 }
830 }
831 });
832 res = Some(WorkflowFutureHandle {
833 join_handle: jh,
834 run_id: run_id.clone(),
835 });
836 loop {
837 if self.workflows.borrow_mut().contains_key(&run_id) {
842 self.workflow_removed_from_map.notified().await;
843 } else {
844 break;
845 }
846 }
847 self.workflows.borrow_mut().insert(
848 run_id.clone(),
849 WorkflowData {
850 activation_chan: activations,
851 },
852 );
853 }
854
855 if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
858 dat.activation_chan
859 .send(activation)
860 .expect("Workflow should exist if we're sending it an activation");
861 } else {
862 if activation.jobs.len() == 1
868 && matches!(
869 activation.jobs.first().map(|j| &j.variant),
870 Some(Some(Variant::RemoveFromCache(_)))
871 )
872 {
873 completions_tx
874 .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
875 .expect("Completion channel intact");
876 return Ok(None);
877 }
878
879 bail!("Got activation {activation:?} for unknown workflow {run_id}");
882 };
883
884 Ok(res)
885 }
886}
887
888impl ActivityHalf {
889 fn activity_task_handler(
891 &mut self,
892 worker: Arc<CoreWorker>,
893 task_queue: String,
894 data_converter: DataConverter,
895 activity: ActivityTask,
896 ) -> Result<(), anyhow::Error> {
897 match activity.variant {
898 Some(activity_task::Variant::Start(start)) => {
899 let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
900 anyhow!(
901 "No function registered for activity type {}",
902 start.activity_type
903 )
904 })?;
905 let span = info_span!(
906 "RunActivity",
907 "otel.name" = format!("RunActivity:{}", start.activity_type),
908 "otel.kind" = "server",
909 "temporalActivityID" = start.activity_id,
910 "temporalWorkflowID" = field::Empty,
911 "temporalRunID" = field::Empty,
912 );
913 let ct = CancellationToken::new();
914 let task_token = activity.task_token;
915 self.task_tokens_to_cancels
916 .insert(task_token.clone().into(), ct.clone());
917
918 let (ctx, args) =
919 ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
920 let codec_data_converter = data_converter.clone();
921
922 tokio::spawn(async move {
923 let act_fut = async move {
924 if let Some(info) = &ctx.info().workflow_execution {
925 Span::current()
926 .record("temporalWorkflowID", &info.workflow_id)
927 .record("temporalRunID", &info.run_id);
928 }
929 (act_fn)(args, data_converter, ctx).await
930 }
931 .instrument(span);
932 let output = AssertUnwindSafe(act_fut).catch_unwind().await;
933 let result = match output {
934 Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
935 format!("Activity function panicked: {}", panic_formatter(e)),
936 true,
937 )),
938 Ok(Ok(p)) => ActivityExecutionResult::ok(p),
939 Ok(Err(err)) => match err {
940 ActivityError::Retryable {
941 source,
942 explicit_delay,
943 } => ActivityExecutionResult::fail({
944 let mut f = Failure::application_failure_from_error(
945 anyhow::Error::from_boxed(source),
946 false,
947 );
948 if let Some(d) = explicit_delay
949 && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
950 f.failure_info.as_mut()
951 {
952 fi.next_retry_delay = d.try_into().ok();
953 }
954 f
955 }),
956 ActivityError::Cancelled { details } => {
957 ActivityExecutionResult::cancel_from_details(details)
958 }
959 ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
960 Failure::application_failure_from_error(
961 anyhow::Error::from_boxed(nre),
962 true,
963 ),
964 ),
965 ActivityError::WillCompleteAsync => {
966 ActivityExecutionResult::will_complete_async()
967 }
968 },
969 };
970 let mut completion = ActivityTaskCompletion {
971 task_token,
972 result: Some(result),
973 };
974 encode_payloads(
975 &mut completion,
976 codec_data_converter.codec(),
977 &SerializationContextData::Activity,
978 )
979 .await;
980 worker.complete_activity_task(completion).await?;
981 Ok::<_, anyhow::Error>(())
982 });
983 }
984 Some(activity_task::Variant::Cancel(_)) => {
985 if let Some(ct) = self
986 .task_tokens_to_cancels
987 .get(activity.task_token.as_slice())
988 {
989 ct.cancel();
990 }
991 }
992 None => bail!("Undefined activity task variant"),
993 }
994 Ok(())
995 }
996}
997
998#[derive(Debug)]
999enum UnblockEvent {
1000 Timer(u32, TimerResult),
1001 Activity(u32, Box<ActivityResolution>),
1002 WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
1003 WorkflowComplete(u32, Box<ChildWorkflowResult>),
1004 SignalExternal(u32, Option<Failure>),
1005 CancelExternal(u32, Option<Failure>),
1006 NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
1007 NexusOperationComplete(u32, Box<NexusOperationResult>),
1008}
1009
1010#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1012pub enum TimerResult {
1013 Cancelled,
1015 Fired,
1017}
1018
1019#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1021pub struct SignalExternalOk;
1022pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
1024
1025#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1027pub struct CancelExternalOk;
1028pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
1030
1031trait Unblockable {
1032 type OtherDat;
1033
1034 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
1035}
1036
1037impl Unblockable for TimerResult {
1038 type OtherDat = ();
1039 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1040 match ue {
1041 UnblockEvent::Timer(_, result) => result,
1042 _ => panic!("Invalid unblock event for timer"),
1043 }
1044 }
1045}
1046
1047impl Unblockable for ActivityResolution {
1048 type OtherDat = ();
1049 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1050 match ue {
1051 UnblockEvent::Activity(_, result) => *result,
1052 _ => panic!("Invalid unblock event for activity"),
1053 }
1054 }
1055}
1056
1057impl Unblockable for PendingChildWorkflow {
1058 type OtherDat = ChildWfCommon;
1060 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1061 match ue {
1062 UnblockEvent::WorkflowStart(_, result) => Self {
1063 status: *result,
1064 common: od,
1065 },
1066 _ => panic!("Invalid unblock event for child workflow start"),
1067 }
1068 }
1069}
1070
1071impl Unblockable for ChildWorkflowResult {
1072 type OtherDat = ();
1073 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1074 match ue {
1075 UnblockEvent::WorkflowComplete(_, result) => *result,
1076 _ => panic!("Invalid unblock event for child workflow complete"),
1077 }
1078 }
1079}
1080
1081impl Unblockable for SignalExternalWfResult {
1082 type OtherDat = ();
1083 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1084 match ue {
1085 UnblockEvent::SignalExternal(_, maybefail) => {
1086 maybefail.map_or(Ok(SignalExternalOk), Err)
1087 }
1088 _ => panic!("Invalid unblock event for signal external workflow result"),
1089 }
1090 }
1091}
1092
1093impl Unblockable for CancelExternalWfResult {
1094 type OtherDat = ();
1095 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1096 match ue {
1097 UnblockEvent::CancelExternal(_, maybefail) => {
1098 maybefail.map_or(Ok(CancelExternalOk), Err)
1099 }
1100 _ => panic!("Invalid unblock event for signal external workflow result"),
1101 }
1102 }
1103}
1104
1105type NexusStartResult = Result<StartedNexusOperation, Failure>;
1106impl Unblockable for NexusStartResult {
1107 type OtherDat = NexusUnblockData;
1108 fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1109 match ue {
1110 UnblockEvent::NexusOperationStart(_, result) => match *result {
1111 resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1112 Ok(StartedNexusOperation {
1113 operation_token: Some(op_token),
1114 unblock_dat: od,
1115 })
1116 }
1117 resolve_nexus_operation_start::Status::StartedSync(_) => {
1118 Ok(StartedNexusOperation {
1119 operation_token: None,
1120 unblock_dat: od,
1121 })
1122 }
1123 resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1124 },
1125 _ => panic!("Invalid unblock event for nexus operation"),
1126 }
1127 }
1128}
1129
1130impl Unblockable for NexusOperationResult {
1131 type OtherDat = ();
1132
1133 fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1134 match ue {
1135 UnblockEvent::NexusOperationComplete(_, result) => *result,
1136 _ => panic!("Invalid unblock event for nexus operation complete"),
1137 }
1138 }
1139}
1140
1141#[derive(Debug, Clone)]
1143pub(crate) enum CancellableID {
1144 Timer(u32),
1145 Activity(u32),
1146 LocalActivity(u32),
1147 ChildWorkflow {
1148 seqnum: u32,
1149 reason: String,
1150 },
1151 SignalExternalWorkflow(u32),
1152 ExternalWorkflow {
1153 seqnum: u32,
1154 execution: NamespacedWorkflowExecution,
1155 reason: String,
1156 },
1157 NexusOp(u32),
1159}
1160
1161pub(crate) trait SupportsCancelReason {
1163 fn with_reason(self, reason: String) -> CancellableID;
1165}
1166#[derive(Debug, Clone)]
1167pub(crate) enum CancellableIDWithReason {
1168 ChildWorkflow {
1169 seqnum: u32,
1170 },
1171 ExternalWorkflow {
1172 seqnum: u32,
1173 execution: NamespacedWorkflowExecution,
1174 },
1175}
1176impl CancellableIDWithReason {
1177 pub(crate) fn seq_num(&self) -> u32 {
1178 match self {
1179 CancellableIDWithReason::ChildWorkflow { seqnum } => *seqnum,
1180 CancellableIDWithReason::ExternalWorkflow { seqnum, .. } => *seqnum,
1181 }
1182 }
1183}
1184impl SupportsCancelReason for CancellableIDWithReason {
1185 fn with_reason(self, reason: String) -> CancellableID {
1186 match self {
1187 CancellableIDWithReason::ChildWorkflow { seqnum } => {
1188 CancellableID::ChildWorkflow { seqnum, reason }
1189 }
1190 CancellableIDWithReason::ExternalWorkflow { seqnum, execution } => {
1191 CancellableID::ExternalWorkflow {
1192 seqnum,
1193 execution,
1194 reason,
1195 }
1196 }
1197 }
1198 }
1199}
1200impl From<CancellableIDWithReason> for CancellableID {
1201 fn from(v: CancellableIDWithReason) -> Self {
1202 v.with_reason("".to_string())
1203 }
1204}
1205
1206#[derive(derive_more::From)]
1207#[allow(clippy::large_enum_variant)]
1208enum RustWfCmd {
1209 #[from(ignore)]
1210 Cancel(CancellableID),
1211 ForceWFTFailure(anyhow::Error),
1212 NewCmd(CommandCreateRequest),
1213 NewNonblockingCmd(workflow_command::Variant),
1214 SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
1215 SubscribeNexusOperationCompletion {
1216 seq: u32,
1217 unblocker: oneshot::Sender<UnblockEvent>,
1218 },
1219}
1220
1221struct CommandCreateRequest {
1222 cmd: WorkflowCommand,
1223 unblocker: oneshot::Sender<UnblockEvent>,
1224}
1225
1226struct CommandSubscribeChildWorkflowCompletion {
1227 seq: u32,
1228 unblocker: oneshot::Sender<UnblockEvent>,
1229}
1230
1231pub type WorkflowResult<T> = Result<T, WorkflowTermination>;
1236
1237#[derive(Debug, thiserror::Error)]
1242pub enum WorkflowTermination {
1243 #[error("Workflow cancelled")]
1245 Cancelled,
1246
1247 #[error("Workflow evicted from cache")]
1249 Evicted,
1250
1251 #[error("Continue as new")]
1253 ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),
1254
1255 #[error("Workflow failed: {0}")]
1257 Failed(#[source] anyhow::Error),
1258}
1259
1260impl WorkflowTermination {
1261 pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1263 Self::ContinueAsNew(Box::new(can))
1264 }
1265
1266 pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1268 Self::Failed(err.into())
1269 }
1270}
1271
1272impl From<anyhow::Error> for WorkflowTermination {
1273 fn from(err: anyhow::Error) -> Self {
1274 Self::Failed(err)
1275 }
1276}
1277
1278impl From<ActivityExecutionError> for WorkflowTermination {
1279 fn from(value: ActivityExecutionError) -> Self {
1280 Self::failed(value)
1281 }
1282}
1283
1284#[derive(Debug)]
1286pub enum ActExitValue<T> {
1287 WillCompleteAsync,
1289 Normal(T),
1291}
1292
1293impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1294 fn from(t: T) -> Self {
1295 Self::Normal(t)
1296 }
1297}
1298
1299fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
1301 _panic_formatter::<&str>(panic)
1302}
1303fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
1304 match panic.downcast::<T>() {
1305 Ok(d) => d,
1306 Err(orig) => {
1307 if TypeId::of::<<T as PrintablePanicType>::NextType>()
1308 == TypeId::of::<EndPrintingAttempts>()
1309 {
1310 return Box::new("Couldn't turn panic into a string");
1311 }
1312 _panic_formatter::<T::NextType>(orig)
1313 }
1314 }
1315}
1316trait PrintablePanicType: Display {
1317 type NextType: PrintablePanicType;
1318}
1319impl PrintablePanicType for &str {
1320 type NextType = String;
1321}
1322impl PrintablePanicType for String {
1323 type NextType = EndPrintingAttempts;
1324}
1325struct EndPrintingAttempts {}
1326impl Display for EndPrintingAttempts {
1327 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1328 write!(f, "Will never be printed")
1329 }
1330}
1331impl PrintablePanicType for EndPrintingAttempts {
1332 type NextType = EndPrintingAttempts;
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337 use super::*;
1338 use temporalio_macros::{activities, workflow, workflow_methods};
1339
1340 struct MyActivities {}
1341
1342 #[activities]
1343 impl MyActivities {
1344 #[activity]
1345 async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
1346 Ok(())
1347 }
1348
1349 #[activity]
1350 async fn takes_self(
1351 self: Arc<Self>,
1352 _ctx: ActivityContext,
1353 _: String,
1354 ) -> Result<(), ActivityError> {
1355 Ok(())
1356 }
1357 }
1358
1359 #[test]
1360 fn test_activity_registration() {
1361 let act_instance = MyActivities {};
1362 let _ = WorkerOptions::new("task_q").register_activities(act_instance);
1363 }
1364
1365 #[allow(unused, clippy::diverging_sub_expression)]
1367 fn test_activity_via_workflow_context() {
1368 let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
1369 wf_ctx.start_activity(MyActivities::my_activity, (), ActivityOptions::default());
1370 wf_ctx.start_activity(
1371 MyActivities::takes_self,
1372 "Hi".to_owned(),
1373 ActivityOptions::default(),
1374 );
1375 }
1376
1377 #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
1379 async fn test_activity_direct_invocation() {
1380 let ctx: ActivityContext = unimplemented!();
1381 let _result = MyActivities::my_activity.run(ctx).await;
1382 }
1383
1384 #[workflow]
1385 struct MyWorkflow {
1386 counter: u32,
1387 }
1388
1389 #[allow(dead_code)]
1390 #[workflow_methods]
1391 impl MyWorkflow {
1392 #[init]
1393 fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
1394 Self { counter: 0 }
1395 }
1396
1397 #[run]
1398 async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
1399 Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
1400 }
1401
1402 #[signal(name = "increment")]
1403 fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
1404 self.counter += amount;
1405 }
1406
1407 #[signal]
1408 async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}
1409
1410 #[query]
1411 fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
1412 self.counter
1413 }
1414
1415 #[update(name = "double")]
1416 fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
1417 self.counter *= 2;
1418 self.counter
1419 }
1420
1421 #[update]
1422 async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
1423 val * 2
1424 }
1425 }
1426
1427 #[test]
1428 fn test_workflow_registration() {
1429 let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
1430 }
1431
1432 fn default_identity() -> String {
1433 format!(
1434 "{}@{}",
1435 std::process::id(),
1436 gethostname::gethostname().to_string_lossy()
1437 )
1438 }
1439
1440 #[rstest::rstest]
1441 #[case::default_when_none_provided(None, "", Some(default_identity()))]
1442 #[case::connection_identity_preserved(None, "conn-identity", None)]
1443 #[case::worker_override_takes_precedence(
1444 Some("worker-identity"),
1445 "conn-identity",
1446 Some("worker-identity".into())
1447 )]
1448 #[case::worker_override_with_empty_connection(
1449 Some("worker-identity"),
1450 "",
1451 Some("worker-identity".into())
1452 )]
1453 #[test]
1454 fn client_identity_resolution(
1455 #[case] worker_override: Option<&str>,
1456 #[case] connection_identity: &str,
1457 #[case] expected: Option<String>,
1458 ) {
1459 let opts = WorkerOptions::new("task_q")
1460 .task_types(WorkerTaskTypes::activity_only())
1461 .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
1462 .build();
1463 let config = opts
1464 .to_core_options("ns".into(), connection_identity.into())
1465 .unwrap();
1466 assert_eq!(config.client_identity_override, expected);
1467 }
1468}