jflow_core/supervisor/
backoff.rs1use std::time::{Duration, Instant};
25
26use rand::RngExt;
27
/// Tuning parameters for restart backoff and the failure circuit breaker.
///
/// Build one with [`BackoffConfig::default`] or [`BackoffConfig::new`] and
/// refine it with the `with_*` / `without_*` builder methods.
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// Base of the exponential schedule: the un-jittered delay is
    /// `base_delay * 2^attempt`, capped at `max_delay`.
    pub base_delay: Duration,

    /// Upper bound applied to the exponential delay before jitter.
    pub max_delay: Duration,

    /// If at least this much time has passed since the last recorded start,
    /// `BackoffState::maybe_reset_on_cooldown` clears the backoff history.
    pub cooldown_period: Duration,

    /// Number of failures inside `circuit_breaker_window` at which the
    /// circuit opens. `0` disables the circuit breaker.
    pub max_retries: u32,

    /// Sliding time window in which failures are counted by the breaker.
    pub circuit_breaker_window: Duration,
}

impl Default for BackoffConfig {
    /// 100ms base, 60s cap, 5 minute cooldown, and a breaker that opens after
    /// 10 failures within 10 minutes.
    fn default() -> Self {
        Self {
            max_retries: 10,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            circuit_breaker_window: Duration::from_secs(10 * 60),
            cooldown_period: Duration::from_secs(5 * 60),
        }
    }
}

impl BackoffConfig {
    /// Creates a config with the given delay bounds; every other field takes
    /// its [`Default`] value.
    pub fn new(base_delay: Duration, max_delay: Duration) -> Self {
        Self {
            base_delay,
            max_delay,
            ..Self::default()
        }
    }

    /// Returns this config with `cooldown_period` replaced by `cooldown`.
    pub fn with_cooldown(self, cooldown: Duration) -> Self {
        Self {
            cooldown_period: cooldown,
            ..self
        }
    }

    /// Returns this config with the circuit breaker set to open after
    /// `max_retries` failures within `window`.
    pub fn with_circuit_breaker(self, max_retries: u32, window: Duration) -> Self {
        Self {
            max_retries,
            circuit_breaker_window: window,
            ..self
        }
    }

