squads_temporal_sdk_core/
lib.rs

1#![warn(missing_docs)] // error if there are missing docs
2#![allow(clippy::upper_case_acronyms)]
3
4//! This crate provides a basis for creating new Temporal SDKs without completely starting from
5//! scratch
6
7#[cfg(test)]
8#[macro_use]
9pub extern crate assert_matches;
10#[macro_use]
11extern crate tracing;
12extern crate core;
13
14mod abstractions;
15#[cfg(feature = "debug-plugin")]
16pub mod debug_client;
17#[cfg(feature = "ephemeral-server")]
18pub mod ephemeral_server;
19mod internal_flags;
20mod pollers;
21mod protosext;
22pub mod replay;
23pub(crate) mod retry_logic;
24pub mod telemetry;
25mod worker;
26
27#[cfg(test)]
28mod core_tests;
29#[cfg(any(feature = "test-utilities", test))]
30#[macro_use]
31pub mod test_help;
32
33pub(crate) use temporal_sdk_core_api::errors;
34
35pub use pollers::{
36    Client, ClientOptions, ClientOptionsBuilder, ClientTlsConfig, RetryClient, RetryConfig,
37    TlsConfig, WorkflowClientTrait,
38};
39pub use temporal_sdk_core_api as api;
40pub use temporal_sdk_core_protos as protos;
41pub use temporal_sdk_core_protos::TaskToken;
42pub use url::Url;
43pub use worker::{
44    FixedSizeSlotSupplier, RealSysInfo, ResourceBasedSlotsOptions,
45    ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner, ResourceSlotOptions, SlotSupplierOptions,
46    TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder, Worker, WorkerConfig,
47    WorkerConfigBuilder,
48};
49
50/// Expose [WorkerClient] symbols
51pub use crate::worker::client::{
52    PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
53};
54use crate::{
55    replay::{HistoryForReplay, ReplayWorkerInput},
56    telemetry::{
57        TelemetryInstance, metrics::MetricsContext, remove_trace_subscriber_for_current_thread,
58        set_trace_subscriber_for_current_thread, telemetry_init,
59    },
60    worker::client::WorkerClientBag,
61};
62use anyhow::bail;
63use futures_util::Stream;
64use std::sync::{Arc, OnceLock};
65use temporal_client::{
66    ConfiguredClient, NamespacedClient, SharedReplaceableClient, TemporalServiceClientWithMetrics,
67};
68use temporal_sdk_core_api::{
69    Worker as WorkerTrait,
70    errors::{CompleteActivityError, PollError},
71    telemetry::TelemetryOptions,
72};
73use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
74
75/// Initialize a worker bound to a task queue.
76///
77/// You will need to have already initialized a [CoreRuntime] which will be used for this worker.
78/// After the worker is initialized, you should use [CoreRuntime::tokio_handle] to run the worker's
79/// async functions.
80///
81/// Lang implementations may pass in a [ConfiguredClient] directly (or a
82/// [RetryClient] wrapping one, or a handful of other variants of the same idea). When they do so,
83/// this function will always overwrite the client retry configuration, force the client to use the
84/// namespace defined in the worker config, and set the client identity appropriately. IE: Use
85/// [ClientOptions::connect_no_namespace], not [ClientOptions::connect].
86pub fn init_worker<CT>(
87    runtime: &CoreRuntime,
88    worker_config: WorkerConfig,
89    client: CT,
90) -> Result<Worker, anyhow::Error>
91where
92    CT: Into<sealed::AnyClient>,
93{
94    if worker_config.namespace.is_empty() {
95        bail!("Worker namespace cannot be empty");
96    }
97
98    let client = RetryClient::new(
99        SharedReplaceableClient::new(init_worker_client(&worker_config, client)),
100        RetryConfig::default(),
101    );
102    let client_ident = client.identity();
103    let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
104
105    if client_ident.is_empty() {
106        bail!("Client identity cannot be empty. Either lang or user should be setting this value");
107    }
108
109    let heartbeat_fn = worker_config
110        .heartbeat_interval
111        .map(|_| Arc::new(OnceLock::new()));
112
113    let client_bag = Arc::new(WorkerClientBag::new(
114        client,
115        worker_config.namespace.clone(),
116        client_ident,
117        worker_config.versioning_strategy.clone(),
118        heartbeat_fn.clone(),
119    ));
120
121    Ok(Worker::new(
122        worker_config,
123        sticky_q,
124        client_bag,
125        Some(&runtime.telemetry),
126        heartbeat_fn,
127    ))
128}
129
130/// Create a worker for replaying one or more existing histories. It will auto-shutdown as soon as
131/// all histories have finished being replayed.
132///
133/// You do not necessarily need a [CoreRuntime] for replay workers, but it's advisable to create
134/// one and use it to run the replay worker's async functions the same way you would for a normal
135/// worker.
136pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
137where
138    I: Stream<Item = HistoryForReplay> + Send + 'static,
139{
140    info!(
141        task_queue = rwi.config.task_queue.as_str(),
142        "Registering replay worker"
143    );
144    rwi.into_core_worker()
145}
146
147pub(crate) fn init_worker_client<CT>(config: &WorkerConfig, client: CT) -> Client
148where
149    CT: Into<sealed::AnyClient>,
150{
151    let mut client = Client::new(*client.into().into_inner(), config.namespace.clone());
152    if let Some(ref id_override) = config.client_identity_override {
153        client.options_mut().identity.clone_from(id_override);
154    }
155    client
156}
157
158/// Creates a unique sticky queue name for a worker, iff the config allows for 1 or more cached
159/// workflows.
160pub(crate) fn sticky_q_name_for_worker(
161    process_identity: &str,
162    config: &WorkerConfig,
163) -> Option<String> {
164    if config.max_cached_workflows > 0 {
165        Some(format!(
166            "{}-{}",
167            &process_identity,
168            uuid::Uuid::new_v4().simple()
169        ))
170    } else {
171        None
172    }
173}
174
175mod sealed {
176    use super::*;
177    use temporal_client::SharedReplaceableClient;
178
179    /// Allows passing different kinds of clients into things that want to be flexible. Motivating
180    /// use-case was worker initialization.
181    ///
182    /// Needs to exist in this crate to avoid blanket impl conflicts.
183    pub struct AnyClient {
184        pub(crate) inner: Box<ConfiguredClient<TemporalServiceClientWithMetrics>>,
185    }
186    impl AnyClient {
187        pub(crate) fn into_inner(self) -> Box<ConfiguredClient<TemporalServiceClientWithMetrics>> {
188            self.inner
189        }
190    }
191
192    impl From<ConfiguredClient<TemporalServiceClientWithMetrics>> for AnyClient {
193        fn from(c: ConfiguredClient<TemporalServiceClientWithMetrics>) -> Self {
194            Self { inner: Box::new(c) }
195        }
196    }
197
198    impl From<Client> for AnyClient {
199        fn from(c: Client) -> Self {
200            c.into_inner().into()
201        }
202    }
203
204    impl<T> From<RetryClient<T>> for AnyClient
205    where
206        T: Into<AnyClient>,
207    {
208        fn from(c: RetryClient<T>) -> Self {
209            c.into_inner().into()
210        }
211    }
212
213    impl<T> From<SharedReplaceableClient<T>> for AnyClient
214    where
215        T: Into<AnyClient> + Clone + Send + Sync,
216    {
217        fn from(c: SharedReplaceableClient<T>) -> Self {
218            c.inner_clone().into()
219        }
220    }
221
222    impl<T> From<Arc<T>> for AnyClient
223    where
224        T: Into<AnyClient> + Clone,
225    {
226        fn from(c: Arc<T>) -> Self {
227            Arc::unwrap_or_clone(c).into()
228        }
229    }
230}
231
/// Holds shared state/components needed to back instances of workers and clients. More than one
/// may be instantiated, but typically only one is needed. More than one runtime instance may be
/// useful if multiple different telemetry settings are required.
///
/// Dropping a `CoreRuntime` removes the trace subscriber for the thread the drop happens on.
pub struct CoreRuntime {
    /// Telemetry instance owned by this runtime, reachable through [CoreRuntime::telemetry] and
    /// [CoreRuntime::telemetry_mut].
    telemetry: TelemetryInstance,
    /// The tokio runtime built by [CoreRuntime::new]. `None` when this instance was constructed
    /// on top of an already-active tokio runtime.
    runtime: Option<tokio::runtime::Runtime>,
    /// Handle to the tokio runtime that was current at construction time; clones of it are
    /// returned by [CoreRuntime::tokio_handle].
    runtime_handle: tokio::runtime::Handle,
}
240
/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions.
///
/// [CoreRuntime::new] installs its own `on_thread_start` callback on the wrapped builder. That
/// callback first sets the telemetry trace subscriber for the new thread (when one exists) and
/// then calls `lang_on_thread_start` (when one was provided).
pub struct TokioRuntimeBuilder<F> {
    /// The underlying tokio runtime builder. [CoreRuntime::new] calls `enable_all` and
    /// `on_thread_start` on it before building the runtime.
    pub inner: tokio::runtime::Builder,
    /// An optional function to be called on every tokio thread start, after the trace subscriber
    /// has been set for that thread.
    pub lang_on_thread_start: Option<F>,
}
248
249impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
250    fn default() -> Self {
251        TokioRuntimeBuilder {
252            inner: tokio::runtime::Builder::new_multi_thread(),
253            lang_on_thread_start: None,
254        }
255    }
256}
257
258impl CoreRuntime {
259    /// Create a new core runtime with the provided telemetry options and tokio runtime builder.
260    /// Also initialize telemetry for the thread this is being called on.
261    ///
262    /// Note that this function will call the [tokio::runtime::Builder::enable_all] builder option
263    /// on the Tokio runtime builder, and will call [tokio::runtime::Builder::on_thread_start] to
264    /// ensure telemetry subscribers are set on every tokio thread.
265    ///
266    /// **Important**: You need to call this *before* calling any async functions on workers or
267    /// clients, otherwise the tracing subscribers will not be properly attached.
268    ///
269    /// # Panics
270    /// If a tokio runtime has already been initialized. To re-use an existing runtime, call
271    /// [CoreRuntime::new_assume_tokio].
272    pub fn new<F>(
273        telemetry_options: TelemetryOptions,
274        mut tokio_builder: TokioRuntimeBuilder<F>,
275    ) -> Result<Self, anyhow::Error>
276    where
277        F: Fn() + Send + Sync + 'static,
278    {
279        let telemetry = telemetry_init(telemetry_options)?;
280        let subscriber = telemetry.trace_subscriber();
281        let runtime = tokio_builder
282            .inner
283            .enable_all()
284            .on_thread_start(move || {
285                if let Some(sub) = subscriber.as_ref() {
286                    set_trace_subscriber_for_current_thread(sub.clone());
287                }
288                if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
289                    lang_on_thread_start();
290                }
291            })
292            .build()?;
293        let _rg = runtime.enter();
294        let mut me = Self::new_assume_tokio_initialized_telem(telemetry);
295        me.runtime = Some(runtime);
296        Ok(me)
297    }
298
299    /// Initialize telemetry for the thread this is being called on, assuming a tokio runtime is
300    /// already active and this call exists in its context. See [Self::new] for more.
301    ///
302    /// # Panics
303    /// If there is no currently active Tokio runtime
304    pub fn new_assume_tokio(telemetry_options: TelemetryOptions) -> Result<Self, anyhow::Error> {
305        let telemetry = telemetry_init(telemetry_options)?;
306        Ok(Self::new_assume_tokio_initialized_telem(telemetry))
307    }
308
309    /// Construct a runtime from an already-initialized telemetry instance, assuming a tokio runtime
310    /// is already active and this call exists in its context. See [Self::new] for more.
311    ///
312    /// # Panics
313    /// If there is no currently active Tokio runtime
314    pub fn new_assume_tokio_initialized_telem(telemetry: TelemetryInstance) -> Self {
315        let runtime_handle = tokio::runtime::Handle::current();
316        if let Some(sub) = telemetry.trace_subscriber() {
317            set_trace_subscriber_for_current_thread(sub);
318        }
319        Self {
320            telemetry,
321            runtime: None,
322            runtime_handle,
323        }
324    }
325
326    /// Get a handle to the tokio runtime used by this Core runtime.
327    pub fn tokio_handle(&self) -> tokio::runtime::Handle {
328        self.runtime_handle.clone()
329    }
330
331    /// Return a reference to the owned [TelemetryInstance]
332    pub fn telemetry(&self) -> &TelemetryInstance {
333        &self.telemetry
334    }
335
336    /// Return a mutable reference to the owned [TelemetryInstance]
337    pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
338        &mut self.telemetry
339    }
340}
341
impl Drop for CoreRuntime {
    /// Removes the trace subscriber for the thread on which the runtime is dropped.
    fn drop(&mut self) {
        // NOTE(review): this acts on whichever thread performs the drop, which is not necessarily
        // the thread that constructed the runtime — confirm that is intended.
        remove_trace_subscriber_for_current_thread();
    }
}
346}