use crate::error::FusekiResult;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
/// Top-level subquery optimizer: bundles the rewrite engine, cost estimator,
/// materialization cache and execution-strategy selector, plus aggregate
/// optimizer statistics.
#[derive(Debug, Clone)]
pub struct AdvancedSubqueryOptimizer {
    /// Chooses a rewrite rule for each detected subquery.
    pub rewrite_engine: SubqueryRewriteEngine,
    /// Estimates the cost of a subquery under a given execution strategy.
    pub cost_estimator: SubqueryCostEstimator,
    /// Cache of materialized subquery results.
    pub materialization_manager: MaterializationManager,
    /// Picks an execution strategy per subquery.
    pub strategy_selector: ExecutionStrategySelector,
    /// Aggregate statistics; behind an `Arc`, so clones of the optimizer share them.
    pub statistics: Arc<RwLock<SubqueryStatistics>>,
}
/// Holds the rewrite rules and the order in which they are tried.
#[derive(Debug, Clone)]
pub struct SubqueryRewriteEngine {
    /// All known rewrite rules.
    pub rules: Vec<SubqueryRewriteRule>,
    /// Indices into `rules` in the order they are tried (first match wins);
    /// `new()` sorts them by descending `priority`.
    pub rule_order: Vec<usize>,
    /// Pattern matcher; rule matching in this file compares `QueryPattern`
    /// variants directly and does not consult it.
    pub pattern_matcher: PatternMatcher,
}
/// A single rewrite rule: which subqueries it targets, under which
/// conditions, and what transformation it performs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubqueryRewriteRule {
    /// Stable machine identifier, e.g. `"exists_to_semi_join"`.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Human-readable explanation of the rule.
    pub description: String,
    /// Kind of subquery the rule applies to.
    pub pattern: QueryPattern,
    /// Transformation applied when the rule fires.
    pub rewrite: RewriteAction,
    /// Extra conditions; all of them must hold for the rule to fire.
    pub conditions: Vec<RuleCondition>,
    /// Higher values are tried first.
    pub priority: i32,
    /// Expected benefit as a fraction (defaults range 0.3–0.5); informational.
    pub estimated_benefit: f64,
}
/// Shape of subquery a rule targets.
///
/// Rule matching compares only the variant against the subquery's type (or,
/// for `Correlated`, its correlation flag); the payload strings are not
/// inspected. `Lateral`, `CommonTableExpression` and `Custom` never match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryPattern {
    Exists { pattern: String },
    NotExists { pattern: String },
    ScalarSelect { projection: String },
    FromSubquery { alias: String },
    InSubquery { variable: String },
    NotInSubquery { variable: String },
    Correlated { outer_vars: Vec<String> },
    Lateral { dependency: String },
    CommonTableExpression { name: String },
    Custom { regex: String },
}
/// Transformation performed by a rewrite rule.
///
/// `Materialize`, `Flatten` and `Custom` are accepted but leave the query
/// unchanged; `FilterPushDown` is currently a no-op as well.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RewriteAction {
    /// `EXISTS { .. }` -> `SEMI_JOIN { .. }`.
    ExistsToSemiJoin,
    /// `NOT EXISTS { .. }` -> `ANTI_JOIN { .. }`.
    NotExistsToAntiJoin,
    /// Inline a simple `{ SELECT * WHERE { .. } }` sub-select.
    SubqueryPullUp,
    FilterPushDown { filter: String },
    Materialize { cache_key: String },
    /// ` IN (subquery)` -> ` . { subquery }`.
    InToJoin,
    /// Turn a correlated EXISTS / scalar subquery into a plain group.
    Decorrelate,
    Flatten,
    Custom { transformation: String },
}
/// A guard that must hold before a rule may fire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuleCondition {
    /// What is checked.
    pub condition_type: ConditionType,
    /// Free-form parameters; not read by the condition checks in this file.
    pub parameters: HashMap<String, String>,
}
/// Kinds of rule conditions.
///
/// Only `SubquerySize`, `IsCorrelated` and `SelectivityRange` are evaluated;
/// the remaining variants are currently treated as always satisfied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConditionType {
    /// Satisfied when the subquery's `estimated_size <= max_triples`.
    SubquerySize { max_triples: usize },
    /// Satisfied when the subquery's correlation flag equals `expected`.
    IsCorrelated { expected: bool },
    /// Satisfied when `min <= estimated_selectivity <= max` (inclusive).
    SelectivityRange { min: f64, max: f64 },
    JoinCardinality { threshold: usize },
    IndexAvailable { predicate: String },
    MemoryLimit { max_mb: usize },
}
/// Cost model for subqueries.
///
/// Cost estimation in this file reads only `operation_costs`; the other
/// estimators are constructed empty and are not consulted yet.
#[derive(Debug, Clone)]
pub struct SubqueryCostEstimator {
    /// Per-operation unit costs.
    pub operation_costs: OperationCosts,
    pub cardinality_estimator: CardinalityEstimator,
    pub selectivity_estimator: SelectivityEstimator,
    /// Shared (via `Arc`) history of past executions.
    pub historical_stats: Arc<RwLock<HistoricalStats>>,
}
/// Relative unit costs used by the subquery cost model.
#[derive(Debug, Clone)]
pub struct OperationCosts {
    /// Cost charged per estimated triple scanned.
    pub scan_cost_per_triple: f64,
    /// Cost charged per join in the subquery.
    pub join_cost_per_pair: f64,
    /// Cost charged per FILTER in the subquery.
    pub filter_cost_per_binding: f64,
    /// Sort weighting factor (not read by the cost model in this file).
    pub sort_cost_factor: f64,
    /// Materialization cost (not read by the cost model in this file).
    pub materialization_cost: f64,
    /// Network cost per kilobyte (not read by the cost model in this file).
    pub network_cost_per_kb: f64,
}
impl Default for OperationCosts {
    /// Default weights, with a single triple scan as the 1.0 baseline.
    fn default() -> Self {
        let (scan, join, filter) = (1.0, 10.0, 0.1);
        let (sort_factor, materialize, network_per_kb) = (1.5, 50.0, 5.0);
        Self {
            network_cost_per_kb: network_per_kb,
            materialization_cost: materialize,
            sort_cost_factor: sort_factor,
            filter_cost_per_binding: filter,
            join_cost_per_pair: join,
            scan_cost_per_triple: scan,
        }
    }
}
/// Cache of materialized subquery results, keyed by an MD5 hex digest of the
/// subquery text.
#[derive(Debug, Clone)]
pub struct MaterializationManager {
    /// Cached views; behind an `Arc`, so clones of the manager share the cache.
    pub materialized_views: Arc<RwLock<HashMap<String, MaterializedView>>>,
    /// Size, TTL and cost policies governing what is cached.
    pub policies: MaterializationPolicies,
    /// Hit/miss counters and related cache statistics.
    pub cache_stats: Arc<RwLock<CacheStatistics>>,
}
/// One cached subquery result set.
#[derive(Debug, Clone)]
pub struct MaterializedView {
    /// Cache key under which the view is stored.
    pub query_hash: String,
    /// The cached rows, one map of variable name -> JSON value per row.
    pub result_data: Vec<HashMap<String, serde_json::Value>>,
    /// Creation time; TTL is measured from here.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Last access time; eviction removes the oldest values first.
    pub last_accessed: chrono::DateTime<chrono::Utc>,
    pub access_count: u64,
    /// Size of `result_data` serialized as JSON, in bytes.
    pub size_bytes: usize,
    /// Views with `false` are never served from the cache.
    pub is_valid: bool,
}
/// Policies controlling the materialization cache.
#[derive(Debug, Clone)]
pub struct MaterializationPolicies {
    /// Cache capacity; converted to bytes as `max_cache_size_mb * 1024 * 1024`.
    pub max_cache_size_mb: usize,
    /// Views older than this (measured from `created_at`) are not served.
    pub ttl_seconds: u64,
    /// Minimum access count; not consulted by the caching decision in this file.
    pub min_access_count: u64,
    /// Only subqueries whose `estimated_cost` exceeds this are cached.
    pub cost_threshold: f64,
}
/// Chooses how each subquery is executed.
#[derive(Debug, Clone)]
pub struct ExecutionStrategySelector {
    /// Strategies considered available; selection in this file picks
    /// strategies directly rather than iterating this list.
    pub strategies: Vec<ExecutionStrategy>,
    /// Algorithm used by `select_strategy`.
    pub selection_algorithm: SelectionAlgorithm,
}
/// Execution strategy for a subquery. Each variant scales the base cost by a
/// fixed multiplier in `SubqueryCostEstimator::estimate_cost`.
#[derive(Debug, Clone)]
pub enum ExecutionStrategy {
    MaterializeOnce,
    /// Cost is multiplied by the subquery's `outer_cardinality`.
    CorrelatedExecution,
    JoinConversion,
    SemiJoin,
    AntiJoin,
    RemoteExecution,
    /// Cost is divided by `sqrt(max_threads)`.
    ParallelExecution { max_threads: usize },
    StreamingExecution,
}
/// Strategy-selection algorithm. `MLBased` and `Hybrid` currently fall back
/// to cost-based selection.
#[derive(Debug, Clone)]
pub enum SelectionAlgorithm {
    CostBased,
    RuleBased,
    MLBased { model_path: String },
    Hybrid,
}
/// Registry of compiled query patterns with a lookup index.
/// Both maps start empty and are not populated by code in this file.
#[derive(Debug, Clone)]
pub struct PatternMatcher {
    /// Compiled patterns keyed by pattern id.
    compiled_patterns: HashMap<String, CompiledPattern>,
    /// Secondary index over the compiled patterns.
    pattern_index: PatternIndex,
}
/// A pattern compiled into a matcher object.
#[derive(Debug, Clone)]
pub struct CompiledPattern {
    pub pattern_id: String,
    /// Kind of matching this pattern performs.
    pub pattern_type: PatternType,
    /// Boxed matcher; `Clone` works through `PatternMatcherTrait::clone_box`.
    pub matcher: Box<dyn PatternMatcherTrait + Send + Sync>,
}
/// Category of a compiled pattern.
#[derive(Debug, Clone)]
pub enum PatternType {
    Literal,
    Regex,
    Structural,
    Semantic,
}
/// A matcher that tests query text and extracts named bindings from it.
pub trait PatternMatcherTrait: std::fmt::Debug + Send + Sync {
    /// Returns `true` when `query` matches this pattern.
    fn matches(&self, query: &str) -> bool;
    /// Returns bindings captured from `query` (name -> matched text).
    fn extract_bindings(&self, query: &str) -> HashMap<String, String>;
    /// Clones the matcher behind a fresh box (enables `Clone` for boxed trait objects).
    fn clone_box(&self) -> Box<dyn PatternMatcherTrait + Send + Sync>;
}
impl Clone for Box<dyn PatternMatcherTrait + Send + Sync> {
    fn clone(&self) -> Self {
        // Dereference to the trait object and let it clone itself.
        (**self).clone_box()
    }
}
/// Matches when the query contains `pattern` verbatim (case-sensitive).
#[derive(Debug, Clone)]
struct LiteralMatcher {
    pattern: String,
}
impl PatternMatcherTrait for LiteralMatcher {
    fn matches(&self, query: &str) -> bool {
        str::contains(query, self.pattern.as_str())
    }
    /// Literal patterns capture nothing, so the map is always empty.
    fn extract_bindings(&self, _query: &str) -> HashMap<String, String> {
        HashMap::default()
    }
    fn clone_box(&self) -> Box<dyn PatternMatcherTrait + Send + Sync> {
        let copy = Self {
            pattern: self.pattern.clone(),
        };
        Box::new(copy)
    }
}
/// Index from pattern type / keyword to pattern ids.
#[derive(Debug, Clone)]
pub struct PatternIndex {
    by_type: HashMap<String, Vec<String>>,
    by_keyword: HashMap<String, Vec<String>>,
}
/// Per-predicate statistics and pairwise join selectivities.
/// Constructed empty; not consulted by cost estimation in this file.
#[derive(Debug, Clone)]
pub struct CardinalityEstimator {
    predicate_stats: HashMap<String, PredicateStats>,
    /// Keyed by a pair of identifiers — presumably predicates; TODO confirm.
    join_selectivities: HashMap<(String, String), f64>,
}
/// Statistics for a single predicate.
#[derive(Debug, Clone)]
pub struct PredicateStats {
    pub distinct_subjects: u64,
    pub distinct_objects: u64,
    pub total_triples: u64,
    pub selectivity: f64,
}
/// Filter selectivities with a fallback default (0.3 via `new()`).
#[derive(Debug, Clone)]
pub struct SelectivityEstimator {
    filter_selectivities: HashMap<String, f64>,
    default_selectivity: f64,
}
/// History of past executions, keyed by a string id (not populated in this file).
#[derive(Debug, Clone)]
pub struct HistoricalStats {
    execution_times: HashMap<String, Vec<f64>>,
    result_sizes: HashMap<String, Vec<usize>>,
    optimization_success: HashMap<String, f64>,
}
/// Aggregate optimizer statistics, updated after each `optimize` call.
#[derive(Debug, Clone, Default)]
pub struct SubqueryStatistics {
    /// Incremented once per `optimize` call (per query, not per subquery).
    pub total_subqueries_optimized: u64,
    /// Sum of rewrites reported by each `optimize` call.
    pub successful_rewrites: u64,
    pub failed_rewrites: u64,
    pub materialization_hits: u64,
    pub materialization_misses: u64,
    /// Intended as the running mean of `optimize` durations in milliseconds.
    pub average_optimization_time_ms: f64,
    pub optimization_benefit_ratio: f64,
}
/// Statistics for the materialization cache.
#[derive(Debug, Clone, Default)]
pub struct CacheStatistics {
    pub total_entries: usize,
    pub total_size_bytes: usize,
    /// Incremented on every cache hit in `get_or_materialize`.
    pub hit_count: u64,
    /// Incremented on every cache miss in `get_or_materialize`.
    pub miss_count: u64,
    pub eviction_count: u64,
    pub average_entry_lifetime_seconds: f64,
}
impl Default for AdvancedSubqueryOptimizer {
fn default() -> Self {
Self::new()
}
}
impl AdvancedSubqueryOptimizer {
pub fn new() -> Self {
Self {
rewrite_engine: SubqueryRewriteEngine::new(),
cost_estimator: SubqueryCostEstimator::new(),
materialization_manager: MaterializationManager::new(),
strategy_selector: ExecutionStrategySelector::new(),
statistics: Arc::new(RwLock::new(SubqueryStatistics::default())),
}
}
pub async fn optimize(&self, query: &str) -> FusekiResult<OptimizedQuery> {
let start_time = std::time::Instant::now();
let subqueries = self.extract_subqueries(query)?;
debug!("Found {} subqueries to optimize", subqueries.len());
let mut optimized_query = query.to_string();
let mut rewrites_applied = Vec::new();
for subquery in &subqueries {
if let Some(rewrite) = self.rewrite_engine.find_applicable_rewrite(subquery)? {
optimized_query = self.apply_rewrite(&optimized_query, subquery, &rewrite)?;
rewrites_applied.push(rewrite);
}
}
let execution_plan = self
.create_execution_plan(&optimized_query, &subqueries)
.await?;
let optimization_time = start_time.elapsed().as_millis() as f64;
self.update_statistics(rewrites_applied.len(), optimization_time)
.await;
Ok(OptimizedQuery {
original_query: query.to_string(),
optimized_query,
execution_plan,
rewrites_applied,
estimated_cost_reduction: self.estimate_cost_reduction(&subqueries),
optimization_time_ms: optimization_time,
})
}
fn extract_subqueries(&self, query: &str) -> FusekiResult<Vec<SubqueryInfo>> {
let mut subqueries = Vec::new();
if let Some(exists_subqueries) = self.extract_exists_subqueries(query) {
subqueries.extend(exists_subqueries);
}
if let Some(scalar_subqueries) = self.extract_scalar_subqueries(query) {
subqueries.extend(scalar_subqueries);
}
if let Some(from_subqueries) = self.extract_from_subqueries(query) {
subqueries.extend(from_subqueries);
}
if let Some(in_subqueries) = self.extract_in_subqueries(query) {
subqueries.extend(in_subqueries);
}
Ok(subqueries)
}
fn extract_exists_subqueries(&self, query: &str) -> Option<Vec<SubqueryInfo>> {
let mut subqueries = Vec::new();
let query_upper = query.to_uppercase();
let exists_positions: Vec<_> = query_upper.match_indices("EXISTS").collect();
for (pos, _) in exists_positions {
let is_not_exists = pos >= 4 && query_upper[..pos].ends_with("NOT ");
let subquery_type = if is_not_exists {
SubqueryType::NotExists
} else {
SubqueryType::Exists
};
if let Some(brace_start) = query[pos..].find('{') {
let abs_brace_start = pos + brace_start;
if let Some(subquery_content) =
self.extract_balanced_braces(&query[abs_brace_start..])
{
subqueries.push(SubqueryInfo {
id: format!("subquery_{pos}"),
query_text: subquery_content.clone(),
subquery_type,
is_correlated: self.detect_correlation(&subquery_content, query),
outer_vars: self.extract_variables(&subquery_content),
estimated_size: 10, estimated_selectivity: 0.1, estimated_cost: 100.0, filter_count: subquery_content.matches("FILTER").count(),
join_count: subquery_content.matches(" . ").count(),
outer_cardinality: 1000, dependencies: vec![], });
}
}
}
if subqueries.is_empty() {
None
} else {
Some(subqueries)
}
}
fn extract_scalar_subqueries(&self, query: &str) -> Option<Vec<SubqueryInfo>> {
let mut subqueries = Vec::new();
let query_upper = query.to_uppercase();
let mut search_pos = 0;
while let Some(filter_pos) = query_upper[search_pos..].find("FILTER(") {
let abs_filter_pos = search_pos + filter_pos + 7;
if let Some(select_pos) = query_upper[abs_filter_pos..].find("SELECT") {
let abs_select_pos = abs_filter_pos + select_pos;
if let Some(subquery_content) =
self.extract_parentheses_content(&query[abs_select_pos..])
{
subqueries.push(SubqueryInfo {
id: format!("scalar_subquery_{}", subqueries.len()),
query_text: subquery_content.clone(),
subquery_type: SubqueryType::Scalar,
is_correlated: self.detect_correlation(&subquery_content, query),
outer_vars: self.extract_variables(&subquery_content),
estimated_size: 20, estimated_selectivity: 0.1, estimated_cost: 50.0, filter_count: subquery_content.matches("FILTER").count(),
join_count: subquery_content.matches(" . ").count(),
outer_cardinality: 1000, dependencies: vec![],
});
}
}
search_pos = abs_filter_pos;
}
if subqueries.is_empty() {
None
} else {
Some(subqueries)
}
}
fn extract_from_subqueries(&self, query: &str) -> Option<Vec<SubqueryInfo>> {
let mut subqueries = Vec::new();
let query_lower = query.to_lowercase();
if let Some(where_start) = query_lower.find("where") {
let where_clause = &query[where_start..];
let mut chars = where_clause.chars().peekable();
let mut pos = 0;
while let Some(ch) = chars.next() {
if ch == '{' {
let _remaining: String = chars.clone().collect();
if let Some(subquery_content) =
self.extract_balanced_braces(&where_clause[pos..])
{
let subquery_text = subquery_content.trim();
if subquery_text.to_lowercase().contains("select") {
subqueries.push(SubqueryInfo {
id: format!("subquery_{}", subqueries.len()),
query_text: subquery_text.to_string(),
subquery_type: SubqueryType::From,
is_correlated: self.detect_correlation(subquery_text, query),
outer_vars: self.extract_variables(subquery_text),
estimated_size: 100, estimated_selectivity: 0.1, estimated_cost: 1.0,
filter_count: 0,
join_count: 1,
outer_cardinality: 1000,
dependencies: Vec::new(),
});
}
}
}
pos += ch.len_utf8();
}
}
if subqueries.is_empty() {
None
} else {
Some(subqueries)
}
}
fn extract_in_subqueries(&self, query: &str) -> Option<Vec<SubqueryInfo>> {
let mut subqueries = Vec::new();
let query_upper = query.to_uppercase();
let mut search_pos = 0;
while let Some(in_pos) = query_upper[search_pos..].find(" IN ") {
let abs_in_pos = search_pos + in_pos;
if let Some(paren_start) = query[abs_in_pos + 4..].find('(') {
let abs_paren_start = abs_in_pos + 4 + paren_start;
if let Some(subquery_content) =
self.extract_parentheses_content(&query[abs_paren_start..])
{
if subquery_content.to_uppercase().contains("SELECT") {
subqueries.push(SubqueryInfo {
id: format!("in_subquery_{}", subqueries.len()),
query_text: subquery_content.clone(),
subquery_type: SubqueryType::In,
is_correlated: self.detect_correlation(&subquery_content, query),
outer_vars: self.extract_variables(&subquery_content),
estimated_size: 50, estimated_selectivity: 0.2, estimated_cost: 75.0, filter_count: subquery_content.matches("FILTER").count(),
join_count: subquery_content.matches(" . ").count(),
outer_cardinality: 1000, dependencies: vec![],
});
}
}
}
search_pos = abs_in_pos + 4;
}
if subqueries.is_empty() {
None
} else {
Some(subqueries)
}
}
fn extract_variables(&self, query: &str) -> Vec<String> {
let mut variables = Vec::new();
let query_chars: Vec<char> = query.chars().collect();
for i in 0..query_chars.len() {
if query_chars[i] == '?' && i + 1 < query_chars.len() {
let mut var_name = String::new();
let mut j = i + 1;
while j < query_chars.len()
&& (query_chars[j].is_alphanumeric() || query_chars[j] == '_')
{
var_name.push(query_chars[j]);
j += 1;
}
if !var_name.is_empty() && !variables.contains(&var_name) {
variables.push(var_name);
}
}
}
variables
}
fn extract_balanced_braces(&self, text: &str) -> Option<String> {
if !text.starts_with('{') {
return None;
}
let mut brace_count = 0;
let mut end_pos = 0;
let chars: Vec<char> = text.chars().collect();
for (i, &ch) in chars.iter().enumerate() {
match ch {
'{' => brace_count += 1,
'}' => {
brace_count -= 1;
if brace_count == 0 {
end_pos = i + 1;
break;
}
}
_ => {}
}
}
if end_pos > 0 {
Some(text[..end_pos].to_string())
} else {
None
}
}
fn extract_parentheses_content(&self, text: &str) -> Option<String> {
if !text.starts_with('(') {
return None;
}
let mut paren_count = 0;
let mut end_pos = 0;
let chars: Vec<char> = text.chars().collect();
for (i, &ch) in chars.iter().enumerate() {
match ch {
'(' => paren_count += 1,
')' => {
paren_count -= 1;
if paren_count == 0 {
end_pos = i;
break;
}
}
_ => {}
}
}
if end_pos > 0 {
Some(text[1..end_pos].to_string())
} else {
None
}
}
fn detect_correlation(&self, subquery: &str, outer_query: &str) -> bool {
let outer_query_without_subquery = outer_query.replace(subquery, "");
let outer_vars = self.extract_variables(&outer_query_without_subquery);
let subquery_projected_vars =
if let Some(select_pos) = subquery.to_lowercase().find("select") {
let select_part = &subquery[select_pos..];
if let Some(where_pos) = select_part.to_lowercase().find("where") {
let select_clause = &select_part[..where_pos];
self.extract_variables(select_clause)
} else {
self.extract_variables(select_part)
}
} else {
Vec::new()
};
let truly_outer_vars: Vec<_> = outer_vars
.iter()
.filter(|var| !subquery_projected_vars.contains(var))
.collect();
let subquery_lower = subquery.to_lowercase();
if let Some(where_pos) = subquery_lower.find("where") {
let subquery_where = &subquery[where_pos..];
truly_outer_vars.iter().any(|var| {
let var_pattern = format!("?{var}");
subquery_where.contains(&var_pattern)
})
} else {
false
}
}
fn apply_rewrite(
&self,
query: &str,
subquery: &SubqueryInfo,
rewrite: &SubqueryRewriteRule,
) -> FusekiResult<String> {
match &rewrite.rewrite {
RewriteAction::ExistsToSemiJoin => self.rewrite_exists_to_semi_join(query, subquery),
RewriteAction::NotExistsToAntiJoin => {
self.rewrite_not_exists_to_anti_join(query, subquery)
}
RewriteAction::SubqueryPullUp => self.rewrite_subquery_pullup(query, subquery),
RewriteAction::FilterPushDown { filter } => {
self.rewrite_filter_pushdown(query, subquery, filter)
}
RewriteAction::InToJoin => self.rewrite_in_to_join(query, subquery),
RewriteAction::Decorrelate => self.decorrelate_subquery(query, subquery),
_ => Ok(query.to_string()),
}
}
fn rewrite_exists_to_semi_join(
&self,
query: &str,
subquery: &SubqueryInfo,
) -> FusekiResult<String> {
Ok(query.replace(
&format!("EXISTS {{ {} }}", subquery.query_text),
&format!("SEMI_JOIN {{ {} }}", subquery.query_text),
))
}
fn rewrite_not_exists_to_anti_join(
&self,
query: &str,
subquery: &SubqueryInfo,
) -> FusekiResult<String> {
Ok(query.replace(
&format!("NOT EXISTS {{ {} }}", subquery.query_text),
&format!("ANTI_JOIN {{ {} }}", subquery.query_text),
))
}
fn rewrite_subquery_pullup(
&self,
query: &str,
subquery: &SubqueryInfo,
) -> FusekiResult<String> {
if subquery.is_simple_projection() {
Ok(query.replace(
&format!("{{ SELECT * WHERE {{ {} }} }}", subquery.query_text),
&subquery.query_text,
))
} else {
Ok(query.to_string())
}
}
fn rewrite_filter_pushdown(
&self,
query: &str,
_subquery: &SubqueryInfo,
_filter: &str,
) -> FusekiResult<String> {
Ok(query.to_string())
}
fn rewrite_in_to_join(&self, query: &str, subquery: &SubqueryInfo) -> FusekiResult<String> {
let in_pattern = format!(" IN ({})", subquery.query_text);
let join_pattern = format!(" . {{ {} }}", subquery.query_text);
Ok(query.replace(&in_pattern, &join_pattern))
}
fn decorrelate_subquery(&self, query: &str, subquery: &SubqueryInfo) -> FusekiResult<String> {
if subquery.is_correlated {
if subquery.subquery_type == SubqueryType::Exists {
let exists_pattern = format!("EXISTS {{ {} }}", subquery.query_text);
let join_pattern = format!("{{ {} }}", subquery.query_text);
return Ok(query.replace(&exists_pattern, &join_pattern));
}
if subquery.subquery_type == SubqueryType::Scalar {
let subquery_pattern = format!("({})", subquery.query_text);
let join_pattern = format!("{{ {} }}", subquery.query_text);
return Ok(query.replace(&subquery_pattern, &join_pattern));
}
}
Ok(query.to_string())
}
async fn create_execution_plan(
&self,
query: &str,
subqueries: &[SubqueryInfo],
) -> FusekiResult<ExecutionPlan> {
let mut plan_steps = Vec::new();
for subquery in subqueries {
let strategy = self.strategy_selector.select_strategy(subquery)?;
let estimated_cost = self
.cost_estimator
.estimate_cost(subquery, &strategy)
.await?;
plan_steps.push(ExecutionStep {
subquery_id: subquery.id.clone(),
strategy,
estimated_cost,
dependencies: subquery.dependencies.clone(),
parallelizable: subquery.is_independent(),
});
}
let total_cost = plan_steps.iter().map(|s| s.estimated_cost).sum();
let parallelization_opportunities = self.identify_parallelization(&plan_steps);
Ok(ExecutionPlan {
query: query.to_string(),
steps: plan_steps,
total_estimated_cost: total_cost,
parallelization_opportunities,
})
}
fn identify_parallelization(&self, _steps: &[ExecutionStep]) -> Vec<ParallelGroup> {
Vec::new()
}
fn estimate_cost_reduction(&self, _subqueries: &[SubqueryInfo]) -> f64 {
0.3 }
async fn update_statistics(&self, rewrites_count: usize, optimization_time: f64) {
let mut stats = self.statistics.write().await;
stats.total_subqueries_optimized += 1;
stats.successful_rewrites += rewrites_count as u64;
let total_time =
stats.average_optimization_time_ms * stats.total_subqueries_optimized as f64;
stats.average_optimization_time_ms =
(total_time + optimization_time) / (stats.total_subqueries_optimized as f64);
}
}
impl Default for SubqueryRewriteEngine {
fn default() -> Self {
Self::new()
}
}
impl SubqueryRewriteEngine {
pub fn new() -> Self {
let rules = Self::create_default_rules();
let rule_order = Self::sort_rules_by_priority(&rules);
Self {
pattern_matcher: PatternMatcher::new(),
rules,
rule_order,
}
}
fn create_default_rules() -> Vec<SubqueryRewriteRule> {
vec![
SubqueryRewriteRule {
id: "exists_to_semi_join".to_string(),
name: "EXISTS to Semi-Join".to_string(),
description: "Convert EXISTS subqueries to semi-joins for better performance"
.to_string(),
pattern: QueryPattern::Exists {
pattern: "*".to_string(),
},
rewrite: RewriteAction::ExistsToSemiJoin,
conditions: vec![RuleCondition {
condition_type: ConditionType::SubquerySize { max_triples: 100 },
parameters: HashMap::new(),
}],
priority: 10,
estimated_benefit: 0.4,
},
SubqueryRewriteRule {
id: "not_exists_to_anti_join".to_string(),
name: "NOT EXISTS to Anti-Join".to_string(),
description: "Convert NOT EXISTS subqueries to anti-joins".to_string(),
pattern: QueryPattern::NotExists {
pattern: "*".to_string(),
},
rewrite: RewriteAction::NotExistsToAntiJoin,
conditions: vec![],
priority: 10,
estimated_benefit: 0.4,
},
SubqueryRewriteRule {
id: "simple_subquery_pullup".to_string(),
name: "Simple Subquery Pull-Up".to_string(),
description: "Pull up simple subqueries without aggregation or distinct"
.to_string(),
pattern: QueryPattern::FromSubquery {
alias: "*".to_string(),
},
rewrite: RewriteAction::SubqueryPullUp,
conditions: vec![RuleCondition {
condition_type: ConditionType::IsCorrelated { expected: false },
parameters: HashMap::new(),
}],
priority: 8,
estimated_benefit: 0.3,
},
SubqueryRewriteRule {
id: "in_to_join".to_string(),
name: "IN to Join Conversion".to_string(),
description: "Convert IN subqueries to joins when beneficial".to_string(),
pattern: QueryPattern::InSubquery {
variable: "*".to_string(),
},
rewrite: RewriteAction::InToJoin,
conditions: vec![RuleCondition {
condition_type: ConditionType::SelectivityRange { min: 0.0, max: 0.3 },
parameters: HashMap::new(),
}],
priority: 7,
estimated_benefit: 0.35,
},
SubqueryRewriteRule {
id: "decorrelate_simple".to_string(),
name: "Simple Decorrelation".to_string(),
description: "Decorrelate simple correlated subqueries".to_string(),
pattern: QueryPattern::Correlated { outer_vars: vec![] },
rewrite: RewriteAction::Decorrelate,
conditions: vec![],
priority: 9,
estimated_benefit: 0.5,
},
]
}
fn sort_rules_by_priority(rules: &[SubqueryRewriteRule]) -> Vec<usize> {
let mut indices: Vec<usize> = (0..rules.len()).collect();
indices.sort_by(|&a, &b| rules[b].priority.cmp(&rules[a].priority));
indices
}
pub fn find_applicable_rewrite(
&self,
subquery: &SubqueryInfo,
) -> FusekiResult<Option<SubqueryRewriteRule>> {
for &idx in &self.rule_order {
let rule = &self.rules[idx];
if self.pattern_matches(&rule.pattern, subquery)
&& self.conditions_satisfied(&rule.conditions, subquery)?
{
return Ok(Some(rule.clone()));
}
}
Ok(None)
}
fn pattern_matches(&self, pattern: &QueryPattern, subquery: &SubqueryInfo) -> bool {
match pattern {
QueryPattern::Exists { .. } => subquery.subquery_type == SubqueryType::Exists,
QueryPattern::NotExists { .. } => subquery.subquery_type == SubqueryType::NotExists,
QueryPattern::ScalarSelect { .. } => subquery.subquery_type == SubqueryType::Scalar,
QueryPattern::FromSubquery { .. } => subquery.subquery_type == SubqueryType::From,
QueryPattern::InSubquery { .. } => subquery.subquery_type == SubqueryType::In,
QueryPattern::NotInSubquery { .. } => subquery.subquery_type == SubqueryType::NotIn,
QueryPattern::Correlated { .. } => subquery.is_correlated,
_ => false,
}
}
fn conditions_satisfied(
&self,
conditions: &[RuleCondition],
subquery: &SubqueryInfo,
) -> FusekiResult<bool> {
for condition in conditions {
if !self.check_condition(condition, subquery)? {
return Ok(false);
}
}
Ok(true)
}
fn check_condition(
&self,
condition: &RuleCondition,
subquery: &SubqueryInfo,
) -> FusekiResult<bool> {
match &condition.condition_type {
ConditionType::SubquerySize { max_triples } => {
Ok(subquery.estimated_size <= *max_triples)
}
ConditionType::IsCorrelated { expected } => Ok(subquery.is_correlated == *expected),
ConditionType::SelectivityRange { min, max } => Ok(subquery.estimated_selectivity
>= *min
&& subquery.estimated_selectivity <= *max),
_ => Ok(true),
}
}
}
impl Default for SubqueryCostEstimator {
    fn default() -> Self {
        Self::new()
    }
}
impl SubqueryCostEstimator {
    /// Creates an estimator with default operation costs and empty
    /// cardinality, selectivity and historical statistics.
    pub fn new() -> Self {
        Self {
            operation_costs: OperationCosts::default(),
            cardinality_estimator: CardinalityEstimator::new(),
            selectivity_estimator: SelectivityEstimator::new(),
            historical_stats: Arc::new(RwLock::new(HistoricalStats::new())),
        }
    }

