nodedb_cluster/
loop_metrics.rs1use std::collections::BTreeMap;
41use std::fmt::Write as _;
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::{Arc, Mutex};
44use std::time::Duration;
45
/// Counters and gauges describing one named background loop.
///
/// Instances are created behind an [`Arc`] by [`LoopMetrics::new`]; every
/// method takes `&self`, so the same handle can be updated by the loop and
/// read by whatever renders the metrics.
#[derive(Debug)]
pub struct LoopMetrics {
    /// Prefix used for every metric name emitted by [`LoopMetrics::render_prom`].
    name: &'static str,
    /// Number of iterations reported through [`LoopMetrics::observe`].
    iterations_total: AtomicU64,
    /// Duration of the most recently observed iteration, in nanoseconds.
    last_iteration_duration_ns: AtomicU64,
    /// Flag last written by [`LoopMetrics::set_up`]; starts out `false`.
    up: AtomicBool,
    /// Error counts keyed by error kind. A `BTreeMap` keeps the rendered
    /// output sorted by kind.
    errors: Mutex<BTreeMap<&'static str, u64>>,
}

impl LoopMetrics {
    /// Creates a zeroed metrics handle for the loop called `name`.
    ///
    /// `name` is interpolated verbatim into metric names, so it should already
    /// be a valid Prometheus metric-name prefix; it is not validated here.
    pub fn new(name: &'static str) -> Arc<Self> {
        Arc::new(Self {
            name,
            iterations_total: AtomicU64::new(0),
            last_iteration_duration_ns: AtomicU64::new(0),
            up: AtomicBool::new(false),
            errors: Mutex::new(BTreeMap::new()),
        })
    }

    /// Returns the name this handle was created with.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Records one finished iteration that took `duration`.
    ///
    /// Durations that do not fit in 64 bits of nanoseconds are clamped to
    /// `u64::MAX`. The two stores are `Relaxed` and independent, so a
    /// concurrent reader may briefly see the new duration with the old count.
    pub fn observe(&self, duration: Duration) {
        let ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.last_iteration_duration_ns.store(ns, Ordering::Relaxed);
        self.iterations_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the error counter for `kind`, creating it on first use.
    pub fn record_error(&self, kind: &'static str) {
        // A poisoned lock only means another thread panicked while holding it;
        // the map itself is still usable, so keep counting.
        let mut counts = self.errors.lock().unwrap_or_else(|p| p.into_inner());
        *counts.entry(kind).or_insert(0) += 1;
    }

    /// Sets the `up` flag reported by [`LoopMetrics::is_up`].
    pub fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }

    /// Returns how many iterations have been observed so far.
    pub fn iterations_total(&self) -> u64 {
        self.iterations_total.load(Ordering::Relaxed)
    }

    /// Returns the duration passed to the most recent [`LoopMetrics::observe`]
    /// call, or zero if none has happened yet.
    pub fn last_iteration_duration(&self) -> Duration {
        Duration::from_nanos(self.last_iteration_duration_ns.load(Ordering::Relaxed))
    }

    /// Returns the value last passed to [`LoopMetrics::set_up`].
    pub fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    /// Returns a copy of the per-kind error counts taken under the lock.
    pub fn errors_snapshot(&self) -> BTreeMap<&'static str, u64> {
        self.errors
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .clone()
    }

    /// Appends this loop's metrics to `out` in Prometheus text exposition
    /// format, followed by one blank line.
    ///
    /// Four metric families are written, each with `# HELP` and `# TYPE`
    /// lines: `<name>_iterations_total`, `<name>_last_iteration_duration_seconds`,
    /// `<name>_up` and `<name>_errors_total{kind="..."}`. Error kinds are
    /// escaped so that a kind containing `\`, `"` or a line feed cannot
    /// corrupt the output.
    pub fn render_prom(&self, out: &mut String) {
        let n = self.name;

        // Writing into a `String` cannot fail, hence the discarded results.
        let _ = writeln!(
            out,
            "# HELP {n}_iterations_total Total completed iterations of the {n} driver."
        );
        let _ = writeln!(out, "# TYPE {n}_iterations_total counter");
        let _ = writeln!(out, "{n}_iterations_total {}", self.iterations_total());

        let _ = writeln!(
            out,
            "# HELP {n}_last_iteration_duration_seconds Wall-time of the most recent {n} iteration."
        );
        let _ = writeln!(out, "# TYPE {n}_last_iteration_duration_seconds gauge");
        let _ = writeln!(
            out,
            "{n}_last_iteration_duration_seconds {}",
            self.last_iteration_duration().as_secs_f64()
        );

        let _ = writeln!(
            out,
            "# HELP {n}_up 1 if the {n} driver task is currently running, 0 otherwise."
        );
        let _ = writeln!(out, "# TYPE {n}_up gauge");
        let _ = writeln!(out, "{n}_up {}", u8::from(self.is_up()));

        let _ = writeln!(
            out,
            "# HELP {n}_errors_total Errors observed by {n}, by kind."
        );
        let _ = writeln!(out, "# TYPE {n}_errors_total counter");
        let snap = self.errors_snapshot();
        if snap.is_empty() {
            // Emit a zero-valued placeholder so the family always has a sample.
            let _ = writeln!(out, "{n}_errors_total{{kind=\"none\"}} 0");
        } else {
            for (kind, count) in &snap {
                let kind = escape_label_value(kind);
                let _ = writeln!(out, "{n}_errors_total{{kind=\"{kind}\"}} {count}");
            }
        }
        out.push('\n');
    }
}

