Skip to main content

courier/observability/
metrics.rs

1//! Metrics core for Courier.
2//!
//! One-stop wiring for the OpenTelemetry metrics SDK:
4//! - [`init_metrics`] builds an [`ObsHandle`] from the parsed
5//!   `[observability]` config (or returns a no-op handle when no
6//!   exporter is configured).
7//! - [`NodeCtx`] holds a pre-built attribute set and counter/histogram
8//!   handles bound to a single node id. Hot loops in `BasicTransform`
9//!   and `ManagedSink` record metrics without rebuilding attributes.
10//! - [`ObsHandle::shutdown`] force-flushes any installed providers so
11//!   the last batch survives a graceful drain.
12//!
13//! The OTLP push path (PeriodicReader + grpc-tonic exporter) is
14//! activated when `[observability.metrics].otlp_endpoint` is set.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use opentelemetry::KeyValue;
21use opentelemetry::metrics::{Counter, Histogram, InstrumentProvider, Meter, MeterProvider};
22use opentelemetry_otlp::{MetricExporter, WithExportConfig};
23use opentelemetry_sdk::Resource;
24use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
25
26use crate::config::{ObservabilityConfig, redact_secret};
27
/// Broad category of a runtime node. Its label becomes the `node_kind`
/// metric attribute, which lets dashboards group e.g. every sink or every
/// transform without aggregating over individual node names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeKind {
    Source,
    Transform,
    Sink,
    /// Broadcast splitter that `spawn_pipeline` inserts automatically
    /// whenever a pipeline fans out to more than one sink.
    Splitter,
    /// A single mpsc edge joining two adjacent nodes. Only the
    /// channel-depth sampler reports under this kind; transforms and
    /// sinks never use it.
    Edge,
}

impl NodeKind {
    /// Stable lowercase label used as the `node_kind` attribute value.
    fn as_str(self) -> &'static str {
        use NodeKind::*;

