Skip to main content

nodedb_cluster/
loop_metrics.rs

1//! Standardized control-loop metrics.
2//!
3//! Every periodic loop in the cluster (Principle 2.4) exposes the
4//! same four observations:
5//!
6//! - `{loop_name}_iterations_total` — counter, incremented at the end
7//!   of every tick (success or failure).
8//! - `{loop_name}_last_iteration_duration_seconds` — gauge, wall-time
9//!   of the most recent tick.
10//! - `{loop_name}_errors_total{kind}` — counter keyed by error kind.
11//! - `{loop_name}_up` — gauge (0/1), set by the loop's lifecycle
12//!   owner when the driver task spawns/exits.
13//!
14//! Loop-specific gauges (`raft_tick_loop_pending_groups`,
15//! `health_loop_suspect_peers{peer_id}`, etc.) are rendered by the
16//! Prometheus route directly from the owning subsystem — they are
17//! not part of this primitive because their sources are not
18//! uniform.
19//!
20//! # Usage
21//!
22//! A driver owns an `Arc<LoopMetrics>` and registers it with a
23//! cluster-scoped [`LoopMetricsRegistry`] on spawn. Inside the tick
24//! body:
25//!
26//! ```ignore
27//! let t = Instant::now();
28//! match self.sweep().await {
29//!     Ok(()) => {}
30//!     Err(e) => self.metrics.record_error(e.kind_label()),
31//! }
32//! self.metrics.observe(t.elapsed());
33//! ```
34//!
35//! On spawn: `metrics.set_up(true)`. On graceful shutdown:
36//! `metrics.set_up(false)`.
37
38use std::collections::BTreeMap;
39use std::fmt::Write as _;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use std::sync::{Arc, Mutex};
42use std::time::Duration;
43
/// Standardized per-loop observations.
///
/// The driver records into this handle from its tick body, and the
/// Prometheus route reads it through [`render_prom`](Self::render_prom).
/// Each field is updated independently (relaxed atomics, plus a short
/// mutex section for the error map). A concurrent reader may therefore see
/// a freshly stored duration next to the previous iteration count.
#[derive(Debug)]
pub struct LoopMetrics {
    /// Metric-name prefix, e.g. `"rebalancer_loop"`.
    name: &'static str,
    /// Ticks completed so far, successful or not.
    iterations_total: AtomicU64,
    /// Wall-time of the most recent tick in nanoseconds, saturated at
    /// `u64::MAX`.
    last_iteration_duration_ns: AtomicU64,
    /// Whether the driver task is currently running.
    up: AtomicBool,
    /// Error counts keyed by short kind label. Labels are caller-
    /// supplied `&'static str` so the set stays bounded and cardinality
    /// never explodes — do not pass a formatted error string here.
    errors: Mutex<BTreeMap<&'static str, u64>>,
}

impl LoopMetrics {
    /// Construct a metrics handle for a loop named `name` (e.g.
    /// `"rebalancer_loop"`). The name appears as the prefix of every
    /// emitted metric. Use `snake_case` and include the `_loop` suffix so
    /// it matches the naming of the other cluster loop metrics.
    ///
    /// The loop starts out down (`up == false`) with zeroed counters.
    pub fn new(name: &'static str) -> Arc<Self> {
        Arc::new(Self {
            name,
            iterations_total: AtomicU64::new(0),
            last_iteration_duration_ns: AtomicU64::new(0),
            up: AtomicBool::new(false),
            errors: Mutex::new(BTreeMap::new()),
        })
    }

    /// The metric-name prefix this handle was created with.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Record a completed tick. Increments `iterations_total` and
    /// stores the duration for the `last_iteration_duration_seconds`
    /// gauge. Call regardless of success/failure — errors are tracked
    /// separately via [`record_error`](Self::record_error).
    ///
    /// Durations longer than `u64::MAX` nanoseconds saturate instead of
    /// wrapping.
    pub fn observe(&self, duration: Duration) {
        let ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.last_iteration_duration_ns.store(ns, Ordering::Relaxed);
        self.iterations_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Increment the counter for an error kind. `kind` must be a short
    /// bounded label — never a formatted error string.
    pub fn record_error(&self, kind: &'static str) {
        // The map only holds plain counters, so a poisoned lock is
        // recovered rather than propagated into the driver.
        let mut g = self.errors.lock().unwrap_or_else(|p| p.into_inner());
        *g.entry(kind).or_insert(0) += 1;
    }

    /// Mark the loop up (driver task is running) or down (driver exited
    /// or not yet spawned).
    pub fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }

