squads_temporal_sdk_core/
lib.rs1#![warn(missing_docs)] #![allow(clippy::upper_case_acronyms)]
3
4#[cfg(test)]
8#[macro_use]
9pub extern crate assert_matches;
10#[macro_use]
11extern crate tracing;
12extern crate core;
13
14mod abstractions;
15#[cfg(feature = "debug-plugin")]
16pub mod debug_client;
17#[cfg(feature = "ephemeral-server")]
18pub mod ephemeral_server;
19mod internal_flags;
20mod pollers;
21mod protosext;
22pub mod replay;
23pub(crate) mod retry_logic;
24pub mod telemetry;
25mod worker;
26
27#[cfg(test)]
28mod core_tests;
29#[cfg(any(feature = "test-utilities", test))]
30#[macro_use]
31pub mod test_help;
32
33pub(crate) use temporal_sdk_core_api::errors;
34
35pub use pollers::{
36 Client, ClientOptions, ClientOptionsBuilder, ClientTlsConfig, RetryClient, RetryConfig,
37 TlsConfig, WorkflowClientTrait,
38};
39pub use temporal_sdk_core_api as api;
40pub use temporal_sdk_core_protos as protos;
41pub use temporal_sdk_core_protos::TaskToken;
42pub use url::Url;
43pub use worker::{
44 FixedSizeSlotSupplier, RealSysInfo, ResourceBasedSlotsOptions,
45 ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions,
46 TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder, Worker, WorkerConfig,
47 WorkerConfigBuilder,
48};
49
50pub use crate::worker::client::{
52 PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
53};
54use crate::{
55 replay::{HistoryForReplay, ReplayWorkerInput},
56 telemetry::{
57 TelemetryInstance, metrics::MetricsContext, remove_trace_subscriber_for_current_thread,
58 set_trace_subscriber_for_current_thread, telemetry_init,
59 },
60 worker::client::WorkerClientBag,
61};
62use anyhow::bail;
63use futures_util::Stream;
64use std::sync::{Arc, OnceLock};
65use temporal_client::{
66 ConfiguredClient, NamespacedClient, SharedReplaceableClient, TemporalServiceClientWithMetrics,
67};
68use temporal_sdk_core_api::{
69 Worker as WorkerTrait,
70 errors::{CompleteActivityError, PollError},
71 telemetry::TelemetryOptions,
72};
73use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
74
75pub fn init_worker<CT>(
87 runtime: &CoreRuntime,
88 worker_config: WorkerConfig,
89 client: CT,
90) -> Result<Worker, anyhow::Error>
91where
92 CT: Into<sealed::AnyClient>,
93{
94 if worker_config.namespace.is_empty() {
95 bail!("Worker namespace cannot be empty");
96 }
97
98 let client = RetryClient::new(
99 SharedReplaceableClient::new(init_worker_client(&worker_config, client)),
100 RetryConfig::default(),
101 );
102 let client_ident = client.identity();
103 let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
104
105 if client_ident.is_empty() {
106 bail!("Client identity cannot be empty. Either lang or user should be setting this value");
107 }
108
109 let heartbeat_fn = worker_config
110 .heartbeat_interval
111 .map(|_| Arc::new(OnceLock::new()));
112
113 let client_bag = Arc::new(WorkerClientBag::new(
114 client,
115 worker_config.namespace.clone(),
116 client_ident,
117 worker_config.versioning_strategy.clone(),
118 heartbeat_fn.clone(),
119 ));
120
121 Ok(Worker::new(
122 worker_config,
123 sticky_q,
124 client_bag,
125 Some(&runtime.telemetry),
126 heartbeat_fn,
127 ))
128}
129
130pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
137where
138 I: Stream<Item = HistoryForReplay> + Send + 'static,
139{
140 info!(
141 task_queue = rwi.config.task_queue.as_str(),
142 "Registering replay worker"
143 );
144 rwi.into_core_worker()
145}
146
147pub(crate) fn init_worker_client<CT>(config: &WorkerConfig, client: CT) -> Client
148where
149 CT: Into<sealed::AnyClient>,
150{
151 let mut client = Client::new(*client.into().into_inner(), config.namespace.clone());
152 if let Some(ref id_override) = config.client_identity_override {
153 client.options_mut().identity.clone_from(id_override);
154 }
155 client
156}
157
158pub(crate) fn sticky_q_name_for_worker(
161 process_identity: &str,
162 config: &WorkerConfig,
163) -> Option<String> {
164 if config.max_cached_workflows > 0 {
165 Some(format!(
166 "{}-{}",
167 &process_identity,
168 uuid::Uuid::new_v4().simple()
169 ))
170 } else {
171 None
172 }
173}
174
175mod sealed {
176 use super::*;
177 use temporal_client::SharedReplaceableClient;
178
179 pub struct AnyClient {
184 pub(crate) inner: Box<ConfiguredClient<TemporalServiceClientWithMetrics>>,
185 }
186 impl AnyClient {
187 pub(crate) fn into_inner(self) -> Box<ConfiguredClient<TemporalServiceClientWithMetrics>> {
188 self.inner
189 }
190 }
191
192 impl From<ConfiguredClient<TemporalServiceClientWithMetrics>> for AnyClient {
193 fn from(c: ConfiguredClient<TemporalServiceClientWithMetrics>) -> Self {
194 Self { inner: Box::new(c) }
195 }
196 }
197
198 impl From<Client> for AnyClient {
199 fn from(c: Client) -> Self {
200 c.into_inner().into()
201 }
202 }
203
204 impl<T> From<RetryClient<T>> for AnyClient
205 where
206 T: Into<AnyClient>,
207 {
208 fn from(c: RetryClient<T>) -> Self {
209 c.into_inner().into()
210 }
211 }
212
213 impl<T> From<SharedReplaceableClient<T>> for AnyClient
214 where
215 T: Into<AnyClient> + Clone + Send + Sync,
216 {
217 fn from(c: SharedReplaceableClient<T>) -> Self {
218 c.inner_clone().into()
219 }
220 }
221
222 impl<T> From<Arc<T>> for AnyClient
223 where
224 T: Into<AnyClient> + Clone,
225 {
226 fn from(c: Arc<T>) -> Self {
227 Arc::unwrap_or_clone(c).into()
228 }
229 }
230}
231
232pub struct CoreRuntime {
236 telemetry: TelemetryInstance,
237 runtime: Option<tokio::runtime::Runtime>,
238 runtime_handle: tokio::runtime::Handle,
239}
240
241pub struct TokioRuntimeBuilder<F> {
243 pub inner: tokio::runtime::Builder,
245 pub lang_on_thread_start: Option<F>,
247}
248
249impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
250 fn default() -> Self {
251 TokioRuntimeBuilder {
252 inner: tokio::runtime::Builder::new_multi_thread(),
253 lang_on_thread_start: None,
254 }
255 }
256}
257
258impl CoreRuntime {
259 pub fn new<F>(
273 telemetry_options: TelemetryOptions,
274 mut tokio_builder: TokioRuntimeBuilder<F>,
275 ) -> Result<Self, anyhow::Error>
276 where
277 F: Fn() + Send + Sync + 'static,
278 {
279 let telemetry = telemetry_init(telemetry_options)?;
280 let subscriber = telemetry.trace_subscriber();
281 let runtime = tokio_builder
282 .inner
283 .enable_all()
284 .on_thread_start(move || {
285 if let Some(sub) = subscriber.as_ref() {
286 set_trace_subscriber_for_current_thread(sub.clone());
287 }
288 if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
289 lang_on_thread_start();
290 }
291 })
292 .build()?;
293 let _rg = runtime.enter();
294 let mut me = Self::new_assume_tokio_initialized_telem(telemetry);
295 me.runtime = Some(runtime);
296 Ok(me)
297 }
298
299 pub fn new_assume_tokio(telemetry_options: TelemetryOptions) -> Result<Self, anyhow::Error> {
305 let telemetry = telemetry_init(telemetry_options)?;
306 Ok(Self::new_assume_tokio_initialized_telem(telemetry))
307 }
308
309 pub fn new_assume_tokio_initialized_telem(telemetry: TelemetryInstance) -> Self {
315 let runtime_handle = tokio::runtime::Handle::current();
316 if let Some(sub) = telemetry.trace_subscriber() {
317 set_trace_subscriber_for_current_thread(sub);
318 }
319 Self {
320 telemetry,
321 runtime: None,
322 runtime_handle,
323 }
324 }
325
326 pub fn tokio_handle(&self) -> tokio::runtime::Handle {
328 self.runtime_handle.clone()
329 }
330
331 pub fn telemetry(&self) -> &TelemetryInstance {
333 &self.telemetry
334 }
335
336 pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
338 &mut self.telemetry
339 }
340}
341
342impl Drop for CoreRuntime {
343 fn drop(&mut self) {
344 remove_trace_subscriber_for_current_thread();
345 }
346}