Skip to main content

oxios_kernel/
circuit_breaker.rs

//! Circuit breaker for LLM provider calls.
//!
//! States: Closed (normal) → Open (failing) → Half-Open (testing)
//!
//! The circuit breaker prevents cascading failures when the LLM provider
//! is experiencing issues. When the circuit is open, requests are rejected
//! immediately instead of waiting for timeouts.

9use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
10use std::time::{Duration, SystemTime};
11
// Numeric encodings of the breaker state, stored in `CircuitBreaker::state`.

/// Closed: the provider is treated as healthy and every request may proceed.
const STATE_CLOSED: u32 = 0;
/// Open: the provider is treated as failing and requests are refused up front.
const STATE_OPEN: u32 = 1;
/// Half-Open: a single trial request may proceed to check whether the provider recovered.
const STATE_HALF_OPEN: u32 = 2;
18
/// Three-state circuit breaker that shields callers from a failing LLM provider.
///
/// # States
/// - **Closed**: the provider is considered healthy; every request is allowed.
/// - **Open**: the provider is considered unhealthy; requests are refused at once.
/// - **Half-Open**: a single trial request is allowed to find out whether the
///   provider has recovered.
///
/// # Transitions
/// - Closed → Open: `threshold` consecutive failures have been recorded.
/// - Open → Half-Open: `timeout` has elapsed since the most recent failure.
/// - Half-Open → Closed: the trial request succeeds.
/// - Half-Open → Open: the trial request fails.
pub struct CircuitBreaker {
    // Configuration, fixed at construction time.
    /// Consecutive failures required to open the circuit.
    threshold: u32,
    /// Cool-down measured from the most recent failure before a trial request is allowed.
    timeout: Duration,

