use core::time::Duration;
use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard, PoisonError};
use clock_lib::{Clock, Monotonic, SystemClock};
use crate::decision::Decision;
use crate::error::ThrottleError;
use crate::limiter::Limiter;
/// Policy that decides when a closed breaker trips open.
///
/// The policy is evaluated after each failure recorded while the breaker is closed.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Trip {
/// Trip once the number of consecutive failures reaches the given value.
/// A success recorded while closed resets the count to zero.
Consecutive(u32),
/// Trip on the failure ratio over the most recently recorded outcomes.
Ratio {
/// Maximum number of most-recent outcomes retained for the ratio.
window: u32,
/// Failure fraction (failures / retained outcomes) at or above which the breaker trips.
ratio: f64,
/// Minimum number of retained outcomes required before the ratio is evaluated.
min_calls: u32,
},
/// Trip when enough failures fall inside a trailing time window.
Windowed {
/// Number of failures inside `period` at or above which the breaker trips.
failures: u32,
/// Length of the trailing window; applied at millisecond resolution.
period: Duration,
},
}
/// Position of the breaker in its state machine.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
/// Requests are admitted; recorded outcomes are checked against the trip policy.
Closed,
/// Requests are rejected until the cooldown deadline; the first admission attempt at or
/// after the deadline moves the breaker to `HalfOpen`.
Open,
/// A limited number of trial requests may be in flight; enough successes close the
/// breaker and any failure re-opens it.
HalfOpen,
}
/// Mutable breaker state; the breaker keeps it behind a mutex.
struct Shared {
    /// Current position in the state machine.
    state: BreakerState,
    /// Failures recorded since the last success while closed.
    consecutive: u32,
    /// Recent outcomes kept for `Trip::Ratio`; `true` marks a failure.
    outcomes: VecDeque<bool>,
    /// Failure timestamps in milliseconds kept for `Trip::Windowed`.
    failure_times: VecDeque<u64>,
    /// Half-open trials admitted but not yet settled.
    half_open_inflight: u32,
    /// Successful trials seen in the current half-open phase.
    half_open_successes: u32,
    /// Millisecond timestamp at which the open state stops rejecting.
    open_until_ms: u64,
}

impl Shared {
    /// Returns the initial state: closed, with no history and all counters at zero.
    fn new() -> Self {
        let outcomes = VecDeque::new();
        let failure_times = VecDeque::new();
        Self {
            open_until_ms: 0,
            half_open_successes: 0,
            half_open_inflight: 0,
            failure_times,
            outcomes,
            consecutive: 0,
            state: BreakerState::Closed,
        }
    }

