use super::{QualityTarget, SystemMetrics};
use crate::{Result, VoirsError};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
/// A single point-in-time measurement of synthesis quality and system load.
///
/// Samples are appended to a sliding window by `QualityMonitor::record_sample`
/// and aggregated into [`DashboardData`] statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetricSample {
    /// When the sample was taken (serialized as whole seconds since the UNIX epoch).
    #[serde(with = "system_time_serde")]
    pub timestamp: SystemTime,
    /// Quality score of the synthesized output.
    /// NOTE(review): the scale is not fixed in this file — tests use values like
    /// 75.0 while alert math only uses relative drops; confirm the intended range.
    pub quality_score: f32,
    /// End-to-end processing latency in milliseconds.
    pub latency_ms: u64,
    /// Real-time factor; the default alert fires above 1.0, so this is presumably
    /// processing-time / audio-duration — TODO confirm with the producer of the metric.
    pub rtf: f32,
    /// CPU usage as a fraction (alert messages multiply by 100 to show a percent).
    pub cpu_usage: f32,
    /// Memory usage as a fraction (alert messages multiply by 100 to show a percent).
    pub memory_usage: f32,
    /// Whether the request that produced this sample succeeded.
    pub success: bool,
    /// Optional complexity estimate of the input text, if the caller computed one.
    pub text_complexity: Option<f32>,
}
mod system_time_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::time::{SystemTime, UNIX_EPOCH};
pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let duration = time
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH");
duration.as_secs().serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(deserializer)?;
Ok(UNIX_EPOCH + std::time::Duration::from_secs(secs))
}
}
/// Conditions under which the monitor raises a [`QualityAlert`].
///
/// Thresholds are evaluated against each newly recorded sample; the windowed
/// variants additionally look at the most recent 10 samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertThreshold {
    /// Relative drop of the newest quality score versus the average of the last
    /// 10 samples (e.g. `0.20` = 20% drop). Needs at least 10 samples to fire.
    QualityDrop(f32),
    /// Absolute latency ceiling in milliseconds.
    LatencyExceeds(u64),
    /// Real-time-factor ceiling.
    RtfExceeds(f32),
    /// Failure-rate ceiling over the last 10 samples (e.g. `0.10` = 10%).
    ErrorRateExceeds(f32),
    /// CPU-usage ceiling as a fraction in `[0, 1]`.
    CpuExceeds(f32),
    /// Memory-usage ceiling as a fraction in `[0, 1]`.
    MemoryExceeds(f32),
}
/// An alert produced when an [`AlertThreshold`] is breached.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityAlert {
    /// When the alert was raised (serialized as epoch seconds).
    #[serde(with = "system_time_serde")]
    pub timestamp: SystemTime,
    /// How serious the breach is.
    pub severity: AlertSeverity,
    /// Human-readable description of the breach.
    pub message: String,
    /// Which metric triggered the alert ("quality", "latency", "rtf", ...).
    pub metric_name: String,
    /// The observed value that breached the threshold.
    pub current_value: f32,
    /// The threshold value it was compared against.
    pub threshold_value: f32,
}
/// Alert severity level.
///
/// Derives `Ord` with variant order `Info < Warning < Critical`, so severities
/// can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}
/// Direction of a metric's linear trend over the sample window.
///
/// `Improving` means the fitted slope is positive (values rising); the label
/// does not account for metrics where "up" is bad (e.g. latency).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TrendDirection {
    Improving,
    Stable,
    Degrading,
}
/// Summary statistics for one metric computed over the sample window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricStatistics {
    /// Metric identifier ("quality", "latency", "rtf").
    pub name: String,
    /// Value from the newest sample.
    pub current: f32,
    /// Arithmetic mean over the window.
    pub average: f32,
    /// Minimum value over the window.
    pub min: f32,
    /// Maximum value over the window.
    pub max: f32,
    /// Population standard deviation (divides by N, not N-1).
    pub std_dev: f32,
    /// Linear-regression trend over the window.
    pub trend: TrendDirection,
    /// Number of samples the statistics were computed from.
    pub sample_count: usize,
}
/// Aggregated snapshot of recent monitoring state for dashboard display.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
    /// Quality score of the most recent sample (0.0 when no samples exist).
    pub current_quality_score: f32,
    /// Trend of quality scores across the window.
    pub quality_trend: TrendDirection,
    /// Mean latency in milliseconds across the window.
    pub avg_latency_ms: f64,
    /// Trend of latency values.
    /// NOTE(review): trend polarity is sign-of-slope everywhere, so *rising*
    /// latency is reported as `Improving` — confirm consumers expect this.
    pub latency_trend: TrendDirection,
    /// RTF of the most recent sample.
    pub current_rtf: f32,
    /// Fraction of samples in the window with `success == true`.
    pub success_rate: f64,
    /// Number of samples currently in the window.
    pub total_samples: usize,
    /// Up to the 10 newest alerts, newest first.
    pub recent_alerts: Vec<QualityAlert>,
    /// Per-metric statistics for quality, latency, and RTF.
    pub metric_stats: Vec<MetricStatistics>,
    /// When this snapshot was generated (serialized as epoch seconds).
    #[serde(with = "system_time_serde")]
    pub last_updated: SystemTime,
}
/// Tuning knobs for the quality monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorConfig {
    /// Maximum number of samples kept in the sliding window.
    pub sample_window_size: usize,
    /// Alert conditions evaluated on every recorded sample.
    pub alert_thresholds: Vec<AlertThreshold>,
    /// Whether to run trend detection (it also requires at least 10 samples).
    pub enable_trend_detection: bool,
    /// Minimum relative slope (|slope / mean|) treated as a real trend;
    /// the builder clamps this into `[0.0, 1.0]`.
    pub trend_sensitivity: f32,
    /// Maximum number of alerts retained in history.
    pub max_alerts_history: usize,
    /// Alerts older than this are pruned whenever a new sample is recorded.
    pub alert_retention: Duration,
}
impl Default for MonitorConfig {
    /// Production-oriented defaults: a 100-sample window, four baseline alert
    /// thresholds, trend detection enabled, and one hour of alert retention.
    fn default() -> Self {
        let alert_thresholds = vec![
            AlertThreshold::QualityDrop(0.20),
            AlertThreshold::LatencyExceeds(500),
            AlertThreshold::RtfExceeds(1.0),
            AlertThreshold::ErrorRateExceeds(0.10),
        ];
        Self {
            sample_window_size: 100,
            alert_thresholds,
            enable_trend_detection: true,
            trend_sensitivity: 0.1,
            max_alerts_history: 100,
            alert_retention: Duration::from_secs(3600),
        }
    }
}
impl MonitorConfig {
    /// Builder: overrides the sliding-window size (number of retained samples).
    pub fn with_sample_window(self, size: usize) -> Self {
        Self {
            sample_window_size: size,
            ..self
        }
    }

