use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;

use super::config::MonitoringConfig;
use super::traits::{
    Alert, AlertCategory, AlertConfig, AlertHandler, AlertSeverity, AlertThrottling,
    HealthStatus, MetricPoint, MetricsStorage,
};
use super::types::{PerformanceMetrics, ResourceUtilization, StreamStatus};
use super::PipelineError;
/// Runs the background monitoring loops of a pipeline: periodic metrics
/// collection, periodic health checks, and alert processing.
///
/// [`start`](Self::start) spawns the loops as Tokio tasks and
/// [`stop`](Self::stop) asks them to exit.
pub struct PipelinePerformanceMonitor {
    /// Supplies `metrics_collection_interval` and `health_check_interval`.
    config: MonitoringConfig,
    /// Destination for explicitly recorded and periodically collected metrics.
    metrics_storage: Arc<dyn MetricsStorage>,
    /// Receives the alerts raised by the health-monitoring loop and by callers.
    alert_manager: Arc<AlertManager>,
    /// Source of the health status polled by the health-monitoring loop.
    health_checker: Arc<HealthChecker>,
    /// Source of the metric points polled by the collection loop.
    metrics_collector: Arc<MetricsCollector>,
    /// Stop flag of the current run: `Some` while running, `None` while stopped.
    ///
    /// Each `start` installs a fresh flag and each `stop` clears the flag of
    /// the run it ends. Loops from an earlier run therefore keep observing
    /// their own (cleared) flag and exit even if the monitor is restarted
    /// while they are still sleeping; with one shared flag they would see the
    /// restart as "still running" and keep looping next to the new tasks.
    active_run: Mutex<Option<Arc<AtomicBool>>>,
}

impl PipelinePerformanceMonitor {
    /// Builds a stopped monitor.
    ///
    /// `alert_handler` is wrapped in a new [`AlertManager`]; a fresh
    /// [`HealthChecker`] and [`MetricsCollector`] are created as well.
    pub fn new(
        config: MonitoringConfig,
        metrics_storage: Arc<dyn MetricsStorage>,
        alert_handler: Arc<dyn AlertHandler>,
    ) -> Self {
        let alert_manager = Arc::new(AlertManager::new(alert_handler));
        let health_checker = Arc::new(HealthChecker::new());
        let metrics_collector = Arc::new(MetricsCollector::new());
        Self {
            config,
            metrics_storage,
            alert_manager,
            health_checker,
            metrics_collector,
            active_run: Mutex::new(None),
        }
    }

    /// Spawns the three background loops.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::AlreadyRunning`] if the monitor has been
    /// started and not stopped since.
    pub async fn start(&self) -> Result<(), PipelineError> {
        // The guard is held for the whole call so concurrent `start`/`stop`
        // calls cannot interleave with the spawning below.
        let mut active_run = self.active_run.lock().await;
        if active_run.is_some() {
            return Err(PipelineError::AlreadyRunning);
        }

        let keep_running = Arc::new(AtomicBool::new(true));
        self.start_metrics_collection(Arc::clone(&keep_running)).await;
        self.start_health_monitoring(Arc::clone(&keep_running)).await;
        self.start_alert_processing(Arc::clone(&keep_running)).await;
        *active_run = Some(keep_running);
        Ok(())
    }

    /// Asks the background loops of the current run to exit.
    ///
    /// The loops check the flag at the top of each iteration, so they finish
    /// the iteration (including its sleep) that is in progress before exiting.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::NotRunning`] if the monitor is not running.
    pub async fn stop(&self) -> Result<(), PipelineError> {
        match self.active_run.lock().await.take() {
            Some(keep_running) => {
                keep_running.store(false, Ordering::SeqCst);
                Ok(())
            }
            None => Err(PipelineError::NotRunning),
        }
    }