    /// Number of ticks recorded via [`observe`](Self::observe).
    pub fn iterations_total(&self) -> u64 {
        self.iterations_total.load(Ordering::Relaxed)
    }

    /// Duration passed to the most recent [`observe`](Self::observe) call
    /// (zero before the first tick).
    pub fn last_iteration_duration(&self) -> Duration {
        Duration::from_nanos(self.last_iteration_duration_ns.load(Ordering::Relaxed))
    }

    /// Current value of the up flag.
    pub fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    /// Point-in-time copy of the per-kind error counters, ordered by kind.
    pub fn errors_snapshot(&self) -> BTreeMap<&'static str, u64> {
        self.errors
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .clone()
    }

    /// Append Prometheus-format exposition for this loop to `out`.
    ///
    /// Emits HELP/TYPE headers plus one sample per metric, then a blank
    /// separator line. Error-kind label values are escaped (`\`, `"` and
    /// line feed) as the text exposition format requires, so an unusual
    /// kind label cannot corrupt the scrape.
    ///
    /// When no error has been recorded, a placeholder sample with
    /// `kind="none"` is emitted. A caller-supplied kind literally named
    /// `"none"` would be indistinguishable from it.
    pub fn render_prom(&self, out: &mut String) {
        // Writing into a `String` cannot fail, so the `fmt::Result`s are
        // discarded.
        let n = self.name;

        let _ = writeln!(
            out,
            "# HELP {n}_iterations_total Total completed iterations of the {n} driver."
        );
        let _ = writeln!(out, "# TYPE {n}_iterations_total counter");
        let _ = writeln!(out, "{n}_iterations_total {}", self.iterations_total());

        let _ = writeln!(
            out,
            "# HELP {n}_last_iteration_duration_seconds Wall-time of the most recent {n} iteration."
        );
        let _ = writeln!(out, "# TYPE {n}_last_iteration_duration_seconds gauge");
        let _ = writeln!(
            out,
            "{n}_last_iteration_duration_seconds {}",
            self.last_iteration_duration().as_secs_f64()
        );

        let _ = writeln!(
            out,
            "# HELP {n}_up 1 if the {n} driver task is currently running, 0 otherwise."
        );
        let _ = writeln!(out, "# TYPE {n}_up gauge");
        let _ = writeln!(out, "{n}_up {}", u8::from(self.is_up()));

        let _ = writeln!(
            out,
            "# HELP {n}_errors_total Errors observed by {n}, by kind."
        );
        let _ = writeln!(out, "# TYPE {n}_errors_total counter");
        let snap = self.errors_snapshot();
        if snap.is_empty() {
            // Emit a zero sample with `kind="none"` so scrapes never
            // see the series disappear between ticks.
            let _ = writeln!(out, "{n}_errors_total{{kind=\"none\"}} 0");
        } else {
            for (kind, count) in snap {
                let _ = write!(out, "{n}_errors_total{{kind=\"");
                Self::push_escaped_label_value(out, kind);
                let _ = writeln!(out, "\"}} {count}");
            }
        }
        out.push('\n');
    }

    /// Append `value` to `out` escaped as a Prometheus label value:
    /// backslash, double-quote and line feed become `\\`, `\"` and `\n`.
    fn push_escaped_label_value(out: &mut String, value: &str) {
        for c in value.chars() {
            match c {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                other => out.push(other),
            }
        }
    }
}
166
167/// Collection of [`LoopMetrics`] handles so a single Prometheus render
168/// pass can iterate every registered loop.
169///
170/// Owners (the lifecycle code that spawns drivers) call
171/// [`register`](Self::register) once per driver with the driver's
172/// shared metrics handle. The Prometheus route iterates via
173/// [`render_prom`](Self::render_prom).
174#[derive(Debug, Default)]
175pub struct LoopMetricsRegistry {
176    loops: Mutex<Vec<Arc<LoopMetrics>>>,
177}
178
179impl LoopMetricsRegistry {
180    pub fn new() -> Arc<Self> {
181        Arc::new(Self::default())
182    }
183
184    pub fn register(&self, metrics: Arc<LoopMetrics>) {
185        let mut g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
186        // Idempotent: a driver that respawns shouldn't duplicate its
187        // entry. Match on the interned name — there is exactly one
188        // `LoopMetrics` per loop in a process.
189        if g.iter().any(|m| m.name() == metrics.name()) {
190            return;
191        }
192        g.push(metrics);
193    }
194
195    pub fn render_prom(&self, out: &mut String) {
196        let loops: Vec<Arc<LoopMetrics>> = {
197            let g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
198            g.iter().cloned().collect()
199        };
200        for m in loops {
201            m.render_prom(out);
202        }
203    }
204
205    /// Return every registered loop handle. Used by tests and by the
206    /// `/cluster/debug/transport` style inspectors.
207    pub fn loops(&self) -> Vec<Arc<LoopMetrics>> {
208        self.loops
209            .lock()
210            .unwrap_or_else(|p| p.into_inner())
211            .iter()
212            .cloned()
213            .collect()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Render one handle into a fresh exposition buffer.
    fn rendered(metrics: &LoopMetrics) -> String {
        let mut buf = String::new();
        metrics.render_prom(&mut buf);
        buf
    }

    #[test]
    fn observe_increments_count_and_sets_duration() {
        let metrics = LoopMetrics::new("test_loop");
        assert_eq!(metrics.iterations_total(), 0);

        metrics.observe(Duration::from_millis(10));
        assert_eq!(metrics.iterations_total(), 1);
        assert!(metrics.last_iteration_duration() >= Duration::from_millis(10));

        metrics.observe(Duration::from_millis(20));
        assert_eq!(metrics.iterations_total(), 2);
    }

    #[test]
    fn record_error_counts_by_kind() {
        let metrics = LoopMetrics::new("test_loop");
        for kind in ["transport", "transport", "plan"] {
            metrics.record_error(kind);
        }
        let counts = metrics.errors_snapshot();
        assert_eq!(counts.get("transport"), Some(&2));
        assert_eq!(counts.get("plan"), Some(&1));
    }

    #[test]
    fn up_flag_toggles() {
        let metrics = LoopMetrics::new("test_loop");
        assert!(!metrics.is_up());
        for expected in [true, false] {
            metrics.set_up(expected);
            assert_eq!(metrics.is_up(), expected);
        }
    }

    #[test]
    fn render_prom_emits_all_four_metrics() {
        let metrics = LoopMetrics::new("rebalancer_loop");
        metrics.observe(Duration::from_millis(5));
        metrics.record_error("plan");
        metrics.set_up(true);

        let text = rendered(&metrics);
        for needle in [
            "rebalancer_loop_iterations_total 1",
            "rebalancer_loop_last_iteration_duration_seconds",
            "rebalancer_loop_up 1",
            "rebalancer_loop_errors_total{kind=\"plan\"} 1",
        ] {
            assert!(text.contains(needle), "missing `{needle}` in:\n{text}");
        }
    }

    #[test]
    fn render_prom_emits_none_sample_when_no_errors() {
        let text = rendered(&LoopMetrics::new("quiet_loop"));
        assert!(text.contains("quiet_loop_errors_total{kind=\"none\"} 0"));
    }

    #[test]
    fn registry_renders_every_loop_once() {
        let registry = LoopMetricsRegistry::new();
        let first = LoopMetrics::new("a_loop");
        let second = LoopMetrics::new("b_loop");
        first.observe(Duration::from_millis(1));
        second.observe(Duration::from_millis(2));
        for handle in [&first, &second] {
            registry.register(Arc::clone(handle));
        }

        let mut text = String::new();
        registry.render_prom(&mut text);
        assert!(text.contains("a_loop_iterations_total 1"));
        assert!(text.contains("b_loop_iterations_total 1"));
    }

    #[test]
    fn registry_register_is_idempotent_by_name() {
        let registry = LoopMetricsRegistry::new();
        let handle = LoopMetrics::new("dup_loop");
        (0..2).for_each(|_| registry.register(Arc::clone(&handle)));
        assert_eq!(registry.loops().len(), 1);
    }
}