1pub use crate::monitoring_health::{
10 Alert, AlertHandler, AlertSeverity, AlertThresholds, AlertType, ComponentHealth,
11 ConsoleAlertHandler, HealthCheckResult, HealthChecker, HealthStatus, SlackAlertHandler,
12};
13pub use crate::monitoring_metrics::{
14 CacheMetrics, DriftMetrics, ErrorMetrics, LatencyMetrics, MetricsCollector, PerformanceMetrics,
15 QualityMetrics, ResourceMetrics, ThroughputMetrics,
16};
17
18use anyhow::Result;
19use chrono::Utc;
20use scirs2_core::random::{Random, RngExt};
21use std::collections::{HashMap, VecDeque};
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24use tokio::sync::Mutex;
25use tokio::task::JoinHandle;
26use tracing::{debug, error, info, warn};
27
28pub use crate::monitoring_metrics::{ErrorEvent, ErrorSeverity, QualityAssessment};
30
31#[derive(Debug, Clone)]
33pub struct MonitoringConfig {
34 pub collection_interval_seconds: u64,
36 pub latency_window_size: usize,
38 pub throughput_window_size: usize,
40 pub quality_assessment_interval_seconds: u64,
42 pub drift_detection_interval_seconds: u64,
44 pub enable_alerting: bool,
46 pub alert_thresholds: AlertThresholds,
48 pub export_config: ExportConfig,
50}
51
/// Settings for exporting collected metrics.
#[derive(Debug, Clone)]
pub struct ExportConfig {
    /// Enables the Prometheus export step of the export task.
    pub enable_prometheus: bool,

    /// Port for a Prometheus endpoint.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub prometheus_port: u16,

    /// Enables OpenTelemetry export.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub enable_opentelemetry: bool,

    /// OTLP endpoint, if any.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub otlp_endpoint: Option<String>,

    /// Period, in seconds, of the export task.
    pub export_interval_seconds: u64,

    /// Enables writing metrics as JSON to `json_export_path`.
    pub enable_json_export: bool,