    /// Estimates the cost of running `subquery` with `strategy`: the base
    /// cost from `estimate_base_cost`, scaled by a strategy multiplier.
    /// `CorrelatedExecution` multiplies by `outer_cardinality`, and
    /// `ParallelExecution` divides by `sqrt(max_threads)`.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub async fn estimate_cost(
        &self,
        subquery: &SubqueryInfo,
        strategy: &ExecutionStrategy,
    ) -> FusekiResult<f64> {
        let base_cost = self.estimate_base_cost(subquery);
        let strategy_multiplier = match strategy {
            ExecutionStrategy::MaterializeOnce => 1.0,
            ExecutionStrategy::CorrelatedExecution => subquery.outer_cardinality as f64,
            ExecutionStrategy::JoinConversion => 0.7,
            ExecutionStrategy::SemiJoin => 0.5,
            ExecutionStrategy::AntiJoin => 0.6,
            ExecutionStrategy::RemoteExecution => 2.0,
            ExecutionStrategy::ParallelExecution { max_threads } => {
                // Treat 0 threads as 1. Otherwise 1/sqrt(0) makes the cost infinite.
                1.0 / ((*max_threads).max(1) as f64).sqrt()
            }
            ExecutionStrategy::StreamingExecution => 0.8,
        };
        Ok(base_cost * strategy_multiplier)
    }

    /// Linear base cost: triples scanned, plus filters, plus joins, each
    /// weighted by its unit cost from `operation_costs`.
    fn estimate_base_cost(&self, subquery: &SubqueryInfo) -> f64 {
        let costs = &self.operation_costs;
        let scan_cost = subquery.estimated_size as f64 * costs.scan_cost_per_triple;
        let filter_cost = subquery.filter_count as f64 * costs.filter_cost_per_binding;
        let join_cost = subquery.join_count as f64 * costs.join_cost_per_pair;
        scan_cost + filter_cost + join_cost
    }
}
impl Default for MaterializationManager {
    fn default() -> Self {
        Self::new()
    }
}
impl MaterializationManager {
    /// Creates a manager with an empty cache and the default policies: a
    /// 100 MiB cap, a 3600-second TTL, `min_access_count` 2 and a cost
    /// threshold of 100.0.
    pub fn new() -> Self {
        Self {
            materialized_views: Arc::new(RwLock::new(HashMap::new())),
            policies: MaterializationPolicies {
                max_cache_size_mb: 100,
                ttl_seconds: 3600,
                min_access_count: 2,
                cost_threshold: 100.0,
            },
            cache_stats: Arc::new(RwLock::new(CacheStatistics::default())),
        }
    }

