heliosdb_proxy/lag/
config.rs1use std::collections::HashMap;
6use std::time::Duration;
7
8use super::SyncMode;
9
10#[derive(Debug, Clone)]
12pub struct LagRoutingConfig {
13 pub enabled: bool,
15
16 pub poll_interval: Duration,
18
19 pub lag_calculation: LagCalculation,
21
22 pub default_max_lag: Duration,
24
25 pub fresh_threshold: Duration,
27
28 pub stale_threshold: Duration,
30
31 pub fallback_to_primary: bool,
33
34 pub fallback_threshold: Duration,
36
37 pub read_your_writes: bool,
39
40 pub ryw_retention: Duration,
42
43 pub sync_mode_limits: HashMap<SyncMode, SyncModeLagConfig>,
45
46 pub enable_smoothing: bool,
48
49 pub smoothing_window: usize,
51
52 pub min_samples: usize,
54}
55
56impl Default for LagRoutingConfig {
57 fn default() -> Self {
58 let mut sync_mode_limits = HashMap::new();
59 sync_mode_limits.insert(
60 SyncMode::Sync,
61 SyncModeLagConfig {
62 max_lag: Duration::from_millis(0),
63 weight: 1.0,
64 },
65 );
66 sync_mode_limits.insert(
67 SyncMode::SemiSync,
68 SyncModeLagConfig {
69 max_lag: Duration::from_millis(500),
70 weight: 0.8,
71 },
72 );
73 sync_mode_limits.insert(
74 SyncMode::Async,
75 SyncModeLagConfig {
76 max_lag: Duration::from_secs(10),
77 weight: 0.5,
78 },
79 );
80
81 Self {
82 enabled: true,
83 poll_interval: Duration::from_millis(100),
84 lag_calculation: LagCalculation::default(),
85 default_max_lag: Duration::from_secs(1),
86 fresh_threshold: Duration::from_millis(100),
87 stale_threshold: Duration::from_secs(10),
88 fallback_to_primary: true,
89 fallback_threshold: Duration::from_secs(5),
90 read_your_writes: true,
91 ryw_retention: Duration::from_secs(300), sync_mode_limits,
93 enable_smoothing: true,
94 smoothing_window: 10,
95 min_samples: 3,
96 }
97 }
98}
99
100impl LagRoutingConfig {
101 pub fn new() -> Self {
103 Self::default()
104 }
105
106 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
108 self.poll_interval = interval;
109 self
110 }
111
112 pub fn with_default_max_lag(mut self, lag: Duration) -> Self {
114 self.default_max_lag = lag;
115 self
116 }
117
118 pub fn with_fallback_threshold(mut self, threshold: Duration) -> Self {
120 self.fallback_threshold = threshold;
121 self
122 }
123
124 pub fn with_read_your_writes(mut self, enabled: bool) -> Self {
126 self.read_your_writes = enabled;
127 self
128 }
129
130 pub fn with_ryw_retention(mut self, retention: Duration) -> Self {
132 self.ryw_retention = retention;
133 self
134 }
135
136 pub fn with_lag_calculation(mut self, method: LagCalculation) -> Self {
138 self.lag_calculation = method;
139 self
140 }
141
142 pub fn with_smoothing(mut self, enabled: bool, window: usize) -> Self {
144 self.enable_smoothing = enabled;
145 self.smoothing_window = window;
146 self
147 }
148
149 pub fn get_sync_mode_max_lag(&self, mode: SyncMode) -> Duration {
151 self.sync_mode_limits
152 .get(&mode)
153 .map(|c| c.max_lag)
154 .unwrap_or(self.default_max_lag)
155 }
156
157 pub fn get_sync_mode_weight(&self, mode: SyncMode) -> f64 {
159 self.sync_mode_limits
160 .get(&mode)
161 .map(|c| c.weight)
162 .unwrap_or(1.0)
163 }
164}
165
/// One entry of `LagRoutingConfig::sync_mode_limits`: the settings attached
/// to a single sync mode.
#[derive(Debug, Clone)]
pub struct SyncModeLagConfig {
    /// Lag limit for the mode; this is the value
    /// `LagRoutingConfig::get_sync_mode_max_lag` returns for it.
    pub max_lag: Duration,

