1use crate::quality::{ArtifactDetector, ArtifactType, DetectedArtifacts, QualityAssessment};
7use crate::{Error, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
/// Real-time quality monitor: receives quality/performance events over a
/// channel, aggregates them in shared collectors, and raises alerts from a
/// background tokio task.
#[derive(Debug)]
pub struct QualityMonitor {
    /// Thresholds, intervals, and feature toggles for monitoring.
    config: MonitorConfig,
    /// Metrics store shared with the background task.
    collector: Arc<Mutex<MetricsCollector>>,
    /// Alert dispatcher (cooldown filtering + handler fan-out), shared with the task.
    alerter: Arc<Mutex<AlertSystem>>,
    /// Handle of the spawned monitoring loop; `None` until `start_monitoring`.
    monitor_task: Option<JoinHandle<()>>,
    /// Sender into the monitoring loop; `None` until `start_monitoring`,
    /// so submissions before start are silently dropped.
    quality_sender: Option<mpsc::UnboundedSender<QualityEvent>>,
    /// System resource usage and per-resource trend history, shared with the task.
    performance_tracker: Arc<Mutex<PerformanceTracker>>,
}
32
/// Tunable settings for [`QualityMonitor`].
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Monitoring interval in milliseconds (currently only reported in the
    /// startup log message).
    pub monitoring_interval_ms: u64,
    /// Maximum number of quality data points retained in history.
    pub max_history_length: usize,
    /// Quality score below which a `QualityDegradation` alert is raised.
    pub quality_alert_threshold: f32,
    /// Processing latency (ms) above which a `HighLatency` alert is raised.
    pub latency_alert_threshold_ms: u64,
    /// Enable artifact-related alerting.
    pub enable_artifact_alerts: bool,
    /// Enable performance tracking.
    pub enable_performance_tracking: bool,
    /// Enable trend analysis.
    pub enable_trend_analysis: bool,
    /// Interval between periodic summary log reports, in seconds.
    pub report_interval_seconds: u64,
}
53
54impl Default for MonitorConfig {
55 fn default() -> Self {
56 Self {
57 monitoring_interval_ms: 100,
58 max_history_length: 10000,
59 quality_alert_threshold: 0.7,
60 latency_alert_threshold_ms: 100,
61 enable_artifact_alerts: true,
62 enable_performance_tracking: true,
63 enable_trend_analysis: true,
64 report_interval_seconds: 300, }
66 }
67}
68
/// Events flowing from producers into the monitoring loop.
#[derive(Debug, Clone)]
pub enum QualityEvent {
    /// A per-session quality measurement.
    QualityUpdate {
        /// When the measurement was taken.
        timestamp: Instant,
        /// Session the measurement belongs to.
        session_id: String,
        /// Overall quality score for the sample.
        overall_quality: f32,
        /// Detected artifacts, if artifact detection ran.
        artifacts: Option<DetectedArtifacts>,
        /// End-to-end processing latency in milliseconds.
        processing_latency_ms: u64,
        /// Free-form named metrics attached to the measurement.
        metadata: HashMap<String, f32>,
    },
    /// A per-session performance/resource measurement.
    PerformanceUpdate {
        /// When the measurement was taken.
        timestamp: Instant,
        /// Session the measurement belongs to.
        session_id: String,
        /// CPU utilization in percent.
        cpu_usage_percent: f32,
        /// Resident memory usage in megabytes.
        memory_usage_mb: f64,
        /// Throughput in samples per second.
        throughput_samples_per_sec: f64,
        /// Length of the processing queue at measurement time.
        queue_length: usize,
    },
    /// An externally raised alert (currently ignored by the monitor loop).
    QualityAlert {
        /// The alert payload.
        alert: QualityAlert,
    },
    /// A system-level status change (currently ignored by the monitor loop).
    SystemStatus {
        /// When the status was observed.
        timestamp: Instant,
        /// The reported status.
        status: SystemStatus,
    },
}
115
/// A single alert raised by the monitoring system.
#[derive(Debug, Clone)]
pub struct QualityAlert {
    /// When the condition that triggered the alert was observed.
    pub timestamp: Instant,
    /// Session the alert applies to.
    pub session_id: String,
    /// Category of the alert (quality, latency, ...).
    pub alert_type: AlertType,
    /// Severity level used to pick the log level in handlers.
    pub severity: AlertSeverity,
    /// Human-readable description of the condition.
    pub message: String,
    /// Optional remediation hint for operators.
    pub suggested_action: Option<String>,
    /// Additional free-form context.
    pub metadata: HashMap<String, String>,
}
134
/// Category of a [`QualityAlert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertType {
    /// Overall quality dropped below the configured threshold.
    QualityDegradation,
    /// Processing latency exceeded the configured threshold.
    HighLatency,
    /// Audio/processing artifacts were detected.
    ArtifactsDetected,
    /// General performance problem.
    PerformanceIssue,
    /// The system as a whole is overloaded.
    SystemOverload,
    /// Memory usage is approaching limits.
    MemoryPressure,
    /// A session failed outright.
    SessionFailure,
}
153
/// Severity of a [`QualityAlert`]; maps to log levels in the logging handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational; logged at `info` level.
    Info,
    /// Needs attention; logged at `warn` level.
    Warning,
    /// Requires immediate action; logged at `error` level.
    Critical,
}
164
/// Coarse health state of the monitoring system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SystemStatus {
    /// Operating normally.
    Healthy,
    /// Operational but with reduced quality/performance.
    Degraded,
    /// Under excessive load.
    Overloaded,
    /// Failing; intervention required.
    Failing,
}
177
/// Accumulates quality data points, per-session statistics, and aggregates.
#[derive(Debug)]
pub struct MetricsCollector {
    /// Rolling history of quality data points (bounded by the configured
    /// `max_history_length`).
    quality_history: VecDeque<QualityDataPoint>,
    /// Per-session metrics keyed by session id.
    session_metrics: HashMap<String, SessionMetrics>,
    /// Running aggregates over all data points.
    aggregate_stats: AggregateStats,
    /// Trend directions; not updated anywhere in this module yet.
    trend_analyzer: TrendAnalyzer,
}
190
/// One quality measurement stored in the collector's history.
#[derive(Debug, Clone)]
pub struct QualityDataPoint {
    /// When the measurement was taken.
    pub timestamp: Instant,
    /// Session the measurement belongs to.
    pub session_id: String,
    /// Overall quality score.
    pub overall_quality: f32,
    /// Overall artifact score (0.0 when no artifact report was attached).
    pub artifact_score: f32,
    /// Processing latency in milliseconds.
    pub processing_latency_ms: u64,
    /// Per-artifact-type scores from the artifact report, if any.
    pub artifacts_by_type: HashMap<ArtifactType, f32>,
    /// Free-form named metrics attached to the measurement.
    pub metadata: HashMap<String, f32>,
}
209
/// Accumulated metrics for a single session.
#[derive(Debug, Clone)]
pub struct SessionMetrics {
    /// Session identifier.
    pub session_id: String,
    /// Timestamp of the session's first data point.
    pub start_time: Instant,
    /// Number of quality data points recorded for the session.
    pub samples_processed: u64,
    /// Average quality over the session.
    pub average_quality: f32,
    /// Most recent quality scores (capped at 100 entries by the collector).
    pub quality_trend: VecDeque<f32>,
    /// How often each artifact type has been observed.
    pub artifact_counts: HashMap<ArtifactType, u64>,
    /// Latency/throughput/resource metrics for the session.
    pub performance_metrics: PerformanceMetrics,
    /// Alerts recorded against the session.
    pub alerts: Vec<QualityAlert>,
}
230
/// Performance figures tracked per session.
#[derive(Debug, Clone, Default)]
pub struct PerformanceMetrics {
    /// Mean processing latency in milliseconds.
    pub average_latency_ms: f64,
    /// Worst observed processing latency in milliseconds.
    pub peak_latency_ms: u64,
    /// Throughput in samples per second.
    pub throughput_samples_per_sec: f64,
    /// CPU utilization in percent.
    pub cpu_usage_percent: f32,
    /// Resident memory usage in megabytes.
    pub memory_usage_mb: f64,
}
245
/// Running aggregates over every quality data point ever collected.
#[derive(Debug, Clone, Default)]
pub struct AggregateStats {
    /// Total number of data points seen.
    pub total_points: u64,
    /// Running mean of overall quality.
    pub overall_avg_quality: f32,
    /// Variance of overall quality.
    pub quality_variance: f32,
    /// Mean processing latency in milliseconds.
    pub average_latency_ms: f64,
    /// Overall throughput in samples per second.
    pub throughput_samples_per_sec: f64,
}
260
/// Direction of recent change for the main monitored signals.
/// NOTE(review): no code in this module updates these fields yet.
#[derive(Debug, Default)]
pub struct TrendAnalyzer {
    /// Direction quality is moving.
    pub quality_trend: TrendDirection,
    /// Direction latency is moving.
    pub latency_trend: TrendDirection,
    /// Direction throughput is moving.
    pub throughput_trend: TrendDirection,
}
271
/// Qualitative direction of a monitored metric over time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrendDirection {
    /// Not enough data to judge (the default).
    Unknown,
    /// Metric is getting better.
    Improving,
    /// Metric is holding steady.
    Stable,
    /// Metric is getting worse.
    Degrading,
}
284
285impl Default for TrendDirection {
286 fn default() -> Self {
287 TrendDirection::Unknown
288 }
289}
290
/// Deduplicates alerts (per session/type cooldown) and fans them out to handlers.
#[derive(Debug)]
pub struct AlertSystem {
    /// Cooldown and retention settings.
    config: AlertConfig,
    /// Alerts grouped per session; entries are never pruned in this module.
    active_alerts: HashMap<String, Vec<QualityAlert>>,
    /// Chronological alert log, capped at `config.max_alert_history`.
    alert_history: VecDeque<QualityAlert>,
    /// Handlers invoked for every accepted alert.
    handlers: Vec<Box<dyn AlertHandler>>,
}
299
/// Settings for the [`AlertSystem`].
#[derive(Debug, Clone)]
pub struct AlertConfig {
    /// Minimum seconds between repeated alerts of the same type for a session.
    pub alert_cooldown_seconds: u64,
    /// Maximum number of alerts retained in history.
    pub max_alert_history: usize,
    /// Enable email notifications (no handler implemented here).
    pub enable_email_alerts: bool,
    /// Enable Slack notifications (no handler implemented here).
    pub enable_slack_alerts: bool,
    /// Enable webhook notifications (no handler implemented here).
    pub enable_webhook_alerts: bool,
}
314
315impl Default for AlertConfig {
316 fn default() -> Self {
317 Self {
318 alert_cooldown_seconds: 60,
319 max_alert_history: 1000,
320 enable_email_alerts: false,
321 enable_slack_alerts: false,
322 enable_webhook_alerts: false,
323 }
324 }
325}
326
/// Receiver of accepted alerts (logging, email, webhook, ...).
pub trait AlertHandler: Send + Sync + std::fmt::Debug {
    /// Deliver one alert; errors are logged by the caller, not propagated.
    fn handle_alert(&mut self, alert: &QualityAlert) -> Result<()>;
    /// Short identifier used in error logs.
    fn name(&self) -> &str;
}
334
/// Default handler: writes alerts to the `tracing` log at a level matching severity.
#[derive(Debug)]
pub struct LoggingAlertHandler;
338
339impl AlertHandler for LoggingAlertHandler {
340 fn handle_alert(&mut self, alert: &QualityAlert) -> Result<()> {
341 match alert.severity {
342 AlertSeverity::Info => {
343 info!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
344 }
345 AlertSeverity::Warning => {
346 warn!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
347 }
348 AlertSeverity::Critical => {
349 error!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
350 }
351 }
352 Ok(())
353 }
354
355 fn name(&self) -> &str {
356 "logging"
357 }
358}
359
/// Tracks system-wide resource usage and session counts.
#[derive(Debug, Default)]
pub struct PerformanceTracker {
    /// Set on the first performance update; basis for uptime calculation.
    pub start_time: Option<Instant>,
    /// Number of currently active sessions (incremented on first data point;
    /// nothing in this module decrements it).
    pub active_sessions: usize,
    /// Total number of sessions ever seen.
    pub total_sessions: u64,
    /// Latest resource usage snapshot.
    pub system_resources: SystemResources,
    /// Rolling trend buffers for CPU, memory and throughput.
    pub performance_trends: PerformanceTrends,
}
374
/// Rolling buffers of recent resource measurements.
#[derive(Debug, Default)]
pub struct PerformanceTrends {
    /// Recent CPU utilization samples, in percent.
    pub cpu_trend: VecDeque<f32>,
    /// Recent memory usage samples, in megabytes.
    pub memory_trend: VecDeque<f64>,
    /// Recent throughput samples, in samples per second.
    pub throughput_trend: VecDeque<f64>,
}
385
/// Snapshot of host resource usage.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct SystemResources {
    /// CPU utilization in percent.
    pub cpu_usage_percent: f32,
    /// Resident memory usage in megabytes.
    pub memory_usage_mb: f64,
    /// GPU utilization in percent, if a GPU is tracked.
    pub gpu_usage_percent: Option<f32>,
    /// Disk I/O rate in MB/s.
    pub disk_io_mb_per_sec: f64,
    /// Network I/O rate in MB/s.
    pub network_io_mb_per_sec: f64,
}
400
/// Point-in-time view of the whole monitoring system, built by
/// `QualityMonitor::get_dashboard`.
#[derive(Debug)]
pub struct QualityDashboard {
    /// Per-session dashboards keyed by session id.
    pub current_sessions: HashMap<String, SessionDashboard>,
    /// System-wide summary numbers.
    pub system_overview: SystemOverview,
    /// Time-series data for charts.
    pub trends: DashboardTrends,
    /// Most recent alerts, newest first (up to 20).
    pub recent_alerts: VecDeque<QualityAlert>,
}
413
/// Dashboard view of a single session.
#[derive(Debug, Clone)]
pub struct SessionDashboard {
    /// Session identifier.
    pub session_id: String,
    /// When the session's first data point arrived.
    pub start_time: Instant,
    /// Most recent quality score (0.0 when no data yet).
    pub current_quality: f32,
    /// Recent quality scores, oldest first.
    pub quality_trend: Vec<f32>,
    /// Latency shown on the dashboard; sourced from the session's peak latency.
    pub current_latency_ms: u64,
    /// Session throughput in samples per second.
    pub throughput_samples_per_sec: f64,
    /// Artifact types that have been observed in the session.
    pub active_artifacts: Vec<ArtifactType>,
    /// Number of alerts recorded for the session.
    pub alert_count: usize,
}
434
/// System-wide summary numbers for the dashboard.
#[derive(Debug, Clone, Default)]
pub struct SystemOverview {
    /// Number of sessions with recorded metrics.
    pub active_sessions: usize,
    /// Total sessions seen (sourced from the tracker's lifetime counter,
    /// not actually reset daily in this module).
    pub total_sessions_today: u64,
    /// Running average quality across all data points.
    pub average_quality: f32,
    /// Current CPU utilization in percent.
    pub system_load_percent: f32,
    /// Current memory usage in megabytes.
    pub memory_usage_mb: f64,
    /// Hours since the first performance update.
    pub uptime_hours: f64,
    /// Number of alerts raised in the last hour.
    pub alerts_last_hour: usize,
}
453
/// Time-series data used to draw dashboard charts.
#[derive(Debug, Default)]
pub struct DashboardTrends {
    /// (timestamp, overall quality) per data point.
    pub quality_over_time: Vec<(Instant, f32)>,
    /// (timestamp, processing latency ms) per data point.
    pub latency_over_time: Vec<(Instant, f64)>,
    /// (timestamp, throughput) pairs correlated against quality history.
    pub throughput_over_time: Vec<(Instant, f64)>,
    /// (timestamp, resource snapshot) pairs reconstructed from trend buffers.
    pub resource_usage_over_time: Vec<(Instant, SystemResources)>,
}
466
467impl Default for QualityMonitor {
469 fn default() -> Self {
470 Self::new()
471 }
472}
473
impl QualityMonitor {
    /// Create a monitor with [`MonitorConfig::default`].
    pub fn new() -> Self {
        Self::with_config(MonitorConfig::default())
    }

