use anyhow::{Result, Context};
use scirs2_core::ndarray_ext::{Array1, Array2};
use scirs2_core::metrics::{Counter, Timer, Histogram};
use scirs2_core::random::{Random, Rng};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant, SystemTime};
use serde::{Serialize, Deserialize};
/// Tuning knobs for the query performance analyzer.
#[derive(Debug, Clone)]
pub struct AnalyzerConfig {
    /// Maximum number of execution profiles kept in history (oldest evicted first).
    pub max_records: usize,
    /// When true, analysis includes an EMA-based duration prediction.
    pub enable_ml_predictions: bool,
    /// When true, detailed profiling is requested (not read by the code
    /// visible in this file — TODO confirm the consumer).
    pub enable_detailed_profiling: bool,
    /// Queries slower than this many milliseconds count as "slow".
    pub slow_query_threshold_ms: u64,
    /// When true, auto-suggestions are requested (not read by the code
    /// visible in this file — TODO confirm the consumer).
    pub enable_auto_suggestions: bool,
    /// Probability in [0.0, 1.0] that a given execution is recorded.
    pub sampling_rate: f64,
}

impl Default for AnalyzerConfig {
    /// Balanced defaults: 10k records, full sampling, 1s slow-query threshold.
    fn default() -> Self {
        Self {
            max_records: 10000,
            enable_ml_predictions: true,
            enable_detailed_profiling: false,
            slow_query_threshold_ms: 1000,
            enable_auto_suggestions: true,
            sampling_rate: 1.0,
        }
    }
}

impl AnalyzerConfig {
    /// Low-overhead preset: small history, 10% sampling, ML predictions,
    /// detailed profiling and suggestions all disabled.
    pub fn lightweight() -> Self {
        Self {
            max_records: 1000,
            enable_ml_predictions: false,
            enable_detailed_profiling: false,
            slow_query_threshold_ms: 5000,
            enable_auto_suggestions: false,
            sampling_rate: 0.1,
        }
    }

    /// Maximum-detail preset: large history, full sampling, every feature on,
    /// aggressive 500ms slow-query threshold.
    pub fn comprehensive() -> Self {
        Self {
            max_records: 50000,
            enable_ml_predictions: true,
            enable_detailed_profiling: true,
            slow_query_threshold_ms: 500,
            enable_auto_suggestions: true,
            sampling_rate: 1.0,
        }
    }

    /// Builder-style override of the slow-query threshold (milliseconds).
    ///
    /// Added because the test suite calls
    /// `AnalyzerConfig::default().with_slow_query_threshold_ms(...)`, which
    /// previously did not exist and made the file fail to compile.
    pub fn with_slow_query_threshold_ms(mut self, ms: u64) -> Self {
        self.slow_query_threshold_ms = ms;
        self
    }
}
/// Wall-clock breakdown (milliseconds) of the four stages of one query run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionPhases {
    /// Time spent parsing the query text.
    pub parse_ms: f64,
    /// Time spent planning/optimizing.
    pub optimize_ms: f64,
    /// Time spent executing the plan.
    pub execute_ms: f64,
    /// Time spent serializing results.
    pub serialize_ms: f64,
}

impl ExecutionPhases {
    /// Total of all four phase timings, in milliseconds.
    pub fn total_ms(&self) -> f64 {
        [self.parse_ms, self.optimize_ms, self.execute_ms, self.serialize_ms]
            .iter()
            .sum()
    }