    /// Weight for the mode; this is the value
    /// `LagRoutingConfig::get_sync_mode_weight` returns for it.
    /// NOTE(review): how callers apply the weight is not shown here.
    pub weight: f64,
}
175
/// Selects how `calculate_lag` derives a lag `Duration` from a byte backlog
/// and an optional measured time lag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LagCalculation {
    /// Byte-based estimate only: `lag_bytes / bytes_per_second` seconds.
    /// The measured time lag is ignored.
    Wal {
        /// Divisor applied to the byte backlog. A value of `0` makes the
        /// estimate `Duration::ZERO`.
        bytes_per_second: u64,
    },

    /// Measured time lag only (`Duration::ZERO` when none is supplied).
    /// The byte backlog is ignored.
    Time,

    /// The larger of the byte-based estimate and the measured time lag.
    Hybrid {
        /// Divisor applied to the byte backlog. A value of `0` makes the
        /// byte-based part `Duration::ZERO`.
        bytes_per_second: u64,
    },
}
194
195impl Default for LagCalculation {
196 fn default() -> Self {
197 LagCalculation::Hybrid {
199 bytes_per_second: 50_000,
200 }
201 }
202}
203
204impl LagCalculation {
205 pub fn wal(bytes_per_second: u64) -> Self {
207 LagCalculation::Wal { bytes_per_second }
208 }
209
210 pub fn time() -> Self {
212 LagCalculation::Time
213 }
214
215 pub fn hybrid(bytes_per_second: u64) -> Self {
217 LagCalculation::Hybrid { bytes_per_second }
218 }
219
220 pub fn calculate_lag(&self, lag_bytes: u64, time_lag: Option<Duration>) -> Duration {
222 match self {
223 LagCalculation::Wal { bytes_per_second } => {
224 if *bytes_per_second == 0 {
225 return Duration::ZERO;
226 }
227 Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
228 }
229 LagCalculation::Time => time_lag.unwrap_or(Duration::ZERO),
230 LagCalculation::Hybrid { bytes_per_second } => {
231 let wal_lag = if *bytes_per_second > 0 {
232 Duration::from_secs_f64(lag_bytes as f64 / *bytes_per_second as f64)
233 } else {
234 Duration::ZERO
235 };
236 let time = time_lag.unwrap_or(Duration::ZERO);
237 wal_lag.max(time)
238 }
239 }
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Spot-checks a handful of the values produced by `Default`.
    #[test]
    fn test_default_config() {
        let defaults = LagRoutingConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.poll_interval, Duration::from_millis(100));
        assert_eq!(defaults.default_max_lag, Duration::from_secs(1));
        assert!(defaults.fallback_to_primary);
        assert!(defaults.read_your_writes);
    }

    /// Chained `with_*` calls must each land in their own field.
    #[test]
    fn test_builder_pattern() {
        let fast_poll = Duration::from_millis(50);
        let tight_lag = Duration::from_millis(500);

        let config = LagRoutingConfig::new()
            .with_poll_interval(fast_poll)
            .with_default_max_lag(tight_lag)
            .with_read_your_writes(false);

        assert_eq!(config.poll_interval, fast_poll);
        assert_eq!(config.default_max_lag, tight_lag);
        assert!(!config.read_your_writes);
    }

    /// The default per-mode table must be visible through the getter.
    #[test]
    fn test_sync_mode_limits() {
        let config = LagRoutingConfig::default();
        let expectations = [
            (SyncMode::Sync, Duration::from_millis(0)),
            (SyncMode::SemiSync, Duration::from_millis(500)),
            (SyncMode::Async, Duration::from_secs(10)),
        ];

        for (mode, expected) in expectations {
            assert_eq!(config.get_sync_mode_max_lag(mode), expected);
        }
    }

    /// 5000 bytes behind at 1000 bytes/s is 5 s of lag.
    #[test]
    fn test_lag_calculation_wal() {
        let calc = LagCalculation::wal(1000);

        let lag = calc.calculate_lag(5000, None);

        assert_eq!(lag, Duration::from_secs(5));
    }

    /// The `Time` method reports the measured lag and ignores the bytes.
    #[test]
    fn test_lag_calculation_time() {
        let measured = Duration::from_secs(3);
        let calc = LagCalculation::time();

        assert_eq!(calc.calculate_lag(5000, Some(measured)), measured);
    }

    /// `Hybrid` reports whichever of the two estimates is larger.
    #[test]
    fn test_lag_calculation_hybrid() {
        let calc = LagCalculation::hybrid(1000);

        // Byte estimate (5 s) beats the measured 3 s.
        let wal_wins = calc.calculate_lag(5000, Some(Duration::from_secs(3)));
        assert_eq!(wal_wins, Duration::from_secs(5));

        // Measured 4 s beats the byte estimate (2 s).
        let time_wins = calc.calculate_lag(2000, Some(Duration::from_secs(4)));
        assert_eq!(time_wins, Duration::from_secs(4));
    }
}