Skip to main content

voirs_conversion/
monitoring.rs

1//! Real-time quality monitoring system for voice conversion
2//!
3//! This module provides real-time monitoring capabilities for tracking quality metrics,
4//! detecting performance issues, and generating alerts during voice conversion operations.
5
6use crate::quality::{ArtifactDetector, ArtifactType, DetectedArtifacts, QualityAssessment};
7use crate::{Error, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn};
15
16/// Real-time quality monitoring system
17#[derive(Debug)]
18pub struct QualityMonitor {
19    /// Configuration for monitoring behavior
20    config: MonitorConfig,
21    /// Quality metrics collector
22    collector: Arc<Mutex<MetricsCollector>>,
23    /// Alert system for quality issues
24    alerter: Arc<Mutex<AlertSystem>>,
25    /// Background monitoring task handle
26    monitor_task: Option<JoinHandle<()>>,
27    /// Channel for sending quality data
28    quality_sender: Option<mpsc::UnboundedSender<QualityEvent>>,
29    /// Performance tracker
30    performance_tracker: Arc<Mutex<PerformanceTracker>>,
31}
32
/// Tunable settings for quality monitoring.
///
/// [`MonitorConfig::default`] supplies the stock values listed on each field.
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Monitoring interval, in milliseconds (default: 100)
    pub monitoring_interval_ms: u64,
    /// Upper bound on the history kept for trend analysis (default: 10000)
    pub max_history_length: usize,
    /// Quality score (0.0 to 1.0) below which an alert is raised (default: 0.7)
    pub quality_alert_threshold: f32,
    /// Latency, in milliseconds, above which an alert is raised (default: 100)
    pub latency_alert_threshold_ms: u64,
    /// Whether artifact detection alerts are enabled (default: true)
    pub enable_artifact_alerts: bool,
    /// Whether performance tracking is enabled (default: true)
    pub enable_performance_tracking: bool,
    /// Whether trend analysis is enabled (default: true)
    pub enable_trend_analysis: bool,
    /// Seconds between generated reports (default: 300)
    pub report_interval_seconds: u64,
}

