use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info, span, Level};
use crate::model::{StarTerm, StarTriple};
use crate::store::StarStore;
use crate::StarResult;
/// An optimized query plan: a flat list of operations together with the
/// optimizer's estimates for it.
///
/// Operations refer to their inputs by index into `operations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Plan steps; `input`/`left`/`right` fields of an operation index into this vector.
    operations: Vec<QueryOperation>,
    /// Estimated total execution cost (set via `set_estimated_cost`).
    estimated_cost: f64,
    /// Estimated number of result rows (set via `set_estimated_cardinality`).
    estimated_cardinality: usize,
    /// Copy of the optimizer statistics the plan was built from.
    statistics: QueryStatistics,
    /// Index chosen for plan operations.
    /// NOTE(review): presumably keyed by operation index; not populated by any code visible here.
    index_selections: HashMap<usize, IndexChoice>,
}
/// A single step of a [`QueryPlan`].
///
/// The `input`, `left` and `right` fields are indices into the owning plan's
/// operation list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryOperation {
    /// Scan of the triples matching `pattern`.
    TripleScan {
        /// Pattern the scanned triples must match.
        pattern: TriplePattern,
        /// Estimated fraction of all triples that match `pattern`.
        selectivity: f64,
        /// Estimated number of matching triples.
        cardinality: usize,
    },
    /// Join of the outputs of two operations.
    Join {
        /// Index of the left input operation.
        left: usize,
        /// Index of the right input operation.
        right: usize,
        /// Variables the two inputs are joined on.
        join_vars: Vec<String>,
        /// Physical join algorithm.
        strategy: JoinStrategy,
        /// Cost value recorded for this join.
        cost: f64,
    },
    /// Keeps only the rows of `input` that satisfy `expression`.
    Filter {
        /// Index of the input operation.
        input: usize,
        /// Condition rows must satisfy.
        expression: FilterExpression,
        /// Estimated fraction of rows that pass the filter.
        selectivity: f64,
    },
    /// Restricts the rows of `input` to `variables`.
    Project {
        /// Index of the input operation.
        input: usize,
        /// Variables kept in the output.
        variables: Vec<String>,
    },
    /// Duplicate elimination over the rows of `input`.
    Distinct {
        /// Index of the input operation.
        input: usize,
    },
    /// Sorts the rows of `input`.
    OrderBy {
        /// Index of the input operation.
        input: usize,
        /// Sort keys (presumably in priority order — confirm with the executor).
        order_specs: Vec<OrderSpec>,
    },
    /// Keeps at most `limit` rows of `input`.
    Limit {
        /// Index of the input operation.
        input: usize,
        /// Maximum number of rows.
        limit: usize,
    },
}
/// A triple pattern whose positions are variables, constants or nested
/// quoted patterns.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct TriplePattern {
    /// Subject position.
    pub subject: PatternTerm,
    /// Predicate position.
    pub predicate: PatternTerm,
    /// Object position.
    pub object: PatternTerm,
    /// Marks the pattern as quoted.
    /// NOTE(review): not read by the optimizer code visible here — confirm semantics with callers.
    pub is_quoted: bool,
}
/// One position of a [`TriplePattern`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum PatternTerm {
    /// A variable, identified by name.
    Variable(String),
    /// A fixed term, stored as its string form.
    Constant(String),
    /// A nested quoted triple pattern.
    QuotedPattern(Box<TriplePattern>),
}
/// Physical algorithm used to execute a [`QueryOperation::Join`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum JoinStrategy {
    /// Compare every left row with every right row.
    NestedLoop,
    /// Build a hash table on one input and probe it with the other.
    Hash,
    /// Merge inputs that are sorted on the join variables.
    Merge,
    /// For each left row, look up matching right rows through an index.
    IndexNestedLoop,
}
/// Boolean condition used by a [`QueryOperation::Filter`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FilterExpression {
    /// Operand greater than the number (the `String` is presumably a variable name — confirm).
    GreaterThan(String, f64),
    /// Operand less than the number (the `String` is presumably a variable name — confirm).
    LessThan(String, f64),
    /// Operand equal to the string value (presumably variable name, then literal — confirm).
    Equals(String, String),
    /// Both sub-expressions hold.
    And(Box<FilterExpression>, Box<FilterExpression>),
    /// At least one sub-expression holds.
    Or(Box<FilterExpression>, Box<FilterExpression>),
    /// The sub-expression does not hold.
    Not(Box<FilterExpression>),
}
/// One sort key of a [`QueryOperation::OrderBy`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderSpec {
    /// Variable to sort by.
    pub variable: String,
    /// `true` for ascending order, `false` for descending.
    pub ascending: bool,
}
/// Triple index used to evaluate a pattern.
///
/// The variant name gives the index key order: `SPO` leads with the subject,
/// `POS` with the predicate and `OSP` with the object. `FullScan` means no
/// index is used.
///
/// This is a small fieldless enum, so (like [`JoinStrategy`]) it is `Copy`
/// and can also be used as a hash key.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum IndexChoice {
    /// Subject-predicate-object index.
    SPO,
    /// Predicate-object-subject index.
    POS,
    /// Object-subject-predicate index.
    OSP,
    /// No index; scan all triples.
    FullScan,
}
/// Store-wide statistics used for selectivity and cost estimation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueryStatistics {
    /// Number of triples returned by an unconstrained store query.
    pub total_triples: usize,
    /// Number of triples whose subject or object is a quoted triple.
    pub quoted_triples: usize,
    /// Number of distinct subject terms.
    pub distinct_subjects: usize,
    /// Number of distinct predicate terms.
    pub distinct_predicates: usize,
    /// Number of distinct object terms.
    pub distinct_objects: usize,
    /// Mean quoted-triple nesting depth per triple (0 for a triple without quoted terms).
    pub avg_nesting_depth: f64,
}
impl QueryPlan {
pub fn new() -> Self {
Self {
operations: Vec::new(),
estimated_cost: 0.0,
estimated_cardinality: 0,
statistics: QueryStatistics::default(),
index_selections: HashMap::new(),
}
}
pub fn estimated_cost(&self) -> f64 {
self.estimated_cost
}
pub fn estimated_cardinality(&self) -> usize {
self.estimated_cardinality
}
pub fn operations(&self) -> &[QueryOperation] {
&self.operations
}
pub fn add_operation(&mut self, operation: QueryOperation) {
self.operations.push(operation);
}
pub fn set_estimated_cost(&mut self, cost: f64) {
self.estimated_cost = cost;
}
pub fn set_estimated_cardinality(&mut self, cardinality: usize) {
self.estimated_cardinality = cardinality;
}
}
impl Default for QueryPlan {
fn default() -> Self {
Self::new()
}
}
/// Cost-based optimizer that builds [`QueryPlan`]s from store statistics and
/// caches them by query text.
///
/// Derives `Debug` (every field type already implements it), so the
/// optimizer can be logged and inspected like the other public types here.
#[derive(Debug)]
pub struct QueryOptimizer {
    /// Statistics used for selectivity and cost estimation.
    statistics: QueryStatistics,
    /// Previously generated plans, keyed by query text.
    plan_cache: HashMap<String, QueryPlan>,
    /// Optimizer settings.
    config: OptimizerConfig,
}
/// Settings for a [`QueryOptimizer`].
#[derive(Debug, Clone)]
pub struct OptimizerConfig {
    /// Enables join reordering.
    /// NOTE(review): not consulted by any optimizer code visible here.
    pub enable_join_reordering: bool,
    /// Enables filter push-down.
    /// NOTE(review): not consulted by any optimizer code visible here.
    pub enable_filter_pushdown: bool,
    /// When `false`, index selection always answers `IndexChoice::FullScan`.
    pub enable_index_selection: bool,
    /// Caches generated plans keyed by the query text.
    pub enable_plan_caching: bool,
    /// Maximum number of cached plans; once reached, new plans are not cached.
    pub max_cached_plans: usize,
    /// Presumably the input size at which a hash join is preferred.
    /// NOTE(review): not consulted by any optimizer code visible here.
    pub hash_join_threshold: usize,
}
impl Default for OptimizerConfig {
fn default() -> Self {
Self {
enable_join_reordering: true,
enable_filter_pushdown: true,
enable_index_selection: true,
enable_plan_caching: true,
max_cached_plans: 1000,
hash_join_threshold: 1000,
}
}
}
impl QueryOptimizer {
    /// Cardinality assumed for an operation input that cannot be resolved:
    /// an out-of-range index, or one that does not precede its consumer.
    const DEFAULT_CARDINALITY: usize = 1000;

