Skip to main content

arcly_http/resilience/
mod.rs

1//! Lock-free circuit breaker + the `BreakerOpen` sentinel surfaced when the
2//! breaker is OPEN.
3//!
4//! The breaker is `static`-constructible: `CircuitBreaker::const_new` builds
5//! one in const context so the `#[circuit_breaker(...)]` attribute macro can
6//! place a per-method breaker behind a `static`.
7
8pub use arcly_http_macros::circuit_breaker;
9
10pub mod bulkhead;
11pub mod distributed_rate_limit;
12pub mod dlock;
13pub mod rate_limit;
14pub mod timeout;
15pub use bulkhead::Bulkhead;
16pub use distributed_rate_limit::{
17    DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
18};
19pub use dlock::{DLockBackend, DistributedLock, LockGuard};
20pub use rate_limit::RateLimit;
21
22use std::error::Error as StdError;
23use std::fmt;
24use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
25use std::time::Duration;
26
27use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
28
/// Breaker state: calls flow through and failures are tallied.
const CLOSED: u8 = 0;
/// Breaker state: calls are rejected until the cooldown has elapsed.
const OPEN: u8 = 1;
/// Breaker state: a single probe call is in flight; everyone else is rejected.
const HALF_OPEN: u8 = 2;
32
/// Returned when the breaker rejects a call without running it.
///
/// `CircuitBreaker::execute` yields this while the breaker is OPEN, and while
/// a half-open probe is still in flight.
///
/// Implements `HttpError` so handlers can simply `?` it — the caller sees
/// `503 Service Unavailable` via the standard ProblemDetails pipeline. It
/// also converts into `ServiceUnavailable` through `From`.
///
/// NOTE(review): `?` applies exactly one `From` conversion, so a user error
/// type that only implements `From<ServiceUnavailable>` does not pick up
/// `BreakerOpen` automatically; it needs its own `From<BreakerOpen>` (or an
/// explicit `map_err`) unless the `#[circuit_breaker]` macro inserts the
/// intermediate conversion — confirm against the macro expansion.
///
/// The type is a zero-sized marker, so it is `Copy` and comparable; this lets
/// callers and tests write `assert_eq!(result, Err(BreakerOpen))`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BreakerOpen;

impl fmt::Display for BreakerOpen {
    /// Writes the fixed, human-readable rejection message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("circuit breaker open")
    }
}

