Skip to main content

heliosdb_proxy/lag/
monitor.rs

//! Lag Monitor - Continuous replication lag tracking
//!
//! Monitors replication lag across all standbys in real time,
//! providing data for lag-aware routing decisions.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12use parking_lot::RwLock;
13
14use super::config::LagRoutingConfig;
15use super::SyncMode;
16
/// Unique identifier for a node
///
/// Used as the key for per-node lag tracking in [`LagMonitor`].
pub type NodeId = String;
19
/// Lag trend indicating whether lag is improving, stable, or degrading
///
/// Derived by comparing the three most recent lag samples recorded for a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LagTrend {
    /// Lag is decreasing
    Improving,
    /// Lag is stable within tolerance
    Stable,
    /// Lag is increasing
    Degrading,
    /// Not enough samples to determine trend (also reported when smoothing is disabled
    /// in the monitor configuration)
    #[default]
    Unknown,
}
33
34impl std::fmt::Display for LagTrend {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            LagTrend::Improving => write!(f, "improving"),
38            LagTrend::Stable => write!(f, "stable"),
39            LagTrend::Degrading => write!(f, "degrading"),
40            LagTrend::Unknown => write!(f, "unknown"),
41        }
42    }
43}
44
/// Lag information for a single node
///
/// A point-in-time snapshot; [`LagMonitor`] replaces it wholesale on every update.
#[derive(Debug, Clone)]
pub struct LagInfo {
    /// Current LSN (Log Sequence Number) on this node
    pub current_lsn: u64,

    /// Lag in bytes (LSN difference from primary)
    ///
    /// When smoothing is enabled in the monitor config this holds the smoothed value
    /// rather than the latest raw difference.
    pub lag_bytes: u64,

    /// Estimated lag in time, derived from `lag_bytes` and any reported time lag by the
    /// configured lag calculation
    pub lag_time: Duration,

    /// When this info was last updated
    pub updated_at: Instant,

    /// Lag trend (improving, stable, degrading)
    pub trend: LagTrend,

    /// Node's sync mode
    pub sync_mode: SyncMode,

    /// Whether the node is considered healthy based on lag
    ///
    /// The monitor computes this from the raw (unsmoothed) lag time compared against the
    /// configured stale threshold.
    pub healthy: bool,
}
69
70impl Default for LagInfo {
71    fn default() -> Self {
72        Self {
73            current_lsn: 0,
74            lag_bytes: 0,
75            lag_time: Duration::ZERO,
76            updated_at: Instant::now(),
77            trend: LagTrend::Unknown,
78            sync_mode: SyncMode::Unknown,
79            healthy: true,
80        }
81    }
82}
83
84impl LagInfo {
85    /// Check if this lag info is stale (not updated recently)
86    pub fn is_stale(&self, max_age: Duration) -> bool {
87        self.updated_at.elapsed() > max_age
88    }
89
90    /// Check if this node meets the freshness requirement
91    pub fn meets_freshness(&self, max_lag: Duration) -> bool {
92        self.healthy && self.lag_time <= max_lag
93    }
94
95    /// Check if this node has reached a specific LSN
96    pub fn has_reached_lsn(&self, required_lsn: u64) -> bool {
97        self.current_lsn >= required_lsn
98    }
99}
100
/// Internal tracking data for a node
///
/// Pairs the published [`LagInfo`] snapshot with a bounded history of raw lag samples
/// (in bytes) used for trend detection and smoothing.
#[derive(Debug)]
pub struct NodeLagData {
    /// Current lag info
    pub info: LagInfo,

    /// Recent lag samples for trend calculation (oldest at the front, newest at the back)
    lag_history: VecDeque<u64>,

    /// Smoothing window size: the maximum number of samples retained in `lag_history`
    /// (a size of 0 still retains the single most recent sample)
    window_size: usize,
}
113
114impl NodeLagData {
115    fn new(window_size: usize) -> Self {
116        Self {
117            info: LagInfo::default(),
118            lag_history: VecDeque::with_capacity(window_size),
119            window_size,
120        }
121    }
122
123    fn add_sample(&mut self, lag_bytes: u64) {
124        if self.lag_history.len() >= self.window_size {
125            self.lag_history.pop_front();
126        }
127        self.lag_history.push_back(lag_bytes);
128    }
129
130    fn calculate_trend(&self) -> LagTrend {
131        if self.lag_history.len() < 3 {
132            return LagTrend::Unknown;
133        }
134
135        let recent: Vec<_> = self.lag_history.iter().rev().take(3).collect();
136        let oldest = *recent[2];
137        let middle = *recent[1];
138        let newest = *recent[0];
139
140        // Calculate trend based on recent samples
141        let threshold = (oldest as f64 * 0.1) as u64; // 10% threshold
142
143        if newest + threshold < oldest && newest + threshold < middle {
144            LagTrend::Improving
145        } else if newest > oldest + threshold && newest > middle + threshold {
146            LagTrend::Degrading
147        } else {
148            LagTrend::Stable
149        }
150    }
151
152    fn get_smoothed_lag(&self) -> u64 {
153        if self.lag_history.is_empty() {
154            return self.info.lag_bytes;
155        }
156
157        // Use exponential moving average for smoothing
158        let alpha = 0.3;
159        let mut ema = self.lag_history[0] as f64;
160
161        for &sample in self.lag_history.iter().skip(1) {
162            ema = alpha * sample as f64 + (1.0 - alpha) * ema;
163        }
164
165        ema as u64
166    }
167}
168
/// Lag Monitor - tracks replication lag across all nodes
///
/// All methods take `&self`: per-node data lives in a `DashMap`, the primary LSN and
/// running flag are atomics, and the remaining mutable state sits behind `RwLock`s.
pub struct LagMonitor {
    /// Current lag data for each node
    node_lags: DashMap<NodeId, NodeLagData>,

