// temporalio_sdk — crate root (lib.rs)
#![warn(missing_docs)] // warn if there are missing docs
2
3//! This crate defines an alpha-stage Temporal Rust SDK.
4//!
5//! Currently defining activities and running an activity-only worker is the most stable code.
6//! Workflow definitions exist and running a workflow worker works, but the API is still very
7//! unstable.
8//!
9//! An example of running an activity worker:
10//! ```no_run
11//! use std::str::FromStr;
12//! use temporalio_client::{Client, ClientOptions, Connection, ConnectionOptions};
13//! use temporalio_common::{
14//!     telemetry::TelemetryOptions,
15//!     worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes},
16//! };
17//! use temporalio_macros::activities;
18//! use temporalio_sdk::{
19//!     Worker, WorkerOptions,
20//!     activities::{ActivityContext, ActivityError},
21//! };
22//! use temporalio_sdk_core::{CoreRuntime, RuntimeOptions, Url};
23//!
24//! struct MyActivities;
25//!
26//! #[activities]
27//! impl MyActivities {
28//!     #[activity]
29//!     pub(crate) async fn echo(
30//!         _ctx: ActivityContext,
31//!         e: String,
32//!     ) -> Result<String, ActivityError> {
33//!         Ok(e)
34//!     }
35//! }
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     let connection_options =
40//!         ConnectionOptions::new(Url::from_str("http://localhost:7233")?).build();
41//!     let telemetry_options = TelemetryOptions::builder().build();
42//!     let runtime_options = RuntimeOptions::builder()
43//!         .telemetry_options(telemetry_options)
44//!         .build()?;
45//!     let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
46//!
47//!     let connection = Connection::connect(connection_options).await?;
48//!     let client = Client::new(connection, ClientOptions::new("my_namespace").build())?;
49//!
50//!     let worker_options = WorkerOptions::new("task_queue")
51//!         .task_types(WorkerTaskTypes::activity_only())
52//!         .deployment_options(WorkerDeploymentOptions {
53//!             version: WorkerDeploymentVersion {
54//!                 deployment_name: "my_deployment".to_owned(),
55//!                 build_id: "my_build_id".to_owned(),
56//!             },
57//!             use_worker_versioning: false,
58//!             default_versioning_behavior: None,
59//!         })
60//!         .register_activities(MyActivities)
61//!         .build();
62//!
63//!     let mut worker = Worker::new(&runtime, client, worker_options)?;
64//!     worker.run().await?;
65//!
66//!     Ok(())
67//! }
68//! ```
69
70#[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_executor;
78mod workflow_future;
79pub mod workflows;
80
// Hidden forwarding macro: expands to `futures_util::select_biased!` with the
// caller's tokens passed through unchanged. `#[macro_export]` places it at the
// crate root — presumably so code generated by this SDK's macros can reference
// it without users adding their own `futures_util` dependency (TODO confirm).
#[macro_export]
#[doc(hidden)]
macro_rules! __temporal_select {
    ($($tokens:tt)*) => {
        ::futures_util::select_biased! { $($tokens)* }
    };
}
88
// Hidden forwarding macro: expands to `futures_util::join!` with the caller's
// tokens passed through unchanged. Exported (doc-hidden) for the same reason
// as `__temporal_select!` — presumably for SDK-generated code (TODO confirm).
#[macro_export]
#[doc(hidden)]
macro_rules! __temporal_join {
    ($($tokens:tt)*) => {
        ::futures_util::join!($($tokens)*)
    };
}
96
97use workflow_future::WorkflowFunction;
98
99pub use temporalio_client::Namespace;
100pub use workflow_context::{
101    ActivityCloseTimeouts, ActivityExecutionError, ActivityOptions, BaseWorkflowContext,
102    CancellableFuture, ChildWorkflowExecutionError, ChildWorkflowOptions, ChildWorkflowSignalError,
103    ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions,
104    ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData,
105    StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext,
106    TimerOptions, WorkflowContext, WorkflowContextView,
107};
108
109use crate::{
110    activities::{
111        ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
112        ExecutableActivity,
113    },
114    interceptors::WorkerInterceptor,
115    workflow_context::{
116        ChildWfCommon, NexusUnblockData, PendingChildWorkflow, StartedNexusOperation,
117    },
118    workflow_executor::WorkflowExecutor,
119    workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
120};
121use anyhow::{Context, anyhow, bail};
122use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
123use std::{
124    any::{Any, TypeId},
125    cell::RefCell,
126    collections::{HashMap, HashSet},
127    fmt::{Debug, Display, Formatter},
128    future::Future,
129    marker::PhantomData,
130    panic::AssertUnwindSafe,
131    sync::Arc,
132    time::Duration,
133};
134use temporalio_client::{Client, NamespacedClient};
135use temporalio_common::{
136    ActivityDefinition, WorkflowDefinition,
137    data_converters::{DataConverter, SerializationContextData},
138    payload_visitor::{decode_payloads, encode_payloads},
139    protos::{
140        TaskToken,
141        coresdk::{
142            ActivityTaskCompletion, AsJsonPayloadExt,
143            activity_result::{ActivityExecutionResult, ActivityResolution},
144            activity_task::{ActivityTask, activity_task},
145            child_workflow::ChildWorkflowResult,
146            nexus::NexusOperationResult,
147            workflow_activation::{
148                WorkflowActivation,
149                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
150                resolve_nexus_operation_start, workflow_activation_job::Variant,
151            },
152            workflow_commands::{
153                ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
154            },
155            workflow_completion::WorkflowActivationCompletion,
156        },
157        temporal::api::{
158            common::v1::Payload,
159            enums::v1::WorkflowTaskFailedCause,
160            failure::v1::{Failure, failure},
161        },
162    },
163    worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
164};
165use temporalio_sdk_core::{
166    CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
167    WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
168};
169use tokio::sync::{
170    Notify,
171    mpsc::{UnboundedSender, unbounded_channel},
172    oneshot,
173};
174use tokio_stream::wrappers::UnboundedReceiverStream;
175use tokio_util::sync::CancellationToken;
176use tracing::{Instrument, Span, field};
177use uuid::Uuid;
178
/// Contains options for configuring a worker.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// What task queue will this worker poll from? This task queue name will be used for both
    /// workflow and activity polling.
    #[builder(start_fn)]
    pub task_queue: String,

    // Activity implementations collected via the `register_activities` /
    // `register_activity` methods (on both the builder and the built options).
    #[builder(field)]
    activities: ActivityDefinitions,

    // Workflow implementations collected via the `register_workflow*` methods
    // (on both the builder and the built options).
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// Set the deployment options for this worker. Defaults to a hash of the currently running
    /// executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// A human-readable string that can identify this worker. If set, overrides the identity on
    /// the client used by this worker. If unset and the client has no identity, defaults to
    /// `{pid}@{hostname}`.
    pub client_identity_override: Option<String>,
    /// If set nonzero, workflows will be cached and sticky task queues will be used, meaning that
    /// history updates are applied incrementally to suspended instances of workflow execution.
    /// Workflows are evicted according to a least-recently-used policy once the cache maximum is
    /// reached. Workflows may also be explicitly evicted at any time, or as a result of errors
    /// or failures.
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Set a [crate::WorkerTuner] for this worker, which controls how many slots are available for
    /// the different kinds of tasks.
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Controls how polling for Workflow tasks will happen on this worker's task queue. See also
    /// [WorkerConfig::nonsticky_to_sticky_poll_ratio]. If using SimpleMaximum, must be at least 2
    /// when `max_cached_workflows` > 0, or is an error.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Only applies when using [PollerBehavior::SimpleMaximum]
    ///
    /// (max workflow task polls * this number) = the number of max pollers that will be allowed for
    /// the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky
    /// queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for
    /// either poller is 1, so if the maximum allowed is 1 and sticky queues are enabled, there will
    /// be 2 concurrent polls.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Controls how polling for Activity tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Controls how polling for Nexus tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    // TODO [rust-sdk-branch]: Will go away once workflow registration can only happen in here.
    //   Then it can be auto-determined.
    /// Specifies which task types this worker will poll for.
    ///
    /// Note: At least one task type must be specified or the worker will fail validation.
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// How long a workflow task is allowed to sit on the sticky queue before it is timed out
    /// and moved to the non-sticky queue where it may be picked up by any worker.
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Longest interval for throttling activity heartbeats
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Default interval for throttling activity heartbeats in case
    /// `ActivityOptions.heartbeat_timeout` is unset.
    /// When the timeout *is* set in the `ActivityOptions`, throttling is set to
    /// `heartbeat_timeout * 0.8`.
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// Sets the maximum number of activities per second the task queue will dispatch, controlled
    /// server-side. Note that this only takes effect upon an activity poll request. If multiple
    /// workers on the same queue have different values set, they will thrash with the last poller
    /// winning.
    ///
    /// Setting this to a nonzero value will also disable eager activity execution.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// Limits the number of activities per second that this worker will process. The worker will
    /// not poll for new activities if by doing so it might receive and execute an activity which
    /// would cause it to exceed this limit. Negative, zero, or NaN values will cause building
    /// the options to fail.
    pub max_worker_activities_per_second: Option<f64>,
    /// Any error types listed here will cause any workflow being processed by this worker to fail,
    /// rather than simply failing the workflow task.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Like [WorkerConfig::workflow_failure_errors], but specific to certain workflow types (the
    /// map key).
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// If set, the worker will issue cancels for all outstanding activities and nexus operations after
    /// shutdown has been initiated and this amount of time has elapsed.
    pub graceful_shutdown_period: Option<Duration>,
    /// Detect nondeterministic async usage in workflow code. When enabled (the default), workflows
    /// that use external async operations (tokio timers, IO, spawned threads, raw tokio::sync
    /// channels, etc.) will have their tasks failed with a descriptive error.
    #[builder(default = true)]
    pub detect_nondeterministic_futures: bool,
}
283
// Registration methods available on the bon-generated builder in any state
// (they only touch the `#[builder(field)]` members, not builder-state fields).
impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    ///
    /// This method is intended for scenarios requiring injection of un-serializable
    /// state into workflows.
    ///
    /// **This can easily cause nondeterminism**
    ///
    /// Only use when you understand the implications and have a specific need that cannot be met
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the workflow type defines an `#[init]` method. Workflows using
    /// factory registration must not have `#[init]` to avoid ambiguity about
    /// instance creation.
    pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }
}
334
335// Needs to exist to avoid https://github.com/elastio/bon/issues/359
336fn def_build_id() -> WorkerDeploymentOptions {
337    WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
338}
339
impl WorkerOptions {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }
    /// Returns all the registered activities by cloning the current set.
    pub fn activities(&self) -> ActivityDefinitions {
        self.activities.clone()
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Returns all the registered workflows by cloning the current set.
    pub fn workflows(&self) -> WorkflowDefinitions {
        self.workflows.clone()
    }

    #[doc(hidden)]
    /// Converts these options into a core [WorkerConfig] for the given namespace,
    /// given the identity currently configured on the connection.
    pub fn to_core_options(
        &self,
        namespace: String,
        connection_identity: String,
    ) -> Result<WorkerConfig, String> {
        WorkerConfig::builder()
            .namespace(namespace)
            .task_queue(self.task_queue.clone())
            .maybe_client_identity_override(self.client_identity_override.clone().or_else(|| {
                // No explicit override: if the connection itself also has no
                // identity, fall back to the conventional `{pid}@{hostname}`.
                connection_identity.is_empty().then(|| {
                    format!(
                        "{}@{}",
                        std::process::id(),
                        gethostname::gethostname().to_string_lossy()
                    )
                })
            }))
            .max_cached_workflows(self.max_cached_workflows)
            .tuner(self.tuner.clone())
            .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
            .activity_task_poller_behavior(self.activity_task_poller_behavior)
            .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
            .task_types(self.task_types)
            .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
            .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
            .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
            .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
            .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
            .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
            .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
                self.deployment_options.clone(),
            ))
            .workflow_failure_errors(self.workflow_failure_errors.clone())
            .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
            .build()
    }
}
424
/// A worker that can poll for and respond to workflow tasks by using
/// [temporalio_macros::workflow], and activity tasks by using activities defined with
/// [temporalio_macros::activities].
pub struct Worker {
    // State shared by both halves: core worker handle, task queue name,
    // optional interceptor, and the data converter.
    common: CommonWorker,
    // Workflow-specific state (cached runs, definitions, eviction signaling).
    workflow_half: WorkflowHalf,
    // Activity-specific state (registered activities, cancel tokens).
    activity_half: ActivityHalf,
}
433
/// State shared between the workflow and activity halves of a [Worker].
struct CommonWorker {
    /// Handle to the underlying core worker.
    worker: Arc<CoreWorker>,
    /// Task queue this worker polls on (copied from the core worker's config).
    task_queue: String,
    /// Optional interceptor invoked around activations, completions, and shutdown.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Converter used to encode/decode payloads flowing through this worker.
    data_converter: DataConverter,
}
440
/// Workflow-side state of a [Worker].
struct WorkflowHalf {
    /// Maps run id to cached workflow state
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// The set of workflow definitions registered on this worker.
    workflow_definitions: WorkflowDefinitions,
    /// Notified each time a workflow is removed from the `workflows` map.
    workflow_removed_from_map: Notify,
    /// Whether nondeterministic-future detection is enabled for workflows.
    detect_nondeterministic_futures: bool,
}
/// Per-run state kept for each cached workflow.
struct WorkflowData {
    /// Channel used to send the workflow activations
    activation_chan: UnboundedSender<WorkflowActivation>,
}
452
/// Pairs a spawned workflow task's join handle with the run id it belongs to,
/// so the joiner in [Worker::run] can remove the run from the cache once the
/// task finishes.
///
/// No `Future` bound on the struct itself: bounds on struct definitions are
/// un-idiomatic (they belong on the code that awaits `join_handle`), and the
/// bound here was never required to construct or destructure the type.
struct WorkflowFutureHandle<F> {
    join_handle: F,
    run_id: String,
}
457
/// Activity-side state of a [Worker].
#[derive(Default)]
struct ActivityHalf {
    /// Maps activity type to the function for executing activities of that type
    activities: ActivityDefinitions,
    /// Cancellation tokens for in-flight activities, keyed by task token.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
464
465impl Worker {
466    /// Create a new worker from an existing connection, and options.
467    pub fn new(
468        runtime: &CoreRuntime,
469        client: Client,
470        mut options: WorkerOptions,
471    ) -> Result<Self, Box<dyn std::error::Error>> {
472        let acts = std::mem::take(&mut options.activities);
473        let wfs = std::mem::take(&mut options.workflows);
474        let wc = options
475            .to_core_options(client.namespace(), client.identity())
476            .map_err(|s| anyhow::anyhow!("{s}"))?;
477        let core = init_worker(runtime, wc, client.connection().clone())?;
478        let mut me = Self::new_from_core_definitions(
479            Arc::new(core),
480            client.data_converter().clone(),
481            Default::default(),
482            Default::default(),
483        );
484        me.set_detect_nondeterministic_futures(options.detect_nondeterministic_futures);
485        me.activity_half.activities = acts;
486        me.workflow_half.workflow_definitions = wfs;
487        Ok(me)
488    }
489
490    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
491    #[doc(hidden)]
492    pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
493        Self::new_from_core_definitions(
494            worker,
495            data_converter,
496            Default::default(),
497            Default::default(),
498        )
499    }
500
501    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
502    #[doc(hidden)]
503    pub fn new_from_core_definitions(
504        worker: Arc<CoreWorker>,
505        data_converter: DataConverter,
506        activities: ActivityDefinitions,
507        workflows: WorkflowDefinitions,
508    ) -> Self {
509        Self {
510            common: CommonWorker {
511                task_queue: worker.get_config().task_queue.clone(),
512                worker,
513                worker_interceptor: None,
514                data_converter,
515            },
516            workflow_half: WorkflowHalf {
517                workflows: Default::default(),
518                workflow_definitions: workflows,
519                workflow_removed_from_map: Default::default(),
520                detect_nondeterministic_futures: false,
521            },
522            activity_half: ActivityHalf {
523                activities,
524                ..Default::default()
525            },
526        }
527    }
528
529    /// Returns the task queue name this worker polls on
530    pub fn task_queue(&self) -> &str {
531        &self.common.task_queue
532    }
533
    #[doc(hidden)]
    /// Set whether nondeterministic future detection is enabled for workflows on this worker. Users
    /// should use [WorkerOptions] to set this. TODO: Only needs to exist due to test setup.
    pub fn set_detect_nondeterministic_futures(&mut self, enabled: bool) {
        // Simply records the flag on the workflow half, which owns all
        // workflow-execution state.
        self.workflow_half.detect_nondeterministic_futures = enabled;
    }
540
541    /// Return a handle that can be used to initiate shutdown. This is useful because [Worker::run]
542    /// takes self mutably, so you may want to obtain a handle for shutting down before running.
543    pub fn shutdown_handle(&self) -> impl Fn() + use<> {
544        let w = self.common.worker.clone();
545        move || w.initiate_shutdown()
546    }
547
548    /// Registers all activities on an activity implementer.
549    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
550        self.activity_half
551            .activities
552            .register_activities::<AI>(instance);
553        self
554    }
555    /// Registers a specific activitiy.
556    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
557    where
558        AD: ActivityDefinition + ExecutableActivity,
559        AD::Output: Send + Sync,
560    {
561        self.activity_half
562            .activities
563            .register_activity::<AD>(instance);
564        self
565    }
566
567    /// Registers all workflows on a workflow implementer.
568    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
569        self.workflow_half
570            .workflow_definitions
571            .register_workflow::<WI>();
572        self
573    }
574
575    /// Register a workflow with a custom factory for instance creation.
576    ///
577    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
578    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
579    where
580        W: WorkflowImplementation,
581        <W::Run as WorkflowDefinition>::Input: Send,
582        F: Fn() -> W + Send + Sync + 'static,
583    {
584        self.workflow_half
585            .workflow_definitions
586            .register_workflow_run_with_factory::<W, F>(factory);
587        self
588    }
589
    /// Runs the worker. Eventually resolves after the worker has been explicitly shut down,
    /// or may return early with an error in the event of some unresolvable problem.
    ///
    /// Internally this drives several concurrent loops under one `try_join!`:
    /// a workflow activation poller, a joiner for finished workflow tasks, a
    /// completion sender, and (when activities are registered) an activity
    /// task poller. Any loop erroring tears the whole call down.
    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
        let shutdown_token = CancellationToken::new();
        let (common, wf_half, act_half) = self.split_apart();
        // Channel carrying the handle of each newly spawned workflow task to
        // the joiner below.
        let (wf_future_tx, wf_future_rx) = unbounded_channel::<
            WorkflowFutureHandle<workflow_executor::TaskHandle<WorkflowResult<Payload>>>,
        >();
        // Channel carrying workflow activation completions to the completion
        // processor below.
        let (completions_tx, completions_rx) = unbounded_channel();

        // Workflows run in a LocalSet because they use Rc<RefCell> for state management.
        // This allows them to not require Send/Sync bounds. The WorkflowExecutor replaces
        // tokio::task::spawn_local for workflow tasks and provides custom wakers for
        // nondeterminism detection.
        let workflow_local_set = tokio::task::LocalSet::new();
        let executor = WorkflowExecutor::new();

        // Awaits each workflow task as it finishes and evicts its run from the
        // cache; terminates once `wf_future_tx` is dropped by the poll loop.
        let wf_future_joiner = async {
            UnboundedReceiverStream::new(wf_future_rx)
                .map(Result::<_, anyhow::Error>::Ok)
                .try_for_each_concurrent(
                    None,
                    |WorkflowFutureHandle {
                         join_handle,
                         run_id,
                     }| {
                        let wf_half = &*wf_half;
                        async move {
                            let result = join_handle.await.map_err(|e| anyhow::anyhow!("{e}"))?;
                            // Eviction is normal workflow lifecycle - workflows loop waiting for
                            // eviction after completion to manage cache cleanup
                            if let Err(e) = result
                                && !matches!(e, WorkflowTermination::Evicted)
                            {
                                return Err(e.into());
                            }
                            debug!(run_id=%run_id, "Removing workflow from cache");
                            wf_half.workflows.borrow_mut().remove(&run_id);
                            wf_half.workflow_removed_from_map.notify_one();
                            Ok(())
                        }
                    },
                )
                .await
                .context("Workflow futures encountered an error")
        };
        // Encodes each completion, runs the interceptor hook, and reports it
        // to core; terminates once `completions_tx` is dropped by the poll loop.
        let wf_completion_processor = async {
            UnboundedReceiverStream::new(completions_rx)
                .map(Ok)
                .try_for_each_concurrent(None, |mut completion| async {
                    encode_payloads(
                        &mut completion,
                        common.data_converter.codec(),
                        &SerializationContextData::Workflow,
                    )
                    .await;
                    if let Some(ref i) = common.worker_interceptor {
                        i.on_workflow_activation_completion(&completion).await;
                    }
                    common.worker.complete_workflow_activation(completion).await
                })
                .map_err(anyhow::Error::from)
                .await
                .context("Workflow completions processor encountered an error")
        };
        tokio::try_join!(
            // Workflow-related tasks run inside LocalSet (allows !Send futures)
            async {
                workflow_local_set.run_until(async {
                    tokio::try_join!(
                        // Workflow polling loop
                        async {
                            loop {
                            // A ShutDown poll error is the normal exit signal.
                            let mut activation =
                                match common.worker.poll_workflow_activation().await {
                                    Err(PollError::ShutDown) => {
                                        break;
                                    }
                                    o => o?,
                                };
                            decode_payloads(
                                &mut activation,
                                common.data_converter.codec(),
                                &SerializationContextData::Workflow,
                            )
                            .await;
                            if let Some(ref i) = common.worker_interceptor {
                                i.on_workflow_activation(&activation).await?;
                            }
                            // A `Some` here means a brand-new workflow task was
                            // spawned; hand its handle to the joiner.
                            if let Some(wf_fut) = wf_half
                                .workflow_activation_handler(
                                    common,
                                    shutdown_token.clone(),
                                    activation,
                                    &completions_tx,
                                    &executor,
                                )
                                .await?
                                && wf_future_tx.send(wf_fut).is_err()
                            {
                                panic!(
                                    "Receive half of completion processor channel cannot be dropped"
                                );
                            }
                            // Drive the executor so spawned tasks and sent activations make
                            // progress.
                            executor.process_tasks();
                        }
                        // Tell still-alive workflows to evict themselves
                        shutdown_token.cancel();
                        // It's important to drop these so the future and completion processors will
                        // terminate.
                        drop(wf_future_tx);
                        drop(completions_tx);
                        executor.shutdown().await;
                        Result::<_, anyhow::Error>::Ok(())
                    },
                    wf_future_joiner,
                )
                }).await
            },
            // Only poll on the activity queue if activity functions have been registered. This
            // makes tests which use mocks dramatically more manageable.
            async {
                if !act_half.activities.is_empty() {
                    loop {
                        let activity = common.worker.poll_activity_task().await;
                        if matches!(activity, Err(PollError::ShutDown)) {
                            break;
                        }
                        let mut activity = activity?;
                        decode_payloads(
                            &mut activity,
                            common.data_converter.codec(),
                            &SerializationContextData::Activity,
                        )
                        .await;
                        act_half.activity_task_handler(
                            common.worker.clone(),
                            common.task_queue.clone(),
                            common.data_converter.clone(),
                            activity,
                        )?;
                    }
                };
                Result::<_, anyhow::Error>::Ok(())
            },
            wf_completion_processor,
        )?;

        // All loops exited cleanly: run the interceptor hook, then finalize
        // core shutdown.
        if let Some(i) = self.common.worker_interceptor.as_ref() {
            i.on_shutdown(self);
        }
        self.common.worker.shutdown().await;
        Ok(())
    }
