Skip to main content

temporalio_sdk/
lib.rs

1#![warn(missing_docs)] // error if there are missing docs
2
3//! This crate defines an alpha-stage Temporal Rust SDK.
4//!
5//! Currently defining activities and running an activity-only worker is the most stable code.
6//! Workflow definitions exist and running a workflow worker works, but the API is still very
7//! unstable.
8//!
9//! An example of running an activity worker:
10//! ```no_run
11//! use std::str::FromStr;
12//! use temporalio_client::{Client, ClientOptions, Connection, ConnectionOptions};
13//! use temporalio_common::{
14//!     telemetry::TelemetryOptions,
15//!     worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes},
16//! };
17//! use temporalio_macros::activities;
18//! use temporalio_sdk::{
19//!     Worker, WorkerOptions,
20//!     activities::{ActivityContext, ActivityError},
21//! };
22//! use temporalio_sdk_core::{CoreRuntime, RuntimeOptions, Url};
23//!
24//! struct MyActivities;
25//!
26//! #[activities]
27//! impl MyActivities {
28//!     #[activity]
29//!     pub(crate) async fn echo(
30//!         _ctx: ActivityContext,
31//!         e: String,
32//!     ) -> Result<String, ActivityError> {
33//!         Ok(e)
34//!     }
35//! }
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     let connection_options =
40//!         ConnectionOptions::new(Url::from_str("http://localhost:7233")?).build();
41//!     let telemetry_options = TelemetryOptions::builder().build();
42//!     let runtime_options = RuntimeOptions::builder()
43//!         .telemetry_options(telemetry_options)
44//!         .build()?;
45//!     let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
46//!
47//!     let connection = Connection::connect(connection_options).await?;
48//!     let client = Client::new(connection, ClientOptions::new("my_namespace").build())?;
49//!
50//!     let worker_options = WorkerOptions::new("task_queue")
51//!         .task_types(WorkerTaskTypes::activity_only())
52//!         .deployment_options(WorkerDeploymentOptions {
53//!             version: WorkerDeploymentVersion {
54//!                 deployment_name: "my_deployment".to_owned(),
55//!                 build_id: "my_build_id".to_owned(),
56//!             },
57//!             use_worker_versioning: false,
58//!             default_versioning_behavior: None,
59//!         })
60//!         .register_activities(MyActivities)
61//!         .build();
62//!
63//!     let mut worker = Worker::new(&runtime, client, worker_options)?;
64//!     worker.run().await?;
65//!
66//!     Ok(())
67//! }
68//! ```
69
70#[macro_use]
71extern crate tracing;
72extern crate self as temporalio_sdk;
73
74pub mod activities;
75pub mod interceptors;
76mod workflow_context;
77mod workflow_future;
78pub mod workflows;
79
80use workflow_future::WorkflowFunction;
81
82pub use temporalio_client::Namespace;
83pub use workflow_context::{
84    ActivityExecutionError, ActivityOptions, BaseWorkflowContext, CancellableFuture, ChildWorkflow,
85    ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo,
86    PendingChildWorkflow, RootWorkflowInfo, Signal, SignalData, SignalWorkflowOptions,
87    StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView,
88};
89
90use crate::{
91    activities::{
92        ActivityContext, ActivityDefinitions, ActivityError, ActivityImplementer,
93        ExecutableActivity,
94    },
95    interceptors::WorkerInterceptor,
96    workflow_context::{ChildWfCommon, NexusUnblockData, StartedNexusOperation},
97    workflows::{WorkflowDefinitions, WorkflowImplementation, WorkflowImplementer},
98};
99use anyhow::{Context, anyhow, bail};
100use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
101use std::{
102    any::{Any, TypeId},
103    cell::RefCell,
104    collections::{HashMap, HashSet},
105    fmt::{Debug, Display, Formatter},
106    future::Future,
107    panic::AssertUnwindSafe,
108    sync::Arc,
109    time::Duration,
110};
111use temporalio_client::{Client, NamespacedClient};
112use temporalio_common::{
113    ActivityDefinition, WorkflowDefinition,
114    data_converters::{DataConverter, SerializationContextData},
115    payload_visitor::{decode_payloads, encode_payloads},
116    protos::{
117        TaskToken,
118        coresdk::{
119            ActivityTaskCompletion, AsJsonPayloadExt,
120            activity_result::{ActivityExecutionResult, ActivityResolution},
121            activity_task::{ActivityTask, activity_task},
122            child_workflow::ChildWorkflowResult,
123            common::NamespacedWorkflowExecution,
124            nexus::NexusOperationResult,
125            workflow_activation::{
126                WorkflowActivation,
127                resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus,
128                resolve_nexus_operation_start, workflow_activation_job::Variant,
129            },
130            workflow_commands::{
131                ContinueAsNewWorkflowExecution, WorkflowCommand, workflow_command,
132            },
133            workflow_completion::WorkflowActivationCompletion,
134        },
135        temporal::api::{
136            common::v1::Payload,
137            enums::v1::WorkflowTaskFailedCause,
138            failure::v1::{Failure, failure},
139        },
140    },
141    worker::{WorkerDeploymentOptions, WorkerTaskTypes, build_id_from_current_exe},
142};
143use temporalio_sdk_core::{
144    CoreRuntime, PollError, PollerBehavior, TunerBuilder, Worker as CoreWorker, WorkerConfig,
145    WorkerTuner, WorkerVersioningStrategy, WorkflowErrorType, init_worker,
146};
147use tokio::{
148    sync::{
149        Notify,
150        mpsc::{UnboundedSender, unbounded_channel},
151        oneshot,
152    },
153    task::JoinError,
154};
155use tokio_stream::wrappers::UnboundedReceiverStream;
156use tokio_util::sync::CancellationToken;
157use tracing::{Instrument, Span, field};
158use uuid::Uuid;
159
/// Contains options for configuring a worker.
#[derive(bon::Builder, Clone)]
#[builder(start_fn = new, on(String, into), state_mod(vis = "pub"))]
#[non_exhaustive]
pub struct WorkerOptions {
    /// What task queue will this worker poll from? This task queue name will be used for both
    /// workflow and activity polling.
    #[builder(start_fn)]
    pub task_queue: String,

    // Populated via the builder's `register_activities`/`register_activity` methods rather than
    // a generated setter.
    #[builder(field)]
    activities: ActivityDefinitions,

    // Populated via the builder's `register_workflow*` methods rather than a generated setter.
    #[builder(field)]
    workflows: WorkflowDefinitions,

