use anyhow::{anyhow, Result};
use chrono;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast;
pub struct VectorAnalyticsEngine {
config: AnalyticsConfig,
metrics_collector: Arc<MetricsCollector>,
performance_monitor: Arc<PerformanceMonitor>,
query_analyzer: Arc<QueryAnalyzer>,
alert_manager: Arc<AlertManager>,
dashboard_data: Arc<RwLock<DashboardData>>,
event_broadcaster: broadcast::Sender<AnalyticsEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsConfig {
pub enable_real_time: bool,
pub collection_interval: u64,
pub max_metrics_history: usize,
pub enable_query_analysis: bool,
pub enable_alerts: bool,
pub dashboard_refresh_interval: u64,
pub enable_tracing: bool,
pub enable_profiling: bool,
pub retention_days: u32,
}
impl Default for AnalyticsConfig {
fn default() -> Self {
Self {
enable_real_time: true,
collection_interval: 1,
max_metrics_history: 10000,
enable_query_analysis: true,
enable_alerts: true,
dashboard_refresh_interval: 5,
enable_tracing: true,
enable_profiling: true,
retention_days: 30,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnalyticsEvent {
QueryExecuted {
query_id: String,
operation_type: String,
duration: Duration,
result_count: usize,
success: bool,
timestamp: SystemTime,
},
IndexUpdated {
index_name: String,
operation: String,
vectors_affected: usize,
timestamp: SystemTime,
},
PerformanceAlert {
alert_type: AlertType,
message: String,
severity: AlertSeverity,
timestamp: SystemTime,
},
SystemMetric {
metric_name: String,
value: f64,
unit: String,
timestamp: SystemTime,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
HighLatency,
LowThroughput,
HighMemoryUsage,
HighCpuUsage,
QualityDegradation,
IndexCorruption,
SystemError,
ResourceLimitReached,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
Critical,
Warning,
Info,
}
#[derive(Debug)]
pub struct MetricsCollector {
query_metrics: Arc<RwLock<QueryMetrics>>,
system_metrics: Arc<RwLock<SystemMetrics>>,
quality_metrics: Arc<RwLock<QualityMetrics>>,
custom_metrics: Arc<RwLock<HashMap<String, CustomMetric>>>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryMetrics {
pub total_queries: u64,
pub successful_queries: u64,
pub failed_queries: u64,
pub average_latency: Duration,
pub p50_latency: Duration,
pub p95_latency: Duration,
pub p99_latency: Duration,
pub max_latency: Duration,
pub min_latency: Duration,
pub throughput_qps: f64,
pub latency_history: VecDeque<(SystemTime, Duration)>,
pub throughput_history: VecDeque<(SystemTime, f64)>,
pub error_rate: f64,
pub query_distribution: HashMap<String, u64>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemMetrics {
pub cpu_usage: f64,
pub memory_usage: f64,
pub memory_total: u64,
pub memory_available: u64,
pub disk_usage: f64,
pub network_io: NetworkIO,
pub vector_count: u64,
pub index_size: u64,
pub cache_hit_ratio: f64,
pub gc_pressure: f64,
pub thread_count: u64,
pub system_load: f64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NetworkIO {
pub bytes_sent: u64,
pub bytes_received: u64,
pub packets_sent: u64,
pub packets_received: u64,
pub connections_active: u64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QualityMetrics {
pub recall_at_k: HashMap<usize, f64>,
pub precision_at_k: HashMap<usize, f64>,
pub ndcg_at_k: HashMap<usize, f64>,
pub mean_reciprocal_rank: f64,
pub average_similarity_score: f64,
pub similarity_distribution: Vec<f64>,
pub query_diversity: f64,
pub result_diversity: f64,
pub relevance_correlation: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomMetric {
pub name: String,
pub value: f64,
pub unit: String,
pub description: String,
pub timestamp: SystemTime,
pub tags: HashMap<String, String>,
}
#[derive(Debug)]
pub struct PerformanceMonitor {
thresholds: Arc<RwLock<PerformanceThresholds>>,
alert_history: Arc<RwLock<VecDeque<Alert>>>,
current_alerts: Arc<RwLock<HashMap<String, Alert>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceThresholds {
pub max_latency_ms: u64,
pub min_throughput_qps: f64,
pub max_memory_usage_percent: f64,
pub max_cpu_usage_percent: f64,
pub min_cache_hit_ratio: f64,
pub max_error_rate_percent: f64,
pub min_recall_at_10: f64,
pub max_index_size_gb: f64,
}
impl Default for PerformanceThresholds {
fn default() -> Self {
Self {
max_latency_ms: 100,
min_throughput_qps: 100.0,
max_memory_usage_percent: 80.0,
max_cpu_usage_percent: 85.0,
min_cache_hit_ratio: 0.8,
max_error_rate_percent: 1.0,
min_recall_at_10: 0.9,
max_index_size_gb: 10.0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
pub id: String,
pub alert_type: AlertType,
pub severity: AlertSeverity,
pub message: String,
pub timestamp: SystemTime,
pub resolved: bool,
pub resolved_timestamp: Option<SystemTime>,
pub metadata: HashMap<String, String>,
}
#[derive(Debug)]
pub struct QueryAnalyzer {
query_patterns: Arc<RwLock<HashMap<String, QueryPattern>>>,
popular_queries: Arc<RwLock<VecDeque<PopularQuery>>>,
usage_trends: Arc<RwLock<UsageTrends>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPattern {
pub pattern_id: String,
pub frequency: u64,
pub avg_latency: Duration,
pub success_rate: f64,
pub peak_hours: Vec<u8>, pub similarity_threshold_distribution: Vec<f64>,
pub result_size_distribution: Vec<usize>,
pub user_segments: HashMap<String, u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopularQuery {
pub query_text: String,
pub frequency: u64,
pub avg_similarity_score: f64,
pub timestamp: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsageTrends {
pub daily_query_counts: VecDeque<(SystemTime, u64)>,
pub hourly_patterns: [u64; 24],
pub weekly_patterns: [u64; 7],
pub growth_rate: f64,
pub seasonal_patterns: HashMap<String, f64>,
pub user_growth: f64,
pub feature_adoption: HashMap<String, f64>,
}
pub struct AlertManager {
config: AlertConfig,
notification_channels: Vec<Box<dyn NotificationChannel>>,
alert_rules: Arc<RwLock<Vec<AlertRule>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
pub enable_email: bool,
pub enable_slack: bool,
pub enable_webhook: bool,
pub email_recipients: Vec<String>,
pub slack_webhook: Option<String>,
pub webhook_url: Option<String>,
pub cooldown_period: Duration,
pub escalation_enabled: bool,
}
impl Default for AlertConfig {
fn default() -> Self {
Self {
enable_email: false,
enable_slack: false,
enable_webhook: false,
email_recipients: Vec::new(),
slack_webhook: None,
webhook_url: None,
cooldown_period: Duration::from_secs(300), escalation_enabled: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
pub name: String,
pub condition: String, pub severity: AlertSeverity,
pub enabled: bool,
pub cooldown: Duration,
pub actions: Vec<String>,
}
pub trait NotificationChannel: Send + Sync {
fn send_notification(&self, alert: &Alert) -> Result<()>;
fn get_channel_type(&self) -> String;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
pub overview: OverviewData,
pub query_performance: QueryPerformanceData,
pub system_health: SystemHealthData,
pub quality_metrics: QualityMetricsData,
pub usage_analytics: UsageAnalyticsData,
pub alerts: Vec<Alert>,
pub last_updated: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverviewData {
pub total_queries_today: u64,
pub average_latency: Duration,
pub current_qps: f64,
pub system_health_score: f64,
pub active_alerts: u64,
pub index_size: u64,
pub vector_count: u64,
pub cache_hit_ratio: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueryPerformanceData {
pub latency_trends: Vec<(SystemTime, Duration)>,
pub throughput_trends: Vec<(SystemTime, f64)>,
pub error_rate_trends: Vec<(SystemTime, f64)>,
pub top_slow_queries: Vec<(String, Duration)>,
pub query_distribution: HashMap<String, u64>,
pub performance_percentiles: HashMap<String, Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthData {
pub cpu_usage: f64,
pub memory_usage: f64,
pub disk_usage: f64,
pub network_throughput: f64,
pub resource_trends: Vec<(SystemTime, f64)>,
pub capacity_forecast: Vec<(SystemTime, f64)>,
pub bottlenecks: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetricsData {
pub recall_trends: Vec<(SystemTime, f64)>,
pub precision_trends: Vec<(SystemTime, f64)>,
pub similarity_distribution: Vec<f64>,
pub quality_score: f64,
pub quality_trends: Vec<(SystemTime, f64)>,
pub benchmark_comparisons: HashMap<String, f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UsageAnalyticsData {
pub user_activity: Vec<(SystemTime, u64)>,
pub popular_queries: Vec<PopularQuery>,
pub usage_patterns: HashMap<String, f64>,
pub growth_metrics: GrowthMetrics,
pub feature_usage: HashMap<String, u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrowthMetrics {
pub daily_growth_rate: f64,
pub weekly_growth_rate: f64,
pub monthly_growth_rate: f64,
pub user_retention: f64,
pub query_volume_growth: f64,
}
impl Clone for VectorAnalyticsEngine {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
metrics_collector: Arc::clone(&self.metrics_collector),
performance_monitor: Arc::clone(&self.performance_monitor),
query_analyzer: Arc::clone(&self.query_analyzer),
alert_manager: Arc::clone(&self.alert_manager),
dashboard_data: Arc::clone(&self.dashboard_data),
event_broadcaster: self.event_broadcaster.clone(),
}
}
}
impl VectorAnalyticsEngine {
pub fn new(config: AnalyticsConfig) -> Self {
let (event_broadcaster, _) = broadcast::channel(1000);
let metrics_collector = Arc::new(MetricsCollector::new());
let performance_monitor = Arc::new(PerformanceMonitor::new());
let query_analyzer = Arc::new(QueryAnalyzer::new());
let alert_manager = Arc::new(AlertManager::new(AlertConfig::default()));
let dashboard_data = Arc::new(RwLock::new(DashboardData::default()));
Self {
config,
metrics_collector,
performance_monitor,
query_analyzer,
alert_manager,
dashboard_data,
event_broadcaster,
}
}
pub fn record_query_execution(
&self,
query_id: String,
operation_type: String,
duration: Duration,
result_count: usize,
success: bool,
) -> Result<()> {
{
let mut metrics = self.metrics_collector.query_metrics.write();
metrics.total_queries += 1;
if success {
metrics.successful_queries += 1;
} else {
metrics.failed_queries += 1;
}
self.update_latency_statistics(&mut metrics, duration);
*metrics
.query_distribution
.entry(operation_type.clone())
.or_insert(0) += 1;
metrics.error_rate =
(metrics.failed_queries as f64) / (metrics.total_queries as f64) * 100.0;
}
self.check_performance_alerts(duration, success)?;
let event = AnalyticsEvent::QueryExecuted {
query_id,
operation_type,
duration,
result_count,
success,
timestamp: SystemTime::now(),
};
let _ = self.event_broadcaster.send(event);
Ok(())
}
fn update_latency_statistics(&self, metrics: &mut QueryMetrics, duration: Duration) {
let timestamp = SystemTime::now();
metrics.latency_history.push_back((timestamp, duration));
if metrics.latency_history.len() > self.config.max_metrics_history {
metrics.latency_history.pop_front();
}
let latencies: Vec<Duration> = metrics.latency_history.iter().map(|(_, d)| *d).collect();
if !latencies.is_empty() {
let mut sorted_latencies = latencies.clone();
sorted_latencies.sort();
let len = sorted_latencies.len();
metrics.p50_latency = sorted_latencies[len / 2];
metrics.p95_latency = sorted_latencies[(len as f64 * 0.95) as usize];
metrics.p99_latency = sorted_latencies[(len as f64 * 0.99) as usize];
metrics.max_latency = *sorted_latencies
.last()
.expect("sorted_latencies validated to be non-empty");
metrics.min_latency = *sorted_latencies
.first()
.expect("collection validated to be non-empty");
let total_duration: Duration = latencies.iter().sum();
metrics.average_latency = total_duration / len as u32;
}
}
fn check_performance_alerts(&self, duration: Duration, success: bool) -> Result<()> {
let thresholds = self.performance_monitor.thresholds.read();
if duration.as_millis() > thresholds.max_latency_ms as u128 {
self.create_alert(
AlertType::HighLatency,
AlertSeverity::Warning,
format!(
"Query latency {}ms exceeds threshold {}ms",
duration.as_millis(),
thresholds.max_latency_ms
),
)?;
}
if !success {
let metrics = self.metrics_collector.query_metrics.read();
if metrics.error_rate > thresholds.max_error_rate_percent {
self.create_alert(
AlertType::SystemError,
AlertSeverity::Critical,
format!(
"Error rate {:.2}% exceeds threshold {:.2}%",
metrics.error_rate, thresholds.max_error_rate_percent
),
)?;
}
}
Ok(())
}
fn create_alert(
&self,
alert_type: AlertType,
severity: AlertSeverity,
message: String,
) -> Result<()> {
let alert_id = format!(
"{:?}_{}",
alert_type,
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
);
let alert = Alert {
id: alert_id,
alert_type,
severity,
message,
timestamp: SystemTime::now(),
resolved: false,
resolved_timestamp: None,
metadata: HashMap::new(),
};
{
let mut current_alerts = self.performance_monitor.current_alerts.write();
current_alerts.insert(alert.id.clone(), alert.clone());
let mut alert_history = self.performance_monitor.alert_history.write();
alert_history.push_back(alert.clone());
if alert_history.len() > self.config.max_metrics_history {
alert_history.pop_front();
}
}
self.alert_manager.send_alert(&alert)?;
let event = AnalyticsEvent::PerformanceAlert {
alert_type: alert.alert_type.clone(),
message: alert.message.clone(),
severity: alert.severity.clone(),
timestamp: alert.timestamp,
};
let _ = self.event_broadcaster.send(event);
Ok(())
}
pub fn record_distributed_query(
&self,
query_id: String,
node_count: usize,
total_duration: Duration,
_federation_id: Option<String>,
success: bool,
) -> Result<()> {
{
let mut metrics = self.metrics_collector.query_metrics.write();
metrics.total_queries += 1;
if success {
metrics.successful_queries += 1;
} else {
metrics.failed_queries += 1;
}
self.update_latency_statistics(&mut metrics, total_duration);
let operation_type = format!("distributed_query_{node_count}_nodes");
*metrics
.query_distribution
.entry(operation_type)
.or_insert(0) += 1;
metrics.error_rate = if metrics.total_queries > 0 {
metrics.failed_queries as f64 / metrics.total_queries as f64
} else {
0.0
};
}
let event = AnalyticsEvent::QueryExecuted {
query_id: query_id.clone(),
operation_type: format!("distributed_query_{node_count}_nodes"),
duration: total_duration,
result_count: node_count,
success,
timestamp: SystemTime::now(),
};
let _ = self.event_broadcaster.send(event);
if total_duration.as_millis() > 5000 {
let message = format!(
"Distributed query {} across {} nodes took {}ms",
query_id,
node_count,
total_duration.as_millis()
);
self.create_alert(AlertType::HighLatency, AlertSeverity::Warning, message)?;
}
Ok(())
}
pub fn update_system_metrics(
&self,
cpu_usage: f64,
memory_usage: f64,
memory_total: u64,
) -> Result<()> {
{
let mut metrics = self.metrics_collector.system_metrics.write();
metrics.cpu_usage = cpu_usage;
metrics.memory_usage = memory_usage;
metrics.memory_total = memory_total;
metrics.memory_available =
memory_total - (memory_total as f64 * memory_usage / 100.0) as u64;
}
let thresholds = self.performance_monitor.thresholds.read();
if cpu_usage > thresholds.max_cpu_usage_percent {
self.create_alert(
AlertType::HighCpuUsage,
AlertSeverity::Warning,
format!(
"CPU usage {:.2}% exceeds threshold {:.2}%",
cpu_usage, thresholds.max_cpu_usage_percent
),
)?;
}
if memory_usage > thresholds.max_memory_usage_percent {
self.create_alert(
AlertType::HighMemoryUsage,
AlertSeverity::Warning,
format!(
"Memory usage {:.2}% exceeds threshold {:.2}%",
memory_usage, thresholds.max_memory_usage_percent
),
)?;
}
Ok(())
}
pub fn get_dashboard_data(&self) -> DashboardData {
self.dashboard_data.read().clone()
}
pub fn subscribe_to_events(&self) -> broadcast::Receiver<AnalyticsEvent> {
self.event_broadcaster.subscribe()
}
pub fn generate_report(
&self,
start_time: SystemTime,
end_time: SystemTime,
) -> Result<AnalyticsReport> {
let query_metrics = self.metrics_collector.query_metrics.read().clone();
let system_metrics = self.metrics_collector.system_metrics.read().clone();
let quality_metrics = self.metrics_collector.quality_metrics.read().clone();
Ok(AnalyticsReport {
report_id: format!(
"report_{}",
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
),
start_time,
end_time,
query_metrics,
system_metrics,
quality_metrics,
alerts: self.get_alerts_in_range(start_time, end_time)?,
recommendations: self.generate_recommendations()?,
generated_at: SystemTime::now(),
})
}
fn get_alerts_in_range(
&self,
start_time: SystemTime,
end_time: SystemTime,
) -> Result<Vec<Alert>> {
let alert_history = self.performance_monitor.alert_history.read();
Ok(alert_history
.iter()
.filter(|alert| alert.timestamp >= start_time && alert.timestamp <= end_time)
.cloned()
.collect())
}
fn generate_recommendations(&self) -> Result<Vec<String>> {
let mut recommendations = Vec::new();
let query_metrics = self.metrics_collector.query_metrics.read();
let system_metrics = self.metrics_collector.system_metrics.read();
if query_metrics.average_latency.as_millis() > 50 {
recommendations
.push("Consider optimizing queries or adding more powerful hardware".to_string());
}
if system_metrics.memory_usage > 80.0 {
recommendations.push(
"Memory usage is high. Consider increasing memory or optimizing data structures"
.to_string(),
);
}
if system_metrics.cache_hit_ratio < 0.8 {
recommendations.push("Cache hit ratio is low. Consider increasing cache size or improving cache strategy".to_string());
}
Ok(recommendations)
}
pub fn export_metrics(&self, format: ExportFormat, destination: &str) -> Result<()> {
let metrics_data = self.collect_all_metrics()?;
match format {
ExportFormat::Json => self.export_as_json(&metrics_data, destination),
ExportFormat::Csv => self.export_as_csv(&metrics_data, destination),
ExportFormat::Prometheus => self.export_as_prometheus(&metrics_data, destination),
ExportFormat::InfluxDb => self.export_as_influxdb(&metrics_data, destination),
}
}
fn collect_all_metrics(&self) -> Result<HashMap<String, serde_json::Value>> {
let mut all_metrics = HashMap::new();
let query_metrics = self.metrics_collector.query_metrics.read();
let system_metrics = self.metrics_collector.system_metrics.read();
let quality_metrics = self.metrics_collector.quality_metrics.read();
all_metrics.insert(
"query_metrics".to_string(),
serde_json::to_value(&*query_metrics)?,
);
all_metrics.insert(
"system_metrics".to_string(),
serde_json::to_value(&*system_metrics)?,
);
all_metrics.insert(
"quality_metrics".to_string(),
serde_json::to_value(&*quality_metrics)?,
);
Ok(all_metrics)
}
fn export_as_json(
&self,
metrics: &HashMap<String, serde_json::Value>,
destination: &str,
) -> Result<()> {
let json_data = serde_json::to_string_pretty(metrics)?;
std::fs::write(destination, json_data)?;
Ok(())
}
fn export_as_csv(
&self,
metrics: &HashMap<String, serde_json::Value>,
destination: &str,
) -> Result<()> {
let mut csv_content = String::new();
csv_content.push_str("timestamp,metric_name,value,category\n");
let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S");
if let Some(query_metrics) = metrics.get("query_metrics") {
if let Some(obj) = query_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
csv_content.push_str(&format!("{timestamp},query_{key},{num_val},query\n"));
}
}
}
}
if let Some(system_metrics) = metrics.get("system_metrics") {
if let Some(obj) = system_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
csv_content
.push_str(&format!("{timestamp},system_{key},{num_val},system\n"));
}
}
}
}
std::fs::write(destination, csv_content)?;
Ok(())
}
fn export_as_prometheus(
&self,
metrics: &HashMap<String, serde_json::Value>,
destination: &str,
) -> Result<()> {
let mut prometheus_content = String::new();
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
if let Some(query_metrics) = metrics.get("query_metrics") {
if let Some(obj) = query_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
prometheus_content
.push_str(&format!("# HELP vector_query_{key} Query metric {key}\n"));
prometheus_content.push_str(&format!("# TYPE vector_query_{key} gauge\n"));
prometheus_content
.push_str(&format!("vector_query_{key} {num_val} {timestamp}\n"));
}
}
}
}
if let Some(system_metrics) = metrics.get("system_metrics") {
if let Some(obj) = system_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
prometheus_content
.push_str(&format!("# HELP vector_system_{key} System metric {key}\n"));
prometheus_content.push_str(&format!("# TYPE vector_system_{key} gauge\n"));
prometheus_content
.push_str(&format!("vector_system_{key} {num_val} {timestamp}\n"));
}
}
}
}
std::fs::write(destination, prometheus_content)?;
Ok(())
}
fn export_as_influxdb(
&self,
metrics: &HashMap<String, serde_json::Value>,
destination: &str,
) -> Result<()> {
let mut influxdb_content = String::new();
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos();
if let Some(query_metrics) = metrics.get("query_metrics") {
if let Some(obj) = query_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
influxdb_content.push_str(&format!(
"vector_query,type=query {key}={num_val} {timestamp}\n"
));
}
}
}
}
if let Some(system_metrics) = metrics.get("system_metrics") {
if let Some(obj) = system_metrics.as_object() {
for (key, value) in obj {
if let Some(num_val) = value.as_f64() {
influxdb_content.push_str(&format!(
"vector_system,type=system {key}={num_val} {timestamp}\n"
));
}
}
}
}
std::fs::write(destination, influxdb_content)?;
Ok(())
}
pub async fn start_dashboard_updates(&self) -> Result<()> {
let dashboard_data = Arc::clone(&self.dashboard_data);
let metrics_collector = Arc::clone(&self.metrics_collector);
let performance_monitor = Arc::clone(&self.performance_monitor);
let refresh_interval = Duration::from_secs(self.config.dashboard_refresh_interval);
tokio::spawn(async move {
let mut interval = tokio::time::interval(refresh_interval);
loop {
interval.tick().await;
let updated_data =
Self::build_dashboard_data(&metrics_collector, &performance_monitor).await;
{
let mut data = dashboard_data.write();
*data = updated_data;
}
}
});
Ok(())
}
async fn build_dashboard_data(
metrics_collector: &MetricsCollector,
performance_monitor: &PerformanceMonitor,
) -> DashboardData {
let query_metrics = metrics_collector.query_metrics.read().clone();
let system_metrics = metrics_collector.system_metrics.read().clone();
let quality_metrics = metrics_collector.quality_metrics.read().clone();
let current_alerts: Vec<Alert> = performance_monitor
.current_alerts
.read()
.values()
.cloned()
.collect();
let health_score = Self::calculate_health_score(&system_metrics, &query_metrics);
let current_qps = Self::calculate_current_qps(&query_metrics);
DashboardData {
overview: OverviewData {
total_queries_today: query_metrics.total_queries,
average_latency: query_metrics.average_latency,
current_qps,
system_health_score: health_score,
active_alerts: current_alerts.len() as u64,
index_size: system_metrics.index_size,
vector_count: system_metrics.vector_count,
cache_hit_ratio: system_metrics.cache_hit_ratio,
},
query_performance: QueryPerformanceData {
latency_trends: query_metrics.latency_history.iter().cloned().collect(),
throughput_trends: query_metrics.throughput_history.iter().cloned().collect(),
error_rate_trends: vec![(SystemTime::now(), query_metrics.error_rate)],
top_slow_queries: vec![], query_distribution: query_metrics.query_distribution.clone(),
performance_percentiles: {
let mut percentiles = HashMap::new();
percentiles.insert("p50".to_string(), query_metrics.p50_latency);
percentiles.insert("p95".to_string(), query_metrics.p95_latency);
percentiles.insert("p99".to_string(), query_metrics.p99_latency);
percentiles
},
},
system_health: SystemHealthData {
cpu_usage: system_metrics.cpu_usage,
memory_usage: system_metrics.memory_usage,
disk_usage: system_metrics.disk_usage,
network_throughput: 0.0, resource_trends: vec![(SystemTime::now(), system_metrics.cpu_usage)],
capacity_forecast: vec![], bottlenecks: Self::identify_bottlenecks(&system_metrics, &query_metrics),
},
quality_metrics: QualityMetricsData {
recall_trends: vec![],
precision_trends: vec![],
similarity_distribution: quality_metrics.similarity_distribution.clone(),
quality_score: quality_metrics.average_similarity_score,
quality_trends: vec![(SystemTime::now(), quality_metrics.average_similarity_score)],
benchmark_comparisons: HashMap::new(),
},
usage_analytics: UsageAnalyticsData {
user_activity: vec![(SystemTime::now(), query_metrics.total_queries)],
popular_queries: vec![], usage_patterns: HashMap::new(),
growth_metrics: GrowthMetrics::default(),
feature_usage: HashMap::new(),
},
alerts: current_alerts,
last_updated: SystemTime::now(),
}
}
fn calculate_health_score(system_metrics: &SystemMetrics, query_metrics: &QueryMetrics) -> f64 {
let mut score = 100.0;
if system_metrics.cpu_usage > 80.0 {
score -= (system_metrics.cpu_usage - 80.0) * 0.5;
}
if system_metrics.memory_usage > 80.0 {
score -= (system_metrics.memory_usage - 80.0) * 0.5;
}
if query_metrics.error_rate > 1.0 {
score -= query_metrics.error_rate * 10.0;
}
if query_metrics.average_latency.as_millis() > 100 {
score -= (query_metrics.average_latency.as_millis() as f64 - 100.0) * 0.1;
}
score.clamp(0.0, 100.0)
}
fn calculate_current_qps(query_metrics: &QueryMetrics) -> f64 {
if query_metrics.latency_history.len() < 2 {
return 0.0;
}
let now = SystemTime::now();
let one_second_ago = now - Duration::from_secs(1);
let recent_queries = query_metrics
.latency_history
.iter()
.filter(|(timestamp, _)| *timestamp >= one_second_ago)
.count();
recent_queries as f64
}
fn identify_bottlenecks(
system_metrics: &SystemMetrics,
query_metrics: &QueryMetrics,
) -> Vec<String> {
let mut bottlenecks = Vec::new();
if system_metrics.cpu_usage > 90.0 {
bottlenecks.push("High CPU usage".to_string());
}
if system_metrics.memory_usage > 90.0 {
bottlenecks.push("High memory usage".to_string());
}
if query_metrics.average_latency.as_millis() > 500 {
bottlenecks.push("High query latency".to_string());
}
if system_metrics.cache_hit_ratio < 0.7 {
bottlenecks.push("Low cache hit ratio".to_string());
}
bottlenecks
}
pub fn generate_dashboard_html(&self) -> Result<String> {
let dashboard_data = self.get_dashboard_data();
let html = format!(
r#"
<!DOCTYPE html>
<html>
<head>
<title>OxiRS Vector Search Analytics Dashboard</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
.dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
.card {{ background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
.metric {{ display: flex; justify-content: space-between; margin: 10px 0; }}
.metric-value {{ font-weight: bold; color: #007acc; }}
.alert {{ padding: 10px; margin: 5px 0; border-radius: 4px; }}
.alert-critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
.alert-warning {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
.alert-info {{ background-color: #e3f2fd; border-left: 4px solid #2196f3; }}
.health-score {{ font-size: 2em; text-align: center; }}
.health-good {{ color: #4caf50; }}
.health-warning {{ color: #ff9800; }}
.health-critical {{ color: #f44336; }}
h1 {{ color: #333; text-align: center; }}
h2 {{ color: #555; margin-top: 0; }}
.refresh-time {{ text-align: center; color: #888; font-size: 0.9em; }}
</style>
<script>
function refreshPage() {{
window.location.reload();
}}
setInterval(refreshPage, 30000); // Refresh every 30 seconds
</script>
</head>
<body>
<h1>🔍 OxiRS Vector Search Analytics Dashboard</h1>
<p class="refresh-time">Last updated: {}</p>
<div class="dashboard">
<div class="card">
<h2>System Health</h2>
<div class="health-score {}">{:.1}%</div>
<div class="metric">
<span>Active Alerts:</span>
<span class="metric-value">{}</span>
</div>
</div>
<div class="card">
<h2>Query Performance</h2>
<div class="metric">
<span>Total Queries Today:</span>
<span class="metric-value">{}</span>
</div>
<div class="metric">
<span>Average Latency:</span>
<span class="metric-value">{}ms</span>
</div>
<div class="metric">
<span>Current QPS:</span>
<span class="metric-value">{:.1}</span>
</div>
</div>
<div class="card">
<h2>System Resources</h2>
<div class="metric">
<span>CPU Usage:</span>
<span class="metric-value">{:.1}%</span>
</div>
<div class="metric">
<span>Memory Usage:</span>
<span class="metric-value">{:.1}%</span>
</div>
<div class="metric">
<span>Cache Hit Ratio:</span>
<span class="metric-value">{:.1}%</span>
</div>
</div>
<div class="card">
<h2>Vector Index</h2>
<div class="metric">
<span>Vector Count:</span>
<span class="metric-value">{}</span>
</div>
<div class="metric">
<span>Index Size:</span>
<span class="metric-value">{} MB</span>
</div>
</div>
<div class="card">
<h2>Active Alerts</h2>
{}
</div>
</div>
</body>
</html>
"#,
chrono::DateTime::<chrono::Utc>::from(dashboard_data.last_updated)
.format("%Y-%m-%d %H:%M:%S UTC"),
if dashboard_data.overview.system_health_score >= 80.0 {
"health-good"
} else if dashboard_data.overview.system_health_score >= 60.0 {
"health-warning"
} else {
"health-critical"
},
dashboard_data.overview.system_health_score,
dashboard_data.overview.active_alerts,
dashboard_data.overview.total_queries_today,
dashboard_data.overview.average_latency.as_millis(),
dashboard_data.overview.current_qps,
dashboard_data.system_health.cpu_usage,
dashboard_data.system_health.memory_usage,
dashboard_data.overview.cache_hit_ratio * 100.0,
dashboard_data.overview.vector_count,
dashboard_data.overview.index_size / (1024 * 1024), Self::format_alerts(&dashboard_data.alerts)
);
Ok(html)
}
fn format_alerts(alerts: &[Alert]) -> String {
if alerts.is_empty() {
return "<p>No active alerts</p>".to_string();
}
alerts
.iter()
.map(|alert| {
let class = match alert.severity {
AlertSeverity::Critical => "alert-critical",
AlertSeverity::Warning => "alert-warning",
AlertSeverity::Info => "alert-info",
};
format!(
"<div class=\"alert {}\">
<strong>{:?}</strong>: {}
<br><small>{}</small>
</div>",
class,
alert.alert_type,
alert.message,
chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%H:%M:%S")
)
})
.collect::<Vec<_>>()
.join("\n")
}
pub async fn start_system_monitoring(&self) -> Result<()> {
let analytics_engine = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Ok(system_info) = Self::collect_system_info().await {
let _ = analytics_engine.update_system_metrics(
system_info.cpu_usage,
system_info.memory_usage,
system_info.memory_total,
);
}
}
});
Ok(())
}
async fn collect_system_info() -> Result<SystemInfo> {
Ok(SystemInfo {
cpu_usage: {
#[allow(unused_imports)]
use scirs2_core::random::{Random, Rng};
let mut rng = Random::seed(42);
45.0 + (rng.gen_range(0.0..1.0) * 20.0) },
memory_usage: {
#[allow(unused_imports)]
use scirs2_core::random::{Random, Rng};
let mut rng = Random::seed(42);
60.0 + (rng.gen_range(0.0..1.0) * 20.0) },
memory_total: 16 * 1024 * 1024 * 1024, disk_usage: 30.0,
network_throughput: 100.0,
})
}
}
#[derive(Debug, Clone)]
pub struct SystemInfo {
pub cpu_usage: f64,
pub memory_usage: f64,
pub memory_total: u64,
pub disk_usage: f64,
pub network_throughput: f64,
}
pub struct PerformanceProfiler {
profiles: Arc<RwLock<HashMap<String, ProfileData>>>,
active_profiles: Arc<RwLock<HashMap<String, Instant>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileData {
pub function_name: String,
pub total_calls: u64,
pub total_time: Duration,
pub average_time: Duration,
pub min_time: Duration,
pub max_time: Duration,
pub call_history: VecDeque<(SystemTime, Duration)>,
}
impl Default for PerformanceProfiler {
fn default() -> Self {
Self::new()
}
}
impl PerformanceProfiler {
pub fn new() -> Self {
Self {
profiles: Arc::new(RwLock::new(HashMap::new())),
active_profiles: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn start_profile(&self, function_name: &str) -> String {
let profile_id = format!(
"{}_{}",
function_name,
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
);
let mut active = self.active_profiles.write();
active.insert(profile_id.clone(), Instant::now());
profile_id
}
pub fn end_profile(&self, profile_id: &str) -> Result<Duration> {
let mut active = self.active_profiles.write();
let start_time = active
.remove(profile_id)
.ok_or_else(|| anyhow!("Profile ID not found: {}", profile_id))?;
let duration = start_time.elapsed();
let function_name = profile_id.split('_').next().unwrap_or("unknown");
let mut profiles = self.profiles.write();
let profile = profiles
.entry(function_name.to_string())
.or_insert_with(|| ProfileData {
function_name: function_name.to_string(),
total_calls: 0,
total_time: Duration::from_nanos(0),
average_time: Duration::from_nanos(0),
min_time: Duration::from_secs(u64::MAX),
max_time: Duration::from_nanos(0),
call_history: VecDeque::new(),
});
profile.total_calls += 1;
profile.total_time += duration;
profile.average_time = profile.total_time / profile.total_calls as u32;
profile.min_time = profile.min_time.min(duration);
profile.max_time = profile.max_time.max(duration);
profile
.call_history
.push_back((SystemTime::now(), duration));
while profile.call_history.len() > 1000 {
profile.call_history.pop_front();
}
Ok(duration)
}
pub fn get_profile_report(&self) -> HashMap<String, ProfileData> {
self.profiles.read().clone()
}
}
#[derive(Debug, Clone)]
pub enum ExportFormat {
Json,
Csv,
Prometheus,
InfluxDb,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
pub report_id: String,
pub start_time: SystemTime,
pub end_time: SystemTime,
pub query_metrics: QueryMetrics,
pub system_metrics: SystemMetrics,
pub quality_metrics: QualityMetrics,
pub alerts: Vec<Alert>,
pub recommendations: Vec<String>,
pub generated_at: SystemTime,
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
query_metrics: Arc::new(RwLock::new(QueryMetrics::default())),
system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
quality_metrics: Arc::new(RwLock::new(QualityMetrics::default())),
custom_metrics: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl Default for PerformanceMonitor {
fn default() -> Self {
Self::new()
}
}
impl PerformanceMonitor {
pub fn new() -> Self {
Self {
thresholds: Arc::new(RwLock::new(PerformanceThresholds::default())),
alert_history: Arc::new(RwLock::new(VecDeque::new())),
current_alerts: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl Default for QueryAnalyzer {
fn default() -> Self {
Self::new()
}
}
impl QueryAnalyzer {
pub fn new() -> Self {
Self {
query_patterns: Arc::new(RwLock::new(HashMap::new())),
popular_queries: Arc::new(RwLock::new(VecDeque::new())),
usage_trends: Arc::new(RwLock::new(UsageTrends::default())),
}
}
}
impl AlertManager {
pub fn new(config: AlertConfig) -> Self {
Self {
config,
notification_channels: Vec::new(),
alert_rules: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn send_alert(&self, alert: &Alert) -> Result<()> {
for channel in &self.notification_channels {
if let Err(e) = channel.send_notification(alert) {
eprintln!(
"Failed to send alert via {}: {}",
channel.get_channel_type(),
e
);
}
}
Ok(())
}
pub fn add_notification_channel(&mut self, channel: Box<dyn NotificationChannel>) {
self.notification_channels.push(channel);
}
}
impl Default for DashboardData {
fn default() -> Self {
Self {
overview: OverviewData::default(),
query_performance: QueryPerformanceData::default(),
system_health: SystemHealthData::default(),
quality_metrics: QualityMetricsData::default(),
usage_analytics: UsageAnalyticsData::default(),
alerts: Vec::new(),
last_updated: SystemTime::now(),
}
}
}
impl Default for OverviewData {
fn default() -> Self {
Self {
total_queries_today: 0,
average_latency: Duration::from_millis(0),
current_qps: 0.0,
system_health_score: 100.0,
active_alerts: 0,
index_size: 0,
vector_count: 0,
cache_hit_ratio: 0.0,
}
}
}
impl Default for SystemHealthData {
fn default() -> Self {
Self {
cpu_usage: 0.0,
memory_usage: 0.0,
disk_usage: 0.0,
network_throughput: 0.0,
resource_trends: Vec::new(),
capacity_forecast: Vec::new(),
bottlenecks: Vec::new(),
}
}
}
impl Default for QualityMetricsData {
fn default() -> Self {
Self {
recall_trends: Vec::new(),
precision_trends: Vec::new(),
similarity_distribution: Vec::new(),
quality_score: 0.0,
quality_trends: Vec::new(),
benchmark_comparisons: HashMap::new(),
}
}
}
impl Default for UsageTrends {
fn default() -> Self {
Self {
daily_query_counts: VecDeque::new(),
hourly_patterns: [0; 24],
weekly_patterns: [0; 7],
growth_rate: 0.0,
seasonal_patterns: HashMap::new(),
user_growth: 0.0,
feature_adoption: HashMap::new(),
}
}
}
impl Default for GrowthMetrics {
fn default() -> Self {
Self {
daily_growth_rate: 0.0,
weekly_growth_rate: 0.0,
monthly_growth_rate: 0.0,
user_retention: 0.0,
query_volume_growth: 0.0,
}
}
}
pub struct EmailNotificationChannel {
smtp_config: SmtpConfig,
}
#[derive(Debug, Clone)]
pub struct SmtpConfig {
pub server: String,
pub port: u16,
pub username: String,
pub password: String,
pub from_address: String,
}
impl EmailNotificationChannel {
pub fn new(smtp_config: SmtpConfig) -> Self {
Self { smtp_config }
}
}
impl NotificationChannel for EmailNotificationChannel {
fn send_notification(&self, alert: &Alert) -> Result<()> {
tracing::info!(
"Email notification sent for alert {}: {}",
alert.id,
alert.message
);
Ok(())
}
fn get_channel_type(&self) -> String {
"email".to_string()
}
}
pub struct SlackNotificationChannel {
webhook_url: String,
client: reqwest::Client,
}
impl SlackNotificationChannel {
pub fn new(webhook_url: String) -> Self {
Self {
webhook_url,
client: reqwest::Client::new(),
}
}
}
impl NotificationChannel for SlackNotificationChannel {
fn send_notification(&self, alert: &Alert) -> Result<()> {
let _payload = serde_json::json!({
"text": format!("🚨 Alert: {}", alert.message),
"attachments": [{
"color": match alert.severity {
AlertSeverity::Critical => "danger",
AlertSeverity::Warning => "warning",
AlertSeverity::Info => "good",
},
"fields": [
{
"title": "Alert Type",
"value": format!("{:?}", alert.alert_type),
"short": true
},
{
"title": "Severity",
"value": format!("{:?}", alert.severity),
"short": true
},
{
"title": "Timestamp",
"value": chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%Y-%m-%d %H:%M:%S UTC").to_string(),
"short": true
}
]
}]
});
tracing::info!(
"Slack notification sent for alert {}: {}",
alert.id,
alert.message
);
Ok(())
}
fn get_channel_type(&self) -> String {
"slack".to_string()
}
}
pub struct WebhookNotificationChannel {
webhook_url: String,
client: reqwest::Client,
headers: HashMap<String, String>,
}
impl WebhookNotificationChannel {
pub fn new(webhook_url: String) -> Self {
Self {
webhook_url,
client: reqwest::Client::new(),
headers: HashMap::new(),
}
}
pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
self.headers = headers;
self
}
}
impl NotificationChannel for WebhookNotificationChannel {
fn send_notification(&self, alert: &Alert) -> Result<()> {
let _payload = serde_json::to_value(alert)?;
tracing::info!(
"Webhook notification sent for alert {}: {}",
alert.id,
alert.message
);
Ok(())
}
fn get_channel_type(&self) -> String {
"webhook".to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_analytics_engine_creation() {
let config = AnalyticsConfig::default();
let engine = VectorAnalyticsEngine::new(config);
assert!(engine.config.enable_real_time);
assert_eq!(engine.config.collection_interval, 1);
}
#[test]
fn test_query_recording() {
let config = AnalyticsConfig::default();
let engine = VectorAnalyticsEngine::new(config);
let result = engine.record_query_execution(
"test_query_1".to_string(),
"similarity_search".to_string(),
Duration::from_millis(50),
10,
true,
);
assert!(result.is_ok());
let metrics = engine.metrics_collector.query_metrics.read();
assert_eq!(metrics.total_queries, 1);
assert_eq!(metrics.successful_queries, 1);
}
#[test]
fn test_alert_creation() {
let config = AnalyticsConfig::default();
let engine = VectorAnalyticsEngine::new(config);
let result = engine.create_alert(
AlertType::HighLatency,
AlertSeverity::Warning,
"Test alert message".to_string(),
);
assert!(result.is_ok());
let current_alerts = engine.performance_monitor.current_alerts.read();
assert_eq!(current_alerts.len(), 1);
}
#[test]
fn test_metrics_export() {
let config = AnalyticsConfig::default();
let engine = VectorAnalyticsEngine::new(config);
let _ = engine.record_query_execution(
"test".to_string(),
"search".to_string(),
Duration::from_millis(25),
5,
true,
);
let temp_file = "/tmp/test_metrics.json";
let result = engine.export_metrics(ExportFormat::Json, temp_file);
assert!(result.is_ok());
let _ = std::fs::remove_file(temp_file);
}
}