746
747    /// Set a [WorkerInterceptor]
748    pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
749        self.common.worker_interceptor = Some(Box::new(interceptor));
750    }
751
752    /// Turns this rust worker into a new worker with all the same workflows and activities
753    /// registered, but with a new underlying core worker. Can be used to swap the worker for
754    /// a replay worker, change task queues, etc.
755    pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
756        self.common.worker = new_core_worker;
757    }
758
759    /// Returns number of currently cached workflows as understood by the SDK. Importantly, this
760    /// is not the same as understood by core, though they *should* always be in sync.
761    pub fn cached_workflows(&self) -> usize {
762        self.workflow_half.workflows.borrow().len()
763    }
764
765    /// Returns the instance key for this worker, used for worker heartbeating.
766    pub fn worker_instance_key(&self) -> Uuid {
767        self.common.worker.worker_instance_key()
768    }
769
770    #[doc(hidden)]
771    pub fn core_worker(&self) -> Arc<CoreWorker> {
772        self.common.worker.clone()
773    }
774
    /// Splits this worker into disjoint mutable borrows of its three halves so the
    /// workflow and activity processing paths can be driven concurrently without
    /// conflicting borrows of `self`.
    fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
        (
            &mut self.common,
            &mut self.workflow_half,
            &mut self.activity_half,
        )
    }
