use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
pub(in crate::stdlib) struct CircuitBreakerState {
pub(in crate::stdlib) failure_threshold: u32,
pub(in crate::stdlib) reset_after_ms: i64,
pub(in crate::stdlib) failures: AtomicU32,
pub(in crate::stdlib) opened_at_ms: AtomicI64,
pub(in crate::stdlib) half_open_in_flight: Mutex<bool>,
}
impl CircuitBreakerState {
pub(in crate::stdlib) fn new(failure_threshold: u32, reset_after_ms: i64) -> Self {
Self {
failure_threshold,
reset_after_ms,
failures: AtomicU32::new(0),
opened_at_ms: AtomicI64::new(0),
half_open_in_flight: Mutex::new(false),
}
}
pub(in crate::stdlib) fn disabled() -> Self {
Self::new(0, 0)
}
pub(in crate::stdlib) fn is_enabled(&self) -> bool {
self.failure_threshold > 0
}
pub(in crate::stdlib) fn admit(&self) -> Allow {
if !self.is_enabled() {
return Allow::Closed;
}
let opened_at = self.opened_at_ms.load(Ordering::Acquire);
if opened_at == 0 {
return Allow::Closed;
}
let now = now_ms();
if now.saturating_sub(opened_at) < self.reset_after_ms {
return Allow::Open;
}
let mut guard = match self.half_open_in_flight.try_lock() {
Ok(g) => g,
Err(_) => return Allow::Open,
};
if *guard {
return Allow::Open;
}
*guard = true;
Allow::Probe
}
pub(in crate::stdlib) fn record_success(&self, was_probe: bool) {
if !self.is_enabled() {
return;
}
self.failures.store(0, Ordering::Release);
self.opened_at_ms.store(0, Ordering::Release);
if was_probe {
if let Ok(mut g) = self.half_open_in_flight.lock() {
*g = false;
}
}
}
pub(in crate::stdlib) fn record_failure(&self, was_probe: bool) {
if !self.is_enabled() {
return;
}
if was_probe {
self.opened_at_ms.store(now_ms(), Ordering::Release);
if let Ok(mut g) = self.half_open_in_flight.lock() {
*g = false;
}
return;
}
let prev = self.failures.fetch_add(1, Ordering::AcqRel);
if prev + 1 >= self.failure_threshold {
self.opened_at_ms.store(now_ms(), Ordering::Release);
}
}
pub(in crate::stdlib) fn snapshot(&self) -> CircuitSnapshot {
if !self.is_enabled() {
return CircuitSnapshot {
state: "disabled",
failures: 0,
opened_at_ms: None,
};
}
let opened_at = self.opened_at_ms.load(Ordering::Acquire);
let state = if opened_at == 0 {
"closed"
} else if now_ms().saturating_sub(opened_at) >= self.reset_after_ms {
"half_open"
} else {
"open"
};
CircuitSnapshot {
state,
failures: self.failures.load(Ordering::Acquire),
opened_at_ms: if opened_at == 0 {
None
} else {
Some(opened_at)
},
}
}
}
pub(in crate::stdlib) enum Allow {
Closed,
Probe,
Open,
}
pub(in crate::stdlib) struct CircuitSnapshot {
pub(in crate::stdlib) state: &'static str,
pub(in crate::stdlib) failures: u32,
pub(in crate::stdlib) opened_at_ms: Option<i64>,
}
fn now_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn disabled_breaker_always_allows() {
let cb = CircuitBreakerState::disabled();
assert!(matches!(cb.admit(), Allow::Closed));
cb.record_failure(false);
cb.record_failure(false);
cb.record_failure(false);
assert!(matches!(cb.admit(), Allow::Closed));
}
#[test]
fn opens_after_threshold_then_probes() {
let cb = CircuitBreakerState::new(3, 50);
cb.record_failure(false);
cb.record_failure(false);
assert!(matches!(cb.admit(), Allow::Closed));
cb.record_failure(false);
assert!(matches!(cb.admit(), Allow::Open));
std::thread::sleep(std::time::Duration::from_millis(60));
let first = cb.admit();
assert!(matches!(first, Allow::Probe));
assert!(matches!(cb.admit(), Allow::Open));
cb.record_success(true);
assert!(matches!(cb.admit(), Allow::Closed));
}
#[test]
fn failed_probe_restarts_cooldown() {
let cb = CircuitBreakerState::new(1, 30);
cb.record_failure(false);
assert!(matches!(cb.admit(), Allow::Open));
std::thread::sleep(std::time::Duration::from_millis(40));
let probe = cb.admit();
assert!(matches!(probe, Allow::Probe));
cb.record_failure(true);
assert!(matches!(cb.admit(), Allow::Open));
}
#[test]
fn snapshot_reports_state_transitions() {
let cb = CircuitBreakerState::new(2, 20);
let s = cb.snapshot();
assert_eq!(s.state, "closed");
cb.record_failure(false);
cb.record_failure(false);
let s = cb.snapshot();
assert_eq!(s.state, "open");
assert_eq!(s.failures, 2);
std::thread::sleep(std::time::Duration::from_millis(30));
let s = cb.snapshot();
assert_eq!(s.state, "half_open");
}
}