use anyhow::{anyhow, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
pub enable_real_time: bool,
pub enable_query_logging: bool,
pub enable_system_monitoring: bool,
pub enable_quality_metrics: bool,
pub retention_period: Duration,
pub sampling_rate: f32,
pub alert_thresholds: AlertThresholds,
pub export_config: ExportConfig,
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enable_real_time: true,
enable_query_logging: true,
enable_system_monitoring: true,
enable_quality_metrics: true,
retention_period: Duration::from_secs(24 * 60 * 60), sampling_rate: 1.0,
alert_thresholds: AlertThresholds::default(),
export_config: ExportConfig::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
pub max_query_latency: f64,
pub max_error_rate: f32,
pub min_recall_at_10: f32,
pub max_memory_usage: u64,
pub max_cpu_usage: f32,
}
impl Default for AlertThresholds {
fn default() -> Self {
Self {
max_query_latency: 1000.0, max_error_rate: 0.05, min_recall_at_10: 0.90, max_memory_usage: 8192, max_cpu_usage: 0.80, }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
pub format: ExportFormat,
pub export_interval: Duration,
pub destination: ExportDestination,
pub include_detailed: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
JSON,
CSV,
Prometheus,
InfluxDB,
ElasticSearch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportDestination {
File(String),
Http(String),
Database(String),
Console,
}
impl Default for ExportConfig {
fn default() -> Self {
Self {
format: ExportFormat::JSON,
export_interval: Duration::from_secs(60),
destination: ExportDestination::Console,
include_detailed: false,
}
}
}
pub struct EnhancedPerformanceMonitor {
config: MonitoringConfig,
query_metrics: Arc<RwLock<QueryMetricsCollector>>,
system_metrics: Arc<RwLock<SystemMetricsCollector>>,
quality_metrics: Arc<RwLock<QualityMetricsCollector>>,
alert_manager: AlertManager,
analytics_engine: AnalyticsEngine,
dashboard_data: Arc<RwLock<DashboardData>>,
}
impl EnhancedPerformanceMonitor {
pub fn new(config: MonitoringConfig) -> Self {
Self {
config: config.clone(),
query_metrics: Arc::new(RwLock::new(QueryMetricsCollector::new())),
system_metrics: Arc::new(RwLock::new(SystemMetricsCollector::new())),
quality_metrics: Arc::new(RwLock::new(QualityMetricsCollector::new())),
alert_manager: AlertManager::new(config.alert_thresholds.clone()),
analytics_engine: AnalyticsEngine::new(),
dashboard_data: Arc::new(RwLock::new(DashboardData::new())),
}
}
pub fn record_query(&self, query_info: QueryInfo) {
if !self.config.enable_real_time {
return;
}
if {
#[allow(unused_imports)]
use scirs2_core::random::{Random, Rng};
let mut rng = Random::seed(42);
rng.gen_range(0.0..1.0)
} > self.config.sampling_rate
{
return;
}
{
let mut metrics = self.query_metrics.write();
metrics.record_query(query_info.clone());
}
self.alert_manager.check_query_alerts(&query_info);
{
let mut dashboard = self.dashboard_data.write();
dashboard.update_query_stats(&query_info);
}
self.analytics_engine.analyze_query(&query_info);
}
pub fn record_system_metrics(&self, metrics: SystemMetrics) {
if !self.config.enable_system_monitoring {
return;
}
{
let mut collector = self.system_metrics.write();
collector.record_metrics(metrics.clone());
}
self.alert_manager.check_system_alerts(&metrics);
{
let mut dashboard = self.dashboard_data.write();
dashboard.update_system_stats(&metrics);
}
}
pub fn record_quality_metrics(&self, metrics: QualityMetrics) {
if !self.config.enable_quality_metrics {
return;
}
{
let mut collector = self.quality_metrics.write();
collector.record_metrics(metrics.clone());
}
self.alert_manager.check_quality_alerts(&metrics);
{
let mut dashboard = self.dashboard_data.write();
dashboard.update_quality_stats(&metrics);
}
}
pub fn get_dashboard_data(&self) -> DashboardData {
self.dashboard_data.read().clone()
}
pub fn generate_analytics_report(&self) -> AnalyticsReport {
let query_stats = self.query_metrics.read().get_statistics();
let system_stats = self.system_metrics.read().get_statistics();
let quality_stats = self.quality_metrics.read().get_statistics();
let alerts = self.alert_manager.get_active_alerts();
AnalyticsReport {
timestamp: SystemTime::now(),
query_statistics: query_stats,
system_statistics: system_stats,
quality_statistics: quality_stats,
active_alerts: alerts,
trends: self.analytics_engine.get_trends(),
recommendations: self.analytics_engine.get_recommendations(),
}
}
pub fn export_metrics(&self) -> Result<String> {
let report = self.generate_analytics_report();
match self.config.export_config.format {
ExportFormat::JSON => serde_json::to_string_pretty(&report)
.map_err(|e| anyhow!("JSON serialization error: {}", e)),
ExportFormat::CSV => self.generate_csv_export(&report),
ExportFormat::Prometheus => self.generate_prometheus_export(&report),
_ => Err(anyhow!("Export format not yet implemented")),
}
}
fn generate_csv_export(&self, report: &AnalyticsReport) -> Result<String> {
let mut csv = String::new();
csv.push_str("metric,value,timestamp\n");
csv.push_str(&format!(
"total_queries,{},{}\n",
report.query_statistics.total_queries,
report
.timestamp
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs()
));
csv.push_str(&format!(
"avg_latency,{:.2},{}\n",
report.query_statistics.average_latency.as_millis(),
report
.timestamp
.duration_since(UNIX_EPOCH)
.expect("SystemTime should be after UNIX_EPOCH")
.as_secs()
));
Ok(csv)
}
fn generate_prometheus_export(&self, report: &AnalyticsReport) -> Result<String> {
let mut prometheus = String::new();
prometheus.push_str("# HELP vector_search_queries_total Total number of queries\n");
prometheus.push_str("# TYPE vector_search_queries_total counter\n");
prometheus.push_str(&format!(
"vector_search_queries_total {}\n",
report.query_statistics.total_queries
));
prometheus.push_str("# HELP vector_search_latency_seconds Query latency in seconds\n");
prometheus.push_str("# TYPE vector_search_latency_seconds histogram\n");
prometheus.push_str(&format!(
"vector_search_latency_seconds {:.6}\n",
report.query_statistics.average_latency.as_secs_f64()
));
Ok(prometheus)
}
pub fn start_background_monitoring(&self) {
}
pub fn stop_monitoring(&self) {
}
}
#[derive(Debug, Clone)]
pub struct QueryInfo {
pub query_id: String,
pub query_type: QueryType,
pub query_text: Option<String>,
pub vector_dimensions: Option<usize>,
pub k_value: Option<usize>,
pub threshold: Option<f32>,
pub start_time: Instant,
pub end_time: Instant,
pub success: bool,
pub error_message: Option<String>,
pub results_count: usize,
pub index_used: Option<String>,
pub cache_hit: bool,
}
#[derive(Debug, Clone)]
pub enum QueryType {
KNNSearch,
ThresholdSearch,
SimilarityCalculation,
TextEmbedding,
IndexUpdate,
Custom(String),
}
#[derive(Debug)]
pub struct QueryMetricsCollector {
queries: VecDeque<QueryInfo>,
statistics: QueryStatistics,
max_retention: usize,
}
impl Default for QueryMetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl QueryMetricsCollector {
pub fn new() -> Self {
Self {
queries: VecDeque::new(),
statistics: QueryStatistics::default(),
max_retention: 10000, }
}
pub fn record_query(&mut self, query: QueryInfo) {
let latency = query.end_time.duration_since(query.start_time);
self.statistics.total_queries += 1;
if query.success {
self.statistics.successful_queries += 1;
} else {
self.statistics.failed_queries += 1;
}
if self.statistics.total_queries == 1 {
self.statistics.average_latency = latency;
self.statistics.min_latency = latency;
self.statistics.max_latency = latency;
} else {
let total_time = self
.statistics
.average_latency
.mul_f64(self.statistics.total_queries as f64 - 1.0)
+ latency;
self.statistics.average_latency =
total_time.div_f64(self.statistics.total_queries as f64);
if latency < self.statistics.min_latency {
self.statistics.min_latency = latency;
}
if latency > self.statistics.max_latency {
self.statistics.max_latency = latency;
}
}
let latency_ms = latency.as_millis() as f64;
if latency_ms < 10.0 {
self.statistics.latency_distribution.p10 += 1;
} else if latency_ms < 50.0 {
self.statistics.latency_distribution.p50 += 1;
} else if latency_ms < 100.0 {
self.statistics.latency_distribution.p90 += 1;
} else if latency_ms < 500.0 {
self.statistics.latency_distribution.p95 += 1;
} else {
self.statistics.latency_distribution.p99 += 1;
}
if query.cache_hit {
self.statistics.cache_hit_rate =
(self.statistics.cache_hit_rate * (self.statistics.total_queries - 1) as f32 + 1.0)
/ self.statistics.total_queries as f32;
} else {
self.statistics.cache_hit_rate = (self.statistics.cache_hit_rate
* (self.statistics.total_queries - 1) as f32)
/ self.statistics.total_queries as f32;
}
self.queries.push_back(query);
if self.queries.len() > self.max_retention {
self.queries.pop_front();
}
}
pub fn get_statistics(&self) -> QueryStatistics {
self.statistics.clone()
}
pub fn get_recent_queries(&self, count: usize) -> Vec<&QueryInfo> {
self.queries.iter().rev().take(count).collect()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryStatistics {
pub total_queries: u64,
pub successful_queries: u64,
pub failed_queries: u64,
pub average_latency: Duration,
pub min_latency: Duration,
pub max_latency: Duration,
pub latency_distribution: LatencyDistribution,
pub cache_hit_rate: f32,
pub throughput_qps: f32,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LatencyDistribution {
pub p10: u64, pub p50: u64, pub p90: u64, pub p95: u64, pub p99: u64, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
pub timestamp: SystemTime,
pub cpu_usage: f32,
pub memory_usage: u64, pub memory_total: u64, pub disk_usage: u64, pub disk_total: u64, pub network_in: u64, pub network_out: u64, pub open_file_descriptors: u32,
pub thread_count: u32,
}
#[derive(Debug)]
pub struct SystemMetricsCollector {
metrics_history: VecDeque<SystemMetrics>,
statistics: SystemStatistics,
max_retention: usize,
}
impl Default for SystemMetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl SystemMetricsCollector {
pub fn new() -> Self {
Self {
metrics_history: VecDeque::new(),
statistics: SystemStatistics::default(),
max_retention: 1440, }
}
pub fn record_metrics(&mut self, metrics: SystemMetrics) {
self.statistics.current_cpu_usage = metrics.cpu_usage;
self.statistics.current_memory_usage = metrics.memory_usage;
if metrics.cpu_usage > self.statistics.peak_cpu_usage {
self.statistics.peak_cpu_usage = metrics.cpu_usage;
}
if metrics.memory_usage > self.statistics.peak_memory_usage {
self.statistics.peak_memory_usage = metrics.memory_usage;
}
self.metrics_history.push_back(metrics);
if self.metrics_history.len() > self.max_retention {
self.metrics_history.pop_front();
}
}
pub fn get_statistics(&self) -> SystemStatistics {
self.statistics.clone()
}
pub fn get_recent_metrics(&self, count: usize) -> Vec<&SystemMetrics> {
self.metrics_history.iter().rev().take(count).collect()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemStatistics {
pub current_cpu_usage: f32,
pub peak_cpu_usage: f32,
pub current_memory_usage: u64,
pub peak_memory_usage: u64,
pub average_cpu_usage: f32,
pub average_memory_usage: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMetrics {
pub timestamp: SystemTime,
pub precision_at_1: f32,
pub precision_at_5: f32,
pub precision_at_10: f32,
pub recall_at_1: f32,
pub recall_at_5: f32,
pub recall_at_10: f32,
pub f1_score: f32,
pub mrr: f32, pub ndcg: f32, pub query_coverage: f32,
pub result_diversity: f32,
}
#[derive(Debug)]
pub struct QualityMetricsCollector {
metrics_history: VecDeque<QualityMetrics>,
statistics: QualityStatistics,
max_retention: usize,
}
impl Default for QualityMetricsCollector {
fn default() -> Self {
Self::new()
}
}
impl QualityMetricsCollector {
pub fn new() -> Self {
Self {
metrics_history: VecDeque::new(),
statistics: QualityStatistics::default(),
max_retention: 1000,
}
}
pub fn record_metrics(&mut self, metrics: QualityMetrics) {
let count = self.metrics_history.len() as f32;
if count > 0.0 {
self.statistics.average_precision_at_10 =
(self.statistics.average_precision_at_10 * count + metrics.precision_at_10)
/ (count + 1.0);
self.statistics.average_recall_at_10 = (self.statistics.average_recall_at_10 * count
+ metrics.recall_at_10)
/ (count + 1.0);
self.statistics.average_f1_score =
(self.statistics.average_f1_score * count + metrics.f1_score) / (count + 1.0);
} else {
self.statistics.average_precision_at_10 = metrics.precision_at_10;
self.statistics.average_recall_at_10 = metrics.recall_at_10;
self.statistics.average_f1_score = metrics.f1_score;
}
self.metrics_history.push_back(metrics);
if self.metrics_history.len() > self.max_retention {
self.metrics_history.pop_front();
}
}
pub fn get_statistics(&self) -> QualityStatistics {
self.statistics.clone()
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QualityStatistics {
pub average_precision_at_10: f32,
pub average_recall_at_10: f32,
pub average_f1_score: f32,
pub trend_precision: f32,
pub trend_recall: f32,
}
pub struct AlertManager {
thresholds: AlertThresholds,
active_alerts: Arc<RwLock<Vec<Alert>>>,
}
impl AlertManager {
pub fn new(thresholds: AlertThresholds) -> Self {
Self {
thresholds,
active_alerts: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn check_query_alerts(&self, query: &QueryInfo) {
let latency_ms = query.end_time.duration_since(query.start_time).as_millis() as f64;
if latency_ms > self.thresholds.max_query_latency {
self.add_alert(Alert {
alert_type: AlertType::HighLatency,
severity: AlertSeverity::Warning,
message: format!(
"Query latency {}ms exceeds threshold {}ms",
latency_ms, self.thresholds.max_query_latency
),
timestamp: SystemTime::now(),
source: "query_monitor".to_string(),
});
}
}
pub fn check_system_alerts(&self, metrics: &SystemMetrics) {
if metrics.cpu_usage > self.thresholds.max_cpu_usage {
self.add_alert(Alert {
alert_type: AlertType::HighCpuUsage,
severity: AlertSeverity::Warning,
message: format!(
"CPU usage {:.1}% exceeds threshold {:.1}%",
metrics.cpu_usage * 100.0,
self.thresholds.max_cpu_usage * 100.0
),
timestamp: SystemTime::now(),
source: "system_monitor".to_string(),
});
}
let memory_mb = metrics.memory_usage / (1024 * 1024);
if memory_mb > self.thresholds.max_memory_usage {
self.add_alert(Alert {
alert_type: AlertType::HighMemoryUsage,
severity: AlertSeverity::Critical,
message: format!(
"Memory usage {}MB exceeds threshold {}MB",
memory_mb, self.thresholds.max_memory_usage
),
timestamp: SystemTime::now(),
source: "system_monitor".to_string(),
});
}
}
pub fn check_quality_alerts(&self, metrics: &QualityMetrics) {
if metrics.recall_at_10 < self.thresholds.min_recall_at_10 {
self.add_alert(Alert {
alert_type: AlertType::LowRecall,
severity: AlertSeverity::Warning,
message: format!(
"Recall@10 {:.3} below threshold {:.3}",
metrics.recall_at_10, self.thresholds.min_recall_at_10
),
timestamp: SystemTime::now(),
source: "quality_monitor".to_string(),
});
}
}
fn add_alert(&self, alert: Alert) {
let mut alerts = self.active_alerts.write();
alerts.push(alert);
let cutoff = SystemTime::now() - Duration::from_secs(3600);
alerts.retain(|a| a.timestamp > cutoff);
}
pub fn get_active_alerts(&self) -> Vec<Alert> {
self.active_alerts.read().clone()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
pub alert_type: AlertType,
pub severity: AlertSeverity,
pub message: String,
pub timestamp: SystemTime,
pub source: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertType {
HighLatency,
HighCpuUsage,
HighMemoryUsage,
LowRecall,
HighErrorRate,
IndexCorruption,
ServiceDown,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
Emergency,
}
pub struct AnalyticsEngine {
trends: Arc<RwLock<HashMap<String, TrendData>>>,
recommendations: Arc<RwLock<Vec<Recommendation>>>,
}
impl Default for AnalyticsEngine {
fn default() -> Self {
Self::new()
}
}
impl AnalyticsEngine {
pub fn new() -> Self {
Self {
trends: Arc::new(RwLock::new(HashMap::new())),
recommendations: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn analyze_query(&self, _query: &QueryInfo) {
}
pub fn get_trends(&self) -> HashMap<String, TrendData> {
self.trends.read().clone()
}
pub fn get_recommendations(&self) -> Vec<Recommendation> {
self.recommendations.read().clone()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendData {
pub metric_name: String,
pub values: Vec<f64>,
pub timestamps: Vec<SystemTime>,
pub trend_direction: TrendDirection,
pub confidence: f32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
Increasing,
Decreasing,
Stable,
Volatile,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
pub category: RecommendationCategory,
pub priority: RecommendationPriority,
pub title: String,
pub description: String,
pub estimated_impact: String,
pub implementation_effort: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationCategory {
Performance,
Quality,
ResourceOptimization,
Configuration,
Maintenance,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationPriority {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardData {
pub last_updated: SystemTime,
pub query_stats: QueryStatistics,
pub system_stats: SystemStatistics,
pub quality_stats: QualityStatistics,
pub alerts_count: usize,
pub trends: HashMap<String, Vec<f64>>,
}
impl DashboardData {
pub fn new() -> Self {
Self {
last_updated: SystemTime::now(),
query_stats: QueryStatistics::default(),
system_stats: SystemStatistics::default(),
quality_stats: QualityStatistics::default(),
alerts_count: 0,
trends: HashMap::new(),
}
}
pub fn update_query_stats(&mut self, _query: &QueryInfo) {
self.last_updated = SystemTime::now();
}
pub fn update_system_stats(&mut self, _metrics: &SystemMetrics) {
self.last_updated = SystemTime::now();
}
pub fn update_quality_stats(&mut self, _metrics: &QualityMetrics) {
self.last_updated = SystemTime::now();
}
}
impl Default for DashboardData {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsReport {
pub timestamp: SystemTime,
pub query_statistics: QueryStatistics,
pub system_statistics: SystemStatistics,
pub quality_statistics: QualityStatistics,
pub active_alerts: Vec<Alert>,
pub trends: HashMap<String, TrendData>,
pub recommendations: Vec<Recommendation>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_query_metrics_collection() {
let mut collector = QueryMetricsCollector::new();
let query = QueryInfo {
query_id: "test_query".to_string(),
query_type: QueryType::KNNSearch,
query_text: Some("test query".to_string()),
vector_dimensions: Some(384),
k_value: Some(10),
threshold: None,
start_time: Instant::now() - Duration::from_millis(50),
end_time: Instant::now(),
success: true,
error_message: None,
results_count: 5,
index_used: Some("hnsw".to_string()),
cache_hit: false,
};
collector.record_query(query);
let stats = collector.get_statistics();
assert_eq!(stats.total_queries, 1);
assert_eq!(stats.successful_queries, 1);
assert_eq!(stats.failed_queries, 0);
}
#[test]
fn test_alert_generation() {
let thresholds = AlertThresholds {
max_query_latency: 100.0,
max_error_rate: 0.05,
min_recall_at_10: 0.90,
max_memory_usage: 1024,
max_cpu_usage: 0.80,
};
let alert_manager = AlertManager::new(thresholds);
let query = QueryInfo {
query_id: "slow_query".to_string(),
query_type: QueryType::KNNSearch,
query_text: None,
vector_dimensions: None,
k_value: None,
threshold: None,
start_time: Instant::now() - Duration::from_millis(200), end_time: Instant::now(),
success: true,
error_message: None,
results_count: 5,
index_used: None,
cache_hit: false,
};
alert_manager.check_query_alerts(&query);
let alerts = alert_manager.get_active_alerts();
assert_eq!(alerts.len(), 1);
assert!(matches!(alerts[0].alert_type, AlertType::HighLatency));
}
#[test]
fn test_performance_monitor() {
let config = MonitoringConfig::default();
let monitor = EnhancedPerformanceMonitor::new(config);
let query = QueryInfo {
query_id: "test".to_string(),
query_type: QueryType::KNNSearch,
query_text: None,
vector_dimensions: Some(384),
k_value: Some(10),
threshold: None,
start_time: Instant::now() - Duration::from_millis(25),
end_time: Instant::now(),
success: true,
error_message: None,
results_count: 8,
index_used: Some("hnsw".to_string()),
cache_hit: true,
};
monitor.record_query(query);
let dashboard = monitor.get_dashboard_data();
assert!(dashboard.last_updated <= SystemTime::now());
}
}