    /// Returns cached rows for `subquery` when a fresh view exists.
    /// Otherwise runs `executor`, caches the rows if `should_materialize`
    /// approves, and returns them. Hit/miss counters are updated either way.
    ///
    /// # Errors
    /// Propagates errors from `executor` and from JSON-serializing the rows.
    pub async fn get_or_materialize(
        &self,
        subquery: &SubqueryInfo,
        executor: impl Fn() -> FusekiResult<Vec<HashMap<String, serde_json::Value>>>,
    ) -> FusekiResult<Vec<HashMap<String, serde_json::Value>>> {
        let cache_key = self.compute_cache_key(subquery);
        if let Some(view) = self.get_cached_view(&cache_key).await? {
            self.update_cache_stats(true).await;
            return Ok(view.result_data);
        }
        self.update_cache_stats(false).await;
        let results = executor()?;
        if self.should_materialize(subquery, &results) {
            self.materialize_view(cache_key, subquery, results.clone())
                .await?;
        }
        Ok(results)
    }

    /// Cache key: hex MD5 digest of the subquery text.
    fn compute_cache_key(&self, subquery: &SubqueryInfo) -> String {
        format!("{:x}", md5::compute(&subquery.query_text))
    }

    /// Cache capacity in bytes (saturating for absurdly large settings).
    fn max_cache_bytes(&self) -> usize {
        self.policies.max_cache_size_mb.saturating_mul(1024 * 1024)
    }