impl Default for MonitorConfig {
    /// Builds the stock monitoring configuration.
    fn default() -> Self {
        MonitorConfig {
            monitoring_interval_ms: 100,
            max_history_length: 10_000,
            quality_alert_threshold: 0.7,
            latency_alert_threshold_ms: 100,
            enable_artifact_alerts: true,
            enable_performance_tracking: true,
            enable_trend_analysis: true,
            // Five minutes between reports.
            report_interval_seconds: 5 * 60,
        }
    }
}
68
69/// Quality event types for monitoring
70#[derive(Debug, Clone)]
71pub enum QualityEvent {
72    /// Quality update with metrics
73    QualityUpdate {
74        /// Timestamp when the quality measurement was taken
75        timestamp: Instant,
76        /// Unique identifier for the conversion session
77        session_id: String,
78        /// Overall quality score from 0.0 to 1.0
79        overall_quality: f32,
80        /// Detected audio artifacts if any
81        artifacts: Option<DetectedArtifacts>,
82        /// Processing latency in milliseconds
83        processing_latency_ms: u64,
84        /// Additional quality metrics as key-value pairs
85        metadata: HashMap<String, f32>,
86    },
87    /// Performance update
88    PerformanceUpdate {
89        /// Timestamp when the performance measurement was taken
90        timestamp: Instant,
91        /// Unique identifier for the conversion session
92        session_id: String,
93        /// CPU usage as a percentage (0.0 to 100.0)
94        cpu_usage_percent: f32,
95        /// Memory usage in megabytes
96        memory_usage_mb: f64,
97        /// Processing throughput in samples per second
98        throughput_samples_per_sec: f64,
99        /// Current processing queue length
100        queue_length: usize,
101    },
102    /// Quality alert
103    QualityAlert {
104        /// The quality alert to be processed
105        alert: QualityAlert,
106    },
107    /// System status update
108    SystemStatus {
109        /// Timestamp of the status update
110        timestamp: Instant,
111        /// Current system status
112        status: SystemStatus,
113    },
114}
115
116/// Quality alert structure
117#[derive(Debug, Clone)]
118pub struct QualityAlert {
119    /// Timestamp when the alert was generated
120    pub timestamp: Instant,
121    /// Session identifier associated with this alert
122    pub session_id: String,
123    /// Type of alert being raised
124    pub alert_type: AlertType,
125    /// Severity level of the alert
126    pub severity: AlertSeverity,
127    /// Human-readable alert message
128    pub message: String,
129    /// Optional suggested action to resolve the issue
130    pub suggested_action: Option<String>,
131    /// Additional metadata about the alert as key-value pairs
132    pub metadata: HashMap<String, String>,
133}
134
135/// Alert types
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
137pub enum AlertType {
138    /// Quality score has fallen below acceptable threshold
139    QualityDegradation,
140    /// Processing latency exceeds configured limits
141    HighLatency,
142    /// Audio artifacts detected in output
143    ArtifactsDetected,
144    /// General performance degradation detected
145    PerformanceIssue,
146    /// System resources are critically overloaded
147    SystemOverload,
148    /// Memory usage approaching system limits
149    MemoryPressure,
150    /// Conversion session has failed
151    SessionFailure,
152}
153
154/// Alert severity levels
155#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
156pub enum AlertSeverity {
157    /// Informational alert, no action required
158    Info,
159    /// Warning level, attention recommended
160    Warning,
161    /// Critical alert requiring immediate attention
162    Critical,
163}
164
165/// System status
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub enum SystemStatus {
168    /// System operating normally with good performance
169    Healthy,
170    /// System performance is degraded but functional
171    Degraded,
172    /// System resources are overloaded, quality may suffer
173    Overloaded,
174    /// System is experiencing failures
175    Failing,
176}
177
178/// Metrics collector
179#[derive(Debug)]
180pub struct MetricsCollector {
181    /// Quality history
182    quality_history: VecDeque<QualityDataPoint>,
183    /// Per-session metrics
184    session_metrics: HashMap<String, SessionMetrics>,
185    /// Aggregate statistics
186    aggregate_stats: AggregateStats,
187    /// Trend analyzer
188    trend_analyzer: TrendAnalyzer,
189}
190
191/// Quality data point
192#[derive(Debug, Clone)]
193pub struct QualityDataPoint {
194    /// Timestamp of this quality measurement
195    pub timestamp: Instant,
196    /// Session identifier for this measurement
197    pub session_id: String,
198    /// Overall quality score from 0.0 to 1.0
199    pub overall_quality: f32,
200    /// Aggregate artifact severity score
201    pub artifact_score: f32,
202    /// Processing latency in milliseconds
203    pub processing_latency_ms: u64,
204    /// Severity scores for each detected artifact type
205    pub artifacts_by_type: HashMap<ArtifactType, f32>,
206    /// Additional quality metrics as key-value pairs
207    pub metadata: HashMap<String, f32>,
208}
209
210/// Per-session metrics
211#[derive(Debug, Clone)]
212pub struct SessionMetrics {
213    /// Unique session identifier
214    pub session_id: String,
215    /// When the session started
216    pub start_time: Instant,
217    /// Total number of audio samples processed
218    pub samples_processed: u64,
219    /// Average quality score for this session
220    pub average_quality: f32,
221    /// Recent quality scores showing trend over time
222    pub quality_trend: VecDeque<f32>,
223    /// Count of each artifact type detected
224    pub artifact_counts: HashMap<ArtifactType, u64>,
225    /// Performance metrics for this session
226    pub performance_metrics: PerformanceMetrics,
227    /// Alerts generated during this session
228    pub alerts: Vec<QualityAlert>,
229}
230
/// Performance figures; every field defaults to zero.
#[derive(Debug, Clone, Default)]
pub struct PerformanceMetrics {
    /// Mean processing latency, in milliseconds
    pub average_latency_ms: f64,
    /// Highest processing latency seen, in milliseconds
    pub peak_latency_ms: u64,
    /// Throughput, in samples per second
    pub throughput_samples_per_sec: f64,
    /// CPU usage as a percentage (0.0 to 100.0)
    pub cpu_usage_percent: f32,
    /// Memory usage, in megabytes
    pub memory_usage_mb: f64,
}
245
/// Statistics aggregated over all measurements; every field defaults to zero.
#[derive(Debug, Clone, Default)]
pub struct AggregateStats {
    /// Number of quality data points collected
    pub total_points: u64,
    /// Mean quality over all measurements
    pub overall_avg_quality: f32,
    /// Variance of the quality measurements
    pub quality_variance: f32,
    /// Mean processing latency, in milliseconds
    pub average_latency_ms: f64,
    /// Mean throughput, in samples per second
    pub throughput_samples_per_sec: f64,
}
260
261/// Trend analyzer
262#[derive(Debug, Default)]
263pub struct TrendAnalyzer {
264    /// Current quality trend direction
265    pub quality_trend: TrendDirection,
266    /// Current latency trend direction
267    pub latency_trend: TrendDirection,
268    /// Current throughput trend direction
269    pub throughput_trend: TrendDirection,
270}
271
/// Direction in which a monitored metric is moving.
///
/// The default value is [`TrendDirection::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TrendDirection {
    /// Trend direction not yet determined
    #[default]
    Unknown,
    /// Metric is improving over time
    Improving,
    /// Metric is stable with minimal variation
    Stable,
    /// Metric is degrading over time
    Degrading,
}
290
291/// Alert system
292#[derive(Debug)]
293pub struct AlertSystem {
294    config: AlertConfig,
295    active_alerts: HashMap<String, Vec<QualityAlert>>,
296    alert_history: VecDeque<QualityAlert>,
297    handlers: Vec<Box<dyn AlertHandler>>,
298}
299
/// Settings for the alert system.
///
/// [`AlertConfig::default`] supplies the stock values listed on each field.
#[derive(Debug, Clone)]
pub struct AlertConfig {
    /// Minimum number of seconds between duplicate alerts (default: 60)
    pub alert_cooldown_seconds: u64,
    /// Maximum number of alerts kept in the history (default: 1000)
    pub max_alert_history: usize,
    /// Whether email notifications are enabled (default: false)
    pub enable_email_alerts: bool,
    /// Whether Slack notifications are enabled (default: false)
    pub enable_slack_alerts: bool,
    /// Whether webhook notifications are enabled (default: false)
    pub enable_webhook_alerts: bool,
}