        match self {
            Source => "source",
            Transform => "transform",
            Sink => "sink",
            Splitter => "splitter",
            Edge => "edge",
        }
    }
}
55
56/// Shared metrics provider plus pre-built instrument handles.
57///
58/// One `ObsHandle` is constructed per `Courier` and cloned into every
59/// `NodeCtx`. Cloning is cheap — instruments are `Arc` internally.
60#[derive(Clone)]
61pub struct ObsHandle {
62    inner: Arc<ObsHandleInner>,
63}
64
65struct ObsHandleInner {
66    /// `Some` when the SDK provider is owned (real or in-memory test
67    /// reader); `None` for the global noop fallback.
68    provider: Option<SdkMeterProvider>,
69    instruments: Instruments,
70    log_keys: bool,
71}
72
73struct Instruments {
74    processed: Counter<u64>,
75    failed: Counter<u64>,
76    filtered: Counter<u64>,
77    retries: Counter<u64>,
78    dead_lettered: Counter<u64>,
79    stage_duration: Histogram<f64>,
80    end_to_end_latency: Histogram<f64>,
81    channel_capacity_used: Histogram<u64>,
82}
83
84#[derive(Debug)]
85struct NoopInstrumentProvider;
86
87impl InstrumentProvider for NoopInstrumentProvider {}
88
89impl ObsHandle {
90    /// Build an `ObsHandle` with no exporter installed. Counters and
91    /// histograms still work (so callers don't branch on `Option`),
92    /// but observations are dropped. This uses a private no-op meter,
93    /// not `opentelemetry::global`, so embedded hosts with a global
94    /// provider do not receive Courier metrics when Courier metrics
95    /// are disabled.
96    pub fn noop() -> Self {
97        Self::noop_with_log_keys(false)
98    }
99
100    fn noop_with_log_keys(log_keys: bool) -> Self {
101        let meter = Meter::new(Arc::new(NoopInstrumentProvider));
102        Self::from_meter(meter, None, log_keys)
103    }
104
105    /// Whether this handle owns a concrete SDK provider. False means
106    /// observations go to a private no-op meter and callers can skip
107    /// auxiliary sampling tasks.
108    pub(crate) fn is_enabled(&self) -> bool {
109        self.inner.provider.is_some()
110    }
111
112    fn from_meter(meter: Meter, provider: Option<SdkMeterProvider>, log_keys: bool) -> Self {
113        let instruments = Instruments {
114            processed: meter
115                .u64_counter("courier_envelopes_processed_total")
116                .with_description("Envelopes successfully processed by a node.")
117                .build(),
118            failed: meter
119                .u64_counter("courier_envelopes_failed_total")
120                .with_description(
121                    "Envelopes that triggered an error in a node, after retries are exhausted.",
122                )
123                .build(),
124            filtered: meter
125                .u64_counter("courier_envelopes_filtered_total")
126                .with_description("Envelopes intentionally dropped by a transform (MapOne returned None).")
127                .build(),
128            retries: meter
129                .u64_counter("courier_retries_total")
130                .with_description("Retry attempts performed by a sink.")
131                .build(),
132            dead_lettered: meter
133                .u64_counter("courier_dead_lettered_total")
134                .with_description("Envelopes routed to a dead-letter sink after retries were exhausted.")
135                .build(),
136            stage_duration: meter
137                .f64_histogram("courier_stage_duration_milliseconds")
138                .with_description("Wall-clock time a node spent processing one envelope.")
139                .with_unit("ms")
140                .build(),
141            end_to_end_latency: meter
142                .f64_histogram("courier_end_to_end_latency_milliseconds")
143                .with_description(
144                    "Time from envelope creation (meta.timestamp_ms) to sink write completion.",
145                )
146                .with_unit("ms")
147                .build(),
148            channel_capacity_used: meter
149                .u64_histogram("courier_channel_capacity_used")
150                .with_description(
151                    "In-flight items on a pipeline edge, sampled periodically (capacity - sender.capacity()).",
152                )
153                .build(),
154        };
155        Self {
156            inner: Arc::new(ObsHandleInner {
157                provider,
158                instruments,
159                log_keys,
160            }),
161        }
162    }
163
164    /// Force-flush pending observations without tearing down the provider.
165    pub fn force_flush(&self) {
166        if let Some(provider) = &self.inner.provider {
167            let _ = provider.force_flush();
168        }
169    }
170
171    /// Drain observations and tear down the installed provider.
172    pub fn shutdown(&self) {
173        if let Some(provider) = &self.inner.provider {
174            let _ = provider.shutdown();
175        }
176    }
177}
178
179/// Per-node bundle of metric attributes and instrument handles.
180///
181/// Constructed once per node by `spawn_pipeline` (or by tests). The
182/// hot-path code in `BasicTransform` / `ManagedSink` stores the
183/// `NodeCtx` alongside its other state and calls `processed_add(1)`
184/// without hashmap lookups.
185#[derive(Clone)]
186pub struct NodeCtx {
187    handle: ObsHandle,
188    attrs: Arc<[KeyValue]>,
189    pipeline: Arc<str>,
190    node_id: Arc<str>,
191    node_kind: NodeKind,
192    log_keys: bool,
193}
194
195impl NodeCtx {
196    /// Build a `NodeCtx` for a single node. The attribute set is
197    /// `pipeline`, `node_id`, `node_kind` — kept tight on purpose to
198    /// avoid cardinality blow-ups (no `meta.key`, no payload labels).
199    pub fn for_node(pipeline: &str, node_id: &str, node_kind: NodeKind, handle: ObsHandle) -> Self {
200        let attrs: Arc<[KeyValue]> = Arc::from(
201            [
202                KeyValue::new("pipeline", pipeline.to_string()),
203                KeyValue::new("node_id", node_id.to_string()),
204                KeyValue::new("node_kind", node_kind.as_str()),
205            ]
206            .as_slice(),
207        );
208        let log_keys = handle.inner.log_keys;
209        Self {
210            handle,
211            attrs,
212            pipeline: Arc::from(pipeline),
213            node_id: Arc::from(node_id),
214            node_kind,
215            log_keys,
216        }
217    }
218
219    /// No-op context with empty attributes, backed by a private
220    /// no-op meter. Used by tests that build pipelines manually and
221    /// by the default state of `BasicTransform` / `ManagedSink` until
222    /// `spawn_pipeline` attaches a real ctx.
223    pub fn noop() -> Self {
224        Self {
225            handle: ObsHandle::noop(),
226            attrs: Arc::from([] as [KeyValue; 0]),
227            pipeline: Arc::from(""),
228            node_id: Arc::from(""),
229            node_kind: NodeKind::Transform,
230            log_keys: false,
231        }
232    }
233
234    pub fn attrs(&self) -> &[KeyValue] {
235        &self.attrs
236    }
237
238    pub fn handle(&self) -> &ObsHandle {
239        &self.handle
240    }
241
242    pub fn pipeline(&self) -> &str {
243        &self.pipeline
244    }
245
246    pub fn node_id(&self) -> &str {
247        &self.node_id
248    }
249
250    pub fn node_kind(&self) -> NodeKind {
251        self.node_kind
252    }
253
254    pub fn node_kind_str(&self) -> &'static str {
255        self.node_kind.as_str()
256    }
257
258    pub fn log_keys(&self) -> bool {
259        self.log_keys
260    }
261
262    pub fn record_processed(&self) {
263        self.handle.inner.instruments.processed.add(1, &self.attrs);
264    }
265
266    pub fn record_filtered(&self) {
267        self.handle.inner.instruments.filtered.add(1, &self.attrs);
268    }
269
270    pub fn record_failed(&self) {
271        self.handle.inner.instruments.failed.add(1, &self.attrs);
272    }
273
274    pub fn record_retry(&self) {
275        self.handle.inner.instruments.retries.add(1, &self.attrs);
276    }
277
278    pub fn record_dead_letter(&self) {
279        self.handle
280            .inner
281            .instruments
282            .dead_lettered
283            .add(1, &self.attrs);
284    }
285
286    pub fn record_stage_duration_ms(&self, ms: f64) {
287        self.handle
288            .inner
289            .instruments
290            .stage_duration
291            .record(ms, &self.attrs);
292    }
293
294    pub fn record_end_to_end_latency_ms(&self, ms: f64) {
295        self.handle
296            .inner
297            .instruments
298            .end_to_end_latency
299            .record(ms, &self.attrs);
300    }
301
302    pub fn record_channel_capacity_used(&self, used: u64) {
303        self.handle
304            .inner
305            .instruments
306            .channel_capacity_used
307            .record(used, &self.attrs);
308    }
309}
310
311/// Build an `ObsHandle` from the parsed `[observability]` config.
312///
313/// When `config.metrics.otlp_endpoint` is set, installs a
314/// `PeriodicReader` over the OTLP/gRPC exporter pushing every
315/// `export_interval_ms`. When unset, returns a no-op handle so the
316/// rest of the runtime is unaware of the difference.
317pub fn init_metrics(config: Option<&ObservabilityConfig>) -> Result<ObsHandle> {
318    let Some(obs) = config else {
319        return Ok(ObsHandle::noop());
320    };
321    let Some(endpoint) = super::configured_endpoint(obs.metrics.otlp_endpoint.as_deref()) else {
322        return Ok(ObsHandle::noop_with_log_keys(obs.log_keys));
323    };
324
325    let exporter = MetricExporter::builder()
326        .with_tonic()
327        .with_endpoint(endpoint)
328        .build()
329        .with_context(|| {
330            format!(
331                "failed to build OTLP metric exporter for {}",
332                redact_secret(endpoint)
333            )
334        })?;
335
336    let reader = PeriodicReader::builder(exporter)
337        .with_interval(Duration::from_millis(obs.metrics.export_interval_ms))
338        .build();
339
340    let resource = Resource::builder()
341        .with_service_name(obs.service_name.clone())
342        .build();
343
344    let provider = SdkMeterProvider::builder()
345        .with_reader(reader)
346        .with_resource(resource)
347        .build();
348
349    let meter = provider.meter("courier");
350    Ok(ObsHandle::from_meter(meter, Some(provider), obs.log_keys))
351}
352
#[cfg(test)]
pub(crate) mod testing {
    //! Test helpers: an `InMemoryMetricExporter` paired with a
    //! `PeriodicReader`. Unit tests can drive a small pipeline, force a
    //! flush, and assert exact counter / histogram values without touching
    //! a real OTLP collector.
    //!
    //! Every flush, and the final export on shutdown, appends another
    //! snapshot to the exporter. With cumulative temporality (the exporter
    //! default), each snapshot already carries the running total. The
    //! helpers therefore take the most recent snapshot for cumulative data
    //! and only add snapshots together for delta data.

