Skip to main content

gap/
telemetry.rs

1use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
2use std::sync::OnceLock;
3
4use opentelemetry::trace::TracerProvider;
5use tracing_opentelemetry::OpenTelemetryLayer;
6use tracing_subscriber::layer::SubscriberExt;
7use tracing_subscriber::util::SubscriberInitExt;
8use tracing_subscriber::EnvFilter;
9
/// Metric instruments accessible from anywhere via `Metrics::get()`.
///
/// Every instrument is a plain `AtomicU64`, so recording needs only `&self`.
/// Durations are stored as whole microseconds (the `_us` suffix) and are
/// converted back to milliseconds when the summary is printed.
#[derive(Debug)]
pub struct Metrics {
    /// Number of envelope applies recorded via `record_apply`.
    pub envelope_apply_count: AtomicU64,
    /// Sum of all recorded envelope apply durations, in microseconds.
    envelope_apply_duration_sum_us: AtomicU64,
    /// Shortest recorded envelope apply, in microseconds.
    /// Starts at `u64::MAX` until the first sample arrives.
    envelope_apply_duration_min_us: AtomicU64,
    /// Longest recorded envelope apply, in microseconds.
    envelope_apply_duration_max_us: AtomicU64,
    /// Reported as `watcher.changes_detected` in the summary. It is public and
    /// not written by this type's own methods, so it is presumably bumped
    /// directly by the watcher code (verify against callers).
    pub watcher_changes_detected: AtomicU64,
    /// Number of watcher polls recorded via `record_poll`.
    watcher_poll_count: AtomicU64,
    /// Sum of all recorded watcher poll durations, in microseconds.
    watcher_poll_sum_us: AtomicU64,
    /// Shortest recorded watcher poll, in microseconds.
    /// Starts at `u64::MAX` until the first sample arrives.
    watcher_poll_min_us: AtomicU64,
    /// Longest recorded watcher poll, in microseconds.
    watcher_poll_max_us: AtomicU64,
    /// Reported as `broadcast.lag_count` in the summary. It is public and not
    /// written by this type's own methods, so it is presumably bumped directly
    /// by the broadcast code (verify against callers).
    pub broadcast_lag_count: AtomicU64,
}
23
24static METRICS: OnceLock<Metrics> = OnceLock::new();
25
26impl Metrics {
27    /// Returns the global metrics instruments. Panics if `init()` was not called.
28    pub fn get() -> &'static Metrics {
29        METRICS.get().expect("telemetry not initialised")
30    }
31
32    fn new() -> Self {
33        Self {
34            envelope_apply_count: AtomicU64::new(0),
35            envelope_apply_duration_sum_us: AtomicU64::new(0),
36            envelope_apply_duration_min_us: AtomicU64::new(u64::MAX),
37            envelope_apply_duration_max_us: AtomicU64::new(0),
38            watcher_changes_detected: AtomicU64::new(0),
39            watcher_poll_count: AtomicU64::new(0),
40            watcher_poll_sum_us: AtomicU64::new(0),
41            watcher_poll_min_us: AtomicU64::new(u64::MAX),
42            watcher_poll_max_us: AtomicU64::new(0),
43            broadcast_lag_count: AtomicU64::new(0),
44        }
45    }
46
47    /// Record a completed envelope apply.
48    pub fn record_apply(&self, duration_ms: f64) {
49        self.envelope_apply_count.fetch_add(1, Relaxed);
50        let us = (duration_ms * 1000.0) as u64;
51        self.envelope_apply_duration_sum_us.fetch_add(us, Relaxed);
52        self.envelope_apply_duration_min_us.fetch_min(us, Relaxed);
53        self.envelope_apply_duration_max_us.fetch_max(us, Relaxed);
54    }
55
56    /// Record a watcher poll duration.
57    pub fn record_poll(&self, duration_ms: f64) {
58        self.watcher_poll_count.fetch_add(1, Relaxed);
59        let us = (duration_ms * 1000.0) as u64;
60        self.watcher_poll_sum_us.fetch_add(us, Relaxed);
61        self.watcher_poll_min_us.fetch_min(us, Relaxed);
62        self.watcher_poll_max_us.fetch_max(us, Relaxed);
63    }
64}
65
/// Guard returned by `init()`. Call `shutdown()` to print the metrics summary.
///
/// The summary is only printed by an explicit `shutdown()` call, so the guard
/// is marked `#[must_use]` to flag call sites that discard it by accident.
#[must_use = "call `shutdown()` on the guard to print the metrics summary"]
#[derive(Debug)]
pub struct TelemetryGuard;
68
69/// Initialise tracing + metrics. Call once at startup.
70pub fn init() -> TelemetryGuard {
71    METRICS.get_or_init(Metrics::new);
72
73    let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder().build();
74    let tracer = tracer_provider.tracer("aap");
75    let otel_layer = OpenTelemetryLayer::new(tracer);
76
77    let env_filter = EnvFilter::try_from_default_env()
78        .unwrap_or_else(|_| EnvFilter::new("aap=info"));
79
80    tracing_subscriber::registry()
81        .with(env_filter)
82        .with(
83            tracing_subscriber::fmt::layer()
84                .compact()
85                .with_writer(std::io::stderr),
86        )
87        .with(otel_layer)
88        .init();
89
90    TelemetryGuard
91}
92
93impl TelemetryGuard {
94    /// Print a human-readable metrics summary table to stderr.
95    pub fn shutdown(self) {
96        let m = Metrics::get();
97
98        eprintln!();
99        eprintln!(
100            "\u{2500}\u{2500} Metrics Summary \u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}"
101        );
102
103        let ac = m.envelope_apply_count.load(Relaxed);
104        eprintln!("{:<30}{}", "envelope.apply_count", ac);
105
106        if ac > 0 {
107            let sum = m.envelope_apply_duration_sum_us.load(Relaxed) as f64 / 1000.0;
108            let min = m.envelope_apply_duration_min_us.load(Relaxed) as f64 / 1000.0;
109            let max = m.envelope_apply_duration_max_us.load(Relaxed) as f64 / 1000.0;
110            let avg = sum / ac as f64;
111            eprintln!(
112                "{:<30}avg={:<10.1} min={:<10.1} max={:.1}",
113                "envelope.apply_duration_ms", avg, min, max
114            );
115        }
116
117        eprintln!(
118            "{:<30}{}",
119            "watcher.changes_detected",
120            m.watcher_changes_detected.load(Relaxed)
121        );
122
123        let pc = m.watcher_poll_count.load(Relaxed);
124        if pc > 0 {
125            let sum = m.watcher_poll_sum_us.load(Relaxed) as f64 / 1000.0;
126            let min = m.watcher_poll_min_us.load(Relaxed) as f64 / 1000.0;
127            let max = m.watcher_poll_max_us.load(Relaxed) as f64 / 1000.0;
128            let avg = sum / pc as f64;
129            eprintln!(
130                "{:<30}avg={:<10.1} min={:<10.1} max={:.1}",
131                "watcher.poll_duration_ms", avg, min, max
132            );
133        }
134
135        eprintln!(
136            "{:<30}{}",
137            "broadcast.lag_count",
138            m.broadcast_lag_count.load(Relaxed)
139        );
140
141        eprintln!(
142            "\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}"
143        );
144    }
145}