    /// Creates an optimizer with default statistics, an empty plan cache and
    /// [`OptimizerConfig::default`] settings.
    pub fn new() -> Self {
        Self::with_config(OptimizerConfig::default())
    }

    /// Creates an optimizer with default statistics and an empty plan cache,
    /// using `config`.
    pub fn with_config(config: OptimizerConfig) -> Self {
        Self {
            statistics: QueryStatistics::default(),
            plan_cache: HashMap::new(),
            config,
        }
    }

    /// Recomputes the optimizer statistics from the triples returned by
    /// `store.query(None, None, None)`.
    ///
    /// All cached plans are then discarded: they carry cost and cardinality
    /// estimates derived from the previous statistics and would otherwise be
    /// served stale by [`QueryOptimizer::optimize_query`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the store query. In that case the previous
    /// statistics and the plan cache are left untouched.
    pub fn update_statistics(&mut self, store: &StarStore) -> StarResult<()> {
        let span = span!(Level::INFO, "update_optimizer_statistics");
        let _enter = span.enter();
        let all_triples = store.query(None, None, None)?;

        self.statistics.total_triples = all_triples.len();
        self.statistics.quoted_triples = all_triples
            .iter()
            .filter(|t| {
                matches!(t.subject, StarTerm::QuotedTriple(_))
                    || matches!(t.object, StarTerm::QuotedTriple(_))
            })
            .count();

        // Terms are deduplicated by their `Debug` rendering.
        // NOTE(review): if `StarTerm: Hash + Eq`, a `HashSet<&StarTerm>` would
        // avoid allocating a `String` per term.
        let mut subjects = HashSet::new();
        let mut predicates = HashSet::new();
        let mut objects = HashSet::new();
        for triple in &all_triples {
            subjects.insert(format!("{:?}", triple.subject));
            predicates.insert(format!("{:?}", triple.predicate));
            objects.insert(format!("{:?}", triple.object));
        }
        self.statistics.distinct_subjects = subjects.len();
        self.statistics.distinct_predicates = predicates.len();
        self.statistics.distinct_objects = objects.len();

        let total_depth: usize = all_triples
            .iter()
            .map(|t| self.calculate_nesting_depth(t))
            .sum();
        self.statistics.avg_nesting_depth = if all_triples.is_empty() {
            0.0
        } else {
            total_depth as f64 / all_triples.len() as f64
        };

        // Cached plans embed estimates computed from the old statistics.
        self.plan_cache.clear();

        info!(
            "Updated optimizer statistics: {} total triples, {} quoted triples",
            self.statistics.total_triples, self.statistics.quoted_triples
        );
        Ok(())
    }

