nodedb_cluster/
loop_metrics.rs1use std::collections::BTreeMap;
39use std::fmt::Write as _;
40use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
41use std::sync::{Arc, Mutex};
42use std::time::Duration;
43
/// Metrics for one named loop, renderable in the Prometheus text exposition format.
///
/// Tracks the number of observed iterations, the duration of the most recent
/// iteration, an up/down flag, and error counts keyed by error kind. All
/// methods take `&self`, so a single `Arc<LoopMetrics>` can be shared between
/// the code that updates the metrics and the code that renders them.
#[derive(Debug)]
pub struct LoopMetrics {
    /// Prefix of every metric name emitted by [`LoopMetrics::render_prom`].
    name: &'static str,
    /// Number of calls to [`LoopMetrics::observe`].
    iterations_total: AtomicU64,
    /// Duration passed to the latest `observe` call, in nanoseconds.
    last_iteration_duration_ns: AtomicU64,
    /// Flag written by [`LoopMetrics::set_up`]; starts as `false`.
    up: AtomicBool,
    /// Error counts by kind. The `BTreeMap` makes rendering order sorted by kind.
    errors: Mutex<BTreeMap<&'static str, u64>>,
}

impl LoopMetrics {
    /// Creates metrics for the loop called `name` with zeroed counters, the
    /// up flag cleared, and no recorded errors.
    pub fn new(name: &'static str) -> Arc<Self> {
        Arc::new(Self {
            name,
            iterations_total: AtomicU64::new(0),
            last_iteration_duration_ns: AtomicU64::new(0),
            up: AtomicBool::new(false),
            errors: Mutex::new(BTreeMap::new()),
        })
    }

    /// Returns the name given to [`LoopMetrics::new`].
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Records one iteration that took `duration`.
    ///
    /// A duration that does not fit in `u64` nanoseconds is stored as
    /// `u64::MAX`. The duration and the counter are updated by two separate
    /// atomic operations, so a concurrent reader may see one without the other.
    pub fn observe(&self, duration: Duration) {
        let ns = u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX);
        self.last_iteration_duration_ns.store(ns, Ordering::Relaxed);
        self.iterations_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the error counter for `kind`, creating it at zero if needed.
    pub fn record_error(&self, kind: &'static str) {
        // A poisoned lock is recovered instead of propagated: the map is still usable.
        let mut errors = self.errors.lock().unwrap_or_else(|p| p.into_inner());
        *errors.entry(kind).or_default() += 1;
    }

