1#![warn(missing_docs)] #![allow(clippy::upper_case_acronyms)]
3
4#[cfg(test)]
10#[macro_use]
11pub extern crate assert_matches;
12#[macro_use]
13extern crate tracing;
14extern crate core;
15
16mod abstractions;
17#[cfg(feature = "antithesis_assertions")]
18mod antithesis;
19#[cfg(feature = "debug-plugin")]
20pub mod debug_client;
21#[cfg(feature = "ephemeral-server")]
22pub mod ephemeral_server;
23mod internal_flags;
24mod pollers;
25mod protosext;
26pub mod replay;
27pub(crate) mod retry_logic;
28pub mod telemetry;
29mod worker;
30
31#[cfg(test)]
32mod core_tests;
33#[cfg(any(feature = "test-utilities", test))]
34#[macro_use]
35pub mod test_help;
36
37pub use crate::worker::client::{
38 PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
39};
40pub use pollers::{Client, ClientOptions, ClientTlsOptions, RetryOptions, TlsOptions};
41pub use temporalio_common::protos::TaskToken;
42pub use url::Url;
43pub use worker::{
44 ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
45 FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
46 PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
47 ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
48 SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
49 SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
50 TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
51 WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
52};
53
54use crate::{
55 replay::{HistoryForReplay, ReplayWorkerInput},
56 telemetry::metrics::MetricsContext,
57 worker::client::WorkerClientBag,
58};
59use anyhow::bail;
60use futures_util::Stream;
61use std::{sync::Arc, time::Duration};
62use temporalio_client::{Connection, SharedReplaceableClient};
63use temporalio_common::{
64 protos::coresdk::ActivityHeartbeat,
65 telemetry::{
66 TelemetryInstance, TelemetryOptions, remove_trace_subscriber_for_current_thread,
67 set_trace_subscriber_for_current_thread, telemetry_init,
68 },
69};
70
71pub fn init_worker(
81 runtime: &CoreRuntime,
82 worker_config: WorkerConfig,
83 mut connection: Connection,
84) -> Result<Worker, anyhow::Error> {
85 let namespace = worker_config.namespace.clone();
86 if namespace.is_empty() {
87 bail!("Worker namespace cannot be empty");
88 }
89
90 *connection.retry_options_mut() = RetryOptions::default();
91 init_worker_client(
92 &mut connection,
93 worker_config.client_identity_override.clone(),
94 );
95 let client = SharedReplaceableClient::new(connection);
96 let client_ident = client.inner_cow().identity().to_owned();
97 if client_ident.is_empty() {
98 bail!("Client identity cannot be empty. Either lang or user should be setting this value");
99 }
100 let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);
101
102 let worker_instance_key = uuid::Uuid::new_v4();
103 let client_bag = Arc::new(WorkerClientBag::new(
104 client,
105 namespace.clone(),
106 worker_config.versioning_strategy.clone(),
107 worker_instance_key,
108 ));
109
110 Worker::new(
111 worker_config.clone(),
112 sticky_q,
113 client_bag.clone(),
114 Some(&runtime.telemetry),
115 runtime.heartbeat_interval,
116 )
117}
118
119pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
126where
127 I: Stream<Item = HistoryForReplay> + Send + 'static,
128{
129 info!(
130 task_queue = rwi.config.task_queue.as_str(),
131 "Registering replay worker"
132 );
133 rwi.into_core_worker()
134}
135
136pub(crate) fn init_worker_client(
137 connection: &mut Connection,
138 client_identity_override: Option<String>,
139) {
140 if let Some(ref id_override) = client_identity_override {
141 connection.identity_mut().clone_from(id_override);
142 }
143}
144
145pub(crate) fn sticky_q_name_for_worker(
148 process_identity: &str,
149 max_cached_workflows: usize,
150) -> Option<String> {
151 if max_cached_workflows > 0 {
152 Some(format!(
153 "{}-{}",
154 &process_identity,
155 uuid::Uuid::new_v4().simple()
156 ))
157 } else {
158 None
159 }
160}
161
/// Shared state used when creating workers: a telemetry instance, access to a Tokio runtime, and
/// the heartbeat interval that [`init_worker`] forwards to each worker it creates.
pub struct CoreRuntime {
    /// Telemetry instance; its trace subscriber (if any) is installed by the constructors.
    telemetry: TelemetryInstance,
    /// The Tokio runtime itself. Only `Some` when this instance built the runtime in
    /// [`CoreRuntime::new`]; the `new_assume_tokio*` constructors leave it `None`.
    runtime: Option<tokio::runtime::Runtime>,
    /// Handle to the Tokio runtime that was current when this instance was constructed.
    runtime_handle: tokio::runtime::Handle,
    /// Heartbeat interval passed to `Worker::new` by [`init_worker`].
    heartbeat_interval: Option<Duration>,
}
171
/// Options used to construct a [`CoreRuntime`].
///
/// Instances are created through the generated builder, whose public `build` method validates
/// `heartbeat_interval` (the raw generated finisher is renamed to `build_internal` and hidden).
#[derive(Default, bon::Builder)]
#[builder(finish_fn(vis = "", name = build_internal))]
#[non_exhaustive]
pub struct RuntimeOptions {
    /// Telemetry configuration, passed to `telemetry_init` when a runtime is created.
    #[builder(default)]
    telemetry_options: TelemetryOptions,
    /// Heartbeat interval stored on the runtime and forwarded to workers created with it.
    ///
    /// The builder defaults this to 60 seconds, and `build` rejects any `Some` value outside
    /// the range 1s to 60s inclusive.
    // NOTE(review): the derived `Default` impl yields `None` here while the builder defaults to
    // `Some(60s)` — confirm that this difference is intended.
    #[builder(required, default = Some(Duration::from_secs(60)))]
    heartbeat_interval: Option<Duration>,
}
188
189impl<S: runtime_options_builder::State> RuntimeOptionsBuilder<S> {
190 pub fn build(self) -> Result<RuntimeOptions, String> {
195 let options = self.build_internal();
196 {
197 if let Some(interval) = options.heartbeat_interval
198 && (interval < Duration::from_secs(1) || interval > Duration::from_secs(60))
199 {
200 return Err(format!(
201 "heartbeat_interval ({interval:?}) must be between 1s and 60s",
202 ));
203 }
204
205 Ok(options)
206 }
207 }
208}
209
/// Pairs a [`tokio::runtime::Builder`] with an optional extra thread-start callback, for use
/// with [`CoreRuntime::new`].
pub struct TokioRuntimeBuilder<F> {
    /// The underlying Tokio runtime builder.
    pub inner: tokio::runtime::Builder,
    /// Optional callback run on every runtime thread start. [`CoreRuntime::new`] invokes it
    /// after it has set the telemetry trace subscriber for that thread.
    pub lang_on_thread_start: Option<F>,
}
217
218impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
219 fn default() -> Self {
220 TokioRuntimeBuilder {
221 inner: tokio::runtime::Builder::new_multi_thread(),
222 lang_on_thread_start: None,
223 }
224 }
225}
226
227impl CoreRuntime {
228 pub fn new<F>(
242 runtime_options: RuntimeOptions,
243 mut tokio_builder: TokioRuntimeBuilder<F>,
244 ) -> Result<Self, anyhow::Error>
245 where
246 F: Fn() + Send + Sync + 'static,
247 {
248 let telemetry = telemetry_init(runtime_options.telemetry_options)?;
249 let subscriber = telemetry.trace_subscriber();
250 let runtime = tokio_builder
251 .inner
252 .enable_all()
253 .on_thread_start(move || {
254 if let Some(sub) = subscriber.as_ref() {
255 set_trace_subscriber_for_current_thread(sub.clone());
256 }
257 if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
258 lang_on_thread_start();
259 }
260 })
261 .build()?;
262 let _rg = runtime.enter();
263 let mut me =
264 Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval);
265 me.runtime = Some(runtime);
266 Ok(me)
267 }
268
269 pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
275 let telemetry = telemetry_init(runtime_options.telemetry_options)?;
276 Ok(Self::new_assume_tokio_initialized_telem(
277 telemetry,
278 runtime_options.heartbeat_interval,
279 ))
280 }
281
282 pub fn new_assume_tokio_initialized_telem(
288 telemetry: TelemetryInstance,
289 heartbeat_interval: Option<Duration>,
290 ) -> Self {
291 let runtime_handle = tokio::runtime::Handle::current();
292 if let Some(sub) = telemetry.trace_subscriber() {
293 set_trace_subscriber_for_current_thread(sub);
294 }
295 Self {
296 telemetry,
297 runtime: None,
298 runtime_handle,
299 heartbeat_interval,
300 }
301 }
302
303 pub fn tokio_handle(&self) -> tokio::runtime::Handle {
305 self.runtime_handle.clone()
306 }
307
308 pub fn telemetry(&self) -> &TelemetryInstance {
310 &self.telemetry
311 }
312
313 pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
315 &mut self.telemetry
316 }
317}
318
impl Drop for CoreRuntime {
    /// Calls `remove_trace_subscriber_for_current_thread` on the thread performing the drop,
    /// before the struct's fields are dropped.
    fn drop(&mut self) {
        remove_trace_subscriber_for_current_thread();
    }
}