pub mod alerts;
pub mod health;
pub mod metrics;
use crate::config::MonitoringConfig;
use crate::storage::StorageLayer;
use crate::utils::error::Result;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
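
/// Central coordinator for the monitoring subsystem. Owns the metrics
/// collector, the health checker, and an optional alert manager, and tracks
/// process uptime from the moment it is constructed.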
#[derive(Clone)]
#[allow(dead_code)]
pub struct MonitoringSystem {
config: Arc<MonitoringConfig>,
storage: Arc<StorageLayer>,
metrics: Arc<metrics::MetricsCollector>,
health: Arc<health::HealthChecker>,
alerts: Option<Arc<alerts::AlertManager>>,
start_time: Instant,
}
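
/// Point-in-time snapshot of every metric family the system collects.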
#[derive(Debug, Clone, serde::Serialize)]
pub struct SystemMetrics {
pub timestamp: chrono::DateTime<chrono::Utc>,
pub requests: RequestMetrics,
pub providers: ProviderMetrics,
pub system: SystemResourceMetrics,
pub errors: ErrorMetrics,
pub performance: PerformanceMetrics,
}
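
/// HTTP request counters, throughput, and response-time statistics
/// (times in milliseconds).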
#[derive(Debug, Clone, serde::Serialize)]
pub struct RequestMetrics {
pub total_requests: u64,
pub requests_per_second: f64,
pub avg_response_time_ms: f64,
pub p95_response_time_ms: f64,
pub p99_response_time_ms: f64,
pub success_rate: f64,
pub status_codes: std::collections::HashMap<u16, u64>,
pub endpoints: std::collections::HashMap<String, u64>,
}
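
/// Per-provider request, latency, error, token, and cost accounting.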
#[derive(Debug, Clone, serde::Serialize)]
pub struct ProviderMetrics {
pub total_provider_requests: u64,
pub provider_success_rates: std::collections::HashMap<String, f64>,
pub provider_response_times: std::collections::HashMap<String, f64>,
pub provider_errors: std::collections::HashMap<String, u64>,
pub provider_usage: std::collections::HashMap<String, u64>,
pub token_usage: std::collections::HashMap<String, u64>,
pub costs: std::collections::HashMap<String, f64>,
}
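
/// Host resource and connection-pool gauges: CPU, memory, disk, network I/O,
/// and active connection counts.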
#[derive(Debug, Clone, serde::Serialize)]
pub struct SystemResourceMetrics {
pub cpu_usage: f64,
pub memory_usage: u64,
pub memory_usage_percent: f64,
pub disk_usage: u64,
pub disk_usage_percent: f64,
pub network_bytes_in: u64,
pub network_bytes_out: u64,
pub active_connections: u32,
pub database_connections: u32,
pub redis_connections: u32,
}
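
/// Error totals and rates, broken down by error type and by endpoint.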
#[derive(Debug, Clone, serde::Serialize)]
pub struct ErrorMetrics {
pub total_errors: u64,
pub error_rate: f64,
pub error_types: std::collections::HashMap<String, u64>,
pub error_endpoints: std::collections::HashMap<String, u64>,
pub critical_errors: u64,
pub warnings: u64,
}
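
/// Cache, database, queue, and latency performance indicators.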
#[derive(Debug, Clone, serde::Serialize)]
pub struct PerformanceMetrics {
pub cache_hit_rate: f64,
pub cache_miss_rate: f64,
pub avg_db_query_time_ms: f64,
pub queue_depth: u32,
pub throughput: f64,
pub latency_percentiles: LatencyPercentiles,
}
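
/// Response-time percentiles from p50 through p99.9.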
#[derive(Debug, Clone, serde::Serialize)]
pub struct LatencyPercentiles {
pub p50: f64,
pub p90: f64,
pub p95: f64,
pub p99: f64,
pub p999: f64,
}
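
/// Alert severity levels, from informational to emergency.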
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
Emergency,
}
impl std::fmt::Display for AlertSeverity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AlertSeverity::Info => write!(f, "INFO"),
AlertSeverity::Warning => write!(f, "WARNING"),
AlertSeverity::Critical => write!(f, "CRITICAL"),
AlertSeverity::Emergency => write!(f, "EMERGENCY"),
}
}
}
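
/// A single alert event: severity, provenance, free-form JSON metadata, and
/// a resolution flag.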
#[derive(Debug, Clone, serde::Serialize)]
pub struct Alert {
pub id: String,
pub severity: AlertSeverity,
pub title: String,
pub description: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub source: String,
pub metadata: serde_json::Value,
pub resolved: bool,
}
#[allow(dead_code)]
impl MonitoringSystem {
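    /// Builds the monitoring system from configuration and a storage layer.
    ///
    /// No alert manager is wired up here (`alerts` starts as `None`), so
    /// alerts are logged and dropped until one is configured.
    ///
    /// A minimal usage sketch (not compiled; assumes `config` and `storage`
    /// are obtained elsewhere in the application):
    ///
    /// ```ignore
    /// let monitoring = MonitoringSystem::new(&config, storage).await?;
    /// monitoring.start().await?;
    /// ```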
pub async fn new(config: &MonitoringConfig, storage: Arc<StorageLayer>) -> Result<Self> {
info!("Initializing monitoring system");
let config = Arc::new(config.clone());
let metrics = Arc::new(metrics::MetricsCollector::new(&config).await?);
let health = Arc::new(health::HealthChecker::new(storage.clone()).await?);
let alerts = None;
info!("Monitoring system initialized successfully");
Ok(Self {
config,
storage,
metrics,
health,
alerts,
start_time: Instant::now(),
})
}
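
    /// Starts the metrics collector, health checker, optional alert manager,
    /// and the periodic background tasks.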
pub async fn start(&self) -> Result<()> {
info!("Starting monitoring system");
self.metrics.start().await?;
self.health.start().await?;
if let Some(alerts) = &self.alerts {
alerts.start().await?;
}
self.start_background_tasks().await?;
info!("Monitoring system started successfully");
Ok(())
}
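
    /// Stops the metrics collector, health checker, and alert manager.
    ///
    /// Note: the background tasks spawned by `start` are detached and are not
    /// cancelled here; they keep running until the process exits.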
pub async fn stop(&self) -> Result<()> {
info!("Stopping monitoring system");
self.metrics.stop().await?;
self.health.stop().await?;
if let Some(alerts) = &self.alerts {
alerts.stop().await?;
}
info!("Monitoring system stopped");
Ok(())
}
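
    /// Assembles a fresh [`SystemMetrics`] snapshot from the collector.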
pub async fn get_metrics(&self) -> Result<SystemMetrics> {
debug!("Collecting system metrics");
let timestamp = chrono::Utc::now();
let requests = self.collect_request_metrics().await?;
let providers = self.collect_provider_metrics().await?;
let system = self.collect_system_metrics().await?;
let errors = self.collect_error_metrics().await?;
let performance = self.collect_performance_metrics().await?;
Ok(SystemMetrics {
timestamp,
requests,
providers,
system,
errors,
performance,
})
}
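
    /// Records a completed HTTP request, delegating to the metrics collector.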
pub async fn record_request(
&self,
method: &str,
path: &str,
status_code: u16,
response_time: Duration,
user_id: Option<uuid::Uuid>,
api_key_id: Option<uuid::Uuid>,
) -> Result<()> {
self.metrics
.record_request(
method,
path,
status_code,
response_time,
user_id,
api_key_id,
)
.await
}
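
    /// Records an upstream provider call, including token usage and cost.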
pub async fn record_provider_request(
&self,
provider: &str,
model: &str,
tokens_used: u32,
cost: f64,
response_time: Duration,
success: bool,
) -> Result<()> {
self.metrics
.record_provider_request(provider, model, tokens_used, cost, response_time, success)
.await
}
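
    /// Records an error occurrence with an optional JSON context payload.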
pub async fn record_error(
&self,
error_type: &str,
error_message: &str,
context: Option<serde_json::Value>,
) -> Result<()> {
self.metrics
.record_error(error_type, error_message, context)
.await
}
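
    /// Forwards an alert to the alert manager, or logs and drops it when no
    /// manager is configured.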
pub async fn send_alert(&self, alert: Alert) -> Result<()> {
if let Some(alerts) = &self.alerts {
alerts.send_alert(alert).await
} else {
warn!("Alert manager not configured, skipping alert");
Ok(())
}
}
pub fn get_uptime(&self) -> Duration {
self.start_time.elapsed()
}
pub async fn get_health_status(&self) -> Result<health::HealthStatus> {
self.health.get_status().await
}
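
    /// Spawns the periodic background loops: metrics aggregation every 60s,
    /// health checks every 30s, and alert processing every 10s (the last only
    /// when an alert manager is present). Each loop is a detached `tokio`
    /// task that runs for the lifetime of the process.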
async fn start_background_tasks(&self) -> Result<()> {
let monitoring = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
if let Err(e) = monitoring.aggregate_metrics().await {
warn!("Failed to aggregate metrics: {}", e);
}
}
});
let monitoring = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = monitoring.run_health_checks().await {
warn!("Health check failed: {}", e);
}
}
});
if self.alerts.is_some() {
let monitoring = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = monitoring.process_alerts().await {
warn!("Failed to process alerts: {}", e);
}
}
});
}
Ok(())
}
async fn aggregate_metrics(&self) -> Result<()> {
debug!("Aggregating metrics");
let _metrics = self.get_metrics().await?;
Ok(())
}
async fn run_health_checks(&self) -> Result<()> {
debug!("Running health checks");
let health_status = self.health.check_all().await?;
if !health_status.overall_healthy {
let alert = Alert {
id: uuid::Uuid::new_v4().to_string(),
severity: AlertSeverity::Critical,
title: "System Health Check Failed".to_string(),
description: format!(
"One or more system components are unhealthy: {:?}",
health_status
),
timestamp: chrono::Utc::now(),
source: "health_checker".to_string(),
metadata: serde_json::to_value(&health_status).unwrap_or_default(),
resolved: false,
};
self.send_alert(alert).await?;
}
Ok(())
}
async fn process_alerts(&self) -> Result<()> {
if let Some(alerts) = &self.alerts {
alerts.process_pending().await?;
}
Ok(())
}
async fn collect_request_metrics(&self) -> Result<RequestMetrics> {
self.metrics.get_request_metrics().await
}
async fn collect_provider_metrics(&self) -> Result<ProviderMetrics> {
self.metrics.get_provider_metrics().await
}
async fn collect_system_metrics(&self) -> Result<SystemResourceMetrics> {
self.metrics.get_system_metrics().await
}
async fn collect_error_metrics(&self) -> Result<ErrorMetrics> {
self.metrics.get_error_metrics().await
}
async fn collect_performance_metrics(&self) -> Result<PerformanceMetrics> {
self.metrics.get_performance_metrics().await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_alert_creation() {
let alert = Alert {
id: "test-alert".to_string(),
severity: AlertSeverity::Warning,
title: "Test Alert".to_string(),
description: "This is a test alert".to_string(),
timestamp: chrono::Utc::now(),
source: "test".to_string(),
metadata: serde_json::json!({"test": true}),
resolved: false,
};
assert_eq!(alert.severity, AlertSeverity::Warning);
assert!(!alert.resolved);
}
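
    #[test]
    fn test_alert_severity_display() {
        // `Display` should produce the upper-case names used in log output.
        assert_eq!(AlertSeverity::Info.to_string(), "INFO");
        assert_eq!(AlertSeverity::Warning.to_string(), "WARNING");
        assert_eq!(AlertSeverity::Critical.to_string(), "CRITICAL");
        assert_eq!(AlertSeverity::Emergency.to_string(), "EMERGENCY");
    }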
#[test]
fn test_system_metrics_structure() {
let metrics = SystemMetrics {
timestamp: chrono::Utc::now(),
requests: RequestMetrics {
total_requests: 1000,
requests_per_second: 10.5,
avg_response_time_ms: 150.0,
p95_response_time_ms: 300.0,
p99_response_time_ms: 500.0,
success_rate: 99.5,
status_codes: std::collections::HashMap::new(),
endpoints: std::collections::HashMap::new(),
},
providers: ProviderMetrics {
total_provider_requests: 800,
provider_success_rates: std::collections::HashMap::new(),
provider_response_times: std::collections::HashMap::new(),
provider_errors: std::collections::HashMap::new(),
provider_usage: std::collections::HashMap::new(),
token_usage: std::collections::HashMap::new(),
costs: std::collections::HashMap::new(),
},
system: SystemResourceMetrics {
cpu_usage: 45.2,
                memory_usage: 1024 * 1024 * 512,
                memory_usage_percent: 25.0,
                disk_usage: 1024 * 1024 * 1024 * 10,
                disk_usage_percent: 50.0,
network_bytes_in: 1024 * 1024,
network_bytes_out: 1024 * 512,
active_connections: 100,
database_connections: 10,
redis_connections: 5,
},
errors: ErrorMetrics {
total_errors: 5,
error_rate: 0.1,
error_types: std::collections::HashMap::new(),
error_endpoints: std::collections::HashMap::new(),
critical_errors: 1,
warnings: 4,
},
performance: PerformanceMetrics {
cache_hit_rate: 85.5,
cache_miss_rate: 14.5,
avg_db_query_time_ms: 25.0,
queue_depth: 0,
throughput: 10.5,
latency_percentiles: LatencyPercentiles {
p50: 100.0,
p90: 200.0,
p95: 300.0,
p99: 500.0,
p999: 800.0,
},
},
};
assert_eq!(metrics.requests.total_requests, 1000);
assert_eq!(metrics.system.cpu_usage, 45.2);
assert_eq!(metrics.errors.critical_errors, 1);
}
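
    #[test]
    fn test_alert_serialization() {
        // `Alert` derives `serde::Serialize`; unit enum variants serialize as
        // their variant name by default, so severity round-trips as a string.
        let alert = Alert {
            id: "test-alert".to_string(),
            severity: AlertSeverity::Critical,
            title: "Test Alert".to_string(),
            description: "Serialization check".to_string(),
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: serde_json::json!({}),
            resolved: false,
        };
        let json = serde_json::to_value(&alert).expect("alert should serialize");
        assert_eq!(json["severity"], "Critical");
        assert_eq!(json["resolved"], false);
    }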
}