1use anyhow::{anyhow, Result};
7use chrono;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, VecDeque};
11use std::sync::Arc;
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13use tokio::sync::broadcast;
14
/// Central analytics engine for the vector store: owns metric collection,
/// performance monitoring, query analysis, alert management, and the shared
/// dashboard snapshot, and fans out [`AnalyticsEvent`]s to subscribers.
pub struct VectorAnalyticsEngine {
    /// Engine-wide settings (history bounds, refresh cadence, feature toggles).
    config: AnalyticsConfig,
    /// Lock-protected query/system/quality metric stores (shared across clones).
    metrics_collector: Arc<MetricsCollector>,
    /// Thresholds plus the active-alert map and bounded alert history.
    performance_monitor: Arc<PerformanceMonitor>,
    /// Query pattern / popularity / usage-trend tracking.
    query_analyzer: Arc<QueryAnalyzer>,
    /// Dispatches raised alerts to configured notification channels.
    alert_manager: Arc<AlertManager>,
    /// Latest dashboard snapshot, refreshed by `start_dashboard_updates`.
    dashboard_data: Arc<RwLock<DashboardData>>,
    /// Broadcast channel for analytics events; sends with no subscribers are ignored.
    event_broadcaster: broadcast::Sender<AnalyticsEvent>,
}
25
/// Tunable configuration for [`VectorAnalyticsEngine`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsConfig {
    /// Enable real-time analytics collection.
    pub enable_real_time: bool,
    /// Metric collection interval — presumably seconds; not consumed in this file, confirm at call site.
    pub collection_interval: u64,
    /// Cap on bounded histories (latency samples, alert history, ...).
    pub max_metrics_history: usize,
    /// Enable query pattern / popularity analysis.
    pub enable_query_analysis: bool,
    /// Enable threshold-based alerting.
    pub enable_alerts: bool,
    /// Dashboard snapshot refresh period, in seconds (see `start_dashboard_updates`).
    pub dashboard_refresh_interval: u64,
    /// Enable distributed tracing hooks.
    pub enable_tracing: bool,
    /// Enable profiling instrumentation.
    pub enable_profiling: bool,
    /// How long exported/collected data is retained, in days.
    pub retention_days: u32,
}
48
49impl Default for AnalyticsConfig {
50 fn default() -> Self {
51 Self {
52 enable_real_time: true,
53 collection_interval: 1,
54 max_metrics_history: 10000,
55 enable_query_analysis: true,
56 enable_alerts: true,
57 dashboard_refresh_interval: 5,
58 enable_tracing: true,
59 enable_profiling: true,
60 retention_days: 30,
61 }
62 }
63}
64
/// Events broadcast by the engine to interested subscribers
/// (see `subscribe_to_events`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnalyticsEvent {
    /// A query finished (successfully or not).
    QueryExecuted {
        query_id: String,
        operation_type: String,
        duration: Duration,
        result_count: usize,
        success: bool,
        timestamp: SystemTime,
    },
    /// An index-maintenance operation touched some vectors.
    IndexUpdated {
        index_name: String,
        operation: String,
        vectors_affected: usize,
        timestamp: SystemTime,
    },
    /// A threshold breach produced an alert (mirrors the stored `Alert`).
    PerformanceAlert {
        alert_type: AlertType,
        message: String,
        severity: AlertSeverity,
        timestamp: SystemTime,
    },
    /// A single named system-level measurement.
    SystemMetric {
        metric_name: String,
        value: f64,
        unit: String,
        timestamp: SystemTime,
    },
}
95
/// Category of a raised alert; used as part of the alert id and for routing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
    HighLatency,
    LowThroughput,
    HighMemoryUsage,
    HighCpuUsage,
    QualityDegradation,
    IndexCorruption,
    SystemError,
    ResourceLimitReached,
}
108
/// Urgency level attached to an alert, ordered most to least severe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Critical,
    Warning,
    Info,
}
116
/// Holds the three built-in metric stores plus user-defined custom metrics.
/// Each store sits behind its own `RwLock` so readers/writers of different
/// categories do not contend with each other.
#[derive(Debug)]
pub struct MetricsCollector {
    query_metrics: Arc<RwLock<QueryMetrics>>,
    system_metrics: Arc<RwLock<SystemMetrics>>,
    quality_metrics: Arc<RwLock<QualityMetrics>>,
    custom_metrics: Arc<RwLock<HashMap<String, CustomMetric>>>,
}
125
/// Running query-execution statistics. Percentiles, min/max, and the mean are
/// recomputed from `latency_history` on every sample (see
/// `update_latency_statistics`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryMetrics {
    pub total_queries: u64,
    pub successful_queries: u64,
    pub failed_queries: u64,
    pub average_latency: Duration,
    pub p50_latency: Duration,
    pub p95_latency: Duration,
    pub p99_latency: Duration,
    pub max_latency: Duration,
    // Defaults to Duration::ZERO; meaningful only once a sample is recorded.
    pub min_latency: Duration,
    pub throughput_qps: f64,
    // Bounded by AnalyticsConfig::max_metrics_history.
    pub latency_history: VecDeque<(SystemTime, Duration)>,
    pub throughput_history: VecDeque<(SystemTime, f64)>,
    // Intended as a percentage, 0.0–100.0 (written as failed/total * 100 by
    // record_query_execution) and compared against max_error_rate_percent.
    pub error_rate: f64,
    // Per-operation-type execution counts.
    pub query_distribution: HashMap<String, u64>,
}
144
/// Host/process resource snapshot fed in via `update_system_metrics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemMetrics {
    // Percentages 0–100 (compared against *_percent thresholds).
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub memory_total: u64,
    // Derived: memory_total minus the used share implied by memory_usage.
    pub memory_available: u64,
    pub disk_usage: f64,
    pub network_io: NetworkIO,
    pub vector_count: u64,
    pub index_size: u64,
    // Fraction 0–1 (compared against min_cache_hit_ratio = 0.8).
    pub cache_hit_ratio: f64,
    pub gc_pressure: f64,
    pub thread_count: u64,
    pub system_load: f64,
}
161
/// Cumulative network I/O counters.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkIO {
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub packets_sent: u64,
    pub packets_received: u64,
    pub connections_active: u64,
}
171
/// Search-quality measurements. The `*_at_k` maps are keyed by cutoff `k`.
/// None of these are written in the visible portion of this file; semantics
/// of the scores should be confirmed at the write sites.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QualityMetrics {
    pub recall_at_k: HashMap<usize, f64>,
    pub precision_at_k: HashMap<usize, f64>,
    pub ndcg_at_k: HashMap<usize, f64>,
    pub mean_reciprocal_rank: f64,
    // Used as the dashboard's overall quality score.
    pub average_similarity_score: f64,
    pub similarity_distribution: Vec<f64>,
    pub query_diversity: f64,
    pub result_diversity: f64,
    pub relevance_correlation: f64,
}
185
/// A single user-defined metric sample with free-form tags.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomMetric {
    pub name: String,
    pub value: f64,
    pub unit: String,
    pub description: String,
    pub timestamp: SystemTime,
    pub tags: HashMap<String, String>,
}
196
/// Threshold configuration plus alert bookkeeping: `current_alerts` holds
/// unresolved alerts keyed by id; `alert_history` is a bounded log (capped at
/// `AnalyticsConfig::max_metrics_history` by `create_alert`).
#[derive(Debug)]
pub struct PerformanceMonitor {
    thresholds: Arc<RwLock<PerformanceThresholds>>,
    alert_history: Arc<RwLock<VecDeque<Alert>>>,
    current_alerts: Arc<RwLock<HashMap<String, Alert>>>,
}
204
/// Limits that trigger alerts when breached (see `check_performance_alerts`
/// and `update_system_metrics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceThresholds {
    pub max_latency_ms: u64,
    pub min_throughput_qps: f64,
    // Percentages 0–100.
    pub max_memory_usage_percent: f64,
    pub max_cpu_usage_percent: f64,
    // Fraction 0–1.
    pub min_cache_hit_ratio: f64,
    pub max_error_rate_percent: f64,
    pub min_recall_at_10: f64,
    pub max_index_size_gb: f64,
}
217
218impl Default for PerformanceThresholds {
219 fn default() -> Self {
220 Self {
221 max_latency_ms: 100,
222 min_throughput_qps: 100.0,
223 max_memory_usage_percent: 80.0,
224 max_cpu_usage_percent: 85.0,
225 min_cache_hit_ratio: 0.8,
226 max_error_rate_percent: 1.0,
227 min_recall_at_10: 0.9,
228 max_index_size_gb: 10.0,
229 }
230 }
231}
232
/// A raised (and possibly resolved) alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    // Formed as "{alert_type:?}_{unix_secs}" by create_alert; note that two
    // alerts of the same type within the same second share an id.
    pub id: String,
    pub alert_type: AlertType,
    pub severity: AlertSeverity,
    pub message: String,
    pub timestamp: SystemTime,
    pub resolved: bool,
    pub resolved_timestamp: Option<SystemTime>,
    pub metadata: HashMap<String, String>,
}
245
/// Tracks recurring query patterns, most popular queries, and usage trends.
/// Not exercised in the visible portion of this file.
#[derive(Debug)]
pub struct QueryAnalyzer {
    query_patterns: Arc<RwLock<HashMap<String, QueryPattern>>>,
    popular_queries: Arc<RwLock<VecDeque<PopularQuery>>>,
    usage_trends: Arc<RwLock<UsageTrends>>,
}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct QueryPattern {
257 pub pattern_id: String,
258 pub frequency: u64,
259 pub avg_latency: Duration,
260 pub success_rate: f64,
261 pub peak_hours: Vec<u8>, pub similarity_threshold_distribution: Vec<f64>,
263 pub result_size_distribution: Vec<usize>,
264 pub user_segments: HashMap<String, u64>,
265}
266
/// A frequently-seen query with its popularity stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopularQuery {
    pub query_text: String,
    pub frequency: u64,
    pub avg_similarity_score: f64,
    pub timestamp: SystemTime,
}
275
/// Longitudinal usage statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageTrends {
    pub daily_query_counts: VecDeque<(SystemTime, u64)>,
    // Index = hour of day (0–23).
    pub hourly_patterns: [u64; 24],
    // Index = day of week (0–6; which day is 0 is not fixed by this file).
    pub weekly_patterns: [u64; 7],
    pub growth_rate: f64,
    pub seasonal_patterns: HashMap<String, f64>,
    pub user_growth: f64,
    pub feature_adoption: HashMap<String, f64>,
}
287
/// Routes raised alerts to configured notification channels according to the
/// alert rules. Not `Debug` because `dyn NotificationChannel` isn't.
pub struct AlertManager {
    config: AlertConfig,
    notification_channels: Vec<Box<dyn NotificationChannel>>,
    alert_rules: Arc<RwLock<Vec<AlertRule>>>,
}
294
/// Delivery configuration for outbound alert notifications.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    pub enable_email: bool,
    pub enable_slack: bool,
    pub enable_webhook: bool,
    pub email_recipients: Vec<String>,
    pub slack_webhook: Option<String>,
    pub webhook_url: Option<String>,
    // Minimum gap between repeat notifications for the same condition.
    pub cooldown_period: Duration,
    pub escalation_enabled: bool,
}
307
308impl Default for AlertConfig {
309 fn default() -> Self {
310 Self {
311 enable_email: false,
312 enable_slack: false,
313 enable_webhook: false,
314 email_recipients: Vec::new(),
315 slack_webhook: None,
316 webhook_url: None,
317 cooldown_period: Duration::from_secs(300), escalation_enabled: false,
319 }
320 }
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize)]
325pub struct AlertRule {
326 pub name: String,
327 pub condition: String, pub severity: AlertSeverity,
329 pub enabled: bool,
330 pub cooldown: Duration,
331 pub actions: Vec<String>,
332}
333
/// Outbound delivery channel for alerts (email, Slack, webhook, ...).
/// Implementations must be thread-safe (`Send + Sync`) since the manager is
/// shared behind an `Arc`.
pub trait NotificationChannel: Send + Sync {
    /// Deliver `alert` through this channel.
    fn send_notification(&self, alert: &Alert) -> Result<()>;
    /// Short label identifying the kind of channel.
    fn get_channel_type(&self) -> String;
}
339
/// Full dashboard snapshot rebuilt by `build_dashboard_data`.
/// NOTE(review): `DashboardData::default()` is called in `new()`, but no
/// `Default` derive/impl is visible in this chunk — presumably implemented
/// elsewhere in the file; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
    pub overview: OverviewData,
    pub query_performance: QueryPerformanceData,
    pub system_health: SystemHealthData,
    pub quality_metrics: QualityMetricsData,
    pub usage_analytics: UsageAnalyticsData,
    pub alerts: Vec<Alert>,
    pub last_updated: SystemTime,
}
351
/// Headline numbers for the dashboard's overview card.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverviewData {
    // Currently populated with the all-time total, not a daily count
    // (see build_dashboard_data).
    pub total_queries_today: u64,
    pub average_latency: Duration,
    pub current_qps: f64,
    // 0–100, from calculate_health_score.
    pub system_health_score: f64,
    pub active_alerts: u64,
    pub index_size: u64,
    pub vector_count: u64,
    pub cache_hit_ratio: f64,
}
364
/// Query-performance series and summaries for the dashboard.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueryPerformanceData {
    pub latency_trends: Vec<(SystemTime, Duration)>,
    pub throughput_trends: Vec<(SystemTime, f64)>,
    pub error_rate_trends: Vec<(SystemTime, f64)>,
    // Not yet populated (always empty in build_dashboard_data).
    pub top_slow_queries: Vec<(String, Duration)>,
    pub query_distribution: HashMap<String, u64>,
    // Keys "p50"/"p95"/"p99".
    pub performance_percentiles: HashMap<String, Duration>,
}
375
/// Resource-health card data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthData {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub disk_usage: f64,
    // Currently always 0.0 (not collected yet; see build_dashboard_data).
    pub network_throughput: f64,
    pub resource_trends: Vec<(SystemTime, f64)>,
    // Currently always empty (forecasting not implemented).
    pub capacity_forecast: Vec<(SystemTime, f64)>,
    pub bottlenecks: Vec<String>,
}
387
/// Search-quality card data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetricsData {
    // Recall/precision trends are currently always empty (see build_dashboard_data).
    pub recall_trends: Vec<(SystemTime, f64)>,
    pub precision_trends: Vec<(SystemTime, f64)>,
    pub similarity_distribution: Vec<f64>,
    // Mirrors QualityMetrics::average_similarity_score.
    pub quality_score: f64,
    pub quality_trends: Vec<(SystemTime, f64)>,
    pub benchmark_comparisons: HashMap<String, f64>,
}
398
/// Usage-analytics card data.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UsageAnalyticsData {
    pub user_activity: Vec<(SystemTime, u64)>,
    // Currently always empty; intended to come from QueryAnalyzer.
    pub popular_queries: Vec<PopularQuery>,
    pub usage_patterns: HashMap<String, f64>,
    pub growth_metrics: GrowthMetrics,
    pub feature_usage: HashMap<String, u64>,
}
408
/// Growth-rate summary.
/// NOTE(review): `GrowthMetrics::default()` is called in
/// `build_dashboard_data`, yet `Default` is not derived here — presumably a
/// manual impl exists elsewhere in the file; confirm, or add the derive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrowthMetrics {
    pub daily_growth_rate: f64,
    pub weekly_growth_rate: f64,
    pub monthly_growth_rate: f64,
    pub user_retention: f64,
    pub query_volume_growth: f64,
}
418
419impl Clone for VectorAnalyticsEngine {
420 fn clone(&self) -> Self {
421 Self {
422 config: self.config.clone(),
423 metrics_collector: Arc::clone(&self.metrics_collector),
424 performance_monitor: Arc::clone(&self.performance_monitor),
425 query_analyzer: Arc::clone(&self.query_analyzer),
426 alert_manager: Arc::clone(&self.alert_manager),
427 dashboard_data: Arc::clone(&self.dashboard_data),
428 event_broadcaster: self.event_broadcaster.clone(),
429 }
430 }
431}
432
433impl VectorAnalyticsEngine {
434 pub fn new(config: AnalyticsConfig) -> Self {
435 let (event_broadcaster, _) = broadcast::channel(1000);
436
437 let metrics_collector = Arc::new(MetricsCollector::new());
438 let performance_monitor = Arc::new(PerformanceMonitor::new());
439 let query_analyzer = Arc::new(QueryAnalyzer::new());
440 let alert_manager = Arc::new(AlertManager::new(AlertConfig::default()));
441 let dashboard_data = Arc::new(RwLock::new(DashboardData::default()));
442
443 Self {
444 config,
445 metrics_collector,
446 performance_monitor,
447 query_analyzer,
448 alert_manager,
449 dashboard_data,
450 event_broadcaster,
451 }
452 }
453
454 pub fn record_query_execution(
456 &self,
457 query_id: String,
458 operation_type: String,
459 duration: Duration,
460 result_count: usize,
461 success: bool,
462 ) -> Result<()> {
463 {
465 let mut metrics = self.metrics_collector.query_metrics.write();
466 metrics.total_queries += 1;
467
468 if success {
469 metrics.successful_queries += 1;
470 } else {
471 metrics.failed_queries += 1;
472 }
473
474 self.update_latency_statistics(&mut metrics, duration);
476
477 *metrics
479 .query_distribution
480 .entry(operation_type.clone())
481 .or_insert(0) += 1;
482
483 metrics.error_rate =
485 (metrics.failed_queries as f64) / (metrics.total_queries as f64) * 100.0;
486 }
487
488 self.check_performance_alerts(duration, success)?;
490
491 let event = AnalyticsEvent::QueryExecuted {
493 query_id,
494 operation_type,
495 duration,
496 result_count,
497 success,
498 timestamp: SystemTime::now(),
499 };
500
501 let _ = self.event_broadcaster.send(event);
502
503 Ok(())
504 }
505
506 fn update_latency_statistics(&self, metrics: &mut QueryMetrics, duration: Duration) {
507 let timestamp = SystemTime::now();
508
509 metrics.latency_history.push_back((timestamp, duration));
511 if metrics.latency_history.len() > self.config.max_metrics_history {
512 metrics.latency_history.pop_front();
513 }
514
515 let latencies: Vec<Duration> = metrics.latency_history.iter().map(|(_, d)| *d).collect();
517
518 if !latencies.is_empty() {
519 let mut sorted_latencies = latencies.clone();
520 sorted_latencies.sort();
521
522 let len = sorted_latencies.len();
523 metrics.p50_latency = sorted_latencies[len / 2];
524 metrics.p95_latency = sorted_latencies[(len as f64 * 0.95) as usize];
525 metrics.p99_latency = sorted_latencies[(len as f64 * 0.99) as usize];
526 metrics.max_latency = *sorted_latencies.last().unwrap();
527 metrics.min_latency = *sorted_latencies.first().unwrap();
528
529 let total_duration: Duration = latencies.iter().sum();
530 metrics.average_latency = total_duration / len as u32;
531 }
532 }
533
534 fn check_performance_alerts(&self, duration: Duration, success: bool) -> Result<()> {
535 let thresholds = self.performance_monitor.thresholds.read();
536
537 if duration.as_millis() > thresholds.max_latency_ms as u128 {
539 self.create_alert(
540 AlertType::HighLatency,
541 AlertSeverity::Warning,
542 format!(
543 "Query latency {}ms exceeds threshold {}ms",
544 duration.as_millis(),
545 thresholds.max_latency_ms
546 ),
547 )?;
548 }
549
550 if !success {
552 let metrics = self.metrics_collector.query_metrics.read();
553 if metrics.error_rate > thresholds.max_error_rate_percent {
554 self.create_alert(
555 AlertType::SystemError,
556 AlertSeverity::Critical,
557 format!(
558 "Error rate {:.2}% exceeds threshold {:.2}%",
559 metrics.error_rate, thresholds.max_error_rate_percent
560 ),
561 )?;
562 }
563 }
564
565 Ok(())
566 }
567
568 fn create_alert(
569 &self,
570 alert_type: AlertType,
571 severity: AlertSeverity,
572 message: String,
573 ) -> Result<()> {
574 let alert_id = format!(
575 "{:?}_{}",
576 alert_type,
577 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
578 );
579
580 let alert = Alert {
581 id: alert_id,
582 alert_type,
583 severity,
584 message,
585 timestamp: SystemTime::now(),
586 resolved: false,
587 resolved_timestamp: None,
588 metadata: HashMap::new(),
589 };
590
591 {
593 let mut current_alerts = self.performance_monitor.current_alerts.write();
594 current_alerts.insert(alert.id.clone(), alert.clone());
595
596 let mut alert_history = self.performance_monitor.alert_history.write();
597 alert_history.push_back(alert.clone());
598 if alert_history.len() > self.config.max_metrics_history {
599 alert_history.pop_front();
600 }
601 }
602
603 self.alert_manager.send_alert(&alert)?;
605
606 let event = AnalyticsEvent::PerformanceAlert {
608 alert_type: alert.alert_type.clone(),
609 message: alert.message.clone(),
610 severity: alert.severity.clone(),
611 timestamp: alert.timestamp,
612 };
613
614 let _ = self.event_broadcaster.send(event);
615
616 Ok(())
617 }
618
619 pub fn record_distributed_query(
621 &self,
622 query_id: String,
623 node_count: usize,
624 total_duration: Duration,
625 _federation_id: Option<String>,
626 success: bool,
627 ) -> Result<()> {
628 {
630 let mut metrics = self.metrics_collector.query_metrics.write();
631 metrics.total_queries += 1;
632
633 if success {
634 metrics.successful_queries += 1;
635 } else {
636 metrics.failed_queries += 1;
637 }
638
639 self.update_latency_statistics(&mut metrics, total_duration);
641
642 let operation_type = format!("distributed_query_{node_count}_nodes");
644 *metrics
645 .query_distribution
646 .entry(operation_type)
647 .or_insert(0) += 1;
648
649 metrics.error_rate = if metrics.total_queries > 0 {
651 metrics.failed_queries as f64 / metrics.total_queries as f64
652 } else {
653 0.0
654 };
655 }
656
657 let event = AnalyticsEvent::QueryExecuted {
659 query_id: query_id.clone(),
660 operation_type: format!("distributed_query_{node_count}_nodes"),
661 duration: total_duration,
662 result_count: node_count,
663 success,
664 timestamp: SystemTime::now(),
665 };
666
667 let _ = self.event_broadcaster.send(event);
668
669 if total_duration.as_millis() > 5000 {
671 let message = format!(
672 "Distributed query {} across {} nodes took {}ms",
673 query_id,
674 node_count,
675 total_duration.as_millis()
676 );
677
678 self.create_alert(AlertType::HighLatency, AlertSeverity::Warning, message)?;
679 }
680
681 Ok(())
682 }
683
684 pub fn update_system_metrics(
686 &self,
687 cpu_usage: f64,
688 memory_usage: f64,
689 memory_total: u64,
690 ) -> Result<()> {
691 {
692 let mut metrics = self.metrics_collector.system_metrics.write();
693 metrics.cpu_usage = cpu_usage;
694 metrics.memory_usage = memory_usage;
695 metrics.memory_total = memory_total;
696 metrics.memory_available =
697 memory_total - (memory_total as f64 * memory_usage / 100.0) as u64;
698 }
699
700 let thresholds = self.performance_monitor.thresholds.read();
702
703 if cpu_usage > thresholds.max_cpu_usage_percent {
704 self.create_alert(
705 AlertType::HighCpuUsage,
706 AlertSeverity::Warning,
707 format!(
708 "CPU usage {:.2}% exceeds threshold {:.2}%",
709 cpu_usage, thresholds.max_cpu_usage_percent
710 ),
711 )?;
712 }
713
714 if memory_usage > thresholds.max_memory_usage_percent {
715 self.create_alert(
716 AlertType::HighMemoryUsage,
717 AlertSeverity::Warning,
718 format!(
719 "Memory usage {:.2}% exceeds threshold {:.2}%",
720 memory_usage, thresholds.max_memory_usage_percent
721 ),
722 )?;
723 }
724
725 Ok(())
726 }
727
    /// Return a clone of the most recent dashboard snapshot; the read lock is
    /// held only for the duration of the clone.
    pub fn get_dashboard_data(&self) -> DashboardData {
        self.dashboard_data.read().clone()
    }
732
    /// Obtain a new receiver on the engine's event channel. Slow receivers
    /// can lag and miss events once the 1000-event buffer wraps.
    pub fn subscribe_to_events(&self) -> broadcast::Receiver<AnalyticsEvent> {
        self.event_broadcaster.subscribe()
    }
737
738 pub fn generate_report(
740 &self,
741 start_time: SystemTime,
742 end_time: SystemTime,
743 ) -> Result<AnalyticsReport> {
744 let query_metrics = self.metrics_collector.query_metrics.read().clone();
745 let system_metrics = self.metrics_collector.system_metrics.read().clone();
746 let quality_metrics = self.metrics_collector.quality_metrics.read().clone();
747
748 Ok(AnalyticsReport {
749 report_id: format!(
750 "report_{}",
751 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
752 ),
753 start_time,
754 end_time,
755 query_metrics,
756 system_metrics,
757 quality_metrics,
758 alerts: self.get_alerts_in_range(start_time, end_time)?,
759 recommendations: self.generate_recommendations()?,
760 generated_at: SystemTime::now(),
761 })
762 }
763
764 fn get_alerts_in_range(
765 &self,
766 start_time: SystemTime,
767 end_time: SystemTime,
768 ) -> Result<Vec<Alert>> {
769 let alert_history = self.performance_monitor.alert_history.read();
770 Ok(alert_history
771 .iter()
772 .filter(|alert| alert.timestamp >= start_time && alert.timestamp <= end_time)
773 .cloned()
774 .collect())
775 }
776
777 fn generate_recommendations(&self) -> Result<Vec<String>> {
778 let mut recommendations = Vec::new();
779
780 let query_metrics = self.metrics_collector.query_metrics.read();
781 let system_metrics = self.metrics_collector.system_metrics.read();
782
783 if query_metrics.average_latency.as_millis() > 50 {
785 recommendations
786 .push("Consider optimizing queries or adding more powerful hardware".to_string());
787 }
788
789 if system_metrics.memory_usage > 80.0 {
790 recommendations.push(
791 "Memory usage is high. Consider increasing memory or optimizing data structures"
792 .to_string(),
793 );
794 }
795
796 if system_metrics.cache_hit_ratio < 0.8 {
797 recommendations.push("Cache hit ratio is low. Consider increasing cache size or improving cache strategy".to_string());
798 }
799
800 Ok(recommendations)
801 }
802
    /// Snapshot all metric stores and write them to `destination` in the
    /// requested `format`. Each exporter serializes to a string and
    /// overwrites the destination file.
    pub fn export_metrics(&self, format: ExportFormat, destination: &str) -> Result<()> {
        let metrics_data = self.collect_all_metrics()?;

        match format {
            ExportFormat::Json => self.export_as_json(&metrics_data, destination),
            ExportFormat::Csv => self.export_as_csv(&metrics_data, destination),
            ExportFormat::Prometheus => self.export_as_prometheus(&metrics_data, destination),
            ExportFormat::InfluxDb => self.export_as_influxdb(&metrics_data, destination),
        }
    }
814
815 fn collect_all_metrics(&self) -> Result<HashMap<String, serde_json::Value>> {
816 let mut all_metrics = HashMap::new();
817
818 let query_metrics = self.metrics_collector.query_metrics.read();
819 let system_metrics = self.metrics_collector.system_metrics.read();
820 let quality_metrics = self.metrics_collector.quality_metrics.read();
821
822 all_metrics.insert(
823 "query_metrics".to_string(),
824 serde_json::to_value(&*query_metrics)?,
825 );
826 all_metrics.insert(
827 "system_metrics".to_string(),
828 serde_json::to_value(&*system_metrics)?,
829 );
830 all_metrics.insert(
831 "quality_metrics".to_string(),
832 serde_json::to_value(&*quality_metrics)?,
833 );
834
835 Ok(all_metrics)
836 }
837
838 fn export_as_json(
839 &self,
840 metrics: &HashMap<String, serde_json::Value>,
841 destination: &str,
842 ) -> Result<()> {
843 let json_data = serde_json::to_string_pretty(metrics)?;
844 std::fs::write(destination, json_data)?;
845 Ok(())
846 }
847
848 fn export_as_csv(
849 &self,
850 metrics: &HashMap<String, serde_json::Value>,
851 destination: &str,
852 ) -> Result<()> {
853 let mut csv_content = String::new();
854 csv_content.push_str("timestamp,metric_name,value,category\n");
855
856 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S");
857
858 if let Some(query_metrics) = metrics.get("query_metrics") {
860 if let Some(obj) = query_metrics.as_object() {
861 for (key, value) in obj {
862 if let Some(num_val) = value.as_f64() {
863 csv_content.push_str(&format!("{timestamp},query_{key},{num_val},query\n"));
864 }
865 }
866 }
867 }
868
869 if let Some(system_metrics) = metrics.get("system_metrics") {
871 if let Some(obj) = system_metrics.as_object() {
872 for (key, value) in obj {
873 if let Some(num_val) = value.as_f64() {
874 csv_content
875 .push_str(&format!("{timestamp},system_{key},{num_val},system\n"));
876 }
877 }
878 }
879 }
880
881 std::fs::write(destination, csv_content)?;
882 Ok(())
883 }
884
885 fn export_as_prometheus(
886 &self,
887 metrics: &HashMap<String, serde_json::Value>,
888 destination: &str,
889 ) -> Result<()> {
890 let mut prometheus_content = String::new();
891 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
892
893 if let Some(query_metrics) = metrics.get("query_metrics") {
895 if let Some(obj) = query_metrics.as_object() {
896 for (key, value) in obj {
897 if let Some(num_val) = value.as_f64() {
898 prometheus_content
899 .push_str(&format!("# HELP vector_query_{key} Query metric {key}\n"));
900 prometheus_content.push_str(&format!("# TYPE vector_query_{key} gauge\n"));
901 prometheus_content
902 .push_str(&format!("vector_query_{key} {num_val} {timestamp}\n"));
903 }
904 }
905 }
906 }
907
908 if let Some(system_metrics) = metrics.get("system_metrics") {
910 if let Some(obj) = system_metrics.as_object() {
911 for (key, value) in obj {
912 if let Some(num_val) = value.as_f64() {
913 prometheus_content
914 .push_str(&format!("# HELP vector_system_{key} System metric {key}\n"));
915 prometheus_content.push_str(&format!("# TYPE vector_system_{key} gauge\n"));
916 prometheus_content
917 .push_str(&format!("vector_system_{key} {num_val} {timestamp}\n"));
918 }
919 }
920 }
921 }
922
923 std::fs::write(destination, prometheus_content)?;
924 Ok(())
925 }
926
927 fn export_as_influxdb(
928 &self,
929 metrics: &HashMap<String, serde_json::Value>,
930 destination: &str,
931 ) -> Result<()> {
932 let mut influxdb_content = String::new();
933 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
934
935 if let Some(query_metrics) = metrics.get("query_metrics") {
937 if let Some(obj) = query_metrics.as_object() {
938 for (key, value) in obj {
939 if let Some(num_val) = value.as_f64() {
940 influxdb_content.push_str(&format!(
941 "vector_query,type=query {key}={num_val} {timestamp}\n"
942 ));
943 }
944 }
945 }
946 }
947
948 if let Some(system_metrics) = metrics.get("system_metrics") {
950 if let Some(obj) = system_metrics.as_object() {
951 for (key, value) in obj {
952 if let Some(num_val) = value.as_f64() {
953 influxdb_content.push_str(&format!(
954 "vector_system,type=system {key}={num_val} {timestamp}\n"
955 ));
956 }
957 }
958 }
959 }
960
961 std::fs::write(destination, influxdb_content)?;
962 Ok(())
963 }
964
965 pub async fn start_dashboard_updates(&self) -> Result<()> {
967 let dashboard_data = Arc::clone(&self.dashboard_data);
968 let metrics_collector = Arc::clone(&self.metrics_collector);
969 let performance_monitor = Arc::clone(&self.performance_monitor);
970 let refresh_interval = Duration::from_secs(self.config.dashboard_refresh_interval);
971
972 tokio::spawn(async move {
973 let mut interval = tokio::time::interval(refresh_interval);
974
975 loop {
976 interval.tick().await;
977
978 let updated_data =
980 Self::build_dashboard_data(&metrics_collector, &performance_monitor).await;
981
982 {
983 let mut data = dashboard_data.write();
984 *data = updated_data;
985 }
986 }
987 });
988
989 Ok(())
990 }
991
992 async fn build_dashboard_data(
993 metrics_collector: &MetricsCollector,
994 performance_monitor: &PerformanceMonitor,
995 ) -> DashboardData {
996 let query_metrics = metrics_collector.query_metrics.read().clone();
997 let system_metrics = metrics_collector.system_metrics.read().clone();
998 let quality_metrics = metrics_collector.quality_metrics.read().clone();
999 let current_alerts: Vec<Alert> = performance_monitor
1000 .current_alerts
1001 .read()
1002 .values()
1003 .cloned()
1004 .collect();
1005
1006 let health_score = Self::calculate_health_score(&system_metrics, &query_metrics);
1008
1009 let current_qps = Self::calculate_current_qps(&query_metrics);
1011
1012 DashboardData {
1013 overview: OverviewData {
1014 total_queries_today: query_metrics.total_queries,
1015 average_latency: query_metrics.average_latency,
1016 current_qps,
1017 system_health_score: health_score,
1018 active_alerts: current_alerts.len() as u64,
1019 index_size: system_metrics.index_size,
1020 vector_count: system_metrics.vector_count,
1021 cache_hit_ratio: system_metrics.cache_hit_ratio,
1022 },
1023 query_performance: QueryPerformanceData {
1024 latency_trends: query_metrics.latency_history.iter().cloned().collect(),
1025 throughput_trends: query_metrics.throughput_history.iter().cloned().collect(),
1026 error_rate_trends: vec![(SystemTime::now(), query_metrics.error_rate)],
1027 top_slow_queries: vec![], query_distribution: query_metrics.query_distribution.clone(),
1029 performance_percentiles: {
1030 let mut percentiles = HashMap::new();
1031 percentiles.insert("p50".to_string(), query_metrics.p50_latency);
1032 percentiles.insert("p95".to_string(), query_metrics.p95_latency);
1033 percentiles.insert("p99".to_string(), query_metrics.p99_latency);
1034 percentiles
1035 },
1036 },
1037 system_health: SystemHealthData {
1038 cpu_usage: system_metrics.cpu_usage,
1039 memory_usage: system_metrics.memory_usage,
1040 disk_usage: system_metrics.disk_usage,
1041 network_throughput: 0.0, resource_trends: vec![(SystemTime::now(), system_metrics.cpu_usage)],
1043 capacity_forecast: vec![], bottlenecks: Self::identify_bottlenecks(&system_metrics, &query_metrics),
1045 },
1046 quality_metrics: QualityMetricsData {
1047 recall_trends: vec![],
1048 precision_trends: vec![],
1049 similarity_distribution: quality_metrics.similarity_distribution.clone(),
1050 quality_score: quality_metrics.average_similarity_score,
1051 quality_trends: vec![(SystemTime::now(), quality_metrics.average_similarity_score)],
1052 benchmark_comparisons: HashMap::new(),
1053 },
1054 usage_analytics: UsageAnalyticsData {
1055 user_activity: vec![(SystemTime::now(), query_metrics.total_queries)],
1056 popular_queries: vec![], usage_patterns: HashMap::new(),
1058 growth_metrics: GrowthMetrics::default(),
1059 feature_usage: HashMap::new(),
1060 },
1061 alerts: current_alerts,
1062 last_updated: SystemTime::now(),
1063 }
1064 }
1065
1066 fn calculate_health_score(system_metrics: &SystemMetrics, query_metrics: &QueryMetrics) -> f64 {
1067 let mut score = 100.0;
1068
1069 if system_metrics.cpu_usage > 80.0 {
1071 score -= (system_metrics.cpu_usage - 80.0) * 0.5;
1072 }
1073 if system_metrics.memory_usage > 80.0 {
1074 score -= (system_metrics.memory_usage - 80.0) * 0.5;
1075 }
1076
1077 if query_metrics.error_rate > 1.0 {
1079 score -= query_metrics.error_rate * 10.0;
1080 }
1081
1082 if query_metrics.average_latency.as_millis() > 100 {
1084 score -= (query_metrics.average_latency.as_millis() as f64 - 100.0) * 0.1;
1085 }
1086
1087 score.clamp(0.0, 100.0)
1088 }
1089
1090 fn calculate_current_qps(query_metrics: &QueryMetrics) -> f64 {
1091 if query_metrics.latency_history.len() < 2 {
1093 return 0.0;
1094 }
1095
1096 let now = SystemTime::now();
1097 let one_second_ago = now - Duration::from_secs(1);
1098
1099 let recent_queries = query_metrics
1100 .latency_history
1101 .iter()
1102 .filter(|(timestamp, _)| *timestamp >= one_second_ago)
1103 .count();
1104
1105 recent_queries as f64
1106 }
1107
1108 fn identify_bottlenecks(
1109 system_metrics: &SystemMetrics,
1110 query_metrics: &QueryMetrics,
1111 ) -> Vec<String> {
1112 let mut bottlenecks = Vec::new();
1113
1114 if system_metrics.cpu_usage > 90.0 {
1115 bottlenecks.push("High CPU usage".to_string());
1116 }
1117
1118 if system_metrics.memory_usage > 90.0 {
1119 bottlenecks.push("High memory usage".to_string());
1120 }
1121
1122 if query_metrics.average_latency.as_millis() > 500 {
1123 bottlenecks.push("High query latency".to_string());
1124 }
1125
1126 if system_metrics.cache_hit_ratio < 0.7 {
1127 bottlenecks.push("Low cache hit ratio".to_string());
1128 }
1129
1130 bottlenecks
1131 }
1132
    /// Renders the current dashboard snapshot as a single self-contained HTML
    /// page (inline CSS and JS; the embedded script reloads the page every
    /// 30 seconds).
    ///
    /// Values are read from the snapshot returned by `get_dashboard_data()`.
    ///
    /// # Errors
    /// Returns a `Result` for interface consistency; the current
    /// implementation always succeeds.
    pub fn generate_dashboard_html(&self) -> Result<String> {
        let dashboard_data = self.get_dashboard_data();

        let html = format!(
            r#"
<!DOCTYPE html>
<html>
<head>
    <title>OxiRS Vector Search Analytics Dashboard</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
        .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
        .card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
        .metric {{ display: flex; justify-content: space-between; margin: 10px 0; }}
        .metric-value {{ font-weight: bold; color: #007acc; }}
        .alert {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
        .alert-critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
        .alert-warning {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
        .alert-info {{ background-color: #e3f2fd; border-left: 4px solid #2196f3; }}
        .health-score {{ font-size: 2em; text-align: center; }}
        .health-good {{ color: #4caf50; }}
        .health-warning {{ color: #ff9800; }}
        .health-critical {{ color: #f44336; }}
        h1 {{ color: #333; text-align: center; }}
        h2 {{ color: #555; margin-top: 0; }}
        .refresh-time {{ text-align: center; color: #888; font-size: 0.9em; }}
    </style>
    <script>
        function refreshPage() {{
            window.location.reload();
        }}
        setInterval(refreshPage, 30000); // Refresh every 30 seconds
    </script>
</head>
<body>
    <h1>🔍 OxiRS Vector Search Analytics Dashboard</h1>
    <p class="refresh-time">Last updated: {}</p>

    <div class="dashboard">
        <div class="card">
            <h2>System Health</h2>
            <div class="health-score {}">{:.1}%</div>
            <div class="metric">
                <span>Active Alerts:</span>
                <span class="metric-value">{}</span>
            </div>
        </div>

        <div class="card">
            <h2>Query Performance</h2>
            <div class="metric">
                <span>Total Queries Today:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Average Latency:</span>
                <span class="metric-value">{}ms</span>
            </div>
            <div class="metric">
                <span>Current QPS:</span>
                <span class="metric-value">{:.1}</span>
            </div>
        </div>

        <div class="card">
            <h2>System Resources</h2>
            <div class="metric">
                <span>CPU Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Memory Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Cache Hit Ratio:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
        </div>

        <div class="card">
            <h2>Vector Index</h2>
            <div class="metric">
                <span>Vector Count:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Index Size:</span>
                <span class="metric-value">{} MB</span>
            </div>
        </div>

        <div class="card">
            <h2>Active Alerts</h2>
            {}
        </div>
    </div>
</body>
</html>
        "#,
            // Arguments below are in template placeholder order.
            chrono::DateTime::<chrono::Utc>::from(dashboard_data.last_updated)
                .format("%Y-%m-%d %H:%M:%S UTC"),
            // Health-score CSS class: green at >= 80, amber at >= 60, red below.
            if dashboard_data.overview.system_health_score >= 80.0 {
                "health-good"
            } else if dashboard_data.overview.system_health_score >= 60.0 {
                "health-warning"
            } else {
                "health-critical"
            },
            dashboard_data.overview.system_health_score,
            dashboard_data.overview.active_alerts,
            dashboard_data.overview.total_queries_today,
            dashboard_data.overview.average_latency.as_millis(),
            dashboard_data.overview.current_qps,
            dashboard_data.system_health.cpu_usage,
            dashboard_data.system_health.memory_usage,
            // Stored as a 0..1 ratio; rendered as a percentage.
            dashboard_data.overview.cache_hit_ratio * 100.0,
            dashboard_data.overview.vector_count,
            // Bytes -> MB for display; alerts card content is pre-rendered HTML.
            dashboard_data.overview.index_size / (1024 * 1024), Self::format_alerts(&dashboard_data.alerts)
        );

        Ok(html)
    }
1258
1259 fn format_alerts(alerts: &[Alert]) -> String {
1260 if alerts.is_empty() {
1261 return "<p>No active alerts</p>".to_string();
1262 }
1263
1264 alerts
1265 .iter()
1266 .map(|alert| {
1267 let class = match alert.severity {
1268 AlertSeverity::Critical => "alert-critical",
1269 AlertSeverity::Warning => "alert-warning",
1270 AlertSeverity::Info => "alert-info",
1271 };
1272
1273 format!(
1274 "<div class=\"alert {}\">
1275 <strong>{:?}</strong>: {}
1276 <br><small>{}</small>
1277 </div>",
1278 class,
1279 alert.alert_type,
1280 alert.message,
1281 chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%H:%M:%S")
1282 )
1283 })
1284 .collect::<Vec<_>>()
1285 .join("\n")
1286 }
1287
1288 pub async fn start_system_monitoring(&self) -> Result<()> {
1290 let analytics_engine = self.clone();
1291
1292 tokio::spawn(async move {
1293 let mut interval = tokio::time::interval(Duration::from_secs(5));
1294
1295 loop {
1296 interval.tick().await;
1297
1298 if let Ok(system_info) = Self::collect_system_info().await {
1300 let _ = analytics_engine.update_system_metrics(
1301 system_info.cpu_usage,
1302 system_info.memory_usage,
1303 system_info.memory_total,
1304 );
1305 }
1306 }
1307 });
1308
1309 Ok(())
1310 }
1311
1312 async fn collect_system_info() -> Result<SystemInfo> {
1313 Ok(SystemInfo {
1316 cpu_usage: {
1317 #[allow(unused_imports)]
1318 use scirs2_core::random::{Random, Rng};
1319 let mut rng = Random::seed(42);
1320 45.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1322 memory_usage: {
1323 #[allow(unused_imports)]
1324 use scirs2_core::random::{Random, Rng};
1325 let mut rng = Random::seed(42);
1326 60.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1328 memory_total: 16 * 1024 * 1024 * 1024, disk_usage: 30.0,
1330 network_throughput: 100.0,
1331 })
1332 }
1333}
1334
/// Point-in-time snapshot of host resource usage, produced by
/// `collect_system_info` and consumed by the monitoring loop.
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// CPU utilization as a percentage (0.0–100.0).
    pub cpu_usage: f64,
    /// Memory utilization as a percentage (0.0–100.0).
    pub memory_usage: f64,
    /// Total physical memory in bytes.
    pub memory_total: u64,
    /// Disk utilization percentage — currently a fixed placeholder value.
    pub disk_usage: f64,
    /// Network throughput — units unspecified in SOURCE; placeholder. TODO confirm units.
    pub network_throughput: f64,
}
1344
/// Lightweight function-call profiler: `start_profile` marks the start of a
/// timed section and returns an ID, `end_profile` records the elapsed time
/// into per-function aggregate statistics.
pub struct PerformanceProfiler {
    /// Aggregated timing statistics, keyed by function name.
    profiles: Arc<RwLock<HashMap<String, ProfileData>>>,
    /// Start instants of sections started but not yet ended, keyed by the
    /// opaque profile ID returned from `start_profile`.
    active_profiles: Arc<RwLock<HashMap<String, Instant>>>,
}
1350
/// Aggregated timing statistics for one profiled function.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileData {
    /// Name of the profiled function (the key it is stored under).
    pub function_name: String,
    /// Number of completed calls recorded so far.
    pub total_calls: u64,
    /// Sum of all recorded call durations.
    pub total_time: Duration,
    /// `total_time / total_calls`, refreshed on every recorded call.
    pub average_time: Duration,
    /// Shortest recorded call duration.
    pub min_time: Duration,
    /// Longest recorded call duration.
    pub max_time: Duration,
    /// Recent (timestamp, duration) samples; capped at 1000 entries by `end_profile`.
    pub call_history: VecDeque<(SystemTime, Duration)>,
}
1361
1362impl Default for PerformanceProfiler {
1363 fn default() -> Self {
1364 Self::new()
1365 }
1366}
1367
1368impl PerformanceProfiler {
1369 pub fn new() -> Self {
1370 Self {
1371 profiles: Arc::new(RwLock::new(HashMap::new())),
1372 active_profiles: Arc::new(RwLock::new(HashMap::new())),
1373 }
1374 }
1375
1376 pub fn start_profile(&self, function_name: &str) -> String {
1377 let profile_id = format!(
1378 "{}_{}",
1379 function_name,
1380 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
1381 );
1382 let mut active = self.active_profiles.write();
1383 active.insert(profile_id.clone(), Instant::now());
1384 profile_id
1385 }
1386
1387 pub fn end_profile(&self, profile_id: &str) -> Result<Duration> {
1388 let mut active = self.active_profiles.write();
1389 let start_time = active
1390 .remove(profile_id)
1391 .ok_or_else(|| anyhow!("Profile ID not found: {}", profile_id))?;
1392
1393 let duration = start_time.elapsed();
1394
1395 let function_name = profile_id.split('_').next().unwrap_or("unknown");
1397
1398 let mut profiles = self.profiles.write();
1400 let profile = profiles
1401 .entry(function_name.to_string())
1402 .or_insert_with(|| ProfileData {
1403 function_name: function_name.to_string(),
1404 total_calls: 0,
1405 total_time: Duration::from_nanos(0),
1406 average_time: Duration::from_nanos(0),
1407 min_time: Duration::from_secs(u64::MAX),
1408 max_time: Duration::from_nanos(0),
1409 call_history: VecDeque::new(),
1410 });
1411
1412 profile.total_calls += 1;
1413 profile.total_time += duration;
1414 profile.average_time = profile.total_time / profile.total_calls as u32;
1415 profile.min_time = profile.min_time.min(duration);
1416 profile.max_time = profile.max_time.max(duration);
1417 profile
1418 .call_history
1419 .push_back((SystemTime::now(), duration));
1420
1421 while profile.call_history.len() > 1000 {
1423 profile.call_history.pop_front();
1424 }
1425
1426 Ok(duration)
1427 }
1428
1429 pub fn get_profile_report(&self) -> HashMap<String, ProfileData> {
1430 self.profiles.read().clone()
1431 }
1432}
1433
/// Output formats accepted by the metrics export path.
#[derive(Debug, Clone)]
pub enum ExportFormat {
    /// JSON document.
    Json,
    /// Comma-separated values.
    Csv,
    /// Prometheus-style metrics output.
    Prometheus,
    /// InfluxDB-style metrics output.
    InfluxDb,
}
1442
/// Serializable bundle of all analytics captured over one reporting window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
    /// Unique identifier for this report.
    pub report_id: String,
    /// Start of the reporting window.
    pub start_time: SystemTime,
    /// End of the reporting window.
    pub end_time: SystemTime,
    /// Query-level statistics for the window.
    pub query_metrics: QueryMetrics,
    /// Host/system resource statistics for the window.
    pub system_metrics: SystemMetrics,
    /// Search-quality statistics for the window.
    pub quality_metrics: QualityMetrics,
    /// Alerts raised during the window.
    pub alerts: Vec<Alert>,
    /// Human-readable tuning/operational recommendations.
    pub recommendations: Vec<String>,
    /// When this report was produced.
    pub generated_at: SystemTime,
}
1456
1457impl Default for MetricsCollector {
1458 fn default() -> Self {
1459 Self::new()
1460 }
1461}
1462
1463impl MetricsCollector {
1464 pub fn new() -> Self {
1465 Self {
1466 query_metrics: Arc::new(RwLock::new(QueryMetrics::default())),
1467 system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
1468 quality_metrics: Arc::new(RwLock::new(QualityMetrics::default())),
1469 custom_metrics: Arc::new(RwLock::new(HashMap::new())),
1470 }
1471 }
1472}
1473
1474impl Default for PerformanceMonitor {
1475 fn default() -> Self {
1476 Self::new()
1477 }
1478}
1479
1480impl PerformanceMonitor {
1481 pub fn new() -> Self {
1482 Self {
1483 thresholds: Arc::new(RwLock::new(PerformanceThresholds::default())),
1484 alert_history: Arc::new(RwLock::new(VecDeque::new())),
1485 current_alerts: Arc::new(RwLock::new(HashMap::new())),
1486 }
1487 }
1488}
1489
1490impl Default for QueryAnalyzer {
1491 fn default() -> Self {
1492 Self::new()
1493 }
1494}
1495
1496impl QueryAnalyzer {
1497 pub fn new() -> Self {
1498 Self {
1499 query_patterns: Arc::new(RwLock::new(HashMap::new())),
1500 popular_queries: Arc::new(RwLock::new(VecDeque::new())),
1501 usage_trends: Arc::new(RwLock::new(UsageTrends::default())),
1502 }
1503 }
1504}
1505
1506impl AlertManager {
1507 pub fn new(config: AlertConfig) -> Self {
1508 Self {
1509 config,
1510 notification_channels: Vec::new(),
1511 alert_rules: Arc::new(RwLock::new(Vec::new())),
1512 }
1513 }
1514
1515 pub fn send_alert(&self, alert: &Alert) -> Result<()> {
1516 for channel in &self.notification_channels {
1517 if let Err(e) = channel.send_notification(alert) {
1518 eprintln!(
1519 "Failed to send alert via {}: {}",
1520 channel.get_channel_type(),
1521 e
1522 );
1523 }
1524 }
1525 Ok(())
1526 }
1527
1528 pub fn add_notification_channel(&mut self, channel: Box<dyn NotificationChannel>) {
1529 self.notification_channels.push(channel);
1530 }
1531}
1532
1533impl Default for DashboardData {
1534 fn default() -> Self {
1535 Self {
1536 overview: OverviewData::default(),
1537 query_performance: QueryPerformanceData::default(),
1538 system_health: SystemHealthData::default(),
1539 quality_metrics: QualityMetricsData::default(),
1540 usage_analytics: UsageAnalyticsData::default(),
1541 alerts: Vec::new(),
1542 last_updated: SystemTime::now(),
1543 }
1544 }
1545}
1546
1547impl Default for OverviewData {
1548 fn default() -> Self {
1549 Self {
1550 total_queries_today: 0,
1551 average_latency: Duration::from_millis(0),
1552 current_qps: 0.0,
1553 system_health_score: 100.0,
1554 active_alerts: 0,
1555 index_size: 0,
1556 vector_count: 0,
1557 cache_hit_ratio: 0.0,
1558 }
1559 }
1560}
1561
1562impl Default for SystemHealthData {
1563 fn default() -> Self {
1564 Self {
1565 cpu_usage: 0.0,
1566 memory_usage: 0.0,
1567 disk_usage: 0.0,
1568 network_throughput: 0.0,
1569 resource_trends: Vec::new(),
1570 capacity_forecast: Vec::new(),
1571 bottlenecks: Vec::new(),
1572 }
1573 }
1574}
1575
1576impl Default for QualityMetricsData {
1577 fn default() -> Self {
1578 Self {
1579 recall_trends: Vec::new(),
1580 precision_trends: Vec::new(),
1581 similarity_distribution: Vec::new(),
1582 quality_score: 0.0,
1583 quality_trends: Vec::new(),
1584 benchmark_comparisons: HashMap::new(),
1585 }
1586 }
1587}
1588
1589impl Default for UsageTrends {
1590 fn default() -> Self {
1591 Self {
1592 daily_query_counts: VecDeque::new(),
1593 hourly_patterns: [0; 24],
1594 weekly_patterns: [0; 7],
1595 growth_rate: 0.0,
1596 seasonal_patterns: HashMap::new(),
1597 user_growth: 0.0,
1598 feature_adoption: HashMap::new(),
1599 }
1600 }
1601}
1602
1603impl Default for GrowthMetrics {
1604 fn default() -> Self {
1605 Self {
1606 daily_growth_rate: 0.0,
1607 weekly_growth_rate: 0.0,
1608 monthly_growth_rate: 0.0,
1609 user_retention: 0.0,
1610 query_volume_growth: 0.0,
1611 }
1612 }
1613}
1614
/// Notification channel that delivers alerts by email.
///
/// NOTE(review): the trait impl below only logs; `smtp_config` is stored for
/// an eventual real SMTP integration and is not read yet.
pub struct EmailNotificationChannel {
    /// SMTP connection settings used for (future) message delivery.
    smtp_config: SmtpConfig,
}
1619
/// SMTP connection settings for email alert delivery.
///
/// `Debug` is implemented manually so the password is redacted — the derived
/// impl would print the credential verbatim into logs and error output.
#[derive(Clone)]
pub struct SmtpConfig {
    /// SMTP server hostname.
    pub server: String,
    /// SMTP server port (e.g. 25, 465, 587).
    pub port: u16,
    /// Account used to authenticate with the server.
    pub username: String,
    /// Credential — never shown in `Debug` output.
    pub password: String,
    /// Address placed in the `From:` header of outgoing mail.
    pub from_address: String,
}

impl std::fmt::Debug for SmtpConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SmtpConfig")
            .field("server", &self.server)
            .field("port", &self.port)
            .field("username", &self.username)
            // SECURITY: redact the credential instead of leaking it.
            .field("password", &"<redacted>")
            .field("from_address", &self.from_address)
            .finish()
    }
}
1628
1629impl EmailNotificationChannel {
1630 pub fn new(smtp_config: SmtpConfig) -> Self {
1631 Self { smtp_config }
1632 }
1633}
1634
impl NotificationChannel for EmailNotificationChannel {
    /// "Sends" the alert by email.
    ///
    /// NOTE(review): stub — only emits a tracing log line; no SMTP message is
    /// actually sent and `smtp_config` is unused here.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        tracing::info!(
            "Email notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    /// Channel discriminator used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "email".to_string()
    }
}
1650
/// Notification channel that posts alerts to a Slack incoming webhook.
///
/// NOTE(review): the trait impl below only builds the payload and logs;
/// `webhook_url` and `client` are not used for a real HTTP POST yet.
pub struct SlackNotificationChannel {
    /// Slack incoming-webhook endpoint to post to.
    webhook_url: String,
    /// Reusable HTTP client for webhook delivery.
    client: reqwest::Client,
}
1656
1657impl SlackNotificationChannel {
1658 pub fn new(webhook_url: String) -> Self {
1659 Self {
1660 webhook_url,
1661 client: reqwest::Client::new(),
1662 }
1663 }
1664}
1665
impl NotificationChannel for SlackNotificationChannel {
    /// Builds a Slack webhook message for the alert.
    ///
    /// NOTE(review): stub — the payload is constructed but never POSTed to
    /// `webhook_url` (note the `_payload` binding); only a log line is emitted.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        // Slack attachment color keyed off severity (danger/warning/good).
        let _payload = serde_json::json!({
            "text": format!("🚨 Alert: {}", alert.message),
            "attachments": [{
                "color": match alert.severity {
                    AlertSeverity::Critical => "danger",
                    AlertSeverity::Warning => "warning",
                    AlertSeverity::Info => "good",
                },
                "fields": [
                    {
                        "title": "Alert Type",
                        "value": format!("{:?}", alert.alert_type),
                        "short": true
                    },
                    {
                        "title": "Severity",
                        "value": format!("{:?}", alert.severity),
                        "short": true
                    },
                    {
                        "title": "Timestamp",
                        "value": chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%Y-%m-%d %H:%M:%S UTC").to_string(),
                        "short": true
                    }
                ]
            }]
        });

        tracing::info!(
            "Slack notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    /// Channel discriminator used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "slack".to_string()
    }
}
1709
/// Notification channel that delivers alerts to an arbitrary HTTP webhook.
///
/// NOTE(review): the trait impl below only serializes and logs; `webhook_url`,
/// `client`, and `headers` are not used for a real HTTP request yet.
pub struct WebhookNotificationChannel {
    /// Target endpoint for alert payloads.
    webhook_url: String,
    /// Reusable HTTP client for webhook delivery.
    client: reqwest::Client,
    /// Extra HTTP headers to attach to each request (see `with_headers`).
    headers: HashMap<String, String>,
}
1716
1717impl WebhookNotificationChannel {
1718 pub fn new(webhook_url: String) -> Self {
1719 Self {
1720 webhook_url,
1721 client: reqwest::Client::new(),
1722 headers: HashMap::new(),
1723 }
1724 }
1725
1726 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
1727 self.headers = headers;
1728 self
1729 }
1730}
1731
impl NotificationChannel for WebhookNotificationChannel {
    /// Serializes the alert for delivery to a generic webhook endpoint.
    ///
    /// NOTE(review): stub — the JSON payload is built but never POSTed
    /// (`webhook_url`, `client`, and `headers` are unused); only a log line
    /// is emitted.
    ///
    /// # Errors
    /// Fails only if the alert cannot be serialized to JSON.
    fn send_notification(&self, alert: &Alert) -> Result<()> {
        let _payload = serde_json::to_value(alert)?;

        tracing::info!(
            "Webhook notification sent for alert {}: {}",
            alert.id,
            alert.message
        );
        Ok(())
    }

    /// Channel discriminator used in delivery-failure log messages.
    fn get_channel_type(&self) -> String {
        "webhook".to_string()
    }
}
1749
#[cfg(test)]
mod tests {
    use super::*;

    /// Engine construction preserves the provided configuration.
    #[test]
    fn test_analytics_engine_creation() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        assert!(engine.config.enable_real_time);
        assert_eq!(engine.config.collection_interval, 1);
    }

    /// A recorded query shows up in the aggregate query metrics.
    #[test]
    fn test_query_recording() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let result = engine.record_query_execution(
            "test_query_1".to_string(),
            "similarity_search".to_string(),
            Duration::from_millis(50),
            10,
            true,
        );

        assert!(result.is_ok());

        let metrics = engine.metrics_collector.query_metrics.read();
        assert_eq!(metrics.total_queries, 1);
        assert_eq!(metrics.successful_queries, 1);
    }

    /// Creating an alert registers it as a current alert.
    #[test]
    fn test_alert_creation() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let result = engine.create_alert(
            AlertType::HighLatency,
            AlertSeverity::Warning,
            "Test alert message".to_string(),
        );

        assert!(result.is_ok());

        let current_alerts = engine.performance_monitor.current_alerts.read();
        assert_eq!(current_alerts.len(), 1);
    }

    /// JSON export succeeds and writes to a temp file.
    #[test]
    fn test_metrics_export() {
        let config = AnalyticsConfig::default();
        let engine = VectorAnalyticsEngine::new(config);

        let _ = engine.record_query_execution(
            "test".to_string(),
            "search".to_string(),
            Duration::from_millis(25),
            5,
            true,
        );

        // Use a unique, platform-appropriate temp path instead of a
        // hard-coded "/tmp/..." file: portable to Windows and collision-free
        // under parallel test runs.
        let temp_path = std::env::temp_dir().join(format!(
            "oxirs_test_metrics_{}.json",
            std::process::id()
        ));
        let temp_file = temp_path.to_str().expect("temp path is valid UTF-8");
        let result = engine.export_metrics(ExportFormat::Json, temp_file);
        assert!(result.is_ok());

        let _ = std::fs::remove_file(&temp_path);
    }
}