arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Lock-free circuit breaker + the `BreakerOpen` sentinel surfaced when the
//! breaker is OPEN.
//!
//! The breaker is `static`-constructible: `CircuitBreaker::const_new` builds
//! one in const context so the `#[circuit_breaker(...)]` attribute macro can
//! place a per-method breaker behind a `static`.

pub use arcly_http_macros::circuit_breaker;

pub mod bulkhead;
pub mod distributed_rate_limit;
pub mod dlock;
pub mod rate_limit;
pub mod timeout;
pub use bulkhead::Bulkhead;
pub use distributed_rate_limit::{
    DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
};
pub use dlock::{DLockBackend, DistributedLock, LockGuard};
pub use rate_limit::RateLimit;

use std::error::Error as StdError;
use std::fmt;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::time::Duration;

use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};

// Raw breaker states as stored in `CircuitBreaker::state` and returned by
// `CircuitBreaker::state()`.

/// Normal operation: calls are passed through. This is the initial state.
const CLOSED: u8 = 0;
/// Calls are rejected with `BreakerOpen` while the cooldown is running.
const OPEN: u8 = 1;
/// Entered from OPEN once the cooldown has passed; a successful call in this
/// state closes the breaker again.
const HALF_OPEN: u8 = 2;

/// Sentinel error produced when an OPEN breaker refuses to run a call.
///
/// `BreakerOpen` implements `HttpError`, so a handler may propagate it with
/// `?`; its problem document is the one `ServiceUnavailable` produces, i.e.
/// the caller is answered with `503 Service Unavailable` through the regular
/// ProblemDetails pipeline. A `From<BreakerOpen>` conversion into
/// `ServiceUnavailable` is provided as well, for code whose error handling is
/// built around `ServiceUnavailable`.
#[derive(Debug, Clone)]
pub struct BreakerOpen;

impl fmt::Display for BreakerOpen {
    /// Writes the fixed, human-readable rejection message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("circuit breaker open")
    }
}

// No underlying cause: the default `source()` (`None`) is what we want.
impl StdError for BreakerOpen {}
impl HttpError for BreakerOpen {
    /// Renders the rejection as the problem document of a
    /// `ServiceUnavailable` carrying the breaker's fixed message.
    fn problem(&self) -> ProblemDetails {
        let unavailable = ServiceUnavailable::new("circuit breaker open");
        unavailable.problem()
    }
}

impl From<BreakerOpen> for ServiceUnavailable {
    fn from(_: BreakerOpen) -> Self {
        ServiceUnavailable::new("circuit breaker open")
    }
}

/// Circuit breaker whose entire state lives in atomics, so it can be shared
/// by reference (including from a `static`) without a lock.
pub struct CircuitBreaker {
    /// Current state: one of `CLOSED`, `OPEN` or `HALF_OPEN`.
    state: AtomicU8,
    /// Failure count at which a CLOSED breaker is switched to OPEN.
    failure_threshold: u32,
    /// Failures recorded by `execute`; reset to zero when a success in
    /// HALF_OPEN closes the breaker.
    failure_count: AtomicU32,
    /// How long, in milliseconds, an OPEN breaker rejects calls.
    cooldown_millis: u64,
    /// Monotonic timestamp in nanoseconds, stored atomically and written when
    /// the breaker switches to OPEN (it is zero until the first time that
    /// happens). The value is measured from a process-wide `Instant` that is
    /// captured lazily on the first clock read, not from wall-clock time.
    /// Using monotonic time prevents NTP / DST clock-jump from corrupting the
    /// cooldown calculation — the only property we need is "elapsed duration",
    /// not wall-clock time.
    last_state_change_nanos: AtomicU64,
}

