// temporalio_sdk/lib.rs

1#![warn(missing_docs)] // error if there are missing docs
2
3//! This crate defines an alpha-stage Temporal Rust SDK.
4//!
5//! Currently defining activities and running an activity-only worker is the most stable code.
6//! Workflow definitions exist and running a workflow worker works, but the API is still very
7//! unstable.
8//!
9//! An example of running an activity worker:
10//! ```no_run
11//! use std::str::FromStr;
12//! use temporalio_client::{Client, ClientOptions, Connection, ConnectionOptions};
13//! use temporalio_common::{
14//!     telemetry::TelemetryOptions,
15//!     worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes},
16//! };
17//! use temporalio_macros::activities;
18//! use temporalio_sdk::{
19//!     Worker, WorkerOptions,
20//!     activities::{ActivityContext, ActivityError},
21//! };
22//! use temporalio_sdk_core::{CoreRuntime, RuntimeOptions, Url};
23//!
24//! struct MyActivities;
25//!
26//! #[activities]
27//! impl MyActivities {
28//!     #[activity]
29//!     pub(crate) async fn echo(
30//!         _ctx: ActivityContext,
31//!         e: String,
32//!     ) -> Result<String, ActivityError> {
33//!         Ok(e)
34//!     }
35//! }
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     let connection_options =
40//!         ConnectionOptions::new(Url::from_str("http://localhost:7233")?).build();
41//!     let telemetry_options = TelemetryOptions::builder().build();
42//!     let runtime_options = RuntimeOptions::builder()
43//!         .telemetry_options(telemetry_options)
44//!         .build()?;
45//!     let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
46//!
47//!     let connection = Connection::connect(connection_options).await?;
48//!     let client = Client::new(connection, ClientOptions::new("my_namespace").build())?;
49//!
50//!     let worker_options = WorkerOptions::new("task_queue")
51//!         .task_types(WorkerTaskTypes::activity_only())
52//!         .deployment_options(WorkerDeploymentOptions {
53//!             version: WorkerDeploymentVersion {
54//!                 deployment_name: "my_deployment".to_owned(),
55//!                 build_id: "my_build_id".to_owned(),
56//!             },
57//!             use_worker_versioning: false,
58//!             default_versioning_behavior: None,
59//!         })
60//!         .register_activities(MyActivities)
61//!         .build();
62//!
63//!     let mut worker = Worker::new(&runtime, client, worker_options)?;
64//!     worker.run().await?;
65//!
66//!     Ok(())
67//! }
68//! ```
69
70#[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_future;
78pub mod workflows;
79
#[macro_export]
#[doc(hidden)]
// Hidden helper macro used by SDK-generated workflow code. Expands directly to
// `::futures_util::select_biased!`; the leading `::` means `futures_util` is
// resolved from the calling crate's extern prelude, giving generated code a
// stable invocation path.
macro_rules! __temporal_select {
    ($($tokens:tt)*) => {
        ::futures_util::select_biased! { $($tokens)* }
    };
}
87
#[macro_export]
#[doc(hidden)]
// Hidden helper macro used by SDK-generated workflow code. Expands directly to
// `::futures_util::join!`; the leading `::` means `futures_util` is resolved
// from the calling crate's extern prelude.
macro_rules! __temporal_join {
    ($($tokens:tt)*) => {
        ::futures_util::join!($($tokens)*)
    };
}
95
96use workflow_future::WorkflowFunction;
97
98pub use temporalio_client::Namespace;
99pub use workflow_context::{
100    ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture, ChildWorkflow,
101    ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
102    PendingChildWorkflow, RootWorkflowInfo, Signal, SignalData, SignalWorkflowOptions,
103    StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView,
104};
105
106use crate::{
107    activities::{
108        ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
109        ExecutableActivity,
110    },
111    interceptors::WorkerInterceptor,
112    workflow_context::{ChildWfCommon, NexusUnblockData, StartedNexusOperation},
113    workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
114};
115use anyhow::{Context, anyhow, bail};
116use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
117use std::{
118    any::{Any, TypeId},
119    cell::RefCell,
120    collections::{HashMap, HashSet},
121    fmt::{Debug, Display, Formatter},
122    future::Future,
123    panic::AssertUnwindSafe,
124    sync::Arc,
125    time::Duration,
126};
127use temporalio_client::{Client, NamespacedClient};
128use temporalio_common::{
129    ActivityDefinition, WorkflowDefinition,
130    data_converters::{DataConverter, SerializationContextData},
131    payload_visitor::{decode_payloads, encode_payloads},
132    protos::{
133        TaskToken,
134        coresdk::{
135            ActivityTaskCompletion, AsJsonPayloadExt,
136            activity_result::{ActivityExecutionResult, ActivityResolution},
137            activity_task::{ActivityTask, activity_task},
138            child_workflow::ChildWorkflowResult,
139            common::NamespacedWorkflowExecution,
140            nexus::NexusOperationResult,
141            workflow_activation::{
142                WorkflowActivation,
143                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
144                resolve_nexus_operation_start, workflow_activation_job::Variant,
145            },
146            workflow_commands::{
147                ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
148            },
149            workflow_completion::WorkflowActivationCompletion,
150        },
151        temporal::api::{
152            common::v1::Payload,
153            enums::v1::WorkflowTaskFailedCause,
154            failure::v1::{Failure, failure},
155        },
156    },
157    worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
158};
159use temporalio_sdk_core::{
160    CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
161    WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
162};
163use tokio::{
164    sync::{
165        Notify,
166        mpsc::{UnboundedSender, unbounded_channel},
167        oneshot,
168    },
169    task::JoinError,
170};
171use tokio_stream::wrappers::UnboundedReceiverStream;
172use tokio_util::sync::CancellationToken;
173use tracing::{Instrument, Span, field};
174use uuid::Uuid;
175
/// Contains options for configuring a worker.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// What task queue will this worker poll from? This task queue name will be used for both
    /// workflow and activity polling.
    #[builder(start_fn)]
    pub task_queue: String,

    // Activity implementations registered via the `register_activities` /
    // `register_activity` methods (on the builder or the built options).
    #[builder(field)]
    activities: ActivityDefinitions,

    // Workflow implementations registered via the `register_workflow` /
    // `register_workflow_with_factory` methods (on the builder or the built options).
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// Set the deployment options for this worker. Defaults to a hash of the currently running
    /// executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// A human-readable string that can identify this worker. If set, overrides the identity on
    /// the client used by this worker. If unset and the client has no identity, defaults to
    /// `{pid}@{hostname}`.
    pub client_identity_override: Option<String>,
    /// If set nonzero, workflows will be cached and sticky task queues will be used, meaning that
    /// history updates are applied incrementally to suspended instances of workflow execution.
    /// Workflows are evicted according to a least-recently-used policy once the cache maximum is
    /// reached. Workflows may also be explicitly evicted at any time, or as a result of errors
    /// or failures.
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Set a [crate::WorkerTuner] for this worker, which controls how many slots are available for
    /// the different kinds of tasks.
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Controls how polling for Workflow tasks will happen on this worker's task queue. See also
    /// [WorkerConfig::nonsticky_to_sticky_poll_ratio]. If using SimpleMaximum, must be at least 2
    /// when `max_cached_workflows` > 0, or is an error.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Only applies when using [PollerBehavior::SimpleMaximum]
    ///
    /// (max workflow task polls * this number) = the number of max pollers that will be allowed for
    /// the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky
    /// queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for
    /// either poller is 1, so if the maximum allowed is 1 and sticky queues are enabled, there will
    /// be 2 concurrent polls.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Controls how polling for Activity tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Controls how polling for Nexus tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    // TODO [rust-sdk-branch]: Will go away once workflow registration can only happen in here.
    //   Then it can be auto-determined.
    /// Specifies which task types this worker will poll for.
    ///
    /// Note: At least one task type must be specified or the worker will fail validation.
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// How long a workflow task is allowed to sit on the sticky queue before it is timed out
    /// and moved to the non-sticky queue where it may be picked up by any worker.
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Longest interval for throttling activity heartbeats
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Default interval for throttling activity heartbeats in case
    /// `ActivityOptions.heartbeat_timeout` is unset.
    /// When the timeout *is* set in the `ActivityOptions`, throttling is set to
    /// `heartbeat_timeout * 0.8`.
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// Sets the maximum number of activities per second the task queue will dispatch, controlled
    /// server-side. Note that this only takes effect upon an activity poll request. If multiple
    /// workers on the same queue have different values set, they will thrash with the last poller
    /// winning.
    ///
    /// Setting this to a nonzero value will also disable eager activity execution.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// Limits the number of activities per second that this worker will process. The worker will
    /// not poll for new activities if by doing so it might receive and execute an activity which
    /// would cause it to exceed this limit. Negative, zero, or NaN values will cause building
    /// the options to fail.
    pub max_worker_activities_per_second: Option<f64>,
    /// Any error types listed here will cause any workflow being processed by this worker to fail,
    /// rather than simply failing the workflow task.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Like [WorkerConfig::workflow_failure_errors], but specific to certain workflow types (the
    /// map key).
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// If set, the worker will issue cancels for all outstanding activities and nexus operations after
    /// shutdown has been initiated and this amount of time has elapsed.
    pub graceful_shutdown_period: Option<Duration>,
}
275
// Registration methods available at every builder state. These mirror the
// equivalent methods on the built `WorkerOptions` so registration can happen
// either before or after `build()`.
impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    ///
    /// This method is intended for scenarios requiring injection of un-serializable
    /// state into workflows.
    ///
    /// **This can easily cause nondeterminism**
    ///
    /// Only use when you understand the implications and have a specific need that cannot be met
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the workflow type defines an `#[init]` method. Workflows using
    /// factory registration must not have `#[init]` to avoid ambiguity about
    /// instance creation.
    pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }
}
326
327// Needs to exist to avoid https://github.com/elastio/bon/issues/359
328fn def_build_id() -> WorkerDeploymentOptions {
329    WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
330}
331
impl WorkerOptions {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }
    /// Returns all the registered activities by cloning the current set.
    pub fn activities(&self) -> ActivityDefinitions {
        self.activities.clone()
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Returns all the registered workflows by cloning the current set.
    pub fn workflows(&self) -> WorkflowDefinitions {
        self.workflows.clone()
    }

    // Converts these SDK-level options into the core worker configuration.
    // Registered activities/workflows are not part of the core config; they are
    // handled separately by `Worker::new`.
    #[doc(hidden)]
    pub fn to_core_options(
        &self,
        namespace: String,
        connection_identity: String,
    ) -> Result<WorkerConfig, String> {
        WorkerConfig::builder()
            .namespace(namespace)
            .task_queue(self.task_queue.clone())
            // Prefer the explicit override; otherwise only synthesize a
            // `{pid}@{hostname}` identity when the connection itself has no identity.
            .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
                connection_identity.is_empty().then(|| {
                    format!(
                        "{}@{}",
                        std::process::id(),
                        gethostname::gethostname().to_string_lossy()
                    )
                })
            }))
            .max_cached_workflows(self.max_cached_workflows)
            .tuner(self.tuner.clone())
            .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
            .activity_task_poller_behavior(self.activity_task_poller_behavior)
            .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
            .task_types(self.task_types)
            .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
            .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
            .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
            .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
            .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
            .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
            .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
                self.deployment_options.clone(),
            ))
            .workflow_failure_errors(self.workflow_failure_errors.clone())
            .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
            .build()
    }
}
416
/// A worker that can poll for and respond to workflow tasks by using
/// [temporalio_macros::workflow], and activity tasks by using activities defined with
/// [temporalio_macros::activities].
pub struct Worker {
    // State shared by both the workflow and activity processing halves.
    common: CommonWorker,
    // Workflow-side state: cached runs, registered definitions, eviction signal.
    workflow_half: WorkflowHalf,
    // Activity-side state: registered definitions and cancellation bookkeeping.
    activity_half: ActivityHalf,
}
425
// State needed by both the workflow and activity halves of a `Worker`.
struct CommonWorker {
    /// Handle to the underlying core worker that performs polling and completion.
    worker: Arc<CoreWorker>,
    /// Task queue this worker polls on, cached from the core worker's config.
    task_queue: String,
    /// Optional interceptor invoked around activations, completions, and shutdown.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Converter whose codec is used to encode/decode payloads crossing the core boundary.
    data_converter: DataConverter,
}
432
// Workflow-processing state for a `Worker`.
#[derive(Default)]
struct WorkflowHalf {
    /// Maps run id to cached workflow state
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// Registered workflow definitions, looked up by workflow type name.
    workflow_definitions: WorkflowDefinitions,
    /// Signaled each time an entry is removed from the `workflows` map.
    workflow_removed_from_map: Notify,
}
// Per-run state kept in the workflow cache.
struct WorkflowData {
    /// Channel used to send the workflow activations
    activation_chan: UnboundedSender<WorkflowActivation>,
}
444
// Pairs a spawned workflow future with its run id, so the joiner in
// `Worker::run` can remove the matching cache entry once the future resolves.
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
    join_handle: F,
    run_id: String,
}
449
// Activity-processing state for a `Worker`.
#[derive(Default)]
struct ActivityHalf {
    /// Maps activity type to the function for executing activities of that type
    activities: ActivityDefinitions,
    /// Cancellation tokens keyed by activity task token.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
456
457impl Worker {
458    /// Create a new worker from an existing connection, and options.
459    pub fn new(
460        runtime: &CoreRuntime,
461        client: Client,
462        mut options: WorkerOptions,
463    ) -> Result<Self, Box<dyn std::error::Error>> {
464        let acts = std::mem::take(&mut options.activities);
465        let wfs = std::mem::take(&mut options.workflows);
466        let wc = options
467            .to_core_options(client.namespace(), client.identity())
468            .map_err(|s| anyhow::anyhow!("{s}"))?;
469        let core = init_worker(runtime, wc, client.connection().clone())?;
470        let mut me = Self::new_from_core_definitions(
471            Arc::new(core),
472            client.data_converter().clone(),
473            Default::default(),
474            Default::default(),
475        );
476        me.activity_half.activities = acts;
477        me.workflow_half.workflow_definitions = wfs;
478        Ok(me)
479    }
480
481    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
482    #[doc(hidden)]
483    pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
484        Self::new_from_core_definitions(
485            worker,
486            data_converter,
487            Default::default(),
488            Default::default(),
489        )
490    }
491
492    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
493    #[doc(hidden)]
494    pub fn new_from_core_definitions(
495        worker: Arc<CoreWorker>,
496        data_converter: DataConverter,
497        activities: ActivityDefinitions,
498        workflows: WorkflowDefinitions,
499    ) -> Self {
500        Self {
501            common: CommonWorker {
502                task_queue: worker.get_config().task_queue.clone(),
503                worker,
504                worker_interceptor: None,
505                data_converter,
506            },
507            workflow_half: WorkflowHalf {
508                workflow_definitions: workflows,
509                ..Default::default()
510            },
511            activity_half: ActivityHalf {
512                activities,
513                ..Default::default()
514            },
515        }
516    }
517
518    /// Returns the task queue name this worker polls on
519    pub fn task_queue(&self) -> &str {
520        &self.common.task_queue
521    }
522
523    /// Return a handle that can be used to initiate shutdown. This is useful because [Worker::run]
524    /// takes self mutably, so you may want to obtain a handle for shutting down before running.
525    pub fn shutdown_handle(&self) -> impl Fn() + use<> {
526        let w = self.common.worker.clone();
527        move || w.initiate_shutdown()
528    }
529
530    /// Registers all activities on an activity implementer.
531    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
532        self.activity_half
533            .activities
534            .register_activities::<AI>(instance);
535        self
536    }
537    /// Registers a specific activitiy.
538    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
539    where
540        AD: ActivityDefinition + ExecutableActivity,
541        AD::Output: Send + Sync,
542    {
543        self.activity_half
544            .activities
545            .register_activity::<AD>(instance);
546        self
547    }
548
549    /// Registers all workflows on a workflow implementer.
550    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
551        self.workflow_half
552            .workflow_definitions
553            .register_workflow::<WI>();
554        self
555    }
556
557    /// Register a workflow with a custom factory for instance creation.
558    ///
559    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
560    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
561    where
562        W: WorkflowImplementation,
563        <W::Run as WorkflowDefinition>::Input: Send,
564        F: Fn() -> W + Send + Sync + 'static,
565    {
566        self.workflow_half
567            .workflow_definitions
568            .register_workflow_run_with_factory::<W, F>(factory);
569        self
570    }
571
572    /// Runs the worker. Eventually resolves after the worker has been explicitly shut down,
573    /// or may return early with an error in the event of some unresolvable problem.
574    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
575        let shutdown_token = CancellationToken::new();
576        let (common, wf_half, act_half) = self.split_apart();
577        let (wf_future_tx, wf_future_rx) = unbounded_channel();
578        let (completions_tx, completions_rx) = unbounded_channel();
579
580        // Workflows run in a LocalSet because they use Rc<RefCell> for state management.
581        // This allows them to not require Send/Sync bounds.
582        let workflow_local_set = tokio::task::LocalSet::new();
583
584        let wf_future_joiner = async {
585            UnboundedReceiverStream::new(wf_future_rx)
586                .map(Result::<_, anyhow::Error>::Ok)
587                .try_for_each_concurrent(
588                    None,
589                    |WorkflowFutureHandle {
590                         join_handle,
591                         run_id,
592                     }| {
593                        let wf_half = &*wf_half;
594                        async move {
595                            let result = join_handle.await?;
596                            // Eviction is normal workflow lifecycle - workflows loop waiting for
597                            // eviction after completion to manage cache cleanup
598                            if let Err(e) = result
599                                && !matches!(e, WorkflowTermination::Evicted)
600                            {
601                                return Err(e.into());
602                            }
603                            debug!(run_id=%run_id, "Removing workflow from cache");
604                            wf_half.workflows.borrow_mut().remove(&run_id);
605                            wf_half.workflow_removed_from_map.notify_one();
606                            Ok(())
607                        }
608                    },
609                )
610                .await
611                .context("Workflow futures encountered an error")
612        };
613        let wf_completion_processor = async {
614            UnboundedReceiverStream::new(completions_rx)
615                .map(Ok)
616                .try_for_each_concurrent(None, |mut completion| async {
617                    encode_payloads(
618                        &mut completion,
619                        common.data_converter.codec(),
620                        &SerializationContextData::Workflow,
621                    )
622                    .await;
623                    if let Some(ref i) = common.worker_interceptor {
624                        i.on_workflow_activation_completion(&completion).await;
625                    }
626                    common.worker.complete_workflow_activation(completion).await
627                })
628                .map_err(anyhow::Error::from)
629                .await
630                .context("Workflow completions processor encountered an error")
631        };
632        tokio::try_join!(
633            // Workflow-related tasks run inside LocalSet (allows !Send futures)
634            async {
635                workflow_local_set.run_until(async {
636                    tokio::try_join!(
637                        // Workflow polling loop
638                        async {
639                            loop {
640                            let mut activation =
641                                match common.worker.poll_workflow_activation().await {
642                                    Err(PollError::ShutDown) => {
643                                        break;
644                                    }
645                                    o => o?,
646                                };
647                            decode_payloads(
648                                &mut activation,
649                                common.data_converter.codec(),
650                                &SerializationContextData::Workflow,
651                            )
652                            .await;
653                            if let Some(ref i) = common.worker_interceptor {
654                                i.on_workflow_activation(&activation).await?;
655                            }
656                            if let Some(wf_fut) = wf_half
657                                .workflow_activation_handler(
658                                    common,
659                                    shutdown_token.clone(),
660                                    activation,
661                                    &completions_tx,
662                                )
663                                .await?
664                                && wf_future_tx.send(wf_fut).is_err()
665                            {
666                                panic!(
667                                    "Receive half of completion processor channel cannot be dropped"
668                                );
669                            }
670                        }
671                        // Tell still-alive workflows to evict themselves
672                        shutdown_token.cancel();
673                        // It's important to drop these so the future and completion processors will
674                        // terminate.
675                        drop(wf_future_tx);
676                        drop(completions_tx);
677                        Result::<_, anyhow::Error>::Ok(())
678                    },
679                    wf_future_joiner,
680                )
681                }).await
682            },
683            // Only poll on the activity queue if activity functions have been registered. This
684            // makes tests which use mocks dramatically more manageable.
685            async {
686                if !act_half.activities.is_empty() {
687                    loop {
688                        let activity = common.worker.poll_activity_task().await;
689                        if matches!(activity, Err(PollError::ShutDown)) {
690                            break;
691                        }
692                        let mut activity = activity?;
693                        decode_payloads(
694                            &mut activity,
695                            common.data_converter.codec(),
696                            &SerializationContextData::Activity,
697                        )
698                        .await;
699                        act_half.activity_task_handler(
700                            common.worker.clone(),
701                            common.task_queue.clone(),
702                            common.data_converter.clone(),
703                            activity,
704                        )?;
705                    }
706                };
707                Result::<_, anyhow::Error>::Ok(())
708            },
709            wf_completion_processor,
710        )?;
711
712        if let Some(i) = self.common.worker_interceptor.as_ref() {
713            i.on_shutdown(self);
714        }
715        self.common.worker.shutdown().await;
716        Ok(())
717    }
718
719    /// Set a [WorkerInterceptor]
720    pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
721        self.common.worker_interceptor = Some(Box::new(interceptor));
722    }
723
724    /// Turns this rust worker into a new worker with all the same workflows and activities
725    /// registered, but with a new underlying core worker. Can be used to swap the worker for
726    /// a replay worker, change task queues, etc.
727    pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
728        self.common.worker = new_core_worker;
729    }
730
731    /// Returns number of currently cached workflows as understood by the SDK. Importantly, this
732    /// is not the same as understood by core, though they *should* always be in sync.
733    pub fn cached_workflows(&self) -> usize {
734        self.workflow_half.workflows.borrow().len()
735    }
736
    /// Returns the instance key for this worker, used for worker heartbeating.
    ///
    /// The key is provided by the underlying core worker.
    pub fn worker_instance_key(&self) -> Uuid {
        self.common.worker.worker_instance_key()
    }
