use std::time::{Duration, Instant};
/// Network environment a circuit breaker is deployed into.
///
/// Each profile selects a heartbeat-lag floor and baseline multiplier via
/// [`DeploymentProfile::defaults`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentProfile {
    /// Local-area deployment.
    Lan,
    /// Wide-area deployment; uses a higher heartbeat-lag floor than `Lan`.
    Wan,
    /// Operator-tuned profile; currently shares the `Lan` defaults.
    Tuned,
}
impl DeploymentProfile {
pub fn defaults(self) -> (Duration, f64) {
match self {
DeploymentProfile::Lan => (Duration::from_millis(50), 3.0),
DeploymentProfile::Wan => (Duration::from_millis(200), 3.0),
DeploymentProfile::Tuned => (Duration::from_millis(50), 3.0),
}
}
pub fn as_str(self) -> &'static str {
match self {
DeploymentProfile::Lan => "lan_default",
DeploymentProfile::Wan => "wan_default",
DeploymentProfile::Tuned => "tuned",
}
}
}
/// Tunable thresholds and timings for a [`CircuitBreaker`].
///
/// Use [`CircuitBreakerConfig::validate`] to check the numeric invariants
/// before handing a config to the breaker.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CircuitBreakerConfig {
    /// Profile the heartbeat-lag thresholds were taken from.
    pub profile: DeploymentProfile,
    /// Lower bound of the heartbeat-lag threshold: lag counts as elevated
    /// only when it exceeds `max(floor, baseline * multiplier)`.
    pub heartbeat_lag_floor: Duration,
    /// Factor applied to an observation's baseline lag; `validate` requires
    /// it to be positive and finite.
    pub heartbeat_baseline_multiplier: f64,
    /// Scheduling-latency p99 above this marks a window as elevated.
    pub scheduling_latency_threshold: Duration,
    /// Fault-free time an `Open` breaker waits before `HalfOpen` when it was
    /// opened for term churn or a half-open trip-back.
    pub term_churn_open_min_dwell: Duration,
    /// Fault-free time an `Open` breaker waits before `HalfOpen` for the
    /// remaining open reasons; also the `HalfOpen` dwell before closing.
    pub lag_open_min_dwell: Duration,
    /// Minimum expected interval between successive openings; compared
    /// against the time since the previous open when the breaker re-trips
    /// from `Closed`.
    pub min_cycle_time: Duration,
    /// Must lie in `[0.0, 1.0]` (see `validate`). Not read by the breaker
    /// itself; presumably callers use it to size half-open trial traffic —
    /// confirm with callers.
    pub half_open_trial_fraction: f64,
}
impl Default for CircuitBreakerConfig {
    /// LAN-profile lag thresholds with the standard dwell and cycle timings.
    ///
    /// Built field-by-field on purpose: `for_profile` fills its remaining
    /// fields from `Self::default()`, so delegating to it would recurse.
    fn default() -> Self {
        let profile = DeploymentProfile::Lan;
        let (heartbeat_lag_floor, heartbeat_baseline_multiplier) = profile.defaults();
        Self {
            profile,
            heartbeat_lag_floor,
            heartbeat_baseline_multiplier,
            scheduling_latency_threshold: Duration::from_millis(50),
            term_churn_open_min_dwell: Duration::from_secs(60),
            lag_open_min_dwell: Duration::from_secs(30),
            min_cycle_time: Duration::from_secs(60),
            half_open_trial_fraction: 0.10,
        }
    }
}
impl CircuitBreakerConfig {
    /// Builds a config whose heartbeat-lag floor and multiplier come from
    /// `profile`; every other field keeps its `Default` value.
    pub fn for_profile(profile: DeploymentProfile) -> Self {
        let base = Self::default();
        let (heartbeat_lag_floor, heartbeat_baseline_multiplier) = profile.defaults();
        Self {
            profile,
            heartbeat_lag_floor,
            heartbeat_baseline_multiplier,
            ..base
        }
    }

