// opendev_http/circuit_breaker.rs

//! Circuit breaker for provider API calls.
//!
//! Protects against cascading failures by tracking consecutive errors and
//! temporarily rejecting requests when a provider is down. After a cooldown
//! period a single probe request is allowed through to test recovery.
//!
//! States:
//! - **Closed**: Normal operation. Failures increment the counter.
//! - **Open**: Too many failures. All requests are rejected immediately.
//! - **HalfOpen**: Cooldown elapsed. One probe request is allowed.
11
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicU32, Ordering};
14use std::time::{Duration, Instant};
15
16use serde::{Deserialize, Serialize};
17use tracing::{debug, info, warn};
18
19use crate::models::HttpError;
20
/// Observable state of a [`CircuitBreaker`].
///
/// The state is not stored anywhere; `CircuitBreaker::state` derives it from
/// the failure counter and the time elapsed since the cooldown started.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation — requests flow through.
    Closed,
    /// Too many failures — requests are rejected immediately.
    Open,
    /// Cooldown elapsed — traffic is let through again to probe whether the
    /// provider has recovered.
    HalfOpen,
}
31
32/// Configuration for a circuit breaker.
33///
34/// This struct is serializable so it can be loaded from config files or
35/// passed as part of provider configuration.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct CircuitBreakerConfig {
38    /// Number of consecutive failures before opening the circuit.
39    pub failure_threshold: u32,
40    /// Seconds the circuit stays open before transitioning to half-open.
41    pub reset_timeout_secs: u64,
42    /// Seconds between probe attempts in the half-open state.
43    /// Defaults to the same value as `reset_timeout_secs` if not set.
44    pub probe_interval_secs: u64,
45}
46
47impl Default for CircuitBreakerConfig {
48    fn default() -> Self {
49        Self {
50            failure_threshold: 5,
51            reset_timeout_secs: 30,
52            probe_interval_secs: 30,
53        }
54    }
55}
56
/// A circuit breaker that tracks consecutive failures and opens the circuit
/// when a configurable threshold is reached.
///
/// All mutable state sits behind an atomic or a mutex, so every method takes
/// `&self`.
pub struct CircuitBreaker {
    /// Number of consecutive failures observed since the last success or reset.
    failure_count: AtomicU32,
    /// Number of consecutive failures required to open the circuit.
    threshold: u32,
    /// Instant the cooldown is measured from. Updated whenever a failure is
    /// recorded and cleared by `reset`; `None` until then.
    last_failure: Mutex<Option<Instant>>,
    /// How long the circuit stays open before transitioning to half-open.
    cooldown: Duration,
    /// Name of the provider (used in log and error messages).
    provider: String,
}
71
72impl CircuitBreaker {
73    /// Create a new circuit breaker.
74    ///
75    /// * `provider` — human-readable provider name for log messages.
76    /// * `threshold` — number of consecutive failures before opening.
77    /// * `cooldown` — time to wait in the open state before probing.
78    pub fn new(provider: impl Into<String>, threshold: u32, cooldown: Duration) -> Self {
79        Self {
80            failure_count: AtomicU32::new(0),
81            threshold,
82            last_failure: Mutex::new(None),
83            cooldown,
84            provider: provider.into(),
85        }
86    }
87
88    /// Create a circuit breaker with sensible defaults (5 failures, 30s cooldown).
89    pub fn with_defaults(provider: impl Into<String>) -> Self {
90        Self::new(provider, 5, Duration::from_secs(30))
91    }
92
93    /// Create a circuit breaker from a [`CircuitBreakerConfig`].
94    pub fn from_config(provider: impl Into<String>, config: &CircuitBreakerConfig) -> Self {
95        Self::new(
96            provider,
97            config.failure_threshold,
98            Duration::from_secs(config.reset_timeout_secs),
99        )
100    }
101
102    /// Return the current state of the circuit.
103    pub fn state(&self) -> CircuitState {
104        let failures = self.failure_count.load(Ordering::Relaxed);
105
106        if failures < self.threshold {
107            return CircuitState::Closed;
108        }
109
110        // Circuit has reached the failure threshold — check cooldown.
111        let lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
112        match *lock {
113            Some(last) if last.elapsed() >= self.cooldown => CircuitState::HalfOpen,
114            _ => CircuitState::Open,
115        }
116    }
117
118    /// Check whether a request should be allowed through.
119    ///
120    /// Returns `Ok(())` if the request may proceed, or `Err(HttpError)` if
121    /// the circuit is open and the request should be rejected.
122    pub fn check(&self) -> Result<(), HttpError> {
123        match self.state() {
124            CircuitState::Closed => Ok(()),
125            CircuitState::HalfOpen => {
126                debug!(
127                    provider = %self.provider,
128                    "Circuit half-open, allowing probe request"
129                );
130                Ok(())
131            }
132            CircuitState::Open => {
133                let remaining = {
134                    let lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
135                    lock.map(|last| self.cooldown.saturating_sub(last.elapsed()))
136                        .unwrap_or(self.cooldown)
137                };
138                warn!(
139                    provider = %self.provider,
140                    remaining_secs = remaining.as_secs(),
141                    "Circuit open, rejecting request"
142                );
143                Err(HttpError::Other(format!(
144                    "Circuit breaker open for provider '{}'. \
145                     Too many consecutive failures ({}). \
146                     Will retry in {}s.",
147                    self.provider,
148                    self.failure_count.load(Ordering::Relaxed),
149                    remaining.as_secs(),
150                )))
151            }
152        }
153    }
154
155    /// Record a successful request. Resets the failure counter and closes the
156    /// circuit if it was half-open.
157    pub fn record_success(&self) {
158        let prev = self.failure_count.swap(0, Ordering::Relaxed);
159        if prev >= self.threshold {
160            info!(
161                provider = %self.provider,
162                "Circuit breaker closed after successful probe"
163            );
164        }
165    }
166
167    /// Record a failed request. Increments the failure counter and, if the
168    /// threshold is reached, opens the circuit.
169    pub fn record_failure(&self) {
170        let new_count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
171
172        // Update last-failure timestamp.
173        {
174            let mut lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
175            *lock = Some(Instant::now());
176        }
177
178        if new_count == self.threshold {
179            warn!(
180                provider = %self.provider,
181                threshold = self.threshold,
182                cooldown_secs = self.cooldown.as_secs(),
183                "Circuit breaker opened after {} consecutive failures",
184                self.threshold
185            );
186        }
187    }
188
189    /// Get the current failure count.
190    pub fn failure_count(&self) -> u32 {
191        self.failure_count.load(Ordering::Relaxed)
192    }
193
194    /// Reset the circuit breaker to its initial (closed) state.
195    pub fn reset(&self) {
196        self.failure_count.store(0, Ordering::Relaxed);
197        let mut lock = self.last_failure.lock().unwrap_or_else(|e| e.into_inner());
198        *lock = None;
199    }
200}
201
202impl std::fmt::Debug for CircuitBreaker {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("CircuitBreaker")
205            .field("provider", &self.provider)
206            .field("state", &self.state())
207            .field("failure_count", &self.failure_count.load(Ordering::Relaxed))
208            .field("threshold", &self.threshold)
209            .field("cooldown", &self.cooldown)
210            .finish()
211    }
212}
213
// Unit tests live in the sibling file `circuit_breaker_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "circuit_breaker_tests.rs"]
mod tests;