use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use opentelemetry::KeyValue;
use opentelemetry::metrics::{Counter, Histogram, InstrumentProvider, Meter, MeterProvider};
use opentelemetry_otlp::{MetricExporter, WithExportConfig};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use crate::config::{ObservabilityConfig, redact_secret};
/// Kind of pipeline node a [`NodeCtx`] is created for.
///
/// The kind is exported as the `node_kind` metric attribute via
/// [`NodeKind::as_str`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum NodeKind {
    /// Labelled `"source"`.
    Source,
    /// Labelled `"transform"`.
    Transform,
    /// Labelled `"sink"`.
    Sink,
    /// Labelled `"splitter"`.
    Splitter,
    /// Labelled `"edge"`.
    Edge,
}

impl NodeKind {
    /// Returns the fixed lowercase label for this kind.
    ///
    /// The match is exhaustive on purpose: adding a variant forces a label
    /// to be chosen here.
    fn as_str(self) -> &'static str {
        match self {
            Self::Source => "source",
            Self::Transform => "transform",
            Self::Sink => "sink",
            Self::Splitter => "splitter",
            Self::Edge => "edge",
        }
    }
}
/// Cloneable handle to the metrics state.
///
/// Clones share a single `ObsHandleInner` through the `Arc`, so cloning a
/// handle does not rebuild any instruments.
#[derive(Clone)]
pub struct ObsHandle {
/// Shared provider, instruments, and `log_keys` flag.
inner: Arc<ObsHandleInner>,
}
/// State shared by every clone of an `ObsHandle`.
struct ObsHandleInner {
/// SDK meter provider backing the instruments. `None` for handles built by
/// `ObsHandle::noop` / `noop_with_log_keys`; `is_enabled`, `force_flush` and
/// `shutdown` all key off this being `Some`.
provider: Option<SdkMeterProvider>,
/// Instruments created once in `ObsHandle::from_meter`.
instruments: Instruments,
/// Flag copied into each `NodeCtx` by `NodeCtx::for_node`.
// NOTE(review): presumably controls whether envelope keys appear in logs —
// the consumer is not in this file; confirm against callers.
log_keys: bool,
}
/// The full set of metric instruments, built once per handle in
/// `ObsHandle::from_meter`.
struct Instruments {
/// `courier_envelopes_processed_total`
processed: Counter<u64>,
/// `courier_envelopes_failed_total`
failed: Counter<u64>,
/// `courier_envelopes_filtered_total`
filtered: Counter<u64>,
/// `courier_retries_total`
retries: Counter<u64>,
/// `courier_dead_lettered_total`
dead_lettered: Counter<u64>,
/// `courier_stage_duration_milliseconds` (unit `ms`)
stage_duration: Histogram<f64>,
/// `courier_end_to_end_latency_milliseconds` (unit `ms`)
end_to_end_latency: Histogram<f64>,
/// `courier_channel_capacity_used`
channel_capacity_used: Histogram<u64>,
}
/// Instrument provider used to build the meter behind `ObsHandle::noop*`.
///
/// The trait impl below overrides nothing, so every instrument comes from the
/// `InstrumentProvider` trait's default methods.
// NOTE(review): those defaults are expected to produce no-op instruments that
// are not tied to the global meter provider — confirm against the
// `opentelemetry` version in use.
#[derive(Debug)]
struct NoopInstrumentProvider;
impl InstrumentProvider for NoopInstrumentProvider {}
impl ObsHandle {
    /// Creates a handle with no SDK provider and `log_keys` set to `false`.
    pub fn noop() -> Self {
        Self::noop_with_log_keys(false)
    }

    /// Creates a provider-less handle that still carries the given `log_keys`
    /// flag. Instruments are built from a meter backed by
    /// `NoopInstrumentProvider`.
    fn noop_with_log_keys(log_keys: bool) -> Self {
        let noop_meter = Meter::new(Arc::new(NoopInstrumentProvider));
        Self::from_meter(noop_meter, None, log_keys)
    }

    /// Reports whether this handle owns an SDK meter provider.
    pub(crate) fn is_enabled(&self) -> bool {
        self.inner.provider.is_some()
    }