    /// Quoted-triple nesting depth of `triple`: the deeper of its subject and
    /// object (0 when neither is a quoted triple).
    fn calculate_nesting_depth(&self, triple: &StarTriple) -> usize {
        let subject_depth = self.term_depth(&triple.subject);
        let object_depth = self.term_depth(&triple.object);
        subject_depth.max(object_depth)
    }

    /// Nesting depth contributed by one term: 1 plus the inner depth for a
    /// quoted triple, 0 otherwise.
    fn term_depth(&self, term: &StarTerm) -> usize {
        match term {
            StarTerm::QuotedTriple(inner) => 1 + self.calculate_nesting_depth(inner),
            _ => 0,
        }
    }

    /// Returns a plan for `query`, serving it from the plan cache when
    /// caching is enabled and a plan for the same text exists.
    ///
    /// NOTE(review): the query text is currently only used as the cache key;
    /// the generated plan is always a single all-variable `?s ?p ?o` scan.
    ///
    /// # Errors
    ///
    /// The current implementation never returns an error; the `Result`
    /// belongs to the public signature.
    pub fn optimize_query(&mut self, query: &str) -> StarResult<QueryPlan> {
        let span = span!(Level::INFO, "optimize_query");
        let _enter = span.enter();
        if self.config.enable_plan_caching {
            if let Some(cached_plan) = self.plan_cache.get(query) {
                debug!("Using cached query plan");
                return Ok(cached_plan.clone());
            }
        }

        let mut plan = QueryPlan::new();
        plan.statistics = self.statistics.clone();
        let pattern = TriplePattern {
            subject: PatternTerm::Variable("s".to_string()),
            predicate: PatternTerm::Variable("p".to_string()),
            object: PatternTerm::Variable("o".to_string()),
            is_quoted: false,
        };
        let selectivity = self.estimate_selectivity(&pattern);
        let cardinality = (self.statistics.total_triples as f64 * selectivity) as usize;
        plan.add_operation(QueryOperation::TripleScan {
            pattern,
            selectivity,
            cardinality,
        });

        let cost = self.estimate_plan_cost(&plan);
        plan.set_estimated_cost(cost);
        plan.set_estimated_cardinality(cardinality);

        // Once the cache is full, new plans are simply not cached (no eviction).
        if self.config.enable_plan_caching && self.plan_cache.len() < self.config.max_cached_plans
        {
            self.plan_cache.insert(query.to_string(), plan.clone());
        }
        info!("Generated query plan with estimated cost: {}", cost);
        Ok(plan)
    }

