use anyhow::Result;
use chrono::{DateTime, Utc};
use scirs2_core::random::{Random, Rng};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

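/// Aggregated snapshot of every metric family tracked by the monitor.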
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PerformanceMetrics {
    pub latency: LatencyMetrics,
    pub throughput: ThroughputMetrics,
    pub resources: ResourceMetrics,
    pub quality: QualityMetrics,
    pub errors: ErrorMetrics,
    pub cache: CacheMetrics,
    pub drift: DriftMetrics,
}

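/// Latency statistics in milliseconds: moving average, percentiles, and extremes.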
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    pub avg_embedding_time_ms: f64,
    pub p50_latency_ms: f64,
    pub p95_latency_ms: f64,
    pub p99_latency_ms: f64,
    pub max_latency_ms: f64,
    pub min_latency_ms: f64,
    pub end_to_end_latency_ms: f64,
    pub model_inference_time_ms: f64,
    pub queue_wait_time_ms: f64,
    pub total_measurements: u64,
}

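/// Request and embedding throughput counters, including peak and success rate.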
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
    pub requests_per_second: f64,
    pub embeddings_per_second: f64,
    pub batches_per_second: f64,
    pub peak_throughput: f64,
    pub concurrent_requests: u32,
    pub max_concurrent_requests: u32,
    pub total_requests: u64,
    pub failed_requests: u64,
    pub success_rate: f64,
}

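/// CPU, memory, GPU, network, and disk utilization figures.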
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
    pub cpu_utilization_percent: f64,
    pub memory_usage_mb: f64,
    pub gpu_utilization_percent: f64,
    pub gpu_memory_usage_mb: f64,
    pub network_io_mbps: f64,
    pub disk_io_mbps: f64,
    pub peak_memory_mb: f64,
    pub peak_gpu_memory_mb: f64,
}

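/// Embedding-quality scores produced by the periodic quality assessment.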
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
    pub avg_quality_score: f64,
    pub isotropy_score: f64,
    pub neighborhood_preservation: f64,
    pub clustering_quality: f64,
    pub similarity_correlation: f64,
    pub quality_alerts: u32,
    pub last_assessment: DateTime<Utc>,
}

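/// Error counters broken down by type and severity.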
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    pub total_errors: u64,
    pub error_rate_per_hour: f64,
    pub errors_by_type: HashMap<String, u64>,
    pub critical_errors: u64,
    pub timeout_errors: u64,
    pub model_errors: u64,
    pub system_errors: u64,
    pub last_error: Option<DateTime<Utc>>,
}

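/// Cache hit rates per tier, memory footprint, evictions, and estimated time saved.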
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheMetrics {
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub l2_hit_rate: f64,
    pub l3_hit_rate: f64,
    pub cache_memory_mb: f64,
    pub cache_evictions: u64,
    pub time_saved_seconds: f64,
}

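/// Quality, performance, and concept drift indicators from the drift detector.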
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriftMetrics {
    pub quality_drift_score: f64,
    pub performance_drift_score: f64,
    pub distribution_shift: bool,
    pub concept_drift_score: f64,
    pub data_quality_issues: u32,
    pub drift_alerts: u32,
    pub last_drift_check: DateTime<Utc>,
}

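/// Central performance monitor: records latency, throughput, error, resource,
/// and cache samples, runs background collection, drift-detection, quality,
/// and export tasks, and dispatches alerts to the registered handlers.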
pub struct PerformanceMonitor {
    metrics: Arc<RwLock<PerformanceMetrics>>,
    latency_window: Arc<Mutex<VecDeque<f64>>>,
    throughput_window: Arc<Mutex<VecDeque<f64>>>,
    error_log: Arc<Mutex<VecDeque<ErrorEvent>>>,
    quality_history: Arc<Mutex<VecDeque<QualityAssessment>>>,
    config: MonitoringConfig,
    monitoring_tasks: Vec<JoinHandle<()>>,
    alert_handlers: Vec<Box<dyn AlertHandler + Send + Sync>>,
}

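/// Collection intervals, window sizes, alerting thresholds, and export settings.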
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    pub collection_interval_seconds: u64,
    pub latency_window_size: usize,
    pub throughput_window_size: usize,
    pub quality_assessment_interval_seconds: u64,
    pub drift_detection_interval_seconds: u64,
    pub enable_alerting: bool,
    pub alert_thresholds: AlertThresholds,
    pub export_config: ExportConfig,
}

#[derive(Debug, Clone)]
pub struct AlertThresholds {
    pub max_p95_latency_ms: f64,
    pub min_throughput_rps: f64,
    pub max_error_rate: f64,
    pub min_cache_hit_rate: f64,
    pub max_quality_drift: f64,
    pub max_memory_usage_mb: f64,
    pub max_gpu_memory_mb: f64,
}

#[derive(Debug, Clone)]
pub struct ExportConfig {
    pub enable_prometheus: bool,
    pub prometheus_port: u16,
    pub enable_opentelemetry: bool,
    pub otlp_endpoint: Option<String>,
    pub export_interval_seconds: u64,
    pub enable_json_export: bool,
    pub json_export_path: Option<String>,
}

#[derive(Debug, Clone)]
pub struct ErrorEvent {
    pub timestamp: DateTime<Utc>,
    pub error_type: String,
    pub error_message: String,
    pub severity: ErrorSeverity,
    pub context: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ErrorSeverity {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone)]
pub struct QualityAssessment {
    pub timestamp: DateTime<Utc>,
    pub quality_score: f64,
    pub metrics: HashMap<String, f64>,
    pub assessment_details: String,
}

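/// Implemented by alert sinks (console, Slack, etc.) that receive monitor alerts.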
pub trait AlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()>;
}

#[derive(Debug, Clone)]
pub struct Alert {
    pub alert_type: AlertType,
    pub message: String,
    pub severity: AlertSeverity,
    pub timestamp: DateTime<Utc>,
    pub metrics: HashMap<String, f64>,
}

#[derive(Debug, Clone)]
pub enum AlertType {
    HighLatency,
    LowThroughput,
    HighErrorRate,
    LowCacheHitRate,
    QualityDrift,
    PerformanceDrift,
    ResourceExhaustion,
    SystemFailure,
}

#[derive(Debug, Clone)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Emergency,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval_seconds: 10,
            latency_window_size: 1000,
            throughput_window_size: 100,
            quality_assessment_interval_seconds: 300,
            drift_detection_interval_seconds: 3600,
            enable_alerting: true,
            alert_thresholds: AlertThresholds::default(),
            export_config: ExportConfig::default(),
        }
    }
}

impl Default for AlertThresholds {
    fn default() -> Self {
        Self {
            max_p95_latency_ms: 500.0,
            min_throughput_rps: 100.0,
            max_error_rate: 0.05,
            min_cache_hit_rate: 0.8,
            max_quality_drift: 0.1,
            max_memory_usage_mb: 4096.0,
            max_gpu_memory_mb: 8192.0,
        }
    }
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            enable_prometheus: true,
            prometheus_port: 9090,
            enable_opentelemetry: false,
            otlp_endpoint: None,
            export_interval_seconds: 60,
            enable_json_export: false,
            json_export_path: None,
        }
    }
}

impl Default for LatencyMetrics {
    fn default() -> Self {
        Self {
            avg_embedding_time_ms: 0.0,
            p50_latency_ms: 0.0,
            p95_latency_ms: 0.0,
            p99_latency_ms: 0.0,
            max_latency_ms: 0.0,
            min_latency_ms: f64::MAX,
            end_to_end_latency_ms: 0.0,
            model_inference_time_ms: 0.0,
            queue_wait_time_ms: 0.0,
            total_measurements: 0,
        }
    }
}

impl Default for ThroughputMetrics {
    fn default() -> Self {
        Self {
            requests_per_second: 0.0,
            embeddings_per_second: 0.0,
            batches_per_second: 0.0,
            peak_throughput: 0.0,
            concurrent_requests: 0,
            max_concurrent_requests: 0,
            total_requests: 0,
            failed_requests: 0,
            success_rate: 1.0,
        }
    }
}

impl Default for ResourceMetrics {
    fn default() -> Self {
        Self {
            cpu_utilization_percent: 0.0,
            memory_usage_mb: 0.0,
            gpu_utilization_percent: 0.0,
            gpu_memory_usage_mb: 0.0,
            network_io_mbps: 0.0,
            disk_io_mbps: 0.0,
            peak_memory_mb: 0.0,
            peak_gpu_memory_mb: 0.0,
        }
    }
}

impl Default for QualityMetrics {
    fn default() -> Self {
        Self {
            avg_quality_score: 0.0,
            isotropy_score: 0.0,
            neighborhood_preservation: 0.0,
            clustering_quality: 0.0,
            similarity_correlation: 0.0,
            quality_alerts: 0,
            last_assessment: Utc::now(),
        }
    }
}

impl Default for ErrorMetrics {
    fn default() -> Self {
        Self {
            total_errors: 0,
            error_rate_per_hour: 0.0,
            errors_by_type: HashMap::new(),
            critical_errors: 0,
            timeout_errors: 0,
            model_errors: 0,
            system_errors: 0,
            last_error: None,
        }
    }
}

impl Default for CacheMetrics {
    fn default() -> Self {
        Self {
            hit_rate: 0.0,
            l1_hit_rate: 0.0,
            l2_hit_rate: 0.0,
            l3_hit_rate: 0.0,
            cache_memory_mb: 0.0,
            cache_evictions: 0,
            time_saved_seconds: 0.0,
        }
    }
}

impl Default for DriftMetrics {
    fn default() -> Self {
        Self {
            quality_drift_score: 0.0,
            performance_drift_score: 0.0,
            distribution_shift: false,
            concept_drift_score: 0.0,
            data_quality_issues: 0,
            drift_alerts: 0,
            last_drift_check: Utc::now(),
        }
    }
}

impl PerformanceMonitor {
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
            latency_window: Arc::new(Mutex::new(VecDeque::with_capacity(
                config.latency_window_size,
            ))),
            throughput_window: Arc::new(Mutex::new(VecDeque::with_capacity(
                config.throughput_window_size,
            ))),
            error_log: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
            quality_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
            config,
            monitoring_tasks: Vec::new(),
            alert_handlers: Vec::new(),
        }
    }

    pub async fn start(&mut self) -> Result<()> {
        info!("Starting performance monitoring system");

        let metrics_task = self.start_metrics_collection().await;
        self.monitoring_tasks.push(metrics_task);

        let drift_task = self.start_drift_detection().await;
        self.monitoring_tasks.push(drift_task);

        let quality_task = self.start_quality_assessment().await;
        self.monitoring_tasks.push(quality_task);

        if self.config.export_config.enable_prometheus {
            let export_task = self.start_metrics_export().await;
            self.monitoring_tasks.push(export_task);
        }

        info!("Performance monitoring system started successfully");
        Ok(())
    }

    pub async fn stop(&mut self) {
        info!("Stopping performance monitoring system");

        for task in self.monitoring_tasks.drain(..) {
            task.abort();
        }

        info!("Performance monitoring system stopped");
    }

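    /// Records one latency sample: keeps a bounded sliding window, updates
    /// min/max and an exponential moving average, recomputes P50/P95/P99 from
    /// the window, and checks the latency alert threshold.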
    pub async fn record_latency(&self, latency_ms: f64) {
        let mut window = self.latency_window.lock().await;

        if window.len() >= self.config.latency_window_size {
            window.pop_front();
        }
        window.push_back(latency_ms);

        {
            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
            metrics.latency.total_measurements += 1;

            metrics.latency.max_latency_ms = metrics.latency.max_latency_ms.max(latency_ms);
            metrics.latency.min_latency_ms = metrics.latency.min_latency_ms.min(latency_ms);

            // Exponentially weighted moving average of the per-embedding latency.
            let alpha = 0.1;
            metrics.latency.avg_embedding_time_ms =
                alpha * latency_ms + (1.0 - alpha) * metrics.latency.avg_embedding_time_ms;

            let mut sorted_latencies: Vec<f64> = window.iter().copied().collect();
            sorted_latencies.sort_by(|a, b| {
                a.partial_cmp(b)
                    .expect("latency values should be comparable")
            });

            if !sorted_latencies.is_empty() {
                let len = sorted_latencies.len();
                metrics.latency.p50_latency_ms = sorted_latencies[len * 50 / 100];
                metrics.latency.p95_latency_ms = sorted_latencies[len * 95 / 100];
                metrics.latency.p99_latency_ms = sorted_latencies[len * 99 / 100];
            }
        }

        if self.config.enable_alerting {
            self.check_latency_alerts(latency_ms).await;
        }
    }

    pub async fn record_throughput(&self, requests_per_second: f64) {
        let mut window = self.throughput_window.lock().await;

        if window.len() >= self.config.throughput_window_size {
            window.pop_front();
        }
        window.push_back(requests_per_second);

        {
            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
            metrics.throughput.peak_throughput =
                metrics.throughput.peak_throughput.max(requests_per_second);

            // Report the rate smoothed over the sliding window rather than the raw sample.
            let avg_throughput = window.iter().sum::<f64>() / window.len() as f64;
            metrics.throughput.requests_per_second = avg_throughput;
        }

        if self.config.enable_alerting {
            self.check_throughput_alerts(requests_per_second).await;
        }
    }

    pub async fn record_error(&self, error_event: ErrorEvent) {
        let mut error_log = self.error_log.lock().await;

        if error_log.len() >= 1000 {
            error_log.pop_front();
        }
        error_log.push_back(error_event.clone());

        {
            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
            metrics.errors.total_errors += 1;
            metrics.errors.last_error = Some(error_event.timestamp);

            *metrics
                .errors
                .errors_by_type
                .entry(error_event.error_type.clone())
                .or_insert(0) += 1;

            if let ErrorSeverity::Critical = error_event.severity {
                metrics.errors.critical_errors += 1;
            }

            if error_event.error_type.contains("timeout") {
                metrics.errors.timeout_errors += 1;
            } else if error_event.error_type.contains("model") {
                metrics.errors.model_errors += 1;
            } else {
                metrics.errors.system_errors += 1;
            }

            let total_requests = metrics.throughput.total_requests;
            if total_requests > 0 {
                metrics.errors.error_rate_per_hour =
                    (metrics.errors.total_errors as f64 / total_requests as f64) * 3600.0;
            }
        }

        if matches!(error_event.severity, ErrorSeverity::Critical) {
            self.handle_critical_error(error_event).await;
        }
    }

    pub async fn update_resource_metrics(&self, resources: ResourceMetrics) {
        {
            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");

            // Carry the running peaks forward before overwriting the snapshot,
            // otherwise they would be clobbered by the incoming values.
            let peak_memory_mb = metrics
                .resources
                .peak_memory_mb
                .max(resources.memory_usage_mb);
            let peak_gpu_memory_mb = metrics
                .resources
                .peak_gpu_memory_mb
                .max(resources.gpu_memory_usage_mb);

            metrics.resources = resources.clone();
            metrics.resources.peak_memory_mb = peak_memory_mb;
            metrics.resources.peak_gpu_memory_mb = peak_gpu_memory_mb;
        }

        if self.config.enable_alerting {
            self.check_resource_alerts(resources).await;
        }
    }

    pub async fn update_cache_metrics(&self, cache_metrics: CacheMetrics) {
        {
            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
            metrics.cache = cache_metrics.clone();
        }

        if self.config.enable_alerting
            && cache_metrics.hit_rate < self.config.alert_thresholds.min_cache_hit_rate
        {
            self.send_alert(Alert {
                alert_type: AlertType::LowCacheHitRate,
                message: format!(
                    "Cache hit rate dropped to {:.2}%",
                    cache_metrics.hit_rate * 100.0
                ),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("hit_rate".to_string(), cache_metrics.hit_rate),
                    (
                        "threshold".to_string(),
                        self.config.alert_thresholds.min_cache_hit_rate,
                    ),
                ]),
            })
            .await;
        }
    }

    pub fn get_metrics(&self) -> PerformanceMetrics {
        self.metrics
            .read()
            .expect("rwlock should not be poisoned")
            .clone()
    }

    pub fn add_alert_handler(&mut self, handler: Box<dyn AlertHandler + Send + Sync>) {
        self.alert_handlers.push(handler);
    }

    async fn start_metrics_collection(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let interval = Duration::from_secs(self.config.collection_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let system_metrics = Self::collect_system_metrics().await;

                {
                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
                    metrics.resources = system_metrics;
                }

                debug!("Collected system metrics");
            }
        })
    }

    async fn start_drift_detection(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let quality_history = Arc::clone(&self.quality_history);
        let interval = Duration::from_secs(self.config.drift_detection_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let drift_metrics = Self::detect_drift(&quality_history).await;

                {
                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
                    metrics.drift = drift_metrics;
                    metrics.drift.last_drift_check = Utc::now();
                }

                info!("Performed drift detection analysis");
            }
        })
    }

    async fn start_quality_assessment(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let quality_history = Arc::clone(&self.quality_history);
        let interval = Duration::from_secs(self.config.quality_assessment_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let quality_assessment = Self::assess_quality().await;

                {
                    let mut history = quality_history.lock().await;
                    if history.len() >= 100 {
                        history.pop_front();
                    }
                    history.push_back(quality_assessment.clone());
                }

                {
                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
                    metrics.quality.avg_quality_score = quality_assessment.quality_score;
                    metrics.quality.last_assessment = quality_assessment.timestamp;

                    for (key, value) in &quality_assessment.metrics {
                        match key.as_str() {
                            "isotropy" => metrics.quality.isotropy_score = *value,
                            "neighborhood_preservation" => {
                                metrics.quality.neighborhood_preservation = *value
                            }
                            "clustering_quality" => metrics.quality.clustering_quality = *value,
                            "similarity_correlation" => {
                                metrics.quality.similarity_correlation = *value
                            }
                            _ => {}
                        }
                    }
                }

                info!(
                    "Performed quality assessment: score = {:.3}",
                    quality_assessment.quality_score
                );
            }
        })
    }

    async fn start_metrics_export(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let export_config = self.config.export_config.clone();
        let interval = Duration::from_secs(export_config.export_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let current_metrics = metrics
                    .read()
                    .expect("rwlock should not be poisoned")
                    .clone();

                if export_config.enable_prometheus {
                    Self::export_prometheus_metrics(&current_metrics).await;
                }

                if export_config.enable_json_export {
                    if let Some(ref path) = export_config.json_export_path {
                        Self::export_json_metrics(&current_metrics, path).await;
                    }
                }

                debug!("Exported metrics");
            }
        })
    }

    async fn collect_system_metrics() -> ResourceMetrics {
        // Placeholder collection: simulated values stand in for real system probes.
        let mut random = Random::default();
        ResourceMetrics {
            cpu_utilization_percent: random.random::<f64>() * 100.0,
            memory_usage_mb: 1024.0 + random.random::<f64>() * 2048.0,
            gpu_utilization_percent: random.random::<f64>() * 100.0,
            gpu_memory_usage_mb: 2048.0 + random.random::<f64>() * 4096.0,
            network_io_mbps: random.random::<f64>() * 100.0,
            disk_io_mbps: random.random::<f64>() * 50.0,
            peak_memory_mb: 3072.0,
            peak_gpu_memory_mb: 6144.0,
        }
    }

    async fn detect_drift(
        quality_history: &Arc<Mutex<VecDeque<QualityAssessment>>>,
    ) -> DriftMetrics {
        let history = quality_history.lock().await;

        if history.len() < 2 {
            return DriftMetrics::default();
        }

        // Compare the newest assessment against the oldest one still in the window.
        let recent_quality = history
            .back()
            .expect("quality history should not be empty")
            .quality_score;
        let baseline_quality = history
            .front()
            .expect("quality history should not be empty")
            .quality_score;
        let quality_drift = (recent_quality - baseline_quality).abs() / baseline_quality;

        // Performance and concept drift scores are currently simulated placeholders.
        let mut random = Random::default();
        DriftMetrics {
            quality_drift_score: quality_drift,
            performance_drift_score: random.random::<f64>() * 0.1,
            distribution_shift: quality_drift > 0.1,
            concept_drift_score: random.random::<f64>() * 0.05,
            data_quality_issues: if quality_drift > 0.2 { 1 } else { 0 },
            drift_alerts: if quality_drift > 0.15 { 1 } else { 0 },
            last_drift_check: Utc::now(),
        }
    }

    async fn assess_quality() -> QualityAssessment {
        // Placeholder assessment: scores are sampled rather than computed from embeddings.
        let mut random = Random::default();
        let quality_score = 0.8 + random.random::<f64>() * 0.2;

        let mut metrics = HashMap::new();
        metrics.insert("isotropy".to_string(), 0.7 + random.random::<f64>() * 0.3);
        metrics.insert(
            "neighborhood_preservation".to_string(),
            0.8 + random.random::<f64>() * 0.2,
        );
        metrics.insert(
            "clustering_quality".to_string(),
            0.75 + random.random::<f64>() * 0.25,
        );
        metrics.insert(
            "similarity_correlation".to_string(),
            0.85 + random.random::<f64>() * 0.15,
        );

        QualityAssessment {
            timestamp: Utc::now(),
            quality_score,
            metrics,
            assessment_details: format!(
                "Quality assessment completed with score: {quality_score:.3}"
            ),
        }
    }

    async fn export_prometheus_metrics(metrics: &PerformanceMetrics) {
        // Stub: only logs for now; a real exporter would publish to the Prometheus endpoint.
        debug!(
            "Exporting Prometheus metrics: P95 latency = {:.2}ms",
            metrics.latency.p95_latency_ms
        );
    }

    async fn export_json_metrics(metrics: &PerformanceMetrics, path: &str) {
        match serde_json::to_string_pretty(metrics) {
            Ok(json) => {
                if let Err(e) = tokio::fs::write(path, json).await {
                    error!("Failed to export JSON metrics: {}", e);
                }
            }
            Err(e) => error!("Failed to serialize metrics to JSON: {}", e),
        }
    }

    async fn check_latency_alerts(&self, latency_ms: f64) {
        if latency_ms > self.config.alert_thresholds.max_p95_latency_ms {
            self.send_alert(Alert {
                alert_type: AlertType::HighLatency,
                message: format!("High latency detected: {latency_ms:.2}ms"),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("latency_ms".to_string(), latency_ms),
                    (
                        "threshold_ms".to_string(),
                        self.config.alert_thresholds.max_p95_latency_ms,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn check_throughput_alerts(&self, throughput_rps: f64) {
        if throughput_rps < self.config.alert_thresholds.min_throughput_rps {
            self.send_alert(Alert {
                alert_type: AlertType::LowThroughput,
                message: format!("Low throughput detected: {throughput_rps:.2} req/s"),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("throughput_rps".to_string(), throughput_rps),
                    (
                        "threshold_rps".to_string(),
                        self.config.alert_thresholds.min_throughput_rps,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn check_resource_alerts(&self, resources: ResourceMetrics) {
        if resources.memory_usage_mb > self.config.alert_thresholds.max_memory_usage_mb {
            self.send_alert(Alert {
                alert_type: AlertType::ResourceExhaustion,
                message: format!("High memory usage: {:.1}MB", resources.memory_usage_mb),
                severity: AlertSeverity::Critical,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("memory_mb".to_string(), resources.memory_usage_mb),
                    (
                        "threshold_mb".to_string(),
                        self.config.alert_thresholds.max_memory_usage_mb,
                    ),
                ]),
            })
            .await;
        }

        if resources.gpu_memory_usage_mb > self.config.alert_thresholds.max_gpu_memory_mb {
            self.send_alert(Alert {
                alert_type: AlertType::ResourceExhaustion,
                message: format!(
                    "High GPU memory usage: {:.1}MB",
                    resources.gpu_memory_usage_mb
                ),
                severity: AlertSeverity::Critical,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("gpu_memory_mb".to_string(), resources.gpu_memory_usage_mb),
                    (
                        "threshold_mb".to_string(),
                        self.config.alert_thresholds.max_gpu_memory_mb,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn send_alert(&self, alert: Alert) {
        warn!(
            "Alert triggered: {:?} - {}",
            alert.alert_type, alert.message
        );

        for handler in &self.alert_handlers {
            if let Err(e) = handler.handle_alert(alert.clone()) {
                error!("Alert handler failed: {}", e);
            }
        }
    }

    async fn handle_critical_error(&self, error_event: ErrorEvent) {
        error!(
            "Critical error occurred: {} - {}",
            error_event.error_type, error_event.error_message
        );

        self.send_alert(Alert {
            alert_type: AlertType::SystemFailure,
            message: format!("Critical error: {}", error_event.error_message),
            severity: AlertSeverity::Emergency,
            timestamp: error_event.timestamp,
            metrics: HashMap::new(),
        })
        .await;
    }

    pub fn get_performance_summary(&self) -> String {
        let metrics = self.metrics.read().expect("rwlock should not be poisoned");

        format!(
            "Performance Summary:\n\
             - P95 Latency: {:.2}ms\n\
             - Throughput: {:.1} req/s\n\
             - Error Rate: {:.3}%\n\
             - Cache Hit Rate: {:.1}%\n\
             - Memory Usage: {:.1}MB\n\
             - Quality Score: {:.3}",
            metrics.latency.p95_latency_ms,
            metrics.throughput.requests_per_second,
            (metrics.errors.total_errors as f64 / metrics.throughput.total_requests.max(1) as f64)
                * 100.0,
            metrics.cache.hit_rate * 100.0,
            metrics.resources.memory_usage_mb,
            metrics.quality.avg_quality_score
        )
    }
}

/// Alert handler that prints alerts to stdout.
pub struct ConsoleAlertHandler;

impl AlertHandler for ConsoleAlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()> {
        println!(
            "🚨 ALERT [{}]: {} - {}",
            format!("{:?}", alert.severity).to_uppercase(),
            alert.message,
            alert.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
        );
        Ok(())
    }
}

/// Alert handler for a Slack webhook; currently only logs the intended delivery.
pub struct SlackAlertHandler {
    pub webhook_url: String,
}

impl AlertHandler for SlackAlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()> {
        info!(
            "Would send Slack alert to {}: {}",
            self.webhook_url, alert.message
        );
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_performance_monitor_creation() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.latency.total_measurements, 0);
        assert_eq!(metrics.throughput.total_requests, 0);
    }

    #[tokio::test]
    async fn test_latency_recording() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        monitor.record_latency(100.0).await;
        monitor.record_latency(150.0).await;
        monitor.record_latency(120.0).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.latency.total_measurements, 3);
        assert_eq!(metrics.latency.max_latency_ms, 150.0);
        assert_eq!(metrics.latency.min_latency_ms, 100.0);
    }

    #[tokio::test]
    async fn test_error_recording() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let error_event = ErrorEvent {
            timestamp: Utc::now(),
            error_type: "timeout".to_string(),
            error_message: "Request timeout".to_string(),
            severity: ErrorSeverity::Medium,
            context: HashMap::new(),
        };

        monitor.record_error(error_event).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.errors.total_errors, 1);
        assert_eq!(metrics.errors.timeout_errors, 1);
    }

    #[test]
    fn test_alert_thresholds_default() {
        let thresholds = AlertThresholds::default();
        assert_eq!(thresholds.max_p95_latency_ms, 500.0);
        assert_eq!(thresholds.min_throughput_rps, 100.0);
        assert_eq!(thresholds.max_error_rate, 0.05);
    }

    #[test]
    fn test_console_alert_handler() {
        let handler = ConsoleAlertHandler;
        let alert = Alert {
            alert_type: AlertType::HighLatency,
            message: "Test alert".to_string(),
            severity: AlertSeverity::Warning,
            timestamp: Utc::now(),
            metrics: HashMap::new(),
        };

        assert!(handler.handle_alert(alert).is_ok());
    }
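
    // Illustrative usage sketch (not part of the original suite): wires a
    // ConsoleAlertHandler into the monitor, records a couple of samples, and
    // checks the summary text. The sample values are arbitrary.
    #[tokio::test]
    async fn test_monitor_usage_sketch() {
        let config = MonitoringConfig::default();
        let mut monitor = PerformanceMonitor::new(config);
        monitor.add_alert_handler(Box::new(ConsoleAlertHandler));

        monitor.record_throughput(250.0).await;
        monitor.record_latency(42.0).await;

        let summary = monitor.get_performance_summary();
        assert!(summary.contains("Performance Summary"));
    }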
}