use anyhow::Result;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
use tracing::{debug, info};
use crate::{
nl2sparql::{OptimizationHint, OptimizationHintType, SPARQLGenerationResult},
rag::QueryIntent,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
pub enable_query_rewriting: bool,
pub enable_performance_analysis: bool,
pub enable_cost_estimation: bool,
pub enable_semantic_optimization: bool,
pub enable_index_suggestions: bool,
pub optimization_level: OptimizationLevel,
pub max_optimization_time: Duration,
pub performance_threshold: Duration, pub cache_optimized_queries: bool,
}
impl Default for OptimizerConfig {
fn default() -> Self {
Self {
enable_query_rewriting: true,
enable_performance_analysis: true,
enable_cost_estimation: true,
enable_semantic_optimization: true,
enable_index_suggestions: true,
optimization_level: OptimizationLevel::Balanced,
max_optimization_time: Duration::from_millis(500),
performance_threshold: Duration::from_millis(1000),
cache_optimized_queries: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationLevel {
Conservative, Balanced, Aggressive, Custom(Vec<OptimizationStrategy>), }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptimizationStrategy {
TripleReordering,
FilterPushdown,
JoinOptimization,
SubqueryElimination,
UnionSimplification,
RedundancyRemoval,
IndexHints,
PropertyPathOptimization,
AggregationOptimization,
LimitPushdown,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryAnalysis {
pub original_query: String,
pub estimated_cost: QueryCost,
pub complexity_score: f32,
pub potential_issues: Vec<QueryIssue>,
pub optimization_opportunities: Vec<OptimizationOpportunity>,
pub performance_prediction: PerformancePrediction,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCost {
pub estimated_execution_time: Duration,
pub estimated_memory_usage: usize, pub estimated_network_io: usize, pub complexity_factor: f32, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryIssue {
pub issue_type: QueryIssueType,
pub description: String,
pub severity: IssueSeverity,
pub location: Option<String>, pub suggested_fix: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum QueryIssueType {
CartesianProduct,
UnboundVariable,
MissingIndex,
IneffientJoin,
RedundantPattern,
ComplexRegex,
UnoptimizedUnion,
LargeResultSet,
DeepNesting,
FunctionInFilter,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IssueSeverity {
Info,
Warning,
Error,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationOpportunity {
pub strategy: OptimizationStrategy,
pub description: String,
pub estimated_improvement: f32, pub confidence: f32, pub applicable: bool, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformancePrediction {
pub estimated_execution_time: Duration,
pub confidence_interval: (Duration, Duration), pub bottlenecks: Vec<String>,
pub scalability_concerns: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationResult {
pub original_query: String,
pub optimized_query: String,
pub applied_optimizations: Vec<AppliedOptimization>,
pub performance_improvement: f32, pub optimization_time: Duration,
pub warnings: Vec<String>,
pub analysis: QueryAnalysis,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppliedOptimization {
pub strategy: OptimizationStrategy,
pub description: String,
pub query_before: String,
pub query_after: String,
pub expected_improvement: f32,
}
pub struct AdvancedSPARQLOptimizer {
config: OptimizerConfig,
query_analyzer: QueryAnalyzer,
rewriter: QueryRewriter,
#[allow(dead_code)]
cost_estimator: CostEstimator,
optimization_cache: HashMap<String, OptimizationResult>,
}
impl AdvancedSPARQLOptimizer {
pub fn new(config: OptimizerConfig) -> Self {
Self {
query_analyzer: QueryAnalyzer::new(&config),
rewriter: QueryRewriter::new(&config),
cost_estimator: CostEstimator::new(&config),
optimization_cache: HashMap::new(),
config,
}
}
pub async fn optimize_query(
&mut self,
query: &str,
intent: &QueryIntent,
_generation_result: Option<&SPARQLGenerationResult>,
) -> Result<OptimizationResult> {
let start_time = SystemTime::now();
let cache_key = self.generate_cache_key(query, intent);
if self.config.cache_optimized_queries {
if let Some(cached_result) = self.optimization_cache.get(&cache_key) {
debug!("Using cached optimization for query");
return Ok(cached_result.clone());
}
}
info!(
"Starting advanced optimization for query: {}",
query.chars().take(100).collect::<String>()
);
let analysis = self.query_analyzer.analyze(query, intent).await?;
debug!(
"Query analysis complete: complexity={:.2}, issues={}",
analysis.complexity_score,
analysis.potential_issues.len()
);
let optimized_query = self
.rewriter
.rewrite_query(query, &analysis, intent)
.await?;
let original_cost = &analysis.estimated_cost;
let optimized_analysis = self
.query_analyzer
.analyze(&optimized_query, intent)
.await?;
let optimized_cost = &optimized_analysis.estimated_cost;
let performance_improvement = self.calculate_improvement(original_cost, optimized_cost);
let applied_optimizations = self.rewriter.get_applied_optimizations();
let optimization_time = start_time.elapsed().unwrap_or(Duration::ZERO);
let result = OptimizationResult {
original_query: query.to_string(),
optimized_query: optimized_query.clone(),
applied_optimizations,
performance_improvement,
optimization_time,
warnings: self.collect_warnings(&analysis, &optimized_analysis),
analysis,
};
if self.config.cache_optimized_queries {
self.optimization_cache.insert(cache_key, result.clone());
}
info!(
"Optimization complete: {:.1}% improvement expected in {:.1}ms",
performance_improvement,
optimization_time.as_millis()
);
Ok(result)
}
pub async fn generate_optimization_hints(
&self,
generation_result: &SPARQLGenerationResult,
) -> Result<Vec<OptimizationHint>> {
let analysis = self
.query_analyzer
.analyze(&generation_result.query, &QueryIntent::Explanation)
.await?;
let mut hints = Vec::new();
for issue in &analysis.potential_issues {
let hint_type = match issue.issue_type {
QueryIssueType::CartesianProduct => OptimizationHintType::UseFilter,
QueryIssueType::MissingIndex => OptimizationHintType::AddIndex,
QueryIssueType::IneffientJoin => OptimizationHintType::ReorderTriples,
QueryIssueType::RedundantPattern => OptimizationHintType::SimplifyExpression,
_ => OptimizationHintType::SimplifyExpression,
};
hints.push(OptimizationHint {
hint_type,
description: issue.description.clone(),
estimated_improvement: Some(20.0), });
}
for opportunity in &analysis.optimization_opportunities {
if opportunity.applicable && opportunity.estimated_improvement > 10.0 {
hints.push(OptimizationHint {
hint_type: OptimizationHintType::SimplifyExpression,
description: opportunity.description.clone(),
estimated_improvement: Some(opportunity.estimated_improvement),
});
}
}
Ok(hints)
}
pub async fn analyze_performance(
&self,
query: &str,
intent: &QueryIntent,
) -> Result<QueryAnalysis> {
self.query_analyzer.analyze(query, intent).await
}
pub fn get_optimization_stats(&self) -> OptimizationStats {
let total_optimizations = self.optimization_cache.len();
let avg_improvement = if total_optimizations > 0 {
self.optimization_cache
.values()
.map(|r| r.performance_improvement)
.sum::<f32>()
/ total_optimizations as f32
} else {
0.0
};
let cache_hit_rate = 0.0;
OptimizationStats {
total_optimizations,
average_improvement: avg_improvement,
cache_hit_rate,
total_optimization_time: self
.optimization_cache
.values()
.map(|r| r.optimization_time)
.sum(),
}
}
fn generate_cache_key(&self, query: &str, intent: &QueryIntent) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query.hash(&mut hasher);
intent.hash(&mut hasher);
format!("opt_{:x}", hasher.finish())
}
fn calculate_improvement(&self, original: &QueryCost, optimized: &QueryCost) -> f32 {
let time_improvement =
if original.estimated_execution_time > optimized.estimated_execution_time {
let saved = original.estimated_execution_time - optimized.estimated_execution_time;
(saved.as_millis() as f32 / original.estimated_execution_time.as_millis() as f32)
* 100.0
} else {
0.0
};
let memory_improvement =
if original.estimated_memory_usage > optimized.estimated_memory_usage {
let saved = original.estimated_memory_usage - optimized.estimated_memory_usage;
(saved as f32 / original.estimated_memory_usage as f32) * 100.0
} else {
0.0
};
let complexity_improvement = if original.complexity_factor > optimized.complexity_factor {
let saved = original.complexity_factor - optimized.complexity_factor;
(saved / original.complexity_factor) * 100.0
} else {
0.0
};
(time_improvement * 0.5 + memory_improvement * 0.3 + complexity_improvement * 0.2).max(0.0)
}
fn collect_warnings(&self, original: &QueryAnalysis, optimized: &QueryAnalysis) -> Vec<String> {
let mut warnings = Vec::new();
for issue in &optimized.potential_issues {
if !original
.potential_issues
.iter()
.any(|orig| orig.issue_type == issue.issue_type)
{
warnings.push(format!(
"Optimization introduced new issue: {}",
issue.description
));
}
}
if optimized.complexity_score > original.complexity_score * 1.1 {
warnings.push("Optimization may have increased query complexity".to_string());
}
warnings
}
}
struct QueryAnalyzer {
_config: OptimizerConfig,
complexity_patterns: Vec<ComplexityPattern>,
}
impl QueryAnalyzer {
fn new(config: &OptimizerConfig) -> Self {
let complexity_patterns = vec![
ComplexityPattern {
pattern: Regex::new(r"(?i)\bUNION\b").expect("regex pattern should be valid"),
complexity_weight: 2.0,
_description: "UNION operations increase complexity".to_string(),
},
ComplexityPattern {
pattern: Regex::new(r"(?i)\bOPTIONAL\b").expect("regex pattern should be valid"),
complexity_weight: 1.5,
_description: "OPTIONAL patterns add complexity".to_string(),
},
ComplexityPattern {
pattern: Regex::new(r"(?i)\bFILTER\s+regex\(")
.expect("regex pattern should be valid"),
complexity_weight: 3.0,
_description: "REGEX filters are expensive".to_string(),
},
ComplexityPattern {
pattern: Regex::new(r"\{\s*SELECT").expect("regex pattern should be valid"),
complexity_weight: 2.5,
_description: "Subqueries increase complexity".to_string(),
},
];
Self {
_config: config.clone(),
complexity_patterns,
}
}
async fn analyze(&self, query: &str, intent: &QueryIntent) -> Result<QueryAnalysis> {
let complexity_score = self.calculate_complexity(query);
let estimated_cost = self.estimate_cost(query, complexity_score);
let potential_issues = self.identify_issues(query);
let optimization_opportunities = self.find_optimization_opportunities(query, intent);
let performance_prediction = self.predict_performance(query, &estimated_cost);
Ok(QueryAnalysis {
original_query: query.to_string(),
estimated_cost,
complexity_score,
potential_issues,
optimization_opportunities,
performance_prediction,
})
}
fn calculate_complexity(&self, query: &str) -> f32 {
let mut complexity = 1.0;
complexity += (query.len() as f32 / 1000.0).min(5.0);
for pattern in &self.complexity_patterns {
let matches = pattern.pattern.find_iter(query).count() as f32;
complexity += matches * pattern.complexity_weight;
}
let triple_patterns = query.matches(" . ").count() as f32;
complexity += triple_patterns * 0.1;
let variable_count = Regex::new(r"\?[a-zA-Z][a-zA-Z0-9_]*")
.expect("regex pattern should be valid")
.find_iter(query)
.count() as f32;
complexity += variable_count * 0.05;
complexity.max(1.0)
}
fn estimate_cost(&self, query: &str, complexity: f32) -> QueryCost {
let base_time = Duration::from_millis(100);
let estimated_execution_time =
Duration::from_millis((base_time.as_millis() as f32 * complexity) as u64);
let base_memory = 1024 * 1024; let estimated_memory_usage = (base_memory as f32 * complexity.sqrt()) as usize;
let triple_patterns = query.matches(" . ").count();
let estimated_network_io = (triple_patterns * 100).max(100);
QueryCost {
estimated_execution_time,
estimated_memory_usage,
estimated_network_io,
complexity_factor: complexity,
}
}
fn identify_issues(&self, query: &str) -> Vec<QueryIssue> {
let mut issues = Vec::new();
if !query.to_uppercase().contains("FILTER") && query.matches(" . ").count() > 3 {
issues.push(QueryIssue {
issue_type: QueryIssueType::CartesianProduct,
description: "Query may produce Cartesian product without proper filters"
.to_string(),
severity: IssueSeverity::Warning,
location: None,
suggested_fix: Some("Add FILTER clauses to constrain results".to_string()),
});
}
if Regex::new(r"(?i)regex\(")
.expect("regex pattern should be valid")
.is_match(query)
{
issues.push(QueryIssue {
issue_type: QueryIssueType::ComplexRegex,
description: "REGEX operations can be expensive".to_string(),
severity: IssueSeverity::Info,
location: None,
suggested_fix: Some(
"Consider using CONTAINS or STARTS_WITH if possible".to_string(),
),
});
}
if !query.to_uppercase().contains("LIMIT") && query.to_uppercase().contains("SELECT") {
issues.push(QueryIssue {
issue_type: QueryIssueType::LargeResultSet,
description: "Query without LIMIT may return large result sets".to_string(),
severity: IssueSeverity::Info,
location: None,
suggested_fix: Some("Add LIMIT clause to control result size".to_string()),
});
}
let filter_regex = Regex::new(r"(?i)FILTER\s*\([^)]*\?([a-zA-Z][a-zA-Z0-9_]*)[^)]*\)")
.expect("regex pattern should be valid");
if let Some(captures) = filter_regex.captures(query) {
let var_name = &captures[1];
let var_pattern = format!(r"\?{var_name}\s+[^?]*\.");
if !Regex::new(&var_pattern)
.expect("regex pattern should be valid")
.is_match(query)
{
issues.push(QueryIssue {
issue_type: QueryIssueType::UnboundVariable,
description: format!(
"Variable ?{var_name} used in FILTER may not be properly bound"
),
severity: IssueSeverity::Warning,
location: Some(format!("FILTER clause with ?{var_name}")),
suggested_fix: Some(
"Ensure variable is bound in a triple pattern before use in FILTER"
.to_string(),
),
});
}
}
issues
}
fn find_optimization_opportunities(
&self,
query: &str,
intent: &QueryIntent,
) -> Vec<OptimizationOpportunity> {
let mut opportunities = Vec::new();
if query.matches(" . ").count() > 2 {
opportunities.push(OptimizationOpportunity {
strategy: OptimizationStrategy::TripleReordering,
description: "Reorder triple patterns for better join performance".to_string(),
estimated_improvement: 15.0,
confidence: 0.7,
applicable: true,
});
}
if query.to_uppercase().contains("FILTER") && query.to_uppercase().contains("OPTIONAL") {
let filter_pos = query.to_uppercase().find("FILTER").unwrap_or(0);
let optional_pos = query.to_uppercase().find("OPTIONAL").unwrap_or(0);
if filter_pos > optional_pos {
opportunities.push(OptimizationOpportunity {
strategy: OptimizationStrategy::FilterPushdown,
description: "Move FILTER clauses before OPTIONAL for better performance"
.to_string(),
estimated_improvement: 25.0,
confidence: 0.8,
applicable: true,
});
}
}
if matches!(intent, QueryIntent::Listing) && !query.to_uppercase().contains("LIMIT") {
opportunities.push(OptimizationOpportunity {
strategy: OptimizationStrategy::LimitPushdown,
description: "Add LIMIT clause for list queries to improve performance".to_string(),
estimated_improvement: 30.0,
confidence: 0.9,
applicable: true,
});
}
if query.matches("UNION").count() > 1 {
opportunities.push(OptimizationOpportunity {
strategy: OptimizationStrategy::UnionSimplification,
description: "Simplify multiple UNION clauses if possible".to_string(),
estimated_improvement: 20.0,
confidence: 0.6,
applicable: true,
});
}
opportunities
}
fn predict_performance(&self, _query: &str, cost: &QueryCost) -> PerformancePrediction {
let base_time = cost.estimated_execution_time;
let confidence_interval = (
Duration::from_millis((base_time.as_millis() as f32 * 0.5) as u64),
Duration::from_millis((base_time.as_millis() as f32 * 2.0) as u64),
);
let mut bottlenecks = Vec::new();
let mut scalability_concerns = Vec::new();
if cost.complexity_factor > 5.0 {
bottlenecks.push("High query complexity".to_string());
scalability_concerns.push("Performance may degrade with larger datasets".to_string());
}
if cost.estimated_memory_usage > 10 * 1024 * 1024 {
bottlenecks.push("High memory usage".to_string());
}
if cost.estimated_network_io > 10000 {
bottlenecks.push("Large number of triple accesses".to_string());
scalability_concerns.push("Network I/O may become bottleneck".to_string());
}
PerformancePrediction {
estimated_execution_time: base_time,
confidence_interval,
bottlenecks,
scalability_concerns,
}
}
}
struct QueryRewriter {
config: OptimizerConfig,
applied_optimizations: Vec<AppliedOptimization>,
}
impl QueryRewriter {
fn new(config: &OptimizerConfig) -> Self {
Self {
config: config.clone(),
applied_optimizations: Vec::new(),
}
}
async fn rewrite_query(
&mut self,
query: &str,
_analysis: &QueryAnalysis,
intent: &QueryIntent,
) -> Result<String> {
self.applied_optimizations.clear();
let mut optimized_query = query.to_string();
let strategies = match &self.config.optimization_level {
OptimizationLevel::Conservative => vec![
OptimizationStrategy::RedundancyRemoval,
OptimizationStrategy::LimitPushdown,
],
OptimizationLevel::Balanced => vec![
OptimizationStrategy::RedundancyRemoval,
OptimizationStrategy::FilterPushdown,
OptimizationStrategy::TripleReordering,
OptimizationStrategy::LimitPushdown,
],
OptimizationLevel::Aggressive => vec![
OptimizationStrategy::RedundancyRemoval,
OptimizationStrategy::FilterPushdown,
OptimizationStrategy::TripleReordering,
OptimizationStrategy::JoinOptimization,
OptimizationStrategy::UnionSimplification,
OptimizationStrategy::LimitPushdown,
OptimizationStrategy::SubqueryElimination,
],
OptimizationLevel::Custom(strategies) => strategies.clone(),
};
for strategy in strategies {
let before_optimization = optimized_query.clone();
optimized_query = match strategy {
OptimizationStrategy::RedundancyRemoval => {
self.remove_redundancy(&optimized_query)?
}
OptimizationStrategy::FilterPushdown => self.pushdown_filters(&optimized_query)?,
OptimizationStrategy::TripleReordering => self.reorder_triples(&optimized_query)?,
OptimizationStrategy::LimitPushdown => {
self.pushdown_limit(&optimized_query, intent)?
}
OptimizationStrategy::UnionSimplification => {
self.simplify_unions(&optimized_query)?
}
OptimizationStrategy::JoinOptimization => self.optimize_joins(&optimized_query)?,
OptimizationStrategy::SubqueryElimination => {
self.eliminate_subqueries(&optimized_query)?
}
_ => optimized_query, };
if optimized_query != before_optimization {
self.applied_optimizations.push(AppliedOptimization {
strategy: strategy.clone(),
description: format!("Applied {strategy:?} optimization"),
query_before: before_optimization,
query_after: optimized_query.clone(),
expected_improvement: 10.0, });
}
}
Ok(optimized_query)
}
fn get_applied_optimizations(&self) -> Vec<AppliedOptimization> {
self.applied_optimizations.clone()
}
fn remove_redundancy(&self, query: &str) -> Result<String> {
let redundant_distinct = Regex::new(r"(?i)SELECT\s+DISTINCT\s+DISTINCT")?;
let optimized = redundant_distinct.replace_all(query, "SELECT DISTINCT");
let redundant_where = Regex::new(r"(?i)WHERE\s+WHERE")?;
let optimized = redundant_where.replace_all(&optimized, "WHERE");
Ok(optimized.to_string())
}
fn pushdown_filters(&self, query: &str) -> Result<String> {
let filter_after_optional =
Regex::new(r"(?i)(OPTIONAL\s*\{[^}]*\})\s*(FILTER\s*\([^)]*\))")?;
let optimized = filter_after_optional.replace_all(query, "$2 $1");
Ok(optimized.to_string())
}
fn reorder_triples(&self, query: &str) -> Result<String> {
Ok(query.to_string())
}
fn pushdown_limit(&self, query: &str, intent: &QueryIntent) -> Result<String> {
if matches!(intent, QueryIntent::Listing) && !query.to_uppercase().contains("LIMIT") {
if query.to_uppercase().contains("ORDER BY") {
let with_limit =
Regex::new(r"(?i)(ORDER\s+BY\s+[^}]+)")?.replace(query, "$1 LIMIT 100");
Ok(with_limit.to_string())
} else {
Ok(format!("{} LIMIT 100", query.trim_end()))
}
} else {
Ok(query.to_string())
}
}
fn simplify_unions(&self, query: &str) -> Result<String> {
Ok(query.to_string())
}
fn optimize_joins(&self, query: &str) -> Result<String> {
Ok(query.to_string())
}
fn eliminate_subqueries(&self, query: &str) -> Result<String> {
Ok(query.to_string())
}
}
struct CostEstimator {
_config: OptimizerConfig,
}
impl CostEstimator {
fn new(config: &OptimizerConfig) -> Self {
Self {
_config: config.clone(),
}
}
}
struct ComplexityPattern {
pattern: Regex,
complexity_weight: f32,
_description: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
pub total_optimizations: usize,
pub average_improvement: f32,
pub cache_hit_rate: f32,
pub total_optimization_time: Duration,
}