1use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use opentelemetry::KeyValue;
21use opentelemetry::metrics::{Counter, Histogram, InstrumentProvider, Meter, MeterProvider};
22use opentelemetry_otlp::{MetricExporter, WithExportConfig};
23use opentelemetry_sdk::Resource;
24use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
25
26use crate::config::{ObservabilityConfig, redact_secret};
27
/// Role of a pipeline component, exported as the `node_kind` metric label.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeKind {
    /// Label value `"source"`.
    Source,
    /// Label value `"transform"`.
    Transform,
    /// Label value `"sink"`.
    Sink,
    /// Label value `"splitter"`.
    Splitter,
    /// Label value `"edge"`; presumably used for per-edge (channel) measurements
    /// rather than a processing node — confirm against callers.
    Edge,
}

impl NodeKind {
    /// Lower-case label value used for the `node_kind` metric attribute.
    fn as_str(self) -> &'static str {
        match self {
            Self::Source => "source",
            Self::Transform => "transform",
            Self::Sink => "sink",
            Self::Splitter => "splitter",
            Self::Edge => "edge",
        }
    }
}
55
56#[derive(Clone)]
61pub struct ObsHandle {
62 inner: Arc<ObsHandleInner>,
63}
64
65struct ObsHandleInner {
66 provider: Option<SdkMeterProvider>,
69 instruments: Instruments,
70 log_keys: bool,
71}
72
73struct Instruments {
74 processed: Counter<u64>,
75 failed: Counter<u64>,
76 filtered: Counter<u64>,
77 retries: Counter<u64>,
78 dead_lettered: Counter<u64>,
79 stage_duration: Histogram<f64>,
80 end_to_end_latency: Histogram<f64>,
81 channel_capacity_used: Histogram<u64>,
82}
83
84#[derive(Debug)]
85struct NoopInstrumentProvider;
86
87impl InstrumentProvider for NoopInstrumentProvider {}
88
89impl ObsHandle {
90 pub fn noop() -> Self {
97 Self::noop_with_log_keys(false)
98 }
99
100 fn noop_with_log_keys(log_keys: bool) -> Self {
101 let meter = Meter::new(Arc::new(NoopInstrumentProvider));
102 Self::from_meter(meter, None, log_keys)
103 }
104
105 pub(crate) fn is_enabled(&self) -> bool {
109 self.inner.provider.is_some()
110 }
111
112 fn from_meter(meter: Meter, provider: Option<SdkMeterProvider>, log_keys: bool) -> Self {
113 let instruments = Instruments {
114 processed: meter
115 .u64_counter("courier_envelopes_processed_total")
116 .with_description("Envelopes successfully processed by a node.")
117 .build(),
118 failed: meter
119 .u64_counter("courier_envelopes_failed_total")
120 .with_description(
121 "Envelopes that triggered an error in a node, after retries are exhausted.",
122 )
123 .build(),
124 filtered: meter
125 .u64_counter("courier_envelopes_filtered_total")
126 .with_description("Envelopes intentionally dropped by a transform (MapOne returned None).")
127 .build(),
128 retries: meter
129 .u64_counter("courier_retries_total")
130 .with_description("Retry attempts performed by a sink.")
131 .build(),
132 dead_lettered: meter
133 .u64_counter("courier_dead_lettered_total")
134 .with_description("Envelopes routed to a dead-letter sink after retries were exhausted.")
135 .build(),
136 stage_duration: meter
137 .f64_histogram("courier_stage_duration_milliseconds")
138 .with_description("Wall-clock time a node spent processing one envelope.")
139 .with_unit("ms")
140 .build(),
141 end_to_end_latency: meter
142 .f64_histogram("courier_end_to_end_latency_milliseconds")
143 .with_description(
144 "Time from envelope creation (meta.timestamp_ms) to sink write completion.",
145 )
146 .with_unit("ms")
147 .build(),
148 channel_capacity_used: meter
149 .u64_histogram("courier_channel_capacity_used")
150 .with_description(
151 "In-flight items on a pipeline edge, sampled periodically (capacity - sender.capacity()).",
152 )
153 .build(),
154 };
155 Self {
156 inner: Arc::new(ObsHandleInner {
157 provider,
158 instruments,
159 log_keys,
160 }),
161 }
162 }
163
164 pub fn force_flush(&self) {
166 if let Some(provider) = &self.inner.provider {
167 let _ = provider.force_flush();
168 }
169 }
170
171 pub fn shutdown(&self) {
173 if let Some(provider) = &self.inner.provider {
174 let _ = provider.shutdown();
175 }
176 }
177}
178
179#[derive(Clone)]
186pub struct NodeCtx {
187 handle: ObsHandle,
188 attrs: Arc<[KeyValue]>,
189 pipeline: Arc<str>,
190 node_id: Arc<str>,
191 node_kind: NodeKind,
192 log_keys: bool,
193}
194
195impl NodeCtx {
196 pub fn for_node(pipeline: &str, node_id: &str, node_kind: NodeKind, handle: ObsHandle) -> Self {
200 let attrs: Arc<[KeyValue]> = Arc::from(
201 [
202 KeyValue::new("pipeline", pipeline.to_string()),
203 KeyValue::new("node_id", node_id.to_string()),
204 KeyValue::new("node_kind", node_kind.as_str()),
205 ]
206 .as_slice(),
207 );
208 let log_keys = handle.inner.log_keys;
209 Self {
210 handle,
211 attrs,
212 pipeline: Arc::from(pipeline),
213 node_id: Arc::from(node_id),
214 node_kind,
215 log_keys,
216 }
217 }
218
219 pub fn noop() -> Self {
224 Self {
225 handle: ObsHandle::noop(),
226 attrs: Arc::from([] as [KeyValue; 0]),
227 pipeline: Arc::from(""),
228 node_id: Arc::from(""),
229 node_kind: NodeKind::Transform,
230 log_keys: false,
231 }
232 }
233
234 pub fn attrs(&self) -> &[KeyValue] {
235 &self.attrs
236 }
237
238 pub fn handle(&self) -> &ObsHandle {
239 &self.handle
240 }
241
242 pub fn pipeline(&self) -> &str {
243 &self.pipeline
244 }
245
246 pub fn node_id(&self) -> &str {
247 &self.node_id
248 }
249
250 pub fn node_kind(&self) -> NodeKind {
251 self.node_kind
252 }
253
254 pub fn node_kind_str(&self) -> &'static str {
255 self.node_kind.as_str()
256 }
257
258 pub fn log_keys(&self) -> bool {
259 self.log_keys
260 }
261
262 pub fn record_processed(&self) {
263 self.handle.inner.instruments.processed.add(1, &self.attrs);
264 }
265
266 pub fn record_filtered(&self) {
267 self.handle.inner.instruments.filtered.add(1, &self.attrs);
268 }
269
270 pub fn record_failed(&self) {
271 self.handle.inner.instruments.failed.add(1, &self.attrs);
272 }
273
274 pub fn record_retry(&self) {
275 self.handle.inner.instruments.retries.add(1, &self.attrs);
276 }
277
278 pub fn record_dead_letter(&self) {
279 self.handle
280 .inner
281 .instruments
282 .dead_lettered
283 .add(1, &self.attrs);
284 }
285
286 pub fn record_stage_duration_ms(&self, ms: f64) {
287 self.handle
288 .inner
289 .instruments
290 .stage_duration
291 .record(ms, &self.attrs);
292 }
293
294 pub fn record_end_to_end_latency_ms(&self, ms: f64) {
295 self.handle
296 .inner
297 .instruments
298 .end_to_end_latency
299 .record(ms, &self.attrs);
300 }
301
302 pub fn record_channel_capacity_used(&self, used: u64) {
303 self.handle
304 .inner
305 .instruments
306 .channel_capacity_used
307 .record(used, &self.attrs);
308 }
309}
310
311pub fn init_metrics(config: Option<&ObservabilityConfig>) -> Result<ObsHandle> {
318 let Some(obs) = config else {
319 return Ok(ObsHandle::noop());
320 };
321 let Some(endpoint) = super::configured_endpoint(obs.metrics.otlp_endpoint.as_deref()) else {
322 return Ok(ObsHandle::noop_with_log_keys(obs.log_keys));
323 };
324
325 let exporter = MetricExporter::builder()
326 .with_tonic()
327 .with_endpoint(endpoint)
328 .build()
329 .with_context(|| {
330 format!(
331 "failed to build OTLP metric exporter for {}",
332 redact_secret(endpoint)
333 )
334 })?;
335
336 let reader = PeriodicReader::builder(exporter)
337 .with_interval(Duration::from_millis(obs.metrics.export_interval_ms))
338 .build();
339
340 let resource = Resource::builder()
341 .with_service_name(obs.service_name.clone())
342 .build();
343
344 let provider = SdkMeterProvider::builder()
345 .with_reader(reader)
346 .with_resource(resource)
347 .build();
348
349 let meter = provider.meter("courier");
350 Ok(ObsHandle::from_meter(meter, Some(provider), obs.log_keys))
351}
352
/// Test helpers for asserting on exported metrics through an in-memory exporter.
#[cfg(test)]
pub(crate) mod testing {
    use std::time::Duration;

    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::ObsHandle;

