Skip to main content

nodedb_cluster/
loop_metrics.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Standardized control-loop metrics.
4//!
5//! Every periodic loop in the cluster (Principle 2.4) exposes the
6//! same four observations:
7//!
8//! - `{loop_name}_iterations_total` — counter, incremented at the end
9//!   of every tick (success or failure).
10//! - `{loop_name}_last_iteration_duration_seconds` — gauge, wall-time
11//!   of the most recent tick.
12//! - `{loop_name}_errors_total{kind}` — counter keyed by error kind.
13//! - `{loop_name}_up` — gauge (0/1), set by the loop's lifecycle
14//!   owner when the driver task spawns/exits.
15//!
16//! Loop-specific gauges (`raft_tick_loop_pending_groups`,
17//! `health_loop_suspect_peers{peer_id}`, etc.) are rendered by the
18//! Prometheus route directly from the owning subsystem — they are
19//! not part of this primitive because their sources are not
20//! uniform.
21//!
22//! # Usage
23//!
24//! A driver owns an `Arc<LoopMetrics>` and registers it with a
25//! cluster-scoped [`LoopMetricsRegistry`] on spawn. Inside the tick
26//! body:
27//!
28//! ```ignore
29//! let t = Instant::now();
30//! match self.sweep().await {
31//!     Ok(()) => {}
32//!     Err(e) => self.metrics.record_error(e.kind_label()),
33//! }
34//! self.metrics.observe(t.elapsed());
35//! ```
36//!
37//! On spawn: `metrics.set_up(true)`. On graceful shutdown:
38//! `metrics.set_up(false)`.
39
40use std::collections::BTreeMap;
41use std::fmt::Write as _;
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::{Arc, Mutex};
44use std::time::Duration;
45
/// Standardized per-loop observations.
///
/// Holds the four uniform observations for one control loop: an
/// iteration counter, the duration of the most recent iteration, an
/// up/down flag, and per-kind error counters. All mutation goes
/// through `&self`, so a handle can be shared behind an `Arc`.
#[derive(Debug)]
pub struct LoopMetrics {
    /// Metric-name prefix, e.g. `"rebalancer_loop"`.
    name: &'static str,
    /// Number of ticks reported through [`observe`](Self::observe).
    iterations_total: AtomicU64,
    /// Duration of the most recent tick, in nanoseconds.
    last_iteration_duration_ns: AtomicU64,
    /// Whether the driver task is currently considered running.
    up: AtomicBool,
    /// Error counts keyed by short kind label. Labels are caller-
    /// supplied `&'static str` so the set stays bounded and cardinality
    /// never explodes — do not pass a formatted error string here.
    errors: Mutex<BTreeMap<&'static str, u64>>,
}

impl LoopMetrics {
    /// Construct a metrics handle for a loop named `name` (e.g.
    /// `"rebalancer_loop"`). The name appears as the prefix of every
    /// emitted metric; use `snake_case` and include the `_loop`
    /// suffix. All counters start at zero and the loop starts "down".
    pub fn new(name: &'static str) -> Arc<Self> {
        Arc::new(Self {
            name,
            iterations_total: AtomicU64::new(0),
            last_iteration_duration_ns: AtomicU64::new(0),
            up: AtomicBool::new(false),
            errors: Mutex::new(BTreeMap::new()),
        })
    }

    /// The loop name this handle was constructed with.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Record a completed tick. Increments `iterations_total` and
    /// stores the duration for the `last_iteration_duration_seconds`
    /// gauge. Call regardless of success/failure — errors are tracked
    /// separately via [`record_error`](Self::record_error).
    ///
    /// Durations that do not fit in 64 bits of nanoseconds are
    /// clamped to `u64::MAX` rather than truncated.
    pub fn observe(&self, duration: Duration) {
        let ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.last_iteration_duration_ns.store(ns, Ordering::Relaxed);
        self.iterations_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Increment the counter for an error kind. `kind` must be a short
    /// bounded label — never a formatted error string.
    pub fn record_error(&self, kind: &'static str) {
        // A poisoned lock is recovered instead of propagated: the map
        // only holds counters, so continuing with its contents is safe.
        let mut counts = self.errors.lock().unwrap_or_else(|p| p.into_inner());
        *counts.entry(kind).or_insert(0) += 1;
    }

    /// Mark the loop up (driver task is running) or down (driver exited
    /// or not yet spawned).
    pub fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }

