use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::interval;
use terraphim_types::capability::ProcessId;
/// Coarse health classification reported for a supervised process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// Working normally.
    Healthy,
    /// Impaired but still usable (`HealthChecker::mark_degraded` keeps the
    /// healthy flag set).
    Degraded,
    /// Not working.
    Unhealthy,
    /// NOTE(review): presumably a health probe did not answer in time; nothing
    /// in this module produces this variant — confirm where it is set.
    Timeout,
    /// The process has been terminated.
    Terminated,
}

impl std::fmt::Display for HealthStatus {
    /// Writes the lowercase label used in log fields (e.g. `healthy`).
    ///
    /// The label is written verbatim; width/alignment flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Healthy => "healthy",
            Self::Degraded => "degraded",
            Self::Unhealthy => "unhealthy",
            Self::Timeout => "timeout",
            Self::Terminated => "terminated",
        };
        f.write_str(label)
    }
}
/// State of a [`CircuitBreaker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Calls flow normally while consecutive failures are counted.
    Closed,
    /// Calls are rejected until the cooldown has elapsed.
    Open,
    /// The cooldown has elapsed; calls are let through as probes.
    HalfOpen,
}

impl std::fmt::Display for CircuitState {
    /// Writes the snake_case name of the state (e.g. `half_open`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            CircuitState::HalfOpen => "half_open",
            CircuitState::Open => "open",
            CircuitState::Closed => "closed",
        };
        f.write_str(text)
    }
}
/// Tuning knobs for a [`CircuitBreaker`].
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures (while closed) that open the breaker.
    pub failure_threshold: u32,
    /// How long the breaker stays open before it reports half-open.
    pub cooldown: Duration,
    /// Consecutive successful half-open probes required to close it again.
    pub success_threshold: u32,
}

impl Default for CircuitBreakerConfig {
    /// Opens after 3 failures, cools down for 30 s, closes after 1 good probe.
    fn default() -> Self {
        let cooldown = Duration::from_secs(30);
        CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 1,
            cooldown,
        }
    }
}
/// Consecutive-failure circuit breaker.
///
/// Starts [`CircuitState::Closed`]. Once `failure_threshold` consecutive
/// failures are recorded it opens, and [`CircuitBreaker::should_allow`] returns
/// `false`. After `cooldown` has elapsed since opening, [`CircuitBreaker::state`]
/// reports [`CircuitState::HalfOpen`] and calls are allowed again as probes.
/// `success_threshold` consecutive successful probes close the breaker. Any
/// failed probe re-opens it and restarts the cooldown.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Stored state. Only `Closed` and `Open` are ever stored; `HalfOpen` is
    /// derived on demand by [`CircuitBreaker::state`].
    state: CircuitState,
    /// Failures recorded since the last success (saturates at `u32::MAX`).
    consecutive_failures: u32,
    /// Successful half-open probes recorded since the last failure.
    consecutive_successes: u32,
    /// When `state` last changed; the cooldown is measured from here.
    last_state_change: Instant,
    config: CircuitBreakerConfig,
}