    /// Builder: appends an additional alert threshold to the existing set.
    pub fn with_alert_threshold(mut self, threshold: AlertThreshold) -> Self {
        self.alert_thresholds.push(threshold);
        self
    }

    /// Builder: sets the trend sensitivity, clamped into `[0.0, 1.0]`.
    pub fn with_trend_sensitivity(self, sensitivity: f32) -> Self {
        Self {
            trend_sensitivity: sensitivity.clamp(0.0, 1.0),
            ..self
        }
    }
}
/// Mutable interior of a `QualityMonitor`, guarded by a single `RwLock`.
struct MonitorState {
    // Monitor configuration (window size, thresholds, retention, ...).
    config: MonitorConfig,
    // Sliding window of recent samples, capped at `config.sample_window_size`.
    samples: VecDeque<QualityMetricSample>,
    // Alert history, capped at `config.max_alerts_history` and pruned by age.
    alerts: VecDeque<QualityAlert>,
    // Instant of the most recent `record_sample` call.
    last_update: Instant,
}
/// Async quality monitor: records metric samples into a sliding window,
/// raises threshold alerts, and serves aggregated dashboard snapshots.
pub struct QualityMonitor {
    // All mutable state lives behind one async RwLock so concurrent readers
    // (dashboard queries) do not block each other.
    state: Arc<RwLock<MonitorState>>,
}
impl QualityMonitor {
    /// Creates a monitor with the given configuration and empty sample/alert buffers.
    pub fn new(config: MonitorConfig) -> Self {
        Self {
            state: Arc::new(RwLock::new(MonitorState {
                config,
                samples: VecDeque::new(),
                alerts: VecDeque::new(),
                last_update: Instant::now(),
            })),
        }
    }

