Skip to main content

jflow_core/supervisor/
backoff.rs

1//! Exponential backoff with jitter for supervisor restart strategies.
2//!
3//! Implements the backoff algorithm described in the Janus Supervisor
4//! Architecture Refactor document:
5//!
6//! ```text
//! delay = random_between(0, min(cap, base * 2^attempt))
//! ```
//!
//! The whole capped exponential term is randomized ("full jitter"): the
//! delay is drawn from `[0, min(cap, base * 2^attempt)]`. This prevents
//! thundering-herd scenarios when multiple services restart simultaneously
//! after a shared outage.
13//!
14//! # Features
15//!
16//! - **Exponential growth** with configurable base and cap
17//! - **Full jitter** to desynchronize concurrent restarts
18//! - **Cooldown reset**: if a service runs successfully for a configurable
19//!   period, the attempt counter resets to zero so occasional rare failures
20//!   don't accumulate toward a long backoff
21//! - **Circuit breaker**: if a service fails N times within a window T,
22//!   the supervisor can escalate (e.g., shut down the node, fire an alert)
23
24use std::time::{Duration, Instant};
25
26use rand::RngExt;
27
/// Tunable parameters for the supervisor's exponential-backoff restart policy.
///
/// [`Default`] supplies values chosen for a production trading system; start
/// from [`BackoffConfig::new`] and chain the `with_*` builders to override them.
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// Base of the exponential series; every further consecutive failure
    /// doubles the delay bound derived from it (default: 100ms).
    pub base_delay: Duration,

    /// Ceiling applied to the exponential term before jitter (default: 60s).
    pub max_delay: Duration,

    /// Minimum time a service must have been running for its attempt
    /// counter to be reset to zero (default: 5 minutes).
    pub cooldown_period: Duration,

    /// Number of failures inside `circuit_breaker_window` at which the
    /// circuit breaker trips. `0` disables the breaker (default: 10).
    pub max_retries: u32,

    /// Sliding window over which failures are counted toward `max_retries`
    /// (default: 10 minutes).
    pub circuit_breaker_window: Duration,
}

impl Default for BackoffConfig {
    /// Production defaults: 100ms base, 60s cap, 5 minute cooldown and a
    /// circuit breaker that trips on 10 failures within 10 minutes.
    fn default() -> Self {
        Self {
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            cooldown_period: Duration::from_secs(5 * 60),
            max_retries: 10,
            circuit_breaker_window: Duration::from_secs(10 * 60),
        }
    }
}

impl BackoffConfig {
    /// Build a config with explicit base and maximum delays; every other
    /// field takes its [`Default`] value.
    pub fn new(base_delay: Duration, max_delay: Duration) -> Self {
        Self {
            base_delay,
            max_delay,
            ..Self::default()
        }
    }

    /// Builder: replace the cooldown period.
    pub fn with_cooldown(self, cooldown: Duration) -> Self {
        Self {
            cooldown_period: cooldown,
            ..self
        }
    }

    /// Builder: replace both circuit-breaker parameters at once.
    pub fn with_circuit_breaker(self, max_retries: u32, window: Duration) -> Self {
        Self {
            max_retries,
            circuit_breaker_window: window,
            ..self
        }
    }