    /// Name of the slowest phase ("parse", "optimize", "execute" or
    /// "serialize"); ties resolve to the later entry in that order.
    pub fn slowest_phase(&self) -> &str {
        let labeled = [
            ("parse", self.parse_ms),
            ("optimize", self.optimize_ms),
            ("execute", self.execute_ms),
            ("serialize", self.serialize_ms),
        ];
        let mut slowest = labeled[0];
        for candidate in &labeled[1..] {
            // Replace unless strictly smaller, mirroring `Iterator::max_by`'s
            // keep-the-later-element behavior on ties/incomparable values.
            let ordering = candidate
                .1
                .partial_cmp(&slowest.1)
                .unwrap_or(std::cmp::Ordering::Equal);
            if ordering != std::cmp::Ordering::Less {
                slowest = *candidate;
            }
        }
        slowest.0
    }
}
/// Aggregate resource consumption observed for one query execution.
///
/// `Default` (now derived) yields all-zero utilization — identical to the
/// previous hand-written impl, which this replaces.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUtilization {
    /// Peak memory footprint in megabytes.
    pub peak_memory_mb: f64,
    /// Average CPU utilization (scale not shown in this file — TODO confirm
    /// whether this is 0-1 or percent).
    pub avg_cpu: f64,
    /// Number of I/O operations performed.
    pub io_operations: u64,
    /// Bytes transferred over the network.
    pub network_bytes: u64,
}
/// Snapshot of a single query execution: what ran, when, for how long, and
/// any optional phase/resource detail captured alongside it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionProfile {
    /// The query text as submitted.
    pub query: String,
    /// Wall-clock time at which this profile was created.
    pub timestamp: SystemTime,
    /// Total execution time.
    pub duration: Duration,
    /// Number of results produced.
    pub result_count: usize,
    /// Optional per-phase timing breakdown.
    pub phases: Option<ExecutionPhases>,
    /// Optional resource-consumption detail.
    pub resources: Option<ResourceUtilization>,
    /// True when the result was served from cache.
    pub was_cached: bool,
    /// Caller-supplied complexity estimate (scale defined by the caller).
    pub complexity_score: f64,
}

impl ExecutionProfile {
    /// Starts a profile for `query`, stamped with the current time; every
    /// other field begins at its zero/empty value.
    pub fn new(query: impl Into<String>) -> Self {
        Self {
            query: query.into(),
            timestamp: SystemTime::now(),
            duration: Duration::default(),
            result_count: 0,
            phases: None,
            resources: None,
            was_cached: false,
            complexity_score: 0.0,
        }
    }

    /// Builder: sets the total duration from a millisecond count.
    pub fn with_duration_ms(mut self, millis: u64) -> Self {
        self.duration = Duration::from_millis(millis);
        self
    }

    /// Builder: sets the produced result count.
    pub fn with_result_count(mut self, n: usize) -> Self {
        self.result_count = n;
        self
    }

    /// Builder: attaches a per-phase timing breakdown.
    pub fn with_phases(mut self, breakdown: ExecutionPhases) -> Self {
        self.phases = Some(breakdown);
        self
    }

    /// Builder: attaches resource-consumption detail.
    pub fn with_resources(mut self, usage: ResourceUtilization) -> Self {
        self.resources = Some(usage);
        self
    }

    /// Builder: marks the execution as a cache hit.
    pub fn cached(mut self) -> Self {
        self.was_cached = true;
        self
    }

    /// Builder: records a caller-supplied complexity score.
    pub fn with_complexity(mut self, value: f64) -> Self {
        self.complexity_score = value;
        self
    }