impl CircuitBreaker {
    /// Runtime constructor.
    ///
    /// `cooldown` is kept with millisecond resolution. A duration whose
    /// millisecond count does not fit in a `u64` saturates to `u64::MAX`
    /// rather than silently wrapping around to a much shorter cooldown.
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        // The value is clamped to `u64::MAX` first, so the cast cannot truncate.
        let cooldown_millis = cooldown.as_millis().min(u128::from(u64::MAX)) as u64;
        Self::const_new(failure_threshold, cooldown_millis)
    }

    /// `const`-friendly constructor used by the `#[circuit_breaker]` macro.
    ///
    /// The breaker starts CLOSED with a failure count of zero.
    pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
        Self {
            state: AtomicU8::new(CLOSED),
            failure_threshold,
            failure_count: AtomicU32::new(0),
            cooldown_millis,
            last_state_change_nanos: AtomicU64::new(0),
        }
    }

    /// Returns the raw state value: `0` = closed, `1` = open, `2` = half-open.
    #[inline]
    pub fn state(&self) -> u8 {
        self.state.load(Ordering::Relaxed)
    }

    /// Nanoseconds elapsed on a monotonic clock since the first call to this
    /// function anywhere in the process.
    fn now_nanos() -> u64 {
        // Use a process-lifetime Instant as the epoch so elapsed calculations
        // are immune to NTP adjustments and leap-second corrections.
        use std::sync::OnceLock;
        use std::time::Instant;
        static EPOCH: OnceLock<Instant> = OnceLock::new();
        let epoch = EPOCH.get_or_init(Instant::now);
        // A u64 of nanoseconds covers roughly 584 years of uptime, so the
        // narrowing cast is not reachable in practice.
        epoch.elapsed().as_nanos() as u64
    }

    /// Atomically moves the breaker from `from` to `to`; returns `true` only
    /// if this caller performed the transition.
    fn transition(&self, from: u8, to: u8) -> bool {
        self.state
            .compare_exchange(from, to, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Execute `action` under breaker supervision.
    ///
    /// * CLOSED — the action runs; each failure is counted, and once the
    ///   count reaches `failure_threshold` the breaker switches to OPEN.
    /// * OPEN — the call is rejected until `cooldown_millis` have elapsed
    ///   since the breaker opened; the first call at or after that point
    ///   moves it to HALF_OPEN and runs as a probe.
    /// * HALF_OPEN — a success closes the breaker and resets the failure
    ///   count; a failure re-opens it and restarts the cooldown.
    ///
    /// The outer `Result` is the breaker's verdict, the inner one is whatever
    /// `action` returned.
    ///
    /// NOTE(review): successes while CLOSED do not reset the failure count,
    /// so the threshold applies to failures accumulated since the breaker
    /// last closed, not to consecutive failures — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Returns `Err(BreakerOpen)` without invoking `action` when the breaker
    /// is OPEN and the cooldown has not elapsed yet.
    pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        if self.state.load(Ordering::Relaxed) == OPEN {
            let opened_at = self.last_state_change_nanos.load(Ordering::Relaxed);
            let elapsed_ms = Self::now_nanos().saturating_sub(opened_at) / 1_000_000;
            if elapsed_ms < self.cooldown_millis {
                return Err(BreakerOpen);
            }
            // Cooldown is over. Losing this race is fine: it means another
            // caller already moved the breaker on, and this call proceeds.
            let _ = self.transition(OPEN, HALF_OPEN);
        }

        match action().await {
            Ok(v) => {
                // Close via CAS so a concurrent re-open is never overwritten.
                if self.transition(HALF_OPEN, CLOSED) {
                    self.failure_count.store(0, Ordering::SeqCst);
                }
                Ok(Ok(v))
            }
            Err(e) => {
                let failures = self
                    .failure_count
                    .fetch_add(1, Ordering::SeqCst)
                    .saturating_add(1);
                // A failed probe must re-open the breaker; without this the
                // breaker would stay HALF_OPEN and keep passing every call
                // to a dependency that is still failing.
                let probe_failed = self.transition(HALF_OPEN, OPEN);
                let tripped = probe_failed
                    || (failures >= self.failure_threshold && self.transition(CLOSED, OPEN));
                if tripped {
                    // Written after the state flips, so a racing caller may
                    // very briefly observe the previous timestamp.
                    self.last_state_change_nanos
                        .store(Self::now_nanos(), Ordering::Relaxed);
                }
                Ok(Err(e))
            }
        }
    }
}