    /// Records one metric sample, trims the sliding window, evaluates alert
    /// thresholds against the newest sample, and expires stale alerts.
    ///
    /// The sample is moved into the window; the previous implementation cloned
    /// it only to hand `check_alerts` an owned copy.
    pub async fn record_sample(&self, sample: QualityMetricSample) -> Result<()> {
        let mut state = self.state.write().await;
        state.samples.push_back(sample);
        if state.samples.len() > state.config.sample_window_size {
            state.samples.pop_front();
        }
        self.check_alerts(&mut state);
        self.clean_old_alerts(&mut state);
        state.last_update = Instant::now();
        Ok(())
    }

    /// Convenience wrapper: builds a [`QualityMetricSample`] from live system
    /// metrics plus per-request data and records it.
    pub async fn record_from_metrics(
        &self,
        metrics: &SystemMetrics,
        quality: QualityTarget,
        latency_ms: u64,
        success: bool,
        text_complexity: Option<f32>,
    ) -> Result<()> {
        let sample = QualityMetricSample {
            timestamp: SystemTime::now(),
            quality_score: quality.score() as f32,
            latency_ms,
            rtf: metrics.current_rtf,
            cpu_usage: metrics.cpu_usage,
            memory_usage: metrics.memory_usage,
            success,
            text_complexity,
        };
        self.record_sample(sample).await
    }

    /// Builds an aggregated snapshot for dashboard display.
    ///
    /// Returns a zeroed snapshot when no samples have been recorded yet.
    pub async fn get_dashboard_data(&self) -> Result<DashboardData> {
        let state = self.state.read().await;
        if state.samples.is_empty() {
            return Ok(DashboardData {
                current_quality_score: 0.0,
                quality_trend: TrendDirection::Stable,
                avg_latency_ms: 0.0,
                latency_trend: TrendDirection::Stable,
                current_rtf: 0.0,
                success_rate: 0.0,
                total_samples: 0,
                recent_alerts: Vec::new(),
                metric_stats: Vec::new(),
                last_updated: SystemTime::now(),
            });
        }
        let quality_stats = self.calculate_metric_stats(&state, "quality", |s| s.quality_score);
        let latency_stats = self.calculate_metric_stats(&state, "latency", |s| s.latency_ms as f32);
        let rtf_stats = self.calculate_metric_stats(&state, "rtf", |s| s.rtf);
        let success_count = state.samples.iter().filter(|s| s.success).count();
        let success_rate = success_count as f64 / state.samples.len() as f64;
        // Safe: the empty case returned above.
        let current_sample = state.samples.back().expect("samples checked non-empty");
        Ok(DashboardData {
            current_quality_score: current_sample.quality_score,
            quality_trend: quality_stats.trend,
            avg_latency_ms: latency_stats.average as f64,
            // NOTE(review): trend is sign-of-slope for every metric, so rising
            // latency reports `Improving` here — confirm dashboards expect that.
            latency_trend: latency_stats.trend,
            current_rtf: current_sample.rtf,
            success_rate,
            total_samples: state.samples.len(),
            recent_alerts: state.alerts.iter().rev().take(10).cloned().collect(),
            metric_stats: vec![quality_stats, latency_stats, rtf_stats],
            last_updated: SystemTime::now(),
        })
    }

    /// Returns up to `count` of the most recent alerts, newest first.
    pub async fn get_recent_alerts(&self, count: usize) -> Result<Vec<QualityAlert>> {
        let state = self.state.read().await;
        Ok(state.alerts.iter().rev().take(count).cloned().collect())
    }

