use std::sync::Arc;
use std::time::{Duration, Instant};
use super::config::LagRoutingConfig;
use super::monitor::{LagInfo, LagMonitor, NodeId};
use super::ryw::ReadYourWritesTracker;
use super::SyncMode;
/// Explanation attached to a routing decision.
///
/// Every variant carries a free-form, human-readable detail message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LagRoutingReason {
    /// The primary was chosen directly.
    Primary(String),
    /// A standby satisfied the freshness (maximum lag) requirement.
    FreshnessMatch(String),
    /// A standby satisfied a required LSN.
    LsnMatch(String),
    /// No standby was fresh enough, so the read fell back to the primary.
    FallbackToPrimary(String),
    /// No standby had reached the LSN needed for read-your-writes.
    RywFallback(String),
    /// No suitable standby could be found.
    NoEligibleNodes(String),
    /// Lag-aware routing was skipped entirely.
    Bypassed(String),
}

impl std::fmt::Display for LagRoutingReason {
    /// Renders the reason as `<label>: <detail>`, where the label identifies
    /// the variant and the detail is the message it carries.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::Primary(detail) => ("primary", detail),
            Self::FreshnessMatch(detail) => ("freshness", detail),
            Self::LsnMatch(detail) => ("lsn", detail),
            Self::FallbackToPrimary(detail) => ("fallback", detail),
            Self::RywFallback(detail) => ("ryw-fallback", detail),
            Self::NoEligibleNodes(detail) => ("no-nodes", detail),
            Self::Bypassed(detail) => ("bypassed", detail),
        };
        write!(f, "{}: {}", label, detail)
    }
}
/// Result of one routing call: the chosen target plus the context behind the choice.
#[derive(Debug, Clone)]
pub struct LagRoutingDecision {
/// Standby selected for the read; the `primary` constructor leaves this `None`.
pub target_node: Option<NodeId>,
/// `true` when built by the `primary` constructor, `false` when built by `standby`.
pub use_primary: bool,
/// Why this target was chosen.
pub reason: LagRoutingReason,
/// Lag snapshot supplied to the `standby` constructor; `None` for primary decisions.
pub lag_info: Option<LagInfo>,
/// Duration supplied by whoever built the decision (the router passes the time
/// elapsed since the start of its `route` call).
pub elapsed: Duration,
/// Lag bound recorded via `with_max_lag`; `None` until that builder is called.
pub max_lag_applied: Option<Duration>,
/// LSN recorded via `with_lsn`; `None` until that builder is called.
pub lsn_requirement: Option<u64>,
}
impl LagRoutingDecision {
pub fn primary(reason: LagRoutingReason, elapsed: Duration) -> Self {
Self {
target_node: None,
use_primary: true,
reason,
lag_info: None,
elapsed,
max_lag_applied: None,
lsn_requirement: None,
}
}
pub fn standby(
node_id: NodeId,
reason: LagRoutingReason,
lag_info: LagInfo,
elapsed: Duration,
) -> Self {
Self {
target_node: Some(node_id),
use_primary: false,
reason,
lag_info: Some(lag_info),
elapsed,
max_lag_applied: None,
lsn_requirement: None,
}
}
pub fn with_max_lag(mut self, max_lag: Duration) -> Self {
self.max_lag_applied = Some(max_lag);
self
}
pub fn with_lsn(mut self, lsn: u64) -> Self {
self.lsn_requirement = Some(lsn);
self
}
}
/// Router that decides, per read, whether to use the primary or a standby,
/// based on lag data from a `LagMonitor` and per-session write tracking.
pub struct LagAwareRouter {
/// Source of per-node lag information and node eligibility queries.
lag_monitor: Arc<LagMonitor>,
/// Tracks the LSN each session must observe for read-your-writes routing.
ryw_tracker: Arc<ReadYourWritesTracker>,
/// Routing settings consulted on every call (enable flag, default max lag,
/// read-your-writes flag, primary fallback flag, sync-mode weights).
config: LagRoutingConfig,
}
impl LagAwareRouter {
    /// Builds a router from an existing lag monitor, an existing
    /// read-your-writes tracker and the routing configuration.
    pub fn new(
        lag_monitor: Arc<LagMonitor>,
        ryw_tracker: Arc<ReadYourWritesTracker>,
        config: LagRoutingConfig,
    ) -> Self {
        Self {
            lag_monitor,
            ryw_tracker,
            config,
        }
    }