782}
783
impl WorkflowHalf {
    /// Routes one [WorkflowActivation] from core to the workflow future it belongs to.
    ///
    /// If the activation carries an `InitializeWorkflow` job, the registered workflow
    /// function for that type is instantiated, its future is spawned on `executor`, and
    /// its activation channel is registered in `self.workflows`. Known runs then have the
    /// activation forwarded on their channel; unknown runs either complete a lone
    /// `RemoveFromCache` job or produce an error.
    ///
    /// Returns a handle to a newly spawned workflow future so the caller can track its
    /// completion, or `None` when no new future was created (or creation failed, in which
    /// case a failed completion has already been sent on `completions_tx`).
    #[allow(clippy::type_complexity)]
    async fn workflow_activation_handler(
        &self,
        common: &CommonWorker,
        shutdown_token: CancellationToken,
        mut activation: WorkflowActivation,
        completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
        executor: &WorkflowExecutor,
    ) -> Result<
        Option<WorkflowFutureHandle<workflow_executor::TaskHandle<WorkflowResult<Payload>>>>,
        anyhow::Error,
    > {
        let mut res = None;
        let run_id = activation.run_id.clone();

        // If the activation is to init a workflow, create a new workflow driver for it,
        // using the function associated with that workflow id
        if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
            Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
            _ => None,
        }) {
            let workflow_type = sw.workflow_type.clone();
            let payload_converter = common.data_converter.payload_converter().clone();
            let (wff, activations) = {
                if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
                    match WorkflowFunction::from_invocation(factory).start_workflow(
                        common.worker.get_config().namespace.clone(),
                        common.task_queue.clone(),
                        run_id.clone(),
                        std::mem::take(sw),
                        completions_tx.clone(),
                        payload_converter,
                        self.detect_nondeterministic_futures,
                    ) {
                        Ok(result) => result,
                        Err(e) => {
                            // Creation failed: report a failed workflow task to core and
                            // bail out without caching anything for this run.
                            warn!("Failed to create workflow {workflow_type}: {e}");
                            completions_tx
                                .send(WorkflowActivationCompletion::fail(
                                    run_id,
                                    format!("Failed to create workflow: {e}").into(),
                                    Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                                ))
                                .expect("Completion channel intact");
                            return Ok(None);
                        }
                    }
                } else {
                    // No registered definition for this workflow type; fail the task.
                    warn!("Workflow type {workflow_type} not found");
                    completions_tx
                        .send(WorkflowActivationCompletion::fail(
                            run_id,
                            format!("Workflow type {workflow_type} not found").into(),
                            Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                        ))
                        .expect("Completion channel intact");
                    return Ok(None);
                }
            };
            // TODO [rust-sdk-branch]: Deadlock detection
            let jh = executor.spawn(async move {
                tokio::select! {
                    r = wff.fuse() => r,
                    // TODO: This probably shouldn't abort early, as it could cause an in-progress
                    //  complete to abort. Send synthetic remove activation
                    _ = shutdown_token.cancelled() => {
                        Err(WorkflowTermination::Evicted)
                    }
                }
            });
            res = Some(WorkflowFutureHandle {
                join_handle: jh,
                run_id: run_id.clone(),
            });
            loop {
                // It's possible that we've got a new initialize workflow action before the last
                // future for this run finished evicting, as a result of how futures might be
                // interleaved. In that case, just wait until it's not in the map, which should be
                // a matter of only a few `poll` calls.
                if self.workflows.borrow_mut().contains_key(&run_id) {
                    self.workflow_removed_from_map.notified().await;
                } else {
                    break;
                }
            }
            self.workflows.borrow_mut().insert(
                run_id.clone(),
                WorkflowData {
                    activation_chan: activations,
                },
            );
        }

        // The activation is expected to apply to some workflow we know about. Use it to
        // unblock things and advance the workflow.
        if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
            dat.activation_chan
                .send(activation)
                .expect("Workflow should exist if we're sending it an activation");
        } else {
            // When we failed to start a workflow, we never inserted it into the cache. But core
            // sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
            // as a failure, which we need to complete. Other SDKs add the workflow to the cache
            // even when the workflow type is unknown/not found. To circumvent this, we simply mark
            // any RemoveFromCache job for workflows that are not in the cache as complete.
            if activation.jobs.len() == 1
                && matches!(
                    activation.jobs.first().map(|j| &j.variant),
                    Some(Some(Variant::RemoveFromCache(_)))
                )
            {
                completions_tx
                    .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
                    .expect("Completion channel intact");
                return Ok(None);
            }

            // In all other cases, we want to error as the runtime could be in an inconsistent state
            // at this point.
            bail!("Got activation {activation:?} for unknown workflow {run_id}");
        };

        Ok(res)
    }
}
910
impl ActivityHalf {
    /// Spawns off a task to handle the provided activity task
    ///
    /// `Start` tasks look up the registered activity function, record a cancellation
    /// token under the task token, and run the function on a fresh tokio task, reporting
    /// its outcome (success, retryable/non-retryable failure, cancellation, panic, or
    /// will-complete-async) back to core. `Cancel` tasks trigger the token recorded at
    /// start time, if one exists.
    ///
    /// # Errors
    /// Returns an error if no function is registered for the activity type or if the
    /// task's variant is missing.
    fn activity_task_handler(
        &mut self,
        worker: Arc<CoreWorker>,
        task_queue: String,
        data_converter: DataConverter,
        activity: ActivityTask,
    ) -> Result<(), anyhow::Error> {
        match activity.variant {
            Some(activity_task::Variant::Start(start)) => {
                let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
                    anyhow!(
                        "No function registered for activity type {}",
                        start.activity_type
                    )
                })?;
                // Tracing span around the activity; workflow/run IDs are filled in once
                // the context is available inside the spawned task.
                let span = info_span!(
                    "RunActivity",
                    "otel.name" = format!("RunActivity:{}", start.activity_type),
                    "otel.kind" = "server",
                    "temporalActivityID" = start.activity_id,
                    "temporalWorkflowID" = field::Empty,
                    "temporalRunID" = field::Empty,
                );
                // Cancellation token, stored by task token so a later Cancel task can find it.
                // NOTE(review): entries appear to remain in `task_tokens_to_cancels` after an
                // activity finishes — confirm cleanup happens elsewhere.
                let ct = CancellationToken::new();
                let task_token = activity.task_token;
                self.task_tokens_to_cancels
                    .insert(task_token.clone().into(), ct.clone());

                let (ctx, args) =
                    ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
                let codec_data_converter = data_converter.clone();

                // Execute on a separate task so this handler can return to the poll loop.
                tokio::spawn(async move {
                    let act_fut = async move {
                        if let Some(info) = &ctx.info().workflow_execution {
                            Span::current()
                                .record("temporalWorkflowID", &info.workflow_id)
                                .record("temporalRunID", &info.run_id);
                        }
                        (act_fn)(args, data_converter, ctx).await
                    }
                    .instrument(span);
                    // Trap panics so a panicking activity fails its task instead of
                    // taking down the worker.
                    let output = AssertUnwindSafe(act_fut).catch_unwind().await;
                    let result = match output {
                        Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
                            format!("Activity function panicked: {}", panic_formatter(e)),
                            true,
                        )),
                        Ok(Ok(p)) => ActivityExecutionResult::ok(p),
                        Ok(Err(err)) => match err {
                            ActivityError::Retryable {
                                source,
                                explicit_delay,
                            } => ActivityExecutionResult::fail({
                                let mut f = Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(source),
                                    false,
                                );
                                // Propagate a caller-requested retry delay, when given.
                                if let Some(d) = explicit_delay
                                    && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
                                        f.failure_info.as_mut()
                                {
                                    fi.next_retry_delay = d.try_into().ok();
                                }
                                f
                            }),
                            ActivityError::Cancelled { details } => {
                                ActivityExecutionResult::cancel_from_details(details)
                            }
                            ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
                                Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(nre),
                                    true,
                                ),
                            ),
                            ActivityError::WillCompleteAsync => {
                                ActivityExecutionResult::will_complete_async()
                            }
                        },
                    };
                    let mut completion = ActivityTaskCompletion {
                        task_token,
                        result: Some(result),
                    };
                    // Run the payload codec over the completion before reporting to core.
                    encode_payloads(
                        &mut completion,
                        codec_data_converter.codec(),
                        &SerializationContextData::Activity,
                    )
                    .await;
                    worker.complete_activity_task(completion).await?;
                    Ok::<_, anyhow::Error>(())
                });
            }
            Some(activity_task::Variant::Cancel(_)) => {
                // Signal the cancellation token registered when the activity started, if any.
                if let Some(ct) = self
                    .task_tokens_to_cancels
                    .get(activity.task_token.as_slice())
                {
                    ct.cancel();
                }
            }
            None => bail!("Undefined activity task variant"),
        }
        Ok(())
    }
}
1020
/// Internal events used to resolve (unblock) pending workflow command futures.
/// The leading `u32` in each variant is the sequence number of the command being
/// resolved; large payloads are boxed to keep the enum small.
#[derive(Debug)]
enum UnblockEvent {
    Timer(u32, TimerResult),
    Activity(u32, Box<ActivityResolution>),
    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
    WorkflowComplete(u32, Box<ChildWorkflowResult>),
    SignalExternal(u32, Option<Failure>),
    CancelExternal(u32, Option<Failure>),
    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
    NexusOperationComplete(u32, Box<NexusOperationResult>),
}
1032
/// Result of awaiting on a timer
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimerResult {
    /// The timer was cancelled before it fired
    Cancelled,
    /// The timer elapsed and fired
    Fired,
}
1041
/// Successful result of sending a signal to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalExternalOk;
/// Result of awaiting on sending a signal to an external workflow. `Err` carries the
/// reported [Failure] when the signal did not succeed.
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
1047
/// Successful result of sending a cancel request to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelExternalOk;
/// Result of awaiting on sending a cancel request to an external workflow. `Err` carries
/// the reported [Failure] when the cancel request did not succeed.
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
1053
/// Implemented by result types that can be produced from an [UnblockEvent], pairing the
/// event with any extra data captured when the corresponding command was issued.
trait Unblockable {
    /// Additional data, beyond the event itself, needed to construct `Self`.
    type OtherDat;

    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
}
1059
1060impl Unblockable for TimerResult {
1061    type OtherDat = ();
1062    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1063        match ue {
1064            UnblockEvent::Timer(_, result) => result,
1065            _ => panic!("Invalid unblock event for timer"),
1066        }
1067    }
1068}
1069
1070impl Unblockable for ActivityResolution {
1071    type OtherDat = ();
1072    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1073        match ue {
1074            UnblockEvent::Activity(_, result) => *result,
1075            _ => panic!("Invalid unblock event for activity"),
1076        }
1077    }
1078}
1079
1080impl<WD: WorkflowDefinition> Unblockable for PendingChildWorkflow<WD> {
1081    type OtherDat = ChildWfCommon;
1082    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1083        match ue {
1084            UnblockEvent::WorkflowStart(_, result) => Self {
1085                status: *result,
1086                common: od,
1087                _phantom: PhantomData,
1088            },
1089            _ => panic!("Invalid unblock event for child workflow start"),
1090        }
1091    }
1092}
1093
1094impl Unblockable for ChildWorkflowResult {
1095    type OtherDat = ();
1096    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1097        match ue {
1098            UnblockEvent::WorkflowComplete(_, result) => *result,
1099            _ => panic!("Invalid unblock event for child workflow complete"),
1100        }
1101    }
1102}
1103
1104impl Unblockable for SignalExternalWfResult {
1105    type OtherDat = ();
1106    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1107        match ue {
1108            UnblockEvent::SignalExternal(_, maybefail) => {
1109                maybefail.map_or(Ok(SignalExternalOk), Err)
1110            }
1111            _ => panic!("Invalid unblock event for signal external workflow result"),
1112        }
1113    }
1114}
1115
1116impl Unblockable for CancelExternalWfResult {
1117    type OtherDat = ();
1118    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1119        match ue {
1120            UnblockEvent::CancelExternal(_, maybefail) => {
1121                maybefail.map_or(Ok(CancelExternalOk), Err)
1122            }
1123            _ => panic!("Invalid unblock event for signal external workflow result"),
1124        }
1125    }
1126}
1127
1128type NexusStartResult = Result<StartedNexusOperation, Failure>;
1129impl Unblockable for NexusStartResult {
1130    type OtherDat = NexusUnblockData;
1131    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1132        match ue {
1133            UnblockEvent::NexusOperationStart(_, result) => match *result {
1134                resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1135                    Ok(StartedNexusOperation {
1136                        operation_token: Some(op_token),
1137                        unblock_dat: od,
1138                    })
1139                }
1140                resolve_nexus_operation_start::Status::StartedSync(_) => {
1141                    Ok(StartedNexusOperation {
1142                        operation_token: None,
1143                        unblock_dat: od,
1144                    })
1145                }
1146                resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1147            },
1148            _ => panic!("Invalid unblock event for nexus operation"),
1149        }
1150    }
1151}
1152
1153impl Unblockable for NexusOperationResult {
1154    type OtherDat = ();
1155
1156    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1157        match ue {
1158            UnblockEvent::NexusOperationComplete(_, result) => *result,
1159            _ => panic!("Invalid unblock event for nexus operation complete"),
1160        }
1161    }
1162}
1163
/// Identifier for cancellable operations
#[derive(Debug, Clone)]
pub(crate) enum CancellableID {
    /// A timer, by sequence number
    Timer(u32),
    /// An activity, by sequence number
    Activity(u32),
    /// A local activity, by sequence number
    LocalActivity(u32),
    /// A child workflow cancellation, carrying a human-readable reason
    ChildWorkflow {
        seqnum: u32,
        reason: String,
    },
    /// A pending signal to an external workflow, by sequence number
    SignalExternalWorkflow(u32),
    /// A nexus operation (waiting for start)
    NexusOp(u32),
}
1178
/// Cancellation IDs that support a reason. Implemented by ID types whose cancel command
/// can carry a human-readable explanation.
pub(crate) trait SupportsCancelReason {
    /// Returns a new version of this ID with the provided cancellation reason.
    fn with_reason(self, reason: String) -> CancellableID;
}
/// The subset of [CancellableID]s which may carry a cancellation reason.
#[derive(Debug, Clone)]
pub(crate) enum CancellableIDWithReason {
    /// A child workflow, by sequence number
    ChildWorkflow { seqnum: u32 },
}
1188impl SupportsCancelReason for CancellableIDWithReason {
1189    fn with_reason(self, reason: String) -> CancellableID {
1190        match self {
1191            CancellableIDWithReason::ChildWorkflow { seqnum } => {
1192                CancellableID::ChildWorkflow { seqnum, reason }
1193            }
1194        }
1195    }
1196}
1197impl From<CancellableIDWithReason> for CancellableID {
1198    fn from(v: CancellableIDWithReason) -> Self {
1199        v.with_reason("".to_string())
1200    }
1201}
1202
/// Commands sent from workflow code to the machinery driving its future.
#[derive(derive_more::From)]
#[allow(clippy::large_enum_variant)]
enum RustWfCmd {
    /// Cancel a previously issued cancellable operation
    #[from(ignore)]
    Cancel(CancellableID),
    /// Force the current workflow task to fail with the given error
    ForceWFTFailure(anyhow::Error),
    /// Issue a new command whose resolution will unblock the attached sender
    NewCmd(CommandCreateRequest),
    /// Issue a new command which does not await a resolution
    NewNonblockingCmd(workflow_command::Variant),
    /// Subscribe to the completion of a child workflow
    SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
    /// Subscribe to the completion of the nexus operation with sequence number `seq`
    SubscribeNexusOperationCompletion {
        seq: u32,
        unblocker: oneshot::Sender<UnblockEvent>,
    },
}
1217
/// A workflow command paired with the oneshot sender used to unblock the issuing
/// future once the command resolves.
struct CommandCreateRequest {
    cmd: WorkflowCommand,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1222
/// Subscription to a child workflow's completion, keyed by command sequence number;
/// `unblocker` is fired when the completion arrives.
struct CommandSubscribeChildWorkflowCompletion {
    seq: u32,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1227
/// The result of running a workflow.
///
/// Successful completion returns `Ok(T)` where `T` is the workflow's return type.
/// All other terminations (cancel, eviction, continue-as-new, and failure) return
/// `Err(WorkflowTermination)`.
pub type WorkflowResult<T> = Result<T, WorkflowTermination>;
1233
/// Represents ways a workflow can terminate without producing a normal result.
///
/// This is used as the error type in [`WorkflowResult<T>`] for non-error termination conditions
/// like cancellation, eviction, continue-as-new, or actual failures.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowTermination {
    /// The workflow was cancelled.
    #[error("Workflow cancelled")]
    Cancelled,