    /// Returns a copy of the entire sample window, oldest first.
    pub async fn get_samples(&self) -> Result<Vec<QualityMetricSample>> {
        let state = self.state.read().await;
        Ok(state.samples.iter().cloned().collect())
    }

    /// Clears all samples and alerts (configuration is kept).
    pub async fn clear(&self) -> Result<()> {
        let mut state = self.state.write().await;
        state.samples.clear();
        state.alerts.clear();
        Ok(())
    }

    /// Evaluates every configured threshold against the newest sample and
    /// appends any resulting alerts, trimming history to `max_alerts_history`.
    ///
    /// Alerts are first collected through an immutable reborrow, then pushed;
    /// the previous implementation cloned the whole threshold `Vec` on every
    /// sample to sidestep this borrow conflict. If the window size is 0 the new
    /// sample is evicted before we get here and no thresholds fire.
    fn check_alerts(&self, state: &mut MonitorState) {
        let new_alerts: Vec<QualityAlert> = {
            let snapshot: &MonitorState = state;
            match snapshot.samples.back() {
                Some(sample) => snapshot
                    .config
                    .alert_thresholds
                    .iter()
                    .filter_map(|threshold| self.evaluate_threshold(threshold, sample, snapshot))
                    .collect(),
                None => return,
            }
        };
        for alert in new_alerts {
            tracing::warn!("Quality alert: {}", alert.message);
            state.alerts.push_back(alert);
            if state.alerts.len() > state.config.max_alerts_history {
                state.alerts.pop_front();
            }
        }
    }