    /// Set the deployment options for this worker. Defaults to a hash of the currently running
    /// executable.
    #[builder(default = def_build_id())]
    pub deployment_options: WorkerDeploymentOptions,
    /// A human-readable string that can identify this worker. Using something like sdk version
    /// and host name is a good default. If set, overrides the identity set (if any) on the client
    /// used by this worker.
    pub client_identity_override: Option<String>,
    /// If set nonzero, workflows will be cached and sticky task queues will be used, meaning that
    /// history updates are applied incrementally to suspended instances of workflow execution.
    /// Workflows are evicted according to a least-recently-used policy once the cache maximum is
    /// reached. Workflows may also be explicitly evicted at any time, or as a result of errors
    /// or failures.
    #[builder(default = 1000)]
    pub max_cached_workflows: usize,
    /// Set a [crate::WorkerTuner] for this worker, which controls how many slots are available for
    /// the different kinds of tasks.
    #[builder(default = Arc::new(TunerBuilder::default().build()))]
    pub tuner: Arc<dyn WorkerTuner + Send + Sync>,
    /// Controls how polling for Workflow tasks will happen on this worker's task queue. See also
    /// [WorkerConfig::nonsticky_to_sticky_poll_ratio]. If using SimpleMaximum, Must be at least 2
    /// when `max_cached_workflows` > 0, or is an error.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub workflow_task_poller_behavior: PollerBehavior,
    /// Only applies when using [PollerBehavior::SimpleMaximum]
    ///
    /// (max workflow task polls * this number) = the number of max pollers that will be allowed for
    /// the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky
    /// queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for
    /// either poller is 1, so if the maximum allowed is 1 and sticky queues are enabled, there will
    /// be 2 concurrent polls.
    #[builder(default = 0.2)]
    pub nonsticky_to_sticky_poll_ratio: f32,
    /// Controls how polling for Activity tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub activity_task_poller_behavior: PollerBehavior,
    /// Controls how polling for Nexus tasks will happen on this worker's task queue.
    #[builder(default = PollerBehavior::SimpleMaximum(5))]
    pub nexus_task_poller_behavior: PollerBehavior,
    // TODO [rust-sdk-branch]: Will go away once workflow registration can only happen in here.
    //   Then it can be auto-determined.
    /// Specifies which task types this worker will poll for.
    ///
    /// Note: At least one task type must be specified or the worker will fail validation.
    #[builder(default = WorkerTaskTypes::all())]
    pub task_types: WorkerTaskTypes,
    /// How long a workflow task is allowed to sit on the sticky queue before it is timed out
    /// and moved to the non-sticky queue where it may be picked up by any worker.
    #[builder(default = Duration::from_secs(10))]
    pub sticky_queue_schedule_to_start_timeout: Duration,
    /// Longest interval for throttling activity heartbeats
    #[builder(default = Duration::from_secs(60))]
    pub max_heartbeat_throttle_interval: Duration,
    /// Default interval for throttling activity heartbeats in case
    /// `ActivityOptions.heartbeat_timeout` is unset.
    /// When the timeout *is* set in the `ActivityOptions`, throttling is set to
    /// `heartbeat_timeout * 0.8`.
    #[builder(default = Duration::from_secs(30))]
    pub default_heartbeat_throttle_interval: Duration,
    /// Sets the maximum number of activities per second the task queue will dispatch, controlled
    /// server-side. Note that this only takes effect upon an activity poll request. If multiple
    /// workers on the same queue have different values set, they will thrash with the last poller
    /// winning.
    ///
    /// Setting this to a nonzero value will also disable eager activity execution.
    pub max_task_queue_activities_per_second: Option<f64>,
    /// Limits the number of activities per second that this worker will process. The worker will
    /// not poll for new activities if by doing so it might receive and execute an activity which
    /// would cause it to exceed this limit. Negative, zero, or NaN values will cause building
    /// the options to fail.
    pub max_worker_activities_per_second: Option<f64>,
    /// Any error types listed here will cause any workflow being processed by this worker to fail,
    /// rather than simply failing the workflow task.
    #[builder(default)]
    pub workflow_failure_errors: HashSet<WorkflowErrorType>,
    /// Like [WorkerConfig::workflow_failure_errors], but specific to certain workflow types (the
    /// map key).
    #[builder(default)]
    pub workflow_types_to_failure_errors: HashMap<String, HashSet<WorkflowErrorType>>,
    /// If set, the worker will issue cancels for all outstanding activities and nexus operations after
    /// shutdown has been initiated and this amount of time has elapsed.
    pub graceful_shutdown_period: Option<Duration>,
}
259
// Registration methods available at every builder state, since they mutate the custom
// `#[builder(field)]` collections rather than ordinary builder-tracked members.
impl<S: worker_options_builder::State> WorkerOptionsBuilder<S> {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(mut self, instance: AI) -> Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(mut self, instance: Arc<AD::Implementer>) -> Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(mut self) -> Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    ///
    /// This method is intended for scenarios requiring injection of un-serializable
    /// state into workflows.
    ///
    /// **This can easily cause nondeterminism**
    ///
    /// Only use when you understand the implications and have a specific need that cannot be met
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the workflow type defines an `#[init]` method. Workflows using
    /// factory registration must not have `#[init]` to avoid ambiguity about
    /// instance creation.
    pub fn register_workflow_with_factory<W, F>(mut self, factory: F) -> Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }
}
310
311// Needs to exist to avoid https://github.com/elastio/bon/issues/359
312fn def_build_id() -> WorkerDeploymentOptions {
313    WorkerDeploymentOptions::from_build_id(build_id_from_current_exe().to_owned())
314}
315
impl WorkerOptions {
    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activities.register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activities.register_activity::<AD>(instance);
        self
    }
    /// Returns all the registered activities by cloning the current set.
    pub fn activities(&self) -> ActivityDefinitions {
        self.activities.clone()
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflows.register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// # Warning: Advanced Usage
    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflows
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Returns all the registered workflows by cloning the current set.
    pub fn workflows(&self) -> WorkflowDefinitions {
        self.workflows.clone()
    }

    /// Translates these SDK-level options into the core worker's config for the given namespace.
    /// On failure, returns the core config builder's validation error message.
    #[doc(hidden)]
    pub fn to_core_options(&self, namespace: String) -> Result<WorkerConfig, String> {
        WorkerConfig::builder()
            .namespace(namespace)
            .task_queue(self.task_queue.clone())
            .maybe_client_identity_override(self.client_identity_override.clone())
            .max_cached_workflows(self.max_cached_workflows)
            .tuner(self.tuner.clone())
            .workflow_task_poller_behavior(self.workflow_task_poller_behavior)
            .activity_task_poller_behavior(self.activity_task_poller_behavior)
            .nexus_task_poller_behavior(self.nexus_task_poller_behavior)
            .task_types(self.task_types)
            .sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
            .max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
            .default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
            .maybe_max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
            .maybe_max_worker_activities_per_second(self.max_worker_activities_per_second)
            .maybe_graceful_shutdown_period(self.graceful_shutdown_period)
            .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
                self.deployment_options.clone(),
            ))
            .workflow_failure_errors(self.workflow_failure_errors.clone())
            .workflow_types_to_failure_errors(self.workflow_types_to_failure_errors.clone())
            .build()
    }
}
388
/// A worker that can poll for and respond to workflow tasks by using
/// [temporalio_macros::workflow], and activity tasks by using activities defined with
/// [temporalio_macros::activities].
pub struct Worker {
    // State shared by both halves: the core worker, task queue, interceptor, and converter.
    common: CommonWorker,
    // Workflow-task handling state (cached workflow instances and definitions).
    workflow_half: WorkflowHalf,
    // Activity-task handling state (registered activities and cancellation tokens).
    activity_half: ActivityHalf,
}
397
// State needed by both the workflow and activity halves of a [Worker].
struct CommonWorker {
    /// The underlying core worker which performs polling and task completion.
    worker: Arc<CoreWorker>,
    /// Task queue this worker polls on; cached from the core worker's config.
    task_queue: String,
    /// Optional interceptor invoked on activations, completions, and shutdown.
    worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
    /// Converter used to encode/decode payloads flowing through this worker.
    data_converter: DataConverter,
}
404
#[derive(Default)]
struct WorkflowHalf {
    /// Maps run id to cached workflow state
    workflows: RefCell<HashMap<String, WorkflowData>>,
    /// The set of workflow definitions registered with this worker.
    workflow_definitions: WorkflowDefinitions,
    /// Notified whenever a workflow is removed from the `workflows` map.
    workflow_removed_from_map: Notify,
}
// Per-run state kept while a workflow is cached.
struct WorkflowData {
    /// Channel used to send the workflow activations
    activation_chan: UnboundedSender<WorkflowActivation>,
}
416
// Pairs a spawned workflow future's join handle with the run id it belongs to, so the joiner
// task can clean up the cache entry once the future resolves.
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
    join_handle: F,
    run_id: String,
}
421
#[derive(Default)]
struct ActivityHalf {
    /// Maps activity type to the function for executing activities of that type
    activities: ActivityDefinitions,
    /// Maps in-flight activity task tokens to tokens used to cancel those activities.
    task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
}
428
impl Worker {
    /// Create a new worker from an existing connection, and options.
    pub fn new(
        runtime: &CoreRuntime,
        client: Client,
        mut options: WorkerOptions,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        // Take ownership of the registered definitions so they can be moved into the worker
        // halves once the core worker has been constructed.
        let acts = std::mem::take(&mut options.activities);
        let wfs = std::mem::take(&mut options.workflows);
        let wc = options
            .to_core_options(client.namespace())
            .map_err(|s| anyhow::anyhow!("{s}"))?;
        let core = init_worker(runtime, wc, client.connection().clone())?;
        let mut me = Self::new_from_core_definitions(
            Arc::new(core),
            client.data_converter().clone(),
            Default::default(),
            Default::default(),
        );
        me.activity_half.activities = acts;
        me.workflow_half.workflow_definitions = wfs;
        Ok(me)
    }

    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
    #[doc(hidden)]
    pub fn new_from_core(worker: Arc<CoreWorker>, data_converter: DataConverter) -> Self {
        Self::new_from_core_definitions(
            worker,
            data_converter,
            Default::default(),
            Default::default(),
        )
    }

