Skip to main content

arcly_http/resilience/
mod.rs

//! Lock-free circuit breaker and the `BreakerOpen` sentinel returned when the
//! breaker is OPEN.
//!
//! The breaker is `static`-constructible: `CircuitBreaker::const_new` builds
//! one in const context so the `#[circuit_breaker(...)]` attribute macro can
//! place a per-method breaker behind a `static`.
7
8pub use arcly_http_macros::circuit_breaker;
9
10pub mod bulkhead;
11pub mod distributed_rate_limit;
12pub mod dlock;
13pub mod rate_limit;
14pub mod timeout;
15pub use bulkhead::Bulkhead;
16pub use distributed_rate_limit::{
17    DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
18};
19pub use dlock::{DLockBackend, DistributedLock, LockGuard};
20pub use rate_limit::RateLimit;
21
22use std::error::Error as StdError;
23use std::fmt;
24use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
25use std::time::Duration;
26
27use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
28
/// Breaker state: calls are passed through to the supervised action.
const CLOSED: u8 = 0;
/// Breaker state: calls are rejected with `BreakerOpen` until the cooldown
/// has elapsed.
const OPEN: u8 = 1;
/// Breaker state: the cooldown has elapsed and calls are let through as
/// trials whose outcome decides the next state.
const HALF_OPEN: u8 = 2;
32
/// Returned when the breaker rejects a call because it is OPEN.
///
/// Implements `HttpError` so handlers can simply `?` it — the caller sees
/// `503 Service Unavailable` via the standard ProblemDetails pipeline.
/// `ServiceUnavailable` also implements `From<BreakerOpen>`, so `?` works in
/// handlers whose error type is `ServiceUnavailable`.
///
/// Note: `?` applies exactly one `From` conversion; Rust does not chain them.
/// A user error type that only implements `From<ServiceUnavailable>` therefore
/// needs its own `From<BreakerOpen>` impl (it can delegate through
/// `ServiceUnavailable`) before `?` will convert a `BreakerOpen` into it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BreakerOpen;

impl fmt::Display for BreakerOpen {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed message with no interpolation: `write_str` skips the
        // formatting machinery that `write!` would go through.
        f.write_str("circuit breaker open")
    }
}
impl StdError for BreakerOpen {}
48impl HttpError for BreakerOpen {
49    fn problem(&self) -> ProblemDetails {
50        ServiceUnavailable::new("circuit breaker open").problem()
51    }
52}
53
54impl From<BreakerOpen> for ServiceUnavailable {
55    fn from(_: BreakerOpen) -> Self {
56        ServiceUnavailable::new("circuit breaker open")
57    }
58}
59
/// Lock-free circuit breaker.
///
/// Counts failures of a supervised operation. Once `failure_threshold` is
/// reached, it rejects calls for a cooldown period before letting calls
/// through again. Every mutable field is an atomic, so a breaker can be
/// shared by reference, or placed in a `static` via `const_new`, without
/// a lock.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current state: one of `CLOSED`, `OPEN`, `HALF_OPEN`.
    state: AtomicU8,
    /// Number of recorded failures at which the breaker trips to OPEN.
    failure_threshold: u32,
    /// Failures recorded toward `failure_threshold`.
    failure_count: AtomicU32,
    /// How long, in milliseconds, the breaker stays OPEN before admitting
    /// trial calls.
    cooldown_millis: u64,
    /// Monotonic nanoseconds at which the breaker last moved to OPEN.
    ///
    /// The value is measured from a process-wide `Instant` epoch that is
    /// captured lazily on first use, not at process start. Monotonic time
    /// keeps NTP / DST clock jumps from corrupting the cooldown calculation.
    /// The only property needed is "elapsed duration", not wall-clock time.
    last_state_change_nanos: AtomicU64,
}
71
72impl CircuitBreaker {
73    /// Runtime constructor.
74    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
75        Self::const_new(failure_threshold, cooldown.as_millis() as u64)
76    }
77
78    /// `const`-friendly constructor used by the `#[circuit_breaker]` macro.
79    pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
80        Self {
81            state: AtomicU8::new(CLOSED),
82            failure_threshold,
83            failure_count: AtomicU32::new(0),
84            cooldown_millis,
85            last_state_change_nanos: AtomicU64::new(0),
86        }
87    }
88
89    #[inline]
90    pub fn state(&self) -> u8 {
91        self.state.load(Ordering::Relaxed)
92    }
93
94    fn now_nanos() -> u64 {
95        // Use a process-lifetime Instant as the epoch so elapsed calculations
96        // are immune to NTP adjustments and leap-second corrections.
97        use std::sync::OnceLock;
98        use std::time::Instant;
99        static EPOCH: OnceLock<Instant> = OnceLock::new();
100        let epoch = EPOCH.get_or_init(Instant::now);
101        epoch.elapsed().as_nanos() as u64
102    }
103
104    /// Execute `action` under breaker supervision. If the breaker is OPEN and
105    /// the cooldown hasn't elapsed, returns `Err(BreakerOpen)` without
106    /// invoking the action.
107    pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
108    where
109        F: FnMut() -> Fut,
110        Fut: std::future::Future<Output = Result<T, E>>,
111    {
112        let current = self.state.load(Ordering::Relaxed);
113
114        if current == OPEN {
115            let elapsed_ms = (Self::now_nanos()
116                .saturating_sub(self.last_state_change_nanos.load(Ordering::Relaxed)))
117                / 1_000_000;
118            if elapsed_ms > self.cooldown_millis {
119                let _ = self.state.compare_exchange(
120                    OPEN,
121                    HALF_OPEN,
122                    Ordering::SeqCst,
123                    Ordering::Relaxed,
124                );
125            } else {
126                return Err(BreakerOpen);
127            }
128        }
129
130        match action().await {
131            Ok(v) => {
132                if self.state.load(Ordering::SeqCst) == HALF_OPEN {
133                    self.state.store(CLOSED, Ordering::SeqCst);
134                    self.failure_count.store(0, Ordering::SeqCst);
135                }
136                Ok(Ok(v))
137            }
138            Err(e) => {
139                let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
140                if count >= self.failure_threshold
141                    && self
142                        .state
143                        .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
144                        .is_ok()
145                {
146                    self.last_state_change_nanos
147                        .store(Self::now_nanos(), Ordering::Relaxed);
148                }
149                Ok(Err(e))
150            }
151        }
152    }
153}