    /// Builder: turn the circuit breaker off by setting `max_retries` to `0`.
    /// The configured window is left untouched.
    pub fn without_circuit_breaker(self) -> Self {
        Self {
            max_retries: 0,
            ..self
        }
    }
}
93
/// What the supervisor should do after a failure has been recorded.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackoffAction {
    /// Sleep for the contained duration, then restart the service.
    Retry(Duration),

    /// Too many failures landed inside the configured window, so the
    /// circuit breaker has tripped. The supervisor is expected to escalate
    /// (alert, shut down, etc.) instead of retrying.
    CircuitOpen {
        /// Failures counted inside the window, including the one that
        /// tripped the breaker.
        failures: u32,
        /// The configured threshold that was reached.
        max_retries: u32,
    },
}
110
111/// Mutable state tracker for an individual service's backoff history.
112///
113/// One `BackoffState` instance is maintained per supervised service inside
114/// the [`JanusSupervisor`](super::JanusSupervisor).
115#[derive(Debug, Clone)]
116pub struct BackoffState {
117    config: BackoffConfig,
118
119    /// Number of consecutive failures (resets on cooldown).
120    attempt: u32,
121
122    /// Timestamps of recent failures (within the circuit-breaker window).
123    /// Older entries are pruned on each call to [`next_backoff`].
124    failure_timestamps: Vec<Instant>,
125
126    /// When the service last started successfully. Used for cooldown logic.
127    last_start: Option<Instant>,
128}
129
130impl BackoffState {
131    /// Create a fresh backoff state with the given configuration.
132    pub fn new(config: BackoffConfig) -> Self {
133        Self {
134            config,
135            attempt: 0,
136            failure_timestamps: Vec::new(),
137            last_start: None,
138        }
139    }
140
141    /// Create with default configuration.
142    pub fn with_defaults() -> Self {
143        Self::new(BackoffConfig::default())
144    }
145
146    /// Record that the service has started (or restarted) successfully.
147    ///
148    /// Called by the supervisor when a service's `run()` method is entered.
149    pub fn record_start(&mut self) {
150        self.last_start = Some(Instant::now());
151    }
152
153    /// Check if the cooldown period has elapsed since the last start,
154    /// and if so, reset the attempt counter.
155    ///
156    /// Called by the supervisor when a service exits so that a service
157    /// which ran healthily for a long time doesn't carry old failure
158    /// history into its next restart.
159    pub fn maybe_reset_on_cooldown(&mut self) {
160        if let Some(start) = self.last_start
161            && start.elapsed() >= self.config.cooldown_period
162        {
163            tracing::info!(
164                cooldown_secs = self.config.cooldown_period.as_secs(),
165                "Service ran for longer than cooldown period, resetting backoff"
166            );
167            self.attempt = 0;
168            self.failure_timestamps.clear();
169        }
170    }
171
172    /// Record a failure and compute the next action.
173    ///
174    /// Returns [`BackoffAction::Retry`] with the jittered delay, or
175    /// [`BackoffAction::CircuitOpen`] if the circuit breaker has tripped.
176    pub fn next_backoff(&mut self) -> BackoffAction {
177        let now = Instant::now();
178
179        // Record this failure
180        self.failure_timestamps.push(now);
181        self.attempt = self.attempt.saturating_add(1);
182
183        // Prune old failure timestamps outside the circuit-breaker window
184        if self.config.max_retries > 0 {
185            let window_start = now - self.config.circuit_breaker_window;
186            self.failure_timestamps.retain(|ts| *ts >= window_start);
187
188            // Check circuit breaker
189            if self.failure_timestamps.len() as u32 >= self.config.max_retries {
190                return BackoffAction::CircuitOpen {
191                    failures: self.failure_timestamps.len() as u32,
192                    max_retries: self.config.max_retries,
193                };
194            }
195        }
196
197        // Compute exponential delay: base * 2^attempt, capped at max_delay
198        let exp_delay = self.compute_exponential_delay();
199
200        // Add full jitter: uniform random in [0, exp_delay)
201        let jittered = self.add_jitter(exp_delay);
202
203        BackoffAction::Retry(jittered)
204    }
205
206    /// Reset all state (attempt counter, failure history, start time).
207    pub fn reset(&mut self) {
208        self.attempt = 0;
209        self.failure_timestamps.clear();
210        self.last_start = None;
211    }
212
213    /// Current attempt number (consecutive failures since last reset).
214    pub fn attempt(&self) -> u32 {
215        self.attempt
216    }
217
218    /// Number of failures recorded within the circuit-breaker window.
219    pub fn recent_failures(&self) -> usize {
220        self.failure_timestamps.len()
221    }
222
223    // ── Internal helpers ──────────────────────────────────────────────
224
225    /// Compute `min(cap, base * 2^attempt)` without overflow.
226    fn compute_exponential_delay(&self) -> Duration {
227        let base_ms = self.config.base_delay.as_millis() as u64;
228        let max_ms = self.config.max_delay.as_millis() as u64;
229
230        // Prevent overflow: if attempt >= 63 the shift would overflow u64,
231        // so we clamp early.
232        let shift = self.attempt.min(62) as u64;
233
234        // Saturating multiplication: base_ms * 2^shift, capped at max_ms
235        let exp_ms = base_ms.saturating_mul(1u64.checked_shl(shift as u32).unwrap_or(u64::MAX));
236        let capped_ms = exp_ms.min(max_ms);
237
238        Duration::from_millis(capped_ms)
239    }
240
241    /// Add full jitter: returns a duration in `[0, delay + delay_jitter_range)`.
242    ///
243    /// Uses "full jitter" strategy as recommended in the architecture doc
244    /// to desynchronize concurrent restarts after a shared outage.
245    fn add_jitter(&self, base: Duration) -> Duration {
246        if base.is_zero() {
247            return base;
248        }
249
250        let base_ms = base.as_millis() as u64;
251        let mut rng = rand::rng();
252
253        // Full jitter: pick a random value in [0, base_ms]
254        let jitter_ms = rng.random_range(0..=base_ms);
255
256        // The actual delay is a random value in [0, 2*base_ms] effectively,
257        // but we use the "full jitter" approach from AWS:
258        // delay = random_between(0, min(cap, base * 2^attempt))
259        // This gives better spread than "equal jitter" or "decorrelated jitter"
260        // for our use case of independent services.
261        Duration::from_millis(jitter_ms)
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a millisecond [`Duration`].
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Shorthand for a whole-second [`Duration`].
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Default-configured state with the circuit breaker switched off.
    fn state_without_breaker() -> BackoffState {
        BackoffState::new(BackoffConfig::default().without_circuit_breaker())
    }

    #[test]
    fn test_default_config() {
        let BackoffConfig {
            base_delay,
            max_delay,
            cooldown_period,
            max_retries,
            circuit_breaker_window,
        } = BackoffConfig::default();

        assert_eq!(base_delay, ms(100));
        assert_eq!(max_delay, secs(60));
        assert_eq!(cooldown_period, secs(300));
        assert_eq!(max_retries, 10);
        assert_eq!(circuit_breaker_window, secs(600));
    }

    #[test]
    fn test_builder_pattern() {
        let BackoffConfig {
            base_delay,
            max_delay,
            cooldown_period,
            max_retries,
            circuit_breaker_window,
        } = BackoffConfig::new(ms(200), secs(30))
            .with_cooldown(secs(120))
            .with_circuit_breaker(5, secs(60));

        assert_eq!(base_delay, ms(200));
        assert_eq!(max_delay, secs(30));
        assert_eq!(cooldown_period, secs(120));
        assert_eq!(max_retries, 5);
        assert_eq!(circuit_breaker_window, secs(60));
    }

    #[test]
    fn test_without_circuit_breaker() {
        let disabled = BackoffConfig::default().without_circuit_breaker();
        assert_eq!(disabled.max_retries, 0);
    }

    #[test]
    fn test_exponential_growth_is_capped() {
        let mut state =
            BackoffState::new(BackoffConfig::new(ms(100), secs(5)).without_circuit_breaker());

        // (attempt, expected un-jittered delay); the last row exceeds the
        // 5s cap (100ms * 2^10 = 102400ms) and must be clamped to it.
        let cases = [(0, ms(100)), (5, ms(3200)), (10, secs(5))];
        for (attempt, expected) in cases {
            state.attempt = attempt;
            assert_eq!(state.compute_exponential_delay(), expected);
        }
    }

    #[test]
    fn test_exponential_no_overflow_at_high_attempts() {
        let mut state = state_without_breaker();
        // An exponent this large cannot be represented; the result must
        // still come out as the configured maximum.
        state.attempt = 100;
        assert_eq!(state.compute_exponential_delay(), secs(60));
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let state =
            BackoffState::new(BackoffConfig::new(ms(100), secs(60)).without_circuit_breaker());
        let bound = ms(1000);

        // Sample repeatedly; no draw may exceed the bound.
        assert!((0..1000).all(|_| state.add_jitter(bound) <= bound));
    }

    #[test]
    fn test_jitter_zero_base() {
        let state = BackoffState::new(BackoffConfig::default());
        assert_eq!(state.add_jitter(Duration::ZERO), Duration::ZERO);
    }

    #[test]
    fn test_next_backoff_increments_attempt() {
        let mut state = state_without_breaker();
        assert_eq!(state.attempt(), 0);

        for expected_attempt in 1..=2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
            assert_eq!(state.attempt(), expected_attempt);
        }
    }

    #[test]
    fn test_circuit_breaker_trips() {
        let mut state =
            BackoffState::new(BackoffConfig::default().with_circuit_breaker(3, secs(600)));

        // Failures below the threshold still ask for a retry...
        for _ in 0..2 {
            assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        }

        // ...and the one that reaches it opens the circuit.
        assert_eq!(
            state.next_backoff(),
            BackoffAction::CircuitOpen {
                failures: 3,
                max_retries: 3
            }
        );
    }

    #[test]
    fn test_reset_clears_state() {
        let mut state = state_without_breaker();

        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!(state.attempt(), 2);
        assert_eq!(state.recent_failures(), 2);

        state.reset();
        assert_eq!(state.attempt(), 0);
        assert_eq!(state.recent_failures(), 0);
    }

    #[test]
    fn test_cooldown_resets_attempt() {
        let mut state = BackoffState::new(
            BackoffConfig::default()
                .with_cooldown(ms(50))
                .without_circuit_breaker(),
        );

        // Accumulate some failure history first.
        for _ in 0..2 {
            state.next_backoff();
        }
        assert_eq!(state.attempt(), 2);

        // Backdate the recorded start rather than sleeping, so the test
        // stays fast while still looking like a run longer than the cooldown.
        state.record_start();
        state.last_start = Some(Instant::now() - ms(100));

        state.maybe_reset_on_cooldown();
        assert_eq!(state.attempt(), 0);
        assert_eq!(state.recent_failures(), 0);
    }

    #[test]
    fn test_cooldown_does_not_reset_if_too_early() {
        let mut state = BackoffState::new(
            BackoffConfig::default()
                .with_cooldown(secs(300))
                .without_circuit_breaker(),
        );

        for _ in 0..2 {
            state.next_backoff();
        }
        // The start is recorded just now, far short of the 5 minute cooldown.
        state.record_start();

        state.maybe_reset_on_cooldown();
        assert_eq!(state.attempt(), 2);
    }

    #[test]
    fn test_backoff_delay_increases_monotonically_ignoring_jitter() {
        let mut state =
            BackoffState::new(BackoffConfig::new(ms(100), secs(60)).without_circuit_breaker());

        // Collect the un-jittered delay after each of eight failures.
        let mut bounds = Vec::with_capacity(8);
        for _ in 0..8 {
            let _ = state.next_backoff();
            bounds.push(state.compute_exponential_delay());
        }

        // The sequence must never decrease.
        for pair in bounds.windows(2) {
            assert!(pair[1] >= pair[0], "{:?} < {:?}", pair[1], pair[0]);
        }
    }

    #[test]
    fn test_attempt_saturates() {
        let mut state = state_without_breaker();
        state.attempt = u32::MAX;

        assert!(matches!(state.next_backoff(), BackoffAction::Retry(_)));
        // The counter must pin at the maximum instead of wrapping to zero.
        assert_eq!(state.attempt(), u32::MAX);
    }
}