arcly-http 0.2.2

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
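//! Lock-free circuit breaker plus the `BreakerOpen` sentinel returned when the
//! breaker rejects a call.
//!
//! The breaker is `static`-constructible: `CircuitBreaker::const_new` builds
//! one in a const context, so the `#[circuit_breaker(...)]` attribute macro can
//! place a per-method breaker behind a `static`.
//!
//! This module also hosts the sibling resilience primitives in the `bulkhead`,
//! `rate_limit`, `distributed_rate_limit`, `dlock` and `timeout` submodules,
//! several of whose types are re-exported below.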

pub use arcly_http_macros::circuit_breaker;

pub mod bulkhead;
pub mod distributed_rate_limit;
pub mod dlock;
pub mod rate_limit;
pub mod timeout;
pub use bulkhead::Bulkhead;
pub use distributed_rate_limit::{
    DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
};
pub use dlock::{DLockBackend, DistributedLock, LockGuard};
pub use rate_limit::RateLimit;

use std::error::Error as StdError;
use std::fmt;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::time::Duration;

use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};

const CLOSED: u8 = 0;
const OPEN: u8 = 1;
const HALF_OPEN: u8 = 2;

/// Returned when the breaker rejects a call: it is OPEN and still cooling
/// down, or a half-open probe is already in flight.
///
/// Implements `HttpError`, so it renders as `503 Service Unavailable` through
/// the standard ProblemDetails pipeline.
///
/// `From<BreakerOpen>` is implemented for `ServiceUnavailable`. Note that `?`
/// applies exactly ONE `From` conversion. A user error type that only
/// implements `From<ServiceUnavailable>` therefore does not accept
/// `BreakerOpen` via `?` directly. Either implement `From<BreakerOpen>` for it,
/// or convert first: `.map_err(ServiceUnavailable::from)?`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BreakerOpen;

impl fmt::Display for BreakerOpen {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("circuit breaker open")
    }
}

// A leaf error: there is no underlying cause, so `source()` stays `None`.
impl StdError for BreakerOpen {}
impl HttpError for BreakerOpen {
    fn problem(&self) -> ProblemDetails {
        ServiceUnavailable::new("circuit breaker open").problem()
    }
}

impl From<BreakerOpen> for ServiceUnavailable {
    fn from(_: BreakerOpen) -> Self {
        ServiceUnavailable::new("circuit breaker open")
    }
}

/// A lock-free, three-state (CLOSED / OPEN / HALF_OPEN) circuit breaker.
///
/// All mutable state lives in atomics, so a breaker can be shared by
/// reference (including from a `static`) without locking.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Raw state: `CLOSED` (0), `OPEN` (1) or `HALF_OPEN` (2).
    state: AtomicU8,
    /// Failures (while CLOSED) needed to trip the breaker OPEN.
    failure_threshold: u32,
    /// Failures recorded in the current CLOSED period.
    failure_count: AtomicU32,
    /// How long the breaker stays OPEN before admitting a probe.
    cooldown_millis: u64,
    /// For metrics/log labels. Empty for anonymous (macro-default) breakers.
    name: &'static str,
    /// When the breaker last opened, in nanoseconds since a lazily-initialised,
    /// process-wide monotonic `Instant` epoch (set on the first clock read).
    /// Monotonic time keeps wall-clock jumps (NTP steps, manual changes) from
    /// corrupting the cooldown calculation. Only elapsed duration matters here.
    last_state_change_nanos: AtomicU64,
}

impl CircuitBreaker {
    /// Runtime constructor.
    ///
    /// `cooldown` is kept at millisecond granularity: sub-millisecond remainders
    /// are truncated, and a cooldown too large for `u64` milliseconds saturates
    /// to `u64::MAX` instead of silently wrapping.
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        let cooldown_millis = u64::try_from(cooldown.as_millis()).unwrap_or(u64::MAX);
        Self::const_new(failure_threshold, cooldown_millis)
    }

    /// `const`-friendly constructor used by the `#[circuit_breaker]` macro.
    ///
    /// Builds an anonymous breaker (empty `name` label) that starts CLOSED.
    pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
        Self::const_named("", failure_threshold, cooldown_millis)
    }

    /// Named variant — the name labels `circuit_breaker_transitions_total`
    /// and the state-change logs. The breaker starts CLOSED with no failures.
    pub const fn const_named(
        name: &'static str,
        failure_threshold: u32,
        cooldown_millis: u64,
    ) -> Self {
        Self {
            state: AtomicU8::new(CLOSED),
            failure_threshold,
            failure_count: AtomicU32::new(0),
            cooldown_millis,
            name,
            last_state_change_nanos: AtomicU64::new(0),
        }
    }

    /// Emits the transition counter and a `warn`-level log for a state change.
    fn note_transition(&self, to: &'static str) {
        metrics::counter!(
            "circuit_breaker_transitions_total",
            "name" => self.name, "to" => to
        )
        .increment(1);
        tracing::warn!(breaker = self.name, to, "circuit breaker state change");
    }

    /// Current raw state: `0` = CLOSED, `1` = OPEN, `2` = HALF_OPEN.
    ///
    /// A relaxed snapshot for diagnostics; it may already be stale when read.
    #[inline]
    pub fn state(&self) -> u8 {
        self.state.load(Ordering::Relaxed)
    }

    /// Nanoseconds elapsed since a lazily-initialised process-wide `Instant`.
    fn now_nanos() -> u64 {
        // Use a process-lifetime Instant as the epoch so elapsed calculations
        // are immune to NTP adjustments and leap-second corrections.
        use std::sync::OnceLock;
        use std::time::Instant;
        static EPOCH: OnceLock<Instant> = OnceLock::new();
        let epoch = EPOCH.get_or_init(Instant::now);
        // u64 nanoseconds covers ~584 years of uptime; the cast cannot truncate
        // in practice.
        epoch.elapsed().as_nanos() as u64
    }

    /// Whether at least the cooldown has passed since the breaker last opened.
    ///
    /// Call only after observing OPEN through a `SeqCst` state load. That load
    /// synchronizes with the store/CAS that published OPEN, and the timestamp
    /// store is sequenced before that. So the `Relaxed` read here sees the
    /// matching timestamp.
    fn cooldown_elapsed(&self) -> bool {
        let opened_at = self.last_state_change_nanos.load(Ordering::Relaxed);
        let elapsed = Self::now_nanos().saturating_sub(opened_at);
        // Compare in nanoseconds so no extra millisecond rounding is added.
        elapsed >= self.cooldown_millis.saturating_mul(1_000_000)
    }

    /// A non-probe call succeeded: any success breaks a run of failures.
    fn record_success(&self) {
        // Load first so the healthy path doesn't keep writing a shared line.
        if self.failure_count.load(Ordering::Relaxed) != 0 {
            self.failure_count.store(0, Ordering::SeqCst);
        }
    }

    /// A non-probe call failed: count it and trip once the threshold is hit.
    fn record_failure(&self) {
        let failures = self
            .failure_count
            .fetch_add(1, Ordering::SeqCst)
            .saturating_add(1);
        if failures >= self.failure_threshold {
            self.trip();
        }
    }

    /// CLOSED → OPEN, stamping the start of the cooldown.
    ///
    /// The timestamp is stored BEFORE the state flips, and the `SeqCst` CAS
    /// publishes it. Stamping after the CAS left a window where a concurrent
    /// caller saw OPEN with a stale (or zero) timestamp, judged the cooldown
    /// long elapsed, and probed immediately.
    fn trip(&self) {
        // Late failures from calls that started before an earlier trip must
        // not restamp an OPEN/HALF_OPEN breaker.
        if self.state.load(Ordering::SeqCst) != CLOSED {
            return;
        }
        self.last_state_change_nanos
            .store(Self::now_nanos(), Ordering::Relaxed);
        if self
            .state
            .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            self.note_transition("open");
        }
    }

    /// HALF_OPEN → CLOSED after a successful probe.
    ///
    /// The failure count is cleared BEFORE CLOSED is published. Otherwise a
    /// caller in the new CLOSED period could trip on the previous period's
    /// count, or have its fresh failure wiped.
    fn close(&self) {
        self.failure_count.store(0, Ordering::SeqCst);
        self.state.store(CLOSED, Ordering::SeqCst);
        self.note_transition("closed");
    }

    /// HALF_OPEN → OPEN with a fresh cooldown, after a failed or abandoned
    /// probe. The timestamp goes first; the `SeqCst` state store publishes it.
    fn reopen(&self) {
        self.last_state_change_nanos
            .store(Self::now_nanos(), Ordering::Relaxed);
        self.state.store(OPEN, Ordering::SeqCst);
        self.note_transition("open");
    }

    /// Execute `action` under breaker supervision.
    ///
    /// Returns `Err(BreakerOpen)` without invoking `action` when the call is
    /// rejected. Otherwise returns `Ok` wrapping the action's own result.
    ///
    /// State machine (every transition is a single atomic store or CAS — no
    /// locks):
    ///
    /// - **CLOSED**: calls flow. `failure_threshold` consecutive failures trip
    ///   it to OPEN (any success resets the count), recording the cooldown
    ///   start.
    /// - **OPEN**: rejected with `BreakerOpen` until the cooldown elapses.
    ///   Then exactly **one** caller (the `OPEN→HALF_OPEN` CAS winner)
    ///   proceeds as the probe. Everyone else keeps getting `BreakerOpen`, so
    ///   a recovering dependency is never thundering-herded.
    /// - **HALF_OPEN**: the probe's outcome decides. Success closes the
    ///   breaker (counters reset). A failure, or an abandoned probe,
    ///   **re-opens it with a fresh cooldown**. "Abandoned" means the
    ///   `execute` future was dropped before the action completed, e.g.
    ///   cancelled by a timeout. So a lost probe can never leave the breaker
    ///   stuck HALF_OPEN, rejecting every caller.
    pub async fn execute<F, Fut, T, E>(&self, action: F) -> Result<Result<T, E>, BreakerOpen>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // `Some` only for the single half-open probe call. Holding the guard
        // across the `.await` below is what makes the probe cancellation-safe.
        let probe = match self.state.load(Ordering::SeqCst) {
            OPEN => {
                if !self.cooldown_elapsed() {
                    return Err(BreakerOpen);
                }
                // Cooldown elapsed: claim the SINGLE probe slot. Losers stay
                // rejected until the probe settles the state.
                if self
                    .state
                    .compare_exchange(OPEN, HALF_OPEN, Ordering::SeqCst, Ordering::SeqCst)
                    .is_err()
                {
                    return Err(BreakerOpen);
                }
                self.note_transition("half_open");
                Some(ProbeGuard { breaker: self })
            }
            // A probe is already in flight — reject until it settles.
            HALF_OPEN => return Err(BreakerOpen),
            _ => None, // CLOSED: flow
        };

        let outcome = action().await;

        match (probe, outcome.is_ok()) {
            (Some(guard), true) => guard.succeed(),
            // Failed probe: dropping the guard re-opens with a fresh cooldown.
            (Some(guard), false) => drop(guard),
            (None, true) => self.record_success(),
            (None, false) => self.record_failure(),
        }
        Ok(outcome)
    }
}

/// Owns the single HALF_OPEN probe slot for the duration of one `execute` call.
///
/// Dropping the guard without calling [`ProbeGuard::succeed`] re-opens the
/// breaker with a fresh cooldown. That happens on a failed probe, or when the
/// probe is abandoned because the `execute` future was dropped mid-await.
struct ProbeGuard<'a> {
    breaker: &'a CircuitBreaker,
}

impl ProbeGuard<'_> {
    /// The probe succeeded: close the breaker and disarm the re-open on drop.
    fn succeed(self) {
        self.breaker.close();
        // The guard holds only a reference, so forgetting it leaks nothing.
        std::mem::forget(self);
    }
}

impl Drop for ProbeGuard<'_> {
    fn drop(&mut self) {
        self.breaker.reopen();
    }
}

// State-machine tests for `CircuitBreaker`. They use real `tokio` sleeps sized
// at 2-3x each breaker's cooldown (e.g. 30 ms vs 10 ms) to move OPEN breakers
// past their cooldown, so they depend on the scheduler not stalling for that
// long.
#[cfg(test)]
mod tests {
    use super::*;

    /// Action outcome that always fails.
    fn fail() -> Result<(), &'static str> {
        Err("boom")
    }
    /// Action outcome that always succeeds.
    fn ok() -> Result<(), &'static str> {
        Ok(())
    }

    /// Runs one `execute` call on `b` whose action immediately yields `outcome()`.
    async fn drive(
        b: &CircuitBreaker,
        outcome: fn() -> Result<(), &'static str>,
    ) -> Result<Result<(), &'static str>, BreakerOpen> {
        b.execute(|| async move { outcome() }).await
    }

    /// Reaching `failure_threshold` failures opens the breaker, and an open
    /// breaker rejects calls while its cooldown is running.
    #[tokio::test]
    async fn trips_open_after_threshold_and_rejects() {
        let b = CircuitBreaker::const_named("t", 3, 60_000);
        for _ in 0..3 {
            assert!(matches!(drive(&b, fail).await, Ok(Err(_))));
        }
        assert_eq!(b.state(), OPEN);
        // While open (long cooldown): rejected without invoking the action.
        assert!(matches!(drive(&b, ok).await, Err(BreakerOpen)));
    }

    /// After the cooldown, a successful probe returns the breaker to CLOSED.
    #[tokio::test]
    async fn successful_probe_closes_the_breaker() {
        let b = CircuitBreaker::const_named("t", 1, 10);
        assert!(matches!(drive(&b, fail).await, Ok(Err(_))));
        assert_eq!(b.state(), OPEN);

        tokio::time::sleep(std::time::Duration::from_millis(30)).await;
        // Cooldown elapsed: the probe flows and its success closes.
        assert!(matches!(drive(&b, ok).await, Ok(Ok(()))));
        assert_eq!(b.state(), CLOSED);
        // Closed again: traffic flows normally.
        assert!(matches!(drive(&b, ok).await, Ok(Ok(()))));
    }

    /// A failed probe re-opens the breaker and restarts the cooldown, after
    /// which a later probe can still close it.
    #[tokio::test]
    async fn failed_probe_reopens_with_fresh_cooldown() {
        let b = CircuitBreaker::const_named("t", 1, 20);
        assert!(matches!(drive(&b, fail).await, Ok(Err(_))));
        assert_eq!(b.state(), OPEN);

        tokio::time::sleep(std::time::Duration::from_millis(40)).await;
        // The probe fails → MUST re-open (the old state machine left the
        // breaker stuck half-open here, passing full traffic forever).
        assert!(matches!(drive(&b, fail).await, Ok(Err(_))));
        assert_eq!(b.state(), OPEN, "failed probe must re-open the breaker");
        // Fresh cooldown: immediately rejected again.
        assert!(matches!(drive(&b, ok).await, Err(BreakerOpen)));

        // And it can still recover after the new cooldown.
        tokio::time::sleep(std::time::Duration::from_millis(40)).await;
        assert!(matches!(drive(&b, ok).await, Ok(Ok(()))));
        assert_eq!(b.state(), CLOSED);
    }

    /// While a probe is in flight (HALF_OPEN), any other caller is rejected.
    #[tokio::test]
    async fn half_open_admits_exactly_one_probe() {
        // Leaked to get a `&'static` breaker the spawned probe task can borrow.
        let b: &'static CircuitBreaker =
            Box::leak(Box::new(CircuitBreaker::const_named("t", 1, 10)));
        assert!(matches!(drive(b, fail).await, Ok(Err(_))));
        tokio::time::sleep(std::time::Duration::from_millis(30)).await;

        // The probe holds the slot while in flight; a concurrent caller must
        // be rejected rather than stampeding the recovering dependency.
        let gate = std::sync::Arc::new(tokio::sync::Notify::new());
        let release = gate.clone();
        let probe = tokio::spawn(async move {
            b.execute(|| {
                let release = release.clone();
                async move {
                    release.notify_one(); // probe is now in flight
                    // Keep the probe in flight long enough for the
                    // concurrent caller below to hit HALF_OPEN.
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                    Ok::<(), &'static str>(())
                }
            })
            .await
        });

        // Wait until the probe's action has started (`notify_one` leaves a
        // permit if this wait hasn't begun yet).
        gate.notified().await;
        assert_eq!(b.state(), HALF_OPEN);
        assert!(
            matches!(drive(b, ok).await, Err(BreakerOpen)),
            "second caller during the probe must be rejected"
        );

        assert!(matches!(probe.await.expect("join"), Ok(Ok(()))));
        assert_eq!(b.state(), CLOSED);
    }
}