741
742    #[doc(hidden)]
743    pub fn core_worker(&self) -> Arc<CoreWorker> {
744        self.common.worker.clone()
745    }
746
747    fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
748        (
749            &mut self.common,
750            &mut self.workflow_half,
751            &mut self.activity_half,
752        )
753    }
754}
755
impl WorkflowHalf {
    /// Processes one workflow activation.
    ///
    /// If the activation contains an `InitializeWorkflow` job, a future driving the workflow is
    /// created, spawned on the current `LocalSet`, and recorded in the run-id -> workflow map;
    /// the activation is then forwarded to that workflow's activation channel.
    ///
    /// Returns a handle to the newly spawned workflow future when one was created, otherwise
    /// `None`. Errors if an activation arrives for a run id with no known workflow, except for
    /// the lone-`RemoveFromCache` case handled below.
    #[allow(clippy::type_complexity)]
    async fn workflow_activation_handler(
        &self,
        common: &CommonWorker,
        shutdown_token: CancellationToken,
        mut activation: WorkflowActivation,
        completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
    ) -> Result<
        Option<
            WorkflowFutureHandle<
                impl Future<Output = Result<WorkflowResult<Payload>, JoinError>> + use<>,
            >,
        >,
        anyhow::Error,
    > {
        let mut res = None;
        let run_id = activation.run_id.clone();

        // If the activation is to init a workflow, create a new workflow driver for it,
        // using the function associated with that workflow id
        if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
            Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
            _ => None,
        }) {
            let workflow_type = sw.workflow_type.clone();
            let payload_converter = common.data_converter.payload_converter().clone();
            let (wff, activations) = {
                if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
                    match WorkflowFunction::from_invocation(factory).start_workflow(
                        common.worker.get_config().namespace.clone(),
                        common.task_queue.clone(),
                        run_id.clone(),
                        std::mem::take(sw),
                        completions_tx.clone(),
                        payload_converter,
                    ) {
                        Ok(result) => result,
                        Err(e) => {
                            // Starting the workflow failed; fail this workflow task rather
                            // than erroring out of the worker loop.
                            warn!("Failed to create workflow {workflow_type}: {e}");
                            completions_tx
                                .send(WorkflowActivationCompletion::fail(
                                    run_id,
                                    format!("Failed to create workflow: {e}").into(),
                                    Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                                ))
                                .expect("Completion channel intact");
                            return Ok(None);
                        }
                    }
                } else {
                    // No factory registered for this workflow type; fail the task.
                    warn!("Workflow type {workflow_type} not found");
                    completions_tx
                        .send(WorkflowActivationCompletion::fail(
                            run_id,
                            format!("Workflow type {workflow_type} not found").into(),
                            Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                        ))
                        .expect("Completion channel intact");
                    return Ok(None);
                }
            };
            // Wrap in unconstrained to prevent Tokio from imposing limits on commands per poll
            // TODO [rust-sdk-branch]: Deadlock detection
            let wff = tokio::task::unconstrained(wff);
            // The LocalSet is created in Worker::run().
            let jh = tokio::task::spawn_local(async move {
                tokio::select! {
                    r = wff.fuse() => r,
                    // TODO: This probably shouldn't abort early, as it could cause an in-progress
                    //  complete to abort. Send synthetic remove activation
                    _ = shutdown_token.cancelled() => {
                        Err(WorkflowTermination::Evicted)
                    }
                }
            });
            res = Some(WorkflowFutureHandle {
                join_handle: jh,
                run_id: run_id.clone(),
            });
            loop {
                // It's possible that we've got a new initialize workflow action before the last
                // future for this run finished evicting, as a result of how futures might be
                // interleaved. In that case, just wait until it's not in the map, which should be
                // a matter of only a few `poll` calls.
                // NOTE(review): `contains_key` only reads the map; a shared `borrow()` would
                // suffice here. The temporary borrow is dropped before the `.await`.
                if self.workflows.borrow_mut().contains_key(&run_id) {
                    self.workflow_removed_from_map.notified().await;
                } else {
                    break;
                }
            }
            self.workflows.borrow_mut().insert(
                run_id.clone(),
                WorkflowData {
                    activation_chan: activations,
                },
            );
        }

        // The activation is expected to apply to some workflow we know about. Use it to
        // unblock things and advance the workflow.
        if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
            dat.activation_chan
                .send(activation)
                .expect("Workflow should exist if we're sending it an activation");
        } else {
            // When we failed to start a workflow, we never inserted it into the cache. But core
            // sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
            // as a failure, which we need to complete. Other SDKs add the workflow to the cache
            // even when the workflow type is unknown/not found. To circumvent this, we simply mark
            // any RemoveFromCache job for workflows that are not in the cache as complete.
            if activation.jobs.len() == 1
                && matches!(
                    activation.jobs.first().map(|j| &j.variant),
                    Some(Some(Variant::RemoveFromCache(_)))
                )
            {
                completions_tx
                    .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
                    .expect("Completion channel intact");
                return Ok(None);
            }

            // In all other cases, we want to error as the runtime could be in an inconsistent state
            // at this point.
            bail!("Got activation {activation:?} for unknown workflow {run_id}");
        };

        Ok(res)
    }
}
887
impl ActivityHalf {
    /// Spawns off a task to handle the provided activity task
    ///
    /// For `Start` tasks: resolves the registered activity function, records a cancellation
    /// token under the task token, and spawns a Tokio task which runs the function (catching
    /// panics), converts the outcome into an [ActivityExecutionResult], codec-encodes the
    /// completion payloads, and reports completion back to core.
    ///
    /// For `Cancel` tasks: fires the cancellation token recorded for the task token, if any.
    ///
    /// Errors if no function is registered for the activity type, or the task has no variant.
    fn activity_task_handler(
        &mut self,
        worker: Arc<CoreWorker>,
        task_queue: String,
        data_converter: DataConverter,
        activity: ActivityTask,
    ) -> Result<(), anyhow::Error> {
        match activity.variant {
            Some(activity_task::Variant::Start(start)) => {
                let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
                    anyhow!(
                        "No function registered for activity type {}",
                        start.activity_type
                    )
                })?;
                // Tracing span covering the whole activity execution; workflow fields are
                // filled in once the activity context is available.
                let span = info_span!(
                    "RunActivity",
                    "otel.name" = format!("RunActivity:{}", start.activity_type),
                    "otel.kind" = "server",
                    "temporalActivityID" = start.activity_id,
                    "temporalWorkflowID" = field::Empty,
                    "temporalRunID" = field::Empty,
                );
                // Register a cancellation token so a later `Cancel` task with the same task
                // token can interrupt this activity.
                // NOTE(review): the entry inserted here is not removed when the activity
                // completes in this handler — confirm cleanup happens elsewhere.
                let ct = CancellationToken::new();
                let task_token = activity.task_token;
                self.task_tokens_to_cancels
                    .insert(task_token.clone().into(), ct.clone());

                let (ctx, args) =
                    ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
                let codec_data_converter = data_converter.clone();

                tokio::spawn(async move {
                    let act_fut = async move {
                        // Record workflow identifiers on the span now that they are known.
                        if let Some(info) = &ctx.info().workflow_execution {
                            Span::current()
                                .record("temporalWorkflowID", &info.workflow_id)
                                .record("temporalRunID", &info.run_id);
                        }
                        (act_fn)(args, data_converter, ctx).await
                    }
                    .instrument(span);
                    // Catch panics so a panicking activity fails that one activity instead of
                    // unwinding through the worker.
                    let output = AssertUnwindSafe(act_fut).catch_unwind().await;
                    let result = match output {
                        Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
                            format!("Activity function panicked: {}", panic_formatter(e)),
                            true,
                        )),
                        Ok(Ok(p)) => ActivityExecutionResult::ok(p),
                        Ok(Err(err)) => match err {
                            ActivityError::Retryable {
                                source,
                                explicit_delay,
                            } => ActivityExecutionResult::fail({
                                let mut f = Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(source),
                                    false,
                                );
                                // Propagate an explicitly requested retry delay, if any.
                                if let Some(d) = explicit_delay
                                    && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
                                        f.failure_info.as_mut()
                                {
                                    fi.next_retry_delay = d.try_into().ok();
                                }
                                f
                            }),
                            ActivityError::Cancelled { details } => {
                                ActivityExecutionResult::cancel_from_details(details)
                            }
                            ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
                                Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(nre),
                                    true,
                                ),
                            ),
                            ActivityError::WillCompleteAsync => {
                                ActivityExecutionResult::will_complete_async()
                            }
                        },
                    };
                    let mut completion = ActivityTaskCompletion {
                        task_token,
                        result: Some(result),
                    };
                    // Codec-encode payloads before handing the completion to core.
                    encode_payloads(
                        &mut completion,
                        codec_data_converter.codec(),
                        &SerializationContextData::Activity,
                    )
                    .await;
                    worker.complete_activity_task(completion).await?;
                    Ok::<_, anyhow::Error>(())
                });
            }
            Some(activity_task::Variant::Cancel(_)) => {
                // Fire the cancellation token for this task token, if we know about it.
                if let Some(ct) = self
                    .task_tokens_to_cancels
                    .get(activity.task_token.as_slice())
                {
                    ct.cancel();
                }
            }
            None => bail!("Undefined activity task variant"),
        }
        Ok(())
    }
}
997
/// Internal event used to resolve (unblock) a pending workflow operation future.
///
/// The leading `u32` in each variant is the sequence number of the operation being resolved.
/// Larger payloads are boxed to keep the enum small.
#[derive(Debug)]
enum UnblockEvent {
    Timer(u32, TimerResult),
    Activity(u32, Box<ActivityResolution>),
    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
    WorkflowComplete(u32, Box<ChildWorkflowResult>),
    SignalExternal(u32, Option<Failure>),
    CancelExternal(u32, Option<Failure>),
    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
    NexusOperationComplete(u32, Box<NexusOperationResult>),
}
1009
/// Result of awaiting on a timer
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimerResult {
    /// The timer was cancelled
    Cancelled,
    /// The timer elapsed and fired
    Fired,
}