    /// Destination file for the JSON export; no file is written when `None`.
    pub json_export_path: Option<String>,
}
70
71impl Default for MonitoringConfig {
72 fn default() -> Self {
73 Self {
74 collection_interval_seconds: 10,
75 latency_window_size: 1000,
76 throughput_window_size: 100,
77 quality_assessment_interval_seconds: 300, drift_detection_interval_seconds: 3600, enable_alerting: true,
80 alert_thresholds: AlertThresholds::default(),
81 export_config: ExportConfig::default(),
82 }
83 }
84}
85
86impl Default for ExportConfig {
87 fn default() -> Self {
88 Self {
89 enable_prometheus: true,
90 prometheus_port: 9090,
91 enable_opentelemetry: false,
92 otlp_endpoint: None,
93 export_interval_seconds: 60,
94 enable_json_export: false,
95 json_export_path: None,
96 }
97 }
98}
99
100pub struct PerformanceMonitor {
102 metrics: Arc<RwLock<PerformanceMetrics>>,
104 latency_window: Arc<Mutex<VecDeque<f64>>>,
106 throughput_window: Arc<Mutex<VecDeque<f64>>>,
108 error_log: Arc<Mutex<VecDeque<ErrorEvent>>>,
110 quality_history: Arc<Mutex<VecDeque<QualityAssessment>>>,
112 config: MonitoringConfig,
114 monitoring_tasks: Vec<JoinHandle<()>>,
116 alert_handlers: Vec<Box<dyn AlertHandler + Send + Sync>>,
118}
119
120impl PerformanceMonitor {
121 pub fn new(config: MonitoringConfig) -> Self {
123 Self {
124 metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
125 latency_window: Arc::new(Mutex::new(VecDeque::with_capacity(
126 config.latency_window_size,
127 ))),
128 throughput_window: Arc::new(Mutex::new(VecDeque::with_capacity(
129 config.throughput_window_size,
130 ))),
131 error_log: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
132 quality_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
133 config,
134 monitoring_tasks: Vec::new(),
135 alert_handlers: Vec::new(),
136 }
137 }
138
139 pub async fn start(&mut self) -> Result<()> {
141 info!("Starting performance monitoring system");
142
143 let metrics_task = self.start_metrics_collection().await;
144 self.monitoring_tasks.push(metrics_task);
145
146 let drift_task = self.start_drift_detection().await;
147 self.monitoring_tasks.push(drift_task);
148
149 let quality_task = self.start_quality_assessment().await;
150 self.monitoring_tasks.push(quality_task);
151
152 if self.config.export_config.enable_prometheus {
153 let export_task = self.start_metrics_export().await;
154 self.monitoring_tasks.push(export_task);
155 }
156
157 info!("Performance monitoring system started successfully");
158 Ok(())
159 }
160
161 pub async fn stop(&mut self) {
163 info!("Stopping performance monitoring system");
164 for task in self.monitoring_tasks.drain(..) {
165 task.abort();
166 }
167 info!("Performance monitoring system stopped");
168 }
169
170 pub async fn record_latency(&self, latency_ms: f64) {
172 let mut window = self.latency_window.lock().await;
173
174 if window.len() >= self.config.latency_window_size {
175 window.pop_front();
176 }
177 window.push_back(latency_ms);
178
179 {
180 let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
181 metrics.latency.total_measurements += 1;
182
183 metrics.latency.max_latency_ms = metrics.latency.max_latency_ms.max(latency_ms);
184 metrics.latency.min_latency_ms = metrics.latency.min_latency_ms.min(latency_ms);
185
186 let alpha = 0.1;
187 metrics.latency.avg_embedding_time_ms =
188 alpha * latency_ms + (1.0 - alpha) * metrics.latency.avg_embedding_time_ms;
189
190 let mut sorted_latencies: Vec<f64> = window.iter().copied().collect();
191 sorted_latencies.sort_by(|a, b| {
192 a.partial_cmp(b)
193 .expect("latency values should be comparable")
194 });
195
196 if !sorted_latencies.is_empty() {
197 let len = sorted_latencies.len();
198 metrics.latency.p50_latency_ms = sorted_latencies[len * 50 / 100];
199 metrics.latency.p95_latency_ms = sorted_latencies[len * 95 / 100];
200 metrics.latency.p99_latency_ms = sorted_latencies[len * 99 / 100];
201 }
202 }
203
204 if self.config.enable_alerting {
205 self.check_latency_alerts(latency_ms).await;
206 }
207 }
208
209 pub async fn record_throughput(&self, requests_per_second: f64) {
211 let mut window = self.throughput_window.lock().await;
212
213 if window.len() >= self.config.throughput_window_size {
214 window.pop_front();
215 }
216 window.push_back(requests_per_second);
217
218 {
219 let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
220 metrics.throughput.peak_throughput =
221 metrics.throughput.peak_throughput.max(requests_per_second);
222
223 let avg_throughput = window.iter().sum::<f64>() / window.len() as f64;
224 metrics.throughput.requests_per_second = avg_throughput;
225 }
226
227 if self.config.enable_alerting {
228 self.check_throughput_alerts(requests_per_second).await;
229 }
230 }
231
232 pub async fn record_error(&self, error_event: ErrorEvent) {
234 let mut error_log = self.error_log.lock().await;
235
236 if error_log.len() >= 1000 {
237 error_log.pop_front();
238 }
239 error_log.push_back(error_event.clone());
240
241 {
242 let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
243 metrics.errors.total_errors += 1;
244 metrics.errors.last_error = Some(error_event.timestamp);
245
246 *metrics
247 .errors
248 .errors_by_type
249 .entry(error_event.error_type.clone())
250 .or_insert(0) += 1;
251
252 if let ErrorSeverity::Critical = error_event.severity {
253 metrics.errors.critical_errors += 1
254 }
255
256 if error_event.error_type.contains("timeout") {
257 metrics.errors.timeout_errors += 1;
258 } else if error_event.error_type.contains("model") {
259 metrics.errors.model_errors += 1;
260 } else {
261 metrics.errors.system_errors += 1;
262 }
263
264 let total_requests = metrics.throughput.total_requests;
265 if total_requests > 0 {
266 metrics.errors.error_rate_per_hour =
267 (metrics.errors.total_errors as f64 / total_requests as f64) * 3600.0;
268 }
269 }
270
271 if matches!(error_event.severity, ErrorSeverity::Critical) {
272 self.handle_critical_error(error_event).await;
273 }
274 }
275
276 pub async fn update_resource_metrics(&self, resources: ResourceMetrics) {
278 {
279 let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
280
281 metrics.resources.peak_memory_mb = metrics
282 .resources
283 .peak_memory_mb
284 .max(resources.memory_usage_mb);
285 metrics.resources.peak_gpu_memory_mb = metrics
286 .resources
287 .peak_gpu_memory_mb
288 .max(resources.gpu_memory_usage_mb);
289
290 metrics.resources = resources.clone();
291 }
292
293 if self.config.enable_alerting {
294 self.check_resource_alerts(resources).await;
295 }
296 }
297
298 pub async fn update_cache_metrics(&self, cache_metrics: CacheMetrics) {
300 {
301 let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
302 metrics.cache = cache_metrics.clone();
303 }
304
305 if self.config.enable_alerting
306 && cache_metrics.hit_rate < self.config.alert_thresholds.min_cache_hit_rate
307 {
308 self.send_alert(Alert {
309 alert_type: AlertType::LowCacheHitRate,
310 message: format!(
311 "Cache hit rate dropped to {:.2}%",
312 cache_metrics.hit_rate * 100.0
313 ),
314 severity: AlertSeverity::Warning,
315 timestamp: Utc::now(),
316 metrics: HashMap::from([
317 ("hit_rate".to_string(), cache_metrics.hit_rate),
318 (
319 "threshold".to_string(),
320 self.config.alert_thresholds.min_cache_hit_rate,
321 ),
322 ]),
323 })
324 .await;
325 }
326 }
327
328 pub fn get_metrics(&self) -> PerformanceMetrics {
330 self.metrics
331 .read()
332 .expect("rwlock should not be poisoned")
333 .clone()
334 }
335
336 pub fn add_alert_handler(&mut self, handler: Box<dyn AlertHandler + Send + Sync>) {
338 self.alert_handlers.push(handler);
339 }
340
341 pub fn get_performance_summary(&self) -> String {
343 let metrics = self.metrics.read().expect("rwlock should not be poisoned");
344
345 format!(
346 "Performance Summary:\n\
347 - P95 Latency: {:.2}ms\n\
348 - Throughput: {:.1} req/s\n\
349 - Error Rate: {:.3}%\n\
350 - Cache Hit Rate: {:.1}%\n\
351 - Memory Usage: {:.1}MB\n\
352 - Quality Score: {:.3}",
353 metrics.latency.p95_latency_ms,
354 metrics.throughput.requests_per_second,
355 (metrics.errors.total_errors as f64 / metrics.throughput.total_requests.max(1) as f64)
356 * 100.0,
357 metrics.cache.hit_rate * 100.0,
358 metrics.resources.memory_usage_mb,
359 metrics.quality.avg_quality_score
360 )
361 }
362
363 async fn start_metrics_collection(&self) -> JoinHandle<()> {
366 let metrics = Arc::clone(&self.metrics);
367 let interval = Duration::from_secs(self.config.collection_interval_seconds);
368
369 tokio::spawn(async move {
370 let mut interval_timer = tokio::time::interval(interval);
371 loop {
372 interval_timer.tick().await;
373 let system_metrics = Self::collect_system_metrics().await;
374 {
375 let mut metrics = metrics.write().expect("rwlock should not be poisoned");
376 metrics.resources = system_metrics;
377 }
378 debug!("Collected system metrics");
379 }
380 })
381 }
382
383 async fn start_drift_detection(&self) -> JoinHandle<()> {
384 let metrics = Arc::clone(&self.metrics);
385 let quality_history = Arc::clone(&self.quality_history);
386 let interval = Duration::from_secs(self.config.drift_detection_interval_seconds);
387
388 tokio::spawn(async move {
389 let mut interval_timer = tokio::time::interval(interval);
390 loop {
391 interval_timer.tick().await;
392 let drift_metrics = Self::detect_drift(&quality_history).await;
393 {
394 let mut metrics = metrics.write().expect("rwlock should not be poisoned");
395 metrics.drift = drift_metrics;
396 metrics.drift.last_drift_check = Utc::now();
397 }
398 info!("Performed drift detection analysis");
399 }
400 })
401 }
402
403 async fn start_quality_assessment(&self) -> JoinHandle<()> {
404 let metrics = Arc::clone(&self.metrics);
405 let quality_history = Arc::clone(&self.quality_history);
406 let interval = Duration::from_secs(self.config.quality_assessment_interval_seconds);
407
408 tokio::spawn(async move {
409 let mut interval_timer = tokio::time::interval(interval);
410 loop {
411 interval_timer.tick().await;
412 let quality_assessment = Self::assess_quality().await;
413 {
414 let mut history = quality_history.lock().await;
415 if history.len() >= 100 {
416 history.pop_front();
417 }
418 history.push_back(quality_assessment.clone());
419 }
420 {
421 let mut metrics = metrics.write().expect("rwlock should not be poisoned");
422 metrics.quality.avg_quality_score = quality_assessment.quality_score;
423 metrics.quality.last_assessment = quality_assessment.timestamp;
424
425 for (key, value) in &quality_assessment.metrics {
426 match key.as_str() {
427 "isotropy" => metrics.quality.isotropy_score = *value,
428 "neighborhood_preservation" => {
429 metrics.quality.neighborhood_preservation = *value
430 }
431 "clustering_quality" => metrics.quality.clustering_quality = *value,
432 "similarity_correlation" => {
433 metrics.quality.similarity_correlation = *value
434 }
435 _ => {}
436 }
437 }
438 }
439 info!(
440 "Performed quality assessment: score = {:.3}",
441 quality_assessment.quality_score
442 );
443 }
444 })
445 }
446
447 async fn start_metrics_export(&self) -> JoinHandle<()> {
448 let metrics = Arc::clone(&self.metrics);
449 let export_config = self.config.export_config.clone();
450 let interval = Duration::from_secs(export_config.export_interval_seconds);
451
452 tokio::spawn(async move {
453 let mut interval_timer = tokio::time::interval(interval);
454 loop {
455 interval_timer.tick().await;
456 let current_metrics = metrics
457 .read()
458 .expect("rwlock should not be poisoned")
459 .clone();
460
461 if export_config.enable_prometheus {
462 Self::export_prometheus_metrics(¤t_metrics).await;
463 }
464 if export_config.enable_json_export {
465 if let Some(ref path) = export_config.json_export_path {
466 Self::export_json_metrics(¤t_metrics, path).await;
467 }
468 }
469 debug!("Exported metrics");
470 }
471 })
472 }
473
474 async fn collect_system_metrics() -> ResourceMetrics {
477 let mut random = Random::default();
478 ResourceMetrics {
479 cpu_utilization_percent: random.random::<f64>() * 100.0,
480 memory_usage_mb: 1024.0 + random.random::<f64>() * 2048.0,
481 gpu_utilization_percent: random.random::<f64>() * 100.0,
482 gpu_memory_usage_mb: 2048.0 + random.random::<f64>() * 4096.0,
483 network_io_mbps: random.random::<f64>() * 100.0,
484 disk_io_mbps: random.random::<f64>() * 50.0,
485 peak_memory_mb: 3072.0,
486 peak_gpu_memory_mb: 6144.0,
487 }
488 }
489
490 async fn detect_drift(
491 quality_history: &Arc<Mutex<VecDeque<QualityAssessment>>>,
492 ) -> DriftMetrics {
493 let history = quality_history.lock().await;
494
495 if history.len() < 2 {
496 return DriftMetrics::default();
497 }
498
499 let recent_quality = history
500 .back()
501 .expect("quality history should not be empty")
502 .quality_score;
503 let baseline_quality = history
504 .front()
505 .expect("quality history should not be empty")
506 .quality_score;
507 let quality_drift = (recent_quality - baseline_quality).abs() / baseline_quality;
508
509 let mut random = Random::default();
510 DriftMetrics {
511 quality_drift_score: quality_drift,
512 performance_drift_score: random.random::<f64>() * 0.1,
513 distribution_shift: quality_drift > 0.1,
514 concept_drift_score: random.random::<f64>() * 0.05,
515 data_quality_issues: if quality_drift > 0.2 { 1 } else { 0 },
516 drift_alerts: if quality_drift > 0.15 { 1 } else { 0 },
517 last_drift_check: Utc::now(),
518 }
519 }
520
521 async fn assess_quality() -> QualityAssessment {
522 let mut random = Random::default();
523 let quality_score = 0.8 + random.random::<f64>() * 0.2;
524
525 let mut metrics = HashMap::new();
526 metrics.insert("isotropy".to_string(), 0.7 + random.random::<f64>() * 0.3);
527 metrics.insert(
528 "neighborhood_preservation".to_string(),
529 0.8 + random.random::<f64>() * 0.2,
530 );
531 metrics.insert(
532 "clustering_quality".to_string(),
533 0.75 + random.random::<f64>() * 0.25,
534 );
535 metrics.insert(
536 "similarity_correlation".to_string(),
537 0.85 + random.random::<f64>() * 0.15,
538 );
539
540 QualityAssessment {
541 timestamp: Utc::now(),
542 quality_score,
543 metrics,
544 assessment_details: format!(
545 "Quality assessment completed with score: {quality_score:.3}"
546 ),
547 }
548 }
549
550 async fn export_prometheus_metrics(metrics: &PerformanceMetrics) {
551 debug!(
552 "Exporting Prometheus metrics: P95 latency = {:.2}ms",
553 metrics.latency.p95_latency_ms
554 );
555 }
556
557 async fn export_json_metrics(metrics: &PerformanceMetrics, path: &str) {
558 match serde_json::to_string_pretty(metrics) {
559 Ok(json) => {
560 if let Err(e) = tokio::fs::write(path, json).await {
561 error!("Failed to export JSON metrics: {}", e);
562 }
563 }
564 Err(e) => error!("Failed to serialize metrics to JSON: {}", e),
565 }
566 }
567
568 async fn check_latency_alerts(&self, latency_ms: f64) {
569 if latency_ms > self.config.alert_thresholds.max_p95_latency_ms {
570 self.send_alert(Alert {
571 alert_type: AlertType::HighLatency,
572 message: format!("High latency detected: {latency_ms:.2}ms"),
573 severity: AlertSeverity::Warning,
574 timestamp: Utc::now(),
575 metrics: HashMap::from([
576 ("latency_ms".to_string(), latency_ms),
577 (
578 "threshold_ms".to_string(),
579 self.config.alert_thresholds.max_p95_latency_ms,
580 ),
581 ]),
582 })
583 .await;
584 }
585 }
586
587 async fn check_throughput_alerts(&self, throughput_rps: f64) {
588 if throughput_rps < self.config.alert_thresholds.min_throughput_rps {
589 self.send_alert(Alert {
590 alert_type: AlertType::LowThroughput,
591 message: format!("Low throughput detected: {throughput_rps:.2} req/s"),
592 severity: AlertSeverity::Warning,
593 timestamp: Utc::now(),
594 metrics: HashMap::from([
595 ("throughput_rps".to_string(), throughput_rps),
596 (
597 "threshold_rps".to_string(),
598 self.config.alert_thresholds.min_throughput_rps,
599 ),
600 ]),
601 })
602 .await;
603 }
604 }
605
606 async fn check_resource_alerts(&self, resources: ResourceMetrics) {
607 if resources.memory_usage_mb > self.config.alert_thresholds.max_memory_usage_mb {
608 self.send_alert(Alert {
609 alert_type: AlertType::ResourceExhaustion,
610 message: format!("High memory usage: {:.1}MB", resources.memory_usage_mb),
611 severity: AlertSeverity::Critical,
612 timestamp: Utc::now(),
613 metrics: HashMap::from([
614 ("memory_mb".to_string(), resources.memory_usage_mb),
615 (
616 "threshold_mb".to_string(),
617 self.config.alert_thresholds.max_memory_usage_mb,
618 ),
619 ]),
620 })
621 .await;
622 }
623
624 if resources.gpu_memory_usage_mb > self.config.alert_thresholds.max_gpu_memory_mb {
625 self.send_alert(Alert {
626 alert_type: AlertType::ResourceExhaustion,
627 message: format!(
628 "High GPU memory usage: {:.1}MB",
629 resources.gpu_memory_usage_mb
630 ),
631 severity: AlertSeverity::Critical,
632 timestamp: Utc::now(),
633 metrics: HashMap::from([
634 ("gpu_memory_mb".to_string(), resources.gpu_memory_usage_mb),
635 (
636 "threshold_mb".to_string(),
637 self.config.alert_thresholds.max_gpu_memory_mb,
638 ),
639 ]),
640 })
641 .await;
642 }
643 }
644
645 async fn send_alert(&self, alert: Alert) {
646 warn!(
647 "Alert triggered: {:?} - {}",
648 alert.alert_type, alert.message
649 );
650 for handler in &self.alert_handlers {
651 if let Err(e) = handler.handle_alert(alert.clone()) {
652 error!("Alert handler failed: {}", e);
653 }
654 }
655 }
656
657 async fn handle_critical_error(&self, error_event: ErrorEvent) {
658 error!(
659 "Critical error occurred: {} - {}",
660 error_event.error_type, error_event.error_message
661 );
662 self.send_alert(Alert {
663 alert_type: AlertType::SystemFailure,
664 message: format!("Critical error: {}", error_event.error_message),
665 severity: AlertSeverity::Emergency,
666 timestamp: error_event.timestamp,
667 metrics: HashMap::new(),
668 })
669 .await;
670 }
671}