use std::{
collections::VecDeque,
fmt,
time::{Duration, Instant},
};
use crate::utils::datetime::format_duration;
/// State of the per-peer connection circuit breaker.
///
/// A freshly constructed (default) breaker is [`CircuitBreakerState::Closed`].
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum CircuitBreakerState {
    /// Normal operation: connection attempts are allowed.
    #[default]
    Closed,
    /// Tripped after repeated failures.
    Open {
        /// The instant at which the breaker tripped; cool-down is measured from here.
        opened_at: Instant,
    },
    /// Cool-down has elapsed and the breaker is waiting for the outcome of a trial connection.
    HalfOpen,
}
impl CircuitBreakerState {
pub fn is_open(&self) -> bool {
matches!(self, CircuitBreakerState::Open { .. })
}
pub fn is_half_open(&self) -> bool {
matches!(self, CircuitBreakerState::HalfOpen)
}
pub fn is_closed(&self) -> bool {
matches!(self, CircuitBreakerState::Closed)
}
}
/// Outcome of a single connection attempt.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionAttemptResult {
    /// The attempt succeeded.
    Success,
    /// The attempt failed.
    Failure,
}
/// One entry in a peer's connection-attempt history.
#[derive(Clone, Debug)]
struct ConnectionAttempt {
    /// When the attempt was recorded.
    timestamp: Instant,
    /// Whether the attempt succeeded or failed.
    result: ConnectionAttemptResult,
}
/// Connection health bookkeeping for a single peer.
///
/// The default value has a closed circuit breaker, no recorded attempts and no latency estimate.
#[derive(Clone, Debug, Default)]
pub struct PeerHealthMetrics {
    /// Current circuit-breaker state for this peer.
    circuit_breaker_state: CircuitBreakerState,
    /// Time of the most recently recorded attempt, if any.
    last_attempt: Option<Instant>,
    /// Number of failures recorded since the last recorded success.
    consecutive_failures: usize,
    /// History of recorded attempts, oldest at the front.
    connection_attempts: VecDeque<ConnectionAttempt>,
    /// Running average of the reported connection latencies, if any were reported.
    avg_connection_latency: Option<Duration>,
}
impl PeerHealthMetrics {
    /// Maximum number of attempts kept in the history; the oldest entry is evicted first.
    const MAX_ATTEMPTS_HISTORY: usize = 100;
    /// Pseudo-count of successes assumed before any attempt has been observed.
    const PRIOR_SUCCESSES: f32 = 1.0;
    /// Pseudo-count of failures assumed before any attempt has been observed.
    const PRIOR_FAILURES: f32 = 3.0;
    /// Weight, in tenths, given to a new latency sample in the running average (3 => 0.3).
    const LATENCY_SAMPLE_TENTHS: u32 = 3;

    /// Creates metrics with a closed circuit breaker and an empty history.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a successful connection attempt.
    ///
    /// Resets the consecutive-failure counter, appends the attempt to the history and, when
    /// `latency` is provided, folds it into the running latency average. A `HalfOpen` breaker is
    /// closed; a breaker that is still `Open` is left unchanged.
    pub fn record_success(&mut self, latency: Option<Duration>) {
        // A single timestamp keeps `last_attempt` and the history entry consistent.
        let now = Instant::now();
        self.last_attempt = Some(now);
        self.consecutive_failures = 0;
        self.add_attempt(now, ConnectionAttemptResult::Success);
        if let Some(latency) = latency {
            self.update_latency(latency);
        }
        if self.circuit_breaker_state.is_half_open() {
            self.circuit_breaker_state = CircuitBreakerState::Closed;
        }
    }

    /// Records a failed connection attempt.
    ///
    /// Opens the circuit breaker once the number of consecutive failures reaches
    /// `failure_threshold`. An already open breaker keeps its original `opened_at`, so repeated
    /// failures do not extend the cool-down.
    pub fn record_failure(&mut self, failure_threshold: usize) {
        let now = Instant::now();
        self.last_attempt = Some(now);
        // Saturate instead of overflowing; the counter only matters relative to the threshold.
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        self.add_attempt(now, ConnectionAttemptResult::Failure);
        if self.consecutive_failures >= failure_threshold && !self.circuit_breaker_state.is_open() {
            self.circuit_breaker_state = CircuitBreakerState::Open { opened_at: now };
        }
    }

    /// Returns `true` if a connection attempt is currently permitted.
    ///
    /// `Closed` and `HalfOpen` always permit attempts; `Open` permits them only once
    /// `retry_interval` has elapsed since the breaker opened. This method does not change state.
    pub fn should_allow_connection(&self, retry_interval: Duration) -> bool {
        match &self.circuit_breaker_state {
            CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => true,
            CircuitBreakerState::Open { opened_at } => opened_at.elapsed() >= retry_interval,
        }
    }

