use std::collections::HashMap;
use std::time::Duration;
use super::SyncMode;
#[derive(Debug, Clone)]
pub struct LagRoutingConfig {
pub enabled: bool,
pub poll_interval: Duration,
pub lag_calculation: LagCalculation,
pub default_max_lag: Duration,
pub fresh_threshold: Duration,
pub stale_threshold: Duration,
pub fallback_to_primary: bool,
pub fallback_threshold: Duration,
pub read_your_writes: bool,
pub ryw_retention: Duration,
pub sync_mode_limits: HashMap<SyncMode, SyncModeLagConfig>,
pub enable_smoothing: bool,
pub smoothing_window: usize,
pub min_samples: usize,
}
impl Default for LagRoutingConfig {
fn default() -> Self {
let mut sync_mode_limits = HashMap::new();
sync_mode_limits.insert(
SyncMode::Sync,
SyncModeLagConfig {
max_lag: Duration::from_millis(0),
weight: 1.0,
},
);
sync_mode_limits.insert(
SyncMode::SemiSync,
SyncModeLagConfig {
max_lag: Duration::from_millis(500),
weight: 0.8,
},
);
sync_mode_limits.insert(
SyncMode::Async,
SyncModeLagConfig {
max_lag: Duration::from_secs(10),
weight: 0.5,
},
);
Self {
enabled: true,
poll_interval: Duration::from_millis(100),
lag_calculation: LagCalculation::default(),
default_max_lag: Duration::from_secs(1),
fresh_threshold: Duration::from_millis(100),
stale_threshold: Duration::from_secs(10),
fallback_to_primary: true,
fallback_threshold: Duration::from_secs(5),
read_your_writes: true,
ryw_retention: Duration::from_secs(300), sync_mode_limits,
enable_smoothing: true,
smoothing_window: 10,
min_samples: 3,
}
}
}
impl LagRoutingConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
pub fn with_default_max_lag(mut self, lag: Duration) -> Self {
self.default_max_lag = lag;
self
}
pub fn with_fallback_threshold(mut self, threshold: Duration) -> Self {
self.fallback_threshold = threshold;
self
}
pub fn with_read_your_writes(mut self, enabled: bool) -> Self {
self.read_your_writes = enabled;
self
}
pub fn with_ryw_retention(mut self, retention: Duration) -> Self {
self.ryw_retention = retention;
self
}
pub fn with_lag_calculation(mut self, method: LagCalculation) -> Self {
self.lag_calculation = method;
self
}
pub fn with_smoothing(mut self, enabled: bool, window: usize) -> Self {
self.enable_smoothing = enabled;
self.smoothing_window = window;
self
}
pub fn get_sync_mode_max_lag(&self, mode: SyncMode) -> Duration {
self.sync_mode_limits
.get(&mode)
.map(|c| c.max_lag)
.unwrap_or(self.default_max_lag)
}
pub fn get_sync_mode_weight(&self, mode: SyncMode) -> f64 {
self.sync_mode_limits
.get(&mode)
.map(|c| c.weight)
.unwrap_or(1.0)
}
}
#[derive(Debug, Clone)]
pub struct SyncModeLagConfig {
pub max_lag: Duration,
pub weight: f64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LagCalculation {
Wal {
bytes_per_second: u64,
},
Time,
Hybrid {
bytes_per_second: u64,
},
}
impl Default for LagCalculation {
fn default() -> Self {
LagCalculation::Hybrid {
bytes_per_second: 50_000,
}
}
}
impl LagCalculation {
pub fn wal(bytes_per_second: u64) -> Self {
LagCalculation::Wal { bytes_per_second }
}
pub fn time() -> Self {
LagCalculation::Time
}
pub fn hybrid(bytes_per_second: u64) -> Self {
LagCalculation::Hybrid { bytes_per_second }
}
pub fn calculate_lag(&self, lag_bytes: u64, time_lag: Option<Duration>) -> Duration {
match self {
LagCalculation::Wal { bytes_per_second } => {
if *bytes_per_second == 0 {
return Duration::ZERO;
}
Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
}
LagCalculation::Time => time_lag.unwrap_or(Duration::ZERO),
LagCalculation::Hybrid { bytes_per_second } => {
let wal_lag = if *bytes_per_second > 0 {
Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
} else {
Duration::ZERO
};
let time = time_lag.unwrap_or(Duration::ZERO);
wal_lag.max(time)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = LagRoutingConfig::default();
assert!(config.enabled);
assert_eq!(config.poll_interval, Duration::from_millis(100));
assert_eq!(config.default_max_lag, Duration::from_secs(1));
assert!(config.fallback_to_primary);
assert!(config.read_your_writes);
}
#[test]
fn test_builder_pattern() {
let config = LagRoutingConfig::new()
.with_poll_interval(Duration::from_millis(50))
.with_default_max_lag(Duration::from_millis(500))
.with_read_your_writes(false);
assert_eq!(config.poll_interval, Duration::from_millis(50));
assert_eq!(config.default_max_lag, Duration::from_millis(500));
assert!(!config.read_your_writes);
}
#[test]
fn test_sync_mode_limits() {
let config = LagRoutingConfig::default();
assert_eq!(
config.get_sync_mode_max_lag(SyncMode::Sync),
Duration::from_millis(0)
);
assert_eq!(
config.get_sync_mode_max_lag(SyncMode::SemiSync),
Duration::from_millis(500)
);
assert_eq!(
config.get_sync_mode_max_lag(SyncMode::Async),
Duration::from_secs(10)
);
}
#[test]
fn test_lag_calculation_wal() {
let calc = LagCalculation::wal(1000); let lag = calc.calculate_lag(5000, None);
assert_eq!(lag, Duration::from_secs(5));
}
#[test]
fn test_lag_calculation_time() {
let calc = LagCalculation::time();
let lag = calc.calculate_lag(5000, Some(Duration::from_secs(3)));
assert_eq!(lag, Duration::from_secs(3));
}
#[test]
fn test_lag_calculation_hybrid() {
let calc = LagCalculation::hybrid(1000); let lag = calc.calculate_lag(5000, Some(Duration::from_secs(3)));
assert_eq!(lag, Duration::from_secs(5));
let lag2 = calc.calculate_lag(2000, Some(Duration::from_secs(4)));
assert_eq!(lag2, Duration::from_secs(4));
}
}