temporal_sdk_core_api/lib.rs
1#[cfg(feature = "envconfig")]
2pub mod envconfig;
3pub mod errors;
4pub mod telemetry;
5pub mod worker;
6
7use crate::{
8 errors::{
9 CompleteActivityError, CompleteNexusError, CompleteWfError, PollError,
10 WorkerValidationError,
11 },
12 worker::WorkerConfig,
13};
14use std::sync::Arc;
15use temporal_sdk_core_protos::coresdk::{
16 ActivityHeartbeat, ActivityTaskCompletion,
17 activity_task::ActivityTask,
18 nexus::{NexusTask, NexusTaskCompletion},
19 workflow_activation::WorkflowActivation,
20 workflow_completion::WorkflowActivationCompletion,
21};
22
23/// This trait is the primary way by which language specific SDKs interact with the core SDK.
24/// It represents one worker, which has a (potentially shared) client for connecting to the service
25/// and is bound to a specific task queue.
26#[async_trait::async_trait]
27pub trait Worker: Send + Sync {
28 /// Validate that the worker can properly connect to server, plus any other validation that
29 /// needs to be done asynchronously. Lang SDKs should call this function once before calling
30 /// any others.
31 async fn validate(&self) -> Result<(), WorkerValidationError>;
32
33 /// Ask the worker for some work, returning a [WorkflowActivation]. It is then the language
34 /// SDK's responsibility to call the appropriate workflow code with the provided inputs. Blocks
35 /// indefinitely until such work is available or [Worker::shutdown] is called.
36 ///
37 /// It is important to understand that all activations must be responded to. There can only
38 /// be one outstanding activation for a particular run of a workflow at any time. If an
39 /// activation is not responded to, it will cause that workflow to become stuck forever.
40 ///
41 /// See [WorkflowActivation] for more details on the expected behavior of lang w.r.t activation
42 /// & job processing.
43 ///
44 /// Do not call poll concurrently. It handles polling the server concurrently internally.
45 async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError>;
46
47 /// Ask the worker for some work, returning an [ActivityTask]. It is then the language SDK's
48 /// responsibility to call the appropriate activity code with the provided inputs. Blocks
49 /// indefinitely until such work is available or [Worker::shutdown] is called.
50 ///
51 /// Do not call poll concurrently. It handles polling the server concurrently internally.
52 async fn poll_activity_task(&self) -> Result<ActivityTask, PollError>;
53
54 /// Ask the worker for some nexus related work. It is then the language SDK's
55 /// responsibility to call the appropriate nexus operation handler code with the provided
56 /// inputs. Blocks indefinitely until such work is available or [Worker::shutdown] is called.
57 ///
58 /// All tasks must be responded to for shutdown to complete.
59 ///
60 /// Do not call poll concurrently. It handles polling the server concurrently internally.
61 async fn poll_nexus_task(&self) -> Result<NexusTask, PollError>;
62
63 /// Tell the worker that a workflow activation has completed. May (and should) be freely called
64 /// concurrently. The future may take some time to resolve, as fetching more events might be
65 /// necessary for completion to... complete - thus SDK implementers should make sure they do
66 /// not serialize completions.
67 async fn complete_workflow_activation(
68 &self,
69 completion: WorkflowActivationCompletion,
70 ) -> Result<(), CompleteWfError>;
71
72 /// Tell the worker that an activity has finished executing. May (and should) be freely called
73 /// concurrently.
74 async fn complete_activity_task(
75 &self,
76 completion: ActivityTaskCompletion,
77 ) -> Result<(), CompleteActivityError>;
78
79 /// Tell the worker that a nexus task has completed. May (and should) be freely called
80 /// concurrently.
81 async fn complete_nexus_task(
82 &self,
83 completion: NexusTaskCompletion,
84 ) -> Result<(), CompleteNexusError>;
85
86 /// Notify the Temporal service that an activity is still alive. Long running activities that
87 /// take longer than `activity_heartbeat_timeout` to finish must call this function in order to
88 /// report progress, otherwise the activity will timeout and a new attempt will be scheduled.
89 ///
90 /// The first heartbeat request will be sent immediately, subsequent rapid calls to this
91 /// function will result in heartbeat requests being aggregated and the last one received during
92 /// the aggregation period will be sent to the server, where that period is defined as half the
93 /// heartbeat timeout.
94 ///
95 /// Unlike Java/Go SDKs we do not return cancellation status as part of heartbeat response and
96 /// instead send it as a separate activity task to the lang, decoupling heartbeat and
97 /// cancellation processing.
98 ///
99 /// For now activity still need to send heartbeats if they want to receive cancellation
100 /// requests. In the future we will change this and will dispatch cancellations more
101 /// proactively. Note that this function does not block on the server call and returns
102 /// immediately. Underlying validation errors are swallowed and logged, this has been agreed to
103 /// be optimal behavior for the user as we don't want to break activity execution due to badly
104 /// configured heartbeat options.
105 fn record_activity_heartbeat(&self, details: ActivityHeartbeat);
106
107 /// Request that a workflow be evicted by its run id. This will generate a workflow activation
108 /// with the eviction job inside it to be eventually returned by
109 /// [Worker::poll_workflow_activation]. If the workflow had any existing outstanding activations,
110 /// such activations are invalidated and subsequent completions of them will do nothing and log
111 /// a warning.
112 fn request_workflow_eviction(&self, run_id: &str);
113
114 /// Return this worker's config
115 fn get_config(&self) -> &WorkerConfig;
116
117 /// Initiate shutdown. See [Worker::shutdown], this is just a sync version that starts the
118 /// process. You can then wait on `shutdown` or [Worker::finalize_shutdown].
119 fn initiate_shutdown(&self);
120
121 /// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
122 /// down this worker. [Worker::poll_workflow_activation] and [Worker::poll_activity_task] should
123 /// be called until both return a `ShutDown` error to ensure that all outstanding work is
124 /// complete. This means that the lang sdk will need to call
125 /// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
126 /// workflows & activities until they are done. At that point, the lang SDK can end the process,
127 /// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
128 /// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may
129 /// skip calling [Worker::poll_activity_task].
130 ///
131 /// Lang implementations should use [Worker::initiate_shutdown] followed by
132 /// [Worker::finalize_shutdown].
133 async fn shutdown(&self);
134
135 /// Completes shutdown and frees all resources. You should avoid simply dropping workers, as
136 /// this does not allow async tasks to report any panics that may have occurred cleanly.
137 ///
138 /// This should be called only after [Worker::shutdown] has resolved and/or both polling
139 /// functions have returned `ShutDown` errors.
140 async fn finalize_shutdown(self);
141}
142
143#[async_trait::async_trait]
144impl<W> Worker for Arc<W>
145where
146 W: Worker + ?Sized,
147{
148 async fn validate(&self) -> Result<(), WorkerValidationError> {
149 (**self).validate().await
150 }
151
152 async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollError> {
153 (**self).poll_workflow_activation().await
154 }
155
156 async fn poll_activity_task(&self) -> Result<ActivityTask, PollError> {
157 (**self).poll_activity_task().await
158 }
159
160 async fn poll_nexus_task(&self) -> Result<NexusTask, PollError> {
161 (**self).poll_nexus_task().await
162 }
163
164 async fn complete_workflow_activation(
165 &self,
166 completion: WorkflowActivationCompletion,
167 ) -> Result<(), CompleteWfError> {
168 (**self).complete_workflow_activation(completion).await
169 }
170
171 async fn complete_activity_task(
172 &self,
173 completion: ActivityTaskCompletion,
174 ) -> Result<(), CompleteActivityError> {
175 (**self).complete_activity_task(completion).await
176 }
177
178 async fn complete_nexus_task(
179 &self,
180 completion: NexusTaskCompletion,
181 ) -> Result<(), CompleteNexusError> {
182 (**self).complete_nexus_task(completion).await
183 }
184
185 fn record_activity_heartbeat(&self, details: ActivityHeartbeat) {
186 (**self).record_activity_heartbeat(details)
187 }
188
189 fn request_workflow_eviction(&self, run_id: &str) {
190 (**self).request_workflow_eviction(run_id)
191 }
192
193 fn get_config(&self) -> &WorkerConfig {
194 (**self).get_config()
195 }
196
197 fn initiate_shutdown(&self) {
198 (**self).initiate_shutdown()
199 }
200
201 async fn shutdown(&self) {
202 (**self).shutdown().await
203 }
204
205 async fn finalize_shutdown(self) {
206 panic!("Can't finalize shutdown on Arc'd worker")
207 }
208}
209
/// Logs the given format arguments at error level through `tracing`, then fails a
/// `debug_assert!` carrying the same message.
///
/// In builds with debug assertions enabled this panics after logging; in builds without them
/// only the error log is emitted.
///
/// The expansion is a single block expression evaluating to `()`. That keeps the macro usable
/// both as a statement and in expression position (for example as a `match` arm), and lets it be
/// invoked several times in one scope, because nothing is imported into the caller's scope.
///
/// The arguments are expanded twice (once for the log line, once for the assertion message), so
/// they should be free of side effects.
macro_rules! dbg_panic {
    ($($arg:tt)*) => {{
        // Invoke the macro by path rather than via `use tracing::error;`: items introduced by a
        // `macro_rules!` expansion are not hygienic and would leak into the call site.
        ::tracing::error!($($arg)*);
        debug_assert!(false, $($arg)*);
    }};
}
pub(crate) use dbg_panic;