impl CircuitBreaker {
    /// Creates a closed breaker with both counters at zero.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            state: CircuitState::Closed,
            consecutive_failures: 0,
            consecutive_successes: 0,
            last_state_change: Instant::now(),
            config,
        }
    }

    /// Returns the effective state.
    ///
    /// An open breaker whose cooldown has elapsed is reported as
    /// [`CircuitState::HalfOpen`]. The stored state is not modified, so the
    /// answer can change between calls purely through the passage of time.
    pub fn state(&self) -> CircuitState {
        if self.state == CircuitState::Open
            && self.last_state_change.elapsed() >= self.config.cooldown
        {
            CircuitState::HalfOpen
        } else {
            self.state
        }
    }

    /// Returns whether a call should be attempted right now.
    ///
    /// `true` when closed or half-open, `false` while open and cooling down.
    /// Half-open does not limit how many callers probe concurrently.
    pub fn should_allow(&self) -> bool {
        self.state() != CircuitState::Open
    }

    /// Records a successful call.
    ///
    /// Always clears the failure run. While half-open, counts the probe and
    /// closes the breaker once `success_threshold` probes have succeeded.
    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
        match self.state() {
            CircuitState::HalfOpen => {
                self.consecutive_successes = self.consecutive_successes.saturating_add(1);
                if self.consecutive_successes >= self.config.success_threshold {
                    self.transition(CircuitState::Closed);
                    self.consecutive_successes = 0;
                    tracing::info!("Circuit breaker closed after successful probe");
                }
            }
            // Already closed: nothing to do. Still cooling down: a success
            // reported while open does not shorten the cooldown.
            CircuitState::Closed | CircuitState::Open => {}
        }
    }

    /// Records a failed call.
    ///
    /// While closed, opens the breaker once `failure_threshold` consecutive
    /// failures have accumulated. While half-open, any failure re-opens it
    /// and restarts the cooldown. While open, only the counter changes.
    pub fn record_failure(&mut self) {
        self.consecutive_successes = 0;
        // Saturate instead of overflowing. Callers may keep reporting failures
        // while the breaker is open, and a plain `+= 1` would panic in debug
        // builds after `u32::MAX` of them.
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        match self.state() {
            CircuitState::Closed => {
                if self.consecutive_failures >= self.config.failure_threshold {
                    self.transition(CircuitState::Open);
                    tracing::warn!(
                        consecutive_failures = self.consecutive_failures,
                        "Circuit breaker opened"
                    );
                }
            }
            CircuitState::HalfOpen => {
                self.transition(CircuitState::Open);
                tracing::warn!("Circuit breaker re-opened after failed probe");
            }
            // Already open and still cooling down: stay open.
            CircuitState::Open => {}
        }
    }

    /// Forces the breaker closed and clears both counters.
    pub fn reset(&mut self) {
        self.transition(CircuitState::Closed);
        self.consecutive_failures = 0;
        self.consecutive_successes = 0;
    }

    /// Stores `new_state` and restarts the state-change clock.
    fn transition(&mut self, new_state: CircuitState) {
        self.state = new_state;
        self.last_state_change = Instant::now();
    }
}
/// A single timestamped health observation.
#[derive(Debug, Clone)]
pub struct HealthRecord {
    /// When the observation was recorded.
    pub timestamp: Instant,
    /// The observed status.
    pub status: HealthStatus,
}

/// Bounded, time-windowed history of health observations.
///
/// Holds at most `max_records` entries, dropping the oldest first. Entries
/// older than `window` are ignored by [`HealthHistory::success_rate`] and
/// evicted on the next [`HealthHistory::record`].
#[derive(Debug)]
pub struct HealthHistory {
    records: VecDeque<HealthRecord>,
    max_records: usize,
    window: Duration,
}

impl HealthHistory {
    /// Creates an empty history capped at `max_records` entries that only
    /// considers observations from the last `window`.
    pub fn new(max_records: usize, window: Duration) -> Self {
        Self {
            records: VecDeque::with_capacity(max_records),
            max_records,
            window,
        }
    }

    /// Appends `status` stamped with the current time.
    ///
    /// Stale entries are evicted first. If the history is still full, the
    /// oldest entry is dropped. With `max_records == 0` nothing is stored.
    pub fn record(&mut self, status: HealthStatus) {
        if self.max_records == 0 {
            return;
        }
        let now = Instant::now();
        self.evict_stale(now);
        if self.records.len() >= self.max_records {
            self.records.pop_front();
        }
        self.records.push_back(HealthRecord {
            timestamp: now,
            status,
        });
    }

    /// Fraction of in-window records whose status is `Healthy`.
    ///
    /// Any other status (including `Degraded`) counts against the rate.
    /// Returns `1.0` when no records fall inside the window.
    pub fn success_rate(&self) -> f64 {
        let now = Instant::now();
        let (total, healthy) = self
            .records
            .iter()
            .filter(|r| self.in_window(r, now))
            .fold((0usize, 0usize), |(total, healthy), r| {
                (
                    total + 1,
                    healthy + usize::from(r.status == HealthStatus::Healthy),
                )
            });
        if total == 0 {
            return 1.0;
        }
        healthy as f64 / total as f64
    }