    /// Number of ticks recorded so far.
    pub fn iterations_total(&self) -> u64 {
        self.iterations_total.load(Ordering::Relaxed)
    }

    /// Duration passed to the most recent [`observe`](Self::observe)
    /// call, or zero if no tick has been recorded yet.
    pub fn last_iteration_duration(&self) -> Duration {
        Duration::from_nanos(self.last_iteration_duration_ns.load(Ordering::Relaxed))
    }

    /// Current value of the up/down flag.
    pub fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    /// Copy of the per-kind error counters at the time of the call.
    pub fn errors_snapshot(&self) -> BTreeMap<&'static str, u64> {
        self.errors
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .clone()
    }

    /// Append Prometheus-format exposition for this loop to `out`.
    /// Emits HELP/TYPE headers plus one sample per metric, followed by
    /// a blank separator line.
    ///
    /// Error-kind label values are escaped (`\`, `"`, newline) so a
    /// label can never break the line-oriented exposition format.
    pub fn render_prom(&self, out: &mut String) {
        let n = self.name;

        // `fmt::Write` for `String` cannot fail, so the results of the
        // `write!`/`writeln!` calls below are deliberately discarded.
        let _ = writeln!(
            out,
            "# HELP {n}_iterations_total Total completed iterations of the {n} driver."
        );
        let _ = writeln!(out, "# TYPE {n}_iterations_total counter");
        let _ = writeln!(out, "{n}_iterations_total {}", self.iterations_total());

        let _ = writeln!(
            out,
            "# HELP {n}_last_iteration_duration_seconds Wall-time of the most recent {n} iteration."
        );
        let _ = writeln!(out, "# TYPE {n}_last_iteration_duration_seconds gauge");
        let _ = writeln!(
            out,
            "{n}_last_iteration_duration_seconds {}",
            self.last_iteration_duration().as_secs_f64()
        );

        let _ = writeln!(
            out,
            "# HELP {n}_up 1 if the {n} driver task is currently running, 0 otherwise."
        );
        let _ = writeln!(out, "# TYPE {n}_up gauge");
        let _ = writeln!(out, "{n}_up {}", u8::from(self.is_up()));

        let _ = writeln!(
            out,
            "# HELP {n}_errors_total Errors observed by {n}, by kind."
        );
        let _ = writeln!(out, "# TYPE {n}_errors_total counter");
        let snap = self.errors_snapshot();
        if snap.is_empty() {
            // Emit a zero sample with `kind="none"` so scrapes never
            // see the series disappear between ticks.
            let _ = writeln!(out, "{n}_errors_total{{kind=\"none\"}} 0");
        } else {
            for (kind, count) in snap {
                let _ = write!(out, "{n}_errors_total{{kind=\"");
                push_escaped_label_value(out, kind);
                let _ = writeln!(out, "\"}} {count}");
            }
        }
        out.push('\n');
    }
}