    /// Looks up `key`. On a hit it refreshes `last_accessed` and
    /// `access_count`, so eviction really is least-recently-used. Invalid or
    /// expired entries are removed on sight instead of occupying space.
    /// This takes the write lock because a hit mutates the entry.
    async fn get_cached_view(&self, key: &str) -> FusekiResult<Option<MaterializedView>> {
        let mut views = self.materialized_views.write().await;
        let view = match views.get_mut(key) {
            Some(view) => view,
            None => return Ok(None),
        };
        if view.is_valid && self.is_within_ttl(view) {
            view.last_accessed = chrono::Utc::now();
            view.access_count += 1;
            return Ok(Some(view.clone()));
        }
        views.remove(key);
        self.record_cache_footprint(&views).await;
        Ok(None)
    }

    /// `true` while the view is younger than `policies.ttl_seconds`.
    fn is_within_ttl(&self, view: &MaterializedView) -> bool {
        let age = chrono::Utc::now() - view.created_at;
        age.num_seconds() < self.policies.ttl_seconds as i64
    }

    /// Caches only expensive subqueries (`estimated_cost` above the policy
    /// threshold) with fewer than 10 000 result rows.
    fn should_materialize(
        &self,
        subquery: &SubqueryInfo,
        results: &[HashMap<String, serde_json::Value>],
    ) -> bool {
        let estimated_cost = subquery.estimated_cost;
        let result_size = results.len();
        estimated_cost > self.policies.cost_threshold && result_size < 10000
    }

