use crate::algebra::{Algebra, Expression, TriplePattern, Variable};
use crate::query_analysis::{QueryAnalysis, QueryAnalyzer};
use crate::statistics_collector::StatisticsCollector;
use anyhow::{anyhow, Result};
use std::collections::{HashMap, HashSet};
/// Feature switches and tuning knobs for [`AlgebraGenerator`].
#[derive(Debug, Clone)]
pub struct AlgebraGenerationConfig {
    /// When set, `generate_from_bgp` replaces a multi-pattern BGP with a
    /// cost-based (greedy) join tree.
    pub enable_join_reordering: bool,
    /// When set, filters are pushed below joins whose side binds all of the
    /// filter's variables; otherwise every filter wraps the whole plan.
    pub enable_filter_pushdown: bool,
    /// Selects the "pushdown" projection path in `generate_from_bgp`.
    pub enable_projection_pushdown: bool,
    /// NOTE(review): not read by the generator code in this file.
    pub max_join_reorder_attempts: usize,
    /// NOTE(review): not read by the generator code in this file.
    pub reorder_cost_threshold: f64,
}

impl Default for AlgebraGenerationConfig {
    /// Turns every rewrite on, with a 1000-attempt reorder budget and a 0.1
    /// reorder cost threshold.
    fn default() -> Self {
        let rewrites_on = true;
        AlgebraGenerationConfig {
            max_join_reorder_attempts: 1000,
            reorder_cost_threshold: 0.1,
            enable_join_reordering: rewrites_on,
            enable_filter_pushdown: rewrites_on,
            enable_projection_pushdown: rewrites_on,
        }
    }
}
/// Join-tree construction strategies accepted by
/// `AlgebraGenerator::generate_join_order`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinOrderingStrategy {
    /// Joins patterns in input order, growing on the left:
    /// `Join(Join(p0, p1), p2) ...`.
    LeftDeep,
    /// Joins patterns in input order, nesting to the right:
    /// `Join(p0, Join(p1, p2)) ...`.
    RightDeep,
    /// Recursively splits the pattern list in half and joins the halves.
    Bushy,
    /// Chooses one of the other strategies from pattern count, join-variable
    /// count and estimated selectivity.
    Adaptive,
    /// Repeatedly joins the pair of partial plans with the lowest estimated cost.
    Greedy,
    /// Exhaustive subset dynamic programming (up to 10 patterns, greedy
    /// beyond that).
    DynamicProgramming,
}
/// Heuristic cost estimate for a (partial) query plan.
#[derive(Debug, Clone)]
pub struct JoinCost {
    /// Estimated number of result rows.
    pub cardinality: usize,
    /// Estimated CPU cost.
    pub cpu_cost: f64,
    /// Estimated I/O cost.
    pub io_cost: f64,
    /// Estimated memory cost.
    pub memory_cost: f64,
    /// Value used to compare plans: `cpu_cost + io_cost + memory_cost * 0.1`.
    pub total_cost: f64,
}

impl JoinCost {
    /// Builds a cost and derives `total_cost` as
    /// `cpu_cost + io_cost + memory_cost * 0.1`.
    pub fn new(cardinality: usize, cpu_cost: f64, io_cost: f64, memory_cost: f64) -> Self {
        // Memory is weighted lower than CPU and I/O in the combined cost.
        let memory_weight = 0.1;
        let weighted_memory = memory_cost * memory_weight;
        Self {
            total_cost: cpu_cost + io_cost + weighted_memory,
            cardinality,
            cpu_cost,
            io_cost,
            memory_cost,
        }
    }

