Skip to main content

heliosdb_proxy/lag/
monitor.rs

1//! Lag Monitor - Continuous replication lag tracking
2//!
3//! Monitors replication lag across all standbys in real-time,
4//! providing data for lag-aware routing decisions.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12use parking_lot::RwLock;
13
14use super::config::{LagCalculation, LagRoutingConfig};
15use super::SyncMode;
16
/// Unique identifier for a node.
///
/// Currently a plain `String` alias, so any owned string can be used as an id.
pub type NodeId = String;
19
/// Direction in which a node's replication lag is moving.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LagTrend {
    /// Lag is shrinking.
    Improving,
    /// Lag is holding steady within tolerance.
    Stable,
    /// Lag is growing.
    Degrading,
    /// Too few samples to determine a trend.
    Unknown,
}

impl Default for LagTrend {
    /// A trend starts out as [`LagTrend::Unknown`].
    fn default() -> Self {
        LagTrend::Unknown
    }
}

impl std::fmt::Display for LagTrend {
    /// Writes the lowercase name of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            LagTrend::Improving => "improving",
            LagTrend::Stable => "stable",
            LagTrend::Degrading => "degrading",
            LagTrend::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
49
50/// Lag information for a single node
51#[derive(Debug, Clone)]
52pub struct LagInfo {
53    /// Current LSN (Log Sequence Number) on this node
54    pub current_lsn: u64,
55
56    /// Lag in bytes (LSN difference from primary)
57    pub lag_bytes: u64,
58
59    /// Estimated lag in time
60    pub lag_time: Duration,
61
62    /// When this info was last updated
63    pub updated_at: Instant,
64
65    /// Lag trend (improving, stable, degrading)
66    pub trend: LagTrend,
67
68    /// Node's sync mode
69    pub sync_mode: SyncMode,
70
71    /// Whether the node is considered healthy based on lag
72    pub healthy: bool,
73}
74
75impl Default for LagInfo {
76    fn default() -> Self {
77        Self {
78            current_lsn: 0,
79            lag_bytes: 0,
80            lag_time: Duration::ZERO,
81            updated_at: Instant::now(),
82            trend: LagTrend::Unknown,
83            sync_mode: SyncMode::Unknown,
84            healthy: true,
85        }
86    }
87}
88
89impl LagInfo {
90    /// Check if this lag info is stale (not updated recently)
91    pub fn is_stale(&self, max_age: Duration) -> bool {
92        self.updated_at.elapsed() > max_age
93    }
94
95    /// Check if this node meets the freshness requirement
96    pub fn meets_freshness(&self, max_lag: Duration) -> bool {
97        self.healthy && self.lag_time <= max_lag
98    }
99
100    /// Check if this node has reached a specific LSN
101    pub fn has_reached_lsn(&self, required_lsn: u64) -> bool {
102        self.current_lsn >= required_lsn
103    }
104}
105
106/// Internal tracking data for a node
107#[derive(Debug)]
108pub struct NodeLagData {
109    /// Current lag info
110    pub info: LagInfo,
111
112    /// Recent lag samples for trend calculation
113    lag_history: VecDeque<u64>,
114
115    /// Smoothing window size
116    window_size: usize,
117}
118
119impl NodeLagData {
120    fn new(window_size: usize) -> Self {
121        Self {
122            info: LagInfo::default(),
123            lag_history: VecDeque::with_capacity(window_size),
124            window_size,
125        }
126    }
127
128    fn add_sample(&mut self, lag_bytes: u64) {
129        if self.lag_history.len() >= self.window_size {
130            self.lag_history.pop_front();
131        }
132        self.lag_history.push_back(lag_bytes);
133    }
134
135    fn calculate_trend(&self) -> LagTrend {
136        if self.lag_history.len() < 3 {
137            return LagTrend::Unknown;
138        }
139
140        let recent: Vec<_> = self.lag_history.iter().rev().take(3).collect();
141        let oldest = *recent[2];
142        let middle = *recent[1];
143        let newest = *recent[0];
144
145        // Calculate trend based on recent samples
146        let threshold = (oldest as f64 * 0.1) as u64; // 10% threshold
147
148        if newest + threshold < oldest && newest + threshold < middle {
149            LagTrend::Improving
150        } else if newest > oldest + threshold && newest > middle + threshold {
151            LagTrend::Degrading
152        } else {
153            LagTrend::Stable
154        }
155    }
156
157    fn get_smoothed_lag(&self) -> u64 {
158        if self.lag_history.is_empty() {
159            return self.info.lag_bytes;
160        }
161
162        // Use exponential moving average for smoothing
163        let alpha = 0.3;
164        let mut ema = self.lag_history[0] as f64;
165
166        for &sample in self.lag_history.iter().skip(1) {
167            ema = alpha * sample as f64 + (1.0 - alpha) * ema;
168        }
169
170        ema as u64
171    }
172}
173
174/// Lag Monitor - tracks replication lag across all nodes
175pub struct LagMonitor {
176    /// Current lag data for each node
177    node_lags: DashMap<NodeId, NodeLagData>,
178
179    /// Current LSN on primary
180    primary_lsn: AtomicU64,
181
182    /// Primary node ID
183    primary_id: RwLock<Option<NodeId>>,
184
185    /// Configuration
186    config: LagRoutingConfig,
187
188    /// Whether monitoring is running
189    running: AtomicBool,
190
191    /// Last update time for primary LSN
192    primary_updated_at: RwLock<Instant>,
193}
194
195impl LagMonitor {
196    /// Create a new lag monitor
197    pub fn new(config: LagRoutingConfig) -> Self {
198        Self {
199            node_lags: DashMap::new(),
200            primary_lsn: AtomicU64::new(0),
201            primary_id: RwLock::new(None),
202            config,
203            running: AtomicBool::new(false),
204            primary_updated_at: RwLock::new(Instant::now()),
205        }
206    }
207
208    /// Create with default config
209    pub fn with_defaults() -> Self {
210        Self::new(LagRoutingConfig::default())
211    }
212
213    /// Check if monitoring is running
214    pub fn is_running(&self) -> bool {
215        self.running.load(Ordering::Relaxed)
216    }
217
218    /// Start monitoring (sets running flag)
219    pub fn start(&self) {
220        self.running.store(true, Ordering::Relaxed);
221    }
222
223    /// Stop monitoring
224    pub fn stop(&self) {
225        self.running.store(false, Ordering::Relaxed);
226    }
227
228    /// Set the primary node ID
229    pub fn set_primary(&self, node_id: &str) {
230        *self.primary_id.write() = Some(node_id.to_string());
231    }
232
233    /// Get the primary node ID
234    pub fn get_primary(&self) -> Option<NodeId> {
235        self.primary_id.read().clone()
236    }
237
238    /// Update primary LSN
239    pub fn update_primary_lsn(&self, lsn: u64) {
240        self.primary_lsn.store(lsn, Ordering::SeqCst);
241        *self.primary_updated_at.write() = Instant::now();
242    }
243
244    /// Get current primary LSN
245    pub fn get_primary_lsn(&self) -> u64 {
246        self.primary_lsn.load(Ordering::SeqCst)
247    }
248
249    /// Register a standby node
250    pub fn register_standby(&self, node_id: &str, sync_mode: SyncMode) {
251        let mut data = NodeLagData::new(self.config.smoothing_window);
252        data.info.sync_mode = sync_mode;
253        self.node_lags.insert(node_id.to_string(), data);
254    }
255
256    /// Remove a node from monitoring
257    pub fn remove_node(&self, node_id: &str) {
258        self.node_lags.remove(node_id);
259    }
260
261    /// Update lag info for a standby node
262    pub fn update_standby_lag(
263        &self,
264        node_id: &str,
265        current_lsn: u64,
266        time_lag: Option<Duration>,
267    ) {
268        let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
269        let lag_bytes = primary_lsn.saturating_sub(current_lsn);
270
271        // Calculate lag time using configured method
272        let lag_time = self.config.lag_calculation.calculate_lag(lag_bytes, time_lag);
273
274        // Determine if node is healthy
275        let healthy = lag_time <= self.config.stale_threshold;
276
277        self.node_lags
278            .entry(node_id.to_string())
279            .and_modify(|data| {
280                // Add sample for trend calculation
281                data.add_sample(lag_bytes);
282
283                // Calculate trend
284                let trend = if self.config.enable_smoothing {
285                    data.calculate_trend()
286                } else {
287                    LagTrend::Unknown
288                };
289
290                // Get smoothed lag if enabled
291                let effective_lag_bytes = if self.config.enable_smoothing {
292                    data.get_smoothed_lag()
293                } else {
294                    lag_bytes
295                };
296
297                let effective_lag_time = self.config.lag_calculation.calculate_lag(
298                    effective_lag_bytes,
299                    time_lag,
300                );
301
302                // Update lag info
303                data.info = LagInfo {
304                    current_lsn,
305                    lag_bytes: effective_lag_bytes,
306                    lag_time: effective_lag_time,
307                    updated_at: Instant::now(),
308                    trend,
309                    sync_mode: data.info.sync_mode,
310                    healthy,
311                };
312            })
313            .or_insert_with(|| {
314                let mut data = NodeLagData::new(self.config.smoothing_window);
315                data.info = LagInfo {
316                    current_lsn,
317                    lag_bytes,
318                    lag_time,
319                    updated_at: Instant::now(),
320                    trend: LagTrend::Unknown,
321                    sync_mode: SyncMode::Unknown,
322                    healthy,
323                };
324                data
325            });
326    }
327
328    /// Get lag info for a specific node
329    pub fn get_lag(&self, node_id: &str) -> Option<LagInfo> {
330        self.node_lags.get(node_id).map(|data| data.info.clone())
331    }
332
333    /// Get all current lag info
334    pub fn get_all_lags(&self) -> Vec<(NodeId, LagInfo)> {
335        self.node_lags
336            .iter()
337            .map(|entry| (entry.key().clone(), entry.value().info.clone()))
338            .collect()
339    }
340
341    /// Get nodes that meet freshness requirement
342    pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
343        let stale_threshold = self.config.poll_interval * 3;
344
345        self.node_lags
346            .iter()
347            .filter(|entry| {
348                let info = &entry.value().info;
349                !info.is_stale(stale_threshold) && info.meets_freshness(max_lag)
350            })
351            .map(|entry| entry.key().clone())
352            .collect()
353    }
354
355    /// Get nodes that have reached a specific LSN
356    pub fn get_nodes_at_lsn(&self, required_lsn: u64) -> Vec<NodeId> {
357        let stale_threshold = self.config.poll_interval * 3;
358
359        self.node_lags
360            .iter()
361            .filter(|entry| {
362                let info = &entry.value().info;
363                !info.is_stale(stale_threshold) && info.has_reached_lsn(required_lsn)
364            })
365            .map(|entry| entry.key().clone())
366            .collect()
367    }
368
369    /// Check if a node has reached a specific LSN
370    pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
371        self.node_lags
372            .get(node_id)
373            .map(|data| data.info.has_reached_lsn(required_lsn))
374            .unwrap_or(false)
375    }
376
377    /// Get healthy nodes (lag below stale threshold)
378    pub fn get_healthy_nodes(&self) -> Vec<NodeId> {
379        self.node_lags
380            .iter()
381            .filter(|entry| entry.value().info.healthy)
382            .map(|entry| entry.key().clone())
383            .collect()
384    }
385
386    /// Get nodes by sync mode
387    pub fn get_nodes_by_sync_mode(&self, mode: SyncMode) -> Vec<NodeId> {
388        self.node_lags
389            .iter()
390            .filter(|entry| entry.value().info.sync_mode == mode)
391            .map(|entry| entry.key().clone())
392            .collect()
393    }
394
395    /// Get the freshest standby (lowest lag)
396    pub fn get_freshest_standby(&self) -> Option<(NodeId, LagInfo)> {
397        let stale_threshold = self.config.poll_interval * 3;
398
399        self.node_lags
400            .iter()
401            .filter(|entry| {
402                let info = &entry.value().info;
403                info.healthy && !info.is_stale(stale_threshold)
404            })
405            .min_by_key(|entry| entry.value().info.lag_time)
406            .map(|entry| (entry.key().clone(), entry.value().info.clone()))
407    }
408
409    /// Get node count
410    pub fn node_count(&self) -> usize {
411        self.node_lags.len()
412    }
413
414    /// Clear all lag data
415    pub fn clear(&self) {
416        self.node_lags.clear();
417        self.primary_lsn.store(0, Ordering::SeqCst);
418    }
419}
420
421impl std::fmt::Debug for LagMonitor {
422    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423        f.debug_struct("LagMonitor")
424            .field("primary_lsn", &self.primary_lsn.load(Ordering::Relaxed))
425            .field("node_count", &self.node_lags.len())
426            .field("running", &self.running.load(Ordering::Relaxed))
427            .finish()
428    }
429}
430
431// Thread-safe wrapper for use with Arc
432impl LagMonitor {
433    /// Create an Arc-wrapped instance
434    pub fn arc(config: LagRoutingConfig) -> Arc<Self> {
435        Arc::new(Self::new(config))
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// A default snapshot reports zero lag and is healthy.
    #[test]
    fn test_lag_info_default() {
        let snapshot = LagInfo::default();
        assert_eq!(snapshot.current_lsn, 0);
        assert_eq!(snapshot.lag_bytes, 0);
        assert!(snapshot.healthy);
    }

    /// The freshness bound is inclusive.
    #[test]
    fn test_lag_info_meets_freshness() {
        let snapshot = LagInfo {
            lag_time: Duration::from_millis(100),
            ..LagInfo::default()
        };

        let cases = [(200u64, true), (100, true), (50, false)];
        for &(limit_ms, expected) in cases.iter() {
            let limit = Duration::from_millis(limit_ms);
            assert_eq!(snapshot.meets_freshness(limit), expected);
        }
    }

    /// The LSN check is inclusive of the node's current LSN.
    #[test]
    fn test_lag_info_has_reached_lsn() {
        let snapshot = LagInfo {
            current_lsn: 1000,
            ..LagInfo::default()
        };

        let cases = [(500u64, true), (1000, true), (1001, false)];
        for &(required, expected) in cases.iter() {
            assert_eq!(snapshot.has_reached_lsn(required), expected);
        }
    }

    /// A new monitor is stopped and tracks no nodes.
    #[test]
    fn test_lag_monitor_creation() {
        let lag_monitor = LagMonitor::with_defaults();
        assert_eq!(lag_monitor.node_count(), 0);
        assert!(!lag_monitor.is_running());
    }

    /// The primary LSN round-trips through the setter and getter.
    #[test]
    fn test_lag_monitor_primary_lsn() {
        let lag_monitor = LagMonitor::with_defaults();
        lag_monitor.update_primary_lsn(1000);
        assert_eq!(lag_monitor.get_primary_lsn(), 1000);
    }

    /// Registered standbys are counted and grouped by their sync mode.
    #[test]
    fn test_lag_monitor_register_standby() {
        let lag_monitor = LagMonitor::with_defaults();
        lag_monitor.register_standby("standby-1", SyncMode::Sync);
        lag_monitor.register_standby("standby-2", SyncMode::Async);

        assert_eq!(lag_monitor.node_count(), 2);

        let sync_nodes = lag_monitor.get_nodes_by_sync_mode(SyncMode::Sync);
        let async_nodes = lag_monitor.get_nodes_by_sync_mode(SyncMode::Async);
        assert_eq!(sync_nodes.len(), 1);
        assert_eq!(async_nodes.len(), 1);
    }

    /// A standby behind the primary reports its LSN and a positive byte lag.
    #[test]
    fn test_lag_monitor_update_lag() {
        let lag_monitor = LagMonitor::with_defaults();
        lag_monitor.update_primary_lsn(1000);
        lag_monitor.register_standby("standby-1", SyncMode::Async);

        let reported_delay = Some(Duration::from_millis(50));
        lag_monitor.update_standby_lag("standby-1", 990, reported_delay);

        let snapshot = lag_monitor.get_lag("standby-1").unwrap();
        assert_eq!(snapshot.current_lsn, 990);
        assert!(snapshot.lag_bytes > 0);
    }

    /// Only the standby within the lag bound is returned as fresh.
    #[test]
    fn test_lag_monitor_fresh_nodes() {
        let lag_monitor = LagMonitor::with_defaults();
        lag_monitor.update_primary_lsn(1000);

        lag_monitor.register_standby("fresh", SyncMode::Sync);
        lag_monitor.update_standby_lag("fresh", 999, Some(Duration::from_millis(10)));

        lag_monitor.register_standby("stale", SyncMode::Async);
        lag_monitor.update_standby_lag("stale", 500, Some(Duration::from_secs(5)));

        let within_bound = lag_monitor.get_fresh_nodes(Duration::from_millis(100));
        let fresh_id = "fresh".to_string();
        let stale_id = "stale".to_string();
        assert!(within_bound.contains(&fresh_id));
        assert!(!within_bound.contains(&stale_id));
    }

    /// The per-node LSN check is inclusive of the reported LSN.
    #[test]
    fn test_lag_monitor_lsn_check() {
        let lag_monitor = LagMonitor::with_defaults();
        lag_monitor.update_primary_lsn(1000);
        lag_monitor.register_standby("standby-1", SyncMode::Async);
        lag_monitor.update_standby_lag("standby-1", 900, None);

        let cases = [(800u64, true), (900, true), (901, false)];
        for &(required, expected) in cases.iter() {
            assert_eq!(lag_monitor.has_reached_lsn("standby-1", required), expected);
        }
    }

    /// With time-based lag calculation the standby with the smaller delay wins.
    #[test]
    fn test_lag_monitor_freshest_standby() {
        let time_based = LagRoutingConfig::new().with_lag_calculation(LagCalculation::time());
        let lag_monitor = LagMonitor::new(time_based);
        lag_monitor.update_primary_lsn(1000);

        lag_monitor.register_standby("slow", SyncMode::Async);
        lag_monitor.update_standby_lag("slow", 900, Some(Duration::from_millis(500)));

        lag_monitor.register_standby("fast", SyncMode::Sync);
        lag_monitor.update_standby_lag("fast", 999, Some(Duration::from_millis(10)));

        let (winner, _) = lag_monitor.get_freshest_standby().unwrap();
        assert_eq!(winner, "fast");
    }

    /// Three falling samples read as improving; three rising ones as degrading.
    #[test]
    fn test_node_lag_data_trend() {
        let mut history = NodeLagData::new(10);

        // Decreasing lag
        for &sample in [1000u64, 800, 600].iter() {
            history.add_sample(sample);
        }
        assert_eq!(history.calculate_trend(), LagTrend::Improving);

        // Increasing lag
        for &sample in [700u64, 900, 1100].iter() {
            history.add_sample(sample);
        }
        assert_eq!(history.calculate_trend(), LagTrend::Degrading);
    }

    /// Every trend displays as its lowercase name.
    #[test]
    fn test_lag_trend_display() {
        let expected = [
            (LagTrend::Improving, "improving"),
            (LagTrend::Stable, "stable"),
            (LagTrend::Degrading, "degrading"),
            (LagTrend::Unknown, "unknown"),
        ];
        for &(trend, text) in expected.iter() {
            assert_eq!(trend.to_string(), text);
        }
    }
}