temporalio_common/telemetry.rs
1//! Contains tracing/logging and metrics related functionality
2
3/// Metric instrument types and the [`CoreMeter`] trait.
4pub mod metrics;
5
6#[cfg(feature = "core-based-sdk")]
7mod log_export;
8#[cfg(feature = "otel")]
9mod otel;
10#[cfg(feature = "prometheus")]
11mod prometheus_meter;
12#[cfg(feature = "prometheus")]
13mod prometheus_server;
14
15use crate::telemetry::metrics::{
16 CoreMeter, MetricKeyValue, NewAttributes, PrefixedMetricsMeter, TemporalMeter,
17};
18use std::{
19 cell::RefCell,
20 collections::HashMap,
21 env,
22 fmt::{Debug, Formatter},
23 net::SocketAddr,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, Ordering},
27 },
28 time::{Duration, SystemTime, UNIX_EPOCH},
29};
30use tracing::{Level, Subscriber};
31use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt};
32use url::Url;
33
34#[cfg(feature = "core-based-sdk")]
35use crate::telemetry::log_export::CoreLogConsumerLayer;
36
37#[cfg(feature = "core-based-sdk")]
38pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};
39#[cfg(feature = "otel")]
40pub use otel::build_otlp_metric_exporter;
41#[cfg(feature = "prometheus")]
42pub use prometheus_server::start_prometheus_metric_exporter;
43
/// The default prefix applied to all Temporal metric names.
pub static METRIC_PREFIX: &str = "temporal_";