    /// Create a monitor with an explicit configuration. The background task
    /// is not started; call [`start_monitoring`](Self::start_monitoring).
    pub fn with_config(config: MonitorConfig) -> Self {
        let collector = Arc::new(Mutex::new(MetricsCollector::new(config.max_history_length)));
        let alerter = Arc::new(Mutex::new(AlertSystem::new()));
        let performance_tracker = Arc::new(Mutex::new(PerformanceTracker::default()));

        Self {
            config,
            collector,
            alerter,
            monitor_task: None,
            quality_sender: None,
            performance_tracker,
        }
    }

    /// Spawn the background monitoring loop.
    ///
    /// The loop `select!`s between incoming [`QualityEvent`]s and a periodic
    /// report timer, and exits when the event channel is closed.
    ///
    /// # Errors
    /// Returns a validation error if monitoring is already running.
    pub async fn start_monitoring(&mut self) -> Result<()> {
        if self.monitor_task.is_some() {
            return Err(Error::validation("Monitoring already started".to_string()));
        }

        let (sender, mut receiver) = mpsc::unbounded_channel();
        self.quality_sender = Some(sender);

        // Clone the shared state into the task; the task owns these Arcs.
        let collector = Arc::clone(&self.collector);
        let alerter = Arc::clone(&self.alerter);
        let performance_tracker = Arc::clone(&self.performance_tracker);
        let config = self.config.clone();

        let monitor_task = tokio::spawn(async move {
            let mut report_timer =
                tokio::time::interval(Duration::from_secs(config.report_interval_seconds));

            loop {
                tokio::select! {
                    event = receiver.recv() => {
                        match event {
                            Some(quality_event) => {
                                Self::process_quality_event(
                                    quality_event,
                                    &collector,
                                    &alerter,
                                    &performance_tracker,
                                    &config,
                                ).await;
                            }
                            None => {
                                // All senders dropped (e.g. stop_monitoring).
                                info!("Quality monitoring channel closed");
                                break;
                            }
                        }
                    }

                    _ = report_timer.tick() => {
                        if let Err(e) = Self::generate_periodic_report(&collector, &performance_tracker).await {
                            error!("Failed to generate periodic report: {}", e);
                        }
                    }
                }
            }
        });

        self.monitor_task = Some(monitor_task);

        info!(
            "Quality monitoring started with interval {}ms",
            self.config.monitoring_interval_ms
        );
        Ok(())
    }

