1use anyhow::Result;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use tokio::sync::broadcast;
10
11use crate::rta_aggregators::{
12 AlertManager, DashboardData, ExportFormat, MetricsCollector, OverviewData, PerformanceMonitor,
13 QueryAnalyzer, QueryMetrics, SystemMetrics,
14};
15
/// Analytics engine for vector search.
///
/// It records query and system metrics, raises performance alerts, keeps a
/// dashboard snapshot, and broadcasts [`AnalyticsEvent`]s to subscribers.
///
/// Every piece of shared state is behind an `Arc` (or is a broadcast `Sender`).
/// Clones of the engine therefore share the same collectors, monitors,
/// dashboard snapshot and event channel.
pub struct VectorAnalyticsEngine {
    /// Engine configuration (history caps, refresh interval, feature flags).
    pub(crate) config: AnalyticsConfig,
    /// Query, system and quality metrics, each behind its own lock.
    pub(crate) metrics_collector: Arc<MetricsCollector>,
    /// Alert thresholds plus the current and historical alerts.
    pub(crate) performance_monitor: Arc<PerformanceMonitor>,
    /// Query analyzer. NOTE(review): only constructed and cloned in this file; confirm where it is used.
    pub(crate) query_analyzer: Arc<QueryAnalyzer>,
    /// Receives every alert created by the engine (`send_alert`).
    pub(crate) alert_manager: Arc<AlertManager>,
    /// Latest dashboard snapshot; overwritten by the background dashboard task.
    pub(crate) dashboard_data: Arc<RwLock<DashboardData>>,
    /// Sending half of the event broadcast channel.
    pub(crate) event_broadcaster: broadcast::Sender<AnalyticsEvent>,
}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct AnalyticsConfig {
30 pub enable_real_time: bool,
32 pub collection_interval: u64,
34 pub max_metrics_history: usize,
36 pub enable_query_analysis: bool,
38 pub enable_alerts: bool,
40 pub dashboard_refresh_interval: u64,
42 pub enable_tracing: bool,
44 pub enable_profiling: bool,
46 pub retention_days: u32,
48}
49
50impl Default for AnalyticsConfig {
51 fn default() -> Self {
52 Self {
53 enable_real_time: true,
54 collection_interval: 1,
55 max_metrics_history: 10000,
56 enable_query_analysis: true,
57 enable_alerts: true,
58 dashboard_refresh_interval: 5,
59 enable_tracing: true,
60 enable_profiling: true,
61 retention_days: 30,
62 }
63 }
64}
65
/// Event published on the engine's broadcast channel.
///
/// Receivers are obtained from `VectorAnalyticsEngine::subscribe_to_events`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnalyticsEvent {
    /// A query finished. Emitted by `record_query_execution` and `record_distributed_query`.
    QueryExecuted {
        query_id: String,
        /// Operation label; distributed queries use `distributed_query_{n}_nodes`.
        operation_type: String,
        duration: Duration,
        /// Result count; for distributed queries this carries the node count instead.
        result_count: usize,
        success: bool,
        timestamp: SystemTime,
    },
    /// An index was modified. Not emitted by the engine methods in this file.
    IndexUpdated {
        index_name: String,
        operation: String,
        vectors_affected: usize,
        timestamp: SystemTime,
    },
    /// An alert was raised; emitted by `create_alert`.
    PerformanceAlert {
        alert_type: AlertType,
        message: String,
        severity: AlertSeverity,
        timestamp: SystemTime,
    },
    /// A single named metric sample. Not emitted by the engine methods in this file.
    SystemMetric {
        metric_name: String,
        value: f64,
        unit: String,
        timestamp: SystemTime,
    },
}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum AlertType {
100 HighLatency,
101 LowThroughput,
102 HighMemoryUsage,
103 HighCpuUsage,
104 QualityDegradation,
105 IndexCorruption,
106 SystemError,
107 ResourceLimitReached,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum AlertSeverity {
113 Critical,
114 Warning,
115 Info,
116}
117
/// An alert raised by the engine and stored in both the current-alert map and the alert history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Identifier built by `create_alert` as `{AlertType:?}_{unix_seconds}`.
    pub id: String,
    pub alert_type: AlertType,
    pub severity: AlertSeverity,
    /// Human-readable description, including the measured value and the threshold.
    pub message: String,
    /// When the alert was raised.
    pub timestamp: SystemTime,
    /// Whether the alert has been resolved; `create_alert` always sets `false`.
    pub resolved: bool,
    /// Resolution time, if resolved; `create_alert` always sets `None`.
    pub resolved_timestamp: Option<SystemTime>,
    /// Free-form key/value context; empty for alerts built by `create_alert`.
    pub metadata: HashMap<String, String>,
}
130
/// Analytics report produced by `VectorAnalyticsEngine::generate_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
    /// Identifier of the form `report_{unix_seconds}`.
    pub report_id: String,
    /// Inclusive start of the window used to select `alerts`.
    pub start_time: SystemTime,
    /// Inclusive end of the window used to select `alerts`.
    pub end_time: SystemTime,
    /// Snapshot of the current query metrics (not restricted to the window).
    pub query_metrics: QueryMetrics,
    /// Snapshot of the current system metrics (not restricted to the window).
    pub system_metrics: SystemMetrics,
    /// Snapshot of the current quality metrics (not restricted to the window).
    pub quality_metrics: crate::rta_aggregators::QualityMetrics,
    /// Alerts from the history whose timestamp falls within `[start_time, end_time]`.
    pub alerts: Vec<Alert>,
    /// Human-readable tuning suggestions derived from the current metrics.
    pub recommendations: Vec<String>,
    /// When the report was generated.
    pub generated_at: SystemTime,
}
144
/// Raw resource readings that the monitoring loop passes to `update_system_metrics`.
#[derive(Debug, Clone)]
pub struct SystemInfo {
    /// CPU utilisation in percent.
    pub cpu_usage: f64,
    /// Memory utilisation in percent.
    pub memory_usage: f64,
    /// Total memory. Presumably bytes (the placeholder reading is 16 GiB expressed in bytes); TODO confirm.
    pub memory_total: u64,
    /// Disk utilisation, presumably in percent; not consumed by the engine methods in this file.
    pub disk_usage: f64,
    /// Network throughput; units are not established here.
    pub network_throughput: f64,
}
154
155impl Clone for VectorAnalyticsEngine {
156 fn clone(&self) -> Self {
157 Self {
158 config: self.config.clone(),
159 metrics_collector: Arc::clone(&self.metrics_collector),
160 performance_monitor: Arc::clone(&self.performance_monitor),
161 query_analyzer: Arc::clone(&self.query_analyzer),
162 alert_manager: Arc::clone(&self.alert_manager),
163 dashboard_data: Arc::clone(&self.dashboard_data),
164 event_broadcaster: self.event_broadcaster.clone(),
165 }
166 }
167}
168
169impl VectorAnalyticsEngine {
170 pub fn new(config: AnalyticsConfig) -> Self {
171 let (event_broadcaster, _) = broadcast::channel(1000);
172
173 let metrics_collector = Arc::new(MetricsCollector::new());
174 let performance_monitor = Arc::new(PerformanceMonitor::new());
175 let query_analyzer = Arc::new(QueryAnalyzer::new());
176 let alert_manager = Arc::new(AlertManager::new(
177 crate::rta_aggregators::AlertConfig::default(),
178 ));
179 let dashboard_data = Arc::new(RwLock::new(DashboardData::default()));
180
181 Self {
182 config,
183 metrics_collector,
184 performance_monitor,
185 query_analyzer,
186 alert_manager,
187 dashboard_data,
188 event_broadcaster,
189 }
190 }
191
192 pub fn record_query_execution(
194 &self,
195 query_id: String,
196 operation_type: String,
197 duration: Duration,
198 result_count: usize,
199 success: bool,
200 ) -> Result<()> {
201 {
203 let mut metrics = self.metrics_collector.query_metrics.write();
204 metrics.total_queries += 1;
205
206 if success {
207 metrics.successful_queries += 1;
208 } else {
209 metrics.failed_queries += 1;
210 }
211
212 self.update_latency_statistics(&mut metrics, duration);
214
215 *metrics
217 .query_distribution
218 .entry(operation_type.clone())
219 .or_insert(0) += 1;
220
221 metrics.error_rate =
223 (metrics.failed_queries as f64) / (metrics.total_queries as f64) * 100.0;
224 }
225
226 self.check_performance_alerts(duration, success)?;
228
229 let event = AnalyticsEvent::QueryExecuted {
231 query_id,
232 operation_type,
233 duration,
234 result_count,
235 success,
236 timestamp: SystemTime::now(),
237 };
238
239 let _ = self.event_broadcaster.send(event);
240
241 Ok(())
242 }
243
244 pub(crate) fn update_latency_statistics(&self, metrics: &mut QueryMetrics, duration: Duration) {
245 let timestamp = SystemTime::now();
246
247 metrics.latency_history.push_back((timestamp, duration));
249 if metrics.latency_history.len() > self.config.max_metrics_history {
250 metrics.latency_history.pop_front();
251 }
252
253 let latencies: Vec<Duration> = metrics.latency_history.iter().map(|(_, d)| *d).collect();
255
256 if !latencies.is_empty() {
257 let mut sorted_latencies = latencies.clone();
258 sorted_latencies.sort();
259
260 let len = sorted_latencies.len();
261 metrics.p50_latency = sorted_latencies[len / 2];
262 metrics.p95_latency = sorted_latencies[(len as f64 * 0.95) as usize];
263 metrics.p99_latency = sorted_latencies[(len as f64 * 0.99) as usize];
264 metrics.max_latency = *sorted_latencies
265 .last()
266 .expect("sorted_latencies validated to be non-empty");
267 metrics.min_latency = *sorted_latencies
268 .first()
269 .expect("collection validated to be non-empty");
270
271 let total_duration: Duration = latencies.iter().sum();
272 metrics.average_latency = total_duration / len as u32;
273 }
274 }
275
276 pub(crate) fn check_performance_alerts(&self, duration: Duration, success: bool) -> Result<()> {
277 let thresholds = self.performance_monitor.thresholds.read();
278
279 if duration.as_millis() > thresholds.max_latency_ms as u128 {
281 self.create_alert(
282 AlertType::HighLatency,
283 AlertSeverity::Warning,
284 format!(
285 "Query latency {}ms exceeds threshold {}ms",
286 duration.as_millis(),
287 thresholds.max_latency_ms
288 ),
289 )?;
290 }
291
292 if !success {
294 let metrics = self.metrics_collector.query_metrics.read();
295 if metrics.error_rate > thresholds.max_error_rate_percent {
296 self.create_alert(
297 AlertType::SystemError,
298 AlertSeverity::Critical,
299 format!(
300 "Error rate {:.2}% exceeds threshold {:.2}%",
301 metrics.error_rate, thresholds.max_error_rate_percent
302 ),
303 )?;
304 }
305 }
306
307 Ok(())
308 }
309
310 pub(crate) fn create_alert(
311 &self,
312 alert_type: AlertType,
313 severity: AlertSeverity,
314 message: String,
315 ) -> Result<()> {
316 let alert_id = format!(
317 "{:?}_{}",
318 alert_type,
319 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
320 );
321
322 let alert = Alert {
323 id: alert_id,
324 alert_type,
325 severity,
326 message,
327 timestamp: SystemTime::now(),
328 resolved: false,
329 resolved_timestamp: None,
330 metadata: HashMap::new(),
331 };
332
333 {
335 let mut current_alerts = self.performance_monitor.current_alerts.write();
336 current_alerts.insert(alert.id.clone(), alert.clone());
337
338 let mut alert_history = self.performance_monitor.alert_history.write();
339 alert_history.push_back(alert.clone());
340 if alert_history.len() > self.config.max_metrics_history {
341 alert_history.pop_front();
342 }
343 }
344
345 self.alert_manager.send_alert(&alert)?;
347
348 let event = AnalyticsEvent::PerformanceAlert {
350 alert_type: alert.alert_type.clone(),
351 message: alert.message.clone(),
352 severity: alert.severity.clone(),
353 timestamp: alert.timestamp,
354 };
355
356 let _ = self.event_broadcaster.send(event);
357
358 Ok(())
359 }
360
361 pub fn record_distributed_query(
363 &self,
364 query_id: String,
365 node_count: usize,
366 total_duration: Duration,
367 _federation_id: Option<String>,
368 success: bool,
369 ) -> Result<()> {
370 {
372 let mut metrics = self.metrics_collector.query_metrics.write();
373 metrics.total_queries += 1;
374
375 if success {
376 metrics.successful_queries += 1;
377 } else {
378 metrics.failed_queries += 1;
379 }
380
381 self.update_latency_statistics(&mut metrics, total_duration);
383
384 let operation_type = format!("distributed_query_{node_count}_nodes");
386 *metrics
387 .query_distribution
388 .entry(operation_type)
389 .or_insert(0) += 1;
390
391 metrics.error_rate = if metrics.total_queries > 0 {
393 metrics.failed_queries as f64 / metrics.total_queries as f64
394 } else {
395 0.0
396 };
397 }
398
399 let event = AnalyticsEvent::QueryExecuted {
401 query_id: query_id.clone(),
402 operation_type: format!("distributed_query_{node_count}_nodes"),
403 duration: total_duration,
404 result_count: node_count,
405 success,
406 timestamp: SystemTime::now(),
407 };
408
409 let _ = self.event_broadcaster.send(event);
410
411 if total_duration.as_millis() > 5000 {
413 let message = format!(
414 "Distributed query {} across {} nodes took {}ms",
415 query_id,
416 node_count,
417 total_duration.as_millis()
418 );
419
420 self.create_alert(AlertType::HighLatency, AlertSeverity::Warning, message)?;
421 }
422
423 Ok(())
424 }
425
426 pub fn update_system_metrics(
428 &self,
429 cpu_usage: f64,
430 memory_usage: f64,
431 memory_total: u64,
432 ) -> Result<()> {
433 {
434 let mut metrics = self.metrics_collector.system_metrics.write();
435 metrics.cpu_usage = cpu_usage;
436 metrics.memory_usage = memory_usage;
437 metrics.memory_total = memory_total;
438 metrics.memory_available =
439 memory_total - (memory_total as f64 * memory_usage / 100.0) as u64;
440 }
441
442 let thresholds = self.performance_monitor.thresholds.read();
444
445 if cpu_usage > thresholds.max_cpu_usage_percent {
446 self.create_alert(
447 AlertType::HighCpuUsage,
448 AlertSeverity::Warning,
449 format!(
450 "CPU usage {:.2}% exceeds threshold {:.2}%",
451 cpu_usage, thresholds.max_cpu_usage_percent
452 ),
453 )?;
454 }
455
456 if memory_usage > thresholds.max_memory_usage_percent {
457 self.create_alert(
458 AlertType::HighMemoryUsage,
459 AlertSeverity::Warning,
460 format!(
461 "Memory usage {:.2}% exceeds threshold {:.2}%",
462 memory_usage, thresholds.max_memory_usage_percent
463 ),
464 )?;
465 }
466
467 Ok(())
468 }
469
470 pub fn get_dashboard_data(&self) -> DashboardData {
472 self.dashboard_data.read().clone()
473 }
474
475 pub fn subscribe_to_events(&self) -> broadcast::Receiver<AnalyticsEvent> {
477 self.event_broadcaster.subscribe()
478 }
479
480 pub fn generate_report(
482 &self,
483 start_time: SystemTime,
484 end_time: SystemTime,
485 ) -> Result<AnalyticsReport> {
486 let query_metrics = self.metrics_collector.query_metrics.read().clone();
487 let system_metrics = self.metrics_collector.system_metrics.read().clone();
488 let quality_metrics = self.metrics_collector.quality_metrics.read().clone();
489
490 Ok(AnalyticsReport {
491 report_id: format!(
492 "report_{}",
493 SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
494 ),
495 start_time,
496 end_time,
497 query_metrics,
498 system_metrics,
499 quality_metrics,
500 alerts: self.get_alerts_in_range(start_time, end_time)?,
501 recommendations: self.generate_recommendations()?,
502 generated_at: SystemTime::now(),
503 })
504 }
505
506 fn get_alerts_in_range(
507 &self,
508 start_time: SystemTime,
509 end_time: SystemTime,
510 ) -> Result<Vec<Alert>> {
511 let alert_history = self.performance_monitor.alert_history.read();
512 Ok(alert_history
513 .iter()
514 .filter(|alert| alert.timestamp >= start_time && alert.timestamp <= end_time)
515 .cloned()
516 .collect())
517 }
518
519 fn generate_recommendations(&self) -> Result<Vec<String>> {
520 let mut recommendations = Vec::new();
521
522 let query_metrics = self.metrics_collector.query_metrics.read();
523 let system_metrics = self.metrics_collector.system_metrics.read();
524
525 if query_metrics.average_latency.as_millis() > 50 {
527 recommendations
528 .push("Consider optimizing queries or adding more powerful hardware".to_string());
529 }
530
531 if system_metrics.memory_usage > 80.0 {
532 recommendations.push(
533 "Memory usage is high. Consider increasing memory or optimizing data structures"
534 .to_string(),
535 );
536 }
537
538 if system_metrics.cache_hit_ratio < 0.8 {
539 recommendations.push("Cache hit ratio is low. Consider increasing cache size or improving cache strategy".to_string());
540 }
541
542 Ok(recommendations)
543 }
544
545 pub fn export_metrics(&self, format: ExportFormat, destination: &str) -> Result<()> {
547 let metrics_data = self.collect_all_metrics()?;
548
549 match format {
550 ExportFormat::Json => self.export_as_json(&metrics_data, destination),
551 ExportFormat::Csv => self.export_as_csv(&metrics_data, destination),
552 ExportFormat::Prometheus => self.export_as_prometheus(&metrics_data, destination),
553 ExportFormat::InfluxDb => self.export_as_influxdb(&metrics_data, destination),
554 }
555 }
556
557 fn collect_all_metrics(&self) -> Result<HashMap<String, serde_json::Value>> {
558 let mut all_metrics = HashMap::new();
559
560 let query_metrics = self.metrics_collector.query_metrics.read();
561 let system_metrics = self.metrics_collector.system_metrics.read();
562 let quality_metrics = self.metrics_collector.quality_metrics.read();
563
564 all_metrics.insert(
565 "query_metrics".to_string(),
566 serde_json::to_value(&*query_metrics)?,
567 );
568 all_metrics.insert(
569 "system_metrics".to_string(),
570 serde_json::to_value(&*system_metrics)?,
571 );
572 all_metrics.insert(
573 "quality_metrics".to_string(),
574 serde_json::to_value(&*quality_metrics)?,
575 );
576
577 Ok(all_metrics)
578 }
579
580 fn export_as_json(
581 &self,
582 metrics: &HashMap<String, serde_json::Value>,
583 destination: &str,
584 ) -> Result<()> {
585 let json_data = serde_json::to_string_pretty(metrics)?;
586 std::fs::write(destination, json_data)?;
587 Ok(())
588 }
589
590 fn export_as_csv(
591 &self,
592 metrics: &HashMap<String, serde_json::Value>,
593 destination: &str,
594 ) -> Result<()> {
595 let mut csv_content = String::new();
596 csv_content.push_str("timestamp,metric_name,value,category\n");
597
598 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S");
599
600 if let Some(query_metrics) = metrics.get("query_metrics") {
602 if let Some(obj) = query_metrics.as_object() {
603 for (key, value) in obj {
604 if let Some(num_val) = value.as_f64() {
605 csv_content.push_str(&format!("{timestamp},query_{key},{num_val},query\n"));
606 }
607 }
608 }
609 }
610
611 if let Some(system_metrics) = metrics.get("system_metrics") {
613 if let Some(obj) = system_metrics.as_object() {
614 for (key, value) in obj {
615 if let Some(num_val) = value.as_f64() {
616 csv_content
617 .push_str(&format!("{timestamp},system_{key},{num_val},system\n"));
618 }
619 }
620 }
621 }
622
623 std::fs::write(destination, csv_content)?;
624 Ok(())
625 }
626
627 fn export_as_prometheus(
628 &self,
629 metrics: &HashMap<String, serde_json::Value>,
630 destination: &str,
631 ) -> Result<()> {
632 let mut prometheus_content = String::new();
633 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
634
635 if let Some(query_metrics) = metrics.get("query_metrics") {
637 if let Some(obj) = query_metrics.as_object() {
638 for (key, value) in obj {
639 if let Some(num_val) = value.as_f64() {
640 prometheus_content
641 .push_str(&format!("# HELP vector_query_{key} Query metric {key}\n"));
642 prometheus_content.push_str(&format!("# TYPE vector_query_{key} gauge\n"));
643 prometheus_content
644 .push_str(&format!("vector_query_{key} {num_val} {timestamp}\n"));
645 }
646 }
647 }
648 }
649
650 if let Some(system_metrics) = metrics.get("system_metrics") {
652 if let Some(obj) = system_metrics.as_object() {
653 for (key, value) in obj {
654 if let Some(num_val) = value.as_f64() {
655 prometheus_content
656 .push_str(&format!("# HELP vector_system_{key} System metric {key}\n"));
657 prometheus_content.push_str(&format!("# TYPE vector_system_{key} gauge\n"));
658 prometheus_content
659 .push_str(&format!("vector_system_{key} {num_val} {timestamp}\n"));
660 }
661 }
662 }
663 }
664
665 std::fs::write(destination, prometheus_content)?;
666 Ok(())
667 }
668
669 fn export_as_influxdb(
670 &self,
671 metrics: &HashMap<String, serde_json::Value>,
672 destination: &str,
673 ) -> Result<()> {
674 let mut influxdb_content = String::new();
675 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
676
677 if let Some(query_metrics) = metrics.get("query_metrics") {
679 if let Some(obj) = query_metrics.as_object() {
680 for (key, value) in obj {
681 if let Some(num_val) = value.as_f64() {
682 influxdb_content.push_str(&format!(
683 "vector_query,type=query {key}={num_val} {timestamp}\n"
684 ));
685 }
686 }
687 }
688 }
689
690 if let Some(system_metrics) = metrics.get("system_metrics") {
692 if let Some(obj) = system_metrics.as_object() {
693 for (key, value) in obj {
694 if let Some(num_val) = value.as_f64() {
695 influxdb_content.push_str(&format!(
696 "vector_system,type=system {key}={num_val} {timestamp}\n"
697 ));
698 }
699 }
700 }
701 }
702
703 std::fs::write(destination, influxdb_content)?;
704 Ok(())
705 }
706
707 pub async fn start_dashboard_updates(&self) -> Result<()> {
709 let dashboard_data = Arc::clone(&self.dashboard_data);
710 let metrics_collector = Arc::clone(&self.metrics_collector);
711 let performance_monitor = Arc::clone(&self.performance_monitor);
712 let refresh_interval = Duration::from_secs(self.config.dashboard_refresh_interval);
713
714 tokio::spawn(async move {
715 let mut interval = tokio::time::interval(refresh_interval);
716
717 loop {
718 interval.tick().await;
719
720 let updated_data =
722 Self::build_dashboard_data(&metrics_collector, &performance_monitor).await;
723
724 {
725 let mut data = dashboard_data.write();
726 *data = updated_data;
727 }
728 }
729 });
730
731 Ok(())
732 }
733
734 async fn build_dashboard_data(
735 metrics_collector: &MetricsCollector,
736 performance_monitor: &PerformanceMonitor,
737 ) -> DashboardData {
738 use crate::rta_aggregators::{
739 QualityMetricsData, QueryPerformanceData, SystemHealthData, UsageAnalyticsData,
740 };
741
742 let query_metrics = metrics_collector.query_metrics.read().clone();
743 let system_metrics = metrics_collector.system_metrics.read().clone();
744 let quality_metrics = metrics_collector.quality_metrics.read().clone();
745 let current_alerts: Vec<Alert> = performance_monitor
746 .current_alerts
747 .read()
748 .values()
749 .cloned()
750 .collect();
751
752 let health_score = Self::calculate_health_score(&system_metrics, &query_metrics);
754
755 let current_qps = Self::calculate_current_qps(&query_metrics);
757
758 DashboardData {
759 overview: OverviewData {
760 total_queries_today: query_metrics.total_queries,
761 average_latency: query_metrics.average_latency,
762 current_qps,
763 system_health_score: health_score,
764 active_alerts: current_alerts.len() as u64,
765 index_size: system_metrics.index_size,
766 vector_count: system_metrics.vector_count,
767 cache_hit_ratio: system_metrics.cache_hit_ratio,
768 },
769 query_performance: QueryPerformanceData {
770 latency_trends: query_metrics.latency_history.iter().cloned().collect(),
771 throughput_trends: query_metrics.throughput_history.iter().cloned().collect(),
772 error_rate_trends: vec![(SystemTime::now(), query_metrics.error_rate)],
773 top_slow_queries: vec![], query_distribution: query_metrics.query_distribution.clone(),
775 performance_percentiles: {
776 let mut percentiles = HashMap::new();
777 percentiles.insert("p50".to_string(), query_metrics.p50_latency);
778 percentiles.insert("p95".to_string(), query_metrics.p95_latency);
779 percentiles.insert("p99".to_string(), query_metrics.p99_latency);
780 percentiles
781 },
782 },
783 system_health: SystemHealthData {
784 cpu_usage: system_metrics.cpu_usage,
785 memory_usage: system_metrics.memory_usage,
786 disk_usage: system_metrics.disk_usage,
787 network_throughput: 0.0, resource_trends: vec![(SystemTime::now(), system_metrics.cpu_usage)],
789 capacity_forecast: vec![], bottlenecks: Self::identify_bottlenecks(&system_metrics, &query_metrics),
791 },
792 quality_metrics: QualityMetricsData {
793 recall_trends: vec![],
794 precision_trends: vec![],
795 similarity_distribution: quality_metrics.similarity_distribution.clone(),
796 quality_score: quality_metrics.average_similarity_score,
797 quality_trends: vec![(SystemTime::now(), quality_metrics.average_similarity_score)],
798 benchmark_comparisons: HashMap::new(),
799 },
800 usage_analytics: UsageAnalyticsData {
801 user_activity: vec![(SystemTime::now(), query_metrics.total_queries)],
802 popular_queries: vec![], usage_patterns: HashMap::new(),
804 growth_metrics: crate::rta_aggregators::GrowthMetrics::default(),
805 feature_usage: HashMap::new(),
806 },
807 alerts: current_alerts,
808 last_updated: SystemTime::now(),
809 }
810 }
811
812 fn calculate_health_score(system_metrics: &SystemMetrics, query_metrics: &QueryMetrics) -> f64 {
813 let mut score = 100.0;
814
815 if system_metrics.cpu_usage > 80.0 {
817 score -= (system_metrics.cpu_usage - 80.0) * 0.5;
818 }
819 if system_metrics.memory_usage > 80.0 {
820 score -= (system_metrics.memory_usage - 80.0) * 0.5;
821 }
822
823 if query_metrics.error_rate > 1.0 {
825 score -= query_metrics.error_rate * 10.0;
826 }
827
828 if query_metrics.average_latency.as_millis() > 100 {
830 score -= (query_metrics.average_latency.as_millis() as f64 - 100.0) * 0.1;
831 }
832
833 score.clamp(0.0, 100.0)
834 }
835
836 fn calculate_current_qps(query_metrics: &QueryMetrics) -> f64 {
837 if query_metrics.latency_history.len() < 2 {
839 return 0.0;
840 }
841
842 let now = SystemTime::now();
843 let one_second_ago = now - Duration::from_secs(1);
844
845 let recent_queries = query_metrics
846 .latency_history
847 .iter()
848 .filter(|(timestamp, _)| *timestamp >= one_second_ago)
849 .count();
850
851 recent_queries as f64
852 }
853
854 fn identify_bottlenecks(
855 system_metrics: &SystemMetrics,
856 query_metrics: &QueryMetrics,
857 ) -> Vec<String> {
858 let mut bottlenecks = Vec::new();
859
860 if system_metrics.cpu_usage > 90.0 {
861 bottlenecks.push("High CPU usage".to_string());
862 }
863
864 if system_metrics.memory_usage > 90.0 {
865 bottlenecks.push("High memory usage".to_string());
866 }
867
868 if query_metrics.average_latency.as_millis() > 500 {
869 bottlenecks.push("High query latency".to_string());
870 }
871
872 if system_metrics.cache_hit_ratio < 0.7 {
873 bottlenecks.push("Low cache hit ratio".to_string());
874 }
875
876 bottlenecks
877 }
878
    /// Renders the stored dashboard snapshot as a standalone HTML page.
    ///
    /// The page reloads itself every 30 seconds via an inline script. The
    /// health score gets the CSS class `health-good` at 80 or above,
    /// `health-warning` at 60 or above, and `health-critical` otherwise.
    /// Index size is shown in whole MiB (integer division).
    ///
    /// Alert markup comes from `format_alerts` and is inserted without further
    /// escaping here. NOTE(review): confirm `format_alerts` HTML-escapes alert
    /// messages, since some messages embed caller-supplied query ids.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn generate_dashboard_html(&self) -> Result<String> {
        use crate::rta_aggregators::format_alerts;

        // Whatever the refresh task last stored (the default snapshot if it has not run yet).
        let dashboard_data = self.get_dashboard_data();

        // The template uses positional `{}` placeholders. The arguments after the
        // literal must stay in placeholder order. Literal CSS/JS braces are
        // escaped as `{{` / `}}`.
        let html = format!(
            r#"
<!DOCTYPE html>
<html>
<head>
    <title>OxiRS Vector Search Analytics Dashboard</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
        .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
        .card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
        .metric {{ display: flex; justify-content: space-between; margin: 10px 0; }}
        .metric-value {{ font-weight: bold; color: #007acc; }}
        .alert {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
        .alert-critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
        .alert-warning {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
        .alert-info {{ background-color: #e3f2fd; border-left: 4px solid #2196f3; }}
        .health-score {{ font-size: 2em; text-align: center; }}
        .health-good {{ color: #4caf50; }}
        .health-warning {{ color: #ff9800; }}
        .health-critical {{ color: #f44336; }}
        h1 {{ color: #333; text-align: center; }}
        h2 {{ color: #555; margin-top: 0; }}
        .refresh-time {{ text-align: center; color: #888; font-size: 0.9em; }}
    </style>
    <script>
        function refreshPage() {{
            window.location.reload();
        }}
        setInterval(refreshPage, 30000); // Refresh every 30 seconds
    </script>
</head>
<body>
    <h1>OxiRS Vector Search Analytics Dashboard</h1>
    <p class="refresh-time">Last updated: {}</p>

    <div class="dashboard">
        <div class="card">
            <h2>System Health</h2>
            <div class="health-score {}">{:.1}%</div>
            <div class="metric">
                <span>Active Alerts:</span>
                <span class="metric-value">{}</span>
            </div>
        </div>

        <div class="card">
            <h2>Query Performance</h2>
            <div class="metric">
                <span>Total Queries Today:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Average Latency:</span>
                <span class="metric-value">{}ms</span>
            </div>
            <div class="metric">
                <span>Current QPS:</span>
                <span class="metric-value">{:.1}</span>
            </div>
        </div>

        <div class="card">
            <h2>System Resources</h2>
            <div class="metric">
                <span>CPU Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Memory Usage:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
            <div class="metric">
                <span>Cache Hit Ratio:</span>
                <span class="metric-value">{:.1}%</span>
            </div>
        </div>

        <div class="card">
            <h2>Vector Index</h2>
            <div class="metric">
                <span>Vector Count:</span>
                <span class="metric-value">{}</span>
            </div>
            <div class="metric">
                <span>Index Size:</span>
                <span class="metric-value">{} MB</span>
            </div>
        </div>

        <div class="card">
            <h2>Active Alerts</h2>
            {}
        </div>
    </div>
</body>
</html>
        "#,
            // 1: last-updated timestamp (UTC)
            chrono::DateTime::<chrono::Utc>::from(dashboard_data.last_updated)
                .format("%Y-%m-%d %H:%M:%S UTC"),
            // 2: CSS class picked from the health score
            if dashboard_data.overview.system_health_score >= 80.0 {
                "health-good"
            } else if dashboard_data.overview.system_health_score >= 60.0 {
                "health-warning"
            } else {
                "health-critical"
            },
            // 3-13: metric values, in placeholder order
            dashboard_data.overview.system_health_score,
            dashboard_data.overview.active_alerts,
            dashboard_data.overview.total_queries_today,
            dashboard_data.overview.average_latency.as_millis(),
            dashboard_data.overview.current_qps,
            dashboard_data.system_health.cpu_usage,
            dashboard_data.system_health.memory_usage,
            // Stored as a 0-1 ratio; shown as a percentage.
            dashboard_data.overview.cache_hit_ratio * 100.0,
            dashboard_data.overview.vector_count,
            dashboard_data.overview.index_size / (1024 * 1024),
            format_alerts(&dashboard_data.alerts)
        );

        Ok(html)
    }
1006
1007 pub async fn start_system_monitoring(&self) -> Result<()> {
1009 let analytics_engine = self.clone();
1010
1011 tokio::spawn(async move {
1012 let mut interval = tokio::time::interval(Duration::from_secs(5));
1013
1014 loop {
1015 interval.tick().await;
1016
1017 if let Ok(system_info) = Self::collect_system_info().await {
1019 let _ = analytics_engine.update_system_metrics(
1020 system_info.cpu_usage,
1021 system_info.memory_usage,
1022 system_info.memory_total,
1023 );
1024 }
1025 }
1026 });
1027
1028 Ok(())
1029 }
1030
1031 async fn collect_system_info() -> Result<SystemInfo> {
1032 Ok(SystemInfo {
1035 cpu_usage: {
1036 #[allow(unused_imports)]
1037 use scirs2_core::random::{Random, Rng};
1038 let mut rng = Random::seed(42);
1039 45.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1041 memory_usage: {
1042 #[allow(unused_imports)]
1043 use scirs2_core::random::{Random, Rng};
1044 let mut rng = Random::seed(42);
1045 60.0 + (rng.gen_range(0.0..1.0) * 20.0) },
1047 memory_total: 16 * 1024 * 1024 * 1024, disk_usage: 30.0,
1049 network_throughput: 100.0,
1050 })
1051 }
1052}