Skip to main content

oxirs_embed/
monitoring.rs

//! Comprehensive monitoring and metrics system for embedding models.
//!
//! This is the facade module. Implementation is split across:
//! - [`crate::monitoring_metrics`] — metric types, collectors, aggregators
//! - [`crate::monitoring_health`] — health check logic, alerting, threshold monitoring
//! - [`crate::monitoring_tests`] — tests
7
8// Re-export all public types from sibling modules
9pub use crate::monitoring_health::{
10    Alert, AlertHandler, AlertSeverity, AlertThresholds, AlertType, ComponentHealth,
11    ConsoleAlertHandler, HealthCheckResult, HealthChecker, HealthStatus, SlackAlertHandler,
12};
13pub use crate::monitoring_metrics::{
14    CacheMetrics, DriftMetrics, ErrorMetrics, LatencyMetrics, MetricsCollector, PerformanceMetrics,
15    QualityMetrics, ResourceMetrics, ThroughputMetrics,
16};
17
18use anyhow::Result;
19use chrono::Utc;
20use scirs2_core::random::{Random, RngExt};
21use std::collections::{HashMap, VecDeque};
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24use tokio::sync::Mutex;
25use tokio::task::JoinHandle;
26use tracing::{debug, error, info, warn};
27
28// ErrorEvent and ErrorSeverity live in monitoring_metrics but are also used here
29pub use crate::monitoring_metrics::{ErrorEvent, ErrorSeverity, QualityAssessment};
30
31/// Monitoring configuration
32#[derive(Debug, Clone)]
33pub struct MonitoringConfig {
34    /// Metrics collection interval (seconds)
35    pub collection_interval_seconds: u64,
36    /// Latency window size for percentile calculations
37    pub latency_window_size: usize,
38    /// Throughput window size
39    pub throughput_window_size: usize,
40    /// Quality assessment interval (seconds)
41    pub quality_assessment_interval_seconds: u64,
42    /// Drift detection interval (seconds)
43    pub drift_detection_interval_seconds: u64,
44    /// Enable real-time alerting
45    pub enable_alerting: bool,
46    /// Alert thresholds
47    pub alert_thresholds: AlertThresholds,
48    /// Metrics export configuration
49    pub export_config: ExportConfig,
50}
51
/// Settings for exporting collected metrics to external sinks.
#[derive(Debug, Clone)]
pub struct ExportConfig {
    /// Turn Prometheus export on or off.
    pub enable_prometheus: bool,
    /// Port associated with the Prometheus export.
    pub prometheus_port: u16,
    /// Turn OpenTelemetry export on or off.
    pub enable_opentelemetry: bool,
    /// OTLP endpoint, if one is configured.
    pub otlp_endpoint: Option<String>,
    /// Seconds between two export runs.
    pub export_interval_seconds: u64,
    /// Turn JSON file export on or off.
    pub enable_json_export: bool,
    /// Destination file for the JSON export, if one is configured.
    pub json_export_path: Option<String>,
}
70
71impl Default for MonitoringConfig {
72    fn default() -> Self {
73        Self {
74            collection_interval_seconds: 10,
75            latency_window_size: 1000,
76            throughput_window_size: 100,
77            quality_assessment_interval_seconds: 300, // 5 minutes
78            drift_detection_interval_seconds: 3600,   // 1 hour
79            enable_alerting: true,
80            alert_thresholds: AlertThresholds::default(),
81            export_config: ExportConfig::default(),
82        }
83    }
84}
85
86impl Default for ExportConfig {
87    fn default() -> Self {
88        Self {
89            enable_prometheus: true,
90            prometheus_port: 9090,
91            enable_opentelemetry: false,
92            otlp_endpoint: None,
93            export_interval_seconds: 60,
94            enable_json_export: false,
95            json_export_path: None,
96        }
97    }
98}
99
100/// Performance monitoring manager
101pub struct PerformanceMonitor {
102    /// Current metrics
103    metrics: Arc<RwLock<PerformanceMetrics>>,
104    /// Latency measurements window
105    latency_window: Arc<Mutex<VecDeque<f64>>>,
106    /// Throughput measurements window
107    throughput_window: Arc<Mutex<VecDeque<f64>>>,
108    /// Error tracking
109    error_log: Arc<Mutex<VecDeque<ErrorEvent>>>,
110    /// Quality assessments
111    quality_history: Arc<Mutex<VecDeque<QualityAssessment>>>,
112    /// Monitoring configuration
113    config: MonitoringConfig,
114    /// Background monitoring tasks
115    monitoring_tasks: Vec<JoinHandle<()>>,
116    /// Alert handlers
117    alert_handlers: Vec<Box<dyn AlertHandler + Send + Sync>>,
118}
119
120impl PerformanceMonitor {
121    /// Create new performance monitor
122    pub fn new(config: MonitoringConfig) -> Self {
123        Self {
124            metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
125            latency_window: Arc::new(Mutex::new(VecDeque::with_capacity(
126                config.latency_window_size,
127            ))),
128            throughput_window: Arc::new(Mutex::new(VecDeque::with_capacity(
129                config.throughput_window_size,
130            ))),
131            error_log: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
132            quality_history: Arc::new(Mutex::new(VecDeque::with_capacity(100))),
133            config,
134            monitoring_tasks: Vec::new(),
135            alert_handlers: Vec::new(),
136        }
137    }
138
139    /// Start monitoring services
140    pub async fn start(&mut self) -> Result<()> {
141        info!("Starting performance monitoring system");
142
143        let metrics_task = self.start_metrics_collection().await;
144        self.monitoring_tasks.push(metrics_task);
145
146        let drift_task = self.start_drift_detection().await;
147        self.monitoring_tasks.push(drift_task);
148
149        let quality_task = self.start_quality_assessment().await;
150        self.monitoring_tasks.push(quality_task);
151
152        if self.config.export_config.enable_prometheus {
153            let export_task = self.start_metrics_export().await;
154            self.monitoring_tasks.push(export_task);
155        }
156
157        info!("Performance monitoring system started successfully");
158        Ok(())
159    }
160
161    /// Stop monitoring services
162    pub async fn stop(&mut self) {
163        info!("Stopping performance monitoring system");
164        for task in self.monitoring_tasks.drain(..) {
165            task.abort();
166        }
167        info!("Performance monitoring system stopped");
168    }
169
170    /// Record request latency
171    pub async fn record_latency(&self, latency_ms: f64) {
172        let mut window = self.latency_window.lock().await;
173
174        if window.len() >= self.config.latency_window_size {
175            window.pop_front();
176        }
177        window.push_back(latency_ms);
178
179        {
180            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
181            metrics.latency.total_measurements += 1;
182
183            metrics.latency.max_latency_ms = metrics.latency.max_latency_ms.max(latency_ms);
184            metrics.latency.min_latency_ms = metrics.latency.min_latency_ms.min(latency_ms);
185
186            let alpha = 0.1;
187            metrics.latency.avg_embedding_time_ms =
188                alpha * latency_ms + (1.0 - alpha) * metrics.latency.avg_embedding_time_ms;
189
190            let mut sorted_latencies: Vec<f64> = window.iter().copied().collect();
191            sorted_latencies.sort_by(|a, b| {
192                a.partial_cmp(b)
193                    .expect("latency values should be comparable")
194            });
195
196            if !sorted_latencies.is_empty() {
197                let len = sorted_latencies.len();
198                metrics.latency.p50_latency_ms = sorted_latencies[len * 50 / 100];
199                metrics.latency.p95_latency_ms = sorted_latencies[len * 95 / 100];
200                metrics.latency.p99_latency_ms = sorted_latencies[len * 99 / 100];
201            }
202        }
203
204        if self.config.enable_alerting {
205            self.check_latency_alerts(latency_ms).await;
206        }
207    }
208
209    /// Record throughput measurement
210    pub async fn record_throughput(&self, requests_per_second: f64) {
211        let mut window = self.throughput_window.lock().await;
212
213        if window.len() >= self.config.throughput_window_size {
214            window.pop_front();
215        }
216        window.push_back(requests_per_second);
217
218        {
219            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
220            metrics.throughput.peak_throughput =
221                metrics.throughput.peak_throughput.max(requests_per_second);
222
223            let avg_throughput = window.iter().sum::<f64>() / window.len() as f64;
224            metrics.throughput.requests_per_second = avg_throughput;
225        }
226
227        if self.config.enable_alerting {
228            self.check_throughput_alerts(requests_per_second).await;
229        }
230    }
231
232    /// Record error event
233    pub async fn record_error(&self, error_event: ErrorEvent) {
234        let mut error_log = self.error_log.lock().await;
235
236        if error_log.len() >= 1000 {
237            error_log.pop_front();
238        }
239        error_log.push_back(error_event.clone());
240
241        {
242            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
243            metrics.errors.total_errors += 1;
244            metrics.errors.last_error = Some(error_event.timestamp);
245
246            *metrics
247                .errors
248                .errors_by_type
249                .entry(error_event.error_type.clone())
250                .or_insert(0) += 1;
251
252            if let ErrorSeverity::Critical = error_event.severity {
253                metrics.errors.critical_errors += 1
254            }
255
256            if error_event.error_type.contains("timeout") {
257                metrics.errors.timeout_errors += 1;
258            } else if error_event.error_type.contains("model") {
259                metrics.errors.model_errors += 1;
260            } else {
261                metrics.errors.system_errors += 1;
262            }
263
264            let total_requests = metrics.throughput.total_requests;
265            if total_requests > 0 {
266                metrics.errors.error_rate_per_hour =
267                    (metrics.errors.total_errors as f64 / total_requests as f64) * 3600.0;
268            }
269        }
270
271        if matches!(error_event.severity, ErrorSeverity::Critical) {
272            self.handle_critical_error(error_event).await;
273        }
274    }
275
276    /// Update resource metrics
277    pub async fn update_resource_metrics(&self, resources: ResourceMetrics) {
278        {
279            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
280
281            metrics.resources.peak_memory_mb = metrics
282                .resources
283                .peak_memory_mb
284                .max(resources.memory_usage_mb);
285            metrics.resources.peak_gpu_memory_mb = metrics
286                .resources
287                .peak_gpu_memory_mb
288                .max(resources.gpu_memory_usage_mb);
289
290            metrics.resources = resources.clone();
291        }
292
293        if self.config.enable_alerting {
294            self.check_resource_alerts(resources).await;
295        }
296    }
297
298    /// Update cache metrics
299    pub async fn update_cache_metrics(&self, cache_metrics: CacheMetrics) {
300        {
301            let mut metrics = self.metrics.write().expect("rwlock should not be poisoned");
302            metrics.cache = cache_metrics.clone();
303        }
304
305        if self.config.enable_alerting
306            && cache_metrics.hit_rate < self.config.alert_thresholds.min_cache_hit_rate
307        {
308            self.send_alert(Alert {
309                alert_type: AlertType::LowCacheHitRate,
310                message: format!(
311                    "Cache hit rate dropped to {:.2}%",
312                    cache_metrics.hit_rate * 100.0
313                ),
314                severity: AlertSeverity::Warning,
315                timestamp: Utc::now(),
316                metrics: HashMap::from([
317                    ("hit_rate".to_string(), cache_metrics.hit_rate),
318                    (
319                        "threshold".to_string(),
320                        self.config.alert_thresholds.min_cache_hit_rate,
321                    ),
322                ]),
323            })
324            .await;
325        }
326    }
327
328    /// Get current metrics snapshot
329    pub fn get_metrics(&self) -> PerformanceMetrics {
330        self.metrics
331            .read()
332            .expect("rwlock should not be poisoned")
333            .clone()
334    }
335
336    /// Add alert handler
337    pub fn add_alert_handler(&mut self, handler: Box<dyn AlertHandler + Send + Sync>) {
338        self.alert_handlers.push(handler);
339    }
340
341    /// Get performance summary
342    pub fn get_performance_summary(&self) -> String {
343        let metrics = self.metrics.read().expect("rwlock should not be poisoned");
344
345        format!(
346            "Performance Summary:\n\
347             - P95 Latency: {:.2}ms\n\
348             - Throughput: {:.1} req/s\n\
349             - Error Rate: {:.3}%\n\
350             - Cache Hit Rate: {:.1}%\n\
351             - Memory Usage: {:.1}MB\n\
352             - Quality Score: {:.3}",
353            metrics.latency.p95_latency_ms,
354            metrics.throughput.requests_per_second,
355            (metrics.errors.total_errors as f64 / metrics.throughput.total_requests.max(1) as f64)
356                * 100.0,
357            metrics.cache.hit_rate * 100.0,
358            metrics.resources.memory_usage_mb,
359            metrics.quality.avg_quality_score
360        )
361    }
362
363    // ---- Background tasks ----
364
365    async fn start_metrics_collection(&self) -> JoinHandle<()> {
366        let metrics = Arc::clone(&self.metrics);
367        let interval = Duration::from_secs(self.config.collection_interval_seconds);
368
369        tokio::spawn(async move {
370            let mut interval_timer = tokio::time::interval(interval);
371            loop {
372                interval_timer.tick().await;
373                let system_metrics = Self::collect_system_metrics().await;
374                {
375                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
376                    metrics.resources = system_metrics;
377                }
378                debug!("Collected system metrics");
379            }
380        })
381    }
382
383    async fn start_drift_detection(&self) -> JoinHandle<()> {
384        let metrics = Arc::clone(&self.metrics);
385        let quality_history = Arc::clone(&self.quality_history);
386        let interval = Duration::from_secs(self.config.drift_detection_interval_seconds);
387
388        tokio::spawn(async move {
389            let mut interval_timer = tokio::time::interval(interval);
390            loop {
391                interval_timer.tick().await;
392                let drift_metrics = Self::detect_drift(&quality_history).await;
393                {
394                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
395                    metrics.drift = drift_metrics;
396                    metrics.drift.last_drift_check = Utc::now();
397                }
398                info!("Performed drift detection analysis");
399            }
400        })
401    }
402
403    async fn start_quality_assessment(&self) -> JoinHandle<()> {
404        let metrics = Arc::clone(&self.metrics);
405        let quality_history = Arc::clone(&self.quality_history);
406        let interval = Duration::from_secs(self.config.quality_assessment_interval_seconds);
407
408        tokio::spawn(async move {
409            let mut interval_timer = tokio::time::interval(interval);
410            loop {
411                interval_timer.tick().await;
412                let quality_assessment = Self::assess_quality().await;
413                {
414                    let mut history = quality_history.lock().await;
415                    if history.len() >= 100 {
416                        history.pop_front();
417                    }
418                    history.push_back(quality_assessment.clone());
419                }
420                {
421                    let mut metrics = metrics.write().expect("rwlock should not be poisoned");
422                    metrics.quality.avg_quality_score = quality_assessment.quality_score;
423                    metrics.quality.last_assessment = quality_assessment.timestamp;
424
425                    for (key, value) in &quality_assessment.metrics {
426                        match key.as_str() {
427                            "isotropy" => metrics.quality.isotropy_score = *value,
428                            "neighborhood_preservation" => {
429                                metrics.quality.neighborhood_preservation = *value
430                            }
431                            "clustering_quality" => metrics.quality.clustering_quality = *value,
432                            "similarity_correlation" => {
433                                metrics.quality.similarity_correlation = *value
434                            }
435                            _ => {}
436                        }
437                    }
438                }
439                info!(
440                    "Performed quality assessment: score = {:.3}",
441                    quality_assessment.quality_score
442                );
443            }
444        })
445    }
446
447    async fn start_metrics_export(&self) -> JoinHandle<()> {
448        let metrics = Arc::clone(&self.metrics);
449        let export_config = self.config.export_config.clone();
450        let interval = Duration::from_secs(export_config.export_interval_seconds);
451
452        tokio::spawn(async move {
453            let mut interval_timer = tokio::time::interval(interval);
454            loop {
455                interval_timer.tick().await;
456                let current_metrics = metrics
457                    .read()
458                    .expect("rwlock should not be poisoned")
459                    .clone();
460
461                if export_config.enable_prometheus {
462                    Self::export_prometheus_metrics(&current_metrics).await;
463                }
464                if export_config.enable_json_export {
465                    if let Some(ref path) = export_config.json_export_path {
466                        Self::export_json_metrics(&current_metrics, path).await;
467                    }
468                }
469                debug!("Exported metrics");
470            }
471        })
472    }
473
474    // ---- Internal helpers ----
475
476    async fn collect_system_metrics() -> ResourceMetrics {
477        let mut random = Random::default();
478        ResourceMetrics {
479            cpu_utilization_percent: random.random::<f64>() * 100.0,
480            memory_usage_mb: 1024.0 + random.random::<f64>() * 2048.0,
481            gpu_utilization_percent: random.random::<f64>() * 100.0,
482            gpu_memory_usage_mb: 2048.0 + random.random::<f64>() * 4096.0,
483            network_io_mbps: random.random::<f64>() * 100.0,
484            disk_io_mbps: random.random::<f64>() * 50.0,
485            peak_memory_mb: 3072.0,
486            peak_gpu_memory_mb: 6144.0,
487        }
488    }
489
490    async fn detect_drift(
491        quality_history: &Arc<Mutex<VecDeque<QualityAssessment>>>,
492    ) -> DriftMetrics {
493        let history = quality_history.lock().await;
494
495        if history.len() < 2 {
496            return DriftMetrics::default();
497        }
498
499        let recent_quality = history
500            .back()
501            .expect("quality history should not be empty")
502            .quality_score;
503        let baseline_quality = history
504            .front()
505            .expect("quality history should not be empty")
506            .quality_score;
507        let quality_drift = (recent_quality - baseline_quality).abs() / baseline_quality;
508
509        let mut random = Random::default();
510        DriftMetrics {
511            quality_drift_score: quality_drift,
512            performance_drift_score: random.random::<f64>() * 0.1,
513            distribution_shift: quality_drift > 0.1,
514            concept_drift_score: random.random::<f64>() * 0.05,
515            data_quality_issues: if quality_drift > 0.2 { 1 } else { 0 },
516            drift_alerts: if quality_drift > 0.15 { 1 } else { 0 },
517            last_drift_check: Utc::now(),
518        }
519    }
520
521    async fn assess_quality() -> QualityAssessment {
522        let mut random = Random::default();
523        let quality_score = 0.8 + random.random::<f64>() * 0.2;
524
525        let mut metrics = HashMap::new();
526        metrics.insert("isotropy".to_string(), 0.7 + random.random::<f64>() * 0.3);
527        metrics.insert(
528            "neighborhood_preservation".to_string(),
529            0.8 + random.random::<f64>() * 0.2,
530        );
531        metrics.insert(
532            "clustering_quality".to_string(),
533            0.75 + random.random::<f64>() * 0.25,
534        );
535        metrics.insert(
536            "similarity_correlation".to_string(),
537            0.85 + random.random::<f64>() * 0.15,
538        );
539
540        QualityAssessment {
541            timestamp: Utc::now(),
542            quality_score,
543            metrics,
544            assessment_details: format!(
545                "Quality assessment completed with score: {quality_score:.3}"
546            ),
547        }
548    }
549
550    async fn export_prometheus_metrics(metrics: &PerformanceMetrics) {
551        debug!(
552            "Exporting Prometheus metrics: P95 latency = {:.2}ms",
553            metrics.latency.p95_latency_ms
554        );
555    }
556
557    async fn export_json_metrics(metrics: &PerformanceMetrics, path: &str) {
558        match serde_json::to_string_pretty(metrics) {
559            Ok(json) => {
560                if let Err(e) = tokio::fs::write(path, json).await {
561                    error!("Failed to export JSON metrics: {}", e);
562                }
563            }
564            Err(e) => error!("Failed to serialize metrics to JSON: {}", e),
565        }
566    }
567
568    async fn check_latency_alerts(&self, latency_ms: f64) {
569        if latency_ms > self.config.alert_thresholds.max_p95_latency_ms {
570            self.send_alert(Alert {
571                alert_type: AlertType::HighLatency,
572                message: format!("High latency detected: {latency_ms:.2}ms"),
573                severity: AlertSeverity::Warning,
574                timestamp: Utc::now(),
575                metrics: HashMap::from([
576                    ("latency_ms".to_string(), latency_ms),
577                    (
578                        "threshold_ms".to_string(),
579                        self.config.alert_thresholds.max_p95_latency_ms,
580                    ),
581                ]),
582            })
583            .await;
584        }
585    }
586
587    async fn check_throughput_alerts(&self, throughput_rps: f64) {
588        if throughput_rps < self.config.alert_thresholds.min_throughput_rps {
589            self.send_alert(Alert {
590                alert_type: AlertType::LowThroughput,
591                message: format!("Low throughput detected: {throughput_rps:.2} req/s"),
592                severity: AlertSeverity::Warning,
593                timestamp: Utc::now(),
594                metrics: HashMap::from([
595                    ("throughput_rps".to_string(), throughput_rps),
596                    (
597                        "threshold_rps".to_string(),
598                        self.config.alert_thresholds.min_throughput_rps,
599                    ),
600                ]),
601            })
602            .await;
603        }
604    }
605
606    async fn check_resource_alerts(&self, resources: ResourceMetrics) {
607        if resources.memory_usage_mb > self.config.alert_thresholds.max_memory_usage_mb {
608            self.send_alert(Alert {
609                alert_type: AlertType::ResourceExhaustion,
610                message: format!("High memory usage: {:.1}MB", resources.memory_usage_mb),
611                severity: AlertSeverity::Critical,
612                timestamp: Utc::now(),
613                metrics: HashMap::from([
614                    ("memory_mb".to_string(), resources.memory_usage_mb),
615                    (
616                        "threshold_mb".to_string(),
617                        self.config.alert_thresholds.max_memory_usage_mb,
618                    ),
619                ]),
620            })
621            .await;
622        }
623
624        if resources.gpu_memory_usage_mb > self.config.alert_thresholds.max_gpu_memory_mb {
625            self.send_alert(Alert {
626                alert_type: AlertType::ResourceExhaustion,
627                message: format!(
628                    "High GPU memory usage: {:.1}MB",
629                    resources.gpu_memory_usage_mb
630                ),
631                severity: AlertSeverity::Critical,
632                timestamp: Utc::now(),
633                metrics: HashMap::from([
634                    ("gpu_memory_mb".to_string(), resources.gpu_memory_usage_mb),
635                    (
636                        "threshold_mb".to_string(),
637                        self.config.alert_thresholds.max_gpu_memory_mb,
638                    ),
639                ]),
640            })
641            .await;
642        }
643    }
644
645    async fn send_alert(&self, alert: Alert) {
646        warn!(
647            "Alert triggered: {:?} - {}",
648            alert.alert_type, alert.message
649        );
650        for handler in &self.alert_handlers {
651            if let Err(e) = handler.handle_alert(alert.clone()) {
652                error!("Alert handler failed: {}", e);
653            }
654        }
655    }
656
657    async fn handle_critical_error(&self, error_event: ErrorEvent) {
658        error!(
659            "Critical error occurred: {} - {}",
660            error_event.error_type, error_event.error_message
661        );
662        self.send_alert(Alert {
663            alert_type: AlertType::SystemFailure,
664            message: format!("Critical error: {}", error_event.error_message),
665            severity: AlertSeverity::Emergency,
666            timestamp: error_event.timestamp,
667            metrics: HashMap::new(),
668        })
669        .await;
670    }
671}