    /// Stop the background loop (abort the task) and drop the event sender.
    /// Idempotent: does nothing if monitoring is not running.
    pub async fn stop_monitoring(&mut self) -> Result<()> {
        if let Some(task) = self.monitor_task.take() {
            task.abort();
            info!("Quality monitoring stopped");
        }

        self.quality_sender = None;
        Ok(())
    }

    /// Submit one quality measurement for a session.
    ///
    /// Silently does nothing if monitoring has not been started.
    ///
    /// # Errors
    /// Returns a runtime error if the monitoring channel is closed.
    pub fn submit_quality_data(
        &self,
        session_id: String,
        overall_quality: f32,
        artifacts: Option<DetectedArtifacts>,
        processing_latency_ms: u64,
        metadata: HashMap<String, f32>,
    ) -> Result<()> {
        if let Some(sender) = &self.quality_sender {
            let event = QualityEvent::QualityUpdate {
                timestamp: Instant::now(),
                session_id,
                overall_quality,
                artifacts,
                processing_latency_ms,
                metadata,
            };

            sender
                .send(event)
                .map_err(|e| Error::runtime(format!("Failed to submit quality data: {e}")))?;
        }

        Ok(())
    }

    /// Submit one performance/resource measurement for a session.
    ///
    /// Silently does nothing if monitoring has not been started.
    ///
    /// # Errors
    /// Returns a runtime error if the monitoring channel is closed.
    pub fn submit_performance_data(
        &self,
        session_id: String,
        cpu_usage_percent: f32,
        memory_usage_mb: f64,
        throughput_samples_per_sec: f64,
        queue_length: usize,
    ) -> Result<()> {
        if let Some(sender) = &self.quality_sender {
            let event = QualityEvent::PerformanceUpdate {
                timestamp: Instant::now(),
                session_id,
                cpu_usage_percent,
                memory_usage_mb,
                throughput_samples_per_sec,
                queue_length,
            };

            sender
                .send(event)
                .map_err(|e| Error::runtime(format!("Failed to submit performance data: {e}")))?;
        }

        Ok(())
    }