    /// Stores `results` under `key`, evicting older views if needed.
    ///
    /// # Errors
    /// Returns an error if the rows cannot be serialized to JSON.
    async fn materialize_view(
        &self,
        key: String,
        _subquery: &SubqueryInfo,
        results: Vec<HashMap<String, serde_json::Value>>,
    ) -> FusekiResult<()> {
        let size_bytes = serde_json::to_vec(&results)?.len();
        // A result set larger than the whole cache can never fit. Storing it
        // would flush every other entry and still exceed the cap.
        if size_bytes > self.max_cache_bytes() {
            return Ok(());
        }
        let now = chrono::Utc::now();
        let view = MaterializedView {
            query_hash: key.clone(),
            result_data: results,
            created_at: now,
            last_accessed: now,
            access_count: 1,
            size_bytes,
            is_valid: true,
        };
        let mut views = self.materialized_views.write().await;
        // Drop any stale entry under this key first, so its bytes are not
        // counted toward the space that must be freed.
        views.remove(&key);
        self.evict_if_needed(&mut views, size_bytes).await?;
        views.insert(key, view);
        self.record_cache_footprint(&views).await;
        Ok(())
    }

    /// Evicts least-recently-accessed views until `new_size` more bytes fit
    /// under the cap. It frees only the overflow; the previous code freed
    /// `new_size` bytes regardless of how much room was actually missing.
    async fn evict_if_needed(
        &self,
        views: &mut HashMap<String, MaterializedView>,
        new_size: usize,
    ) -> FusekiResult<()> {
        let max_size = self.max_cache_bytes();
        let current_size: usize = views.values().map(|v| v.size_bytes).sum();
        let required = current_size + new_size;
        if required <= max_size {
            return Ok(());
        }
        let mut to_free = required - max_size;
        let mut entries: Vec<_> = views
            .iter()
            .map(|(k, v)| (k.clone(), v.last_accessed))
            .collect();
        entries.sort_by_key(|(_, last_accessed)| *last_accessed);
        let mut evicted = 0u64;
        for (key, _) in entries {
            if to_free == 0 {
                break;
            }
            if let Some(view) = views.remove(&key) {
                to_free = to_free.saturating_sub(view.size_bytes);
                evicted += 1;
            }
        }
        if evicted > 0 {
            self.cache_stats.write().await.eviction_count += evicted;
        }
        Ok(())
    }