    /// Classifies the current success rate.
    ///
    /// Returns `Healthy` when the rate is at least `degradation_threshold`,
    /// `Degraded` when it is below that but above zero, and `Unhealthy` when
    /// no in-window record is healthy. An empty window is `Healthy`, because
    /// its rate is `1.0`.
    pub fn assess(&self, degradation_threshold: f64) -> HealthStatus {
        let rate = self.success_rate();
        if rate >= degradation_threshold {
            HealthStatus::Healthy
        } else if rate > 0.0 {
            HealthStatus::Degraded
        } else {
            HealthStatus::Unhealthy
        }
    }

    /// Number of stored records. This may include stale records that have
    /// not been evicted yet.
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Returns `true` when no records are stored.
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Whether `record` is no older than `window` as of `now`.
    ///
    /// Compares the record's age rather than computing `now - window`.
    /// `Instant::checked_sub` returns `None` when that instant is not
    /// representable (e.g. shortly after the platform clock's origin). The
    /// previous `unwrap_or(now)` fallback then treated every record as stale.
    fn in_window(&self, record: &HealthRecord, now: Instant) -> bool {
        now.saturating_duration_since(record.timestamp) <= self.window
    }

    /// Pops records older than `window` from the front. Records are appended
    /// in `Instant::now()` order, so the front is always the oldest.
    fn evict_stale(&mut self, now: Instant) {
        while let Some(front) = self.records.front() {
            if self.in_window(front, now) {
                break;
            }
            self.records.pop_front();
        }
    }
}
#[derive(Debug, Clone)]
pub struct HealthChecker {
process_id: ProcessId,
interval: Duration,
healthy: Arc<AtomicBool>,
status: Arc<std::sync::Mutex<HealthStatus>>,
}
impl HealthChecker {
pub fn new(process_id: ProcessId, interval: Duration) -> Self {
let checker = Self {
process_id,
interval,
healthy: Arc::new(AtomicBool::new(true)),
status: Arc::new(std::sync::Mutex::new(HealthStatus::Healthy)),
};
checker.start_check_loop();
checker
}
pub async fn is_healthy(&self) -> bool {
self.healthy.load(Ordering::Relaxed)
}
pub fn status(&self) -> HealthStatus {
*self.status.lock().unwrap()
}
pub fn mark_unhealthy(&self) {
self.healthy.store(false, Ordering::Relaxed);
*self.status.lock().unwrap() = HealthStatus::Unhealthy;
}
pub fn mark_degraded(&self) {
self.healthy.store(true, Ordering::Relaxed);
*self.status.lock().unwrap() = HealthStatus::Degraded;
}
pub fn mark_terminated(&self) {
self.healthy.store(false, Ordering::Relaxed);
*self.status.lock().unwrap() = HealthStatus::Terminated;
}
fn start_check_loop(&self) {
let healthy = Arc::clone(&self.healthy);
let status = Arc::clone(&self.status);
let interval_duration = self.interval;
let process_id = self.process_id;
tokio::spawn(async move {
let mut ticker = interval(interval_duration);
loop {
ticker.tick().await;
if !healthy.load(Ordering::Relaxed) {
let current = *status.lock().unwrap();
tracing::debug!(
process_id = %process_id,
status = %current,
"Health check exiting loop"
);
break;
}
tracing::debug!(process_id = %process_id, "Health check: healthy");
}
});
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_status_display() {
        assert_eq!(format!("{}", HealthStatus::Healthy), "healthy");
        assert_eq!(format!("{}", HealthStatus::Degraded), "degraded");
        assert_eq!(format!("{}", HealthStatus::Unhealthy), "unhealthy");
        assert_eq!(format!("{}", HealthStatus::Timeout), "timeout");
        assert_eq!(format!("{}", HealthStatus::Terminated), "terminated");
    }

    #[tokio::test]
    async fn test_health_checker() {
        let checker = HealthChecker::new(ProcessId::new(), Duration::from_secs(1));
        assert!(checker.is_healthy().await);
        assert_eq!(checker.status(), HealthStatus::Healthy);
        checker.mark_unhealthy();
        assert!(!checker.is_healthy().await);
        assert_eq!(checker.status(), HealthStatus::Unhealthy);
    }

    #[tokio::test]
    async fn test_health_checker_degraded() {
        let checker = HealthChecker::new(ProcessId::new(), Duration::from_secs(1));
        checker.mark_degraded();
        assert!(checker.is_healthy().await);
        assert_eq!(checker.status(), HealthStatus::Degraded);
    }

    #[tokio::test]
    async fn test_health_checker_terminated() {
        let checker = HealthChecker::new(ProcessId::new(), Duration::from_secs(1));
        checker.mark_terminated();
        assert!(!checker.is_healthy().await);
        assert_eq!(checker.status(), HealthStatus::Terminated);
    }

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let cb = CircuitBreaker::new(CircuitBreakerConfig::default());
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow());
    }

    #[test]
    fn test_circuit_breaker_opens_on_failures() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            cooldown: Duration::from_secs(30),
            success_threshold: 1,
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Closed);
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Closed);
        // The third consecutive failure reaches the threshold.
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.should_allow());
    }

    #[test]
    fn test_circuit_breaker_success_resets_failure_count() {
        let config = CircuitBreakerConfig {
            failure_threshold: 3,
            ..Default::default()
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        cb.record_failure();
        // The success clears the run, so the next two failures stay below 3.
        cb.record_success();
        cb.record_failure();
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_success_while_open_does_not_close() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            ..Default::default()
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        // The default cooldown (30 s) has not elapsed, so this is not a probe.
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.should_allow());
    }

    #[test]
    fn test_circuit_breaker_half_open_after_cooldown() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_millis(100),
            success_threshold: 1,
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        std::thread::sleep(Duration::from_millis(150));
        assert_eq!(cb.state(), CircuitState::HalfOpen);
        assert!(cb.should_allow());
    }

    #[test]
    fn test_circuit_breaker_half_open_success_closes() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_millis(100),
            success_threshold: 1,
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        std::thread::sleep(Duration::from_millis(150));
        assert_eq!(cb.state(), CircuitState::HalfOpen);
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_requires_success_threshold() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_millis(100),
            success_threshold: 2,
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        std::thread::sleep(Duration::from_millis(150));
        // One good probe is not enough when two are required.
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::HalfOpen);
        cb.record_success();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_failure_reopens() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            cooldown: Duration::from_millis(100),
            success_threshold: 1,
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        std::thread::sleep(Duration::from_millis(150));
        // The failed probe re-opens the breaker and restarts the cooldown.
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_reset() {
        let config = CircuitBreakerConfig {
            failure_threshold: 1,
            ..Default::default()
        };
        let mut cb = CircuitBreaker::new(config);
        cb.record_failure();
        assert_eq!(cb.state(), CircuitState::Open);
        cb.reset();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow());
    }

    #[test]
    fn test_health_history_empty() {
        let history = HealthHistory::new(100, Duration::from_secs(60));
        assert!(history.is_empty());
        assert_eq!(history.len(), 0);
        // No data is reported as a perfect success rate.
        assert_eq!(history.success_rate(), 1.0);
    }

    #[test]
    fn test_health_history_all_healthy() {
        let mut history = HealthHistory::new(100, Duration::from_secs(60));
        for _ in 0..10 {
            history.record(HealthStatus::Healthy);
        }
        assert_eq!(history.success_rate(), 1.0);
        assert_eq!(history.assess(0.8), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_history_mixed() {
        let mut history = HealthHistory::new(100, Duration::from_secs(60));
        for _ in 0..7 {
            history.record(HealthStatus::Healthy);
        }
        for _ in 0..3 {
            history.record(HealthStatus::Unhealthy);
        }
        let rate = history.success_rate();
        assert!((rate - 0.7).abs() < 0.01);
        assert_eq!(history.assess(0.8), HealthStatus::Degraded);
    }

    #[test]
    fn test_health_history_assess_at_threshold_is_healthy() {
        let mut history = HealthHistory::new(100, Duration::from_secs(60));
        for _ in 0..4 {
            history.record(HealthStatus::Healthy);
        }
        history.record(HealthStatus::Unhealthy);
        // 4/5 == 0.8 exactly; the threshold comparison is inclusive.
        assert_eq!(history.assess(0.8), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_history_all_unhealthy() {
        let mut history = HealthHistory::new(100, Duration::from_secs(60));
        for _ in 0..5 {
            history.record(HealthStatus::Unhealthy);
        }
        assert_eq!(history.success_rate(), 0.0);
        assert_eq!(history.assess(0.5), HealthStatus::Unhealthy);
    }

    #[test]
    fn test_health_history_max_records() {
        let mut history = HealthHistory::new(5, Duration::from_secs(600));
        for _ in 0..10 {
            history.record(HealthStatus::Healthy);
        }
        assert_eq!(history.len(), 5);
    }

    #[test]
    fn test_circuit_state_display() {
        assert_eq!(format!("{}", CircuitState::Closed), "closed");
        assert_eq!(format!("{}", CircuitState::Open), "open");
        assert_eq!(format!("{}", CircuitState::HalfOpen), "half_open");
    }
}
#[cfg(test)]
mod proptest_tests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // Smoke test: with a 1 ms cooldown the breaker can pass through every
        // state. The outcome is timing-dependent, so this only checks that
        // arbitrary interleavings never panic.
        #[test]
        fn circuit_breaker_state_always_valid(
            actions in prop::collection::vec(prop::bool::ANY, 1..50),
        ) {
            let config = CircuitBreakerConfig {
                failure_threshold: 3,
                cooldown: Duration::from_millis(1),
                success_threshold: 1,
            };
            let mut cb = CircuitBreaker::new(config);
            for success in &actions {
                if *success {
                    cb.record_success();
                } else {
                    cb.record_failure();
                }
                let state = cb.state();
                prop_assert!(
                    state == CircuitState::Closed
                        || state == CircuitState::Open
                        || state == CircuitState::HalfOpen
                );
            }
        }

        // With a cooldown far longer than the test, the breaker can never
        // become half-open. Its state is then a pure function of the action
        // sequence: it is open iff a run of `failure_threshold` consecutive
        // failures has occurred, and it stays open afterwards.
        #[test]
        fn circuit_breaker_opens_iff_failure_run_reaches_threshold(
            actions in prop::collection::vec(prop::bool::ANY, 1..50),
        ) {
            let config = CircuitBreakerConfig {
                failure_threshold: 3,
                cooldown: Duration::from_secs(3600),
                success_threshold: 1,
            };
            let mut cb = CircuitBreaker::new(config);
            let mut run = 0u32;
            let mut tripped = false;
            for success in &actions {
                if *success {
                    cb.record_success();
                    run = 0;
                } else {
                    cb.record_failure();
                    run += 1;
                    if run >= 3 {
                        tripped = true;
                    }
                }
                let expected = if tripped { CircuitState::Open } else { CircuitState::Closed };
                prop_assert_eq!(cb.state(), expected);
                prop_assert_eq!(cb.should_allow(), !tripped);
            }
        }

        // The rate stays in [0, 1] and equals healthy / total when every
        // record fits in the history and inside the window.
        #[test]
        fn health_history_success_rate_in_range(
            statuses in prop::collection::vec(
                prop_oneof![
                    Just(HealthStatus::Healthy),
                    Just(HealthStatus::Unhealthy),
                    Just(HealthStatus::Degraded),
                ],
                1..100,
            ),
        ) {
            let mut history = HealthHistory::new(200, Duration::from_secs(600));
            for status in &statuses {
                history.record(*status);
            }
            let rate = history.success_rate();
            prop_assert!(rate >= 0.0);
            prop_assert!(rate <= 1.0);
            let healthy = statuses
                .iter()
                .filter(|s| **s == HealthStatus::Healthy)
                .count();
            let expected = healthy as f64 / statuses.len() as f64;
            prop_assert!((rate - expected).abs() < 1e-12);
        }
    }
}