    /// Clears every counter and history buffer, leaving `state` and `open_until_ms` untouched.
    fn reset_counters(&mut self) {
        // Exhaustive destructuring: adding a field forces a decision here.
        let Self {
            state: _,
            open_until_ms: _,
            consecutive,
            outcomes,
            failure_times,
            half_open_inflight,
            half_open_successes,
        } = self;
        outcomes.clear();
        failure_times.clear();
        *consecutive = 0;
        *half_open_inflight = 0;
        *half_open_successes = 0;
    }
}
/// Outcome of the breaker's admission check, made before the inner limiter is consulted.
enum Admit {
/// The request may proceed to the inner limiter.
Allow,
/// The request is shed; the duration is reported to the caller as `retry_after`
/// (zero when the half-open trial slots are all taken).
Reject(Duration),
}
/// Circuit breaker layered in front of a limiter `L`, timed by a clock `C`.
///
/// Admission is decided by the breaker first; only admitted requests reach the limiter.
pub struct CircuitBreaker<L, C = SystemClock>
where
C: Clock,
{
// Wrapped limiter, consulted only after the breaker admits a request.
inner: L,
// Trip policy, cooldown and half-open settings fixed at build time.
config: Config,
// State machine and counters; every access goes through this mutex.
shared: Mutex<Shared>,
// Time source for cooldowns and windowed failure tracking.
clock: C,
// Clock reading taken at construction; timestamps are milliseconds elapsed since it.
epoch: Monotonic,
}
/// Immutable breaker settings produced by the builder.
#[derive(Debug, Clone, Copy)]
struct Config {
// Policy that moves the breaker from closed to open.
trip: Trip,
// How long the breaker stays open before a trial may be admitted.
cooldown: Duration,
// Maximum number of half-open trials in flight at once.
half_open_trials: u32,
// Successful half-open trials needed to close the breaker.
half_open_required: u32,
}
impl CircuitBreaker<core::convert::Infallible> {
    /// Starts configuring a breaker from the builder's default settings.
    ///
    /// The `Infallible` type parameter is only a placeholder so that
    /// `CircuitBreaker::builder()` can be called without naming a limiter type.
    #[must_use]
    pub fn builder() -> CircuitBreakerBuilder {
        // `Default` for the builder delegates to `CircuitBreakerBuilder::new`.
        CircuitBreakerBuilder::default()
    }
}
impl<L, C> CircuitBreaker<L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Builds a closed breaker around `inner`.
    ///
    /// The clock is read once here; that reading is the epoch from which every
    /// millisecond timestamp used by the breaker is measured.
    fn new(inner: L, config: Config, clock: C) -> Self {
        let epoch = clock.now();
        Self {
            inner,
            config,
            shared: Mutex::new(Shared::new()),
            clock,
            epoch,
        }
    }

    /// Replaces the time source.
    ///
    /// The breaker is rebuilt: the limiter and configuration are kept, but the state
    /// machine starts over (closed, empty history) and the epoch is re-read from `clock`.
    #[must_use]
    pub fn with_clock<C2>(self, clock: C2) -> CircuitBreaker<L, C2>
    where
        C2: Clock + Clone,
    {
        CircuitBreaker::new(self.inner, self.config, clock)
    }

    /// Returns the current state.
    ///
    /// This is a plain read: an open breaker whose cooldown has elapsed still reports
    /// `Open` until an admission attempt moves it to `HalfOpen`.
    #[must_use]
    pub fn state(&self) -> BreakerState {
        self.lock().state
    }

    /// Borrows the wrapped limiter.
    pub fn inner(&self) -> &L {
        &self.inner
    }

    /// Locks the shared state, recovering the guard if the mutex was poisoned.
    #[inline]
    fn lock(&self) -> MutexGuard<'_, Shared> {
        self.shared.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Milliseconds elapsed since the epoch, saturating at `u64::MAX`.
    #[inline]
    fn now_ms(&self) -> u64 {
        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
    }

    /// Decides whether a request arriving at `now_ms` may proceed to the limiter.
    ///
    /// An open breaker whose deadline has passed switches to half-open and admits the
    /// caller as the first trial. A half-open breaker admits callers while fewer than
    /// `half_open_trials` trials are in flight.
    fn admit(&self, now_ms: u64) -> Admit {
        let mut shared = self.lock();
        match shared.state {
            BreakerState::Closed => Admit::Allow,
            BreakerState::Open => {
                if now_ms >= shared.open_until_ms {
                    shared.state = BreakerState::HalfOpen;
                    // The caller that triggers the transition occupies the first trial slot.
                    shared.half_open_inflight = 1;
                    shared.half_open_successes = 0;
                    crate::obs::circuit_transition("Open", "HalfOpen", 1);
                    Admit::Allow
                } else {
                    Admit::Reject(Duration::from_millis(shared.open_until_ms - now_ms))
                }
            }
            BreakerState::HalfOpen => {
                if shared.half_open_inflight < self.config.half_open_trials {
                    shared.half_open_inflight += 1;
                    Admit::Allow
                } else {
                    Admit::Reject(Duration::ZERO)
                }
            }
        }
    }

    /// Gives back a half-open trial slot for a request that was admitted but never ran.
    fn abort(&self) {
        let mut shared = self.lock();
        if shared.state == BreakerState::HalfOpen {
            shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
        }
    }

    /// Feeds one call outcome into the state machine.
    ///
    /// - Half-open: frees a trial slot; a failure re-opens the breaker, and reaching
    ///   `half_open_required` successes closes it and clears all counters.
    /// - Closed: updates the trip bookkeeping and opens the breaker if a failure
    ///   satisfies the trip policy.
    /// - Open: the outcome is ignored.
    fn record(&self, success: bool) {
        let now_ms = self.now_ms();
        let mut shared = self.lock();
        match shared.state {
            BreakerState::HalfOpen => {
                shared.half_open_inflight = shared.half_open_inflight.saturating_sub(1);
                if success {
                    shared.half_open_successes = shared.half_open_successes.saturating_add(1);
                    if shared.half_open_successes >= self.config.half_open_required {
                        shared.state = BreakerState::Closed;
                        shared.reset_counters();
                        crate::obs::circuit_transition("HalfOpen", "Closed", 0);
                    }
                } else {
                    self.open(&mut shared, now_ms);
                }
            }
            BreakerState::Closed => {
                if success {
                    shared.consecutive = 0;
                    record_outcome(&mut shared, false, now_ms, self.config.trip);
                } else {
                    // Saturate: under the ratio and windowed policies nothing bounds this
                    // counter, so a plain `+= 1` could overflow after a long failure run
                    // and panic (in overflow-checked builds) while the lock is held.
                    shared.consecutive = shared.consecutive.saturating_add(1);
                    record_outcome(&mut shared, true, now_ms, self.config.trip);
                    if tripped(&shared, now_ms, self.config.trip) {
                        self.open(&mut shared, now_ms);
                    }
                }
            }
            BreakerState::Open => {}
        }
    }

    /// Moves the breaker to open and arms the cooldown deadline at `now_ms + cooldown`.
    fn open(&self, shared: &mut Shared, now_ms: u64) {
        let from = if shared.state == BreakerState::HalfOpen {
            "HalfOpen"
        } else {
            "Closed"
        };
        shared.state = BreakerState::Open;
        shared.open_until_ms = now_ms
            .saturating_add(u64::try_from(self.config.cooldown.as_millis()).unwrap_or(u64::MAX));
        shared.half_open_inflight = 0;
        shared.half_open_successes = 0;
        crate::obs::circuit_transition(from, "Open", 2);
    }

    /// Records a successful call without going through a [`Permit`].
    pub fn record_success(&self) {
        self.record(true);
    }

    /// Records a failed call without going through a [`Permit`].
    pub fn record_failure(&self) {
        self.record(false);
    }

    /// Attempts to acquire a permit without waiting.
    ///
    /// Returns `Ok(Some(permit))` when both the breaker and the limiter admit the call,
    /// and `Ok(None)` when the breaker admits it but the limiter asks to retry later.
    ///
    /// # Errors
    ///
    /// - `ThrottleError::CircuitOpen` when the breaker rejects the call; the limiter is
    ///   not consulted in that case.
    /// - `ThrottleError::CostExceedsCapacity` when the limiter reports the request as
    ///   impossible.
    pub fn try_acquire(&self) -> Result<Option<Permit<'_, L, C>>, ThrottleError> {
        let now_ms = self.now_ms();
        match self.admit(now_ms) {
            Admit::Reject(retry_after) => Err(ThrottleError::CircuitOpen { retry_after }),
            Admit::Allow => match self.inner.acquire_cost(1) {
                Decision::Acquired => Ok(Some(Permit::new(self))),
                Decision::Retry { .. } => {
                    // Admitted but not run: give the half-open slot back.
                    self.abort();
                    Ok(None)
                }
                Decision::Impossible => {
                    self.abort();
                    Err(ThrottleError::CostExceedsCapacity {
                        cost: 1,
                        capacity: self.inner.capacity(),
                    })
                }
            },
        }
    }
}
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<L, C> CircuitBreaker<L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Acquires a permit, sleeping and retrying while the inner limiter asks to wait.
    ///
    /// The breaker's admission check runs once, before the first limiter attempt.
    ///
    /// # Errors
    ///
    /// - `ThrottleError::CircuitOpen` when the breaker rejects the call.
    /// - `ThrottleError::CostExceedsCapacity` when the limiter reports the request as
    ///   impossible.
    ///
    /// # Cancellation
    ///
    /// Dropping the returned future while it is waiting releases the admission slot it
    /// held, so an abandoned half-open trial cannot leave the breaker rejecting callers
    /// indefinitely.
    pub async fn acquire(&self) -> Result<Permit<'_, L, C>, ThrottleError> {
        /// Calls `abort` on drop unless disarmed; covers every exit that does not
        /// produce a permit, including the future being dropped at the `sleep` await.
        struct AdmissionGuard<'g, Lim, Clk>
        where
            Lim: Limiter,
            Clk: Clock + Clone,
        {
            breaker: &'g CircuitBreaker<Lim, Clk>,
            armed: bool,
        }

        impl<Lim, Clk> Drop for AdmissionGuard<'_, Lim, Clk>
        where
            Lim: Limiter,
            Clk: Clock + Clone,
        {
            fn drop(&mut self) {
                if self.armed {
                    self.breaker.abort();
                }
            }
        }

        match self.admit(self.now_ms()) {
            Admit::Reject(retry_after) => return Err(ThrottleError::CircuitOpen { retry_after }),
            Admit::Allow => {}
        }
        let mut guard = AdmissionGuard {
            breaker: self,
            armed: true,
        };
        loop {
            match self.inner.acquire_cost(1) {
                Decision::Acquired => {
                    // The permit now owns the slot; settling it releases the slot.
                    guard.armed = false;
                    return Ok(Permit::new(self));
                }
                Decision::Retry { after } => crate::rt::sleep(after).await,
                Decision::Impossible => {
                    // Release the slot before building the error, as before.
                    drop(guard);
                    return Err(ThrottleError::CostExceedsCapacity {
                        cost: 1,
                        capacity: self.inner.capacity(),
                    });
                }
            }
        }
    }
}
/// Updates the history the active trip policy needs for one outcome.
///
/// - `Trip::Consecutive`: nothing is stored here.
/// - `Trip::Ratio`: appends the outcome and keeps only the newest `window` entries.
/// - `Trip::Windowed`: appends `now_ms` for a failure, then discards timestamps older
///   than `now_ms - period` from the front.
fn record_outcome(shared: &mut Shared, failure: bool, now_ms: u64, trip: Trip) {
    match trip {
        Trip::Consecutive(_) => {}
        Trip::Ratio { window, .. } => {
            let history = &mut shared.outcomes;
            history.push_back(failure);
            let capacity = window as usize;
            let excess = history.len().saturating_sub(capacity);
            for _ in 0..excess {
                let _ = history.pop_front();
            }
        }
        Trip::Windowed { period, .. } => {
            let stamps = &mut shared.failure_times;
            if failure {
                stamps.push_back(now_ms);
            }
            let span_ms = u64::try_from(period.as_millis()).unwrap_or(u64::MAX);
            let cutoff = now_ms.saturating_sub(span_ms);
            // Timestamps are appended in order, so expired ones sit at the front.
            while let Some(&oldest) = stamps.front() {
                if oldest >= cutoff {
                    break;
                }
                let _ = stamps.pop_front();
            }
        }
    }
}
/// Reports whether the recorded history satisfies the trip policy at `now_ms`.
///
/// - `Trip::Consecutive(n)`: the consecutive-failure count is at least `n`.
/// - `Trip::Ratio`: at least `min_calls` outcomes (and at least one) are retained and
///   the failure fraction is at least `ratio`.
/// - `Trip::Windowed`: at least `failures` timestamps are no older than
///   `now_ms - period`.
fn tripped(shared: &Shared, now_ms: u64, trip: Trip) -> bool {
    match trip {
        Trip::Consecutive(limit) => shared.consecutive >= limit,
        Trip::Ratio {
            ratio, min_calls, ..
        } => {
            let total = shared.outcomes.len() as u32;
            let enough_samples = total != 0 && total >= min_calls;
            if !enough_samples {
                return false;
            }
            let mut failed: usize = 0;
            for &is_failure in &shared.outcomes {
                if is_failure {
                    failed += 1;
                }
            }
            let failed = failed as u32;
            f64::from(failed) / f64::from(total) >= ratio
        }
        Trip::Windowed { failures, period } => {
            let span_ms = u64::try_from(period.as_millis()).unwrap_or(u64::MAX);
            let cutoff = now_ms.saturating_sub(span_ms);
            let mut recent: usize = 0;
            for &stamp in &shared.failure_times {
                if stamp >= cutoff {
                    recent += 1;
                }
            }
            (recent as u32) >= failures
        }
    }
}
/// Handle for one admitted call.
///
/// Settle it with [`Permit::success`] or [`Permit::failure`]; a permit dropped without
/// being settled is recorded as a failure.
#[must_use = "settle the permit with `.success()` or `.failure()`; dropping it counts as a failure"]
pub struct Permit<'a, L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Breaker that issued the permit and receives its outcome.
    breaker: &'a CircuitBreaker<L, C>,
    /// Set once an outcome has been reported, so `Drop` does not report a second one.
    settled: bool,
}

