use super::config::CircuitBreakerConfig;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
Closed,
Open {
since_ms: u64,
},
HalfOpen {
probes_in_flight: u32,
},
}
#[derive(Debug)]
pub struct EndpointHealth {
state: CircuitState,
config: CircuitBreakerConfig,
avg_latency_ns: u64,
total_requests: u64,
consecutive_failures: u32,
error_rate: f64,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct EndpointStatus {
pub state: CircuitState,
pub avg_latency_ns: u64,
pub total_requests: u64,
pub error_rate: f64,
pub consecutive_failures: u32,
}
impl EndpointHealth {
pub fn new(config: CircuitBreakerConfig) -> Self {
Self {
state: CircuitState::Closed,
config,
avg_latency_ns: 0,
total_requests: 0,
consecutive_failures: 0,
error_rate: 0.0,
}
}
pub fn state(&self) -> CircuitState {
self.state
}
pub fn is_callable(&mut self, now_ms: u64) -> bool {
match self.state {
CircuitState::Closed => true,
CircuitState::Open { since_ms } => {
let elapsed = now_ms.saturating_sub(since_ms);
if elapsed >= self.config.recovery_timeout.as_millis() as u64 {
self.state = CircuitState::HalfOpen {
probes_in_flight: 1,
};
true
} else {
false
}
}
CircuitState::HalfOpen { probes_in_flight } => {
if probes_in_flight < self.config.half_open_max_requests {
self.state = CircuitState::HalfOpen {
probes_in_flight: probes_in_flight + 1,
};
true
} else {
false
}
}
}
}
pub fn record_success(&mut self, latency_ns: u64) {
if self.total_requests == 0 {
self.avg_latency_ns = latency_ns;
} else {
self.avg_latency_ns = self
.avg_latency_ns
.saturating_mul(4)
.saturating_add(latency_ns)
/ 5;
}
self.total_requests += 1;
self.consecutive_failures = 0;
self.error_rate *= 0.9;
match self.state {
CircuitState::HalfOpen { .. } => {
self.state = CircuitState::Closed;
}
CircuitState::Open { .. } => {
self.state = CircuitState::Closed;
}
CircuitState::Closed => {}
}
}
pub fn record_failure(&mut self, now_ms: u64) {
self.consecutive_failures += 1;
self.error_rate = self.error_rate * 0.9 + 0.1;
match self.state {
CircuitState::Closed => {
if self.consecutive_failures >= self.config.failure_threshold {
self.state = CircuitState::Open { since_ms: now_ms };
}
}
CircuitState::HalfOpen { .. } => {
self.state = CircuitState::Open { since_ms: now_ms };
}
CircuitState::Open { .. } => {
self.state = CircuitState::Open { since_ms: now_ms };
}
}
}
pub fn avg_latency_ns(&self) -> u64 {
self.avg_latency_ns
}
pub fn status(&self) -> EndpointStatus {
EndpointStatus {
state: self.state,
avg_latency_ns: self.avg_latency_ns,
total_requests: self.total_requests,
error_rate: self.error_rate,
consecutive_failures: self.consecutive_failures,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn default_config() -> CircuitBreakerConfig {
CircuitBreakerConfig {
failure_threshold: 3,
recovery_timeout: Duration::from_secs(30),
half_open_max_requests: 1,
}
}
#[test]
fn new_endpoint_is_closed_and_callable() {
let mut h = EndpointHealth::new(default_config());
assert_eq!(h.state(), CircuitState::Closed);
assert!(h.is_callable(0));
}
#[test]
fn opens_after_consecutive_failures_reach_threshold() {
let mut h = EndpointHealth::new(default_config());
h.record_failure(1000);
h.record_failure(2000);
assert_eq!(h.state(), CircuitState::Closed); h.record_failure(3000);
assert!(matches!(h.state(), CircuitState::Open { since_ms: 3000 }));
}
#[test]
fn open_circuit_rejects_requests() {
let mut h = EndpointHealth::new(default_config());
for t in 1..=3 {
h.record_failure(t * 1000);
}
assert!(!h.is_callable(5000));
}
#[test]
fn open_transitions_to_half_open_after_recovery_timeout() {
let mut h = EndpointHealth::new(default_config());
for t in 1..=3 {
h.record_failure(t * 1000);
}
assert!(matches!(h.state(), CircuitState::Open { .. }));
assert!(h.is_callable(33_000));
assert!(matches!(
h.state(),
CircuitState::HalfOpen {
probes_in_flight: 1
}
));
}
#[test]
fn half_open_limits_probes() {
let config = CircuitBreakerConfig {
half_open_max_requests: 2,
..default_config()
};
let mut h = EndpointHealth::new(config);
for t in 1..=3 {
h.record_failure(t * 1000);
}
assert!(h.is_callable(33_000)); assert!(h.is_callable(33_001)); assert!(!h.is_callable(33_002)); }
#[test]
fn half_open_success_closes_circuit() {
let mut h = EndpointHealth::new(default_config());
for t in 1..=3 {
h.record_failure(t * 1000);
}
h.is_callable(33_000); h.record_success(500_000); assert_eq!(h.state(), CircuitState::Closed);
assert!(h.is_callable(33_001));
}
#[test]
fn half_open_failure_reopens_circuit() {
let mut h = EndpointHealth::new(default_config());
for t in 1..=3 {
h.record_failure(t * 1000);
}
h.is_callable(33_000); h.record_failure(33_500); assert!(matches!(h.state(), CircuitState::Open { since_ms: 33_500 }));
}
#[test]
fn first_sample_sets_latency_directly() {
let mut h = EndpointHealth::new(default_config());
h.record_success(10_000);
assert_eq!(h.avg_latency_ns(), 10_000);
}
#[test]
fn ema_converges_toward_new_value() {
let mut h = EndpointHealth::new(default_config());
h.record_success(10_000); h.record_success(20_000); assert_eq!(h.avg_latency_ns(), 12_000);
h.record_success(20_000); assert_eq!(h.avg_latency_ns(), 13_600);
}
#[test]
fn success_resets_consecutive_failures() {
let mut h = EndpointHealth::new(default_config());
h.record_failure(1000);
h.record_failure(2000);
assert_eq!(h.status().consecutive_failures, 2);
h.record_success(1000);
assert_eq!(h.status().consecutive_failures, 0);
}
#[test]
fn error_rate_increases_on_failures() {
let mut h = EndpointHealth::new(default_config());
assert_eq!(h.status().error_rate, 0.0);
h.record_failure(1000);
assert!((h.status().error_rate - 0.1).abs() < 1e-10);
h.record_failure(2000);
assert!((h.status().error_rate - 0.19).abs() < 1e-10);
}
#[test]
fn error_rate_decays_on_success() {
let mut h = EndpointHealth::new(default_config());
h.record_failure(1000); h.record_success(1000); assert!((h.status().error_rate - 0.09).abs() < 1e-10);
}
#[test]
fn status_reflects_current_state() {
let mut h = EndpointHealth::new(default_config());
h.record_success(5000);
h.record_success(15000);
let s = h.status();
assert_eq!(s.state, CircuitState::Closed);
assert_eq!(s.total_requests, 2);
assert_eq!(s.consecutive_failures, 0);
assert_eq!(s.avg_latency_ns, 7000);
}
#[test]
fn failure_threshold_one_opens_immediately() {
let config = CircuitBreakerConfig {
failure_threshold: 1,
..default_config()
};
let mut h = EndpointHealth::new(config);
h.record_failure(100);
assert!(matches!(h.state(), CircuitState::Open { .. }));
}
#[test]
fn recovery_timeout_zero_transitions_immediately() {
let config = CircuitBreakerConfig {
recovery_timeout: Duration::ZERO,
..default_config()
};
let mut h = EndpointHealth::new(config);
for t in 1..=3 {
h.record_failure(t * 1000);
}
assert!(h.is_callable(3000));
assert!(matches!(h.state(), CircuitState::HalfOpen { .. }));
}
#[test]
fn multiple_failures_in_open_update_timestamp() {
let mut h = EndpointHealth::new(default_config());
for t in 1..=3 {
h.record_failure(t * 1000);
}
h.record_failure(5000);
assert!(matches!(h.state(), CircuitState::Open { since_ms: 5000 }));
}
#[test]
fn full_lifecycle_closed_open_halfopen_closed() {
let config = CircuitBreakerConfig {
failure_threshold: 2,
recovery_timeout: Duration::from_millis(100),
half_open_max_requests: 1,
};
let mut h = EndpointHealth::new(config);
assert_eq!(h.state(), CircuitState::Closed);
h.record_failure(10);
h.record_failure(20);
assert!(matches!(h.state(), CircuitState::Open { .. }));
assert!(!h.is_callable(50)); assert!(h.is_callable(200)); assert!(matches!(h.state(), CircuitState::HalfOpen { .. }));
h.record_success(1_000_000);
assert_eq!(h.state(), CircuitState::Closed);
}
}