    // Runtime state, mutated through atomics from `&self` methods.
    /// Current state; one of the `STATE_*` constants.
    state: AtomicU32,
    /// Failures recorded since the last success.
    failure_count: AtomicU32,
    /// Time of the most recent failure in milliseconds since the Unix epoch (0 = none yet).
    last_failure_ts: AtomicU64,
    /// Set once the single Half-Open trial request has been handed out.
    half_open_probe_sent: AtomicBool,
}
40
41impl CircuitBreaker {
42    /// Create a new circuit breaker.
43    ///
44    /// - `threshold`: Number of consecutive failures before opening the circuit.
45    /// - `timeout_secs`: Seconds to wait before attempting to close the circuit.
46    pub fn new(threshold: u32, timeout_secs: u64) -> Self {
47        Self {
48            state: AtomicU32::new(STATE_CLOSED),
49            failure_count: AtomicU32::new(0),
50            last_failure_ts: AtomicU64::new(0),
51            half_open_probe_sent: AtomicBool::new(false),
52            threshold,
53            timeout: Duration::from_secs(timeout_secs),
54        }
55    }
56
57    /// Check if a request is allowed through the circuit.
58    ///
59    /// Returns `true` if the circuit is closed or half-open (with probe gate).
60    /// Returns `false` if the circuit is open or half-open probe already sent.
61    pub fn is_allowed(&self) -> bool {
62        let state = self.state.load(Ordering::Acquire);
63
64        match state {
65            STATE_CLOSED => true,
66            STATE_OPEN
67                // Check if enough time has passed to attempt a reset.
68                if self.should_attempt_reset() => {
69                    // Atomically transition to half-open. Only one caller wins.
70                    match self.state.compare_exchange(
71                        STATE_OPEN,
72                        STATE_HALF_OPEN,
73                        Ordering::AcqRel,
74                        Ordering::Acquire,
75                    ) {
76                        Ok(_) => {
77                            // The winner claims the single probe slot.
78                            self.half_open_probe_sent.store(true, Ordering::Release);
79                            true
80                        }
81                        Err(_) => {
82                            // Lost the race — state already changed by another thread.
83                            // Fall through to the half-open check below by re-reading.
84                            self.half_open_probe_sent
85                                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
86                                .is_ok()
87                        }
88                    }
89                }
90            STATE_HALF_OPEN => {
91                // Only allow a single probe request through in half-open state.
92                // compare_exchange ensures only one caller wins the race.
93                self.half_open_probe_sent
94                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
95                    .is_ok()
96            }
97            _ => false,
98        }
99    }
100
101    fn should_attempt_reset(&self) -> bool {
102        let last_ts_ms = self.last_failure_ts.load(Ordering::Acquire);
103        if last_ts_ms == 0 {
104            return true;
105        }
106        let now = SystemTime::now()
107            .duration_since(SystemTime::UNIX_EPOCH)
108            .unwrap_or_default();
109        let last = Duration::from_millis(last_ts_ms);
110        let elapsed = now.saturating_sub(last);
111        elapsed >= self.timeout
112    }
113
114    /// Record a successful call. Closes the circuit on success.
115    pub fn record_success(&self) {
116        self.failure_count.store(0, Ordering::Release);
117        self.state.store(STATE_CLOSED, Ordering::Release);
118        self.half_open_probe_sent.store(false, Ordering::Release);
119        crate::metrics::get_metrics()
120            .llm_circuit_breaker_state
121            .set(0.0);
122    }
123
124    /// Record a failed call. Opens the circuit if the failure threshold is exceeded.
125    pub fn record_failure(&self) {
126        let failures = self.failure_count.fetch_add(1, Ordering::AcqRel) + 1;
127        let now = SystemTime::now()
128            .duration_since(SystemTime::UNIX_EPOCH)
129            .map(|d| d.as_millis() as u64)
130            .unwrap_or(0);
131        self.last_failure_ts.store(now, Ordering::Release);
132
133        if failures >= self.threshold {
134            self.state.store(STATE_OPEN, Ordering::Release);
135            self.half_open_probe_sent.store(false, Ordering::Release);
136            tracing::warn!(
137                failures,
138                threshold = self.threshold,
139                "Circuit breaker OPEN — too many failures"
140            );
141            crate::metrics::get_metrics()
142                .llm_circuit_breaker_state
143                .set(1.0);
144        }
145    }
146
147    /// Get the current state as a string for debugging/metrics.
148    pub fn state(&self) -> &'static str {
149        match self.state.load(Ordering::Acquire) {
150            STATE_CLOSED => "closed",
151            STATE_OPEN => "open",
152            STATE_HALF_OPEN => "half_open",
153            _ => "unknown",
154        }
155    }
156
157    /// Get the current failure count.
158    #[allow(dead_code)]
159    pub fn failure_count(&self) -> u32 {
160        self.failure_count.load(Ordering::Acquire)
161    }
162}
163
164impl Default for CircuitBreaker {
165    fn default() -> Self {
166        // 5 failures opens the circuit, 30 second timeout before attempting reset
167        Self::new(5, 30)
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Slightly longer than the 1-second timeout used by the half-open tests.
    const PAST_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

    /// Build a breaker and feed it `failures` recorded failures.
    fn breaker_after_failures(threshold: u32, timeout_secs: u64, failures: u32) -> CircuitBreaker {
        let breaker = CircuitBreaker::new(threshold, timeout_secs);
        (0..failures).for_each(|_| breaker.record_failure());
        breaker
    }

    #[test]
    fn test_circuit_starts_closed() {
        let breaker = CircuitBreaker::default();
        assert!(breaker.is_allowed());
        assert_eq!(breaker.state(), "closed");
    }

    #[test]
    fn test_circuit_opens_after_threshold_failures() {
        let breaker = breaker_after_failures(3, 60, 2);
        assert!(breaker.is_allowed(), "two failures must not trip a threshold of three");

        breaker.record_failure();
        assert!(!breaker.is_allowed(), "the third failure must open the circuit");
        assert_eq!(breaker.state(), "open");
    }

    #[test]
    fn test_circuit_closes_on_success() {
        let breaker = CircuitBreaker::default();
        (0..5).for_each(|_| breaker.record_failure());

        breaker.record_success();
        assert!(breaker.is_allowed());
        assert_eq!(breaker.state(), "closed");
    }

    #[test]
    fn test_half_open_allows_only_one_probe() {
        let breaker = breaker_after_failures(1, 1, 1);
        assert!(!breaker.is_allowed(), "timeout has not elapsed yet");

        std::thread::sleep(PAST_TIMEOUT);
        let admitted: Vec<bool> = (0..3).map(|_| breaker.is_allowed()).collect();
        assert_eq!(admitted, [true, false, false], "only the first caller may probe");
        assert_eq!(breaker.state(), "half_open");
    }

    #[test]
    fn test_half_open_opens_on_failure() {
        let breaker = breaker_after_failures(1, 1, 1);
        std::thread::sleep(PAST_TIMEOUT);
        assert!(breaker.is_allowed(), "probe should be admitted");

        breaker.record_failure();
        assert_eq!(breaker.state(), "open");
        assert!(!breaker.is_allowed(), "the fresh failure restarts the cool-down");
    }

    #[test]
    fn test_half_open_closes_on_success() {
        let breaker = breaker_after_failures(1, 1, 1);
        std::thread::sleep(PAST_TIMEOUT);
        assert!(breaker.is_allowed(), "probe should be admitted");

        breaker.record_success();
        assert_eq!(breaker.state(), "closed");
        assert!(breaker.is_allowed());
    }
}
248}