impl<'a, L, C> Permit<'a, L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Wraps `breaker` in a permit that has not been settled yet.
    fn new(breaker: &'a CircuitBreaker<L, C>) -> Self {
        Self {
            settled: false,
            breaker,
        }
    }

    /// Reports the call as successful and consumes the permit.
    pub fn success(self) {
        self.settle(true);
    }

    /// Reports the call as failed and consumes the permit.
    pub fn failure(self) {
        self.settle(false);
    }

    /// Records `success` with the breaker, then marks the permit settled so the drop
    /// that follows does nothing.
    fn settle(mut self, success: bool) {
        self.breaker.record(success);
        self.settled = true;
    }
}

impl<L, C> Drop for Permit<'_, L, C>
where
    L: Limiter,
    C: Clock + Clone,
{
    /// Records a failure when the permit was never settled.
    fn drop(&mut self) {
        if self.settled {
            return;
        }
        self.breaker.record(false);
    }
}
/// Builder for [`CircuitBreaker`].
///
/// Defaults: trip after 5 consecutive failures, 30 s cooldown, one half-open trial,
/// one success required to close.
#[derive(Debug, Clone, Copy)]
pub struct CircuitBreakerBuilder {
    /// Policy that opens the breaker.
    trip: Trip,
    /// Time the breaker stays open before admitting a trial.
    cooldown: Duration,
    /// Maximum half-open trials in flight at once (at least 1).
    half_open_trials: u32,
    /// Successful trials needed to close (between 1 and `half_open_trials`).
    half_open_required: u32,
}

