pub use arcly_http_macros::circuit_breaker;
pub mod bulkhead;
pub mod distributed_rate_limit;
pub mod dlock;
pub mod rate_limit;
pub mod timeout;
pub use bulkhead::Bulkhead;
pub use distributed_rate_limit::{
DistributedRateLimit, FailurePolicy, RateDecision, RateLimitBackend,
};
pub use dlock::{DLockBackend, DistributedLock, LockGuard};
pub use rate_limit::RateLimit;
use std::error::Error as StdError;
use std::fmt;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::time::Duration;
use crate::web::error::{HttpError, ProblemDetails, ServiceUnavailable};
/// Breaker state: calls are passed straight through to the wrapped action.
const CLOSED: u8 = 0;
/// Breaker state: calls are rejected with `BreakerOpen` until the cooldown elapses.
const OPEN: u8 = 1;
/// Breaker state: the cooldown has elapsed and calls are let through again;
/// a success observed in this state closes the breaker.
const HALF_OPEN: u8 = 2;
/// Error returned by `CircuitBreaker::execute` when the breaker is open and
/// the wrapped action was therefore not invoked.
///
/// Derives `PartialEq`/`Eq` so that results carrying it can be compared
/// directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BreakerOpen;
/// Human-readable rendering of the rejection; always the fixed text
/// `circuit breaker open`.
impl fmt::Display for BreakerOpen {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A fixed message needs no formatting machinery; emit it verbatim.
        f.write_str("circuit breaker open")
    }
}
// Marker impl: `BreakerOpen` is usable as a standard error and relies entirely
// on the trait's default methods.
impl StdError for BreakerOpen {}
/// Maps a breaker rejection onto the problem document produced by
/// `ServiceUnavailable` carrying the message `circuit breaker open`.
impl HttpError for BreakerOpen {
    fn problem(&self) -> ProblemDetails {
        let unavailable = ServiceUnavailable::new("circuit breaker open");
        unavailable.problem()
    }
}
impl From<BreakerOpen> for ServiceUnavailable {
fn from(_: BreakerOpen) -> Self {
ServiceUnavailable::new("circuit breaker open")
}
}
/// Lock-free circuit breaker whose whole state lives in atomics, so it can be
/// shared by reference and built in a `const`/`static` context via `const_new`.
pub struct CircuitBreaker {
    /// Current state; holds one of `CLOSED`, `OPEN` or `HALF_OPEN`.
    state: AtomicU8,
    /// Failure count at (or above) which a closed breaker is opened.
    failure_threshold: u32,
    /// Failures recorded so far; reset to zero when a half-open breaker closes.
    failure_count: AtomicU32,
    /// How long, in milliseconds, an open breaker rejects calls before
    /// letting them through again.
    cooldown_millis: u64,
    /// When the breaker was opened, in nanoseconds since a process-local
    /// epoch (see `now_nanos`); zero until the breaker first opens.
    last_state_change_nanos: AtomicU64,
}
impl CircuitBreaker {
    /// Creates a closed breaker that opens once `failure_threshold` failures
    /// have been recorded and stays open for `cooldown`.
    ///
    /// The cooldown is stored with millisecond resolution; durations too
    /// large for a `u64` millisecond count saturate at `u64::MAX` instead of
    /// silently wrapping.
    pub fn new(failure_threshold: u32, cooldown: Duration) -> Self {
        // `as_millis` yields a `u128`; clamp first so the cast cannot truncate.
        let cooldown_millis = cooldown.as_millis().min(u128::from(u64::MAX)) as u64;
        Self::const_new(failure_threshold, cooldown_millis)
    }

    /// `const` constructor taking the cooldown directly in milliseconds.
    ///
    /// The breaker starts closed with a zero failure count.
    pub const fn const_new(failure_threshold: u32, cooldown_millis: u64) -> Self {
        Self {
            state: AtomicU8::new(CLOSED),
            failure_threshold,
            failure_count: AtomicU32::new(0),
            cooldown_millis,
            last_state_change_nanos: AtomicU64::new(0),
        }
    }