    /// Builds an enabled [`ObsHandle`] wired to an in-memory exporter.
    ///
    /// The one-hour reader interval keeps the periodic reader from exporting on its
    /// own during a test. Call `force_flush` before inspecting the exporter.
    pub fn obs_handle_in_memory() -> (ObsHandle, InMemoryMetricExporter) {
        let exporter = InMemoryMetricExporter::default();
        let provider = SdkMeterProvider::builder()
            .with_reader(
                PeriodicReader::builder(exporter.clone())
                    .with_interval(Duration::from_secs(3600))
                    .build(),
            )
            .with_resource(Resource::builder().with_service_name("test").build())
            .build();
        let meter = provider.meter("courier_test");
        (ObsHandle::from_meter(meter, Some(provider), false), exporter)
    }

    /// Sums every `u64` sum data point of `metric_name` whose attributes include
    /// all of `expected_attrs`, across every export captured so far.
    pub fn counter_sum(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        let mut total = 0u64;
        visit_metric_data(exporter, metric_name, |data| {
            if let AggregatedMetrics::U64(MetricData::Sum(sum)) = data {
                for point in sum.data_points() {
                    if attrs_match(point.attributes(), expected_attrs) {
                        total += point.value();
                    }
                }
            }
        });
        total
    }

    /// Adds up the observation counts of every `f64` or `u64` histogram data point
    /// of `metric_name` whose attributes include all of `expected_attrs`.
    pub fn histogram_count(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        let mut total = 0u64;
        visit_metric_data(exporter, metric_name, |data| match data {
            AggregatedMetrics::F64(MetricData::Histogram(hist)) => {
                for point in hist.data_points() {
                    if attrs_match(point.attributes(), expected_attrs) {
                        total += point.count();
                    }
                }
            }
            AggregatedMetrics::U64(MetricData::Histogram(hist)) => {
                for point in hist.data_points() {
                    if attrs_match(point.attributes(), expected_attrs) {
                        total += point.count();
                    }
                }
            }
            _ => {}
        });
        total
    }