    /// Moves an `Open` breaker to `HalfOpen` once `retry_interval` has elapsed since it opened.
    ///
    /// Returns `true` if the transition happened, `false` otherwise (including when the breaker
    /// is not `Open`).
    pub fn try_half_open(&mut self, retry_interval: Duration) -> bool {
        let cooled_down = matches!(
            &self.circuit_breaker_state,
            CircuitBreakerState::Open { opened_at } if opened_at.elapsed() >= retry_interval
        );
        if cooled_down {
            self.circuit_breaker_state = CircuitBreakerState::HalfOpen;
        }
        cooled_down
    }

    /// Smoothed success rate over the attempts recorded within the last `window`.
    ///
    /// The estimate is `(1 + successes) / (4 + attempts)`, i.e. a prior of one success and three
    /// failures, so a peer without recent attempts scores `0.25`. Only attempts still held in the
    /// bounded history are considered.
    ///
    /// A `window` reaching back before the earliest representable `Instant` is treated as
    /// covering the whole history instead of panicking.
    pub fn success_rate(&self, window: Duration) -> f32 {
        // `None` means the window start cannot be represented, so every attempt is inside it.
        let cutoff = Instant::now().checked_sub(window);
        let (successes, total) = self
            .connection_attempts
            .iter()
            .filter(|attempt| cutoff.map_or(true, |cutoff| attempt.timestamp > cutoff))
            .fold((0_usize, 0_usize), |(successes, total), attempt| {
                let hit = usize::from(attempt.result == ConnectionAttemptResult::Success);
                (successes + hit, total + 1)
            });
        (Self::PRIOR_SUCCESSES + successes as f32) /
            (Self::PRIOR_SUCCESSES + Self::PRIOR_FAILURES + total as f32)
    }

    /// Current circuit-breaker state.
    pub fn circuit_breaker_state(&self) -> &CircuitBreakerState {
        &self.circuit_breaker_state
    }

    /// Number of failures recorded since the last recorded success.
    pub fn consecutive_failures(&self) -> usize {
        self.consecutive_failures
    }

    /// Time of the most recently recorded attempt, if any.
    pub fn last_attempt(&self) -> Option<Instant> {
        self.last_attempt
    }

    /// Running average of the reported connection latencies, if any were reported.
    pub fn avg_connection_latency(&self) -> Option<Duration> {
        self.avg_connection_latency
    }

    /// Drops history entries that are at least `window` old.
    ///
    /// When the window reaches back before the earliest representable `Instant`, nothing can be
    /// older than it and the history is left untouched.
    pub fn cleanup_old_attempts(&mut self, window: Duration) {
        let cutoff = match Instant::now().checked_sub(window) {
            Some(cutoff) => cutoff,
            None => return,
        };
        // Entries are appended in chronological order, so expired ones sit at the front.
        while self
            .connection_attempts
            .front()
            .map_or(false, |oldest| oldest.timestamp <= cutoff)
        {
            self.connection_attempts.pop_front();
        }
    }

    /// Health score in `[0.0, 1.0]`.
    ///
    /// An open breaker scores `0.0`. Otherwise the score is the success rate over `window` minus
    /// `0.1` per consecutive failure, clamped to the valid range.
    pub fn health_score(&self, window: Duration) -> f32 {
        if self.circuit_breaker_state.is_open() {
            return 0.0;
        }
        let failure_penalty = 0.1 * self.consecutive_failures as f32;
        (self.success_rate(window) - failure_penalty).clamp(0.0, 1.0)
    }

    /// Appends an attempt to the history, evicting the oldest entry when the cap is exceeded.
    fn add_attempt(&mut self, timestamp: Instant, result: ConnectionAttemptResult) {
        self.connection_attempts
            .push_back(ConnectionAttempt { timestamp, result });
        if self.connection_attempts.len() > Self::MAX_ATTEMPTS_HISTORY {
            self.connection_attempts.pop_front();
        }
    }

