1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use dashmap::DashMap;
12use parking_lot::RwLock;
13
14use super::config::LagRoutingConfig;
15use super::SyncMode;
16
/// Identifier of a node (primary or standby); used as the key of the monitor's lag map.
pub type NodeId = String;
19
/// Direction in which a standby's replication lag has recently been moving.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LagTrend {
    /// Lag is shrinking.
    Improving,
    /// Lag is roughly constant.
    Stable,
    /// Lag is growing.
    Degrading,
    /// Not enough information to decide; this is the default.
    #[default]
    Unknown,
}

impl std::fmt::Display for LagTrend {
    /// Writes the lowercase name of the trend (e.g. `"degrading"`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Improving => "improving",
            Self::Stable => "stable",
            Self::Degrading => "degrading",
            Self::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
44
/// Snapshot of one standby's replication lag, as last computed by [`LagMonitor`].
#[derive(Debug, Clone)]
pub struct LagInfo {
    /// LSN most recently reported by the standby.
    pub current_lsn: u64,

    /// Bytes behind the primary's LSN (clamped at 0); EMA-smoothed when smoothing is enabled.
    pub lag_bytes: u64,

    /// Lag as a duration, produced by the configured lag calculation from `lag_bytes`
    /// and any externally supplied time lag.
    pub lag_time: Duration,

    /// When this snapshot was produced; used for staleness checks.
    pub updated_at: Instant,

    /// Recent lag direction; `Unknown` when smoothing is off or history is too short.
    pub trend: LagTrend,

    /// Replication mode the standby was registered with (`Unknown` if auto-registered).
    pub sync_mode: SyncMode,

    /// Whether the latest raw (unsmoothed) lag was within the configured stale threshold.
    pub healthy: bool,
}
69
70impl Default for LagInfo {
71 fn default() -> Self {
72 Self {
73 current_lsn: 0,
74 lag_bytes: 0,
75 lag_time: Duration::ZERO,
76 updated_at: Instant::now(),
77 trend: LagTrend::Unknown,
78 sync_mode: SyncMode::Unknown,
79 healthy: true,
80 }
81 }
82}
83
84impl LagInfo {
85 pub fn is_stale(&self, max_age: Duration) -> bool {
87 self.updated_at.elapsed() > max_age
88 }
89
90 pub fn meets_freshness(&self, max_lag: Duration) -> bool {
92 self.healthy && self.lag_time <= max_lag
93 }
94
95 pub fn has_reached_lsn(&self, required_lsn: u64) -> bool {
97 self.current_lsn >= required_lsn
98 }
99}
100
/// Per-standby state kept by [`LagMonitor`]: the latest [`LagInfo`] plus a bounded
/// history of raw lag samples used for smoothing and trend detection.
#[derive(Debug)]
pub struct NodeLagData {
    /// Most recent lag snapshot for this node.
    pub info: LagInfo,

    /// Raw `lag_bytes` samples, oldest at the front. Holds at most `window_size` entries,
    /// except that a `window_size` of 0 still keeps the single newest sample.
    lag_history: VecDeque<u64>,

    /// Maximum number of samples retained in `lag_history`.
    window_size: usize,
}
113
114impl NodeLagData {
115 fn new(window_size: usize) -> Self {
116 Self {
117 info: LagInfo::default(),
118 lag_history: VecDeque::with_capacity(window_size),
119 window_size,
120 }
121 }
122
123 fn add_sample(&mut self, lag_bytes: u64) {
124 if self.lag_history.len() >= self.window_size {
125 self.lag_history.pop_front();
126 }
127 self.lag_history.push_back(lag_bytes);
128 }
129
130 fn calculate_trend(&self) -> LagTrend {
131 if self.lag_history.len() < 3 {
132 return LagTrend::Unknown;
133 }
134
135 let recent: Vec<_> = self.lag_history.iter().rev().take(3).collect();
136 let oldest = *recent[2];
137 let middle = *recent[1];
138 let newest = *recent[0];
139
140 let threshold = (oldest as f64 * 0.1) as u64; if newest + threshold < oldest && newest + threshold < middle {
144 LagTrend::Improving
145 } else if newest > oldest + threshold && newest > middle + threshold {
146 LagTrend::Degrading
147 } else {
148 LagTrend::Stable
149 }
150 }
151
152 fn get_smoothed_lag(&self) -> u64 {
153 if self.lag_history.is_empty() {
154 return self.info.lag_bytes;
155 }
156
157 let alpha = 0.3;
159 let mut ema = self.lag_history[0] as f64;
160
161 for &sample in self.lag_history.iter().skip(1) {
162 ema = alpha * sample as f64 + (1.0 - alpha) * ema;
163 }
164
165 ema as u64
166 }
167}
168
/// Tracks replication lag of a set of standbys relative to the primary's LSN.
///
/// All operations take `&self`; mutable state is held in a `DashMap`, atomics and
/// `RwLock`s, so one instance can be shared (for example via `LagMonitor::arc`).
pub struct LagMonitor {
    /// Per-standby lag data keyed by node id.
    node_lags: DashMap<NodeId, NodeLagData>,