    /// Builds every instrument from `meter` and wraps them, together with the
    /// optional `provider` and the `log_keys` flag, in a shared handle.
    fn from_meter(meter: Meter, provider: Option<SdkMeterProvider>, log_keys: bool) -> Self {
        // Counters.
        let processed = meter
            .u64_counter("courier_envelopes_processed_total")
            .with_description("Envelopes successfully processed by a node.")
            .build();
        let failed = meter
            .u64_counter("courier_envelopes_failed_total")
            .with_description(
                "Envelopes that triggered an error in a node, after retries are exhausted.",
            )
            .build();
        let filtered = meter
            .u64_counter("courier_envelopes_filtered_total")
            .with_description(
                "Envelopes intentionally dropped by a transform (MapOne returned None).",
            )
            .build();
        let retries = meter
            .u64_counter("courier_retries_total")
            .with_description("Retry attempts performed by a sink.")
            .build();
        let dead_lettered = meter
            .u64_counter("courier_dead_lettered_total")
            .with_description(
                "Envelopes routed to a dead-letter sink after retries were exhausted.",
            )
            .build();

        // Histograms.
        let stage_duration = meter
            .f64_histogram("courier_stage_duration_milliseconds")
            .with_description("Wall-clock time a node spent processing one envelope.")
            .with_unit("ms")
            .build();
        let end_to_end_latency = meter
            .f64_histogram("courier_end_to_end_latency_milliseconds")
            .with_description(
                "Time from envelope creation (meta.timestamp_ms) to sink write completion.",
            )
            .with_unit("ms")
            .build();
        let channel_capacity_used = meter
            .u64_histogram("courier_channel_capacity_used")
            .with_description(
                "In-flight items on a pipeline edge, sampled periodically (capacity - sender.capacity()).",
            )
            .build();

        let inner = ObsHandleInner {
            provider,
            instruments: Instruments {
                processed,
                failed,
                filtered,
                retries,
                dead_lettered,
                stage_duration,
                end_to_end_latency,
                channel_capacity_used,
            },
            log_keys,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Asks the provider, if any, to flush. Best-effort: the result is
    /// discarded and a provider-less handle does nothing.
    pub fn force_flush(&self) {
        let Some(provider) = self.inner.provider.as_ref() else {
            return;
        };
        let _ = provider.force_flush();
    }

    /// Asks the provider, if any, to shut down. Best-effort: the result is
    /// discarded and a provider-less handle does nothing.
    pub fn shutdown(&self) {
        let Some(provider) = self.inner.provider.as_ref() else {
            return;
        };
        let _ = provider.shutdown();
    }
}
/// Per-node metrics context: an `ObsHandle` plus the attribute set and
/// identity of the node it records for.
#[derive(Clone)]
pub struct NodeCtx {
/// Handle whose instruments the `record_*` methods write to.
handle: ObsHandle,
/// Attributes attached to every measurement: `pipeline`, `node_id` and
/// `node_kind` when built by `for_node`; empty when built by `noop`.
attrs: Arc<[KeyValue]>,
/// Pipeline name as passed to `for_node` (empty string for `noop`).
pipeline: Arc<str>,
/// Node identifier as passed to `for_node` (empty string for `noop`).
node_id: Arc<str>,
/// Kind of node this context belongs to.
node_kind: NodeKind,
/// Copy of the handle's `log_keys` flag taken at construction time.
log_keys: bool,
}
impl NodeCtx {
pub fn for_node(pipeline: &str, node_id: &str, node_kind: NodeKind, handle: ObsHandle) -> Self {
let attrs: Arc<[KeyValue]> = Arc::from(
[
KeyValue::new("pipeline", pipeline.to_string()),
KeyValue::new("node_id", node_id.to_string()),
KeyValue::new("node_kind", node_kind.as_str()),
]
.as_slice(),
);
let log_keys = handle.inner.log_keys;
Self {
handle,
attrs,
pipeline: Arc::from(pipeline),
node_id: Arc::from(node_id),
node_kind,
log_keys,
}
}
pub fn noop() -> Self {
Self {
handle: ObsHandle::noop(),
attrs: Arc::from([] as [KeyValue; 0]),
pipeline: Arc::from(""),
node_id: Arc::from(""),
node_kind: NodeKind::Transform,
log_keys: false,
}
}
pub fn attrs(&self) -> &[KeyValue] {
&self.attrs
}
pub fn handle(&self) -> &ObsHandle {
&self.handle
}
pub fn pipeline(&self) -> &str {
&self.pipeline
}
pub fn node_id(&self) -> &str {
&self.node_id
}
pub fn node_kind(&self) -> NodeKind {
self.node_kind
}
pub fn node_kind_str(&self) -> &'static str {
self.node_kind.as_str()
}
pub fn log_keys(&self) -> bool {
self.log_keys
}
pub fn record_processed(&self) {
self.handle.inner.instruments.processed.add(1, &self.attrs);
}
pub fn record_filtered(&self) {
self.handle.inner.instruments.filtered.add(1, &self.attrs);
}
pub fn record_failed(&self) {
self.handle.inner.instruments.failed.add(1, &self.attrs);
}
pub fn record_retry(&self) {
self.handle.inner.instruments.retries.add(1, &self.attrs);
}
pub fn record_dead_letter(&self) {
self.handle
.inner
.instruments
.dead_lettered
.add(1, &self.attrs);
}
pub fn record_stage_duration_ms(&self, ms: f64) {
self.handle
.inner
.instruments
.stage_duration
.record(ms, &self.attrs);
}
pub fn record_end_to_end_latency_ms(&self, ms: f64) {
self.handle
.inner
.instruments
.end_to_end_latency
.record(ms, &self.attrs);
}
pub fn record_channel_capacity_used(&self, used: u64) {
self.handle
.inner
.instruments
.channel_capacity_used
.record(used, &self.attrs);
}
}
/// Builds the process-wide metrics handle from the optional observability
/// configuration.
///
/// * No configuration at all yields `ObsHandle::noop()`.
/// * A configuration for which `super::configured_endpoint` yields no
///   endpoint yields a provider-less handle that keeps the configured
///   `log_keys` flag.
/// * Otherwise an OTLP exporter (tonic transport) is wired into a periodic
///   reader and an SDK meter provider, and instruments are created from its
///   `"courier"` meter.
///
/// # Errors
///
/// Returns an error if the OTLP exporter cannot be built. The error context
/// names the endpoint after passing it through `redact_secret`.
pub fn init_metrics(config: Option<&ObservabilityConfig>) -> Result<ObsHandle> {
    let Some(obs) = config else {
        return Ok(ObsHandle::noop());
    };
    let log_keys = obs.log_keys;
    let Some(endpoint) = super::configured_endpoint(obs.metrics.otlp_endpoint.as_deref()) else {
        return Ok(ObsHandle::noop_with_log_keys(log_keys));
    };

    let exporter = MetricExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build()
        // The message is built lazily so the redaction only runs on failure.
        .with_context(|| {
            format!(
                "failed to build OTLP metric exporter for {}",
                redact_secret(endpoint)
            )
        })?;

    let export_interval = Duration::from_millis(obs.metrics.export_interval_ms);
    let reader = PeriodicReader::builder(exporter)
        .with_interval(export_interval)
        .build();

    let service_name = obs.service_name.clone();
    let resource = Resource::builder().with_service_name(service_name).build();

    let provider = SdkMeterProvider::builder()
        .with_reader(reader)
        .with_resource(resource)
        .build();
    let meter = provider.meter("courier");
    Ok(ObsHandle::from_meter(meter, Some(provider), log_keys))
}
/// Test-only helpers for building an in-memory-backed `ObsHandle` and reading
/// back what it exported.
#[cfg(test)]
pub(crate) mod testing {
    use std::time::Duration;
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};
    use super::ObsHandle;