/// Append `value` to `out` as a Prometheus label value: backslash,
/// double quote and line feed are written as `\\`, `\"` and `\n`;
/// every other character is copied unchanged.
fn push_escaped_label_value(out: &mut String, value: &str) {
    for ch in value.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            other => out.push(other),
        }
    }
}
168
169/// Collection of [`LoopMetrics`] handles so a single Prometheus render
170/// pass can iterate every registered loop.
171///
172/// Owners (the lifecycle code that spawns drivers) call
173/// [`register`](Self::register) once per driver with the driver's
174/// shared metrics handle. The Prometheus route iterates via
175/// [`render_prom`](Self::render_prom).
176#[derive(Debug, Default)]
177pub struct LoopMetricsRegistry {
178    loops: Mutex<Vec<Arc<LoopMetrics>>>,
179}
180
181impl LoopMetricsRegistry {
182    pub fn new() -> Arc<Self> {
183        Arc::new(Self::default())
184    }
185
186    pub fn register(&self, metrics: Arc<LoopMetrics>) {
187        let mut g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
188        // Idempotent: a driver that respawns shouldn't duplicate its
189        // entry. Match on the interned name — there is exactly one
190        // `LoopMetrics` per loop in a process.
191        if g.iter().any(|m| m.name() == metrics.name()) {
192            return;
193        }
194        g.push(metrics);
195    }
196
197    pub fn render_prom(&self, out: &mut String) {
198        let loops: Vec<Arc<LoopMetrics>> = {
199            let g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
200            g.iter().cloned().collect()
201        };
202        for m in loops {
203            m.render_prom(out);
204        }
205    }
206
207    /// Return every registered loop handle. Used by tests and by the
208    /// `/cluster/debug/transport` style inspectors.
209    pub fn loops(&self) -> Vec<Arc<LoopMetrics>> {
210        self.loops
211            .lock()
212            .unwrap_or_else(|p| p.into_inner())
213            .iter()
214            .cloned()
215            .collect()
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// `observe` bumps the counter once per call and the duration
    /// gauge always reflects the most recent call only.
    #[test]
    fn observe_increments_count_and_sets_duration() {
        let m = LoopMetrics::new("test_loop");
        assert_eq!(m.iterations_total(), 0);
        assert_eq!(m.last_iteration_duration(), Duration::ZERO);
        m.observe(Duration::from_millis(10));
        assert_eq!(m.iterations_total(), 1);
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(10));
        m.observe(Duration::from_millis(20));
        assert_eq!(m.iterations_total(), 2);
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(20));
    }

    /// Each error kind has its own independent counter.
    #[test]
    fn record_error_counts_by_kind() {
        let m = LoopMetrics::new("test_loop");
        assert!(m.errors_snapshot().is_empty());
        m.record_error("transport");
        m.record_error("transport");
        m.record_error("plan");
        let snap = m.errors_snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap.get("transport").copied(), Some(2));
        assert_eq!(snap.get("plan").copied(), Some(1));
    }

    /// The up flag starts false and follows `set_up`.
    #[test]
    fn up_flag_toggles() {
        let m = LoopMetrics::new("test_loop");
        assert!(!m.is_up());
        m.set_up(true);
        assert!(m.is_up());
        m.set_up(false);
        assert!(!m.is_up());
    }

    /// A populated loop renders all four standard metrics.
    #[test]
    fn render_prom_emits_all_four_metrics() {
        let m = LoopMetrics::new("rebalancer_loop");
        m.observe(Duration::from_millis(5));
        m.record_error("plan");
        m.set_up(true);
        let mut out = String::new();
        m.render_prom(&mut out);
        assert!(out.contains("rebalancer_loop_iterations_total 1"));
        assert!(out.contains("rebalancer_loop_last_iteration_duration_seconds 0.005"));
        assert!(out.contains("rebalancer_loop_up 1"));
        assert!(out.contains("rebalancer_loop_errors_total{kind=\"plan\"} 1"));
        // The placeholder sample is only emitted while no error has
        // been recorded.
        assert!(!out.contains("kind=\"none\""));
    }

    /// With no recorded errors the error series is still present.
    #[test]
    fn render_prom_emits_none_sample_when_no_errors() {
        let m = LoopMetrics::new("quiet_loop");
        let mut out = String::new();
        m.render_prom(&mut out);
        assert!(out.contains("quiet_loop_errors_total{kind=\"none\"} 0"));
    }

    /// Every registered loop is rendered, and rendered exactly once
    /// even when it was registered twice.
    #[test]
    fn registry_renders_every_loop_once() {
        let reg = LoopMetricsRegistry::new();
        let a = LoopMetrics::new("a_loop");
        let b = LoopMetrics::new("b_loop");
        a.observe(Duration::from_millis(1));
        b.observe(Duration::from_millis(2));
        reg.register(Arc::clone(&a));
        reg.register(Arc::clone(&b));
        reg.register(Arc::clone(&a));
        let mut out = String::new();
        reg.render_prom(&mut out);
        assert_eq!(out.matches("\na_loop_iterations_total 1\n").count(), 1);
        assert_eq!(out.matches("\nb_loop_iterations_total 1\n").count(), 1);
    }

    /// Registration dedups on the loop name, both for the same handle
    /// and for a distinct handle that reuses the name; the first
    /// registered handle is the one that is kept.
    #[test]
    fn registry_register_is_idempotent_by_name() {
        let reg = LoopMetricsRegistry::new();
        let m = LoopMetrics::new("dup_loop");
        reg.register(Arc::clone(&m));
        reg.register(Arc::clone(&m));
        assert_eq!(reg.loops().len(), 1);

        let other = LoopMetrics::new("dup_loop");
        reg.register(Arc::clone(&other));
        let loops = reg.loops();
        assert_eq!(loops.len(), 1);
        assert!(Arc::ptr_eq(&loops[0], &m));
    }
}