    /// Duration expressed in fractional milliseconds.
    pub fn duration_ms(&self) -> f64 {
        1000.0 * self.duration.as_secs_f64()
    }
}
/// Broad category assigned to a detected performance bottleneck.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BottleneckType {
    /// Query parsing dominates (high average parse time).
    Parsing,
    /// Query planning/optimization dominates.
    Optimization,
    /// Core plan execution dominates.
    Execution,
    /// Result serialization dominates.
    Serialization,
    /// High average peak-memory consumption.
    Memory,
    // NOTE(review): Cpu and Io are never produced by the detector visible in
    // this file — presumably reserved for other detectors; confirm before
    // removing.
    Cpu,
    Io,
    /// Suspected cartesian product / missing filters; flagged when a large
    /// fraction of queries exceed the slow-query threshold.
    CartesianProduct,
}
/// One concrete performance problem found by the analyzer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceBottleneck {
    /// Category of the problem.
    pub bottleneck_type: BottleneckType,
    /// Severity in [0.0, 1.0]; the detector clamps values with `.min(1.0)`.
    pub severity: f64,
    /// Human-readable description of the finding.
    pub description: String,
    /// Suggested remediation text.
    pub recommendation: String,
    /// Estimated improvement if remediated (percent — TODO confirm units).
    pub estimated_improvement: f64,
}
/// Full analysis report returned by `QueryPerformanceAnalyzer::analyze_performance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceInsights {
    /// Overall health score, clamped to [0.0, 100.0]; 0.0 when no data exists.
    pub score: f64,
    /// Bottlenecks detected over the recorded history.
    pub bottlenecks: Vec<PerformanceBottleneck>,
    /// Actionable recommendations; always contains at least one entry.
    pub recommendations: Vec<String>,
    /// EMA-based duration forecast in ms; `None` when ML predictions are disabled.
    pub predicted_duration_ms: Option<f64>,
    /// Aggregate duration/cache statistics over the history.
    pub summary: PerformanceSummary,
}
/// Aggregate duration and caching statistics over the recorded history.
///
/// `Default` (now derived) is all zeros — identical to the previous manual
/// impl, which this replaces; it serves as the placeholder summary when no
/// history exists.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PerformanceSummary {
    /// Number of profiles included in the statistics.
    pub total_queries: usize,
    /// Mean duration in milliseconds.
    pub avg_duration_ms: f64,
    /// Median (middle-element) duration in milliseconds.
    pub median_duration_ms: f64,
    /// 95th-percentile duration in milliseconds.
    pub p95_duration_ms: f64,
    /// 99th-percentile duration in milliseconds.
    pub p99_duration_ms: f64,
    /// Fraction of profiles served from cache, in [0.0, 1.0].
    pub cache_hit_rate: f64,
    /// Count of profiles slower than the configured threshold.
    pub slow_query_count: usize,
}
/// Records sampled query execution profiles and derives performance insights
/// (summary statistics, bottlenecks, recommendations, score, prediction).
pub struct QueryPerformanceAnalyzer {
    /// Behavior knobs: sampling rate, history cap, thresholds, feature flags.
    config: AnalyzerConfig,
    /// Recent execution profiles, oldest first, capped at `config.max_records`.
    history: VecDeque<ExecutionProfile>,
    /// Count of profiles recorded (after sampling).
    queries_analyzed: Counter,
    /// Count of bottlenecks flagged across all analyses.
    bottlenecks_detected: Counter,
    // NOTE(review): constructed but never recorded into anywhere in this
    // file — either time analyze_performance with it or remove it.
    analysis_duration: Timer,
    /// RNG driving the sampling decision in `record_execution`.
    rng: Random,
}
impl QueryPerformanceAnalyzer {
pub fn new(config: AnalyzerConfig) -> Self {
Self {
config,
history: VecDeque::new(),
queries_analyzed: Counter::new("queries_analyzed".to_string()),
bottlenecks_detected: Counter::new("bottlenecks_detected".to_string()),
analysis_duration: Timer::new("analysis_duration".to_string()),
rng: Random::default(),
}
}
pub fn record_execution(&mut self, profile: ExecutionProfile) -> Result<()> {
if self.rng.random_f64() > self.config.sampling_rate {
return Ok(());
}
while self.history.len() >= self.config.max_records {
self.history.pop_front();
}
self.history.push_back(profile);
self.queries_analyzed.inc();
Ok(())
}
pub fn analyze_performance(&self) -> Result<PerformanceInsights> {
let start = Instant::now();
if self.history.is_empty() {
return Ok(PerformanceInsights {
score: 0.0,
bottlenecks: Vec::new(),
recommendations: vec!["No queries analyzed yet".to_string()],
predicted_duration_ms: None,
summary: PerformanceSummary::default(),
});
}
let summary = self.calculate_summary()?;
let bottlenecks = self.detect_bottlenecks()?;
let recommendations = self.generate_recommendations(&bottlenecks)?;
let score = self.calculate_performance_score(&summary, &bottlenecks)?;
let predicted_duration_ms = if self.config.enable_ml_predictions {
Some(self.predict_duration()?)
} else {
None
};
let insights = PerformanceInsights {
score,
bottlenecks,
recommendations,
predicted_duration_ms,
summary,
};
Ok(insights)
}
fn calculate_summary(&self) -> Result<PerformanceSummary> {
let mut durations: Vec<f64> = self.history
.iter()
.map(|p| p.duration_ms())
.collect();
durations.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let avg = durations.iter().sum::<f64>() / durations.len() as f64;
let median = if !durations.is_empty() {
durations[durations.len() / 2]
} else {
0.0
};
let p95_idx = (durations.len() as f64 * 0.95) as usize;
let p99_idx = (durations.len() as f64 * 0.99) as usize;
let p95 = durations.get(p95_idx).copied().unwrap_or(0.0);
let p99 = durations.get(p99_idx).copied().unwrap_or(0.0);
let cached = self.history.iter().filter(|p| p.was_cached).count();
let cache_hit_rate = cached as f64 / self.history.len() as f64;
let slow_count = self.history
.iter()
.filter(|p| p.duration_ms() > self.config.slow_query_threshold_ms as f64)
.count();
Ok(PerformanceSummary {
total_queries: self.history.len(),
avg_duration_ms: avg,
median_duration_ms: median,
p95_duration_ms: p95,
p99_duration_ms: p99,
cache_hit_rate,
slow_query_count: slow_count,
})
}
fn detect_bottlenecks(&self) -> Result<Vec<PerformanceBottleneck>> {
let mut bottlenecks = Vec::new();
let profiles_with_phases: Vec<_> = self.history
.iter()
.filter_map(|p| p.phases.as_ref().map(|ph| (p, ph)))
.collect();
if !profiles_with_phases.is_empty() {
let avg_parse = profiles_with_phases.iter()
.map(|(_, ph)| ph.parse_ms)
.sum::<f64>() / profiles_with_phases.len() as f64;
if avg_parse > 100.0 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::Parsing,
severity: (avg_parse / 1000.0).min(1.0),
description: format!("Average parsing time is {:.0}ms", avg_parse),
recommendation: "Consider caching parsed queries or simplifying query syntax".to_string(),
estimated_improvement: 20.0,
});
}
let avg_optimize = profiles_with_phases.iter()
.map(|(_, ph)| ph.optimize_ms)
.sum::<f64>() / profiles_with_phases.len() as f64;
if avg_optimize > 500.0 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::Optimization,
severity: (avg_optimize / 2000.0).min(1.0),
description: format!("Average optimization time is {:.0}ms", avg_optimize),
recommendation: "Enable query plan caching or simplify join patterns".to_string(),
estimated_improvement: 35.0,
});
}
let avg_execute = profiles_with_phases.iter()
.map(|(_, ph)| ph.execute_ms)
.sum::<f64>() / profiles_with_phases.len() as f64;
if avg_execute > 1000.0 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::Execution,
severity: (avg_execute / 5000.0).min(1.0),
description: format!("Average execution time is {:.0}ms", avg_execute),
recommendation: "Add indexes, use filters early, or limit result sets".to_string(),
estimated_improvement: 50.0,
});
}
}
let profiles_with_resources: Vec<_> = self.history
.iter()
.filter_map(|p| p.resources.as_ref().map(|r| (p, r)))
.collect();
if !profiles_with_resources.is_empty() {
let avg_memory = profiles_with_resources.iter()
.map(|(_, r)| r.peak_memory_mb)
.sum::<f64>() / profiles_with_resources.len() as f64;
if avg_memory > 1024.0 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::Memory,
severity: (avg_memory / 4096.0).min(1.0),
description: format!("High memory usage: {:.0}MB average", avg_memory),
recommendation: "Enable streaming results or reduce intermediate result sizes".to_string(),
estimated_improvement: 30.0,
});
}
}
let slow_queries = self.history.iter()
.filter(|p| p.duration_ms() > self.config.slow_query_threshold_ms as f64)
.count();
if slow_queries as f64 / self.history.len() as f64 > 0.1 {
let severity = (slow_queries as f64 / self.history.len() as f64).min(1.0);
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::CartesianProduct,
severity,
description: format!("{}% of queries are slow (>{} ms)",
(severity * 100.0) as u32,
self.config.slow_query_threshold_ms),
recommendation: "Review query patterns for cartesian products or missing filters".to_string(),
estimated_improvement: 60.0,
});
}
if !bottlenecks.is_empty() {
for _ in 0..bottlenecks.len() {
self.bottlenecks_detected.inc();
}
}
Ok(bottlenecks)
}
fn generate_recommendations(&self, bottlenecks: &[PerformanceBottleneck]) -> Result<Vec<String>> {
let mut recommendations = Vec::new();
for bottleneck in bottlenecks {
if bottleneck.severity > 0.5 {
recommendations.push(bottleneck.recommendation.clone());
}
}
let summary = self.calculate_summary()?;
if summary.cache_hit_rate < 0.3 {
recommendations.push("Low cache hit rate - consider enabling result caching".to_string());
}
if summary.p99_duration_ms > summary.avg_duration_ms * 5.0 {
recommendations.push("High variance in query performance - analyze outliers".to_string());
}
if recommendations.is_empty() {
recommendations.push("Performance is within acceptable parameters".to_string());
}
Ok(recommendations)
}
fn calculate_performance_score(
&self,
summary: &PerformanceSummary,
bottlenecks: &[PerformanceBottleneck],
) -> Result<f64> {
let mut score = 100.0;
if summary.avg_duration_ms > self.config.slow_query_threshold_ms as f64 {
let penalty = ((summary.avg_duration_ms / self.config.slow_query_threshold_ms as f64) - 1.0) * 30.0;
score -= penalty.min(40.0);
}
for bottleneck in bottlenecks {
score -= bottleneck.severity * 15.0;
}
score += summary.cache_hit_rate * 10.0;
Ok(score.max(0.0).min(100.0))
}
fn predict_duration(&self) -> Result<f64> {
if self.history.len() < 10 {
return Ok(0.0);
}
let alpha = 0.3;
let mut ema = self.history[0].duration_ms();
for profile in self.history.iter().skip(1) {
ema = alpha * profile.duration_ms() + (1.0 - alpha) * ema;
}
Ok(ema)
}
pub fn history(&self) -> &VecDeque<ExecutionProfile> {
&self.history
}
pub fn clear_history(&mut self) {
self.history.clear();
}
pub fn queries_analyzed_count(&self) -> u64 {
self.queries_analyzed.get()
}
pub fn bottlenecks_detected_count(&self) -> u64 {
self.bottlenecks_detected.get()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh analyzer starts with no history and zero counters.
    #[test]
    fn test_analyzer_creation() {
        let analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        assert_eq!(analyzer.history().len(), 0);
        assert_eq!(analyzer.queries_analyzed_count(), 0);
    }

    // With the default sampling_rate of 1.0, every profile is recorded.
    #[test]
    fn test_record_execution() {
        let mut analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        let profile = ExecutionProfile::new("SELECT * WHERE { ?s ?p ?o }")
            .with_duration_ms(100)
            .with_result_count(50);
        analyzer.record_execution(profile).unwrap();
        assert_eq!(analyzer.history().len(), 1);
    }

    // Builder methods set each field and chain fluently.
    #[test]
    fn test_execution_profile_builder() {
        let profile = ExecutionProfile::new("SELECT ?s WHERE { ?s ?p ?o }")
            .with_duration_ms(250)
            .with_result_count(100)
            .cached()
            .with_complexity(0.5);
        assert_eq!(profile.duration_ms(), 250.0);
        assert_eq!(profile.result_count, 100);
        assert!(profile.was_cached);
        assert_eq!(profile.complexity_score, 0.5);
    }

    // An empty history yields the zero-score placeholder report.
    #[test]
    fn test_performance_analysis_empty() {
        let analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        let insights = analyzer.analyze_performance().unwrap();
        assert_eq!(insights.score, 0.0);
        assert_eq!(insights.summary.total_queries, 0);
    }

    // A populated history produces a positive score and non-zero averages.
    #[test]
    fn test_performance_analysis_with_data() {
        let mut analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        for i in 1..=10 {
            let profile = ExecutionProfile::new(format!("SELECT {} WHERE {{ ?s ?p ?o }}", i))
                .with_duration_ms(i * 100)
                .with_result_count(i * 10);
            analyzer.record_execution(profile).unwrap();
        }
        let insights = analyzer.analyze_performance().unwrap();
        assert_eq!(insights.summary.total_queries, 10);
        assert!(insights.score > 0.0);
        assert!(insights.summary.avg_duration_ms > 0.0);
    }

    // Mean and middle-element median over a known set of durations.
    #[test]
    fn test_statistical_summary() {
        let mut analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        let durations = vec![100, 200, 300, 400, 500];
        for duration in durations {
            let profile = ExecutionProfile::new("SELECT * WHERE { ?s ?p ?o }")
                .with_duration_ms(duration);
            analyzer.record_execution(profile).unwrap();
        }
        let summary = analyzer.calculate_summary().unwrap();
        assert_eq!(summary.total_queries, 5);
        assert_eq!(summary.avg_duration_ms, 300.0);
        assert_eq!(summary.median_duration_ms, 300.0);
    }

    // A dominant execute phase must be flagged as an Execution bottleneck.
    #[test]
    fn test_bottleneck_detection() {
        let mut analyzer = QueryPerformanceAnalyzer::new(
            AnalyzerConfig::default().with_slow_query_threshold_ms(100)
        );
        for _ in 0..5 {
            let phases = ExecutionPhases {
                parse_ms: 10.0,
                optimize_ms: 20.0,
                execute_ms: 2000.0,
                serialize_ms: 10.0,
            };
            let profile = ExecutionProfile::new("SELECT * WHERE { ?s ?p ?o }")
                .with_duration_ms(2040)
                .with_phases(phases);
            analyzer.record_execution(profile).unwrap();
        }
        let bottlenecks = analyzer.detect_bottlenecks().unwrap();
        assert!(!bottlenecks.is_empty());
        assert!(bottlenecks.iter().any(|b| b.bottleneck_type == BottleneckType::Execution));
    }

    // History is capped at max_records; oldest entries are evicted.
    #[test]
    fn test_max_records_limit() {
        let config = AnalyzerConfig {
            max_records: 5,
            ..Default::default()
        };
        let mut analyzer = QueryPerformanceAnalyzer::new(config);
        for i in 0..10 {
            let profile = ExecutionProfile::new(format!("Query {}", i))
                .with_duration_ms(100);
            analyzer.record_execution(profile).unwrap();
        }
        assert_eq!(analyzer.history().len(), 5);
    }

    // The preset constructors return their documented field values.
    #[test]
    fn test_config_presets() {
        let lightweight = AnalyzerConfig::lightweight();
        assert_eq!(lightweight.max_records, 1000);
        assert!(!lightweight.enable_ml_predictions);
        let comprehensive = AnalyzerConfig::comprehensive();
        assert_eq!(comprehensive.max_records, 50000);
        assert!(comprehensive.enable_ml_predictions);
    }

    // 3 cached out of 10 recorded profiles gives a 0.3 hit rate.
    #[test]
    fn test_cache_hit_rate_calculation() {
        let mut analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        for i in 0..10 {
            let mut profile = ExecutionProfile::new(format!("Query {}", i))
                .with_duration_ms(100);
            if i < 3 {
                profile = profile.cached();
            }
            analyzer.record_execution(profile).unwrap();
        }
        let summary = analyzer.calculate_summary().unwrap();
        assert_eq!(summary.cache_hit_rate, 0.3);
    }

    // total_ms sums all phases; slowest_phase names the largest one.
    #[test]
    fn test_execution_phases() {
        let phases = ExecutionPhases {
            parse_ms: 10.0,
            optimize_ms: 50.0,
            execute_ms: 200.0,
            serialize_ms: 5.0,
        };
        assert_eq!(phases.total_ms(), 265.0);
        assert_eq!(phases.slowest_phase(), "execute");
    }

    // Scoring stays within the documented [0, 100] bounds.
    #[test]
    fn test_performance_score_calculation() {
        let analyzer = QueryPerformanceAnalyzer::new(AnalyzerConfig::default());
        let summary = PerformanceSummary {
            total_queries: 10,
            avg_duration_ms: 500.0,
            median_duration_ms: 400.0,
            p95_duration_ms: 900.0,
            p99_duration_ms: 1000.0,
            cache_hit_rate: 0.5,
            slow_query_count: 2,
        };
        let score = analyzer.calculate_performance_score(&summary, &[]).unwrap();
        assert!(score > 0.0 && score <= 100.0);
    }
}