Skip to main content

dag_executor/advanced/
circuit_breaker.rs

1//! A thread-safe circuit breaker.
2
3use parking_lot::Mutex;
4use std::time::{Duration, Instant};
5
/// State of a circuit breaker at a given moment.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CircuitState {
    /// Healthy: calls are let through.
    Closed,
    /// Tripped: calls are turned away without being attempted.
    Open,
    /// Recovering: calls are let through again so their outcome can decide
    /// whether the breaker closes or re-opens.
    HalfOpen,
}
16
17#[derive(Debug)]
18struct Inner {
19    state: CircuitState,
20    consecutive_failures: u32,
21    opened_at: Option<Instant>,
22}
23
24/// Isolates a failing dependency by short-circuiting calls once failures cross
25/// a threshold, then probing for recovery after a cooldown.
26///
27/// Lifecycle: `Closed` → (failures ≥ `failure_threshold`) → `Open` →
28/// (after `cooldown`) → `HalfOpen` → (`success_threshold` successes) → `Closed`,
29/// or back to `Open` on a failed probe.
30pub struct CircuitBreaker {
31    failure_threshold: u32,
32    success_threshold: u32,
33    cooldown: Duration,
34    inner: Mutex<Inner>,
35}
36
37impl CircuitBreaker {
38    /// Create a breaker.
39    ///
40    /// * `failure_threshold` — consecutive failures that trip the breaker open.
41    /// * `success_threshold` — consecutive successes in half-open that re-close it.
42    /// * `cooldown` — how long to stay open before allowing a probe.
43    pub fn new(failure_threshold: u32, success_threshold: u32, cooldown: Duration) -> Self {
44        CircuitBreaker {
45            failure_threshold: failure_threshold.max(1),
46            success_threshold: success_threshold.max(1),
47            cooldown,
48            inner: Mutex::new(Inner {
49                state: CircuitState::Closed,
50                consecutive_failures: 0,
51                opened_at: None,
52            }),
53        }
54    }
55
56    /// The current state, accounting for an elapsed cooldown (which transitions
57    /// `Open` → `HalfOpen` lazily on inspection).
58    pub fn state(&self) -> CircuitState {
59        let mut inner = self.inner.lock();
60        self.refresh(&mut inner);
61        inner.state
62    }
63
64    /// Whether a call should be permitted right now.
65    ///
66    /// Returns `true` in `Closed` and `HalfOpen` (allowing a probe), `false`
67    /// while `Open`.
68    pub fn allow_request(&self) -> bool {
69        let mut inner = self.inner.lock();
70        self.refresh(&mut inner);
71        !matches!(inner.state, CircuitState::Open)
72    }
73
74    /// Record a successful call.
75    pub fn record_success(&self) {
76        let mut inner = self.inner.lock();
77        match inner.state {
78            CircuitState::HalfOpen => {
79                // Reuse the failure counter as a success counter in half-open.
80                inner.consecutive_failures += 1;
81                if inner.consecutive_failures >= self.success_threshold {
82                    inner.state = CircuitState::Closed;
83                    inner.consecutive_failures = 0;
84                    inner.opened_at = None;
85                }
86            }
87            _ => {
88                inner.state = CircuitState::Closed;
89                inner.consecutive_failures = 0;
90                inner.opened_at = None;
91            }
92        }
93    }
94
95    /// Record a failed call.
96    pub fn record_failure(&self) {
97        let mut inner = self.inner.lock();
98        match inner.state {
99            CircuitState::HalfOpen => {
100                // A failed probe re-opens immediately.
101                inner.state = CircuitState::Open;
102                inner.consecutive_failures = 0;
103                inner.opened_at = Some(Instant::now());
104            }
105            _ => {
106                inner.consecutive_failures += 1;
107                if inner.consecutive_failures >= self.failure_threshold {
108                    inner.state = CircuitState::Open;
109                    inner.opened_at = Some(Instant::now());
110                }
111            }
112        }
113    }
114
115    /// Move an expired `Open` breaker into `HalfOpen`.
116    fn refresh(&self, inner: &mut Inner) {
117        if inner.state == CircuitState::Open {
118            if let Some(opened) = inner.opened_at {
119                if opened.elapsed() >= self.cooldown {
120                    inner.state = CircuitState::HalfOpen;
121                    // Reset counter so successes can accumulate toward closing.
122                    inner.consecutive_failures = 0;
123                }
124            }
125        }
126    }
127}
128
129impl Default for CircuitBreaker {
130    fn default() -> Self {
131        CircuitBreaker::new(5, 2, Duration::from_secs(30))
132    }
133}