    /// Checks the numeric invariants of this config.
    ///
    /// # Errors
    ///
    /// Returns a static message for the first violated invariant, checked in
    /// this order: `heartbeat_baseline_multiplier` must be finite and > 0,
    /// then `half_open_trial_fraction` must lie in `[0.0, 1.0]` (NaN fails).
    pub fn validate(&self) -> Result<(), &'static str> {
        let multiplier = self.heartbeat_baseline_multiplier;
        let multiplier_ok = multiplier.is_finite() && multiplier > 0.0;
        if !multiplier_ok {
            return Err("heartbeat_baseline_multiplier must be positive and finite");
        }
        let fraction_ok = (0.0..=1.0).contains(&self.half_open_trial_fraction);
        if fraction_ok {
            Ok(())
        } else {
            Err("half_open_trial_fraction must be in [0.0, 1.0]")
        }
    }
}
/// Current state of a [`CircuitBreaker`].
///
/// The explicit discriminants are the values reported by
/// `BreakerState::gauge_value`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BreakerState {
    /// Normal operation; requests are allowed.
    Closed = 0,
    /// A fault was detected; requests would be rejected.
    Open = 1,
    /// Probation after an open period; trial requests are allowed.
    HalfOpen = 2,
}
impl BreakerState {
pub fn as_str(self) -> &'static str {
match self {
BreakerState::Closed => "closed",
BreakerState::Open => "open",
BreakerState::HalfOpen => "half_open",
}
}
pub fn gauge_value(self) -> i64 {
self as i64
}
}
/// Why a [`CircuitBreaker`] entered the `Open` state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OpenReason {
    /// An observation reported term churn.
    TermChurn,
    /// Heartbeat lag stayed elevated for several consecutive windows.
    SustainedLag,
    /// Scheduling latency stayed elevated for several consecutive windows.
    SchedulingLatency,
    /// An observation reported an active election.
    ActiveElection,
    /// A fault or failed trial occurred while `HalfOpen`.
    HalfOpenTripBack,
}
impl OpenReason {
pub fn as_str(self) -> &'static str {
match self {
OpenReason::TermChurn => "term_churn",
OpenReason::SustainedLag => "sustained_lag",
OpenReason::SchedulingLatency => "scheduling_latency",
OpenReason::ActiveElection => "active_election",
OpenReason::HalfOpenTripBack => "half_open_trip_back",
}
}
}
/// One window of health signals fed to [`CircuitBreaker::observe`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct HealthObservation {
    /// When the window was observed; all dwell and cycle-time arithmetic is
    /// measured between these instants.
    pub at: Instant,
    /// p99 scheduling latency; compared against
    /// `scheduling_latency_threshold`.
    pub scheduling_latency_p99: Duration,
    /// p99 heartbeat lag; compared against
    /// `max(heartbeat_lag_floor, baseline * multiplier)`.
    pub heartbeat_lag_p99: Duration,
    /// Caller-supplied term-churn flag (the name suggests churn within the
    /// last 60 s — confirm with the producer). Trips a closed breaker
    /// immediately.
    pub term_churn_in_60s: bool,
    /// Caller-supplied flag; trips a closed breaker immediately.
    pub active_election: bool,
    /// Optional baseline lag; `None` means only the lag floor applies.
    pub baseline_lag_p99: Option<Duration>,
}
/// Admission decision derived from the breaker state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BreakerVerdict {
    /// Breaker is `Closed`.
    Allow,
    /// Breaker is `HalfOpen`; the request counts as a trial.
    AllowTrial,
    /// Breaker is `Open`; `reason` says why it opened.
    WouldReject { reason: OpenReason },
}
/// Health-driven circuit breaker with `Closed`, `Open` and `HalfOpen` states.
///
/// Callers feed one [`HealthObservation`] per window to
/// [`CircuitBreaker::observe`] and read the outcome through
/// [`CircuitBreaker::verdict`]. Transitions:
///
/// * `Closed -> Open` as soon as a fault is detected: term churn, an active
///   election, 2 consecutive slow scheduling windows, or 3 consecutive
///   elevated heartbeat-lag windows (checked in that priority order).
/// * `Open -> HalfOpen` once no fault has been seen for the reason-specific
///   minimum dwell; any fault while open restarts the dwell clock. If the
///   breaker re-tripped from `Closed` less than `min_cycle_time` after its
///   previous open (flapping), the dwell is at least `min_cycle_time`.
/// * `HalfOpen -> Closed` after `lag_open_min_dwell` without faults, or
///   `HalfOpen -> Open` (`HalfOpenTripBack`) on a fault or failed trial.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    cfg: CircuitBreakerConfig,
    state: BreakerState,
    // Reason for the current/most recent open; cleared when the breaker closes.
    open_reason: Option<OpenReason>,
    // Start of the current dwell period: set on every transition and pushed
    // forward while faults persist in `Open`.
    state_entered_at: Option<Instant>,
    // Instant of the most recent transition into `Open`.
    last_open_at: Option<Instant>,
    // True when the current `Open` was entered from `Closed` less than
    // `min_cycle_time` after the previous open; extends the open dwell.
    flap_hold: bool,
    consecutive_scheduling_windows: u32,
    consecutive_lag_windows: u32,
    // Timestamp of the last processed observation (recorded, not yet read).
    last_observation_at: Option<Instant>,
}