/// Escapes `raw` for use as a Prometheus label value.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`
/// respectively. Values without any of those characters are returned
/// borrowed, so the common case does not allocate.
fn escape_label_value(raw: &str) -> std::borrow::Cow<'_, str> {
    if !raw.contains(|c: char| matches!(c, '\\' | '"' | '\n')) {
        return std::borrow::Cow::Borrowed(raw);
    }
    let mut escaped = String::with_capacity(raw.len() + 2);
    for ch in raw.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    std::borrow::Cow::Owned(escaped)
}
168
169#[derive(Debug, Default)]
177pub struct LoopMetricsRegistry {
178 loops: Mutex<Vec<Arc<LoopMetrics>>>,
179}
180
181impl LoopMetricsRegistry {
182 pub fn new() -> Arc<Self> {
183 Arc::new(Self::default())
184 }
185
186 pub fn register(&self, metrics: Arc<LoopMetrics>) {
187 let mut g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
188 if g.iter().any(|m| m.name() == metrics.name()) {
192 return;
193 }
194 g.push(metrics);
195 }
196
197 pub fn render_prom(&self, out: &mut String) {
198 let loops: Vec<Arc<LoopMetrics>> = {
199 let g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
200 g.iter().cloned().collect()
201 };
202 for m in loops {
203 m.render_prom(out);
204 }
205 }
206
207 pub fn loops(&self) -> Vec<Arc<LoopMetrics>> {
210 self.loops
211 .lock()
212 .unwrap_or_else(|p| p.into_inner())
213 .iter()
214 .cloned()
215 .collect()
216 }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders a single loop's metrics into a fresh string.
    fn render(m: &LoopMetrics) -> String {
        let mut out = String::new();
        m.render_prom(&mut out);
        out
    }

    /// Each `observe` bumps the counter by one and overwrites the stored
    /// duration with exactly the value passed in.
    #[test]
    fn observe_increments_count_and_sets_duration() {
        let m = LoopMetrics::new("test_loop");
        assert_eq!(m.iterations_total(), 0);
        assert_eq!(m.last_iteration_duration(), Duration::ZERO);
        m.observe(Duration::from_millis(10));
        assert_eq!(m.iterations_total(), 1);
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(10));
        m.observe(Duration::from_millis(20));
        assert_eq!(m.iterations_total(), 2);
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(20));
    }

    /// Errors are tallied separately per kind.
    #[test]
    fn record_error_counts_by_kind() {
        let m = LoopMetrics::new("test_loop");
        m.record_error("transport");
        m.record_error("transport");
        m.record_error("plan");
        let snap = m.errors_snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap.get("transport").copied(), Some(2));
        assert_eq!(snap.get("plan").copied(), Some(1));
    }

    /// The `up` flag starts false and follows `set_up`.
    #[test]
    fn up_flag_toggles() {
        let m = LoopMetrics::new("test_loop");
        assert!(!m.is_up());
        m.set_up(true);
        assert!(m.is_up());
        m.set_up(false);
        assert!(!m.is_up());
    }

    /// All four metric families appear in the rendered output.
    #[test]
    fn render_prom_emits_all_four_metrics() {
        let m = LoopMetrics::new("rebalancer_loop");
        m.observe(Duration::from_millis(5));
        m.record_error("plan");
        m.set_up(true);
        let out = render(&m);
        assert!(out.contains("rebalancer_loop_iterations_total 1"));
        assert!(out.contains("rebalancer_loop_last_iteration_duration_seconds"));
        assert!(out.contains("rebalancer_loop_up 1"));
        assert!(out.contains("rebalancer_loop_errors_total{kind=\"plan\"} 1"));
        // Once a real error exists the placeholder sample must be gone.
        assert!(!out.contains("kind=\"none\""));
    }

    /// With no errors recorded a zero-valued `kind="none"` sample is emitted.
    #[test]
    fn render_prom_emits_none_sample_when_no_errors() {
        let m = LoopMetrics::new("quiet_loop");
        let out = render(&m);
        assert!(out.contains("quiet_loop_errors_total{kind=\"none\"} 0"));
    }

    /// Every registered loop is rendered, and rendered exactly once.
    #[test]
    fn registry_renders_every_loop_once() {
        let reg = LoopMetricsRegistry::new();
        let a = LoopMetrics::new("a_loop");
        let b = LoopMetrics::new("b_loop");
        a.observe(Duration::from_millis(1));
        b.observe(Duration::from_millis(2));
        reg.register(Arc::clone(&a));
        reg.register(Arc::clone(&b));
        let mut out = String::new();
        reg.render_prom(&mut out);
        assert_eq!(out.matches("a_loop_iterations_total 1").count(), 1);
        assert_eq!(out.matches("b_loop_iterations_total 1").count(), 1);
    }

    /// Registering the same name twice keeps only the first handle, whether
    /// the second registration is the same `Arc` or a different instance.
    #[test]
    fn registry_register_is_idempotent_by_name() {
        let reg = LoopMetricsRegistry::new();
        let m = LoopMetrics::new("dup_loop");
        reg.register(Arc::clone(&m));
        reg.register(Arc::clone(&m));
        assert_eq!(reg.loops().len(), 1);

        let other = LoopMetrics::new("dup_loop");
        reg.register(other);
        let loops = reg.loops();
        assert_eq!(loops.len(), 1);
        assert!(Arc::ptr_eq(&loops[0], &m));
    }
}