    /// The workflow was evicted from the cache.
    #[error("Workflow evicted from cache")]
    Evicted,

    /// The workflow should continue as a new execution. Boxed to keep the enum small.
    #[error("Continue as new")]
    ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),

    /// The workflow failed with an error.
    #[error("Workflow failed: {0}")]
    Failed(#[source] anyhow::Error),
}
1256
1257impl WorkflowTermination {
1258    /// Construct a [WorkflowTermination::ContinueAsNew]
1259    pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1260        Self::ContinueAsNew(Box::new(can))
1261    }
1262
1263    /// Construct a [WorkflowTermination::Failed] variant from any error.
1264    pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1265        Self::Failed(err.into())
1266    }
1267}
1268
1269impl From<anyhow::Error> for WorkflowTermination {
1270    fn from(err: anyhow::Error) -> Self {
1271        Self::Failed(err)
1272    }
1273}
1274
1275impl From<ActivityExecutionError> for WorkflowTermination {
1276    fn from(value: ActivityExecutionError) -> Self {
1277        Self::failed(value)
1278    }
1279}
1280
1281impl From<ChildWorkflowExecutionError> for WorkflowTermination {
1282    fn from(value: ChildWorkflowExecutionError) -> Self {
1283        Self::failed(value)
1284    }
1285}
1286
1287impl From<ChildWorkflowSignalError> for WorkflowTermination {
1288    fn from(value: ChildWorkflowSignalError) -> Self {
1289        Self::failed(value)
1290    }
1291}
1292
/// Activity functions may return these values when exiting
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// Completion requires an asynchronous callback; no result is reported now
    WillCompleteAsync,
    /// Finish with a result
    Normal(T),
}
1301
1302impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1303    fn from(t: T) -> Self {
1304        Self::Normal(t)
1305    }
1306}
1307
/// Attempts to turn caught panics into something printable
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    // Start the downcast chain at `&str`, the most common panic payload type.
    _panic_formatter::<&str>(panic)
}
/// Tries to downcast the payload to `T`; on failure, recurses with the next type in the
/// [PrintablePanicType] chain until [EndPrintingAttempts] terminates the search.
fn _panic_formatter<T: 'static + PrintablePanicType>(panic: Box<dyn Any>) -> Box<dyn Display> {
    let payload = match panic.downcast::<T>() {
        Ok(displayable) => return displayable,
        Err(original) => original,
    };
    // If the next link in the chain is the terminator, give up with a fixed message.
    let chain_exhausted = TypeId::of::<<T as PrintablePanicType>::NextType>()
        == TypeId::of::<EndPrintingAttempts>();
    if chain_exhausted {
        Box::new("Couldn't turn panic into a string")
    } else {
        _panic_formatter::<T::NextType>(payload)
    }
}
/// Type-level linked list of payload types to attempt downcasting panics to.
trait PrintablePanicType: Display {
    /// The next candidate type to try.
    type NextType: PrintablePanicType;
}
impl PrintablePanicType for &str {
    type NextType = String;
}
impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}
/// Sentinel marking the end of the downcast chain; never actually displayed.
struct EndPrintingAttempts {}
impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Will never be printed")
    }
}
impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1343
#[cfg(test)]
mod tests {
    use super::*;
    use temporalio_macros::{activities, workflow, workflow_methods};