    // TODO [rust-sdk-branch]: Eliminate this constructor in favor of passing in fake connection
    #[doc(hidden)]
    pub fn new_from_core_definitions(
        worker: Arc<CoreWorker>,
        data_converter: DataConverter,
        activities: ActivityDefinitions,
        workflows: WorkflowDefinitions,
    ) -> Self {
        Self {
            common: CommonWorker {
                // Cache the task queue name from the core worker's config for cheap access.
                task_queue: worker.get_config().task_queue.clone(),
                worker,
                worker_interceptor: None,
                data_converter,
            },
            workflow_half: WorkflowHalf {
                workflow_definitions: workflows,
                ..Default::default()
            },
            activity_half: ActivityHalf {
                activities,
                ..Default::default()
            },
        }
    }

    /// Returns the task queue name this worker polls on
    pub fn task_queue(&self) -> &str {
        &self.common.task_queue
    }

    /// Return a handle that can be used to initiate shutdown. This is useful because [Worker::run]
    /// takes self mutably, so you may want to obtain a handle for shutting down before running.
    pub fn shutdown_handle(&self) -> impl Fn() + use<> {
        let w = self.common.worker.clone();
        move || w.initiate_shutdown()
    }

    /// Registers all activities on an activity implementer.
    pub fn register_activities<AI: ActivityImplementer>(&mut self, instance: AI) -> &mut Self {
        self.activity_half
            .activities
            .register_activities::<AI>(instance);
        self
    }
    /// Registers a specific activity.
    pub fn register_activity<AD>(&mut self, instance: Arc<AD::Implementer>) -> &mut Self
    where
        AD: ActivityDefinition + ExecutableActivity,
        AD::Output: Send + Sync,
    {
        self.activity_half
            .activities
            .register_activity::<AD>(instance);
        self
    }

    /// Registers all workflows on a workflow implementer.
    pub fn register_workflow<WI: WorkflowImplementer>(&mut self) -> &mut Self {
        self.workflow_half
            .workflow_definitions
            .register_workflow::<WI>();
        self
    }

    /// Register a workflow with a custom factory for instance creation.
    ///
    /// See [WorkerOptionsBuilder::register_workflow_with_factory] for more.
    pub fn register_workflow_with_factory<W, F>(&mut self, factory: F) -> &mut Self
    where
        W: WorkflowImplementation,
        <W::Run as WorkflowDefinition>::Input: Send,
        F: Fn() -> W + Send + Sync + 'static,
    {
        self.workflow_half
            .workflow_definitions
            .register_workflow_run_with_factory::<W, F>(factory);
        self
    }

