Skip to main content

temporalio_sdk/
lib.rs

1#![warn(missing_docs)] // error if there are missing docs
2
3//! This crate defines a Public Preview Temporal Rust SDK.
4//!
5//! The SDK is built on top of Core and provides a native Rust experience for writing Temporal
6//! Workflows and Activities.
7//!
8//! The SDK is in Public Preview and under active development. The API can and will continue to evolve.
9//!
10//! An example of running an activity worker:
11//! ```no_run
12//! use std::str::FromStr;
13//! use temporalio_client::{Client, ClientOptions, Connection, ConnectionOptions};
14//! use temporalio_common::{
15//!     telemetry::TelemetryOptions,
16//!     worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes},
17//! };
18//! use temporalio_macros::activities;
19//! use temporalio_sdk::{
20//!     Worker, WorkerOptions,
21//!     activities::{ActivityContext, ActivityError},
22//! };
23//! use temporalio_sdk_core::{CoreRuntime, RuntimeOptions, Url};
24//!
25//! struct MyActivities;
26//!
27//! #[activities]
28//! impl MyActivities {
29//!     #[activity]
30//!     pub(crate) async fn echo(
31//!         _ctx: ActivityContext,
32//!         e: String,
33//!     ) -> Result<String, ActivityError> {
34//!         Ok(e)
35//!     }
36//! }
37//!
38//! #[tokio::main]
39//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
40//!     let connection_options =
41//!         ConnectionOptions::new(Url::from_str("http://localhost:7233")?).build();
42//!     let telemetry_options = TelemetryOptions::builder().build();
43//!     let runtime_options = RuntimeOptions::builder()
44//!         .telemetry_options(telemetry_options)
45//!         .build()?;
46//!     let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
47//!
48//!     let connection = Connection::connect(connection_options).await?;
49//!     let client = Client::new(connection, ClientOptions::new("my_namespace").build())?;
50//!
51//!     let worker_options = WorkerOptions::new("task_queue")
52//!         .task_types(WorkerTaskTypes::activity_only())
53//!         .deployment_options(WorkerDeploymentOptions {
54//!             version: WorkerDeploymentVersion {
55//!                 deployment_name: "my_deployment".to_owned(),
56//!                 build_id: "my_build_id".to_owned(),
57//!             },
58//!             use_worker_versioning: false,
59//!             default_versioning_behavior: None,
60//!         })
61//!         .register_activities(MyActivities)
62//!         .build();
63//!
64//!     let mut worker = Worker::new(&runtime, client, worker_options)?;
65//!     worker.run().await?;
66//!
67//!     Ok(())
68//! }
69//! ```
70
71#[macro_use]
72extern crate tracing;
73extern crate self as temporalio_sdk;
74
75pub mod activities;
76pub mod error;
77pub mod interceptors;
78mod workflow_executor;
79mod workflow_future;
80mod workflow_registry;
81#[cfg(feature = "wasm-workflows")]
82mod workflow_wasm;
83pub mod workflows;
84
85pub use crate::error::{
86    ActivityExecutionError, ApplicationFailure, ChildWorkflowExecutionError,
87    ChildWorkflowStartError, OutgoingActivityError, OutgoingError, OutgoingWorkflowError,
88    WorkflowRegistrationError, WorkflowSignalError,
89};
90pub use temporalio_client::Namespace;
91pub use temporalio_workflow::{
92    ActivityCloseTimeouts, ActivityOptions, BaseWorkflowContext, CancellableFuture,
93    ChildWorkflowOptions, ContinueAsNewOptions, ContinueAsNewVersioningBehavior,
94    ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
95    RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause,
96    StartedChildWorkflow, SyncWorkflowContext, TimerOptions, TimerResult, WorkflowContext,
97    WorkflowContextView, WorkflowResult, WorkflowTermination,
98};
99#[cfg(feature = "wasm-workflows")]
100pub use workflow_wasm::WasmWorkflowComponent;
101
102use crate::{
103    activities::{
104        ActivityContext, ActivityDefinitions, ActivityImplementer, ExecutableActivity,
105        activity_error_to_core_result,
106    },
107    interceptors::{ActivityInboundInterceptor, WorkerInterceptor},
108    workflow_executor::{TaskHandle, WorkflowExecutor},
109    workflow_future::start_workflow,
110    workflow_registry::WorkflowDefinitions,
111};
112use anyhow::{Context, anyhow, bail};
113use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
114use std::{
115    any::{Any, TypeId},
116    cell::RefCell,
117    collections::{HashMap, HashSet},
118    fmt::{Debug, Display, Formatter},
119    future::Future,
120    sync::Arc,
121    time::Duration,
122};
123use temporalio_client::{Client, ClientOptions, NamespacedClient};
124use temporalio_common::{
125    ActivityDefinition, WorkflowDefinition,
126    data_converters::{DataConverter, SerializationContext, SerializationContextData},
127    payload_visitor::{decode_payloads, encode_payloads},
128    protos::{
129        TaskToken,
130        coresdk::{
131            ActivityTaskCompletion, AsJsonPayloadExt,
132            activity_result::ActivityExecutionResult,
133            activity_task::{ActivityTask, activity_task},
134            workflow_activation::{WorkflowActivation, workflow_activation_job::Variant},
135            workflow_completion::WorkflowActivationCompletion,
136        },
137        temporal::api::{common::v1::Payload, enums::v1::WorkflowTaskFailedCause},
138    },
139    worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
140};
141use temporalio_sdk_core::{
142    CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
143    WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
144};
145use temporalio_workflow::runtime::entry::WorkflowImplementation;
146use tokio::sync::{
147    Notify,
148    mpsc::{UnboundedSender, unbounded_channel},
149};
150use tokio_stream::wrappers::UnboundedReceiverStream;
151use tokio_util::sync::CancellationToken;
152use tracing::{Instrument, Span, field};
153use uuid::Uuid;
154
155/// Contains options for configuring a worker.
156#[derive(bon::Builder, Clone)]
157#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
158#[non_exhaustive]
159pub struct WorkerOptions {
160    /// What task queue will this worker poll from? This task queue name will be used for both
161    /// workflow and activity polling.
162    #[builder(start_fn)]
163    pub task_queue: String,
164
165    #[builder(field)]
166    activities: ActivityDefinitions,
167
168    #[builder(field)]
169    workflows: WorkflowDefinitions,
170
171    #[cfg(feature = "wasm-workflows")]
172    #[builder(field)]
173    wasm_workflow_components: Vec<WasmWorkflowComponent>,
174
175    /// Set the deployment options for this worker. Defaults to a hash of the currently running
176    /// executable.
177    #[builder(default = def_build_id())]
178    pub deployment_options: WorkerDeploymentOptions,
179    /// A human-readable string that can identify this worker. If set, overrides the identity on
180    /// the client used by this worker. If unset and the client has no identity, defaults to
181    /// `{pid}@{hostname}`.
182    pub client_identity_override: Option<String>,
183    /// If set nonzero, workflows will be cached and sticky task queues will be used, meaning that
184    /// history updates are applied incrementally to suspended instances of workflow execution.
185    /// Workflows are evicted according to a least-recently-used policy once the cache maximum is
186    /// reached. Workflows may also be explicitly evicted at any time, or as a result of errors
187    /// or failures.
188    #[builder(default = 1000)]
189    pub max_cached_workflows: usize,
190    /// Set a [crate::WorkerTuner] for this worker, which controls how many slots are available for
191    /// the different kinds of tasks.
192    #[builder(default = Arc::new(TunerBuilder::default().build()))]
193    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
194    /// Controls how polling for Workflow tasks will happen on this worker's task queue. See also
195    /// [WorkerConfig::nonsticky_to_sticky_poll_ratio]. If using SimpleMaximum, Must be at least 2
196    /// when `max_cached_workflows` > 0, or is an error.
197    #[builder(default = PollerBehavior::SimpleMaximum(5))]
198    pub workflow_task_poller_behavior: PollerBehavior,
199    /// Only applies when using [PollerBehavior::SimpleMaximum]
200    ///
201    /// (max workflow task polls * this number) = the number of max pollers that will be allowed for
202    /// the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky
203    /// queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for
204    /// either poller is 1, so if the maximum allowed is 1 and sticky queues are enabled, there will
205    /// be 2 concurrent polls.
206    #[builder(default = 0.2)]
207    pub nonsticky_to_sticky_poll_ratio: f32,
208    /// Controls how polling for Activity tasks will happen on this worker's task queue.
209    #[builder(default = PollerBehavior::SimpleMaximum(5))]
210    pub activity_task_poller_behavior: PollerBehavior,
211    /// Controls how polling for Nexus tasks will happen on this worker's task queue.
212    #[builder(default = PollerBehavior::SimpleMaximum(5))]
213    pub nexus_task_poller_behavior: PollerBehavior,
214    // TODO [rust-sdk-branch]: Will go away once workflow registration can only happen in here.
215    //   Then it can be auto-determined.
216    /// Specifies which task types this worker will poll for.
217    ///
218    /// Note: At least one task type must be specified or the worker will fail validation.
219    #[builder(default = WorkerTaskTypes::all())]
220    pub task_types: WorkerTaskTypes,
221    /// How long a workflow task is allowed to sit on the sticky queue before it is timed out
222    /// and moved to the non-sticky queue where it may be picked up by any worker.
223    #[builder(default = Duration::from_secs(10))]
224    pub sticky_queue_schedule_to_start_timeout: Duration,
225    /// Longest interval for throttling activity heartbeats
226    #[builder(default = Duration::from_secs(60))]
227    pub max_heartbeat_throttle_interval: Duration,
228    /// Default interval for throttling activity heartbeats in case
229    /// `ActivityOptions.heartbeat_timeout` is unset.
230    /// When the timeout *is* set in the `ActivityOptions`, throttling is set to
231    /// `heartbeat_timeout * 0.8`.
232    #[builder(default = Duration::from_secs(30))]
233    pub default_heartbeat_throttle_interval: Duration,
234    /// Sets the maximum number of activities per second the task queue will dispatch, controlled
235    /// server-side. Note that this only takes effect upon an activity poll request. If multiple
236    /// workers on the same queue have different values set, they will thrash with the last poller
237    /// winning.
238    ///
239    /// Setting this to a nonzero value will also disable eager activity execution.
240    pub max_task_queue_activities_per_second: Option<f64>,
241    /// Limits the number of activities per second that this worker will process. The worker will
242    /// not poll for new activities if by doing so it might receive and execute an activity which
243    /// would cause it to exceed this limit. Negative, zero, or NaN values will cause building
244    /// the options to fail.
245    pub max_worker_activities_per_second: Option<f64>,
246    /// Any error types listed here will cause any workflow being processed by this worker to fail,
247    /// rather than simply failing the workflow task.
248    #[builder(default)]
249    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
250    /// Like [WorkerConfig::workflow_failure_errors], but specific to certain workflow types (the
251    /// map key).
252    #[builder(default)]
253    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
254    /// If set, the worker will issue cancels for all outstanding activities and nexus operations after
255    /// shutdown has been initiated and this amount of time has elapsed.
256    pub graceful_shutdown_period: Option<Duration>,
257    /// Detect nondeterministic async usage in workflow code. When enabled (the default), workflows
258    /// that use external async operations (tokio timers, IO, spawned threads, raw tokio::sync
259    /// channels, etc.) will have their tasks failed with a descriptive error.
260    #[builder(default = true)]
261    pub detect_nondeterministic_futures: bool,
262}
263
264impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
265    /// Registers all activities on an activity implementer.
266    pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
267        self.activities.register_activities::<AI>(instance);
268        self
269    }
270    /// Registers a specific activitiy.
271    pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
272    where
273        AD: ActivityDefinition + ExecutableActivity,
274        AD::Input: Send + Sync,
275        AD::Output: Send + Sync,
276    {
277        self.activities.register_activity::<AD>(instance);
278        self
279    }
280
281    /// Registers all workflows on a workflow implementer.
282    pub fn register_workflow<W>(mut self) -> Result<Self, WorkflowRegistrationError>
283    where
284        W: WorkflowImplementation,
285        <W::Run as WorkflowDefinition>::Input: Send,
286    {
287        self.workflows.register_workflow::<W>()?;
288        Ok(self)
289    }
290
291    /// Register a workflow with a custom factory for instance creation.
292    ///
293    /// # Warning: Advanced Usage
294    ///
295    /// This method is intended for scenarios requiring injection of un-serializable
296    /// state into workflows.
297    ///
298    /// **This can easily cause nondeterminism**
299    ///
300    /// Only use when you understand the implications and have a specific need that cannot be met
301    /// otherwise.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if a workflow with the same type is already registered, or if the workflow
306    /// type defines an `#[init]` method. Workflows using factory registration must not have
307    /// `#[init]` to avoid ambiguity about instance creation.
308    pub fn register_workflow_with_factory<W, F>(
309        mut self,
310        factory: F,
311    ) -> Result<Self, WorkflowRegistrationError>
312    where
313        W: WorkflowImplementation,
314        <W::Run as WorkflowDefinition>::Input: Send,
315        F: Fn() -> W + Send + Sync + 'static,
316    {
317        self.workflows
318            .register_workflow_run_with_factory::<W, F>(factory)?;
319        Ok(self)
320    }
321
322    /// Register a prebuilt WASM workflow component that exports one or more workflows.
323    #[cfg(feature = "wasm-workflows")]
324    pub fn register_wasm_workflow(mut self, component: WasmWorkflowComponent) -> Self {
325        self.wasm_workflow_components.push(component);
326        self
327    }
328}
329
330// Needs to exist to avoid https://github.com/elastio/bon/issues/359
331fn def_build_id() -> WorkerDeploymentOptions {
332    WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
333}
334
335impl WorkerOptions {
336    /// Registers all activities on an activity implementer.
337    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
338        self.activities.register_activities::<AI>(instance);
339        self
340    }
341    /// Registers a specific activitiy.
342    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
343    where
344        AD: ActivityDefinition + ExecutableActivity,
345        AD::Input: Send + Sync,
346        AD::Output: Send + Sync,
347    {
348        self.activities.register_activity::<AD>(instance);
349        self
350    }
351    /// Returns all the registered activities by cloning the current set.
352    pub fn activities(&self) -> ActivityDefinitions {
353        self.activities.clone()
354    }
355
356    /// Registers all workflows on a workflow implementer.
357    pub fn register_workflow<W>(&mut self) -> Result<&mut Self, WorkflowRegistrationError>
358    where
359        W: WorkflowImplementation,
360        <W::Run as WorkflowDefinition>::Input: Send,
361    {
362        self.workflows.register_workflow::<W>()?;
363        Ok(self)
364    }
365
366    /// Register a workflow with a custom factory for instance creation.
367    ///
368    /// # Warning: Advanced Usage
369    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
370    pub fn register_workflow_with_factory<W, F>(
371        &mut self,
372        factory: F,
373    ) -> Result<&mut Self, WorkflowRegistrationError>
374    where
375        W: WorkflowImplementation,
376        <W::Run as WorkflowDefinition>::Input: Send,
377        F: Fn() -> W + Send + Sync + 'static,
378    {
379        self.workflows
380            .register_workflow_run_with_factory::<W, F>(factory)?;
381        Ok(self)
382    }
383
384    /// Register a prebuilt WASM workflow component that exports one or more workflows.
385    #[cfg(feature = "wasm-workflows")]
386    pub fn register_wasm_workflow(&mut self, component: WasmWorkflowComponent) -> &mut Self {
387        self.wasm_workflow_components.push(component);
388        self
389    }
390
391    /// Returns all the registered workflows by cloning the current set.
392    pub fn workflows(&self) -> WorkflowDefinitions {
393        self.workflows.clone()
394    }
395
396    #[doc(hidden)]
397    pub fn to_core_options(
398        &self,
399        namespace: String,
400        connection_identity: String,
401    ) -> Result<WorkerConfig, String> {
402        WorkerConfig::builder()
403            .namespace(namespace)
404            .task_queue(self.task_queue.clone())
405            .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
406                connection_identity.is_empty().then(|| {
407                    format!(
408                        "{}@{}",
409                        std::process::id(),
410                        gethostname::gethostname().to_string_lossy()
411                    )
412                })
413            }))
414            .max_cached_workflows(self.max_cached_workflows)
415            .tuner(self.tuner.clone())
416            .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
417            .activity_task_poller_behavior(self.activity_task_poller_behavior)
418            .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
419            .task_types(self.task_types)
420            .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
421            .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
422            .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
423            .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
424            .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
425            .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
426            .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
427                self.deployment_options.clone(),
428            ))
429            .workflow_failure_errors(self.workflow_failure_errors.clone())
430            .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
431            .build()
432    }
433}
434
435/// A worker that can poll for and respond to workflow tasks by using
436/// [temporalio_macros::workflow], and activity tasks by using activities defined with
437/// [temporalio_macros::activities].
438pub struct Worker {
439    common: CommonWorker,
440    workflow_half: WorkflowHalf,
441    activity_half: ActivityHalf,
442}
443
444struct CommonWorker {
445    worker: Arc<CoreWorker>,
446    task_queue: String,
447    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
448    activity_inbound_interceptors: Vec<Arc<dyn ActivityInboundInterceptor>>,
449    client_options: ClientOptions,
450    data_converter: DataConverter,
451}
452
453struct WorkflowHalf {
454    /// Maps run id to cached workflow state
455    workflows: RefCell<HashMap<String, WorkflowData>>,
456    workflow_definitions: WorkflowDefinitions,
457    workflow_removed_from_map: Notify,
458    detect_nondeterministic_futures: bool,
459}
460struct WorkflowData {
461    /// Channel used to send the workflow activations
462    activation_chan: UnboundedSender<WorkflowActivation>,
463}
464
465struct WorkflowFutureHandle<F: Future> {
466    join_handle: F,
467    run_id: String,
468}
469
470#[derive(Default)]
471struct ActivityHalf {
472    /// Maps activity type to the function for executing activities of that type
473    activities: ActivityDefinitions,
474    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
475}
476
477#[derive(Debug, thiserror::Error)]
478enum ActivityTaskHandlerError {
479    #[error("{source}")]
480    UnregisteredActivity {
481        source: ActivityNotRegisteredError,
482        task_token: Vec<u8>,
483    },
484    #[error(transparent)]
485    Fatal(#[from] anyhow::Error),
486}
487
488#[derive(Debug, thiserror::Error)]
489enum ActivityNotRegisteredError {
490    #[error(
491        "Activity {activity_type} is not registered on this worker, available activities: {}",
492        .available_activities.join(", ")
493    )]
494    HasAvailable {
495        activity_type: String,
496        available_activities: Vec<&'static str>,
497    },
498    #[error("Activity {activity_type} is not registered on this worker, no available activities.")]
499    NoAvailable { activity_type: String },
500}
501
502impl ActivityNotRegisteredError {
503    fn new(activity_type: String, available_activities: Vec<&'static str>) -> Self {
504        if available_activities.is_empty() {
505            Self::NoAvailable { activity_type }
506        } else {
507            Self::HasAvailable {
508                activity_type,
509                available_activities,
510            }
511        }
512    }
513}
514
515impl Worker {
516    /// Create a new worker from an existing client, and options.
517    pub fn new(
518        runtime: &CoreRuntime,
519        client: Client,
520        options: WorkerOptions,
521    ) -> Result<Self, Box<dyn std::error::Error>> {
522        let wc = options
523            .to_core_options(client.namespace(), client.identity())
524            .map_err(|s| anyhow::anyhow!("{s}"))?;
525        let core = init_worker(runtime, wc, client.connection().clone())?;
526        Self::new_from_core_options(Arc::new(core), client.options().clone(), options)
527    }
528
529    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
530    #[doc(hidden)]
531    pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
532        let client_options = ClientOptions::new(worker.get_config().namespace.clone())
533            .data_converter(data_converter)
534            .build();
535        Self::new_from_core_definitions(
536            worker,
537            client_options,
538            Default::default(),
539            Default::default(),
540        )
541    }
542
543    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
544    #[doc(hidden)]
545    pub fn new_from_core_options(
546        worker: Arc<CoreWorker>,
547        client_options: ClientOptions,
548        mut options: WorkerOptions,
549    ) -> Result<Self, Box<dyn std::error::Error>> {
550        let acts = std::mem::take(&mut options.activities);
551        let wfs = std::mem::take(&mut options.workflows);
552        #[cfg(feature = "wasm-workflows")]
553        let wasm_components = std::mem::take(&mut options.wasm_workflow_components);
554        let mut me = Self::new_from_core_definitions(worker, client_options, acts, wfs);
555        me.set_detect_nondeterministic_futures(options.detect_nondeterministic_futures);
556        #[cfg(feature = "wasm-workflows")]
557        me.workflow_half
558            .workflow_definitions
559            .register_wasm_workflows(wasm_components)?;
560        Ok(me)
561    }
562
563    fn new_from_core_definitions(
564        worker: Arc<CoreWorker>,
565        client_options: ClientOptions,
566        activities: ActivityDefinitions,
567        workflows: WorkflowDefinitions,
568    ) -> Self {
569        let data_converter = client_options.data_converter.clone();
570        Self {
571            common: CommonWorker {
572                task_queue: worker.get_config().task_queue.clone(),
573                worker,
574                worker_interceptor: None,
575                activity_inbound_interceptors: Vec::new(),
576                client_options,
577                data_converter,
578            },
579            workflow_half: WorkflowHalf {
580                workflows: Default::default(),
581                workflow_definitions: workflows,
582                workflow_removed_from_map: Default::default(),
583                detect_nondeterministic_futures: false,
584            },
585            activity_half: ActivityHalf {
586                activities,
587                ..Default::default()
588            },
589        }
590    }
591
592    /// Returns the task queue name this worker polls on
593    pub fn task_queue(&self) -> &str {
594        &self.common.task_queue
595    }
596
597    #[doc(hidden)]
598    /// Set whether nondeterministic future detection is enabled for workflows on this worker. Users
599    /// should use [WorkerOptions] to set this. TODO: Only needs to exist due to test setup.
600    pub fn set_detect_nondeterministic_futures(&mut self, enabled: bool) {
601        self.workflow_half.detect_nondeterministic_futures = enabled;
602    }
603
604    /// Return a handle that can be used to initiate shutdown. This is useful because [Worker::run]
605    /// takes self mutably, so you may want to obtain a handle for shutting down before running.
606    pub fn shutdown_handle(&self) -> impl Fn() + use<> {
607        let w = self.common.worker.clone();
608        move || w.initiate_shutdown()
609    }
610
611    /// Registers all activities on an activity implementer.
612    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
613        self.activity_half
614            .activities
615            .register_activities::<AI>(instance);
616        self
617    }
618    /// Registers a specific activitiy.
619    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
620    where
621        AD: ActivityDefinition + ExecutableActivity,
622        AD::Input: Send + Sync,
623        AD::Output: Send + Sync,
624    {
625        self.activity_half
626            .activities
627            .register_activity::<AD>(instance);
628        self
629    }
630
631    /// Registers all workflows on a workflow implementer.
632    pub fn register_workflow<W>(&mut self) -> Result<&mut Self, WorkflowRegistrationError>
633    where
634        W: WorkflowImplementation,
635        <W::Run as WorkflowDefinition>::Input: Send,
636    {
637        self.workflow_half
638            .workflow_definitions
639            .register_workflow::<W>()?;
640        Ok(self)
641    }
642
643    /// Register a workflow with a custom factory for instance creation.
644    ///
645    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
646    pub fn register_workflow_with_factory<W, F>(
647        &mut self,
648        factory: F,
649    ) -> Result<&mut Self, WorkflowRegistrationError>
650    where
651        W: WorkflowImplementation,
652        <W::Run as WorkflowDefinition>::Input: Send,
653        F: Fn() -> W + Send + Sync + 'static,
654    {
655        self.workflow_half
656            .workflow_definitions
657            .register_workflow_run_with_factory::<W, F>(factory)?;
658        Ok(self)
659    }
660
661    /// Runs the worker. Eventually resolves after the worker has been explicitly shut down,
662    /// or may return early with an error in the event of some unresolvable problem.
663    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
664        let shutdown_token = CancellationToken::new();
665        let (common, wf_half, act_half) = self.split_apart();
666        let (wf_future_tx, wf_future_rx) =
667            unbounded_channel::<WorkflowFutureHandle<TaskHandle<WorkflowResult<Payload>>>>();
668        let (completions_tx, completions_rx) = unbounded_channel();
669
670        // Workflows run in a LocalSet because they use Rc<RefCell> for state management.
671        // This allows them to not require Send/Sync bounds. The WorkflowExecutor replaces
672        // tokio::task::spawn_local for workflow tasks and provides custom wakers for
673        // nondeterminism detection.
674        let workflow_local_set = tokio::task::LocalSet::new();
675        let executor = WorkflowExecutor::new();
676
677        let wf_future_joiner = async {
678            UnboundedReceiverStream::new(wf_future_rx)
679                .map(Result::<_, anyhow::Error>::Ok)
680                .try_for_each_concurrent(
681                    None,
682                    |WorkflowFutureHandle {
683                         join_handle,
684                         run_id,
685                     }| {
686                        let wf_half = &*wf_half;
687                        async move {
688                            let result = join_handle.await.map_err(anyhow::Error::new)?;
689                            // Eviction is normal workflow lifecycle - workflows loop waiting for
690                            // eviction after completion to manage cache cleanup
691                            if let Err(e) = result
692                                && !matches!(e, WorkflowTermination::Evicted)
693                            {
694                                return Err(anyhow::Error::new(e));
695                            }
696                            debug!(run_id=%run_id, "Removing workflow from cache");
697                            wf_half.workflows.borrow_mut().remove(&run_id);
698                            wf_half.workflow_removed_from_map.notify_one();
699                            Ok(())
700                        }
701                    },
702                )
703                .await
704                .context("Workflow futures encountered an error")
705        };
706        let wf_completion_processor = async {
707            UnboundedReceiverStream::new(completions_rx)
708                .map(Ok)
709                .try_for_each_concurrent(None, |mut completion| async {
710                    encode_payloads(
711                        &mut completion,
712                        common.data_converter.codec(),
713                        &SerializationContextData::Workflow,
714                    )
715                    .await;
716                    if let Some(ref i) = common.worker_interceptor {
717                        i.on_workflow_activation_completion(&completion).await;
718                    }
719                    common.worker.complete_workflow_activation(completion).await
720                })
721                .map_err(anyhow::Error::from)
722                .await
723                .context("Workflow completions processor encountered an error")
724        };
725        tokio::try_join!(
726            // Workflow-related tasks run inside LocalSet (allows !Send futures)
727            async {
728                workflow_local_set.run_until(async {
729                    tokio::try_join!(
730                        // Workflow polling loop
731                        async {
732                            loop {
733                            let mut activation =
734                                match common.worker.poll_workflow_activation().await {
735                                    Err(PollError::ShutDown) => {
736                                        break;
737                                    }
738                                    o => o?,
739                                };
740                            decode_payloads(
741                                &mut activation,
742                                common.data_converter.codec(),
743                                &SerializationContextData::Workflow,
744                            )
745                            .await;
746                            if let Some(ref i) = common.worker_interceptor {
747                                i.on_workflow_activation(&activation).await?;
748                            }
749                            if let Some(wf_fut) = wf_half
750                                .workflow_activation_handler(
751                                    common,
752                                    shutdown_token.clone(),
753                                    activation,
754                                    &completions_tx,
755                                    &executor,
756                                )
757                                .await?
758                                && wf_future_tx.send(wf_fut).is_err()
759                            {
760                                panic!(
761                                    "Receive half of completion processor channel cannot be dropped"
762                                );
763                            }
764                            // Drive the executor so spawned tasks and sent activations make
765                            // progress.
766                            executor.process_tasks();
767                        }
768                        // Tell still-alive workflows to evict themselves
769                        shutdown_token.cancel();
770                        // It's important to drop these so the future and completion processors will
771                        // terminate.
772                        drop(wf_future_tx);
773                        drop(completions_tx);
774                        executor.shutdown().await;
775                        Result::<_, anyhow::Error>::Ok(())
776                    },
777                    wf_future_joiner,
778                )
779                }).await
780            },
781            // Only poll on the activity queue if activity functions have been registered. This
782            // makes tests which use mocks dramatically more manageable.
783            async {
784                if !act_half.activities.is_empty() {
785                    loop {
786                        let activity = common.worker.poll_activity_task().await;
787                        if matches!(activity, Err(PollError::ShutDown)) {
788                            break;
789                        }
790                        let mut activity = activity?;
791                        decode_payloads(
792                            &mut activity,
793                            common.data_converter.codec(),
794                            &SerializationContextData::Activity,
795                        )
796                        .await;
797                        match act_half.activity_task_handler(
798                            common.worker.clone(),
799                            common.client_options.clone(),
800                            common.task_queue.clone(),
801                            common.data_converter.clone(),
802                            common.activity_inbound_interceptors.clone(),
803                            activity,
804                        ) {
805                            Ok(()) => {}
806                            Err(ActivityTaskHandlerError::UnregisteredActivity {
807                                source,
808                                task_token,
809                            }) => {
810                                let failure = common.data_converter.to_failure(
811                                    &SerializationContextData::Activity,
812                                    OutgoingError::Activity(OutgoingActivityError::Application(
813                                        ApplicationFailure::builder(source)
814                                            .type_name("NotFoundError".to_owned())
815                                            .build()
816                                            .into(),
817                                    )),
818                                );
819                                let mut completion = ActivityTaskCompletion {
820                                    task_token,
821                                    result: Some(ActivityExecutionResult::fail(failure)),
822                                };
823                                encode_payloads(
824                                    &mut completion,
825                                    common.data_converter.codec(),
826                                    &SerializationContextData::Activity,
827                                )
828                                .await;
829                                common.worker.complete_activity_task(completion).await?;
830                            }
831                            Err(ActivityTaskHandlerError::Fatal(err)) => return Err(err),
832                        };
833                    }
834                };
835                Result::<_, anyhow::Error>::Ok(())
836            },
837            wf_completion_processor,
838        )?;
839
840        if let Some(i) = self.common.worker_interceptor.as_ref() {
841            i.on_shutdown(self);
842        }
843        self.common.worker.shutdown().await;
844        Ok(())
845    }
846
847    /// Set a [WorkerInterceptor]
848    pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
849        self.common.worker_interceptor = Some(Box::new(interceptor));
850    }
851
852    /// Append an [ActivityInboundInterceptor] to the chain. Interceptors run in the order they
853    /// are added, outer-most first.
854    pub fn add_activity_inbound_interceptor(
855        &mut self,
856        interceptor: impl ActivityInboundInterceptor,
857    ) {
858        self.common
859            .activity_inbound_interceptors
860            .push(Arc::new(interceptor));
861    }
862
863    /// Turns this rust worker into a new worker with all the same workflows and activities
864    /// registered, but with a new underlying core worker. Can be used to swap the worker for
865    /// a replay worker, change task queues, etc.
866    pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
867        self.common.worker = new_core_worker;
868    }
869
870    /// Returns number of currently cached workflows as understood by the SDK. Importantly, this
871    /// is not the same as understood by core, though they *should* always be in sync.
872    pub fn cached_workflows(&self) -> usize {
873        self.workflow_half.workflows.borrow().len()
874    }
875
876    /// Returns the instance key for this worker, used for worker heartbeating.
877    pub fn worker_instance_key(&self) -> Uuid {
878        self.common.worker.worker_instance_key()
879    }
880
881    #[doc(hidden)]
882    pub fn core_worker(&self) -> Arc<CoreWorker> {
883        self.common.worker.clone()
884    }
885
886    fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
887        (
888            &mut self.common,
889            &mut self.workflow_half,
890            &mut self.activity_half,
891        )
892    }
893}
894
895impl WorkflowHalf {
896    #[allow(clippy::type_complexity)]
897    async fn workflow_activation_handler(
898        &self,
899        common: &CommonWorker,
900        shutdown_token: CancellationToken,
901        mut activation: WorkflowActivation,
902        completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
903        executor: &WorkflowExecutor,
904    ) -> Result<Option<WorkflowFutureHandle<TaskHandle<WorkflowResult<Payload>>>>, anyhow::Error>
905    {
906        let mut res = None;
907        let run_id = activation.run_id.clone();
908
909        // If the activation is to init a workflow, create a new workflow driver for it,
910        // using the function associated with that workflow id
911        if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
912            Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
913            _ => None,
914        }) {
915            let workflow_type = sw.workflow_type.clone();
916            let (wff, activations) = {
917                if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
918                    match start_workflow(
919                        factory,
920                        common.worker.get_config().namespace.clone(),
921                        common.task_queue.clone(),
922                        run_id.clone(),
923                        std::mem::take(sw),
924                        completions_tx.clone(),
925                        common.data_converter.clone(),
926                        self.detect_nondeterministic_futures,
927                    ) {
928                        Ok(result) => result,
929                        Err(e) => {
930                            warn!("Failed to create workflow {workflow_type}: {e}");
931                            completions_tx
932                                .send(WorkflowActivationCompletion::fail(
933                                    run_id,
934                                    format!("Failed to create workflow: {e}").into(),
935                                    Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
936                                ))
937                                .expect("Completion channel intact");
938                            return Ok(None);
939                        }
940                    }
941                } else {
942                    warn!("Workflow type {workflow_type} not found");
943                    completions_tx
944                        .send(WorkflowActivationCompletion::fail(
945                            run_id,
946                            format!("Workflow type {workflow_type} not found").into(),
947                            Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
948                        ))
949                        .expect("Completion channel intact");
950                    return Ok(None);
951                }
952            };
953            // TODO [rust-sdk-branch]: Deadlock detection
954            let jh = executor.spawn(async move {
955                tokio::select! {
956                    r = wff.fuse() => r,
957                    // TODO: This probably shouldn't abort early, as it could cause an in-progress
958                    //  complete to abort. Send synthetic remove activation
959                    _ = shutdown_token.cancelled() => {
960                        Err(WorkflowTermination::Evicted)
961                    }
962                }
963            });
964            res = Some(WorkflowFutureHandle {
965                join_handle: jh,
966                run_id: run_id.clone(),
967            });
968            loop {
969                // It's possible that we've got a new initialize workflow action before the last
970                // future for this run finished evicting, as a result of how futures might be
971                // interleaved. In that case, just wait until it's not in the map, which should be
972                // a matter of only a few `poll` calls.
973                if self.workflows.borrow_mut().contains_key(&run_id) {
974                    self.workflow_removed_from_map.notified().await;
975                } else {
976                    break;
977                }
978            }
979            self.workflows.borrow_mut().insert(
980                run_id.clone(),
981                WorkflowData {
982                    activation_chan: activations,
983                },
984            );
985        }
986
987        // The activation is expected to apply to some workflow we know about. Use it to
988        // unblock things and advance the workflow.
989        if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
990            dat.activation_chan
991                .send(activation)
992                .expect("Workflow should exist if we're sending it an activation");
993        } else {
994            // When we failed to start a workflow, we never inserted it into the cache. But core
995            // sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
996            // as a failure, which we need to complete. Other SDKs add the workflow to the cache
997            // even when the workflow type is unknown/not found. To circumvent this, we simply mark
998            // any RemoveFromCache job for workflows that are not in the cache as complete.
999            if activation.jobs.len() == 1
1000                && matches!(
1001                    activation.jobs.first().map(|j| &j.variant),
1002                    Some(Some(Variant::RemoveFromCache(_)))
1003                )
1004            {
1005                completions_tx
1006                    .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
1007                    .expect("Completion channel intact");
1008                return Ok(None);
1009            }
1010
1011            // In all other cases, we want to error as the runtime could be in an inconsistent state
1012            // at this point.
1013            bail!("Got activation {activation:?} for unknown workflow {run_id}");
1014        };
1015
1016        Ok(res)
1017    }
1018}
1019
1020impl ActivityHalf {
1021    /// Spawns off a task to handle the provided activity task
1022    fn activity_task_handler(
1023        &mut self,
1024        worker: Arc<CoreWorker>,
1025        client_options: ClientOptions,
1026        task_queue: String,
1027        data_converter: DataConverter,
1028        activity_inbound_interceptors: Vec<Arc<dyn ActivityInboundInterceptor>>,
1029        activity: ActivityTask,
1030    ) -> Result<(), ActivityTaskHandlerError> {
1031        match activity.variant {
1032            Some(activity_task::Variant::Start(start)) => {
1033                let Some(act_fn) = self.activities.get(&start.activity_type) else {
1034                    let activity_type = start.activity_type.clone();
1035                    let source =
1036                        ActivityNotRegisteredError::new(activity_type, self.activities.names());
1037                    return Err(ActivityTaskHandlerError::UnregisteredActivity {
1038                        source,
1039                        task_token: activity.task_token,
1040                    });
1041                };
1042                let span = info_span!(
1043                    "RunActivity",
1044                    "otel.name" = format!("RunActivity:{}", start.activity_type),
1045                    "otel.kind" = "server",
1046                    "temporalActivityID" = start.activity_id,
1047                    "temporalWorkflowID" = field::Empty,
1048                    "temporalRunID" = field::Empty,
1049                );
1050                let ct = CancellationToken::new();
1051                let task_token = activity.task_token;
1052                self.task_tokens_to_cancels
1053                    .insert(task_token.clone().into(), ct.clone());
1054
1055                let (ctx, args) = ActivityContext::new(
1056                    worker.clone(),
1057                    client_options,
1058                    ct,
1059                    task_queue,
1060                    task_token.clone(),
1061                    start,
1062                );
1063                let codec_data_converter = data_converter.clone();
1064
1065                tokio::spawn(async move {
1066                    let act_fut = async move {
1067                        if let Some(info) = &ctx.info().workflow_execution {
1068                            Span::current()
1069                                .record("temporalWorkflowID", &info.workflow_id)
1070                                .record("temporalRunID", &info.run_id);
1071                        }
1072                        (act_fn)(args, data_converter, ctx, activity_inbound_interceptors).await
1073                    }
1074                    .instrument(span);
1075                    let result = act_fut.await;
1076                    let result = match result {
1077                        Ok(output) => {
1078                            // Codec application happens at the SDK/Core boundary, so activity
1079                            // implementations work with the payload converter directly.
1080                            let pc = codec_data_converter.payload_converter();
1081                            let ctx = SerializationContext {
1082                                data: &SerializationContextData::Activity,
1083                                converter: pc,
1084                            };
1085                            match output.serialize_payload(&ctx) {
1086                                Ok(payload) => ActivityExecutionResult::ok(payload),
1087                                Err(err) => {
1088                                    activity_error_to_core_result(&codec_data_converter, err.into())
1089                                }
1090                            }
1091                        }
1092                        Err(err) => activity_error_to_core_result(&codec_data_converter, err),
1093                    };
1094                    let mut completion = ActivityTaskCompletion {
1095                        task_token,
1096                        result: Some(result),
1097                    };
1098                    encode_payloads(
1099                        &mut completion,
1100                        codec_data_converter.codec(),
1101                        &SerializationContextData::Activity,
1102                    )
1103                    .await;
1104                    worker.complete_activity_task(completion).await?;
1105                    Ok::<_, anyhow::Error>(())
1106                });
1107            }
1108            Some(activity_task::Variant::Cancel(_)) => {
1109                if let Some(ct) = self
1110                    .task_tokens_to_cancels
1111                    .get(activity.task_token.as_slice())
1112                {
1113                    ct.cancel();
1114                }
1115            }
1116            None => {
1117                return Err(anyhow!("Undefined activity task variant").into());
1118            }
1119        }
1120        Ok(())
1121    }
1122}
1123
1124/// Activity functions may return these values when exiting
1125#[derive(Debug)]
1126pub enum ActExitValue<T> {
1127    /// Completion requires an asynchronous callback
1128    WillCompleteAsync,
1129    /// Finish with a result
1130    Normal(T),
1131}
1132
1133impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1134    fn from(t: T) -> Self {
1135        Self::Normal(t)
1136    }
1137}
1138
1139/// Attempts to turn caught panics into something printable
1140fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
1141    _panic_formatter::<&str>(panic)
1142}
1143fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
1144    match panic.downcast::<T>() {
1145        Ok(d) => d,
1146        Err(orig) => {
1147            if TypeId::of::<<T as PrintablePanicType>::NextType>()
1148                == TypeId::of::<EndPrintingAttempts>()
1149            {
1150                return Box::new("Couldn't turn panic into a string");
1151            }
1152            _panic_formatter::<T::NextType>(orig)
1153        }
1154    }
1155}
1156trait PrintablePanicType: Display {
1157    type NextType: PrintablePanicType;
1158}
1159
1160impl PrintablePanicType for &str {
1161    type NextType = String;
1162}
1163impl PrintablePanicType for String {
1164    type NextType = EndPrintingAttempts;
1165}
1166struct EndPrintingAttempts {}
1167impl Display for EndPrintingAttempts {
1168    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1169        write!(f, "Will never be printed")
1170    }
1171}
1172impl PrintablePanicType for EndPrintingAttempts {
1173    type NextType = EndPrintingAttempts;
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178    use super::*;
1179    use crate::activities::ActivityError;
1180    use temporalio_macros::{activities, activity_definitions, workflow, workflow_methods};
1181
1182    struct MyActivities {}
1183
1184    struct SharedActivities;
1185    #[activity_definitions]
1186    impl SharedActivities {
1187        #[activity(name = "shared-greet")]
1188        fn greet(name: String) -> Result<String, ActivityError> {
1189            unimplemented!()
1190        }
1191    }
1192
1193    #[activities]
1194    impl MyActivities {
1195        #[activity]
1196        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
1197            Ok(())
1198        }
1199
1200        #[activity(definition = shared_activities::Greet)]
1201        async fn greet(_ctx: ActivityContext, name: String) -> Result<String, ActivityError> {
1202            Ok(name)
1203        }
1204
1205        #[activity]
1206        async fn takes_self(
1207            self: Arc<Self>,
1208            _ctx: ActivityContext,
1209            _: String,
1210        ) -> Result<(), ActivityError> {
1211            Ok(())
1212        }
1213    }
1214
1215    #[test]
1216    fn test_activity_registration() {
1217        let act_instance = MyActivities {};
1218        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
1219    }
1220
1221    // Compile-only test for workflow context invocation
1222    #[allow(unused, clippy::diverging_sub_expression)]
1223    fn test_activity_via_workflow_context() {
1224        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
1225        wf_ctx.start_activity(
1226            MyActivities::my_activity,
1227            (),
1228            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
1229        );
1230        wf_ctx.start_activity(
1231            SharedActivities::greet,
1232            "Hi".to_owned(),
1233            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
1234        );
1235        wf_ctx.start_activity(
1236            MyActivities::greet,
1237            "Hi".to_owned(),
1238            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
1239        );
1240        wf_ctx.start_activity(
1241            MyActivities::takes_self,
1242            "Hi".to_owned(),
1243            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
1244        );
1245    }
1246
1247    // Compile-only test for direct invocation via .run()
1248    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
1249    async fn test_activity_direct_invocation() {
1250        let ctx: ActivityContext = unimplemented!();
1251        let _result = MyActivities::my_activity.run(ctx).await;
1252    }
1253
1254    #[workflow]
1255    struct MyWorkflow {
1256        counter: u32,
1257    }
1258
1259    #[allow(dead_code)]
1260    #[workflow_methods]
1261    impl MyWorkflow {
1262        #[init]
1263        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
1264            Self { counter: 0 }
1265        }
1266
1267        #[run]
1268        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
1269            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
1270        }
1271
1272        #[signal(name = "increment")]
1273        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
1274            self.counter += amount;
1275        }
1276
1277        #[signal]
1278        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}
1279
1280        #[query]
1281        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
1282            self.counter
1283        }
1284
1285        #[update(name = "double")]
1286        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
1287            self.counter *= 2;
1288            self.counter
1289        }
1290
1291        #[update]
1292        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
1293            val * 2
1294        }
1295    }
1296
1297    #[test]
1298    fn test_workflow_registration() {
1299        let _ = WorkerOptions::new("task_q")
1300            .register_workflow::<MyWorkflow>()
1301            .unwrap();
1302    }
1303
1304    #[test]
1305    fn duplicate_workflow_registration_errors() {
1306        let result = WorkerOptions::new("task_q")
1307            .register_workflow::<MyWorkflow>()
1308            .unwrap()
1309            .register_workflow::<MyWorkflow>();
1310
1311        let err = match result {
1312            Ok(_) => panic!("duplicate workflow registration should error"),
1313            Err(err) => err,
1314        };
1315        assert_eq!(
1316            err,
1317            WorkflowRegistrationError::DuplicateWorkflowType {
1318                workflow_type: "MyWorkflow".to_string()
1319            }
1320        );
1321    }
1322
1323    #[test]
1324    fn factory_registration_with_init_errors() {
1325        let result = WorkerOptions::new("task_q")
1326            .register_workflow_with_factory(|| MyWorkflow { counter: 0 });
1327
1328        let err = match result {
1329            Ok(_) => panic!("factory registration with #[init] should error"),
1330            Err(err) => err,
1331        };
1332        assert_eq!(
1333            err,
1334            WorkflowRegistrationError::FactoryRegistrationWithInit {
1335                workflow_type: "MyWorkflow".to_string()
1336            }
1337        );
1338    }
1339
1340    fn default_identity() -> String {
1341        format!(
1342            "{}@{}",
1343            std::process::id(),
1344            gethostname::gethostname().to_string_lossy()
1345        )
1346    }
1347
1348    #[rstest::rstest]
1349    #[case::default_when_none_provided(None, "", Some(default_identity()))]
1350    #[case::connection_identity_preserved(None, "conn-identity", None)]
1351    #[case::worker_override_takes_precedence(
1352        Some("worker-identity"),
1353        "conn-identity",
1354        Some("worker-identity".into())
1355    )]
1356    #[case::worker_override_with_empty_connection(
1357        Some("worker-identity"),
1358        "",
1359        Some("worker-identity".into())
1360    )]
1361    #[test]
1362    fn client_identity_resolution(
1363        #[case] worker_override: Option<&str>,
1364        #[case] connection_identity: &str,
1365        #[case] expected: Option<String>,
1366    ) {
1367        let opts = WorkerOptions::new("task_q")
1368            .task_types(WorkerTaskTypes::activity_only())
1369            .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
1370            .build();
1371        let config = opts
1372            .to_core_options("ns".into(), connection_identity.into())
1373            .unwrap();
1374        assert_eq!(config.client_identity_override, expected);
1375    }
1376}