Skip to main content

temporalio_sdk_core/
lib.rs

1#![warn(missing_docs)] // error if there are missing docs
2#![allow(clippy::upper_case_acronyms)]
3
4//! This crate provides a basis for creating new Temporal SDKs without completely starting from
5//! scratch. APIs provided by this crate are not considered stable and may break at any time.
6//!
7//! If you are looking for the Temporal Rust SDK, please use `temporalio-sdk`.
8
9#[cfg(test)]
10#[macro_use]
11pub extern crate assert_matches;
12#[macro_use]
13extern crate tracing;
14extern crate core;
15
16mod abstractions;
17#[cfg(feature = "antithesis_assertions")]
18mod antithesis;
19#[cfg(feature = "debug-plugin")]
20pub mod debug_client;
21#[cfg(feature = "ephemeral-server")]
22pub mod ephemeral_server;
23mod internal_flags;
24mod pollers;
25mod protosext;
26pub mod replay;
27pub(crate) mod retry_logic;
28pub mod telemetry;
29mod worker;
30
31#[cfg(test)]
32mod core_tests;
33#[cfg(any(feature = "test-utilities", test))]
34#[macro_use]
35pub mod test_help;
36
37pub use crate::worker::client::{
38    PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
39};
40pub use pollers::{Client, ClientOptions, ClientTlsOptions, RetryOptions, TlsOptions};
41pub use temporalio_common::protos::TaskToken;
42pub use url::Url;
43pub use worker::{
44    ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
45    FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
46    PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
47    ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
48    SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
49    SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
50    TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
51    WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
52};
53
54use crate::{
55    replay::{HistoryForReplay, ReplayWorkerInput},
56    telemetry::metrics::MetricsContext,
57    worker::client::WorkerClientBag,
58};
59use anyhow::bail;
60use futures_util::Stream;
61use std::{sync::Arc, time::Duration};
62use temporalio_client::{Connection, SharedReplaceableClient};
63use temporalio_common::{
64    protos::coresdk::ActivityHeartbeat,
65    telemetry::{
66        TelemetryInstance, TelemetryOptions, remove_trace_subscriber_for_current_thread,
67        set_trace_subscriber_for_current_thread, telemetry_init,
68    },
69};
70
71/// Initialize a worker bound to a task queue.
72///
73/// You will need to have already initialized a [CoreRuntime] which will be used for this worker.
74/// After the worker is initialized, you should use [CoreRuntime::tokio_handle] to run the worker's
75/// async functions.
76///
77/// Lang implementations must pass in a [Client] When they do so, this function will always
78/// overwrite the client retry configuration, force the client to use the namespace defined in the
79/// worker config, and set the client identity appropriately.
80pub fn init_worker(
81    runtime: &CoreRuntime,
82    worker_config: WorkerConfig,
83    mut connection: Connection,
84) -> Result<Worker, anyhow::Error> {
85    let namespace = worker_config.namespace.clone();
86    if namespace.is_empty() {
87        bail!("Worker namespace cannot be empty");
88    }
89
90    *connection.retry_options_mut() = RetryOptions::default();
91    init_worker_client(
92        &mut connection,
93        worker_config.client_identity_override.clone(),
94    );
95    let client = SharedReplaceableClient::new(connection);
96    let client_ident = client.inner_cow().identity().to_owned();
97    if client_ident.is_empty() {
98        bail!("Client identity cannot be empty. Either lang or user should be setting this value");
99    }
100    let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);
101
102    let worker_instance_key = uuid::Uuid::new_v4();
103    let client_bag = Arc::new(WorkerClientBag::new(
104        client,
105        namespace.clone(),
106        worker_config.versioning_strategy.clone(),
107        worker_instance_key,
108    ));
109
110    Worker::new(
111        worker_config.clone(),
112        sticky_q,
113        client_bag.clone(),
114        Some(&runtime.telemetry),
115        runtime.heartbeat_interval,
116    )
117}
118
119/// Create a worker for replaying one or more existing histories. It will auto-shutdown as soon as
120/// all histories have finished being replayed.
121///
122/// You do not necessarily need a [CoreRuntime] for replay workers, but it's advisable to create
123/// one and use it to run the replay worker's async functions the same way you would for a normal
124/// worker.
125pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
126where
127    I: Stream<Item = HistoryForReplay> + Send + 'static,
128{
129    info!(
130        task_queue = rwi.config.task_queue.as_str(),
131        "Registering replay worker"
132    );
133    rwi.into_core_worker()
134}
135
136pub(crate) fn init_worker_client(
137    connection: &mut Connection,
138    client_identity_override: Option<String>,
139) {
140    if let Some(ref id_override) = client_identity_override {
141        connection.identity_mut().clone_from(id_override);
142    }
143}
144
145/// Creates a unique sticky queue name for a worker, iff the config allows for 1 or more cached
146/// workflows.
147pub(crate) fn sticky_q_name_for_worker(
148    process_identity: &str,
149    max_cached_workflows: usize,
150) -> Option<String> {
151    if max_cached_workflows > 0 {
152        Some(format!(
153            "{}-{}",
154            &process_identity,
155            uuid::Uuid::new_v4().simple()
156        ))
157    } else {
158        None
159    }
160}
161
162/// Holds shared state/components needed to back instances of workers and clients. More than one
163/// may be instantiated, but typically only one is needed. More than one runtime instance may be
164/// useful if multiple different telemetry settings are required.
165pub struct CoreRuntime {
166    telemetry: TelemetryInstance,
167    runtime: Option<tokio::runtime::Runtime>,
168    runtime_handle: tokio::runtime::Handle,
169    heartbeat_interval: Option<Duration>,
170}
171
172/// Holds telemetry options, as well as worker heartbeat_interval. Construct with
173/// [RuntimeOptions::builder]
174#[derive(Default, bon::Builder)]
175#[builder(finish_fn(vis = "", name = build_internal))]
176#[non_exhaustive]
177pub struct RuntimeOptions {
178    /// Telemetry configuration options.
179    #[builder(default)]
180    telemetry_options: TelemetryOptions,
181    /// Optional worker heartbeat interval - This configures the heartbeat setting of all
182    /// workers created using this runtime.
183    ///
184    /// Interval must be between 1s and 60s, inclusive.
185    #[builder(required, default = Some(Duration::from_secs(60)))]
186    heartbeat_interval: Option<Duration>,
187}
188
189impl<S: runtime_options_builder::State> RuntimeOptionsBuilder<S> {
190    /// Builds the RuntimeOptions
191    ///
192    /// # Errors
193    /// Returns an error if heartbeat_interval is set but not between 1s and 60s inclusive.
194    pub fn build(self) -> Result<RuntimeOptions, String> {
195        let options = self.build_internal();
196        {
197            if let Some(interval) = options.heartbeat_interval
198                && (interval < Duration::from_secs(1) || interval > Duration::from_secs(60))
199            {
200                return Err(format!(
201                    "heartbeat_interval ({interval:?}) must be between 1s and 60s",
202                ));
203            }
204
205            Ok(options)
206        }
207    }
208}
209
210/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions
211pub struct TokioRuntimeBuilder<F> {
212    /// The underlying tokio runtime builder
213    pub inner: tokio::runtime::Builder,
214    /// A function to be called when setting the runtime builder's on thread start
215    pub lang_on_thread_start: Option<F>,
216}
217
218impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
219    fn default() -> Self {
220        TokioRuntimeBuilder {
221            inner: tokio::runtime::Builder::new_multi_thread(),
222            lang_on_thread_start: None,
223        }
224    }
225}
226
227impl CoreRuntime {
228    /// Create a new core runtime with the provided telemetry options and tokio runtime builder.
229    /// Also initialize telemetry for the thread this is being called on.
230    ///
231    /// Note that this function will call the [tokio::runtime::Builder::enable_all] builder option
232    /// on the Tokio runtime builder, and will call [tokio::runtime::Builder::on_thread_start] to
233    /// ensure telemetry subscribers are set on every tokio thread.
234    ///
235    /// **Important**: You need to call this *before* calling any async functions on workers or
236    /// clients, otherwise the tracing subscribers will not be properly attached.
237    ///
238    /// # Panics
239    /// If a tokio runtime has already been initialized. To re-use an existing runtime, call
240    /// [CoreRuntime::new_assume_tokio].
241    pub fn new<F>(
242        runtime_options: RuntimeOptions,
243        mut tokio_builder: TokioRuntimeBuilder<F>,
244    ) -> Result<Self, anyhow::Error>
245    where
246        F: Fn() + Send + Sync + 'static,
247    {
248        let telemetry = telemetry_init(runtime_options.telemetry_options)?;
249        let subscriber = telemetry.trace_subscriber();
250        let runtime = tokio_builder
251            .inner
252            .enable_all()
253            .on_thread_start(move || {
254                if let Some(sub) = subscriber.as_ref() {
255                    set_trace_subscriber_for_current_thread(sub.clone());
256                }
257                if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
258                    lang_on_thread_start();
259                }
260            })
261            .build()?;
262        let _rg = runtime.enter();
263        let mut me =
264            Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval);
265        me.runtime = Some(runtime);
266        Ok(me)
267    }
268
269    /// Initialize telemetry for the thread this is being called on, assuming a tokio runtime is
270    /// already active and this call exists in its context. See [Self::new] for more.
271    ///
272    /// # Panics
273    /// If there is no currently active Tokio runtime
274    pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
275        let telemetry = telemetry_init(runtime_options.telemetry_options)?;
276        Ok(Self::new_assume_tokio_initialized_telem(
277            telemetry,
278            runtime_options.heartbeat_interval,
279        ))
280    }
281
282    /// Construct a runtime from an already-initialized telemetry instance, assuming a tokio runtime
283    /// is already active and this call exists in its context. See [Self::new] for more.
284    ///
285    /// # Panics
286    /// If there is no currently active Tokio runtime
287    pub fn new_assume_tokio_initialized_telem(
288        telemetry: TelemetryInstance,
289        heartbeat_interval: Option<Duration>,
290    ) -> Self {
291        let runtime_handle = tokio::runtime::Handle::current();
292        if let Some(sub) = telemetry.trace_subscriber() {
293            set_trace_subscriber_for_current_thread(sub);
294        }
295        Self {
296            telemetry,
297            runtime: None,
298            runtime_handle,
299            heartbeat_interval,
300        }
301    }
302
303    /// Get a handle to the tokio runtime used by this Core runtime.
304    pub fn tokio_handle(&self) -> tokio::runtime::Handle {
305        self.runtime_handle.clone()
306    }
307
308    /// Return a reference to the owned [TelemetryInstance]
309    pub fn telemetry(&self) -> &TelemetryInstance {
310        &self.telemetry
311    }
312
313    /// Return a mutable reference to the owned [TelemetryInstance]
314    pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
315        &mut self.telemetry
316    }
317}
318
319impl Drop for CoreRuntime {
320    fn drop(&mut self) {
321        remove_trace_subscriber_for_current_thread();
322    }
323}