    /// Runs the worker. Eventually resolves after the worker has been explicitly shut down,
    /// or may return early with an error in the event of some unresolvable problem.
    pub async fn run(&mut self) -> Result<(), anyhow::Error> {
        let shutdown_token = CancellationToken::new();
        let (common, wf_half, act_half) = self.split_apart();
        // Channel of spawned workflow futures awaiting joining, and channel of workflow
        // activation completions awaiting submission to core.
        let (wf_future_tx, wf_future_rx) = unbounded_channel();
        let (completions_tx, completions_rx) = unbounded_channel();

        // Workflows run in a LocalSet because they use Rc<RefCell> for state management.
        // This allows them to not require Send/Sync bounds.
        let workflow_local_set = tokio::task::LocalSet::new();

        // Joins resolved workflow futures and removes each finished workflow from the cache.
        let wf_future_joiner = async {
            UnboundedReceiverStream::new(wf_future_rx)
                .map(Result::<_, anyhow::Error>::Ok)
                .try_for_each_concurrent(
                    None,
                    |WorkflowFutureHandle {
                         join_handle,
                         run_id,
                     }| {
                        let wf_half = &*wf_half;
                        async move {
                            let result = join_handle.await?;
                            // Eviction is normal workflow lifecycle - workflows loop waiting for
                            // eviction after completion to manage cache cleanup
                            if let Err(e) = result
                                && !matches!(e, WorkflowTermination::Evicted)
                            {
                                return Err(e.into());
                            }
                            debug!(run_id=%run_id, "Removing workflow from cache");
                            wf_half.workflows.borrow_mut().remove(&run_id);
                            wf_half.workflow_removed_from_map.notify_one();
                            Ok(())
                        }
                    },
                )
                .await
                .context("Workflow futures encountered an error")
        };
        // Encodes and submits workflow activation completions back to the core worker.
        let wf_completion_processor = async {
            UnboundedReceiverStream::new(completions_rx)
                .map(Ok)
                .try_for_each_concurrent(None, |mut completion| async {
                    encode_payloads(
                        &mut completion,
                        common.data_converter.codec(),
                        &SerializationContextData::Workflow,
                    )
                    .await;
                    if let Some(ref i) = common.worker_interceptor {
                        i.on_workflow_activation_completion(&completion).await;
                    }
                    common.worker.complete_workflow_activation(completion).await
                })
                .map_err(anyhow::Error::from)
                .await
                .context("Workflow completions processor encountered an error")
        };
        tokio::try_join!(
            // Workflow-related tasks run inside LocalSet (allows !Send futures)
            async {
                workflow_local_set.run_until(async {
                    tokio::try_join!(
                        // Workflow polling loop
                        async {
                            loop {
                                let mut activation =
                                    match common.worker.poll_workflow_activation().await {
                                        // A shutdown poll error is the expected, clean exit.
                                        Err(PollError::ShutDown) => {
                                            break;
                                        }
                                        o => o?,
                                    };
                                decode_payloads(
                                    &mut activation,
                                    common.data_converter.codec(),
                                    &SerializationContextData::Workflow,
                                )
                                .await;
                                if let Some(ref i) = common.worker_interceptor {
                                    i.on_workflow_activation(&activation).await?;
                                }
                                if let Some(wf_fut) = wf_half
                                    .workflow_activation_handler(
                                        common,
                                        shutdown_token.clone(),
                                        activation,
                                        &completions_tx,
                                    )
                                    .await?
                                    && wf_future_tx.send(wf_fut).is_err()
                                {
                                    panic!(
                                        "Receive half of completion processor channel cannot be dropped"
                                    );
                                }
                            }
                            // Tell still-alive workflows to evict themselves
                            shutdown_token.cancel();
                            // It's important to drop these so the future and completion processors will
                            // terminate.
                            drop(wf_future_tx);
                            drop(completions_tx);
                            Result::<_, anyhow::Error>::Ok(())
                        },
                        wf_future_joiner,
                    )
                }).await
            },
            // Only poll on the activity queue if activity functions have been registered. This
            // makes tests which use mocks dramatically more manageable.
            async {
                if !act_half.activities.is_empty() {
                    loop {
                        let activity = common.worker.poll_activity_task().await;
                        if matches!(activity, Err(PollError::ShutDown)) {
                            break;
                        }
                        let mut activity = activity?;
                        decode_payloads(
                            &mut activity,
                            common.data_converter.codec(),
                            &SerializationContextData::Activity,
                        )
                        .await;
                        act_half.activity_task_handler(
                            common.worker.clone(),
                            common.task_queue.clone(),
                            common.data_converter.clone(),
                            activity,
                        )?;
                    }
                };
                Result::<_, anyhow::Error>::Ok(())
            },
            wf_completion_processor,
        )?;

        if let Some(i) = self.common.worker_interceptor.as_ref() {
            i.on_shutdown(self);
        }
        self.common.worker.shutdown().await;
        Ok(())
    }

    /// Set a [WorkerInterceptor]
    pub fn set_worker_interceptor(&mut self, interceptor: impl WorkerInterceptor + 'static) {
        self.common.worker_interceptor = Some(Box::new(interceptor));
    }

    /// Turns this rust worker into a new worker with all the same workflows and activities
    /// registered, but with a new underlying core worker. Can be used to swap the worker for
    /// a replay worker, change task queues, etc.
    pub fn with_new_core_worker(&mut self, new_core_worker: Arc<CoreWorker>) {
        self.common.worker = new_core_worker;
    }

    /// Returns number of currently cached workflows as understood by the SDK. Importantly, this
    /// is not the same as understood by core, though they *should* always be in sync.
    pub fn cached_workflows(&self) -> usize {
        self.workflow_half.workflows.borrow().len()
    }

    /// Returns the instance key for this worker, used for worker heartbeating.
    pub fn worker_instance_key(&self) -> Uuid {
        self.common.worker.worker_instance_key()
    }

    #[doc(hidden)]
    pub fn core_worker(&self) -> Arc<CoreWorker> {
        self.common.worker.clone()
    }