    /// Builds a router around the given lag monitor, creating a new
    /// read-your-writes tracker from `config.ryw_retention`.
    pub fn with_shared(lag_monitor: Arc<LagMonitor>, config: LagRoutingConfig) -> Self {
        let ryw_tracker = Arc::new(ReadYourWritesTracker::new(config.ryw_retention));
        Self::new(lag_monitor, ryw_tracker, config)
    }

    /// Returns the read-your-writes tracker used by this router.
    pub fn ryw_tracker(&self) -> &Arc<ReadYourWritesTracker> {
        &self.ryw_tracker
    }

    /// Picks a target for a read.
    ///
    /// Decision order:
    /// 1. If routing is disabled in the configuration, the primary is
    ///    returned with a `Bypassed` reason.
    /// 2. If read-your-writes is enabled and the tracker reports a required
    ///    LSN for `session_id`, routing is done purely by that LSN; `max_lag`
    ///    and `prefer_sync_mode` are not consulted on this path. The decision
    ///    carries the LSN in `lsn_requirement`.
    /// 3. Otherwise routing is done by freshness, using `max_lag` or, when it
    ///    is `None`, `config.default_max_lag`.
    ///
    /// `elapsed` in the returned decision is measured from the start of this
    /// call.
    pub fn route(
        &self,
        session_id: Option<&str>,
        max_lag: Option<Duration>,
        prefer_sync_mode: Option<SyncMode>,
    ) -> LagRoutingDecision {
        let start = Instant::now();

        if !self.config.enabled {
            return LagRoutingDecision::primary(
                LagRoutingReason::Bypassed("Lag routing disabled".to_string()),
                start.elapsed(),
            );
        }

        let max_lag = max_lag.unwrap_or(self.config.default_max_lag);

        if self.config.read_your_writes {
            let pending_lsn = session_id.and_then(|sid| self.ryw_tracker.get_required_lsn(sid));
            if let Some(required_lsn) = pending_lsn {
                return self
                    .route_with_lsn_requirement(required_lsn, start)
                    .with_lsn(required_lsn);
            }
        }

        self.route_by_freshness(max_lag, prefer_sync_mode, start)
    }

    /// Routes a read that must observe at least `required_lsn`.
    ///
    /// When no standby has reached the LSN the read always goes to the
    /// primary; `config.fallback_to_primary` only selects which reason is
    /// reported (`RywFallback` when set, `NoEligibleNodes` otherwise).
    fn route_with_lsn_requirement(
        &self,
        required_lsn: u64,
        start: Instant,
    ) -> LagRoutingDecision {
        let eligible = self.lag_monitor.get_nodes_at_lsn(required_lsn);

        if eligible.is_empty() {
            let reason = if self.config.fallback_to_primary {
                LagRoutingReason::RywFallback(format!(
                    "No standby reached LSN {}",
                    required_lsn
                ))
            } else {
                LagRoutingReason::NoEligibleNodes("No standby caught up for RYW".to_string())
            };
            return LagRoutingDecision::primary(reason, start.elapsed());
        }

        let (node_id, lag_info) = self.select_best_node(&eligible);
        LagRoutingDecision::standby(
            node_id,
            LagRoutingReason::LsnMatch(format!("Reached LSN {}", required_lsn)),
            lag_info,
            start.elapsed(),
        )
    }

