use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Configuration for a `HealthMonitor`: check cadence, alerting thresholds,
/// and feature switches for predictive analytics and auto-recovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMonitorConfig {
    /// Interval between routine health checks.
    /// NOTE(review): not read inside this file — presumably consumed by the
    /// scheduler that drives `perform_health_check`; confirm against callers.
    pub check_interval: Duration,
    /// Consecutive failed checks before a node is considered unhealthy.
    /// NOTE(review): not read inside this file — verify where it is enforced.
    pub unhealthy_threshold: u32,
    /// Faster polling interval used while a node is recovering.
    /// NOTE(review): not read inside this file.
    pub recovery_check_interval: Duration,
    /// When true, `perform_health_check` also refreshes linear-trend
    /// predictions over the recent metrics history.
    pub enable_predictive_analytics: bool,
    /// Per-resource limits used by `analyze_health_status`.
    pub alert_thresholds: AlertThresholds,
    /// Feature flag for automatic recovery actions.
    /// NOTE(review): not read inside this file.
    pub auto_recovery: bool,
}
/// Resource limits that separate `Healthy` from `Degraded`; exceeding a limit
/// by 50% escalates to `Critical` (see `HealthMonitor::analyze_health_status`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// CPU usage ceiling, percent (0-100).
    pub cpu_usage_percent: f64,
    /// Memory usage ceiling, percent (0-100).
    pub memory_usage_percent: f64,
    /// Disk usage ceiling, percent (0-100).
    pub disk_usage_percent: f64,
    /// Maximum acceptable concurrent connections.
    /// NOTE(review): not read inside this file — confirm where it is checked.
    pub connection_count_threshold: u32,
    /// Maximum acceptable error rate (fraction, e.g. 0.05 = 5%).
    /// NOTE(review): not read inside this file.
    pub error_rate_threshold: f64,
    /// Maximum acceptable average latency in milliseconds.
    pub latency_threshold_ms: u64,
}
/// One point-in-time snapshot of system health, appended to the monitor's
/// bounded history on every check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// When this sample was taken (UTC).
    pub timestamp: DateTime<Utc>,
    /// CPU usage, percent.
    pub cpu_usage: f64,
    /// Memory usage, percent.
    pub memory_usage: f64,
    /// Disk usage, percent.
    pub disk_usage: f64,
    /// Currently open client connections.
    pub active_connections: u32,
    /// Cumulative messages processed.
    pub total_messages: u64,
    /// Errors observed; compared against a fixed cutoff in status analysis.
    pub error_count: u64,
    /// Mean request latency in milliseconds.
    pub average_latency_ms: f64,
    /// Message throughput, messages per second.
    pub throughput_msgs_per_sec: f64,
    /// JetStream-specific counters for this sample.
    pub jetstream_usage: JetStreamMetrics,
}
/// NATS JetStream usage counters embedded in each `HealthMetrics` sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JetStreamMetrics {
    /// Number of streams.
    pub streams: u32,
    /// Number of consumers.
    pub consumers: u32,
    /// Total messages stored across streams.
    pub messages: u64,
    /// Total bytes stored across streams.
    pub bytes: u64,
    /// Number of nodes in the JetStream cluster.
    pub cluster_size: u32,
    /// Memory-backed storage in use, bytes.
    pub memory_usage: u64,
    /// File-backed storage in use, bytes.
    pub storage_usage: u64,
}
/// Overall health verdict produced by `HealthMonitor::analyze_health_status`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// All metrics within configured thresholds.
    Healthy,
    /// At least one threshold exceeded, but not critically.
    Degraded,
    /// A threshold exceeded by a wide margin, or excessive errors.
    Critical,
    /// Metrics trending better after recent failures.
    Recovering,
    /// No check has completed yet (initial / reset state).
    Unknown,
}
/// Periodic health checker with a bounded metrics history, a derived status,
/// and optional linear-trend predictions. All mutable state sits behind
/// tokio `RwLock`s so the monitor can be shared across async tasks.
pub struct HealthMonitor {
    // Static configuration; immutable after construction.
    config: HealthMonitorConfig,
    // Most recent samples, oldest first; trimmed to a fixed cap on insert.
    metrics_history: RwLock<Vec<HealthMetrics>>,
    // Verdict from the latest completed check.
    current_status: RwLock<HealthStatus>,
    // Start time of the latest check (written, never read in this file).
    last_check: RwLock<DateTime<Utc>>,
    // Run length of consecutive non-Healthy checks; reset on Healthy.
    consecutive_failures: RwLock<u32>,
    // Named trend slopes (e.g. "cpu_trend") from predictive analytics.
    predictions: RwLock<HashMap<String, f64>>,
}
impl HealthMonitor {
pub fn new(config: HealthMonitorConfig) -> Self {
Self {
config,
metrics_history: RwLock::new(Vec::new()),
current_status: RwLock::new(HealthStatus::Unknown),
last_check: RwLock::new(Utc::now()),
consecutive_failures: RwLock::new(0),
predictions: RwLock::new(HashMap::new()),
}
}
pub async fn perform_health_check(&self, connection_url: &str) -> Result<HealthMetrics> {
let start_time = Utc::now();
let metrics = self.collect_system_metrics(connection_url).await?;
let mut history = self.metrics_history.write().await;
history.push(metrics.clone());
if history.len() > 1000 {
history.remove(0);
}
let status = self.analyze_health_status(&metrics, &history).await;
*self.current_status.write().await = status.clone();
*self.last_check.write().await = start_time;
let mut failures = self.consecutive_failures.write().await;
match status {
HealthStatus::Healthy => *failures = 0,
_ => *failures += 1,
}
if self.config.enable_predictive_analytics {
self.run_predictive_analytics(&history).await;
}
debug!(
"Health check completed for {}: {:?}",
connection_url, status
);
Ok(metrics)
}
async fn collect_system_metrics(&self, _connection_url: &str) -> Result<HealthMetrics> {
let now = Utc::now();
let time_factor = (now.timestamp() % 60) as f64 / 60.0;
Ok(HealthMetrics {
timestamp: now,
cpu_usage: 20.0 + time_factor * 30.0, memory_usage: 40.0 + time_factor * 20.0, disk_usage: 15.0 + time_factor * 10.0, active_connections: 50 + (time_factor * 100.0) as u32,
total_messages: 1000000 + (time_factor * 100000.0) as u64,
error_count: (time_factor * 50.0) as u64,
average_latency_ms: 5.0 + time_factor * 15.0, throughput_msgs_per_sec: 1000.0 + time_factor * 2000.0,
jetstream_usage: JetStreamMetrics {
streams: 10,
consumers: 25,
messages: 500000,
bytes: 1024 * 1024 * 100, cluster_size: 3,
memory_usage: 1024 * 1024 * 50, storage_usage: 1024 * 1024 * 1024, },
})
}
async fn analyze_health_status(
&self,
current: &HealthMetrics,
history: &[HealthMetrics],
) -> HealthStatus {
let thresholds = &self.config.alert_thresholds;
if current.cpu_usage > thresholds.cpu_usage_percent * 1.5
|| current.memory_usage > thresholds.memory_usage_percent * 1.5
|| current.error_count > 100
{
return HealthStatus::Critical;
}
if current.cpu_usage > thresholds.cpu_usage_percent
|| current.memory_usage > thresholds.memory_usage_percent
|| current.average_latency_ms > thresholds.latency_threshold_ms as f64
{
return HealthStatus::Degraded;
}
if history.len() >= 3 {
let recent_metrics = &history[history.len() - 3..];
let improving_trend = recent_metrics.windows(2).all(|w| {
w[1].cpu_usage < w[0].cpu_usage && w[1].average_latency_ms < w[0].average_latency_ms
});
if improving_trend {
let failures = *self.consecutive_failures.read().await;
if failures > 0 {
return HealthStatus::Recovering;
}
}
}
HealthStatus::Healthy
}
async fn run_predictive_analytics(&self, history: &[HealthMetrics]) {
if history.len() < 10 {
return; }
let mut predictions = self.predictions.write().await;
predictions.insert(
"cpu_trend".to_string(),
self.predict_linear_trend(history, |m| m.cpu_usage),
);
predictions.insert(
"memory_trend".to_string(),
self.predict_linear_trend(history, |m| m.memory_usage),
);
predictions.insert(
"latency_trend".to_string(),
self.predict_linear_trend(history, |m| m.average_latency_ms),
);
if let Some(cpu_trend) = predictions.get("cpu_trend") {
if *cpu_trend > 0.5 {
warn!("Predictive analytics: CPU usage trending upward");
}
}
debug!(
"Predictive analytics completed with {} predictions",
predictions.len()
);
}
fn predict_linear_trend<F>(&self, history: &[HealthMetrics], extractor: F) -> f64
where
F: Fn(&HealthMetrics) -> f64,
{
if history.len() < 2 {
return 0.0;
}
let recent_count = std::cmp::min(10, history.len());
let recent_metrics = &history[history.len() - recent_count..];
let values: Vec<f64> = recent_metrics.iter().map(extractor).collect();
let n = values.len() as f64;
let sum_x: f64 = (0..values.len()).map(|i| i as f64).sum();
let sum_y: f64 = values.iter().sum();
let sum_xy: f64 = values.iter().enumerate().map(|(i, &y)| i as f64 * y).sum();
let sum_x2: f64 = (0..values.len()).map(|i| (i as f64).powi(2)).sum();
(n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2))
}
pub async fn get_current_status(&self) -> HealthStatus {
self.current_status.read().await.clone()
}
pub async fn get_latest_metrics(&self) -> Option<HealthMetrics> {
let history = self.metrics_history.read().await;
history.last().cloned()
}
pub async fn get_metrics_history(&self, limit: Option<usize>) -> Vec<HealthMetrics> {
let history = self.metrics_history.read().await;
if let Some(limit) = limit {
if history.len() <= limit {
history.clone()
} else {
history[history.len() - limit..].to_vec()
}
} else {
history.clone()
}
}
pub async fn get_predictions(&self) -> HashMap<String, f64> {
self.predictions.read().await.clone()
}
pub async fn reset(&self) {
*self.metrics_history.write().await = Vec::new();
*self.current_status.write().await = HealthStatus::Unknown;
*self.consecutive_failures.write().await = 0;
*self.predictions.write().await = HashMap::new();
info!("Health monitor state reset");
}
}
impl Default for HealthMonitorConfig {
fn default() -> Self {
Self {
check_interval: Duration::seconds(30),
unhealthy_threshold: 3,
recovery_check_interval: Duration::seconds(10),
enable_predictive_analytics: true,
alert_thresholds: AlertThresholds::default(),
auto_recovery: true,
}
}
}
impl Default for AlertThresholds {
fn default() -> Self {
Self {
cpu_usage_percent: 80.0,
memory_usage_percent: 85.0,
disk_usage_percent: 90.0,
connection_count_threshold: 1000,
error_rate_threshold: 0.05, latency_threshold_ms: 100,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Target URL used by every check in these tests; never actually dialed
    // because metric collection is simulated.
    const TEST_URL: &str = "nats://localhost:4222";

    // Convenience constructor: monitor with default configuration.
    fn default_monitor() -> HealthMonitor {
        HealthMonitor::new(HealthMonitorConfig::default())
    }

    #[tokio::test]
    async fn test_health_monitor_creation() {
        // A freshly built monitor has not run any check yet.
        let hm = default_monitor();
        assert_eq!(hm.get_current_status().await, HealthStatus::Unknown);
    }

    #[tokio::test]
    async fn test_health_check() {
        let hm = default_monitor();
        let sample = hm.perform_health_check(TEST_URL).await.unwrap();
        // Sampled metrics are non-negative and the status has been derived.
        assert!(sample.cpu_usage >= 0.0);
        assert!(sample.memory_usage >= 0.0);
        assert_ne!(hm.get_current_status().await, HealthStatus::Unknown);
    }

    #[tokio::test]
    async fn test_metrics_history() {
        let hm = default_monitor();
        // Five checks should leave exactly five samples in the history.
        for _ in 0..5 {
            hm.perform_health_check(TEST_URL).await.unwrap();
        }
        assert_eq!(hm.get_metrics_history(None).await.len(), 5);
    }
}