    // Split self into disjoint mutable borrows of its three parts, so the async tasks in
    // `run` can each hold a reference without conflicting.
    fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
        (
            &mut self.common,
            &mut self.workflow_half,
            &mut self.activity_half,
        )
    }
}
727
impl WorkflowHalf {
    /// Handles a single workflow activation delivered by core.
    ///
    /// If the activation contains an `InitializeWorkflow` job, a new workflow future is
    /// created from the registered definition for that workflow type, spawned on the
    /// current thread's `LocalSet`, and registered in the run-id -> workflow map. The
    /// handle to that future is returned via `Ok(Some(..))` so the caller can track its
    /// completion; all other activations return `Ok(None)`.
    ///
    /// The activation itself is then forwarded to the (possibly just-created) workflow's
    /// activation channel. Failures to instantiate the workflow (unknown type, bad input)
    /// are reported back to core as failed activation completions rather than returned
    /// as `Err`.
    #[allow(clippy::type_complexity)]
    async fn workflow_activation_handler(
        &self,
        common: &CommonWorker,
        shutdown_token: CancellationToken,
        mut activation: WorkflowActivation,
        completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
    ) -> Result<
        Option<
            WorkflowFutureHandle<
                impl Future<Output = Result<WorkflowResult<Payload>, JoinError>> + use<>,
            >,
        >,
        anyhow::Error,
    > {
        let mut res = None;
        let run_id = activation.run_id.clone();

        // If the activation is to init a workflow, create a new workflow driver for it,
        // using the function associated with that workflow id
        if let Some(sw) = activation.jobs.iter_mut().find_map(|j| match j.variant {
            Some(Variant::InitializeWorkflow(ref mut sw)) => Some(sw),
            _ => None,
        }) {
            let workflow_type = sw.workflow_type.clone();
            let payload_converter = common.data_converter.payload_converter().clone();
            let (wff, activations) = {
                if let Some(factory) = self.workflow_definitions.get_workflow(&workflow_type) {
                    match WorkflowFunction::from_invocation(factory).start_workflow(
                        common.worker.get_config().namespace.clone(),
                        common.task_queue.clone(),
                        run_id.clone(),
                        std::mem::take(sw),
                        completions_tx.clone(),
                        payload_converter,
                    ) {
                        Ok(result) => result,
                        Err(e) => {
                            // Instantiation failed: report a failed workflow task to core
                            // instead of erroring out of the handler.
                            warn!("Failed to create workflow {workflow_type}: {e}");
                            completions_tx
                                .send(WorkflowActivationCompletion::fail(
                                    run_id,
                                    format!("Failed to create workflow: {e}").into(),
                                    Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                                ))
                                .expect("Completion channel intact");
                            return Ok(None);
                        }
                    }
                } else {
                    // No definition registered under this workflow type.
                    warn!("Workflow type {workflow_type} not found");
                    completions_tx
                        .send(WorkflowActivationCompletion::fail(
                            run_id,
                            format!("Workflow type {workflow_type} not found").into(),
                            Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
                        ))
                        .expect("Completion channel intact");
                    return Ok(None);
                }
            };
            // Wrap in unconstrained to prevent Tokio from imposing limits on commands per poll
            // TODO [rust-sdk-branch]: Deadlock detection
            let wff = tokio::task::unconstrained(wff);
            // The LocalSet is created in Worker::run().
            let jh = tokio::task::spawn_local(async move {
                tokio::select! {
                    r = wff.fuse() => r,
                    // TODO: This probably shouldn't abort early, as it could cause an in-progress
                    //  complete to abort. Send synthetic remove activation
                    _ = shutdown_token.cancelled() => {
                        Err(WorkflowTermination::Evicted)
                    }
                }
            });
            res = Some(WorkflowFutureHandle {
                join_handle: jh,
                run_id: run_id.clone(),
            });
            loop {
                // It's possible that we've got a new initialize workflow action before the last
                // future for this run finished evicting, as a result of how futures might be
                // interleaved. In that case, just wait until it's not in the map, which should be
                // a matter of only a few `poll` calls.
                if self.workflows.borrow_mut().contains_key(&run_id) {
                    self.workflow_removed_from_map.notified().await;
                } else {
                    break;
                }
            }
            self.workflows.borrow_mut().insert(
                run_id.clone(),
                WorkflowData {
                    activation_chan: activations,
                },
            );
        }

        // The activation is expected to apply to some workflow we know about. Use it to
        // unblock things and advance the workflow.
        if let Some(dat) = self.workflows.borrow_mut().get_mut(&run_id) {
            dat.activation_chan
                .send(activation)
                .expect("Workflow should exist if we're sending it an activation");
        } else {
            // When we failed to start a workflow, we never inserted it into the cache. But core
            // sends us a `RemoveFromCache` job when we mark the StartWorkflow workflow activation
            // as a failure, which we need to complete. Other SDKs add the workflow to the cache
            // even when the workflow type is unknown/not found. To circumvent this, we simply mark
            // any RemoveFromCache job for workflows that are not in the cache as complete.
            if activation.jobs.len() == 1
                && matches!(
                    activation.jobs.first().map(|j| &j.variant),
                    Some(Some(Variant::RemoveFromCache(_)))
                )
            {
                completions_tx
                    .send(WorkflowActivationCompletion::from_cmds(run_id, vec![]))
                    .expect("Completion channel intact");
                return Ok(None);
            }

            // In all other cases, we want to error as the runtime could be in an inconsistent state
            // at this point.
            bail!("Got activation {activation:?} for unknown workflow {run_id}");
        };

        Ok(res)
    }
}
859
impl ActivityHalf {
    /// Spawns off a task to handle the provided activity task
    ///
    /// For `Start` variants: looks up the registered activity function, records a
    /// cancellation token indexed by the task token, and spawns a tokio task that runs
    /// the activity (catching panics), maps the outcome to an [ActivityExecutionResult],
    /// encodes payloads through the data converter's codec, and reports the completion
    /// to core.
    ///
    /// For `Cancel` variants: triggers the cancellation token recorded at start time,
    /// if one exists for the task token.
    ///
    /// Errors if the activity type has no registered function or the task has no variant.
    fn activity_task_handler(
        &mut self,
        worker: Arc<CoreWorker>,
        task_queue: String,
        data_converter: DataConverter,
        activity: ActivityTask,
    ) -> Result<(), anyhow::Error> {
        match activity.variant {
            Some(activity_task::Variant::Start(start)) => {
                let act_fn = self.activities.get(&start.activity_type).ok_or_else(|| {
                    anyhow!(
                        "No function registered for activity type {}",
                        start.activity_type
                    )
                })?;
                let span = info_span!(
                    "RunActivity",
                    "otel.name" = format!("RunActivity:{}", start.activity_type),
                    "otel.kind" = "server",
                    "temporalActivityID" = start.activity_id,
                    "temporalWorkflowID" = field::Empty,
                    "temporalRunID" = field::Empty,
                );
                // Store the cancellation token under this task's token so a later
                // `Cancel` task can find and trigger it.
                let ct = CancellationToken::new();
                let task_token = activity.task_token;
                self.task_tokens_to_cancels
                    .insert(task_token.clone().into(), ct.clone());

                let (ctx, args) =
                    ActivityContext::new(worker.clone(), ct, task_queue, task_token.clone(), start);
                let codec_data_converter = data_converter.clone();

                tokio::spawn(async move {
                    let act_fut = async move {
                        // Attach workflow identifiers to the tracing span once known.
                        if let Some(info) = &ctx.info().workflow_execution {
                            Span::current()
                                .record("temporalWorkflowID", &info.workflow_id)
                                .record("temporalRunID", &info.run_id);
                        }
                        (act_fn)(args, data_converter, ctx).await
                    }
                    .instrument(span);
                    // Catch panics so a panicking activity fails its task rather than
                    // taking down the worker.
                    let output = AssertUnwindSafe(act_fut).catch_unwind().await;
                    let result = match output {
                        Err(e) => ActivityExecutionResult::fail(Failure::application_failure(
                            format!("Activity function panicked: {}", panic_formatter(e)),
                            true,
                        )),
                        Ok(Ok(p)) => ActivityExecutionResult::ok(p),
                        Ok(Err(err)) => match err {
                            ActivityError::Retryable {
                                source,
                                explicit_delay,
                            } => ActivityExecutionResult::fail({
                                let mut f = Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(source),
                                    false,
                                );
                                // Propagate a caller-specified retry delay, if any.
                                if let Some(d) = explicit_delay
                                    && let Some(failure::FailureInfo::ApplicationFailureInfo(fi)) =
                                        f.failure_info.as_mut()
                                {
                                    fi.next_retry_delay = d.try_into().ok();
                                }
                                f
                            }),
                            ActivityError::Cancelled { details } => {
                                ActivityExecutionResult::cancel_from_details(details)
                            }
                            ActivityError::NonRetryable(nre) => ActivityExecutionResult::fail(
                                Failure::application_failure_from_error(
                                    anyhow::Error::from_boxed(nre),
                                    true,
                                ),
                            ),
                            ActivityError::WillCompleteAsync => {
                                ActivityExecutionResult::will_complete_async()
                            }
                        },
                    };
                    let mut completion = ActivityTaskCompletion {
                        task_token,
                        result: Some(result),
                    };
                    // Run result payloads through the codec before reporting completion.
                    encode_payloads(
                        &mut completion,
                        codec_data_converter.codec(),
                        &SerializationContextData::Activity,
                    )
                    .await;
                    worker.complete_activity_task(completion).await?;
                    Ok::<_, anyhow::Error>(())
                });
            }
            Some(activity_task::Variant::Cancel(_)) => {
                // Fire the token recorded when this task was started, if present.
                if let Some(ct) = self
                    .task_tokens_to_cancels
                    .get(activity.task_token.as_slice())
                {
                    ct.cancel();
                }
            }
            None => bail!("Undefined activity task variant"),
        }
        Ok(())
    }
}
969
/// Events sent to unblock futures that workflow code is awaiting on.
///
/// The `u32` carried by each variant is the sequence number of the command being
/// resolved (see the `seq` fields of [RustWfCmd] and the cancellable-ID types).
#[derive(Debug)]
enum UnblockEvent {
    /// A timer fired or was cancelled
    Timer(u32, TimerResult),
    /// An activity resolved
    Activity(u32, Box<ActivityResolution>),
    /// A child workflow start resolved
    WorkflowStart(u32, Box<ChildWorkflowStartStatus>),
    /// A child workflow completed
    WorkflowComplete(u32, Box<ChildWorkflowResult>),
    /// Signalling an external workflow resolved (`None` means success)
    SignalExternal(u32, Option<Failure>),
    /// Cancelling an external workflow resolved (`None` means success)
    CancelExternal(u32, Option<Failure>),
    /// A nexus operation start resolved
    NexusOperationStart(u32, Box<resolve_nexus_operation_start::Status>),
    /// A nexus operation completed
    NexusOperationComplete(u32, Box<NexusOperationResult>),
}