    /// A sentinel cost: every float component is `f64::INFINITY` and the
    /// cardinality is `usize::MAX`, so any finite cost compares lower.
    pub fn infinite() -> Self {
        let unbounded = f64::INFINITY;
        Self {
            cardinality: usize::MAX,
            cpu_cost: unbounded,
            io_cost: unbounded,
            memory_cost: unbounded,
            total_cost: unbounded,
        }
    }
}
/// A partial join plan considered during greedy or dynamic-programming join
/// ordering.
#[derive(Debug, Clone)]
pub struct JoinCandidate {
    /// The algebra subtree this candidate stands for.
    pub algebra: Algebra,
    /// Estimated cost of evaluating `algebra`.
    pub cost: JoinCost,
    /// Variables occurring in the triple patterns under `algebra`; used to
    /// count shared variables when two candidates are joined.
    pub available_variables: HashSet<Variable>,
}
/// Builds query algebra trees from triple patterns, choosing join orders and
/// placing filters/projections according to an [`AlgebraGenerationConfig`].
pub struct AlgebraGenerator {
    /// Feature switches consulted by `generate_from_bgp`.
    config: AlgebraGenerationConfig,
    /// Analyzer run once per filter during filter pushdown.
    analyzer: QueryAnalyzer,
    // Set by `with_statistics` but not yet read by any method in this file,
    // hence the `dead_code` allowance.
    #[allow(dead_code)]
    statistics: Option<StatisticsCollector>,
}
impl AlgebraGenerator {
    /// Creates a generator with the given configuration and no statistics.
    pub fn new(config: AlgebraGenerationConfig) -> Self {
        Self {
            config,
            analyzer: QueryAnalyzer::new(),
            statistics: None,
        }
    }

    /// Creates a generator that additionally holds a [`StatisticsCollector`].
    ///
    /// NOTE(review): the collector is stored, but no cost estimate in this
    /// impl reads it yet; all costs are fixed heuristics.
    pub fn with_statistics(
        config: AlgebraGenerationConfig,
        statistics: StatisticsCollector,
    ) -> Self {
        Self {
            config,
            analyzer: QueryAnalyzer::new(),
            statistics: Some(statistics),
        }
    }

    /// Builds an algebra tree for a basic graph pattern plus its filters and
    /// optional projection.
    ///
    /// Steps, each gated by [`AlgebraGenerationConfig`]:
    /// 1. join reordering (greedy) when there is more than one pattern;
    /// 2. filter placement (pushed below joins, or stacked on top);
    /// 3. projection wrapping.
    ///
    /// # Errors
    /// Fails if `patterns` is empty, or if cost estimation or the query
    /// analyzer (run during filter pushdown) returns an error.
    pub fn generate_from_bgp(
        &self,
        patterns: Vec<TriplePattern>,
        filters: Vec<Expression>,
        projection: Option<Vec<Variable>>,
    ) -> Result<Algebra> {
        if patterns.is_empty() {
            return Err(anyhow!("Cannot generate algebra from empty BGP"));
        }
        // Baseline plan is one BGP node holding every pattern; reordering
        // replaces it with an explicit join tree.
        let mut algebra = if self.config.enable_join_reordering && patterns.len() > 1 {
            self.reorder_joins(Algebra::Bgp(patterns.clone()), &patterns)?
        } else {
            Algebra::Bgp(patterns)
        };
        algebra = if self.config.enable_filter_pushdown {
            self.apply_filters_with_pushdown(algebra, filters)?
        } else {
            self.apply_filters(algebra, filters)?
        };
        if let Some(vars) = projection {
            algebra = if self.config.enable_projection_pushdown {
                self.apply_projection_with_pushdown(algebra, vars)?
            } else {
                Algebra::Project {
                    variables: vars,
                    pattern: Box::new(algebra),
                }
            };
        }
        Ok(algebra)
    }

    /// Builds a join tree over `patterns` using the requested strategy.
    ///
    /// # Errors
    /// Fails if `patterns` is empty or if the chosen strategy fails.
    pub fn generate_join_order(
        &self,
        patterns: Vec<TriplePattern>,
        strategy: JoinOrderingStrategy,
    ) -> Result<Algebra> {
        match strategy {
            JoinOrderingStrategy::LeftDeep => self.generate_left_deep_joins(patterns),
            JoinOrderingStrategy::RightDeep => self.generate_right_deep_joins(patterns),
            JoinOrderingStrategy::Bushy => self.generate_bushy_joins(patterns),
            JoinOrderingStrategy::Adaptive => self.generate_adaptive_joins(patterns),
            JoinOrderingStrategy::Greedy => self.generate_greedy_joins(patterns),
            JoinOrderingStrategy::DynamicProgramming => self.generate_dp_joins(patterns),
        }
    }