    /// Routes a read by freshness: the chosen standby must not lag more than
    /// `max_lag`.
    ///
    /// * `max_lag == Duration::ZERO`: only standbys in `SyncMode::Sync` that
    ///   the monitor reports as healthy are considered; if there are none the
    ///   primary is used. `prefer_sync_mode` is ignored on this path.
    /// * Otherwise the monitor's fresh nodes for `max_lag` are the candidates.
    ///   `prefer_sync_mode` narrows them to nodes of that mode, but only when
    ///   at least one such node is fresh enough.
    /// * With no candidates: the primary is used if
    ///   `config.fallback_to_primary` is set; otherwise the freshest standby
    ///   is used even though it exceeds `max_lag`; if there is none, the
    ///   primary is used with a `NoEligibleNodes` reason.
    ///
    /// Every decision produced here records `max_lag` in `max_lag_applied`.
    fn route_by_freshness(
        &self,
        max_lag: Duration,
        prefer_sync_mode: Option<SyncMode>,
        start: Instant,
    ) -> LagRoutingDecision {
        if max_lag == Duration::ZERO {
            let sync_nodes = self.lag_monitor.get_nodes_by_sync_mode(SyncMode::Sync);
            let fresh_sync: Vec<_> = sync_nodes
                .into_iter()
                .filter(|n| {
                    self.lag_monitor
                        .get_lag(n)
                        .map_or(false, |info| info.healthy)
                })
                .collect();

            if fresh_sync.is_empty() {
                return LagRoutingDecision::primary(
                    LagRoutingReason::Primary(
                        "Zero lag required, no sync standby available".to_string(),
                    ),
                    start.elapsed(),
                )
                .with_max_lag(max_lag);
            }

            let (node_id, lag_info) = self.select_best_node(&fresh_sync);
            return LagRoutingDecision::standby(
                node_id,
                LagRoutingReason::FreshnessMatch("Sync standby with zero lag".to_string()),
                lag_info,
                start.elapsed(),
            )
            .with_max_lag(max_lag);
        }

        let mut eligible = self.lag_monitor.get_fresh_nodes(max_lag);

        if let Some(mode) = prefer_sync_mode {
            let mode_nodes = self.lag_monitor.get_nodes_by_sync_mode(mode);
            let preferred: Vec<_> = eligible
                .iter()
                .filter(|n| mode_nodes.contains(n))
                .cloned()
                .collect();
            // The preference is soft: keep the full candidate list when no
            // node of the preferred mode is fresh enough.
            if !preferred.is_empty() {
                eligible = preferred;
            }
        }

        if eligible.is_empty() {
            if self.config.fallback_to_primary {
                return LagRoutingDecision::primary(
                    LagRoutingReason::FallbackToPrimary(format!(
                        "All standbys exceed {}ms lag",
                        max_lag.as_millis()
                    )),
                    start.elapsed(),
                )
                .with_max_lag(max_lag);
            }

            // Primary fallback is disabled: serve from the least-stale standby
            // even though it does not meet the requested bound.
            if let Some((node_id, lag_info)) = self.lag_monitor.get_freshest_standby() {
                return LagRoutingDecision::standby(
                    node_id,
                    LagRoutingReason::FreshnessMatch(format!(
                        "Best available ({}ms lag, wanted {}ms)",
                        lag_info.lag_time.as_millis(),
                        max_lag.as_millis()
                    )),
                    lag_info,
                    start.elapsed(),
                )
                .with_max_lag(max_lag);
            }

            return LagRoutingDecision::primary(
                LagRoutingReason::NoEligibleNodes("No healthy standbys".to_string()),
                start.elapsed(),
            )
            .with_max_lag(max_lag);
        }

        let (node_id, lag_info) = self.select_best_node(&eligible);
        LagRoutingDecision::standby(
            node_id,
            LagRoutingReason::FreshnessMatch(format!(
                "{}ms lag <= {}ms requirement",
                lag_info.lag_time.as_millis(),
                max_lag.as_millis()
            )),
            lag_info,
            start.elapsed(),
        )
        .with_max_lag(max_lag)
    }

