Skip to main content

heliosdb_proxy/lag/
mod.rs

//! Replica Lag-Aware Routing Module
//!
//! This module provides lag-aware query routing for HeliosProxy, ensuring
//! queries are routed to replicas that meet freshness requirements.
//!
//! # Features
//!
//! - Continuous lag monitoring via WAL position tracking
//! - Read-your-writes consistency guarantees
//! - Configurable lag thresholds per sync mode
//! - Integration with routing hints (`/*helios:lag=X*/`)
//!
//! # Architecture
//!
//! ```text
//! Query + Lag Hint
//!       │
//!       ▼
//! ┌─────────────────┐
//! │  LagAwareRouter │──────► LagMonitor
//! │                 │        (node lag data)
//! │  - Extract hint │
//! │  - Check RYW    │──────► RYWTracker
//! │  - Filter nodes │        (session LSNs)
//! └─────────────────┘
//!       │
//!       ▼
//!   Eligible Nodes
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! use heliosdb::proxy::lag::{LagMonitor, LagAwareRouter, LagRoutingConfig};
//!
//! let config = LagRoutingConfig::default();
//! let monitor = LagMonitor::new(config.poll_interval);
//! let router = LagAwareRouter::new(monitor, config);
//!
//! // Route with freshness requirement
//! let decision = router.route(query, session_id, Some(Duration::from_millis(100)));
//! ```
43
44pub mod config;
45pub mod metrics;
46pub mod monitor;
47pub mod router;
48pub mod ryw;
49
50// Re-exports for convenience
51pub use config::{LagCalculation, LagRoutingConfig, SyncModeLagConfig};
52pub use metrics::{LagMetrics, LagStatsSnapshot, NodeLagStats};
53pub use monitor::{LagInfo, LagMonitor, LagTrend, NodeLagData};
54pub use router::{LagAwareRouter, LagRoutingDecision, LagRoutingReason};
55pub use ryw::{ReadYourWritesTracker, RywSession, WorkflowConsistency, WorkflowTracker};
56
/// Replication synchronization class assigned to a replica.
///
/// The [`Default`] value is [`SyncMode::Unknown`], so a replica that has not
/// been classified yet is never mistaken for one of the concrete modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SyncMode {
    /// Synchronous replication - zero data loss guarantee
    Sync,
    /// Semi-synchronous - bounded lag guarantee
    SemiSync,
    /// Asynchronous - eventual consistency
    Async,
    /// Unknown or unclassified
    #[default]
    Unknown,
}
70
71impl std::fmt::Display for SyncMode {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            SyncMode::Sync => write!(f, "sync"),
75            SyncMode::SemiSync => write!(f, "semisync"),
76            SyncMode::Async => write!(f, "async"),
77            SyncMode::Unknown => write!(f, "unknown"),
78        }
79    }
80}
81
82impl std::str::FromStr for SyncMode {
83    type Err = String;
84
85    fn from_str(s: &str) -> Result<Self, Self::Err> {
86        match s.to_lowercase().as_str() {
87            "sync" | "synchronous" => Ok(SyncMode::Sync),
88            "semisync" | "semi-sync" | "semi_sync" => Ok(SyncMode::SemiSync),
89            "async" | "asynchronous" => Ok(SyncMode::Async),
90            "unknown" => Ok(SyncMode::Unknown),
91            _ => Err(format!("Unknown sync mode: {}", s)),
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, used to drive the table-style checks below.
    const ALL_MODES: [SyncMode; 4] = [
        SyncMode::Sync,
        SyncMode::SemiSync,
        SyncMode::Async,
        SyncMode::Unknown,
    ];

    #[test]
    fn test_sync_mode_display() {
        assert_eq!(SyncMode::Sync.to_string(), "sync");
        assert_eq!(SyncMode::SemiSync.to_string(), "semisync");
        assert_eq!(SyncMode::Async.to_string(), "async");
        assert_eq!(SyncMode::Unknown.to_string(), "unknown");
    }

    #[test]
    fn test_sync_mode_from_str() {
        assert_eq!("sync".parse::<SyncMode>().unwrap(), SyncMode::Sync);
        assert_eq!("synchronous".parse::<SyncMode>().unwrap(), SyncMode::Sync);
        assert_eq!("semisync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
        assert_eq!("semi-sync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
        assert_eq!("semi_sync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
        assert_eq!("async".parse::<SyncMode>().unwrap(), SyncMode::Async);
        assert_eq!("asynchronous".parse::<SyncMode>().unwrap(), SyncMode::Async);
        assert_eq!("unknown".parse::<SyncMode>().unwrap(), SyncMode::Unknown);
        assert!("invalid".parse::<SyncMode>().is_err());
    }

    #[test]
    fn test_sync_mode_from_str_is_case_insensitive() {
        assert_eq!("SYNC".parse::<SyncMode>().unwrap(), SyncMode::Sync);
        assert_eq!("Semi-Sync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
        assert_eq!("Asynchronous".parse::<SyncMode>().unwrap(), SyncMode::Async);
    }

    #[test]
    fn test_sync_mode_from_str_error_message() {
        let err = "invalid".parse::<SyncMode>().unwrap_err();
        assert_eq!(err, "Unknown sync mode: invalid");
    }

    #[test]
    fn test_sync_mode_default_is_unknown() {
        assert_eq!(SyncMode::default(), SyncMode::Unknown);
    }

    #[test]
    fn test_sync_mode_display_round_trips_through_from_str() {
        // Whatever Display prints must be accepted back by FromStr.
        for mode in ALL_MODES {
            assert_eq!(mode.to_string().parse::<SyncMode>(), Ok(mode));
        }
    }
}