Skip to main content

oxirs_vec/
rta_engine.rs

1//! Real-time analytics engine and core event/metric types.
2
3use anyhow::Result;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use tokio::sync::broadcast;
10
11use crate::rta_aggregators::{
12    AlertManager, DashboardData, ExportFormat, MetricsCollector, OverviewData, PerformanceMonitor,
13    QueryAnalyzer, QueryMetrics, SystemMetrics,
14};
15
16/// Real-time analytics engine for vector operations
17pub struct VectorAnalyticsEngine {
18    pub(crate) config: AnalyticsConfig,
19    pub(crate) metrics_collector: Arc<MetricsCollector>,
20    pub(crate) performance_monitor: Arc<PerformanceMonitor>,
21    pub(crate) query_analyzer: Arc<QueryAnalyzer>,
22    pub(crate) alert_manager: Arc<AlertManager>,
23    pub(crate) dashboard_data: Arc<RwLock<DashboardData>>,
24    pub(crate) event_broadcaster: broadcast::Sender<AnalyticsEvent>,
25}
26
27/// Configuration for analytics engine
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct AnalyticsConfig {
30    /// Enable real-time monitoring
31    pub enable_real_time: bool,
32    /// Metrics collection interval in seconds
33    pub collection_interval: u64,
34    /// Maximum number of metrics to retain in memory
35    pub max_metrics_history: usize,
36    /// Enable query pattern analysis
37    pub enable_query_analysis: bool,
38    /// Enable performance alerting
39    pub enable_alerts: bool,
40    /// Dashboard refresh interval in seconds
41    pub dashboard_refresh_interval: u64,
42    /// Enable detailed tracing
43    pub enable_tracing: bool,
44    /// Enable performance profiling
45    pub enable_profiling: bool,
46    /// Metrics retention period in days
47    pub retention_days: u32,
48}
49
50impl Default for AnalyticsConfig {
51    fn default() -> Self {
52        Self {
53            enable_real_time: true,
54            collection_interval: 1,
55            max_metrics_history: 10000,
56            enable_query_analysis: true,
57            enable_alerts: true,
58            dashboard_refresh_interval: 5,
59            enable_tracing: true,
60            enable_profiling: true,
61            retention_days: 30,
62        }
63    }
64}
65
66/// Analytics event types
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub enum AnalyticsEvent {
69    QueryExecuted {
70        query_id: String,
71        operation_type: String,
72        duration: Duration,
73        result_count: usize,
74        success: bool,
75        timestamp: SystemTime,
76    },
77    IndexUpdated {
78        index_name: String,
79        operation: String,
80        vectors_affected: usize,
81        timestamp: SystemTime,
82    },
83    PerformanceAlert {
84        alert_type: AlertType,
85        message: String,
86        severity: AlertSeverity,
87        timestamp: SystemTime,
88    },
89    SystemMetric {
90        metric_name: String,
91        value: f64,
92        unit: String,
93        timestamp: SystemTime,
94    },
95}
96
97/// Alert types for performance monitoring
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub enum AlertType {
100    HighLatency,
101    LowThroughput,
102    HighMemoryUsage,
103    HighCpuUsage,
104    QualityDegradation,
105    IndexCorruption,
106    SystemError,
107    ResourceLimitReached,
108}
109
110/// Alert severity levels
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub enum AlertSeverity {
113    Critical,
114    Warning,
115    Info,
116}
117
118/// Alert information
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct Alert {
121    pub id: String,
122    pub alert_type: AlertType,
123    pub severity: AlertSeverity,
124    pub message: String,
125    pub timestamp: SystemTime,
126    pub resolved: bool,
127    pub resolved_timestamp: Option<SystemTime>,
128    pub metadata: HashMap<String, String>,
129}
130
131/// Analytics report structure
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct AnalyticsReport {
134    pub report_id: String,
135    pub start_time: SystemTime,
136    pub end_time: SystemTime,
137    pub query_metrics: QueryMetrics,
138    pub system_metrics: SystemMetrics,
139    pub quality_metrics: crate::rta_aggregators::QualityMetrics,
140    pub alerts: Vec<Alert>,
141    pub recommendations: Vec<String>,
142    pub generated_at: SystemTime,
143}
144
145/// System information structure
146#[derive(Debug, Clone)]
147pub struct SystemInfo {
148    pub cpu_usage: f64,
149    pub memory_usage: f64,
150    pub memory_total: u64,
151    pub disk_usage: f64,
152    pub network_throughput: f64,
153}
154
155impl Clone for VectorAnalyticsEngine {
156    fn clone(&self) -> Self {
157        Self {
158            config: self.config.clone(),
159            metrics_collector: Arc::clone(&self.metrics_collector),
160            performance_monitor: Arc::clone(&self.performance_monitor),
161            query_analyzer: Arc::clone(&self.query_analyzer),
162            alert_manager: Arc::clone(&self.alert_manager),
163            dashboard_data: Arc::clone(&self.dashboard_data),
164            event_broadcaster: self.event_broadcaster.clone(),
165        }
166    }
167}
168
169impl VectorAnalyticsEngine {
170    pub fn new(config: AnalyticsConfig) -> Self {
171        let (event_broadcaster, _) = broadcast::channel(1000);
172
173        let metrics_collector = Arc::new(MetricsCollector::new());
174        let performance_monitor = Arc::new(PerformanceMonitor::new());
175        let query_analyzer = Arc::new(QueryAnalyzer::new());
176        let alert_manager = Arc::new(AlertManager::new(
177            crate::rta_aggregators::AlertConfig::default(),
178        ));
179        let dashboard_data = Arc::new(RwLock::new(DashboardData::default()));
180
181        Self {
182            config,
183            metrics_collector,
184            performance_monitor,
185            query_analyzer,
186            alert_manager,
187            dashboard_data,
188            event_broadcaster,
189        }
190    }
191
192    /// Record a query execution for analytics
193    pub fn record_query_execution(
194        &self,
195        query_id: String,
196        operation_type: String,
197        duration: Duration,
198        result_count: usize,
199        success: bool,
200    ) -> Result<()> {
201        // Update metrics
202        {
203            let mut metrics = self.metrics_collector.query_metrics.write();
204            metrics.total_queries += 1;
205
206            if success {
207                metrics.successful_queries += 1;
208            } else {
209                metrics.failed_queries += 1;
210            }
211
212            // Update latency statistics
213            self.update_latency_statistics(&mut metrics, duration);
214
215            // Update query distribution
216            *metrics
217                .query_distribution
218                .entry(operation_type.clone())
219                .or_insert(0) += 1;
220
221            // Update error rate
222            metrics.error_rate =
223                (metrics.failed_queries as f64) / (metrics.total_queries as f64) * 100.0;
224        }
225
226        // Check for alerts
227        self.check_performance_alerts(duration, success)?;
228
229        // Broadcast event
230        let event = AnalyticsEvent::QueryExecuted {
231            query_id,
232            operation_type,
233            duration,
234            result_count,
235            success,
236            timestamp: SystemTime::now(),
237        };
238
239        let _ = self.event_broadcaster.send(event);
240
241        Ok(())
242    }
243
244    pub(crate) fn update_latency_statistics(&self, metrics: &mut QueryMetrics, duration: Duration) {
245        let timestamp = SystemTime::now();
246
247        // Add to history
248        metrics.latency_history.push_back((timestamp, duration));
249        if metrics.latency_history.len() > self.config.max_metrics_history {
250            metrics.latency_history.pop_front();
251        }
252
253        // Update running averages and percentiles
254        let latencies: Vec<Duration> = metrics.latency_history.iter().map(|(_, d)| *d).collect();
255
256        if !latencies.is_empty() {
257            let mut sorted_latencies = latencies.clone();
258            sorted_latencies.sort();
259
260            let len = sorted_latencies.len();
261            metrics.p50_latency = sorted_latencies[len / 2];
262            metrics.p95_latency = sorted_latencies[(len as f64 * 0.95) as usize];
263            metrics.p99_latency = sorted_latencies[(len as f64 * 0.99) as usize];
264            metrics.max_latency = *sorted_latencies
265                .last()
266                .expect("sorted_latencies validated to be non-empty");
267            metrics.min_latency = *sorted_latencies
268                .first()
269                .expect("collection validated to be non-empty");
270
271            let total_duration: Duration = latencies.iter().sum();
272            metrics.average_latency = total_duration / len as u32;
273        }
274    }
275
276    pub(crate) fn check_performance_alerts(&self, duration: Duration, success: bool) -> Result<()> {
277        let thresholds = self.performance_monitor.thresholds.read();
278
279        // Check latency threshold
280        if duration.as_millis() > thresholds.max_latency_ms as u128 {
281            self.create_alert(
282                AlertType::HighLatency,
283                AlertSeverity::Warning,
284                format!(
285                    "Query latency {}ms exceeds threshold {}ms",
286                    duration.as_millis(),
287                    thresholds.max_latency_ms
288                ),
289            )?;
290        }
291
292        // Check error rate
293        if !success {
294            let metrics = self.metrics_collector.query_metrics.read();
295            if metrics.error_rate > thresholds.max_error_rate_percent {
296                self.create_alert(
297                    AlertType::SystemError,
298                    AlertSeverity::Critical,
299                    format!(
300                        "Error rate {:.2}% exceeds threshold {:.2}%",
301                        metrics.error_rate, thresholds.max_error_rate_percent
302                    ),
303                )?;
304            }
305        }
306
307        Ok(())
308    }
309
310    pub(crate) fn create_alert(
311        &self,
312        alert_type: AlertType,
313        severity: AlertSeverity,
314        message: String,
315    ) -> Result<()> {
316        let alert_id = format!(
317            "{:?}_{}",
318            alert_type,
319            SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
320        );
321
322        let alert = Alert {
323            id: alert_id,
324            alert_type,
325            severity,
326            message,
327            timestamp: SystemTime::now(),
328            resolved: false,
329            resolved_timestamp: None,
330            metadata: HashMap::new(),
331        };
332
333        // Store alert
334        {
335            let mut current_alerts = self.performance_monitor.current_alerts.write();
336            current_alerts.insert(alert.id.clone(), alert.clone());
337
338            let mut alert_history = self.performance_monitor.alert_history.write();
339            alert_history.push_back(alert.clone());
340            if alert_history.len() > self.config.max_metrics_history {
341                alert_history.pop_front();
342            }
343        }
344
345        // Send notifications
346        self.alert_manager.send_alert(&alert)?;
347
348        // Broadcast event
349        let event = AnalyticsEvent::PerformanceAlert {
350            alert_type: alert.alert_type.clone(),
351            message: alert.message.clone(),
352            severity: alert.severity.clone(),
353            timestamp: alert.timestamp,
354        };
355
356        let _ = self.event_broadcaster.send(event);
357
358        Ok(())
359    }
360
361    /// Record distributed query execution across multiple nodes
362    pub fn record_distributed_query(
363        &self,
364        query_id: String,
365        node_count: usize,
366        total_duration: Duration,
367        _federation_id: Option<String>,
368        success: bool,
369    ) -> Result<()> {
370        // Update distributed query metrics
371        {
372            let mut metrics = self.metrics_collector.query_metrics.write();
373            metrics.total_queries += 1;
374
375            if success {
376                metrics.successful_queries += 1;
377            } else {
378                metrics.failed_queries += 1;
379            }
380
381            // Update latency statistics for distributed queries
382            self.update_latency_statistics(&mut metrics, total_duration);
383
384            // Update distributed query distribution
385            let operation_type = format!("distributed_query_{node_count}_nodes");
386            *metrics
387                .query_distribution
388                .entry(operation_type)
389                .or_insert(0) += 1;
390
391            // Update error rate
392            metrics.error_rate = if metrics.total_queries > 0 {
393                metrics.failed_queries as f64 / metrics.total_queries as f64
394            } else {
395                0.0
396            };
397        }
398
399        // Record distributed query event
400        let event = AnalyticsEvent::QueryExecuted {
401            query_id: query_id.clone(),
402            operation_type: format!("distributed_query_{node_count}_nodes"),
403            duration: total_duration,
404            result_count: node_count,
405            success,
406            timestamp: SystemTime::now(),
407        };
408
409        let _ = self.event_broadcaster.send(event);
410
411        // Generate alert if distributed query is taking too long
412        if total_duration.as_millis() > 5000 {
413            let message = format!(
414                "Distributed query {} across {} nodes took {}ms",
415                query_id,
416                node_count,
417                total_duration.as_millis()
418            );
419
420            self.create_alert(AlertType::HighLatency, AlertSeverity::Warning, message)?;
421        }
422
423        Ok(())
424    }
425
426    /// Update system metrics
427    pub fn update_system_metrics(
428        &self,
429        cpu_usage: f64,
430        memory_usage: f64,
431        memory_total: u64,
432    ) -> Result<()> {
433        {
434            let mut metrics = self.metrics_collector.system_metrics.write();
435            metrics.cpu_usage = cpu_usage;
436            metrics.memory_usage = memory_usage;
437            metrics.memory_total = memory_total;
438            metrics.memory_available =
439                memory_total - (memory_total as f64 * memory_usage / 100.0) as u64;
440        }
441
442        // Check system alerts
443        let thresholds = self.performance_monitor.thresholds.read();
444
445        if cpu_usage > thresholds.max_cpu_usage_percent {
446            self.create_alert(
447                AlertType::HighCpuUsage,
448                AlertSeverity::Warning,
449                format!(
450                    "CPU usage {:.2}% exceeds threshold {:.2}%",
451                    cpu_usage, thresholds.max_cpu_usage_percent
452                ),
453            )?;
454        }
455
456        if memory_usage > thresholds.max_memory_usage_percent {
457            self.create_alert(
458                AlertType::HighMemoryUsage,
459                AlertSeverity::Warning,
460                format!(
461                    "Memory usage {:.2}% exceeds threshold {:.2}%",
462                    memory_usage, thresholds.max_memory_usage_percent
463                ),
464            )?;
465        }
466
467        Ok(())
468    }
469
470    /// Get current dashboard data
471    pub fn get_dashboard_data(&self) -> DashboardData {
472        self.dashboard_data.read().clone()
473    }
474
475    /// Subscribe to analytics events
476    pub fn subscribe_to_events(&self) -> broadcast::Receiver<AnalyticsEvent> {
477        self.event_broadcaster.subscribe()
478    }
479
480    /// Generate analytics report
481    pub fn generate_report(
482        &self,
483        start_time: SystemTime,
484        end_time: SystemTime,
485    ) -> Result<AnalyticsReport> {
486        let query_metrics = self.metrics_collector.query_metrics.read().clone();
487        let system_metrics = self.metrics_collector.system_metrics.read().clone();
488        let quality_metrics = self.metrics_collector.quality_metrics.read().clone();
489
490        Ok(AnalyticsReport {
491            report_id: format!(
492                "report_{}",
493                SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
494            ),
495            start_time,
496            end_time,
497            query_metrics,
498            system_metrics,
499            quality_metrics,
500            alerts: self.get_alerts_in_range(start_time, end_time)?,
501            recommendations: self.generate_recommendations()?,
502            generated_at: SystemTime::now(),
503        })
504    }
505
506    fn get_alerts_in_range(
507        &self,
508        start_time: SystemTime,
509        end_time: SystemTime,
510    ) -> Result<Vec<Alert>> {
511        let alert_history = self.performance_monitor.alert_history.read();
512        Ok(alert_history
513            .iter()
514            .filter(|alert| alert.timestamp >= start_time && alert.timestamp <= end_time)
515            .cloned()
516            .collect())
517    }
518
519    fn generate_recommendations(&self) -> Result<Vec<String>> {
520        let mut recommendations = Vec::new();
521
522        let query_metrics = self.metrics_collector.query_metrics.read();
523        let system_metrics = self.metrics_collector.system_metrics.read();
524
525        // Performance recommendations
526        if query_metrics.average_latency.as_millis() > 50 {
527            recommendations
528                .push("Consider optimizing queries or adding more powerful hardware".to_string());
529        }
530
531        if system_metrics.memory_usage > 80.0 {
532            recommendations.push(
533                "Memory usage is high. Consider increasing memory or optimizing data structures"
534                    .to_string(),
535            );
536        }
537
538        if system_metrics.cache_hit_ratio < 0.8 {
539            recommendations.push("Cache hit ratio is low. Consider increasing cache size or improving cache strategy".to_string());
540        }
541
542        Ok(recommendations)
543    }
544
545    /// Export metrics to external systems
546    pub fn export_metrics(&self, format: ExportFormat, destination: &str) -> Result<()> {
547        let metrics_data = self.collect_all_metrics()?;
548
549        match format {
550            ExportFormat::Json => self.export_as_json(&metrics_data, destination),
551            ExportFormat::Csv => self.export_as_csv(&metrics_data, destination),
552            ExportFormat::Prometheus => self.export_as_prometheus(&metrics_data, destination),
553            ExportFormat::InfluxDb => self.export_as_influxdb(&metrics_data, destination),
554        }
555    }
556
557    fn collect_all_metrics(&self) -> Result<HashMap<String, serde_json::Value>> {
558        let mut all_metrics = HashMap::new();
559
560        let query_metrics = self.metrics_collector.query_metrics.read();
561        let system_metrics = self.metrics_collector.system_metrics.read();
562        let quality_metrics = self.metrics_collector.quality_metrics.read();
563
564        all_metrics.insert(
565            "query_metrics".to_string(),
566            serde_json::to_value(&*query_metrics)?,
567        );
568        all_metrics.insert(
569            "system_metrics".to_string(),
570            serde_json::to_value(&*system_metrics)?,
571        );
572        all_metrics.insert(
573            "quality_metrics".to_string(),
574            serde_json::to_value(&*quality_metrics)?,
575        );
576
577        Ok(all_metrics)
578    }
579
580    fn export_as_json(
581        &self,
582        metrics: &HashMap<String, serde_json::Value>,
583        destination: &str,
584    ) -> Result<()> {
585        let json_data = serde_json::to_string_pretty(metrics)?;
586        std::fs::write(destination, json_data)?;
587        Ok(())
588    }
589
590    fn export_as_csv(
591        &self,
592        metrics: &HashMap<String, serde_json::Value>,
593        destination: &str,
594    ) -> Result<()> {
595        let mut csv_content = String::new();
596        csv_content.push_str("timestamp,metric_name,value,category\n");
597
598        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S");
599
600        // Export query metrics
601        if let Some(query_metrics) = metrics.get("query_metrics") {
602            if let Some(obj) = query_metrics.as_object() {
603                for (key, value) in obj {
604                    if let Some(num_val) = value.as_f64() {
605                        csv_content.push_str(&format!("{timestamp},query_{key},{num_val},query\n"));
606                    }
607                }
608            }
609        }
610
611        // Export system metrics
612        if let Some(system_metrics) = metrics.get("system_metrics") {
613            if let Some(obj) = system_metrics.as_object() {
614                for (key, value) in obj {
615                    if let Some(num_val) = value.as_f64() {
616                        csv_content
617                            .push_str(&format!("{timestamp},system_{key},{num_val},system\n"));
618                    }
619                }
620            }
621        }
622
623        std::fs::write(destination, csv_content)?;
624        Ok(())
625    }
626
627    fn export_as_prometheus(
628        &self,
629        metrics: &HashMap<String, serde_json::Value>,
630        destination: &str,
631    ) -> Result<()> {
632        let mut prometheus_content = String::new();
633        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
634
635        // Export query metrics
636        if let Some(query_metrics) = metrics.get("query_metrics") {
637            if let Some(obj) = query_metrics.as_object() {
638                for (key, value) in obj {
639                    if let Some(num_val) = value.as_f64() {
640                        prometheus_content
641                            .push_str(&format!("# HELP vector_query_{key} Query metric {key}\n"));
642                        prometheus_content.push_str(&format!("# TYPE vector_query_{key} gauge\n"));
643                        prometheus_content
644                            .push_str(&format!("vector_query_{key} {num_val} {timestamp}\n"));
645                    }
646                }
647            }
648        }
649
650        // Export system metrics
651        if let Some(system_metrics) = metrics.get("system_metrics") {
652            if let Some(obj) = system_metrics.as_object() {
653                for (key, value) in obj {
654                    if let Some(num_val) = value.as_f64() {
655                        prometheus_content
656                            .push_str(&format!("# HELP vector_system_{key} System metric {key}\n"));
657                        prometheus_content.push_str(&format!("# TYPE vector_system_{key} gauge\n"));
658                        prometheus_content
659                            .push_str(&format!("vector_system_{key} {num_val} {timestamp}\n"));
660                    }
661                }
662            }
663        }
664
665        std::fs::write(destination, prometheus_content)?;
666        Ok(())
667    }
668
669    fn export_as_influxdb(
670        &self,
671        metrics: &HashMap<String, serde_json::Value>,
672        destination: &str,
673    ) -> Result<()> {
674        let mut influxdb_content = String::new();
675        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
676
677        // Export query metrics
678        if let Some(query_metrics) = metrics.get("query_metrics") {
679            if let Some(obj) = query_metrics.as_object() {
680                for (key, value) in obj {
681                    if let Some(num_val) = value.as_f64() {
682                        influxdb_content.push_str(&format!(
683                            "vector_query,type=query {key}={num_val} {timestamp}\n"
684                        ));
685                    }
686                }
687            }
688        }
689
690        // Export system metrics
691        if let Some(system_metrics) = metrics.get("system_metrics") {
692            if let Some(obj) = system_metrics.as_object() {
693                for (key, value) in obj {
694                    if let Some(num_val) = value.as_f64() {
695                        influxdb_content.push_str(&format!(
696                            "vector_system,type=system {key}={num_val} {timestamp}\n"
697                        ));
698                    }
699                }
700            }
701        }
702
703        std::fs::write(destination, influxdb_content)?;
704        Ok(())
705    }
706
707    /// Start real-time dashboard update loop
708    pub async fn start_dashboard_updates(&self) -> Result<()> {
709        let dashboard_data = Arc::clone(&self.dashboard_data);
710        let metrics_collector = Arc::clone(&self.metrics_collector);
711        let performance_monitor = Arc::clone(&self.performance_monitor);
712        let refresh_interval = Duration::from_secs(self.config.dashboard_refresh_interval);
713
714        tokio::spawn(async move {
715            let mut interval = tokio::time::interval(refresh_interval);
716
717            loop {
718                interval.tick().await;
719
720                // Update dashboard data
721                let updated_data =
722                    Self::build_dashboard_data(&metrics_collector, &performance_monitor).await;
723
724                {
725                    let mut data = dashboard_data.write();
726                    *data = updated_data;
727                }
728            }
729        });
730
731        Ok(())
732    }
733
734    async fn build_dashboard_data(
735        metrics_collector: &MetricsCollector,
736        performance_monitor: &PerformanceMonitor,
737    ) -> DashboardData {
738        use crate::rta_aggregators::{
739            QualityMetricsData, QueryPerformanceData, SystemHealthData, UsageAnalyticsData,
740        };
741
742        let query_metrics = metrics_collector.query_metrics.read().clone();
743        let system_metrics = metrics_collector.system_metrics.read().clone();
744        let quality_metrics = metrics_collector.quality_metrics.read().clone();
745        let current_alerts: Vec<Alert> = performance_monitor
746            .current_alerts
747            .read()
748            .values()
749            .cloned()
750            .collect();
751
752        // Calculate system health score
753        let health_score = Self::calculate_health_score(&system_metrics, &query_metrics);
754
755        // Calculate current QPS
756        let current_qps = Self::calculate_current_qps(&query_metrics);
757
758        DashboardData {
759            overview: OverviewData {
760                total_queries_today: query_metrics.total_queries,
761                average_latency: query_metrics.average_latency,
762                current_qps,
763                system_health_score: health_score,
764                active_alerts: current_alerts.len() as u64,
765                index_size: system_metrics.index_size,
766                vector_count: system_metrics.vector_count,
767                cache_hit_ratio: system_metrics.cache_hit_ratio,
768            },
769            query_performance: QueryPerformanceData {
770                latency_trends: query_metrics.latency_history.iter().cloned().collect(),
771                throughput_trends: query_metrics.throughput_history.iter().cloned().collect(),
772                error_rate_trends: vec![(SystemTime::now(), query_metrics.error_rate)],
773                top_slow_queries: vec![], // Would be populated with actual slow queries
774                query_distribution: query_metrics.query_distribution.clone(),
775                performance_percentiles: {
776                    let mut percentiles = HashMap::new();
777                    percentiles.insert("p50".to_string(), query_metrics.p50_latency);
778                    percentiles.insert("p95".to_string(), query_metrics.p95_latency);
779                    percentiles.insert("p99".to_string(), query_metrics.p99_latency);
780                    percentiles
781                },
782            },
783            system_health: SystemHealthData {
784                cpu_usage: system_metrics.cpu_usage,
785                memory_usage: system_metrics.memory_usage,
786                disk_usage: system_metrics.disk_usage,
787                network_throughput: 0.0, // Would be calculated from network metrics
788                resource_trends: vec![(SystemTime::now(), system_metrics.cpu_usage)],
789                capacity_forecast: vec![], // Would be calculated with forecasting algorithm
790                bottlenecks: Self::identify_bottlenecks(&system_metrics, &query_metrics),
791            },
792            quality_metrics: QualityMetricsData {
793                recall_trends: vec![],
794                precision_trends: vec![],
795                similarity_distribution: quality_metrics.similarity_distribution.clone(),
796                quality_score: quality_metrics.average_similarity_score,
797                quality_trends: vec![(SystemTime::now(), quality_metrics.average_similarity_score)],
798                benchmark_comparisons: HashMap::new(),
799            },
800            usage_analytics: UsageAnalyticsData {
801                user_activity: vec![(SystemTime::now(), query_metrics.total_queries)],
802                popular_queries: vec![], // Would be populated from query analyzer
803                usage_patterns: HashMap::new(),
804                growth_metrics: crate::rta_aggregators::GrowthMetrics::default(),
805                feature_usage: HashMap::new(),
806            },
807            alerts: current_alerts,
808            last_updated: SystemTime::now(),
809        }
810    }
811
812    fn calculate_health_score(system_metrics: &SystemMetrics, query_metrics: &QueryMetrics) -> f64 {
813        let mut score = 100.0;
814
815        // Deduct points for high resource usage
816        if system_metrics.cpu_usage > 80.0 {
817            score -= (system_metrics.cpu_usage - 80.0) * 0.5;
818        }
819        if system_metrics.memory_usage > 80.0 {
820            score -= (system_metrics.memory_usage - 80.0) * 0.5;
821        }
822
823        // Deduct points for high error rate
824        if query_metrics.error_rate > 1.0 {
825            score -= query_metrics.error_rate * 10.0;
826        }
827
828        // Deduct points for high latency
829        if query_metrics.average_latency.as_millis() > 100 {
830            score -= (query_metrics.average_latency.as_millis() as f64 - 100.0) * 0.1;
831        }
832
833        score.clamp(0.0, 100.0)
834    }
835
836    fn calculate_current_qps(query_metrics: &QueryMetrics) -> f64 {
837        // Calculate QPS from recent query history
838        if query_metrics.latency_history.len() < 2 {
839            return 0.0;
840        }
841
842        let now = SystemTime::now();
843        let one_second_ago = now - Duration::from_secs(1);
844
845        let recent_queries = query_metrics
846            .latency_history
847            .iter()
848            .filter(|(timestamp, _)| *timestamp >= one_second_ago)
849            .count();
850
851        recent_queries as f64
852    }
853
854    fn identify_bottlenecks(
855        system_metrics: &SystemMetrics,
856        query_metrics: &QueryMetrics,
857    ) -> Vec<String> {
858        let mut bottlenecks = Vec::new();
859
860        if system_metrics.cpu_usage > 90.0 {
861            bottlenecks.push("High CPU usage".to_string());
862        }
863
864        if system_metrics.memory_usage > 90.0 {
865            bottlenecks.push("High memory usage".to_string());
866        }
867
868        if query_metrics.average_latency.as_millis() > 500 {
869            bottlenecks.push("High query latency".to_string());
870        }
871
872        if system_metrics.cache_hit_ratio < 0.7 {
873            bottlenecks.push("Low cache hit ratio".to_string());
874        }
875
876        bottlenecks
877    }
878
879    /// Generate web dashboard HTML
880    pub fn generate_dashboard_html(&self) -> Result<String> {
881        use crate::rta_aggregators::format_alerts;
882
883        let dashboard_data = self.get_dashboard_data();
884
885        let html = format!(
886            r#"
887<!DOCTYPE html>
888<html>
889<head>
890    <title>OxiRS Vector Search Analytics Dashboard</title>
891    <style>
892        body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
893        .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
894        .card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
895        .metric {{ display: flex; justify-content: space-between; margin: 10px 0; }}
896        .metric-value {{ font-weight: bold; color: #007acc; }}
897        .alert {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
898        .alert-critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
899        .alert-warning {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
900        .alert-info {{ background-color: #e3f2fd; border-left: 4px solid #2196f3; }}
901        .health-score {{ font-size: 2em; text-align: center; }}
902        .health-good {{ color: #4caf50; }}
903        .health-warning {{ color: #ff9800; }}
904        .health-critical {{ color: #f44336; }}
905        h1 {{ color: #333; text-align: center; }}
906        h2 {{ color: #555; margin-top: 0; }}
907        .refresh-time {{ text-align: center; color: #888; font-size: 0.9em; }}
908    </style>
909    <script>
910        function refreshPage() {{
911            window.location.reload();
912        }}
913        setInterval(refreshPage, 30000); // Refresh every 30 seconds
914    </script>
915</head>
916<body>
917    <h1>OxiRS Vector Search Analytics Dashboard</h1>
918    <p class="refresh-time">Last updated: {}</p>
919
920    <div class="dashboard">
921        <div class="card">
922            <h2>System Health</h2>
923            <div class="health-score {}">{:.1}%</div>
924            <div class="metric">
925                <span>Active Alerts:</span>
926                <span class="metric-value">{}</span>
927            </div>
928        </div>
929
930        <div class="card">
931            <h2>Query Performance</h2>
932            <div class="metric">
933                <span>Total Queries Today:</span>
934                <span class="metric-value">{}</span>
935            </div>
936            <div class="metric">
937                <span>Average Latency:</span>
938                <span class="metric-value">{}ms</span>
939            </div>
940            <div class="metric">
941                <span>Current QPS:</span>
942                <span class="metric-value">{:.1}</span>
943            </div>
944        </div>
945
946        <div class="card">
947            <h2>System Resources</h2>
948            <div class="metric">
949                <span>CPU Usage:</span>
950                <span class="metric-value">{:.1}%</span>
951            </div>
952            <div class="metric">
953                <span>Memory Usage:</span>
954                <span class="metric-value">{:.1}%</span>
955            </div>
956            <div class="metric">
957                <span>Cache Hit Ratio:</span>
958                <span class="metric-value">{:.1}%</span>
959            </div>
960        </div>
961
962        <div class="card">
963            <h2>Vector Index</h2>
964            <div class="metric">
965                <span>Vector Count:</span>
966                <span class="metric-value">{}</span>
967            </div>
968            <div class="metric">
969                <span>Index Size:</span>
970                <span class="metric-value">{} MB</span>
971            </div>
972        </div>
973
974        <div class="card">
975            <h2>Active Alerts</h2>
976            {}
977        </div>
978    </div>
979</body>
980</html>
981            "#,
982            chrono::DateTime::<chrono::Utc>::from(dashboard_data.last_updated)
983                .format("%Y-%m-%d %H:%M:%S UTC"),
984            if dashboard_data.overview.system_health_score >= 80.0 {
985                "health-good"
986            } else if dashboard_data.overview.system_health_score >= 60.0 {
987                "health-warning"
988            } else {
989                "health-critical"
990            },
991            dashboard_data.overview.system_health_score,
992            dashboard_data.overview.active_alerts,
993            dashboard_data.overview.total_queries_today,
994            dashboard_data.overview.average_latency.as_millis(),
995            dashboard_data.overview.current_qps,
996            dashboard_data.system_health.cpu_usage,
997            dashboard_data.system_health.memory_usage,
998            dashboard_data.overview.cache_hit_ratio * 100.0,
999            dashboard_data.overview.vector_count,
1000            dashboard_data.overview.index_size / (1024 * 1024), // Convert to MB
1001            format_alerts(&dashboard_data.alerts)
1002        );
1003
1004        Ok(html)
1005    }
1006
1007    /// Start comprehensive system monitoring
1008    pub async fn start_system_monitoring(&self) -> Result<()> {
1009        let analytics_engine = self.clone();
1010
1011        tokio::spawn(async move {
1012            let mut interval = tokio::time::interval(Duration::from_secs(5));
1013
1014            loop {
1015                interval.tick().await;
1016
1017                // Collect system metrics
1018                if let Ok(system_info) = Self::collect_system_info().await {
1019                    let _ = analytics_engine.update_system_metrics(
1020                        system_info.cpu_usage,
1021                        system_info.memory_usage,
1022                        system_info.memory_total,
1023                    );
1024                }
1025            }
1026        });
1027
1028        Ok(())
1029    }
1030
1031    async fn collect_system_info() -> Result<SystemInfo> {
1032        // In a real implementation, would use system monitoring library
1033        // For now, return mock data
1034        Ok(SystemInfo {
1035            cpu_usage: {
1036                #[allow(unused_imports)]
1037                use scirs2_core::random::{Random, Rng};
1038                let mut rng = Random::seed(42);
1039                45.0 + (rng.gen_range(0.0..1.0) * 20.0) // Mock: 45-65%
1040            },
1041            memory_usage: {
1042                #[allow(unused_imports)]
1043                use scirs2_core::random::{Random, Rng};
1044                let mut rng = Random::seed(42);
1045                60.0 + (rng.gen_range(0.0..1.0) * 20.0) // Mock: 60-80%
1046            },
1047            memory_total: 16 * 1024 * 1024 * 1024, // 16GB
1048            disk_usage: 30.0,
1049            network_throughput: 100.0,
1050        })
1051    }
1052}