    use std::time::Duration;

    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
    use opentelemetry_sdk::metrics::{
        InMemoryMetricExporter, PeriodicReader, SdkMeterProvider, Temporality,
    };

    use super::ObsHandle;

    /// Build an `ObsHandle` backed by an in-memory exporter and return
    /// the exporter so tests can pull collected metrics out of it.
    pub fn obs_handle_in_memory() -> (ObsHandle, InMemoryMetricExporter) {
        let exporter = InMemoryMetricExporter::default();
        // 1-hour interval: tests must call `provider.force_flush()`
        // explicitly so the timing is deterministic. The reader still
        // exists so the SDK has somewhere to push.
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_secs(3600))
            .build();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_resource(Resource::builder().with_service_name("test").build())
            .build();
        let meter = provider.meter("courier_test");
        let handle = ObsHandle::from_meter(meter, Some(provider), false);
        (handle, exporter)
    }

    /// Total of a `u64` counter over data points whose attribute set
    /// matches `expected_attrs` (subset match; extra attrs are ignored).
    ///
    /// Safe to call after several flushes: cumulative snapshots are not
    /// double-counted.
    pub fn counter_sum(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        fold_snapshots(exporter, metric_name, |data| match data {
            AggregatedMetrics::U64(MetricData::Sum(sum)) => Some((
                sum.temporality(),
                sum.data_points()
                    .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
                    .map(|dp| dp.value())
                    .sum::<u64>(),
            )),
            _ => None,
        })
    }

    /// Number of histogram observations for `metric_name` matching
    /// `expected_attrs`. Useful for "did this run record N samples?"
    ///
    /// Safe to call after several flushes: cumulative snapshots are not
    /// double-counted.
    pub fn histogram_count(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        fold_snapshots(exporter, metric_name, |data| match data {
            AggregatedMetrics::F64(MetricData::Histogram(h)) => Some((
                h.temporality(),
                h.data_points()
                    .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
                    .map(|dp| dp.count())
                    .sum::<u64>(),
            )),
            AggregatedMetrics::U64(MetricData::Histogram(h)) => Some((
                h.temporality(),
                h.data_points()
                    .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
                    .map(|dp| dp.count())
                    .sum::<u64>(),
            )),
            _ => None,
        })
    }

    /// Reduce every exported snapshot of `metric_name` to one number.
    ///
    /// `extract` maps a metric's data to `(temporality, value)`, or `None`
    /// when the data has a different shape. Within one snapshot, values
    /// from every scope are added. Across snapshots, cumulative data
    /// replaces the running total, because each snapshot is already a
    /// total; delta data is added.
    fn fold_snapshots(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        mut extract: impl FnMut(&AggregatedMetrics) -> Option<(Temporality, u64)>,
    ) -> u64 {
        let mut total = 0u64;
        for rm in exporter.get_finished_metrics().unwrap_or_default() {
            // (saw cumulative data, sum of matching values) for this snapshot.
            let mut snapshot: Option<(bool, u64)> = None;
            for sm in rm.scope_metrics() {
                for metric in sm.metrics() {
                    if metric.name() != metric_name {
                        continue;
                    }
                    if let Some((temporality, value)) = extract(metric.data()) {
                        let entry = snapshot.get_or_insert((false, 0));
                        entry.0 |= matches!(temporality, Temporality::Cumulative);
                        entry.1 += value;
                    }
                }
            }
            match snapshot {
                Some((true, value)) => total = value,
                Some((false, value)) => total += value,
                None => {}
            }
        }
        total
    }

    fn attrs_match<'a>(
        actual: impl Iterator<Item = &'a opentelemetry::KeyValue>,
        expected: &[(&str, &str)],
    ) -> bool {
        let actual: Vec<_> = actual.collect();
        expected.iter().all(|(k, v)| {
            actual
                .iter()
                .any(|kv| kv.key.as_str() == *k && kv.value.as_str() == *v)
        })
    }
}
465
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use opentelemetry::global;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::testing::{counter_sum, histogram_count};
    use super::*;

    /// A host application may install its own global meter provider.
    /// Courier's no-op context must not leak any observation into it:
    /// counters and histograms alike.
    #[test]
    fn noop_handle_does_not_record_to_global_meter_provider() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_secs(3600))
            .build();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_resource(Resource::builder().with_service_name("host").build())
            .build();

        global::set_meter_provider(provider.clone());

        let ctx = NodeCtx::noop();
        ctx.record_processed();
        ctx.record_failed();
        ctx.record_stage_duration_ms(1.0);

        let _ = provider.force_flush();

        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_processed_total", &[]),
            0
        );
        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_failed_total", &[]),
            0
        );
        // The stage-duration sample recorded above must not reach the host either.
        assert_eq!(
            histogram_count(&exporter, "courier_stage_duration_milliseconds", &[]),
            0
        );
    }

    /// With no exporter configured the handle is disabled, but the
    /// `log_keys` flag from config must still reach every `NodeCtx`.
    #[test]
    fn init_metrics_preserves_log_keys_when_exporter_is_disabled() {
        let obs = ObservabilityConfig {
            log_keys: true,
            ..ObservabilityConfig::default()
        };

        let handle = init_metrics(Some(&obs)).unwrap();
        assert!(!handle.is_enabled());

        let ctx = NodeCtx::for_node("p", "p/source", NodeKind::Source, handle);
        assert!(ctx.log_keys());
    }
}
520}