impl Default for AlertConfig {
    /// Builds the stock alert configuration with all notification channels off.
    fn default() -> Self {
        AlertConfig {
            alert_cooldown_seconds: 60,
            max_alert_history: 1_000,
            enable_email_alerts: false,
            enable_slack_alerts: false,
            enable_webhook_alerts: false,
        }
    }
}
326
327/// Alert handler trait
328pub trait AlertHandler: Send + Sync + std::fmt::Debug {
329    /// Process and handle a quality alert
330    fn handle_alert(&mut self, alert: &QualityAlert) -> Result<()>;
331    /// Get the name of this alert handler
332    fn name(&self) -> &str;
333}
334
335/// Logging alert handler
336#[derive(Debug)]
337pub struct LoggingAlertHandler;
338
339impl AlertHandler for LoggingAlertHandler {
340    fn handle_alert(&mut self, alert: &QualityAlert) -> Result<()> {
341        match alert.severity {
342            AlertSeverity::Info => {
343                info!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
344            }
345            AlertSeverity::Warning => {
346                warn!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
347            }
348            AlertSeverity::Critical => {
349                error!("[ALERT] {}: {}", alert.alert_type.as_str(), alert.message)
350            }
351        }
352        Ok(())
353    }
354
355    fn name(&self) -> &str {
356        "logging"
357    }
358}
359
360/// Performance tracker
361#[derive(Debug, Default)]
362pub struct PerformanceTracker {
363    /// When performance tracking started
364    pub start_time: Option<Instant>,
365    /// Current number of active conversion sessions
366    pub active_sessions: usize,
367    /// Total number of sessions since start
368    pub total_sessions: u64,
369    /// Current system resource usage
370    pub system_resources: SystemResources,
371    /// Historical performance trend data
372    pub performance_trends: PerformanceTrends,
373}
374
/// Historical performance samples; all series start out empty.
#[derive(Debug, Default)]
pub struct PerformanceTrends {
    /// Past CPU usage measurements
    pub cpu_trend: VecDeque<f32>,
    /// Past memory usage measurements
    pub memory_trend: VecDeque<f64>,
    /// Past throughput measurements
    pub throughput_trend: VecDeque<f64>,
}
385
386/// System resource usage data
387#[derive(Debug, Default, Clone, Serialize, Deserialize)]
388pub struct SystemResources {
389    /// CPU usage as a percentage (0.0 to 100.0)
390    pub cpu_usage_percent: f32,
391    /// Memory usage in megabytes
392    pub memory_usage_mb: f64,
393    /// GPU usage percentage if GPU is available
394    pub gpu_usage_percent: Option<f32>,
395    /// Disk I/O rate in megabytes per second
396    pub disk_io_mb_per_sec: f64,
397    /// Network I/O rate in megabytes per second
398    pub network_io_mb_per_sec: f64,
399}
400
401/// Dashboard data structure for real-time monitoring visualization
402#[derive(Debug)]
403pub struct QualityDashboard {
404    /// Dashboard data for each active session
405    pub current_sessions: HashMap<String, SessionDashboard>,
406    /// System-wide overview metrics
407    pub system_overview: SystemOverview,
408    /// Historical trend data for visualization
409    pub trends: DashboardTrends,
410    /// Most recent alerts for display
411    pub recent_alerts: VecDeque<QualityAlert>,
412}
413
414/// Per-session dashboard data
415#[derive(Debug, Clone)]
416pub struct SessionDashboard {
417    /// Unique session identifier
418    pub session_id: String,
419    /// When this session started
420    pub start_time: Instant,
421    /// Most recent quality score
422    pub current_quality: f32,
423    /// Recent quality scores for trend visualization
424    pub quality_trend: Vec<f32>,
425    /// Current processing latency in milliseconds
426    pub current_latency_ms: u64,
427    /// Current throughput in samples per second
428    pub throughput_samples_per_sec: f64,
429    /// Currently detected artifact types
430    pub active_artifacts: Vec<ArtifactType>,
431    /// Total number of alerts for this session
432    pub alert_count: usize,
433}
434
/// System-wide summary shown on the dashboard; every field defaults to zero.
#[derive(Debug, Clone, Default)]
pub struct SystemOverview {
    /// Sessions that are currently active
    pub active_sessions: usize,
    /// Sessions processed today
    pub total_sessions_today: u64,
    /// Mean quality score across all sessions
    pub average_quality: f32,
    /// Overall system load, as a percentage
    pub system_load_percent: f32,
    /// Total memory usage, in megabytes
    pub memory_usage_mb: f64,
    /// System uptime, in hours
    pub uptime_hours: f64,
    /// Alerts raised within the last hour
    pub alerts_last_hour: usize,
}
453
454/// Trend data for dashboard visualization
455#[derive(Debug, Default)]
456pub struct DashboardTrends {
457    /// Quality measurements over time for trend charts
458    pub quality_over_time: Vec<(Instant, f32)>,
459    /// Latency measurements over time for trend charts
460    pub latency_over_time: Vec<(Instant, f64)>,
461    /// Throughput measurements over time for trend charts
462    pub throughput_over_time: Vec<(Instant, f64)>,
463    /// System resource usage over time for trend charts
464    pub resource_usage_over_time: Vec<(Instant, SystemResources)>,
465}
466
467/// Default implementation
468impl Default for QualityMonitor {
469    fn default() -> Self {
470        Self::new()
471    }
472}
473
474// Implementation of main QualityMonitor
475impl QualityMonitor {
476    /// Create new quality monitor with default configuration
477    pub fn new() -> Self {
478        Self::with_config(MonitorConfig::default())
479    }
480
481    /// Create new quality monitor with custom configuration
482    pub fn with_config(config: MonitorConfig) -> Self {
483        let collector = Arc::new(Mutex::new(MetricsCollector::new(config.max_history_length)));
484        let alerter = Arc::new(Mutex::new(AlertSystem::new()));
485        let performance_tracker = Arc::new(Mutex::new(PerformanceTracker::default()));
486
487        Self {
488            config,
489            collector,
490            alerter,
491            monitor_task: None,
492            quality_sender: None,
493            performance_tracker,
494        }
495    }
496
497    /// Start real-time monitoring
498    pub async fn start_monitoring(&mut self) -> Result<()> {
499        if self.monitor_task.is_some() {
500            return Err(Error::validation("Monitoring already started".to_string()));
501        }
502
503        let (sender, mut receiver) = mpsc::unbounded_channel();
504        self.quality_sender = Some(sender);
505
506        let collector = Arc::clone(&self.collector);
507        let alerter = Arc::clone(&self.alerter);
508        let performance_tracker = Arc::clone(&self.performance_tracker);
509        let config = self.config.clone();
510
511        let monitor_task = tokio::spawn(async move {
512            let mut report_timer =
513                tokio::time::interval(Duration::from_secs(config.report_interval_seconds));
514
515            loop {
516                tokio::select! {
517                    // Handle incoming quality events
518                    event = receiver.recv() => {
519                        match event {
520                            Some(quality_event) => {
521                                Self::process_quality_event(
522                                    quality_event,
523                                    &collector,
524                                    &alerter,
525                                    &performance_tracker,
526                                    &config,
527                                ).await;
528                            }
529                            None => {
530                                info!("Quality monitoring channel closed");
531                                break;
532                            }
533                        }
534                    }
535
536                    // Generate periodic reports
537                    _ = report_timer.tick() => {
538                        if let Err(e) = Self::generate_periodic_report(&collector, &performance_tracker).await {
539                            error!("Failed to generate periodic report: {}", e);
540                        }
541                    }
542                }
543            }
544        });
545
546        self.monitor_task = Some(monitor_task);
547
548        info!(
549            "Quality monitoring started with interval {}ms",
550            self.config.monitoring_interval_ms
551        );
552        Ok(())
553    }
554
555    /// Stop monitoring
556    pub async fn stop_monitoring(&mut self) -> Result<()> {
557        if let Some(task) = self.monitor_task.take() {
558            task.abort();
559            info!("Quality monitoring stopped");
560        }
561
562        self.quality_sender = None;
563        Ok(())
564    }
565
566    /// Submit quality data for monitoring
567    pub fn submit_quality_data(
568        &self,
569        session_id: String,
570        overall_quality: f32,
571        artifacts: Option<DetectedArtifacts>,
572        processing_latency_ms: u64,
573        metadata: HashMap<String, f32>,
574    ) -> Result<()> {
575        if let Some(sender) = &self.quality_sender {
576            let event = QualityEvent::QualityUpdate {
577                timestamp: Instant::now(),
578                session_id,
579                overall_quality,
580                artifacts,
581                processing_latency_ms,
582                metadata,
583            };
584
585            sender
586                .send(event)
587                .map_err(|e| Error::runtime(format!("Failed to submit quality data: {e}")))?;
588        }
589
590        Ok(())
591    }
592
593    /// Submit performance data for monitoring
594    pub fn submit_performance_data(
595        &self,
596        session_id: String,
597        cpu_usage_percent: f32,
598        memory_usage_mb: f64,
599        throughput_samples_per_sec: f64,
600        queue_length: usize,
601    ) -> Result<()> {
602        if let Some(sender) = &self.quality_sender {
603            let event = QualityEvent::PerformanceUpdate {
604                timestamp: Instant::now(),
605                session_id,
606                cpu_usage_percent,
607                memory_usage_mb,
608                throughput_samples_per_sec,
609                queue_length,
610            };
611
612            sender
613                .send(event)
614                .map_err(|e| Error::runtime(format!("Failed to submit performance data: {e}")))?;
615        }
616
617        Ok(())
618    }
619
620    /// Get current dashboard data
621    pub async fn get_dashboard(&self) -> Result<QualityDashboard> {
622        let collector = self.collector.lock().expect("Collector lock poisoned");
623        let alerter = self.alerter.lock().expect("Alerter lock poisoned");
624        let performance_tracker = self
625            .performance_tracker
626            .lock()
627            .expect("Performance tracker lock poisoned");
628
629        let mut current_sessions = HashMap::new();
630
631        for (session_id, metrics) in &collector.session_metrics {
632            current_sessions.insert(
633                session_id.clone(),
634                SessionDashboard {
635                    session_id: session_id.clone(),
636                    start_time: metrics.start_time,
637                    current_quality: metrics.quality_trend.back().copied().unwrap_or(0.0),
638                    quality_trend: metrics.quality_trend.iter().copied().collect(),
639                    current_latency_ms: metrics.performance_metrics.peak_latency_ms,
640                    throughput_samples_per_sec: metrics
641                        .performance_metrics
642                        .throughput_samples_per_sec,
643                    active_artifacts: metrics.artifact_counts.keys().copied().collect(),
644                    alert_count: metrics.alerts.len(),
645                },
646            );
647        }
648
649        let system_overview = SystemOverview {
650            active_sessions: collector.session_metrics.len(), // Use actual session count
651            total_sessions_today: performance_tracker.total_sessions,
652            average_quality: collector.aggregate_stats.overall_avg_quality,
653            system_load_percent: performance_tracker.system_resources.cpu_usage_percent,
654            memory_usage_mb: performance_tracker.system_resources.memory_usage_mb,
655            uptime_hours: performance_tracker
656                .start_time
657                .map(|start| start.elapsed().as_secs_f64() / 3600.0)
658                .unwrap_or(0.0),
659            alerts_last_hour: alerter
660                .alert_history
661                .iter()
662                .filter(|alert| alert.timestamp.elapsed() < Duration::from_secs(3600))
663                .count(),
664        };
665
666        let trends = DashboardTrends {
667            quality_over_time: collector
668                .quality_history
669                .iter()
670                .map(|point| (point.timestamp, point.overall_quality))
671                .collect(),
672            latency_over_time: collector
673                .quality_history
674                .iter()
675                .map(|point| (point.timestamp, point.processing_latency_ms as f64))
676                .collect(),
677            throughput_over_time: {
678                // Build throughput tracking from performance trends and quality history
679                let mut throughput_data = Vec::new();
680                let throughput_trends = &performance_tracker.performance_trends.throughput_trend;
681
682                if !throughput_trends.is_empty() && !collector.quality_history.is_empty() {
683                    let trend_count = throughput_trends.len();
684                    let history_count = collector.quality_history.len();
685
686                    // Use quality history timestamps as reference points for throughput data
687                    let step_size = if history_count >= trend_count {
688                        history_count / trend_count
689                    } else {
690                        1
691                    };
692
693                    for (i, &throughput) in throughput_trends.iter().enumerate() {
694                        let history_index = (i * step_size).min(history_count.saturating_sub(1));
695                        if let Some(quality_point) = collector.quality_history.get(history_index) {
696                            throughput_data.push((quality_point.timestamp, throughput));
697                        }
698                    }
699                }
700                throughput_data
701            },
702            resource_usage_over_time: {
703                // Build resource usage tracking from performance trends
704                let mut resource_data = Vec::new();
705                let cpu_trends = &performance_tracker.performance_trends.cpu_trend;
706                let memory_trends = &performance_tracker.performance_trends.memory_trend;
707
708                if let Some(start_time) = performance_tracker.start_time {
709                    let trend_count = cpu_trends.len().min(memory_trends.len());
710
711                    // Create evenly distributed timestamps based on system start time
712                    for i in 0..trend_count {
713                        let timestamp = start_time + Duration::from_secs((i as u64) * 60); // 1-minute intervals
714                        let cpu_usage = cpu_trends.get(i).copied().unwrap_or(0.0);
715                        let memory_usage = memory_trends.get(i).copied().unwrap_or(0.0);
716
717                        let resources = SystemResources {
718                            cpu_usage_percent: cpu_usage,
719                            memory_usage_mb: memory_usage,
720                            gpu_usage_percent: performance_tracker
721                                .system_resources
722                                .gpu_usage_percent,
723                            disk_io_mb_per_sec: performance_tracker
724                                .system_resources
725                                .disk_io_mb_per_sec,
726                            network_io_mb_per_sec: performance_tracker
727                                .system_resources
728                                .network_io_mb_per_sec,
729                        };
730                        resource_data.push((timestamp, resources));
731                    }
732                }
733                resource_data
734            },
735        };
736
737        let recent_alerts = alerter
738            .alert_history
739            .iter()
740            .rev()
741            .take(20)
742            .cloned()
743            .collect();
744
745        Ok(QualityDashboard {
746            current_sessions,
747            system_overview,
748            trends,
749            recent_alerts,
750        })
751    }
752
753    // Private implementation methods
754
755    async fn process_quality_event(
756        event: QualityEvent,
757        collector: &Arc<Mutex<MetricsCollector>>,
758        alerter: &Arc<Mutex<AlertSystem>>,
759        performance_tracker: &Arc<Mutex<PerformanceTracker>>,
760        config: &MonitorConfig,
761    ) {
762        match event {
763            QualityEvent::QualityUpdate {
764                timestamp,
765                session_id,
766                overall_quality,
767                artifacts,
768                processing_latency_ms,
769                metadata,
770            } => {
771                // Update collector and check for new session
772                let is_new_session = {
773                    let mut collector_guard = collector.lock().expect("Collector lock poisoned");
774                    let data_point = QualityDataPoint {
775                        timestamp,
776                        session_id: session_id.clone(),
777                        overall_quality,
778                        artifact_score: artifacts.as_ref().map(|a| a.overall_score).unwrap_or(0.0),
779                        processing_latency_ms,
780                        artifacts_by_type: artifacts
781                            .as_ref()
782                            .map(|a| a.artifact_types.clone())
783                            .unwrap_or_default(),
784                        metadata,
785                    };
786                    collector_guard.add_quality_data_point(data_point)
787                };
788
789                // Update active session count if this is a new session
790                if is_new_session {
791                    let mut tracker_guard = performance_tracker
792                        .lock()
793                        .expect("Performance tracker lock poisoned");
794                    tracker_guard.active_sessions += 1;
795                    tracker_guard.total_sessions += 1;
796                }
797
798                // Check for quality alerts
799                if overall_quality < config.quality_alert_threshold {
800                    let alert = QualityAlert {
801                        timestamp,
802                        session_id: session_id.clone(),
803                        alert_type: AlertType::QualityDegradation,
804                        severity: if overall_quality < 0.2 {
805                            AlertSeverity::Critical
806                        } else {
807                            AlertSeverity::Warning
808                        },
809                        message: format!("Quality degraded to {overall_quality:.2}"),
810                        suggested_action: Some(
811                            "Consider adjusting conversion parameters".to_string(),
812                        ),
813                        metadata: HashMap::new(),
814                    };
815
816                    let mut alerter_guard = alerter.lock().expect("Alerter lock poisoned");
817                    alerter_guard.add_alert(alert);
818                }
819
820                // Check for latency alerts
821                if processing_latency_ms > config.latency_alert_threshold_ms {
822                    let alert = QualityAlert {
823                        timestamp,
824                        session_id: session_id.clone(),
825                        alert_type: AlertType::HighLatency,
826                        severity: AlertSeverity::Warning,
827                        message: format!("High processing latency: {processing_latency_ms}ms"),
828                        suggested_action: Some(
829                            "Check system resources and consider load balancing".to_string(),
830                        ),
831                        metadata: HashMap::new(),
832                    };
833
834                    let mut alerter_guard = alerter.lock().expect("Alerter lock poisoned");
835                    alerter_guard.add_alert(alert);
836                }
837            }
838
839            QualityEvent::PerformanceUpdate {
840                timestamp: _,
841                session_id: _,
842                cpu_usage_percent,
843                memory_usage_mb,
844                throughput_samples_per_sec,
845                queue_length,
846            } => {
847                let mut tracker_guard = performance_tracker
848                    .lock()
849                    .expect("Performance tracker lock poisoned");
850                tracker_guard.update_performance_metrics(
851                    cpu_usage_percent,
852                    memory_usage_mb,
853                    throughput_samples_per_sec,
854                    queue_length,
855                );
856            }
857
858            QualityEvent::QualityAlert { .. } => {
859                // Handle external alerts
860            }
861
862            QualityEvent::SystemStatus { .. } => {
863                // Handle system status updates
864            }
865        }
866    }
867
868    async fn generate_periodic_report(
869        collector: &Arc<Mutex<MetricsCollector>>,
870        performance_tracker: &Arc<Mutex<PerformanceTracker>>,
871    ) -> Result<()> {
872        let (stats, performance_stats) = {
873            let collector_guard = collector.lock().expect("Collector lock poisoned");
874            let tracker_guard = performance_tracker
875                .lock()
876                .expect("Performance tracker lock poisoned");
877            (
878                collector_guard.aggregate_stats.clone(),
879                tracker_guard.system_resources.clone(),
880            )
881        };
882
883        info!("=== Quality Monitoring Report ===");
884        info!("Total data points: {}", stats.total_points);
885        info!("Average quality: {:.3}", stats.overall_avg_quality);
886        info!(
887            "System CPU usage: {:.1}%",
888            performance_stats.cpu_usage_percent
889        );
890        info!(
891            "System memory usage: {:.1} MB",
892            performance_stats.memory_usage_mb
893        );
894
895        Ok(())
896    }
897}
898
899// Implementation of helper structs
900impl MetricsCollector {
901    fn new(max_history_length: usize) -> Self {
902        Self {
903            quality_history: VecDeque::with_capacity(max_history_length),
904            session_metrics: HashMap::new(),
905            aggregate_stats: AggregateStats::default(),
906            trend_analyzer: TrendAnalyzer::default(),
907        }
908    }
909
910    fn add_quality_data_point(&mut self, data_point: QualityDataPoint) -> bool {
911        let session_id = data_point.session_id.clone();
912        let timestamp = data_point.timestamp;
913        let artifacts_by_type = data_point.artifacts_by_type.clone();
914        let overall_quality = data_point.overall_quality;
915
916        self.quality_history.push_back(data_point);
917        if self.quality_history.len() > self.quality_history.capacity() {
918            self.quality_history.pop_front();
919        }
920
921        // Update session metrics
922        let is_new_session = !self.session_metrics.contains_key(&session_id);
923        let session_metrics = self
924            .session_metrics
925            .entry(session_id.clone())
926            .or_insert_with(|| SessionMetrics {
927                session_id: session_id.clone(),
928                start_time: timestamp,
929                samples_processed: 0,
930                average_quality: overall_quality,
931                quality_trend: VecDeque::with_capacity(100),
932                artifact_counts: HashMap::new(),
933                performance_metrics: PerformanceMetrics::default(),
934                alerts: Vec::new(),
935            });
936
937        session_metrics.samples_processed += 1;
938        session_metrics.quality_trend.push_back(overall_quality);
939        if session_metrics.quality_trend.len() > 100 {
940            session_metrics.quality_trend.pop_front();
941        }
942
943        // Update artifact counts
944        for artifact_type in artifacts_by_type.keys() {
945            *session_metrics
946                .artifact_counts
947                .entry(*artifact_type)
948                .or_insert(0) += 1;
949        }
950
951        // Update aggregate statistics
952        self.update_aggregate_stats(overall_quality);
953
954        // Return whether this was a new session
955        is_new_session
956    }
957
958    fn update_aggregate_stats(&mut self, overall_quality: f32) {
959        self.aggregate_stats.total_points += 1;
960
961        // Update running average
962        let n = self.aggregate_stats.total_points as f32;
963        self.aggregate_stats.overall_avg_quality =
964            (self.aggregate_stats.overall_avg_quality * (n - 1.0) + overall_quality) / n;
965    }
966}
967
968impl AlertSystem {
969    fn new() -> Self {
970        let handlers: Vec<Box<dyn AlertHandler>> = vec![Box::new(LoggingAlertHandler)];
971
972        Self {
973            config: AlertConfig::default(),
974            active_alerts: HashMap::new(),
975            alert_history: VecDeque::with_capacity(1000),
976            handlers,
977        }
978    }
979
980    fn add_alert(&mut self, alert: QualityAlert) {
981        // Check cooldown period
982        let should_add = self
983            .alert_history
984            .iter()
985            .filter(|existing| {
986                existing.session_id == alert.session_id
987                    && existing.alert_type.as_str() == alert.alert_type.as_str()
988            })
989            .all(|existing| {
990                existing.timestamp.elapsed()
991                    > Duration::from_secs(self.config.alert_cooldown_seconds)
992            });
993
994        if should_add {
995            // Add to active alerts
996            self.active_alerts
997                .entry(alert.session_id.clone())
998                .or_default()
999                .push(alert.clone());
1000
1001            // Add to history
1002            self.alert_history.push_back(alert.clone());
1003            if self.alert_history.len() > self.config.max_alert_history {
1004                self.alert_history.pop_front();
1005            }
1006
1007            // Notify handlers
1008            for handler in &mut self.handlers {
1009                if let Err(e) = handler.handle_alert(&alert) {
1010                    error!("Alert handler '{}' failed: {}", handler.name(), e);
1011                }
1012            }
1013        }
1014    }
1015}
1016
1017impl PerformanceTracker {
1018    fn update_performance_metrics(
1019        &mut self,
1020        cpu_usage_percent: f32,
1021        memory_usage_mb: f64,
1022        throughput_samples_per_sec: f64,
1023        _queue_length: usize,
1024    ) {
1025        if self.start_time.is_none() {
1026            self.start_time = Some(Instant::now());
1027        }
1028
1029        self.system_resources.cpu_usage_percent = cpu_usage_percent;
1030        self.system_resources.memory_usage_mb = memory_usage_mb;
1031
1032        // Update trends
1033        self.performance_trends
1034            .throughput_trend
1035            .push_back(throughput_samples_per_sec);
1036        if self.performance_trends.throughput_trend.len() > 100 {
1037            self.performance_trends.throughput_trend.pop_front();
1038        }
1039    }
1040}
1041
1042// Utility implementations
1043impl AlertType {
1044    /// Get the string representation of this alert type
1045    pub fn as_str(&self) -> &'static str {
1046        match self {
1047            AlertType::QualityDegradation => "quality_degradation",
1048            AlertType::HighLatency => "high_latency",
1049            AlertType::ArtifactsDetected => "artifacts_detected",
1050            AlertType::PerformanceIssue => "performance_issue",
1051            AlertType::SystemOverload => "system_overload",
1052            AlertType::MemoryPressure => "memory_pressure",
1053            AlertType::SessionFailure => "session_failure",
1054        }
1055    }
1056}
1057
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses a 100 ms interval and a 0.7 quality threshold.
    #[test]
    fn test_monitor_config_default() {
        let defaults = MonitorConfig::default();
        assert_eq!(defaults.monitoring_interval_ms, 100);
        assert_eq!(defaults.quality_alert_threshold, 0.7);
    }

    /// Alert types map to their snake_case identifiers.
    #[test]
    fn test_alert_type_as_str() {
        let expectations = [
            (AlertType::QualityDegradation, "quality_degradation"),
            (AlertType::HighLatency, "high_latency"),
        ];
        for (kind, label) in &expectations {
            assert_eq!(kind.as_str(), *label);
        }
    }

    /// A freshly constructed monitor has no background task and no sender.
    #[tokio::test]
    async fn test_quality_monitor_creation() {
        let fresh = QualityMonitor::new();
        assert!(fresh.monitor_task.is_none());
        assert!(fresh.quality_sender.is_none());
    }
}