    /// Returns this config with the circuit breaker disabled
    /// (`max_retries == 0`); the window value is kept as-is.
    pub fn without_circuit_breaker(self) -> Self {
        Self {
            max_retries: 0,
            ..self
        }
    }
}
93
/// Outcome of recording a failure with `BackoffState::next_backoff`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackoffAction {
    /// Retry after waiting for the contained (already jittered) delay.
    Retry(Duration),

    /// The circuit breaker tripped: the number of failures inside the
    /// configured window reached the threshold, so no retry delay is given.
    CircuitOpen {
        /// Failures counted inside the window, including the current one.
        failures: u32,

        /// The configured threshold that was reached.
        max_retries: u32,
    },
}
110
111#[derive(Debug, Clone)]
116pub struct BackoffState {
117 config: BackoffConfig,
118
119 attempt: u32,
121
122 failure_timestamps: Vec<Instant>,
125
126 last_start: Option<Instant>,
128}
129
130impl BackoffState {
131 pub fn new(config: BackoffConfig) -> Self {
133 Self {
134 config,
135 attempt: 0,
136 failure_timestamps: Vec::new(),
137 last_start: None,
138 }
139 }
140
141 pub fn with_defaults() -> Self {
143 Self::new(BackoffConfig::default())
144 }
145
146 pub fn record_start(&mut self) {
150 self.last_start = Some(Instant::now());
151 }
152
153 pub fn maybe_reset_on_cooldown(&mut self) {
160 if let Some(start) = self.last_start
161 && start.elapsed() >= self.config.cooldown_period
162 {
163 tracing::info!(
164 cooldown_secs = self.config.cooldown_period.as_secs(),
165 "Service ran for longer than cooldown period, resetting backoff"
166 );
167 self.attempt = 0;
168 self.failure_timestamps.clear();
169 }
170 }
171
172 pub fn next_backoff(&mut self) -> BackoffAction {
177 let now = Instant::now();
178
179 self.failure_timestamps.push(now);
181 self.attempt = self.attempt.saturating_add(1);
182
183 if self.config.max_retries > 0 {
185 let window_start = now - self.config.circuit_breaker_window;
186 self.failure_timestamps.retain(|ts| *ts >= window_start);
187
188 if self.failure_timestamps.len() as u32 >= self.config.max_retries {
190 return BackoffAction::CircuitOpen {
191 failures: self.failure_timestamps.len() as u32,
192 max_retries: self.config.max_retries,
193 };
194 }
195 }
196
197 let exp_delay = self.compute_exponential_delay();
199
200 let jittered = self.add_jitter(exp_delay);
202
203 BackoffAction::Retry(jittered)
204 }
205
206 pub fn reset(&mut self) {
208 self.attempt = 0;
209 self.failure_timestamps.clear();
210 self.last_start = None;
211 }
212
213 pub fn attempt(&self) -> u32 {
215 self.attempt
216 }
217
218 pub fn recent_failures(&self) -> usize {
220 self.failure_timestamps.len()
221 }
222
223 fn compute_exponential_delay(&self) -> Duration {
227 let base_ms = self.config.base_delay.as_millis() as u64;
228 let max_ms = self.config.max_delay.as_millis() as u64;
229
230 let shift = self.attempt.min(62) as u64;
233
234 let exp_ms = base_ms.saturating_mul(1u64.checked_shl(shift as u32).unwrap_or(u64::MAX));
236 let capped_ms = exp_ms.min(max_ms);
237
238 Duration::from_millis(capped_ms)
239 }
240
241 fn add_jitter(&self, base: Duration) -> Duration {
246 if base.is_zero() {
247 return base;
248 }
249
250 let base_ms = base.as_millis() as u64;
251 let mut rng = rand::rng();
252
253 let jitter_ms = rng.random_range(0..=base_ms);
255
256 Duration::from_millis(jitter_ms)
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let defaults = BackoffConfig::default();
        assert_eq!(defaults.base_delay, Duration::from_millis(100));
        assert_eq!(defaults.max_delay, Duration::from_secs(60));
        assert_eq!(defaults.cooldown_period, Duration::from_secs(300));
        assert_eq!(defaults.max_retries, 10);
        assert_eq!(defaults.circuit_breaker_window, Duration::from_secs(600));
    }

    #[test]
    fn test_builder_pattern() {
        let built = BackoffConfig::new(Duration::from_millis(200), Duration::from_secs(30))
            .with_cooldown(Duration::from_secs(120))
            .with_circuit_breaker(5, Duration::from_secs(60));

        assert_eq!(built.base_delay, Duration::from_millis(200));
        assert_eq!(built.max_delay, Duration::from_secs(30));
        assert_eq!(built.cooldown_period, Duration::from_secs(120));
        assert_eq!(built.max_retries, 5);
        assert_eq!(built.circuit_breaker_window, Duration::from_secs(60));
    }

    #[test]
    fn test_without_circuit_breaker() {
        let disabled = BackoffConfig::default().without_circuit_breaker();
        assert_eq!(disabled.max_retries, 0);
    }

    #[test]
    fn test_exponential_growth_is_capped() {
        let mut state = BackoffState::new(
            BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(5))
                .without_circuit_breaker(),
        );

        // The attempt counter is poked directly so the un-jittered schedule
        // can be checked deterministically.
        let expectations = [
            (0, Duration::from_millis(100)),
            (5, Duration::from_millis(3200)),
            (10, Duration::from_secs(5)),
        ];
        for (attempt, expected) in expectations {
            state.attempt = attempt;
            assert_eq!(
                state.compute_exponential_delay(),
                expected,
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn test_exponential_no_overflow_at_high_attempts() {
        let mut state = BackoffState::new(BackoffConfig::default().without_circuit_breaker());
        state.attempt = 100;
        assert_eq!(state.compute_exponential_delay(), Duration::from_secs(60));
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let state = BackoffState::new(
            BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(60))
                .without_circuit_breaker(),
        );
        let ceiling = Duration::from_millis(1000);

        // Many samples, because jitter is random.
        let all_within = (0..1000)
            .map(|_| state.add_jitter(ceiling))
            .all(|sample| sample <= ceiling);
        assert!(all_within);
    }

    #[test]
    fn test_jitter_zero_base() {
        let state = BackoffState::with_defaults();
        assert_eq!(state.add_jitter(Duration::ZERO), Duration::ZERO);
    }

    #[test]
    fn test_next_backoff_increments_attempt() {
        let mut state = BackoffState::new(BackoffConfig::default().without_circuit_breaker());
        assert_eq!(state.attempt(), 0);

        for expected_attempt in 1..=2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
            assert_eq!(state.attempt(), expected_attempt);
        }
    }

    #[test]
    fn test_circuit_breaker_trips() {
        let mut state = BackoffState::new(
            BackoffConfig::default().with_circuit_breaker(3, Duration::from_secs(600)),
        );

        // The first two failures stay below the threshold of three.
        for _ in 0..2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        }

        assert_eq!(
            state.next_backoff(),
            BackoffAction::CircuitOpen {
                failures: 3,
                max_retries: 3,
            }
        );
    }

    #[test]
    fn test_reset_clears_state() {
        let mut state = BackoffState::new(BackoffConfig::default().without_circuit_breaker());

        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!((state.attempt(), state.recent_failures()), (2, 2));

        state.reset();
        assert_eq!((state.attempt(), state.recent_failures()), (0, 0));
    }

    #[test]
    fn test_cooldown_resets_attempt() {
        let mut state = BackoffState::new(
            BackoffConfig::default()
                .with_cooldown(Duration::from_millis(50))
                .without_circuit_breaker(),
        );

        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!(state.attempt(), 2);

        state.record_start();
        // Backdate the start so it lies comfortably past the 50ms cooldown.
        state.last_start = Some(Instant::now() - Duration::from_millis(100));

        state.maybe_reset_on_cooldown();
        assert_eq!((state.attempt(), state.recent_failures()), (0, 0));
    }

    #[test]
    fn test_cooldown_does_not_reset_if_too_early() {
        let mut state = BackoffState::new(
            BackoffConfig::default()
                .with_cooldown(Duration::from_secs(300))
                .without_circuit_breaker(),
        );

        for _ in 0..2 {
            state.next_backoff();
        }
        state.record_start();
        state.maybe_reset_on_cooldown();

        assert_eq!(state.attempt(), 2);
    }

    #[test]
    fn test_backoff_delay_increases_monotonically_ignoring_jitter() {
        let mut state = BackoffState::new(
            BackoffConfig::new(Duration::from_millis(100), Duration::from_secs(60))
                .without_circuit_breaker(),
        );

        let mut previous = Duration::ZERO;
        for _ in 0..8 {
            let _ = state.next_backoff();
            let current = state.compute_exponential_delay();
            assert!(current >= previous, "{current:?} < {previous:?}");
            previous = current;
        }
    }

    #[test]
    fn test_attempt_saturates() {
        let mut state = BackoffState::new(BackoffConfig::default().without_circuit_breaker());
        state.attempt = u32::MAX;

        assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        assert_eq!(state.attempt(), u32::MAX);
    }
}