use anyhow::Result;
use chrono::{DateTime, Utc};
use scirs2_core::random::{Random, Rng};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

/// Aggregated performance metrics tracked by the monitor.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PerformanceMetrics {
    pub latency: LatencyMetrics,
    pub throughput: ThroughputMetrics,
    pub resources: ResourceMetrics,
    pub quality: QualityMetrics,
    pub errors: ErrorMetrics,
    pub cache: CacheMetrics,
    pub drift: DriftMetrics,
}

/// Latency statistics (values in milliseconds).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    pub avg_embedding_time_ms: f64,
    pub p50_latency_ms: f64,
    pub p95_latency_ms: f64,
    pub p99_latency_ms: f64,
    pub max_latency_ms: f64,
    pub min_latency_ms: f64,
    pub end_to_end_latency_ms: f64,
    pub model_inference_time_ms: f64,
    pub queue_wait_time_ms: f64,
    pub total_measurements: u64,
}

/// Throughput and request-volume statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
    pub requests_per_second: f64,
    pub embeddings_per_second: f64,
    pub batches_per_second: f64,
    pub peak_throughput: f64,
    pub concurrent_requests: u32,
    pub max_concurrent_requests: u32,
    pub total_requests: u64,
    pub failed_requests: u64,
    pub success_rate: f64,
}

/// System resource utilization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
    pub cpu_utilization_percent: f64,
    pub memory_usage_mb: f64,
    pub gpu_utilization_percent: f64,
    pub gpu_memory_usage_mb: f64,
    pub network_io_mbps: f64,
    pub disk_io_mbps: f64,
    pub peak_memory_mb: f64,
    pub peak_gpu_memory_mb: f64,
}

/// Embedding quality indicators.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
    pub avg_quality_score: f64,
    pub isotropy_score: f64,
    pub neighborhood_preservation: f64,
    pub clustering_quality: f64,
    pub similarity_correlation: f64,
    pub quality_alerts: u32,
    pub last_assessment: DateTime<Utc>,
}

/// Error counts and rates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    pub total_errors: u64,
    pub error_rate_per_hour: f64,
    pub errors_by_type: HashMap<String, u64>,
    pub critical_errors: u64,
    pub timeout_errors: u64,
    pub model_errors: u64,
    pub system_errors: u64,
    pub last_error: Option<DateTime<Utc>>,
}

/// Cache effectiveness statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheMetrics {
    pub hit_rate: f64,
    pub l1_hit_rate: f64,
    pub l2_hit_rate: f64,
    pub l3_hit_rate: f64,
    pub cache_memory_mb: f64,
    pub cache_evictions: u64,
    pub time_saved_seconds: f64,
}

/// Drift-detection results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriftMetrics {
    pub quality_drift_score: f64,
    pub performance_drift_score: f64,
    pub distribution_shift: bool,
    pub concept_drift_score: f64,
    pub data_quality_issues: u32,
    pub drift_alerts: u32,
    pub last_drift_check: DateTime<Utc>,
}

/// Central monitor that collects metrics, runs background analyses, and
/// dispatches alerts to registered handlers.
pub struct PerformanceMonitor {
    metrics: Arc<RwLock<PerformanceMetrics>>,
    latency_window: Arc<Mutex<VecDeque<f64>>>,
    throughput_window: Arc<Mutex<VecDeque<f64>>>,
    error_log: Arc<Mutex<VecDeque<ErrorEvent>>>,
    quality_history: Arc<Mutex<VecDeque<QualityAssessment>>>,
    config: MonitoringConfig,
    monitoring_tasks: Vec<JoinHandle<()>>,
    alert_handlers: Vec<Box<dyn AlertHandler + Send + Sync>>,
}

/// Configuration for the monitoring system.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    pub collection_interval_seconds: u64,
    pub latency_window_size: usize,
    pub throughput_window_size: usize,
    pub quality_assessment_interval_seconds: u64,
    pub drift_detection_interval_seconds: u64,
    pub enable_alerting: bool,
    pub alert_thresholds: AlertThresholds,
    pub export_config: ExportConfig,
}

/// Thresholds that trigger alerts when crossed.
#[derive(Debug, Clone)]
pub struct AlertThresholds {
    pub max_p95_latency_ms: f64,
    pub min_throughput_rps: f64,
    pub max_error_rate: f64,
    pub min_cache_hit_rate: f64,
    pub max_quality_drift: f64,
    pub max_memory_usage_mb: f64,
    pub max_gpu_memory_mb: f64,
}

/// Metrics export configuration.
#[derive(Debug, Clone)]
pub struct ExportConfig {
    pub enable_prometheus: bool,
    pub prometheus_port: u16,
    pub enable_opentelemetry: bool,
    pub otlp_endpoint: Option<String>,
    pub export_interval_seconds: u64,
    pub enable_json_export: bool,
    pub json_export_path: Option<String>,
}

/// A single recorded error event.
#[derive(Debug, Clone)]
pub struct ErrorEvent {
    pub timestamp: DateTime<Utc>,
    pub error_type: String,
    pub error_message: String,
    pub severity: ErrorSeverity,
    pub context: HashMap<String, String>,
}

/// Severity of a recorded error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ErrorSeverity {
    Low,
    Medium,
    High,
    Critical,
}

/// Result of one quality assessment run.
#[derive(Debug, Clone)]
pub struct QualityAssessment {
    pub timestamp: DateTime<Utc>,
    pub quality_score: f64,
    pub metrics: HashMap<String, f64>,
    pub assessment_details: String,
}

/// Receives alerts raised by the monitor.
pub trait AlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()>;
}

/// An alert raised when a threshold is breached.
#[derive(Debug, Clone)]
pub struct Alert {
    pub alert_type: AlertType,
    pub message: String,
    pub severity: AlertSeverity,
    pub timestamp: DateTime<Utc>,
    pub metrics: HashMap<String, f64>,
}

/// Category of an alert.
#[derive(Debug, Clone)]
pub enum AlertType {
    HighLatency,
    LowThroughput,
    HighErrorRate,
    LowCacheHitRate,
    QualityDrift,
    PerformanceDrift,
    ResourceExhaustion,
    SystemFailure,
}

/// Severity of an alert.
#[derive(Debug, Clone)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Emergency,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval_seconds: 10,
            latency_window_size: 1000,
            throughput_window_size: 100,
            quality_assessment_interval_seconds: 300,
            drift_detection_interval_seconds: 3600,
            enable_alerting: true,
            alert_thresholds: AlertThresholds::default(),
            export_config: ExportConfig::default(),
        }
    }
}

impl Default for AlertThresholds {
    fn default() -> Self {
        Self {
            max_p95_latency_ms: 500.0,
            min_throughput_rps: 100.0,
            max_error_rate: 0.05,
            min_cache_hit_rate: 0.8,
            max_quality_drift: 0.1,
            max_memory_usage_mb: 4096.0,
            max_gpu_memory_mb: 8192.0,
        }
    }
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            enable_prometheus: true,
            prometheus_port: 9090,
            enable_opentelemetry: false,
            otlp_endpoint: None,
            export_interval_seconds: 60,
            enable_json_export: false,
            json_export_path: None,
        }
    }
}

impl Default for LatencyMetrics {
    fn default() -> Self {
        Self {
            avg_embedding_time_ms: 0.0,
            p50_latency_ms: 0.0,
            p95_latency_ms: 0.0,
            p99_latency_ms: 0.0,
            max_latency_ms: 0.0,
            min_latency_ms: f64::MAX,
            end_to_end_latency_ms: 0.0,
            model_inference_time_ms: 0.0,
            queue_wait_time_ms: 0.0,
            total_measurements: 0,
        }
    }
}

impl Default for ThroughputMetrics {
    fn default() -> Self {
        Self {
            requests_per_second: 0.0,
            embeddings_per_second: 0.0,
            batches_per_second: 0.0,
            peak_throughput: 0.0,
            concurrent_requests: 0,
            max_concurrent_requests: 0,
            total_requests: 0,
            failed_requests: 0,
            success_rate: 1.0,
        }
    }
}

impl Default for ResourceMetrics {
    fn default() -> Self {
        Self {
            cpu_utilization_percent: 0.0,
            memory_usage_mb: 0.0,
            gpu_utilization_percent: 0.0,
            gpu_memory_usage_mb: 0.0,
            network_io_mbps: 0.0,
            disk_io_mbps: 0.0,
            peak_memory_mb: 0.0,
            peak_gpu_memory_mb: 0.0,
        }
    }
}

impl Default for QualityMetrics {
    fn default() -> Self {
        Self {
            avg_quality_score: 0.0,
            isotropy_score: 0.0,
            neighborhood_preservation: 0.0,
            clustering_quality: 0.0,
            similarity_correlation: 0.0,
            quality_alerts: 0,
            last_assessment: Utc::now(),
        }
    }
}

impl Default for ErrorMetrics {
    fn default() -> Self {
        Self {
            total_errors: 0,
            error_rate_per_hour: 0.0,
            errors_by_type: HashMap::new(),
            critical_errors: 0,
            timeout_errors: 0,
            model_errors: 0,
            system_errors: 0,
            last_error: None,
        }
    }
}

impl Default for CacheMetrics {
    fn default() -> Self {
        Self {
            hit_rate: 0.0,
            l1_hit_rate: 0.0,
            l2_hit_rate: 0.0,
            l3_hit_rate: 0.0,
            cache_memory_mb: 0.0,
            cache_evictions: 0,
            time_saved_seconds: 0.0,
        }
    }
}

impl Default for DriftMetrics {
    fn default() -> Self {
        Self {
            quality_drift_score: 0.0,
            performance_drift_score: 0.0,
            distribution_shift: false,
            concept_drift_score: 0.0,
            data_quality_issues: 0,
            drift_alerts: 0,
            last_drift_check: Utc::now(),
        }
    }
}

impl PerformanceMonitor {
    /// Creates a monitor with the given configuration; call `start` to begin collection.
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
            latency_window: Arc::new(Mutex::new(VecDeque::with_capacity(
                config.latency_window_size,
            ))),
            throughput_window: Arc::new(Mutex::new(VecDeque::with_capacity(
                config.throughput_window_size,
            ))),
            error_log: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
            quality_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
            config,
            monitoring_tasks: Vec::new(),
            alert_handlers: Vec::new(),
        }
    }

    /// Spawns the background collection, drift-detection, quality-assessment,
    /// and export tasks.
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting performance monitoring system");

        let metrics_task = self.start_metrics_collection().await;
        self.monitoring_tasks.push(metrics_task);

        let drift_task = self.start_drift_detection().await;
        self.monitoring_tasks.push(drift_task);

        let quality_task = self.start_quality_assessment().await;
        self.monitoring_tasks.push(quality_task);

        if self.config.export_config.enable_prometheus {
            let export_task = self.start_metrics_export().await;
            self.monitoring_tasks.push(export_task);
        }

        info!("Performance monitoring system started successfully");
        Ok(())
    }

    /// Aborts all background monitoring tasks.
    pub async fn stop(&mut self) {
        info!("Stopping performance monitoring system");

        for task in self.monitoring_tasks.drain(..) {
            task.abort();
        }

        info!("Performance monitoring system stopped");
    }

    /// Records a single latency sample and updates the derived latency statistics.
    pub async fn record_latency(&self, latency_ms: f64) {
        let mut window = self.latency_window.lock().await;

        if window.len() >= self.config.latency_window_size {
            window.pop_front();
        }
        window.push_back(latency_ms);

        {
            let mut metrics = self.metrics.write().unwrap();
            metrics.latency.total_measurements += 1;

            metrics.latency.max_latency_ms = metrics.latency.max_latency_ms.max(latency_ms);
            metrics.latency.min_latency_ms = metrics.latency.min_latency_ms.min(latency_ms);

            // Exponential moving average of per-embedding latency.
            let alpha = 0.1;
            metrics.latency.avg_embedding_time_ms =
                alpha * latency_ms + (1.0 - alpha) * metrics.latency.avg_embedding_time_ms;

            // Recompute percentiles over the sliding window.
            let mut sorted_latencies: Vec<f64> = window.iter().copied().collect();
            sorted_latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());

            if !sorted_latencies.is_empty() {
                let len = sorted_latencies.len();
                metrics.latency.p50_latency_ms = sorted_latencies[len * 50 / 100];
                metrics.latency.p95_latency_ms = sorted_latencies[len * 95 / 100];
                metrics.latency.p99_latency_ms = sorted_latencies[len * 99 / 100];
            }
        }

        if self.config.enable_alerting {
            self.check_latency_alerts(latency_ms).await;
        }
    }

    /// Records an instantaneous throughput sample and updates the windowed average.
    pub async fn record_throughput(&self, requests_per_second: f64) {
        let mut window = self.throughput_window.lock().await;

        if window.len() >= self.config.throughput_window_size {
            window.pop_front();
        }
        window.push_back(requests_per_second);

        {
            let mut metrics = self.metrics.write().unwrap();
            metrics.throughput.peak_throughput =
                metrics.throughput.peak_throughput.max(requests_per_second);

            // Report the sliding-window average as the current rate.
            let avg_throughput = window.iter().sum::<f64>() / window.len() as f64;
            metrics.throughput.requests_per_second = avg_throughput;
        }

        if self.config.enable_alerting {
            self.check_throughput_alerts(requests_per_second).await;
        }
    }

    /// Records an error event and updates error counters and rates.
    pub async fn record_error(&self, error_event: ErrorEvent) {
        let mut error_log = self.error_log.lock().await;

        if error_log.len() >= 1000 {
            error_log.pop_front();
        }
        error_log.push_back(error_event.clone());

        {
            let mut metrics = self.metrics.write().unwrap();
            metrics.errors.total_errors += 1;
            metrics.errors.last_error = Some(error_event.timestamp);

            *metrics
                .errors
                .errors_by_type
                .entry(error_event.error_type.clone())
                .or_insert(0) += 1;

            if let ErrorSeverity::Critical = error_event.severity {
                metrics.errors.critical_errors += 1;
            }

            if error_event.error_type.contains("timeout") {
                metrics.errors.timeout_errors += 1;
            } else if error_event.error_type.contains("model") {
                metrics.errors.model_errors += 1;
            } else {
                metrics.errors.system_errors += 1;
            }

            let total_requests = metrics.throughput.total_requests;
            if total_requests > 0 {
                metrics.errors.error_rate_per_hour =
                    (metrics.errors.total_errors as f64 / total_requests as f64) * 3600.0;
            }
        }

        if matches!(error_event.severity, ErrorSeverity::Critical) {
            self.handle_critical_error(error_event).await;
        }
    }

    /// Replaces the current resource metrics while preserving peak usage values.
    pub async fn update_resource_metrics(&self, resources: ResourceMetrics) {
        {
            let mut metrics = self.metrics.write().unwrap();

            // Compute peaks before replacing the snapshot so they are not lost.
            let peak_memory_mb = metrics
                .resources
                .peak_memory_mb
                .max(resources.memory_usage_mb);
            let peak_gpu_memory_mb = metrics
                .resources
                .peak_gpu_memory_mb
                .max(resources.gpu_memory_usage_mb);

            metrics.resources = resources.clone();
            metrics.resources.peak_memory_mb = peak_memory_mb;
            metrics.resources.peak_gpu_memory_mb = peak_gpu_memory_mb;
        }

        if self.config.enable_alerting {
            self.check_resource_alerts(resources).await;
        }
    }

    /// Updates cache statistics and alerts if the hit rate falls below the threshold.
    pub async fn update_cache_metrics(&self, cache_metrics: CacheMetrics) {
        {
            let mut metrics = self.metrics.write().unwrap();
            metrics.cache = cache_metrics.clone();
        }

        if self.config.enable_alerting
            && cache_metrics.hit_rate < self.config.alert_thresholds.min_cache_hit_rate
        {
            self.send_alert(Alert {
                alert_type: AlertType::LowCacheHitRate,
                message: format!(
                    "Cache hit rate dropped to {:.2}%",
                    cache_metrics.hit_rate * 100.0
                ),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("hit_rate".to_string(), cache_metrics.hit_rate),
                    (
                        "threshold".to_string(),
                        self.config.alert_thresholds.min_cache_hit_rate,
                    ),
                ]),
            })
            .await;
        }
    }

    /// Returns a snapshot of the current metrics.
    pub fn get_metrics(&self) -> PerformanceMetrics {
        self.metrics.read().unwrap().clone()
    }

    /// Registers an alert handler that will receive all future alerts.
    pub fn add_alert_handler(&mut self, handler: Box<dyn AlertHandler + Send + Sync>) {
        self.alert_handlers.push(handler);
    }

    async fn start_metrics_collection(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let interval = Duration::from_secs(self.config.collection_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let system_metrics = Self::collect_system_metrics().await;

                {
                    let mut metrics = metrics.write().unwrap();
                    metrics.resources = system_metrics;
                }

                debug!("Collected system metrics");
            }
        })
    }

    async fn start_drift_detection(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let quality_history = Arc::clone(&self.quality_history);
        let interval = Duration::from_secs(self.config.drift_detection_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let drift_metrics = Self::detect_drift(&quality_history).await;

                {
                    let mut metrics = metrics.write().unwrap();
                    metrics.drift = drift_metrics;
                    metrics.drift.last_drift_check = Utc::now();
                }

                info!("Performed drift detection analysis");
            }
        })
    }

    async fn start_quality_assessment(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let quality_history = Arc::clone(&self.quality_history);
        let interval = Duration::from_secs(self.config.quality_assessment_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let quality_assessment = Self::assess_quality().await;

                {
                    let mut history = quality_history.lock().await;
                    if history.len() >= 100 {
                        history.pop_front();
                    }
                    history.push_back(quality_assessment.clone());
                }

                {
                    let mut metrics = metrics.write().unwrap();
                    metrics.quality.avg_quality_score = quality_assessment.quality_score;
                    metrics.quality.last_assessment = quality_assessment.timestamp;

                    for (key, value) in &quality_assessment.metrics {
                        match key.as_str() {
                            "isotropy" => metrics.quality.isotropy_score = *value,
                            "neighborhood_preservation" => {
                                metrics.quality.neighborhood_preservation = *value
                            }
                            "clustering_quality" => metrics.quality.clustering_quality = *value,
                            "similarity_correlation" => {
                                metrics.quality.similarity_correlation = *value
                            }
                            _ => {}
                        }
                    }
                }

                info!(
                    "Performed quality assessment: score = {:.3}",
                    quality_assessment.quality_score
                );
            }
        })
    }

    async fn start_metrics_export(&self) -> JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);
        let export_config = self.config.export_config.clone();
        let interval = Duration::from_secs(export_config.export_interval_seconds);

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let current_metrics = metrics.read().unwrap().clone();

                if export_config.enable_prometheus {
                    Self::export_prometheus_metrics(&current_metrics).await;
                }

                if export_config.enable_json_export {
                    if let Some(ref path) = export_config.json_export_path {
                        Self::export_json_metrics(&current_metrics, path).await;
                    }
                }

                debug!("Exported metrics");
            }
        })
    }

    async fn collect_system_metrics() -> ResourceMetrics {
        // Placeholder: values are simulated; a real implementation would query
        // the OS and GPU runtime for actual utilization figures.
        let mut random = Random::default();
        ResourceMetrics {
            cpu_utilization_percent: random.random::<f64>() * 100.0,
            memory_usage_mb: 1024.0 + random.random::<f64>() * 2048.0,
            gpu_utilization_percent: random.random::<f64>() * 100.0,
            gpu_memory_usage_mb: 2048.0 + random.random::<f64>() * 4096.0,
            network_io_mbps: random.random::<f64>() * 100.0,
            disk_io_mbps: random.random::<f64>() * 50.0,
            peak_memory_mb: 3072.0,
            peak_gpu_memory_mb: 6144.0,
        }
    }

    async fn detect_drift(
        quality_history: &Arc<Mutex<VecDeque<QualityAssessment>>>,
    ) -> DriftMetrics {
        let history = quality_history.lock().await;

        if history.len() < 2 {
            return DriftMetrics::default();
        }

        // Compare the most recent assessment against the oldest one in the window.
        let recent_quality = history.back().unwrap().quality_score;
        let baseline_quality = history.front().unwrap().quality_score;
        let quality_drift = (recent_quality - baseline_quality).abs() / baseline_quality;

        // Placeholder: the remaining drift signals are simulated.
        let mut random = Random::default();
        DriftMetrics {
            quality_drift_score: quality_drift,
            performance_drift_score: random.random::<f64>() * 0.1,
            distribution_shift: quality_drift > 0.1,
            concept_drift_score: random.random::<f64>() * 0.05,
            data_quality_issues: if quality_drift > 0.2 { 1 } else { 0 },
            drift_alerts: if quality_drift > 0.15 { 1 } else { 0 },
            last_drift_check: Utc::now(),
        }
    }

    async fn assess_quality() -> QualityAssessment {
        // Placeholder: quality scores are simulated rather than computed from
        // actual embeddings.
        let mut random = Random::default();
        let quality_score = 0.8 + random.random::<f64>() * 0.2;

        let mut metrics = HashMap::new();
        metrics.insert("isotropy".to_string(), 0.7 + random.random::<f64>() * 0.3);
        metrics.insert(
            "neighborhood_preservation".to_string(),
            0.8 + random.random::<f64>() * 0.2,
        );
        metrics.insert(
            "clustering_quality".to_string(),
            0.75 + random.random::<f64>() * 0.25,
        );
        metrics.insert(
            "similarity_correlation".to_string(),
            0.85 + random.random::<f64>() * 0.15,
        );

        QualityAssessment {
            timestamp: Utc::now(),
            quality_score,
            metrics,
            assessment_details: format!(
                "Quality assessment completed with score: {quality_score:.3}"
            ),
        }
    }

    async fn export_prometheus_metrics(metrics: &PerformanceMetrics) {
        // Placeholder: only logs; a real exporter would publish to a Prometheus registry.
        debug!(
            "Exporting Prometheus metrics: P95 latency = {:.2}ms",
            metrics.latency.p95_latency_ms
        );
    }

    async fn export_json_metrics(metrics: &PerformanceMetrics, path: &str) {
        match serde_json::to_string_pretty(metrics) {
            Ok(json) => {
                if let Err(e) = tokio::fs::write(path, json).await {
                    error!("Failed to export JSON metrics: {}", e);
                }
            }
            Err(e) => error!("Failed to serialize metrics to JSON: {}", e),
        }
    }

    async fn check_latency_alerts(&self, latency_ms: f64) {
        if latency_ms > self.config.alert_thresholds.max_p95_latency_ms {
            self.send_alert(Alert {
                alert_type: AlertType::HighLatency,
                message: format!("High latency detected: {latency_ms:.2}ms"),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("latency_ms".to_string(), latency_ms),
                    (
                        "threshold_ms".to_string(),
                        self.config.alert_thresholds.max_p95_latency_ms,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn check_throughput_alerts(&self, throughput_rps: f64) {
        if throughput_rps < self.config.alert_thresholds.min_throughput_rps {
            self.send_alert(Alert {
                alert_type: AlertType::LowThroughput,
                message: format!("Low throughput detected: {throughput_rps:.2} req/s"),
                severity: AlertSeverity::Warning,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("throughput_rps".to_string(), throughput_rps),
                    (
                        "threshold_rps".to_string(),
                        self.config.alert_thresholds.min_throughput_rps,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn check_resource_alerts(&self, resources: ResourceMetrics) {
        if resources.memory_usage_mb > self.config.alert_thresholds.max_memory_usage_mb {
            self.send_alert(Alert {
                alert_type: AlertType::ResourceExhaustion,
                message: format!("High memory usage: {:.1}MB", resources.memory_usage_mb),
                severity: AlertSeverity::Critical,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("memory_mb".to_string(), resources.memory_usage_mb),
                    (
                        "threshold_mb".to_string(),
                        self.config.alert_thresholds.max_memory_usage_mb,
                    ),
                ]),
            })
            .await;
        }

        if resources.gpu_memory_usage_mb > self.config.alert_thresholds.max_gpu_memory_mb {
            self.send_alert(Alert {
                alert_type: AlertType::ResourceExhaustion,
                message: format!(
                    "High GPU memory usage: {:.1}MB",
                    resources.gpu_memory_usage_mb
                ),
                severity: AlertSeverity::Critical,
                timestamp: Utc::now(),
                metrics: HashMap::from([
                    ("gpu_memory_mb".to_string(), resources.gpu_memory_usage_mb),
                    (
                        "threshold_mb".to_string(),
                        self.config.alert_thresholds.max_gpu_memory_mb,
                    ),
                ]),
            })
            .await;
        }
    }

    async fn send_alert(&self, alert: Alert) {
        warn!(
            "Alert triggered: {:?} - {}",
            alert.alert_type, alert.message
        );

        for handler in &self.alert_handlers {
            if let Err(e) = handler.handle_alert(alert.clone()) {
                error!("Alert handler failed: {}", e);
            }
        }
    }

    async fn handle_critical_error(&self, error_event: ErrorEvent) {
        error!(
            "Critical error occurred: {} - {}",
            error_event.error_type, error_event.error_message
        );

        self.send_alert(Alert {
            alert_type: AlertType::SystemFailure,
            message: format!("Critical error: {}", error_event.error_message),
            severity: AlertSeverity::Emergency,
            timestamp: error_event.timestamp,
            metrics: HashMap::new(),
        })
        .await;
    }

    /// Returns a human-readable summary of the current metrics.
    pub fn get_performance_summary(&self) -> String {
        let metrics = self.metrics.read().unwrap();

        format!(
            "Performance Summary:\n\
             - P95 Latency: {:.2}ms\n\
             - Throughput: {:.1} req/s\n\
             - Error Rate: {:.3}%\n\
             - Cache Hit Rate: {:.1}%\n\
             - Memory Usage: {:.1}MB\n\
             - Quality Score: {:.3}",
            metrics.latency.p95_latency_ms,
            metrics.throughput.requests_per_second,
            (metrics.errors.total_errors as f64 / metrics.throughput.total_requests.max(1) as f64)
                * 100.0,
            metrics.cache.hit_rate * 100.0,
            metrics.resources.memory_usage_mb,
            metrics.quality.avg_quality_score
        )
    }
}

/// Alert handler that prints alerts to standard output.
pub struct ConsoleAlertHandler;

impl AlertHandler for ConsoleAlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()> {
        println!(
            "🚨 ALERT [{}]: {} - {}",
            format!("{:?}", alert.severity).to_uppercase(),
            alert.message,
            alert.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
        );
        Ok(())
    }
}

/// Alert handler that forwards alerts to a Slack webhook (currently logs only).
pub struct SlackAlertHandler {
    pub webhook_url: String,
}

impl AlertHandler for SlackAlertHandler {
    fn handle_alert(&self, alert: Alert) -> Result<()> {
        info!(
            "Would send Slack alert to {}: {}",
            self.webhook_url, alert.message
        );
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_performance_monitor_creation() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.latency.total_measurements, 0);
        assert_eq!(metrics.throughput.total_requests, 0);
    }

    #[tokio::test]
    async fn test_latency_recording() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        monitor.record_latency(100.0).await;
        monitor.record_latency(150.0).await;
        monitor.record_latency(120.0).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.latency.total_measurements, 3);
        assert_eq!(metrics.latency.max_latency_ms, 150.0);
        assert_eq!(metrics.latency.min_latency_ms, 100.0);
    }
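
    // Added sketch (not part of the original test suite): exercises
    // `record_throughput`, checking that the peak is tracked and that the
    // reported rate is the sliding-window average of the recorded samples.
    #[tokio::test]
    async fn test_throughput_recording_sketch() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        monitor.record_throughput(120.0).await;
        monitor.record_throughput(80.0).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.throughput.peak_throughput, 120.0);
        // Average of the two samples currently in the window.
        assert_eq!(metrics.throughput.requests_per_second, 100.0);
    }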

    #[tokio::test]
    async fn test_error_recording() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let error_event = ErrorEvent {
            timestamp: Utc::now(),
            error_type: "timeout".to_string(),
            error_message: "Request timeout".to_string(),
            severity: ErrorSeverity::Medium,
            context: HashMap::new(),
        };

        monitor.record_error(error_event).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.errors.total_errors, 1);
        assert_eq!(metrics.errors.timeout_errors, 1);
    }
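
    // Added sketch (not part of the original test suite): verifies that
    // `update_cache_metrics` replaces the stored cache statistics. The 0.95
    // hit rate is an arbitrary illustrative value above the default alert
    // threshold, so no alert path is exercised here.
    #[tokio::test]
    async fn test_cache_metrics_update_sketch() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let cache = CacheMetrics {
            hit_rate: 0.95,
            ..CacheMetrics::default()
        };
        monitor.update_cache_metrics(cache).await;

        let metrics = monitor.get_metrics();
        assert_eq!(metrics.cache.hit_rate, 0.95);
    }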

    #[test]
    fn test_alert_thresholds_default() {
        let thresholds = AlertThresholds::default();
        assert_eq!(thresholds.max_p95_latency_ms, 500.0);
        assert_eq!(thresholds.min_throughput_rps, 100.0);
        assert_eq!(thresholds.max_error_rate, 0.05);
    }

    #[test]
    fn test_console_alert_handler() {
        let handler = ConsoleAlertHandler;
        let alert = Alert {
            alert_type: AlertType::HighLatency,
            message: "Test alert".to_string(),
            severity: AlertSeverity::Warning,
            timestamp: Utc::now(),
            metrics: HashMap::new(),
        };

        assert!(handler.handle_alert(alert).is_ok());
    }
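
    // Added sketch (not part of the original test suite): shows the intended
    // usage flow of the public API: build a monitor, register the bundled
    // ConsoleAlertHandler, record a sample, and read back the text summary.
    #[tokio::test]
    async fn test_monitor_usage_flow_sketch() {
        let mut monitor = PerformanceMonitor::new(MonitoringConfig::default());
        monitor.add_alert_handler(Box::new(ConsoleAlertHandler));

        monitor.record_latency(42.0).await;

        let summary = monitor.get_performance_summary();
        assert!(summary.contains("P95 Latency"));
    }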
}