    /// Left-deep tree in input order: `Join(Join(Join(p0, p1), p2), ...)`.
    ///
    /// # Errors
    /// Fails if `patterns` is empty.
    fn generate_left_deep_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        let mut remaining = patterns.into_iter();
        let first = remaining
            .next()
            .ok_or_else(|| anyhow!("No patterns to join"))?;
        Ok(remaining.fold(Algebra::Bgp(vec![first]), |tree, pattern| {
            Algebra::Join {
                left: Box::new(tree),
                right: Box::new(Algebra::Bgp(vec![pattern])),
            }
        }))
    }

    /// Right-deep tree in input order: `Join(p0, Join(p1, Join(p2, ...)))`.
    ///
    /// # Errors
    /// Fails if `patterns` is empty.
    fn generate_right_deep_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        // Build from the last pattern backwards so the first pattern ends up
        // as the outermost left child.
        let mut remaining = patterns.into_iter().rev();
        let last = remaining
            .next()
            .ok_or_else(|| anyhow!("No patterns to join"))?;
        Ok(remaining.fold(Algebra::Bgp(vec![last]), |tree, pattern| {
            Algebra::Join {
                left: Box::new(Algebra::Bgp(vec![pattern])),
                right: Box::new(tree),
            }
        }))
    }

    /// Balanced ("bushy") tree: recursively splits the pattern list at its
    /// midpoint and joins the two halves.
    ///
    /// # Errors
    /// Fails if `patterns` is empty.
    // `self` is only forwarded to the recursive calls.
    #[allow(clippy::only_used_in_recursion)]
    fn generate_bushy_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        if patterns.is_empty() {
            return Err(anyhow!("No patterns to join"));
        }
        if patterns.len() == 1 {
            return Ok(Algebra::Bgp(patterns));
        }
        // Left half is [0, mid), right half is [mid, len). Two patterns give
        // two single-pattern leaves.
        let mid = patterns.len() / 2;
        let mut left_patterns = patterns;
        let right_patterns = left_patterns.split_off(mid);
        let left_algebra = self.generate_bushy_joins(left_patterns)?;
        let right_algebra = self.generate_bushy_joins(right_patterns)?;
        Ok(Algebra::Join {
            left: Box::new(left_algebra),
            right: Box::new(right_algebra),
        })
    }

    /// Picks a strategy from simple pattern statistics, then delegates:
    /// at most 3 patterns -> left-deep; more join variables than half the
    /// pattern count -> bushy; summed selectivity below `0.1 * len` ->
    /// greedy; otherwise dynamic programming.
    fn generate_adaptive_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        let join_variables = self.count_join_variables(&patterns);
        let total_selectivity: f64 = self.estimate_pattern_selectivity(&patterns).iter().sum();
        let strategy = if patterns.len() <= 3 {
            JoinOrderingStrategy::LeftDeep
        } else if join_variables > patterns.len() / 2 {
            JoinOrderingStrategy::Bushy
        } else if total_selectivity < patterns.len() as f64 * 0.1 {
            JoinOrderingStrategy::Greedy
        } else {
            JoinOrderingStrategy::DynamicProgramming
        };
        self.generate_join_order(patterns, strategy)
    }

    /// Greedy join ordering: starting from one candidate per pattern,
    /// repeatedly merges the pair whose join has the lowest estimated total
    /// cost until one plan remains. Falls back to a left-deep tree if no pair
    /// yields a finite cost.
    ///
    /// # Errors
    /// Fails if `patterns` is empty or a leaf cost estimate fails.
    fn generate_greedy_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        if patterns.is_empty() {
            return Err(anyhow!("No patterns to join"));
        }
        let mut candidates = patterns
            .iter()
            .map(|pattern| self.leaf_candidate(pattern))
            .collect::<Result<Vec<_>>>()?;
        while candidates.len() > 1 {
            let mut best_join: Option<(usize, usize, JoinCandidate)> = None;
            let mut best_cost = f64::INFINITY;
            for i in 0..candidates.len() {
                for j in (i + 1)..candidates.len() {
                    if let Ok(join_candidate) =
                        self.create_join_candidate(&candidates[i], &candidates[j])
                    {
                        if join_candidate.cost.total_cost < best_cost {
                            best_cost = join_candidate.cost.total_cost;
                            best_join = Some((i, j, join_candidate));
                        }
                    }
                }
            }
            match best_join {
                Some((i, j, joined)) => {
                    // j > i by construction: remove the higher index first so
                    // the lower one stays valid.
                    candidates.remove(j);
                    candidates.remove(i);
                    candidates.push(joined);
                }
                None => return self.generate_left_deep_joins(patterns),
            }
        }
        Ok(candidates
            .pop()
            .expect("candidates validated to be non-empty")
            .algebra)
    }

    /// Exhaustive dynamic programming over pattern subsets (encoded as bits of
    /// a `u64`). For every subset it keeps the cheapest split into two
    /// disjoint, already-solved subsets. Inputs larger than 10 patterns fall
    /// back to greedy ordering.
    ///
    /// # Errors
    /// Fails if `patterns` is empty, a leaf cost estimate fails, or no plan
    /// for the full set was found.
    fn generate_dp_joins(&self, patterns: Vec<TriplePattern>) -> Result<Algebra> {
        // The table has 2^n entries; beyond this size use greedy instead.
        const MAX_DP_PATTERNS: usize = 10;
        if patterns.is_empty() {
            return Err(anyhow!("No patterns to join"));
        }
        if patterns.len() == 1 {
            return Ok(Algebra::Bgp(patterns));
        }
        if patterns.len() > MAX_DP_PATTERNS {
            return self.generate_greedy_joins(patterns);
        }
        let n = patterns.len();
        let mut dp: HashMap<u64, JoinCandidate> = HashMap::new();
        for (i, pattern) in patterns.iter().enumerate() {
            dp.insert(1u64 << i, self.leaf_candidate(pattern)?);
        }
        for size in 2..=n {
            for mask in 1u64..(1u64 << n) {
                if mask.count_ones() != size as u32 {
                    continue;
                }
                let mut best_candidate: Option<JoinCandidate> = None;
                let mut best_cost = f64::INFINITY;
                // Walk every non-empty submask of `mask`; requiring
                // `submask < complement` visits each unordered split once.
                let mut submask = mask;
                while submask > 0 {
                    let complement = mask ^ submask;
                    if complement > 0 && submask < complement {
                        if let (Some(left), Some(right)) = (dp.get(&submask), dp.get(&complement))
                        {
                            if let Ok(candidate) = self.create_join_candidate(left, right) {
                                if candidate.cost.total_cost < best_cost {
                                    best_cost = candidate.cost.total_cost;
                                    best_candidate = Some(candidate);
                                }
                            }
                        }
                    }
                    submask = (submask - 1) & mask;
                }
                if let Some(candidate) = best_candidate {
                    dp.insert(mask, candidate);
                }
            }
        }
        let full_mask = (1u64 << n) - 1;
        dp.remove(&full_mask)
            .map(|candidate| candidate.algebra)
            .ok_or_else(|| anyhow!("Failed to generate optimal join order"))
    }

    /// Wraps a single triple pattern as a leaf candidate with its estimated
    /// cost and variable set.
    fn leaf_candidate(&self, pattern: &TriplePattern) -> Result<JoinCandidate> {
        let algebra = Algebra::Bgp(vec![pattern.clone()]);
        let cost = self.estimate_cost(&algebra)?;
        let available_variables = self.extract_variables(&algebra);
        Ok(JoinCandidate {
            algebra,
            cost,
            available_variables,
        })
    }

    /// Joins two candidates. The cost depends on how many variables they
    /// share, and the result exposes the union of both variable sets.
    fn create_join_candidate(
        &self,
        left: &JoinCandidate,
        right: &JoinCandidate,
    ) -> Result<JoinCandidate> {
        let shared_count = left
            .available_variables
            .intersection(&right.available_variables)
            .count();
        let cost = self.estimate_join_cost(&left.cost, &right.cost, shared_count)?;
        let available_variables = left
            .available_variables
            .union(&right.available_variables)
            .cloned()
            .collect();
        Ok(JoinCandidate {
            algebra: Algebra::Join {
                left: Box::new(left.algebra.clone()),
                right: Box::new(right.algebra.clone()),
            },
            cost,
            available_variables,
        })
    }

    /// Reorders joins for `generate_from_bgp`. The incoming algebra is
    /// ignored; the plan is always rebuilt greedily from `patterns`.
    fn reorder_joins(&self, _algebra: Algebra, patterns: &[TriplePattern]) -> Result<Algebra> {
        self.generate_greedy_joins(patterns.to_vec())
    }

    /// Applies each filter in order, pushing it as deep as possible.
    ///
    /// # Errors
    /// Propagates errors from the query analyzer, which runs before each
    /// filter is placed.
    fn apply_filters_with_pushdown(
        &self,
        mut algebra: Algebra,
        filters: Vec<Expression>,
    ) -> Result<Algebra> {
        for filter in filters {
            // NOTE(review): `push_filter_down` only threads the analysis
            // through recursion; the call still surfaces analyzer errors.
            let analysis = self.analyzer.analyze(&algebra)?;
            algebra = self.push_filter_down(algebra, filter, &analysis)?;
        }
        Ok(algebra)
    }

    /// Stacks each filter on top of the plan. The first filter ends up
    /// innermost.
    fn apply_filters(&self, algebra: Algebra, filters: Vec<Expression>) -> Result<Algebra> {
        Ok(filters
            .into_iter()
            .fold(algebra, |pattern, condition| Algebra::Filter {
                condition,
                pattern: Box::new(pattern),
            }))
    }

    /// Places `filter` inside a join tree.
    ///
    /// At a join, the filter descends into the left child if that child's
    /// variables cover every filter variable, otherwise into the right child
    /// if it covers them, otherwise it wraps the join. Any non-join node is
    /// wrapped directly. A filter with no collected variables satisfies
    /// `is_subset` trivially and therefore descends to the leftmost leaf.
    ///
    /// NOTE(review): `collect_filter_variables` ignores expression variants it
    /// does not match. If `Expression` has sub-pattern forms (e.g. EXISTS)
    /// whose variables matter, such filters could be pushed too deep. Confirm
    /// against the `Expression` definition.
    // `analysis` is currently only forwarded to the recursive calls.
    #[allow(clippy::only_used_in_recursion)]
    fn push_filter_down(
        &self,
        algebra: Algebra,
        filter: Expression,
        analysis: &QueryAnalysis,
    ) -> Result<Algebra> {
        let mut filter_vars = HashSet::new();
        self.collect_filter_variables(&filter, &mut filter_vars);
        match algebra {
            Algebra::Join { left, right } => {
                let left_vars = self.extract_variables(&left);
                let right_vars = self.extract_variables(&right);
                if filter_vars.is_subset(&left_vars) {
                    let new_left = self.push_filter_down(*left, filter, analysis)?;
                    Ok(Algebra::Join {
                        left: Box::new(new_left),
                        right,
                    })
                } else if filter_vars.is_subset(&right_vars) {
                    let new_right = self.push_filter_down(*right, filter, analysis)?;
                    Ok(Algebra::Join {
                        left,
                        right: Box::new(new_right),
                    })
                } else {
                    Ok(Algebra::Filter {
                        condition: filter,
                        pattern: Box::new(Algebra::Join { left, right }),
                    })
                }
            }
            other => Ok(Algebra::Filter {
                condition: filter,
                pattern: Box::new(other),
            }),
        }
    }

    /// Wraps the plan in a projection.
    ///
    /// NOTE(review): no actual pushdown is performed yet; this produces the
    /// same tree as the non-pushdown path.
    fn apply_projection_with_pushdown(
        &self,
        algebra: Algebra,
        variables: Vec<Variable>,
    ) -> Result<Algebra> {
        Ok(Algebra::Project {
            variables,
            pattern: Box::new(algebra),
        })
    }

    /// Heuristic cost of an algebra node.
    ///
    /// A BGP with `k` patterns costs cardinality `1000 * k`, CPU `k`, I/O `k`
    /// and memory `0.1 * k`. A join combines its children through
    /// `estimate_join_cost`, using the variables they actually share. Any
    /// other node gets a fixed cost.
    fn estimate_cost(&self, algebra: &Algebra) -> Result<JoinCost> {
        match algebra {
            Algebra::Bgp(patterns) => {
                let base_cost = patterns.len() as f64;
                Ok(JoinCost::new(
                    1000 * patterns.len(),
                    base_cost,
                    base_cost,
                    base_cost * 0.1,
                ))
            }
            Algebra::Join { left, right } => {
                let left_cost = self.estimate_cost(left)?;
                let right_cost = self.estimate_cost(right)?;
                // Count real shared variables, consistent with
                // `create_join_candidate`.
                let right_vars = self.extract_variables(right);
                let shared = self
                    .extract_variables(left)
                    .intersection(&right_vars)
                    .count();
                self.estimate_join_cost(&left_cost, &right_cost, shared)
            }
            _ => Ok(JoinCost::new(10000, 10.0, 10.0, 1.0)),
        }
    }

    /// Combines two child costs into a join cost.
    ///
    /// Each shared variable multiplies selectivity by 0.1; with no shared
    /// variables the join is treated as a cross product. The result
    /// cardinality is at least 1.
    fn estimate_join_cost(
        &self,
        left: &JoinCost,
        right: &JoinCost,
        shared_variables: usize,
    ) -> Result<JoinCost> {
        let selectivity = if shared_variables > 0 {
            0.1_f64.powi(shared_variables as i32)
        } else {
            1.0
        };
        // The float -> usize cast saturates, so large cross products produce
        // `usize::MAX`.
        let result_cardinality =
            ((left.cardinality as f64 * right.cardinality as f64 * selectivity) as usize).max(1);
        // Saturate rather than overflow when an input is already
        // `usize::MAX`; a plain `+` panics in debug builds.
        let input_cardinality = left.cardinality.saturating_add(right.cardinality) as f64;
        let cpu_cost = left.cpu_cost + right.cpu_cost + input_cardinality * 0.001;
        let io_cost = left.io_cost + right.io_cost;
        let memory_cost = input_cardinality * 0.0001;
        Ok(JoinCost::new(
            result_cardinality,
            cpu_cost,
            io_cost,
            memory_cost,
        ))
    }

    /// Number of distinct variables that occur in two or more patterns.
    fn count_join_variables(&self, patterns: &[TriplePattern]) -> usize {
        let mut seen: HashSet<Variable> = HashSet::new();
        let mut shared: HashSet<Variable> = HashSet::new();
        for var in patterns
            .iter()
            .flat_map(|pattern| self.extract_pattern_variables(pattern))
        {
            // `insert` returns false if an earlier pattern already had it.
            if !seen.insert(var.clone()) {
                shared.insert(var);
            }
        }
        shared.len()
    }

    /// Per-pattern selectivity guess based on how many distinct variables the
    /// pattern has: fewer variables means more bound terms and a smaller
    /// value.
    fn estimate_pattern_selectivity(&self, patterns: &[TriplePattern]) -> Vec<f64> {
        patterns
            .iter()
            .map(|pattern| match self.extract_pattern_variables(pattern).len() {
                0 => 0.001,
                1 => 0.01,
                2 => 0.1,
                _ => 0.5,
            })
            .collect()
    }

    /// Variables occurring in the triple patterns under `algebra`.
    fn extract_variables(&self, algebra: &Algebra) -> HashSet<Variable> {
        let mut variables = HashSet::new();
        self.extract_variables_recursive(algebra, &mut variables);
        variables
    }

    /// Adds variables from BGP patterns reachable through joins and filters.
    fn extract_variables_recursive(&self, algebra: &Algebra, variables: &mut HashSet<Variable>) {
        match algebra {
            Algebra::Bgp(patterns) => {
                for pattern in patterns {
                    variables.extend(self.extract_pattern_variables(pattern));
                }
            }
            Algebra::Join { left, right } => {
                self.extract_variables_recursive(left, variables);
                self.extract_variables_recursive(right, variables);
            }
            // A filter binds nothing itself but exposes its input's variables.
            // Without this arm, a filter already pushed below a join would hide
            // them and block later pushdowns.
            Algebra::Filter { pattern, .. } => {
                self.extract_variables_recursive(pattern, variables);
            }
            // NOTE(review): other operators currently contribute no variables.
            _ => {}
        }
    }

    /// Distinct variables in a triple pattern's subject, predicate and object.
    fn extract_pattern_variables(&self, pattern: &TriplePattern) -> HashSet<Variable> {
        let mut variables = HashSet::new();
        if let crate::algebra::Term::Variable(var) = &pattern.subject {
            variables.insert(var.clone());
        }
        if let crate::algebra::Term::Variable(var) = &pattern.predicate {
            variables.insert(var.clone());
        }
        if let crate::algebra::Term::Variable(var) = &pattern.object {
            variables.insert(var.clone());
        }
        variables
    }

    /// Collects the variables referenced by a filter expression.
    ///
    /// NOTE(review): variants not matched below (the `_` arm) contribute no
    /// variables. Confirm this is right for every `Expression` variant.
    // `self` is only forwarded to the recursive calls.
    #[allow(clippy::only_used_in_recursion)]
    fn collect_filter_variables(&self, expression: &Expression, variables: &mut HashSet<Variable>) {
        match expression {
            Expression::Variable(var) => {
                variables.insert(var.clone());
            }
            Expression::Binary { left, right, .. } => {
                self.collect_filter_variables(left, variables);
                self.collect_filter_variables(right, variables);
            }
            Expression::Unary { operand, .. } => {
                self.collect_filter_variables(operand, variables);
            }
            Expression::Function { args, .. } => {
                for arg in args {
                    self.collect_filter_variables(arg, variables);
                }
            }
            Expression::Conditional {
                condition,
                then_expr: if_true,
                else_expr: if_false,
            } => {
                self.collect_filter_variables(condition, variables);
                self.collect_filter_variables(if_true, variables);
                self.collect_filter_variables(if_false, variables);
            }
            Expression::Bound(var) => {
                variables.insert(var.clone());
            }
            _ => {}
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::algebra::Term;
    use oxirs_core::model::NamedNode;

    /// Variable term; panics on an invalid name (test-only).
    fn var(name: &str) -> Term {
        Term::Variable(Variable::new(name).unwrap())
    }

    /// IRI term built without validation.
    fn iri(value: &str) -> Term {
        Term::Iri(NamedNode::new_unchecked(value))
    }

    /// Pattern of the shape `?subject <predicate> ?object`.
    fn var_iri_var(subject: &str, predicate: &str, object: &str) -> TriplePattern {
        TriplePattern {
            subject: var(subject),
            predicate: iri(predicate),
            object: var(object),
        }
    }

    fn default_generator() -> AlgebraGenerator {
        AlgebraGenerator::new(AlgebraGenerationConfig::default())
    }

    /// True when `algebra` is a BGP node holding exactly `len` patterns.
    fn is_bgp_of_len(algebra: &Algebra, len: usize) -> bool {
        matches!(algebra, Algebra::Bgp(patterns) if patterns.len() == len)
    }

    /// True for any join node or any non-empty BGP node.
    fn is_join_or_nonempty_bgp(algebra: &Algebra) -> bool {
        match algebra {
            Algebra::Join { .. } => true,
            Algebra::Bgp(patterns) => !patterns.is_empty(),
            _ => false,
        }
    }

    #[test]
    fn test_left_deep_join_generation() {
        let patterns = vec![
            var_iri_var("s", "http://example.org/p1", "o1"),
            var_iri_var("s", "http://example.org/p2", "o2"),
        ];
        let algebra = default_generator()
            .generate_left_deep_joins(patterns)
            .unwrap();
        match algebra {
            Algebra::Join { left, right } => {
                assert!(is_bgp_of_len(&left, 1));
                assert!(is_bgp_of_len(&right, 1));
            }
            _ => panic!("Expected join algebra"),
        }
    }

    #[test]
    fn test_bushy_join_generation() {
        let patterns: Vec<TriplePattern> = (1..=4)
            .map(|i| {
                var_iri_var(
                    &format!("s{}", i),
                    &format!("http://example.org/p{}", i),
                    &format!("o{}", i),
                )
            })
            .collect();
        let algebra = default_generator().generate_bushy_joins(patterns).unwrap();
        match algebra {
            Algebra::Join { left, right } => {
                assert!(is_join_or_nonempty_bgp(&left));
                assert!(is_join_or_nonempty_bgp(&right));
            }
            _ => panic!("Expected join algebra"),
        }
    }

    #[test]
    fn test_variable_extraction() {
        let pattern = var_iri_var("s", "http://example.org/predicate", "o");
        let variables = default_generator().extract_pattern_variables(&pattern);
        assert_eq!(variables.len(), 2);
        for name in &["s", "o"] {
            assert!(variables.contains(&Variable::new(*name).unwrap()));
        }
    }

    #[test]
    fn test_cost_estimation() {
        let algebra = Algebra::Bgp(vec![var_iri_var(
            "s",
            "http://example.org/predicate",
            "o",
        )]);
        let cost = default_generator().estimate_cost(&algebra).unwrap();
        assert!(cost.total_cost > 0.0);
        assert!(cost.cardinality > 0);
    }
}