// temporal_sdk_core_api/telemetry.rs

pub mod metrics;

use crate::telemetry::metrics::CoreMeter;
use std::{
    collections::HashMap,
    fmt::{Debug, Formatter},
    net::SocketAddr,
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing_core::{Level, Subscriber};
use url::Url;
13
/// Default prefix applied to the names of all core-created metrics.
pub static METRIC_PREFIX: &str = "temporal_";
15
16/// Each core runtime instance has a telemetry subsystem associated with it, this trait defines the
17/// operations that lang might want to perform on that telemetry after it's initialized.
18pub trait CoreTelemetry {
19    /// Each worker buffers logs that should be shuttled over to lang so that they may be rendered
20    /// with the user's desired logging library. Use this function to grab the most recent buffered
21    /// logs since the last time it was called. A fixed number of such logs are retained at maximum,
22    /// with the oldest being dropped when full.
23    ///
24    /// Returns the list of logs from oldest to newest. Returns an empty vec if the feature is not
25    /// configured.
26    fn fetch_buffered_logs(&self) -> Vec<CoreLog>;
27}
28
29/// Telemetry configuration options. Construct with [TelemetryOptionsBuilder]
30#[derive(Clone, derive_builder::Builder)]
31#[non_exhaustive]
32pub struct TelemetryOptions {
33    /// Optional logger - set as None to disable.
34    #[builder(setter(into, strip_option), default)]
35    pub logging: Option<Logger>,
36    /// Optional metrics exporter - set as None to disable.
37    #[builder(setter(into, strip_option), default)]
38    pub metrics: Option<Arc<dyn CoreMeter>>,
39    /// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this
40    /// off if your collection system supports the `target_info` metric from the OpenMetrics spec.
41    /// For more, see
42    /// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
43    #[builder(default = "true")]
44    pub attach_service_name: bool,
45    /// A prefix to be applied to all core-created metrics. Defaults to "temporal_".
46    #[builder(default = "METRIC_PREFIX.to_string()")]
47    pub metric_prefix: String,
48    /// If provided, logging config will be ignored and this explicit subscriber will be used for
49    /// all logging and traces.
50    #[builder(setter(strip_option), default)]
51    pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
52}
53impl Debug for TelemetryOptions {
54    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55        #[derive(Debug)]
56        #[allow(dead_code)]
57        struct TelemetryOptions<'a> {
58            logging: &'a Option<Logger>,
59            metrics: &'a Option<Arc<dyn CoreMeter>>,
60            attach_service_name: &'a bool,
61            metric_prefix: &'a str,
62        }
63        let Self {
64            logging,
65            metrics,
66            attach_service_name,
67            metric_prefix,
68            ..
69        } = self;
70
71        Debug::fmt(
72            &TelemetryOptions {
73                logging,
74                metrics,
75                attach_service_name,
76                metric_prefix,
77            },
78            f,
79        )
80    }
81}
82
83/// Options for exporting to an OpenTelemetry Collector
84#[derive(Debug, Clone, derive_builder::Builder)]
85pub struct OtelCollectorOptions {
86    /// The url of the OTel collector to export telemetry and metrics to. Lang SDK should also
87    /// export to this same collector.
88    pub url: Url,
89    /// Optional set of HTTP headers to send to the Collector, e.g for authentication.
90    #[builder(default = "HashMap::new()")]
91    pub headers: HashMap<String, String>,
92    /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
93    #[builder(default = "Duration::from_secs(1)")]
94    pub metric_periodicity: Duration,
95    /// Specifies the aggregation temporality for metric export. Defaults to cumulative.
96    #[builder(default = "MetricTemporality::Cumulative")]
97    pub metric_temporality: MetricTemporality,
98    /// A map of tags to be applied to all metrics
99    #[builder(default)]
100    pub global_tags: HashMap<String, String>,
101    /// If set to true, use f64 seconds for durations instead of u64 milliseconds
102    #[builder(default)]
103    pub use_seconds_for_durations: bool,
104    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
105    #[builder(default)]
106    pub histogram_bucket_overrides: HistogramBucketOverrides,
107    /// Protocol to use for communication with the collector
108    #[builder(default = "OtlpProtocol::Grpc")]
109    pub protocol: OtlpProtocol,
110}
111
112/// Options for exporting metrics to Prometheus
113#[derive(Debug, Clone, derive_builder::Builder)]
114pub struct PrometheusExporterOptions {
115    pub socket_addr: SocketAddr,
116    // A map of tags to be applied to all metrics
117    #[builder(default)]
118    pub global_tags: HashMap<String, String>,
119    /// If set true, all counters will include a "_total" suffix
120    #[builder(default)]
121    pub counters_total_suffix: bool,
122    /// If set true, all histograms will include the unit in their name as a suffix.
123    /// Ex: "_milliseconds".
124    #[builder(default)]
125    pub unit_suffix: bool,
126    /// If set to true, use f64 seconds for durations instead of u64 milliseconds
127    #[builder(default)]
128    pub use_seconds_for_durations: bool,
129    /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
130    #[builder(default)]
131    pub histogram_bucket_overrides: HistogramBucketOverrides,
132}
133
/// Allows overriding the buckets used by histogram metrics. The default value contains no
/// overrides.
#[derive(Debug, Clone, Default)]
pub struct HistogramBucketOverrides {
    /// Overrides where the key is the metric name and the value is the list of bucket boundaries.
    /// The metric name will apply regardless of name prefixing, if any. I.e. the name acts like
    /// `*metric_name`.
    ///
    /// The string names of core's built-in histogram metrics are publicly available on the
    /// `core::telemetry` module and the `client` crate.
    ///
    /// See [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries)
    /// for the exact meaning of boundaries.
    pub overrides: HashMap<String, Vec<f64>>,
}
148
149/// Control where logs go
150#[derive(Debug, Clone)]
151pub enum Logger {
152    /// Log directly to console.
153    Console {
154        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
155        filter: String,
156    },
157    /// Forward logs to Lang - collectable with `fetch_global_buffered_logs`.
158    Forward {
159        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
160        filter: String,
161    },
162    /// Push logs to Lang. Can be used with
163    /// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
164    Push {
165        /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
166        filter: String,
167        /// Trait invoked on each log.
168        consumer: Arc<dyn CoreLogConsumer>,
169    },
170}
171
/// Types of aggregation temporality for metric export.
/// See: <https://github.com/open-telemetry/opentelemetry-specification/blob/ce50e4634efcba8da445cc23523243cb893905cb/specification/metrics/datamodel.md#temporality>
#[derive(Debug, Clone, Copy)]
pub enum MetricTemporality {
    /// Successive data points repeat the starting timestamp
    Cumulative,
    /// Successive data points advance the starting timestamp
    Delta,
}
181
/// Protocol used to communicate with the OTel collector
#[derive(Debug, Clone, Copy)]
pub enum OtlpProtocol {
    /// Use gRPC to communicate with the collector
    Grpc,
    /// Use HTTP to communicate with the collector
    Http,
}
190
191impl Default for TelemetryOptions {
192    fn default() -> Self {
193        TelemetryOptionsBuilder::default().build().unwrap()
194    }
195}
196
197/// A log line (which ultimately came from a tracing event) exported from Core->Lang
198#[derive(Debug)]
199pub struct CoreLog {
200    /// The module within core this message originated from
201    pub target: String,
202    /// Log message
203    pub message: String,
204    /// Time log was generated (not when it was exported to lang)
205    pub timestamp: SystemTime,
206    /// Message level
207    pub level: Level,
208    /// Arbitrary k/v pairs (span k/vs are collapsed with event k/vs here). We could change this
209    /// to include them in `span_contexts` instead, but there's probably not much value for log
210    /// forwarding.
211    pub fields: HashMap<String, serde_json::Value>,
212    /// A list of the outermost to the innermost span names
213    pub span_contexts: Vec<String>,
214}
215
216impl CoreLog {
217    /// Return timestamp as ms since epoch
218    pub fn millis_since_epoch(&self) -> u128 {
219        self.timestamp
220            .duration_since(UNIX_EPOCH)
221            .unwrap_or(Duration::ZERO)
222            .as_millis()
223    }
224}
225
226/// Consumer trait for use with push logger.
227pub trait CoreLogConsumer: Send + Sync + Debug {
228    /// Invoked synchronously for every single log.
229    fn on_log(&self, log: CoreLog);
230}