use crate::resil::{
BreakerConfig, BreakerError, BreakerPolicyConfig, BreakerSnapshot, BreakerState, RollingWindow,
WindowOutcome,
};
/// Mutable state of a single circuit breaker.
///
/// The breaker is `Closed`, `Open` or `HalfOpen`. Transitions out of `Open`
/// happen lazily: they are evaluated by `refresh_state` whenever `state`,
/// `allow` or `snapshot` is called, not by a background timer.
#[derive(Debug)]
pub(crate) struct CircuitBreakerState {
/// Failure threshold and reset timeout; `new` clamps the threshold to at least 1.
config: BreakerConfig,
/// Ratio / half-open / SRE policy settings, clamped by `new`.
policy: BreakerPolicyConfig,
/// Current breaker state as of the last refresh.
state: BreakerState,
/// Failures recorded since the last success (reported as `consecutive_failures`
/// in snapshots). Reset to 0 by `record_success`.
failures: u32,
/// When the breaker was last opened; `open` sets it and `close` clears it.
opened_at: Option<std::time::Instant>,
/// Set to "now" when `refresh_state` moves the breaker from `Open` to
/// `HalfOpen` after a timeout or force-pass; cleared by `open` and `close`.
last_force_pass_at: Option<std::time::Instant>,
/// Calls admitted while `HalfOpen` whose outcome has not been recorded yet.
/// Zeroed on every state transition performed by `open`, `close` and the
/// timed half-open transitions.
half_open_in_flight: u32,
/// Monotonic (wrapping) counter advanced by `should_drop` / `should_drop_sre`;
/// it is multiplied by 37 and reduced modulo a bucket count to make a
/// deterministic drop decision.
decisions: u64,
/// Rolling record of `Success`, `Failure` and `Drop` outcomes.
window: RollingWindow,
}
impl CircuitBreakerState {
    /// Multiplier applied to the decision counter before it is reduced to a bucket.
    const DECISION_STRIDE: u64 = 37;
    /// Bucket count for the percentage-based drop decision.
    const PERCENT_BUCKETS: u64 = 100;
    /// Bucket count for the SRE drop decision (1/10_000 resolution).
    const SRE_BUCKETS: u64 = 10_000;

    /// Creates a breaker in the `Closed` state with an empty rolling window.
    ///
    /// Out-of-range settings are clamped rather than rejected:
    /// * `failure_threshold`, `half_open_max_calls` and `sre_protection` are raised to at least 1,
    /// * `failure_ratio_percent` and `drop_ratio_percent` are capped at 100,
    /// * `sre_k_millis` is raised to at least 1000.
    ///
    /// All remaining policy fields are taken over unchanged.
    pub(crate) fn new(config: BreakerConfig, policy: BreakerPolicyConfig) -> Self {
        // The window is built from the caller-supplied value before the policy is rebuilt.
        let window = RollingWindow::new(policy.window);
        let config = BreakerConfig {
            failure_threshold: config.failure_threshold.max(1),
            reset_timeout: config.reset_timeout,
        };
        let policy = BreakerPolicyConfig {
            failure_ratio_percent: policy.failure_ratio_percent.min(100),
            drop_ratio_percent: policy.drop_ratio_percent.min(100),
            half_open_max_calls: policy.half_open_max_calls.max(1),
            sre_k_millis: policy.sre_k_millis.max(1000),
            sre_protection: policy.sre_protection.max(1),
            ..policy
        };
        Self {
            config,
            policy,
            state: BreakerState::Closed,
            failures: 0,
            opened_at: None,
            last_force_pass_at: None,
            half_open_in_flight: 0,
            decisions: 0,
            window,
        }
    }

    /// Returns the current state after applying any due `Open` -> `HalfOpen` transition.
    pub(crate) fn state(&mut self) -> BreakerState {
        self.refresh_state();
        self.state
    }

