Skip to main content

bob_core/
resilience.rs

1//! # Resilience Patterns
2//!
3//! Circuit breaker and retry configuration for preventing cascading failures.
4//!
5//! The [`CircuitBreaker`] is a pure state machine that tracks call outcomes
6//! and transitions between `Closed` → `Open` → `HalfOpen` → `Closed`.
7//! Adapters wrap their port implementations with [`CircuitBreaker::call`]
8//! to add automatic failure detection and recovery probing.
9//!
10//! [`RetryConfig`] provides exponential backoff parameters for adapters
11//! that wish to retry transient failures before surfacing errors.
12
13use std::{
14    sync::Mutex,
15    time::{Duration, Instant},
16};
17
18// ── Circuit Breaker ──────────────────────────────────────────────────
19
/// The three positions a circuit breaker can be in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Normal operation: calls are forwarded to the underlying service.
    Closed,
    /// Tripped: calls are rejected immediately without reaching the
    /// underlying service.
    Open,
    /// Trial state entered once the cooldown has elapsed: calls are
    /// admitted again as recovery probes.
    HalfOpen,
}
30
/// Thresholds and timing that govern a circuit breaker's transitions.
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures tolerated while `Closed`; reaching this count
    /// opens the circuit.
    pub failure_threshold: u32,
    /// Consecutive successes required while `HalfOpen` before the circuit
    /// closes again.
    pub success_threshold: u32,
    /// Time the circuit stays `Open` before it may move to `HalfOpen`.
    pub cooldown: Duration,
}
41
42impl Default for CircuitBreakerConfig {
43    fn default() -> Self {
44        Self { failure_threshold: 5, success_threshold: 2, cooldown: Duration::from_secs(30) }
45    }
46}
47
48/// Inner state protected by a mutex.
49#[derive(Debug)]
50struct CircuitBreakerInner {
51    state: CircuitState,
52    failure_count: u32,
53    success_count: u32,
54    last_failure_at: Option<Instant>,
55}
56
57/// Circuit breaker for preventing cascading failures.
58///
59/// ## States
60///
61/// ```text
62/// Closed ──(failures ≥ threshold)──▶ Open
63///   ▲                                    │
64///   │                                    │ cooldown elapsed
65///   │                                    ▼
66///   └──(successes ≥ threshold)── HalfOpen ◀┘
67///                                      │
68///          (any failure) ──────────────▶ Open
69/// ```
70///
71/// ## Example
72///
73/// ```rust,ignore
74/// use bob_core::resilience::{CircuitBreaker, CircuitBreakerConfig};
75/// use std::time::Duration;
76///
77/// let cb = CircuitBreaker::new(CircuitBreakerConfig {
78///     failure_threshold: 3,
79///     success_threshold: 1,
80///     cooldown: Duration::from_secs(10),
81/// });
82///
83/// let result = cb.call(|| async { Ok::<_, &str>(42) }).await;
84/// assert_eq!(result, Ok(42));
85/// ```
86#[derive(Debug)]
87pub struct CircuitBreaker {
88    config: CircuitBreakerConfig,
89    inner: Mutex<CircuitBreakerInner>,
90}
91
92impl CircuitBreaker {
93    /// Create a new circuit breaker with the given configuration.
94    #[must_use]
95    pub fn new(config: CircuitBreakerConfig) -> Self {
96        Self {
97            config,
98            inner: Mutex::new(CircuitBreakerInner {
99                state: CircuitState::Closed,
100                failure_count: 0,
101                success_count: 0,
102                last_failure_at: None,
103            }),
104        }
105    }
106
107    /// Returns the current circuit state.
108    ///
109    /// If the circuit is `Open` and the cooldown has elapsed, this
110    /// transitions to `HalfOpen` before returning.
111    pub fn state(&self) -> CircuitState {
112        let mut inner = match self.inner.lock() {
113            Ok(guard) => guard,
114            Err(poisoned) => poisoned.into_inner(),
115        };
116        Self::maybe_transition_to_half_open(&mut inner, &self.config);
117        inner.state
118    }
119
120    /// Execute an async operation through the circuit breaker.
121    ///
122    /// Returns `Err(CircuitBreakerError::CircuitOpen)` immediately if the
123    /// circuit is open and the cooldown has not elapsed.
124    pub async fn call<F, Fut, T, E>(&self, f: F) -> Result<T, CircuitBreakerError<E>>
125    where
126        F: FnOnce() -> Fut,
127        Fut: std::future::Future<Output = Result<T, E>>,
128    {
129        // Pre-check: is the circuit open?
130        {
131            let mut inner = match self.inner.lock() {
132                Ok(guard) => guard,
133                Err(poisoned) => poisoned.into_inner(),
134            };
135            Self::maybe_transition_to_half_open(&mut inner, &self.config);
136            if inner.state == CircuitState::Open {
137                return Err(CircuitBreakerError::CircuitOpen);
138            }
139        }
140
141        // Execute the operation.
142        match f().await {
143            Ok(value) => {
144                self.on_success();
145                Ok(value)
146            }
147            Err(err) => {
148                self.on_failure();
149                Err(CircuitBreakerError::Inner(err))
150            }
151        }
152    }
153
154    /// Manually reset the circuit breaker to the closed state.
155    pub fn reset(&self) {
156        let mut inner = match self.inner.lock() {
157            Ok(guard) => guard,
158            Err(poisoned) => poisoned.into_inner(),
159        };
160        inner.state = CircuitState::Closed;
161        inner.failure_count = 0;
162        inner.success_count = 0;
163        inner.last_failure_at = None;
164    }
165
166    fn maybe_transition_to_half_open(
167        inner: &mut CircuitBreakerInner,
168        config: &CircuitBreakerConfig,
169    ) {
170        if inner.state == CircuitState::Open &&
171            let Some(last_failure) = inner.last_failure_at &&
172            last_failure.elapsed() >= config.cooldown
173        {
174            inner.state = CircuitState::HalfOpen;
175            inner.success_count = 0;
176        }
177    }
178
179    fn on_success(&self) {
180        let mut inner = match self.inner.lock() {
181            Ok(guard) => guard,
182            Err(poisoned) => poisoned.into_inner(),
183        };
184        match inner.state {
185            CircuitState::HalfOpen => {
186                inner.success_count = inner.success_count.saturating_add(1);
187                if inner.success_count >= self.config.success_threshold {
188                    inner.state = CircuitState::Closed;
189                    inner.failure_count = 0;
190                    inner.success_count = 0;
191                }
192            }
193            CircuitState::Closed => {
194                inner.failure_count = 0;
195            }
196            CircuitState::Open => {}
197        }
198    }
199
200    fn on_failure(&self) {
201        let mut inner = match self.inner.lock() {
202            Ok(guard) => guard,
203            Err(poisoned) => poisoned.into_inner(),
204        };
205        match inner.state {
206            CircuitState::Closed | CircuitState::HalfOpen => {
207                inner.failure_count = inner.failure_count.saturating_add(1);
208                if inner.failure_count >= self.config.failure_threshold {
209                    inner.state = CircuitState::Open;
210                    inner.last_failure_at = Some(Instant::now());
211                }
212            }
213            CircuitState::Open => {}
214        }
215    }
216}
217
218// ── Circuit Breaker Error ────────────────────────────────────────────
219
/// Failure modes of [`CircuitBreaker::call`].
pub enum CircuitBreakerError<E> {
    /// The call was rejected up front because the circuit is open; the
    /// wrapped operation never ran.
    CircuitOpen,
    /// The wrapped operation ran and failed with this error.
    Inner(E),
}
227
228impl<E> std::fmt::Debug for CircuitBreakerError<E>
229where
230    E: std::fmt::Debug,
231{
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            Self::CircuitOpen => f.write_str("CircuitOpen"),
235            Self::Inner(e) => f.debug_tuple("Inner").field(e).finish(),
236        }
237    }
238}
239
240impl<E: std::fmt::Display> std::fmt::Display for CircuitBreakerError<E> {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        match self {
243            Self::CircuitOpen => f.write_str("circuit breaker is open"),
244            Self::Inner(e) => write!(f, "{e}"),
245        }
246    }
247}
248
249impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for CircuitBreakerError<E> {}
250
251impl<E> CircuitBreakerError<E> {
252    /// Stable error code.
253    #[must_use]
254    pub fn code(&self) -> &'static str {
255        match self {
256            Self::CircuitOpen => "BOB_CIRCUIT_OPEN",
257            Self::Inner(_) => "BOB_CIRCUIT_INNER",
258        }
259    }
260}
261
262// ── Retry Config ─────────────────────────────────────────────────────
263
/// Parameters describing an exponential backoff schedule.
///
/// This type only carries data. Adapters read it to drive their own retry
/// loops and perform the actual waiting themselves (e.g. via `tokio::time`).
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// How many retries may follow the first failed attempt.
    pub max_retries: u32,
    /// Delay used for the first retry.
    pub initial_delay: Duration,
    /// Upper bound on any single delay, limiting the exponential growth.
    pub max_delay: Duration,
    /// Factor by which the delay grows with each further failure.
    pub multiplier: f64,
}
280
281impl Default for RetryConfig {
282    fn default() -> Self {
283        Self {
284            max_retries: 3,
285            initial_delay: Duration::from_millis(200),
286            max_delay: Duration::from_secs(10),
287            multiplier: 2.0,
288        }
289    }
290}
291
292impl RetryConfig {
293    /// Compute the delay for a given attempt number (0-indexed).
294    #[must_use]
295    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
296        let base = self.initial_delay.as_millis() as f64;
297        let scaled = base * self.multiplier.powi(attempt as i32);
298        let capped = scaled.min(self.max_delay.as_millis() as f64);
299        Duration::from_millis(capped as u64)
300    }
301}
302
303// ── Health Check ─────────────────────────────────────────────────────
304
/// Coarse health classification of one component.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// Working as expected.
    Healthy,
    /// Still working, but impaired (for example, responding slowly).
    Degraded,
    /// Not working.
    Unhealthy,
}
315
316/// A health check result for one component.
317#[derive(Debug, Clone)]
318pub struct ComponentHealth {
319    /// Component name (e.g. `"llm:openai"`, `"mcp:filesystem"`).
320    pub name: String,
321    /// Current health status.
322    pub status: HealthStatus,
323    /// Optional human-readable detail.
324    pub detail: Option<String>,
325}
326
327// ── Tests ────────────────────────────────────────────────────────────
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a breaker from explicit thresholds and cooldown.
    fn breaker(
        failure_threshold: u32,
        success_threshold: u32,
        cooldown: Duration,
    ) -> CircuitBreaker {
        let config = CircuitBreakerConfig { failure_threshold, success_threshold, cooldown };
        CircuitBreaker::new(config)
    }

    /// Sends one failing call through `cb` and discards the result.
    async fn fail_with(cb: &CircuitBreaker, message: &'static str) {
        let _ = cb.call(|| async move { Err::<(), _>(message) }).await;
    }

    /// Sends one call through `cb` whose operation succeeds with `42`.
    async fn succeed_with_42(
        cb: &CircuitBreaker,
    ) -> Result<i32, CircuitBreakerError<&'static str>> {
        cb.call(|| async { Ok::<_, &str>(42) }).await
    }

    #[tokio::test]
    async fn circuit_breaker_allows_calls_when_closed() {
        let cb = CircuitBreaker::new(CircuitBreakerConfig::default());
        assert_eq!(cb.state(), CircuitState::Closed);

        let result = succeed_with_42(&cb).await;
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 42);
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[tokio::test]
    async fn circuit_breaker_opens_after_threshold() {
        let cb = breaker(3, 1, Duration::from_secs(60));

        for _ in 0..3 {
            fail_with(&cb, "fail").await;
        }
        assert_eq!(cb.state(), CircuitState::Open);

        // While open, the operation's own success never surfaces.
        let result = succeed_with_42(&cb).await;
        assert!(matches!(result, Err(CircuitBreakerError::CircuitOpen)));
    }

    // NOTE(review): the cooldown-based tests below sleep and compare against
    // short cooldowns, so they look sensitive to scheduling delays — confirm
    // they are stable on loaded CI machines.

    #[tokio::test]
    async fn circuit_breaker_half_open_after_cooldown() {
        let cb = breaker(1, 1, Duration::from_millis(50));

        fail_with(&cb, "fail").await;
        assert_eq!(cb.state(), CircuitState::Open);

        tokio::time::sleep(Duration::from_millis(80)).await;
        assert_eq!(cb.state(), CircuitState::HalfOpen);
    }

    #[tokio::test]
    async fn circuit_breaker_closes_after_success_in_half_open() {
        let cb = breaker(1, 1, Duration::from_millis(10));

        fail_with(&cb, "fail").await;
        tokio::time::sleep(Duration::from_millis(20)).await;

        let result = succeed_with_42(&cb).await;
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 42);
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[tokio::test]
    async fn circuit_breaker_reopens_on_failure_in_half_open() {
        let cb = breaker(1, 2, Duration::from_millis(10));

        fail_with(&cb, "fail").await;
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(cb.state(), CircuitState::HalfOpen);

        fail_with(&cb, "fail again").await;
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[tokio::test]
    async fn circuit_breaker_reset() {
        let cb = breaker(1, 1, Duration::from_secs(60));

        fail_with(&cb, "fail").await;
        assert!(cb.call(|| async { Ok::<_, &str>(1) }).await.is_err());

        cb.reset();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn retry_config_delay_computation() {
        let config = RetryConfig {
            max_retries: 3,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(5),
            multiplier: 2.0,
        };

        // (attempt, expected delay); the last entry hits the cap.
        let expectations = [
            (0, Duration::from_millis(100)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (10, Duration::from_secs(5)),
        ];
        for (attempt, expected) in expectations {
            assert_eq!(config.delay_for_attempt(attempt), expected);
        }
    }

    #[test]
    fn retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.delay_for_attempt(0), Duration::from_millis(200));
    }
}