    /// Build a point-in-time dashboard snapshot.
    ///
    /// Locks collector, alerter and performance tracker for the duration of
    /// the build (std mutexes; no `.await` occurs while they are held).
    ///
    /// # Panics
    /// Panics if any of the shared locks is poisoned.
    pub async fn get_dashboard(&self) -> Result<QualityDashboard> {
        let collector = self.collector.lock().expect("Collector lock poisoned");
        let alerter = self.alerter.lock().expect("Alerter lock poisoned");
        let performance_tracker = self
            .performance_tracker
            .lock()
            .expect("Performance tracker lock poisoned");

        let mut current_sessions = HashMap::new();

        // One dashboard entry per known session.
        for (session_id, metrics) in &collector.session_metrics {
            current_sessions.insert(
                session_id.clone(),
                SessionDashboard {
                    session_id: session_id.clone(),
                    start_time: metrics.start_time,
                    current_quality: metrics.quality_trend.back().copied().unwrap_or(0.0),
                    quality_trend: metrics.quality_trend.iter().copied().collect(),
                    // NOTE(review): shows peak latency as "current" — confirm intended.
                    current_latency_ms: metrics.performance_metrics.peak_latency_ms,
                    throughput_samples_per_sec: metrics
                        .performance_metrics
                        .throughput_samples_per_sec,
                    active_artifacts: metrics.artifact_counts.keys().copied().collect(),
                    alert_count: metrics.alerts.len(),
                },
            );
        }

        let system_overview = SystemOverview {
            active_sessions: collector.session_metrics.len(),
            total_sessions_today: performance_tracker.total_sessions,
            average_quality: collector.aggregate_stats.overall_avg_quality,
            system_load_percent: performance_tracker.system_resources.cpu_usage_percent,
            memory_usage_mb: performance_tracker.system_resources.memory_usage_mb,
            uptime_hours: performance_tracker
                .start_time
                .map(|start| start.elapsed().as_secs_f64() / 3600.0)
                .unwrap_or(0.0),
            alerts_last_hour: alerter
                .alert_history
                .iter()
                .filter(|alert| alert.timestamp.elapsed() < Duration::from_secs(3600))
                .count(),
        };

        let trends = DashboardTrends {
            quality_over_time: collector
                .quality_history
                .iter()
                .map(|point| (point.timestamp, point.overall_quality))
                .collect(),
            latency_over_time: collector
                .quality_history
                .iter()
                .map(|point| (point.timestamp, point.processing_latency_ms as f64))
                .collect(),
            throughput_over_time: {
                // Throughput samples carry no timestamps of their own, so pair
                // each one with a quality-history point sampled at a matching
                // stride through the history.
                let mut throughput_data = Vec::new();
                let throughput_trends = &performance_tracker.performance_trends.throughput_trend;

                if !throughput_trends.is_empty() && !collector.quality_history.is_empty() {
                    let trend_count = throughput_trends.len();
                    let history_count = collector.quality_history.len();

                    let step_size = if history_count >= trend_count {
                        history_count / trend_count
                    } else {
                        1
                    };

                    for (i, &throughput) in throughput_trends.iter().enumerate() {
                        let history_index = (i * step_size).min(history_count.saturating_sub(1));
                        if let Some(quality_point) = collector.quality_history.get(history_index) {
                            throughput_data.push((quality_point.timestamp, throughput));
                        }
                    }
                }
                throughput_data
            },
            resource_usage_over_time: {
                // Reconstruct timestamps for resource samples assuming one
                // sample per minute from start_time — TODO confirm cadence.
                let mut resource_data = Vec::new();
                let cpu_trends = &performance_tracker.performance_trends.cpu_trend;
                let memory_trends = &performance_tracker.performance_trends.memory_trend;

                if let Some(start_time) = performance_tracker.start_time {
                    let trend_count = cpu_trends.len().min(memory_trends.len());

                    for i in 0..trend_count {
                        let timestamp = start_time + Duration::from_secs((i as u64) * 60);
                        let cpu_usage = cpu_trends.get(i).copied().unwrap_or(0.0);
                        let memory_usage = memory_trends.get(i).copied().unwrap_or(0.0);

                        // GPU/disk/network have no history; reuse the latest snapshot.
                        let resources = SystemResources {
                            cpu_usage_percent: cpu_usage,
                            memory_usage_mb: memory_usage,
                            gpu_usage_percent: performance_tracker
                                .system_resources
                                .gpu_usage_percent,
                            disk_io_mb_per_sec: performance_tracker
                                .system_resources
                                .disk_io_mb_per_sec,
                            network_io_mb_per_sec: performance_tracker
                                .system_resources
                                .network_io_mb_per_sec,
                        };
                        resource_data.push((timestamp, resources));
                    }
                }
                resource_data
            },
        };

        // Newest 20 alerts, newest first.
        let recent_alerts = alerter
            .alert_history
            .iter()
            .rev()
            .take(20)
            .cloned()
            .collect();

        Ok(QualityDashboard {
            current_sessions,
            system_overview,
            trends,
            recent_alerts,
        })
    }

