use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use crate::core::ResourceStorage;
use super::config::HealthConfig;
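/// Health bookkeeping for a single storage backend: success/failure streaks,
/// the last error seen, and a rolling average of recent probe latencies.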
#[derive(Debug, Clone)]
pub struct BackendHealthStatus {
pub backend_id: String,
pub is_healthy: bool,
pub last_success: Option<Instant>,
pub last_failure: Option<Instant>,
pub consecutive_failures: u32,
pub consecutive_successes: u32,
pub last_error: Option<String>,
pub avg_response_time_ms: f64,
response_times: Vec<u64>,
}
impl BackendHealthStatus {
pub fn new(backend_id: impl Into<String>) -> Self {
Self {
backend_id: backend_id.into(),
is_healthy: true,
last_success: None,
last_failure: None,
consecutive_failures: 0,
consecutive_successes: 0,
last_error: None,
avg_response_time_ms: 0.0,
response_times: Vec::with_capacity(10),
}
}
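/// Records a successful probe, resets the failure streak, and folds the
/// response time into a rolling average over the last 10 samples.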
pub fn record_success(&mut self, response_time_ms: u64) {
self.last_success = Some(Instant::now());
self.consecutive_successes += 1;
self.consecutive_failures = 0;
self.last_error = None;
self.response_times.push(response_time_ms);
if self.response_times.len() > 10 {
self.response_times.remove(0);
}
self.avg_response_time_ms =
self.response_times.iter().sum::<u64>() as f64 / self.response_times.len() as f64;
}
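/// Records a failed probe, resets the success streak, and remembers the error.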
pub fn record_failure(&mut self, error: String) {
self.last_failure = Some(Instant::now());
self.consecutive_failures += 1;
self.consecutive_successes = 0;
self.last_error = Some(error);
}
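/// Flips `is_healthy` once consecutive failures or successes cross the given
/// thresholds, logging only when the state actually transitions.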
pub fn update_health(&mut self, failure_threshold: u32, success_threshold: u32) {
if self.consecutive_failures >= failure_threshold {
if self.is_healthy {
warn!(
backend = %self.backend_id,
failures = self.consecutive_failures,
"Backend marked unhealthy"
);
}
self.is_healthy = false;
} else if self.consecutive_successes >= success_threshold {
if !self.is_healthy {
info!(
backend = %self.backend_id,
successes = self.consecutive_successes,
"Backend recovered"
);
}
self.is_healthy = true;
}
}
pub fn time_since_success(&self) -> Option<Duration> {
self.last_success.map(|t| t.elapsed())
}
pub fn time_since_failure(&self) -> Option<Duration> {
self.last_failure.map(|t| t.elapsed())
}
}
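/// Snapshot of every backend's health at a single point in time.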
#[derive(Debug, Clone)]
pub struct CompositeHealthStatus {
pub backends: HashMap<String, BackendHealthStatus>,
pub is_healthy: bool,
pub degraded_backends: Vec<String>,
pub failed_backends: Vec<String>,
pub timestamp: Instant,
}
impl CompositeHealthStatus {
pub fn new(backends: HashMap<String, BackendHealthStatus>) -> Self {
let degraded_backends: Vec<_> = backends
.iter()
.filter(|(_, status)| !status.is_healthy)
.map(|(id, _)| id.clone())
.collect();
// Not populated here: any unhealthy backend is listed as degraded, so the
// overall status derived from this snapshot never reports "unhealthy".
let failed_backends = Vec::new();
let is_healthy = degraded_backends.is_empty() && failed_backends.is_empty();
Self {
backends,
is_healthy,
degraded_backends,
failed_backends,
timestamp: Instant::now(),
}
}
pub fn primary_healthy(&self, primary_id: &str) -> bool {
self.backends
.get(primary_id)
.map(|s| s.is_healthy)
.unwrap_or(false)
}
pub fn healthy_count(&self) -> usize {
self.backends.values().filter(|s| s.is_healthy).count()
}
pub fn unhealthy_count(&self) -> usize {
self.backends.values().filter(|s| !s.is_healthy).count()
}
}
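/// Outcome of a single health probe against one backend.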
#[derive(Debug)]
pub enum HealthCheckResult {
Healthy {
response_time_ms: u64,
},
Unhealthy {
error: String,
},
Timeout,
}
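/// Drives periodic health probes for a set of storage backends and keeps a
/// shared map of [`BackendHealthStatus`] entries up to date.
///
/// Minimal usage sketch (the `HealthConfig` value and the backend map are
/// assumed to come from the caller's setup code):
///
/// ```ignore
/// let mut monitor = HealthMonitor::new(config);
/// let handle = monitor.start(backends);
/// // ... serve traffic, consulting monitor.is_healthy("primary") as needed ...
/// monitor.stop().await;
/// let _ = handle.await;
/// ```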
pub struct HealthMonitor {
config: HealthConfig,
status: Arc<RwLock<HashMap<String, BackendHealthStatus>>>,
shutdown_tx: Option<mpsc::Sender<()>>,
}
impl HealthMonitor {
pub fn new(config: HealthConfig) -> Self {
Self {
config,
status: Arc::new(RwLock::new(HashMap::new())),
shutdown_tx: None,
}
}
pub fn start(
&mut self,
backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
) -> tokio::task::JoinHandle<()> {
let (tx, rx) = mpsc::channel(1);
self.shutdown_tx = Some(tx);
let config = self.config.clone();
let status = self.status.clone();
{
let mut status_map = status.write();
for id in backends.keys() {
status_map.insert(id.clone(), BackendHealthStatus::new(id));
}
}
tokio::spawn(async move {
Self::health_check_loop(rx, backends, config, status).await;
})
}
pub async fn stop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(()).await;
}
}
pub fn backend_status(&self, backend_id: &str) -> Option<BackendHealthStatus> {
self.status.read().get(backend_id).cloned()
}
pub fn all_status(&self) -> CompositeHealthStatus {
CompositeHealthStatus::new(self.status.read().clone())
}
pub fn is_healthy(&self, backend_id: &str) -> bool {
self.status
.read()
.get(backend_id)
.map(|s| s.is_healthy)
.unwrap_or(false)
}
pub fn all_healthy(&self) -> bool {
self.status.read().values().all(|s| s.is_healthy)
}
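/// Probes one backend with a lightweight `count` query under the system
/// tenant, classifying the outcome as healthy, unhealthy, or timed out.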
pub async fn check_backend(
backend: &dyn ResourceStorage,
timeout: Duration,
) -> HealthCheckResult {
let start = Instant::now();
let check = async {
backend
.count(
&crate::tenant::TenantContext::system(),
Some("__health_check__"),
)
.await
};
match tokio::time::timeout(timeout, check).await {
Ok(Ok(_)) => HealthCheckResult::Healthy {
response_time_ms: start.elapsed().as_millis() as u64,
},
// A backend error means the probe completed but failed, so surface it
// as unhealthy instead of treating it like a success.
Ok(Err(e)) => HealthCheckResult::Unhealthy {
error: e.to_string(),
},
Err(_) => HealthCheckResult::Timeout,
}
}
async fn health_check_loop(
mut shutdown_rx: mpsc::Receiver<()>,
backends: HashMap<String, Arc<dyn ResourceStorage + Send + Sync>>,
config: HealthConfig,
status: Arc<RwLock<HashMap<String, BackendHealthStatus>>>,
) {
let mut interval = tokio::time::interval(config.check_interval);
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
debug!("Health monitor shutting down");
break;
}
_ = interval.tick() => {
for (id, backend) in &backends {
let result = Self::check_backend(backend.as_ref(), config.timeout).await;
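// The write guard is acquired only after the async probe above completes,
// so the parking_lot lock is never held across an await point.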
let mut status_map = status.write();
if let Some(backend_status) = status_map.get_mut(id) {
match result {
HealthCheckResult::Healthy { response_time_ms } => {
backend_status.record_success(response_time_ms);
}
HealthCheckResult::Unhealthy { error } => {
backend_status.record_failure(error);
}
HealthCheckResult::Timeout => {
backend_status.record_failure("Health check timed out".to_string());
}
}
backend_status.update_health(
config.failure_threshold,
config.success_threshold,
);
}
}
}
}
}
}
}
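/// JSON-serializable health summary, suitable for exposing on an HTTP health
/// endpoint.
///
/// Illustrative shape (values depend on the actual backend state):
/// `{"status":"degraded","components":{"primary":{"status":"unhealthy",...}},"timestamp":"..."}`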
#[derive(Debug, Clone, serde::Serialize)]
pub struct HealthCheckResponse {
pub status: String,
pub components: HashMap<String, ComponentHealth>,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct ComponentHealth {
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub response_time_ms: Option<u64>,
}
impl From<CompositeHealthStatus> for HealthCheckResponse {
fn from(status: CompositeHealthStatus) -> Self {
let overall_status = if status.is_healthy {
"healthy"
} else if status.failed_backends.is_empty() {
"degraded"
} else {
"unhealthy"
};
let components = status
.backends
.into_iter()
.map(|(id, backend_status)| {
let component_status = if backend_status.is_healthy {
"healthy"
} else {
"unhealthy"
};
(
id,
ComponentHealth {
status: component_status.to_string(),
details: backend_status.last_error,
response_time_ms: if backend_status.avg_response_time_ms > 0.0 {
Some(backend_status.avg_response_time_ms as u64)
} else {
None
},
},
)
})
.collect();
Self {
status: overall_status.to_string(),
components,
timestamp: chrono::Utc::now(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backend_health_status_new() {
let status = BackendHealthStatus::new("test-backend");
assert!(status.is_healthy);
assert_eq!(status.consecutive_failures, 0);
assert_eq!(status.consecutive_successes, 0);
}
#[test]
fn test_record_success() {
let mut status = BackendHealthStatus::new("test");
status.record_success(100);
assert!(status.last_success.is_some());
assert_eq!(status.consecutive_successes, 1);
assert_eq!(status.consecutive_failures, 0);
assert!((status.avg_response_time_ms - 100.0).abs() < 0.1);
}
#[test]
fn test_record_failure() {
let mut status = BackendHealthStatus::new("test");
status.record_failure("Connection refused".to_string());
assert!(status.last_failure.is_some());
assert_eq!(status.consecutive_failures, 1);
assert_eq!(status.last_error, Some("Connection refused".to_string()));
}
#[test]
fn test_update_health_becomes_unhealthy() {
let mut status = BackendHealthStatus::new("test");
for _ in 0..3 {
status.record_failure("Error".to_string());
}
status.update_health(3, 2);
assert!(!status.is_healthy);
}
#[test]
fn test_update_health_recovers() {
let mut status = BackendHealthStatus::new("test");
status.is_healthy = false;
for _ in 0..2 {
status.record_success(50);
}
status.update_health(3, 2);
assert!(status.is_healthy);
}
#[test]
fn test_composite_health_status() {
let mut backends = HashMap::new();
let mut healthy = BackendHealthStatus::new("healthy");
healthy.is_healthy = true;
let mut unhealthy = BackendHealthStatus::new("unhealthy");
unhealthy.is_healthy = false;
backends.insert("healthy".to_string(), healthy);
backends.insert("unhealthy".to_string(), unhealthy);
let status = CompositeHealthStatus::new(backends);
assert!(!status.is_healthy);
assert_eq!(status.healthy_count(), 1);
assert_eq!(status.unhealthy_count(), 1);
assert!(status.degraded_backends.contains(&"unhealthy".to_string()));
}
#[test]
fn test_health_check_response_from_status() {
let mut backends = HashMap::new();
backends.insert("primary".to_string(), BackendHealthStatus::new("primary"));
let status = CompositeHealthStatus::new(backends);
let response: HealthCheckResponse = status.into();
assert_eq!(response.status, "healthy");
assert!(response.components.contains_key("primary"));
}
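// Additional check (sketch): the rolling window in `record_success` keeps
// only the most recent 10 samples.
#[test]
fn test_response_time_window_is_bounded() {
let mut status = BackendHealthStatus::new("test");
for ms in 0..20u64 {
status.record_success(ms);
}
// Only samples 10..=19 remain, so the average is 14.5.
assert!((status.avg_response_time_ms - 14.5).abs() < 0.1);
}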
}