use super::{GpuMetrics, MemoryMetrics, PerformanceMetrics, SynthesisMetrics, SystemMetrics};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, watch, RwLock};
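/// Top-level configuration for the performance monitor: sampling interval,
/// alert thresholds, notification settings, monitored targets, history
/// retention, and optional auto-recovery.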
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorConfig {
pub interval: Duration,
pub enabled: bool,
pub thresholds: AlertThresholds,
pub alerts: AlertConfig,
pub targets: Vec<MonitorTarget>,
pub retention_duration: Duration,
pub auto_recovery: AutoRecoveryConfig,
}
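/// Threshold values that trigger alerts; any field left as `None` disables
/// that particular check.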
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
pub cpu_usage_percent: Option<f64>,
pub memory_usage_percent: Option<f64>,
pub gpu_utilization_percent: Option<f64>,
pub gpu_memory_percent: Option<f64>,
pub min_real_time_factor: Option<f64>,
pub max_synthesis_time_ms: Option<f64>,
pub max_queue_depth: Option<usize>,
pub min_success_rate_percent: Option<f64>,
pub max_error_rate_percent: Option<f64>,
pub max_gpu_temperature: Option<f64>,
pub max_disk_usage_percent: Option<f64>,
pub max_network_bps: Option<u64>,
}
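/// Controls how alerts are delivered (console, file, email, webhook) and how
/// they are rate-limited via cooldowns and an hourly cap.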
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
pub console_logging: bool,
pub file_logging: Option<std::path::PathBuf>,
pub email_notifications: Option<EmailConfig>,
pub webhook_notifications: Option<WebhookConfig>,
pub cooldown_duration: Duration,
pub max_alerts_per_hour: usize,
}
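/// SMTP settings used for email alert notifications.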
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailConfig {
pub smtp_server: String,
pub smtp_port: u16,
pub username: String,
pub password: String,
pub from_email: String,
pub to_emails: Vec<String>,
pub use_tls: bool,
}
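/// HTTP webhook settings used for alert notifications, including retry and
/// timeout behavior.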
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub url: String,
pub method: String,
pub headers: HashMap<String, String>,
pub timeout_seconds: u64,
pub retry_attempts: usize,
}
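/// A single monitored metric source, optionally overriding the global
/// thresholds and carrying its own alert severity.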
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorTarget {
pub name: String,
pub target_type: MonitorTargetType,
pub enabled: bool,
pub custom_thresholds: Option<AlertThresholds>,
pub severity: AlertSeverity,
}
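/// The kind of metric a [`MonitorTarget`] watches.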
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MonitorTargetType {
SystemCpu,
SystemMemory,
Gpu,
SynthesisPerformance,
QueueDepth,
ErrorRate,
DiskIo,
NetworkIo,
Custom(String),
}
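/// Alert severity levels, ordered from least to most severe.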
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
Emergency,
}
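/// Configuration for automatic recovery actions taken when alerts fire.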
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoRecoveryConfig {
pub enabled: bool,
pub actions: Vec<RecoveryAction>,
pub max_attempts_per_hour: usize,
pub retry_delay: Duration,
}
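/// Recovery actions the monitor can attempt in response to an alert.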
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryAction {
ReduceBatchSize { min_size: usize },
ClearCaches,
RestartWorkers,
ReduceParallelism { min_threads: usize },
EnableMemoryOptimization,
ReduceQuality,
PauseProcessing { duration: Duration },
CustomCommand { command: String, args: Vec<String> },
}
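/// A single alert raised when a monitored metric crosses its threshold.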
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceAlert {
pub id: String,
pub timestamp: u64,
pub severity: AlertSeverity,
pub target: MonitorTargetType,
pub message: String,
pub current_value: f64,
pub threshold_value: f64,
pub metric_name: String,
pub context: HashMap<String, String>,
pub resolved: bool,
pub resolved_at: Option<u64>,
}
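/// Tracks active and historical alerts, enforces cooldowns and hourly rate
/// limits, and dispatches notifications.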
#[derive(Debug)]
pub struct AlertManager {
config: AlertConfig,
active_alerts: Arc<RwLock<HashMap<String, PerformanceAlert>>>,
alert_history: Arc<RwLock<Vec<PerformanceAlert>>>,
alert_cooldowns: Arc<RwLock<HashMap<String, Instant>>>,
hourly_alert_count: Arc<RwLock<usize>>,
last_hour_reset: Arc<RwLock<Instant>>,
}
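/// Periodically evaluates [`PerformanceMetrics`] against configured
/// thresholds and forwards any resulting alerts to the [`AlertManager`].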
pub struct PerformanceMonitor {
config: MonitorConfig,
alert_manager: AlertManager,
current_metrics: Arc<RwLock<Option<PerformanceMetrics>>>,
is_running: Arc<RwLock<bool>>,
metrics_history: Arc<RwLock<Vec<PerformanceMetrics>>>,
alert_sender: mpsc::UnboundedSender<PerformanceAlert>,
alert_receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<PerformanceAlert>>>>,
shutdown_sender: Option<watch::Sender<bool>>,
}
impl PerformanceMonitor {
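    /// Creates a monitor from `config`; no background work starts until
    /// [`Self::start`] is called.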
pub fn new(config: MonitorConfig) -> Self {
let (alert_sender, alert_receiver) = mpsc::unbounded_channel();
        // The initial receiver is unused; background tasks are gated by the
        // `is_running` flag, and new receivers can be obtained via `subscribe()`.
        let (shutdown_sender, _) = watch::channel(false);
Self {
alert_manager: AlertManager::new(config.alerts.clone()),
config,
current_metrics: Arc::new(RwLock::new(None)),
is_running: Arc::new(RwLock::new(false)),
metrics_history: Arc::new(RwLock::new(Vec::new())),
alert_sender,
alert_receiver: Arc::new(RwLock::new(Some(alert_receiver))),
shutdown_sender: Some(shutdown_sender),
}
}
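    /// Starts the alert processor and monitoring loop; calling this while the
    /// monitor is already running is a no-op.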
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.write().await;
if *is_running {
return Ok(());
}
*is_running = true;
drop(is_running);
tracing::info!(
"Starting performance monitor with interval: {:?}",
self.config.interval
);
self.start_alert_processor().await?;
self.start_monitoring_loop().await?;
Ok(())
}
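    /// Stops the monitoring loop and signals background tasks to shut down.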
pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.write().await;
if !*is_running {
return Ok(());
}
*is_running = false;
if let Some(sender) = &self.shutdown_sender {
let _ = sender.send(true);
}
tracing::info!("Stopped performance monitor");
Ok(())
}
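    /// Records a new metrics sample, trims the retained history, and runs all
    /// threshold checks against it.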
pub async fn update_metrics(&self, metrics: PerformanceMetrics) {
let mut current = self.current_metrics.write().await;
*current = Some(metrics.clone());
drop(current);
let mut history = self.metrics_history.write().await;
history.push(metrics.clone());
        // Guard against a zero-second interval (e.g. a sub-second config) so the
        // history-window calculation cannot divide by zero.
        let interval_secs = self.config.interval.as_secs().max(1);
        let max_history =
            (self.config.retention_duration.as_secs() / interval_secs) as usize;
if history.len() > max_history {
history.remove(0);
}
drop(history);
self.check_alerts(&metrics).await;
}
async fn check_alerts(&self, metrics: &PerformanceMetrics) {
for target in &self.config.targets {
if !target.enabled {
continue;
}
let thresholds = target
.custom_thresholds
.as_ref()
.unwrap_or(&self.config.thresholds);
if let Some(alert) = self.check_target_alerts(target, metrics, thresholds).await {
let _ = self.alert_sender.send(alert);
}
}
}
async fn check_target_alerts(
&self,
target: &MonitorTarget,
metrics: &PerformanceMetrics,
thresholds: &AlertThresholds,
) -> Option<PerformanceAlert> {
match target.target_type {
MonitorTargetType::SystemCpu => {
if let Some(threshold) = thresholds.cpu_usage_percent {
if metrics.system.cpu_usage > threshold {
return Some(
self.create_alert(
target,
"High CPU usage detected",
metrics.system.cpu_usage,
threshold,
"cpu_usage_percent",
)
.await,
);
}
}
}
MonitorTargetType::SystemMemory => {
let memory_usage_percent = (metrics.system.memory_used as f64
/ (metrics.system.memory_used + metrics.system.memory_available) as f64)
* 100.0;
if let Some(threshold) = thresholds.memory_usage_percent {
if memory_usage_percent > threshold {
return Some(
self.create_alert(
target,
"High memory usage detected",
memory_usage_percent,
threshold,
"memory_usage_percent",
)
.await,
);
}
}
}
MonitorTargetType::Gpu => {
if let Some(ref gpu_metrics) = metrics.gpu {
if let Some(threshold) = thresholds.gpu_utilization_percent {
if gpu_metrics.utilization > threshold {
return Some(
self.create_alert(
target,
"High GPU utilization detected",
gpu_metrics.utilization,
threshold,
"gpu_utilization_percent",
)
.await,
);
}
}
let gpu_memory_percent =
(gpu_metrics.memory_used as f64 / gpu_metrics.memory_total as f64) * 100.0;
if let Some(threshold) = thresholds.gpu_memory_percent {
if gpu_memory_percent > threshold {
return Some(
self.create_alert(
target,
"High GPU memory usage detected",
gpu_memory_percent,
threshold,
"gpu_memory_percent",
)
.await,
);
}
}
if let Some(threshold) = thresholds.max_gpu_temperature {
if gpu_metrics.temperature > threshold {
return Some(
self.create_alert(
target,
"High GPU temperature detected",
gpu_metrics.temperature,
threshold,
"gpu_temperature",
)
.await,
);
}
}
}
}
MonitorTargetType::SynthesisPerformance => {
if let Some(threshold) = thresholds.min_real_time_factor {
if metrics.synthesis.real_time_factor < threshold {
return Some(
self.create_alert(
target,
"Poor synthesis performance detected",
metrics.synthesis.real_time_factor,
threshold,
"real_time_factor",
)
.await,
);
}
}
if let Some(threshold) = thresholds.max_synthesis_time_ms {
if metrics.synthesis.avg_synthesis_time_ms > threshold {
return Some(
self.create_alert(
target,
"High synthesis time detected",
metrics.synthesis.avg_synthesis_time_ms,
threshold,
"synthesis_time_ms",
)
.await,
);
}
}
}
MonitorTargetType::QueueDepth => {
if let Some(threshold) = thresholds.max_queue_depth {
if metrics.synthesis.queue_depth > threshold {
return Some(
self.create_alert(
target,
"High queue depth detected",
metrics.synthesis.queue_depth as f64,
threshold as f64,
"queue_depth",
)
.await,
);
}
}
}
MonitorTargetType::ErrorRate => {
let error_rate = if metrics.synthesis.total_operations > 0 {
(metrics.synthesis.failed_operations as f64
/ metrics.synthesis.total_operations as f64)
* 100.0
} else {
0.0
};
if let Some(threshold) = thresholds.max_error_rate_percent {
if error_rate > threshold {
return Some(
self.create_alert(
target,
"High error rate detected",
error_rate,
threshold,
"error_rate_percent",
)
.await,
);
}
}
}
            MonitorTargetType::DiskIo => {
                let total_disk_bps = metrics.system.disk_read_bps + metrics.system.disk_write_bps;
                // There is no dedicated disk-throughput threshold, so the network
                // bandwidth limit is reused for combined disk I/O.
                if let Some(threshold) = thresholds.max_network_bps {
if total_disk_bps > threshold {
return Some(
self.create_alert(
target,
"High disk I/O detected",
total_disk_bps as f64,
threshold as f64,
"disk_io_bps",
)
.await,
);
}
}
}
MonitorTargetType::NetworkIo => {
if let Some(threshold) = thresholds.max_network_bps {
if metrics.system.network_bps > threshold {
return Some(
self.create_alert(
target,
"High network I/O detected",
metrics.system.network_bps as f64,
threshold as f64,
"network_io_bps",
)
.await,
);
}
}
}
            MonitorTargetType::Custom(_) => {
                // Custom targets have no built-in threshold checks.
            }
}
None
}
async fn create_alert(
&self,
target: &MonitorTarget,
message: &str,
current_value: f64,
threshold_value: f64,
metric_name: &str,
) -> PerformanceAlert {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let alert_id = format!(
"{}_{}_{}_{}",
target.name,
metric_name,
timestamp,
fastrand::u32(..)
);
let mut context = HashMap::new();
context.insert("target_name".to_string(), target.name.clone());
context.insert("metric_name".to_string(), metric_name.to_string());
context.insert("current_value".to_string(), current_value.to_string());
context.insert("threshold_value".to_string(), threshold_value.to_string());
PerformanceAlert {
id: alert_id,
timestamp,
severity: target.severity.clone(),
target: target.target_type.clone(),
message: message.to_string(),
current_value,
threshold_value,
metric_name: metric_name.to_string(),
context,
resolved: false,
resolved_at: None,
}
}
async fn start_alert_processor(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut receiver = self.alert_receiver.write().await;
if let Some(rx) = receiver.take() {
let alert_manager = self.alert_manager.clone();
let auto_recovery = self.config.auto_recovery.clone();
tokio::spawn(async move {
let mut rx = rx;
while let Some(alert) = rx.recv().await {
alert_manager.process_alert(alert.clone()).await;
if auto_recovery.enabled {
Self::attempt_auto_recovery(&alert, &auto_recovery).await;
}
}
});
}
Ok(())
}
async fn start_monitoring_loop(&self) -> Result<(), Box<dyn std::error::Error>> {
let is_running = self.is_running.clone();
let interval = self.config.interval;
tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);
            // Metrics arrive via `update_metrics`; this heartbeat loop simply
            // ticks until `stop` clears the `is_running` flag.
            loop {
interval_timer.tick().await;
let running = is_running.read().await;
if !*running {
break;
}
drop(running);
tracing::debug!("Performance monitoring tick");
}
});
Ok(())
}
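    /// Walks the configured recovery actions for an alert. Most actions are
    /// currently log-only placeholders; `PauseProcessing` actually sleeps.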
    async fn attempt_auto_recovery(_alert: &PerformanceAlert, config: &AutoRecoveryConfig) {
for action in &config.actions {
match action {
RecoveryAction::ReduceBatchSize { min_size } => {
tracing::info!("Auto-recovery: Reducing batch size (min: {})", min_size);
}
RecoveryAction::ClearCaches => {
tracing::info!("Auto-recovery: Clearing caches");
}
RecoveryAction::RestartWorkers => {
tracing::info!("Auto-recovery: Restarting worker threads");
}
RecoveryAction::ReduceParallelism { min_threads } => {
tracing::info!("Auto-recovery: Reducing parallelism (min: {})", min_threads);
}
RecoveryAction::EnableMemoryOptimization => {
tracing::info!("Auto-recovery: Enabling memory optimization");
}
RecoveryAction::ReduceQuality => {
tracing::info!("Auto-recovery: Reducing synthesis quality");
}
RecoveryAction::PauseProcessing { duration } => {
tracing::info!("Auto-recovery: Pausing processing for {:?}", duration);
tokio::time::sleep(*duration).await;
}
RecoveryAction::CustomCommand { command, args } => {
tracing::info!(
"Auto-recovery: Executing custom command: {} {:?}",
command,
args
);
}
}
tokio::time::sleep(config.retry_delay).await;
}
}
pub async fn is_running(&self) -> bool {
*self.is_running.read().await
}
pub async fn get_current_metrics(&self) -> Option<PerformanceMetrics> {
self.current_metrics.read().await.clone()
}
pub async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
self.alert_manager.get_active_alerts().await
}
pub async fn get_alert_history(&self, limit: Option<usize>) -> Vec<PerformanceAlert> {
self.alert_manager.get_alert_history(limit).await
}
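    /// Replaces the monitor configuration; already-running background tasks
    /// keep the interval and alert settings they were started with.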
pub async fn update_config(&mut self, config: MonitorConfig) {
self.config = config;
}
}
impl AlertManager {
pub fn new(config: AlertConfig) -> Self {
Self {
config,
active_alerts: Arc::new(RwLock::new(HashMap::new())),
alert_history: Arc::new(RwLock::new(Vec::new())),
alert_cooldowns: Arc::new(RwLock::new(HashMap::new())),
hourly_alert_count: Arc::new(RwLock::new(0)),
last_hour_reset: Arc::new(RwLock::new(Instant::now())),
}
}
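    /// Registers an alert, honoring cooldowns and the hourly rate limit,
    /// then sends the configured notifications.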
async fn process_alert(&self, alert: PerformanceAlert) {
if self.is_in_cooldown(&alert).await {
return;
}
if !self.can_send_alert().await {
tracing::warn!(
"Alert rate limit exceeded, skipping alert: {}",
alert.message
);
return;
}
let mut active = self.active_alerts.write().await;
active.insert(alert.id.clone(), alert.clone());
drop(active);
let mut history = self.alert_history.write().await;
history.push(alert.clone());
drop(history);
self.set_cooldown(&alert).await;
self.send_notifications(&alert).await;
self.increment_hourly_count().await;
}
async fn is_in_cooldown(&self, alert: &PerformanceAlert) -> bool {
let cooldowns = self.alert_cooldowns.read().await;
if let Some(&last_sent) = cooldowns.get(&alert.metric_name) {
last_sent.elapsed() < self.config.cooldown_duration
} else {
false
}
}
async fn set_cooldown(&self, alert: &PerformanceAlert) {
let mut cooldowns = self.alert_cooldowns.write().await;
cooldowns.insert(alert.metric_name.clone(), Instant::now());
}
async fn can_send_alert(&self) -> bool {
let mut last_reset = self.last_hour_reset.write().await;
if last_reset.elapsed() >= Duration::from_secs(3600) {
*last_reset = Instant::now();
let mut count = self.hourly_alert_count.write().await;
*count = 0;
}
drop(last_reset);
let count = self.hourly_alert_count.read().await;
*count < self.config.max_alerts_per_hour
}
async fn increment_hourly_count(&self) {
let mut count = self.hourly_alert_count.write().await;
*count += 1;
}
async fn send_notifications(&self, alert: &PerformanceAlert) {
if self.config.console_logging {
match alert.severity {
AlertSeverity::Info => {
tracing::info!("ALERT [{}]: {}", alert.severity_string(), alert.message)
}
AlertSeverity::Warning => {
tracing::warn!("ALERT [{}]: {}", alert.severity_string(), alert.message)
}
AlertSeverity::Critical => {
tracing::error!("ALERT [{}]: {}", alert.severity_string(), alert.message)
}
AlertSeverity::Emergency => {
tracing::error!("ALERT [{}]: {}", alert.severity_string(), alert.message)
}
}
}
        if let Some(ref log_path) = self.config.file_logging {
            use tokio::io::AsyncWriteExt;
            let log_entry = format!(
                "{} [{}] {}: {} (current: {:.2}, threshold: {:.2})\n",
                alert.timestamp,
                alert.severity_string(),
                alert.metric_name,
                alert.message,
                alert.current_value,
                alert.threshold_value
            );
            // Append each alert instead of overwriting the log file on every write.
            match tokio::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(log_path)
                .await
            {
                Ok(mut file) => {
                    if let Err(e) = file.write_all(log_entry.as_bytes()).await {
                        tracing::error!("Failed to write alert to log file: {}", e);
                    }
                }
                Err(e) => tracing::error!("Failed to open alert log file: {}", e),
            }
        }
if let Some(ref email_config) = self.config.email_notifications {
if let Err(e) = self.send_email_alert(alert, email_config).await {
tracing::error!("Failed to send email alert: {}", e);
}
}
if let Some(ref webhook_config) = self.config.webhook_notifications {
if let Err(e) = self.send_webhook_alert(alert, webhook_config).await {
tracing::error!("Failed to send webhook alert: {}", e);
}
}
}
async fn send_email_alert(
&self,
alert: &PerformanceAlert,
config: &EmailConfig,
) -> Result<(), Box<dyn std::error::Error>> {
        // Email delivery is not yet implemented; log the intent instead.
        tracing::info!(
"Would send email alert to {:?}: {}",
config.to_emails,
alert.message
);
Ok(())
}
async fn send_webhook_alert(
&self,
alert: &PerformanceAlert,
config: &WebhookConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let payload = serde_json::to_value(alert)?;
for attempt in 0..=config.retry_attempts {
let mut request = match config.method.to_uppercase().as_str() {
"POST" => client.post(&config.url),
"PUT" => client.put(&config.url),
"PATCH" => client.patch(&config.url),
_ => client.get(&config.url),
};
for (key, value) in &config.headers {
request = request.header(key, value);
}
if matches!(
config.method.to_uppercase().as_str(),
"POST" | "PUT" | "PATCH"
) {
request = request.json(&payload);
}
let response = request
.timeout(Duration::from_secs(config.timeout_seconds))
.send()
.await;
match response {
Ok(resp) if resp.status().is_success() => {
tracing::info!("Webhook alert sent successfully to {}", config.url);
return Ok(());
}
Ok(resp) => {
tracing::warn!(
"Webhook alert failed with status {}: {}",
resp.status(),
config.url
);
}
Err(e) => {
tracing::warn!("Webhook alert attempt {} failed: {}", attempt + 1, e);
}
}
if attempt < config.retry_attempts {
tokio::time::sleep(Duration::from_secs(2_u64.pow(attempt as u32))).await;
}
}
Err("All webhook attempts failed".into())
}
async fn get_active_alerts(&self) -> Vec<PerformanceAlert> {
let active = self.active_alerts.read().await;
active.values().cloned().collect()
}
async fn get_alert_history(&self, limit: Option<usize>) -> Vec<PerformanceAlert> {
let history = self.alert_history.read().await;
if let Some(limit) = limit {
history.iter().rev().take(limit).cloned().collect()
} else {
history.clone()
}
}
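    /// Marks an active alert as resolved, returning `false` if no alert with
    /// the given id is active.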
pub async fn resolve_alert(&self, alert_id: &str) -> bool {
let mut active = self.active_alerts.write().await;
if let Some(mut alert) = active.remove(alert_id) {
alert.resolved = true;
alert.resolved_at = Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
);
let mut history = self.alert_history.write().await;
if let Some(hist_alert) = history.iter_mut().find(|a| a.id == alert_id) {
hist_alert.resolved = true;
hist_alert.resolved_at = alert.resolved_at;
}
true
} else {
false
}
}
}
impl Clone for AlertManager {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
active_alerts: self.active_alerts.clone(),
alert_history: self.alert_history.clone(),
alert_cooldowns: self.alert_cooldowns.clone(),
hourly_alert_count: self.hourly_alert_count.clone(),
last_hour_reset: self.last_hour_reset.clone(),
}
}
}
impl PerformanceAlert {
pub fn severity_string(&self) -> &'static str {
match self.severity {
AlertSeverity::Info => "INFO",
AlertSeverity::Warning => "WARNING",
AlertSeverity::Critical => "CRITICAL",
AlertSeverity::Emergency => "EMERGENCY",
}
}
}
impl Default for MonitorConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(10),
enabled: false,
thresholds: AlertThresholds::default(),
alerts: AlertConfig::default(),
targets: vec![
MonitorTarget {
name: "system_cpu".to_string(),
target_type: MonitorTargetType::SystemCpu,
enabled: true,
custom_thresholds: None,
severity: AlertSeverity::Warning,
},
MonitorTarget {
name: "system_memory".to_string(),
target_type: MonitorTargetType::SystemMemory,
enabled: true,
custom_thresholds: None,
severity: AlertSeverity::Warning,
},
MonitorTarget {
name: "synthesis_performance".to_string(),
target_type: MonitorTargetType::SynthesisPerformance,
enabled: true,
custom_thresholds: None,
severity: AlertSeverity::Critical,
},
],
            retention_duration: Duration::from_secs(3600),
            auto_recovery: AutoRecoveryConfig::default(),
}
}
}
impl Default for AlertThresholds {
fn default() -> Self {
Self {
cpu_usage_percent: Some(80.0),
memory_usage_percent: Some(85.0),
gpu_utilization_percent: Some(95.0),
gpu_memory_percent: Some(90.0),
min_real_time_factor: Some(0.8),
max_synthesis_time_ms: Some(5000.0),
max_queue_depth: Some(20),
min_success_rate_percent: Some(90.0),
max_error_rate_percent: Some(10.0),
max_gpu_temperature: Some(85.0),
max_disk_usage_percent: Some(90.0),
            max_network_bps: Some(1_000_000_000),
        }
}
}
impl Default for AlertConfig {
fn default() -> Self {
Self {
console_logging: true,
file_logging: None,
email_notifications: None,
webhook_notifications: None,
            cooldown_duration: Duration::from_secs(300),
            max_alerts_per_hour: 20,
}
}
}
impl Default for AutoRecoveryConfig {
fn default() -> Self {
Self {
enabled: false,
actions: vec![
RecoveryAction::ReduceBatchSize { min_size: 8 },
RecoveryAction::ClearCaches,
RecoveryAction::ReduceParallelism { min_threads: 2 },
],
max_attempts_per_hour: 5,
retry_delay: Duration::from_secs(30),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_performance_monitor_creation() {
let config = MonitorConfig::default();
let monitor = PerformanceMonitor::new(config);
assert!(!monitor.is_running().await);
}
#[tokio::test]
async fn test_alert_creation() {
let config = MonitorConfig::default();
let monitor = PerformanceMonitor::new(config);
let target = MonitorTarget {
name: "test_cpu".to_string(),
target_type: MonitorTargetType::SystemCpu,
enabled: true,
custom_thresholds: None,
severity: AlertSeverity::Warning,
};
let alert = monitor
.create_alert(
&target,
"Test alert message",
85.0,
80.0,
"cpu_usage_percent",
)
.await;
assert_eq!(alert.severity, AlertSeverity::Warning);
assert_eq!(alert.current_value, 85.0);
assert_eq!(alert.threshold_value, 80.0);
assert!(!alert.resolved);
}
#[tokio::test]
async fn test_alert_manager() {
let config = AlertConfig::default();
let manager = AlertManager::new(config);
        let alert = PerformanceAlert {
id: "test_alert".to_string(),
timestamp: 1234567890,
severity: AlertSeverity::Warning,
target: MonitorTargetType::SystemCpu,
message: "Test alert".to_string(),
current_value: 85.0,
threshold_value: 80.0,
metric_name: "cpu_usage".to_string(),
context: HashMap::new(),
resolved: false,
resolved_at: None,
};
manager.process_alert(alert.clone()).await;
let active = manager.get_active_alerts().await;
assert_eq!(active.len(), 1);
assert_eq!(active[0].id, "test_alert");
let resolved = manager.resolve_alert("test_alert").await;
assert!(resolved);
let active = manager.get_active_alerts().await;
assert_eq!(active.len(), 0);
}
#[test]
fn test_alert_thresholds_default() {
let thresholds = AlertThresholds::default();
assert_eq!(thresholds.cpu_usage_percent, Some(80.0));
assert_eq!(thresholds.memory_usage_percent, Some(85.0));
assert_eq!(thresholds.min_real_time_factor, Some(0.8));
}
#[test]
fn test_recovery_actions() {
let action = RecoveryAction::ReduceBatchSize { min_size: 8 };
match action {
RecoveryAction::ReduceBatchSize { min_size } => {
assert_eq!(min_size, 8);
}
_ => panic!("Wrong action type"),
}
}
#[tokio::test]
async fn test_monitor_start_stop() {
let config = MonitorConfig::default();
        let monitor = PerformanceMonitor::new(config);
assert!(!monitor.is_running().await);
monitor.start().await.unwrap();
assert!(monitor.is_running().await);
monitor.stop().await.unwrap();
assert!(!monitor.is_running().await);
}
}