/// Result of awaiting on a timer
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TimerResult {
    /// The timer was cancelled
    Cancelled,
    /// The timer elapsed and fired
    Fired,
}

/// Successful result of sending a signal to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignalExternalOk;
/// Result of awaiting on sending a signal to an external workflow
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;

/// Successful result of sending a cancel request to an external workflow
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelExternalOk;
/// Result of awaiting on sending a cancel request to an external workflow
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;

/// Implemented by result types that can be produced from an [UnblockEvent].
trait Unblockable {
    /// Extra data, beyond the event itself, needed to construct `Self`.
    type OtherDat;

    /// Converts the event (plus any other data) into the concrete result type.
    ///
    /// Implementations panic if handed an event variant that does not match `Self`.
    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self;
}
1008
1009impl Unblockable for TimerResult {
1010    type OtherDat = ();
1011    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1012        match ue {
1013            UnblockEvent::Timer(_, result) => result,
1014            _ => panic!("Invalid unblock event for timer"),
1015        }
1016    }
1017}
1018
1019impl Unblockable for ActivityResolution {
1020    type OtherDat = ();
1021    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1022        match ue {
1023            UnblockEvent::Activity(_, result) => *result,
1024            _ => panic!("Invalid unblock event for activity"),
1025        }
1026    }
1027}
1028
1029impl Unblockable for PendingChildWorkflow {
1030    // Other data here is workflow id
1031    type OtherDat = ChildWfCommon;
1032    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1033        match ue {
1034            UnblockEvent::WorkflowStart(_, result) => Self {
1035                status: *result,
1036                common: od,
1037            },
1038            _ => panic!("Invalid unblock event for child workflow start"),
1039        }
1040    }
1041}
1042
1043impl Unblockable for ChildWorkflowResult {
1044    type OtherDat = ();
1045    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1046        match ue {
1047            UnblockEvent::WorkflowComplete(_, result) => *result,
1048            _ => panic!("Invalid unblock event for child workflow complete"),
1049        }
1050    }
1051}
1052
1053impl Unblockable for SignalExternalWfResult {
1054    type OtherDat = ();
1055    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1056        match ue {
1057            UnblockEvent::SignalExternal(_, maybefail) => {
1058                maybefail.map_or(Ok(SignalExternalOk), Err)
1059            }
1060            _ => panic!("Invalid unblock event for signal external workflow result"),
1061        }
1062    }
1063}
1064
1065impl Unblockable for CancelExternalWfResult {
1066    type OtherDat = ();
1067    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1068        match ue {
1069            UnblockEvent::CancelExternal(_, maybefail) => {
1070                maybefail.map_or(Ok(CancelExternalOk), Err)
1071            }
1072            _ => panic!("Invalid unblock event for signal external workflow result"),
1073        }
1074    }
1075}
1076
1077type NexusStartResult = Result<StartedNexusOperation, Failure>;
1078impl Unblockable for NexusStartResult {
1079    type OtherDat = NexusUnblockData;
1080    fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self {
1081        match ue {
1082            UnblockEvent::NexusOperationStart(_, result) => match *result {
1083                resolve_nexus_operation_start::Status::OperationToken(op_token) => {
1084                    Ok(StartedNexusOperation {
1085                        operation_token: Some(op_token),
1086                        unblock_dat: od,
1087                    })
1088                }
1089                resolve_nexus_operation_start::Status::StartedSync(_) => {
1090                    Ok(StartedNexusOperation {
1091                        operation_token: None,
1092                        unblock_dat: od,
1093                    })
1094                }
1095                resolve_nexus_operation_start::Status::Failed(f) => Err(f),
1096            },
1097            _ => panic!("Invalid unblock event for nexus operation"),
1098        }
1099    }
1100}
1101
1102impl Unblockable for NexusOperationResult {
1103    type OtherDat = ();
1104
1105    fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self {
1106        match ue {
1107            UnblockEvent::NexusOperationComplete(_, result) => *result,
1108            _ => panic!("Invalid unblock event for nexus operation complete"),
1109        }
1110    }
1111}
1112
/// Identifier for cancellable operations
#[derive(Debug, Clone)]
pub(crate) enum CancellableID {
    /// A timer, by command sequence number
    Timer(u32),
    /// An activity, by command sequence number
    Activity(u32),
    /// A local activity, by command sequence number
    LocalActivity(u32),
    /// A child workflow, with a textual reason to attach to the cancellation
    ChildWorkflow {
        seqnum: u32,
        reason: String,
    },
    /// A pending signal to an external workflow, by command sequence number
    SignalExternalWorkflow(u32),
    /// An external workflow, along with its execution and a cancellation reason
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
        reason: String,
    },
    /// A nexus operation (waiting for start)
    NexusOp(u32),
}