impl CircuitBreaker {
    /// Consecutive elevated scheduling-latency windows that count as a fault.
    const SCHEDULING_WINDOWS_TO_TRIP: u32 = 2;
    /// Consecutive elevated heartbeat-lag windows that count as a fault.
    const LAG_WINDOWS_TO_TRIP: u32 = 3;

    /// Creates a breaker in the `Closed` state.
    ///
    /// `cfg` is not validated here; call [`CircuitBreakerConfig::validate`]
    /// first if it comes from external configuration.
    pub fn new(cfg: CircuitBreakerConfig) -> Self {
        Self {
            cfg,
            state: BreakerState::Closed,
            open_reason: None,
            state_entered_at: None,
            last_open_at: None,
            flap_hold: false,
            consecutive_scheduling_windows: 0,
            consecutive_lag_windows: 0,
            last_observation_at: None,
        }
    }

    /// Current breaker state.
    pub fn state(&self) -> BreakerState {
        self.state
    }

    /// Reason for the current or most recent open; `None` once closed.
    pub fn open_reason(&self) -> Option<OpenReason> {
        self.open_reason
    }

    /// Configuration this breaker was built with.
    pub fn config(&self) -> &CircuitBreakerConfig {
        &self.cfg
    }

    /// Multiplies `base` by `factor` without panicking.
    ///
    /// `Duration::mul_f64` panics on negative, NaN or overflowing results,
    /// which an unvalidated multiplier or an extreme baseline would trigger
    /// inside `observe`. Non-positive/NaN products map to zero (so the lag
    /// floor applies) and overflow saturates at `Duration::MAX`.
    fn scale_saturating(base: Duration, factor: f64) -> Duration {
        let secs = base.as_secs_f64() * factor;
        if secs.is_nan() || secs <= 0.0 {
            return Duration::ZERO;
        }
        Duration::try_from_secs_f64(secs).unwrap_or(Duration::MAX)
    }

    /// Heartbeat lag is elevated when it exceeds
    /// `max(heartbeat_lag_floor, baseline * heartbeat_baseline_multiplier)`.
    fn lag_elevated(&self, obs: &HealthObservation) -> bool {
        let scaled_baseline = obs.baseline_lag_p99.map_or(Duration::ZERO, |b| {
            Self::scale_saturating(b, self.cfg.heartbeat_baseline_multiplier)
        });
        let threshold = self.cfg.heartbeat_lag_floor.max(scaled_baseline);
        obs.heartbeat_lag_p99 > threshold
    }

    /// Scheduling latency is elevated when it exceeds the configured threshold.
    fn scheduling_elevated(&self, obs: &HealthObservation) -> bool {
        obs.scheduling_latency_p99 > self.cfg.scheduling_latency_threshold
    }

