1use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
2use std::sync::OnceLock;
3
4use opentelemetry::trace::TracerProvider;
5use tracing_opentelemetry::OpenTelemetryLayer;
6use tracing_subscriber::layer::SubscriberExt;
7use tracing_subscriber::util::SubscriberInitExt;
8use tracing_subscriber::EnvFilter;
9
/// Process-wide counters and duration aggregates.
///
/// Every field is an [`AtomicU64`], so the struct can be updated through a
/// shared reference. Durations are accumulated as whole microseconds; the
/// `*_min_us` fields are seeded with `u64::MAX` by `Metrics::new` so that the
/// first recorded sample always becomes the minimum.
#[derive(Debug)]
pub struct Metrics {
    /// Number of samples passed to `record_apply`.
    pub envelope_apply_count: AtomicU64,
    /// Sum of all `record_apply` durations, in microseconds.
    envelope_apply_duration_sum_us: AtomicU64,
    /// Smallest `record_apply` duration seen so far, in microseconds.
    envelope_apply_duration_min_us: AtomicU64,
    /// Largest `record_apply` duration seen so far, in microseconds.
    envelope_apply_duration_max_us: AtomicU64,
    /// Counter updated by callers outside this file (presumably one tick per
    /// change the watcher detects — confirm at the call sites).
    pub watcher_changes_detected: AtomicU64,
    /// Number of samples passed to `record_poll`.
    watcher_poll_count: AtomicU64,
    /// Sum of all `record_poll` durations, in microseconds.
    watcher_poll_sum_us: AtomicU64,
    /// Smallest `record_poll` duration seen so far, in microseconds.
    watcher_poll_min_us: AtomicU64,
    /// Largest `record_poll` duration seen so far, in microseconds.
    watcher_poll_max_us: AtomicU64,
    /// Counter updated by callers outside this file (presumably one tick per
    /// lagged broadcast receive — confirm at the call sites).
    pub broadcast_lag_count: AtomicU64,
}
23
24static METRICS: OnceLock<Metrics> = OnceLock::new();
25
26impl Metrics {
27 pub fn get() -> &'static Metrics {
29 METRICS.get().expect("telemetry not initialised")
30 }
31
32 fn new() -> Self {
33 Self {
34 envelope_apply_count: AtomicU64::new(0),
35 envelope_apply_duration_sum_us: AtomicU64::new(0),
36 envelope_apply_duration_min_us: AtomicU64::new(u64::MAX),
37 envelope_apply_duration_max_us: AtomicU64::new(0),
38 watcher_changes_detected: AtomicU64::new(0),
39 watcher_poll_count: AtomicU64::new(0),
40 watcher_poll_sum_us: AtomicU64::new(0),
41 watcher_poll_min_us: AtomicU64::new(u64::MAX),
42 watcher_poll_max_us: AtomicU64::new(0),
43 broadcast_lag_count: AtomicU64::new(0),
44 }
45 }
46
47 pub fn record_apply(&self, duration_ms: f64) {
49 self.envelope_apply_count.fetch_add(1, Relaxed);
50 let us = (duration_ms * 1000.0) as u64;
51 self.envelope_apply_duration_sum_us.fetch_add(us, Relaxed);
52 self.envelope_apply_duration_min_us.fetch_min(us, Relaxed);
53 self.envelope_apply_duration_max_us.fetch_max(us, Relaxed);
54 }
55
56 pub fn record_poll(&self, duration_ms: f64) {
58 self.watcher_poll_count.fetch_add(1, Relaxed);
59 let us = (duration_ms * 1000.0) as u64;
60 self.watcher_poll_sum_us.fetch_add(us, Relaxed);
61 self.watcher_poll_min_us.fetch_min(us, Relaxed);
62 self.watcher_poll_max_us.fetch_max(us, Relaxed);
63 }
64}
65
/// Handle returned by `init`.
///
/// Carries no state of its own; call [`TelemetryGuard::shutdown`] to print the
/// metrics summary to stderr.
#[derive(Debug)]
pub struct TelemetryGuard;
68
69pub fn init() -> TelemetryGuard {
71 METRICS.get_or_init(Metrics::new);
72
73 let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder().build();
74 let tracer = tracer_provider.tracer("aap");
75 let otel_layer = OpenTelemetryLayer::new(tracer);
76
77 let env_filter = EnvFilter::try_from_default_env()
78 .unwrap_or_else(|_| EnvFilter::new("aap=info"));
79
80 tracing_subscriber::registry()
81 .with(env_filter)
82 .with(
83 tracing_subscriber::fmt::layer()
84 .compact()
85 .with_writer(std::io::stderr),
86 )
87 .with(otel_layer)
88 .init();
89
90 TelemetryGuard
91}
92
93impl TelemetryGuard {
94 pub fn shutdown(self) {
96 let m = Metrics::get();
97
98 eprintln!();
99 eprintln!(
100 "\u{2500}\u{2500} Metrics Summary \u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}"
101 );
102
103 let ac = m.envelope_apply_count.load(Relaxed);
104 eprintln!("{:<30}{}", "envelope.apply_count", ac);
105
106 if ac > 0 {
107 let sum = m.envelope_apply_duration_sum_us.load(Relaxed) as f64 / 1000.0;
108 let min = m.envelope_apply_duration_min_us.load(Relaxed) as f64 / 1000.0;
109 let max = m.envelope_apply_duration_max_us.load(Relaxed) as f64 / 1000.0;
110 let avg = sum / ac as f64;
111 eprintln!(
112 "{:<30}avg={:<10.1} min={:<10.1} max={:.1}",
113 "envelope.apply_duration_ms", avg, min, max
114 );
115 }
116
117 eprintln!(
118 "{:<30}{}",
119 "watcher.changes_detected",
120 m.watcher_changes_detected.load(Relaxed)
121 );
122
123 let pc = m.watcher_poll_count.load(Relaxed);
124 if pc > 0 {
125 let sum = m.watcher_poll_sum_us.load(Relaxed) as f64 / 1000.0;
126 let min = m.watcher_poll_min_us.load(Relaxed) as f64 / 1000.0;
127 let max = m.watcher_poll_max_us.load(Relaxed) as f64 / 1000.0;
128 let avg = sum / pc as f64;
129 eprintln!(
130 "{:<30}avg={:<10.1} min={:<10.1} max={:.1}",
131 "watcher.poll_duration_ms", avg, min, max
132 );
133 }
134
135 eprintln!(
136 "{:<30}{}",
137 "broadcast.lag_count",
138 m.broadcast_lag_count.load(Relaxed)
139 );
140
141 eprintln!(
142 "\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}\u{2500}"
143 );
144 }
145}