1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12use parking_lot::RwLock;
13
14use super::config::{LagCalculation, LagRoutingConfig};
15use super::SyncMode;
16
/// Identifier of a node known to the lag monitor.
///
/// Used as the key under which per-node lag data is stored and as the value
/// recorded for the current primary.
pub type NodeId = String;
19
/// Direction in which a node's replication lag is moving.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LagTrend {
    /// The newest lag sample is clearly below the preceding ones.
    Improving,
    /// Recent samples do not differ enough to call a direction.
    Stable,
    /// The newest lag sample is clearly above the preceding ones.
    Degrading,
    /// No trend has been determined.
    Unknown,
}

impl Default for LagTrend {
    /// A value with no history behind it starts out as [`LagTrend::Unknown`].
    fn default() -> Self {
        LagTrend::Unknown
    }
}

impl std::fmt::Display for LagTrend {
    /// Writes the lowercase name of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Improving => "improving",
            Self::Stable => "stable",
            Self::Degrading => "degrading",
            Self::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
49
50#[derive(Debug, Clone)]
52pub struct LagInfo {
53 pub current_lsn: u64,
55
56 pub lag_bytes: u64,
58
59 pub lag_time: Duration,
61
62 pub updated_at: Instant,
64
65 pub trend: LagTrend,
67
68 pub sync_mode: SyncMode,
70
71 pub healthy: bool,
73}
74
75impl Default for LagInfo {
76 fn default() -> Self {
77 Self {
78 current_lsn: 0,
79 lag_bytes: 0,
80 lag_time: Duration::ZERO,
81 updated_at: Instant::now(),
82 trend: LagTrend::Unknown,
83 sync_mode: SyncMode::Unknown,
84 healthy: true,
85 }
86 }
87}
88
89impl LagInfo {
90 pub fn is_stale(&self, max_age: Duration) -> bool {
92 self.updated_at.elapsed() > max_age
93 }
94
95 pub fn meets_freshness(&self, max_lag: Duration) -> bool {
97 self.healthy && self.lag_time <= max_lag
98 }
99
100 pub fn has_reached_lsn(&self, required_lsn: u64) -> bool {
102 self.current_lsn >= required_lsn
103 }
104}
105
106#[derive(Debug)]
108pub struct NodeLagData {
109 pub info: LagInfo,
111
112 lag_history: VecDeque<u64>,
114
115 window_size: usize,
117}
118
119impl NodeLagData {
120 fn new(window_size: usize) -> Self {
121 Self {
122 info: LagInfo::default(),
123 lag_history: VecDeque::with_capacity(window_size),
124 window_size,
125 }
126 }
127
128 fn add_sample(&mut self, lag_bytes: u64) {
129 if self.lag_history.len() >= self.window_size {
130 self.lag_history.pop_front();
131 }
132 self.lag_history.push_back(lag_bytes);
133 }
134
135 fn calculate_trend(&self) -> LagTrend {
136 if self.lag_history.len() < 3 {
137 return LagTrend::Unknown;
138 }
139
140 let recent: Vec<_> = self.lag_history.iter().rev().take(3).collect();
141 let oldest = *recent[2];
142 let middle = *recent[1];
143 let newest = *recent[0];
144
145 let threshold = (oldest as f64 * 0.1) as u64; if newest + threshold < oldest && newest + threshold < middle {
149 LagTrend::Improving
150 } else if newest > oldest + threshold && newest > middle + threshold {
151 LagTrend::Degrading
152 } else {
153 LagTrend::Stable
154 }
155 }
156
157 fn get_smoothed_lag(&self) -> u64 {
158 if self.lag_history.is_empty() {
159 return self.info.lag_bytes;
160 }
161
162 let alpha = 0.3;
164 let mut ema = self.lag_history[0] as f64;
165
166 for &sample in self.lag_history.iter().skip(1) {
167 ema = alpha * sample as f64 + (1.0 - alpha) * ema;
168 }
169
170 ema as u64
171 }
172}
173
/// Tracks replication lag of standby nodes relative to a primary.
///
/// All state is held behind a concurrent map, atomics and locks, so every
/// method takes `&self`.
pub struct LagMonitor {
    /// Per-node lag data, keyed by node id.
    node_lags: DashMap<NodeId, NodeLagData>,