/// Successful result of sending a signal to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalExternalOk;
/// Result of awaiting on sending a signal to an external workflow
///
/// The `Err` variant carries the [Failure] reported for the signal request.
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;

/// Successful result of sending a cancel request to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelExternalOk;
/// Result of awaiting on sending a cancel request to an external workflow
///
/// The `Err` variant carries the [Failure] reported for the cancel request.
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
1030
/// Implemented by result types that a pending workflow operation can be resolved with.
///
/// `unblock` converts the generic [UnblockEvent] into the concrete result type, panicking if
/// the event kind does not match the awaited operation.
trait Unblockable {
    /// Additional data needed to build the resolved value, supplied by the awaiting side.
    type OtherDat;

    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
}
1036
1037impl Unblockable for TimerResult {
1038    type OtherDat = ();
1039    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1040        match ue {
1041            UnblockEvent::Timer(_, result) => result,
1042            _ => panic!("Invalid unblock event for timer"),
1043        }
1044    }
1045}
1046
1047impl Unblockable for ActivityResolution {
1048    type OtherDat = ();
1049    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1050        match ue {
1051            UnblockEvent::Activity(_, result) => *result,
1052            _ => panic!("Invalid unblock event for activity"),
1053        }
1054    }
1055}
1056
1057impl Unblockable for PendingChildWorkflow {
1058    // Other data here is workflow id
1059    type OtherDat = ChildWfCommon;
1060    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1061        match ue {
1062            UnblockEvent::WorkflowStart(_, result) => Self {
1063                status: *result,
1064                common: od,
1065            },
1066            _ => panic!("Invalid unblock event for child workflow start"),
1067        }
1068    }
1069}
1070
1071impl Unblockable for ChildWorkflowResult {
1072    type OtherDat = ();
1073    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1074        match ue {
1075            UnblockEvent::WorkflowComplete(_, result) => *result,
1076            _ => panic!("Invalid unblock event for child workflow complete"),
1077        }
1078    }
1079}
1080
1081impl Unblockable for SignalExternalWfResult {
1082    type OtherDat = ();
1083    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1084        match ue {
1085            UnblockEvent::SignalExternal(_, maybefail) => {
1086                maybefail.map_or(Ok(SignalExternalOk), Err)
1087            }
1088            _ => panic!("Invalid unblock event for signal external workflow result"),
1089        }
1090    }
1091}
1092
1093impl Unblockable for CancelExternalWfResult {
1094    type OtherDat = ();
1095    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1096        match ue {
1097            UnblockEvent::CancelExternal(_, maybefail) => {
1098                maybefail.map_or(Ok(CancelExternalOk), Err)
1099            }
1100            _ => panic!("Invalid unblock event for signal external workflow result"),
1101        }
1102    }
1103}
1104
1105type NexusStartResult = Result<StartedNexusOperation, Failure>;
1106impl Unblockable for NexusStartResult {
1107    type OtherDat = NexusUnblockData;
1108    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1109        match ue {
1110            UnblockEvent::NexusOperationStart(_, result) => match *result {
1111                resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1112                    Ok(StartedNexusOperation {
1113                        operation_token: Some(op_token),
1114                        unblock_dat: od,
1115                    })
1116                }
1117                resolve_nexus_operation_start::Status::StartedSync(_) => {
1118                    Ok(StartedNexusOperation {
1119                        operation_token: None,
1120                        unblock_dat: od,
1121                    })
1122                }
1123                resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1124            },
1125            _ => panic!("Invalid unblock event for nexus operation"),
1126        }
1127    }
1128}
1129
1130impl Unblockable for NexusOperationResult {
1131    type OtherDat = ();
1132
1133    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1134        match ue {
1135            UnblockEvent::NexusOperationComplete(_, result) => *result,
1136            _ => panic!("Invalid unblock event for nexus operation complete"),
1137        }
1138    }
1139}
1140
/// Identifier for cancellable operations
///
/// Each variant carries the sequence number of the command being cancelled.
#[derive(Debug, Clone)]
pub(crate) enum CancellableID {
    /// A timer
    Timer(u32),
    /// An activity
    Activity(u32),
    /// A local activity
    LocalActivity(u32),
    /// A child workflow, cancelled with the provided reason
    ChildWorkflow {
        seqnum: u32,
        reason: String,
    },
    /// A signal being sent to an external workflow
    SignalExternalWorkflow(u32),
    /// An external workflow, cancelled with the provided reason
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
        reason: String,
    },
    /// A nexus operation (waiting for start)
    NexusOp(u32),
}
1160
/// Cancellation IDs that support a reason.
pub(crate) trait SupportsCancelReason {
    /// Returns a new version of this ID with the provided cancellation reason.
    fn with_reason(self, reason: String) -> CancellableID;
}
/// The subset of [CancellableID] variants which accept a cancellation reason.
#[derive(Debug, Clone)]
pub(crate) enum CancellableIDWithReason {
    /// A child workflow, identified by the sequence number of its start command
    ChildWorkflow {
        seqnum: u32,
    },
    /// An external workflow, identified by sequence number and execution info
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
    },
}
1176impl CancellableIDWithReason {
1177    pub(crate) fn seq_num(&self) -> u32 {
1178        match self {
1179            CancellableIDWithReason::ChildWorkflow { seqnum } => *seqnum,
1180            CancellableIDWithReason::ExternalWorkflow { seqnum, .. } => *seqnum,
1181        }
1182    }
1183}
1184impl SupportsCancelReason for CancellableIDWithReason {
1185    fn with_reason(self, reason: String) -> CancellableID {
1186        match self {
1187            CancellableIDWithReason::ChildWorkflow { seqnum } => {
1188                CancellableID::ChildWorkflow { seqnum, reason }
1189            }
1190            CancellableIDWithReason::ExternalWorkflow { seqnum, execution } => {
1191                CancellableID::ExternalWorkflow {
1192                    seqnum,
1193                    execution,
1194                    reason,
1195                }
1196            }
1197        }
1198    }
1199}
1200impl From<CancellableIDWithReason> for CancellableID {
1201    fn from(v: CancellableIDWithReason) -> Self {
1202        v.with_reason("".to_string())
1203    }
1204}
1205
/// Commands sent from workflow code to the worker's workflow machinery.
#[derive(derive_more::From)]
#[allow(clippy::large_enum_variant)]
enum RustWfCmd {
    /// Cancel a previously issued cancellable operation.
    #[from(ignore)]
    Cancel(CancellableID),
    /// Force the current workflow task to fail with the given error.
    ForceWFTFailure(anyhow::Error),
    /// Issue a new workflow command whose resolution unblocks a waiting future.
    NewCmd(CommandCreateRequest),
    /// Issue a new workflow command that requires no resolution.
    NewNonblockingCmd(workflow_command::Variant),
    /// Subscribe to the completion of a child workflow, keyed by sequence number.
    SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
    /// Subscribe to the completion of a nexus operation, keyed by sequence number.
    SubscribeNexusOperationCompletion {
        seq: u32,
        unblocker: oneshot::Sender<UnblockEvent>,
    },
}