    /// Mirrors the current entry count and total size into `cache_stats`.
    /// Callers hold the views lock; the lock order is always views, then stats.
    async fn record_cache_footprint(&self, views: &HashMap<String, MaterializedView>) {
        let mut stats = self.cache_stats.write().await;
        stats.total_entries = views.len();
        stats.total_size_bytes = views.values().map(|v| v.size_bytes).sum();
    }

    /// Increments the hit or miss counter.
    async fn update_cache_stats(&self, hit: bool) {
        let mut stats = self.cache_stats.write().await;
        if hit {
            stats.hit_count += 1;
        } else {
            stats.miss_count += 1;
        }
    }
}
impl Default for ExecutionStrategySelector {
fn default() -> Self {
Self::new()
}
}
impl ExecutionStrategySelector {
pub fn new() -> Self {
Self {
strategies: Self::create_available_strategies(),
selection_algorithm: SelectionAlgorithm::CostBased,
}
}
fn create_available_strategies() -> Vec<ExecutionStrategy> {
vec![
ExecutionStrategy::MaterializeOnce,
ExecutionStrategy::CorrelatedExecution,
ExecutionStrategy::JoinConversion,
ExecutionStrategy::SemiJoin,
ExecutionStrategy::AntiJoin,
ExecutionStrategy::ParallelExecution { max_threads: 4 },
ExecutionStrategy::StreamingExecution,
]
}
pub fn select_strategy(&self, subquery: &SubqueryInfo) -> FusekiResult<ExecutionStrategy> {
match &self.selection_algorithm {
SelectionAlgorithm::CostBased => self.select_cost_based(subquery),
SelectionAlgorithm::RuleBased => self.select_rule_based(subquery),
_ => self.select_cost_based(subquery),
}
}
fn select_cost_based(&self, subquery: &SubqueryInfo) -> FusekiResult<ExecutionStrategy> {
match subquery.subquery_type {
SubqueryType::Exists => Ok(ExecutionStrategy::SemiJoin),
SubqueryType::NotExists => Ok(ExecutionStrategy::AntiJoin),
SubqueryType::Scalar if subquery.is_correlated => {
Ok(ExecutionStrategy::CorrelatedExecution)
}
SubqueryType::Scalar => Ok(ExecutionStrategy::MaterializeOnce),
SubqueryType::In => Ok(ExecutionStrategy::JoinConversion),
_ => Ok(ExecutionStrategy::MaterializeOnce),
}
}
fn select_rule_based(&self, subquery: &SubqueryInfo) -> FusekiResult<ExecutionStrategy> {
if subquery.is_correlated && subquery.outer_cardinality > 1000 {
Ok(ExecutionStrategy::MaterializeOnce)
} else if subquery.is_independent() && subquery.estimated_size > 10000 {
Ok(ExecutionStrategy::StreamingExecution)
} else {
self.select_cost_based(subquery)
}
}
}
impl Default for PatternMatcher {
fn default() -> Self {
Self::new()
}
}
impl PatternMatcher {
pub fn new() -> Self {
Self {
compiled_patterns: HashMap::new(),
pattern_index: PatternIndex {
by_type: HashMap::new(),
by_keyword: HashMap::new(),
},
}
}
}
impl Default for CardinalityEstimator {
fn default() -> Self {
Self::new()
}
}
impl CardinalityEstimator {
pub fn new() -> Self {
Self {
predicate_stats: HashMap::new(),
join_selectivities: HashMap::new(),
}
}
}
impl Default for SelectivityEstimator {
fn default() -> Self {
Self::new()
}
}
impl SelectivityEstimator {
pub fn new() -> Self {
Self {
filter_selectivities: HashMap::new(),
default_selectivity: 0.3,
}
}
}
impl Default for HistoricalStats {
fn default() -> Self {
Self::new()
}
}
impl HistoricalStats {
pub fn new() -> Self {
Self {
execution_times: HashMap::new(),
result_sizes: HashMap::new(),
optimization_success: HashMap::new(),
}
}
}
/// A subquery detected in a larger query, with the heuristic estimates used
/// for rule selection and costing.
#[derive(Debug, Clone)]
pub struct SubqueryInfo {
    pub id: String,
    /// The subquery text as extracted from the outer query.
    pub query_text: String,
    pub subquery_type: SubqueryType,
    pub is_correlated: bool,
    /// Variables found in the subquery text.
    pub outer_vars: Vec<String>,
    /// Estimated number of triples scanned.
    pub estimated_size: usize,
    pub estimated_selectivity: f64,
    pub estimated_cost: f64,
    pub filter_count: usize,
    pub join_count: usize,
    /// Estimated number of rows of the enclosing query.
    pub outer_cardinality: usize,
    /// Ids of subqueries this one depends on.
    pub dependencies: Vec<String>,
}
impl SubqueryInfo {
    /// Uncorrelated, filter-free, with at most one join.
    pub fn is_simple_projection(&self) -> bool {
        if self.is_correlated || self.filter_count != 0 {
            return false;
        }
        self.join_count < 2
    }
    /// Uncorrelated and without dependencies on other subqueries.
    pub fn is_independent(&self) -> bool {
        let has_dependencies = !self.dependencies.is_empty();
        !(self.is_correlated || has_dependencies)
    }
}
/// Syntactic kind of a subquery.
#[derive(Debug, Clone, PartialEq)]
pub enum SubqueryType {
    Exists,
    NotExists,
    Scalar,
    From,
    In,
    NotIn,
    Lateral,
    With,
}
/// Result of `AdvancedSubqueryOptimizer::optimize`.
#[derive(Debug, Clone)]
pub struct OptimizedQuery {
    /// The input query, unchanged.
    pub original_query: String,
    /// The query after the selected rewrites were applied.
    pub optimized_query: String,
    pub execution_plan: ExecutionPlan,
    /// Rules selected for the detected subqueries.
    pub rewrites_applied: Vec<SubqueryRewriteRule>,
    /// Currently a fixed placeholder value (0.3).
    pub estimated_cost_reduction: f64,
    /// Wall-clock time spent in `optimize`, in milliseconds.
    pub optimization_time_ms: f64,
}
/// Per-subquery execution plan.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    pub query: String,
    /// One step per detected subquery, in detection order.
    pub steps: Vec<ExecutionStep>,
    /// Sum of the steps' `estimated_cost`.
    pub total_estimated_cost: f64,
    /// Currently always empty (grouping is not implemented yet).
    pub parallelization_opportunities: Vec<ParallelGroup>,
}
/// Plan entry for a single subquery.
#[derive(Debug, Clone)]
pub struct ExecutionStep {
    /// Matches `SubqueryInfo::id`.
    pub subquery_id: String,
    pub strategy: ExecutionStrategy,
    pub estimated_cost: f64,
    pub dependencies: Vec<String>,
    /// Set from `SubqueryInfo::is_independent`.
    pub parallelizable: bool,
}
/// A set of subqueries that may run concurrently.
#[derive(Debug, Clone)]
pub struct ParallelGroup {
    pub group_id: String,
    pub subquery_ids: Vec<String>,
    pub max_parallelism: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an uncorrelated fixture with no filters, joins or dependencies.
    fn fixture(
        query_text: &str,
        subquery_type: SubqueryType,
        estimated_size: usize,
        estimated_selectivity: f64,
        estimated_cost: f64,
    ) -> SubqueryInfo {
        SubqueryInfo {
            id: "sq1".to_string(),
            query_text: query_text.to_string(),
            subquery_type,
            is_correlated: false,
            outer_vars: Vec::new(),
            estimated_size,
            estimated_selectivity,
            estimated_cost,
            filter_count: 0,
            join_count: 0,
            outer_cardinality: 1,
            dependencies: Vec::new(),
        }
    }