    /// Most recent LSN recorded for the primary.
    primary_lsn: AtomicU64,

    /// Id of the node designated as primary, if one has been set.
    primary_id: RwLock<Option<NodeId>>,

    /// Routing configuration supplied at construction.
    config: LagRoutingConfig,

    /// Flag toggled by `start` / `stop`.
    running: AtomicBool,

    /// Time of the latest primary LSN update (construction time until then).
    primary_updated_at: RwLock<Instant>,
}
194
195impl LagMonitor {
196 pub fn new(config: LagRoutingConfig) -> Self {
198 Self {
199 node_lags: DashMap::new(),
200 primary_lsn: AtomicU64::new(0),
201 primary_id: RwLock::new(None),
202 config,
203 running: AtomicBool::new(false),
204 primary_updated_at: RwLock::new(Instant::now()),
205 }
206 }
207
208 pub fn with_defaults() -> Self {
210 Self::new(LagRoutingConfig::default())
211 }
212
213 pub fn is_running(&self) -> bool {
215 self.running.load(Ordering::Relaxed)
216 }
217
218 pub fn start(&self) {
220 self.running.store(true, Ordering::Relaxed);
221 }
222
223 pub fn stop(&self) {
225 self.running.store(false, Ordering::Relaxed);
226 }
227
228 pub fn set_primary(&self, node_id: &str) {
230 *self.primary_id.write() = Some(node_id.to_string());
231 }
232
233 pub fn get_primary(&self) -> Option<NodeId> {
235 self.primary_id.read().clone()
236 }
237
238 pub fn update_primary_lsn(&self, lsn: u64) {
240 self.primary_lsn.store(lsn, Ordering::SeqCst);
241 *self.primary_updated_at.write() = Instant::now();
242 }
243
244 pub fn get_primary_lsn(&self) -> u64 {
246 self.primary_lsn.load(Ordering::SeqCst)
247 }
248
249 pub fn register_standby(&self, node_id: &str, sync_mode: SyncMode) {
251 let mut data = NodeLagData::new(self.config.smoothing_window);
252 data.info.sync_mode = sync_mode;
253 self.node_lags.insert(node_id.to_string(), data);
254 }
255
256 pub fn remove_node(&self, node_id: &str) {
258 self.node_lags.remove(node_id);
259 }
260
261 pub fn update_standby_lag(
263 &self,
264 node_id: &str,
265 current_lsn: u64,
266 time_lag: Option<Duration>,
267 ) {
268 let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
269 let lag_bytes = primary_lsn.saturating_sub(current_lsn);
270
271 let lag_time = self.config.lag_calculation.calculate_lag(lag_bytes, time_lag);
273
274 let healthy = lag_time <= self.config.stale_threshold;
276
277 self.node_lags
278 .entry(node_id.to_string())
279 .and_modify(|data| {
280 data.add_sample(lag_bytes);
282
283 let trend = if self.config.enable_smoothing {
285 data.calculate_trend()
286 } else {
287 LagTrend::Unknown
288 };
289
290 let effective_lag_bytes = if self.config.enable_smoothing {
292 data.get_smoothed_lag()
293 } else {
294 lag_bytes
295 };
296
297 let effective_lag_time = self.config.lag_calculation.calculate_lag(
298 effective_lag_bytes,
299 time_lag,
300 );
301
302 data.info = LagInfo {
304 current_lsn,
305 lag_bytes: effective_lag_bytes,
306 lag_time: effective_lag_time,
307 updated_at: Instant::now(),
308 trend,
309 sync_mode: data.info.sync_mode,
310 healthy,
311 };
312 })
313 .or_insert_with(|| {
314 let mut data = NodeLagData::new(self.config.smoothing_window);
315 data.info = LagInfo {
316 current_lsn,
317 lag_bytes,
318 lag_time,
319 updated_at: Instant::now(),
320 trend: LagTrend::Unknown,
321 sync_mode: SyncMode::Unknown,
322 healthy,
323 };
324 data
325 });
326 }
327
328 pub fn get_lag(&self, node_id: &str) -> Option<LagInfo> {
330 self.node_lags.get(node_id).map(|data| data.info.clone())
331 }
332
333 pub fn get_all_lags(&self) -> Vec<(NodeId, LagInfo)> {
335 self.node_lags
336 .iter()
337 .map(|entry| (entry.key().clone(), entry.value().info.clone()))
338 .collect()
339 }
340
341 pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
343 let stale_threshold = self.config.poll_interval * 3;
344
345 self.node_lags
346 .iter()
347 .filter(|entry| {
348 let info = &entry.value().info;
349 !info.is_stale(stale_threshold) && info.meets_freshness(max_lag)
350 })
351 .map(|entry| entry.key().clone())
352 .collect()
353 }
354
355 pub fn get_nodes_at_lsn(&self, required_lsn: u64) -> Vec<NodeId> {
357 let stale_threshold = self.config.poll_interval * 3;
358
359 self.node_lags
360 .iter()
361 .filter(|entry| {
362 let info = &entry.value().info;
363 !info.is_stale(stale_threshold) && info.has_reached_lsn(required_lsn)
364 })
365 .map(|entry| entry.key().clone())
366 .collect()
367 }
368
369 pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
371 self.node_lags
372 .get(node_id)
373 .map(|data| data.info.has_reached_lsn(required_lsn))
374 .unwrap_or(false)
375 }
376
377 pub fn get_healthy_nodes(&self) -> Vec<NodeId> {
379 self.node_lags
380 .iter()
381 .filter(|entry| entry.value().info.healthy)
382 .map(|entry| entry.key().clone())
383 .collect()
384 }
385
386 pub fn get_nodes_by_sync_mode(&self, mode: SyncMode) -> Vec<NodeId> {
388 self.node_lags
389 .iter()
390 .filter(|entry| entry.value().info.sync_mode == mode)
391 .map(|entry| entry.key().clone())
392 .collect()
393 }
394
395 pub fn get_freshest_standby(&self) -> Option<(NodeId, LagInfo)> {
397 let stale_threshold = self.config.poll_interval * 3;
398
399 self.node_lags
400 .iter()
401 .filter(|entry| {
402 let info = &entry.value().info;
403 info.healthy && !info.is_stale(stale_threshold)
404 })
405 .min_by_key(|entry| entry.value().info.lag_time)
406 .map(|entry| (entry.key().clone(), entry.value().info.clone()))
407 }
408
409 pub fn node_count(&self) -> usize {
411 self.node_lags.len()
412 }
413
414 pub fn clear(&self) {
416 self.node_lags.clear();
417 self.primary_lsn.store(0, Ordering::SeqCst);
418 }
419}
420
421impl std::fmt::Debug for LagMonitor {
422 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423 f.debug_struct("LagMonitor")
424 .field("primary_lsn", &self.primary_lsn.load(Ordering::Relaxed))
425 .field("node_count", &self.node_lags.len())
426 .field("running", &self.running.load(Ordering::Relaxed))
427 .finish()
428 }
429}
430
431impl LagMonitor {
433 pub fn arc(config: LagRoutingConfig) -> Arc<Self> {
435 Arc::new(Self::new(config))
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Default-configured monitor whose primary LSN is already set to `lsn`.
    fn monitor_at(lsn: u64) -> LagMonitor {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(lsn);
        monitor
    }

    /// Owned node id for `contains` checks.
    fn id(name: &str) -> NodeId {
        name.to_string()
    }

    // A default snapshot is empty and healthy.
    #[test]
    fn test_lag_info_default() {
        let info = LagInfo::default();
        assert_eq!(info.current_lsn, 0);
        assert_eq!(info.lag_bytes, 0);
        assert!(info.healthy);
    }

    // The freshness bound is inclusive.
    #[test]
    fn test_lag_info_meets_freshness() {
        let info = LagInfo {
            lag_time: Duration::from_millis(100),
            ..LagInfo::default()
        };

        let cases = [(200u64, true), (100, true), (50, false)];
        for (limit_ms, expected) in cases {
            assert_eq!(
                info.meets_freshness(Duration::from_millis(limit_ms)),
                expected
            );
        }
    }

    // The LSN comparison is inclusive.
    #[test]
    fn test_lag_info_has_reached_lsn() {
        let info = LagInfo {
            current_lsn: 1000,
            ..LagInfo::default()
        };

        let cases = [(500u64, true), (1000, true), (1001, false)];
        for (required, expected) in cases {
            assert_eq!(info.has_reached_lsn(required), expected);
        }
    }

    // A new monitor is stopped and tracks nothing.
    #[test]
    fn test_lag_monitor_creation() {
        let monitor = LagMonitor::with_defaults();
        assert!(!monitor.is_running());
        assert_eq!(monitor.node_count(), 0);
    }

    // The primary LSN reads back what was stored.
    #[test]
    fn test_lag_monitor_primary_lsn() {
        let monitor = monitor_at(1000);
        assert_eq!(monitor.get_primary_lsn(), 1000);
    }

    // Registered standbys are counted and retrievable by replication mode.
    #[test]
    fn test_lag_monitor_register_standby() {
        let monitor = LagMonitor::with_defaults();
        monitor.register_standby("standby-1", SyncMode::Sync);
        monitor.register_standby("standby-2", SyncMode::Async);

        assert_eq!(monitor.node_count(), 2);

        let sync_nodes = monitor.get_nodes_by_sync_mode(SyncMode::Sync);
        let async_nodes = monitor.get_nodes_by_sync_mode(SyncMode::Async);
        assert_eq!(sync_nodes.len(), 1);
        assert_eq!(async_nodes.len(), 1);
    }

    // An update behind the primary yields a positive byte lag.
    #[test]
    fn test_lag_monitor_update_lag() {
        let monitor = monitor_at(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 990, Some(Duration::from_millis(50)));

        let lag = monitor.get_lag("standby-1").unwrap();
        assert_eq!(lag.current_lsn, 990);
        assert!(lag.lag_bytes > 0);
    }

    // Only nodes within the requested lag bound are reported as fresh.
    #[test]
    fn test_lag_monitor_fresh_nodes() {
        let monitor = monitor_at(1000);

        monitor.register_standby("fresh", SyncMode::Sync);
        monitor.update_standby_lag("fresh", 999, Some(Duration::from_millis(10)));

        monitor.register_standby("stale", SyncMode::Async);
        monitor.update_standby_lag("stale", 500, Some(Duration::from_secs(5)));

        let fresh = monitor.get_fresh_nodes(Duration::from_millis(100));
        assert!(fresh.contains(&id("fresh")));
        assert!(!fresh.contains(&id("stale")));
    }

    // Per-node LSN checks are inclusive of the recorded position.
    #[test]
    fn test_lag_monitor_lsn_check() {
        let monitor = monitor_at(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 900, None);

        let cases = [(800u64, true), (900, true), (901, false)];
        for (required, expected) in cases {
            assert_eq!(monitor.has_reached_lsn("standby-1", required), expected);
        }
    }

    // With time-based lag calculation the node with the smallest lag wins.
    #[test]
    fn test_lag_monitor_freshest_standby() {
        let config = LagRoutingConfig::new().with_lag_calculation(LagCalculation::time());
        let monitor = LagMonitor::new(config);
        monitor.update_primary_lsn(1000);

        monitor.register_standby("slow", SyncMode::Async);
        monitor.update_standby_lag("slow", 900, Some(Duration::from_millis(500)));

        monitor.register_standby("fast", SyncMode::Sync);
        monitor.update_standby_lag("fast", 999, Some(Duration::from_millis(10)));

        let (node_id, _) = monitor.get_freshest_standby().unwrap();
        assert_eq!(node_id, "fast");
    }

    // Falling samples read as improving, rising samples as degrading.
    #[test]
    fn test_node_lag_data_trend() {
        let mut data = NodeLagData::new(10);

        for sample in [1000u64, 800, 600] {
            data.add_sample(sample);
        }
        assert_eq!(data.calculate_trend(), LagTrend::Improving);

        for sample in [700u64, 900, 1100] {
            data.add_sample(sample);
        }
        assert_eq!(data.calculate_trend(), LagTrend::Degrading);
    }

    // Each trend displays as its lowercase name.
    #[test]
    fn test_lag_trend_display() {
        let cases = [
            (LagTrend::Improving, "improving"),
            (LagTrend::Stable, "stable"),
            (LagTrend::Degrading, "degrading"),
            (LagTrend::Unknown, "unknown"),
        ];
        for (trend, expected) in cases {
            assert_eq!(trend.to_string(), expected);
        }
    }
}
579}