    /// Estimates the fraction of all triples matched by `pattern`, treating
    /// positions as independent:
    ///
    /// * a constant subject / predicate / object multiplies by `1 / distinct_*`;
    /// * a quoted-pattern subject or object multiplies by `quoted / total`;
    /// * variables (and a quoted-pattern predicate) leave it unchanged.
    ///
    /// The result is clamped to at least `0.0001`.
    fn estimate_selectivity(&self, pattern: &TriplePattern) -> f64 {
        let stats = &self.statistics;
        let quoted_ratio = stats.quoted_triples as f64 / stats.total_triples.max(1) as f64;
        let mut selectivity = 1.0;

        match &pattern.subject {
            PatternTerm::Constant(_) => {
                selectivity *= 1.0 / stats.distinct_subjects.max(1) as f64;
            }
            PatternTerm::QuotedPattern(_) => selectivity *= quoted_ratio,
            PatternTerm::Variable(_) => {}
        }
        if let PatternTerm::Constant(_) = &pattern.predicate {
            selectivity *= 1.0 / stats.distinct_predicates.max(1) as f64;
        }
        match &pattern.object {
            PatternTerm::Constant(_) => {
                selectivity *= 1.0 / stats.distinct_objects.max(1) as f64;
            }
            PatternTerm::QuotedPattern(_) => selectivity *= quoted_ratio,
            PatternTerm::Variable(_) => {}
        }

        selectivity.max(0.0001)
    }

