Skip to main content

heliosdb_proxy/lag/
config.rs

1//! Lag routing configuration types
2//!
3//! Configuration for replica lag monitoring and routing decisions.
4
5use std::collections::HashMap;
6use std::time::Duration;
7
8use super::SyncMode;
9
10/// Main configuration for lag-aware routing
11#[derive(Debug, Clone)]
12pub struct LagRoutingConfig {
13    /// Enable lag-aware routing
14    pub enabled: bool,
15
16    /// Interval between lag status polls
17    pub poll_interval: Duration,
18
19    /// Method for calculating lag
20    pub lag_calculation: LagCalculation,
21
22    /// Default maximum acceptable lag for reads
23    pub default_max_lag: Duration,
24
25    /// Threshold for considering a node "fresh"
26    pub fresh_threshold: Duration,
27
28    /// Threshold for marking a node unhealthy due to lag
29    pub stale_threshold: Duration,
30
31    /// Whether to fall back to primary when all standbys are too laggy
32    pub fallback_to_primary: bool,
33
34    /// Lag threshold that triggers primary fallback
35    pub fallback_threshold: Duration,
36
37    /// Enable read-your-writes tracking
38    pub read_your_writes: bool,
39
40    /// How long to retain RYW LSN requirements
41    pub ryw_retention: Duration,
42
43    /// Per-sync-mode lag limits
44    pub sync_mode_limits: HashMap<SyncMode, SyncModeLagConfig>,
45
46    /// Enable lag trend smoothing to avoid oscillation
47    pub enable_smoothing: bool,
48
49    /// Number of samples for trend smoothing
50    pub smoothing_window: usize,
51
52    /// Minimum samples before trusting lag data
53    pub min_samples: usize,
54}
55
56impl Default for LagRoutingConfig {
57    fn default() -> Self {
58        let mut sync_mode_limits = HashMap::new();
59        sync_mode_limits.insert(
60            SyncMode::Sync,
61            SyncModeLagConfig {
62                max_lag: Duration::from_millis(0),
63                weight: 1.0,
64            },
65        );
66        sync_mode_limits.insert(
67            SyncMode::SemiSync,
68            SyncModeLagConfig {
69                max_lag: Duration::from_millis(500),
70                weight: 0.8,
71            },
72        );
73        sync_mode_limits.insert(
74            SyncMode::Async,
75            SyncModeLagConfig {
76                max_lag: Duration::from_secs(10),
77                weight: 0.5,
78            },
79        );
80
81        Self {
82            enabled: true,
83            poll_interval: Duration::from_millis(100),
84            lag_calculation: LagCalculation::default(),
85            default_max_lag: Duration::from_secs(1),
86            fresh_threshold: Duration::from_millis(100),
87            stale_threshold: Duration::from_secs(10),
88            fallback_to_primary: true,
89            fallback_threshold: Duration::from_secs(5),
90            read_your_writes: true,
91            ryw_retention: Duration::from_secs(300), // 5 minutes
92            sync_mode_limits,
93            enable_smoothing: true,
94            smoothing_window: 10,
95            min_samples: 3,
96        }
97    }
98}
99
100impl LagRoutingConfig {
101    /// Create a new config with all defaults
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Builder pattern: set poll interval
107    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
108        self.poll_interval = interval;
109        self
110    }
111
112    /// Builder pattern: set default max lag
113    pub fn with_default_max_lag(mut self, lag: Duration) -> Self {
114        self.default_max_lag = lag;
115        self
116    }
117
118    /// Builder pattern: set fallback threshold
119    pub fn with_fallback_threshold(mut self, threshold: Duration) -> Self {
120        self.fallback_threshold = threshold;
121        self
122    }
123
124    /// Builder pattern: enable/disable read-your-writes
125    pub fn with_read_your_writes(mut self, enabled: bool) -> Self {
126        self.read_your_writes = enabled;
127        self
128    }
129
130    /// Builder pattern: set RYW retention
131    pub fn with_ryw_retention(mut self, retention: Duration) -> Self {
132        self.ryw_retention = retention;
133        self
134    }
135
136    /// Builder pattern: set lag calculation method
137    pub fn with_lag_calculation(mut self, method: LagCalculation) -> Self {
138        self.lag_calculation = method;
139        self
140    }
141
142    /// Builder pattern: enable/disable smoothing
143    pub fn with_smoothing(mut self, enabled: bool, window: usize) -> Self {
144        self.enable_smoothing = enabled;
145        self.smoothing_window = window;
146        self
147    }
148
149    /// Get the max lag allowed for a sync mode
150    pub fn get_sync_mode_max_lag(&self, mode: SyncMode) -> Duration {
151        self.sync_mode_limits
152            .get(&mode)
153            .map(|c| c.max_lag)
154            .unwrap_or(self.default_max_lag)
155    }
156
157    /// Get the weight for a sync mode (for load balancing)
158    pub fn get_sync_mode_weight(&self, mode: SyncMode) -> f64 {
159        self.sync_mode_limits
160            .get(&mode)
161            .map(|c| c.weight)
162            .unwrap_or(1.0)
163    }
164}
165
/// Lag limit and load-balancing weight applied to replicas in one sync mode.
///
/// Both fields are plain values, so the type is `Copy`. Entries can be read
/// out of a map without cloning, and compared in tests or config diffs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SyncModeLagConfig {
    /// Maximum acceptable lag for this sync mode.
    pub max_lag: Duration,

    /// Weight for load balancing (higher = more preferred).
    pub weight: f64,
}
175
/// Strategy for turning a replica's status into a single replication-lag value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LagCalculation {
    /// Lag derived from the WAL byte gap (LSN difference), converted to time
    /// using an assumed WAL generation rate.
    Wal {
        /// Assumed WAL generation rate in bytes per second, used for the conversion.
        bytes_per_second: u64,
    },

    /// Lag taken directly from a time measurement (last transaction timestamp).
    Time,

    /// Combination of the WAL-derived and time-based lag; the larger of the two wins.
    Hybrid {
        /// Assumed WAL generation rate in bytes per second, used for the WAL part.
        bytes_per_second: u64,
    },
}
194
195impl Default for LagCalculation {
196    fn default() -> Self {
197        // Default to hybrid with 50KB/s WAL rate estimate
198        LagCalculation::Hybrid {
199            bytes_per_second: 50_000,
200        }
201    }
202}
203
204impl LagCalculation {
205    /// Create WAL-based calculation
206    pub fn wal(bytes_per_second: u64) -> Self {
207        LagCalculation::Wal { bytes_per_second }
208    }
209
210    /// Create time-based calculation
211    pub fn time() -> Self {
212        LagCalculation::Time
213    }
214
215    /// Create hybrid calculation
216    pub fn hybrid(bytes_per_second: u64) -> Self {
217        LagCalculation::Hybrid { bytes_per_second }
218    }
219
220    /// Calculate lag duration from byte lag
221    pub fn calculate_lag(&self, lag_bytes: u64, time_lag: Option<Duration>) -> Duration {
222        match self {
223            LagCalculation::Wal { bytes_per_second } => {
224                if *bytes_per_second == 0 {
225                    return Duration::ZERO;
226                }
227                Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
228            }
229            LagCalculation::Time => time_lag.unwrap_or(Duration::ZERO),
230            LagCalculation::Hybrid { bytes_per_second } => {
231                let wal_lag = if *bytes_per_second > 0 {
232                    Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
233                } else {
234                    Duration::ZERO
235                };
236                let time = time_lag.unwrap_or(Duration::ZERO);
237                wal_lag.max(time)
238            }
239        }
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = LagRoutingConfig::default();
        assert!(config.enabled);
        assert_eq!(config.poll_interval, Duration::from_millis(100));
        assert_eq!(config.default_max_lag, Duration::from_secs(1));
        assert!(config.fallback_to_primary);
        assert!(config.read_your_writes);
        assert_eq!(config.lag_calculation, LagCalculation::hybrid(50_000));
        assert_eq!(config.lag_calculation, LagCalculation::default());
    }

    #[test]
    fn test_builder_pattern() {
        let config = LagRoutingConfig::new()
            .with_poll_interval(Duration::from_millis(50))
            .with_default_max_lag(Duration::from_millis(500))
            .with_read_your_writes(false);

        assert_eq!(config.poll_interval, Duration::from_millis(50));
        assert_eq!(config.default_max_lag, Duration::from_millis(500));
        assert!(!config.read_your_writes);
    }

    #[test]
    fn test_builder_remaining_setters() {
        let config = LagRoutingConfig::new()
            .with_fallback_threshold(Duration::from_secs(2))
            .with_ryw_retention(Duration::from_secs(60))
            .with_lag_calculation(LagCalculation::time())
            .with_smoothing(false, 5);

        assert_eq!(config.fallback_threshold, Duration::from_secs(2));
        assert_eq!(config.ryw_retention, Duration::from_secs(60));
        assert_eq!(config.lag_calculation, LagCalculation::Time);
        assert!(!config.enable_smoothing);
        assert_eq!(config.smoothing_window, 5);
    }

    #[test]
    fn test_sync_mode_limits() {
        let config = LagRoutingConfig::default();
        assert_eq!(
            config.get_sync_mode_max_lag(SyncMode::Sync),
            Duration::from_millis(0)
        );
        assert_eq!(
            config.get_sync_mode_max_lag(SyncMode::SemiSync),
            Duration::from_millis(500)
        );
        assert_eq!(
            config.get_sync_mode_max_lag(SyncMode::Async),
            Duration::from_secs(10)
        );
    }

    #[test]
    fn test_sync_mode_weights() {
        let config = LagRoutingConfig::default();
        assert_eq!(config.get_sync_mode_weight(SyncMode::Sync), 1.0);
        assert_eq!(config.get_sync_mode_weight(SyncMode::SemiSync), 0.8);
        assert_eq!(config.get_sync_mode_weight(SyncMode::Async), 0.5);
    }

    #[test]
    fn test_missing_sync_mode_falls_back() {
        let mut config = LagRoutingConfig::default();
        config.sync_mode_limits.remove(&SyncMode::Async);
        assert_eq!(
            config.get_sync_mode_max_lag(SyncMode::Async),
            config.default_max_lag
        );
        assert_eq!(config.get_sync_mode_weight(SyncMode::Async), 1.0);
    }

    #[test]
    fn test_lag_calculation_wal() {
        let calc = LagCalculation::wal(1000); // 1000 bytes/sec
        let lag = calc.calculate_lag(5000, None);
        assert_eq!(lag, Duration::from_secs(5));
    }

    #[test]
    fn test_lag_calculation_wal_fractional() {
        let calc = LagCalculation::wal(1000);
        assert_eq!(calc.calculate_lag(1500, None), Duration::from_millis(1500));
    }

    #[test]
    fn test_lag_calculation_zero_rate() {
        // A zero rate cannot convert bytes to time; the WAL component is zero.
        assert_eq!(LagCalculation::wal(0).calculate_lag(5000, None), Duration::ZERO);
        assert_eq!(
            LagCalculation::hybrid(0).calculate_lag(5000, Some(Duration::from_secs(2))),
            Duration::from_secs(2)
        );
    }

    #[test]
    fn test_lag_calculation_time() {
        let calc = LagCalculation::time();
        let lag = calc.calculate_lag(5000, Some(Duration::from_secs(3)));
        assert_eq!(lag, Duration::from_secs(3));
    }

    #[test]
    fn test_lag_calculation_time_missing() {
        let calc = LagCalculation::time();
        assert_eq!(calc.calculate_lag(5000, None), Duration::ZERO);
    }

    #[test]
    fn test_lag_calculation_hybrid() {
        let calc = LagCalculation::hybrid(1000); // 1000 bytes/sec

        // WAL lag = 5s, time lag = 3s -> max = 5s
        let lag = calc.calculate_lag(5000, Some(Duration::from_secs(3)));
        assert_eq!(lag, Duration::from_secs(5));

        // WAL lag = 2s, time lag = 4s -> max = 4s
        let lag2 = calc.calculate_lag(2000, Some(Duration::from_secs(4)));
        assert_eq!(lag2, Duration::from_secs(4));
    }
}