temporal_sdk_core_api/
lib.rs

1#[cfg(feature = "envconfig")]
2pub mod envconfig;
3pub mod errors;
4pub mod telemetry;
5pub mod worker;
6
7use crate::{
8    errors::{
9        CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
10        WorkerValidationError,
11    },
12    worker::WorkerConfig,
13};
14use std::sync::Arc;
15use temporal_sdk_core_protos::coresdk::{
16    ActivityHeartbeat, ActivityTaskCompletion,
17    activity_task::ActivityTask,
18    nexus::{NexusTask, NexusTaskCompletion},
19    workflow_activation::WorkflowActivation,
20    workflow_completion::WorkflowActivationCompletion,
21};
22
23/// This trait is the primary way by which language specific SDKs interact with the core SDK.
24/// It represents one worker, which has a (potentially shared) client for connecting to the service
25/// and is bound to a specific task queue.
26#[async_trait::async_trait]
27pub trait Worker: Send + Sync {
28    /// Validate that the worker can properly connect to server, plus any other validation that
29    /// needs to be done asynchronously. Lang SDKs should call this function once before calling
30    /// any others.
31    async fn validate(&self) -> Result<(), WorkerValidationError>;
32
33    /// Ask the worker for some work, returning a [WorkflowActivation]. It is then the language
34    /// SDK's responsibility to call the appropriate workflow code with the provided inputs. Blocks
35    /// indefinitely until such work is available or [Worker::shutdown] is called.
36    ///
37    /// It is important to understand that all activations must be responded to. There can only
38    /// be one outstanding activation for a particular run of a workflow at any time. If an
39    /// activation is not responded to, it will cause that workflow to become stuck forever.
40    ///
41    /// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
42    /// & job processing.
43    ///
44    /// Do not call poll concurrently. It handles polling the server concurrently internally.
45    async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError>;
46
47    /// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
48    /// responsibility to call the appropriate activity code with the provided inputs. Blocks
49    /// indefinitely until such work is available or [Worker::shutdown] is called.
50    ///
51    /// Do not call poll concurrently. It handles polling the server concurrently internally.
52    async fn poll_activity_task(&self) -> Result<ActivityTask, PollError>;
53
54    /// Ask the worker for some nexus related work. It is then the language SDK's
55    /// responsibility to call the appropriate nexus operation handler code with the provided
56    /// inputs. Blocks indefinitely until such work is available or [Worker::shutdown] is called.
57    ///
58    /// All tasks must be responded to for shutdown to complete.
59    ///
60    /// Do not call poll concurrently. It handles polling the server concurrently internally.
61    async fn poll_nexus_task(&self) -> Result<NexusTask, PollError>;
62
63    /// Tell the worker that a workflow activation has completed. May (and should) be freely called
64    /// concurrently. The future may take some time to resolve, as fetching more events might be
65    /// necessary for completion to... complete - thus SDK implementers should make sure they do
66    /// not serialize completions.
67    async fn complete_workflow_activation(
68        &self,
69        completion: WorkflowActivationCompletion,
70    ) -> Result<(), CompleteWfError>;
71
72    /// Tell the worker that an activity has finished executing. May (and should) be freely called
73    /// concurrently.
74    async fn complete_activity_task(
75        &self,
76        completion: ActivityTaskCompletion,
77    ) -> Result<(), CompleteActivityError>;
78
79    /// Tell the worker that a nexus task has completed. May (and should) be freely called
80    /// concurrently.
81    async fn complete_nexus_task(
82        &self,
83        completion: NexusTaskCompletion,
84    ) -> Result<(), CompleteNexusError>;
85
86    /// Notify the Temporal service that an activity is still alive. Long running activities that
87    /// take longer than `activity_heartbeat_timeout` to finish must call this function in order to
88    /// report progress, otherwise the activity will timeout and a new attempt will be scheduled.
89    ///
90    /// The first heartbeat request will be sent immediately, subsequent rapid calls to this
91    /// function will result in heartbeat requests being aggregated and the last one received during
92    /// the aggregation period will be sent to the server, where that period is defined as half the
93    /// heartbeat timeout.
94    ///
95    /// Unlike Java/Go SDKs we do not return cancellation status as part of heartbeat response and
96    /// instead send it as a separate activity task to the lang, decoupling heartbeat and
97    /// cancellation processing.
98    ///
99    /// For now activity still need to send heartbeats if they want to receive cancellation
100    /// requests. In the future we will change this and will dispatch cancellations more
101    /// proactively. Note that this function does not block on the server call and returns
102    /// immediately. Underlying validation errors are swallowed and logged, this has been agreed to
103    /// be optimal behavior for the user as we don't want to break activity execution due to badly
104    /// configured heartbeat options.
105    fn record_activity_heartbeat(&self, details: ActivityHeartbeat);
106
107    /// Request that a workflow be evicted by its run id. This will generate a workflow activation
108    /// with the eviction job inside it to be eventually returned by
109    /// [Worker::poll_workflow_activation]. If the workflow had any existing outstanding activations,
110    /// such activations are invalidated and subsequent completions of them will do nothing and log
111    /// a warning.
112    fn request_workflow_eviction(&self, run_id: &str);
113
114    /// Return this worker's config
115    fn get_config(&self) -> &WorkerConfig;
116
117    /// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
118    /// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
119    fn initiate_shutdown(&self);
120
121    /// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
122    /// down this worker. [Worker::poll_workflow_activation] and [Worker::poll_activity_task] should
123    /// be called until both return a `ShutDown` error to ensure that all outstanding work is
124    /// complete. This means that the lang sdk will need to call
125    /// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
126    /// workflows & activities until they are done. At that point, the lang SDK can end the process,
127    /// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
128    /// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may
129    /// skip calling [Worker::poll_activity_task].
130    ///
131    /// Lang implementations should use [Worker::initiate_shutdown] followed by
132    /// [Worker::finalize_shutdown].
133    async fn shutdown(&self);
134
135    /// Completes shutdown and frees all resources. You should avoid simply dropping workers, as
136    /// this does not allow async tasks to report any panics that may have occurred cleanly.
137    ///
138    /// This should be called only after [Worker::shutdown] has resolved and/or both polling
139    /// functions have returned `ShutDown` errors.
140    async fn finalize_shutdown(self);
141}
142
143#[async_trait::async_trait]
144impl<W> Worker for Arc<W>
145where
146    W: Worker + ?Sized,
147{
148    async fn validate(&self) -> Result<(), WorkerValidationError> {
149        (**self).validate().await
150    }
151
152    async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError> {
153        (**self).poll_workflow_activation().await
154    }
155
156    async fn poll_activity_task(&self) -> Result<ActivityTask, PollError> {
157        (**self).poll_activity_task().await
158    }
159
160    async fn poll_nexus_task(&self) -> Result<NexusTask, PollError> {
161        (**self).poll_nexus_task().await
162    }
163
164    async fn complete_workflow_activation(
165        &self,
166        completion: WorkflowActivationCompletion,
167    ) -> Result<(), CompleteWfError> {
168        (**self).complete_workflow_activation(completion).await
169    }
170
171    async fn complete_activity_task(
172        &self,
173        completion: ActivityTaskCompletion,
174    ) -> Result<(), CompleteActivityError> {
175        (**self).complete_activity_task(completion).await
176    }
177
178    async fn complete_nexus_task(
179        &self,
180        completion: NexusTaskCompletion,
181    ) -> Result<(), CompleteNexusError> {
182        (**self).complete_nexus_task(completion).await
183    }
184
185    fn record_activity_heartbeat(&self, details: ActivityHeartbeat) {
186        (**self).record_activity_heartbeat(details)
187    }
188
189    fn request_workflow_eviction(&self, run_id: &str) {
190        (**self).request_workflow_eviction(run_id)
191    }
192
193    fn get_config(&self) -> &WorkerConfig {
194        (**self).get_config()
195    }
196
197    fn initiate_shutdown(&self) {
198        (**self).initiate_shutdown()
199    }
200
201    async fn shutdown(&self) {
202        (**self).shutdown().await
203    }
204
205    async fn finalize_shutdown(self) {
206        panic!("Can't finalize shutdown on Arc'd worker")
207    }
208}
209
210macro_rules! dbg_panic {
211  ($($arg:tt)*) => {
212      use tracing::error;
213      error!($($arg)*);
214      debug_assert!(false, $($arg)*);
215  };
216}
217pub(crate) use dbg_panic;