Skip to main content

temporalio_common/
telemetry.rs

1//! Contains tracing/logging and metrics related functionality
2
3/// Metric instrument types and the [`CoreMeter`] trait.
4pub mod metrics;
5
6#[cfg(feature = "core-based-sdk")]
7mod log_export;
8#[cfg(feature = "otel")]
9mod otel;
10#[cfg(feature = "prometheus")]
11mod prometheus_meter;
12#[cfg(feature = "prometheus")]
13mod prometheus_server;
14
15use crate::telemetry::metrics::{
16    CoreMeter, MetricKeyValue, NewAttributes, PrefixedMetricsMeter, TemporalMeter,
17};
18use std::{
19    cell::RefCell,
20    collections::HashMap,
21    env,
22    fmt::{Debug, Formatter},
23    net::SocketAddr,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, Ordering},
27    },
28    time::{Duration, SystemTime, UNIX_EPOCH},
29};
30use tracing::{Level, Subscriber};
31use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt};
32use url::Url;
33
34#[cfg(feature = "core-based-sdk")]
35use crate::telemetry::log_export::CoreLogConsumerLayer;
36
37#[cfg(feature = "core-based-sdk")]
38pub use log_export::{CoreLogBuffer, CoreLogBufferedConsumer, CoreLogStreamConsumer};
39#[cfg(feature = "otel")]
40pub use otel::build_otlp_metric_exporter;
41#[cfg(feature = "prometheus")]
42pub use prometheus_server::start_prometheus_metric_exporter;
43
/// The default prefix applied to all Temporal metric names.
///
/// Used as the default value of [`TelemetryOptions::metric_prefix`].
pub static METRIC_PREFIX: &str = "temporal_";