    /// Decides whether a new call may proceed.
    ///
    /// Every rejection is recorded in the rolling window as a `Drop`.
    ///
    /// # Errors
    ///
    /// * `BreakerError::Open` while the breaker is `Open`, or while it is `HalfOpen` and
    ///   `half_open_max_calls` calls are already in flight.
    /// * `BreakerError::Dropped` when the breaker is `Closed` but the call is shed by the
    ///   SRE or failure-ratio policy.
    pub(crate) fn allow(&mut self) -> Result<(), BreakerError> {
        self.refresh_state();
        match self.state {
            BreakerState::Closed => self.allow_closed(),
            BreakerState::HalfOpen
                if self.half_open_in_flight < self.policy.half_open_max_calls =>
            {
                self.half_open_in_flight += 1;
                Ok(())
            }
            // Fully open, or half-open with every probe slot taken.
            BreakerState::Open | BreakerState::HalfOpen => {
                self.window.record(WindowOutcome::Drop);
                Err(BreakerError::Open)
            }
        }
    }

    /// Records a successful call: clears the failure counter and closes the breaker.
    pub(crate) fn record_success(&mut self) {
        self.window.record(WindowOutcome::Success);
        self.failures = 0;
        self.release_half_open_slot();
        self.close();
    }

    /// Records a failed call.
    ///
    /// A failure while `HalfOpen` re-opens the breaker immediately. Otherwise the failure
    /// counter is incremented and the breaker opens once the counter reaches
    /// `failure_threshold` or the rolling window is unhealthy.
    pub(crate) fn record_failure(&mut self) {
        self.window.record(WindowOutcome::Failure);
        if matches!(self.state, BreakerState::HalfOpen) {
            self.release_half_open_slot();
            self.open();
        } else {
            self.failures = self.failures.saturating_add(1);
            let threshold_reached = self.failures >= self.config.failure_threshold;
            // The window is only consulted when the plain counter has not tripped yet.
            if threshold_reached || self.window_is_unhealthy() {
                self.open();
            }
        }
    }

    /// Captures the refreshed state, counters and rolling-window statistics.
    pub(crate) fn snapshot(&mut self) -> BreakerSnapshot {
        // Refresh first so the counters below reflect any transition that was due.
        let state = self.state();
        BreakerSnapshot {
            state,
            consecutive_failures: self.failures,
            half_open_in_flight: self.half_open_in_flight,
            window: self.window.snapshot(),
        }
    }

    /// Admission check for the `Closed` state.
    ///
    /// The SRE policy (when enabled) is evaluated first; the failure-ratio policy is only
    /// evaluated if the SRE policy did not shed the call.
    fn allow_closed(&mut self) -> Result<(), BreakerError> {
        let shed = (self.policy.sre_rejection_enabled && self.should_drop_sre())
            || (self.window_is_unhealthy() && self.should_drop());
        if shed {
            self.window.record(WindowOutcome::Drop);
            Err(BreakerError::Dropped)
        } else {
            Ok(())
        }
    }

    /// Moves an `Open` breaker to `HalfOpen` when the reset timeout or the force-pass
    /// interval has elapsed. Does nothing in any other state.
    fn refresh_state(&mut self) {
        if !matches!(self.state, BreakerState::Open) {
            return;
        }
        let opened_at = match self.opened_at {
            Some(instant) => instant,
            None => {
                // No open timestamp to measure against: go half-open without touching
                // the in-flight counter or the force-pass timestamp.
                self.state = BreakerState::HalfOpen;
                return;
            }
        };
        let reset_elapsed = opened_at.elapsed() >= self.config.reset_timeout;
        if reset_elapsed || self.force_pass_due(opened_at) {
            self.enter_half_open();
        }
    }

