use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use chrono::{DateTime, Utc};
/// Health state reported by a health check.
///
/// The `Display` implementation in this file renders `Healthy` as the bare
/// word and the other two variants as `<Variant>: <reason>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
/// No problem was detected.
Healthy,
/// Still reported as working, but with a problem described by `reason`.
Degraded { reason: String },
/// Reported as not working; `reason` describes why.
Unhealthy { reason: String },
}
/// Human-readable rendering of a [`HealthStatus`].
///
/// `Healthy` is written as the bare word; `Degraded` and `Unhealthy` are
/// written as the variant name, a colon and a space, then the reason text.
impl fmt::Display for HealthStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the two problem variants carry a reason, so the healthy case
        // is written out immediately and the shared format is used for the rest.
        let (label, reason) = match self {
            HealthStatus::Healthy => return f.write_str("Healthy"),
            HealthStatus::Degraded { reason } => ("Degraded", reason),
            HealthStatus::Unhealthy { reason } => ("Unhealthy", reason),
        };
        write!(f, "{}: {}", label, reason)
    }
}
/// Something whose health can be queried.
///
/// Implementors must be `Send + Sync` (supertrait bounds), so a checker can
/// be shared across threads.
pub trait HealthCheck: Send + Sync {
/// Returns the current [`HealthStatus`] of the checked component.
fn check_health(&self) -> HealthStatus;
/// Returns a textual description of the current status.
fn status_message(&self) -> String;
}
/// Lets a shared `Arc<T>` be used wherever a [`HealthCheck`] is expected by
/// forwarding both methods to the wrapped `T`.
impl<T: HealthCheck + 'static> HealthCheck for std::sync::Arc<T> {
    fn check_health(&self) -> HealthStatus {
        // `**self` is the inner `T`, so this calls `T`'s implementation
        // rather than recursing into this one.
        (**self).check_health()
    }

    fn status_message(&self) -> String {
        (**self).status_message()
    }
}
/// Health counters and derived status for a single worker.
///
/// Every mutable field uses interior mutability (`Mutex` or atomics), so the
/// recording methods on this type take `&self`.
pub struct WorkerHealth {
/// Identifier of the worker these figures belong to.
pub worker_id: String,
/// Most recently computed status; rewritten after every recorded success or failure.
pub status: Mutex<HealthStatus>,
/// UTC time at which `status` was last recomputed (initially the construction time).
pub last_check: Mutex<DateTime<Utc>>,
/// Number of successes recorded via `record_success`.
pub messages_processed: AtomicU64,
/// Number of failures recorded via `record_failure`.
pub messages_failed: AtomicU64,
/// Running processing-time figure in milliseconds, maintained by
/// `record_success`; 0 until a sample has been recorded.
pub avg_processing_time_ms: AtomicU64,
/// Failures recorded since the last success; reset to 0 by `record_success`.
pub consecutive_failures: AtomicU64,
/// Message passed to the most recent `record_failure` call, if any.
/// The methods in this file never clear it.
pub last_error: Mutex<Option<String>>,
}
impl WorkerHealth {
    /// Creates a tracker for the worker identified by `worker_id`.
    ///
    /// All counters start at zero, the status starts as
    /// [`HealthStatus::Healthy`], no error is recorded, and `last_check` is
    /// set to the current UTC time.
    pub fn new(worker_id: impl Into<String>) -> Self {
        Self {
            worker_id: worker_id.into(),
            status: Mutex::new(HealthStatus::Healthy),
            last_check: Mutex::new(Utc::now()),
            messages_processed: AtomicU64::new(0),
            messages_failed: AtomicU64::new(0),
            avg_processing_time_ms: AtomicU64::new(0),
            consecutive_failures: AtomicU64::new(0),
            last_error: Mutex::new(None),
        }
    }

    /// Records one successfully processed message that took `processing_time`.
    ///
    /// Increments `messages_processed`, resets `consecutive_failures` to
    /// zero, folds the duration (whole milliseconds, saturating at
    /// `u64::MAX`) into `avg_processing_time_ms`, and recomputes the status.
    ///
    /// The stored figure is a running midpoint, not an arithmetic mean: a
    /// stored value of 0 is replaced by the new sample, otherwise the new
    /// value is `floor((stored + sample) / 2)`.
    pub fn record_success(&self, processing_time: Duration) {
        self.messages_processed.fetch_add(1, Ordering::Relaxed);
        self.consecutive_failures.store(0, Ordering::Relaxed);

        // `as_millis` yields a u128; clamp instead of silently truncating.
        let sample_ms = processing_time.as_millis().min(u128::from(u64::MAX)) as u64;

        // `fetch_update` re-runs the closure if another thread changed the
        // value in between, so concurrent samples are not lost the way they
        // would be with a separate load and store. The closure always returns
        // `Some`, so the returned `Result` is always `Ok` and can be ignored.
        let _ = self.avg_processing_time_ms.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |current| {
                Some(if current == 0 {
                    sample_ms
                } else {
                    // Overflow-free floor((current + sample_ms) / 2).
                    current / 2 + sample_ms / 2 + (current & sample_ms & 1)
                })
            },
        );