/// Cancellation IDs that support a reason.
pub(crate) trait SupportsCancelReason {
    /// Returns a new version of this ID with the provided cancellation reason.
    fn with_reason(self, reason: String) -> CancellableID;
}
/// Subset of [CancellableID] variants that can carry a cancellation reason, before the
/// reason has been attached (see [SupportsCancelReason]).
#[derive(Debug, Clone)]
pub(crate) enum CancellableIDWithReason {
    /// A child workflow, by command sequence number
    ChildWorkflow {
        seqnum: u32,
    },
    /// An external workflow, along with its execution
    ExternalWorkflow {
        seqnum: u32,
        execution: NamespacedWorkflowExecution,
    },
}
1148impl CancellableIDWithReason {
1149    pub(crate) fn seq_num(&self) -> u32 {
1150        match self {
1151            CancellableIDWithReason::ChildWorkflow { seqnum } => *seqnum,
1152            CancellableIDWithReason::ExternalWorkflow { seqnum, .. } => *seqnum,
1153        }
1154    }
1155}
1156impl SupportsCancelReason for CancellableIDWithReason {
1157    fn with_reason(self, reason: String) -> CancellableID {
1158        match self {
1159            CancellableIDWithReason::ChildWorkflow { seqnum } => {
1160                CancellableID::ChildWorkflow { seqnum, reason }
1161            }
1162            CancellableIDWithReason::ExternalWorkflow { seqnum, execution } => {
1163                CancellableID::ExternalWorkflow {
1164                    seqnum,
1165                    execution,
1166                    reason,
1167                }
1168            }
1169        }
1170    }
1171}
1172impl From<CancellableIDWithReason> for CancellableID {
1173    fn from(v: CancellableIDWithReason) -> Self {
1174        v.with_reason("".to_string())
1175    }
1176}
1177
/// Commands issued by workflow code toward the workflow machinery.
#[derive(derive_more::From)]
#[allow(clippy::large_enum_variant)]
enum RustWfCmd {
    /// Request cancellation of a previously-issued cancellable operation
    #[from(ignore)]
    Cancel(CancellableID),
    /// Force the current workflow task to fail with the given error
    ForceWFTFailure(anyhow::Error),
    /// Issue a new workflow command whose resolution will unblock the sender
    NewCmd(CommandCreateRequest),
    /// Issue a new workflow command that does not wait on a resolution
    NewNonblockingCmd(workflow_command::Variant),
    /// Subscribe to the completion of a child workflow, by sequence number
    SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion),
    /// Subscribe to the completion of a nexus operation, by sequence number
    SubscribeNexusOperationCompletion {
        seq: u32,
        unblocker: oneshot::Sender<UnblockEvent>,
    },
}

/// A workflow command paired with the oneshot used to unblock whatever is awaiting
/// its resolution.
struct CommandCreateRequest {
    cmd: WorkflowCommand,
    unblocker: oneshot::Sender<UnblockEvent>,
}

/// Subscription to a child workflow's completion (by sequence number), with the oneshot
/// used to unblock the awaiter.
struct CommandSubscribeChildWorkflowCompletion {
    seq: u32,
    unblocker: oneshot::Sender<UnblockEvent>,
}
1202
/// The result of running a workflow.
///
/// Successful completion returns `Ok(T)` where `T` is the workflow's return type.
/// Non-error terminations (cancel, eviction, continue-as-new) return `Err(WorkflowTermination)`.
pub type WorkflowResult<T> = Result<T, WorkflowTermination>;

