1use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10use parking_lot::RwLock;
11
12use super::monitor::NodeId;
13use super::SyncMode;
14
15pub struct LagMetrics {
17 total_decisions: AtomicU64,
19
20 primary_decisions: AtomicU64,
22
23 standby_decisions: AtomicU64,
25
26 fallback_count: AtomicU64,
28
29 ryw_fallback_count: AtomicU64,
31
32 no_nodes_count: AtomicU64,
34
35 node_stats: DashMap<NodeId, NodeLagStats>,
37
38 sync_mode_stats: DashMap<SyncMode, AtomicU64>,
40
41 decision_times_us: RwLock<Vec<u64>>,
43
44 max_timing_samples: usize,
46
47 started_at: Instant,
49}
50
51impl LagMetrics {
52 pub fn new() -> Self {
54 Self {
55 total_decisions: AtomicU64::new(0),
56 primary_decisions: AtomicU64::new(0),
57 standby_decisions: AtomicU64::new(0),
58 fallback_count: AtomicU64::new(0),
59 ryw_fallback_count: AtomicU64::new(0),
60 no_nodes_count: AtomicU64::new(0),
61 node_stats: DashMap::new(),
62 sync_mode_stats: DashMap::new(),
63 decision_times_us: RwLock::new(Vec::with_capacity(1000)),
64 max_timing_samples: 1000,
65 started_at: Instant::now(),
66 }
67 }
68
69 pub fn record_primary_decision(&self, elapsed: Duration, reason: &str) {
71 self.total_decisions.fetch_add(1, Ordering::Relaxed);
72 self.primary_decisions.fetch_add(1, Ordering::Relaxed);
73
74 if reason.contains("ryw") || reason.contains("RYW") {
76 self.ryw_fallback_count.fetch_add(1, Ordering::Relaxed);
77 } else if reason.contains("fallback") {
78 self.fallback_count.fetch_add(1, Ordering::Relaxed);
79 }
80
81 self.record_timing(elapsed);
82 }
83
84 pub fn record_standby_decision(
86 &self,
87 node_id: &str,
88 sync_mode: SyncMode,
89 lag_ms: u64,
90 elapsed: Duration,
91 ) {
92 self.total_decisions.fetch_add(1, Ordering::Relaxed);
93 self.standby_decisions.fetch_add(1, Ordering::Relaxed);
94
95 self.node_stats
97 .entry(node_id.to_string())
98 .and_modify(|stats| stats.record_decision(lag_ms))
99 .or_insert_with(|| {
100 let mut stats = NodeLagStats::new(sync_mode);
101 stats.record_decision(lag_ms);
102 stats
103 });
104
105 self.sync_mode_stats
107 .entry(sync_mode)
108 .and_modify(|count| {
109 count.fetch_add(1, Ordering::Relaxed);
110 })
111 .or_insert_with(|| AtomicU64::new(1));
112
113 self.record_timing(elapsed);
114 }
115
116 pub fn record_no_nodes(&self, elapsed: Duration) {
118 self.total_decisions.fetch_add(1, Ordering::Relaxed);
119 self.no_nodes_count.fetch_add(1, Ordering::Relaxed);
120 self.record_timing(elapsed);
121 }
122
123 fn record_timing(&self, elapsed: Duration) {
125 let us = elapsed.as_micros() as u64;
126 let mut times = self.decision_times_us.write();
127
128 if times.len() >= self.max_timing_samples {
129 times.drain(0..self.max_timing_samples / 2);
131 }
132 times.push(us);
133 }
134
135 pub fn get_stats(&self) -> LagStatsSnapshot {
137 let total = self.total_decisions.load(Ordering::Relaxed);
138 let primary = self.primary_decisions.load(Ordering::Relaxed);
139 let standby = self.standby_decisions.load(Ordering::Relaxed);
140 let fallback = self.fallback_count.load(Ordering::Relaxed);
141 let ryw_fallback = self.ryw_fallback_count.load(Ordering::Relaxed);
142 let no_nodes = self.no_nodes_count.load(Ordering::Relaxed);
143
144 let times = self.decision_times_us.read();
146 let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
147 (0, 0, 0)
148 } else {
149 let mut sorted = times.clone();
150 sorted.sort_unstable();
151
152 let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
153 let p50 = sorted[sorted.len() / 2];
154 let p99_idx = (sorted.len() as f64 * 0.99) as usize;
155 let p99 = sorted.get(p99_idx).copied().unwrap_or(sorted[sorted.len() - 1]);
156
157 (avg, p50, p99)
158 };
159
160 let node_stats: HashMap<_, _> = self
162 .node_stats
163 .iter()
164 .map(|entry| (entry.key().clone(), entry.value().snapshot()))
165 .collect();
166
167 let sync_mode_counts: HashMap<_, _> = self
169 .sync_mode_stats
170 .iter()
171 .map(|entry| (*entry.key(), entry.value().load(Ordering::Relaxed)))
172 .collect();
173
174 LagStatsSnapshot {
175 total_decisions: total,
176 primary_decisions: primary,
177 standby_decisions: standby,
178 fallback_count: fallback,
179 ryw_fallback_count: ryw_fallback,
180 no_nodes_count: no_nodes,
181 avg_decision_time_us: avg_time_us,
182 p50_decision_time_us: p50_time_us,
183 p99_decision_time_us: p99_time_us,
184 node_stats,
185 sync_mode_counts,
186 uptime_secs: self.started_at.elapsed().as_secs(),
187 }
188 }
189
190 pub fn reset(&self) {
192 self.total_decisions.store(0, Ordering::Relaxed);
193 self.primary_decisions.store(0, Ordering::Relaxed);
194 self.standby_decisions.store(0, Ordering::Relaxed);
195 self.fallback_count.store(0, Ordering::Relaxed);
196 self.ryw_fallback_count.store(0, Ordering::Relaxed);
197 self.no_nodes_count.store(0, Ordering::Relaxed);
198 self.node_stats.clear();
199 self.sync_mode_stats.clear();
200 self.decision_times_us.write().clear();
201 }
202}
203
204impl Default for LagMetrics {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210impl std::fmt::Debug for LagMetrics {
211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 f.debug_struct("LagMetrics")
213 .field("total_decisions", &self.total_decisions.load(Ordering::Relaxed))
214 .field("node_count", &self.node_stats.len())
215 .finish()
216 }
217}
218
219pub struct NodeLagStats {
221 sync_mode: SyncMode,
223
224 total_decisions: AtomicU64,
226
227 total_lag_ms: AtomicU64,
229
230 min_lag_ms: AtomicU64,
232
233 max_lag_ms: AtomicU64,
235
236 recent_lags: RwLock<Vec<u64>>,
238}
239
240impl NodeLagStats {
241 fn new(sync_mode: SyncMode) -> Self {
242 Self {
243 sync_mode,
244 total_decisions: AtomicU64::new(0),
245 total_lag_ms: AtomicU64::new(0),
246 min_lag_ms: AtomicU64::new(u64::MAX),
247 max_lag_ms: AtomicU64::new(0),
248 recent_lags: RwLock::new(Vec::with_capacity(100)),
249 }
250 }
251
252 fn record_decision(&mut self, lag_ms: u64) {
253 self.total_decisions.fetch_add(1, Ordering::Relaxed);
254 self.total_lag_ms.fetch_add(lag_ms, Ordering::Relaxed);
255
256 let mut current_min = self.min_lag_ms.load(Ordering::Relaxed);
258 while lag_ms < current_min {
259 match self.min_lag_ms.compare_exchange_weak(
260 current_min,
261 lag_ms,
262 Ordering::Relaxed,
263 Ordering::Relaxed,
264 ) {
265 Ok(_) => break,
266 Err(x) => current_min = x,
267 }
268 }
269
270 let mut current_max = self.max_lag_ms.load(Ordering::Relaxed);
272 while lag_ms > current_max {
273 match self.max_lag_ms.compare_exchange_weak(
274 current_max,
275 lag_ms,
276 Ordering::Relaxed,
277 Ordering::Relaxed,
278 ) {
279 Ok(_) => break,
280 Err(x) => current_max = x,
281 }
282 }
283
284 let mut recent = self.recent_lags.write();
286 if recent.len() >= 100 {
287 recent.remove(0);
288 }
289 recent.push(lag_ms);
290 }
291
292 fn snapshot(&self) -> NodeLagStatsSnapshot {
293 let total = self.total_decisions.load(Ordering::Relaxed);
294 let total_lag = self.total_lag_ms.load(Ordering::Relaxed);
295 let min = self.min_lag_ms.load(Ordering::Relaxed);
296 let max = self.max_lag_ms.load(Ordering::Relaxed);
297
298 let avg = if total > 0 { total_lag / total } else { 0 };
299
300 NodeLagStatsSnapshot {
301 sync_mode: self.sync_mode,
302 total_decisions: total,
303 avg_lag_ms: avg,
304 min_lag_ms: if min == u64::MAX { 0 } else { min },
305 max_lag_ms: max,
306 }
307 }
308}
309
310#[derive(Debug, Clone)]
312pub struct NodeLagStatsSnapshot {
313 pub sync_mode: SyncMode,
315
316 pub total_decisions: u64,
318
319 pub avg_lag_ms: u64,
321
322 pub min_lag_ms: u64,
324
325 pub max_lag_ms: u64,
327}
328
329#[derive(Debug, Clone)]
331pub struct LagStatsSnapshot {
332 pub total_decisions: u64,
334
335 pub primary_decisions: u64,
337
338 pub standby_decisions: u64,
340
341 pub fallback_count: u64,
343
344 pub ryw_fallback_count: u64,
346
347 pub no_nodes_count: u64,
349
350 pub avg_decision_time_us: u64,
352
353 pub p50_decision_time_us: u64,
355
356 pub p99_decision_time_us: u64,
358
359 pub node_stats: HashMap<NodeId, NodeLagStatsSnapshot>,
361
362 pub sync_mode_counts: HashMap<SyncMode, u64>,
364
365 pub uptime_secs: u64,
367}
368
369impl LagStatsSnapshot {
370 pub fn standby_percentage(&self) -> f64 {
372 if self.total_decisions == 0 {
373 return 0.0;
374 }
375 self.standby_decisions as f64 / self.total_decisions as f64 * 100.0
376 }
377
378 pub fn fallback_percentage(&self) -> f64 {
380 if self.total_decisions == 0 {
381 return 0.0;
382 }
383 self.fallback_count as f64 / self.total_decisions as f64 * 100.0
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        let snapshot = LagMetrics::new().get_stats();

        assert_eq!(snapshot.total_decisions, 0);
        assert_eq!(snapshot.primary_decisions, 0);
        assert_eq!(snapshot.standby_decisions, 0);
    }

    #[test]
    fn test_record_primary_decision() {
        let metrics = LagMetrics::new();
        let decisions: [(u64, &str); 3] = [(50, "direct"), (60, "fallback"), (70, "ryw fallback")];
        for &(micros, reason) in &decisions {
            metrics.record_primary_decision(Duration::from_micros(micros), reason);
        }

        let snapshot = metrics.get_stats();
        assert_eq!(snapshot.total_decisions, 3);
        assert_eq!(snapshot.primary_decisions, 3);
        // "ryw fallback" counts only towards the RYW counter.
        assert_eq!(snapshot.fallback_count, 1);
        assert_eq!(snapshot.ryw_fallback_count, 1);
    }

    #[test]
    fn test_record_standby_decision() {
        let metrics = LagMetrics::new();
        let decisions: [(&str, SyncMode, u64, u64); 3] = [
            ("node-1", SyncMode::Sync, 5, 30),
            ("node-1", SyncMode::Sync, 10, 40),
            ("node-2", SyncMode::Async, 100, 50),
        ];
        for &(node, mode, lag_ms, micros) in &decisions {
            metrics.record_standby_decision(node, mode, lag_ms, Duration::from_micros(micros));
        }

        let snapshot = metrics.get_stats();
        assert_eq!(snapshot.total_decisions, 3);
        assert_eq!(snapshot.standby_decisions, 3);
        assert_eq!(snapshot.node_stats.len(), 2);

        let node1 = snapshot
            .node_stats
            .get("node-1")
            .expect("node-1 should have recorded stats");
        assert_eq!(node1.total_decisions, 2);
        assert_eq!(node1.min_lag_ms, 5);
        assert_eq!(node1.max_lag_ms, 10);
    }

    #[test]
    fn test_timing_stats() {
        let metrics = LagMetrics::new();
        (1..=100u64).for_each(|step| {
            metrics.record_primary_decision(Duration::from_micros(step * 10), "test");
        });

        let snapshot = metrics.get_stats();
        assert!(snapshot.avg_decision_time_us > 0);
        assert!(snapshot.p50_decision_time_us > 0);
        assert!(snapshot.p99_decision_time_us >= snapshot.p50_decision_time_us);
    }

    #[test]
    fn test_sync_mode_counts() {
        let metrics = LagMetrics::new();
        let decisions: [(&str, SyncMode, u64, u64); 3] = [
            ("n1", SyncMode::Sync, 5, 30),
            ("n2", SyncMode::Sync, 5, 30),
            ("n3", SyncMode::Async, 100, 50),
        ];
        for &(node, mode, lag_ms, micros) in &decisions {
            metrics.record_standby_decision(node, mode, lag_ms, Duration::from_micros(micros));
        }

        let snapshot = metrics.get_stats();
        assert_eq!(snapshot.sync_mode_counts.get(&SyncMode::Sync), Some(&2));
        assert_eq!(snapshot.sync_mode_counts.get(&SyncMode::Async), Some(&1));
    }

    #[test]
    fn test_reset_metrics() {
        let metrics = LagMetrics::new();
        metrics.record_primary_decision(Duration::from_micros(50), "test");
        metrics.record_standby_decision("node-1", SyncMode::Async, 100, Duration::from_micros(50));
        assert!(metrics.get_stats().total_decisions > 0);

        metrics.reset();

        let snapshot = metrics.get_stats();
        assert_eq!(snapshot.total_decisions, 0);
        assert_eq!(snapshot.node_stats.len(), 0);
    }

    #[test]
    fn test_percentages() {
        let snapshot = LagStatsSnapshot {
            total_decisions: 100,
            primary_decisions: 20,
            standby_decisions: 80,
            fallback_count: 10,
            ryw_fallback_count: 5,
            no_nodes_count: 0,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            node_stats: HashMap::new(),
            sync_mode_counts: HashMap::new(),
            uptime_secs: 3600,
        };

        let close_to = |actual: f64, expected: f64| (actual - expected).abs() < 0.01;
        assert!(close_to(snapshot.standby_percentage(), 80.0));
        assert!(close_to(snapshot.fallback_percentage(), 10.0));
    }
}