/// Value of the `service_name` label that [`TelemetryInstance`] attaches to metrics when
/// `attach_service_name` is enabled.
const TELEM_SERVICE_NAME: &str = "temporal-core-sdk";
48
/// Each core runtime instance has a telemetry subsystem associated with it. This trait defines
/// the operations that lang might want to perform on that telemetry after it's initialized.
pub trait CoreTelemetry {
    /// Each worker buffers logs that should be shuttled over to lang so that they may be rendered
    /// with the user's desired logging library. Use this function to grab the most recent buffered
    /// logs since the last time it was called. A fixed number of such logs are retained at maximum,
    /// with the oldest being dropped when full.
    ///
    /// Returns the list of logs from oldest to newest. Returns an empty vec if the feature is not
    /// configured.
    fn fetch_buffered_logs(&self) -> Vec<CoreLog>;
}
61
/// Telemetry configuration options. Construct with [TelemetryOptions::builder], or use
/// [Default::default] to get the builder's defaults for every field.
#[derive(Clone, bon::Builder)]
#[non_exhaustive]
pub struct TelemetryOptions {
    /// Optional logger - set as None to disable.
    #[builder(into)]
    pub logging: Option<Logger>,
    /// Optional metrics exporter - set as None to disable.
    #[builder(into)]
    pub metrics: Option<Arc<dyn CoreMeter>>,
    /// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this
    /// off if your collection system supports the `target_info` metric from the OpenMetrics spec.
    /// For more, see
    /// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
    #[builder(default = true)]
    pub attach_service_name: bool,
    /// A prefix to be applied to all core-created metrics. Defaults to "temporal_"
    /// (see [METRIC_PREFIX]).
    #[builder(default = METRIC_PREFIX.to_string())]
    pub metric_prefix: String,
    /// If provided, logging config will be ignored and this explicit subscriber will be used for
    /// all logging and traces.
    pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
    /// How the `task_queue` metric label is populated. Defaults to
    /// [TaskQueueLabelStrategy::UseNormal]. See [TaskQueueLabelStrategy].
    #[builder(default = TaskQueueLabelStrategy::UseNormal)]
    pub task_queue_label_strategy: TaskQueueLabelStrategy,
}
88impl Debug for TelemetryOptions {
89 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90 #[derive(Debug)]
91 #[allow(dead_code)]
92 struct TelemetryOptions<'a> {
93 logging: &'a Option<Logger>,
94 metrics: &'a Option<Arc<dyn CoreMeter>>,
95 attach_service_name: &'a bool,
96 metric_prefix: &'a str,
97 }
98 let Self {
99 logging,
100 metrics,
101 attach_service_name,
102 metric_prefix,
103 ..
104 } = self;
105
106 Debug::fmt(
107 &TelemetryOptions {
108 logging,
109 metrics,
110 attach_service_name,
111 metric_prefix,
112 },
113 f,
114 )
115 }
116}
117
/// Determines how the `task_queue` label value is set on metrics.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a wildcard arm when
/// matching on it.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum TaskQueueLabelStrategy {
    /// Always use the normal task queue name, including for actions relating to sticky queues.
    UseNormal,
    /// Use the sticky queue name when recording metrics operating on sticky queues.
    UseNormalAndSticky,
}
127
/// Options for exporting to an OpenTelemetry Collector. Only `url` is required; every other
/// field has a builder default.
#[derive(Debug, Clone, bon::Builder)]
pub struct OtelCollectorOptions {
    /// The url of the OTel collector to export telemetry and metrics to. Lang SDK should also
    /// export to this same collector.
    pub url: Url,
    /// Optional set of HTTP headers to send to the Collector, e.g. for authentication. Defaults
    /// to an empty map.
    #[builder(default = HashMap::new())]
    pub headers: HashMap<String, String>,
    /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
    #[builder(default = Duration::from_secs(1))]
    pub metric_periodicity: Duration,
    /// Specifies the aggregation temporality for metric export. Defaults to cumulative.
    #[builder(default = MetricTemporality::Cumulative)]
    pub metric_temporality: MetricTemporality,
    /// A map of tags to be applied to all metrics. Defaults to an empty map.
    #[builder(default)]
    pub global_tags: HashMap<String, String>,
    /// If set to true, use f64 seconds for durations instead of u64 milliseconds. Defaults to
    /// false.
    #[builder(default)]
    pub use_seconds_for_durations: bool,
    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
    #[builder(default)]
    pub histogram_bucket_overrides: HistogramBucketOverrides,
    /// Protocol to use for communication with the collector. Defaults to gRPC.
    #[builder(default = OtlpProtocol::Grpc)]
    pub protocol: OtlpProtocol,
}
156
/// Options for exporting metrics to Prometheus. Only `socket_addr` is required; every other
/// field falls back to its type's `Default` value.
#[derive(Debug, Clone, bon::Builder)]
pub struct PrometheusExporterOptions {
    /// The address the Prometheus exporter HTTP server will bind to.
    pub socket_addr: SocketAddr,
    /// A map of tags to be applied to all metrics
    #[builder(default)]
    pub global_tags: HashMap<String, String>,
    /// If set true, all counters will include a "_total" suffix
    #[builder(default)]
    pub counters_total_suffix: bool,
    /// If set true, all histograms will include the unit in their name as a suffix.
    /// Ex: "_milliseconds".
    #[builder(default)]
    pub unit_suffix: bool,
    /// If set to true, use f64 seconds for durations instead of u64 milliseconds
    #[builder(default)]
    pub use_seconds_for_durations: bool,
    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
    #[builder(default)]
    pub histogram_bucket_overrides: HistogramBucketOverrides,
}
179
/// Allows overriding the buckets used by histogram metrics. The `Default` value contains no
/// overrides.
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
    /// Overrides where the key is the metric name and the value is the list of bucket boundaries.
    /// The metric name will apply regardless of name prefixing, if any. I.e. the name acts like
    /// `*metric_name`.
    ///
    /// The string names of core's built-in histogram metrics are publicly available on the
    /// `core::telemetry` module and the `client` crate.
    ///
    /// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
    /// for the exact meaning of boundaries.
    pub overrides: HashMap<String, Vec<f64>>,
}
194
/// Control where logs go. The `Forward` and `Push` variants only exist when the
/// `core-based-sdk` feature is enabled.
#[derive(Debug, Clone)]
pub enum Logger {
    /// Log directly to console.
    Console {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
    },
    #[cfg(feature = "core-based-sdk")]
    /// Forward logs to Lang by buffering them - collectable with
    /// [`CoreTelemetry::fetch_buffered_logs`].
    Forward {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
    },
    #[cfg(feature = "core-based-sdk")]
    /// Push logs to Lang. Can be used with
    /// temporalio_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
    Push {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
        /// Trait invoked on each log.
        consumer: Arc<dyn CoreLogConsumer>,
    },
}
219
/// Types of aggregation temporality for metric export.
/// See: <https://github.com/open-telemetry/opentelemetry-specification/blob/ce50e4634efcba8da445cc23523243cb893905cb/specification/metrics/datamodel.md#temporality>
#[derive(Debug, Clone, Copy)]
pub enum MetricTemporality {
    /// Successive data points repeat the starting timestamp
    Cumulative,
    /// Successive data points advance the starting timestamp
    Delta,
}
229
/// Wire protocol used to communicate with the OTel collector (see
/// [`OtelCollectorOptions::protocol`]).
#[derive(Debug, Clone, Copy)]
pub enum OtlpProtocol {
    /// Use gRPC to communicate with the collector
    Grpc,
    /// Use HTTP to communicate with the collector
    Http,
}
238
239impl Default for TelemetryOptions {
240 fn default() -> Self {
241 TelemetryOptions::builder().build()
242 }
243}
244
/// A log line (which ultimately came from a tracing event) exported from Core->Lang
#[derive(Debug)]
pub struct CoreLog {
    /// The module within core this message originated from
    pub target: String,
    /// Log message
    pub message: String,
    /// Time log was generated (not when it was exported to lang). See
    /// [`CoreLog::millis_since_epoch`] for a numeric form.
    pub timestamp: SystemTime,
    /// Message level
    pub level: Level,
    /// Arbitrary k/v pairs (span k/vs are collapsed with event k/vs here). We could change this
    /// to include them in `span_contexts` instead, but there's probably not much value for log
    /// forwarding.
    pub fields: HashMap<String, serde_json::Value>,
    /// A list of the outermost to the innermost span names
    pub span_contexts: Vec<String>,
}
263
264impl CoreLog {
265 /// Return timestamp as ms since epoch
266 pub fn millis_since_epoch(&self) -> u128 {
267 self.timestamp
268 .duration_since(UNIX_EPOCH)
269 .unwrap_or(Duration::ZERO)
270 .as_millis()
271 }
272}
273
/// Consumer trait for use with push logger (the `consumer` field of `Logger::Push`).
/// Implementors must be `Send + Sync + Debug`.
pub trait CoreLogConsumer: Send + Sync + Debug {
    /// Invoked synchronously for every single log.
    fn on_log(&self, log: CoreLog);
}
279
/// Size passed to `CoreLogConsumerLayer::new_buffered` when the `Logger::Forward` option is
/// used.
#[cfg(feature = "core-based-sdk")]
const FORWARD_LOG_BUFFER_SIZE: usize = 2048;
282
283/// Help you construct an [EnvFilter] compatible filter string which will forward all core module
284/// traces at `core_level` and all others (from 3rd party modules, etc) at `other_level`.
285pub fn construct_filter_string(core_level: Level, other_level: Level) -> String {
286 format!(
287 "{other_level},temporalio_common={core_level},temporalio_sdk_core={core_level},temporalio_client={core_level},temporalio_sdk={core_level}"
288 )
289}
290
/// Holds initialized tracing/metrics exporters, etc. Created by [telemetry_init].
pub struct TelemetryInstance {
    /// Prefix applied to metric names by the meter returned from
    /// [`TelemetryInstance::get_temporal_metric_meter`].
    metric_prefix: String,
    /// Buffer of forwarded logs. Only populated when the `Logger::Forward` option was used.
    #[cfg(feature = "core-based-sdk")]
    logs_out: Option<parking_lot::Mutex<CoreLogBuffer>>,
    /// The underlying metrics meter, if any. May be replaced after construction via
    /// [`TelemetryInstance::attach_late_init_metrics`].
    metrics: Option<Arc<dyn CoreMeter + 'static>>,
    /// The tracing subscriber which is associated with this telemetry instance. May be `None` if
    /// the user has not opted into any tracing configuration.
    trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
    /// Whether a `service_name` label is included in the default metric attributes.
    attach_service_name: bool,
    /// Passed through to every [TemporalMeter] this instance hands out.
    task_queue_label_strategy: TaskQueueLabelStrategy,
}
303
304impl TelemetryInstance {
305 /// Return the trace subscriber associated with the telemetry options/instance. Can be used
306 /// to manually set the default for a thread or globally using the `tracing` crate, or with
307 /// [set_trace_subscriber_for_current_thread].
308 pub fn trace_subscriber(&self) -> Option<Arc<dyn Subscriber + Send + Sync>> {
309 self.trace_subscriber.clone()
310 }
311
312 /// Some metric meters cannot be initialized until after a tokio runtime has started and after
313 /// other telemetry has initted (ex: prometheus). They can be attached here.
314 pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
315 self.metrics = Some(meter);
316 }
317
318 /// Returns our wrapper for metric meters, including the `metric_prefix` from
319 /// [TelemetryOptions]. This should be used to initialize clients or for any other
320 /// temporal-owned metrics. User defined metrics should use [Self::get_metric_meter].
321 pub fn get_temporal_metric_meter(&self) -> Option<TemporalMeter> {
322 self.metrics.clone().map(|m| {
323 let kvs = self.default_kvs();
324 let attribs = NewAttributes::new(kvs);
325 TemporalMeter::new(
326 Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m))
327 as Arc<dyn CoreMeter>,
328 attribs,
329 self.task_queue_label_strategy,
330 )
331 })
332 }
333
334 /// Returns our wrapper for metric meters, including attaching the service name if enabled.
335 pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
336 self.metrics.clone().map(|m| {
337 let kvs = self.default_kvs();
338 let attribs = NewAttributes::new(kvs);
339 TemporalMeter::new(m, attribs, self.task_queue_label_strategy)
340 })
341 }
342
343 fn default_kvs(&self) -> Vec<MetricKeyValue> {
344 if self.attach_service_name {
345 vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
346 } else {
347 vec![]
348 }
349 }
350}
351
// Per-thread slot for the guard returned by `tracing::subscriber::set_default`. It is filled by
// `set_trace_subscriber_for_current_thread` and emptied by
// `remove_trace_subscriber_for_current_thread`.
thread_local! {
    static SUB_GUARD: RefCell<Option<tracing::subscriber::DefaultGuard>> =
        const { RefCell::new(None) };
}
356/// Set the trace subscriber for the current thread. This must be done in every thread which uses
357/// core stuff, otherwise traces/logs will not be collected on that thread. For example, if using
358/// a multithreaded Tokio runtime, you should ensure that said runtime uses
359/// [on_thread_start](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start)
360/// or a similar mechanism to call this for each thread within the runtime.
361pub fn set_trace_subscriber_for_current_thread(sub: impl Subscriber + Send + Sync + 'static) {
362 SUB_GUARD.with(|sg| {
363 if sg.borrow().is_none() {
364 let g = tracing::subscriber::set_default(sub);
365 *sg.borrow_mut() = Some(g);
366 }
367 })
368}
369
370/// Undoes [set_trace_subscriber_for_current_thread]
371pub fn remove_trace_subscriber_for_current_thread() {
372 SUB_GUARD.take();
373}
374
#[cfg(feature = "core-based-sdk")]
impl CoreTelemetry for TelemetryInstance {
    /// Drains the forwarded-log buffer and returns its contents. Returns an empty vec when no
    /// buffer exists, i.e. log forwarding was not configured.
    fn fetch_buffered_logs(&self) -> Vec<CoreLog> {
        match &self.logs_out {
            Some(buffer) => buffer.lock().drain(),
            None => Vec::new(),
        }
    }
}
385
386/// Initialize tracing subscribers/output and logging export, returning a [TelemetryInstance]
387/// which can be used to register default / global tracing subscribers.
388///
389/// You should only call this once per unique [TelemetryOptions]
390///
391/// See [TelemetryOptions] docs for more on configuration.
392pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyhow::Error> {
393 #[cfg(feature = "core-based-sdk")]
394 let mut logs_out = None;
395
396 // Tracing subscriber layers =========
397 let mut console_pretty_layer = None;
398 let mut console_compact_layer = None;
399 #[cfg(feature = "core-based-sdk")]
400 let mut forward_layer = None;
401 // ===================================
402
403 let tracing_sub = if let Some(ts) = opts.subscriber_override {
404 Some(ts)
405 } else {
406 opts.logging.map(|logger| {
407 match logger {
408 Logger::Console { filter } => {
409 // This is silly dupe but can't be avoided without boxing.
410 if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() {
411 console_pretty_layer = Some(
412 tracing_subscriber::fmt::layer()
413 .with_target(false)
414 .event_format(
415 tracing_subscriber::fmt::format()
416 .pretty()
417 .with_source_location(false),
418 )
419 .with_filter(EnvFilter::new(filter)),
420 )
421 } else {
422 console_compact_layer = Some(
423 tracing_subscriber::fmt::layer()
424 .with_target(false)
425 .event_format(
426 tracing_subscriber::fmt::format()
427 .compact()
428 .with_source_location(false),
429 )
430 .with_filter(EnvFilter::new(filter)),
431 )
432 }
433 }
434 #[cfg(feature = "core-based-sdk")]
435 Logger::Forward { filter } => {
436 let (export_layer, lo) =
437 CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE);
438 logs_out = Some(parking_lot::Mutex::new(lo));
439 forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
440 }
441 #[cfg(feature = "core-based-sdk")]
442 Logger::Push { filter, consumer } => {
443 forward_layer = Some(
444 CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)),
445 );
446 }
447 };
448 let reg = tracing_subscriber::registry()
449 .with(console_pretty_layer)
450 .with(console_compact_layer);
451 #[cfg(feature = "core-based-sdk")]
452 let reg = reg.with(forward_layer);
453
454 Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
455 })
456 };
457
458 Ok(TelemetryInstance {
459 metric_prefix: opts.metric_prefix,
460 #[cfg(feature = "core-based-sdk")]
461 logs_out,
462 metrics: opts.metrics,
463 trace_subscriber: tracing_sub,
464 attach_service_name: opts.attach_service_name,
465 task_queue_label_strategy: opts.task_queue_label_strategy,
466 })
467}
468
469/// WARNING: Calling can cause panics because of <https://github.com/tokio-rs/tracing/issues/1656>
470/// Lang must not start using until resolved
471///
472/// Initialize telemetry/tracing globally. Useful for testing. Only takes affect when called
473/// the first time. Subsequent calls are ignored.
474pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error> {
475 static INITTED: AtomicBool = AtomicBool::new(false);
476 if INITTED
477 .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
478 .is_ok()
479 {
480 let ti = telemetry_init(opts)?;
481 if let Some(ts) = ti.trace_subscriber() {
482 tracing::subscriber::set_global_default(ts)?;
483 }
484 }
485 Ok(())
486}
487
488/// WARNING: Calling can cause panics because of <https://github.com/tokio-rs/tracing/issues/1656>
489/// Lang must not start using until resolved
490///
491/// Initialize the fallback global handler. All lang SDKs should call this somewhere, once, at
492/// startup, as it initializes a fallback handler for any dependencies (looking at you, otel) that
493/// don't provide good ways to customize their tracing usage. It sets a WARN-level global filter
494/// that uses the default console logger.
495pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> {
496 telemetry_init_global(
497 TelemetryOptions::builder()
498 .logging(Logger::Console {
499 filter: construct_filter_string(Level::DEBUG, Level::WARN),
500 })
501 .build(),
502 )?;
503 Ok(())
504}