use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use super::collector::CacheTelemetryCollector;
use super::types::{
AggregatedStats, AlertSeverity, AlertThresholds, AlertType, CacheTelemetryMetrics,
PerformanceAlert, PerformanceBaselines, TelemetryConfig,
};
/// Telemetry collector that layers alerting, baselining, and moving-window
/// aggregate statistics on top of the base `CacheTelemetryCollector`.
pub struct EnhancedTelemetryCollector {
    /// Underlying collector that records raw hit/miss events and exposes
    /// metrics and historical snapshots.
    pub(super) base_collector: CacheTelemetryCollector,
    /// Thresholds evaluated by `check_alerts` after each recorded event.
    alert_thresholds: Arc<Mutex<AlertThresholds>>,
    /// Performance baselines captured by `establish_baselines`.
    baselines: Arc<Mutex<PerformanceBaselines>>,
    /// Moving-window statistics refreshed on every hit/miss.
    aggregated_stats: Arc<Mutex<AggregatedStats>>,
    /// Currently firing alerts, keyed by alert type (at most one per type).
    active_alerts: Arc<Mutex<HashMap<AlertType, PerformanceAlert>>>,
}
impl EnhancedTelemetryCollector {
pub fn new(config: TelemetryConfig) -> Self {
Self {
base_collector: CacheTelemetryCollector::new(config),
alert_thresholds: Arc::new(Mutex::new(AlertThresholds::default())),
baselines: Arc::new(Mutex::new(PerformanceBaselines::default())),
aggregated_stats: Arc::new(Mutex::new(AggregatedStats::default())),
active_alerts: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn record_hit(&self, latency: Duration, size_bytes: Option<usize>, key_hash: u64) {
self.base_collector
.record_hit(latency, size_bytes, key_hash);
self.update_aggregated_stats();
self.check_alerts();
}
pub fn record_miss(&self, latency: Duration, size_bytes: Option<usize>, key_hash: u64) {
self.base_collector
.record_miss(latency, size_bytes, key_hash);
self.update_aggregated_stats();
self.check_alerts();
}
pub fn get_metrics(&self) -> CacheTelemetryMetrics {
self.base_collector.get_metrics()
}
pub fn get_aggregated_stats(&self) -> AggregatedStats {
self.aggregated_stats
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
self.active_alerts
.lock()
.expect("active_alerts lock should not be poisoned")
.values()
.cloned()
.collect()
}
pub fn establish_baselines(&self, sample_count: usize) {
let snapshots = self.base_collector.get_snapshots();
if snapshots.is_empty() {
return;
}
let recent: Vec<_> = snapshots.iter().rev().take(sample_count).collect();
if recent.is_empty() {
return;
}
let n = recent.len() as f64;
let avg_hit_rate = recent.iter().map(|s| s.metrics.hit_ratio).sum::<f64>() / n;
let avg_latency = recent
.iter()
.map(|s| s.metrics.avg_hit_latency_us)
.sum::<f64>()
/ n;
let avg_throughput = recent
.iter()
.map(|s| s.metrics.requests_per_second)
.sum::<f64>()
/ n;
let mut baselines = self.baselines.lock().expect("lock should not be poisoned");
baselines.baseline_hit_rate = avg_hit_rate;
baselines.baseline_latency_us = avg_latency;
baselines.baseline_throughput = avg_throughput;
baselines.established_at = SystemTime::now();
baselines.sample_count = recent.len();
}
pub fn get_baselines(&self) -> PerformanceBaselines {
self.baselines
.lock()
.expect("lock should not be poisoned")
.clone()
}
pub fn set_alert_thresholds(&self, thresholds: AlertThresholds) {
*self
.alert_thresholds
.lock()
.expect("lock should not be poisoned") = thresholds;
}
fn update_aggregated_stats(&self) {
let snapshots = self.base_collector.get_snapshots();
if snapshots.is_empty() {
return;
}
let recent: Vec<_> = snapshots.iter().rev().take(60).collect();
if recent.is_empty() {
return;
}
let mut stats = self
.aggregated_stats
.lock()
.expect("lock should not be poisoned");
let n = recent.len() as f64;
stats.moving_avg_hit_rate = recent.iter().map(|s| s.metrics.hit_ratio).sum::<f64>() / n;
stats.moving_avg_latency_us = recent
.iter()
.map(|s| s.metrics.avg_hit_latency_us)
.sum::<f64>()
/ n;
let hit_rate_variance = recent
.iter()
.map(|s| (s.metrics.hit_ratio - stats.moving_avg_hit_rate).powi(2))
.sum::<f64>()
/ n;
stats.hit_rate_stddev = hit_rate_variance.sqrt();
let latency_variance = recent
.iter()
.map(|s| (s.metrics.avg_hit_latency_us - stats.moving_avg_latency_us).powi(2))
.sum::<f64>()
/ n;
stats.latency_stddev = latency_variance.sqrt();
for snapshot in &recent {
stats.peak_hit_rate = stats.peak_hit_rate.max(snapshot.metrics.hit_ratio);
stats.lowest_hit_rate = stats.lowest_hit_rate.min(snapshot.metrics.hit_ratio);
}
stats.total_requests = recent
.last()
.map(|s| s.metrics.hits + s.metrics.misses)
.unwrap_or(0);
}
fn check_alerts(&self) {
let metrics = self.get_metrics();
let stats = self.get_aggregated_stats();
let thresholds = self
.alert_thresholds
.lock()
.expect("lock should not be poisoned");
let mut alerts = self
.active_alerts
.lock()
.expect("lock should not be poisoned");
if metrics.hit_ratio < thresholds.min_hit_rate {
let alert = PerformanceAlert {
alert_type: AlertType::LowHitRate,
severity: if metrics.hit_ratio < thresholds.min_hit_rate * 0.8 {
AlertSeverity::Critical
} else {
AlertSeverity::Warning
},
description: format!(
"Cache hit rate ({:.2}%) below threshold ({:.2}%)",
metrics.hit_ratio * 100.0,
thresholds.min_hit_rate * 100.0
),
metric_value: metrics.hit_ratio,
threshold_value: thresholds.min_hit_rate,
timestamp: SystemTime::now(),
};
alerts.insert(AlertType::LowHitRate, alert);
} else {
alerts.remove(&AlertType::LowHitRate);
}
if metrics.avg_hit_latency_us > thresholds.max_latency_us {
let alert = PerformanceAlert {
alert_type: AlertType::HighLatency,
severity: if metrics.avg_hit_latency_us > thresholds.max_latency_us * 1.5 {
AlertSeverity::Critical
} else {
AlertSeverity::Warning
},
description: format!(
"Average latency ({:.2}\u{b5}s) exceeds threshold ({:.2}\u{b5}s)",
metrics.avg_hit_latency_us, thresholds.max_latency_us
),
metric_value: metrics.avg_hit_latency_us,
threshold_value: thresholds.max_latency_us,
timestamp: SystemTime::now(),
};
alerts.insert(AlertType::HighLatency, alert);
} else {
alerts.remove(&AlertType::HighLatency);
}
if metrics.eviction_rate > thresholds.max_eviction_rate {
let alert = PerformanceAlert {
alert_type: AlertType::HighEvictionRate,
severity: AlertSeverity::Warning,
description: format!(
"Eviction rate ({:.2}/s) exceeds threshold ({:.2}/s)",
metrics.eviction_rate, thresholds.max_eviction_rate
),
metric_value: metrics.eviction_rate,
threshold_value: thresholds.max_eviction_rate,
timestamp: SystemTime::now(),
};
alerts.insert(AlertType::HighEvictionRate, alert);
} else {
alerts.remove(&AlertType::HighEvictionRate);
}
if stats.hit_rate_stddev > 0.0 {
let z_score =
(metrics.hit_ratio - stats.moving_avg_hit_rate).abs() / stats.hit_rate_stddev;
if z_score > thresholds.anomaly_stddev_multiplier {
let alert = PerformanceAlert {
alert_type: AlertType::AnomalyDetected,
severity: AlertSeverity::Warning,
description: format!("Hit rate anomaly detected (z-score: {:.2})", z_score),
metric_value: z_score,
threshold_value: thresholds.anomaly_stddev_multiplier,
timestamp: SystemTime::now(),
};
alerts.insert(AlertType::AnomalyDetected, alert);
} else {
alerts.remove(&AlertType::AnomalyDetected);
}
}
}
}