/// Value of the `service_name` metric label attached when
/// [`TelemetryOptions::attach_service_name`] is enabled.
const TELEM_SERVICE_NAME: &str = "temporal-core-sdk";
48
/// Each core runtime instance has a telemetry subsystem associated with it, this trait defines the
/// operations that lang might want to perform on that telemetry after it's initialized.
pub trait CoreTelemetry {
    /// Each worker buffers logs that should be shuttled over to lang so that they may be rendered
    /// with the user's desired logging library. Use this function to grab the most recent buffered
    /// logs since the last time it was called (each call drains what it returns). A fixed number
    /// of such logs are retained at maximum, with the oldest being dropped when full.
    ///
    /// Returns the list of logs from oldest to newest. Returns an empty vec if log buffering is not
    /// configured.
    fn fetch_buffered_logs(&self) -> Vec<CoreLog>;
}
61
/// Telemetry configuration options. Construct with [TelemetryOptions::builder]; the [Default]
/// impl is equivalent to building with no fields set.
#[derive(Clone, bon::Builder)]
#[non_exhaustive]
pub struct TelemetryOptions {
    /// Optional logger - set as None to disable. Ignored when `subscriber_override` is set.
    #[builder(into)]
    pub logging: Option<Logger>,
    /// Optional metrics exporter - set as None to disable.
    #[builder(into)]
    pub metrics: Option<Arc<dyn CoreMeter>>,
    /// If set true (the default) explicitly attach a `service_name` label (with the value
    /// `temporal-core-sdk`) to all metrics. Turn this off if your collection system supports the
    /// `target_info` metric from the OpenMetrics spec.
    /// For more, see
    /// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
    #[builder(default = true)]
    pub attach_service_name: bool,
    /// A prefix to be applied to all core-created metrics. Defaults to "temporal_"
    /// ([METRIC_PREFIX]). Only meters obtained from
    /// [TelemetryInstance::get_temporal_metric_meter] apply it.
    #[builder(default = METRIC_PREFIX.to_string())]
    pub metric_prefix: String,
    /// If provided, logging config will be ignored and this explicit subscriber will be used for
    /// all logging and traces.
    pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
    /// See [TaskQueueLabelStrategy]. Defaults to [TaskQueueLabelStrategy::UseNormal].
    #[builder(default = TaskQueueLabelStrategy::UseNormal)]
    pub task_queue_label_strategy: TaskQueueLabelStrategy,
}
88impl Debug for TelemetryOptions {
89    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90        #[derive(Debug)]
91        #[allow(dead_code)]
92        struct TelemetryOptions<'a> {
93            logging: &'a Option<Logger>,
94            metrics: &'a Option<Arc<dyn CoreMeter>>,
95            attach_service_name: &'a bool,
96            metric_prefix: &'a str,
97        }
98        let Self {
99            logging,
100            metrics,
101            attach_service_name,
102            metric_prefix,
103            ..
104        } = self;
105
106        Debug::fmt(
107            &TelemetryOptions {
108                logging,
109                metrics,
110                attach_service_name,
111                metric_prefix,
112            },
113            f,
114        )
115    }
116}
117
/// Determines how the `task_queue` label value is set on metrics. Configured through
/// [TelemetryOptions::task_queue_label_strategy] (default [TaskQueueLabelStrategy::UseNormal]) and
/// handed to every [TemporalMeter] a [TelemetryInstance] creates.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum TaskQueueLabelStrategy {
    /// Always use the normal task queue name, including for actions relating to sticky queues.
    UseNormal,
    /// Use the sticky queue name when recording metrics operating on sticky queues.
    UseNormalAndSticky,
}
127
/// Options for exporting to an OpenTelemetry Collector
#[derive(Debug, Clone, bon::Builder)]
pub struct OtelCollectorOptions {
    /// The url of the OTel collector to export telemetry and metrics to. Lang SDK should also
    /// export to this same collector.
    pub url: Url,
    /// Optional set of HTTP headers to send to the Collector, e.g. for authentication. Defaults
    /// to empty.
    #[builder(default = HashMap::new())]
    pub headers: HashMap<String, String>,
    /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
    #[builder(default = Duration::from_secs(1))]
    pub metric_periodicity: Duration,
    /// Specifies the aggregation temporality for metric export. Defaults to cumulative.
    #[builder(default = MetricTemporality::Cumulative)]
    pub metric_temporality: MetricTemporality,
    /// A map of tags to be applied to all metrics. Defaults to empty.
    #[builder(default)]
    pub global_tags: HashMap<String, String>,
    /// If set to true, use f64 seconds for durations instead of u64 milliseconds. Defaults to
    /// false.
    #[builder(default)]
    pub use_seconds_for_durations: bool,
    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
    #[builder(default)]
    pub histogram_bucket_overrides: HistogramBucketOverrides,
    /// Protocol to use for communication with the collector. Defaults to [OtlpProtocol::Grpc].
    #[builder(default = OtlpProtocol::Grpc)]
    pub protocol: OtlpProtocol,
}
156
/// Options for exporting metrics to Prometheus
#[derive(Debug, Clone, bon::Builder)]
pub struct PrometheusExporterOptions {
    /// The address the Prometheus exporter HTTP server will bind to.
    pub socket_addr: SocketAddr,
    /// A map of tags to be applied to all metrics. Defaults to empty.
    #[builder(default)]
    pub global_tags: HashMap<String, String>,
    /// If set true, all counters will include a "_total" suffix. Defaults to false.
    #[builder(default)]
    pub counters_total_suffix: bool,
    /// If set true, all histograms will include the unit in their name as a suffix,
    /// e.g. "_milliseconds". Defaults to false.
    #[builder(default)]
    pub unit_suffix: bool,
    /// If set to true, use f64 seconds for durations instead of u64 milliseconds. Defaults to
    /// false.
    #[builder(default)]
    pub use_seconds_for_durations: bool,
    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
    #[builder(default)]
    pub histogram_bucket_overrides: HistogramBucketOverrides,
}
179
/// Allows overriding the buckets used by histogram metrics
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
    /// Overrides where the key is the metric name and the value is the list of bucket boundaries.
    /// The metric name will apply regardless of name prefixing, if any, i.e. the name acts like
    /// `*metric_name`.
    ///
    /// The string names of core's built-in histogram metrics are publicly available on the
    /// `core::telemetry` module and the `client` crate.
    ///
    /// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
    /// for the exact meaning of boundaries.
    pub overrides: HashMap<String, Vec<f64>>,
}
194
/// Control where logs go
#[derive(Debug, Clone)]
pub enum Logger {
    /// Log directly to console. Setting the `TEMPORAL_CORE_PRETTY_LOGS` environment variable
    /// selects a multi-line "pretty" format instead of the compact one.
    Console {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
    },
    #[cfg(feature = "core-based-sdk")]
    /// Forward logs to Lang. Logs passing `filter` are held in a bounded in-memory buffer owned by
    /// the [TelemetryInstance] and collected with [CoreTelemetry::fetch_buffered_logs].
    Forward {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
    },
    #[cfg(feature = "core-based-sdk")]
    /// Push logs to Lang: `consumer` is invoked for each log passing `filter`. Can be used with
    /// [CoreLogBufferedConsumer] (re-exported from this module) to buffer.
    Push {
        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
        filter: String,
        /// Trait invoked on each log.
        consumer: Arc<dyn CoreLogConsumer>,
    },
}
219
/// Types of aggregation temporality for metric export.
/// See: <https://github.com/open-telemetry/opentelemetry-specification/blob/ce50e4634efcba8da445cc23523243cb893905cb/specification/metrics/datamodel.md#temporality>
#[derive(Debug, Clone, Copy)]
pub enum MetricTemporality {
    /// Successive data points repeat the starting timestamp
    Cumulative,
    /// Successive data points advance the starting timestamp
    Delta,
}
229
/// Transport protocol used to communicate with an OpenTelemetry collector
/// (see [OtelCollectorOptions::protocol]).
#[derive(Debug, Clone, Copy)]
pub enum OtlpProtocol {
    /// Use gRPC to communicate with the collector
    Grpc,
    /// Use HTTP to communicate with the collector
    Http,
}
238
239impl Default for TelemetryOptions {
240    fn default() -> Self {
241        TelemetryOptions::builder().build()
242    }
243}
244
/// A log line (which ultimately came from a tracing event) exported from Core->Lang
#[derive(Debug)]
pub struct CoreLog {
    /// The module within core this message originated from
    pub target: String,
    /// Log message
    pub message: String,
    /// Time log was generated (not when it was exported to lang). See
    /// [CoreLog::millis_since_epoch] for an integer form.
    pub timestamp: SystemTime,
    /// Message level
    pub level: Level,
    /// Arbitrary k/v pairs (span k/vs are collapsed with event k/vs here). We could change this
    /// to include them in `span_contexts` instead, but there's probably not much value for log
    /// forwarding.
    pub fields: HashMap<String, serde_json::Value>,
    /// A list of the outermost to the innermost span names
    pub span_contexts: Vec<String>,
}
263
264impl CoreLog {
265    /// Return timestamp as ms since epoch
266    pub fn millis_since_epoch(&self) -> u128 {
267        self.timestamp
268            .duration_since(UNIX_EPOCH)
269            .unwrap_or(Duration::ZERO)
270            .as_millis()
271    }
272}
273
/// Consumer trait for use with the push logger (`Logger::Push`). Must be `Send + Sync` because it
/// is shared behind an `Arc`.
pub trait CoreLogConsumer: Send + Sync + Debug {
    /// Invoked synchronously for every single log.
    fn on_log(&self, log: CoreLog);
}
279
/// Capacity of the log buffer created by [telemetry_init] for [Logger::Forward]; once full, the
/// oldest logs are dropped (per [CoreTelemetry::fetch_buffered_logs]).
#[cfg(feature = "core-based-sdk")]
const FORWARD_LOG_BUFFER_SIZE: usize = 2048;
282
283/// Help you construct an [EnvFilter] compatible filter string which will forward all core module
284/// traces at `core_level` and all others (from 3rd party modules, etc) at `other_level`.
285pub fn construct_filter_string(core_level: Level, other_level: Level) -> String {
286    format!(
287        "{other_level},temporalio_common={core_level},temporalio_sdk_core={core_level},temporalio_client={core_level},temporalio_sdk={core_level}"
288    )
289}
290
/// Holds initialized tracing/metrics exporters, etc. Created by [telemetry_init].
pub struct TelemetryInstance {
    /// Prefix applied by [TelemetryInstance::get_temporal_metric_meter]; copied from
    /// [TelemetryOptions::metric_prefix].
    metric_prefix: String,
    /// Buffer filled by a [Logger::Forward] logger; `None` for every other configuration.
    #[cfg(feature = "core-based-sdk")]
    logs_out: Option<parking_lot::Mutex<CoreLogBuffer>>,
    /// The metrics meter, if one was configured or later attached via
    /// [TelemetryInstance::attach_late_init_metrics].
    metrics: Option<Arc<dyn CoreMeter + 'static>>,
    /// The tracing subscriber which is associated with this telemetry instance. May be `None` if
    /// the user has not opted into any tracing configuration.
    trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
    /// Whether meters handed out by this instance get a `service_name` label.
    attach_service_name: bool,
    /// Passed to every [TemporalMeter] created by this instance.
    task_queue_label_strategy: TaskQueueLabelStrategy,
}
303
304impl TelemetryInstance {
305    /// Return the trace subscriber associated with the telemetry options/instance. Can be used
306    /// to manually set the default for a thread or globally using the `tracing` crate, or with
307    /// [set_trace_subscriber_for_current_thread].
308    pub fn trace_subscriber(&self) -> Option<Arc<dyn Subscriber + Send + Sync>> {
309        self.trace_subscriber.clone()
310    }
311
312    /// Some metric meters cannot be initialized until after a tokio runtime has started and after
313    /// other telemetry has initted (ex: prometheus). They can be attached here.
314    pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
315        self.metrics = Some(meter);
316    }
317
318    /// Returns our wrapper for metric meters, including the `metric_prefix` from
319    /// [TelemetryOptions]. This should be used to initialize clients or for any other
320    /// temporal-owned metrics. User defined metrics should use [Self::get_metric_meter].
321    pub fn get_temporal_metric_meter(&self) -> Option<TemporalMeter> {
322        self.metrics.clone().map(|m| {
323            let kvs = self.default_kvs();
324            let attribs = NewAttributes::new(kvs);
325            TemporalMeter::new(
326                Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m))
327                    as Arc<dyn CoreMeter>,
328                attribs,
329                self.task_queue_label_strategy,
330            )
331        })
332    }
333
334    /// Returns our wrapper for metric meters, including attaching the service name if enabled.
335    pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
336        self.metrics.clone().map(|m| {
337            let kvs = self.default_kvs();
338            let attribs = NewAttributes::new(kvs);
339            TemporalMeter::new(m, attribs, self.task_queue_label_strategy)
340        })
341    }
342
343    fn default_kvs(&self) -> Vec<MetricKeyValue> {
344        if self.attach_service_name {
345            vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
346        } else {
347            vec![]
348        }
349    }
350}
351
// Per-thread slot for the guard returned by `tracing::subscriber::set_default`. While the guard is
// alive the subscriber stays installed as this thread's default; taking it out of the slot (see
// `remove_trace_subscriber_for_current_thread`) drops it.
thread_local! {
    static SUB_GUARD: RefCell<Option<tracing::subscriber::DefaultGuard>> =
        const { RefCell::new(None) };
}
356/// Set the trace subscriber for the current thread. This must be done in every thread which uses
357/// core stuff, otherwise traces/logs will not be collected on that thread. For example, if using
358/// a multithreaded Tokio runtime, you should ensure that said runtime uses
359/// [on_thread_start](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start)
360/// or a similar mechanism to call this for each thread within the runtime.
361pub fn set_trace_subscriber_for_current_thread(sub: impl Subscriber + Send + Sync + 'static) {
362    SUB_GUARD.with(|sg| {
363        if sg.borrow().is_none() {
364            let g = tracing::subscriber::set_default(sub);
365            *sg.borrow_mut() = Some(g);
366        }
367    })
368}
369
370/// Undoes [set_trace_subscriber_for_current_thread]
371pub fn remove_trace_subscriber_for_current_thread() {
372    SUB_GUARD.take();
373}
374
#[cfg(feature = "core-based-sdk")]
impl CoreTelemetry for TelemetryInstance {
    /// Drains the logs captured by a [Logger::Forward] logger, oldest first. Yields an empty vec
    /// when this instance was configured any other way.
    fn fetch_buffered_logs(&self) -> Vec<CoreLog> {
        self.logs_out
            .as_ref()
            .map(|buffer| buffer.lock().drain())
            .unwrap_or_default()
    }
}
385
386/// Initialize tracing subscribers/output and logging export, returning a [TelemetryInstance]
387/// which can be used to register default / global tracing subscribers.
388///
389/// You should only call this once per unique [TelemetryOptions]
390///
391/// See [TelemetryOptions] docs for more on configuration.
392pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyhow::Error> {
393    #[cfg(feature = "core-based-sdk")]
394    let mut logs_out = None;
395
396    // Tracing subscriber layers =========
397    let mut console_pretty_layer = None;
398    let mut console_compact_layer = None;
399    #[cfg(feature = "core-based-sdk")]
400    let mut forward_layer = None;
401    // ===================================
402
403    let tracing_sub = if let Some(ts) = opts.subscriber_override {
404        Some(ts)
405    } else {
406        opts.logging.map(|logger| {
407            match logger {
408                Logger::Console { filter } => {
409                    // This is silly dupe but can't be avoided without boxing.
410                    if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() {
411                        console_pretty_layer = Some(
412                            tracing_subscriber::fmt::layer()
413                                .with_target(false)
414                                .event_format(
415                                    tracing_subscriber::fmt::format()
416                                        .pretty()
417                                        .with_source_location(false),
418                                )
419                                .with_filter(EnvFilter::new(filter)),
420                        )
421                    } else {
422                        console_compact_layer = Some(
423                            tracing_subscriber::fmt::layer()
424                                .with_target(false)
425                                .event_format(
426                                    tracing_subscriber::fmt::format()
427                                        .compact()
428                                        .with_source_location(false),
429                                )
430                                .with_filter(EnvFilter::new(filter)),
431                        )
432                    }
433                }
434                #[cfg(feature = "core-based-sdk")]
435                Logger::Forward { filter } => {
436                    let (export_layer, lo) =
437                        CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE);
438                    logs_out = Some(parking_lot::Mutex::new(lo));
439                    forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
440                }
441                #[cfg(feature = "core-based-sdk")]
442                Logger::Push { filter, consumer } => {
443                    forward_layer = Some(
444                        CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)),
445                    );
446                }
447            };
448            let reg = tracing_subscriber::registry()
449                .with(console_pretty_layer)
450                .with(console_compact_layer);
451            #[cfg(feature = "core-based-sdk")]
452            let reg = reg.with(forward_layer);
453
454            Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
455        })
456    };
457
458    Ok(TelemetryInstance {
459        metric_prefix: opts.metric_prefix,
460        #[cfg(feature = "core-based-sdk")]
461        logs_out,
462        metrics: opts.metrics,
463        trace_subscriber: tracing_sub,
464        attach_service_name: opts.attach_service_name,
465        task_queue_label_strategy: opts.task_queue_label_strategy,
466    })
467}
468
469/// WARNING: Calling can cause panics because of <https://github.com/tokio-rs/tracing/issues/1656>
470/// Lang must not start using until resolved
471///
472/// Initialize telemetry/tracing globally. Useful for testing. Only takes affect when called
473/// the first time. Subsequent calls are ignored.
474pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error> {
475    static INITTED: AtomicBool = AtomicBool::new(false);
476    if INITTED
477        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
478        .is_ok()
479    {
480        let ti = telemetry_init(opts)?;
481        if let Some(ts) = ti.trace_subscriber() {
482            tracing::subscriber::set_global_default(ts)?;
483        }
484    }
485    Ok(())
486}
487
488/// WARNING: Calling can cause panics because of <https://github.com/tokio-rs/tracing/issues/1656>
489/// Lang must not start using until resolved
490///
491/// Initialize the fallback global handler. All lang SDKs should call this somewhere, once, at
492/// startup, as it initializes a fallback handler for any dependencies (looking at you, otel) that
493/// don't provide good ways to customize their tracing usage. It sets a WARN-level global filter
494/// that uses the default console logger.
495pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> {
496    telemetry_init_global(
497        TelemetryOptions::builder()
498            .logging(Logger::Console {
499                filter: construct_filter_string(Level::DEBUG, Level::WARN),
500            })
501            .build(),
502    )?;
503    Ok(())
504}