use crate::core::error::{Error, Result};
use crate::ml::serving::{DeploymentMetrics, ModelMetadata};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
pub model_name: String,
pub model_version: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub latency: LatencyMetrics,
pub throughput: ThroughputMetrics,
pub error_metrics: ErrorMetrics,
pub resource_utilization: ResourceUtilizationMetrics,
pub quality_metrics: QualityMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
pub avg_latency_ms: f64,
pub p50_latency_ms: f64,
pub p95_latency_ms: f64,
pub p99_latency_ms: f64,
pub max_latency_ms: f64,
pub min_latency_ms: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
pub requests_per_second: f64,
pub total_requests: u64,
pub successful_requests: u64,
pub failed_requests: u64,
pub concurrent_requests: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
pub error_rate: f64,
pub error_rates_by_type: HashMap<String, f64>,
pub error_counts_by_type: HashMap<String, u64>,
pub recent_errors: Vec<ErrorEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorEvent {
pub error_type: String,
pub message: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub context: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationMetrics {
pub cpu_utilization: f64,
pub memory_utilization: f64,
pub gpu_utilization: Option<f64>,
pub disk_io_utilization: f64,
pub network_io_utilization: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
pub accuracy: Option<f64>,
pub confidence_scores: ConfidenceMetrics,
pub data_drift: DriftMetrics,
pub model_drift: DriftMetrics,
pub feature_importance_drift: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfidenceMetrics {
pub avg_confidence: f64,
pub min_confidence: f64,
pub max_confidence: f64,
pub low_confidence_rate: f64,
pub confidence_threshold: f64,
}
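/// Output of a drift check: `drift_score` is compared against `threshold`
/// using the named `detection_method` (e.g. PSI, the Population Stability
/// Index, for data drift).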
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriftMetrics {
pub drift_score: f64,
pub drift_detected: bool,
pub detection_method: String,
pub threshold: f64,
pub drifting_features: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
Emergency,
}
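/// A declarative threshold alert. `metric` must be one of the names
/// understood by `ModelMonitor::get_metric_value`; the alert fires after the
/// comparison holds for `consecutive_evaluations` successive collections and
/// is then silenced for `cooldown_seconds`.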
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
pub name: String,
pub description: String,
pub metric: String,
pub threshold: f64,
pub operator: ComparisonOperator,
pub severity: AlertSeverity,
pub evaluation_window_seconds: u64,
pub consecutive_evaluations: usize,
pub cooldown_seconds: u64,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ComparisonOperator {
GreaterThan,
GreaterThanOrEqual,
LessThan,
LessThanOrEqual,
Equal,
NotEqual,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertEvent {
pub alert_config: AlertConfig,
pub current_value: f64,
pub threshold_value: f64,
pub message: String,
pub triggered_at: chrono::DateTime<chrono::Utc>,
pub model_name: String,
pub model_version: String,
pub context: HashMap<String, String>,
}
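/// Tracks a model's performance over time and evaluates threshold-based
/// alerts against each collected snapshot.
///
/// A minimal usage sketch (alert fields elided for brevity):
///
/// ```ignore
/// let mut monitor = ModelMonitor::new(metadata);
/// monitor.add_alert(AlertConfig { /* name, metric, threshold, ... */ });
/// monitor.collect_metrics(&deployment_metrics)?;
/// let recent_alerts = monitor.get_recent_alerts(10);
/// ```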
pub struct ModelMonitor {
model_metadata: ModelMetadata,
metrics_history: VecDeque<PerformanceMetrics>,
alert_configs: Vec<AlertConfig>,
alert_events: VecDeque<AlertEvent>,
alert_counters: HashMap<String, usize>,
last_alert_times: HashMap<String, Instant>,
max_history_size: usize,
collection_interval: Duration,
last_collection: Instant,
}
impl ModelMonitor {
pub fn new(model_metadata: ModelMetadata) -> Self {
Self {
model_metadata,
metrics_history: VecDeque::new(),
alert_configs: Vec::new(),
alert_events: VecDeque::new(),
alert_counters: HashMap::new(),
last_alert_times: HashMap::new(),
            max_history_size: 1440, // 24 hours of one-minute samples
            collection_interval: Duration::from_secs(60),
            last_collection: Instant::now(),
}
}
pub fn add_alert(&mut self, config: AlertConfig) {
self.alert_configs.push(config);
}
pub fn remove_alert(&mut self, alert_name: &str) {
self.alert_configs
.retain(|config| config.name != alert_name);
self.alert_counters.remove(alert_name);
self.last_alert_times.remove(alert_name);
}
pub fn collect_metrics(&mut self, deployment_metrics: &DeploymentMetrics) -> Result<()> {
if self.last_collection.elapsed() < self.collection_interval {
return Ok(());
}
let performance_metrics = PerformanceMetrics {
model_name: self.model_metadata.name.clone(),
model_version: self.model_metadata.version.clone(),
timestamp: chrono::Utc::now(),
latency: self.calculate_latency_metrics(deployment_metrics),
throughput: self.calculate_throughput_metrics(deployment_metrics),
error_metrics: self.calculate_error_metrics(deployment_metrics),
resource_utilization: self.calculate_resource_metrics(deployment_metrics),
quality_metrics: self.calculate_quality_metrics(),
};
self.metrics_history.push_back(performance_metrics.clone());
while self.metrics_history.len() > self.max_history_size {
self.metrics_history.pop_front();
}
self.evaluate_alerts(&performance_metrics)?;
self.last_collection = Instant::now();
Ok(())
}
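    /// Derives a latency distribution from the single average that
    /// `DeploymentMetrics` exposes. The percentile multipliers are rough
    /// heuristics, not measured values; replace them once a real latency
    /// histogram is available.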
fn calculate_latency_metrics(&self, deployment_metrics: &DeploymentMetrics) -> LatencyMetrics {
let avg_latency = deployment_metrics.avg_response_time_ms;
LatencyMetrics {
avg_latency_ms: avg_latency,
p50_latency_ms: avg_latency * 0.8,
p95_latency_ms: avg_latency * 1.5,
p99_latency_ms: avg_latency * 2.0,
max_latency_ms: avg_latency * 3.0,
min_latency_ms: avg_latency * 0.5,
}
}
fn calculate_throughput_metrics(
&self,
deployment_metrics: &DeploymentMetrics,
) -> ThroughputMetrics {
ThroughputMetrics {
requests_per_second: deployment_metrics.request_rate,
total_requests: deployment_metrics.total_requests,
successful_requests: deployment_metrics.successful_requests,
failed_requests: deployment_metrics.failed_requests,
            // Proxy: active instance count stands in for concurrent requests,
            // which the deployment layer does not report directly.
            concurrent_requests: deployment_metrics.active_instances as u64,
}
}
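    /// Splits the aggregate error rate into per-type buckets using fixed
    /// illustrative ratios (70% prediction, 20% timeout, 10% validation),
    /// since `DeploymentMetrics` does not categorize failures.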
fn calculate_error_metrics(&self, deployment_metrics: &DeploymentMetrics) -> ErrorMetrics {
let mut error_rates_by_type = HashMap::new();
let mut error_counts_by_type = HashMap::new();
if deployment_metrics.error_rate > 0.0 {
error_rates_by_type.insert(
"prediction_error".to_string(),
deployment_metrics.error_rate * 0.7,
);
error_rates_by_type.insert(
"timeout_error".to_string(),
deployment_metrics.error_rate * 0.2,
);
error_rates_by_type.insert(
"validation_error".to_string(),
deployment_metrics.error_rate * 0.1,
);
let total_errors = deployment_metrics.failed_requests;
error_counts_by_type.insert(
"prediction_error".to_string(),
(total_errors as f64 * 0.7) as u64,
);
error_counts_by_type.insert(
"timeout_error".to_string(),
(total_errors as f64 * 0.2) as u64,
);
error_counts_by_type.insert(
"validation_error".to_string(),
(total_errors as f64 * 0.1) as u64,
);
}
ErrorMetrics {
error_rate: deployment_metrics.error_rate,
error_rates_by_type,
error_counts_by_type,
            recent_errors: Vec::new(),
        }
}
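    /// Maps deployment-level utilization onto resource metrics. Disk and
    /// network figures are crude proxies derived from CPU load and request
    /// rate; GPU utilization is not reported by the deployment layer.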
fn calculate_resource_metrics(
&self,
deployment_metrics: &DeploymentMetrics,
) -> ResourceUtilizationMetrics {
ResourceUtilizationMetrics {
cpu_utilization: deployment_metrics.cpu_utilization,
memory_utilization: deployment_metrics.memory_utilization,
            gpu_utilization: None,
            disk_io_utilization: deployment_metrics.cpu_utilization * 0.3,
            network_io_utilization: deployment_metrics.request_rate / 1000.0,
        }
}
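    /// Returns static placeholder quality metrics; confidence and drift
    /// values are hard-coded until prediction logging and a drift detector
    /// are wired in.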
fn calculate_quality_metrics(&self) -> QualityMetrics {
QualityMetrics {
            accuracy: None,
            confidence_scores: ConfidenceMetrics {
avg_confidence: 0.85,
min_confidence: 0.1,
max_confidence: 0.99,
low_confidence_rate: 0.05,
confidence_threshold: 0.7,
},
data_drift: DriftMetrics {
drift_score: 0.02,
drift_detected: false,
detection_method: "PSI".to_string(),
threshold: 0.1,
drifting_features: Vec::new(),
},
model_drift: DriftMetrics {
drift_score: 0.01,
drift_detected: false,
detection_method: "performance_based".to_string(),
threshold: 0.05,
drifting_features: Vec::new(),
},
feature_importance_drift: Some(0.03),
}
}
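    /// Evaluates every enabled alert against the latest snapshot. An alert
    /// fires only after `consecutive_evaluations` successive breaches and is
    /// then suppressed for `cooldown_seconds`. `evaluation_window_seconds`
    /// is currently informational and not enforced here.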
fn evaluate_alerts(&mut self, metrics: &PerformanceMetrics) -> Result<()> {
        // Clone the configs so counters and timestamps on `self` can be
        // mutated while iterating.
        let alert_configs = self.alert_configs.clone();
for config in &alert_configs {
if !config.enabled {
continue;
}
if let Some(last_alert_time) = self.last_alert_times.get(&config.name) {
if last_alert_time.elapsed() < Duration::from_secs(config.cooldown_seconds) {
continue;
}
}
let current_value = self.get_metric_value(metrics, &config.metric)?;
let threshold_exceeded = match config.operator {
ComparisonOperator::GreaterThan => current_value > config.threshold,
ComparisonOperator::GreaterThanOrEqual => current_value >= config.threshold,
ComparisonOperator::LessThan => current_value < config.threshold,
ComparisonOperator::LessThanOrEqual => current_value <= config.threshold,
ComparisonOperator::Equal => (current_value - config.threshold).abs() < 1e-10,
ComparisonOperator::NotEqual => (current_value - config.threshold).abs() >= 1e-10,
};
if threshold_exceeded {
let should_trigger = {
let counter = self.alert_counters.entry(config.name.clone()).or_insert(0);
*counter += 1;
*counter >= config.consecutive_evaluations
};
if should_trigger {
self.trigger_alert(config, current_value)?;
                    self.alert_counters.insert(config.name.clone(), 0);
                    self.last_alert_times
                        .insert(config.name.clone(), Instant::now());
}
} else {
self.alert_counters.insert(config.name.clone(), 0);
}
}
Ok(())
}
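    /// Resolves a metric name from an `AlertConfig` to its current value;
    /// unknown names produce `Error::InvalidInput`.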
fn get_metric_value(&self, metrics: &PerformanceMetrics, metric_name: &str) -> Result<f64> {
match metric_name {
"avg_latency_ms" => Ok(metrics.latency.avg_latency_ms),
"p95_latency_ms" => Ok(metrics.latency.p95_latency_ms),
"p99_latency_ms" => Ok(metrics.latency.p99_latency_ms),
"requests_per_second" => Ok(metrics.throughput.requests_per_second),
"error_rate" => Ok(metrics.error_metrics.error_rate),
"cpu_utilization" => Ok(metrics.resource_utilization.cpu_utilization),
"memory_utilization" => Ok(metrics.resource_utilization.memory_utilization),
"drift_score" => Ok(metrics.quality_metrics.data_drift.drift_score),
"avg_confidence" => Ok(metrics.quality_metrics.confidence_scores.avg_confidence),
_ => Err(Error::InvalidInput(format!(
"Unknown metric: {}",
metric_name
))),
}
}
fn trigger_alert(&mut self, config: &AlertConfig, current_value: f64) -> Result<()> {
let alert_event = AlertEvent {
alert_config: config.clone(),
current_value,
threshold_value: config.threshold,
message: format!(
"Alert '{}': {} {} {} (current: {:.4})",
config.name,
config.metric,
self.operator_to_string(&config.operator),
config.threshold,
current_value
),
triggered_at: chrono::Utc::now(),
model_name: self.model_metadata.name.clone(),
model_version: self.model_metadata.version.clone(),
context: HashMap::new(),
};
self.alert_events.push_back(alert_event.clone());
        // Cap retained alert history at the 100 most recent events.
        while self.alert_events.len() > 100 {
self.alert_events.pop_front();
}
log::warn!("Alert triggered: {}", alert_event.message);
Ok(())
}
fn operator_to_string(&self, operator: &ComparisonOperator) -> &'static str {
match operator {
ComparisonOperator::GreaterThan => ">",
ComparisonOperator::GreaterThanOrEqual => ">=",
ComparisonOperator::LessThan => "<",
ComparisonOperator::LessThanOrEqual => "<=",
ComparisonOperator::Equal => "==",
ComparisonOperator::NotEqual => "!=",
}
}
pub fn get_recent_metrics(&self, limit: usize) -> Vec<PerformanceMetrics> {
self.metrics_history
.iter()
.rev()
.take(limit)
.cloned()
.collect()
}
pub fn get_recent_alerts(&self, limit: usize) -> Vec<AlertEvent> {
self.alert_events
.iter()
.rev()
.take(limit)
.cloned()
.collect()
}
pub fn get_alert_configs(&self) -> &[AlertConfig] {
&self.alert_configs
}
pub fn get_metrics_summary(&self, window_minutes: usize) -> Option<MetricsSummary> {
let cutoff = chrono::Utc::now() - chrono::Duration::minutes(window_minutes as i64);
let recent_metrics: Vec<_> = self
.metrics_history
.iter()
.filter(|m| m.timestamp > cutoff)
.collect();
if recent_metrics.is_empty() {
return None;
}
let avg_latency = recent_metrics
.iter()
.map(|m| m.latency.avg_latency_ms)
.sum::<f64>()
/ recent_metrics.len() as f64;
let avg_throughput = recent_metrics
.iter()
.map(|m| m.throughput.requests_per_second)
.sum::<f64>()
/ recent_metrics.len() as f64;
let avg_error_rate = recent_metrics
.iter()
.map(|m| m.error_metrics.error_rate)
.sum::<f64>()
/ recent_metrics.len() as f64;
let avg_cpu = recent_metrics
.iter()
.map(|m| m.resource_utilization.cpu_utilization)
.sum::<f64>()
/ recent_metrics.len() as f64;
Some(MetricsSummary {
window_minutes,
avg_latency_ms: avg_latency,
avg_throughput,
avg_error_rate,
avg_cpu_utilization: avg_cpu,
total_requests: recent_metrics
.iter()
.map(|m| m.throughput.total_requests)
.sum(),
alert_count: self
.alert_events
.iter()
.filter(|e| e.triggered_at > cutoff)
.count(),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSummary {
pub window_minutes: usize,
pub avg_latency_ms: f64,
pub avg_throughput: f64,
pub avg_error_rate: f64,
pub avg_cpu_utilization: f64,
pub total_requests: u64,
pub alert_count: usize,
}
pub trait MetricsCollector {
fn collect_system_metrics(&self) -> Result<SystemMetrics>;
fn collect_model_metrics(&self, model_name: &str) -> Result<ModelSpecificMetrics>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
pub cpu_usage: f64,
pub memory_usage: u64,
pub memory_available: u64,
pub disk_usage: f64,
pub network_bytes_sent: u64,
pub network_bytes_received: u64,
pub load_average: f64,
pub process_count: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelSpecificMetrics {
pub model_memory_usage: u64,
pub model_init_time_ms: u64,
pub cache_hit_rate: f64,
pub feature_processing_time_ms: u64,
pub prediction_time_ms: u64,
}
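/// Stub collector that returns fixed sample values; intended for tests and
/// local development until real system probes are plugged in.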
pub struct DefaultMetricsCollector;
impl MetricsCollector for DefaultMetricsCollector {
fn collect_system_metrics(&self) -> Result<SystemMetrics> {
Ok(SystemMetrics {
            cpu_usage: 0.45,
            memory_usage: 2_147_483_648,       // 2 GiB
            memory_available: 6_442_450_944,   // 6 GiB
            disk_usage: 0.75,
            network_bytes_sent: 1_048_576,     // 1 MiB
            network_bytes_received: 2_097_152, // 2 MiB
            load_average: 1.5,
            process_count: 150,
        })
}
fn collect_model_metrics(&self, _model_name: &str) -> Result<ModelSpecificMetrics> {
Ok(ModelSpecificMetrics {
            model_memory_usage: 536_870_912, // 512 MiB
            model_init_time_ms: 2500,
            cache_hit_rate: 0.85,
            feature_processing_time_ms: 5,
            prediction_time_ms: 15,
        })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ml::serving::ModelMetadata;
fn create_test_metadata() -> ModelMetadata {
ModelMetadata {
name: "test_model".to_string(),
version: "1.0.0".to_string(),
model_type: "classification".to_string(),
feature_names: vec!["feature1".to_string(), "feature2".to_string()],
target_name: Some("target".to_string()),
description: "Test model".to_string(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
metrics: HashMap::new(),
metadata: HashMap::new(),
}
}
fn create_test_deployment_metrics() -> DeploymentMetrics {
        use crate::ml::serving::deployment::DeploymentStatus;
        DeploymentMetrics {
status: DeploymentStatus::Running,
active_instances: 2,
cpu_utilization: 0.6,
memory_utilization: 0.7,
request_rate: 50.0,
avg_response_time_ms: 120.0,
error_rate: 0.02,
total_requests: 1000,
successful_requests: 980,
failed_requests: 20,
last_health_check: chrono::Utc::now(),
started_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
}
}
#[test]
fn test_model_monitor_creation() {
let metadata = create_test_metadata();
let monitor = ModelMonitor::new(metadata);
assert_eq!(monitor.model_metadata.name, "test_model");
assert_eq!(monitor.alert_configs.len(), 0);
assert_eq!(monitor.metrics_history.len(), 0);
}
#[test]
fn test_alert_config() {
let config = AlertConfig {
name: "high_latency".to_string(),
description: "Alert when latency is too high".to_string(),
metric: "avg_latency_ms".to_string(),
threshold: 200.0,
operator: ComparisonOperator::GreaterThan,
severity: AlertSeverity::Warning,
evaluation_window_seconds: 300,
consecutive_evaluations: 3,
cooldown_seconds: 600,
enabled: true,
};
assert_eq!(config.name, "high_latency");
assert_eq!(config.threshold, 200.0);
assert_eq!(config.severity, AlertSeverity::Warning);
}
#[test]
fn test_metrics_collector() {
let collector = DefaultMetricsCollector;
let system_metrics = collector
.collect_system_metrics()
.expect("operation should succeed");
assert!(system_metrics.cpu_usage >= 0.0 && system_metrics.cpu_usage <= 1.0);
let model_metrics = collector
.collect_model_metrics("test_model")
.expect("operation should succeed");
assert!(model_metrics.model_memory_usage > 0);
}
#[test]
fn test_performance_metrics() {
let metadata = create_test_metadata();
let mut monitor = ModelMonitor::new(metadata);
let deployment_metrics = create_test_deployment_metrics();
monitor.collection_interval = Duration::from_secs(0);
monitor
.collect_metrics(&deployment_metrics)
.expect("operation should succeed");
assert_eq!(monitor.metrics_history.len(), 1);
let metrics = &monitor.metrics_history[0];
assert_eq!(metrics.model_name, "test_model");
assert!(metrics.latency.avg_latency_ms > 0.0);
assert!(metrics.throughput.requests_per_second > 0.0);
}
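    // End-to-end check of the alerting path: with consecutive_evaluations = 1
    // and no cooldown, a single collection whose error_rate (0.02 in the test
    // fixture) breaches the threshold should record exactly one alert event.
    #[test]
    fn test_alert_triggering() {
        let metadata = create_test_metadata();
        let mut monitor = ModelMonitor::new(metadata);
        monitor.collection_interval = Duration::from_secs(0);
        monitor.add_alert(AlertConfig {
            name: "high_error_rate".to_string(),
            description: "Alert when the error rate exceeds 1%".to_string(),
            metric: "error_rate".to_string(),
            threshold: 0.01,
            operator: ComparisonOperator::GreaterThan,
            severity: AlertSeverity::Critical,
            evaluation_window_seconds: 60,
            consecutive_evaluations: 1,
            cooldown_seconds: 0,
            enabled: true,
        });
        let deployment_metrics = create_test_deployment_metrics();
        monitor
            .collect_metrics(&deployment_metrics)
            .expect("operation should succeed");
        let alerts = monitor.get_recent_alerts(10);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].model_name, "test_model");
        assert!(alerts[0].current_value > alerts[0].threshold_value);
    }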
}