    /// Stores a single metric point in the configured storage.
    ///
    /// # Errors
    ///
    /// Storage failures are mapped to [`PipelineError::MonitoringError`].
    pub async fn record_metric(&self, metric: MetricPoint) -> Result<(), PipelineError> {
        self.metrics_storage
            .store_metric(metric)
            .await
            .map_err(|e| PipelineError::MonitoringError {
                message: format!("Failed to store metric: {}", e),
            })
    }

    /// Returns the collector's current [`PerformanceMetrics`] snapshot.
    pub async fn get_current_metrics(&self) -> Result<PerformanceMetrics, PipelineError> {
        self.metrics_collector.get_current_metrics().await
    }

    /// Returns the overall health reported by the health checker.
    pub async fn get_health_status(&self) -> Result<HealthStatus, PipelineError> {
        self.health_checker.get_overall_health().await
    }

    /// Forwards `alert` to the alert manager.
    pub async fn register_alert(&self, alert: Alert) -> Result<(), PipelineError> {
        self.alert_manager.register_alert(alert).await
    }

    /// Spawns the loop that collects metrics and stores them every
    /// `metrics_collection_interval` until `keep_running` is cleared.
    async fn start_metrics_collection(&self, keep_running: Arc<AtomicBool>) {
        let metrics_collector = Arc::clone(&self.metrics_collector);
        let metrics_storage = Arc::clone(&self.metrics_storage);
        let collection_interval = self.config.metrics_collection_interval;
        tokio::spawn(async move {
            while keep_running.load(Ordering::SeqCst) {
                if let Ok(metrics) = metrics_collector.collect_metrics().await {
                    for metric in metrics {
                        // Best effort: a failed store must not end the loop.
                        let _ = metrics_storage.store_metric(metric).await;
                    }
                }
                tokio::time::sleep(collection_interval).await;
            }
        });
    }

    /// Spawns the loop that polls the health checker every
    /// `health_check_interval` and registers an alert for any status other
    /// than `Healthy`, until `keep_running` is cleared.
    async fn start_health_monitoring(&self, keep_running: Arc<AtomicBool>) {
        let health_checker = Arc::clone(&self.health_checker);
        let alert_manager = Arc::clone(&self.alert_manager);
        let health_check_interval = self.config.health_check_interval;
        tokio::spawn(async move {
            while keep_running.load(Ordering::SeqCst) {
                if let Ok(health_status) = health_checker.check_health().await {
                    if health_status != HealthStatus::Healthy {
                        // NOTE(review): every non-healthy status is reported as
                        // `Warning`, including `Critical` — confirm intended.
                        let alert = Alert {
                            id: Uuid::new_v4(),
                            category: AlertCategory::Health,
                            severity: AlertSeverity::Warning,
                            message: format!("Health check failed: {:?}", health_status),
                            timestamp: SystemTime::now(),
                            source: "health_monitor".to_string(),
                            metadata: HashMap::new(),
                        };
                        // Best effort: a failed alert must not end the loop.
                        let _ = alert_manager.register_alert(alert).await;
                    }
                }
                tokio::time::sleep(health_check_interval).await;
            }
        });
    }

    /// Spawns the loop that runs `AlertManager::process_alerts` once per
    /// second until `keep_running` is cleared.
    async fn start_alert_processing(&self, keep_running: Arc<AtomicBool>) {
        let alert_manager = Arc::clone(&self.alert_manager);
        tokio::spawn(async move {
            while keep_running.load(Ordering::SeqCst) {
                let _ = alert_manager.process_alerts().await;
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });
    }
}
/// Registers alerts, keeps a bounded history of them, and forwards each
/// accepted alert to an [`AlertHandler`].
pub struct AlertManager {
    /// Receives every alert that is not throttled.
    alert_handler: Arc<dyn AlertHandler>,
    /// Accepted alerts keyed by id. Nothing in this type removes entries.
    active_alerts: Arc<RwLock<HashMap<Uuid, Alert>>>,
    /// The most recent accepted alerts, oldest first.
    alert_history: Arc<Mutex<VecDeque<Alert>>>,
    /// Time of the last accepted alert per `"{category}:{source}"` key.
    throttling_state: Arc<RwLock<HashMap<String, Instant>>>,
}

impl AlertManager {
    /// Maximum number of alerts retained in `alert_history`.
    const MAX_ALERT_HISTORY: usize = 1000;