    /// Returns a handle whose provider exports into the returned in-memory
    /// exporter. The reader interval is one hour, so tests are expected to
    /// call `force_flush` on the handle to make data visible. `log_keys` is
    /// `false`.
    pub fn obs_handle_in_memory() -> (ObsHandle, InMemoryMetricExporter) {
        let exporter = InMemoryMetricExporter::default();
        let periodic = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_secs(3600))
            .build();
        let resource = Resource::builder().with_service_name("test").build();
        let provider = SdkMeterProvider::builder()
            .with_reader(periodic)
            .with_resource(resource)
            .build();
        let meter = provider.meter("courier_test");
        (ObsHandle::from_meter(meter, Some(provider), false), exporter)
    }

    /// Sums the values of every `u64` sum data point named `metric_name`
    /// whose attributes include all of `expected_attrs`.
    ///
    /// Every finished export held by `exporter` is scanned; a failure to read
    /// them is treated as "no data" and yields 0.
    pub fn counter_sum(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        let batches = exporter.get_finished_metrics().unwrap_or_default();
        batches
            .iter()
            .flat_map(|rm| rm.scope_metrics())
            .flat_map(|sm| sm.metrics())
            .filter(|metric| metric.name() == metric_name)
            .filter_map(|metric| match metric.data() {
                AggregatedMetrics::U64(MetricData::Sum(sum)) => Some(sum),
                _ => None,
            })
            .flat_map(|sum| sum.data_points())
            .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
            .map(|dp| dp.value())
            .sum()
    }

    /// Sums the sample counts of every histogram data point (`f64` or `u64`)
    /// named `metric_name` whose attributes include all of `expected_attrs`.
    ///
    /// Every finished export held by `exporter` is scanned; a failure to read
    /// them is treated as "no data" and yields 0.
    pub fn histogram_count(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
        expected_attrs: &[(&str, &str)],
    ) -> u64 {
        let batches = exporter.get_finished_metrics().unwrap_or_default();
        batches
            .iter()
            .flat_map(|rm| rm.scope_metrics())
            .flat_map(|sm| sm.metrics())
            .filter(|metric| metric.name() == metric_name)
            .map(|metric| -> u64 {
                match metric.data() {
                    AggregatedMetrics::F64(MetricData::Histogram(h)) => h
                        .data_points()
                        .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
                        .map(|dp| dp.count())
                        .sum::<u64>(),
                    AggregatedMetrics::U64(MetricData::Histogram(h)) => h
                        .data_points()
                        .filter(|dp| attrs_match(dp.attributes(), expected_attrs))
                        .map(|dp| dp.count())
                        .sum::<u64>(),
                    _ => 0,
                }
            })
            .sum()
    }

    /// Returns `true` when every `(key, value)` pair in `expected` appears
    /// among `actual`. Extra attributes in `actual` are ignored, so an empty
    /// `expected` matches anything.
    fn attrs_match<'a>(
        actual: impl Iterator<Item = &'a opentelemetry::KeyValue>,
        expected: &[(&str, &str)],
    ) -> bool {
        // Materialise once: the attributes are scanned once per expected pair.
        let present: Vec<_> = actual.collect();
        for (key, value) in expected {
            let found = present
                .iter()
                .any(|kv| kv.key.as_str() == *key && kv.value.as_str() == *value);
            if !found {
                return false;
            }
        }
        true
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use opentelemetry::global;
    use opentelemetry_sdk::Resource;
    use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider};
    use super::testing::counter_sum;
    use super::*;

    /// A no-op context must not leak measurements into whatever meter
    /// provider the host process installed globally.
    #[test]
    fn noop_handle_does_not_record_to_global_meter_provider() {
        // Install a real, observable provider as the global one.
        let exporter = InMemoryMetricExporter::default();
        let host_resource = Resource::builder().with_service_name("host").build();
        let periodic = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_secs(3600))
            .build();
        let provider = SdkMeterProvider::builder()
            .with_reader(periodic)
            .with_resource(host_resource)
            .build();
        global::set_meter_provider(provider.clone());

        // Record through a no-op context.
        let noop_ctx = NodeCtx::noop();
        noop_ctx.record_processed();
        noop_ctx.record_failed();
        noop_ctx.record_stage_duration_ms(1.0);

        // Nothing should have reached the global provider's exporter.
        let _ = provider.force_flush();
        let processed = counter_sum(&exporter, "courier_envelopes_processed_total", &[]);
        let failed = counter_sum(&exporter, "courier_envelopes_failed_total", &[]);
        assert_eq!(processed, 0);
        assert_eq!(failed, 0);
    }

    /// With a default config and only `log_keys` turned on, the handle is
    /// provider-less but still hands the flag on to node contexts.
    #[test]
    fn init_metrics_preserves_log_keys_when_exporter_is_disabled() {
        let obs = ObservabilityConfig {
            log_keys: true,
            ..ObservabilityConfig::default()
        };

        let handle = init_metrics(Some(&obs)).unwrap();
        assert!(!handle.is_enabled());

        let source_ctx = NodeCtx::for_node("p", "p/source", NodeKind::Source, handle);
        assert!(source_ctx.log_keys());
    }
}