temporal_sdk_core_api/telemetry.rs
1pub mod metrics;
2
3use crate::telemetry::metrics::CoreMeter;
4use std::{
5 collections::HashMap,
6 fmt::{Debug, Formatter},
7 net::SocketAddr,
8 sync::Arc,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11use tracing_core::{Level, Subscriber};
12use url::Url;
13
/// Default prefix for the names of core-created metrics.
///
/// This is the value [`TelemetryOptions::metric_prefix`] takes when no other prefix is
/// configured.
pub static METRIC_PREFIX: &str = "temporal_";
15
16/// Each core runtime instance has a telemetry subsystem associated with it, this trait defines the
17/// operations that lang might want to perform on that telemetry after it's initialized.
18pub trait CoreTelemetry {
19 /// Each worker buffers logs that should be shuttled over to lang so that they may be rendered
20 /// with the user's desired logging library. Use this function to grab the most recent buffered
21 /// logs since the last time it was called. A fixed number of such logs are retained at maximum,
22 /// with the oldest being dropped when full.
23 ///
24 /// Returns the list of logs from oldest to newest. Returns an empty vec if the feature is not
25 /// configured.
26 fn fetch_buffered_logs(&self) -> Vec<CoreLog>;
27}
28
29/// Telemetry configuration options. Construct with [TelemetryOptionsBuilder]
30#[derive(Clone, derive_builder::Builder)]
31#[non_exhaustive]
32pub struct TelemetryOptions {
33 /// Optional logger - set as None to disable.
34 #[builder(setter(into, strip_option), default)]
35 pub logging: Option<Logger>,
36 /// Optional metrics exporter - set as None to disable.
37 #[builder(setter(into, strip_option), default)]
38 pub metrics: Option<Arc<dyn CoreMeter>>,
39 /// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this
40 /// off if your collection system supports the `target_info` metric from the OpenMetrics spec.
41 /// For more, see
42 /// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
43 #[builder(default = "true")]
44 pub attach_service_name: bool,
45 /// A prefix to be applied to all core-created metrics. Defaults to "temporal_".
46 #[builder(default = "METRIC_PREFIX.to_string()")]
47 pub metric_prefix: String,
48 /// If provided, logging config will be ignored and this explicit subscriber will be used for
49 /// all logging and traces.
50 #[builder(setter(strip_option), default)]
51 pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
52}
53impl Debug for TelemetryOptions {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 #[derive(Debug)]
56 #[allow(dead_code)]
57 struct TelemetryOptions<'a> {
58 logging: &'a Option<Logger>,
59 metrics: &'a Option<Arc<dyn CoreMeter>>,
60 attach_service_name: &'a bool,
61 metric_prefix: &'a str,
62 }
63 let Self {
64 logging,
65 metrics,
66 attach_service_name,
67 metric_prefix,
68 ..
69 } = self;
70
71 Debug::fmt(
72 &TelemetryOptions {
73 logging,
74 metrics,
75 attach_service_name,
76 metric_prefix,
77 },
78 f,
79 )
80 }
81}
82
83/// Options for exporting to an OpenTelemetry Collector
84#[derive(Debug, Clone, derive_builder::Builder)]
85pub struct OtelCollectorOptions {
86 /// The url of the OTel collector to export telemetry and metrics to. Lang SDK should also
87 /// export to this same collector.
88 pub url: Url,
89 /// Optional set of HTTP headers to send to the Collector, e.g for authentication.
90 #[builder(default = "HashMap::new()")]
91 pub headers: HashMap<String, String>,
92 /// Optionally specify how frequently metrics should be exported. Defaults to 1 second.
93 #[builder(default = "Duration::from_secs(1)")]
94 pub metric_periodicity: Duration,
95 /// Specifies the aggregation temporality for metric export. Defaults to cumulative.
96 #[builder(default = "MetricTemporality::Cumulative")]
97 pub metric_temporality: MetricTemporality,
98 /// A map of tags to be applied to all metrics
99 #[builder(default)]
100 pub global_tags: HashMap<String, String>,
101 /// If set to true, use f64 seconds for durations instead of u64 milliseconds
102 #[builder(default)]
103 pub use_seconds_for_durations: bool,
104 /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
105 #[builder(default)]
106 pub histogram_bucket_overrides: HistogramBucketOverrides,
107 /// Protocol to use for communication with the collector
108 #[builder(default = "OtlpProtocol::Grpc")]
109 pub protocol: OtlpProtocol,
110}
111
112/// Options for exporting metrics to Prometheus
113#[derive(Debug, Clone, derive_builder::Builder)]
114pub struct PrometheusExporterOptions {
115 pub socket_addr: SocketAddr,
116 // A map of tags to be applied to all metrics
117 #[builder(default)]
118 pub global_tags: HashMap<String, String>,
119 /// If set true, all counters will include a "_total" suffix
120 #[builder(default)]
121 pub counters_total_suffix: bool,
122 /// If set true, all histograms will include the unit in their name as a suffix.
123 /// Ex: "_milliseconds".
124 #[builder(default)]
125 pub unit_suffix: bool,
126 /// If set to true, use f64 seconds for durations instead of u64 milliseconds
127 #[builder(default)]
128 pub use_seconds_for_durations: bool,
129 /// Overrides for histogram buckets. Units depend on the value of `use_seconds_for_durations`.
130 #[builder(default)]
131 pub histogram_bucket_overrides: HistogramBucketOverrides,
132}
133
/// Per-metric replacements for the buckets used by histogram metrics.
#[derive(Clone, Debug, Default)]
pub struct HistogramBucketOverrides {
    /// Maps a metric name to the list of bucket boundaries to use for it. The name applies
    /// regardless of any name prefixing, i.e. it acts like `*metric_name`.
    ///
    /// The string names of core's built-in histogram metrics are publicly available on the
    /// `core::telemetry` module and the `client` crate.
    ///
    /// For the exact meaning of boundaries, see
    /// [here](https://docs.rs/opentelemetry_sdk/latest/opentelemetry_sdk/metrics/enum.Aggregation.html#variant.ExplicitBucketHistogram.field.boundaries).
    pub overrides: HashMap<String, Vec<f64>>,
}
148
149/// Control where logs go
150#[derive(Debug, Clone)]
151pub enum Logger {
152 /// Log directly to console.
153 Console {
154 /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
155 filter: String,
156 },
157 /// Forward logs to Lang - collectable with `fetch_global_buffered_logs`.
158 Forward {
159 /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
160 filter: String,
161 },
162 /// Push logs to Lang. Can be used with
163 /// temporal_sdk_core::telemetry::log_export::CoreLogBufferedConsumer to buffer.
164 Push {
165 /// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
166 filter: String,
167 /// Trait invoked on each log.
168 consumer: Arc<dyn CoreLogConsumer>,
169 },
170}
171
/// Types of aggregation temporality for metric export.
/// See: <https://github.com/open-telemetry/opentelemetry-specification/blob/ce50e4634efcba8da445cc23523243cb893905cb/specification/metrics/datamodel.md#temporality>
///
/// Variants can be compared with `==` / `!=`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricTemporality {
    /// Successive data points repeat the starting timestamp
    Cumulative,
    /// Successive data points advance the starting timestamp
    Delta,
}
181
/// Protocol used to communicate with an OpenTelemetry Collector.
///
/// Selected through [`OtelCollectorOptions::protocol`]. Variants can be compared with
/// `==` / `!=`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OtlpProtocol {
    /// Use gRPC to communicate with the collector
    Grpc,
    /// Use HTTP to communicate with the collector
    Http,
}
190
191impl Default for TelemetryOptions {
192 fn default() -> Self {
193 TelemetryOptionsBuilder::default().build().unwrap()
194 }
195}
196
197/// A log line (which ultimately came from a tracing event) exported from Core->Lang
198#[derive(Debug)]
199pub struct CoreLog {
200 /// The module within core this message originated from
201 pub target: String,
202 /// Log message
203 pub message: String,
204 /// Time log was generated (not when it was exported to lang)
205 pub timestamp: SystemTime,
206 /// Message level
207 pub level: Level,
208 /// Arbitrary k/v pairs (span k/vs are collapsed with event k/vs here). We could change this
209 /// to include them in `span_contexts` instead, but there's probably not much value for log
210 /// forwarding.
211 pub fields: HashMap<String, serde_json::Value>,
212 /// A list of the outermost to the innermost span names
213 pub span_contexts: Vec<String>,
214}
215
216impl CoreLog {
217 /// Return timestamp as ms since epoch
218 pub fn millis_since_epoch(&self) -> u128 {
219 self.timestamp
220 .duration_since(UNIX_EPOCH)
221 .unwrap_or(Duration::ZERO)
222 .as_millis()
223 }
224}
225
226/// Consumer trait for use with push logger.
227pub trait CoreLogConsumer: Send + Sync + Debug {
228 /// Invoked synchronously for every single log.
229 fn on_log(&self, log: CoreLog);
230}