    // Minimal activity set exercising both free-standing and `self`-taking activities.
    struct MyActivities {}

    #[activities]
    impl MyActivities {
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Compile-only test for workflow context invocation
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(
            MyActivities::my_activity,
            (),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::start_to_close_timeout(Duration::from_secs(5)),
        );
    }

    // Compile-only test for direct invocation via .run()
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    // Workflow exercising every handler kind: init, run, sync/async signal,
    // query, and sync/async update.
    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
    }

    // Builds "<pid>@<hostname>" — the identity expected when no override is supplied.
    fn default_identity() -> String {
        format!(
            "{}@{}",
            std::process::id(),
            gethostname::gethostname().to_string_lossy()
        )
    }

    // Cases: (worker override, connection identity, expected core override).
    #[rstest::rstest]
    #[case::default_when_none_provided(None, "", Some(default_identity()))]
    #[case::connection_identity_preserved(None, "conn-identity", None)]
    #[case::worker_override_takes_precedence(
        Some("worker-identity"),
        "conn-identity",
        Some("worker-identity".into())
    )]
    #[case::worker_override_with_empty_connection(
        Some("worker-identity"),
        "",
        Some("worker-identity".into())
    )]
    #[test]
    fn client_identity_resolution(
        #[case] worker_override: Option<&str>,
        #[case] connection_identity: &str,
        #[case] expected: Option<String>,
    ) {
        let opts = WorkerOptions::new("task_q")
            .task_types(WorkerTaskTypes::activity_only())
            .maybe_client_identity_override(worker_override.map(|s| s.to_owned()))
            .build();
        let config = opts
            .to_core_options("ns".into(), connection_identity.into())
            .unwrap();
        assert_eq!(config.client_identity_override, expected);
    }
}