Skip to main content

heliosdb_proxy/lag/
mod.rs

1//! Replica Lag-Aware Routing Module
2//!
3//! This module provides lag-aware query routing for HeliosProxy, ensuring
4//! queries are routed to replicas that meet freshness requirements.
5//!
6//! # Features
7//!
8//! - Continuous lag monitoring via WAL position tracking
9//! - Read-your-writes consistency guarantees
10//! - Configurable lag thresholds per sync mode
11//! - Integration with routing hints (`/*helios:lag=X*/`)
12//!
13//! # Architecture
14//!
15//! ```text
16//! Query + Lag Hint
17//!       │
18//!       ▼
19//! ┌─────────────────┐
20//! │  LagAwareRouter │──────► LagMonitor
21//! │                 │        (node lag data)
22//! │  - Extract hint │
23//! │  - Check RYW    │──────► RYWTracker
24//! │  - Filter nodes │        (session LSNs)
25//! └─────────────────┘
26//!       │
27//!       ▼
28//!   Eligible Nodes
29//! ```
30//!
31//! # Example
32//!
33//! ```rust,ignore
34//! use heliosdb::proxy::lag::{LagMonitor, LagAwareRouter, LagRoutingConfig};
35//!
36//! let config = LagRoutingConfig::default();
37//! let monitor = LagMonitor::new(config.poll_interval);
38//! let router = LagAwareRouter::new(monitor, config);
39//!
40//! // Route with freshness requirement
41//! let decision = router.route(query, session_id, Some(Duration::from_millis(100)));
42//! ```
43
44pub mod config;
45pub mod metrics;
46pub mod monitor;
47pub mod router;
48pub mod ryw;
49
50// Re-exports for convenience
51pub use config::{
52    LagCalculation, LagRoutingConfig, SyncModeLagConfig,
53};
54pub use metrics::{
55    LagMetrics, LagStatsSnapshot, NodeLagStats,
56};
57pub use monitor::{
58    LagInfo, LagMonitor, LagTrend, NodeLagData,
59};
60pub use router::{
61    LagAwareRouter, LagRoutingDecision, LagRoutingReason,
62};
63pub use ryw::{
64    ReadYourWritesTracker, RywSession, WorkflowConsistency, WorkflowTracker,
65};
66
67/// SyncMode enum for replica synchronization classification
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
69pub enum SyncMode {
70    /// Synchronous replication - zero data loss guarantee
71    Sync,
72    /// Semi-synchronous - bounded lag guarantee
73    SemiSync,
74    /// Asynchronous - eventual consistency
75    Async,
76    /// Unknown or unclassified
77    Unknown,
78}
79
80impl Default for SyncMode {
81    fn default() -> Self {
82        Self::Unknown
83    }
84}
85
86impl std::fmt::Display for SyncMode {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            SyncMode::Sync => write!(f, "sync"),
90            SyncMode::SemiSync => write!(f, "semisync"),
91            SyncMode::Async => write!(f, "async"),
92            SyncMode::Unknown => write!(f, "unknown"),
93        }
94    }
95}
96
97impl std::str::FromStr for SyncMode {
98    type Err = String;
99
100    fn from_str(s: &str) -> Result<Self, Self::Err> {
101        match s.to_lowercase().as_str() {
102            "sync" | "synchronous" => Ok(SyncMode::Sync),
103            "semisync" | "semi-sync" | "semi_sync" => Ok(SyncMode::SemiSync),
104            "async" | "asynchronous" => Ok(SyncMode::Async),
105            "unknown" => Ok(SyncMode::Unknown),
106            _ => Err(format!("Unknown sync mode: {}", s)),
107        }
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114
115    #[test]
116    fn test_sync_mode_display() {
117        assert_eq!(SyncMode::Sync.to_string(), "sync");
118        assert_eq!(SyncMode::SemiSync.to_string(), "semisync");
119        assert_eq!(SyncMode::Async.to_string(), "async");
120        assert_eq!(SyncMode::Unknown.to_string(), "unknown");
121    }
122
123    #[test]
124    fn test_sync_mode_from_str() {
125        assert_eq!("sync".parse::<SyncMode>().unwrap(), SyncMode::Sync);
126        assert_eq!("synchronous".parse::<SyncMode>().unwrap(), SyncMode::Sync);
127        assert_eq!("semisync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
128        assert_eq!("semi-sync".parse::<SyncMode>().unwrap(), SyncMode::SemiSync);
129        assert_eq!("async".parse::<SyncMode>().unwrap(), SyncMode::Async);
130        assert_eq!("asynchronous".parse::<SyncMode>().unwrap(), SyncMode::Async);
131        assert!("invalid".parse::<SyncMode>().is_err());
132    }
133}