use crate::query::advanced_statistics::AdvancedStatisticsCollector;
use crate::query::algebra::{AlgebraTriplePattern, GraphPattern, TermPattern};
use crate::query::query_plan_visualizer::QueryPlanNode;
use crate::OxirsError;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
/// Cost-based planner for SPARQL graph patterns.
///
/// Cardinalities come from the histogram-backed [`AdvancedStatisticsCollector`] when it
/// has an answer, otherwise from a simple selectivity model scaled by the triple count
/// held in the internal statistics collector. All costs are weighted by
/// [`CostConfiguration`].
pub struct CostBasedOptimizer {
    /// Weights used by every cost formula.
    cost_config: CostConfiguration,
    /// Histogram / execution-history statistics, consulted first for cardinalities.
    advanced_stats: Arc<AdvancedStatisticsCollector>,
    /// Triple count plus learned per-pattern cardinality table.
    stats: Arc<StatisticsCollector>,
    /// Counter bumped by `optimize_pattern` and reported by `stats()`.
    query_count: AtomicU64,
}
impl CostBasedOptimizer {
    /// Creates an optimizer with empty statistics and [`CostConfiguration::default`].
    pub fn new() -> Self {
        Self::with_config(CostConfiguration::default())
    }

    /// Creates an optimizer with empty statistics and the supplied cost model.
    pub fn with_config(config: CostConfiguration) -> Self {
        Self {
            advanced_stats: Arc::new(AdvancedStatisticsCollector::new()),
            stats: Arc::new(StatisticsCollector::new()),
            cost_config: config,
            query_count: AtomicU64::new(0),
        }
    }

    /// Produces a cost-annotated plan for `pattern`.
    ///
    /// Every call counts as exactly one optimized query in [`Self::stats`]. The
    /// sub-patterns planned recursively underneath it do not bump the counter.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while planning a sub-pattern.
    pub fn optimize_pattern(&self, pattern: &GraphPattern) -> Result<OptimizedPlan, OxirsError> {
        self.query_count.fetch_add(1, Ordering::Relaxed);
        self.plan_pattern(pattern)
    }

