Skip to main content

oxirs_vec/
enhanced_performance_monitoring.rs

1//! Enhanced Performance Monitoring and Analytics System
2//!
3//! This module provides comprehensive performance monitoring, analytics dashboard,
4//! and quality assurance metrics for the vector search engine.
5
6use anyhow::{anyhow, Result};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
13/// Configuration for performance monitoring
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct MonitoringConfig {
16    /// Enable real-time monitoring
17    pub enable_real_time: bool,
18    /// Enable detailed query logging
19    pub enable_query_logging: bool,
20    /// Enable system resource monitoring
21    pub enable_system_monitoring: bool,
22    /// Enable quality metrics collection
23    pub enable_quality_metrics: bool,
24    /// Metrics retention period
25    pub retention_period: Duration,
26    /// Sampling rate for metrics (0.0 to 1.0)
27    pub sampling_rate: f32,
28    /// Alert thresholds
29    pub alert_thresholds: AlertThresholds,
30    /// Export configuration
31    pub export_config: ExportConfig,
32}
33
34impl Default for MonitoringConfig {
35    fn default() -> Self {
36        Self {
37            enable_real_time: true,
38            enable_query_logging: true,
39            enable_system_monitoring: true,
40            enable_quality_metrics: true,
41            retention_period: Duration::from_secs(24 * 60 * 60), // 24 hours
42            sampling_rate: 1.0,
43            alert_thresholds: AlertThresholds::default(),
44            export_config: ExportConfig::default(),
45        }
46    }
47}
48
49/// Alert threshold configuration
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct AlertThresholds {
52    /// Maximum acceptable query latency (ms)
53    pub max_query_latency: f64,
54    /// Maximum acceptable error rate (0.0 to 1.0)
55    pub max_error_rate: f32,
56    /// Minimum acceptable recall@10
57    pub min_recall_at_10: f32,
58    /// Maximum memory usage (MB)
59    pub max_memory_usage: u64,
60    /// Maximum CPU usage (0.0 to 1.0)
61    pub max_cpu_usage: f32,
62}
63
64impl Default for AlertThresholds {
65    fn default() -> Self {
66        Self {
67            max_query_latency: 1000.0, // 1 second
68            max_error_rate: 0.05,      // 5%
69            min_recall_at_10: 0.90,    // 90%
70            max_memory_usage: 8192,    // 8GB
71            max_cpu_usage: 0.80,       // 80%
72        }
73    }
74}
75
76/// Export configuration for metrics
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ExportConfig {
79    /// Export format
80    pub format: ExportFormat,
81    /// Export interval
82    pub export_interval: Duration,
83    /// Export destination
84    pub destination: ExportDestination,
85    /// Include detailed metrics
86    pub include_detailed: bool,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub enum ExportFormat {
91    JSON,
92    CSV,
93    Prometheus,
94    InfluxDB,
95    ElasticSearch,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum ExportDestination {
100    File(String),
101    Http(String),
102    Database(String),
103    Console,
104}
105
106impl Default for ExportConfig {
107    fn default() -> Self {
108        Self {
109            format: ExportFormat::JSON,
110            export_interval: Duration::from_secs(60),
111            destination: ExportDestination::Console,
112            include_detailed: false,
113        }
114    }
115}
116
117/// Comprehensive performance monitor
118pub struct EnhancedPerformanceMonitor {
119    config: MonitoringConfig,
120    query_metrics: Arc<RwLock<QueryMetricsCollector>>,
121    system_metrics: Arc<RwLock<SystemMetricsCollector>>,
122    quality_metrics: Arc<RwLock<QualityMetricsCollector>>,
123    alert_manager: AlertManager,
124    analytics_engine: AnalyticsEngine,
125    dashboard_data: Arc<RwLock<DashboardData>>,
126}
127
128impl EnhancedPerformanceMonitor {
129    /// Create new performance monitor
130    pub fn new(config: MonitoringConfig) -> Self {
131        Self {
132            config: config.clone(),
133            query_metrics: Arc::new(RwLock::new(QueryMetricsCollector::new())),
134            system_metrics: Arc::new(RwLock::new(SystemMetricsCollector::new())),
135            quality_metrics: Arc::new(RwLock::new(QualityMetricsCollector::new())),
136            alert_manager: AlertManager::new(config.alert_thresholds.clone()),
137            analytics_engine: AnalyticsEngine::new(),
138            dashboard_data: Arc::new(RwLock::new(DashboardData::new())),
139        }
140    }
141
142    /// Record a query execution
143    pub fn record_query(&self, query_info: QueryInfo) {
144        if !self.config.enable_real_time {
145            return;
146        }
147
148        // Check sampling rate
149        if {
150            #[allow(unused_imports)]
151            use scirs2_core::random::{Random, Rng};
152            let mut rng = Random::seed(42);
153            rng.gen_range(0.0..1.0)
154        } > self.config.sampling_rate
155        {
156            return;
157        }
158
159        // Record query metrics
160        {
161            let mut metrics = self.query_metrics.write();
162            metrics.record_query(query_info.clone());
163        }
164
165        // Check for alerts
166        self.alert_manager.check_query_alerts(&query_info);
167
168        // Update dashboard data
169        {
170            let mut dashboard = self.dashboard_data.write();
171            dashboard.update_query_stats(&query_info);
172        }
173
174        // Run analytics
175        self.analytics_engine.analyze_query(&query_info);
176    }
177
178    /// Record system metrics
179    pub fn record_system_metrics(&self, metrics: SystemMetrics) {
180        if !self.config.enable_system_monitoring {
181            return;
182        }
183
184        {
185            let mut collector = self.system_metrics.write();
186            collector.record_metrics(metrics.clone());
187        }
188
189        // Check for system alerts
190        self.alert_manager.check_system_alerts(&metrics);
191
192        // Update dashboard
193        {
194            let mut dashboard = self.dashboard_data.write();
195            dashboard.update_system_stats(&metrics);
196        }
197    }
198
199    /// Record quality metrics
200    pub fn record_quality_metrics(&self, metrics: QualityMetrics) {
201        if !self.config.enable_quality_metrics {
202            return;
203        }
204
205        {
206            let mut collector = self.quality_metrics.write();
207            collector.record_metrics(metrics.clone());
208        }
209
210        // Check for quality alerts
211        self.alert_manager.check_quality_alerts(&metrics);
212
213        // Update dashboard
214        {
215            let mut dashboard = self.dashboard_data.write();
216            dashboard.update_quality_stats(&metrics);
217        }
218    }
219
220    /// Get current dashboard data
221    pub fn get_dashboard_data(&self) -> DashboardData {
222        self.dashboard_data.read().clone()
223    }
224
225    /// Generate comprehensive analytics report
226    pub fn generate_analytics_report(&self) -> AnalyticsReport {
227        let query_stats = self.query_metrics.read().get_statistics();
228        let system_stats = self.system_metrics.read().get_statistics();
229        let quality_stats = self.quality_metrics.read().get_statistics();
230        let alerts = self.alert_manager.get_active_alerts();
231
232        AnalyticsReport {
233            timestamp: SystemTime::now(),
234            query_statistics: query_stats,
235            system_statistics: system_stats,
236            quality_statistics: quality_stats,
237            active_alerts: alerts,
238            trends: self.analytics_engine.get_trends(),
239            recommendations: self.analytics_engine.get_recommendations(),
240        }
241    }
242
243    /// Export metrics
244    pub fn export_metrics(&self) -> Result<String> {
245        let report = self.generate_analytics_report();
246
247        match self.config.export_config.format {
248            ExportFormat::JSON => serde_json::to_string_pretty(&report)
249                .map_err(|e| anyhow!("JSON serialization error: {}", e)),
250            ExportFormat::CSV => self.generate_csv_export(&report),
251            ExportFormat::Prometheus => self.generate_prometheus_export(&report),
252            ExportFormat::InfluxDB => self.generate_influxdb_export(&report),
253            ExportFormat::ElasticSearch => self.generate_elasticsearch_export(&report),
254        }
255    }
256
257    /// Generate CSV export
258    fn generate_csv_export(&self, report: &AnalyticsReport) -> Result<String> {
259        let mut csv = String::new();
260        csv.push_str("metric,value,timestamp\n");
261
262        csv.push_str(&format!(
263            "total_queries,{},{}\n",
264            report.query_statistics.total_queries,
265            report
266                .timestamp
267                .duration_since(UNIX_EPOCH)
268                .expect("SystemTime should be after UNIX_EPOCH")
269                .as_secs()
270        ));
271
272        csv.push_str(&format!(
273            "avg_latency,{:.2},{}\n",
274            report.query_statistics.average_latency.as_millis(),
275            report
276                .timestamp
277                .duration_since(UNIX_EPOCH)
278                .expect("SystemTime should be after UNIX_EPOCH")
279                .as_secs()
280        ));
281
282        Ok(csv)
283    }
284
285    /// Generate Prometheus export
286    fn generate_prometheus_export(&self, report: &AnalyticsReport) -> Result<String> {
287        let mut prometheus = String::new();
288
289        prometheus.push_str("# HELP vector_search_queries_total Total number of queries\n");
290        prometheus.push_str("# TYPE vector_search_queries_total counter\n");
291        prometheus.push_str(&format!(
292            "vector_search_queries_total {}\n",
293            report.query_statistics.total_queries
294        ));
295
296        prometheus.push_str("# HELP vector_search_latency_seconds Query latency in seconds\n");
297        prometheus.push_str("# TYPE vector_search_latency_seconds histogram\n");
298        prometheus.push_str(&format!(
299            "vector_search_latency_seconds {:.6}\n",
300            report.query_statistics.average_latency.as_secs_f64()
301        ));
302
303        Ok(prometheus)
304    }
305
306    /// Generate InfluxDB line protocol export.
307    ///
308    /// Line protocol format: `measurement[,tag_key=tag_val]* field_key=field_val[,…] [timestamp]`
309    fn generate_influxdb_export(&self, report: &AnalyticsReport) -> Result<String> {
310        let ts_ns = report
311            .timestamp
312            .duration_since(UNIX_EPOCH)
313            .map_err(|e| anyhow!("SystemTime before UNIX_EPOCH: {}", e))?
314            .as_nanos();
315
316        let mut output = String::new();
317
318        // Query statistics measurement
319        output.push_str(&format!(
320            "vector_search_queries total_queries={}u,successful_queries={}u,failed_queries={}u,cache_hit_rate={:.6},avg_latency_ms={:.6} {}\n",
321            report.query_statistics.total_queries,
322            report.query_statistics.successful_queries,
323            report.query_statistics.failed_queries,
324            report.query_statistics.cache_hit_rate,
325            report.query_statistics.average_latency.as_secs_f64() * 1000.0,
326            ts_ns,
327        ));
328
329        // System statistics measurement
330        output.push_str(&format!(
331            "vector_search_system cpu_usage={:.6},memory_usage={}u,peak_memory_usage={}u,peak_cpu_usage={:.6} {}\n",
332            report.system_statistics.current_cpu_usage,
333            report.system_statistics.current_memory_usage,
334            report.system_statistics.peak_memory_usage,
335            report.system_statistics.peak_cpu_usage,
336            ts_ns,
337        ));
338
339        // Quality statistics measurement
340        output.push_str(&format!(
341            "vector_search_quality precision_at_10={:.6},recall_at_10={:.6},f1_score={:.6},trend_precision={:.6},trend_recall={:.6} {}\n",
342            report.quality_statistics.average_precision_at_10,
343            report.quality_statistics.average_recall_at_10,
344            report.quality_statistics.average_f1_score,
345            report.quality_statistics.trend_precision,
346            report.quality_statistics.trend_recall,
347            ts_ns,
348        ));
349
350        Ok(output)
351    }
352
353    /// Generate Elasticsearch bulk-index JSON export.
354    ///
355    /// Each document follows the `{ "index": {} }\n{ ...fields... }\n` ndjson format.
356    fn generate_elasticsearch_export(&self, report: &AnalyticsReport) -> Result<String> {
357        let ts_secs = report
358            .timestamp
359            .duration_since(UNIX_EPOCH)
360            .map_err(|e| anyhow!("SystemTime before UNIX_EPOCH: {}", e))?
361            .as_secs();
362
363        let doc = serde_json::json!({
364            "@timestamp": ts_secs,
365            "query_statistics": {
366                "total_queries": report.query_statistics.total_queries,
367                "successful_queries": report.query_statistics.successful_queries,
368                "failed_queries": report.query_statistics.failed_queries,
369                "average_latency_ms": report.query_statistics.average_latency.as_secs_f64() * 1000.0,
370                "min_latency_ms": report.query_statistics.min_latency.as_secs_f64() * 1000.0,
371                "max_latency_ms": report.query_statistics.max_latency.as_secs_f64() * 1000.0,
372                "cache_hit_rate": report.query_statistics.cache_hit_rate,
373                "throughput_qps": report.query_statistics.throughput_qps,
374            },
375            "system_statistics": {
376                "cpu_usage": report.system_statistics.current_cpu_usage,
377                "peak_cpu_usage": report.system_statistics.peak_cpu_usage,
378                "memory_usage_bytes": report.system_statistics.current_memory_usage,
379                "peak_memory_usage_bytes": report.system_statistics.peak_memory_usage,
380            },
381            "quality_statistics": {
382                "precision_at_10": report.quality_statistics.average_precision_at_10,
383                "recall_at_10": report.quality_statistics.average_recall_at_10,
384                "f1_score": report.quality_statistics.average_f1_score,
385                "trend_precision": report.quality_statistics.trend_precision,
386                "trend_recall": report.quality_statistics.trend_recall,
387            },
388            "active_alerts": report.active_alerts.len(),
389            "recommendations": report.recommendations.len(),
390        });
391
392        let action_line = "{\"index\":{}}\n";
393        let doc_line = serde_json::to_string(&doc)
394            .map_err(|e| anyhow!("Elasticsearch JSON serialization error: {}", e))?;
395
396        Ok(format!("{}{}\n", action_line, doc_line))
397    }
398
399    /// Start background monitoring
400    pub fn start_background_monitoring(&self) {
401        // In a real implementation, this would start background threads
402        // for continuous system monitoring, metric collection, etc.
403    }
404
405    /// Stop monitoring
406    pub fn stop_monitoring(&self) {
407        // Clean shutdown of monitoring systems
408    }
409}
410
/// Description of one executed query, as handed to `record_query`.
#[derive(Clone, Debug)]
pub struct QueryInfo {
    /// Caller-assigned identifier.
    pub query_id: String,
    /// Kind of operation that was performed.
    pub query_type: QueryType,
    /// Query text, when there is one.
    pub query_text: Option<String>,
    /// Query vector dimensionality, when applicable.
    pub vector_dimensions: Option<usize>,
    /// The `k` parameter, when applicable.
    pub k_value: Option<usize>,
    /// Threshold parameter, when applicable.
    pub threshold: Option<f32>,
    /// Execution start; latency is measured as `end_time - start_time`.
    pub start_time: Instant,
    /// Execution end.
    pub end_time: Instant,
    /// Whether the query succeeded; drives the success/failure counters.
    pub success: bool,
    /// Error description for failed queries.
    pub error_message: Option<String>,
    /// Number of results produced.
    pub results_count: usize,
    /// Name of the index that served the query, if known.
    pub index_used: Option<String>,
    /// Whether the result was a cache hit; feeds the cache-hit rate.
    pub cache_hit: bool,
}

/// Category of a monitored query.
#[derive(Clone, Debug)]
pub enum QueryType {
    /// k-nearest-neighbour search.
    KNNSearch,
    /// Search by similarity threshold.
    ThresholdSearch,
    /// Direct similarity computation.
    SimilarityCalculation,
    /// Text-to-embedding conversion.
    TextEmbedding,
    /// Index modification.
    IndexUpdate,
    /// Any other operation, labelled by the caller.
    Custom(String),
}
438
439/// Query metrics collector
440#[derive(Debug)]
441pub struct QueryMetricsCollector {
442    queries: VecDeque<QueryInfo>,
443    statistics: QueryStatistics,
444    max_retention: usize,
445}
446
447impl Default for QueryMetricsCollector {
448    fn default() -> Self {
449        Self::new()
450    }
451}
452
453impl QueryMetricsCollector {
454    pub fn new() -> Self {
455        Self {
456            queries: VecDeque::new(),
457            statistics: QueryStatistics::default(),
458            max_retention: 10000, // Keep last 10k queries
459        }
460    }
461
462    pub fn record_query(&mut self, query: QueryInfo) {
463        let latency = query.end_time.duration_since(query.start_time);
464
465        // Update statistics
466        self.statistics.total_queries += 1;
467        if query.success {
468            self.statistics.successful_queries += 1;
469        } else {
470            self.statistics.failed_queries += 1;
471        }
472
473        // Update latency statistics
474        if self.statistics.total_queries == 1 {
475            self.statistics.average_latency = latency;
476            self.statistics.min_latency = latency;
477            self.statistics.max_latency = latency;
478        } else {
479            // Update running average
480            let total_time = self
481                .statistics
482                .average_latency
483                .mul_f64(self.statistics.total_queries as f64 - 1.0)
484                + latency;
485            self.statistics.average_latency =
486                total_time.div_f64(self.statistics.total_queries as f64);
487
488            if latency < self.statistics.min_latency {
489                self.statistics.min_latency = latency;
490            }
491            if latency > self.statistics.max_latency {
492                self.statistics.max_latency = latency;
493            }
494        }
495
496        // Update latency distribution
497        let latency_ms = latency.as_millis() as f64;
498        if latency_ms < 10.0 {
499            self.statistics.latency_distribution.p10 += 1;
500        } else if latency_ms < 50.0 {
501            self.statistics.latency_distribution.p50 += 1;
502        } else if latency_ms < 100.0 {
503            self.statistics.latency_distribution.p90 += 1;
504        } else if latency_ms < 500.0 {
505            self.statistics.latency_distribution.p95 += 1;
506        } else {
507            self.statistics.latency_distribution.p99 += 1;
508        }
509
510        // Cache hit rate
511        if query.cache_hit {
512            self.statistics.cache_hit_rate =
513                (self.statistics.cache_hit_rate * (self.statistics.total_queries - 1) as f32 + 1.0)
514                    / self.statistics.total_queries as f32;
515        } else {
516            self.statistics.cache_hit_rate = (self.statistics.cache_hit_rate
517                * (self.statistics.total_queries - 1) as f32)
518                / self.statistics.total_queries as f32;
519        }
520
521        // Store query for retention
522        self.queries.push_back(query);
523        if self.queries.len() > self.max_retention {
524            self.queries.pop_front();
525        }
526    }
527
528    pub fn get_statistics(&self) -> QueryStatistics {
529        self.statistics.clone()
530    }
531
532    pub fn get_recent_queries(&self, count: usize) -> Vec<&QueryInfo> {
533        self.queries.iter().rev().take(count).collect()
534    }
535}
536
537/// Query statistics
538#[derive(Debug, Clone, Default, Serialize, Deserialize)]
539pub struct QueryStatistics {
540    pub total_queries: u64,
541    pub successful_queries: u64,
542    pub failed_queries: u64,
543    pub average_latency: Duration,
544    pub min_latency: Duration,
545    pub max_latency: Duration,
546    pub latency_distribution: LatencyDistribution,
547    pub cache_hit_rate: f32,
548    pub throughput_qps: f32,
549}
550
551#[derive(Debug, Clone, Default, Serialize, Deserialize)]
552pub struct LatencyDistribution {
553    pub p10: u64, // < 10ms
554    pub p50: u64, // < 50ms
555    pub p90: u64, // < 100ms
556    pub p95: u64, // < 500ms
557    pub p99: u64, // >= 500ms
558}
559
560/// System metrics
561#[derive(Debug, Clone, Serialize, Deserialize)]
562pub struct SystemMetrics {
563    pub timestamp: SystemTime,
564    pub cpu_usage: f32,
565    pub memory_usage: u64, // bytes
566    pub memory_total: u64, // bytes
567    pub disk_usage: u64,   // bytes
568    pub disk_total: u64,   // bytes
569    pub network_in: u64,   // bytes/sec
570    pub network_out: u64,  // bytes/sec
571    pub open_file_descriptors: u32,
572    pub thread_count: u32,
573}
574
575/// System metrics collector
576#[derive(Debug)]
577pub struct SystemMetricsCollector {
578    metrics_history: VecDeque<SystemMetrics>,
579    statistics: SystemStatistics,
580    max_retention: usize,
581}
582
583impl Default for SystemMetricsCollector {
584    fn default() -> Self {
585        Self::new()
586    }
587}
588
589impl SystemMetricsCollector {
590    pub fn new() -> Self {
591        Self {
592            metrics_history: VecDeque::new(),
593            statistics: SystemStatistics::default(),
594            max_retention: 1440, // 24 hours of minute-by-minute data
595        }
596    }
597
598    pub fn record_metrics(&mut self, metrics: SystemMetrics) {
599        // Update statistics
600        self.statistics.current_cpu_usage = metrics.cpu_usage;
601        self.statistics.current_memory_usage = metrics.memory_usage;
602
603        if metrics.cpu_usage > self.statistics.peak_cpu_usage {
604            self.statistics.peak_cpu_usage = metrics.cpu_usage;
605        }
606
607        if metrics.memory_usage > self.statistics.peak_memory_usage {
608            self.statistics.peak_memory_usage = metrics.memory_usage;
609        }
610
611        // Store metrics
612        self.metrics_history.push_back(metrics);
613        if self.metrics_history.len() > self.max_retention {
614            self.metrics_history.pop_front();
615        }
616    }
617
618    pub fn get_statistics(&self) -> SystemStatistics {
619        self.statistics.clone()
620    }
621
622    pub fn get_recent_metrics(&self, count: usize) -> Vec<&SystemMetrics> {
623        self.metrics_history.iter().rev().take(count).collect()
624    }
625}
626
627/// System statistics
628#[derive(Debug, Clone, Default, Serialize, Deserialize)]
629pub struct SystemStatistics {
630    pub current_cpu_usage: f32,
631    pub peak_cpu_usage: f32,
632    pub current_memory_usage: u64,
633    pub peak_memory_usage: u64,
634    pub average_cpu_usage: f32,
635    pub average_memory_usage: u64,
636}
637
638/// Quality metrics
639#[derive(Debug, Clone, Serialize, Deserialize)]
640pub struct QualityMetrics {
641    pub timestamp: SystemTime,
642    pub precision_at_1: f32,
643    pub precision_at_5: f32,
644    pub precision_at_10: f32,
645    pub recall_at_1: f32,
646    pub recall_at_5: f32,
647    pub recall_at_10: f32,
648    pub f1_score: f32,
649    pub mrr: f32,  // Mean Reciprocal Rank
650    pub ndcg: f32, // Normalized Discounted Cumulative Gain
651    pub query_coverage: f32,
652    pub result_diversity: f32,
653}
654
655/// Quality metrics collector
656#[derive(Debug)]
657pub struct QualityMetricsCollector {
658    metrics_history: VecDeque<QualityMetrics>,
659    statistics: QualityStatistics,
660    max_retention: usize,
661}
662
663impl Default for QualityMetricsCollector {
664    fn default() -> Self {
665        Self::new()
666    }
667}
668
669impl QualityMetricsCollector {
670    pub fn new() -> Self {
671        Self {
672            metrics_history: VecDeque::new(),
673            statistics: QualityStatistics::default(),
674            max_retention: 1000,
675        }
676    }
677
678    pub fn record_metrics(&mut self, metrics: QualityMetrics) {
679        // Update running statistics
680        let count = self.metrics_history.len() as f32;
681        if count > 0.0 {
682            self.statistics.average_precision_at_10 =
683                (self.statistics.average_precision_at_10 * count + metrics.precision_at_10)
684                    / (count + 1.0);
685            self.statistics.average_recall_at_10 = (self.statistics.average_recall_at_10 * count
686                + metrics.recall_at_10)
687                / (count + 1.0);
688            self.statistics.average_f1_score =
689                (self.statistics.average_f1_score * count + metrics.f1_score) / (count + 1.0);
690        } else {
691            self.statistics.average_precision_at_10 = metrics.precision_at_10;
692            self.statistics.average_recall_at_10 = metrics.recall_at_10;
693            self.statistics.average_f1_score = metrics.f1_score;
694        }
695
696        // Store metrics
697        self.metrics_history.push_back(metrics);
698        if self.metrics_history.len() > self.max_retention {
699            self.metrics_history.pop_front();
700        }
701    }
702
703    pub fn get_statistics(&self) -> QualityStatistics {
704        self.statistics.clone()
705    }
706}
707
708/// Quality statistics
709#[derive(Debug, Clone, Default, Serialize, Deserialize)]
710pub struct QualityStatistics {
711    pub average_precision_at_10: f32,
712    pub average_recall_at_10: f32,
713    pub average_f1_score: f32,
714    pub trend_precision: f32,
715    pub trend_recall: f32,
716}
717
718/// Alert manager
719pub struct AlertManager {
720    thresholds: AlertThresholds,
721    active_alerts: Arc<RwLock<Vec<Alert>>>,
722}
723
724impl AlertManager {
725    pub fn new(thresholds: AlertThresholds) -> Self {
726        Self {
727            thresholds,
728            active_alerts: Arc::new(RwLock::new(Vec::new())),
729        }
730    }
731
732    pub fn check_query_alerts(&self, query: &QueryInfo) {
733        let latency_ms = query.end_time.duration_since(query.start_time).as_millis() as f64;
734
735        if latency_ms > self.thresholds.max_query_latency {
736            self.add_alert(Alert {
737                alert_type: AlertType::HighLatency,
738                severity: AlertSeverity::Warning,
739                message: format!(
740                    "Query latency {}ms exceeds threshold {}ms",
741                    latency_ms, self.thresholds.max_query_latency
742                ),
743                timestamp: SystemTime::now(),
744                source: "query_monitor".to_string(),
745            });
746        }
747    }
748
749    pub fn check_system_alerts(&self, metrics: &SystemMetrics) {
750        if metrics.cpu_usage > self.thresholds.max_cpu_usage {
751            self.add_alert(Alert {
752                alert_type: AlertType::HighCpuUsage,
753                severity: AlertSeverity::Warning,
754                message: format!(
755                    "CPU usage {:.1}% exceeds threshold {:.1}%",
756                    metrics.cpu_usage * 100.0,
757                    self.thresholds.max_cpu_usage * 100.0
758                ),
759                timestamp: SystemTime::now(),
760                source: "system_monitor".to_string(),
761            });
762        }
763
764        let memory_mb = metrics.memory_usage / (1024 * 1024);
765        if memory_mb > self.thresholds.max_memory_usage {
766            self.add_alert(Alert {
767                alert_type: AlertType::HighMemoryUsage,
768                severity: AlertSeverity::Critical,
769                message: format!(
770                    "Memory usage {}MB exceeds threshold {}MB",
771                    memory_mb, self.thresholds.max_memory_usage
772                ),
773                timestamp: SystemTime::now(),
774                source: "system_monitor".to_string(),
775            });
776        }
777    }
778
779    pub fn check_quality_alerts(&self, metrics: &QualityMetrics) {
780        if metrics.recall_at_10 < self.thresholds.min_recall_at_10 {
781            self.add_alert(Alert {
782                alert_type: AlertType::LowRecall,
783                severity: AlertSeverity::Warning,
784                message: format!(
785                    "Recall@10 {:.3} below threshold {:.3}",
786                    metrics.recall_at_10, self.thresholds.min_recall_at_10
787                ),
788                timestamp: SystemTime::now(),
789                source: "quality_monitor".to_string(),
790            });
791        }
792    }
793
794    fn add_alert(&self, alert: Alert) {
795        let mut alerts = self.active_alerts.write();
796        alerts.push(alert);
797
798        // Keep only recent alerts (last hour)
799        let cutoff = SystemTime::now() - Duration::from_secs(3600);
800        alerts.retain(|a| a.timestamp > cutoff);
801    }
802
803    pub fn get_active_alerts(&self) -> Vec<Alert> {
804        self.active_alerts.read().clone()
805    }
806}
807
808/// Alert information
809#[derive(Debug, Clone, Serialize, Deserialize)]
810pub struct Alert {
811    pub alert_type: AlertType,
812    pub severity: AlertSeverity,
813    pub message: String,
814    pub timestamp: SystemTime,
815    pub source: String,
816}
817
818#[derive(Debug, Clone, Serialize, Deserialize)]
819pub enum AlertType {
820    HighLatency,
821    HighCpuUsage,
822    HighMemoryUsage,
823    LowRecall,
824    HighErrorRate,
825    IndexCorruption,
826    ServiceDown,
827}
828
829#[derive(Debug, Clone, Serialize, Deserialize)]
830pub enum AlertSeverity {
831    Info,
832    Warning,
833    Critical,
834    Emergency,
835}
836
837/// Analytics engine for trend analysis and recommendations
838pub struct AnalyticsEngine {
839    trends: Arc<RwLock<HashMap<String, TrendData>>>,
840    recommendations: Arc<RwLock<Vec<Recommendation>>>,
841}
842
843impl Default for AnalyticsEngine {
844    fn default() -> Self {
845        Self::new()
846    }
847}
848
849impl AnalyticsEngine {
850    pub fn new() -> Self {
851        Self {
852            trends: Arc::new(RwLock::new(HashMap::new())),
853            recommendations: Arc::new(RwLock::new(Vec::new())),
854        }
855    }
856
857    pub fn analyze_query(&self, _query: &QueryInfo) {
858        // Placeholder for trend analysis
859        // In a full implementation, this would update trend data
860    }
861
862    pub fn get_trends(&self) -> HashMap<String, TrendData> {
863        self.trends.read().clone()
864    }
865
866    pub fn get_recommendations(&self) -> Vec<Recommendation> {
867        self.recommendations.read().clone()
868    }
869}
870
871/// Trend data for analytics
872#[derive(Debug, Clone, Serialize, Deserialize)]
873pub struct TrendData {
874    pub metric_name: String,
875    pub values: Vec<f64>,
876    pub timestamps: Vec<SystemTime>,
877    pub trend_direction: TrendDirection,
878    pub confidence: f32,
879}
880
881#[derive(Debug, Clone, Serialize, Deserialize)]
882pub enum TrendDirection {
883    Increasing,
884    Decreasing,
885    Stable,
886    Volatile,
887}
888
889/// Recommendation for system optimization
890#[derive(Debug, Clone, Serialize, Deserialize)]
891pub struct Recommendation {
892    pub category: RecommendationCategory,
893    pub priority: RecommendationPriority,
894    pub title: String,
895    pub description: String,
896    pub estimated_impact: String,
897    pub implementation_effort: String,
898}
899
900#[derive(Debug, Clone, Serialize, Deserialize)]
901pub enum RecommendationCategory {
902    Performance,
903    Quality,
904    ResourceOptimization,
905    Configuration,
906    Maintenance,
907}
908
909#[derive(Debug, Clone, Serialize, Deserialize)]
910pub enum RecommendationPriority {
911    Low,
912    Medium,
913    High,
914    Critical,
915}
916
/// Dashboard data structure
///
/// A serializable snapshot of the statistics shown on the monitoring dashboard.
///
/// NOTE(review): the `update_*` methods on this type currently only refresh
/// `last_updated`. The statistics fields therefore stay at their `Default`
/// values unless other code assigns them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
    /// Wall-clock time of construction or of the most recent `update_*` call.
    pub last_updated: SystemTime,
    /// Aggregated query statistics.
    pub query_stats: QueryStatistics,
    /// Aggregated system statistics.
    pub system_stats: SystemStatistics,
    /// Aggregated quality statistics.
    pub quality_stats: QualityStatistics,
    /// Presumably the number of active alerts (TODO confirm); starts at 0.
    pub alerts_count: usize,
    /// Raw metric series keyed by name. Unlike `AnalyticsReport::trends`, these
    /// are plain values without direction/confidence.
    pub trends: HashMap<String, Vec<f64>>,
}
927
928impl DashboardData {
929    pub fn new() -> Self {
930        Self {
931            last_updated: SystemTime::now(),
932            query_stats: QueryStatistics::default(),
933            system_stats: SystemStatistics::default(),
934            quality_stats: QualityStatistics::default(),
935            alerts_count: 0,
936            trends: HashMap::new(),
937        }
938    }
939
940    pub fn update_query_stats(&mut self, _query: &QueryInfo) {
941        // Update query statistics
942        self.last_updated = SystemTime::now();
943        // Additional updates would be implemented here
944    }
945
946    pub fn update_system_stats(&mut self, _metrics: &SystemMetrics) {
947        // Update system statistics
948        self.last_updated = SystemTime::now();
949        // Additional updates would be implemented here
950    }
951
952    pub fn update_quality_stats(&mut self, _metrics: &QualityMetrics) {
953        // Update quality statistics
954        self.last_updated = SystemTime::now();
955        // Additional updates would be implemented here
956    }
957}
958
959impl Default for DashboardData {
960    fn default() -> Self {
961        Self::new()
962    }
963}
964
/// Complete analytics report
///
/// A serializable, point-in-time bundle of query/system/quality statistics,
/// alerts, analyzed trends and optimization recommendations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
    /// When the report was assembled (presumably generation time — confirm at the construction site).
    pub timestamp: SystemTime,
    /// Aggregated query statistics.
    pub query_statistics: QueryStatistics,
    /// Aggregated system statistics.
    pub system_statistics: SystemStatistics,
    /// Aggregated quality statistics.
    pub quality_statistics: QualityStatistics,
    /// Alerts included in the report; presumably those active at report time.
    pub active_alerts: Vec<Alert>,
    /// Analyzed trend series, including direction and confidence.
    pub trends: HashMap<String, TrendData>,
    /// Optimization recommendations.
    pub recommendations: Vec<Recommendation>,
}
976
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful KNN query whose recorded span
    /// (`end_time - start_time`) is at least `latency_ms` milliseconds.
    ///
    /// Optional descriptive fields are left empty; each test overrides the
    /// ones it cares about with struct-update syntax.
    fn knn_query(query_id: &str, latency_ms: u64) -> QueryInfo {
        QueryInfo {
            query_id: query_id.to_string(),
            query_type: QueryType::KNNSearch,
            query_text: None,
            vector_dimensions: None,
            k_value: None,
            threshold: None,
            start_time: Instant::now() - Duration::from_millis(latency_ms),
            end_time: Instant::now(),
            success: true,
            error_message: None,
            results_count: 0,
            index_used: None,
            cache_hit: false,
        }
    }

    /// Builds a monitor with default settings except for the export format.
    fn monitor_exporting(format: ExportFormat) -> EnhancedPerformanceMonitor {
        let mut config = MonitoringConfig::default();
        config.export_config.format = format;
        EnhancedPerformanceMonitor::new(config)
    }

    #[test]
    fn test_query_metrics_collection() {
        let mut collector = QueryMetricsCollector::new();

        collector.record_query(QueryInfo {
            query_text: Some("test query".to_string()),
            vector_dimensions: Some(384),
            k_value: Some(10),
            results_count: 5,
            index_used: Some("hnsw".to_string()),
            ..knn_query("test_query", 50)
        });

        let stats = collector.get_statistics();
        assert_eq!(
            (stats.total_queries, stats.successful_queries, stats.failed_queries),
            (1, 1, 0)
        );
    }

    #[test]
    fn test_alert_generation() {
        let alert_manager = AlertManager::new(AlertThresholds {
            max_query_latency: 100.0,
            max_error_rate: 0.05,
            min_recall_at_10: 0.90,
            max_memory_usage: 1024,
            max_cpu_usage: 0.80,
        });

        // A 200 ms span exceeds the 100 ms latency threshold configured above.
        let slow_query = QueryInfo {
            results_count: 5,
            ..knn_query("slow_query", 200)
        };
        alert_manager.check_query_alerts(&slow_query);

        let alerts = alert_manager.get_active_alerts();
        assert_eq!(alerts.len(), 1);
        assert!(matches!(alerts[0].alert_type, AlertType::HighLatency));
    }

    #[test]
    fn test_performance_monitor() {
        let monitor = EnhancedPerformanceMonitor::new(MonitoringConfig::default());

        monitor.record_query(QueryInfo {
            vector_dimensions: Some(384),
            k_value: Some(10),
            results_count: 8,
            index_used: Some("hnsw".to_string()),
            cache_hit: true,
            ..knn_query("test", 25)
        });

        let dashboard = monitor.get_dashboard_data();
        assert!(SystemTime::now() >= dashboard.last_updated);
    }

    #[test]
    fn test_influxdb_export_format() {
        let monitor = monitor_exporting(ExportFormat::InfluxDB);

        let exported = monitor.export_metrics();
        assert!(exported.is_ok(), "InfluxDB export should succeed");
        let output = exported.unwrap();

        let required_fragments = [
            ("vector_search_queries", "Should contain query measurement"),
            ("total_queries=", "Should contain total_queries field"),
            ("vector_search_system", "Should contain system measurement"),
            ("vector_search_quality", "Should contain quality measurement"),
        ];
        for (fragment, message) in required_fragments {
            assert!(output.contains(fragment), "{}", message);
        }

        // InfluxDB line protocol: "<measurement> <fields> <timestamp>" per line.
        for line in output.lines() {
            let section_count = line.splitn(3, ' ').count();
            assert_eq!(
                section_count,
                3,
                "Each InfluxDB line must have measurement, fields, and timestamp: {:?}",
                line
            );
        }
    }

    #[test]
    fn test_elasticsearch_export_format() {
        let monitor = monitor_exporting(ExportFormat::ElasticSearch);

        let exported = monitor.export_metrics();
        assert!(exported.is_ok(), "ElasticSearch export should succeed");
        let output = exported.unwrap();

        // Elasticsearch bulk format: an action line followed by a document line.
        let lines: Vec<&str> = output.lines().collect();
        assert!(
            lines.len() >= 2,
            "Should have at least action and doc lines"
        );
        let (action_line, document_line) = (lines[0], lines[1]);
        assert_eq!(
            action_line,
            r#"{"index":{}}"#,
            "First line must be bulk action"
        );

        let document: serde_json::Value = serde_json::from_str(document_line)
            .expect("Elasticsearch document line must be valid JSON");
        let required_keys = [
            ("@timestamp", "Document must contain @timestamp"),
            ("query_statistics", "Document must contain query_statistics"),
            ("system_statistics", "Document must contain system_statistics"),
        ];
        for (key, message) in required_keys {
            assert!(document.get(key).is_some(), "{}", message);
        }
    }

    #[test]
    fn test_export_json_format() {
        // The default export format is JSON.
        let monitor = EnhancedPerformanceMonitor::new(MonitoringConfig::default());

        let exported = monitor.export_metrics();
        assert!(exported.is_ok(), "JSON export should succeed");
        let output = exported.unwrap();

        let document: serde_json::Value =
            serde_json::from_str(&output).expect("JSON export must be valid JSON");
        assert!(document.get("query_statistics").is_some());
    }
}