    /// Current LSN on primary (standby byte lag is measured against this)
    primary_lsn: AtomicU64,

    /// Primary node ID
    primary_id: RwLock<Option<NodeId>>,

    /// Configuration
    config: LagRoutingConfig,

    /// Whether monitoring is running (a flag only; toggled by start/stop)
    running: AtomicBool,

    /// Last update time for primary LSN
    // NOTE(review): written by `update_primary_lsn` but not read anywhere in this file.
    primary_updated_at: RwLock<Instant>,
}
189
190impl LagMonitor {
191    /// Create a new lag monitor
192    pub fn new(config: LagRoutingConfig) -> Self {
193        Self {
194            node_lags: DashMap::new(),
195            primary_lsn: AtomicU64::new(0),
196            primary_id: RwLock::new(None),
197            config,
198            running: AtomicBool::new(false),
199            primary_updated_at: RwLock::new(Instant::now()),
200        }
201    }
202
203    /// Create with default config
204    pub fn with_defaults() -> Self {
205        Self::new(LagRoutingConfig::default())
206    }
207
208    /// Check if monitoring is running
209    pub fn is_running(&self) -> bool {
210        self.running.load(Ordering::Relaxed)
211    }
212
213    /// Start monitoring (sets running flag)
214    pub fn start(&self) {
215        self.running.store(true, Ordering::Relaxed);
216    }
217
218    /// Stop monitoring
219    pub fn stop(&self) {
220        self.running.store(false, Ordering::Relaxed);
221    }
222
223    /// Set the primary node ID
224    pub fn set_primary(&self, node_id: &str) {
225        *self.primary_id.write() = Some(node_id.to_string());
226    }
227
228    /// Get the primary node ID
229    pub fn get_primary(&self) -> Option<NodeId> {
230        self.primary_id.read().clone()
231    }
232
233    /// Update primary LSN
234    pub fn update_primary_lsn(&self, lsn: u64) {
235        self.primary_lsn.store(lsn, Ordering::SeqCst);
236        *self.primary_updated_at.write() = Instant::now();
237    }
238
239    /// Get current primary LSN
240    pub fn get_primary_lsn(&self) -> u64 {
241        self.primary_lsn.load(Ordering::SeqCst)
242    }
243
244    /// Register a standby node
245    pub fn register_standby(&self, node_id: &str, sync_mode: SyncMode) {
246        let mut data = NodeLagData::new(self.config.smoothing_window);
247        data.info.sync_mode = sync_mode;
248        self.node_lags.insert(node_id.to_string(), data);
249    }
250
251    /// Remove a node from monitoring
252    pub fn remove_node(&self, node_id: &str) {
253        self.node_lags.remove(node_id);
254    }
255
256    /// Update lag info for a standby node
257    pub fn update_standby_lag(&self, node_id: &str, current_lsn: u64, time_lag: Option<Duration>) {
258        let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
259        let lag_bytes = primary_lsn.saturating_sub(current_lsn);
260
261        // Calculate lag time using configured method
262        let lag_time = self
263            .config
264            .lag_calculation
265            .calculate_lag(lag_bytes, time_lag);
266
267        // Determine if node is healthy
268        let healthy = lag_time <= self.config.stale_threshold;
269
270        self.node_lags
271            .entry(node_id.to_string())
272            .and_modify(|data| {
273                // Add sample for trend calculation
274                data.add_sample(lag_bytes);
275
276                // Calculate trend
277                let trend = if self.config.enable_smoothing {
278                    data.calculate_trend()
279                } else {
280                    LagTrend::Unknown
281                };
282
283                // Get smoothed lag if enabled
284                let effective_lag_bytes = if self.config.enable_smoothing {
285                    data.get_smoothed_lag()
286                } else {
287                    lag_bytes
288                };
289
290                let effective_lag_time = self
291                    .config
292                    .lag_calculation
293                    .calculate_lag(effective_lag_bytes, time_lag);
294
295                // Update lag info
296                data.info = LagInfo {
297                    current_lsn,
298                    lag_bytes: effective_lag_bytes,
299                    lag_time: effective_lag_time,
300                    updated_at: Instant::now(),
301                    trend,
302                    sync_mode: data.info.sync_mode,
303                    healthy,
304                };
305            })
306            .or_insert_with(|| {
307                let mut data = NodeLagData::new(self.config.smoothing_window);
308                data.info = LagInfo {
309                    current_lsn,
310                    lag_bytes,
311                    lag_time,
312                    updated_at: Instant::now(),
313                    trend: LagTrend::Unknown,
314                    sync_mode: SyncMode::Unknown,
315                    healthy,
316                };
317                data
318            });
319    }
320
321    /// Get lag info for a specific node
322    pub fn get_lag(&self, node_id: &str) -> Option<LagInfo> {
323        self.node_lags.get(node_id).map(|data| data.info.clone())
324    }
325
326    /// Get all current lag info
327    pub fn get_all_lags(&self) -> Vec<(NodeId, LagInfo)> {
328        self.node_lags
329            .iter()
330            .map(|entry| (entry.key().clone(), entry.value().info.clone()))
331            .collect()
332    }
333
334    /// Get nodes that meet freshness requirement
335    pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
336        let stale_threshold = self.config.poll_interval * 3;
337
338        self.node_lags
339            .iter()
340            .filter(|entry| {
341                let info = &entry.value().info;
342                !info.is_stale(stale_threshold) && info.meets_freshness(max_lag)
343            })
344            .map(|entry| entry.key().clone())
345            .collect()
346    }
347
348    /// Get nodes that have reached a specific LSN
349    pub fn get_nodes_at_lsn(&self, required_lsn: u64) -> Vec<NodeId> {
350        let stale_threshold = self.config.poll_interval * 3;
351
352        self.node_lags
353            .iter()
354            .filter(|entry| {
355                let info = &entry.value().info;
356                !info.is_stale(stale_threshold) && info.has_reached_lsn(required_lsn)
357            })
358            .map(|entry| entry.key().clone())
359            .collect()
360    }
361
362    /// Check if a node has reached a specific LSN
363    pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
364        self.node_lags
365            .get(node_id)
366            .map(|data| data.info.has_reached_lsn(required_lsn))
367            .unwrap_or(false)
368    }
369
370    /// Get healthy nodes (lag below stale threshold)
371    pub fn get_healthy_nodes(&self) -> Vec<NodeId> {
372        self.node_lags
373            .iter()
374            .filter(|entry| entry.value().info.healthy)
375            .map(|entry| entry.key().clone())
376            .collect()
377    }
378
379    /// Get nodes by sync mode
380    pub fn get_nodes_by_sync_mode(&self, mode: SyncMode) -> Vec<NodeId> {
381        self.node_lags
382            .iter()
383            .filter(|entry| entry.value().info.sync_mode == mode)
384            .map(|entry| entry.key().clone())
385            .collect()
386    }
387
388    /// Get the freshest standby (lowest lag)
389    pub fn get_freshest_standby(&self) -> Option<(NodeId, LagInfo)> {
390        let stale_threshold = self.config.poll_interval * 3;
391
392        self.node_lags
393            .iter()
394            .filter(|entry| {
395                let info = &entry.value().info;
396                info.healthy && !info.is_stale(stale_threshold)
397            })
398            .min_by_key(|entry| entry.value().info.lag_time)
399            .map(|entry| (entry.key().clone(), entry.value().info.clone()))
400    }
401
402    /// Get node count
403    pub fn node_count(&self) -> usize {
404        self.node_lags.len()
405    }
406
407    /// Clear all lag data
408    pub fn clear(&self) {
409        self.node_lags.clear();
410        self.primary_lsn.store(0, Ordering::SeqCst);
411    }
412}
413
414impl std::fmt::Debug for LagMonitor {
415    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416        f.debug_struct("LagMonitor")
417            .field("primary_lsn", &self.primary_lsn.load(Ordering::Relaxed))
418            .field("node_count", &self.node_lags.len())
419            .field("running", &self.running.load(Ordering::Relaxed))
420            .finish()
421    }
422}
423
424// Thread-safe wrapper for use with Arc
425impl LagMonitor {
426    /// Create an Arc-wrapped instance
427    pub fn arc(config: LagRoutingConfig) -> Arc<Self> {
428        Arc::new(Self::new(config))
429    }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lag::LagCalculation;

    /// Returns `true` if `nodes` includes a node with the given name.
    fn contains_node(nodes: &[NodeId], name: &str) -> bool {
        nodes.iter().any(|node| node == name)
    }

    #[test]
    fn test_lag_info_default() {
        let LagInfo {
            current_lsn,
            lag_bytes,
            healthy,
            ..
        } = LagInfo::default();

        assert_eq!(current_lsn, 0);
        assert_eq!(lag_bytes, 0);
        assert!(healthy);
    }

    #[test]
    fn test_lag_info_meets_freshness() {
        let info = LagInfo {
            lag_time: Duration::from_millis(100),
            ..LagInfo::default()
        };

        // (limit in ms, expected) — a lag equal to the limit still counts as fresh.
        for (limit_ms, expected) in [(200, true), (100, true), (50, false)] {
            assert_eq!(info.meets_freshness(Duration::from_millis(limit_ms)), expected);
        }
    }

    #[test]
    fn test_lag_info_has_reached_lsn() {
        let info = LagInfo {
            current_lsn: 1000,
            ..LagInfo::default()
        };

        for (lsn, expected) in [(500, true), (1000, true), (1001, false)] {
            assert_eq!(info.has_reached_lsn(lsn), expected);
        }
    }

    #[test]
    fn test_lag_monitor_creation() {
        let fresh_monitor = LagMonitor::with_defaults();
        assert_eq!(fresh_monitor.node_count(), 0);
        assert!(!fresh_monitor.is_running());
    }

    #[test]
    fn test_lag_monitor_primary_lsn() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        assert_eq!(monitor.get_primary_lsn(), 1000);
    }

    #[test]
    fn test_lag_monitor_register_standby() {
        let monitor = LagMonitor::with_defaults();
        for (node, mode) in [("standby-1", SyncMode::Sync), ("standby-2", SyncMode::Async)] {
            monitor.register_standby(node, mode);
        }

        assert_eq!(monitor.node_count(), 2);
        for mode in [SyncMode::Sync, SyncMode::Async] {
            assert_eq!(monitor.get_nodes_by_sync_mode(mode).len(), 1);
        }
    }

    #[test]
    fn test_lag_monitor_update_lag() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 990, Some(Duration::from_millis(50)));

        let snapshot = monitor
            .get_lag("standby-1")
            .expect("standby-1 should be tracked");
        assert_eq!(snapshot.current_lsn, 990);
        assert!(snapshot.lag_bytes > 0);
    }

    #[test]
    fn test_lag_monitor_fresh_nodes() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);

        monitor.register_standby("fresh", SyncMode::Sync);
        monitor.update_standby_lag("fresh", 999, Some(Duration::from_millis(10)));

        monitor.register_standby("stale", SyncMode::Async);
        monitor.update_standby_lag("stale", 500, Some(Duration::from_secs(5)));

        let fresh_nodes = monitor.get_fresh_nodes(Duration::from_millis(100));
        assert!(contains_node(&fresh_nodes, "fresh"));
        assert!(!contains_node(&fresh_nodes, "stale"));
    }

    #[test]
    fn test_lag_monitor_lsn_check() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 900, None);

        for (lsn, expected) in [(800, true), (900, true), (901, false)] {
            assert_eq!(monitor.has_reached_lsn("standby-1", lsn), expected);
        }
    }

    #[test]
    fn test_lag_monitor_freshest_standby() {
        let config = LagRoutingConfig::new().with_lag_calculation(LagCalculation::time());
        let monitor = LagMonitor::new(config);
        monitor.update_primary_lsn(1000);

        monitor.register_standby("slow", SyncMode::Async);
        monitor.update_standby_lag("slow", 900, Some(Duration::from_millis(500)));

        monitor.register_standby("fast", SyncMode::Sync);
        monitor.update_standby_lag("fast", 999, Some(Duration::from_millis(10)));

        let (freshest, _) = monitor
            .get_freshest_standby()
            .expect("at least one healthy standby");
        assert_eq!(freshest, "fast");
    }

    #[test]
    fn test_node_lag_data_trend() {
        let mut data = NodeLagData::new(10);

        // Shrinking lag samples -> improving.
        for sample in [1000, 800, 600] {
            data.add_sample(sample);
        }
        assert_eq!(data.calculate_trend(), LagTrend::Improving);

        // Growing lag samples -> degrading.
        for sample in [700, 900, 1100] {
            data.add_sample(sample);
        }
        assert_eq!(data.calculate_trend(), LagTrend::Degrading);
    }

    #[test]
    fn test_lag_trend_display() {
        let expectations = [
            (LagTrend::Improving, "improving"),
            (LagTrend::Stable, "stable"),
            (LagTrend::Degrading, "degrading"),
            (LagTrend::Unknown, "unknown"),
        ];
        for (trend, label) in expectations {
            assert_eq!(trend.to_string(), label);
        }
    }
}