rustrade_supervisor/
backoff.rs1use std::time::{Duration, Instant};
22
23use rand::RngExt;
24
/// Tunable parameters for restart backoff and the failure circuit breaker.
///
/// Consumed by `BackoffState`, which derives retry delays and
/// circuit-breaker decisions from these values.
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// Starting delay that the exponential backoff scales up from.
    pub base_delay: Duration,

    /// Upper bound applied to the exponential delay before jitter is drawn.
    pub max_delay: Duration,

    /// Time that must have elapsed since the last recorded start for the
    /// backoff state to be reset by `BackoffState::maybe_reset_on_cooldown`.
    pub cooldown_period: Duration,

    /// Number of failures inside `circuit_breaker_window` at which the
    /// circuit opens. A value of `0` disables the circuit breaker.
    pub max_retries: u32,

    /// Length of the sliding window over which failures are counted.
    pub circuit_breaker_window: Duration,
}
62
63impl Default for BackoffConfig {
64 fn default() -> Self {
65 Self {
66 base_delay: Duration::from_millis(100),
67 max_delay: Duration::from_secs(60),
68 cooldown_period: Duration::from_secs(300),
69 max_retries: 10,
70 circuit_breaker_window: Duration::from_secs(600),
71 }
72 }
73}
74
75impl BackoffConfig {
76 pub fn new(base_delay: Duration, max_delay: Duration) -> Self {
78 Self {
79 base_delay,
80 max_delay,
81 ..Default::default()
82 }
83 }
84
85 pub fn with_cooldown(mut self, cooldown: Duration) -> Self {
87 self.cooldown_period = cooldown;
88 self
89 }
90
91 pub fn with_circuit_breaker(mut self, max_retries: u32, window: Duration) -> Self {
93 self.max_retries = max_retries;
94 self.circuit_breaker_window = window;
95 self
96 }
97
98 pub fn without_circuit_breaker(mut self) -> Self {
100 self.max_retries = 0;
101 self
102 }
103}
104
/// Decision produced by `BackoffState::next_backoff` after a failure.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackoffAction {
    /// Retry after waiting for the contained (already jittered) delay.
    Retry(Duration),

    /// The circuit breaker tripped: too many failures inside the window.
    CircuitOpen {
        /// Number of failures counted inside the circuit-breaker window.
        failures: u32,
        /// Configured threshold that `failures` reached or exceeded.
        max_retries: u32,
    },
}
120
121#[derive(Debug, Clone)]
126pub struct BackoffState {
127 config: BackoffConfig,
128
129 attempt: u32,
131
132 failure_timestamps: Vec<Instant>,
135
136 last_start: Option<Instant>,
138}
139
140impl BackoffState {
141 pub fn new(config: BackoffConfig) -> Self {
143 Self {
144 config,
145 attempt: 0,
146 failure_timestamps: Vec::new(),
147 last_start: None,
148 }
149 }
150
151 pub fn with_defaults() -> Self {
153 Self::new(BackoffConfig::default())
154 }
155
156 pub fn record_start(&mut self) {
160 self.last_start = Some(Instant::now());
161 }
162
163 pub fn maybe_reset_on_cooldown(&mut self) {
170 if let Some(start) = self.last_start
171 && start.elapsed() >= self.config.cooldown_period
172 {
173 tracing::info!(
174 cooldown_secs = self.config.cooldown_period.as_secs(),
175 "service ran longer than cooldown period, resetting backoff"
176 );
177 self.attempt = 0;
178 self.failure_timestamps.clear();
179 }
180 }
181
182 pub fn next_backoff(&mut self) -> BackoffAction {
184 let now = Instant::now();
185
186 self.failure_timestamps.push(now);
187 self.attempt = self.attempt.saturating_add(1);
188
189 if self.config.max_retries > 0 {
191 let window_start = now - self.config.circuit_breaker_window;
192 self.failure_timestamps.retain(|ts| *ts >= window_start);
193
194 if self.failure_timestamps.len() as u32 >= self.config.max_retries {
195 return BackoffAction::CircuitOpen {
196 failures: self.failure_timestamps.len() as u32,
197 max_retries: self.config.max_retries,
198 };
199 }
200 }
201
202 let exp_delay = self.compute_exponential_delay();
203 let jittered = self.add_jitter(exp_delay);
204 BackoffAction::Retry(jittered)
205 }
206
207 pub fn reset(&mut self) {
209 self.attempt = 0;
210 self.failure_timestamps.clear();
211 self.last_start = None;
212 }
213
214 pub fn attempt(&self) -> u32 {
216 self.attempt
217 }
218
219 pub fn recent_failures(&self) -> usize {
221 self.failure_timestamps.len()
222 }
223
224 fn compute_exponential_delay(&self) -> Duration {
228 let base_ms = self.config.base_delay.as_millis() as u64;
229 let max_ms = self.config.max_delay.as_millis() as u64;
230
231 let shift = self.attempt.min(62);
234
235 let exp_ms = base_ms.saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
236 let capped_ms = exp_ms.min(max_ms);
237
238 Duration::from_millis(capped_ms)
239 }
240
241 fn add_jitter(&self, base: Duration) -> Duration {
243 if base.is_zero() {
244 return base;
245 }
246 let base_ms = base.as_millis() as u64;
247 let mut rng = rand::rng();
248 let jitter_ms = rng.random_range(0..=base_ms);
249 Duration::from_millis(jitter_ms)
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a state from `cfg` with the circuit breaker switched off.
    fn state_without_breaker(cfg: BackoffConfig) -> BackoffState {
        BackoffState::new(cfg.without_circuit_breaker())
    }

    /// Default configuration exposes the documented default values.
    #[test]
    fn test_default_config() {
        let BackoffConfig {
            base_delay,
            max_delay,
            cooldown_period,
            max_retries,
            circuit_breaker_window,
        } = BackoffConfig::default();

        assert_eq!(base_delay, Duration::from_millis(100));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert_eq!(cooldown_period, Duration::from_secs(300));
        assert_eq!(max_retries, 10);
        assert_eq!(circuit_breaker_window, Duration::from_secs(600));
    }

    /// Each builder method overrides exactly the fields it names.
    #[test]
    fn test_builder_pattern() {
        let BackoffConfig {
            base_delay,
            max_delay,
            cooldown_period,
            max_retries,
            circuit_breaker_window,
        } = BackoffConfig::new(Duration::from_millis(200), Duration::from_secs(30))
            .with_cooldown(Duration::from_secs(120))
            .with_circuit_breaker(5, Duration::from_secs(60));

        assert_eq!(base_delay, Duration::from_millis(200));
        assert_eq!(max_delay, Duration::from_secs(30));
        assert_eq!(cooldown_period, Duration::from_secs(120));
        assert_eq!(max_retries, 5);
        assert_eq!(circuit_breaker_window, Duration::from_secs(60));
    }

    /// Disabling the circuit breaker is encoded as `max_retries == 0`.
    #[test]
    fn test_without_circuit_breaker() {
        let disabled = BackoffConfig::default().without_circuit_breaker();
        assert_eq!(disabled.max_retries, 0);
    }

    /// The un-jittered delay doubles per attempt and is capped at `max_delay`.
    #[test]
    fn test_exponential_growth_is_capped() {
        let mut state = state_without_breaker(BackoffConfig::new(
            Duration::from_millis(100),
            Duration::from_secs(5),
        ));

        let cases = [
            (0, Duration::from_millis(100)),
            (5, Duration::from_millis(3200)),
            // 100 ms * 2^10 exceeds the 5 s cap.
            (10, Duration::from_secs(5)),
        ];
        for (attempt, expected) in cases {
            state.attempt = attempt;
            assert_eq!(state.compute_exponential_delay(), expected);
        }
    }

    /// Very large attempt counts saturate to the cap rather than overflowing.
    #[test]
    fn test_exponential_no_overflow_at_high_attempts() {
        let mut state = state_without_breaker(BackoffConfig::default());
        state.attempt = 100;
        assert_eq!(state.compute_exponential_delay(), Duration::from_secs(60));
    }

    /// Jitter never exceeds the delay it is derived from.
    #[test]
    fn test_jitter_stays_within_bounds() {
        let state = state_without_breaker(BackoffConfig::new(
            Duration::from_millis(100),
            Duration::from_secs(60),
        ));
        let ceiling = Duration::from_millis(1000);

        for _ in 0..1000 {
            let jittered = state.add_jitter(Duration::from_millis(1000));
            assert!(jittered <= ceiling);
        }
    }

    /// A zero delay is passed through unchanged.
    #[test]
    fn test_jitter_zero_base() {
        let state = BackoffState::new(BackoffConfig::default());
        assert_eq!(state.add_jitter(Duration::ZERO), Duration::ZERO);
    }

    /// Every `next_backoff` call bumps the attempt counter by one.
    #[test]
    fn test_next_backoff_increments_attempt() {
        let mut state = state_without_breaker(BackoffConfig::default());
        assert_eq!(state.attempt(), 0);

        for expected in 1..=2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
            assert_eq!(state.attempt(), expected);
        }
    }

    /// The breaker opens on the failure that reaches `max_retries`.
    #[test]
    fn test_circuit_breaker_trips() {
        let cfg = BackoffConfig::default().with_circuit_breaker(3, Duration::from_secs(600));
        let mut state = BackoffState::new(cfg);

        for _ in 0..2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        }

        assert_eq!(
            state.next_backoff(),
            BackoffAction::CircuitOpen {
                failures: 3,
                max_retries: 3,
            }
        );
    }

    /// `reset` zeroes both the attempt counter and the failure history.
    #[test]
    fn test_reset_clears_state() {
        let mut state = state_without_breaker(BackoffConfig::default());
        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!(state.attempt(), 2);
        assert_eq!(state.recent_failures(), 2);

        state.reset();
        assert_eq!(state.attempt(), 0);
        assert_eq!(state.recent_failures(), 0);
    }

    /// Running longer than the cooldown clears accumulated backoff.
    #[test]
    fn test_cooldown_resets_attempt() {
        let mut state = state_without_breaker(
            BackoffConfig::default().with_cooldown(Duration::from_millis(50)),
        );

        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!(state.attempt(), 2);

        // Pretend the service started 100 ms ago, i.e. past the 50 ms cooldown.
        let started = Instant::now() - Duration::from_millis(100);
        state.last_start = Some(started);

        state.maybe_reset_on_cooldown();
        assert_eq!(state.attempt(), 0);
        assert_eq!(state.recent_failures(), 0);
    }

    /// A service that only just started keeps its accumulated backoff.
    #[test]
    fn test_cooldown_does_not_reset_if_too_early() {
        let mut state = state_without_breaker(
            BackoffConfig::default().with_cooldown(Duration::from_secs(300)),
        );

        for _ in 0..2 {
            state.next_backoff();
        }
        state.record_start();

        state.maybe_reset_on_cooldown();
        assert_eq!(state.attempt(), 2);
    }

    /// Ignoring jitter, each successive delay is at least the previous one.
    #[test]
    fn test_backoff_delay_increases_monotonically_ignoring_jitter() {
        let mut state = state_without_breaker(BackoffConfig::new(
            Duration::from_millis(100),
            Duration::from_secs(60),
        ));

        let mut delays = Vec::with_capacity(8);
        for _ in 0..8 {
            let _ = state.next_backoff();
            delays.push(state.compute_exponential_delay());
        }

        for (earlier, later) in delays.iter().zip(delays.iter().skip(1)) {
            assert!(later >= earlier, "{:?} < {:?}", later, earlier);
        }
    }

    /// The attempt counter saturates at `u32::MAX` instead of wrapping.
    #[test]
    fn test_attempt_saturates() {
        let mut state = state_without_breaker(BackoffConfig::default());
        state.attempt = u32::MAX;

        assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        assert_eq!(state.attempt(), u32::MAX);
    }
}