// No underlying cause to expose: the default `source()` (None) is correct.
impl StdError for BreakerOpen {}
48impl HttpError for BreakerOpen {
49    fn problem(&self) -> ProblemDetails {
50        ServiceUnavailable::new("circuit breaker open").problem()
51    }
52}
53
54impl From<BreakerOpen> for ServiceUnavailable {
55    fn from(_: BreakerOpen) -> Self {
56        ServiceUnavailable::new("circuit breaker open")
57    }
58}
59
/// Circuit breaker whose whole mutable state lives in atomics, so it can sit
/// behind a `static` and be shared by reference without a lock.
///
/// `Debug` is derived so a breaker can be logged or embedded in other
/// `Debug` types; the atomics print their current values.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state code: `CLOSED`, `OPEN` or `HALF_OPEN`.
    state: AtomicU8,
    /// Failure tally at which a CLOSED breaker trips to OPEN.
    failure_threshold: u32,
    /// Running failure tally compared against `failure_threshold`.
    failure_count: AtomicU32,
    /// How long, in milliseconds, the breaker stays OPEN before a probe is
    /// admitted.
    cooldown_millis: u64,
    /// For metrics/log labels. Empty for anonymous (macro-default) breakers.
    name: &'static str,
    /// When the breaker last entered OPEN, in nanoseconds on a monotonic
    /// clock. The clock's epoch is an `Instant` captured lazily the first
    /// time any breaker reads the time (not literally process start), and
    /// the field stays `0` until the first trip.
    ///
    /// Using monotonic time prevents NTP / DST clock-jumps from corrupting
    /// the cooldown calculation — the only property needed is "elapsed
    /// duration", not wall-clock time.
    last_state_change_nanos: AtomicU64,
}
73
74impl CircuitBreaker {
75    /// Runtime constructor.
76    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
77        Self::const_new(failure_threshold, cooldown.as_millis() as u64)
78    }
79
80    /// `const`-friendly constructor used by the `#[circuit_breaker]` macro.
81    pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
82        Self::const_named("", failure_threshold, cooldown_millis)
83    }
84
85    /// Named variant — the name labels `circuit_breaker_transitions_total`
86    /// and the state-change logs.
87    pub const fn const_named(
88        name: &'static str,
89        failure_threshold: u32,
90        cooldown_millis: u64,
91    ) -> Self {
92        Self {
93            state: AtomicU8::new(CLOSED),
94            failure_threshold,
95            failure_count: AtomicU32::new(0),
96            cooldown_millis,
97            name,
98            last_state_change_nanos: AtomicU64::new(0),
99        }
100    }
101
102    fn note_transition(&self, to: &'static str) {
103        metrics::counter!(
104            "circuit_breaker_transitions_total",
105            "name" => self.name, "to" => to
106        )
107        .increment(1);
108        tracing::warn!(breaker = self.name, to, "circuit breaker state change");
109    }
110
111    #[inline]
112    pub fn state(&self) -> u8 {
113        self.state.load(Ordering::Relaxed)
114    }
115
116    fn now_nanos() -> u64 {
117        // Use a process-lifetime Instant as the epoch so elapsed calculations
118        // are immune to NTP adjustments and leap-second corrections.
119        use std::sync::OnceLock;
120        use std::time::Instant;
121        static EPOCH: OnceLock<Instant> = OnceLock::new();
122        let epoch = EPOCH.get_or_init(Instant::now);
123        epoch.elapsed().as_nanos() as u64
124    }
125
126    /// Execute `action` under breaker supervision.
127    ///
128    /// State machine (all transitions are single CAS operations — no locks):
129    ///
130    /// - **CLOSED**: calls flow; `failure_threshold` consecutive-window
131    ///   failures trip to OPEN (cooldown timestamp recorded).
132    /// - **OPEN**: rejected with `BreakerOpen` until the cooldown elapses;
133    ///   then exactly **one** caller (the `OPEN→HALF_OPEN` CAS winner)
134    ///   proceeds as the probe. Everyone else keeps getting `BreakerOpen` —
135    ///   a recovering dependency is never thundering-herded.
136    /// - **HALF_OPEN**: the probe's outcome decides — success closes the
137    ///   breaker (counters reset), failure **re-opens it with a fresh
138    ///   cooldown**.
139    pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
140    where
141        F: FnMut() -> Fut,
142        Fut: std::future::Future<Output = Result<T, E>>,
143    {
144        // `true` only for the single half-open probe call.
145        let mut probing = false;
146
147        match self.state.load(Ordering::SeqCst) {
148            OPEN => {
149                let elapsed_ms = (Self::now_nanos()
150                    .saturating_sub(self.last_state_change_nanos.load(Ordering::Relaxed)))
151                    / 1_000_000;
152                if elapsed_ms <= self.cooldown_millis {
153                    return Err(BreakerOpen);
154                }
155                // Cooldown elapsed: claim the SINGLE probe slot. Losers stay
156                // rejected until the probe settles the state.
157                if self
158                    .state
159                    .compare_exchange(OPEN, HALF_OPEN, Ordering::SeqCst, Ordering::SeqCst)
160                    .is_err()
161                {
162                    return Err(BreakerOpen);
163                }
164                self.note_transition("half_open");
165                probing = true;
166            }
167            // A probe is already in flight — reject until it settles.
168            // (Previously every caller during HALF_OPEN flowed through,
169            // hammering the recovering dependency.)
170            HALF_OPEN => return Err(BreakerOpen),
171            _ => {} // CLOSED: flow
172        }
173
174        match action().await {
175            Ok(v) => {
176                if probing {
177                    self.state.store(CLOSED, Ordering::SeqCst);
178                    self.failure_count.store(0, Ordering::SeqCst);
179                    self.note_transition("closed");
180                }
181                Ok(Ok(v))
182            }
183            Err(e) => {
184                if probing {
185                    // Failed probe: re-open with a FRESH cooldown. (The old
186                    // code only CASed CLOSED→OPEN, so a failed probe left the
187                    // breaker stuck half-open, passing full traffic forever.)
188                    self.last_state_change_nanos
189                        .store(Self::now_nanos(), Ordering::Relaxed);
190                    self.state.store(OPEN, Ordering::SeqCst);
191                    self.note_transition("open");
192                    return Ok(Err(e));
193                }
194                let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
195                if count >= self.failure_threshold
196                    && self
197                        .state
198                        .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
199                        .is_ok()
200                {
201                    self.last_state_change_nanos
202                        .store(Self::now_nanos(), Ordering::Relaxed);
203                    self.note_transition("open");
204                }
205                Ok(Err(e))
206            }
207        }
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// What the canned actions below resolve to.
    type Outcome = Result<(), &'static str>;

    /// Canned action that always fails.
    fn fail() -> Outcome {
        Err("boom")
    }

    /// Canned action that always succeeds.
    fn ok() -> Outcome {
        Ok(())
    }

    /// Runs one call through the breaker using the given canned action.
    async fn drive(
        breaker: &CircuitBreaker,
        outcome: fn() -> Outcome,
    ) -> Result<Outcome, BreakerOpen> {
        breaker.execute(move || async move { outcome() }).await
    }

    /// Real-time sleep; the breaker measures cooldowns on a std monotonic
    /// clock, so the tests have to wait for wall time to pass.
    async fn pause_ms(millis: u64) {
        tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
    }

    #[tokio::test]
    async fn trips_open_after_threshold_and_rejects() {
        let breaker = CircuitBreaker::const_named("t", 3, 60_000);
        for _attempt in 0..3 {
            let passed_through = drive(&breaker, fail).await;
            assert!(matches!(passed_through, Ok(Err(_))));
        }
        assert_eq!(breaker.state(), OPEN);

        // Long cooldown still running: the call is turned away and the
        // action never executes.
        let rejected = drive(&breaker, ok).await;
        assert!(matches!(rejected, Err(BreakerOpen)));
    }

    #[tokio::test]
    async fn successful_probe_closes_the_breaker() {
        let breaker = CircuitBreaker::const_named("t", 1, 10);
        let tripping = drive(&breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause_ms(30).await;

        // Past the cooldown: this call is the probe, and its success closes.
        let probe = drive(&breaker, ok).await;
        assert!(matches!(probe, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);

        // Back to normal traffic.
        let follow_up = drive(&breaker, ok).await;
        assert!(matches!(follow_up, Ok(Ok(()))));
    }

    #[tokio::test]
    async fn failed_probe_reopens_with_fresh_cooldown() {
        let breaker = CircuitBreaker::const_named("t", 1, 20);
        let tripping = drive(&breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause_ms(40).await;

        // A failing probe has to put the breaker back into OPEN rather than
        // leaving it half-open.
        let probe = drive(&breaker, fail).await;
        assert!(matches!(probe, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN, "failed probe must re-open the breaker");

        // The cooldown restarted, so the very next call is rejected.
        let rejected = drive(&breaker, ok).await;
        assert!(matches!(rejected, Err(BreakerOpen)));

        // Once the new cooldown passes, recovery is still possible.
        pause_ms(40).await;
        let recovery = drive(&breaker, ok).await;
        assert!(matches!(recovery, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }

    #[tokio::test]
    async fn half_open_admits_exactly_one_probe() {
        // Leaked so the spawned task can borrow the breaker for 'static.
        let breaker: &'static CircuitBreaker =
            Box::leak(Box::new(CircuitBreaker::const_named("t", 1, 10)));
        let tripping = drive(breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        pause_ms(30).await;

        // The probe occupies the single slot for as long as it runs; anyone
        // arriving meanwhile must be rejected instead of piling onto the
        // recovering dependency.
        let in_flight = std::sync::Arc::new(tokio::sync::Notify::new());
        let signal = std::sync::Arc::clone(&in_flight);
        let probe = tokio::spawn(async move {
            breaker
                .execute(|| {
                    let signal = std::sync::Arc::clone(&signal);
                    async move {
                        signal.notify_one(); // the probe is running now
                        pause_ms(50).await;
                        Ok::<(), &'static str>(())
                    }
                })
                .await
        });

        in_flight.notified().await;
        assert_eq!(breaker.state(), HALF_OPEN);
        let concurrent = drive(breaker, ok).await;
        assert!(
            matches!(concurrent, Err(BreakerOpen)),
            "second caller during the probe must be rejected"
        );

        let probe_result = probe.await.expect("join");
        assert!(matches!(probe_result, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }
}