impl Default for CircuitBreakerBuilder {
    /// Same as [`CircuitBreakerBuilder::new`].
    fn default() -> Self {
        CircuitBreakerBuilder::new()
    }
}

impl CircuitBreakerBuilder {
    /// Creates a builder holding the default settings.
    #[must_use]
    pub fn new() -> Self {
        let cooldown = Duration::from_secs(30);
        let trip = Trip::Consecutive(5);
        Self {
            trip,
            cooldown,
            half_open_trials: 1,
            half_open_required: 1,
        }
    }

    /// Sets the policy that opens the breaker.
    #[must_use]
    pub fn trip(self, trip: Trip) -> Self {
        Self { trip, ..self }
    }

    /// Sets how long the breaker stays open before a trial is admitted.
    #[must_use]
    pub fn cooldown(self, cooldown: Duration) -> Self {
        Self { cooldown, ..self }
    }

    /// Sets the half-open limits.
    ///
    /// `trials` is raised to at least 1; `required` is raised to at least 1 and capped
    /// at the resulting trial count.
    #[must_use]
    pub fn half_open(self, trials: u32, required: u32) -> Self {
        let half_open_trials = trials.max(1);
        let half_open_required = required.max(1).min(half_open_trials);
        Self {
            half_open_trials,
            half_open_required,
            ..self
        }
    }

