use crate::{
config::PerformanceConfig,
error::{FusekiError, FusekiResult},
store::Store,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, info, instrument};
#[derive(Clone)]
pub struct QueryOptimizer {
config: Arc<PerformanceConfig>,
statistics: Arc<RwLock<DatabaseStatistics>>,
plan_cache: Arc<RwLock<HashMap<String, OptimizedQueryPlan>>>,
cost_model: Arc<CostModel>,
execution_engine: Arc<ParallelExecutionEngine>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseStatistics {
pub total_triples: u64,
pub total_graphs: u32,
pub predicate_stats: HashMap<String, PredicateStatistics>,
pub graph_stats: HashMap<String, GraphStatistics>,
pub index_stats: HashMap<String, IndexStatistics>,
pub last_updated: std::time::SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredicateStatistics {
pub frequency: u64,
pub selectivity: f64,
pub distinct_subjects: u64,
pub distinct_objects: u64,
pub avg_subject_fanout: f64,
pub avg_object_fanout: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStatistics {
pub triple_count: u64,
pub predicate_count: u32,
pub subject_count: u64,
pub object_count: u64,
pub avg_outdegree: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStatistics {
pub index_type: String,
pub size_bytes: u64,
pub access_cost: f64,
pub selectivity: f64,
pub last_access: std::time::SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizedQueryPlan {
pub plan_id: String,
pub original_query: String,
pub optimized_query: String,
pub execution_steps: Vec<ExecutionStep>,
pub estimated_cost: f64,
pub estimated_cardinality: u64,
pub optimization_hints: Vec<OptimizationHint>,
pub parallel_segments: Vec<ParallelSegment>,
pub created_at: std::time::SystemTime,
pub hit_count: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStep {
pub step_id: u32,
pub operation: String,
pub estimated_cost: f64,
pub estimated_rows: u64,
pub dependencies: Vec<u32>,
pub can_parallelize: bool,
pub index_hints: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationHint {
pub hint_type: String,
pub description: String,
pub confidence: f64,
pub estimated_improvement: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelSegment {
pub segment_id: u32,
pub operations: Vec<u32>,
pub estimated_parallelism: u32,
pub merge_strategy: String,
}
#[derive(Debug)]
pub struct CostModel {
pub triple_access_cost: f64,
pub index_access_cost: f64,
pub join_cost_factor: f64,
pub sort_cost_factor: f64,
pub network_cost_factor: f64,
pub memory_cost_factor: f64,
}
#[derive(Debug)]
pub struct ParallelExecutionEngine {
pub max_parallelism: usize,
pub work_queue: Arc<RwLock<Vec<WorkItem>>>,
pub execution_semaphore: Arc<Semaphore>,
pub completion_tracker: Arc<RwLock<HashMap<String, ExecutionStatus>>>,
}
#[derive(Debug, Clone)]
pub struct WorkItem {
pub item_id: String,
pub operation: String,
pub priority: u32,
pub estimated_cost: f64,
pub dependencies: Vec<String>,
pub created_at: Instant,
}
#[derive(Debug, Clone)]
pub struct ExecutionStatus {
pub status: String,
pub progress: f64,
pub start_time: Instant,
pub estimated_completion: Option<Instant>,
}
#[derive(Debug, Clone)]
pub struct QueryComplexityParams<'a> {
pub triple_patterns: &'a [TriplePattern],
pub join_count: u32,
pub filter_count: u32,
pub has_aggregation: bool,
pub has_subqueries: bool,
pub has_optional: bool,
pub has_union: bool,
}
#[derive(Debug)]
pub struct QueryAnalysis {
pub query_complexity: f64,
pub join_count: u32,
pub filter_count: u32,
pub triple_patterns: Vec<TriplePattern>,
pub has_aggregation: bool,
pub has_subqueries: bool,
pub has_optional: bool,
pub has_union: bool,
pub estimated_cardinality: u64,
}
#[derive(Debug, Clone)]
pub struct TriplePattern {
pub subject: String,
pub predicate: String,
pub object: String,
pub graph: Option<String>,
pub is_bound: (bool, bool, bool),
pub estimated_selectivity: f64,
}
impl QueryOptimizer {
pub fn new(config: PerformanceConfig) -> FusekiResult<Self> {
let statistics = Arc::new(RwLock::new(DatabaseStatistics::default()));
let plan_cache = Arc::new(RwLock::new(HashMap::new()));
let cost_model = Arc::new(CostModel {
triple_access_cost: 1.0,
index_access_cost: 0.1,
join_cost_factor: 2.0,
sort_cost_factor: 1.5,
network_cost_factor: 10.0,
memory_cost_factor: 0.5,
});
let execution_engine = Arc::new(ParallelExecutionEngine {
max_parallelism: config.query_optimization.thread_pool_size,
work_queue: Arc::new(RwLock::new(Vec::new())),
execution_semaphore: Arc::new(Semaphore::new(
config.query_optimization.thread_pool_size,
)),
completion_tracker: Arc::new(RwLock::new(HashMap::new())),
});
Ok(Self {
config: Arc::new(config),
statistics,
plan_cache,
cost_model,
execution_engine,
})
}
#[instrument(skip(self, store))]
pub async fn optimize_query(
&self,
query: &str,
store: &Store,
dataset: &str,
) -> FusekiResult<OptimizedQueryPlan> {
let plan_id = self.generate_plan_id(query);
if let Some(cached_plan) = self.get_cached_plan(&plan_id).await {
info!("Using cached optimization plan: {}", plan_id);
return Ok(cached_plan);
}
debug!("Starting query optimization for: {}", plan_id);
let analysis = self.analyze_query(query).await?;
self.update_statistics(store, dataset).await?;
let stats = self.statistics.read().await;
let candidate_plans = self
.generate_candidate_plans(query, &analysis, &stats)
.await?;
let best_plan = self.select_best_plan(candidate_plans, &stats).await?;
let optimized_plan = self.apply_index_optimizations(best_plan, &stats).await?;
let final_plan = self.generate_parallel_strategy(optimized_plan).await?;
self.cache_plan(plan_id.clone(), final_plan.clone()).await;
info!("Query optimization completed for: {}", plan_id);
Ok(final_plan)
}
#[instrument(skip(self))]
async fn analyze_query(&self, query: &str) -> FusekiResult<QueryAnalysis> {
debug!("Analyzing query structure");
let query_lower = query.to_lowercase();
let join_count = query_lower.matches("join").count() as u32;
let filter_count = query_lower.matches("filter").count() as u32;
let triple_patterns = self.extract_triple_patterns(query).await?;
let has_aggregation = query_lower.contains("group by")
|| query_lower.contains("count(")
|| query_lower.contains("sum(")
|| query_lower.contains("avg(");
let has_subqueries =
query_lower.contains("select") && query_lower.matches("select").count() > 1;
let has_optional = query_lower.contains("optional");
let has_union = query_lower.contains("union");
let params = QueryComplexityParams {
triple_patterns: &triple_patterns,
join_count,
filter_count,
has_aggregation,
has_subqueries,
has_optional,
has_union,
};
let complexity = self.calculate_query_complexity(¶ms);
let estimated_cardinality = self.estimate_query_cardinality(&triple_patterns).await;
Ok(QueryAnalysis {
query_complexity: complexity,
join_count,
filter_count,
triple_patterns,
has_aggregation,
has_subqueries,
has_optional,
has_union,
estimated_cardinality,
})
}
#[instrument(skip(self, stats))]
async fn generate_candidate_plans(
&self,
query: &str,
analysis: &QueryAnalysis,
stats: &DatabaseStatistics,
) -> FusekiResult<Vec<OptimizedQueryPlan>> {
debug!("Generating candidate plans");
let mut plans = Vec::new();
let plan1 = self
.create_left_to_right_plan(query, analysis, stats)
.await?;
plans.push(plan1);
if analysis.join_count <= 10 {
let plan2 = self
.create_optimal_join_plan(query, analysis, stats)
.await?;
plans.push(plan2);
}
let plan3 = self
.create_index_optimized_plan(query, analysis, stats)
.await?;
plans.push(plan3);
if analysis.query_complexity > 5.0 {
let plan4 = self
.create_parallel_first_plan(query, analysis, stats)
.await?;
plans.push(plan4);
}
debug!("Generated {} candidate plans", plans.len());
Ok(plans)
}
#[instrument(skip(self, plans, stats))]
async fn select_best_plan(
&self,
plans: Vec<OptimizedQueryPlan>,
stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
debug!("Selecting best plan from {} candidates", plans.len());
let mut best_plan = None;
let mut best_cost = f64::INFINITY;
for plan in plans {
let cost = self.calculate_plan_cost(&plan, stats).await;
debug!("Plan {} cost: {:.2}", plan.plan_id, cost);
if cost < best_cost {
best_cost = cost;
best_plan = Some(plan);
}
}
best_plan.ok_or_else(|| FusekiError::internal("No valid execution plan found"))
}
#[instrument(skip(self, plan, stats))]
async fn apply_index_optimizations(
&self,
mut plan: OptimizedQueryPlan,
stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
debug!("Applying index optimizations");
for step in &mut plan.execution_steps {
if step.operation.contains("triple_pattern") {
let best_index = self.find_best_index_for_step(step, stats).await;
if let Some(index) = best_index {
step.index_hints.push(index);
step.estimated_cost *= 0.1; }
}
}
let index_hint = OptimizationHint {
hint_type: "INDEX_OPTIMIZATION".to_string(),
description: "Applied index-aware optimizations".to_string(),
confidence: 0.9,
estimated_improvement: 0.8,
};
plan.optimization_hints.push(index_hint);
Ok(plan)
}
#[instrument(skip(self, plan))]
async fn generate_parallel_strategy(
&self,
mut plan: OptimizedQueryPlan,
) -> FusekiResult<OptimizedQueryPlan> {
debug!("Generating parallel execution strategy");
let parallelizable_ops: Vec<u32> = plan
.execution_steps
.iter()
.filter(|step| step.can_parallelize)
.map(|step| step.step_id)
.collect();
if parallelizable_ops.len() > 1 {
let chunk_size =
(parallelizable_ops.len() / self.execution_engine.max_parallelism).max(1);
for (segment_id, chunk) in parallelizable_ops.chunks(chunk_size).enumerate() {
let segment = ParallelSegment {
segment_id: segment_id as u32,
operations: chunk.to_vec(),
estimated_parallelism: chunk.len().min(self.execution_engine.max_parallelism)
as u32,
merge_strategy: "UNION_ALL".to_string(),
};
plan.parallel_segments.push(segment);
}
let parallel_hint = OptimizationHint {
hint_type: "PARALLELIZATION".to_string(),
description: format!("Created {} parallel segments", plan.parallel_segments.len()),
confidence: 0.8,
estimated_improvement: 0.6,
};
plan.optimization_hints.push(parallel_hint);
}
Ok(plan)
}
#[instrument(skip(self, _store))]
async fn update_statistics(&self, _store: &Store, dataset: &str) -> FusekiResult<()> {
debug!("Updating database statistics for dataset: {}", dataset);
let mut stats = self.statistics.write().await;
stats.total_triples = 1000000; stats.total_graphs = 10;
stats.last_updated = std::time::SystemTime::now();
stats.predicate_stats.insert(
"http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
PredicateStatistics {
frequency: 100000,
selectivity: 0.1,
distinct_subjects: 50000,
distinct_objects: 1000,
avg_subject_fanout: 2.0,
avg_object_fanout: 100.0,
},
);
Ok(())
}
fn calculate_query_complexity(&self, params: &QueryComplexityParams) -> f64 {
let mut complexity = params.triple_patterns.len() as f64;
complexity += params.join_count as f64 * 2.0;
complexity += params.filter_count as f64 * 0.5;
if params.has_aggregation {
complexity += 3.0;
}
if params.has_subqueries {
complexity += 5.0;
}
if params.has_optional {
complexity += 2.0;
}
if params.has_union {
complexity += 1.5;
}
complexity
}
async fn extract_triple_patterns(&self, _query: &str) -> FusekiResult<Vec<TriplePattern>> {
let patterns = vec![TriplePattern {
subject: "?s".to_string(),
predicate: "?p".to_string(),
object: "?o".to_string(),
graph: None,
is_bound: (false, false, false),
estimated_selectivity: 0.1,
}];
Ok(patterns)
}
async fn estimate_query_cardinality(&self, patterns: &[TriplePattern]) -> u64 {
let base_cardinality = 1000u64;
patterns
.iter()
.map(|p| (base_cardinality as f64 * p.estimated_selectivity) as u64)
.sum()
}
async fn create_left_to_right_plan(
&self,
query: &str,
analysis: &QueryAnalysis,
_stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
Ok(OptimizedQueryPlan {
plan_id: format!("left_to_right_{}", self.generate_plan_id(query)),
original_query: query.to_string(),
optimized_query: query.to_string(),
execution_steps: vec![],
estimated_cost: analysis.query_complexity * 10.0,
estimated_cardinality: analysis.estimated_cardinality,
optimization_hints: vec![],
parallel_segments: vec![],
created_at: std::time::SystemTime::now(),
hit_count: 0,
})
}
async fn create_optimal_join_plan(
&self,
query: &str,
analysis: &QueryAnalysis,
_stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
Ok(OptimizedQueryPlan {
plan_id: format!("optimal_join_{}", self.generate_plan_id(query)),
original_query: query.to_string(),
optimized_query: query.to_string(),
execution_steps: vec![],
estimated_cost: analysis.query_complexity * 5.0, estimated_cardinality: analysis.estimated_cardinality / 2,
optimization_hints: vec![],
parallel_segments: vec![],
created_at: std::time::SystemTime::now(),
hit_count: 0,
})
}
async fn create_index_optimized_plan(
&self,
query: &str,
analysis: &QueryAnalysis,
_stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
Ok(OptimizedQueryPlan {
plan_id: format!("index_optimized_{}", self.generate_plan_id(query)),
original_query: query.to_string(),
optimized_query: query.to_string(),
execution_steps: vec![],
estimated_cost: analysis.query_complexity * 2.0, estimated_cardinality: analysis.estimated_cardinality,
optimization_hints: vec![],
parallel_segments: vec![],
created_at: std::time::SystemTime::now(),
hit_count: 0,
})
}
async fn create_parallel_first_plan(
&self,
query: &str,
analysis: &QueryAnalysis,
_stats: &DatabaseStatistics,
) -> FusekiResult<OptimizedQueryPlan> {
Ok(OptimizedQueryPlan {
plan_id: format!("parallel_first_{}", self.generate_plan_id(query)),
original_query: query.to_string(),
optimized_query: query.to_string(),
execution_steps: vec![],
estimated_cost: analysis.query_complexity * 3.0,
estimated_cardinality: analysis.estimated_cardinality,
optimization_hints: vec![],
parallel_segments: vec![],
created_at: std::time::SystemTime::now(),
hit_count: 0,
})
}
async fn calculate_plan_cost(
&self,
plan: &OptimizedQueryPlan,
_stats: &DatabaseStatistics,
) -> f64 {
plan.estimated_cost
}
async fn find_best_index_for_step(
&self,
_step: &ExecutionStep,
_stats: &DatabaseStatistics,
) -> Option<String> {
Some("SPO_INDEX".to_string())
}
async fn get_cached_plan(&self, plan_id: &str) -> Option<OptimizedQueryPlan> {
let cache = self.plan_cache.read().await;
cache.get(plan_id).cloned()
}
async fn cache_plan(&self, plan_id: String, plan: OptimizedQueryPlan) {
let mut cache = self.plan_cache.write().await;
cache.insert(plan_id, plan);
}
fn generate_plan_id(&self, query: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query.hash(&mut hasher);
format!("plan_{:x}", hasher.finish())
}
pub async fn get_optimization_stats(&self) -> HashMap<String, serde_json::Value> {
let mut stats = HashMap::new();
let plan_cache = self.plan_cache.read().await;
stats.insert(
"cached_plans".to_string(),
serde_json::json!(plan_cache.len()),
);
let db_stats = self.statistics.read().await;
stats.insert(
"total_triples".to_string(),
serde_json::json!(db_stats.total_triples),
);
stats.insert(
"indexed_predicates".to_string(),
serde_json::json!(db_stats.predicate_stats.len()),
);
stats
}
pub async fn get_cached_plans(&self) -> Vec<OptimizedQueryPlan> {
let plan_cache = self.plan_cache.read().await;
plan_cache.values().cloned().collect()
}
pub async fn clear_plan_cache(&self) -> usize {
let mut plan_cache = self.plan_cache.write().await;
let count = plan_cache.len();
plan_cache.clear();
info!("Cleared {} cached query plans", count);
count
}
pub async fn get_database_statistics(&self) -> DatabaseStatistics {
self.statistics.read().await.clone()
}
}
impl Default for DatabaseStatistics {
fn default() -> Self {
Self {
total_triples: 0,
total_graphs: 0,
predicate_stats: HashMap::new(),
graph_stats: HashMap::new(),
index_stats: HashMap::new(),
last_updated: std::time::SystemTime::now(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{CacheConfig, ConnectionPoolConfig, QueryOptimizationConfig};
fn create_test_optimizer() -> QueryOptimizer {
let config = PerformanceConfig {
caching: CacheConfig {
enabled: true,
max_size: 100,
ttl_secs: 300,
query_cache_enabled: true,
result_cache_enabled: true,
plan_cache_enabled: true,
},
query_optimization: QueryOptimizationConfig {
enabled: true,
max_query_time_secs: 300,
max_result_size: 1000000,
parallel_execution: true,
thread_pool_size: 4,
},
connection_pool: ConnectionPoolConfig {
min_connections: 1,
max_connections: 5,
connection_timeout_secs: 30,
idle_timeout_secs: 300,
max_lifetime_secs: 3600,
},
rate_limiting: None,
};
QueryOptimizer::new(config).unwrap()
}
#[tokio::test]
async fn test_query_analysis() {
let optimizer = create_test_optimizer();
let query = "SELECT ?s ?p ?o WHERE { ?s ?p ?o . ?s rdf:type ?type }";
let analysis = optimizer.analyze_query(query).await.unwrap();
assert!(analysis.query_complexity > 0.0);
assert!(!analysis.has_aggregation);
}
#[tokio::test]
async fn test_plan_caching() {
let optimizer = create_test_optimizer();
let query = "SELECT * WHERE { ?s ?p ?o }";
let plan_id = optimizer.generate_plan_id(query);
assert!(optimizer.get_cached_plan(&plan_id).await.is_none());
let plan = OptimizedQueryPlan {
plan_id: plan_id.clone(),
original_query: query.to_string(),
optimized_query: query.to_string(),
execution_steps: vec![],
estimated_cost: 10.0,
estimated_cardinality: 100,
optimization_hints: vec![],
parallel_segments: vec![],
created_at: std::time::SystemTime::now(),
hit_count: 0,
};
optimizer.cache_plan(plan_id.clone(), plan).await;
assert!(optimizer.get_cached_plan(&plan_id).await.is_some());
}
#[test]
fn test_complexity_calculation() {
let optimizer = create_test_optimizer();
let patterns = vec![];
let params = QueryComplexityParams {
triple_patterns: &patterns,
join_count: 2,
filter_count: 1,
has_aggregation: true,
has_subqueries: false,
has_optional: true,
has_union: false,
};
let complexity = optimizer.calculate_query_complexity(¶ms);
assert!(complexity > 0.0);
}
}