    /// Handle one event inside the monitoring loop: record metrics, bump
    /// session counters, and raise threshold alerts.
    async fn process_quality_event(
        event: QualityEvent,
        collector: &Arc<Mutex<MetricsCollector>>,
        alerter: &Arc<Mutex<AlertSystem>>,
        performance_tracker: &Arc<Mutex<PerformanceTracker>>,
        config: &MonitorConfig,
    ) {
        match event {
            QualityEvent::QualityUpdate {
                timestamp,
                session_id,
                overall_quality,
                artifacts,
                processing_latency_ms,
                metadata,
            } => {
                // Record the data point; scope the collector lock tightly.
                let is_new_session = {
                    let mut collector_guard = collector.lock().expect("Collector lock poisoned");
                    let data_point = QualityDataPoint {
                        timestamp,
                        session_id: session_id.clone(),
                        overall_quality,
                        artifact_score: artifacts.as_ref().map(|a| a.overall_score).unwrap_or(0.0),
                        processing_latency_ms,
                        artifacts_by_type: artifacts
                            .as_ref()
                            .map(|a| a.artifact_types.clone())
                            .unwrap_or_default(),
                        metadata,
                    };
                    collector_guard.add_quality_data_point(data_point)
                };

                // First data point for a session counts it as started.
                if is_new_session {
                    let mut tracker_guard = performance_tracker
                        .lock()
                        .expect("Performance tracker lock poisoned");
                    tracker_guard.active_sessions += 1;
                    tracker_guard.total_sessions += 1;
                }

                // Quality threshold alert; below 0.2 it escalates to Critical.
                if overall_quality < config.quality_alert_threshold {
                    let alert = QualityAlert {
                        timestamp,
                        session_id: session_id.clone(),
                        alert_type: AlertType::QualityDegradation,
                        severity: if overall_quality < 0.2 {
                            AlertSeverity::Critical
                        } else {
                            AlertSeverity::Warning
                        },
                        message: format!("Quality degraded to {overall_quality:.2}"),
                        suggested_action: Some(
                            "Consider adjusting conversion parameters".to_string(),
                        ),
                        metadata: HashMap::new(),
                    };

                    let mut alerter_guard = alerter.lock().expect("Alerter lock poisoned");
                    alerter_guard.add_alert(alert);
                }

                // Latency threshold alert.
                if processing_latency_ms > config.latency_alert_threshold_ms {
                    let alert = QualityAlert {
                        timestamp,
                        session_id: session_id.clone(),
                        alert_type: AlertType::HighLatency,
                        severity: AlertSeverity::Warning,
                        message: format!("High processing latency: {processing_latency_ms}ms"),
                        suggested_action: Some(
                            "Check system resources and consider load balancing".to_string(),
                        ),
                        metadata: HashMap::new(),
                    };

                    let mut alerter_guard = alerter.lock().expect("Alerter lock poisoned");
                    alerter_guard.add_alert(alert);
                }
            }

            QualityEvent::PerformanceUpdate {
                timestamp: _,
                session_id: _,
                cpu_usage_percent,
                memory_usage_mb,
                throughput_samples_per_sec,
                queue_length,
            } => {
                let mut tracker_guard = performance_tracker
                    .lock()
                    .expect("Performance tracker lock poisoned");
                tracker_guard.update_performance_metrics(
                    cpu_usage_percent,
                    memory_usage_mb,
                    throughput_samples_per_sec,
                    queue_length,
                );
            }

            // Externally raised alerts and status updates are accepted but
            // intentionally not acted on here (yet).
            QualityEvent::QualityAlert { .. } => {
            }

            QualityEvent::SystemStatus { .. } => {
            }
        }
    }