    /// Latest known primary LSN; standby byte lag is measured against it.
    primary_lsn: AtomicU64,

    /// Id of the current primary, if one has been set.
    primary_id: RwLock<Option<NodeId>>,

    /// Routing, smoothing and lag-calculation settings.
    config: LagRoutingConfig,

    /// Flag toggled by `start` / `stop`.
    running: AtomicBool,

    /// When `primary_lsn` was last updated.
    // NOTE(review): written by `update_primary_lsn`, but no reader is visible here.
    primary_updated_at: RwLock<Instant>,
}
189
190impl LagMonitor {
191 pub fn new(config: LagRoutingConfig) -> Self {
193 Self {
194 node_lags: DashMap::new(),
195 primary_lsn: AtomicU64::new(0),
196 primary_id: RwLock::new(None),
197 config,
198 running: AtomicBool::new(false),
199 primary_updated_at: RwLock::new(Instant::now()),
200 }
201 }
202
203 pub fn with_defaults() -> Self {
205 Self::new(LagRoutingConfig::default())
206 }
207
208 pub fn is_running(&self) -> bool {
210 self.running.load(Ordering::Relaxed)
211 }
212
213 pub fn start(&self) {
215 self.running.store(true, Ordering::Relaxed);
216 }
217
218 pub fn stop(&self) {
220 self.running.store(false, Ordering::Relaxed);
221 }
222
223 pub fn set_primary(&self, node_id: &str) {
225 *self.primary_id.write() = Some(node_id.to_string());
226 }
227
228 pub fn get_primary(&self) -> Option<NodeId> {
230 self.primary_id.read().clone()
231 }
232
233 pub fn update_primary_lsn(&self, lsn: u64) {
235 self.primary_lsn.store(lsn, Ordering::SeqCst);
236 *self.primary_updated_at.write() = Instant::now();
237 }
238
239 pub fn get_primary_lsn(&self) -> u64 {
241 self.primary_lsn.load(Ordering::SeqCst)
242 }
243
244 pub fn register_standby(&self, node_id: &str, sync_mode: SyncMode) {
246 let mut data = NodeLagData::new(self.config.smoothing_window);
247 data.info.sync_mode = sync_mode;
248 self.node_lags.insert(node_id.to_string(), data);
249 }
250
251 pub fn remove_node(&self, node_id: &str) {
253 self.node_lags.remove(node_id);
254 }
255
256 pub fn update_standby_lag(&self, node_id: &str, current_lsn: u64, time_lag: Option<Duration>) {
258 let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
259 let lag_bytes = primary_lsn.saturating_sub(current_lsn);
260
261 let lag_time = self
263 .config
264 .lag_calculation
265 .calculate_lag(lag_bytes, time_lag);
266
267 let healthy = lag_time <= self.config.stale_threshold;
269
270 self.node_lags
271 .entry(node_id.to_string())
272 .and_modify(|data| {
273 data.add_sample(lag_bytes);
275
276 let trend = if self.config.enable_smoothing {
278 data.calculate_trend()
279 } else {
280 LagTrend::Unknown
281 };
282
283 let effective_lag_bytes = if self.config.enable_smoothing {
285 data.get_smoothed_lag()
286 } else {
287 lag_bytes
288 };
289
290 let effective_lag_time = self
291 .config
292 .lag_calculation
293 .calculate_lag(effective_lag_bytes, time_lag);
294
295 data.info = LagInfo {
297 current_lsn,
298 lag_bytes: effective_lag_bytes,
299 lag_time: effective_lag_time,
300 updated_at: Instant::now(),
301 trend,
302 sync_mode: data.info.sync_mode,
303 healthy,
304 };
305 })
306 .or_insert_with(|| {
307 let mut data = NodeLagData::new(self.config.smoothing_window);
308 data.info = LagInfo {
309 current_lsn,
310 lag_bytes,
311 lag_time,
312 updated_at: Instant::now(),
313 trend: LagTrend::Unknown,
314 sync_mode: SyncMode::Unknown,
315 healthy,
316 };
317 data
318 });
319 }
320
321 pub fn get_lag(&self, node_id: &str) -> Option<LagInfo> {
323 self.node_lags.get(node_id).map(|data| data.info.clone())
324 }
325
326 pub fn get_all_lags(&self) -> Vec<(NodeId, LagInfo)> {
328 self.node_lags
329 .iter()
330 .map(|entry| (entry.key().clone(), entry.value().info.clone()))
331 .collect()
332 }
333
334 pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
336 let stale_threshold = self.config.poll_interval * 3;
337
338 self.node_lags
339 .iter()
340 .filter(|entry| {
341 let info = &entry.value().info;
342 !info.is_stale(stale_threshold) && info.meets_freshness(max_lag)
343 })
344 .map(|entry| entry.key().clone())
345 .collect()
346 }
347
348 pub fn get_nodes_at_lsn(&self, required_lsn: u64) -> Vec<NodeId> {
350 let stale_threshold = self.config.poll_interval * 3;
351
352 self.node_lags
353 .iter()
354 .filter(|entry| {
355 let info = &entry.value().info;
356 !info.is_stale(stale_threshold) && info.has_reached_lsn(required_lsn)
357 })
358 .map(|entry| entry.key().clone())
359 .collect()
360 }
361
362 pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
364 self.node_lags
365 .get(node_id)
366 .map(|data| data.info.has_reached_lsn(required_lsn))
367 .unwrap_or(false)
368 }
369
370 pub fn get_healthy_nodes(&self) -> Vec<NodeId> {
372 self.node_lags
373 .iter()
374 .filter(|entry| entry.value().info.healthy)
375 .map(|entry| entry.key().clone())
376 .collect()
377 }
378
379 pub fn get_nodes_by_sync_mode(&self, mode: SyncMode) -> Vec<NodeId> {
381 self.node_lags
382 .iter()
383 .filter(|entry| entry.value().info.sync_mode == mode)
384 .map(|entry| entry.key().clone())
385 .collect()
386 }
387
388 pub fn get_freshest_standby(&self) -> Option<(NodeId, LagInfo)> {
390 let stale_threshold = self.config.poll_interval * 3;
391
392 self.node_lags
393 .iter()
394 .filter(|entry| {
395 let info = &entry.value().info;
396 info.healthy && !info.is_stale(stale_threshold)
397 })
398 .min_by_key(|entry| entry.value().info.lag_time)
399 .map(|entry| (entry.key().clone(), entry.value().info.clone()))
400 }
401
402 pub fn node_count(&self) -> usize {
404 self.node_lags.len()
405 }
406
407 pub fn clear(&self) {
409 self.node_lags.clear();
410 self.primary_lsn.store(0, Ordering::SeqCst);
411 }
412}
413
414impl std::fmt::Debug for LagMonitor {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 f.debug_struct("LagMonitor")
417 .field("primary_lsn", &self.primary_lsn.load(Ordering::Relaxed))
418 .field("node_count", &self.node_lags.len())
419 .field("running", &self.running.load(Ordering::Relaxed))
420 .finish()
421 }
422}
423
424impl LagMonitor {
426 pub fn arc(config: LagRoutingConfig) -> Arc<Self> {
428 Arc::new(Self::new(config))
429 }
430}
431
// Unit tests for the lag snapshot types, the per-node trend logic and `LagMonitor`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lag::LagCalculation;

    // The default snapshot has LSN 0, zero byte lag and is considered healthy.
    #[test]
    fn test_lag_info_default() {
        let info = LagInfo::default();
        assert_eq!(info.current_lsn, 0);
        assert_eq!(info.lag_bytes, 0);
        assert!(info.healthy);
    }

    // `meets_freshness` is inclusive: a lag equal to the bound still counts as fresh.
    #[test]
    fn test_lag_info_meets_freshness() {
        let mut info = LagInfo::default();
        info.lag_time = Duration::from_millis(100);

        assert!(info.meets_freshness(Duration::from_millis(200)));
        assert!(info.meets_freshness(Duration::from_millis(100)));
        assert!(!info.meets_freshness(Duration::from_millis(50)));
    }

    // `has_reached_lsn` is inclusive of the required LSN.
    #[test]
    fn test_lag_info_has_reached_lsn() {
        let mut info = LagInfo::default();
        info.current_lsn = 1000;

        assert!(info.has_reached_lsn(500));
        assert!(info.has_reached_lsn(1000));
        assert!(!info.has_reached_lsn(1001));
    }

    // A new monitor is stopped and tracks no nodes.
    #[test]
    fn test_lag_monitor_creation() {
        let monitor = LagMonitor::with_defaults();
        assert!(!monitor.is_running());
        assert_eq!(monitor.node_count(), 0);
    }

    // The primary LSN round-trips through update/get.
    #[test]
    fn test_lag_monitor_primary_lsn() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        assert_eq!(monitor.get_primary_lsn(), 1000);
    }

    // Registered standbys keep their sync mode and can be filtered by it.
    #[test]
    fn test_lag_monitor_register_standby() {
        let monitor = LagMonitor::with_defaults();
        monitor.register_standby("standby-1", SyncMode::Sync);
        monitor.register_standby("standby-2", SyncMode::Async);

        assert_eq!(monitor.node_count(), 2);
        assert_eq!(monitor.get_nodes_by_sync_mode(SyncMode::Sync).len(), 1);
        assert_eq!(monitor.get_nodes_by_sync_mode(SyncMode::Async).len(), 1);
    }

    // Byte lag is measured against the primary LSN (1000 - 990 > 0).
    #[test]
    fn test_lag_monitor_update_lag() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 990, Some(Duration::from_millis(50)));

        let lag = monitor.get_lag("standby-1").unwrap();
        assert_eq!(lag.current_lsn, 990);
        assert!(lag.lag_bytes > 0);
    }

    // "fresh" is 1 byte / 10 ms behind. "stale" (LSN 500, 5 s time lag) is expected to
    // exceed the 100 ms bound under the default lag calculation.
    #[test]
    fn test_lag_monitor_fresh_nodes() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);

        monitor.register_standby("fresh", SyncMode::Sync);
        monitor.update_standby_lag("fresh", 999, Some(Duration::from_millis(10)));

        monitor.register_standby("stale", SyncMode::Async);
        monitor.update_standby_lag("stale", 500, Some(Duration::from_secs(5)));

        let fresh = monitor.get_fresh_nodes(Duration::from_millis(100));
        assert!(fresh.contains(&"fresh".to_string()));
        assert!(!fresh.contains(&"stale".to_string()));
    }

    // Per-node LSN checks are inclusive of the reported LSN.
    #[test]
    fn test_lag_monitor_lsn_check() {
        let monitor = LagMonitor::with_defaults();
        monitor.update_primary_lsn(1000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 900, None);

        assert!(monitor.has_reached_lsn("standby-1", 800));
        assert!(monitor.has_reached_lsn("standby-1", 900));
        assert!(!monitor.has_reached_lsn("standby-1", 901));
    }

    // With `LagCalculation::time()` (presumably ranking by the supplied time lag),
    // the 10 ms standby must be chosen over the 500 ms one.
    #[test]
    fn test_lag_monitor_freshest_standby() {
        let config = LagRoutingConfig::new().with_lag_calculation(LagCalculation::time());
        let monitor = LagMonitor::new(config);
        monitor.update_primary_lsn(1000);

        monitor.register_standby("slow", SyncMode::Async);
        monitor.update_standby_lag("slow", 900, Some(Duration::from_millis(500)));

        monitor.register_standby("fast", SyncMode::Sync);
        monitor.update_standby_lag("fast", 999, Some(Duration::from_millis(10)));

        let (node_id, _) = monitor.get_freshest_standby().unwrap();
        assert_eq!(node_id, "fast");
    }

    // The trend uses the three newest samples with a 10%-of-oldest margin.
    // 1000 -> 800 -> 600 is improving. After 700 -> 900 -> 1100 the newest three are
    // increasing beyond the margin, so the trend is degrading.
    #[test]
    fn test_node_lag_data_trend() {
        let mut data = NodeLagData::new(10);

        data.add_sample(1000);
        data.add_sample(800);
        data.add_sample(600);

        assert_eq!(data.calculate_trend(), LagTrend::Improving);

        data.add_sample(700);
        data.add_sample(900);
        data.add_sample(1100);

        assert_eq!(data.calculate_trend(), LagTrend::Degrading);
    }

    // Display yields the lowercase variant names.
    #[test]
    fn test_lag_trend_display() {
        assert_eq!(LagTrend::Improving.to_string(), "improving");
        assert_eq!(LagTrend::Stable.to_string(), "stable");
        assert_eq!(LagTrend::Degrading.to_string(), "degrading");
        assert_eq!(LagTrend::Unknown.to_string(), "unknown");
    }
}