    /// Highest-priority fault present in `obs`, given counters already
    /// updated for it: term churn, then active election, then sustained
    /// scheduling latency, then sustained heartbeat lag.
    fn detect_fault(&self, obs: &HealthObservation) -> Option<OpenReason> {
        if obs.term_churn_in_60s {
            Some(OpenReason::TermChurn)
        } else if obs.active_election {
            Some(OpenReason::ActiveElection)
        } else if self.consecutive_scheduling_windows >= Self::SCHEDULING_WINDOWS_TO_TRIP {
            Some(OpenReason::SchedulingLatency)
        } else if self.consecutive_lag_windows >= Self::LAG_WINDOWS_TO_TRIP {
            Some(OpenReason::SustainedLag)
        } else {
            None
        }
    }

    /// Feeds one observation window and returns the resulting state.
    ///
    /// Consecutive-window counters are updated first, then the current
    /// state's transition rule is applied.
    pub fn observe(&mut self, obs: HealthObservation) -> BreakerState {
        self.consecutive_scheduling_windows = if self.scheduling_elevated(&obs) {
            self.consecutive_scheduling_windows.saturating_add(1)
        } else {
            0
        };
        self.consecutive_lag_windows = if self.lag_elevated(&obs) {
            self.consecutive_lag_windows.saturating_add(1)
        } else {
            0
        };
        match self.state {
            BreakerState::Closed => self.evaluate_open_triggers(&obs),
            BreakerState::Open => self.evaluate_close_eligibility(&obs),
            BreakerState::HalfOpen => self.evaluate_half_open(&obs),
        }
        self.last_observation_at = Some(obs.at);
        self.state
    }

    /// `Closed`: open immediately on any detected fault.
    fn evaluate_open_triggers(&mut self, obs: &HealthObservation) {
        if let Some(reason) = self.detect_fault(obs) {
            self.transition_to_open(obs.at, reason);
        }
    }

    /// `Open`: restart the dwell on any fault; otherwise move to `HalfOpen`
    /// once the required fault-free dwell has elapsed.
    fn evaluate_close_eligibility(&mut self, obs: &HealthObservation) {
        if self.detect_fault(obs).is_some() {
            self.state_entered_at = Some(obs.at);
            return;
        }
        let Some(entered_at) = self.state_entered_at else {
            return;
        };
        let dwell = obs.at.saturating_duration_since(entered_at);
        let reason_dwell = match self.open_reason {
            Some(OpenReason::TermChurn | OpenReason::HalfOpenTripBack) => {
                self.cfg.term_churn_open_min_dwell
            }
            Some(
                OpenReason::SustainedLag
                | OpenReason::SchedulingLatency
                | OpenReason::ActiveElection,
            ) => self.cfg.lag_open_min_dwell,
            None => Duration::ZERO,
        };
        // A flapping re-open must stay open for at least a full cycle.
        let required = if self.flap_hold {
            reason_dwell.max(self.cfg.min_cycle_time)
        } else {
            reason_dwell
        };
        if dwell >= required {
            self.transition_to_half_open(obs.at);
        }
    }

    /// `HalfOpen`: trip back to `Open` on any fault; otherwise close after
    /// `lag_open_min_dwell`.
    fn evaluate_half_open(&mut self, obs: &HealthObservation) {
        if self.detect_fault(obs).is_some() {
            self.transition_to_open(obs.at, OpenReason::HalfOpenTripBack);
            return;
        }
        let Some(entered_at) = self.state_entered_at else {
            return;
        };
        if obs.at.saturating_duration_since(entered_at) >= self.cfg.lag_open_min_dwell {
            self.transition_to_closed(obs.at);
        }
    }

    fn transition_to_open(&mut self, at: Instant, reason: OpenReason) {
        // Anti-flapping: re-tripping from `Closed` less than `min_cycle_time`
        // after the previous open means a whole open/half-open/closed cycle
        // completed too quickly, so this open is held for at least
        // `min_cycle_time` (see `evaluate_close_eligibility`).
        self.flap_hold = self.state == BreakerState::Closed
            && matches!(
                self.last_open_at,
                Some(last) if at.saturating_duration_since(last) < self.cfg.min_cycle_time
            );
        self.state = BreakerState::Open;
        self.open_reason = Some(reason);
        self.state_entered_at = Some(at);
        self.last_open_at = Some(at);
    }

    fn transition_to_half_open(&mut self, at: Instant) {
        self.state = BreakerState::HalfOpen;
        self.state_entered_at = Some(at);
    }