    /// Estimates the total cost of `plan` as the sum of per-operation costs,
    /// where `n`, `l`, `r` are input cardinalities:
    ///
    /// * triple scan: its cardinality;
    /// * join: `l*r` (nested loop), `1.5(l+r)` (hash), `1.2(l+r)` (merge),
    ///   `l*log2(r)` (index nested loop);
    /// * filter: `n(1 + selectivity)`; project: `0.5n`;
    /// * distinct / order-by: `n*log2(n)`; limit: `1`.
    ///
    /// `log2` of an empty input is taken as `0`, so zero-row inputs cost
    /// nothing instead of producing `-inf` or `NaN` (`0 * -inf`).
    fn estimate_plan_cost(&self, plan: &QueryPlan) -> f64 {
        let cardinalities = Self::operation_cardinalities(&plan.operations);
        let rows = |index: usize| {
            cardinalities
                .get(index)
                .copied()
                .unwrap_or(Self::DEFAULT_CARDINALITY) as f64
        };

        plan.operations.iter().fold(0.0, |total, operation| {
            let cost = match operation {
                QueryOperation::TripleScan { cardinality, .. } => *cardinality as f64,
                QueryOperation::Join {
                    left,
                    right,
                    strategy,
                    ..
                } => {
                    let (l, r) = (rows(*left), rows(*right));
                    match strategy {
                        JoinStrategy::NestedLoop => l * r,
                        JoinStrategy::Hash => (l + r) * 1.5,
                        JoinStrategy::Merge => (l + r) * 1.2,
                        JoinStrategy::IndexNestedLoop => l * Self::log2_or_zero(r),
                    }
                }
                QueryOperation::Filter {
                    input, selectivity, ..
                } => rows(*input) * (1.0 + selectivity),
                QueryOperation::Project { input, .. } => rows(*input) * 0.5,
                QueryOperation::Distinct { input } | QueryOperation::OrderBy { input, .. } => {
                    let n = rows(*input);
                    n * Self::log2_or_zero(n)
                }
                QueryOperation::Limit { .. } => 1.0,
            };
            total + cost
        })
    }

    /// `log2(n)`, or `0` when `n <= 1` (in particular for an empty input,
    /// where `f64::log2` would return negative infinity).
    fn log2_or_zero(n: f64) -> f64 {
        if n <= 1.0 {
            0.0
        } else {
            n.log2()
        }
    }

    /// Estimates the output cardinality of every operation in `operations`
    /// in a single forward pass.
    ///
    /// An input reference is resolved only when it points at an *earlier*
    /// operation; any other reference (out of range or forward) falls back
    /// to [`Self::DEFAULT_CARDINALITY`]. This keeps the pass O(n) and makes
    /// it terminate even on malformed (cyclic) plans.
    ///
    /// Heuristics:
    /// * filter: input cardinality times selectivity;
    /// * project / distinct / order-by: input cardinality (an upper bound
    ///   for distinct);
    /// * limit: `min(input, limit)`;
    /// * join without shared variables: Cartesian product `l * r`
    ///   (saturating);
    /// * join on shared variables: `max(l, r)`, i.e. assumes each row of the
    ///   larger input matches about one row of the smaller one.
    fn operation_cardinalities(operations: &[QueryOperation]) -> Vec<usize> {
        let mut cardinalities: Vec<usize> = Vec::with_capacity(operations.len());
        for operation in operations {
            let card_of = |index: usize| {
                cardinalities
                    .get(index)
                    .copied()
                    .unwrap_or(Self::DEFAULT_CARDINALITY)
            };
            let estimate = match operation {
                QueryOperation::TripleScan { cardinality, .. } => *cardinality,
                QueryOperation::Join {
                    left,
                    right,
                    join_vars,
                    ..
                } => {
                    let (l, r) = (card_of(*left), card_of(*right));
                    if join_vars.is_empty() {
                        l.saturating_mul(r)
                    } else {
                        l.max(r)
                    }
                }
                QueryOperation::Filter {
                    input, selectivity, ..
                } => (card_of(*input) as f64 * selectivity) as usize,
                QueryOperation::Project { input, .. }
                | QueryOperation::Distinct { input }
                | QueryOperation::OrderBy { input, .. } => card_of(*input),
                QueryOperation::Limit { input, limit } => card_of(*input).min(*limit),
            };
            cardinalities.push(estimate);
        }
        cardinalities
    }