/// A workflow command paired with the channel used to unblock the future awaiting it.
struct CommandCreateRequest {
    cmd: WorkflowCommand,
    unblocker: oneshot::Sender<UnblockEvent>,
}

/// Subscription to a child workflow's completion, keyed by sequence number.
struct CommandSubscribeChildWorkflowCompletion {
    seq: u32,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1230
/// The result of running a workflow.
///
/// Successful completion returns `Ok(T)` where `T` is the workflow's return type.
/// Non-error terminations (cancel, eviction, continue-as-new) return `Err(WorkflowTermination)`.
pub type WorkflowResult<T> = Result<T, WorkflowTermination>;

/// Represents ways a workflow can terminate without producing a normal result.
///
/// This is used as the error type in [`WorkflowResult<T>`] for non-error termination conditions
/// like cancellation, eviction, continue-as-new, or actual failures.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowTermination {
    /// The workflow was cancelled.
    #[error("Workflow cancelled")]
    Cancelled,

    /// The workflow was evicted from the cache.
    #[error("Workflow evicted from cache")]
    Evicted,

    /// The workflow should continue as a new execution.
    ///
    /// The attributes are boxed to keep this variant's size close to the others'.
    #[error("Continue as new")]
    ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),

    /// The workflow failed with an error.
    #[error("Workflow failed: {0}")]
    Failed(#[source] anyhow::Error),
}
1259
1260impl WorkflowTermination {
1261    /// Construct a [WorkflowTermination::ContinueAsNew]
1262    pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1263        Self::ContinueAsNew(Box::new(can))
1264    }
1265
1266    /// Construct a [WorkflowTermination::Failed] variant from any error.
1267    pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1268        Self::Failed(err.into())
1269    }
1270}
1271
1272impl From<anyhow::Error> for WorkflowTermination {
1273    fn from(err: anyhow::Error) -> Self {
1274        Self::Failed(err)
1275    }
1276}
1277
1278impl From<ActivityExecutionError> for WorkflowTermination {
1279    fn from(value: ActivityExecutionError) -> Self {
1280        Self::failed(value)
1281    }
1282}
1283
/// Activity functions may return these values when exiting
///
/// `T` is the activity's normal result type.
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// Completion requires an asynchronous callback
    WillCompleteAsync,
    /// Finish with a result
    Normal(T),
}
1292
1293impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1294    fn from(t: T) -> Self {
1295        Self::Normal(t)
1296    }
1297}
1298
/// Attempts to turn caught panics into something printable
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    // Begin the downcast attempts with `&str`, the most common panic payload type.
    _panic_formatter::<&str>(panic)
}
/// Tries to downcast the payload to `T`; on failure, advances along the
/// [PrintablePanicType] chain until it is exhausted.
fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
    match panic.downcast::<T>() {
        Ok(displayable) => displayable,
        Err(original) => {
            // `EndPrintingAttempts` marks the end of the chain of types to try.
            let chain_exhausted = TypeId::of::<<T as PrintablePanicType>::NextType>()
                == TypeId::of::<EndPrintingAttempts>();
            if chain_exhausted {
                Box::new("Couldn't turn panic into a string")
            } else {
                _panic_formatter::<T::NextType>(original)
            }
        }
    }
}
/// A chain of types that panic payloads are successively downcast to for printing.
trait PrintablePanicType: Display {
    /// The next type in the chain to attempt downcasting to.
    type NextType: PrintablePanicType;
}
impl PrintablePanicType for &str {
    type NextType = String;
}
impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}
/// Chain terminator; its `Display` impl is never actually invoked.
struct EndPrintingAttempts {}
impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Will never be printed")
    }
}
impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1334
// Unit and compile-only tests for the macro-generated activity/workflow plumbing.
#[cfg(test)]
mod tests {
    use super::*;
    use temporalio_macros::{activities, workflow, workflow_methods};