    fn transition_to_closed(&mut self, at: Instant) {
        self.state = BreakerState::Closed;
        self.open_reason = None;
        self.state_entered_at = Some(at);
        self.flap_hold = false;
        self.consecutive_scheduling_windows = 0;
        self.consecutive_lag_windows = 0;
    }

    /// Maps the current state to an admission verdict.
    ///
    /// `Open` always carries a reason in this implementation; the
    /// `SustainedLag` fallback is purely defensive.
    pub fn verdict(&self) -> BreakerVerdict {
        match self.state {
            BreakerState::Closed => BreakerVerdict::Allow,
            BreakerState::HalfOpen => BreakerVerdict::AllowTrial,
            BreakerState::Open => BreakerVerdict::WouldReject {
                reason: self.open_reason.unwrap_or(OpenReason::SustainedLag),
            },
        }
    }

    /// Reports the result of a half-open trial. A fault while `HalfOpen`
    /// reopens the breaker with `HalfOpenTripBack`; calls in any other state,
    /// or without a fault, are ignored.
    pub fn record_trial_outcome(&mut self, at: Instant, observed_fault: bool) {
        if observed_fault && self.state == BreakerState::HalfOpen {
            self.transition_to_open(at, OpenReason::HalfOpenTripBack);
        }
    }
}
// Unit tests for the breaker state machine. Most tests drive the breaker
// with synthetic observations at fixed offsets from a single `Instant` and
// assert the state after each step.
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline observation: every signal is below its default threshold
    // (2 ms scheduling latency, 5 ms lag against a 5 ms baseline, no churn,
    // no election).
    fn obs_healthy(at: Instant) -> HealthObservation {
        HealthObservation {
            at,
            scheduling_latency_p99: Duration::from_millis(2),
            heartbeat_lag_p99: Duration::from_millis(5),
            term_churn_in_60s: false,
            active_election: false,
            baseline_lag_p99: Some(Duration::from_millis(5)),
        }
    }

    // Healthy observation with the term-churn flag raised.
    fn obs_term_churn(at: Instant) -> HealthObservation {
        HealthObservation {
            term_churn_in_60s: true,
            ..obs_healthy(at)
        }
    }

    // Healthy observation with 80 ms scheduling latency, above the 50 ms
    // default threshold.
    fn obs_high_sched(at: Instant) -> HealthObservation {
        HealthObservation {
            scheduling_latency_p99: Duration::from_millis(80),
            ..obs_healthy(at)
        }
    }

    // Healthy observation with 200 ms heartbeat lag, above both the 50 ms
    // LAN floor and 3 x the 5 ms baseline.
    fn obs_high_lag(at: Instant) -> HealthObservation {
        HealthObservation {
            heartbeat_lag_p99: Duration::from_millis(200),
            ..obs_healthy(at)
        }
    }

    #[test]
    fn starts_closed() {
        let b = CircuitBreaker::new(CircuitBreakerConfig::default());
        assert_eq!(b.state(), BreakerState::Closed);
        assert_eq!(b.open_reason(), None);
        assert_eq!(b.verdict(), BreakerVerdict::Allow);
    }

    // A single term-churn observation is enough to open.
    #[test]
    fn term_churn_opens_immediately() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        let st = b.observe(obs_term_churn(t0));
        assert_eq!(st, BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::TermChurn));
    }

    // A single active-election observation is enough to open.
    #[test]
    fn active_election_opens_immediately() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        let mut o = obs_healthy(t0);
        o.active_election = true;
        b.observe(o);
        assert_eq!(b.state(), BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::ActiveElection));
    }

    // One slow scheduling window is tolerated; the second consecutive one opens.
    #[test]
    fn scheduling_latency_requires_two_consecutive_windows() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_high_sched(t0));
        assert_eq!(b.state(), BreakerState::Closed);
        b.observe(obs_high_sched(t0 + Duration::from_secs(10)));
        assert_eq!(b.state(), BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::SchedulingLatency));
    }

    // A healthy window in between resets the consecutive-window counter.
    #[test]
    fn scheduling_latency_resets_on_healthy_window() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_high_sched(t0));
        b.observe(obs_healthy(t0 + Duration::from_secs(10)));
        b.observe(obs_high_sched(t0 + Duration::from_secs(20)));
        assert_eq!(b.state(), BreakerState::Closed);
    }

    // Two elevated-lag windows are tolerated; the third consecutive one opens.
    #[test]
    fn lag_requires_three_consecutive_windows() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_high_lag(t0));
        assert_eq!(b.state(), BreakerState::Closed);
        b.observe(obs_high_lag(t0 + Duration::from_secs(10)));
        assert_eq!(b.state(), BreakerState::Closed);
        b.observe(obs_high_lag(t0 + Duration::from_secs(20)));
        assert_eq!(b.state(), BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::SustainedLag));
    }

    // 200 ms lag exceeds the 50 ms floor but not 3 x 100 ms = 300 ms, so the
    // baseline multiple wins and the breaker stays closed.
    #[test]
    fn lag_uses_max_of_floor_and_baseline_multiple() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        let mut o = obs_healthy(t0);
        o.heartbeat_lag_p99 = Duration::from_millis(200);
        o.baseline_lag_p99 = Some(Duration::from_millis(100));
        b.observe(o);
        b.observe(HealthObservation {
            at: t0 + Duration::from_secs(10),
            ..o
        });
        b.observe(HealthObservation {
            at: t0 + Duration::from_secs(20),
            ..o
        });
        assert_eq!(b.state(), BreakerState::Closed);
    }

    // Without a baseline only the 50 ms floor applies, so 100 ms lag is
    // elevated and three windows open the breaker.
    #[test]
    fn lag_uses_floor_when_no_baseline() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        let mut o = obs_healthy(t0);
        o.heartbeat_lag_p99 = Duration::from_millis(100);
        o.baseline_lag_p99 = None;
        b.observe(o);
        b.observe(HealthObservation {
            at: t0 + Duration::from_secs(10),
            ..o
        });
        b.observe(HealthObservation {
            at: t0 + Duration::from_secs(20),
            ..o
        });
        assert_eq!(b.state(), BreakerState::Open);
    }

    // Term-churn opens need the full 60 s default dwell before half-open.
    #[test]
    fn term_churn_dwell_is_60s() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_term_churn(t0));
        assert_eq!(b.state(), BreakerState::Open);
        b.observe(obs_healthy(t0 + Duration::from_secs(30)));
        assert_eq!(b.state(), BreakerState::Open);
        b.observe(obs_healthy(t0 + Duration::from_secs(60)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
    }

    // Opens at t0+20 s; +35 s is only 15 s of dwell, +50 s reaches the 30 s
    // lag dwell.
    #[test]
    fn lag_dwell_is_30s() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_high_lag(t0));
        b.observe(obs_high_lag(t0 + Duration::from_secs(10)));
        b.observe(obs_high_lag(t0 + Duration::from_secs(20)));
        assert_eq!(b.state(), BreakerState::Open);
        b.observe(obs_healthy(t0 + Duration::from_secs(35)));
        assert_eq!(b.state(), BreakerState::Open);
        b.observe(obs_healthy(t0 + Duration::from_secs(50)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
    }

    // A fault observed while half-open reopens with `HalfOpenTripBack`.
    #[test]
    fn half_open_trips_back_on_fresh_fault() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_term_churn(t0));
        b.observe(obs_healthy(t0 + Duration::from_secs(60)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
        b.observe(obs_term_churn(t0 + Duration::from_secs(70)));
        assert_eq!(b.state(), BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::HalfOpenTripBack));
    }

    // Half-open at +60 s; 30 s later (the default lag dwell) it closes and
    // clears the open reason.
    #[test]
    fn half_open_closes_after_dwell() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.observe(obs_term_churn(t0));
        b.observe(obs_healthy(t0 + Duration::from_secs(60)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
        b.observe(obs_healthy(t0 + Duration::from_secs(90)));
        assert_eq!(b.state(), BreakerState::Closed);
        assert_eq!(b.open_reason(), None);
    }

    #[test]
    fn verdict_reflects_state() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        assert_eq!(b.verdict(), BreakerVerdict::Allow);
        b.observe(obs_term_churn(Instant::now()));
        assert!(matches!(
            b.verdict(),
            BreakerVerdict::WouldReject {
                reason: OpenReason::TermChurn
            }
        ));
    }

    // A failed trial is ignored while closed and reopens while half-open.
    #[test]
    fn record_trial_outcome_only_acts_in_half_open() {
        let mut b = CircuitBreaker::new(CircuitBreakerConfig::default());
        let t0 = Instant::now();
        b.record_trial_outcome(t0, true);
        assert_eq!(b.state(), BreakerState::Closed);
        b.observe(obs_term_churn(t0));
        b.observe(obs_healthy(t0 + Duration::from_secs(60)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
        b.record_trial_outcome(t0 + Duration::from_secs(61), true);
        assert_eq!(b.state(), BreakerState::Open);
        assert_eq!(b.open_reason(), Some(OpenReason::HalfOpenTripBack));
    }

    #[test]
    fn config_validate_rejects_bad_multiplier() {
        let mut cfg = CircuitBreakerConfig::default();
        cfg.heartbeat_baseline_multiplier = 0.0;
        assert!(cfg.validate().is_err());
        cfg.heartbeat_baseline_multiplier = -1.0;
        assert!(cfg.validate().is_err());
        cfg.heartbeat_baseline_multiplier = f64::NAN;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn config_validate_rejects_bad_trial_fraction() {
        let mut cfg = CircuitBreakerConfig::default();
        cfg.half_open_trial_fraction = 1.5;
        assert!(cfg.validate().is_err());
        cfg.half_open_trial_fraction = -0.1;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn for_profile_sets_threshold_per_profile() {
        let lan = CircuitBreakerConfig::for_profile(DeploymentProfile::Lan);
        let wan = CircuitBreakerConfig::for_profile(DeploymentProfile::Wan);
        assert_eq!(lan.heartbeat_lag_floor, Duration::from_millis(50));
        assert_eq!(wan.heartbeat_lag_floor, Duration::from_millis(200));
    }

    // Gauge values are pinned so exported metrics stay stable.
    #[test]
    fn breaker_state_gauge_values_pinned() {
        assert_eq!(BreakerState::Closed.gauge_value(), 0);
        assert_eq!(BreakerState::Open.gauge_value(), 1);
        assert_eq!(BreakerState::HalfOpen.gauge_value(), 2);
    }

    // Labels are pinned so emitted strings stay stable.
    #[test]
    fn open_reason_labels_pinned() {
        assert_eq!(OpenReason::TermChurn.as_str(), "term_churn");
        assert_eq!(OpenReason::SustainedLag.as_str(), "sustained_lag");
        assert_eq!(OpenReason::SchedulingLatency.as_str(), "scheduling_latency");
        assert_eq!(OpenReason::ActiveElection.as_str(), "active_election");
        assert_eq!(OpenReason::HalfOpenTripBack.as_str(), "half_open_trip_back");
    }

    // Runs a fast open -> half-open -> closed cycle with 100 ms dwells, then
    // re-trips 5 s after the first open (inside the 60 s min_cycle_time).
    // It only pins that the re-trip lands in Open; it does not assert how
    // long the reopened breaker is held.
    #[test]
    fn anti_flapping_holds_open_when_cycle_too_fast() {
        let cfg = CircuitBreakerConfig {
            term_churn_open_min_dwell: Duration::from_millis(100),
            lag_open_min_dwell: Duration::from_millis(100),
            min_cycle_time: Duration::from_secs(60),
            ..CircuitBreakerConfig::default()
        };
        let mut b = CircuitBreaker::new(cfg);
        let t0 = Instant::now();
        b.observe(obs_term_churn(t0));
        assert_eq!(b.state(), BreakerState::Open);
        b.observe(obs_healthy(t0 + Duration::from_millis(150)));
        assert_eq!(b.state(), BreakerState::HalfOpen);
        b.observe(obs_healthy(t0 + Duration::from_millis(300)));
        assert_eq!(b.state(), BreakerState::Closed);
        b.observe(obs_term_churn(t0 + Duration::from_secs(5)));
        assert_eq!(b.state(), BreakerState::Open);
    }
}