    /// Returns the raw state value: `0` = closed, `1` = open, `2` = half-open.
    #[inline]
    pub fn state(&self) -> u8 {
        self.state.load(Ordering::Relaxed)
    }

    /// Nanoseconds elapsed since a process-wide epoch that is fixed on the
    /// first call. Based on `Instant`, so it never goes backwards.
    fn now_nanos() -> u64 {
        use std::sync::OnceLock;
        use std::time::Instant;
        static EPOCH: OnceLock<Instant> = OnceLock::new();
        let epoch = EPOCH.get_or_init(Instant::now);
        epoch.elapsed().as_nanos() as u64
    }

    /// Runs `action` under the breaker.
    ///
    /// * Closed: the action runs; a failure bumps the failure count and opens
    ///   the breaker once the count reaches the threshold.
    /// * Open: the call is rejected without running the action until the
    ///   cooldown has elapsed, after which the breaker becomes half-open.
    /// * Half-open: the action runs as a probe; a success closes the breaker
    ///   and resets the failure count, a failure re-opens it and restarts the
    ///   cooldown.
    ///
    /// The outer `Result` reports the breaker decision, the inner one is the
    /// action's own outcome, returned untouched.
    ///
    /// # Errors
    ///
    /// Returns `Err(BreakerOpen)` when the breaker is open and the cooldown
    /// has not yet elapsed.
    pub async fn execute<F, Fut, T, E>(&self, mut action: F) -> Result<Result<T, E>, BreakerOpen>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        if self.state.load(Ordering::Relaxed) == OPEN {
            let opened_at = self.last_state_change_nanos.load(Ordering::Relaxed);
            let elapsed_ms = Self::now_nanos().saturating_sub(opened_at) / 1_000_000;
            // `<` (not `<=`): the cooldown is over as soon as the full period
            // has elapsed, and a zero cooldown allows an immediate probe.
            if elapsed_ms < self.cooldown_millis {
                return Err(BreakerOpen);
            }
            // Losing this race is fine: another caller already moved the
            // breaker out of the open state.
            let _ = self.state.compare_exchange(
                OPEN,
                HALF_OPEN,
                Ordering::SeqCst,
                Ordering::Relaxed,
            );
        }

        match action().await {
            Ok(value) => {
                // Single atomic transition so that a concurrent re-open is
                // never overwritten by a stale "still half-open" observation.
                if self
                    .state
                    .compare_exchange(HALF_OPEN, CLOSED, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    self.failure_count.store(0, Ordering::SeqCst);
                }
                // NOTE(review): successes while closed do not reset the
                // failure count, so failures accumulate across successes.
                // Left as-is; confirm whether consecutive-failure semantics
                // are wanted.
                Ok(Ok(value))
            }
            Err(err) => {
                // Saturate so a counter at `u32::MAX` cannot overflow-panic.
                let failures = self
                    .failure_count
                    .fetch_add(1, Ordering::SeqCst)
                    .saturating_add(1);
                let tripped = failures >= self.failure_threshold
                    && self
                        .state
                        .compare_exchange(CLOSED, OPEN, Ordering::SeqCst, Ordering::Relaxed)
                        .is_ok();
                // A failed probe must put the breaker back into the open
                // state; otherwise it would stay half-open and keep passing
                // every call through to a dependency that is still failing.
                let reopened = !tripped
                    && self
                        .state
                        .compare_exchange(HALF_OPEN, OPEN, Ordering::SeqCst, Ordering::Relaxed)
                        .is_ok();
                if tripped || reopened {
                    self.last_state_change_nanos
                        .store(Self::now_nanos(), Ordering::Relaxed);
                }
                Ok(Err(err))
            }
        }
    }
}