1use anyhow::{anyhow, Result};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct MonitoringConfig {
16 pub enable_real_time: bool,
18 pub enable_query_logging: bool,
20 pub enable_system_monitoring: bool,
22 pub enable_quality_metrics: bool,
24 pub retention_period: Duration,
26 pub sampling_rate: f32,
28 pub alert_thresholds: AlertThresholds,
30 pub export_config: ExportConfig,
32}
33
34impl Default for MonitoringConfig {
35 fn default() -> Self {
36 Self {
37 enable_real_time: true,
38 enable_query_logging: true,
39 enable_system_monitoring: true,
40 enable_quality_metrics: true,
41 retention_period: Duration::from_secs(24 * 60 * 60), sampling_rate: 1.0,
43 alert_thresholds: AlertThresholds::default(),
44 export_config: ExportConfig::default(),
45 }
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct AlertThresholds {
52 pub max_query_latency: f64,
54 pub max_error_rate: f32,
56 pub min_recall_at_10: f32,
58 pub max_memory_usage: u64,
60 pub max_cpu_usage: f32,
62}
63
64impl Default for AlertThresholds {
65 fn default() -> Self {
66 Self {
67 max_query_latency: 1000.0, max_error_rate: 0.05, min_recall_at_10: 0.90, max_memory_usage: 8192, max_cpu_usage: 0.80, }
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ExportConfig {
79 pub format: ExportFormat,
81 pub export_interval: Duration,
83 pub destination: ExportDestination,
85 pub include_detailed: bool,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub enum ExportFormat {
91 JSON,
92 CSV,
93 Prometheus,
94 InfluxDB,
95 ElasticSearch,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum ExportDestination {
100 File(String),
101 Http(String),
102 Database(String),
103 Console,
104}
105
106impl Default for ExportConfig {
107 fn default() -> Self {
108 Self {
109 format: ExportFormat::JSON,
110 export_interval: Duration::from_secs(60),
111 destination: ExportDestination::Console,
112 include_detailed: false,
113 }
114 }
115}
116
117pub struct EnhancedPerformanceMonitor {
119 config: MonitoringConfig,
120 query_metrics: Arc<RwLock<QueryMetricsCollector>>,
121 system_metrics: Arc<RwLock<SystemMetricsCollector>>,
122 quality_metrics: Arc<RwLock<QualityMetricsCollector>>,
123 alert_manager: AlertManager,
124 analytics_engine: AnalyticsEngine,
125 dashboard_data: Arc<RwLock<DashboardData>>,
126}
127
128impl EnhancedPerformanceMonitor {
129 pub fn new(config: MonitoringConfig) -> Self {
131 Self {
132 config: config.clone(),
133 query_metrics: Arc::new(RwLock::new(QueryMetricsCollector::new())),
134 system_metrics: Arc::new(RwLock::new(SystemMetricsCollector::new())),
135 quality_metrics: Arc::new(RwLock::new(QualityMetricsCollector::new())),
136 alert_manager: AlertManager::new(config.alert_thresholds.clone()),
137 analytics_engine: AnalyticsEngine::new(),
138 dashboard_data: Arc::new(RwLock::new(DashboardData::new())),
139 }
140 }
141
142 pub fn record_query(&self, query_info: QueryInfo) {
144 if !self.config.enable_real_time {
145 return;
146 }
147
148 if {
150 #[allow(unused_imports)]
151 use scirs2_core::random::{Random, Rng};
152 let mut rng = Random::seed(42);
153 rng.gen_range(0.0..1.0)
154 } > self.config.sampling_rate
155 {
156 return;
157 }
158
159 {
161 let mut metrics = self.query_metrics.write();
162 metrics.record_query(query_info.clone());
163 }
164
165 self.alert_manager.check_query_alerts(&query_info);
167
168 {
170 let mut dashboard = self.dashboard_data.write();
171 dashboard.update_query_stats(&query_info);
172 }
173
174 self.analytics_engine.analyze_query(&query_info);
176 }
177
178 pub fn record_system_metrics(&self, metrics: SystemMetrics) {
180 if !self.config.enable_system_monitoring {
181 return;
182 }
183
184 {
185 let mut collector = self.system_metrics.write();
186 collector.record_metrics(metrics.clone());
187 }
188
189 self.alert_manager.check_system_alerts(&metrics);
191
192 {
194 let mut dashboard = self.dashboard_data.write();
195 dashboard.update_system_stats(&metrics);
196 }
197 }
198
199 pub fn record_quality_metrics(&self, metrics: QualityMetrics) {
201 if !self.config.enable_quality_metrics {
202 return;
203 }
204
205 {
206 let mut collector = self.quality_metrics.write();
207 collector.record_metrics(metrics.clone());
208 }
209
210 self.alert_manager.check_quality_alerts(&metrics);
212
213 {
215 let mut dashboard = self.dashboard_data.write();
216 dashboard.update_quality_stats(&metrics);
217 }
218 }
219
220 pub fn get_dashboard_data(&self) -> DashboardData {
222 self.dashboard_data.read().clone()
223 }
224
225 pub fn generate_analytics_report(&self) -> AnalyticsReport {
227 let query_stats = self.query_metrics.read().get_statistics();
228 let system_stats = self.system_metrics.read().get_statistics();
229 let quality_stats = self.quality_metrics.read().get_statistics();
230 let alerts = self.alert_manager.get_active_alerts();
231
232 AnalyticsReport {
233 timestamp: SystemTime::now(),
234 query_statistics: query_stats,
235 system_statistics: system_stats,
236 quality_statistics: quality_stats,
237 active_alerts: alerts,
238 trends: self.analytics_engine.get_trends(),
239 recommendations: self.analytics_engine.get_recommendations(),
240 }
241 }
242
243 pub fn export_metrics(&self) -> Result<String> {
245 let report = self.generate_analytics_report();
246
247 match self.config.export_config.format {
248 ExportFormat::JSON => serde_json::to_string_pretty(&report)
249 .map_err(|e| anyhow!("JSON serialization error: {}", e)),
250 ExportFormat::CSV => self.generate_csv_export(&report),
251 ExportFormat::Prometheus => self.generate_prometheus_export(&report),
252 ExportFormat::InfluxDB => self.generate_influxdb_export(&report),
253 ExportFormat::ElasticSearch => self.generate_elasticsearch_export(&report),
254 }
255 }
256
257 fn generate_csv_export(&self, report: &AnalyticsReport) -> Result<String> {
259 let mut csv = String::new();
260 csv.push_str("metric,value,timestamp\n");
261
262 csv.push_str(&format!(
263 "total_queries,{},{}\n",
264 report.query_statistics.total_queries,
265 report
266 .timestamp
267 .duration_since(UNIX_EPOCH)
268 .expect("SystemTime should be after UNIX_EPOCH")
269 .as_secs()
270 ));
271
272 csv.push_str(&format!(
273 "avg_latency,{:.2},{}\n",
274 report.query_statistics.average_latency.as_millis(),
275 report
276 .timestamp
277 .duration_since(UNIX_EPOCH)
278 .expect("SystemTime should be after UNIX_EPOCH")
279 .as_secs()
280 ));
281
282 Ok(csv)
283 }
284
285 fn generate_prometheus_export(&self, report: &AnalyticsReport) -> Result<String> {
287 let mut prometheus = String::new();
288
289 prometheus.push_str("# HELP vector_search_queries_total Total number of queries\n");
290 prometheus.push_str("# TYPE vector_search_queries_total counter\n");
291 prometheus.push_str(&format!(
292 "vector_search_queries_total {}\n",
293 report.query_statistics.total_queries
294 ));
295
296 prometheus.push_str("# HELP vector_search_latency_seconds Query latency in seconds\n");
297 prometheus.push_str("# TYPE vector_search_latency_seconds histogram\n");
298 prometheus.push_str(&format!(
299 "vector_search_latency_seconds {:.6}\n",
300 report.query_statistics.average_latency.as_secs_f64()
301 ));
302
303 Ok(prometheus)
304 }
305
306 fn generate_influxdb_export(&self, report: &AnalyticsReport) -> Result<String> {
310 let ts_ns = report
311 .timestamp
312 .duration_since(UNIX_EPOCH)
313 .map_err(|e| anyhow!("SystemTime before UNIX_EPOCH: {}", e))?
314 .as_nanos();
315
316 let mut output = String::new();
317
318 output.push_str(&format!(
320 "vector_search_queries total_queries={}u,successful_queries={}u,failed_queries={}u,cache_hit_rate={:.6},avg_latency_ms={:.6} {}\n",
321 report.query_statistics.total_queries,
322 report.query_statistics.successful_queries,
323 report.query_statistics.failed_queries,
324 report.query_statistics.cache_hit_rate,
325 report.query_statistics.average_latency.as_secs_f64() * 1000.0,
326 ts_ns,
327 ));
328
329 output.push_str(&format!(
331 "vector_search_system cpu_usage={:.6},memory_usage={}u,peak_memory_usage={}u,peak_cpu_usage={:.6} {}\n",
332 report.system_statistics.current_cpu_usage,
333 report.system_statistics.current_memory_usage,
334 report.system_statistics.peak_memory_usage,
335 report.system_statistics.peak_cpu_usage,
336 ts_ns,
337 ));
338
339 output.push_str(&format!(
341 "vector_search_quality precision_at_10={:.6},recall_at_10={:.6},f1_score={:.6},trend_precision={:.6},trend_recall={:.6} {}\n",
342 report.quality_statistics.average_precision_at_10,
343 report.quality_statistics.average_recall_at_10,
344 report.quality_statistics.average_f1_score,
345 report.quality_statistics.trend_precision,
346 report.quality_statistics.trend_recall,
347 ts_ns,
348 ));
349
350 Ok(output)
351 }
352
353 fn generate_elasticsearch_export(&self, report: &AnalyticsReport) -> Result<String> {
357 let ts_secs = report
358 .timestamp
359 .duration_since(UNIX_EPOCH)
360 .map_err(|e| anyhow!("SystemTime before UNIX_EPOCH: {}", e))?
361 .as_secs();
362
363 let doc = serde_json::json!({
364 "@timestamp": ts_secs,
365 "query_statistics": {
366 "total_queries": report.query_statistics.total_queries,
367 "successful_queries": report.query_statistics.successful_queries,
368 "failed_queries": report.query_statistics.failed_queries,
369 "average_latency_ms": report.query_statistics.average_latency.as_secs_f64() * 1000.0,
370 "min_latency_ms": report.query_statistics.min_latency.as_secs_f64() * 1000.0,
371 "max_latency_ms": report.query_statistics.max_latency.as_secs_f64() * 1000.0,
372 "cache_hit_rate": report.query_statistics.cache_hit_rate,
373 "throughput_qps": report.query_statistics.throughput_qps,
374 },
375 "system_statistics": {
376 "cpu_usage": report.system_statistics.current_cpu_usage,
377 "peak_cpu_usage": report.system_statistics.peak_cpu_usage,
378 "memory_usage_bytes": report.system_statistics.current_memory_usage,
379 "peak_memory_usage_bytes": report.system_statistics.peak_memory_usage,
380 },
381 "quality_statistics": {
382 "precision_at_10": report.quality_statistics.average_precision_at_10,
383 "recall_at_10": report.quality_statistics.average_recall_at_10,
384 "f1_score": report.quality_statistics.average_f1_score,
385 "trend_precision": report.quality_statistics.trend_precision,
386 "trend_recall": report.quality_statistics.trend_recall,
387 },
388 "active_alerts": report.active_alerts.len(),
389 "recommendations": report.recommendations.len(),
390 });
391
392 let action_line = "{\"index\":{}}\n";
393 let doc_line = serde_json::to_string(&doc)
394 .map_err(|e| anyhow!("Elasticsearch JSON serialization error: {}", e))?;
395
396 Ok(format!("{}{}\n", action_line, doc_line))
397 }
398
399 pub fn start_background_monitoring(&self) {
401 }
404
405 pub fn stop_monitoring(&self) {
407 }
409}
410
/// Description of one completed query, as handed to the monitor.
#[derive(Debug, Clone)]
pub struct QueryInfo {
    /// Caller-supplied identifier.
    pub query_id: String,
    /// Kind of operation performed.
    pub query_type: QueryType,
    /// Original query text, if any.
    pub query_text: Option<String>,
    /// Dimensionality of the query vector, if any.
    pub vector_dimensions: Option<usize>,
    /// Requested `k` for k-NN searches, if any.
    pub k_value: Option<usize>,
    /// Similarity threshold for threshold searches, if any.
    pub threshold: Option<f32>,
    /// When the query began; latency is `end_time - start_time`.
    pub start_time: Instant,
    /// When the query finished.
    pub end_time: Instant,
    /// Whether the query completed successfully.
    pub success: bool,
    /// Failure description when `success` is `false`.
    pub error_message: Option<String>,
    /// Number of results returned.
    pub results_count: usize,
    /// Name of the index that served the query, if known.
    pub index_used: Option<String>,
    /// Whether the result came from a cache.
    pub cache_hit: bool,
}

/// Category of a recorded query.
#[derive(Debug, Clone)]
pub enum QueryType {
    KNNSearch,
    ThresholdSearch,
    SimilarityCalculation,
    TextEmbedding,
    IndexUpdate,
    /// Any other operation, labelled by the caller.
    Custom(String),
}
438
439#[derive(Debug)]
441pub struct QueryMetricsCollector {
442 queries: VecDeque<QueryInfo>,
443 statistics: QueryStatistics,
444 max_retention: usize,
445}
446
447impl Default for QueryMetricsCollector {
448 fn default() -> Self {
449 Self::new()
450 }
451}
452
453impl QueryMetricsCollector {
454 pub fn new() -> Self {
455 Self {
456 queries: VecDeque::new(),
457 statistics: QueryStatistics::default(),
458 max_retention: 10000, }
460 }
461
462 pub fn record_query(&mut self, query: QueryInfo) {
463 let latency = query.end_time.duration_since(query.start_time);
464
465 self.statistics.total_queries += 1;
467 if query.success {
468 self.statistics.successful_queries += 1;
469 } else {
470 self.statistics.failed_queries += 1;
471 }
472
473 if self.statistics.total_queries == 1 {
475 self.statistics.average_latency = latency;
476 self.statistics.min_latency = latency;
477 self.statistics.max_latency = latency;
478 } else {
479 let total_time = self
481 .statistics
482 .average_latency
483 .mul_f64(self.statistics.total_queries as f64 - 1.0)
484 + latency;
485 self.statistics.average_latency =
486 total_time.div_f64(self.statistics.total_queries as f64);
487
488 if latency < self.statistics.min_latency {
489 self.statistics.min_latency = latency;
490 }
491 if latency > self.statistics.max_latency {
492 self.statistics.max_latency = latency;
493 }
494 }
495
496 let latency_ms = latency.as_millis() as f64;
498 if latency_ms < 10.0 {
499 self.statistics.latency_distribution.p10 += 1;
500 } else if latency_ms < 50.0 {
501 self.statistics.latency_distribution.p50 += 1;
502 } else if latency_ms < 100.0 {
503 self.statistics.latency_distribution.p90 += 1;
504 } else if latency_ms < 500.0 {
505 self.statistics.latency_distribution.p95 += 1;
506 } else {
507 self.statistics.latency_distribution.p99 += 1;
508 }
509
510 if query.cache_hit {
512 self.statistics.cache_hit_rate =
513 (self.statistics.cache_hit_rate * (self.statistics.total_queries - 1) as f32 + 1.0)
514 / self.statistics.total_queries as f32;
515 } else {
516 self.statistics.cache_hit_rate = (self.statistics.cache_hit_rate
517 * (self.statistics.total_queries - 1) as f32)
518 / self.statistics.total_queries as f32;
519 }
520
521 self.queries.push_back(query);
523 if self.queries.len() > self.max_retention {
524 self.queries.pop_front();
525 }
526 }
527
528 pub fn get_statistics(&self) -> QueryStatistics {
529 self.statistics.clone()
530 }
531
532 pub fn get_recent_queries(&self, count: usize) -> Vec<&QueryInfo> {
533 self.queries.iter().rev().take(count).collect()
534 }
535}
536
537#[derive(Debug, Clone, Default, Serialize, Deserialize)]
539pub struct QueryStatistics {
540 pub total_queries: u64,
541 pub successful_queries: u64,
542 pub failed_queries: u64,
543 pub average_latency: Duration,
544 pub min_latency: Duration,
545 pub max_latency: Duration,
546 pub latency_distribution: LatencyDistribution,
547 pub cache_hit_rate: f32,
548 pub throughput_qps: f32,
549}
550
551#[derive(Debug, Clone, Default, Serialize, Deserialize)]
552pub struct LatencyDistribution {
553 pub p10: u64, pub p50: u64, pub p90: u64, pub p95: u64, pub p99: u64, }
559
560#[derive(Debug, Clone, Serialize, Deserialize)]
562pub struct SystemMetrics {
563 pub timestamp: SystemTime,
564 pub cpu_usage: f32,
565 pub memory_usage: u64, pub memory_total: u64, pub disk_usage: u64, pub disk_total: u64, pub network_in: u64, pub network_out: u64, pub open_file_descriptors: u32,
572 pub thread_count: u32,
573}
574
575#[derive(Debug)]
577pub struct SystemMetricsCollector {
578 metrics_history: VecDeque<SystemMetrics>,
579 statistics: SystemStatistics,
580 max_retention: usize,
581}
582
583impl Default for SystemMetricsCollector {
584 fn default() -> Self {
585 Self::new()
586 }
587}
588
589impl SystemMetricsCollector {
590 pub fn new() -> Self {
591 Self {
592 metrics_history: VecDeque::new(),
593 statistics: SystemStatistics::default(),
594 max_retention: 1440, }
596 }
597
598 pub fn record_metrics(&mut self, metrics: SystemMetrics) {
599 self.statistics.current_cpu_usage = metrics.cpu_usage;
601 self.statistics.current_memory_usage = metrics.memory_usage;
602
603 if metrics.cpu_usage > self.statistics.peak_cpu_usage {
604 self.statistics.peak_cpu_usage = metrics.cpu_usage;
605 }
606
607 if metrics.memory_usage > self.statistics.peak_memory_usage {
608 self.statistics.peak_memory_usage = metrics.memory_usage;
609 }
610
611 self.metrics_history.push_back(metrics);
613 if self.metrics_history.len() > self.max_retention {
614 self.metrics_history.pop_front();
615 }
616 }
617
618 pub fn get_statistics(&self) -> SystemStatistics {
619 self.statistics.clone()
620 }
621
622 pub fn get_recent_metrics(&self, count: usize) -> Vec<&SystemMetrics> {
623 self.metrics_history.iter().rev().take(count).collect()
624 }
625}
626
627#[derive(Debug, Clone, Default, Serialize, Deserialize)]
629pub struct SystemStatistics {
630 pub current_cpu_usage: f32,
631 pub peak_cpu_usage: f32,
632 pub current_memory_usage: u64,
633 pub peak_memory_usage: u64,
634 pub average_cpu_usage: f32,
635 pub average_memory_usage: u64,
636}
637
638#[derive(Debug, Clone, Serialize, Deserialize)]
640pub struct QualityMetrics {
641 pub timestamp: SystemTime,
642 pub precision_at_1: f32,
643 pub precision_at_5: f32,
644 pub precision_at_10: f32,
645 pub recall_at_1: f32,
646 pub recall_at_5: f32,
647 pub recall_at_10: f32,
648 pub f1_score: f32,
649 pub mrr: f32, pub ndcg: f32, pub query_coverage: f32,
652 pub result_diversity: f32,
653}
654
655#[derive(Debug)]
657pub struct QualityMetricsCollector {
658 metrics_history: VecDeque<QualityMetrics>,
659 statistics: QualityStatistics,
660 max_retention: usize,
661}
662
663impl Default for QualityMetricsCollector {
664 fn default() -> Self {
665 Self::new()
666 }
667}
668
669impl QualityMetricsCollector {
670 pub fn new() -> Self {
671 Self {
672 metrics_history: VecDeque::new(),
673 statistics: QualityStatistics::default(),
674 max_retention: 1000,
675 }
676 }
677
678 pub fn record_metrics(&mut self, metrics: QualityMetrics) {
679 let count = self.metrics_history.len() as f32;
681 if count > 0.0 {
682 self.statistics.average_precision_at_10 =
683 (self.statistics.average_precision_at_10 * count + metrics.precision_at_10)
684 / (count + 1.0);
685 self.statistics.average_recall_at_10 = (self.statistics.average_recall_at_10 * count
686 + metrics.recall_at_10)
687 / (count + 1.0);
688 self.statistics.average_f1_score =
689 (self.statistics.average_f1_score * count + metrics.f1_score) / (count + 1.0);
690 } else {
691 self.statistics.average_precision_at_10 = metrics.precision_at_10;
692 self.statistics.average_recall_at_10 = metrics.recall_at_10;
693 self.statistics.average_f1_score = metrics.f1_score;
694 }
695
696 self.metrics_history.push_back(metrics);
698 if self.metrics_history.len() > self.max_retention {
699 self.metrics_history.pop_front();
700 }
701 }
702
703 pub fn get_statistics(&self) -> QualityStatistics {
704 self.statistics.clone()
705 }
706}
707
708#[derive(Debug, Clone, Default, Serialize, Deserialize)]
710pub struct QualityStatistics {
711 pub average_precision_at_10: f32,
712 pub average_recall_at_10: f32,
713 pub average_f1_score: f32,
714 pub trend_precision: f32,
715 pub trend_recall: f32,
716}
717
718pub struct AlertManager {
720 thresholds: AlertThresholds,
721 active_alerts: Arc<RwLock<Vec<Alert>>>,
722}
723
724impl AlertManager {
725 pub fn new(thresholds: AlertThresholds) -> Self {
726 Self {
727 thresholds,
728 active_alerts: Arc::new(RwLock::new(Vec::new())),
729 }
730 }
731
732 pub fn check_query_alerts(&self, query: &QueryInfo) {
733 let latency_ms = query.end_time.duration_since(query.start_time).as_millis() as f64;
734
735 if latency_ms > self.thresholds.max_query_latency {
736 self.add_alert(Alert {
737 alert_type: AlertType::HighLatency,
738 severity: AlertSeverity::Warning,
739 message: format!(
740 "Query latency {}ms exceeds threshold {}ms",
741 latency_ms, self.thresholds.max_query_latency
742 ),
743 timestamp: SystemTime::now(),
744 source: "query_monitor".to_string(),
745 });
746 }
747 }
748
749 pub fn check_system_alerts(&self, metrics: &SystemMetrics) {
750 if metrics.cpu_usage > self.thresholds.max_cpu_usage {
751 self.add_alert(Alert {
752 alert_type: AlertType::HighCpuUsage,
753 severity: AlertSeverity::Warning,
754 message: format!(
755 "CPU usage {:.1}% exceeds threshold {:.1}%",
756 metrics.cpu_usage * 100.0,
757 self.thresholds.max_cpu_usage * 100.0
758 ),
759 timestamp: SystemTime::now(),
760 source: "system_monitor".to_string(),
761 });
762 }
763
764 let memory_mb = metrics.memory_usage / (1024 * 1024);
765 if memory_mb > self.thresholds.max_memory_usage {
766 self.add_alert(Alert {
767 alert_type: AlertType::HighMemoryUsage,
768 severity: AlertSeverity::Critical,
769 message: format!(
770 "Memory usage {}MB exceeds threshold {}MB",
771 memory_mb, self.thresholds.max_memory_usage
772 ),
773 timestamp: SystemTime::now(),
774 source: "system_monitor".to_string(),
775 });
776 }
777 }
778
779 pub fn check_quality_alerts(&self, metrics: &QualityMetrics) {
780 if metrics.recall_at_10 < self.thresholds.min_recall_at_10 {
781 self.add_alert(Alert {
782 alert_type: AlertType::LowRecall,
783 severity: AlertSeverity::Warning,
784 message: format!(
785 "Recall@10 {:.3} below threshold {:.3}",
786 metrics.recall_at_10, self.thresholds.min_recall_at_10
787 ),
788 timestamp: SystemTime::now(),
789 source: "quality_monitor".to_string(),
790 });
791 }
792 }
793
794 fn add_alert(&self, alert: Alert) {
795 let mut alerts = self.active_alerts.write();
796 alerts.push(alert);
797
798 let cutoff = SystemTime::now() - Duration::from_secs(3600);
800 alerts.retain(|a| a.timestamp > cutoff);
801 }
802
803 pub fn get_active_alerts(&self) -> Vec<Alert> {
804 self.active_alerts.read().clone()
805 }
806}
807
808#[derive(Debug, Clone, Serialize, Deserialize)]
810pub struct Alert {
811 pub alert_type: AlertType,
812 pub severity: AlertSeverity,
813 pub message: String,
814 pub timestamp: SystemTime,
815 pub source: String,
816}
817
818#[derive(Debug, Clone, Serialize, Deserialize)]
819pub enum AlertType {
820 HighLatency,
821 HighCpuUsage,
822 HighMemoryUsage,
823 LowRecall,
824 HighErrorRate,
825 IndexCorruption,
826 ServiceDown,
827}
828
829#[derive(Debug, Clone, Serialize, Deserialize)]
830pub enum AlertSeverity {
831 Info,
832 Warning,
833 Critical,
834 Emergency,
835}
836
837pub struct AnalyticsEngine {
839 trends: Arc<RwLock<HashMap<String, TrendData>>>,
840 recommendations: Arc<RwLock<Vec<Recommendation>>>,
841}
842
843impl Default for AnalyticsEngine {
844 fn default() -> Self {
845 Self::new()
846 }
847}
848
849impl AnalyticsEngine {
850 pub fn new() -> Self {
851 Self {
852 trends: Arc::new(RwLock::new(HashMap::new())),
853 recommendations: Arc::new(RwLock::new(Vec::new())),
854 }
855 }
856
857 pub fn analyze_query(&self, _query: &QueryInfo) {
858 }
861
862 pub fn get_trends(&self) -> HashMap<String, TrendData> {
863 self.trends.read().clone()
864 }
865
866 pub fn get_recommendations(&self) -> Vec<Recommendation> {
867 self.recommendations.read().clone()
868 }
869}
870
871#[derive(Debug, Clone, Serialize, Deserialize)]
873pub struct TrendData {
874 pub metric_name: String,
875 pub values: Vec<f64>,
876 pub timestamps: Vec<SystemTime>,
877 pub trend_direction: TrendDirection,
878 pub confidence: f32,
879}
880
881#[derive(Debug, Clone, Serialize, Deserialize)]
882pub enum TrendDirection {
883 Increasing,
884 Decreasing,
885 Stable,
886 Volatile,
887}
888
889#[derive(Debug, Clone, Serialize, Deserialize)]
891pub struct Recommendation {
892 pub category: RecommendationCategory,
893 pub priority: RecommendationPriority,
894 pub title: String,
895 pub description: String,
896 pub estimated_impact: String,
897 pub implementation_effort: String,
898}
899
900#[derive(Debug, Clone, Serialize, Deserialize)]
901pub enum RecommendationCategory {
902 Performance,
903 Quality,
904 ResourceOptimization,
905 Configuration,
906 Maintenance,
907}
908
909#[derive(Debug, Clone, Serialize, Deserialize)]
910pub enum RecommendationPriority {
911 Low,
912 Medium,
913 High,
914 Critical,
915}
916
917#[derive(Debug, Clone, Serialize, Deserialize)]
919pub struct DashboardData {
920 pub last_updated: SystemTime,
921 pub query_stats: QueryStatistics,
922 pub system_stats: SystemStatistics,
923 pub quality_stats: QualityStatistics,
924 pub alerts_count: usize,
925 pub trends: HashMap<String, Vec<f64>>,
926}
927
928impl DashboardData {
929 pub fn new() -> Self {
930 Self {
931 last_updated: SystemTime::now(),
932 query_stats: QueryStatistics::default(),
933 system_stats: SystemStatistics::default(),
934 quality_stats: QualityStatistics::default(),
935 alerts_count: 0,
936 trends: HashMap::new(),
937 }
938 }
939
940 pub fn update_query_stats(&mut self, _query: &QueryInfo) {
941 self.last_updated = SystemTime::now();
943 }
945
946 pub fn update_system_stats(&mut self, _metrics: &SystemMetrics) {
947 self.last_updated = SystemTime::now();
949 }
951
952 pub fn update_quality_stats(&mut self, _metrics: &QualityMetrics) {
953 self.last_updated = SystemTime::now();
955 }
957}
958
959impl Default for DashboardData {
960 fn default() -> Self {
961 Self::new()
962 }
963}
964
965#[derive(Debug, Clone, Serialize, Deserialize)]
967pub struct AnalyticsReport {
968 pub timestamp: SystemTime,
969 pub query_statistics: QueryStatistics,
970 pub system_statistics: SystemStatistics,
971 pub quality_statistics: QualityStatistics,
972 pub active_alerts: Vec<Alert>,
973 pub trends: HashMap<String, TrendData>,
974 pub recommendations: Vec<Recommendation>,
975}
976
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful k-NN query that took exactly `elapsed_ms` milliseconds.
    fn knn_query(query_id: &str, elapsed_ms: u64, cache_hit: bool) -> QueryInfo {
        let end_time = Instant::now();
        QueryInfo {
            query_id: query_id.to_string(),
            query_type: QueryType::KNNSearch,
            query_text: None,
            vector_dimensions: None,
            k_value: None,
            threshold: None,
            start_time: end_time - Duration::from_millis(elapsed_ms),
            end_time,
            success: true,
            error_message: None,
            results_count: 5,
            index_used: None,
            cache_hit,
        }
    }

    /// Builds a monitor with default settings apart from the export format.
    fn monitor_exporting(format: ExportFormat) -> EnhancedPerformanceMonitor {
        let mut config = MonitoringConfig::default();
        config.export_config.format = format;
        EnhancedPerformanceMonitor::new(config)
    }

    #[test]
    fn test_query_metrics_collection() {
        let mut collector = QueryMetricsCollector::new();
        collector.record_query(QueryInfo {
            query_text: Some("test query".to_string()),
            vector_dimensions: Some(384),
            k_value: Some(10),
            index_used: Some("hnsw".to_string()),
            ..knn_query("test_query", 50, false)
        });

        let stats = collector.get_statistics();
        assert_eq!(stats.total_queries, 1);
        assert_eq!(stats.successful_queries, 1);
        assert_eq!(stats.failed_queries, 0);
    }

    #[test]
    fn test_alert_generation() {
        let alert_manager = AlertManager::new(AlertThresholds {
            max_query_latency: 100.0,
            max_error_rate: 0.05,
            min_recall_at_10: 0.90,
            max_memory_usage: 1024,
            max_cpu_usage: 0.80,
        });

        // 200 ms is well above the 100 ms threshold.
        alert_manager.check_query_alerts(&knn_query("slow_query", 200, false));

        let alerts = alert_manager.get_active_alerts();
        assert_eq!(alerts.len(), 1);
        assert!(matches!(alerts[0].alert_type, AlertType::HighLatency));
    }

    #[test]
    fn test_performance_monitor() {
        let monitor = EnhancedPerformanceMonitor::new(MonitoringConfig::default());
        monitor.record_query(QueryInfo {
            vector_dimensions: Some(384),
            k_value: Some(10),
            results_count: 8,
            index_used: Some("hnsw".to_string()),
            ..knn_query("test", 25, true)
        });

        let dashboard = monitor.get_dashboard_data();
        assert!(dashboard.last_updated <= SystemTime::now());
    }

    #[test]
    fn test_influxdb_export_format() {
        let output = monitor_exporting(ExportFormat::InfluxDB)
            .export_metrics()
            .expect("InfluxDB export should succeed");

        for (needle, what) in [
            ("vector_search_queries", "query measurement"),
            ("total_queries=", "total_queries field"),
            ("vector_search_system", "system measurement"),
            ("vector_search_quality", "quality measurement"),
        ] {
            assert!(output.contains(needle), "Should contain {}", what);
        }

        // Line protocol without tags: `<measurement> <fields> <timestamp>`.
        for line in output.lines() {
            assert_eq!(
                line.splitn(3, ' ').count(),
                3,
                "Each InfluxDB line must have measurement, fields, and timestamp: {:?}",
                line
            );
        }
    }

    #[test]
    fn test_elasticsearch_export_format() {
        let output = monitor_exporting(ExportFormat::ElasticSearch)
            .export_metrics()
            .expect("ElasticSearch export should succeed");

        let lines: Vec<&str> = output.lines().collect();
        assert!(
            lines.len() >= 2,
            "Should have at least action and doc lines"
        );
        assert_eq!(lines[0], "{\"index\":{}}", "First line must be bulk action");

        let doc: serde_json::Value = serde_json::from_str(lines[1])
            .expect("Elasticsearch document line must be valid JSON");
        for key in ["@timestamp", "query_statistics", "system_statistics"] {
            assert!(doc.get(key).is_some(), "Document must contain {}", key);
        }
    }

    #[test]
    fn test_export_json_format() {
        let output = monitor_exporting(ExportFormat::JSON)
            .export_metrics()
            .expect("JSON export should succeed");

        let parsed: serde_json::Value =
            serde_json::from_str(&output).expect("JSON export must be valid JSON");
        assert!(parsed.get("query_statistics").is_some());
    }
}