/// Represents ways a workflow can terminate without producing a normal result.
///
/// This is used as the error type in [`WorkflowResult<T>`] for non-error termination conditions
/// like cancellation, eviction, continue-as-new, or actual failures.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowTermination {
    /// The workflow was cancelled.
    #[error("Workflow cancelled")]
    Cancelled,

    /// The workflow was evicted from the cache. Also produced when the worker shuts
    /// down while the workflow future is still running.
    #[error("Workflow evicted from cache")]
    Evicted,

    /// The workflow should continue as a new execution.
    #[error("Continue as new")]
    ContinueAsNew(Box<ContinueAsNewWorkflowExecution>),

    /// The workflow failed with an error.
    #[error("Workflow failed: {0}")]
    Failed(#[source] anyhow::Error),
}
1231
1232impl WorkflowTermination {
1233    /// Construct a [WorkflowTermination::ContinueAsNew]
1234    pub fn continue_as_new(can: ContinueAsNewWorkflowExecution) -> Self {
1235        Self::ContinueAsNew(Box::new(can))
1236    }
1237
1238    /// Construct a [WorkflowTermination::Failed] variant from any error.
1239    pub fn failed(err: impl Into<anyhow::Error>) -> Self {
1240        Self::Failed(err.into())
1241    }
1242}
1243
1244impl From<anyhow::Error> for WorkflowTermination {
1245    fn from(err: anyhow::Error) -> Self {
1246        Self::Failed(err)
1247    }
1248}
1249
1250impl From<ActivityExecutionError> for WorkflowTermination {
1251    fn from(value: ActivityExecutionError) -> Self {
1252        Self::failed(value)
1253    }
1254}
1255
/// Activity functions may return these values when exiting.
///
/// `T` is the activity's normal return type.
#[derive(Debug)]
pub enum ActExitValue<T> {
    /// Completion requires an asynchronous callback
    WillCompleteAsync,
    /// Finish with a result
    Normal(T),
}
1264
1265impl<T: AsJsonPayloadExt> From<T> for ActExitValue<T> {
1266    fn from(t: T) -> Self {
1267        Self::Normal(t)
1268    }
1269}
1270
/// Attempts to turn caught panics into something printable
fn panic_formatter(panic: Box<dyn Any>) -> Box<dyn Display> {
    // Begin the downcast chain with the most common panic payload type.
    _panic_formatter::<&str>(panic)
}
/// Walks the [PrintablePanicType] chain (`&str` -> `String` -> end marker), returning
/// the first successful downcast, or a fallback message if nothing printable matches.
fn _panic_formatter<T: 'static + PrintablePanicType>(payload: Box<dyn Any>) -> Box<dyn Display> {
    match payload.downcast::<T>() {
        Ok(printable) => printable,
        Err(still_opaque) => {
            let chain_exhausted = TypeId::of::<<T as PrintablePanicType>::NextType>()
                == TypeId::of::<EndPrintingAttempts>();
            if chain_exhausted {
                Box::new("Couldn't turn panic into a string")
            } else {
                _panic_formatter::<T::NextType>(still_opaque)
            }
        }
    }
}
/// Type-level linked list of payload types worth attempting to downcast a panic to.
trait PrintablePanicType: Display {
    /// The next type to try when downcasting to `Self` fails.
    type NextType: PrintablePanicType;
}
impl PrintablePanicType for &str {
    type NextType = String;
}
impl PrintablePanicType for String {
    type NextType = EndPrintingAttempts;
}
/// Terminator for the downcast chain; its Display output is never actually shown.
struct EndPrintingAttempts {}
impl Display for EndPrintingAttempts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Will never be printed")
    }
}
impl PrintablePanicType for EndPrintingAttempts {
    type NextType = EndPrintingAttempts;
}
1306
#[cfg(test)]
mod tests {
    use super::*;
    use temporalio_macros::{activities, workflow, workflow_methods};

    // Minimal activity set used to exercise registration and invocation APIs below.
    struct MyActivities {}

    #[activities]
    impl MyActivities {
        #[activity]
        async fn my_activity(_ctx: ActivityContext) -> Result<(), ActivityError> {
            Ok(())
        }

        // Exercises the `self: Arc<Self>` receiver form alongside an input argument.
        #[activity]
        async fn takes_self(
            self: Arc<Self>,
            _ctx: ActivityContext,
            _: String,
        ) -> Result<(), ActivityError> {
            Ok(())
        }
    }

    #[test]
    fn test_activity_registration() {
        let act_instance = MyActivities {};
        let _ = WorkerOptions::new("task_q").register_activities(act_instance);
    }

    // Compile-only test for workflow context invocation
    #[allow(unused, clippy::diverging_sub_expression)]
    fn test_activity_via_workflow_context() {
        let wf_ctx: WorkflowContext<MyWorkflow> = unimplemented!();
        wf_ctx.start_activity(MyActivities::my_activity, (), ActivityOptions::default());
        wf_ctx.start_activity(
            MyActivities::takes_self,
            "Hi".to_owned(),
            ActivityOptions::default(),
        );
    }

    // Compile-only test for direct invocation via .run()
    #[allow(dead_code, unreachable_code, unused, clippy::diverging_sub_expression)]
    async fn test_activity_direct_invocation() {
        let ctx: ActivityContext = unimplemented!();
        let _result = MyActivities::my_activity.run(ctx).await;
    }

    // Workflow exercising every handler kind the macros support: init, run, sync/async
    // signals, a query, and sync/async updates (including custom handler names).
    #[workflow]
    struct MyWorkflow {
        counter: u32,
    }

    #[allow(dead_code)]
    #[workflow_methods]
    impl MyWorkflow {
        #[init]
        fn new(_ctx: &WorkflowContextView, _input: String) -> Self {
            Self { counter: 0 }
        }

        #[run]
        async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
            Ok(format!("Counter: {}", ctx.state(|s| s.counter)))
        }

        #[signal(name = "increment")]
        fn increment_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>, amount: u32) {
            self.counter += amount;
        }

        #[signal]
        async fn async_signal(_ctx: &mut WorkflowContext<Self>) {}

        #[query]
        fn get_counter(&self, _ctx: &WorkflowContextView) -> u32 {
            self.counter
        }

        #[update(name = "double")]
        fn double_counter(&mut self, _ctx: &mut SyncWorkflowContext<Self>) -> u32 {
            self.counter *= 2;
            self.counter
        }

        #[update]
        async fn async_update(_ctx: &mut WorkflowContext<Self>, val: i32) -> i32 {
            val * 2
        }
    }

    #[test]
    fn test_workflow_registration() {
        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
    }
}
1399    #[test]
1400    fn test_workflow_registration() {
1401        let _ = WorkerOptions::new("task_q").register_workflow::<MyWorkflow>();
1402    }
1403}