    #[test]
    fn test_subquery_optimizer_creation() {
        let optimizer = AdvancedSubqueryOptimizer::new();
        let rule_count = optimizer.rewrite_engine.rules.len();
        assert!(rule_count > 0, "default optimizer should ship with rules");
    }

    #[test]
    fn test_pattern_matching() {
        let engine = SubqueryRewriteEngine::new();
        let subquery = fixture("?s ?p ?o", SubqueryType::Exists, 50, 0.2, 100.0);
        let chosen = engine
            .find_applicable_rewrite(&subquery)
            .unwrap()
            .expect("an EXISTS rule should apply");
        assert_eq!(chosen.id, "exists_to_semi_join");
    }

    #[tokio::test]
    async fn test_materialization_manager() {
        let manager = MaterializationManager::new();
        let subquery = fixture(
            "SELECT * WHERE { ?s ?p ?o }",
            SubqueryType::Scalar,
            100,
            0.5,
            200.0,
        );
        let row: HashMap<String, serde_json::Value> =
            [("s", "subject"), ("p", "predicate"), ("o", "object")]
                .iter()
                .map(|&(key, value)| (key.to_string(), serde_json::json!(value)))
                .collect();
        let rows = manager
            .get_or_materialize(&subquery, || Ok(vec![row.clone()]))
            .await
            .unwrap();
        assert_eq!(rows.len(), 1);
    }
}