pub use arcly_http_macros::circuit_breaker;
pub mod bulkhead;
pub mod distributed_rate_limit;
pub mod dlock;
pub mod rate_limit;
pub mod timeout;
pub use bulkhead::Bulkhead;
pub use distributed_rate_limit::{
DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
};
pub use dlock::{DLockBackend, DistributedLock, LockGuard};
pub use rate_limit::RateLimit;
use std::error::Error as StdError;
use std::fmt;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::time::Duration;
use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
// Encodings of the breaker state kept in `CircuitBreaker::state` (an `AtomicU8`)
// and returned as a raw `u8` by `CircuitBreaker::state()`.

/// Calls are passed through to the action.
const CLOSED: u8 = 0;
/// Calls are rejected with `BreakerOpen` until the cooldown has elapsed.
const OPEN: u8 = 1;
/// One probe call is in flight; every other caller is rejected with `BreakerOpen`.
const HALF_OPEN: u8 = 2;
/// Error returned by `CircuitBreaker::execute` when a call is rejected
/// without the wrapped action being run.
#[derive(Debug, Clone)]
pub struct BreakerOpen;

impl fmt::Display for BreakerOpen {
    /// Writes the fixed, human-readable rejection message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("circuit breaker open")
    }
}

// No underlying cause to expose, so the default `source()` is sufficient.
impl StdError for BreakerOpen {}
impl HttpError for BreakerOpen {
    /// Reports the rejection using the problem details of a
    /// `ServiceUnavailable` carrying the message "circuit breaker open".
    fn problem(&self) -> ProblemDetails {
        let unavailable = ServiceUnavailable::new("circuit breaker open");
        unavailable.problem()
    }
}
impl From<BreakerOpen> for ServiceUnavailable {
fn from(_: BreakerOpen) -> Self {
ServiceUnavailable::new("circuit breaker open")
}
}
/// A circuit breaker whose state is held entirely in atomics, so it can be
/// shared by reference (including from a `static`, via the `const`
/// constructors) and driven through `&self`.
///
/// While closed, calls run the wrapped action. Once enough failures have been
/// counted the breaker opens and rejects calls with [`BreakerOpen`]. After the
/// cooldown a single probe call is admitted (half-open); its outcome decides
/// whether the breaker closes again or re-opens.
pub struct CircuitBreaker {
    // Current state: one of `CLOSED`, `OPEN`, `HALF_OPEN`.
    state: AtomicU8,
    // Failure count at which a closed breaker trips to `OPEN`.
    failure_threshold: u32,
    // Failures counted for non-probe calls; reset to 0 only when a probe succeeds.
    failure_count: AtomicU32,
    // How long, in milliseconds, an open breaker rejects calls before a probe is admitted.
    cooldown_millis: u64,
    // Label attached to the transitions metric and to the state-change log line.
    name: &'static str,
    // When the breaker last entered `OPEN`, in nanoseconds since the process-local
    // epoch used by `now_nanos`. Transitions to half-open or closed do not update it.
    last_state_change_nanos: AtomicU64,
}
impl CircuitBreaker {
pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
Self::const_new(failure_threshold, cooldown.as_millis() as u64)
}
pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
Self::const_named("", failure_threshold, cooldown_millis)
}
pub const fn const_named(
name: &'static str,
failure_threshold: u32,
cooldown_millis: u64,
) -> Self {
Self {
state: AtomicU8::new(CLOSED),
failure_threshold,
failure_count: AtomicU32::new(0),
cooldown_millis,
name,
last_state_change_nanos: AtomicU64::new(0),
}
}
fn note_transition(&self, to: &'static str) {
metrics::counter!(
"circuit_breaker_transitions_total",
"name" => self.name, "to" => to
)
.increment(1);
tracing::warn!(breaker = self.name, to, "circuit breaker state change");
}
#[inline]
pub fn state(&self) -> u8 {
self.state.load(Ordering::Relaxed)
}
fn now_nanos() -> u64 {
use std::sync::OnceLock;
use std::time::Instant;
static EPOCH: OnceLock<Instant> = OnceLock::new();
let epoch = EPOCH.get_or_init(Instant::now);
epoch.elapsed().as_nanos() as u64
}
pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let mut probing = false;
match self.state.load(Ordering::SeqCst) {
OPEN => {
let elapsed_ms = (Self::now_nanos()
.saturating_sub(self.last_state_change_nanos.load(Ordering::Relaxed)))
/ 1_000_000;
if elapsed_ms <= self.cooldown_millis {
return Err(BreakerOpen);
}
if self
.state
.compare_exchange(OPEN, HALF_OPEN, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return Err(BreakerOpen);
}
self.note_transition("half_open");
probing = true;
}
HALF_OPEN => return Err(BreakerOpen),
_ => {} }
match action().await {
Ok(v) => {
if probing {
self.state.store(CLOSED, Ordering::SeqCst);
self.failure_count.store(0, Ordering::SeqCst);
self.note_transition("closed");
}
Ok(Ok(v))
}
Err(e) => {
if probing {
self.last_state_change_nanos
.store(Self::now_nanos(), Ordering::Relaxed);
self.state.store(OPEN, Ordering::SeqCst);
self.note_transition("open");
return Ok(Err(e));
}
let count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
if count >= self.failure_threshold
&& self
.state
.compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
self.last_state_change_nanos
.store(Self::now_nanos(), Ordering::Relaxed);
self.note_transition("open");
}
Ok(Err(e))
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Result type produced by the actions used in these tests.
    type Outcome = Result<(), &'static str>;

    /// Action that always fails.
    fn fail() -> Outcome {
        Err("boom")
    }

    /// Action that always succeeds.
    fn ok() -> Outcome {
        Ok(())
    }

    /// Runs a single call through the breaker with the given canned outcome.
    async fn drive(b: &CircuitBreaker, outcome: fn() -> Outcome) -> Result<Outcome, BreakerOpen> {
        b.execute(|| async move { outcome() }).await
    }

    /// Sleeps for `ms` milliseconds on the tokio timer.
    async fn pause(ms: u64) {
        tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn trips_open_after_threshold_and_rejects() {
        let breaker = CircuitBreaker::const_named("t", 3, 60_000);
        for _ in 0..3 {
            let attempt = drive(&breaker, fail).await;
            assert!(matches!(attempt, Ok(Err(_))));
        }
        assert_eq!(breaker.state(), OPEN);
        // Cooldown is a full minute, so even a healthy call is turned away.
        let rejected = drive(&breaker, ok).await;
        assert!(matches!(rejected, Err(BreakerOpen)));
    }

    #[tokio::test]
    async fn successful_probe_closes_the_breaker() {
        let breaker = CircuitBreaker::const_named("t", 1, 10);
        let tripping = drive(&breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause(30).await;
        let probe = drive(&breaker, ok).await;
        assert!(matches!(probe, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);

        let follow_up = drive(&breaker, ok).await;
        assert!(matches!(follow_up, Ok(Ok(()))));
    }

    #[tokio::test]
    async fn failed_probe_reopens_with_fresh_cooldown() {
        let breaker = CircuitBreaker::const_named("t", 1, 20);
        let tripping = drive(&breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN);

        pause(40).await;
        let failed_probe = drive(&breaker, fail).await;
        assert!(matches!(failed_probe, Ok(Err(_))));
        assert_eq!(breaker.state(), OPEN, "failed probe must re-open the breaker");

        // The cooldown restarted, so an immediate retry is rejected.
        let too_soon = drive(&breaker, ok).await;
        assert!(matches!(too_soon, Err(BreakerOpen)));

        pause(40).await;
        let second_probe = drive(&breaker, ok).await;
        assert!(matches!(second_probe, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }

    #[tokio::test]
    async fn half_open_admits_exactly_one_probe() {
        // Leaked so the spawned task can borrow the breaker for 'static.
        let breaker: &'static CircuitBreaker =
            Box::leak(Box::new(CircuitBreaker::const_named("t", 1, 10)));
        let tripping = drive(breaker, fail).await;
        assert!(matches!(tripping, Ok(Err(_))));
        pause(30).await;

        let gate = std::sync::Arc::new(tokio::sync::Notify::new());
        let release = gate.clone();
        let probe = tokio::spawn(async move {
            breaker
                .execute(|| {
                    let release = release.clone();
                    async move {
                        // Signal that the probe is running, then stay in flight.
                        release.notify_one();
                        pause(50).await;
                        Ok::<(), &'static str>(())
                    }
                })
                .await
        });

        gate.notified().await;
        assert_eq!(breaker.state(), HALF_OPEN);
        assert!(
            matches!(drive(breaker, ok).await, Err(BreakerOpen)),
            "second caller during the probe must be rejected"
        );

        let probe_result = probe.await.expect("join");
        assert!(matches!(probe_result, Ok(Ok(()))));
        assert_eq!(breaker.state(), CLOSED);
    }
}