    /// Chooses the candidate with the lowest score, where the score is the
    /// node's lag in seconds divided by the configured weight of its sync
    /// mode. On equal scores the earliest candidate in `eligible` wins.
    ///
    /// A score that is not a number (zero lag divided by a zero weight) is
    /// ranked as the worst possible score, so such a node is only chosen when
    /// nothing better exists.
    ///
    /// If the monitor has no lag data for any candidate, the first candidate
    /// is returned together with the monitor's current data for it, or a
    /// default `LagInfo` when there is still none.
    ///
    /// # Panics
    ///
    /// Panics if `eligible` is empty; callers check for that beforehand.
    fn select_best_node(&self, eligible: &[NodeId]) -> (NodeId, LagInfo) {
        let mut best: Option<(&NodeId, LagInfo, f64)> = None;

        for node_id in eligible {
            if let Some(lag_info) = self.lag_monitor.get_lag(node_id) {
                let weight = self.config.get_sync_mode_weight(lag_info.sync_mode);
                let raw_score = lag_info.lag_time.as_secs_f64() / weight;
                // Comparisons against NaN are always false, so a NaN score
                // held by the first candidate could never be displaced.
                // Rank it last instead.
                let score = if raw_score.is_nan() {
                    f64::INFINITY
                } else {
                    raw_score
                };

                let improves = best
                    .as_ref()
                    .map_or(true, |current| score < current.2);
                if improves {
                    best = Some((node_id, lag_info, score));
                }
            }
        }

        match best {
            Some((node_id, lag_info, _)) => (node_id.clone(), lag_info),
            None => {
                let first = &eligible[0];
                (
                    first.clone(),
                    self.lag_monitor.get_lag(first).unwrap_or_default(),
                )
            }
        }
    }

    /// Records that `session_id` wrote at `write_lsn`.
    ///
    /// Does nothing when read-your-writes is disabled in the configuration.
    pub fn record_write(&self, session_id: &str, write_lsn: u64) {
        if self.config.read_your_writes {
            self.ryw_tracker.record_write(session_id, write_lsn);
        }
    }

    /// Clears the tracker's state for `session_id`, regardless of whether
    /// read-your-writes is enabled.
    pub fn clear_ryw(&self, session_id: &str) {
        self.ryw_tracker.clear(session_id);
    }

    /// Returns the routing configuration this router was built with.
    pub fn config(&self) -> &LagRoutingConfig {
        &self.config
    }
}
impl std::fmt::Debug for LagAwareRouter {
    /// Prints a compact summary built from three configuration values
    /// instead of dumping the monitor and tracker.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cfg = &self.config;
        let mut summary = f.debug_struct("LagAwareRouter");
        summary.field("enabled", &cfg.enabled);
        summary.field("default_max_lag", &cfg.default_max_lag);
        summary.field("ryw_enabled", &cfg.read_your_writes);
        summary.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lag::config::LagCalculation;

    /// Builds a router with time-based lag calculation, a 1s default lag
    /// bound and three standbys: `sync-1` (5ms), `async-1` (200ms) and
    /// `async-2` (2s).
    fn setup_router() -> LagAwareRouter {
        let config = LagRoutingConfig::new()
            .with_lag_calculation(LagCalculation::time())
            .with_default_max_lag(Duration::from_secs(1));
        let monitor = Arc::new(LagMonitor::new(config.clone()));
        monitor.update_primary_lsn(10000);

        let standbys = [
            ("sync-1", SyncMode::Sync, 9999, Duration::from_millis(5)),
            ("async-1", SyncMode::Async, 9500, Duration::from_millis(200)),
            ("async-2", SyncMode::Async, 9000, Duration::from_secs(2)),
        ];
        for (name, mode, lsn, lag) in standbys {
            monitor.register_standby(name, mode);
            monitor.update_standby_lag(name, lsn, Some(lag));
        }

        LagAwareRouter::with_shared(monitor, config)
    }

    /// With the default 1s bound a standby within the bound is chosen.
    #[test]
    fn test_route_with_default_lag() {
        let router = setup_router();
        let decision = router.route(None, None, None);

        assert!(!decision.use_primary);
        assert!(decision.target_node.is_some());
        let node = decision.target_node.unwrap();
        assert!(node == "sync-1" || node == "async-1");
    }

    /// A zero lag bound may only ever select the sync standby.
    #[test]
    fn test_route_zero_lag() {
        let router = setup_router();
        let decision = router.route(None, Some(Duration::ZERO), None);

        if let Some(node) = decision.target_node.as_ref() {
            assert_eq!(node, "sync-1");
        }
    }

