#![warn(missing_docs)] #![allow(clippy::upper_case_acronyms)]
#[cfg(test)]
#[macro_use]
pub extern crate assert_matches;
#[macro_use]
extern crate tracing;
extern crate core;
mod abstractions;
#[cfg(feature = "antithesis_assertions")]
mod antithesis;
#[cfg(feature = "debug-plugin")]
pub mod debug_client;
#[cfg(feature = "ephemeral-server")]
pub mod ephemeral_server;
mod internal_flags;
mod pollers;
mod protosext;
pub mod replay;
pub(crate) mod retry_logic;
pub mod telemetry;
mod worker;
#[cfg(test)]
mod core_tests;
#[cfg(any(feature = "test-utilities", test))]
#[macro_use]
pub mod test_help;
pub use crate::worker::client::{
PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient, WorkflowTaskCompletion,
};
pub use pollers::{Client, ClientOptions, ClientTlsOptions, RetryOptions, TlsOptions};
pub use temporalio_common::protos::TaskToken;
pub use url::Url;
pub use worker::{
ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
};
use crate::{
replay::{HistoryForReplay, ReplayWorkerInput},
telemetry::metrics::MetricsContext,
worker::client::WorkerClientBag,
};
use anyhow::bail;
use futures_util::Stream;
use std::{sync::Arc, time::Duration};
use temporalio_client::{Connection, SharedReplaceableClient};
use temporalio_common::{
protos::coresdk::ActivityHeartbeat,
telemetry::{
TelemetryInstance, TelemetryOptions, remove_trace_subscriber_for_current_thread,
set_trace_subscriber_for_current_thread, telemetry_init,
},
};
/// Creates a [`Worker`] from `worker_config`, using `connection` as its client.
///
/// The connection's retry options are overwritten with [`RetryOptions::default`], and its
/// identity is replaced by `worker_config.client_identity_override` when that is set. The
/// connection is then wrapped in a [`SharedReplaceableClient`] and handed to the worker together
/// with a freshly generated random worker instance key.
///
/// # Errors
/// Returns an error if:
/// * the configured namespace is empty,
/// * the client identity is empty after applying any override, or
/// * [`Worker::new`] fails.
pub fn init_worker(
    runtime: &CoreRuntime,
    worker_config: WorkerConfig,
    mut connection: Connection,
) -> Result<Worker, anyhow::Error> {
    if worker_config.namespace.is_empty() {
        bail!("Worker namespace cannot be empty");
    }

    *connection.retry_options_mut() = RetryOptions::default();
    init_worker_client(
        &mut connection,
        worker_config.client_identity_override.clone(),
    );
    let client = SharedReplaceableClient::new(connection);
    let client_ident = client.inner_cow().identity().to_owned();
    if client_ident.is_empty() {
        bail!("Client identity cannot be empty. Either lang or user should be setting this value");
    }

    // `None` when workflow caching is disabled (`max_cached_workflows == 0`).
    let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);
    let client_bag = Arc::new(WorkerClientBag::new(
        client,
        worker_config.namespace.clone(),
        worker_config.versioning_strategy.clone(),
        uuid::Uuid::new_v4(),
    ));

    // `worker_config` and `client_bag` are not used past this point, so they are moved rather
    // than cloned.
    Worker::new(
        worker_config,
        sticky_q,
        client_bag,
        Some(&runtime.telemetry),
        runtime.heartbeat_interval,
    )
}
/// Creates a replay [`Worker`] from the provided [`ReplayWorkerInput`].
///
/// Logs the task queue of the replay worker's configuration at `info` level, then converts the
/// input into a worker.
///
/// # Errors
/// Propagates any error returned while converting the input into a worker.
pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow::Error>
where
    I: Stream<Item = HistoryForReplay> + Send + 'static,
{
    let task_queue = rwi.config.task_queue.as_str();
    info!(task_queue, "Registering replay worker");
    rwi.into_core_worker()
}
/// Applies an optional identity override to `connection`.
///
/// When `client_identity_override` is `None` the connection is left untouched.
pub(crate) fn init_worker_client(
    connection: &mut Connection,
    client_identity_override: Option<String>,
) {
    let Some(identity) = client_identity_override.as_ref() else {
        return;
    };
    connection.identity_mut().clone_from(identity);
}
/// Generates the sticky queue name for a worker, if workflow caching is enabled.
///
/// Returns `Some("<process_identity>-<uuid>")`, where `<uuid>` is a fresh random v4 UUID in its
/// simple form, when `max_cached_workflows` is greater than zero, and `None` otherwise.
pub(crate) fn sticky_q_name_for_worker(
    process_identity: &str,
    max_cached_workflows: usize,
) -> Option<String> {
    let caching_enabled = max_cached_workflows > 0;
    // The closure only runs (and a UUID is only generated) when caching is enabled.
    caching_enabled.then(|| {
        let unique_suffix = uuid::Uuid::new_v4().simple();
        format!("{}-{}", process_identity, unique_suffix)
    })
}
/// Holds the telemetry instance, Tokio runtime handle, and heartbeat setting that are shared by
/// workers created through [`init_worker`].
pub struct CoreRuntime {
/// Telemetry instance; passed to every worker created with [`init_worker`].
telemetry: TelemetryInstance,
/// The Tokio runtime itself when it was built by [`CoreRuntime::new`]; `None` when the
/// instance was created on top of an already-running runtime.
runtime: Option<tokio::runtime::Runtime>,
/// Handle to the Tokio runtime that was current when this instance was constructed.
runtime_handle: tokio::runtime::Handle,
/// Heartbeat interval forwarded to workers created with [`init_worker`].
heartbeat_interval: Option<Duration>,
}
/// Options used to construct a [`CoreRuntime`].
///
/// Construct via the generated builder and its validating `build` method.
///
/// NOTE(review): the derived `Default` leaves `heartbeat_interval` as `None`, whereas the builder
/// defaults it to `Some(60s)`. The two construction paths therefore disagree; confirm this is
/// intended.
#[derive(Default, bon::Builder)]
#[builder(finish_fn(vis = "", name = build_internal))]
#[non_exhaustive]
pub struct RuntimeOptions {
/// Telemetry configuration, passed to `telemetry_init` when the runtime is created.
#[builder(default)]
telemetry_options: TelemetryOptions,
/// Heartbeat interval handed to workers created with this runtime. Builder default is 60
/// seconds. When `Some`, `build` requires it to be between 1s and 60s inclusive.
/// `None` presumably disables worker heartbeating (TODO confirm against `Worker::new`).
#[builder(required, default = Some(Duration::from_secs(60)))]
heartbeat_interval: Option<Duration>,
}
impl<S: runtime_options_builder::State> RuntimeOptionsBuilder<S> {
    /// Finishes the builder and validates the resulting [`RuntimeOptions`].
    ///
    /// # Errors
    /// Returns a descriptive message when `heartbeat_interval` is `Some` and lies outside the
    /// inclusive range of 1 to 60 seconds.
    pub fn build(self) -> Result<RuntimeOptions, String> {
        let allowed = Duration::from_secs(1)..=Duration::from_secs(60);
        let options = self.build_internal();
        match options.heartbeat_interval {
            Some(interval) if !allowed.contains(&interval) => Err(format!(
                "heartbeat_interval ({interval:?}) must be between 1s and 60s",
            )),
            _ => Ok(options),
        }
    }
}
/// Bundles a Tokio runtime builder with an optional per-thread start callback, for use with
/// [`CoreRuntime::new`].
pub struct TokioRuntimeBuilder<F> {
/// The underlying Tokio builder. [`CoreRuntime::new`] calls `enable_all` on it and installs
/// its own `on_thread_start` hook before building the runtime.
pub inner: tokio::runtime::Builder,
/// Optional callback invoked from the `on_thread_start` hook that [`CoreRuntime::new`]
/// installs, after the trace subscriber (if any) has been set for the new thread.
pub lang_on_thread_start: Option<F>,
}
impl Default for TokioRuntimeBuilder<Box<dyn Fn() + Send + Sync>> {
    /// Returns a builder for a multi-threaded Tokio runtime with no thread-start callback.
    fn default() -> Self {
        let inner = tokio::runtime::Builder::new_multi_thread();
        Self {
            inner,
            lang_on_thread_start: None,
        }
    }
}
impl CoreRuntime {
pub fn new<F>(
runtime_options: RuntimeOptions,
mut tokio_builder: TokioRuntimeBuilder<F>,
) -> Result<Self, anyhow::Error>
where
F: Fn() + Send + Sync + 'static,
{
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
let subscriber = telemetry.trace_subscriber();
let runtime = tokio_builder
.inner
.enable_all()
.on_thread_start(move || {
if let Some(sub) = subscriber.as_ref() {
set_trace_subscriber_for_current_thread(sub.clone());
}
if let Some(lang_on_thread_start) = tokio_builder.lang_on_thread_start.as_ref() {
lang_on_thread_start();
}
})
.build()?;
let _rg = runtime.enter();
let mut me =
Self::new_assume_tokio_initialized_telem(telemetry, runtime_options.heartbeat_interval);
me.runtime = Some(runtime);
Ok(me)
}
pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
Ok(Self::new_assume_tokio_initialized_telem(
telemetry,
runtime_options.heartbeat_interval,
))
}
pub fn new_assume_tokio_initialized_telem(
telemetry: TelemetryInstance,
heartbeat_interval: Option<Duration>,
) -> Self {
let runtime_handle = tokio::runtime::Handle::current();
if let Some(sub) = telemetry.trace_subscriber() {
set_trace_subscriber_for_current_thread(sub);
}
Self {
telemetry,
runtime: None,
runtime_handle,
heartbeat_interval,
}
}
pub fn tokio_handle(&self) -> tokio::runtime::Handle {
self.runtime_handle.clone()
}
pub fn telemetry(&self) -> &TelemetryInstance {
&self.telemetry
}
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
&mut self.telemetry
}
}
// Dropping a `CoreRuntime` removes the trace subscriber for the thread performing the drop.
impl Drop for CoreRuntime {
fn drop(&mut self) {
// Counterpart to the `set_trace_subscriber_for_current_thread` call made during construction.
// NOTE(review): this only affects the dropping thread, which may differ from the thread the
// runtime was constructed on - confirm that is acceptable.
remove_trace_subscriber_for_current_thread();
}
}