        self.update_status();
    }

    /// Records one failed message with a description of the failure.
    ///
    /// Increments `messages_failed` and `consecutive_failures`, stores the
    /// message in `last_error`, and recomputes the status.
    pub fn record_failure(&self, error: impl Into<String>) {
        self.messages_failed.fetch_add(1, Ordering::Relaxed);
        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
        *Self::lock_ignoring_poison(&self.last_error) = Some(error.into());
        self.update_status();
    }

    /// Recomputes `status` from the counters and stamps `last_check`.
    ///
    /// Rules, checked in order:
    /// 1. no messages recorded at all: healthy;
    /// 2. 10 or more consecutive failures: unhealthy;
    /// 3. 3 or more consecutive failures: degraded;
    /// 4. more than half of all messages failed: degraded;
    /// 5. otherwise: healthy.
    fn update_status(&self) {
        let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
        // Read each counter exactly once so the failure rate below is derived
        // from the same numbers that make up the total.
        let failed = self.messages_failed.load(Ordering::Relaxed);
        let processed = self.messages_processed.load(Ordering::Relaxed);
        let total_messages = processed.saturating_add(failed);

        let new_status = if total_messages == 0 {
            HealthStatus::Healthy
        } else if consecutive_failures >= 10 {
            HealthStatus::Unhealthy {
                reason: format!("{} consecutive failures", consecutive_failures),
            }
        } else if consecutive_failures >= 3 {
            HealthStatus::Degraded {
                reason: format!("{} consecutive failures", consecutive_failures),
            }
        } else {
            let failure_rate = failed as f64 / total_messages as f64;
            if failure_rate > 0.5 {
                HealthStatus::Degraded {
                    reason: format!("High failure rate: {:.1}%", failure_rate * 100.0),
                }
            } else {
                HealthStatus::Healthy
            }
        };

        *Self::lock_ignoring_poison(&self.status) = new_status;
        *Self::lock_ignoring_poison(&self.last_check) = Utc::now();
    }

    /// Returns a copy of the most recently computed status.
    pub fn get_status(&self) -> HealthStatus {
        Self::lock_ignoring_poison(&self.status).clone()
    }

    /// Locks `mutex`, recovering the guard if the mutex is poisoned.
    ///
    /// The guarded values here are plain data that are only ever overwritten
    /// wholesale, so a panic in another lock holder cannot leave them half
    /// updated; refusing to report health afterwards would only spread the
    /// panic to every caller.
    fn lock_ignoring_poison<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
/// Diagnostic rendering of a [`WorkerHealth`].
///
/// Prints the worker id, the `status` mutex, and the current value of each
/// atomic counter. `last_check` and `last_error` are not included.
impl fmt::Debug for WorkerHealth {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshot the counters first so the builder calls below stay short.
        let processed = self.messages_processed.load(Ordering::Relaxed);
        let failed = self.messages_failed.load(Ordering::Relaxed);
        let avg_ms = self.avg_processing_time_ms.load(Ordering::Relaxed);
        let consecutive = self.consecutive_failures.load(Ordering::Relaxed);

        let mut out = f.debug_struct("WorkerHealth");
        out.field("worker_id", &self.worker_id);
        out.field("status", &self.status);
        out.field("messages_processed", &processed);
        out.field("messages_failed", &failed);
        out.field("avg_processing_time_ms", &avg_ms);
        out.field("consecutive_failures", &consecutive);
        out.finish()
    }
}
/// Fixed snapshot of a worker pool's state, evaluated through the
/// `HealthCheck` implementation in this file.
pub struct WorkerPoolHealth {
// Name of the pool; included in every status message.
pool_name: String,
// Number of workers in the pool; a running pool with zero workers is reported as degraded.
worker_count: usize,
// Whether the pool is running; a pool that is not running is reported as unhealthy.
is_running: bool,
}
impl WorkerPoolHealth {
pub fn new(pool_name: String, worker_count: usize, is_running: bool) -> Self {
Self {
pool_name,
worker_count,
is_running,
}
}
}
/// Health evaluation for a worker pool snapshot.
impl HealthCheck for WorkerPoolHealth {
    /// Classifies the pool: a pool that is not running is unhealthy, a
    /// running pool with zero workers is degraded, anything else is healthy.
    fn check_health(&self) -> HealthStatus {
        match (self.is_running, self.worker_count) {
            (false, _) => HealthStatus::Unhealthy {
                reason: String::from("Pool is not running"),
            },
            (true, 0) => HealthStatus::Degraded {
                reason: String::from("No workers available"),
            },
            (true, _) => HealthStatus::Healthy,
        }
    }

    /// Describes the result of [`check_health`](HealthCheck::check_health)
    /// in one sentence that names the pool.
    fn status_message(&self) -> String {
        let name = &self.pool_name;
        let workers = self.worker_count;

        match self.check_health() {
            HealthStatus::Healthy => format!(
                "WorkerPool '{}' is healthy with {} workers.",
                name, workers
            ),
            HealthStatus::Degraded { reason } => format!(
                "WorkerPool '{}' is degraded: {}. {} workers running.",
                name, reason, workers
            ),
            HealthStatus::Unhealthy { reason } => {
                format!("WorkerPool '{}' is unhealthy: {}.", name, reason)
            }
        }
    }
}