    /// A 10ms bound may only ever select the 5ms sync standby.
    #[test]
    fn test_route_tight_lag() {
        let router = setup_router();
        let decision = router.route(None, Some(Duration::from_millis(10)), None);

        if decision.use_primary {
            return;
        }
        assert_eq!(decision.target_node.as_ref().unwrap(), "sync-1");
    }

    /// Preferring sync mode picks the sync standby when it is fresh enough.
    #[test]
    fn test_route_prefer_sync_mode() {
        let router = setup_router();
        let one_second = Duration::from_secs(1);
        let decision = router.route(None, Some(one_second), Some(SyncMode::Sync));

        assert!(!decision.use_primary);
        assert_eq!(decision.target_node.as_ref().unwrap(), "sync-1");
    }

    /// A session whose write is ahead of every standby is sent to the primary.
    #[test]
    fn test_route_read_your_writes() {
        let config = LagRoutingConfig::new()
            .with_lag_calculation(LagCalculation::time())
            .with_read_your_writes(true);
        let monitor = Arc::new(LagMonitor::new(config.clone()));
        monitor.update_primary_lsn(10000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 9500, Some(Duration::from_millis(100)));
        let router = LagAwareRouter::with_shared(monitor, config);

        router.record_write("session-1", 9800);
        let decision = router.route(Some("session-1"), None, None);

        assert!(decision.use_primary);
        assert!(
            matches!(decision.reason, LagRoutingReason::RywFallback(_)),
            "Expected RywFallback reason"
        );
    }

    /// A session whose write the standby has already replayed stays on the standby.
    #[test]
    fn test_route_ryw_satisfied() {
        let config = LagRoutingConfig::new()
            .with_lag_calculation(LagCalculation::time())
            .with_read_your_writes(true);
        let monitor = Arc::new(LagMonitor::new(config.clone()));
        monitor.update_primary_lsn(10000);
        monitor.register_standby("standby-1", SyncMode::Async);
        monitor.update_standby_lag("standby-1", 9800, Some(Duration::from_millis(100)));
        let router = LagAwareRouter::with_shared(monitor, config);

        router.record_write("session-1", 9700);
        let decision = router.route(Some("session-1"), None, None);

        assert!(!decision.use_primary);
        assert_eq!(decision.target_node.as_ref().unwrap(), "standby-1");
        assert!(
            matches!(decision.reason, LagRoutingReason::LsnMatch(_)),
            "Expected LsnMatch reason"
        );
    }

    /// The rendered reason contains the variant's label.
    #[test]
    fn test_routing_decision_display() {
        let freshness = LagRoutingReason::FreshnessMatch("100ms lag".to_string());
        assert!(freshness.to_string().contains("freshness"));

        let fallback = LagRoutingReason::FallbackToPrimary("all laggy".to_string());
        assert!(fallback.to_string().contains("fallback"));
    }

    /// A disabled router always answers with the primary and a `Bypassed` reason.
    #[test]
    fn test_disabled_routing() {
        let mut config = LagRoutingConfig::new();
        config.enabled = false;
        let monitor = Arc::new(LagMonitor::new(config.clone()));
        let router = LagAwareRouter::with_shared(monitor, config);

        let decision = router.route(None, None, None);

        assert!(decision.use_primary);
        assert!(
            matches!(decision.reason, LagRoutingReason::Bypassed(_)),
            "Expected Bypassed reason"
        );
    }

    /// Between two async standbys within the bound, the one with less lag wins.
    #[test]
    fn test_select_best_node_prefers_lower_lag() {
        let config = LagRoutingConfig::new().with_lag_calculation(LagCalculation::time());
        let monitor = Arc::new(LagMonitor::new(config.clone()));
        monitor.update_primary_lsn(10000);
        monitor.register_standby("slow", SyncMode::Async);
        monitor.update_standby_lag("slow", 9000, Some(Duration::from_millis(500)));
        monitor.register_standby("fast", SyncMode::Async);
        monitor.update_standby_lag("fast", 9900, Some(Duration::from_millis(50)));
        let router = LagAwareRouter::with_shared(monitor, config);

        let decision = router.route(None, Some(Duration::from_secs(1)), None);

        assert!(!decision.use_primary);
        assert_eq!(decision.target_node.as_ref().unwrap(), "fast");
    }
}