    /// Returns whether a forced probe is due.
    ///
    /// Always `false` when `force_pass_interval` is zero. Otherwise the interval is measured
    /// from the last force-pass timestamp, falling back to `opened_at` when there is none.
    fn force_pass_due(&self, opened_at: std::time::Instant) -> bool {
        let interval = self.policy.force_pass_interval;
        if interval.is_zero() {
            return false;
        }
        let reference = self.last_force_pass_at.unwrap_or(opened_at);
        reference.elapsed() >= interval
    }

    /// Switches to `HalfOpen` with no probes in flight and stamps the transition time.
    fn enter_half_open(&mut self) {
        self.state = BreakerState::HalfOpen;
        self.half_open_in_flight = 0;
        self.last_force_pass_at = Some(std::time::Instant::now());
    }

    /// Gives back one half-open probe slot if the breaker is currently `HalfOpen`.
    fn release_half_open_slot(&mut self) {
        if matches!(self.state, BreakerState::HalfOpen) {
            self.half_open_in_flight = self.half_open_in_flight.saturating_sub(1);
        }
    }

    /// Returns `true` when the window holds at least `min_request_count` successes plus
    /// failures and its failure ratio (as a percentage) is at or above
    /// `failure_ratio_percent`.
    fn window_is_unhealthy(&self) -> bool {
        let stats = self.window.snapshot();
        let answered = stats.successes + stats.failures;
        if answered < self.policy.min_request_count {
            return false;
        }
        let failure_percent = stats.failure_ratio() * 100.0;
        failure_percent >= self.policy.failure_ratio_percent as f64
    }

    /// Advances the decision counter and maps it to a bucket in `0..buckets`.
    fn next_bucket(&mut self, buckets: u64) -> u64 {
        self.decisions = self.decisions.wrapping_add(1);
        self.decisions.wrapping_mul(Self::DECISION_STRIDE) % buckets
    }

    /// Deterministic percentage drop: `true` when the next bucket in `0..100` falls below
    /// `drop_ratio_percent`.
    fn should_drop(&mut self) -> bool {
        let bucket = self.next_bucket(Self::PERCENT_BUCKETS);
        bucket < u64::from(self.policy.drop_ratio_percent)
    }

    /// Drop decision for the SRE policy.
    ///
    /// The drop ratio is `((total - sre_protection) - K * accepts) / (total + 1)` where
    /// `K = sre_k_millis / 1000`, `accepts` is the window's success count and `total` is
    /// successes + failures + drops. Nothing is dropped while `total <= sre_protection`,
    /// while there are no successes, or when the ratio is not positive; in those cases the
    /// decision counter is left untouched.
    fn should_drop_sre(&mut self) -> bool {
        let stats = self.window.snapshot();
        let accepts = stats.successes;
        let total = stats.successes + stats.failures + stats.drops;
        let protection = self.policy.sre_protection;
        if accepts == 0 || total <= protection {
            return false;
        }
        let k = self.policy.sre_k_millis as f64 / 1000.0;
        let weighted_accepts = k * accepts as f64;
        let excess = total.saturating_sub(protection) as f64 - weighted_accepts;
        let drop_ratio = excess / (total + 1) as f64;
        if drop_ratio <= 0.0 {
            return false;
        }
        let bucket = self.next_bucket(Self::SRE_BUCKETS) as f64 / 10_000.0;
        bucket < drop_ratio.min(1.0)
    }

    /// Opens the breaker and restarts the reset timer from now.
    fn open(&mut self) {
        self.transition(BreakerState::Open, Some(std::time::Instant::now()));
    }

    /// Closes the breaker and forgets when it was last opened.
    fn close(&mut self) {
        self.transition(BreakerState::Closed, None);
    }

    /// Shared bookkeeping for `open` and `close`: sets the state and open timestamp, and
    /// clears the force-pass timestamp and the half-open in-flight counter.
    fn transition(&mut self, state: BreakerState, opened_at: Option<std::time::Instant>) {
        self.state = state;
        self.opened_at = opened_at;
        self.last_force_pass_at = None;
        self.half_open_in_flight = 0;
    }
}