Skip to main content

nexo_resilience/
lib.rs

//! Shared fault-tolerance primitives.
//!
//! [`CircuitBreaker`] guards calls to flaky external dependencies (LLM APIs,
//! browser CDP, message broker). Three states — `Closed`, `Open`, `HalfOpen` —
//! with exponential backoff and automatic probing.
//!
//! # Example
//!
//! ```
//! use nexo_resilience::{CircuitBreaker, CircuitBreakerConfig};
//!
//! let cb = CircuitBreaker::new("github-api", CircuitBreakerConfig::default());
//! assert!(cb.allow());
//! cb.on_success();
//! ```
16
17#![deny(missing_docs)]
18
19use std::sync::Mutex;
20use std::time::{Duration, Instant};
21
/// Knobs controlling when a [`CircuitBreaker`] trips and how long it
/// stays open. The `Default` values are 5 failures to open, 2 successes
/// to close, and a 10 s initial backoff with a 2 min cap.
///
/// Deliberately **not** `#[non_exhaustive]`: callers are expected to
/// fill this in with a plain struct literal, so adding a field is a
/// semver-major change.
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Number of back-to-back failures tolerated in `Closed` before the
    /// breaker trips to `Open`.
    pub failure_threshold: u32,
    /// Number of back-to-back successes required in `HalfOpen` before
    /// the breaker returns to `Closed`.
    pub success_threshold: u32,
    /// Length of the first `Open` window when the breaker trips from
    /// `Closed`. Used as given; it is not clamped to `max_backoff`.
    pub initial_backoff: Duration,
    /// Upper bound applied every time the backoff is doubled (failed
    /// probe, repeated trip, failure reported while `Open`), so the
    /// exponential growth cannot run away.
    pub max_backoff: Duration,
}
41
42impl Default for CircuitBreakerConfig {
43    fn default() -> Self {
44        Self {
45            failure_threshold: 5,
46            success_threshold: 2,
47            initial_backoff: Duration::from_secs(10),
48            max_backoff: Duration::from_secs(120),
49        }
50    }
51}
52
/// Internal breaker state. Each variant carries only the bookkeeping
/// that is meaningful in that state.
#[derive(Debug)]
enum State {
    /// Calls flow normally; counts failures since the last success.
    Closed {
        /// Failures observed in a row while closed.
        consecutive_failures: u32,
    },
    /// Calls are rejected until `until` is reached.
    Open {
        /// Instant at which the current open window ends.
        until: Instant,
        /// Length of the current open window; the base for the next doubling.
        backoff: Duration,
    },
    /// Probing: calls are let through while successes are counted.
    HalfOpen {
        /// Successful probes observed in a row.
        consecutive_successes: u32,
        /// Backoff carried over from the preceding `Open` window.
        backoff: Duration,
    },
}
67
68/// Outcome of a [`CircuitBreaker::call`] invocation. Either the
69/// breaker rejected the call without invoking the closure
70/// ([`CircuitError::Open`]) or the closure ran and returned its
71/// own typed error ([`CircuitError::Inner`]).
72///
73/// Intentionally **not** `#[non_exhaustive]`: this is a utility
74/// error wrapper that downstream callers pattern-match
75/// exhaustively. Adding a third variant is a deliberate semver
76/// signal that the breaker now has a new short-circuit reason
77/// callers must handle.
78#[derive(Debug, thiserror::Error)]
79pub enum CircuitError<E> {
80    /// The breaker is currently `Open`; the closure was not run.
81    /// The argument is the breaker's `name` for log correlation.
82    #[error("circuit breaker `{0}` is open")]
83    Open(String),
84
85    /// The closure executed and returned its own error.
86    #[error(transparent)]
87    Inner(E),
88}
89
90/// State machine guarding one external dependency.
91///
92/// Cheap to clone via `Arc`; methods are `&self` so a single
93/// instance can be shared across tasks. Internally synchronised
94/// via `Mutex`, with poison recovery — a panic elsewhere in the
95/// process can never leave the breaker permanently locked out.
96pub struct CircuitBreaker {
97    name: String,
98    config: CircuitBreakerConfig,
99    state: Mutex<State>,
100}
101
102/// Acquire the mutex, recovering from poisoning rather than cascading
103/// the panic. A resilience primitive must not itself become a single
104/// point of failure — if a panic somewhere else (tracing, allocator)
105/// poisoned the lock during a state transition, we'd rather keep the
106/// breaker running on the (still-valid) stored state than crash every
107/// subsequent call. The state enum itself is a simple value type with
108/// no partial-write corruption risk, so recovering is safe.
109fn lock_state(m: &Mutex<State>) -> std::sync::MutexGuard<'_, State> {
110    m.lock().unwrap_or_else(|poisoned| {
111        tracing::warn!("circuit breaker mutex was poisoned — recovering inner state");
112        poisoned.into_inner()
113    })
114}
115
116impl CircuitBreaker {
117    /// Build a fresh breaker in `Closed` state.
118    pub fn new(name: impl Into<String>, config: CircuitBreakerConfig) -> Self {
119        Self {
120            name: name.into(),
121            config,
122            state: Mutex::new(State::Closed {
123                consecutive_failures: 0,
124            }),
125        }
126    }
127
128    /// Operator-visible identifier used in tracing logs.
129    pub fn name(&self) -> &str {
130        &self.name
131    }
132
133    /// Returns true if the breaker currently rejects calls.
134    pub fn is_open(&self) -> bool {
135        let state = lock_state(&self.state);
136        matches!(&*state, State::Open { until, .. } if Instant::now() < *until)
137    }
138
139    /// Check whether a call may proceed. If the breaker is `Open` and the
140    /// backoff has elapsed, transitions to `HalfOpen` and returns `true` so
141    /// the caller may probe.
142    pub fn allow(&self) -> bool {
143        let mut state = lock_state(&self.state);
144        match &*state {
145            State::Closed { .. } | State::HalfOpen { .. } => true,
146            State::Open { until, backoff } => {
147                if Instant::now() >= *until {
148                    let backoff = *backoff;
149                    *state = State::HalfOpen {
150                        consecutive_successes: 0,
151                        backoff,
152                    };
153                    tracing::info!(name = %self.name, "circuit breaker: open → half-open");
154                    true
155                } else {
156                    false
157                }
158            }
159        }
160    }
161
162    /// Record a successful call. While `HalfOpen`, transitions to
163    /// `Closed` after `success_threshold` consecutive successes.
164    pub fn on_success(&self) {
165        let mut state = lock_state(&self.state);
166        match &mut *state {
167            State::Closed {
168                consecutive_failures,
169            } => {
170                *consecutive_failures = 0;
171            }
172            State::HalfOpen {
173                consecutive_successes,
174                ..
175            } => {
176                *consecutive_successes += 1;
177                if *consecutive_successes >= self.config.success_threshold {
178                    tracing::info!(name = %self.name, "circuit breaker: half-open → closed");
179                    *state = State::Closed {
180                        consecutive_failures: 0,
181                    };
182                }
183            }
184            State::Open { .. } => {}
185        }
186    }
187
188    /// Record a failed call. While `Closed`, increments the
189    /// failure counter and trips `Open` once the threshold is
190    /// reached. While `HalfOpen`, immediately reopens with a
191    /// doubled backoff. Calls while already `Open` extend the
192    /// backoff (defense against external floods after trip).
193    pub fn on_failure(&self) {
194        let mut state = lock_state(&self.state);
195        match &*state {
196            State::Closed {
197                consecutive_failures,
198            } => {
199                let next = consecutive_failures + 1;
200                if next >= self.config.failure_threshold {
201                    let backoff = self.config.initial_backoff;
202                    tracing::warn!(name = %self.name, failures = next, "circuit breaker: closed → open");
203                    *state = State::Open {
204                        until: Instant::now() + backoff,
205                        backoff,
206                    };
207                } else {
208                    *state = State::Closed {
209                        consecutive_failures: next,
210                    };
211                }
212            }
213            State::HalfOpen { backoff, .. } => {
214                let new_backoff = (*backoff * 2).min(self.config.max_backoff);
215                tracing::warn!(name = %self.name, backoff_secs = new_backoff.as_secs(), "circuit breaker: half-open → open (probe failed)");
216                *state = State::Open {
217                    until: Instant::now() + new_backoff,
218                    backoff: new_backoff,
219                };
220            }
221            State::Open { backoff, .. } => {
222                let new_backoff = (*backoff * 2).min(self.config.max_backoff);
223                *state = State::Open {
224                    until: Instant::now() + new_backoff,
225                    backoff: new_backoff,
226                };
227            }
228        }
229    }
230
231    /// Force the breaker into `Open` with the initial backoff. Used when
232    /// an out-of-band signal (e.g. broker disconnect) indicates the
233    /// dependency is down regardless of per-call failures.
234    pub fn trip(&self) {
235        let mut state = lock_state(&self.state);
236        let backoff = match &*state {
237            State::Open { backoff, .. } | State::HalfOpen { backoff, .. } => {
238                (*backoff * 2).min(self.config.max_backoff)
239            }
240            State::Closed { .. } => self.config.initial_backoff,
241        };
242        tracing::warn!(name = %self.name, backoff_secs = backoff.as_secs(), "circuit breaker tripped");
243        *state = State::Open {
244            until: Instant::now() + backoff,
245            backoff,
246        };
247    }
248
249    /// Force the breaker back to `Closed` with zero failures. Used when an
250    /// out-of-band signal confirms the dependency is healthy.
251    pub fn reset(&self) {
252        let mut state = lock_state(&self.state);
253        tracing::info!(name = %self.name, "circuit breaker reset");
254        *state = State::Closed {
255            consecutive_failures: 0,
256        };
257    }
258
259    /// Convenience wrapper: run `f`, record success/failure automatically,
260    /// and short-circuit with `CircuitError::Open` when the breaker rejects.
261    pub async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitError<E>>
262    where
263        F: FnOnce() -> Fut,
264        Fut: std::future::Future<Output = Result<T, E>>,
265    {
266        if !self.allow() {
267            return Err(CircuitError::Open(self.name.clone()));
268        }
269        match f().await {
270            Ok(v) => {
271                self.on_success();
272                Ok(v)
273            }
274            Err(e) => {
275                self.on_failure();
276                Err(CircuitError::Inner(e))
277            }
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Millisecond-scale configuration so timing-based tests stay fast.
    fn fast_config() -> CircuitBreakerConfig {
        CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 2,
            initial_backoff: Duration::from_millis(20),
            max_backoff: Duration::from_millis(200),
        }
    }

    /// Backoff stored in the breaker's `Open` state, or `None` when the
    /// breaker is not `Open`. Lets tests check backoff growth without
    /// relying on wall-clock sleeps.
    fn open_backoff(cb: &CircuitBreaker) -> Option<Duration> {
        let guard = lock_state(&cb.state);
        match &*guard {
            State::Open { backoff, .. } => Some(*backoff),
            State::Closed { .. } | State::HalfOpen { .. } => None,
        }
    }

    #[test]
    fn opens_after_threshold() {
        let cb = CircuitBreaker::new("t", fast_config());
        assert!(cb.allow());
        cb.on_failure();
        assert!(cb.allow());
        cb.on_failure();
        assert!(!cb.allow());
        assert!(cb.is_open());
    }

    #[test]
    fn success_resets_failure_count() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.on_failure();
        cb.on_success();
        cb.on_failure(); // only 1 consecutive failure again
        assert!(cb.allow());
    }

    #[tokio::test]
    async fn transitions_to_half_open_after_backoff() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.on_failure();
        cb.on_failure();
        assert!(cb.is_open());

        tokio::time::sleep(Duration::from_millis(30)).await;
        // allow() lazily transitions Open → HalfOpen when the backoff elapses
        assert!(cb.allow());
        assert!(!cb.is_open());
    }

    #[tokio::test]
    async fn half_open_success_closes() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.on_failure();
        cb.on_failure();
        tokio::time::sleep(Duration::from_millis(30)).await;

        assert!(cb.allow()); // → HalfOpen
        cb.on_success();
        cb.on_success();
        // Now Closed — failures need to cross threshold again
        cb.on_failure();
        assert!(cb.allow());
    }

    #[tokio::test]
    async fn half_open_failure_reopens_with_bigger_backoff() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.on_failure();
        cb.on_failure();
        tokio::time::sleep(Duration::from_millis(30)).await;
        assert!(cb.allow()); // → HalfOpen
        cb.on_failure(); // → Open, backoff doubled to ~40ms
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(40)));

        tokio::time::sleep(Duration::from_millis(30)).await;
        assert!(!cb.allow()); // still open (backoff is longer now)
    }

    #[tokio::test]
    async fn call_short_circuits_when_open() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.trip();
        let result: Result<(), CircuitError<&str>> = cb.call(|| async { Ok::<(), &str>(()) }).await;
        assert!(matches!(result, Err(CircuitError::Open(_))));
    }

    #[tokio::test]
    async fn call_records_success_and_failure() {
        let cb = CircuitBreaker::new("t", fast_config());

        // One failure followed by a success: the success must clear the
        // streak, so the next single failure cannot trip the breaker.
        let first: Result<(), CircuitError<&str>> = cb.call(|| async { Err("boom") }).await;
        assert!(matches!(first, Err(CircuitError::Inner("boom"))));
        let ok: Result<(), CircuitError<&str>> = cb.call(|| async { Ok::<(), &str>(()) }).await;
        assert!(ok.is_ok());
        let _: Result<(), CircuitError<&str>> = cb.call(|| async { Err("boom") }).await;
        assert!(!cb.is_open());

        // A second consecutive failure reaches the threshold of 2.
        let _: Result<(), CircuitError<&str>> = cb.call(|| async { Err("boom") }).await;
        assert!(cb.is_open());
    }

    #[test]
    fn trip_and_reset_are_idempotent() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.trip();
        assert!(cb.is_open());
        cb.reset();
        assert!(!cb.is_open());
        cb.reset();
        assert!(!cb.is_open());
    }

    #[tokio::test]
    async fn trip_while_open_doubles_backoff() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.trip(); // first trip, backoff = 20ms
        tokio::time::sleep(Duration::from_millis(15)).await;
        cb.trip(); // second trip, backoff = 40ms (doubled, capped at 200ms)
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(40)));
        // Still well inside 40ms window
        assert!(cb.is_open());
        tokio::time::sleep(Duration::from_millis(15)).await;
        // 30ms into the 40ms window — still open (would have been
        // HalfOpen already with the original 20ms backoff)
        assert!(cb.is_open());
    }

    #[test]
    fn on_failure_while_open_extends_backoff() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.trip();
        assert!(cb.is_open());
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(20)));
        // Signalling another failure should NOT crash and should keep
        // us open with a longer backoff (defensive-guard against
        // external signals flooding in while we're already tripped).
        cb.on_failure();
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(40)));
        cb.on_failure();
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(80)));
        assert!(cb.is_open());
    }

    #[test]
    fn backoff_is_capped_at_max() {
        let cb = CircuitBreaker::new("t", fast_config());
        cb.trip(); // 20ms
        for _ in 0..10 {
            cb.on_failure(); // 40 → 80 → 160 → 200 (cap) → 200 …
        }
        assert_eq!(open_backoff(&cb), Some(Duration::from_millis(200)));
        assert!(cb.is_open());
    }

    #[test]
    fn recovers_from_poisoned_mutex() {
        // Simulate a poisoned lock by panicking inside a lock-holding
        // closure. The standard `Mutex::lock()` would return Err after
        // this; our `lock_state` helper unwraps the poison and keeps
        // going so a panic in tracing / allocation elsewhere can't
        // cascade-cripple the breaker.
        use std::sync::Arc;
        let cb = Arc::new(CircuitBreaker::new("t", fast_config()));
        let cb2 = cb.clone();
        let h = std::thread::spawn(move || {
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let _guard = cb2.state.lock().unwrap();
                panic!("simulated panic while holding lock");
            }));
        });
        h.join().unwrap();
        // Mutex is now poisoned. But our helpers should still work.
        assert!(!cb.is_open()); // Closed by default
        cb.on_failure();
        cb.on_failure();
        assert!(
            cb.is_open(),
            "breaker should have tripped despite poisoned mutex"
        );
    }
}