use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::clock::{Clock, SystemClock};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
pub loss_limit: u32,
pub window_secs: u64,
pub cooldown_secs: u64,
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
loss_limit: 4,
window_secs: 14_400,
cooldown_secs: 3_600,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CircuitBreakerSnapshot {
pub recent_losses: Vec<u64>,
pub tripped_at_unix_secs: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
config: CircuitBreakerConfig,
recent_losses: VecDeque<u64>,
tripped_at_unix_secs: Option<u64>,
clock: Arc<dyn Clock>,
}
impl CircuitBreaker {
pub fn new(config: CircuitBreakerConfig) -> Self {
Self::with_clock(config, Arc::new(SystemClock))
}
pub fn with_clock(config: CircuitBreakerConfig, clock: Arc<dyn Clock>) -> Self {
Self {
config,
recent_losses: VecDeque::with_capacity(16),
tripped_at_unix_secs: None,
clock,
}
}
pub fn tick(&mut self) {
let now = self.clock.now_unix_secs();
if let Some(t) = self.tripped_at_unix_secs
&& now.saturating_sub(t) >= self.config.cooldown_secs
{
self.reset();
}
self.evict_old(now);
}
pub fn record_loss(&mut self) {
let now = self.clock.now_unix_secs();
self.recent_losses.push_back(now);
self.evict_old(now);
if self.recent_losses.len() as u32 >= self.config.loss_limit {
self.tripped_at_unix_secs = Some(now);
tracing::warn!(
losses = self.recent_losses.len(),
window_secs = self.config.window_secs,
"circuit breaker tripped"
);
}
}
pub fn record_win(&mut self) {
self.evict_old(self.clock.now_unix_secs());
}
pub fn is_tripped(&self) -> bool {
self.tripped_at_unix_secs.is_some_and(|t| {
self.clock.now_unix_secs().saturating_sub(t) < self.config.cooldown_secs
})
}
pub fn snapshot(&self) -> CircuitBreakerSnapshot {
CircuitBreakerSnapshot {
recent_losses: self.recent_losses.iter().copied().collect(),
tripped_at_unix_secs: self.tripped_at_unix_secs,
}
}
pub fn restore(&mut self, snap: CircuitBreakerSnapshot) {
self.recent_losses = snap.recent_losses.into_iter().collect();
self.tripped_at_unix_secs = snap.tripped_at_unix_secs;
}
pub fn reset(&mut self) {
self.recent_losses.clear();
self.tripped_at_unix_secs = None;
}
pub fn recent_loss_count(&self) -> usize {
self.recent_losses.len()
}
pub fn cooldown_remaining(&self) -> Option<Duration> {
let t = self.tripped_at_unix_secs?;
let elapsed = self.clock.now_unix_secs().saturating_sub(t);
(elapsed < self.config.cooldown_secs)
.then(|| Duration::from_secs(self.config.cooldown_secs - elapsed))
}
fn evict_old(&mut self, now: u64) {
let cutoff = now.saturating_sub(self.config.window_secs);
while let Some(&ts) = self.recent_losses.front() {
if ts < cutoff {
self.recent_losses.pop_front();
} else {
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::clock::ManualClock;
fn cfg(loss_limit: u32, window: u64, cooldown: u64) -> CircuitBreakerConfig {
CircuitBreakerConfig {
loss_limit,
window_secs: window,
cooldown_secs: cooldown,
}
}
fn breaker(
loss_limit: u32,
window: u64,
cooldown: u64,
start: u64,
) -> (CircuitBreaker, Arc<ManualClock>) {
let clock = Arc::new(ManualClock::new(start));
let cb = CircuitBreaker::with_clock(cfg(loss_limit, window, cooldown), clock.clone());
(cb, clock)
}
#[test]
fn starts_untripped() {
let cb = CircuitBreaker::new(cfg(4, 14400, 3600));
assert!(!cb.is_tripped());
assert_eq!(cb.recent_loss_count(), 0);
}
#[test]
fn trips_at_limit() {
let mut cb = CircuitBreaker::new(cfg(3, 14400, 3600));
cb.record_loss();
cb.record_loss();
assert!(!cb.is_tripped());
cb.record_loss();
assert!(cb.is_tripped());
}
#[test]
fn win_does_not_untrip() {
let mut cb = CircuitBreaker::new(cfg(2, 14400, 3600));
cb.record_loss();
cb.record_loss();
assert!(cb.is_tripped());
cb.record_win();
assert!(cb.is_tripped());
}
#[test]
fn reset_clears_state() {
let mut cb = CircuitBreaker::new(cfg(2, 14400, 3600));
cb.record_loss();
cb.record_loss();
cb.reset();
assert!(!cb.is_tripped());
assert_eq!(cb.recent_loss_count(), 0);
}
#[test]
fn old_losses_evicted_from_rolling_window() {
let (mut cb, clock) = breaker(
3, 3600, 600, 1_000_000,
);
cb.record_loss(); cb.record_loss(); assert_eq!(cb.recent_loss_count(), 2);
clock.advance_secs(3_700);
cb.record_loss(); assert_eq!(
cb.recent_loss_count(),
1,
"losses outside the rolling window must be evicted"
);
assert!(!cb.is_tripped(), "rolling count of 1 should not trip");
}
#[test]
fn cooldown_auto_resets_on_tick() {
let (mut cb, clock) = breaker(
2, 3600, 600, 1_000_000,
);
cb.record_loss();
cb.record_loss();
assert!(cb.is_tripped());
assert_eq!(
cb.cooldown_remaining(),
Some(Duration::from_secs(600)),
"cooldown should report full remaining at trip time"
);
clock.advance_secs(300);
cb.tick();
assert!(cb.is_tripped());
assert_eq!(cb.cooldown_remaining(), Some(Duration::from_secs(300)));
clock.advance_secs(301);
cb.tick();
assert!(!cb.is_tripped());
assert_eq!(cb.cooldown_remaining(), None);
assert_eq!(cb.recent_loss_count(), 0);
}
#[test]
fn losses_spaced_outside_window_never_trip() {
let (mut cb, clock) = breaker(3, 3600, 600, 1_000_000);
cb.record_loss();
clock.advance_secs(3_700);
cb.record_loss();
clock.advance_secs(3_700);
cb.record_loss();
assert!(!cb.is_tripped());
assert_eq!(cb.recent_loss_count(), 1);
}
#[test]
fn snapshot_restore_preserves_tripped_state() {
let (mut cb, clock) = breaker(2, 14_400, 3_600, 1_000_000);
cb.record_loss();
cb.record_loss();
assert!(cb.is_tripped());
let snap = cb.snapshot();
assert_eq!(snap.recent_losses.len(), 2);
assert_eq!(snap.tripped_at_unix_secs, Some(1_000_000));
let mut restored = CircuitBreaker::with_clock(cfg(2, 14_400, 3_600), clock.clone());
restored.restore(snap.clone());
assert!(restored.is_tripped());
assert_eq!(restored.recent_loss_count(), 2);
assert_eq!(restored.snapshot(), snap);
}
#[test]
fn restore_then_tick_resets_after_cooldown_elapsed_during_downtime() {
let (mut cb, _clock) = breaker(2, 14_400, 600, 1_000_000);
cb.record_loss();
cb.record_loss();
assert!(cb.is_tripped());
let snap = cb.snapshot();
let later = Arc::new(ManualClock::new(1_000_700));
let mut restored = CircuitBreaker::with_clock(cfg(2, 14_400, 600), later);
restored.restore(snap);
restored.tick();
assert!(
!restored.is_tripped(),
"expired cooldown must reset on tick"
);
assert_eq!(restored.recent_loss_count(), 0);
}
#[test]
fn restore_then_tick_evicts_losses_outside_window() {
let (mut cb, _clock) = breaker(5, 3_600, 600, 1_000_000);
cb.record_loss();
cb.record_loss();
let snap = cb.snapshot();
let later = Arc::new(ManualClock::new(1_004_000));
let mut restored = CircuitBreaker::with_clock(cfg(5, 3_600, 600), later);
restored.restore(snap);
restored.tick();
assert_eq!(
restored.recent_loss_count(),
0,
"stale losses must be evicted"
);
}
}