// heliosdb_proxy/lag/metrics.rs
//! Lag Routing Metrics
//!
//! Metrics and statistics for lag-aware routing decisions.

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use dashmap::DashMap;
use parking_lot::RwLock;

use super::monitor::NodeId;
use super::SyncMode;

15/// Lag metrics collector
16pub struct LagMetrics {
17    /// Total routing decisions
18    total_decisions: AtomicU64,
19
20    /// Decisions that went to primary
21    primary_decisions: AtomicU64,
22
23    /// Decisions that went to standby
24    standby_decisions: AtomicU64,
25
26    /// Fallback to primary due to lag
27    fallback_count: AtomicU64,
28
29    /// RYW-triggered primary routes
30    ryw_fallback_count: AtomicU64,
31
32    /// No eligible nodes
33    no_nodes_count: AtomicU64,
34
35    /// Per-node statistics
36    node_stats: DashMap<NodeId, NodeLagStats>,
37
38    /// Per-sync-mode statistics
39    sync_mode_stats: DashMap<SyncMode, AtomicU64>,
40
41    /// Routing decision timing histogram (microseconds)
42    decision_times_us: RwLock<Vec<u64>>,
43
44    /// Maximum samples to keep for timing
45    max_timing_samples: usize,
46
47    /// Start time for uptime calculation
48    started_at: Instant,
49}
50
51impl LagMetrics {
52    /// Create new metrics collector
53    pub fn new() -> Self {
54        Self {
55            total_decisions: AtomicU64::new(0),
56            primary_decisions: AtomicU64::new(0),
57            standby_decisions: AtomicU64::new(0),
58            fallback_count: AtomicU64::new(0),
59            ryw_fallback_count: AtomicU64::new(0),
60            no_nodes_count: AtomicU64::new(0),
61            node_stats: DashMap::new(),
62            sync_mode_stats: DashMap::new(),
63            decision_times_us: RwLock::new(Vec::with_capacity(1000)),
64            max_timing_samples: 1000,
65            started_at: Instant::now(),
66        }
67    }
68
69    /// Record a routing decision to primary
70    pub fn record_primary_decision(&self, elapsed: Duration, reason: &str) {
71        self.total_decisions.fetch_add(1, Ordering::Relaxed);
72        self.primary_decisions.fetch_add(1, Ordering::Relaxed);
73
74        // Check for ryw first (more specific case) since "ryw fallback" contains "fallback"
75        if reason.contains("ryw") || reason.contains("RYW") {
76            self.ryw_fallback_count.fetch_add(1, Ordering::Relaxed);
77        } else if reason.contains("fallback") {
78            self.fallback_count.fetch_add(1, Ordering::Relaxed);
79        }
80
81        self.record_timing(elapsed);
82    }
83
84    /// Record a routing decision to standby
85    pub fn record_standby_decision(
86        &self,
87        node_id: &str,
88        sync_mode: SyncMode,
89        lag_ms: u64,
90        elapsed: Duration,
91    ) {
92        self.total_decisions.fetch_add(1, Ordering::Relaxed);
93        self.standby_decisions.fetch_add(1, Ordering::Relaxed);
94
95        // Update per-node stats
96        self.node_stats
97            .entry(node_id.to_string())
98            .and_modify(|stats| stats.record_decision(lag_ms))
99            .or_insert_with(|| {
100                let mut stats = NodeLagStats::new(sync_mode);
101                stats.record_decision(lag_ms);
102                stats
103            });
104
105        // Update per-sync-mode stats
106        self.sync_mode_stats
107            .entry(sync_mode)
108            .and_modify(|count| {
109                count.fetch_add(1, Ordering::Relaxed);
110            })
111            .or_insert_with(|| AtomicU64::new(1));
112
113        self.record_timing(elapsed);
114    }
115
116    /// Record no eligible nodes
117    pub fn record_no_nodes(&self, elapsed: Duration) {
118        self.total_decisions.fetch_add(1, Ordering::Relaxed);
119        self.no_nodes_count.fetch_add(1, Ordering::Relaxed);
120        self.record_timing(elapsed);
121    }
122
123    /// Record decision timing
124    fn record_timing(&self, elapsed: Duration) {
125        let us = elapsed.as_micros() as u64;
126        let mut times = self.decision_times_us.write();
127
128        if times.len() >= self.max_timing_samples {
129            // Remove oldest half when full
130            times.drain(0..self.max_timing_samples / 2);
131        }
132        times.push(us);
133    }
134
135    /// Get current statistics snapshot
136    pub fn get_stats(&self) -> LagStatsSnapshot {
137        let total = self.total_decisions.load(Ordering::Relaxed);
138        let primary = self.primary_decisions.load(Ordering::Relaxed);
139        let standby = self.standby_decisions.load(Ordering::Relaxed);
140        let fallback = self.fallback_count.load(Ordering::Relaxed);
141        let ryw_fallback = self.ryw_fallback_count.load(Ordering::Relaxed);
142        let no_nodes = self.no_nodes_count.load(Ordering::Relaxed);
143
144        // Calculate timing stats
145        let times = self.decision_times_us.read();
146        let (avg_time_us, p50_time_us, p99_time_us) = if times.is_empty() {
147            (0, 0, 0)
148        } else {
149            let mut sorted = times.clone();
150            sorted.sort_unstable();
151
152            let avg = sorted.iter().sum::<u64>() / sorted.len() as u64;
153            let p50 = sorted[sorted.len() / 2];
154            let p99_idx = (sorted.len() as f64 * 0.99) as usize;
155            let p99 = sorted.get(p99_idx).copied().unwrap_or(sorted[sorted.len() - 1]);
156
157            (avg, p50, p99)
158        };
159
160        // Collect per-node stats
161        let node_stats: HashMap<_, _> = self
162            .node_stats
163            .iter()
164            .map(|entry| (entry.key().clone(), entry.value().snapshot()))
165            .collect();
166
167        // Collect per-sync-mode stats
168        let sync_mode_counts: HashMap<_, _> = self
169            .sync_mode_stats
170            .iter()
171            .map(|entry| (*entry.key(), entry.value().load(Ordering::Relaxed)))
172            .collect();
173
174        LagStatsSnapshot {
175            total_decisions: total,
176            primary_decisions: primary,
177            standby_decisions: standby,
178            fallback_count: fallback,
179            ryw_fallback_count: ryw_fallback,
180            no_nodes_count: no_nodes,
181            avg_decision_time_us: avg_time_us,
182            p50_decision_time_us: p50_time_us,
183            p99_decision_time_us: p99_time_us,
184            node_stats,
185            sync_mode_counts,
186            uptime_secs: self.started_at.elapsed().as_secs(),
187        }
188    }
189
190    /// Reset all metrics
191    pub fn reset(&self) {
192        self.total_decisions.store(0, Ordering::Relaxed);
193        self.primary_decisions.store(0, Ordering::Relaxed);
194        self.standby_decisions.store(0, Ordering::Relaxed);
195        self.fallback_count.store(0, Ordering::Relaxed);
196        self.ryw_fallback_count.store(0, Ordering::Relaxed);
197        self.no_nodes_count.store(0, Ordering::Relaxed);
198        self.node_stats.clear();
199        self.sync_mode_stats.clear();
200        self.decision_times_us.write().clear();
201    }
202}
203
204impl Default for LagMetrics {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
210impl std::fmt::Debug for LagMetrics {
211    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212        f.debug_struct("LagMetrics")
213            .field("total_decisions", &self.total_decisions.load(Ordering::Relaxed))
214            .field("node_count", &self.node_stats.len())
215            .finish()
216    }
217}
218
219/// Per-node lag statistics
220pub struct NodeLagStats {
221    /// Sync mode of this node
222    sync_mode: SyncMode,
223
224    /// Total decisions routed to this node
225    total_decisions: AtomicU64,
226
227    /// Sum of lag values (for average calculation)
228    total_lag_ms: AtomicU64,
229
230    /// Minimum observed lag
231    min_lag_ms: AtomicU64,
232
233    /// Maximum observed lag
234    max_lag_ms: AtomicU64,
235
236    /// Recent lag samples
237    recent_lags: RwLock<Vec<u64>>,
238}
239
240impl NodeLagStats {
241    fn new(sync_mode: SyncMode) -> Self {
242        Self {
243            sync_mode,
244            total_decisions: AtomicU64::new(0),
245            total_lag_ms: AtomicU64::new(0),
246            min_lag_ms: AtomicU64::new(u64::MAX),
247            max_lag_ms: AtomicU64::new(0),
248            recent_lags: RwLock::new(Vec::with_capacity(100)),
249        }
250    }
251
252    fn record_decision(&mut self, lag_ms: u64) {
253        self.total_decisions.fetch_add(1, Ordering::Relaxed);
254        self.total_lag_ms.fetch_add(lag_ms, Ordering::Relaxed);
255
256        // Update min
257        let mut current_min = self.min_lag_ms.load(Ordering::Relaxed);
258        while lag_ms < current_min {
259            match self.min_lag_ms.compare_exchange_weak(
260                current_min,
261                lag_ms,
262                Ordering::Relaxed,
263                Ordering::Relaxed,
264            ) {
265                Ok(_) => break,
266                Err(x) => current_min = x,
267            }
268        }
269
270        // Update max
271        let mut current_max = self.max_lag_ms.load(Ordering::Relaxed);
272        while lag_ms > current_max {
273            match self.max_lag_ms.compare_exchange_weak(
274                current_max,
275                lag_ms,
276                Ordering::Relaxed,
277                Ordering::Relaxed,
278            ) {
279                Ok(_) => break,
280                Err(x) => current_max = x,
281            }
282        }
283
284        // Add to recent samples
285        let mut recent = self.recent_lags.write();
286        if recent.len() >= 100 {
287            recent.remove(0);
288        }
289        recent.push(lag_ms);
290    }
291
292    fn snapshot(&self) -> NodeLagStatsSnapshot {
293        let total = self.total_decisions.load(Ordering::Relaxed);
294        let total_lag = self.total_lag_ms.load(Ordering::Relaxed);
295        let min = self.min_lag_ms.load(Ordering::Relaxed);
296        let max = self.max_lag_ms.load(Ordering::Relaxed);
297
298        let avg = if total > 0 { total_lag / total } else { 0 };
299
300        NodeLagStatsSnapshot {
301            sync_mode: self.sync_mode,
302            total_decisions: total,
303            avg_lag_ms: avg,
304            min_lag_ms: if min == u64::MAX { 0 } else { min },
305            max_lag_ms: max,
306        }
307    }
308}
309
310/// Snapshot of node lag statistics
311#[derive(Debug, Clone)]
312pub struct NodeLagStatsSnapshot {
313    /// Node's sync mode
314    pub sync_mode: SyncMode,
315
316    /// Total routing decisions to this node
317    pub total_decisions: u64,
318
319    /// Average lag when routed
320    pub avg_lag_ms: u64,
321
322    /// Minimum observed lag
323    pub min_lag_ms: u64,
324
325    /// Maximum observed lag
326    pub max_lag_ms: u64,
327}
328
329/// Snapshot of overall lag routing statistics
330#[derive(Debug, Clone)]
331pub struct LagStatsSnapshot {
332    /// Total routing decisions made
333    pub total_decisions: u64,
334
335    /// Decisions that went to primary
336    pub primary_decisions: u64,
337
338    /// Decisions that went to standby
339    pub standby_decisions: u64,
340
341    /// Fallback to primary due to lag
342    pub fallback_count: u64,
343
344    /// RYW-triggered primary routes
345    pub ryw_fallback_count: u64,
346
347    /// No eligible nodes found
348    pub no_nodes_count: u64,
349
350    /// Average decision time in microseconds
351    pub avg_decision_time_us: u64,
352
353    /// P50 decision time in microseconds
354    pub p50_decision_time_us: u64,
355
356    /// P99 decision time in microseconds
357    pub p99_decision_time_us: u64,
358
359    /// Per-node statistics
360    pub node_stats: HashMap<NodeId, NodeLagStatsSnapshot>,
361
362    /// Per-sync-mode decision counts
363    pub sync_mode_counts: HashMap<SyncMode, u64>,
364
365    /// Uptime in seconds
366    pub uptime_secs: u64,
367}
368
369impl LagStatsSnapshot {
370    /// Calculate standby routing percentage
371    pub fn standby_percentage(&self) -> f64 {
372        if self.total_decisions == 0 {
373            return 0.0;
374        }
375        self.standby_decisions as f64 / self.total_decisions as f64 * 100.0
376    }
377
378    /// Calculate fallback percentage
379    pub fn fallback_percentage(&self) -> f64 {
380        if self.total_decisions == 0 {
381            return 0.0;
382        }
383        self.fallback_count as f64 / self.total_decisions as f64 * 100.0
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_creation() {
        let metrics = LagMetrics::new();
        let stats = metrics.get_stats();

        assert_eq!(stats.total_decisions, 0);
        assert_eq!(stats.primary_decisions, 0);
        assert_eq!(stats.standby_decisions, 0);
    }

    #[test]
    fn test_record_primary_decision() {
        let metrics = LagMetrics::new();

        metrics.record_primary_decision(Duration::from_micros(50), "direct");
        metrics.record_primary_decision(Duration::from_micros(60), "fallback");
        metrics.record_primary_decision(Duration::from_micros(70), "ryw fallback");

        let stats = metrics.get_stats();
        assert_eq!(stats.total_decisions, 3);
        assert_eq!(stats.primary_decisions, 3);
        assert_eq!(stats.fallback_count, 1);
        assert_eq!(stats.ryw_fallback_count, 1);
    }

    #[test]
    fn test_record_standby_decision() {
        let metrics = LagMetrics::new();

        metrics.record_standby_decision("node-1", SyncMode::Sync, 5, Duration::from_micros(30));
        metrics.record_standby_decision("node-1", SyncMode::Sync, 10, Duration::from_micros(40));
        metrics.record_standby_decision("node-2", SyncMode::Async, 100, Duration::from_micros(50));

        let stats = metrics.get_stats();
        assert_eq!(stats.total_decisions, 3);
        assert_eq!(stats.standby_decisions, 3);
        assert_eq!(stats.node_stats.len(), 2);

        let node1_stats = stats.node_stats.get("node-1").unwrap();
        assert_eq!(node1_stats.total_decisions, 2);
        assert_eq!(node1_stats.min_lag_ms, 5);
        assert_eq!(node1_stats.max_lag_ms, 10);
    }

    #[test]
    fn test_node_average_lag_truncates() {
        let metrics = LagMetrics::new();

        metrics.record_standby_decision("node-1", SyncMode::Sync, 5, Duration::from_micros(30));
        metrics.record_standby_decision("node-1", SyncMode::Sync, 10, Duration::from_micros(40));

        let stats = metrics.get_stats();
        // (5 + 10) / 2 with integer division.
        assert_eq!(stats.node_stats.get("node-1").unwrap().avg_lag_ms, 7);
    }

    #[test]
    fn test_empty_node_snapshot() {
        let snap = NodeLagStats::new(SyncMode::Async).snapshot();

        assert_eq!(snap.total_decisions, 0);
        assert_eq!(snap.avg_lag_ms, 0);
        assert_eq!(snap.min_lag_ms, 0);
        assert_eq!(snap.max_lag_ms, 0);
    }

    #[test]
    fn test_record_no_nodes() {
        let metrics = LagMetrics::new();

        metrics.record_no_nodes(Duration::from_micros(25));

        let stats = metrics.get_stats();
        assert_eq!(stats.total_decisions, 1);
        assert_eq!(stats.no_nodes_count, 1);
        assert_eq!(stats.primary_decisions, 0);
        assert_eq!(stats.standby_decisions, 0);
    }

    #[test]
    fn test_timing_stats() {
        let metrics = LagMetrics::new();

        for i in 1..=100 {
            metrics.record_primary_decision(Duration::from_micros(i * 10), "test");
        }

        let stats = metrics.get_stats();
        assert!(stats.avg_decision_time_us > 0);
        assert!(stats.p50_decision_time_us > 0);
        assert!(stats.p99_decision_time_us >= stats.p50_decision_time_us);
    }

    #[test]
    fn test_single_timing_sample() {
        let metrics = LagMetrics::new();

        metrics.record_no_nodes(Duration::from_micros(42));

        let stats = metrics.get_stats();
        assert_eq!(stats.avg_decision_time_us, 42);
        assert_eq!(stats.p50_decision_time_us, 42);
        assert_eq!(stats.p99_decision_time_us, 42);
    }

    #[test]
    fn test_timing_buffer_is_bounded() {
        let metrics = LagMetrics::new();

        for i in 0..1500 {
            metrics.record_no_nodes(Duration::from_micros(i));
        }

        let retained = metrics.decision_times_us.read().len();
        assert!(retained > 0);
        assert!(retained <= metrics.max_timing_samples);
        assert_eq!(metrics.get_stats().total_decisions, 1500);
    }

    #[test]
    fn test_sync_mode_counts() {
        let metrics = LagMetrics::new();

        metrics.record_standby_decision("n1", SyncMode::Sync, 5, Duration::from_micros(30));
        metrics.record_standby_decision("n2", SyncMode::Sync, 5, Duration::from_micros(30));
        metrics.record_standby_decision("n3", SyncMode::Async, 100, Duration::from_micros(50));

        let stats = metrics.get_stats();
        assert_eq!(stats.sync_mode_counts.get(&SyncMode::Sync), Some(&2));
        assert_eq!(stats.sync_mode_counts.get(&SyncMode::Async), Some(&1));
    }

    #[test]
    fn test_reset_metrics() {
        let metrics = LagMetrics::new();

        metrics.record_primary_decision(Duration::from_micros(50), "test");
        metrics.record_standby_decision("node-1", SyncMode::Async, 100, Duration::from_micros(50));

        assert!(metrics.get_stats().total_decisions > 0);

        metrics.reset();

        let stats = metrics.get_stats();
        assert_eq!(stats.total_decisions, 0);
        assert_eq!(stats.node_stats.len(), 0);
        assert!(stats.sync_mode_counts.is_empty());
        assert_eq!(stats.avg_decision_time_us, 0);
        assert_eq!(stats.p99_decision_time_us, 0);
    }

    #[test]
    fn test_percentages() {
        let stats = LagStatsSnapshot {
            total_decisions: 100,
            primary_decisions: 20,
            standby_decisions: 80,
            fallback_count: 10,
            ryw_fallback_count: 5,
            no_nodes_count: 0,
            avg_decision_time_us: 50,
            p50_decision_time_us: 45,
            p99_decision_time_us: 100,
            node_stats: HashMap::new(),
            sync_mode_counts: HashMap::new(),
            uptime_secs: 3600,
        };

        assert!((stats.standby_percentage() - 80.0).abs() < 0.01);
        assert!((stats.fallback_percentage() - 10.0).abs() < 0.01);
    }

    #[test]
    fn test_percentages_without_decisions() {
        let stats = LagMetrics::new().get_stats();

        assert_eq!(stats.standby_percentage(), 0.0);
        assert_eq!(stats.fallback_percentage(), 0.0);
    }
}