    /// Builds a breaker around `limiter`, timed by a fresh `SystemClock`.
    #[must_use]
    pub fn build<L>(self, limiter: L) -> CircuitBreaker<L, SystemClock>
    where
        L: Limiter,
    {
        let Self {
            trip,
            cooldown,
            half_open_trials,
            half_open_required,
        } = self;
        let config = Config {
            trip,
            cooldown,
            half_open_trials,
            half_open_required,
        };
        CircuitBreaker::new(limiter, config, SystemClock::new())
    }
}
// Unit tests driving the breaker's state machine with a manually advanced clock.
#[cfg(test)]
mod tests {
// Tests unwrap and expect freely; a panic is the failure signal.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use super::{BreakerState, CircuitBreaker, Trip};
use crate::throttle::Throttle;
use clock_lib::ManualClock;
use core::time::Duration;
use std::sync::Arc;
// Compile-time check helper: only instantiable for `Send + Sync` types.
fn assert_send_sync<T: Send + Sync>() {}
#[test]
fn test_breaker_is_send_sync() {
assert_send_sync::<CircuitBreaker<Throttle>>();
}
// Builds a breaker with the given trip policy and cooldown, one half-open trial with
// one required success, and the shared manual clock swapped in.
// NOTE(review): the 1_000_000/s throttle is presumably chosen so the inner limiter
// never interferes with these tests; confirm against `Throttle`.
fn breaker(
trip: Trip,
cooldown: Duration,
clock: Arc<ManualClock>,
) -> CircuitBreaker<Throttle, Arc<ManualClock>> {
CircuitBreaker::builder()
.trip(trip)
.cooldown(cooldown)
.half_open(1, 1)
.build(Throttle::per_second(1_000_000))
.with_clock(clock)
}
// Two failures stay below the threshold of three; the third opens the breaker.
#[test]
fn test_consecutive_failures_trip_open() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
assert_eq!(cb.state(), BreakerState::Closed);
cb.record_failure();
cb.record_failure();
assert_eq!(cb.state(), BreakerState::Closed);
cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
}
// A success between failures resets the consecutive count, so four failures split
// 2 + 2 never reach the threshold of three.
#[test]
fn test_success_resets_consecutive_count() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(3), Duration::from_secs(10), clock);
cb.record_failure();
cb.record_failure();
cb.record_success(); cb.record_failure();
cb.record_failure();
assert_eq!(cb.state(), BreakerState::Closed); }
// While open, `try_acquire` fails with `CircuitOpen` and the inner limiter's
// availability is unchanged.
#[test]
fn test_open_sheds_requests_without_touching_limiter() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock);
cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
let before = cb.inner().available();
let result = cb.try_acquire();
assert!(matches!(
result,
Err(crate::ThrottleError::CircuitOpen { .. })
));
assert_eq!(cb.inner().available(), before);
}
// After the full cooldown, the next acquire is admitted as a half-open trial and its
// success closes the breaker.
#[test]
fn test_half_open_after_cooldown_then_close_on_success() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
clock.advance(Duration::from_secs(10)); let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
assert_eq!(cb.state(), BreakerState::HalfOpen);
permit.success();
assert_eq!(cb.state(), BreakerState::Closed);
}
// A failed half-open trial sends the breaker straight back to open.
#[test]
fn test_half_open_failure_reopens() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
cb.record_failure(); clock.advance(Duration::from_secs(10));
let permit = cb.try_acquire().unwrap().expect("a trial is admitted");
assert_eq!(cb.state(), BreakerState::HalfOpen);
permit.failure(); assert_eq!(cb.state(), BreakerState::Open);
}
// One second short of the cooldown the breaker still rejects; at exactly the
// cooldown it admits a trial.
#[test]
fn test_open_rejects_until_cooldown_elapses() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(1), Duration::from_secs(10), clock.clone());
cb.record_failure(); clock.advance(Duration::from_secs(9)); assert!(matches!(
cb.try_acquire(),
Err(crate::ThrottleError::CircuitOpen { .. })
));
clock.advance(Duration::from_secs(1)); assert!(cb.try_acquire().unwrap().is_some());
}
// Dropping an unsettled permit records a failure: two drops reach the threshold of two.
#[test]
fn test_dropping_permit_counts_as_failure() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(Trip::Consecutive(2), Duration::from_secs(10), clock);
drop(cb.try_acquire().unwrap());
assert_eq!(cb.state(), BreakerState::Closed);
drop(cb.try_acquire().unwrap());
assert_eq!(cb.state(), BreakerState::Open);
}
// The ratio is not evaluated below `min_calls` (4); the fourth outcome makes it
// 2 failures / 4 calls = 0.5, which meets the threshold.
#[test]
fn test_ratio_trip() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(
Trip::Ratio {
window: 10,
ratio: 0.5,
min_calls: 4,
},
Duration::from_secs(10),
clock,
);
cb.record_success();
cb.record_success();
assert_eq!(cb.state(), BreakerState::Closed);
cb.record_failure();
cb.record_failure(); assert_eq!(cb.state(), BreakerState::Open);
}
// The first failure falls out of the 5 s window after the 6 s advance, so three
// further failures are needed to trip.
#[test]
fn test_windowed_trip_prunes_old_failures() {
let clock = Arc::new(ManualClock::new());
let cb = breaker(
Trip::Windowed {
failures: 3,
period: Duration::from_secs(5),
},
Duration::from_secs(10),
clock.clone(),
);
cb.record_failure();
clock.advance(Duration::from_secs(6)); cb.record_failure();
cb.record_failure();
assert_eq!(cb.state(), BreakerState::Closed); cb.record_failure();
assert_eq!(cb.state(), BreakerState::Open); }
}