    /// Creates a manager with no active alerts, empty history and no
    /// throttling state.
    pub fn new(alert_handler: Arc<dyn AlertHandler>) -> Self {
        Self {
            alert_handler,
            active_alerts: Arc::new(RwLock::new(HashMap::new())),
            alert_history: Arc::new(Mutex::new(VecDeque::new())),
            throttling_state: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Records `alert` and passes it to the handler, unless an alert with the
    /// same category and source was accepted too recently.
    ///
    /// A throttled alert is dropped silently and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Handler failures are mapped to [`PipelineError::MonitoringError`]. The
    /// alert has already been stored as active and in the history by then.
    pub async fn register_alert(&self, alert: Alert) -> Result<(), PipelineError> {
        if self.should_throttle_alert(&alert).await {
            return Ok(());
        }

        self.active_alerts
            .write()
            .await
            .insert(alert.id, alert.clone());

        {
            let mut history = self.alert_history.lock().await;
            history.push_back(alert.clone());
            while history.len() > Self::MAX_ALERT_HISTORY {
                history.pop_front();
            }
        }

        self.alert_handler
            .handle_alert(alert)
            .await
            .map_err(|e| PipelineError::MonitoringError {
                message: format!("Alert handling failed: {}", e),
            })
    }

    /// Runs `check_alert_status` on a snapshot of the active alerts, stopping
    /// at the first error.
    pub async fn process_alerts(&self) -> Result<(), PipelineError> {
        // Snapshot first so the read lock is not held while checking.
        let alerts = {
            self.active_alerts
                .read()
                .await
                .values()
                .cloned()
                .collect::<Vec<_>>()
        };
        for alert in alerts {
            self.check_alert_status(alert).await?;
        }
        Ok(())
    }

    /// Returns clones of all currently active alerts, in no particular order.
    pub async fn get_active_alerts(&self) -> Vec<Alert> {
        self.active_alerts.read().await.values().cloned().collect()
    }

    /// Decides whether `alert` must be suppressed and, when it is not,
    /// records now as the last-sent time for its `"{category}:{source}"` key.
    ///
    /// The minimum spacing depends on the severity of the incoming alert:
    /// 60 s for `Critical`, 300 s for `Error`, 600 s for `Warning` and
    /// 1200 s for `Info`.
    ///
    /// The check and the update happen under one write lock so two concurrent
    /// alerts with the same key cannot both pass. Without the update the map
    /// would stay empty and no alert would ever be throttled.
    async fn should_throttle_alert(&self, alert: &Alert) -> bool {
        let throttle_key = format!("{}:{}", alert.category, alert.source);
        let throttle_duration = match alert.severity {
            AlertSeverity::Critical => Duration::from_secs(60),
            AlertSeverity::Error => Duration::from_secs(300),
            AlertSeverity::Warning => Duration::from_secs(600),
            AlertSeverity::Info => Duration::from_secs(1200),
        };

        let mut throttling_state = self.throttling_state.write().await;
        let throttled = throttling_state
            .get(&throttle_key)
            .map_or(false, |last_sent| last_sent.elapsed() < throttle_duration);
        if !throttled {
            throttling_state.insert(throttle_key, Instant::now());
        }
        throttled
    }

    /// Placeholder: performs no check and always succeeds.
    async fn check_alert_status(&self, _alert: Alert) -> Result<(), PipelineError> {
        Ok(())
    }
}
/// Tracks the last reported [`HealthStatus`] of each named component and
/// derives an overall status from them.
pub struct HealthChecker {
    /// Latest status per component name.
    component_health: Arc<RwLock<HashMap<String, HealthStatus>>>,
}

impl HealthChecker {
    /// Creates a checker with no registered components.
    pub fn new() -> Self {
        Self {
            component_health: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Returns the most severe status among all components.
    ///
    /// Precedence is `Critical`, then `Unhealthy`, then `Degraded`; any other
    /// status counts as `Healthy`, as does an empty component set. The
    /// precedence of `Critical` over `Unhealthy` is assumed from the variant
    /// names — TODO confirm against the `HealthStatus` definition.
    ///
    /// The result is computed in a single pass with a fixed precedence so it
    /// does not depend on `HashMap` iteration order when both `Critical` and
    /// `Unhealthy` components are present.
    pub async fn check_health(&self) -> Result<HealthStatus, PipelineError> {
        let component_health = self.component_health.read().await;
        let mut worst = HealthStatus::Healthy;
        for status in component_health.values() {
            match status {
                // Nothing outranks `Critical`, so stop scanning.
                HealthStatus::Critical => return Ok(HealthStatus::Critical),
                HealthStatus::Unhealthy => worst = HealthStatus::Unhealthy,
                // Only upgrade to `Degraded` when nothing worse was seen.
                HealthStatus::Degraded if worst == HealthStatus::Healthy => {
                    worst = HealthStatus::Degraded
                }
                _ => {}
            }
        }
        Ok(worst)
    }

    /// Same result as [`check_health`](Self::check_health).
    pub async fn get_overall_health(&self) -> Result<HealthStatus, PipelineError> {
        self.check_health().await
    }

    /// Sets the status of `component`, replacing any previous value.
    pub async fn update_component_health(&self, component: String, status: HealthStatus) {
        let mut component_health = self.component_health.write().await;
        component_health.insert(component, status);
    }
}
/// Produces metric points and exposes a shared [`PerformanceMetrics`] snapshot.
pub struct MetricsCollector {
    /// Snapshot handed out by `get_current_metrics`; initialised to the
    /// default value and not modified by any method of this type.
    current_metrics: Arc<RwLock<PerformanceMetrics>>,
}

impl MetricsCollector {
    /// Creates a collector whose snapshot is `PerformanceMetrics::default()`.
    pub fn new() -> Self {
        let current_metrics = Arc::new(RwLock::new(PerformanceMetrics::default()));
        Self { current_metrics }
    }

    /// Samples CPU usage, memory usage and embedding throughput, in that
    /// order, and returns one unlabeled [`MetricPoint`] per reading.
    ///
    /// All points share the timestamp taken at the start of the call.
    pub async fn collect_metrics(&self) -> Result<Vec<MetricPoint>, PipelineError> {
        let timestamp = SystemTime::now();
        let readings = vec![
            ("cpu_usage", self.get_cpu_usage().await),
            ("memory_usage", self.get_memory_usage().await),
            ("embedding_throughput", self.get_embedding_throughput().await),
        ];
        let points: Vec<MetricPoint> = readings
            .into_iter()
            .map(|(name, value)| MetricPoint {
                name: name.to_string(),
                value,
                timestamp,
                labels: HashMap::new(),
            })
            .collect();
        Ok(points)
    }

    /// Returns a clone of the stored [`PerformanceMetrics`] snapshot.
    pub async fn get_current_metrics(&self) -> Result<PerformanceMetrics, PipelineError> {
        let snapshot = self.current_metrics.read().await.clone();
        Ok(snapshot)
    }

    /// Placeholder reading: always `50.0`.
    async fn get_cpu_usage(&self) -> f64 {
        50.0
    }

    /// Placeholder reading: always `512 * 1024 * 1024`
    /// (presumably bytes, i.e. 512 MiB — confirm against consumers).
    async fn get_memory_usage(&self) -> f64 {
        const KIB_SQUARED: f64 = 1024.0 * 1024.0;
        KIB_SQUARED * 512.0
    }

    /// Placeholder reading: always `100.0`.
    async fn get_embedding_throughput(&self) -> f64 {
        100.0
    }
}
/// Metrics storage backed by an in-process queue with a fixed capacity.
pub struct InMemoryMetricsStorage {
    /// Stored points in insertion order, oldest at the front.
    metrics: Arc<RwLock<VecDeque<MetricPoint>>>,
    /// Upper bound on the number of points kept in `metrics`.
    max_metrics: usize,
}

impl InMemoryMetricsStorage {
    /// Creates an empty storage that keeps at most `max_metrics` points.
    pub fn new(max_metrics: usize) -> Self {
        let metrics = Arc::new(RwLock::new(VecDeque::new()));
        Self {
            metrics,
            max_metrics,
        }
    }
}
#[async_trait::async_trait]
impl MetricsStorage for InMemoryMetricsStorage {
    /// Appends `metric` and evicts the oldest points until at most
    /// `max_metrics` remain. Always returns `Ok(())`.
    async fn store_metric(
        &self,
        metric: MetricPoint,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut buffer = self.metrics.write().await;
        buffer.push_back(metric);
        // Number of points beyond the capacity; zero when within bounds.
        let overflow = buffer.len().saturating_sub(self.max_metrics);
        for _ in 0..overflow {
            buffer.pop_front();
        }
        Ok(())
    }

    /// Returns clones of the stored points named `metric_name` whose
    /// timestamp lies in the inclusive range `start_time..=end_time`,
    /// in insertion order.
    async fn get_metrics(
        &self,
        metric_name: &str,
        start_time: SystemTime,
        end_time: SystemTime,
    ) -> Result<Vec<MetricPoint>, Box<dyn std::error::Error + Send + Sync>> {
        let buffer = self.metrics.read().await;
        let window = start_time..=end_time;
        let mut matching = Vec::new();
        for point in buffer.iter() {
            if point.name == metric_name && window.contains(&point.timestamp) {
                matching.push(point.clone());
            }
        }
        Ok(matching)
    }
}
pub struct ConsoleAlertHandler;
#[async_trait::async_trait]
impl AlertHandler for ConsoleAlertHandler {
async fn handle_alert(&self, alert: Alert) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!(
"[ALERT] {} - {} - {} - {}",
alert.severity,
alert.category,
alert.source,
alert.message
);
Ok(())
}
}
#[cfg(test)]
mod tests {
    // Local alias so tests can use `?` on any error type.
    type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
    use super::*;

    /// A fresh collector yields at least one metric point.
    #[tokio::test]
    async fn test_metrics_collector_creation() -> Result<()> {
        let collected = MetricsCollector::new().collect_metrics().await?;
        assert!(!collected.is_empty());
        Ok(())
    }

    /// A checker with no components reports `Healthy`.
    #[tokio::test]
    async fn test_health_checker() -> Result<()> {
        let overall = HealthChecker::new().get_overall_health().await?;
        assert_eq!(overall, HealthStatus::Healthy);
        Ok(())
    }

    /// Registering a first alert with the console handler succeeds.
    #[tokio::test]
    async fn test_alert_manager() {
        let handler: Arc<dyn AlertHandler> = Arc::new(ConsoleAlertHandler);
        let manager = AlertManager::new(handler);
        let alert = Alert {
            id: Uuid::new_v4(),
            category: AlertCategory::Performance,
            severity: AlertSeverity::Warning,
            message: "Test alert".to_string(),
            timestamp: SystemTime::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
        };
        assert!(manager.register_alert(alert).await.is_ok());
    }

    /// Storing a single metric in the in-memory storage succeeds.
    #[tokio::test]
    async fn test_in_memory_metrics_storage() {
        let storage = InMemoryMetricsStorage::new(100);
        let point = MetricPoint {
            name: "test_metric".to_string(),
            value: 42.0,
            timestamp: SystemTime::now(),
            labels: HashMap::new(),
        };
        assert!(storage.store_metric(point).await.is_ok());
    }
}