    /// Log a periodic summary of aggregate quality and resource usage.
    /// Locks are held only while copying the stats, not while logging.
    async fn generate_periodic_report(
        collector: &Arc<Mutex<MetricsCollector>>,
        performance_tracker: &Arc<Mutex<PerformanceTracker>>,
    ) -> Result<()> {
        let (stats, performance_stats) = {
            let collector_guard = collector.lock().expect("Collector lock poisoned");
            let tracker_guard = performance_tracker
                .lock()
                .expect("Performance tracker lock poisoned");
            (
                collector_guard.aggregate_stats.clone(),
                tracker_guard.system_resources.clone(),
            )
        };

        info!("=== Quality Monitoring Report ===");
        info!("Total data points: {}", stats.total_points);
        info!("Average quality: {:.3}", stats.overall_avg_quality);
        info!(
            "System CPU usage: {:.1}%",
            performance_stats.cpu_usage_percent
        );
        info!(
            "System memory usage: {:.1} MB",
            performance_stats.memory_usage_mb
        );

        Ok(())
    }
}
898
899impl MetricsCollector {
901 fn new(max_history_length: usize) -> Self {
902 Self {
903 quality_history: VecDeque::with_capacity(max_history_length),
904 session_metrics: HashMap::new(),
905 aggregate_stats: AggregateStats::default(),
906 trend_analyzer: TrendAnalyzer::default(),
907 }
908 }
909
910 fn add_quality_data_point(&mut self, data_point: QualityDataPoint) -> bool {
911 let session_id = data_point.session_id.clone();
912 let timestamp = data_point.timestamp;
913 let artifacts_by_type = data_point.artifacts_by_type.clone();
914 let overall_quality = data_point.overall_quality;
915
916 self.quality_history.push_back(data_point);
917 if self.quality_history.len() > self.quality_history.capacity() {
918 self.quality_history.pop_front();
919 }
920
921 let is_new_session = !self.session_metrics.contains_key(&session_id);
923 let session_metrics = self
924 .session_metrics
925 .entry(session_id.clone())
926 .or_insert_with(|| SessionMetrics {
927 session_id: session_id.clone(),
928 start_time: timestamp,
929 samples_processed: 0,
930 average_quality: overall_quality,
931 quality_trend: VecDeque::with_capacity(100),
932 artifact_counts: HashMap::new(),
933 performance_metrics: PerformanceMetrics::default(),
934 alerts: Vec::new(),
935 });
936
937 session_metrics.samples_processed += 1;
938 session_metrics.quality_trend.push_back(overall_quality);
939 if session_metrics.quality_trend.len() > 100 {
940 session_metrics.quality_trend.pop_front();
941 }
942
943 for artifact_type in artifacts_by_type.keys() {
945 *session_metrics
946 .artifact_counts
947 .entry(*artifact_type)
948 .or_insert(0) += 1;
949 }
950
951 self.update_aggregate_stats(overall_quality);
953
954 is_new_session
956 }
957
958 fn update_aggregate_stats(&mut self, overall_quality: f32) {
959 self.aggregate_stats.total_points += 1;
960
961 let n = self.aggregate_stats.total_points as f32;
963 self.aggregate_stats.overall_avg_quality =
964 (self.aggregate_stats.overall_avg_quality * (n - 1.0) + overall_quality) / n;
965 }
966}
967
968impl AlertSystem {
969 fn new() -> Self {
970 let handlers: Vec<Box<dyn AlertHandler>> = vec![Box::new(LoggingAlertHandler)];
971
972 Self {
973 config: AlertConfig::default(),
974 active_alerts: HashMap::new(),
975 alert_history: VecDeque::with_capacity(1000),
976 handlers,
977 }
978 }
979
980 fn add_alert(&mut self, alert: QualityAlert) {
981 let should_add = self
983 .alert_history
984 .iter()
985 .filter(|existing| {
986 existing.session_id == alert.session_id
987 && existing.alert_type.as_str() == alert.alert_type.as_str()
988 })
989 .all(|existing| {
990 existing.timestamp.elapsed()
991 > Duration::from_secs(self.config.alert_cooldown_seconds)
992 });
993
994 if should_add {
995 self.active_alerts
997 .entry(alert.session_id.clone())
998 .or_default()
999 .push(alert.clone());
1000
1001 self.alert_history.push_back(alert.clone());
1003 if self.alert_history.len() > self.config.max_alert_history {
1004 self.alert_history.pop_front();
1005 }
1006
1007 for handler in &mut self.handlers {
1009 if let Err(e) = handler.handle_alert(&alert) {
1010 error!("Alert handler '{}' failed: {}", handler.name(), e);
1011 }
1012 }
1013 }
1014 }
1015}
1016
1017impl PerformanceTracker {
1018 fn update_performance_metrics(
1019 &mut self,
1020 cpu_usage_percent: f32,
1021 memory_usage_mb: f64,
1022 throughput_samples_per_sec: f64,
1023 _queue_length: usize,
1024 ) {
1025 if self.start_time.is_none() {
1026 self.start_time = Some(Instant::now());
1027 }
1028
1029 self.system_resources.cpu_usage_percent = cpu_usage_percent;
1030 self.system_resources.memory_usage_mb = memory_usage_mb;
1031
1032 self.performance_trends
1034 .throughput_trend
1035 .push_back(throughput_samples_per_sec);
1036 if self.performance_trends.throughput_trend.len() > 100 {
1037 self.performance_trends.throughput_trend.pop_front();
1038 }
1039 }
1040}
1041
impl AlertType {
    /// Stable snake_case identifier for this alert type, used in log output.
    pub fn as_str(&self) -> &'static str {
        match self {
            AlertType::QualityDegradation => "quality_degradation",
            AlertType::HighLatency => "high_latency",
            AlertType::ArtifactsDetected => "artifacts_detected",
            AlertType::PerformanceIssue => "performance_issue",
            AlertType::SystemOverload => "system_overload",
            AlertType::MemoryPressure => "memory_pressure",
            AlertType::SessionFailure => "session_failure",
        }
    }
}
1057
#[cfg(test)]
mod tests {
    use super::*;

    // Default config values match the documented defaults.
    #[test]
    fn test_monitor_config_default() {
        let config = MonitorConfig::default();
        assert_eq!(config.monitoring_interval_ms, 100);
        assert_eq!(config.quality_alert_threshold, 0.7);
    }

    // Alert type identifiers are the expected snake_case strings.
    #[test]
    fn test_alert_type_as_str() {
        assert_eq!(
            AlertType::QualityDegradation.as_str(),
            "quality_degradation"
        );
        assert_eq!(AlertType::HighLatency.as_str(), "high_latency");
    }

    // A fresh monitor has no background task and no sender until started.
    #[tokio::test]
    async fn test_quality_monitor_creation() {
        let monitor = QualityMonitor::new();
        assert!(monitor.monitor_task.is_none());
        assert!(monitor.quality_sender.is_none());
    }
}