    /// Calls `visit` with the aggregated data of every exported metric named
    /// `metric_name`, in export order.
    fn visit_metric_data(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        mut visit: impl FnMut(&AggregatedMetrics),
    ) {
        for resource_metrics in exporter.get_finished_metrics().unwrap_or_default() {
            for scope_metrics in resource_metrics.scope_metrics() {
                for metric in scope_metrics.metrics() {
                    if metric.name() == metric_name {
                        visit(metric.data());
                    }
                }
            }
        }
    }

    /// `true` when every `(key, value)` in `expected` occurs among `actual`.
    /// Extra attributes are ignored, and an empty `expected` always matches.
    fn attrs_match<'a>(
        actual: impl Iterator<Item = &'a opentelemetry::KeyValue>,
        expected: &[(&str, &str)],
    ) -> bool {
        let present: Vec<&opentelemetry::KeyValue> = actual.collect();
        let contains = |key: &str, value: &str| {
            present
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        };
        expected.iter().all(|&(key, value)| contains(key, value))
    }
}
465
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use opentelemetry::global;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};

    use super::testing::{counter_sum, histogram_count, obs_handle_in_memory};
    use super::*;

    /// `NodeCtx::noop()` must not leak any measurement (counter or histogram)
    /// into a provider installed globally by the host application.
    #[test]
    fn noop_handle_does_not_record_to_global_meter_provider() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_secs(3600))
            .build();
        let provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_resource(Resource::builder().with_service_name("host").build())
            .build();

        global::set_meter_provider(provider.clone());

        let ctx = NodeCtx::noop();
        ctx.record_processed();
        ctx.record_failed();
        ctx.record_stage_duration_ms(1.0);

        let _ = provider.force_flush();

        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_processed_total", &[]),
            0
        );
        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_failed_total", &[]),
            0
        );
        // The duration recorded above must not have leaked either.
        assert_eq!(
            histogram_count(&exporter, "courier_stage_duration_milliseconds", &[]),
            0
        );
    }

    /// Positive control for the test above. An enabled handle does export, and
    /// every measurement carries the node's identifying labels.
    #[test]
    fn enabled_handle_records_with_node_labels() {
        let (handle, exporter) = obs_handle_in_memory();
        assert!(handle.is_enabled());

        let ctx = NodeCtx::for_node("p", "p/sink", NodeKind::Sink, handle.clone());
        ctx.record_processed();
        ctx.record_processed();
        ctx.record_stage_duration_ms(2.5);
        handle.force_flush();

        let labels = [
            ("pipeline", "p"),
            ("node_id", "p/sink"),
            ("node_kind", "sink"),
        ];
        assert_eq!(
            counter_sum(&exporter, "courier_envelopes_processed_total", &labels),
            2
        );
        assert_eq!(
            histogram_count(&exporter, "courier_stage_duration_milliseconds", &labels),
            1
        );
    }

    #[test]
    fn init_metrics_preserves_log_keys_when_exporter_is_disabled() {
        let obs = ObservabilityConfig {
            log_keys: true,
            ..ObservabilityConfig::default()
        };

        let handle = init_metrics(Some(&obs)).unwrap();
        assert!(!handle.is_enabled());

        let ctx = NodeCtx::for_node("p", "p/source", NodeKind::Source, handle);
        assert!(ctx.log_keys());
    }
}