    /// Folds `new_latency` into the running average, weighting the new sample at 0.3.
    ///
    /// The first sample is stored as-is. The blend uses `Duration` arithmetic so that
    /// sub-millisecond latencies are not truncated to zero.
    fn update_latency(&mut self, new_latency: Duration) {
        let blended = match self.avg_connection_latency {
            // Dividing before multiplying keeps each term below its input, so the sum is a
            // weighted average that cannot overflow.
            Some(current_avg) => {
                current_avg / 10 * (10 - Self::LATENCY_SAMPLE_TENTHS) +
                    new_latency / 10 * Self::LATENCY_SAMPLE_TENTHS
            },
            None => new_latency,
        };
        self.avg_connection_latency = Some(blended);
    }
}
/// Human-readable one-line summary: `Health(state: <state>[, failures: N][, latency: <duration>])`.
///
/// The failure count is shown only when non-zero and the latency only when an average exists.
impl fmt::Display for PeerHealthMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Use the state's `Display` impl: its `Debug` form would dump the raw `Instant` of an
        // open breaker, which is platform-specific and meaningless to a reader.
        write!(f, "Health(state: {}", self.circuit_breaker_state)?;
        if self.consecutive_failures > 0 {
            write!(f, ", failures: {}", self.consecutive_failures)?;
        }
        if let Some(latency) = self.avg_connection_latency {
            write!(f, ", latency: {}", format_duration(latency))?;
        }
        f.write_str(")")
    }
}
/// Renders the state name; an open breaker also shows how long it has been open.
impl fmt::Display for CircuitBreakerState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // The open state is the only one carrying data, so it is formatted separately.
            CircuitBreakerState::Open { opened_at } => {
                let open_for = format_duration(opened_at.elapsed());
                return write!(f, "Open({})", open_for);
            },
            CircuitBreakerState::Closed => "Closed",
            CircuitBreakerState::HalfOpen => "HalfOpen",
        };
        f.write_str(label)
    }
}
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Records `count` failures against `metrics`, each evaluated with the given `threshold`.
    fn fail_times(metrics: &mut PeerHealthMetrics, count: usize, threshold: usize) {
        (0..count).for_each(|_| metrics.record_failure(threshold));
    }

    /// Records `count` successes without a latency sample.
    fn succeed_times(metrics: &mut PeerHealthMetrics, count: usize) {
        (0..count).for_each(|_| metrics.record_success(None));
    }

    /// Walks the breaker through Closed -> Open -> HalfOpen -> Closed.
    #[test]
    fn test_circuit_breaker_transitions() {
        let failure_threshold = 3;
        let retry_interval = Duration::from_millis(100);
        let mut health = PeerHealthMetrics::new();

        // Fresh metrics start closed and allow connections.
        assert!(health.circuit_breaker_state.is_closed());
        assert!(health.should_allow_connection(retry_interval));

        // Reaching the threshold trips the breaker.
        fail_times(&mut health, failure_threshold, failure_threshold);
        assert!(health.circuit_breaker_state.is_open());
        assert!(!health.should_allow_connection(retry_interval));

        // After the cool-down a trial connection is permitted.
        thread::sleep(retry_interval + Duration::from_millis(10));
        assert!(health.should_allow_connection(retry_interval));
        assert!(health.try_half_open(retry_interval));
        assert!(health.circuit_breaker_state.is_half_open());

        // A successful trial closes the breaker again.
        health.record_success(Some(Duration::from_millis(50)));
        assert!(health.circuit_breaker_state.is_closed());
    }

    /// Three successes and one failure give (1 + 3) / (4 + 4).
    #[test]
    fn test_success_rate_calculation() {
        let window = Duration::from_secs(60);
        let mut health = PeerHealthMetrics::new();
        succeed_times(&mut health, 2);
        health.record_failure(5);
        health.record_success(None);
        assert_eq!(health.success_rate(window), 0.5);
    }

    /// Further failures while open must not move `opened_at`.
    #[test]
    fn test_circuit_breaker_cooldown_preservation() {
        let mut health = PeerHealthMetrics::new();
        fail_times(&mut health, 5, 3);
        assert!(health.circuit_breaker_state.is_open());

        let first_opened_at = match &health.circuit_breaker_state {
            CircuitBreakerState::Open { opened_at } => *opened_at,
            _ => panic!("Circuit breaker should be open"),
        };

        fail_times(&mut health, 2, 3);

        match &health.circuit_breaker_state {
            CircuitBreakerState::Open { opened_at } => assert_eq!(
                *opened_at, first_opened_at,
                "Circuit breaker cooldown should not reset on repeated failures"
            ),
            _ => panic!("Circuit breaker should still be open"),
        }
    }

    /// With no attempts the rate equals the prior.
    #[test]
    fn test_bayesian_success_rate_with_no_data() {
        let health = PeerHealthMetrics::new();
        assert_eq!(health.success_rate(Duration::from_secs(60)), 0.25);
    }

    /// Five successes give 6/9; five failures give 1/9.
    #[test]
    fn test_bayesian_success_rate_calculation() {
        let window = Duration::from_secs(60);

        let mut all_good = PeerHealthMetrics::new();
        succeed_times(&mut all_good, 5);
        let good_rate = all_good.success_rate(window);
        assert!((good_rate - 6.0 / 9.0).abs() < 0.001);

        let mut all_bad = PeerHealthMetrics::new();
        // Threshold above the failure count so the breaker stays closed.
        fail_times(&mut all_bad, 5, 10);
        let bad_rate = all_bad.success_rate(window);
        assert!((bad_rate - 1.0 / 9.0).abs() < 0.001);
    }

    /// The score starts at the prior, drops with failures and bottoms out at zero.
    #[test]
    fn test_health_score() {
        let window = Duration::from_secs(60);
        let mut health = PeerHealthMetrics::new();
        assert_eq!(health.health_score(window), 0.25);

        succeed_times(&mut health, 2);
        health.record_failure(5);
        let mixed_score = health.health_score(window);
        assert!(mixed_score > 0.0);
        assert!(mixed_score < 1.0);

        // Four consecutive failures outweigh the success rate; the breaker is still closed.
        fail_times(&mut health, 3, 5);
        assert_eq!(health.health_score(window), 0.0);
    }
}