pub mod metrics;
#[cfg(feature = "core-based-sdk")]
mod log_export;
#[cfg(feature = "otel")]
mod otel;
#[cfg(feature = "prometheus")]
mod prometheus_meter;
#[cfg(feature = "prometheus")]
mod prometheus_server;
use crate::telemetry::metrics::{
CoreMeter, MetricKeyValue, NewAttributes, PrefixedMetricsMeter, TemporalMeter,
};
use std::{
cell::RefCell,
collections::HashMap,
env,
fmt::{Debug, Formatter},
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{Level, Subscriber};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt};
use url::Url;
#[cfg(feature = "core-based-sdk")]
use crate::telemetry::log_export::CoreLogConsumerLayer;
#[cfg(feature = "core-based-sdk")]
pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};
#[cfg(feature = "otel")]
pub use otel::build_otlp_metric_exporter;
#[cfg(feature = "prometheus")]
pub use prometheus_server::start_prometheus_metric_exporter;
pub static METRIC_PREFIX: &str = "temporal_";
const TELEM_SERVICE_NAME: &str = "temporal-core-sdk";
pub trait CoreTelemetry {
fn fetch_buffered_logs(&self) -> Vec<CoreLog>;
}
#[derive(Clone, bon::Builder)]
#[non_exhaustive]
pub struct TelemetryOptions {
#[builder(into)]
pub logging: Option<Logger>,
#[builder(into)]
pub metrics: Option<Arc<dyn CoreMeter>>,
#[builder(default = true)]
pub attach_service_name: bool,
#[builder(default = METRIC_PREFIX.to_string())]
pub metric_prefix: String,
pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
#[builder(default = TaskQueueLabelStrategy::UseNormal)]
pub task_queue_label_strategy: TaskQueueLabelStrategy,
}
impl Debug for TelemetryOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
#[derive(Debug)]
#[allow(dead_code)]
struct TelemetryOptions<'a> {
logging: &'a Option<Logger>,
metrics: &'a Option<Arc<dyn CoreMeter>>,
attach_service_name: &'a bool,
metric_prefix: &'a str,
}
let Self {
logging,
metrics,
attach_service_name,
metric_prefix,
..
} = self;
Debug::fmt(
&TelemetryOptions {
logging,
metrics,
attach_service_name,
metric_prefix,
},
f,
)
}
}
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum TaskQueueLabelStrategy {
UseNormal,
UseNormalAndSticky,
}
#[derive(Debug, Clone, bon::Builder)]
pub struct OtelCollectorOptions {
pub url: Url,
#[builder(default = HashMap::new())]
pub headers: HashMap<String, String>,
#[builder(default = Duration::from_secs(1))]
pub metric_periodicity: Duration,
#[builder(default = MetricTemporality::Cumulative)]
pub metric_temporality: MetricTemporality,
#[builder(default)]
pub global_tags: HashMap<String, String>,
#[builder(default)]
pub use_seconds_for_durations: bool,
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
#[builder(default = OtlpProtocol::Grpc)]
pub protocol: OtlpProtocol,
}
#[derive(Debug, Clone, bon::Builder)]
pub struct PrometheusExporterOptions {
pub socket_addr: SocketAddr,
#[builder(default)]
pub global_tags: HashMap<String, String>,
#[builder(default)]
pub counters_total_suffix: bool,
#[builder(default)]
pub unit_suffix: bool,
#[builder(default)]
pub use_seconds_for_durations: bool,
#[builder(default)]
pub histogram_bucket_overrides: HistogramBucketOverrides,
}
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
pub overrides: HashMap<String, Vec<f64>>,
}
#[derive(Debug, Clone)]
pub enum Logger {
Console {
filter: String,
},
#[cfg(feature = "core-based-sdk")]
Forward {
filter: String,
},
#[cfg(feature = "core-based-sdk")]
Push {
filter: String,
consumer: Arc<dyn CoreLogConsumer>,
},
}
#[derive(Debug, Clone, Copy)]
pub enum MetricTemporality {
Cumulative,
Delta,
}
#[derive(Debug, Clone, Copy)]
pub enum OtlpProtocol {
Grpc,
Http,
}
impl Default for TelemetryOptions {
fn default() -> Self {
TelemetryOptions::builder().build()
}
}
#[derive(Debug)]
pub struct CoreLog {
pub target: String,
pub message: String,
pub timestamp: SystemTime,
pub level: Level,
pub fields: HashMap<String, serde_json::Value>,
pub span_contexts: Vec<String>,
}
impl CoreLog {
pub fn millis_since_epoch(&self) -> u128 {
self.timestamp
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis()
}
}
pub trait CoreLogConsumer: Send + Sync + Debug {
fn on_log(&self, log: CoreLog);
}
#[cfg(feature = "core-based-sdk")]
const FORWARD_LOG_BUFFER_SIZE: usize = 2048;
pub fn construct_filter_string(core_level: Level, other_level: Level) -> String {
format!(
"{other_level},temporalio_common={core_level},temporalio_sdk_core={core_level},temporalio_client={core_level},temporalio_sdk={core_level}"
)
}
pub struct TelemetryInstance {
metric_prefix: String,
#[cfg(feature = "core-based-sdk")]
logs_out: Option<parking_lot::Mutex<CoreLogBuffer>>,
metrics: Option<Arc<dyn CoreMeter + 'static>>,
trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
attach_service_name: bool,
task_queue_label_strategy: TaskQueueLabelStrategy,
}
impl TelemetryInstance {
pub fn trace_subscriber(&self) -> Option<Arc<dyn Subscriber + Send + Sync>> {
self.trace_subscriber.clone()
}
pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
self.metrics = Some(meter);
}
pub fn get_temporal_metric_meter(&self) -> Option<TemporalMeter> {
self.metrics.clone().map(|m| {
let kvs = self.default_kvs();
let attribs = NewAttributes::new(kvs);
TemporalMeter::new(
Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m))
as Arc<dyn CoreMeter>,
attribs,
self.task_queue_label_strategy,
)
})
}
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
self.metrics.clone().map(|m| {
let kvs = self.default_kvs();
let attribs = NewAttributes::new(kvs);
TemporalMeter::new(m, attribs, self.task_queue_label_strategy)
})
}
fn default_kvs(&self) -> Vec<MetricKeyValue> {
if self.attach_service_name {
vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
} else {
vec![]
}
}
}
thread_local! {
static SUB_GUARD: RefCell<Option<tracing::subscriber::DefaultGuard>> =
const { RefCell::new(None) };
}
pub fn set_trace_subscriber_for_current_thread(sub: impl Subscriber + Send + Sync + 'static) {
SUB_GUARD.with(|sg| {
if sg.borrow().is_none() {
let g = tracing::subscriber::set_default(sub);
*sg.borrow_mut() = Some(g);
}
})
}
pub fn remove_trace_subscriber_for_current_thread() {
SUB_GUARD.take();
}
#[cfg(feature = "core-based-sdk")]
impl CoreTelemetry for TelemetryInstance {
fn fetch_buffered_logs(&self) -> Vec<CoreLog> {
if let Some(logs_out) = self.logs_out.as_ref() {
logs_out.lock().drain()
} else {
vec![]
}
}
}
pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyhow::Error> {
#[cfg(feature = "core-based-sdk")]
let mut logs_out = None;
let mut console_pretty_layer = None;
let mut console_compact_layer = None;
#[cfg(feature = "core-based-sdk")]
let mut forward_layer = None;
let tracing_sub = if let Some(ts) = opts.subscriber_override {
Some(ts)
} else {
opts.logging.map(|logger| {
match logger {
Logger::Console { filter } => {
if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() {
console_pretty_layer = Some(
tracing_subscriber::fmt::layer()
.with_target(false)
.event_format(
tracing_subscriber::fmt::format()
.pretty()
.with_source_location(false),
)
.with_filter(EnvFilter::new(filter)),
)
} else {
console_compact_layer = Some(
tracing_subscriber::fmt::layer()
.with_target(false)
.event_format(
tracing_subscriber::fmt::format()
.compact()
.with_source_location(false),
)
.with_filter(EnvFilter::new(filter)),
)
}
}
#[cfg(feature = "core-based-sdk")]
Logger::Forward { filter } => {
let (export_layer, lo) =
CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE);
logs_out = Some(parking_lot::Mutex::new(lo));
forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
}
#[cfg(feature = "core-based-sdk")]
Logger::Push { filter, consumer } => {
forward_layer = Some(
CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)),
);
}
};
let reg = tracing_subscriber::registry()
.with(console_pretty_layer)
.with(console_compact_layer);
#[cfg(feature = "core-based-sdk")]
let reg = reg.with(forward_layer);
Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
})
};
Ok(TelemetryInstance {
metric_prefix: opts.metric_prefix,
#[cfg(feature = "core-based-sdk")]
logs_out,
metrics: opts.metrics,
trace_subscriber: tracing_sub,
attach_service_name: opts.attach_service_name,
task_queue_label_strategy: opts.task_queue_label_strategy,
})
}
pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error> {
static INITTED: AtomicBool = AtomicBool::new(false);
if INITTED
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
let ti = telemetry_init(opts)?;
if let Some(ts) = ti.trace_subscriber() {
tracing::subscriber::set_global_default(ts)?;
}
}
Ok(())
}
pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> {
telemetry_init_global(
TelemetryOptions::builder()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build(),
)?;
Ok(())
}