    /// Chooses an index for `pattern`: the first constant position among
    /// subject, predicate and object selects `SPO`, `POS` or `OSP`
    /// respectively. Returns `FullScan` when no position is constant or
    /// index selection is disabled in the config.
    pub fn select_index(&self, pattern: &TriplePattern) -> IndexChoice {
        if !self.config.enable_index_selection {
            return IndexChoice::FullScan;
        }
        match (&pattern.subject, &pattern.predicate, &pattern.object) {
            (PatternTerm::Constant(_), _, _) => IndexChoice::SPO,
            (_, PatternTerm::Constant(_), _) => IndexChoice::POS,
            (_, _, PatternTerm::Constant(_)) => IndexChoice::OSP,
            _ => IndexChoice::FullScan,
        }
    }

    /// Removes every cached plan.
    pub fn clear_cache(&mut self) {
        self.plan_cache.clear();
    }

    /// Number of plans currently cached.
    pub fn cache_size(&self) -> usize {
        self.plan_cache.len()
    }
}
impl Default for QueryOptimizer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `PatternTerm::Variable`.
    fn var(name: &str) -> PatternTerm {
        PatternTerm::Variable(name.to_string())
    }

    /// Shorthand for a `PatternTerm::Constant`.
    fn constant(value: &str) -> PatternTerm {
        PatternTerm::Constant(value.to_string())
    }

    /// Builds a non-quoted triple pattern from its three positions.
    fn pattern(subject: PatternTerm, predicate: PatternTerm, object: PatternTerm) -> TriplePattern {
        TriplePattern {
            subject,
            predicate,
            object,
            is_quoted: false,
        }
    }

    #[test]
    fn test_query_plan_creation() {
        let mut plan = QueryPlan::new();
        plan.add_operation(QueryOperation::TripleScan {
            pattern: pattern(var("s"), constant("http://example.org/p"), var("o")),
            selectivity: 0.1,
            cardinality: 100,
        });
        assert_eq!(plan.operations().len(), 1);
    }

    #[test]
    fn test_optimizer_creation() {
        assert_eq!(QueryOptimizer::new().cache_size(), 0);
    }

    #[test]
    fn test_selectivity_estimation() {
        let mut optimizer = QueryOptimizer::new();
        optimizer.statistics = QueryStatistics {
            total_triples: 10000,
            distinct_subjects: 1000,
            distinct_predicates: 50,
            distinct_objects: 5000,
            ..QueryStatistics::default()
        };

        let all_vars = optimizer.estimate_selectivity(&pattern(var("s"), var("p"), var("o")));
        assert!(
            all_vars > 0.5,
            "All-variable pattern should have high selectivity"
        );

        let bound_subject = optimizer.estimate_selectivity(&pattern(
            constant("http://example.org/s"),
            var("p"),
            var("o"),
        ));
        assert!(
            bound_subject < all_vars,
            "Pattern with constant should have lower selectivity: {} vs {}",
            bound_subject,
            all_vars
        );
    }

    #[test]
    fn test_index_selection() {
        let optimizer = QueryOptimizer::new();
        let cases = [
            (pattern(constant("s"), var("p"), var("o")), IndexChoice::SPO),
            (pattern(var("s"), constant("p"), var("o")), IndexChoice::POS),
            (pattern(var("s"), var("p"), constant("o")), IndexChoice::OSP),
        ];
        for (triple_pattern, expected) in cases {
            assert_eq!(optimizer.select_index(&triple_pattern), expected);
        }
    }

    #[test]
    fn test_cost_estimation() {
        let mut plan = QueryPlan::new();
        plan.add_operation(QueryOperation::TripleScan {
            pattern: pattern(var("s"), var("p"), var("o")),
            selectivity: 1.0,
            cardinality: 1000,
        });
        let cost = QueryOptimizer::new().estimate_plan_cost(&plan);
        assert!(cost > 0.0);
    }

    #[test]
    fn test_plan_caching() {
        let mut optimizer = QueryOptimizer::new();
        let query = "SELECT * WHERE { ?s ?p ?o }";
        let first = optimizer.optimize_query(query).unwrap();
        assert_eq!(optimizer.cache_size(), 1);
        let second = optimizer.optimize_query(query).unwrap();
        assert_eq!(first.estimated_cost(), second.estimated_cost());
    }
}