    struct MyActivities {}

    #[activities]
    impl MyActivities {
        // Minimal activity taking only the context.
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        // Activity that also receives the instance via `Arc<Self>` plus an input argument.
        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Compile-only test for workflow context invocation
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(MyActivities::my_activity, (), ActivityOptions::default());
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::default(),
        );
    }

    // Compile-only test for direct invocation via .run()
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    // Exercises the workflow-method attributes: init, run, sync/async signals, queries,
    // and sync/async updates, with and without custom names.
    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
    }

    // Mirrors the default identity format: "<pid>@<hostname>".
    fn default_identity() -> String {
        format!(
            "{}@{}",
            std::process::id(),
            gethostname::gethostname().to_string_lossy()
        )
    }

    // Checks how the worker-level client identity override interacts with the
    // connection-level identity when building core worker options. An expected `None`
    // means no override is recorded in the resulting config.
    #[rstest::rstest]
    #[case::default_when_none_provided(None, "", Some(default_identity()))]
    #[case::connection_identity_preserved(None, "conn-identity", None)]
    #[case::worker_override_takes_precedence(
        Some("worker-identity"),
        "conn-identity",
        Some("worker-identity".into())
    )]
    #[case::worker_override_with_empty_connection(
        Some("worker-identity"),
        "",
        Some("worker-identity".into())
    )]
    #[test]
    fn client_identity_resolution(
        #[case] worker_override: Option<&str>,
        #[case] connection_identity: &str,
        #[case] expected: Option<String>,
    ) {
        let opts = WorkerOptions::new("task_q")
            .task_types(WorkerTaskTypes::activity_only())
            .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
            .build();
        let config = opts
            .to_core_options("ns".into(), connection_identity.into())
            .unwrap();
        assert_eq!(config.client_identity_override, expected);
    }
}