    /// Sets the up flag reported as `{name}_up`.
    pub fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }

    /// Returns the number of iterations recorded so far.
    pub fn iterations_total(&self) -> u64 {
        self.iterations_total.load(Ordering::Relaxed)
    }

    /// Returns the duration passed to the most recent [`LoopMetrics::observe`]
    /// call, or zero if none has been recorded.
    pub fn last_iteration_duration(&self) -> Duration {
        Duration::from_nanos(self.last_iteration_duration_ns.load(Ordering::Relaxed))
    }

    /// Returns the current value of the up flag.
    pub fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    /// Returns a copy of the per-kind error counts.
    pub fn errors_snapshot(&self) -> BTreeMap<&'static str, u64> {
        self.errors
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .clone()
    }

    /// Appends this loop's metrics to `out` in the Prometheus text format.
    ///
    /// Four families are written, each with `# HELP` and `# TYPE` lines:
    /// `{name}_iterations_total` (counter),
    /// `{name}_last_iteration_duration_seconds` (gauge), `{name}_up` (gauge)
    /// and `{name}_errors_total` (counter, one sample per error kind). When no
    /// error has been recorded a single `kind="none"` sample with value `0` is
    /// written instead. A blank line terminates the section.
    ///
    /// Error kinds are escaped as label values (`\`, `"` and line feed), so an
    /// unusual kind string cannot corrupt the output.
    pub fn render_prom(&self, out: &mut String) {
        let n = self.name;

        // Formatting into a `String` cannot fail, so the results are discarded.
        let _ = writeln!(
            out,
            "# HELP {n}_iterations_total Total completed iterations of the {n} driver."
        );
        let _ = writeln!(out, "# TYPE {n}_iterations_total counter");
        let _ = writeln!(out, "{n}_iterations_total {}", self.iterations_total());

        let _ = writeln!(
            out,
            "# HELP {n}_last_iteration_duration_seconds Wall-time of the most recent {n} iteration."
        );
        let _ = writeln!(out, "# TYPE {n}_last_iteration_duration_seconds gauge");
        let _ = writeln!(
            out,
            "{n}_last_iteration_duration_seconds {}",
            self.last_iteration_duration().as_secs_f64()
        );

        let _ = writeln!(
            out,
            "# HELP {n}_up 1 if the {n} driver task is currently running, 0 otherwise."
        );
        let _ = writeln!(out, "# TYPE {n}_up gauge");
        let _ = writeln!(out, "{n}_up {}", u8::from(self.is_up()));

        let _ = writeln!(
            out,
            "# HELP {n}_errors_total Errors observed by {n}, by kind."
        );
        let _ = writeln!(out, "# TYPE {n}_errors_total counter");
        // Render from a snapshot so the errors lock is not held while formatting.
        let snap = self.errors_snapshot();
        if snap.is_empty() {
            let _ = writeln!(out, "{n}_errors_total{{kind=\"none\"}} 0");
        } else {
            for (kind, count) in snap {
                let _ = write!(out, "{n}_errors_total{{kind=\"");
                Self::push_escaped_label_value(out, kind);
                let _ = writeln!(out, "\"}} {count}");
            }
        }
        out.push('\n');
    }

    /// Appends `value` to `out`, escaping backslash, double quote and line
    /// feed as required for label values in the Prometheus text format.
    fn push_escaped_label_value(out: &mut String, value: &str) {
        for ch in value.chars() {
            match ch {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                other => out.push(other),
            }
        }
    }
}
166
167#[derive(Debug, Default)]
175pub struct LoopMetricsRegistry {
176 loops: Mutex<Vec<Arc<LoopMetrics>>>,
177}
178
179impl LoopMetricsRegistry {
180 pub fn new() -> Arc<Self> {
181 Arc::new(Self::default())
182 }
183
184 pub fn register(&self, metrics: Arc<LoopMetrics>) {
185 let mut g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
186 if g.iter().any(|m| m.name() == metrics.name()) {
190 return;
191 }
192 g.push(metrics);
193 }
194
195 pub fn render_prom(&self, out: &mut String) {
196 let loops: Vec<Arc<LoopMetrics>> = {
197 let g = self.loops.lock().unwrap_or_else(|p| p.into_inner());
198 g.iter().cloned().collect()
199 };
200 for m in loops {
201 m.render_prom(out);
202 }
203 }
204
205 pub fn loops(&self) -> Vec<Arc<LoopMetrics>> {
208 self.loops
209 .lock()
210 .unwrap_or_else(|p| p.into_inner())
211 .iter()
212 .cloned()
213 .collect()
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// `observe` bumps the counter and replaces (not accumulates) the duration.
    #[test]
    fn observe_increments_count_and_sets_duration() {
        let m = LoopMetrics::new("test_loop");
        assert_eq!(m.iterations_total(), 0);
        assert_eq!(m.last_iteration_duration(), Duration::ZERO);
        m.observe(Duration::from_millis(10));
        assert_eq!(m.iterations_total(), 1);
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(10));
        m.observe(Duration::from_millis(20));
        assert_eq!(m.iterations_total(), 2);
        // The second observation overwrites the first.
        assert_eq!(m.last_iteration_duration(), Duration::from_millis(20));
    }

    /// Each error kind gets its own counter and nothing else is created.
    #[test]
    fn record_error_counts_by_kind() {
        let m = LoopMetrics::new("test_loop");
        assert!(m.errors_snapshot().is_empty());
        m.record_error("transport");
        m.record_error("transport");
        m.record_error("plan");
        let snap = m.errors_snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap.get("transport").copied(), Some(2));
        assert_eq!(snap.get("plan").copied(), Some(1));
    }

    /// The up flag starts cleared and follows `set_up`.
    #[test]
    fn up_flag_toggles() {
        let m = LoopMetrics::new("test_loop");
        assert!(!m.is_up());
        m.set_up(true);
        assert!(m.is_up());
        m.set_up(false);
        assert!(!m.is_up());
    }

    /// All four metric families appear with their declared types.
    #[test]
    fn render_prom_emits_all_four_metrics() {
        let m = LoopMetrics::new("rebalancer_loop");
        m.observe(Duration::from_millis(5));
        m.record_error("plan");
        m.set_up(true);
        let mut out = String::new();
        m.render_prom(&mut out);
        assert!(out.contains("# TYPE rebalancer_loop_iterations_total counter"));
        assert!(out.contains("rebalancer_loop_iterations_total 1"));
        assert!(out.contains("# TYPE rebalancer_loop_last_iteration_duration_seconds gauge"));
        assert!(out.contains("rebalancer_loop_last_iteration_duration_seconds"));
        assert!(out.contains("# TYPE rebalancer_loop_up gauge"));
        assert!(out.contains("rebalancer_loop_up 1"));
        assert!(out.contains("# TYPE rebalancer_loop_errors_total counter"));
        assert!(out.contains("rebalancer_loop_errors_total{kind=\"plan\"} 1"));
        // The placeholder sample is only for loops with no recorded errors.
        assert!(!out.contains("kind=\"none\""));
    }

    /// A loop with no errors still exposes the errors family via a placeholder.
    #[test]
    fn render_prom_emits_none_sample_when_no_errors() {
        let m = LoopMetrics::new("quiet_loop");
        let mut out = String::new();
        m.render_prom(&mut out);
        assert!(out.contains("quiet_loop_errors_total{kind=\"none\"} 0"));
        assert!(out.contains("quiet_loop_up 0"));
    }

    /// Every registered loop is rendered, and rendered exactly once.
    #[test]
    fn registry_renders_every_loop_once() {
        let reg = LoopMetricsRegistry::new();
        let a = LoopMetrics::new("a_loop");
        let b = LoopMetrics::new("b_loop");
        a.observe(Duration::from_millis(1));
        b.observe(Duration::from_millis(2));
        reg.register(Arc::clone(&a));
        reg.register(Arc::clone(&b));
        let mut out = String::new();
        reg.render_prom(&mut out);
        assert_eq!(out.matches("a_loop_iterations_total 1").count(), 1);
        assert_eq!(out.matches("b_loop_iterations_total 1").count(), 1);
    }

    /// Registration is keyed by name: the first handle wins, whether the
    /// duplicate is the same `Arc` or a distinct handle with the same name.
    #[test]
    fn registry_register_is_idempotent_by_name() {
        let reg = LoopMetricsRegistry::new();
        let m = LoopMetrics::new("dup_loop");
        reg.register(Arc::clone(&m));
        reg.register(Arc::clone(&m));
        assert_eq!(reg.loops().len(), 1);

        reg.register(LoopMetrics::new("dup_loop"));
        let loops = reg.loops();
        assert_eq!(loops.len(), 1);
        assert!(Arc::ptr_eq(&loops[0], &m));
    }
}