use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::Serialize;
use tracing::{info, debug};
use super::health_check::{HealthCheckable, HealthStatus, ComponentType};
use super::config::MonitoringConfig;
#[derive(Debug, Serialize, Clone)]
pub struct SystemHealthReport {
pub overall_health: bool,
pub components: HashMap<String, HealthStatus>,
pub critical_components: Vec<String>,
pub warnings: Vec<String>,
pub timestamp: u64,
}
pub struct UnifiedMonitor {
monitors: Arc<RwLock<HashMap<String, Arc<dyn Monitor>>>>,
health_checks: Arc<RwLock<HashMap<String, Arc<dyn HealthCheckable>>>>, alerts: Arc<RwLock<VecDeque<Alert>>>,
config: MonitoringConfig,
health_history: Arc<RwLock<VecDeque<SystemHealthReport>>>, }
impl UnifiedMonitor {
pub fn new(config: MonitoringConfig) -> Self {
Self {
monitors: Arc::new(RwLock::new(HashMap::new())),
health_checks: Arc::new(RwLock::new(HashMap::new())), alerts: Arc::new(RwLock::new(VecDeque::with_capacity(config.max_alerts))),
config,
health_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))), }
}
pub fn get_config(&self) -> &MonitoringConfig {
&self.config
}
async fn cleanup_old_data(&self) {
self.cleanup_old_alerts().await;
self.cleanup_old_health_history().await;
}
async fn record_internal_metrics(&self) {
let health_checks = self.health_checks.read().await;
let monitors = self.monitors.read().await;
let alerts = self.alerts.read().await;
let health_history_size = self.health_history.read().await.len();
debug!(
health_checks_count = health_checks.len(),
monitors_count = monitors.len(),
alerts_count = alerts.len(),
health_history_size = health_history_size,
"Monitoring internal stats"
);
}
pub async fn register_monitor(&self, monitor: Arc<dyn Monitor>) {
let mut monitors = self.monitors.write().await;
monitors.insert(monitor.name().to_string(), monitor.clone());
info!("Monitor registered: {}", monitor.name());
}
pub async fn get_monitor(&self, name: &str) -> Option<Arc<dyn Monitor>> {
let monitors = self.monitors.read().await;
monitors.get(name).cloned()
}
pub async fn add_alert(&self, level: AlertLevel, source: &str, message: &str) {
if !self.config.enabled {
return;
}
use std::sync::atomic::{AtomicUsize, Ordering};
static CLEANUP_COUNTER: AtomicUsize = AtomicUsize::new(0);
let count = CLEANUP_COUNTER.fetch_add(1, Ordering::Relaxed);
if count % 100 == 0 {
self.cleanup_old_alerts().await;
}
let alert = Alert {
level: level.clone(),
source: source.to_string(),
message: message.to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
metadata: HashMap::new(),
};
let mut alerts = self.alerts.write().await;
if alerts.len() >= self.config.max_alerts {
alerts.pop_front();
}
alerts.push_back(alert.clone());
if self.config.alerting_enabled {
match level {
AlertLevel::Info => info!("[{}] {}", source, message),
AlertLevel::Warning => tracing::warn!("[{}] {}", source, message),
AlertLevel::Error => tracing::error!("[{}] {}", source, message),
AlertLevel::Critical => {
tracing::error!("🚨 CRITICAL [{}] {}", source, message);
}
}
}
}
pub async fn collect_all_metrics(&self) -> HashMap<String, MonitorMetrics> {
let monitors = self.monitors.read().await;
let mut all_metrics = HashMap::new();
for (name, monitor) in monitors.iter() {
let metrics = monitor.collect_metrics().await;
all_metrics.insert(name.clone(), metrics);
}
all_metrics
}
pub async fn overall_health(&self) -> bool {
let monitors = self.monitors.read().await;
for monitor in monitors.values() {
if !monitor.health_check().await {
return false;
}
}
true
}
pub async fn get_prometheus_metrics(&self) -> String {
if !self.config.prometheus_enabled {
return String::new();
}
let all_metrics = self.collect_all_metrics().await;
let mut prometheus_output = String::new();
for (monitor_name, metrics) in all_metrics {
for (metric_name, metric_value) in metrics.metrics {
let metric_line = match metric_value {
MetricValue::Counter(value) => {
format!("{}_{}{{monitor=\"{}\"}} {}\n",
monitor_name, metric_name, monitor_name, value)
}
MetricValue::Gauge(value) => {
format!("{}_{}{{monitor=\"{}\"}} {}\n",
monitor_name, metric_name, monitor_name, value)
}
MetricValue::Boolean(value) => {
format!("{}_{}{{monitor=\"{}\"}} {}\n",
monitor_name, metric_name, monitor_name, if value { 1 } else { 0 })
}
_ => continue,
};
prometheus_output.push_str(&metric_line);
}
}
prometheus_output
}
pub async fn get_recent_alerts(&self, limit: usize) -> Vec<Alert> {
let alerts = self.alerts.read().await;
alerts.iter().rev().take(limit).cloned().collect()
}
pub async fn reset_all_metrics(&self) {
let monitors = self.monitors.read().await;
for monitor in monitors.values() {
monitor.reset_metrics().await;
}
info!("All metrics reset");
}
pub async fn register_health_check(&self, component: Arc<dyn HealthCheckable>) {
let mut health_checks = self.health_checks.write().await;
health_checks.insert(component.component_name().to_string(), component.clone());
info!("Health check registered: {}", component.component_name());
}
pub async fn comprehensive_health_check(&self) -> SystemHealthReport {
let health_checks = self.health_checks.read().await;
let start_time = std::time::Instant::now();
let mut health_futures = Vec::new();
let mut component_names = Vec::new();
for (name, health_check) in health_checks.iter() {
let name_clone = name.clone();
let health_check_clone = health_check.clone();
health_futures.push(async move {
let status = health_check_clone.health_check().await;
(name_clone, health_check_clone, status)
});
component_names.push(name.clone());
}
let results = futures::future::join_all(health_futures).await;
let mut components = HashMap::new();
let mut critical_components = Vec::new();
let mut warnings = Vec::new();
for (name, health_check, status) in results {
if !status.is_healthy {
match health_check.component_type() {
ComponentType::Database | ComponentType::ExternalService => {
critical_components.push(name.clone());
},
_ => {
warnings.push(format!("{}: {}", name, status.message));
}
}
}
components.insert(name, status);
}
let overall_health = critical_components.is_empty();
let report = SystemHealthReport {
overall_health,
components,
critical_components,
warnings,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
let mut history = self.health_history.write().await;
history.push_back(report.clone());
if history.len() > 50 {
history.pop_front();
}
static CLEANUP_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let count = CLEANUP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if count % 10 == 0 {
self.cleanup_old_data().await;
}
let duration = start_time.elapsed();
debug!(
duration_ms = duration.as_millis(),
"Health check completed"
);
self.record_internal_metrics().await;
report
}
pub async fn get_health_history(&self, limit: usize) -> Vec<SystemHealthReport> {
let history = self.health_history.read().await;
history.iter().rev().take(limit).cloned().collect()
}
pub async fn critical_health_check(&self) -> bool {
let report = self.comprehensive_health_check().await;
report.critical_components.is_empty()
}
pub async fn generate_web_report(&self) -> serde_json::Value {
let health_report = self.comprehensive_health_check().await;
let metrics = self.collect_all_metrics().await;
let recent_alerts = self.get_recent_alerts(10).await;
serde_json::json!({
"system_health": health_report,
"metrics": metrics,
"recent_alerts": recent_alerts,
"server_info": {
"uptime": self.get_uptime(),
"version": env!("CARGO_PKG_VERSION"),
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
}
})
}
async fn cleanup_old_alerts(&self) {
let retention_days = self.config.retention_days;
if retention_days == 0 {
return; }
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let retention_seconds = (retention_days as u64) * 24 * 60 * 60;
let mut alerts = self.alerts.write().await;
alerts.retain(|alert| {
now - alert.timestamp <= retention_seconds
});
}
async fn cleanup_old_health_history(&self) {
let retention_days = self.config.retention_days;
if retention_days == 0 {
return;
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let retention_seconds = (retention_days as u64) * 24 * 60 * 60;
let mut history = self.health_history.write().await;
history.retain(|report| {
now - report.timestamp <= retention_seconds
});
}
fn get_uptime(&self) -> u64 {
0
}
}
#[derive(Debug, Serialize, Clone)]
pub struct MonitorMetrics {
pub name: String,
pub timestamp: u64,
pub metrics: HashMap<String, MetricValue>,
pub health: bool,
}
#[derive(Debug, Serialize, Clone)]
pub enum MetricValue {
Counter(u64),
Gauge(f64),
Histogram(Vec<f64>),
String(String),
Boolean(bool),
}
#[derive(Debug, Serialize, Clone)]
pub struct Alert {
pub level: AlertLevel,
pub source: String,
pub message: String,
pub timestamp: u64,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Serialize, Clone, PartialEq)]
pub enum AlertLevel {
Info,
Warning,
Error,
Critical,
}
#[async_trait::async_trait]
pub trait Monitor: Send + Sync {
fn name(&self) -> &'static str;
async fn collect_metrics(&self) -> MonitorMetrics;
async fn health_check(&self) -> bool;
async fn reset_metrics(&self);
}