    /// Checks one threshold against `sample`, returning an alert on breach.
    ///
    /// The windowed variants (`QualityDrop`, `ErrorRateExceeds`) require at
    /// least 10 samples and look at the 10 newest — which include `sample`
    /// itself, so a sudden drop is slightly diluted by its own contribution.
    fn evaluate_threshold(
        &self,
        threshold: &AlertThreshold,
        sample: &QualityMetricSample,
        state: &MonitorState,
    ) -> Option<QualityAlert> {
        match threshold {
            AlertThreshold::QualityDrop(percent) => {
                if state.samples.len() < 10 {
                    return None;
                }
                let recent_avg: f32 = state
                    .samples
                    .iter()
                    .rev()
                    .take(10)
                    .map(|s| s.quality_score)
                    .sum::<f32>()
                    / 10.0;
                // If recent_avg is 0 this is NaN and the comparison below is
                // false, so no alert fires (same as the original behavior).
                let drop = (recent_avg - sample.quality_score) / recent_avg;
                if drop > *percent {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        // A drop more than twice the threshold escalates to Critical.
                        severity: if drop > percent * 2.0 {
                            AlertSeverity::Critical
                        } else {
                            AlertSeverity::Warning
                        },
                        message: format!("Quality dropped by {:.1}%", drop * 100.0),
                        metric_name: "quality".to_string(),
                        current_value: sample.quality_score,
                        threshold_value: recent_avg * (1.0 - percent),
                    })
                } else {
                    None
                }
            }
            AlertThreshold::LatencyExceeds(ms) => {
                if sample.latency_ms > *ms {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        // Double the ceiling escalates to Critical.
                        severity: if sample.latency_ms > ms * 2 {
                            AlertSeverity::Critical
                        } else {
                            AlertSeverity::Warning
                        },
                        message: format!("Latency exceeded {}ms: {}ms", ms, sample.latency_ms),
                        metric_name: "latency".to_string(),
                        current_value: sample.latency_ms as f32,
                        threshold_value: *ms as f32,
                    })
                } else {
                    None
                }
            }
            AlertThreshold::RtfExceeds(rtf) => {
                if sample.rtf > *rtf {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        severity: AlertSeverity::Warning,
                        message: format!("RTF exceeded {}: {:.2}", rtf, sample.rtf),
                        metric_name: "rtf".to_string(),
                        current_value: sample.rtf,
                        threshold_value: *rtf,
                    })
                } else {
                    None
                }
            }
            AlertThreshold::ErrorRateExceeds(rate) => {
                if state.samples.len() < 10 {
                    return None;
                }
                let recent_errors = state
                    .samples
                    .iter()
                    .rev()
                    .take(10)
                    .filter(|s| !s.success)
                    .count();
                let error_rate = recent_errors as f32 / 10.0;
                if error_rate > *rate {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        severity: AlertSeverity::Critical,
                        message: format!(
                            "Error rate exceeded {:.1}%: {:.1}%",
                            rate * 100.0,
                            error_rate * 100.0
                        ),
                        metric_name: "error_rate".to_string(),
                        current_value: error_rate,
                        threshold_value: *rate,
                    })
                } else {
                    None
                }
            }
            AlertThreshold::CpuExceeds(percent) => {
                if sample.cpu_usage > *percent {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        severity: AlertSeverity::Warning,
                        message: format!("CPU usage high: {:.1}%", sample.cpu_usage * 100.0),
                        metric_name: "cpu_usage".to_string(),
                        current_value: sample.cpu_usage,
                        threshold_value: *percent,
                    })
                } else {
                    None
                }
            }
            AlertThreshold::MemoryExceeds(percent) => {
                if sample.memory_usage > *percent {
                    Some(QualityAlert {
                        timestamp: SystemTime::now(),
                        severity: AlertSeverity::Warning,
                        message: format!("Memory usage high: {:.1}%", sample.memory_usage * 100.0),
                        metric_name: "memory_usage".to_string(),
                        current_value: sample.memory_usage,
                        threshold_value: *percent,
                    })
                } else {
                    None
                }
            }
        }
    }

    /// Drops alerts older than the configured retention window.
    fn clean_old_alerts(&self, state: &mut MonitorState) {
        // `checked_sub` avoids the panic the bare `-` operator raises when the
        // retention window reaches before the earliest representable
        // SystemTime; if the cutoff can't be computed, nothing can be stale.
        if let Some(cutoff) = SystemTime::now().checked_sub(state.config.alert_retention) {
            state.alerts.retain(|alert| alert.timestamp > cutoff);
        }
    }

    /// Computes summary statistics (current/avg/min/max/std-dev/trend) for one
    /// metric extracted from every sample in the window.
    fn calculate_metric_stats<F>(
        &self,
        state: &MonitorState,
        name: &str,
        extractor: F,
    ) -> MetricStatistics
    where
        F: Fn(&QualityMetricSample) -> f32,
    {
        if state.samples.is_empty() {
            return MetricStatistics {
                name: name.to_string(),
                current: 0.0,
                average: 0.0,
                min: 0.0,
                max: 0.0,
                std_dev: 0.0,
                trend: TrendDirection::Stable,
                sample_count: 0,
            };
        }
        let values: Vec<f32> = state.samples.iter().map(&extractor).collect();
        let current = *values.last().expect("collection should not be empty");
        let sum: f32 = values.iter().sum();
        let average = sum / values.len() as f32;
        let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
        let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
        // Population variance (divides by N, not N-1).
        let variance: f32 =
            values.iter().map(|v| (v - average).powi(2)).sum::<f32>() / values.len() as f32;
        let std_dev = variance.sqrt();
        let trend = if state.config.enable_trend_detection && values.len() >= 10 {
            self.detect_trend(&values, state.config.trend_sensitivity)
        } else {
            TrendDirection::Stable
        };
        MetricStatistics {
            name: name.to_string(),
            current,
            average,
            min,
            max,
            std_dev,
            trend,
            sample_count: values.len(),
        }
    }

    /// Classifies the series via least-squares linear regression: the slope is
    /// compared (relative to the mean) against `sensitivity`; a positive slope
    /// above the sensitivity reports `Improving`, a negative one `Degrading`.
    fn detect_trend(&self, values: &[f32], sensitivity: f32) -> TrendDirection {
        if values.len() < 10 {
            return TrendDirection::Stable;
        }
        let n = values.len() as f32;
        let x_mean = (n - 1.0) / 2.0;
        let y_mean: f32 = values.iter().sum::<f32>() / n;
        let mut numerator = 0.0;
        let mut denominator = 0.0;
        for (i, &y) in values.iter().enumerate() {
            let x = i as f32;
            numerator += (x - x_mean) * (y - y_mean);
            denominator += (x - x_mean).powi(2);
        }
        // denominator > 0 whenever values.len() >= 2, so this division is safe.
        let slope = numerator / denominator;
        let relative_slope = (slope / y_mean).abs();
        // relative_slope is NaN only for a perfectly flat all-zero series
        // (slope == 0 and y_mean == 0); the previous code let that NaN fall
        // through both comparisons and misreported the flat line as Degrading.
        if relative_slope.is_nan() || relative_slope < sensitivity {
            TrendDirection::Stable
        } else if slope > 0.0 {
            TrendDirection::Improving
        } else {
            TrendDirection::Degrading
        }
    }
}
impl Default for QualityMonitor {
    /// Equivalent to `QualityMonitor::new(MonitorConfig::default())`.
    fn default() -> Self {
        Self::new(MonitorConfig::default())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default config carries the documented defaults.
    #[test]
    fn test_monitor_config_default() {
        let config = MonitorConfig::default();
        assert_eq!(config.sample_window_size, 100);
        assert!(config.enable_trend_detection);
        assert_eq!(config.trend_sensitivity, 0.1);
    }

    // Builder methods override/extend the defaults; the default already has
    // 4 thresholds, so adding one yields 5.
    #[test]
    fn test_monitor_config_builder() {
        let config = MonitorConfig::default()
            .with_sample_window(200)
            .with_trend_sensitivity(0.2)
            .with_alert_threshold(AlertThreshold::QualityDrop(0.3));
        assert_eq!(config.sample_window_size, 200);
        assert_eq!(config.trend_sensitivity, 0.2);
        assert_eq!(config.alert_thresholds.len(), 5);
    }

    // A fresh monitor reports an empty dashboard.
    #[tokio::test]
    async fn test_quality_monitor_creation() {
        let monitor = QualityMonitor::new(MonitorConfig::default());
        let dashboard = monitor.get_dashboard_data().await.unwrap();
        assert_eq!(dashboard.total_samples, 0);
    }

    // A single recorded sample appears as the current dashboard value.
    #[tokio::test]
    async fn test_record_sample() {
        let monitor = QualityMonitor::new(MonitorConfig::default());
        let sample = QualityMetricSample {
            timestamp: SystemTime::now(),
            quality_score: 75.0,
            latency_ms: 100,
            rtf: 0.5,
            cpu_usage: 0.4,
            memory_usage: 0.5,
            success: true,
            text_complexity: Some(0.6),
        };
        monitor.record_sample(sample).await.unwrap();
        let dashboard = monitor.get_dashboard_data().await.unwrap();
        assert_eq!(dashboard.total_samples, 1);
        assert_eq!(dashboard.current_quality_score, 75.0);
    }

    // Ten successful samples give a full window count and a 100% success rate.
    #[tokio::test]
    async fn test_multiple_samples() {
        let monitor = QualityMonitor::new(MonitorConfig::default());
        for i in 0..10 {
            let sample = QualityMetricSample {
                timestamp: SystemTime::now(),
                quality_score: 70.0 + i as f32,
                latency_ms: 100,
                rtf: 0.5,
                cpu_usage: 0.4,
                memory_usage: 0.5,
                success: true,
                text_complexity: None,
            };
            monitor.record_sample(sample).await.unwrap();
        }
        let dashboard = monitor.get_dashboard_data().await.unwrap();
        assert_eq!(dashboard.total_samples, 10);
        assert_eq!(dashboard.success_rate, 1.0);
    }

    // `clear` empties the sample window.
    #[tokio::test]
    async fn test_clear_monitor() {
        let monitor = QualityMonitor::new(MonitorConfig::default());
        let sample = QualityMetricSample {
            timestamp: SystemTime::now(),
            quality_score: 75.0,
            latency_ms: 100,
            rtf: 0.5,
            cpu_usage: 0.4,
            memory_usage: 0.5,
            success: true,
            text_complexity: None,
        };
        monitor.record_sample(sample).await.unwrap();
        monitor.clear().await.unwrap();
        let dashboard = monitor.get_dashboard_data().await.unwrap();
        assert_eq!(dashboard.total_samples, 0);
    }
}