1use anyhow::{anyhow, Result};
7use chrono;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use tokio::sync::broadcast;
14
/// Central analytics hub for vector search: owns the metric stores,
/// performance monitoring, query analysis and alerting subsystems, keeps a
/// periodically refreshed dashboard snapshot, and broadcasts
/// [`AnalyticsEvent`]s to subscribers.
pub struct VectorAnalyticsEngine {
    config: AnalyticsConfig,
    /// Query/system/quality metric stores (shared, lock-guarded).
    metrics_collector: Arc<MetricsCollector>,
    /// Thresholds plus active and historical alerts.
    performance_monitor: Arc<PerformanceMonitor>,
    query_analyzer: Arc<QueryAnalyzer>,
    alert_manager: Arc<AlertManager>,
    /// Latest dashboard snapshot, rebuilt by `start_dashboard_updates`.
    dashboard_data: Arc<RwLock<DashboardData>>,
    /// Fan-out channel for analytics events; sends are best-effort.
    event_broadcaster: broadcast::Sender<AnalyticsEvent>,
}
25
/// Tunable settings for the analytics engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsConfig {
    pub enable_real_time: bool,
    // Units not shown here — presumably seconds; TODO confirm against the collector loop.
    pub collection_interval: u64,
    /// Maximum entries kept in bounded histories (latency samples, alert history).
    pub max_metrics_history: usize,
    pub enable_query_analysis: bool,
    pub enable_alerts: bool,
    /// Dashboard refresh period in seconds (fed to `Duration::from_secs`).
    pub dashboard_refresh_interval: u64,
    pub enable_tracing: bool,
    pub enable_profiling: bool,
    /// How many days of analytics data to retain.
    pub retention_days: u32,
}
48
49impl Default for AnalyticsConfig {
50 fn default() -> Self {
51 Self {
52 enable_real_time: true,
53 collection_interval: 1,
54 max_metrics_history: 10000,
55 enable_query_analysis: true,
56 enable_alerts: true,
57 dashboard_refresh_interval: 5,
58 enable_tracing: true,
59 enable_profiling: true,
60 retention_days: 30,
61 }
62 }
63}
64
/// Events broadcast by the engine to any subscriber obtained via
/// `subscribe_to_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnalyticsEvent {
    /// A single query finished (successfully or not).
    QueryExecuted {
        query_id: String,
        operation_type: String,
        duration: Duration,
        result_count: usize,
        success: bool,
        timestamp: SystemTime,
    },
    /// An index maintenance operation touched some vectors.
    IndexUpdated {
        index_name: String,
        operation: String,
        vectors_affected: usize,
        timestamp: SystemTime,
    },
    /// A threshold was crossed and an alert was raised.
    PerformanceAlert {
        alert_type: AlertType,
        message: String,
        severity: AlertSeverity,
        timestamp: SystemTime,
    },
    /// A point-in-time system measurement.
    SystemMetric {
        metric_name: String,
        value: f64,
        unit: String,
        timestamp: SystemTime,
    },
}
95
/// Category of a performance alert; used both for routing and as part of
/// the generated alert id (`{:?}_{timestamp}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
    HighLatency,
    LowThroughput,
    HighMemoryUsage,
    HighCpuUsage,
    QualityDegradation,
    IndexCorruption,
    SystemError,
    ResourceLimitReached,
}
108
/// Severity level attached to an alert, ordered most to least urgent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Critical,
    Warning,
    Info,
}
116
/// Owns the individually lock-guarded metric stores so readers and writers
/// of different metric families never contend with each other.
#[derive(Debug)]
pub struct MetricsCollector {
    query_metrics: Arc<RwLock<QueryMetrics>>,
    system_metrics: Arc<RwLock<SystemMetrics>>,
    quality_metrics: Arc<RwLock<QualityMetrics>>,
    /// Ad-hoc metrics registered at runtime, keyed by metric name.
    custom_metrics: Arc<RwLock<HashMap<String, CustomMetric>>>,
}
125
/// Aggregated query statistics; percentiles are recomputed from
/// `latency_history` on every recorded query.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryMetrics {
    pub total_queries: u64,
    pub successful_queries: u64,
    pub failed_queries: u64,
    pub average_latency: Duration,
    pub p50_latency: Duration,
    pub p95_latency: Duration,
    pub p99_latency: Duration,
    pub max_latency: Duration,
    pub min_latency: Duration,
    pub throughput_qps: f64,
    /// Bounded to `AnalyticsConfig::max_metrics_history` entries.
    pub latency_history: VecDeque<(SystemTime, Duration)>,
    pub throughput_history: VecDeque<(SystemTime, f64)>,
    // NOTE(review): `record_query_execution` writes this as a percentage
    // (0-100) while `record_distributed_query` writes a 0-1 fraction —
    // the unit is inconsistent between the two writers; confirm intent.
    pub error_rate: f64,
    /// Per-operation-type execution counts.
    pub query_distribution: HashMap<String, u64>,
}
144
/// Host/process resource snapshot; `cpu_usage` and `memory_usage` are
/// percentages (0-100) as used by the threshold checks.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub memory_total: u64,
    /// Derived in `update_system_metrics` from total and usage percent.
    pub memory_available: u64,
    pub disk_usage: f64,
    pub network_io: NetworkIO,
    pub vector_count: u64,
    pub index_size: u64,
    /// 0.0-1.0 ratio (thresholds compare against e.g. 0.8).
    pub cache_hit_ratio: f64,
    pub gc_pressure: f64,
    pub thread_count: u64,
    pub system_load: f64,
}
161
/// Cumulative network I/O counters.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkIO {
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub packets_sent: u64,
    pub packets_received: u64,
    pub connections_active: u64,
}
171
/// Search-quality measurements; the `*_at_k` maps are keyed by cutoff `k`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QualityMetrics {
    pub recall_at_k: HashMap<usize, f64>,
    pub precision_at_k: HashMap<usize, f64>,
    pub ndcg_at_k: HashMap<usize, f64>,
    pub mean_reciprocal_rank: f64,
    /// Also surfaced on the dashboard as the overall quality score.
    pub average_similarity_score: f64,
    pub similarity_distribution: Vec<f64>,
    pub query_diversity: f64,
    pub result_diversity: f64,
    pub relevance_correlation: f64,
}
185
/// A user-defined metric sample with free-form tags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomMetric {
    pub name: String,
    pub value: f64,
    pub unit: String,
    pub description: String,
    pub timestamp: SystemTime,
    pub tags: HashMap<String, String>,
}
196
/// Holds alerting thresholds plus the set of unresolved alerts and a
/// bounded alert history.
#[derive(Debug)]
pub struct PerformanceMonitor {
    thresholds: Arc<RwLock<PerformanceThresholds>>,
    /// Bounded to `AnalyticsConfig::max_metrics_history` entries.
    alert_history: Arc<RwLock<VecDeque<Alert>>>,
    /// Active alerts keyed by alert id.
    current_alerts: Arc<RwLock<HashMap<String, Alert>>>,
}
204
/// Limits that trigger alerts when crossed; percent fields are on a
/// 0-100 scale, `min_cache_hit_ratio` and `min_recall_at_10` are 0-1.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceThresholds {
    pub max_latency_ms: u64,
    pub min_throughput_qps: f64,
    pub max_memory_usage_percent: f64,
    pub max_cpu_usage_percent: f64,
    pub min_cache_hit_ratio: f64,
    pub max_error_rate_percent: f64,
    pub min_recall_at_10: f64,
    pub max_index_size_gb: f64,
}
217
218impl Default for PerformanceThresholds {
219 fn default() -> Self {
220 Self {
221 max_latency_ms: 100,
222 min_throughput_qps: 100.0,
223 max_memory_usage_percent: 80.0,
224 max_cpu_usage_percent: 85.0,
225 min_cache_hit_ratio: 0.8,
226 max_error_rate_percent: 1.0,
227 min_recall_at_10: 0.9,
228 max_index_size_gb: 10.0,
229 }
230 }
231}
232
/// A raised alert; stored in both the active-alert map (keyed by `id`)
/// and the alert history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Generated as `{alert_type:?}_{unix_timestamp}` in `create_alert`.
    pub id: String,
    pub alert_type: AlertType,
    pub severity: AlertSeverity,
    pub message: String,
    pub timestamp: SystemTime,
    pub resolved: bool,
    pub resolved_timestamp: Option<SystemTime>,
    pub metadata: HashMap<String, String>,
}
245
/// Tracks recurring query patterns, popular queries and usage trends.
#[derive(Debug)]
pub struct QueryAnalyzer {
    query_patterns: Arc<RwLock<HashMap<String, QueryPattern>>>,
    popular_queries: Arc<RwLock<VecDeque<PopularQuery>>>,
    usage_trends: Arc<RwLock<UsageTrends>>,
}
253
/// Aggregated statistics for one recurring query shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPattern {
    pub pattern_id: String,
    pub frequency: u64,
    pub avg_latency: Duration,
    /// Fraction of executions that succeeded.
    pub success_rate: f64,
    /// Hours of day (0-23) where this pattern peaks.
    pub peak_hours: Vec<u8>,
    pub similarity_threshold_distribution: Vec<f64>,
    pub result_size_distribution: Vec<usize>,
    pub user_segments: HashMap<String, u64>,
}
266
/// A frequently seen query, surfaced in usage analytics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopularQuery {
    pub query_text: String,
    pub frequency: u64,
    pub avg_similarity_score: f64,
    pub timestamp: SystemTime,
}
275
/// Longitudinal usage statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageTrends {
    pub daily_query_counts: VecDeque<(SystemTime, u64)>,
    /// Query counts bucketed by hour of day (index 0 = midnight).
    pub hourly_patterns: [u64; 24],
    /// Query counts bucketed by day of week.
    pub weekly_patterns: [u64; 7],
    pub growth_rate: f64,
    pub seasonal_patterns: HashMap<String, f64>,
    pub user_growth: f64,
    pub feature_adoption: HashMap<String, f64>,
}
287
/// Dispatches alerts to configured notification channels according to the
/// active alert rules. Not `Debug` because channels are trait objects.
pub struct AlertManager {
    config: AlertConfig,
    /// Pluggable delivery backends (email, Slack, webhook, ...).
    notification_channels: Vec<Box<dyn NotificationChannel>>,
    alert_rules: Arc<RwLock<Vec<AlertRule>>>,
}
294
/// Notification delivery settings for the alert manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    pub enable_email: bool,
    pub enable_slack: bool,
    pub enable_webhook: bool,
    pub email_recipients: Vec<String>,
    pub slack_webhook: Option<String>,
    pub webhook_url: Option<String>,
    /// Minimum time between repeated notifications for the same condition.
    pub cooldown_period: Duration,
    pub escalation_enabled: bool,
}
307
308impl Default for AlertConfig {
309 fn default() -> Self {
310 Self {
311 enable_email: false,
312 enable_slack: false,
313 enable_webhook: false,
314 email_recipients: Vec::new(),
315 slack_webhook: None,
316 webhook_url: None,
317 cooldown_period: Duration::from_secs(300), escalation_enabled: false,
319 }
320 }
321}
322
/// A configurable alerting rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    pub name: String,
    // Stored as a string expression — evaluation semantics not visible here.
    pub condition: String,
    pub severity: AlertSeverity,
    pub enabled: bool,
    pub cooldown: Duration,
    pub actions: Vec<String>,
}
333
/// A delivery backend for alerts (email, Slack, webhook, ...).
/// `Send + Sync` so channels can be shared across threads by the manager.
pub trait NotificationChannel: Send + Sync {
    /// Delivers `alert` over this channel; returns an error on delivery failure.
    fn send_notification(&self, alert: &Alert) -> Result<()>;
    /// Human-readable identifier for the channel kind.
    fn get_channel_type(&self) -> String;
}
339
/// Full dashboard snapshot; rebuilt periodically by `build_dashboard_data`
/// and served via `get_dashboard_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
    pub overview: OverviewData,
    pub query_performance: QueryPerformanceData,
    pub system_health: SystemHealthData,
    pub quality_metrics: QualityMetricsData,
    pub usage_analytics: UsageAnalyticsData,
    /// Currently active (unresolved) alerts at snapshot time.
    pub alerts: Vec<Alert>,
    pub last_updated: SystemTime,
}
351
/// Headline figures for the dashboard overview card.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverviewData {
    // NOTE(review): currently populated with the all-time query total,
    // not a per-day count — confirm intended semantics.
    pub total_queries_today: u64,
    pub average_latency: Duration,
    pub current_qps: f64,
    /// Composite 0-100 score from `calculate_health_score`.
    pub system_health_score: f64,
    pub active_alerts: u64,
    pub index_size: u64,
    pub vector_count: u64,
    pub cache_hit_ratio: f64,
}
364
/// Query-performance series and distributions for the dashboard.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueryPerformanceData {
    pub latency_trends: Vec<(SystemTime, Duration)>,
    pub throughput_trends: Vec<(SystemTime, f64)>,
    pub error_rate_trends: Vec<(SystemTime, f64)>,
    pub top_slow_queries: Vec<(String, Duration)>,
    pub query_distribution: HashMap<String, u64>,
    /// Keyed "p50"/"p95"/"p99" in the current snapshot builder.
    pub performance_percentiles: HashMap<String, Duration>,
}
375
/// Resource-health panel data for the dashboard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthData {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub disk_usage: f64,
    pub network_throughput: f64,
    pub resource_trends: Vec<(SystemTime, f64)>,
    pub capacity_forecast: Vec<(SystemTime, f64)>,
    /// Human-readable saturated-resource labels from `identify_bottlenecks`.
    pub bottlenecks: Vec<String>,
}
387
/// Search-quality panel data for the dashboard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetricsData {
    pub recall_trends: Vec<(SystemTime, f64)>,
    pub precision_trends: Vec<(SystemTime, f64)>,
    pub similarity_distribution: Vec<f64>,
    pub quality_score: f64,
    pub quality_trends: Vec<(SystemTime, f64)>,
    pub benchmark_comparisons: HashMap<String, f64>,
}
398
/// Usage-analytics panel data for the dashboard.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UsageAnalyticsData {
    pub user_activity: Vec<(SystemTime, u64)>,
    pub popular_queries: Vec<PopularQuery>,
    pub usage_patterns: HashMap<String, f64>,
    pub growth_metrics: GrowthMetrics,
    pub feature_usage: HashMap<String, u64>,
}
408
/// Growth-rate figures for usage analytics.
// NOTE(review): `GrowthMetrics::default()` is called in the dashboard
// builder below, but no `Default` derive/impl is visible in this chunk —
// confirm one exists elsewhere in the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrowthMetrics {
    pub daily_growth_rate: f64,
    pub weekly_growth_rate: f64,
    pub monthly_growth_rate: f64,
    pub user_retention: f64,
    pub query_volume_growth: f64,
}
418
419impl Clone for VectorAnalyticsEngine {
420 fn clone(&self) -> Self {
421 Self {
422 config: self.config.clone(),
423 metrics_collector: Arc::clone(&self.metrics_collector),
424 performance_monitor: Arc::clone(&self.performance_monitor),
425 query_analyzer: Arc::clone(&self.query_analyzer),
426 alert_manager: Arc::clone(&self.alert_manager),
427 dashboard_data: Arc::clone(&self.dashboard_data),
428 event_broadcaster: self.event_broadcaster.clone(),
429 }
430 }
431}
432
433impl VectorAnalyticsEngine {
434 pub fn new(config: AnalyticsConfig) -> Self {
435 let (event_broadcaster, _) = broadcast::channel(1000);
436
437 let metrics_collector = Arc::new(MetricsCollector::new());
438 let performance_monitor = Arc::new(PerformanceMonitor::new());
439 let query_analyzer = Arc::new(QueryAnalyzer::new());
440 let alert_manager = Arc::new(AlertManager::new(AlertConfig::default()));
441 let dashboard_data = Arc::new(RwLock::new(DashboardData::default()));
442
443 Self {
444 config,
445 metrics_collector,
446 performance_monitor,
447 query_analyzer,
448 alert_manager,
449 dashboard_data,
450 event_broadcaster,
451 }
452 }
453
454 pub fn record_query_execution(
456 &self,
457 query_id: String,
458 operation_type: String,
459 duration: Duration,
460 result_count: usize,
461 success: bool,
462 ) -> Result<()> {
463 {
465 let mut metrics = self.metrics_collector.query_metrics.write();
466 metrics.total_queries += 1;
467
468 if success {
469 metrics.successful_queries += 1;
470 } else {
471 metrics.failed_queries += 1;
472 }
473
474 self.update_latency_statistics(&mut metrics, duration);
476
477 *metrics
479 .query_distribution
480 .entry(operation_type.clone())
481 .or_insert(0) += 1;
482
483 metrics.error_rate =
485 (metrics.failed_queries as f64) / (metrics.total_queries as f64) * 100.0;
486 }
487
488 self.check_performance_alerts(duration, success)?;
490
491 let event = AnalyticsEvent::QueryExecuted {
493 query_id,
494 operation_type,
495 duration,
496 result_count,
497 success,
498 timestamp: SystemTime::now(),
499 };
500
501 let _ = self.event_broadcaster.send(event);
502
503 Ok(())
504 }
505
506 fn update_latency_statistics(&self, metrics: &mut QueryMetrics, duration: Duration) {
507 let timestamp = SystemTime::now();
508
509 metrics.latency_history.push_back((timestamp, duration));
511 if metrics.latency_history.len() > self.config.max_metrics_history {
512 metrics.latency_history.pop_front();
513 }
514
515 let latencies: Vec<Duration> = metrics.latency_history.iter().map(|(_, d)| *d).collect();
517
518 if !latencies.is_empty() {
519 let mut sorted_latencies = latencies.clone();
520 sorted_latencies.sort();
521
522 let len = sorted_latencies.len();
523 metrics.p50_latency = sorted_latencies[len / 2];
524 metrics.p95_latency = sorted_latencies[(len as f64 * 0.95) as usize];
525 metrics.p99_latency = sorted_latencies[(len as f64 * 0.99) as usize];
526 metrics.max_latency = *sorted_latencies
527 .last()
528 .expect("sorted_latencies validated to be non-empty");
529 metrics.min_latency = *sorted_latencies
530 .first()
531 .expect("collection validated to be non-empty");
532
533 let total_duration: Duration = latencies.iter().sum();
534 metrics.average_latency = total_duration / len as u32;
535 }
536 }
537
538 fn check_performance_alerts(&self, duration: Duration, success: bool) -> Result<()> {
539 let thresholds = self.performance_monitor.thresholds.read();
540
541 if duration.as_millis() > thresholds.max_latency_ms as u128 {
543 self.create_alert(
544 AlertType::HighLatency,
545 AlertSeverity::Warning,
546 format!(
547 "Query latency {}ms exceeds threshold {}ms",
548 duration.as_millis(),
549 thresholds.max_latency_ms
550 ),
551 )?;
552 }
553
554 if !success {
556 let metrics = self.metrics_collector.query_metrics.read();
557 if metrics.error_rate > thresholds.max_error_rate_percent {
558 self.create_alert(
559 AlertType::SystemError,
560 AlertSeverity::Critical,
561 format!(
562 "Error rate {:.2}% exceeds threshold {:.2}%",
563 metrics.error_rate, thresholds.max_error_rate_percent
564 ),
565 )?;
566 }
567 }
568
569 Ok(())
570 }
571
572 fn create_alert(
573 &self,
574 alert_type: AlertType,
575 severity: AlertSeverity,
576 message: String,
577 ) -> Result<()> {
578 let alert_id = format!(
579 "{:?}_{}",
580 alert_type,
581 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
582 );
583
584 let alert = Alert {
585 id: alert_id,
586 alert_type,
587 severity,
588 message,
589 timestamp: SystemTime::now(),
590 resolved: false,
591 resolved_timestamp: None,
592 metadata: HashMap::new(),
593 };
594
595 {
597 let mut current_alerts = self.performance_monitor.current_alerts.write();
598 current_alerts.insert(alert.id.clone(), alert.clone());
599
600 let mut alert_history = self.performance_monitor.alert_history.write();
601 alert_history.push_back(alert.clone());
602 if alert_history.len() > self.config.max_metrics_history {
603 alert_history.pop_front();
604 }
605 }
606
607 self.alert_manager.send_alert(&alert)?;
609
610 let event = AnalyticsEvent::PerformanceAlert {
612 alert_type: alert.alert_type.clone(),
613 message: alert.message.clone(),
614 severity: alert.severity.clone(),
615 timestamp: alert.timestamp,
616 };
617
618 let _ = self.event_broadcaster.send(event);
619
620 Ok(())
621 }
622
623 pub fn record_distributed_query(
625 &self,
626 query_id: String,
627 node_count: usize,
628 total_duration: Duration,
629 _federation_id: Option<String>,
630 success: bool,
631 ) -> Result<()> {
632 {
634 let mut metrics = self.metrics_collector.query_metrics.write();
635 metrics.total_queries += 1;
636
637 if success {
638 metrics.successful_queries += 1;
639 } else {
640 metrics.failed_queries += 1;
641 }
642
643 self.update_latency_statistics(&mut metrics, total_duration);
645
646 let operation_type = format!("distributed_query_{node_count}_nodes");
648 *metrics
649 .query_distribution
650 .entry(operation_type)
651 .or_insert(0) += 1;
652
653 metrics.error_rate = if metrics.total_queries > 0 {
655 metrics.failed_queries as f64 / metrics.total_queries as f64
656 } else {
657 0.0
658 };
659 }
660
661 let event = AnalyticsEvent::QueryExecuted {
663 query_id: query_id.clone(),
664 operation_type: format!("distributed_query_{node_count}_nodes"),
665 duration: total_duration,
666 result_count: node_count,
667 success,
668 timestamp: SystemTime::now(),
669 };
670
671 let _ = self.event_broadcaster.send(event);
672
673 if total_duration.as_millis() > 5000 {
675 let message = format!(
676 "Distributed query {} across {} nodes took {}ms",
677 query_id,
678 node_count,
679 total_duration.as_millis()
680 );
681
682 self.create_alert(AlertType::HighLatency, AlertSeverity::Warning, message)?;
683 }
684
685 Ok(())
686 }
687
688 pub fn update_system_metrics(
690 &self,
691 cpu_usage: f64,
692 memory_usage: f64,
693 memory_total: u64,
694 ) -> Result<()> {
695 {
696 let mut metrics = self.metrics_collector.system_metrics.write();
697 metrics.cpu_usage = cpu_usage;
698 metrics.memory_usage = memory_usage;
699 metrics.memory_total = memory_total;
700 metrics.memory_available =
701 memory_total - (memory_total as f64 * memory_usage / 100.0) as u64;
702 }
703
704 let thresholds = self.performance_monitor.thresholds.read();
706
707 if cpu_usage > thresholds.max_cpu_usage_percent {
708 self.create_alert(
709 AlertType::HighCpuUsage,
710 AlertSeverity::Warning,
711 format!(
712 "CPU usage {:.2}% exceeds threshold {:.2}%",
713 cpu_usage, thresholds.max_cpu_usage_percent
714 ),
715 )?;
716 }
717
718 if memory_usage > thresholds.max_memory_usage_percent {
719 self.create_alert(
720 AlertType::HighMemoryUsage,
721 AlertSeverity::Warning,
722 format!(
723 "Memory usage {:.2}% exceeds threshold {:.2}%",
724 memory_usage, thresholds.max_memory_usage_percent
725 ),
726 )?;
727 }
728
729 Ok(())
730 }
731
    /// Returns a clone of the latest dashboard snapshot (refreshed in the
    /// background by `start_dashboard_updates`).
    pub fn get_dashboard_data(&self) -> DashboardData {
        self.dashboard_data.read().clone()
    }
736
    /// Creates a new receiver on the engine's analytics-event broadcast
    /// channel; each subscriber sees events sent after it subscribes.
    pub fn subscribe_to_events(&self) -> broadcast::Receiver<AnalyticsEvent> {
        self.event_broadcaster.subscribe()
    }
741
742 pub fn generate_report(
744 &self,
745 start_time: SystemTime,
746 end_time: SystemTime,
747 ) -> Result<AnalyticsReport> {
748 let query_metrics = self.metrics_collector.query_metrics.read().clone();
749 let system_metrics = self.metrics_collector.system_metrics.read().clone();
750 let quality_metrics = self.metrics_collector.quality_metrics.read().clone();
751
752 Ok(AnalyticsReport {
753 report_id: format!(
754 "report_{}",
755 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
756 ),
757 start_time,
758 end_time,
759 query_metrics,
760 system_metrics,
761 quality_metrics,
762 alerts: self.get_alerts_in_range(start_time, end_time)?,
763 recommendations: self.generate_recommendations()?,
764 generated_at: SystemTime::now(),
765 })
766 }
767
768 fn get_alerts_in_range(
769 &self,
770 start_time: SystemTime,
771 end_time: SystemTime,
772 ) -> Result<Vec<Alert>> {
773 let alert_history = self.performance_monitor.alert_history.read();
774 Ok(alert_history
775 .iter()
776 .filter(|alert| alert.timestamp >= start_time && alert.timestamp <= end_time)
777 .cloned()
778 .collect())
779 }
780
781 fn generate_recommendations(&self) -> Result<Vec<String>> {
782 let mut recommendations = Vec::new();
783
784 let query_metrics = self.metrics_collector.query_metrics.read();
785 let system_metrics = self.metrics_collector.system_metrics.read();
786
787 if query_metrics.average_latency.as_millis() > 50 {
789 recommendations
790 .push("Consider optimizing queries or adding more powerful hardware".to_string());
791 }
792
793 if system_metrics.memory_usage > 80.0 {
794 recommendations.push(
795 "Memory usage is high. Consider increasing memory or optimizing data structures"
796 .to_string(),
797 );
798 }
799
800 if system_metrics.cache_hit_ratio < 0.8 {
801 recommendations.push("Cache hit ratio is low. Consider increasing cache size or improving cache strategy".to_string());
802 }
803
804 Ok(recommendations)
805 }
806
807 pub fn export_metrics(&self, format: ExportFormat, destination: &str) -> Result<()> {
809 let metrics_data = self.collect_all_metrics()?;
810
811 match format {
812 ExportFormat::Json => self.export_as_json(&metrics_data, destination),
813 ExportFormat::Csv => self.export_as_csv(&metrics_data, destination),
814 ExportFormat::Prometheus => self.export_as_prometheus(&metrics_data, destination),
815 ExportFormat::InfluxDb => self.export_as_influxdb(&metrics_data, destination),
816 }
817 }
818
819 fn collect_all_metrics(&self) -> Result<HashMap<String, serde_json::Value>> {
820 let mut all_metrics = HashMap::new();
821
822 let query_metrics = self.metrics_collector.query_metrics.read();
823 let system_metrics = self.metrics_collector.system_metrics.read();
824 let quality_metrics = self.metrics_collector.quality_metrics.read();
825
826 all_metrics.insert(
827 "query_metrics".to_string(),
828 serde_json::to_value(&*query_metrics)?,
829 );
830 all_metrics.insert(
831 "system_metrics".to_string(),
832 serde_json::to_value(&*system_metrics)?,
833 );
834 all_metrics.insert(
835 "quality_metrics".to_string(),
836 serde_json::to_value(&*quality_metrics)?,
837 );
838
839 Ok(all_metrics)
840 }
841
842 fn export_as_json(
843 &self,
844 metrics: &HashMap<String, serde_json::Value>,
845 destination: &str,
846 ) -> Result<()> {
847 let json_data = serde_json::to_string_pretty(metrics)?;
848 std::fs::write(destination, json_data)?;
849 Ok(())
850 }
851
852 fn export_as_csv(
853 &self,
854 metrics: &HashMap<String, serde_json::Value>,
855 destination: &str,
856 ) -> Result<()> {
857 let mut csv_content = String::new();
858 csv_content.push_str("timestamp,metric_name,value,category\n");
859
860 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S");
861
862 if let Some(query_metrics) = metrics.get("query_metrics") {
864 if let Some(obj) = query_metrics.as_object() {
865 for (key, value) in obj {
866 if let Some(num_val) = value.as_f64() {
867 csv_content.push_str(&format!("{timestamp},query_{key},{num_val},query\n"));
868 }
869 }
870 }
871 }
872
873 if let Some(system_metrics) = metrics.get("system_metrics") {
875 if let Some(obj) = system_metrics.as_object() {
876 for (key, value) in obj {
877 if let Some(num_val) = value.as_f64() {
878 csv_content
879 .push_str(&format!("{timestamp},system_{key},{num_val},system\n"));
880 }
881 }
882 }
883 }
884
885 std::fs::write(destination, csv_content)?;
886 Ok(())
887 }
888
889 fn export_as_prometheus(
890 &self,
891 metrics: &HashMap<String, serde_json::Value>,
892 destination: &str,
893 ) -> Result<()> {
894 let mut prometheus_content = String::new();
895 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
896
897 if let Some(query_metrics) = metrics.get("query_metrics") {
899 if let Some(obj) = query_metrics.as_object() {
900 for (key, value) in obj {
901 if let Some(num_val) = value.as_f64() {
902 prometheus_content
903 .push_str(&format!("# HELP vector_query_{key} Query metric {key}\n"));
904 prometheus_content.push_str(&format!("# TYPE vector_query_{key} gauge\n"));
905 prometheus_content
906 .push_str(&format!("vector_query_{key} {num_val} {timestamp}\n"));
907 }
908 }
909 }
910 }
911
912 if let Some(system_metrics) = metrics.get("system_metrics") {
914 if let Some(obj) = system_metrics.as_object() {
915 for (key, value) in obj {
916 if let Some(num_val) = value.as_f64() {
917 prometheus_content
918 .push_str(&format!("# HELP vector_system_{key} System metric {key}\n"));
919 prometheus_content.push_str(&format!("# TYPE vector_system_{key} gauge\n"));
920 prometheus_content
921 .push_str(&format!("vector_system_{key} {num_val} {timestamp}\n"));
922 }
923 }
924 }
925 }
926
927 std::fs::write(destination, prometheus_content)?;
928 Ok(())
929 }
930
931 fn export_as_influxdb(
932 &self,
933 metrics: &HashMap<String, serde_json::Value>,
934 destination: &str,
935 ) -> Result<()> {
936 let mut influxdb_content = String::new();
937 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
938
939 if let Some(query_metrics) = metrics.get("query_metrics") {
941 if let Some(obj) = query_metrics.as_object() {
942 for (key, value) in obj {
943 if let Some(num_val) = value.as_f64() {
944 influxdb_content.push_str(&format!(
945 "vector_query,type=query {key}={num_val} {timestamp}\n"
946 ));
947 }
948 }
949 }
950 }
951
952 if let Some(system_metrics) = metrics.get("system_metrics") {
954 if let Some(obj) = system_metrics.as_object() {
955 for (key, value) in obj {
956 if let Some(num_val) = value.as_f64() {
957 influxdb_content.push_str(&format!(
958 "vector_system,type=system {key}={num_val} {timestamp}\n"
959 ));
960 }
961 }
962 }
963 }
964
965 std::fs::write(destination, influxdb_content)?;
966 Ok(())
967 }
968
969 pub async fn start_dashboard_updates(&self) -> Result<()> {
971 let dashboard_data = Arc::clone(&self.dashboard_data);
972 let metrics_collector = Arc::clone(&self.metrics_collector);
973 let performance_monitor = Arc::clone(&self.performance_monitor);
974 let refresh_interval = Duration::from_secs(self.config.dashboard_refresh_interval);
975
976 tokio::spawn(async move {
977 let mut interval = tokio::time::interval(refresh_interval);
978
979 loop {
980 interval.tick().await;
981
982 let updated_data =
984 Self::build_dashboard_data(&metrics_collector, &performance_monitor).await;
985
986 {
987 let mut data = dashboard_data.write();
988 *data = updated_data;
989 }
990 }
991 });
992
993 Ok(())
994 }
995
    /// Assembles a self-contained `DashboardData` snapshot from the current
    /// metric and alert state. Each store is read-locked briefly and cloned
    /// out. (Declared `async` although it currently performs no awaits.)
    async fn build_dashboard_data(
        metrics_collector: &MetricsCollector,
        performance_monitor: &PerformanceMonitor,
    ) -> DashboardData {
        let query_metrics = metrics_collector.query_metrics.read().clone();
        let system_metrics = metrics_collector.system_metrics.read().clone();
        let quality_metrics = metrics_collector.quality_metrics.read().clone();
        let current_alerts: Vec<Alert> = performance_monitor
            .current_alerts
            .read()
            .values()
            .cloned()
            .collect();

        // Composite 0-100 score penalizing CPU/memory load, error rate and latency.
        let health_score = Self::calculate_health_score(&system_metrics, &query_metrics);

        // Queries observed within the last second of latency history.
        let current_qps = Self::calculate_current_qps(&query_metrics);

        DashboardData {
            overview: OverviewData {
                // NOTE(review): this is the all-time total, not a per-day
                // count — confirm whether "today" is intended.
                total_queries_today: query_metrics.total_queries,
                average_latency: query_metrics.average_latency,
                current_qps,
                system_health_score: health_score,
                active_alerts: current_alerts.len() as u64,
                index_size: system_metrics.index_size,
                vector_count: system_metrics.vector_count,
                cache_hit_ratio: system_metrics.cache_hit_ratio,
            },
            query_performance: QueryPerformanceData {
                latency_trends: query_metrics.latency_history.iter().cloned().collect(),
                throughput_trends: query_metrics.throughput_history.iter().cloned().collect(),
                // Single current point; no historical error-rate series is kept.
                error_rate_trends: vec![(SystemTime::now(), query_metrics.error_rate)],
                // Not tracked yet.
                top_slow_queries: vec![],
                query_distribution: query_metrics.query_distribution.clone(),
                performance_percentiles: {
                    let mut percentiles = HashMap::new();
                    percentiles.insert("p50".to_string(), query_metrics.p50_latency);
                    percentiles.insert("p95".to_string(), query_metrics.p95_latency);
                    percentiles.insert("p99".to_string(), query_metrics.p99_latency);
                    percentiles
                },
            },
            system_health: SystemHealthData {
                cpu_usage: system_metrics.cpu_usage,
                memory_usage: system_metrics.memory_usage,
                disk_usage: system_metrics.disk_usage,
                // Placeholder — network throughput is not collected yet.
                network_throughput: 0.0,
                resource_trends: vec![(SystemTime::now(), system_metrics.cpu_usage)],
                // Placeholder — no forecasting implemented yet.
                capacity_forecast: vec![],
                bottlenecks: Self::identify_bottlenecks(&system_metrics, &query_metrics),
            },
            quality_metrics: QualityMetricsData {
                recall_trends: vec![],
                precision_trends: vec![],
                similarity_distribution: quality_metrics.similarity_distribution.clone(),
                quality_score: quality_metrics.average_similarity_score,
                quality_trends: vec![(SystemTime::now(), quality_metrics.average_similarity_score)],
                benchmark_comparisons: HashMap::new(),
            },
            usage_analytics: UsageAnalyticsData {
                user_activity: vec![(SystemTime::now(), query_metrics.total_queries)],
                // Not populated in this snapshot.
                popular_queries: vec![],
                usage_patterns: HashMap::new(),
                // NOTE(review): relies on a GrowthMetrics Default impl that
                // is not visible in this chunk — confirm it exists.
                growth_metrics: GrowthMetrics::default(),
                feature_usage: HashMap::new(),
            },
            alerts: current_alerts,
            last_updated: SystemTime::now(),
        }
    }
1069
1070 fn calculate_health_score(system_metrics: &SystemMetrics, query_metrics: &QueryMetrics) -> f64 {
1071 let mut score = 100.0;
1072
1073 if system_metrics.cpu_usage > 80.0 {
1075 score -= (system_metrics.cpu_usage - 80.0) * 0.5;
1076 }
1077 if system_metrics.memory_usage > 80.0 {
1078 score -= (system_metrics.memory_usage - 80.0) * 0.5;
1079 }
1080
1081 if query_metrics.error_rate > 1.0 {
1083 score -= query_metrics.error_rate * 10.0;
1084 }
1085
1086 if query_metrics.average_latency.as_millis() > 100 {
1088 score -= (query_metrics.average_latency.as_millis() as f64 - 100.0) * 0.1;
1089 }
1090
1091 score.clamp(0.0, 100.0)
1092 }
1093
1094 fn calculate_current_qps(query_metrics: &QueryMetrics) -> f64 {
1095 if query_metrics.latency_history.len() < 2 {
1097 return 0.0;
1098 }
1099
1100 let now = SystemTime::now();
1101 let one_second_ago = now - Duration::from_secs(1);
1102
1103 let recent_queries = query_metrics
1104 .latency_history
1105 .iter()
1106 .filter(|(timestamp, _)| *timestamp >= one_second_ago)
1107 .count();
1108
1109 recent_queries as f64
1110 }
1111
1112 fn identify_bottlenecks(
1113 system_metrics: &SystemMetrics,
1114 query_metrics: &QueryMetrics,
1115 ) -> Vec<String> {
1116 let mut bottlenecks = Vec::new();
1117
1118 if system_metrics.cpu_usage > 90.0 {
1119 bottlenecks.push("High CPU usage".to_string());
1120 }
1121
1122 if system_metrics.memory_usage > 90.0 {
1123 bottlenecks.push("High memory usage".to_string());
1124 }
1125
1126 if query_metrics.average_latency.as_millis() > 500 {
1127 bottlenecks.push("High query latency".to_string());
1128 }
1129
1130 if system_metrics.cache_hit_ratio < 0.7 {
1131 bottlenecks.push("Low cache hit ratio".to_string());
1132 }
1133
1134 bottlenecks
1135 }
1136
    /// Render the current dashboard snapshot as a single self-contained HTML
    /// page (inline CSS + a 30-second auto-refresh script).
    ///
    /// The template contains 13 positional `{}` placeholders which are filled
    /// in order by the argument list after the raw string: timestamp, health
    /// CSS class, health score, alert count, query totals, latency, QPS,
    /// CPU/memory/cache figures, index stats, and the rendered alert list.
    pub fn generate_dashboard_html(&self) -> Result<String> {
        let dashboard_data = self.get_dashboard_data();

        let html = format!(
            r#"
<!DOCTYPE html>
<html>
<head>
    <title>OxiRS Vector Search Analytics Dashboard</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
        .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
        .card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
        .metric {{ display: flex; justify-content: space-between; margin: 10px 0; }}
        .metric-value {{ font-weight: bold; color: #007acc; }}
        .alert {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
        .alert-critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
        .alert-warning {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
        .alert-info {{ background-color: #e3f2fd; border-left: 4px solid #2196f3; }}
        .health-score {{ font-size: 2em; text-align: center; }}
        .health-good {{ color: #4caf50; }}
        .health-warning {{ color: #ff9800; }}
        .health-critical {{ color: #f44336; }}
        h1 {{ color: #333; text-align: center; }}
        h2 {{ color: #555; margin-top: 0; }}
        .refresh-time {{ text-align: center; color: #888; font-size: 0.9em; }}
    </style>
    <script>
        function refreshPage() {{
            window.location.reload();
        }}
        setInterval(refreshPage, 30000); // Refresh every 30 seconds
    </script>
</head>
<body>
    <h1>🔍 OxiRS Vector Search Analytics Dashboard</h1>
    <p class="refresh-time">Last updated: {}</p>

    <div class="dashboard">
        <div class="card">
            <h2>System Health</h2>
            <div class="health-score {}">{:.1}%</div>
            <div class="metric">
                <span>Active Alerts:</span>
                <span class="metric-value">{}</span>
            </div>
        </div>

        <div class="card">
            <h2>Query Performance</h2>
            <div class="metric">
                <span>Total Queries Today:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Average Latency:</span>
                <span class="metric-value">{}ms</span>
            </div>
            <div class="metric">
                <span>Current QPS:</span>
                <span class="metric-value">{:.1}</span>
            </div>
        </div>

        <div class="card">
            <h2>System Resources</h2>
            <div class="metric">
                <span>CPU Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Memory Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Cache Hit Ratio:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
        </div>

        <div class="card">
            <h2>Vector Index</h2>
            <div class="metric">
                <span>Vector Count:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Index Size:</span>
                <span class="metric-value">{} MB</span>
            </div>
        </div>

        <div class="card">
            <h2>Active Alerts</h2>
            {}
        </div>
    </div>
</body>
</html>
    "#,
            chrono::DateTime::<chrono::Utc>::from(dashboard_data.last_updated)
                .format("%Y-%m-%d %H:%M:%S UTC"),
            // Health score buckets: >=80 good, >=60 warning, else critical.
            if dashboard_data.overview.system_health_score >= 80.0 {
                "health-good"
            } else if dashboard_data.overview.system_health_score >= 60.0 {
                "health-warning"
            } else {
                "health-critical"
            },
            dashboard_data.overview.system_health_score,
            dashboard_data.overview.active_alerts,
            dashboard_data.overview.total_queries_today,
            dashboard_data.overview.average_latency.as_millis(),
            dashboard_data.overview.current_qps,
            dashboard_data.system_health.cpu_usage,
            dashboard_data.system_health.memory_usage,
            // Stored as a 0..1 ratio; rendered as a percentage.
            dashboard_data.overview.cache_hit_ratio * 100.0,
            dashboard_data.overview.vector_count,
            // Bytes -> MiB for display.
            dashboard_data.overview.index_size / (1024 * 1024),
            Self::format_alerts(&dashboard_data.alerts)
        );

        Ok(html)
    }
1262
1263 fn format_alerts(alerts: &[Alert]) -> String {
1264 if alerts.is_empty() {
1265 return "<p>No active alerts</p>".to_string();
1266 }
1267
1268 alerts
1269 .iter()
1270 .map(|alert| {
1271 let class = match alert.severity {
1272 AlertSeverity::Critical => "alert-critical",
1273 AlertSeverity::Warning => "alert-warning",
1274 AlertSeverity::Info => "alert-info",
1275 };
1276
1277 format!(
1278 "<div class=\"alert {}\">
1279 <strong>{:?}</strong>: {}
1280 <br><small>{}</small>
1281 </div>",
1282 class,
1283 alert.alert_type,
1284 alert.message,
1285 chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%H:%M:%S")
1286 )
1287 })
1288 .collect::<Vec<_>>()
1289 .join("\n")
1290 }
1291
1292 pub async fn start_system_monitoring(&self) -> Result<()> {
1294 let analytics_engine = self.clone();
1295
1296 tokio::spawn(async move {
1297 let mut interval = tokio::time::interval(Duration::from_secs(5));
1298
1299 loop {
1300 interval.tick().await;
1301
1302 if let Ok(system_info) = Self::collect_system_info().await {
1304 let _ = analytics_engine.update_system_metrics(
1305 system_info.cpu_usage,
1306 system_info.memory_usage,
1307 system_info.memory_total,
1308 );
1309 }
1310 }
1311 });
1312
1313 Ok(())
1314 }
1315
1316 async fn collect_system_info() -> Result<SystemInfo> {
1317 Ok(SystemInfo {
1320 cpu_usage: {
1321 #[allow(unused_imports)]
1322 use scirs2_core::random::{Random, Rng};
1323 let mut rng = Random::seed(42);
1324 45.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1326 memory_usage: {
1327 #[allow(unused_imports)]
1328 use scirs2_core::random::{Random, Rng};
1329 let mut rng = Random::seed(42);
1330 60.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1332 memory_total: 16 * 1024 * 1024 * 1024, disk_usage: 30.0,
1334 network_throughput: 100.0,
1335 })
1336 }
1337}
1338
/// Point-in-time snapshot of host resource usage, produced by
/// `collect_system_info` (currently simulated placeholder data).
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// CPU utilization, as a percentage (0.0–100.0).
    pub cpu_usage: f64,
    /// Memory utilization, as a percentage (0.0–100.0).
    pub memory_usage: f64,
    /// Total physical memory, in bytes.
    pub memory_total: u64,
    /// Disk utilization, as a percentage (0.0–100.0).
    pub disk_usage: f64,
    /// Network throughput; units are not specified in this file —
    /// `collect_system_info` currently reports a flat 100.0. TODO confirm.
    pub network_throughput: f64,
}
1348
/// Function-level wall-clock profiler: `start_profile`/`end_profile` pairs
/// accumulate per-function call statistics.
pub struct PerformanceProfiler {
    /// Aggregated statistics keyed by function name.
    profiles: Arc<RwLock<HashMap<String, ProfileData>>>,
    /// In-flight profiles keyed by opaque profile id, mapped to start times.
    active_profiles: Arc<RwLock<HashMap<String, Instant>>>,
}
1354
/// Aggregated timing statistics for a single profiled function.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileData {
    /// Name the stats are keyed under.
    pub function_name: String,
    /// Number of completed `start_profile`/`end_profile` pairs.
    pub total_calls: u64,
    /// Sum of all measured durations.
    pub total_time: Duration,
    /// `total_time / total_calls`, maintained on every `end_profile`.
    pub average_time: Duration,
    /// Shortest observed call (initialized to a huge sentinel before the
    /// first sample — see `end_profile`).
    pub min_time: Duration,
    /// Longest observed call.
    pub max_time: Duration,
    /// Most recent (timestamp, duration) samples, capped at 1000 entries.
    pub call_history: VecDeque<(SystemTime, Duration)>,
}
1365
impl Default for PerformanceProfiler {
    // Delegates to `new()` so `Default` and `new` cannot drift apart.
    fn default() -> Self {
        Self::new()
    }
}
1371
1372impl PerformanceProfiler {
1373 pub fn new() -> Self {
1374 Self {
1375 profiles: Arc::new(RwLock::new(HashMap::new())),
1376 active_profiles: Arc::new(RwLock::new(HashMap::new())),
1377 }
1378 }
1379
1380 pub fn start_profile(&self, function_name: &str) -> String {
1381 let profile_id = format!(
1382 "{}_{}",
1383 function_name,
1384 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
1385 );
1386 let mut active = self.active_profiles.write();
1387 active.insert(profile_id.clone(), Instant::now());
1388 profile_id
1389 }
1390
1391 pub fn end_profile(&self, profile_id: &str) -> Result<Duration> {
1392 let mut active = self.active_profiles.write();
1393 let start_time = active
1394 .remove(profile_id)
1395 .ok_or_else(|| anyhow!("Profile ID not found: {}", profile_id))?;
1396
1397 let duration = start_time.elapsed();
1398
1399 let function_name = profile_id.split('_').next().unwrap_or("unknown");
1401
1402 let mut profiles = self.profiles.write();
1404 let profile = profiles
1405 .entry(function_name.to_string())
1406 .or_insert_with(|| ProfileData {
1407 function_name: function_name.to_string(),
1408 total_calls: 0,
1409 total_time: Duration::from_nanos(0),
1410 average_time: Duration::from_nanos(0),
1411 min_time: Duration::from_secs(u64::MAX),
1412 max_time: Duration::from_nanos(0),
1413 call_history: VecDeque::new(),
1414 });
1415
1416 profile.total_calls += 1;
1417 profile.total_time += duration;
1418 profile.average_time = profile.total_time / profile.total_calls as u32;
1419 profile.min_time = profile.min_time.min(duration);
1420 profile.max_time = profile.max_time.max(duration);
1421 profile
1422 .call_history
1423 .push_back((SystemTime::now(), duration));
1424
1425 while profile.call_history.len() > 1000 {
1427 profile.call_history.pop_front();
1428 }
1429
1430 Ok(duration)
1431 }
1432
1433 pub fn get_profile_report(&self) -> HashMap<String, ProfileData> {
1434 self.profiles.read().clone()
1435 }
1436}
1437
/// Output formats supported by metrics export.
#[derive(Debug, Clone)]
pub enum ExportFormat {
    /// Plain JSON document.
    Json,
    /// Comma-separated values.
    Csv,
    /// Prometheus text exposition format.
    Prometheus,
    /// InfluxDB line protocol.
    InfluxDb,
}
1446
/// A generated analytics report covering the window `[start_time, end_time]`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
    /// Unique identifier for this report.
    pub report_id: String,
    /// Start of the reporting window.
    pub start_time: SystemTime,
    /// End of the reporting window.
    pub end_time: SystemTime,
    /// Query statistics collected during the window.
    pub query_metrics: QueryMetrics,
    /// System resource statistics collected during the window.
    pub system_metrics: SystemMetrics,
    /// Search quality statistics collected during the window.
    pub quality_metrics: QualityMetrics,
    /// Alerts raised during the window.
    pub alerts: Vec<Alert>,
    /// Human-readable tuning/remediation suggestions.
    pub recommendations: Vec<String>,
    /// When this report object was produced.
    pub generated_at: SystemTime,
}
1460
impl Default for MetricsCollector {
    // Delegates to `new()` so `Default` and `new` cannot drift apart.
    fn default() -> Self {
        Self::new()
    }
}
1466
impl MetricsCollector {
    /// Construct a collector with every metric group at its default value and
    /// an empty custom-metrics table.
    pub fn new() -> Self {
        Self {
            query_metrics: Arc::new(RwLock::new(QueryMetrics::default())),
            system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
            quality_metrics: Arc::new(RwLock::new(QualityMetrics::default())),
            custom_metrics: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}
1477
impl Default for PerformanceMonitor {
    // Delegates to `new()` so `Default` and `new` cannot drift apart.
    fn default() -> Self {
        Self::new()
    }
}
1483
impl PerformanceMonitor {
    /// Construct a monitor with default thresholds and no alerts (neither
    /// historical nor active).
    pub fn new() -> Self {
        Self {
            thresholds: Arc::new(RwLock::new(PerformanceThresholds::default())),
            alert_history: Arc::new(RwLock::new(VecDeque::new())),
            current_alerts: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}
1493
impl Default for QueryAnalyzer {
    // Delegates to `new()` so `Default` and `new` cannot drift apart.
    fn default() -> Self {
        Self::new()
    }
}
1499
impl QueryAnalyzer {
    /// Construct an analyzer with no recorded patterns, no popular queries,
    /// and default usage trends.
    pub fn new() -> Self {
        Self {
            query_patterns: Arc::new(RwLock::new(HashMap::new())),
            popular_queries: Arc::new(RwLock::new(VecDeque::new())),
            usage_trends: Arc::new(RwLock::new(UsageTrends::default())),
        }
    }
}
1509
1510impl AlertManager {
1511 pub fn new(config: AlertConfig) -> Self {
1512 Self {
1513 config,
1514 notification_channels: Vec::new(),
1515 alert_rules: Arc::new(RwLock::new(Vec::new())),
1516 }
1517 }
1518
1519 pub fn send_alert(&self, alert: &Alert) -> Result<()> {
1520 for channel in &self.notification_channels {
1521 if let Err(e) = channel.send_notification(alert) {
1522 eprintln!(
1523 "Failed to send alert via {}: {}",
1524 channel.get_channel_type(),
1525 e
1526 );
1527 }
1528 }
1529 Ok(())
1530 }
1531
1532 pub fn add_notification_channel(&mut self, channel: Box<dyn NotificationChannel>) {
1533 self.notification_channels.push(channel);
1534 }
1535}
1536
impl Default for DashboardData {
    // All sections start at their defaults; `last_updated` is stamped with
    // the construction time rather than an epoch sentinel.
    fn default() -> Self {
        Self {
            overview: OverviewData::default(),
            query_performance: QueryPerformanceData::default(),
            system_health: SystemHealthData::default(),
            quality_metrics: QualityMetricsData::default(),
            usage_analytics: UsageAnalyticsData::default(),
            alerts: Vec::new(),
            last_updated: SystemTime::now(),
        }
    }
}
1550
1551impl Default for OverviewData {
1552 fn default() -> Self {
1553 Self {
1554 total_queries_today: 0,
1555 average_latency: Duration::from_millis(0),
1556 current_qps: 0.0,
1557 system_health_score: 100.0,
1558 active_alerts: 0,
1559 index_size: 0,
1560 vector_count: 0,
1561 cache_hit_ratio: 0.0,
1562 }
1563 }
1564}
1565
impl Default for SystemHealthData {
    // All gauges at zero, all trend/forecast/bottleneck lists empty.
    fn default() -> Self {
        Self {
            cpu_usage: 0.0,
            memory_usage: 0.0,
            disk_usage: 0.0,
            network_throughput: 0.0,
            resource_trends: Vec::new(),
            capacity_forecast: Vec::new(),
            bottlenecks: Vec::new(),
        }
    }
}
1579
impl Default for QualityMetricsData {
    // Empty trend series and a zero quality score until data arrives.
    fn default() -> Self {
        Self {
            recall_trends: Vec::new(),
            precision_trends: Vec::new(),
            similarity_distribution: Vec::new(),
            quality_score: 0.0,
            quality_trends: Vec::new(),
            benchmark_comparisons: HashMap::new(),
        }
    }
}
1592
impl Default for UsageTrends {
    // Fixed-size hourly (24) and weekly (7) histograms start zeroed; the
    // variable-length series and maps start empty.
    fn default() -> Self {
        Self {
            daily_query_counts: VecDeque::new(),
            hourly_patterns: [0; 24],
            weekly_patterns: [0; 7],
            growth_rate: 0.0,
            seasonal_patterns: HashMap::new(),
            user_growth: 0.0,
            feature_adoption: HashMap::new(),
        }
    }
}
1606
impl Default for GrowthMetrics {
    // All growth/retention rates start at zero.
    fn default() -> Self {
        Self {
            daily_growth_rate: 0.0,
            weekly_growth_rate: 0.0,
            monthly_growth_rate: 0.0,
            user_retention: 0.0,
            query_volume_growth: 0.0,
        }
    }
}
1618
/// Email-based alert notification channel.
///
/// NOTE(review): the current `NotificationChannel` impl only logs; the SMTP
/// configuration stored here is not yet consulted when sending.
pub struct EmailNotificationChannel {
    smtp_config: SmtpConfig,
}
1623
/// SMTP connection settings for the email notification channel.
///
/// NOTE(review): the password is held in plaintext memory and the struct is
/// `Debug`-derivable — consider redacting it before this hits production logs.
#[derive(Debug, Clone)]
pub struct SmtpConfig {
    /// SMTP server hostname.
    pub server: String,
    /// SMTP server port (e.g. 587 for STARTTLS, 465 for SMTPS).
    pub port: u16,
    /// Account username for authentication.
    pub username: String,
    /// Account password for authentication.
    pub password: String,
    /// Address to use in the `From:` header.
    pub from_address: String,
}
1632
impl EmailNotificationChannel {
    /// Create a channel that will use the given SMTP configuration.
    pub fn new(smtp_config: SmtpConfig) -> Self {
        Self { smtp_config }
    }
}
1638
impl NotificationChannel for EmailNotificationChannel {
    // STUB: no SMTP traffic is generated — the alert is only logged. The
    // stored `smtp_config` is not consulted here.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        tracing::info!(
            "Email notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    // Stable identifier used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "email".to_string()
    }
}
1654
/// Slack incoming-webhook alert notification channel.
///
/// NOTE(review): the current `NotificationChannel` impl builds the payload
/// but never POSTs it; `webhook_url` and `client` are not yet used to send.
pub struct SlackNotificationChannel {
    webhook_url: String,
    client: reqwest::Client,
}
1660
impl SlackNotificationChannel {
    /// Create a channel targeting the given Slack incoming-webhook URL.
    pub fn new(webhook_url: String) -> Self {
        Self {
            webhook_url,
            client: reqwest::Client::new(),
        }
    }
}
1669
impl NotificationChannel for SlackNotificationChannel {
    // STUB: the Slack message payload is constructed (severity mapped onto
    // Slack attachment colors) but never POSTed — note the `_payload` binding
    // and the absence of any `client` call. The alert is only logged.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        let _payload = serde_json::json!({
            "text": format!("🚨 Alert: {}", alert.message),
            "attachments": [{
                "color": match alert.severity {
                    AlertSeverity::Critical => "danger",
                    AlertSeverity::Warning => "warning",
                    AlertSeverity::Info => "good",
                },
                "fields": [
                    {
                        "title": "Alert Type",
                        "value": format!("{:?}", alert.alert_type),
                        "short": true
                    },
                    {
                        "title": "Severity",
                        "value": format!("{:?}", alert.severity),
                        "short": true
                    },
                    {
                        "title": "Timestamp",
                        "value": chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%Y-%m-%d %H:%M:%S UTC").to_string(),
                        "short": true
                    }
                ]
            }]
        });

        tracing::info!(
            "Slack notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    // Stable identifier used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "slack".to_string()
    }
}
1713
/// Generic HTTP-webhook alert notification channel with optional custom
/// headers.
///
/// NOTE(review): the current `NotificationChannel` impl serializes the alert
/// but never POSTs it; `webhook_url`, `client`, and `headers` are unused there.
pub struct WebhookNotificationChannel {
    webhook_url: String,
    client: reqwest::Client,
    headers: HashMap<String, String>,
}
1720
impl WebhookNotificationChannel {
    /// Create a channel targeting the given webhook URL with no extra headers.
    pub fn new(webhook_url: String) -> Self {
        Self {
            webhook_url,
            client: reqwest::Client::new(),
            headers: HashMap::new(),
        }
    }

    /// Builder-style setter: replace the extra HTTP headers sent with each
    /// notification.
    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
        self.headers = headers;
        self
    }
}
1735
impl NotificationChannel for WebhookNotificationChannel {
    // STUB: the alert is serialized to JSON (errors propagate via `?`) but
    // never POSTed — note the `_payload` binding. The alert is only logged.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        let _payload = serde_json::to_value(alert)?;

        tracing::info!(
            "Webhook notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    // Stable identifier used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "webhook".to_string()
    }
}
1753
#[cfg(test)]
mod tests {
    use super::*;

    // The engine keeps the configuration it was constructed with.
    #[test]
    fn test_analytics_engine_creation() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        assert!(engine.config.enable_real_time);
        assert_eq!(engine.config.collection_interval, 1);
    }

    // A successful query execution increments both total and success counters.
    #[test]
    fn test_query_recording() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let result = engine.record_query_execution(
            "test_query_1".to_string(),
            "similarity_search".to_string(),
            Duration::from_millis(50),
            10,
            true,
        );

        assert!(result.is_ok());

        let metrics = engine.metrics_collector.query_metrics.read();
        assert_eq!(metrics.total_queries, 1);
        assert_eq!(metrics.successful_queries, 1);
    }

    // Creating an alert registers it with the performance monitor.
    #[test]
    fn test_alert_creation() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let result = engine.create_alert(
            AlertType::HighLatency,
            AlertSeverity::Warning,
            "Test alert message".to_string(),
        );

        assert!(result.is_ok());

        let current_alerts = engine.performance_monitor.current_alerts.read();
        assert_eq!(current_alerts.len(), 1);
    }

    // JSON export of recorded metrics succeeds; the temp file is cleaned up
    // afterwards (best-effort).
    #[test]
    fn test_metrics_export() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let _ = engine.record_query_execution(
            "test".to_string(),
            "search".to_string(),
            Duration::from_millis(25),
            5,
            true,
        );

        let temp_file = "/tmp/test_metrics.json";
        let result = engine.export_metrics(ExportFormat::Json, temp_file);
        assert!(result.is_ok());

        let _ = std::fs::remove_file(temp_file);
    }
}
1826}