    /// Recursive planner behind [`Self::optimize_pattern`].
    ///
    /// It is kept separate so that nested patterns and plan visualisation do not
    /// inflate the query counter.
    fn plan_pattern(&self, pattern: &GraphPattern) -> Result<OptimizedPlan, OxirsError> {
        match pattern {
            GraphPattern::Bgp(patterns) => self.optimize_bgp(patterns),
            GraphPattern::Join(left, right) => self.optimize_join(left, right),
            GraphPattern::LeftJoin { left, right, .. } => self.optimize_left_join(left, right),
            GraphPattern::Filter { expr: _, inner } => self.optimize_filter(inner),
            GraphPattern::Union(left, right) => self.optimize_union(left, right),
            GraphPattern::Minus(left, right) => self.optimize_minus(left, right),
            // Wrapper operators are planned as their single input.
            GraphPattern::Extend { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Service { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Graph { inner, .. } => self.plan_pattern(inner),
            GraphPattern::OrderBy { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Project { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Distinct(inner) => self.plan_pattern(inner),
            GraphPattern::Reduced(inner) => self.plan_pattern(inner),
            GraphPattern::Slice { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Group { inner, .. } => self.plan_pattern(inner),
            GraphPattern::Path {
                subject,
                path,
                object,
                ..
            } => self.optimize_property_path(subject, path, object),
            // Inline data costs nothing to access and yields one row per binding
            // (this matches the cardinality shown by the plan visualiser).
            GraphPattern::Values { bindings, .. } => Ok(OptimizedPlan {
                estimated_cardinality: bindings.len(),
                ..OptimizedPlan::empty()
            }),
        }
    }

    /// Orders the triple patterns of a BGP by ascending estimated cardinality (most
    /// selective first) and sums their individual costs.
    fn optimize_bgp(&self, patterns: &[AlgebraTriplePattern]) -> Result<OptimizedPlan, OxirsError> {
        if patterns.is_empty() {
            return Ok(OptimizedPlan::empty());
        }
        let mut pattern_costs: Vec<(usize, PatternCost)> = patterns
            .iter()
            .enumerate()
            .map(|(idx, pattern)| (idx, self.estimate_pattern_cost(pattern)))
            .collect();
        // Stable sort: equally selective patterns keep their source order.
        pattern_costs.sort_by_key(|(_, cost)| cost.estimated_cardinality);
        let optimal_order: Vec<usize> = pattern_costs.iter().map(|(idx, _)| *idx).collect();
        let total_cost: f64 = pattern_costs.iter().map(|(_, cost)| cost.total_cost).sum();
        Ok(OptimizedPlan {
            join_order: optimal_order,
            estimated_cost: total_cost,
            estimated_cardinality: self.estimate_bgp_cardinality(patterns, &pattern_costs),
            use_index: self.should_use_index(patterns),
            parallel_execution: self.should_parallelize(patterns, total_cost),
            optimizations: vec![Optimization::JoinReordering],
        })
    }

    /// Plans an inner join as a hash join that builds on the smaller input.
    ///
    /// `join_order` is `[1, 0]` when the right input was moved to the build side.
    fn optimize_join(
        &self,
        left: &GraphPattern,
        right: &GraphPattern,
    ) -> Result<OptimizedPlan, OxirsError> {
        let left_plan = self.plan_pattern(left)?;
        let right_plan = self.plan_pattern(right)?;
        // Only swap when the right side is strictly smaller; ties keep the written order.
        let swapped = left_plan.estimated_cardinality > right_plan.estimated_cardinality;
        let (first, second) = if swapped {
            (right_plan, left_plan)
        } else {
            (left_plan, right_plan)
        };
        let join_cost = self.estimate_join_cost(&first, &second);
        Ok(OptimizedPlan {
            join_order: if swapped { vec![1, 0] } else { vec![0, 1] },
            estimated_cost: first.estimated_cost + second.estimated_cost + join_cost,
            estimated_cardinality: self.estimate_join_cardinality(&first, &second),
            use_index: true,
            parallel_execution: join_cost > self.cost_config.parallel_threshold,
            optimizations: vec![Optimization::JoinReordering, Optimization::HashJoin],
        })
    }

    /// Plans an OPTIONAL (left outer join).
    ///
    /// The inputs are never reordered, and the result cardinality is assumed to equal
    /// the left side's.
    fn optimize_left_join(
        &self,
        left: &GraphPattern,
        right: &GraphPattern,
    ) -> Result<OptimizedPlan, OxirsError> {
        let left_plan = self.plan_pattern(left)?;
        let right_plan = self.plan_pattern(right)?;
        let join_cost = self.estimate_join_cost(&left_plan, &right_plan);
        Ok(OptimizedPlan {
            join_order: vec![0, 1],
            estimated_cost: left_plan.estimated_cost + right_plan.estimated_cost + join_cost,
            estimated_cardinality: left_plan.estimated_cardinality,
            use_index: true,
            parallel_execution: false,
            optimizations: vec![Optimization::IndexNLJ],
        })
    }

    /// Plans a FILTER by scaling its input's cardinality by the configured selectivity.
    fn optimize_filter(&self, inner: &GraphPattern) -> Result<OptimizedPlan, OxirsError> {
        let mut inner_plan = self.plan_pattern(inner)?;
        inner_plan.estimated_cardinality = ((inner_plan.estimated_cardinality as f64)
            * self.cost_config.filter_selectivity) as usize;
        inner_plan.optimizations.push(Optimization::FilterPushdown);
        Ok(inner_plan)
    }

    /// Plans a UNION: the branches' costs and cardinalities are summed.
    fn optimize_union(
        &self,
        left: &GraphPattern,
        right: &GraphPattern,
    ) -> Result<OptimizedPlan, OxirsError> {
        let left_plan = self.plan_pattern(left)?;
        let right_plan = self.plan_pattern(right)?;
        Ok(OptimizedPlan {
            join_order: vec![0, 1],
            estimated_cost: left_plan.estimated_cost + right_plan.estimated_cost,
            // Estimates can saturate at usize::MAX (f64 -> usize casts saturate), so a
            // plain `+` could overflow and panic in debug builds.
            estimated_cardinality: left_plan
                .estimated_cardinality
                .saturating_add(right_plan.estimated_cardinality),
            use_index: true,
            parallel_execution: true,
            optimizations: vec![Optimization::ParallelUnion],
        })
    }

    /// Plans a MINUS as a hash anti-join.
    fn optimize_minus(
        &self,
        left: &GraphPattern,
        right: &GraphPattern,
    ) -> Result<OptimizedPlan, OxirsError> {
        // Heuristic fraction of left-hand rows assumed to survive the anti-join.
        const MINUS_SURVIVAL_RATE: f64 = 0.7;
        let left_plan = self.plan_pattern(left)?;
        let right_plan = self.plan_pattern(right)?;
        Ok(OptimizedPlan {
            join_order: vec![0, 1],
            estimated_cost: left_plan.estimated_cost + right_plan.estimated_cost,
            estimated_cardinality: ((left_plan.estimated_cardinality as f64)
                * MINUS_SURVIVAL_RATE) as usize,
            use_index: true,
            parallel_execution: false,
            optimizations: vec![Optimization::HashAntiJoin],
        })
    }

    /// Plans a property-path pattern.
    ///
    /// The cost is a full-scan cost multiplied by the path's complexity factor. Only
    /// plain predicates are flagged as index-friendly.
    fn optimize_property_path(
        &self,
        _subject: &TermPattern,
        path: &crate::query::algebra::PropertyPath,
        _object: &TermPattern,
    ) -> Result<OptimizedPlan, OxirsError> {
        use crate::query::algebra::PropertyPath;
        let (complexity_factor, estimated_card) = self.estimate_path_complexity(path);
        let base_cost = self.stats.total_triples() as f64 * self.cost_config.sequential_scan_cost;
        let estimated_cost = base_cost * complexity_factor;
        let parallel_execution = complexity_factor > 10.0 && estimated_card > 1000;
        Ok(OptimizedPlan {
            join_order: vec![],
            estimated_cost,
            estimated_cardinality: estimated_card,
            use_index: matches!(path, PropertyPath::Predicate(_)),
            parallel_execution,
            optimizations: vec![Optimization::PropertyPathEvaluation],
        })
    }

    /// Returns a `(complexity factor, estimated cardinality)` pair for a property
    /// path, computed recursively from heuristic multipliers. The repetition
    /// operators (`*`, `+`) are capped at complexity 100.
    fn estimate_path_complexity(&self, path: &crate::query::algebra::PropertyPath) -> (f64, usize) {
        use crate::query::algebra::PropertyPath;
        match path {
            PropertyPath::Predicate(_) => {
                let card = (self.stats.total_triples() / 10).max(1);
                (1.0, card)
            }
            PropertyPath::Inverse(inner) => {
                let (inner_complexity, inner_card) = self.estimate_path_complexity(inner);
                (inner_complexity * 1.2, inner_card)
            }
            PropertyPath::Sequence(left, right) => {
                let (left_complexity, left_card) = self.estimate_path_complexity(left);
                let (right_complexity, _) = self.estimate_path_complexity(right);
                let complexity = left_complexity * right_complexity;
                let card = (left_card as f64 * 0.5) as usize;
                (complexity, card.max(1))
            }
            PropertyPath::Alternative(left, right) => {
                let (left_complexity, left_card) = self.estimate_path_complexity(left);
                let (right_complexity, right_card) = self.estimate_path_complexity(right);
                let complexity = (left_complexity + right_complexity) / 2.0;
                // Nested `?` paths can push estimates to usize::MAX; avoid overflow.
                (complexity, left_card.saturating_add(right_card))
            }
            PropertyPath::ZeroOrMore(inner) => {
                let (inner_complexity, _) = self.estimate_path_complexity(inner);
                let complexity = inner_complexity * 50.0;
                let card = (self.stats.total_triples() as f64 * 0.3) as usize;
                (complexity.min(100.0), card.max(1))
            }
            PropertyPath::OneOrMore(inner) => {
                let (inner_complexity, _) = self.estimate_path_complexity(inner);
                let complexity = inner_complexity * 30.0;
                let card = (self.stats.total_triples() as f64 * 0.2) as usize;
                (complexity.min(100.0), card.max(1))
            }
            PropertyPath::ZeroOrOne(inner) => {
                let (inner_complexity, inner_card) = self.estimate_path_complexity(inner);
                (inner_complexity * 1.5, (inner_card as f64 * 1.2) as usize)
            }
            PropertyPath::NegatedPropertySet(props) => {
                let complexity = 2.0 + (props.len() as f64 * 0.5);
                let card = (self.stats.total_triples() as f64 * 0.8) as usize;
                (complexity, card.max(1))
            }
        }
    }

    /// Estimates cardinality and I/O + CPU cost for a single triple pattern.
    ///
    /// A histogram estimate is preferred; otherwise the heuristic selectivity is
    /// scaled by the triple count. Patterns with no bound term are costed as a full
    /// scan.
    fn estimate_pattern_cost(&self, pattern: &AlgebraTriplePattern) -> PatternCost {
        let estimated_card = self
            .advanced_stats
            .estimate_cardinality(pattern)
            .unwrap_or_else(|| {
                let selectivity = self.calculate_selectivity(pattern);
                (self.stats.total_triples() as f64 * selectivity) as usize
            });
        let io_cost = if self.can_use_index(pattern) {
            estimated_card as f64 * self.cost_config.index_access_cost
        } else {
            self.stats.total_triples() as f64 * self.cost_config.sequential_scan_cost
        };
        let cpu_cost = estimated_card as f64 * self.cost_config.cpu_tuple_cost;
        PatternCost {
            estimated_cardinality: estimated_card,
            io_cost,
            cpu_cost,
            total_cost: io_cost + cpu_cost,
        }
    }

    /// Heuristic selectivity of a triple pattern.
    ///
    /// Each variable position contributes a factor of 0.5. A bound subject or object
    /// contributes 0.01, and a bound predicate contributes 0.1.
    fn calculate_selectivity(&self, pattern: &AlgebraTriplePattern) -> f64 {
        let factor = |term: &TermPattern, bound_factor: f64| {
            if matches!(term, TermPattern::Variable(_)) {
                0.5
            } else {
                bound_factor
            }
        };
        factor(&pattern.subject, 0.01)
            * factor(&pattern.predicate, 0.1)
            * factor(&pattern.object, 0.01)
    }

    /// A pattern can use an index as soon as at least one position is bound.
    fn can_use_index(&self, pattern: &AlgebraTriplePattern) -> bool {
        !matches!(pattern.subject, TermPattern::Variable(_))
            || !matches!(pattern.predicate, TermPattern::Variable(_))
            || !matches!(pattern.object, TermPattern::Variable(_))
    }

    /// Estimates the output of a BGP by chaining its (already ordered) patterns.
    ///
    /// A fixed join selectivity is applied between consecutive patterns, and the
    /// result is floored at 1.
    fn estimate_bgp_cardinality(
        &self,
        _patterns: &[AlgebraTriplePattern],
        pattern_costs: &[(usize, PatternCost)],
    ) -> usize {
        const JOIN_SELECTIVITY: f64 = 0.1;
        let Some((_, first)) = pattern_costs.first() else {
            return 0;
        };
        let mut cardinality = first.estimated_cardinality as f64;
        for (_, cost) in pattern_costs.iter().skip(1) {
            cardinality *= JOIN_SELECTIVITY * (cost.estimated_cardinality as f64);
        }
        cardinality.max(1.0) as usize
    }

    /// True when any triple pattern of the BGP has a bound term.
    fn should_use_index(&self, patterns: &[AlgebraTriplePattern]) -> bool {
        patterns.iter().any(|p| self.can_use_index(p))
    }

    /// True when the estimated cost exceeds the configured parallel threshold.
    fn should_parallelize(&self, _patterns: &[AlgebraTriplePattern], total_cost: f64) -> bool {
        total_cost > self.cost_config.parallel_threshold
    }

    /// Hash-join cost: build over `left`'s rows plus probe with `right`'s rows.
    fn estimate_join_cost(&self, left: &OptimizedPlan, right: &OptimizedPlan) -> f64 {
        let build_cost = left.estimated_cardinality as f64 * self.cost_config.hash_build_cost;
        let probe_cost = right.estimated_cardinality as f64 * self.cost_config.hash_probe_cost;
        build_cost + probe_cost
    }

    /// Join output estimate: the cross product scaled by the statistics-derived join
    /// selectivity, floored at 1.
    fn estimate_join_cardinality(&self, left: &OptimizedPlan, right: &OptimizedPlan) -> usize {
        let join_selectivity = self
            .advanced_stats
            .estimate_join_selectivity(left.estimated_cardinality, right.estimated_cardinality);
        let product = left.estimated_cardinality as f64 * right.estimated_cardinality as f64;
        (product * join_selectivity).max(1.0) as usize
    }

    /// Snapshot of the query counter and the assumed triple count.
    pub fn stats(&self) -> OptimizerStats {
        OptimizerStats {
            queries_optimized: self.query_count.load(Ordering::Relaxed),
            total_triples: self.stats.total_triples(),
        }
    }

    /// Statistics reported by the advanced statistics collector.
    pub fn advanced_stats(&self) -> crate::query::advanced_statistics::AdvancedStatistics {
        self.advanced_stats.get_statistics()
    }

    /// Recorded executions of `pattern`, as kept by the advanced statistics collector.
    pub fn get_pattern_history(
        &self,
        pattern: &AlgebraTriplePattern,
    ) -> Vec<crate::query::advanced_statistics::PatternExecution> {
        self.advanced_stats.get_pattern_history(pattern)
    }

    /// Resets all collected statistics.
    ///
    /// This covers the advanced statistics, the learned per-pattern cardinalities
    /// (so [`Self::get_learned_cardinality`] stops returning stale values) and the
    /// query counter.
    pub fn clear_statistics(&self) {
        self.advanced_stats.clear();
        self.stats
            .pattern_stats
            .write()
            .expect("lock should not be poisoned")
            .clear();
        self.query_count.store(0, Ordering::Relaxed);
    }

    /// Records an observed result cardinality for `pattern`, with no timing information.
    pub fn update_stats(&self, pattern: &GraphPattern, actual_cardinality: usize) {
        self.update_stats_with_time(pattern, actual_cardinality, 0);
    }

    /// Feeds an execution observation back into the statistics.
    ///
    /// For BGPs, each triple pattern is recorded in the advanced statistics. The
    /// learned cardinality for the pattern's hash bucket is then updated as an
    /// exponential moving average. When the observation deviates by more than 50%
    /// from the average learned so far (after at least 5 prior observations), a
    /// debug event is emitted.
    pub fn update_stats_with_time(
        &self,
        pattern: &GraphPattern,
        actual_cardinality: usize,
        execution_time_ms: u64,
    ) {
        // Weight of the newest observation in the moving average.
        const ALPHA: f64 = 0.2;
        // Prior observations required before deviations are reported.
        const MIN_HISTORY: usize = 5;
        // Relative deviation from the prior average that is worth reporting.
        const DEVIATION_THRESHOLD: f64 = 0.5;

        if let GraphPattern::Bgp(patterns) = pattern {
            for triple_pattern in patterns {
                self.advanced_stats.record_pattern_execution(
                    triple_pattern,
                    actual_cardinality,
                    execution_time_ms,
                );
            }
        }
        let actual = actual_cardinality as f64;
        let pattern_hash = self.compute_pattern_hash(pattern);
        let mut pattern_stats = self
            .stats
            .pattern_stats
            .write()
            .expect("lock should not be poisoned");
        let entry = pattern_stats
            .entry(pattern_hash)
            .or_insert_with(|| PatternStats {
                execution_count: 0,
                avg_cardinality: actual,
                last_cardinality: actual_cardinality,
            });
        // Snapshot the history *before* folding in this observation. Otherwise the
        // deviation would be measured against an average already pulled toward it.
        let prior_count = entry.execution_count;
        let prior_avg = entry.avg_cardinality;
        entry.avg_cardinality = ALPHA * actual + (1.0 - ALPHA) * prior_avg;
        entry.last_cardinality = actual_cardinality;
        entry.execution_count += 1;
        if prior_count >= MIN_HISTORY {
            // Floor the baseline at one row so an all-zero history cannot divide by zero.
            let deviation = (actual - prior_avg).abs() / prior_avg.max(1.0);
            if deviation > DEVIATION_THRESHOLD {
                tracing::debug!(
                    pattern_hash = pattern_hash,
                    actual = actual_cardinality,
                    avg = prior_avg,
                    deviation_pct = deviation * 100.0,
                    "Significant cardinality deviation detected"
                );
            }
        }
    }

    /// Forwards an observed join execution to the advanced statistics collector.
    pub fn record_join_execution(
        &self,
        left_pattern: &GraphPattern,
        right_pattern: &GraphPattern,
        left_cardinality: usize,
        right_cardinality: usize,
        result_cardinality: usize,
    ) {
        self.advanced_stats.record_join_execution(
            left_pattern,
            right_pattern,
            left_cardinality,
            right_cardinality,
            result_cardinality,
        );
    }

    /// Key used for the learned-cardinality table.
    ///
    /// The key is deliberately coarse: it hashes only the operator kind, plus the
    /// number of triple patterns for a BGP. Structurally different patterns of the
    /// same shape therefore share one entry.
    fn compute_pattern_hash(&self, pattern: &GraphPattern) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let tag = match pattern {
            GraphPattern::Bgp(_) => "bgp",
            GraphPattern::Join(_, _) => "join",
            GraphPattern::LeftJoin { .. } => "leftjoin",
            GraphPattern::Union(_, _) => "union",
            GraphPattern::Filter { .. } => "filter",
            GraphPattern::Extend { .. } => "extend",
            GraphPattern::Group { .. } => "group",
            GraphPattern::Service { .. } => "service",
            GraphPattern::Minus(_, _) => "minus",
            GraphPattern::Graph { .. } => "graph",
            GraphPattern::OrderBy { .. } => "orderby",
            GraphPattern::Project { .. } => "project",
            GraphPattern::Distinct(_) => "distinct",
            GraphPattern::Reduced(_) => "reduced",
            GraphPattern::Slice { .. } => "slice",
            GraphPattern::Path { .. } => "path",
            GraphPattern::Values { .. } => "values",
        };
        let mut hasher = DefaultHasher::new();
        tag.hash(&mut hasher);
        if let GraphPattern::Bgp(patterns) = pattern {
            patterns.len().hash(&mut hasher);
        }
        hasher.finish()
    }

    /// Learned moving-average cardinality for `pattern`'s hash bucket, if any.
    pub fn get_learned_cardinality(&self, pattern: &GraphPattern) -> Option<f64> {
        let pattern_hash = self.compute_pattern_hash(pattern);
        let pattern_stats = self
            .stats
            .pattern_stats
            .read()
            .expect("lock should not be poisoned");
        pattern_stats.get(&pattern_hash).map(|s| s.avg_cardinality)
    }

    /// Renders `pattern` with `plan`'s estimates as a visualisation tree.
    ///
    /// Rendering does not count towards `queries_optimized`.
    pub fn to_visual_plan(&self, pattern: &GraphPattern, plan: &OptimizedPlan) -> QueryPlanNode {
        self.pattern_to_visual_node(pattern, Some(plan))
    }

    /// Recursively converts a pattern (with an optional plan) into a visual node.
    fn pattern_to_visual_node(
        &self,
        pattern: &GraphPattern,
        plan: Option<&OptimizedPlan>,
    ) -> QueryPlanNode {
        match pattern {
            GraphPattern::Bgp(patterns) => self.bgp_visual_node(patterns, plan),
            GraphPattern::Join(left, right) => {
                let mut node = QueryPlanNode::new("Join", "Hash Join");
                if let Some(p) = plan {
                    node = node
                        .with_estimated_cardinality(p.estimated_cardinality)
                        .with_estimated_cost(p.estimated_cost);
                    if p.parallel_execution {
                        node = node.with_metadata("execution", "parallel");
                    }
                }
                // Show inputs in execution order; `[1, 0]` means the planner swapped them.
                let swapped = plan
                    .map(|p| p.join_order == [1_usize, 0])
                    .unwrap_or(false);
                let (first, second) = if swapped { (right, left) } else { (left, right) };
                self.attach_child(&mut node, first);
                self.attach_child(&mut node, second);
                node
            }
            GraphPattern::LeftJoin { left, right, .. } => {
                let mut node = QueryPlanNode::new("LeftJoin", "Optional Pattern");
                if let Some(p) = plan {
                    node = node
                        .with_estimated_cardinality(p.estimated_cardinality)
                        .with_estimated_cost(p.estimated_cost);
                }
                self.attach_child(&mut node, left);
                self.attach_child(&mut node, right);
                node
            }
            GraphPattern::Filter { expr: _, inner } => {
                let mut node = QueryPlanNode::new("Filter", "Filter expression");
                if let Some(p) = plan {
                    node = node
                        .with_estimated_cardinality(p.estimated_cardinality)
                        .with_estimated_cost(p.estimated_cost)
                        .with_metadata(
                            "selectivity",
                            format!("{:.2}", self.cost_config.filter_selectivity),
                        );
                }
                self.attach_child(&mut node, inner);
                node
            }
            GraphPattern::Union(left, right) => {
                let mut node = QueryPlanNode::new("Union", "Parallel Union");
                if let Some(p) = plan {
                    node = node
                        .with_estimated_cardinality(p.estimated_cardinality)
                        .with_estimated_cost(p.estimated_cost)
                        .with_metadata("execution", "parallel");
                }
                self.attach_child(&mut node, left);
                self.attach_child(&mut node, right);
                node
            }
            GraphPattern::Minus(left, right) => {
                let mut node = QueryPlanNode::new("Minus", "Hash Anti-Join");
                if let Some(p) = plan {
                    node = node
                        .with_estimated_cardinality(p.estimated_cardinality)
                        .with_estimated_cost(p.estimated_cost);
                }
                self.attach_child(&mut node, left);
                self.attach_child(&mut node, right);
                node
            }
            GraphPattern::Extend { inner, .. } => self.unary_visual_node(
                QueryPlanNode::new("Extend", "Variable binding"),
                inner,
                plan,
            ),
            GraphPattern::Service { .. } => QueryPlanNode::new("Service", "Federated query")
                .with_metadata("type", "remote")
                .with_metadata("note", "actual_cardinality_depends_on_remote_endpoint"),
            GraphPattern::Graph { inner, .. } => self.unary_visual_node(
                QueryPlanNode::new("Graph", "Named graph access"),
                inner,
                plan,
            ),
            GraphPattern::OrderBy { inner, .. } => self.unary_visual_node(
                QueryPlanNode::new("OrderBy", "Sort operation"),
                inner,
                plan,
            ),
            GraphPattern::Project { inner, variables } => self.unary_visual_node(
                QueryPlanNode::new("Project", format!("{} variables", variables.len())),
                inner,
                plan,
            ),
            GraphPattern::Distinct(inner) => self.unary_visual_node(
                QueryPlanNode::new("Distinct", "Remove duplicates"),
                inner,
                plan,
            ),
            GraphPattern::Reduced(inner) => self.unary_visual_node(
                QueryPlanNode::new("Reduced", "Best-effort deduplication"),
                inner,
                plan,
            ),
            GraphPattern::Slice {
                inner,
                offset,
                limit,
            } => {
                let limit_str = limit
                    .map(|l| l.to_string())
                    .unwrap_or_else(|| "∞".to_string());
                let mut node =
                    QueryPlanNode::new("Slice", format!("LIMIT {} OFFSET {}", limit_str, offset));
                if let Some(p) = plan {
                    let after_offset = p.estimated_cardinality.saturating_sub(*offset);
                    let limited_card = match limit {
                        Some(lim) => (*lim).min(after_offset),
                        None => after_offset,
                    };
                    node = node.with_estimated_cardinality(limited_card);
                }
                self.attach_child(&mut node, inner);
                node
            }
            GraphPattern::Group { inner, .. } => self.unary_visual_node(
                QueryPlanNode::new("Group", "GROUP BY aggregation"),
                inner,
                plan,
            ),
            GraphPattern::Path { .. } => {
                QueryPlanNode::new("PropertyPath", "Property path traversal")
                    .with_metadata("note", "cardinality_depends_on_path_length")
            }
            GraphPattern::Values { bindings, .. } => {
                QueryPlanNode::new("Values", format!("{} bindings", bindings.len()))
                    .with_estimated_cardinality(bindings.len())
                    .with_estimated_cost(0.0)
            }
        }
    }

    /// Visual node for a BGP.
    ///
    /// There is one child per triple pattern, listed in the plan's join order and
    /// each annotated with its own estimate and suggested index.
    fn bgp_visual_node(
        &self,
        patterns: &[AlgebraTriplePattern],
        plan: Option<&OptimizedPlan>,
    ) -> QueryPlanNode {
        let mut node = QueryPlanNode::new("BGP", format!("{} triple patterns", patterns.len()))
            .with_metadata("pattern_count", patterns.len().to_string());
        if let Some(p) = plan {
            node = node
                .with_estimated_cardinality(p.estimated_cardinality)
                .with_estimated_cost(p.estimated_cost);
            if p.use_index {
                node = node.with_index("SPO/POS/OSP");
            }
        }
        let join_order: &[usize] = plan.map(|p| p.join_order.as_slice()).unwrap_or(&[]);
        for (position, idx) in join_order.iter().enumerate() {
            if let Some(triple_pattern) = patterns.get(*idx) {
                let pattern_desc = format!(
                    "{} {} {}",
                    Self::term_pattern_to_string(&triple_pattern.subject),
                    Self::term_pattern_to_string(&triple_pattern.predicate),
                    Self::term_pattern_to_string(&triple_pattern.object)
                );
                let cost = self.estimate_pattern_cost(triple_pattern);
                let mut child = QueryPlanNode::new("TriplePattern", pattern_desc)
                    .with_estimated_cardinality(cost.estimated_cardinality)
                    .with_estimated_cost(cost.total_cost)
                    .with_metadata("order", (position + 1).to_string());
                if self.can_use_index(triple_pattern) {
                    child = child.with_index(self.suggest_index(triple_pattern));
                }
                node.add_child(child);
            }
        }
        node
    }

    /// Shared rendering for single-input operators.
    ///
    /// Annotates `node` with the plan's cardinality (when a plan is given) and
    /// attaches the rendered `inner` pattern as its only child.
    fn unary_visual_node(
        &self,
        mut node: QueryPlanNode,
        inner: &GraphPattern,
        plan: Option<&OptimizedPlan>,
    ) -> QueryPlanNode {
        if let Some(p) = plan {
            node = node.with_estimated_cardinality(p.estimated_cardinality);
        }
        self.attach_child(&mut node, inner);
        node
    }

    /// Plans `child` (if planning succeeds) and appends its rendering to `node`.
    fn attach_child(&self, node: &mut QueryPlanNode, child: &GraphPattern) {
        let child_plan = self.plan_pattern(child).ok();
        node.add_child(self.pattern_to_visual_node(child, child_plan.as_ref()));
    }

    /// Compact, human-readable rendering of a term for plan visualisation.
    ///
    /// IRIs are shortened to their local name: the part after the last `/` or `#`,
    /// or the full IRI if that part is empty. Literals longer than 20 characters are
    /// cut to 17 characters plus `...`. Truncation counts characters, not bytes, so
    /// non-ASCII text never splits a UTF-8 sequence.
    fn term_pattern_to_string(term: &TermPattern) -> String {
        match term {
            TermPattern::Variable(v) => format!("?{}", v),
            TermPattern::NamedNode(n) => {
                let uri = n.as_str();
                let local = uri
                    .rsplit(|c: char| c == '/' || c == '#')
                    .next()
                    .unwrap_or("");
                if local.is_empty() {
                    uri.to_string()
                } else {
                    local.to_string()
                }
            }
            TermPattern::BlankNode(b) => format!("_:{}", b),
            TermPattern::Literal(l) => {
                let value = l.value();
                if value.chars().count() > 20 {
                    let prefix: String = value.chars().take(17).collect();
                    format!("\"{}...\"", prefix)
                } else {
                    format!("\"{}\"", value)
                }
            }
            TermPattern::QuotedTriple(t) => format!(
                "<<{} {} {}>>",
                Self::term_pattern_to_string(&t.subject),
                Self::term_pattern_to_string(&t.predicate),
                Self::term_pattern_to_string(&t.object)
            ),
        }
    }

    /// Picks the SPO/POS/OSP permutation whose key prefix covers the most bound
    /// positions of the pattern.
    ///
    /// In particular, subject+object-bound patterns go to OSP, whose (object,
    /// subject) prefix matches both bound terms.
    fn suggest_index(&self, pattern: &AlgebraTriplePattern) -> String {
        let s_bound = !matches!(pattern.subject, TermPattern::Variable(_));
        let p_bound = !matches!(pattern.predicate, TermPattern::Variable(_));
        let o_bound = !matches!(pattern.object, TermPattern::Variable(_));
        match (s_bound, p_bound, o_bound) {
            (true, false, true) => "OSP",
            (true, _, _) => "SPO",
            (false, true, _) => "POS",
            (false, false, true) => "OSP",
            (false, false, false) => "FullScan",
        }
        .to_string()
    }
}
impl Default for CostBasedOptimizer {
fn default() -> Self {
Self::new()
}
}
/// Basic store statistics plus the learned per-pattern cardinality table.
struct StatisticsCollector {
    /// Learned cardinalities keyed by pattern hash. The lock lets `&self` methods
    /// update the table.
    pattern_stats: Arc<RwLock<HashMap<u64, PatternStats>>>,
    /// Triple count used as the base for scan and selectivity estimates.
    total_triples: AtomicU64,
    /// Per-predicate statistics; currently never read, hence the allowance.
    #[allow(dead_code)]
    predicate_stats: HashMap<String, PredicateStats>,
}
impl StatisticsCollector {
    /// Placeholder triple count that every new collector starts with.
    const INITIAL_TRIPLE_COUNT: u64 = 1_000_000;

    /// Creates a collector with the placeholder triple count and empty tables.
    fn new() -> Self {
        let pattern_stats = Arc::new(RwLock::new(HashMap::new()));
        Self {
            pattern_stats,
            predicate_stats: HashMap::new(),
            total_triples: AtomicU64::new(Self::INITIAL_TRIPLE_COUNT),
        }
    }

    /// Current triple count, as a `usize` for use in estimates.
    fn total_triples(&self) -> usize {
        let count = self.total_triples.load(Ordering::Relaxed);
        count as usize
    }
}
/// Learned statistics for one pattern-hash bucket, maintained by
/// `CostBasedOptimizer::update_stats_with_time`.
#[derive(Debug, Clone)]
struct PatternStats {
    /// Number of observations recorded for this bucket.
    execution_count: usize,
    /// Exponential moving average of the observed result cardinalities.
    avg_cardinality: f64,
    /// Most recently observed result cardinality.
    last_cardinality: usize,
}
/// Per-predicate statistics stored in `StatisticsCollector::predicate_stats`.
///
/// Not constructed or read anywhere in this file, hence the `dead_code` allowance.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PredicateStats {
    /// Presumably the number of triples using this predicate — TODO confirm.
    count: usize,
    /// Presumably the mean number of objects per distinct subject — TODO confirm.
    avg_objects_per_subject: f64,
    /// Presumably the number of distinct subjects for this predicate — TODO confirm.
    distinct_subjects: usize,
    /// Presumably the number of distinct objects for this predicate — TODO confirm.
    distinct_objects: usize,
}
/// Weights of the optimizer's cost model.
///
/// All costs are in arbitrary but mutually comparable units.
#[derive(Debug, Clone, PartialEq)]
pub struct CostConfiguration {
    /// Cost per triple of a full scan; charged for every triple in the store when
    /// a pattern has no bound term.
    pub sequential_scan_cost: f64,
    /// Cost per estimated matching triple when an index can be used.
    pub index_access_cost: f64,
    /// Per-tuple processing cost applied to a pattern's estimated matches.
    pub cpu_tuple_cost: f64,
    /// Per-row cost of building a hash table on a join's build (first) side.
    pub hash_build_cost: f64,
    /// Per-row cost of probing a hash table with a join's probe (second) side.
    pub hash_probe_cost: f64,
    /// Fraction of input rows assumed to pass a FILTER.
    pub filter_selectivity: f64,
    /// Estimated cost above which a plan is marked for parallel execution.
    pub parallel_threshold: f64,
}
impl Default for CostConfiguration {
fn default() -> Self {
Self {
sequential_scan_cost: 1.0,
index_access_cost: 0.01,
cpu_tuple_cost: 0.001,
hash_build_cost: 0.005,
hash_probe_cost: 0.002,
filter_selectivity: 0.3,
parallel_threshold: 10000.0,
}
}
}
/// Cost estimate for a single triple pattern, produced by
/// `CostBasedOptimizer::estimate_pattern_cost`.
#[derive(Debug, Clone)]
struct PatternCost {
    /// Expected number of matching triples.
    estimated_cardinality: usize,
    /// Access cost: index cost over the matching triples, or a full-scan cost
    /// when no term of the pattern is bound.
    #[allow(dead_code)] // only `total_cost` is read by the planner
    io_cost: f64,
    /// Per-tuple processing cost over the matching triples.
    #[allow(dead_code)] // only `total_cost` is read by the planner
    cpu_cost: f64,
    /// `io_cost + cpu_cost`.
    total_cost: f64,
}
/// Cost-annotated plan returned by `CostBasedOptimizer::optimize_pattern`.
#[derive(Debug, Clone)]
pub struct OptimizedPlan {
    /// Execution order. For a BGP these are indices into its triple-pattern list.
    /// For binary operators the value is `[0, 1]`, or `[1, 0]` when a join's
    /// inputs were swapped.
    pub join_order: Vec<usize>,
    /// Estimated total cost, in the units of `CostConfiguration`.
    pub estimated_cost: f64,
    /// Estimated number of result rows.
    pub estimated_cardinality: usize,
    /// Whether index access is recommended.
    pub use_index: bool,
    /// Whether parallel execution is recommended; the criterion differs per operator.
    pub parallel_execution: bool,
    /// Optimizations recorded while producing this plan.
    pub optimizations: Vec<Optimization>,
}
impl OptimizedPlan {
fn empty() -> Self {
Self {
join_order: vec![],
estimated_cost: 0.0,
estimated_cardinality: 0,
use_index: false,
parallel_execution: false,
optimizations: vec![],
}
}
}
/// Optimization techniques a plan may record in `OptimizedPlan::optimizations`.
///
/// The enum is fieldless, so it is `Copy` and `Hash`. It can be passed by value and
/// collected into sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Optimization {
    /// Inputs reordered by estimated cardinality (BGP patterns, join inputs).
    JoinReordering,
    /// Hash join between two sub-plans.
    HashJoin,
    /// Recorded when a FILTER is planned.
    FilterPushdown,
    /// Index-based access; not emitted by the planners in this file.
    IndexAccess,
    /// Index nested-loop join, used for OPTIONAL (left join).
    IndexNLJ,
    /// UNION branches evaluated in parallel.
    ParallelUnion,
    /// Hash anti-join, used for MINUS.
    HashAntiJoin,
    /// Property-path evaluation.
    PropertyPathEvaluation,
}
/// Snapshot of optimizer counters returned by `CostBasedOptimizer::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OptimizerStats {
    /// Value of the optimizer's query counter when the snapshot was taken.
    pub queries_optimized: u64,
    /// Triple count the cost model is currently assuming.
    pub total_triples: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{NamedNode, Variable};
    use crate::query::algebra::TermPattern;

    /// Variable term with the given name.
    fn var(name: &'static str) -> TermPattern {
        TermPattern::Variable(Variable::new(name).expect("valid variable name"))
    }

    /// IRI term for the given absolute IRI.
    fn iri(value: &'static str) -> TermPattern {
        TermPattern::NamedNode(NamedNode::new(value).expect("valid IRI"))
    }

    /// `?s ?p ?o` — nothing bound.
    fn unbound_pattern() -> AlgebraTriplePattern {
        create_test_pattern(var("s"), var("p"), var("o"))
    }

    fn create_test_pattern(
        subject: TermPattern,
        predicate: TermPattern,
        object: TermPattern,
    ) -> AlgebraTriplePattern {
        AlgebraTriplePattern {
            subject,
            predicate,
            object,
        }
    }

    #[test]
    fn test_optimizer_creation() {
        let stats = CostBasedOptimizer::new().stats();
        assert_eq!(stats.queries_optimized, 0);
        assert!(stats.total_triples > 0);
    }

    #[test]
    fn test_selectivity_calculation() {
        let optimizer = CostBasedOptimizer::new();
        let unbound_cost = optimizer.estimate_pattern_cost(&unbound_pattern());
        assert!(unbound_cost.estimated_cardinality > 0);
        let fully_bound = create_test_pattern(
            iri("http://example.org/s"),
            iri("http://example.org/p"),
            iri("http://example.org/o"),
        );
        let bound_cost = optimizer.estimate_pattern_cost(&fully_bound);
        assert!(bound_cost.estimated_cardinality < unbound_cost.estimated_cardinality);
    }

    #[test]
    fn test_cost_configuration() {
        let defaults = CostConfiguration::default();
        assert!(defaults.index_access_cost < defaults.sequential_scan_cost);
        assert!(defaults.cpu_tuple_cost < defaults.index_access_cost);
    }

    #[test]
    fn test_can_use_index() {
        let optimizer = CostBasedOptimizer::new();
        let subject_bound = create_test_pattern(iri("http://example.org/s"), var("p"), var("o"));
        assert!(optimizer.can_use_index(&subject_bound));
        assert!(!optimizer.can_use_index(&unbound_pattern()));
    }

    #[test]
    fn test_empty_plan() {
        let empty = OptimizedPlan::empty();
        assert_eq!(empty.join_order.len(), 0);
        assert_eq!(empty.estimated_cost, 0.0);
        assert_eq!(empty.estimated_cardinality, 0);
    }
}