use crate::algebra::{Term, TriplePattern};
use std::collections::HashMap;
use std::time::Instant;
/// Estimated number of results for a triple pattern: a `[min, max]` range plus a point
/// estimate.
///
/// `Display` renders the value as `[min..max, ~expected]`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CardinalityEstimate {
    /// Lower bound on the number of results.
    pub min: u64,
    /// Upper bound on the number of results.
    pub max: u64,
    /// Point estimate of the number of results.
    pub expected: u64,
}

impl CardinalityEstimate {
    /// Builds an estimate from explicit bounds and a point estimate.
    ///
    /// The values are stored exactly as given; no ordering between them is enforced.
    pub fn new(min: u64, max: u64, expected: u64) -> Self {
        Self { min, max, expected }
    }

    /// Builds an estimate whose bounds and point estimate all equal `count`.
    pub fn exact(count: u64) -> Self {
        Self::new(count, count, count)
    }

    /// Builds the estimate used when nothing is known about a pattern.
    ///
    /// The range is `0..=triple_count` and the point estimate is roughly half of the
    /// graph (`triple_count / 2 + 1`), never exceeding `triple_count`.
    pub fn unknown(triple_count: u64) -> Self {
        // Without the clamp an empty graph would report `expected = 1` while `max = 0`,
        // putting the point estimate outside its own range.
        let expected = (triple_count / 2 + 1).min(triple_count);
        Self::new(0, triple_count, expected)
    }
}

impl std::fmt::Display for CardinalityEstimate {
    /// Formats the estimate as `[min..max, ~expected]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}..{}, ~{}]", self.min, self.max, self.expected)
    }
}
/// Cost figures for a single triple pattern, used to rank patterns against each other.
#[derive(Debug, Clone)]
pub struct PatternCost {
    /// Estimated number of results the pattern produces.
    pub estimated_cardinality: u64,
    /// Fraction of the graph the pattern is expected to match.
    pub selectivity: f64,
    /// Number of pattern positions (0 to 3) that are not variables.
    pub bound_count: u8,
}

impl PatternCost {
    /// Returns the ranking score for this pattern; lower scores sort first.
    ///
    /// The score is `(estimated_cardinality + 1) * selectivity`, scaled down by
    /// `1 / (1 + bound_count)` so that patterns with more bound positions rank earlier.
    pub fn ordering_score(&self) -> f64 {
        let weighted_rows = (self.estimated_cardinality as f64 + 1.0) * self.selectivity;
        let bound_discount = (1.0 + f64::from(self.bound_count)).recip();
        weighted_rows * bound_discount
    }
}
/// Summary statistics about a graph, used to estimate pattern cardinalities.
#[derive(Debug, Clone, Default)]
pub struct JoinGraphStats {
    /// Total number of triples in the graph.
    pub triple_count: u64,
    /// Number of distinct subjects.
    pub distinct_subjects: u64,
    /// Number of distinct predicates.
    pub distinct_predicates: u64,
    /// Number of distinct objects.
    pub distinct_objects: u64,
    /// Number of triples per predicate, keyed by predicate IRI string.
    pub per_predicate_counts: HashMap<String, u64>,
}

impl JoinGraphStats {
    /// Builds statistics from a triple count alone.
    ///
    /// Distinct counts are rough guesses derived from `triple_count`: a third for
    /// subjects, a tenth for predicates and a half for objects, each at least 1.
    /// No per-predicate counts are recorded.
    pub fn new(triple_count: u64) -> Self {
        Self {
            triple_count,
            distinct_subjects: (triple_count / 3).max(1),
            distinct_predicates: (triple_count / 10).max(1),
            distinct_objects: (triple_count / 2).max(1),
            per_predicate_counts: HashMap::new(),
        }
    }

    /// Builds statistics from explicitly supplied values, stored as given.
    pub fn with_details(
        triple_count: u64,
        distinct_subjects: u64,
        distinct_predicates: u64,
        distinct_objects: u64,
        per_predicate_counts: HashMap<String, u64>,
    ) -> Self {
        Self {
            triple_count,
            distinct_subjects,
            distinct_predicates,
            distinct_objects,
            per_predicate_counts,
        }
    }

    /// Records (or overwrites) the number of triples that use `predicate`.
    pub fn add_predicate_count(&mut self, predicate: impl Into<String>, count: u64) {
        self.per_predicate_counts.insert(predicate.into(), count);
    }

    /// Returns the fraction of triples expected to use `predicate_uri`, in `0.0..=1.0`.
    ///
    /// * An empty graph yields `1.0`.
    /// * A predicate with a recorded count yields `count / triple_count`.
    /// * Any other predicate is assumed to hold an even share of the graph, i.e.
    ///   `1 / distinct_predicates` (with `distinct_predicates` treated as at least 1).
    pub fn predicate_selectivity(&self, predicate_uri: &str) -> f64 {
        if self.triple_count == 0 {
            return 1.0;
        }
        let selectivity = match self.per_predicate_counts.get(predicate_uri) {
            Some(&count) => count as f64 / self.triple_count as f64,
            // Computed in floating point: the integer form
            // `triple_count / distinct_predicates` truncates, and collapses to zero
            // whenever the predicate count exceeds the triple count.
            None => 1.0 / self.distinct_predicates.max(1) as f64,
        };
        // A recorded count larger than `triple_count` (e.g. stale statistics) must not
        // produce a fraction above one.
        selectivity.min(1.0)
    }
}
/// Reorders triple patterns so that cheaper patterns are evaluated first.
///
/// Pattern lists no longer than `dp_threshold` are ordered with the dynamic-programming
/// strategy; longer lists use the greedy strategy.
#[derive(Debug, Clone)]
pub struct JoinOrderOptimizer {
    /// Largest pattern count for which the dynamic-programming strategy is used.
    dp_threshold: usize,
}
impl JoinOrderOptimizer {
/// Creates an optimizer that uses the dynamic-programming strategy for lists of up to
/// four patterns.
pub fn new() -> Self {
    let dp_threshold = 4;
    Self { dp_threshold }
}
/// Returns the optimizer with its dynamic-programming threshold replaced by `threshold`.
///
/// Builder-style: consumes `self` and hands it back.
pub fn with_dp_threshold(mut self, threshold: usize) -> Self {
    let slot = &mut self.dp_threshold;
    *slot = threshold;
    self
}
/// Counts how many of the pattern's three positions (subject, predicate, object) are
/// bound, i.e. hold anything other than a `Term::Variable`.
///
/// The result is always in `0..=3`.
pub fn bound_variable_count(pattern: &TriplePattern) -> u8 {
    let positions = [&pattern.subject, &pattern.predicate, &pattern.object];
    positions
        .iter()
        .filter(|term| !matches!(term, Term::Variable(_)))
        .count() as u8
}
/// Estimates how many results `pattern` produces against a graph described by `stats`.
///
/// The estimate depends on how many positions of the pattern are bound:
///
/// * empty graph: exactly zero results;
/// * 3 bound: at most one result;
/// * 2 bound: the matching triples are assumed to be spread evenly over the distinct
///   values of one bound position; the upper bound is twice the point estimate;
/// * 1 bound: the matching triples for that single bound position; the upper bound is
///   the whole graph;
/// * 0 bound: [`CardinalityEstimate::unknown`] for the whole graph.
///
/// For the 1- and 2-bound cases the point estimate is never below 1.
pub fn estimate_cardinality(
    pattern: &TriplePattern,
    stats: &JoinGraphStats,
) -> CardinalityEstimate {
    if stats.triple_count == 0 {
        return CardinalityEstimate::exact(0);
    }
    let bound = Self::bound_variable_count(pattern);
    match bound {
        3 => CardinalityEstimate::new(0, 1, 1),
        2 => {
            let expected = match (&pattern.subject, &pattern.predicate, &pattern.object) {
                // Subject unbound: triples of this predicate, per distinct object.
                (Term::Variable(_), _, _) => {
                    let pred_sel =
                        Self::predicate_selectivity_from_term(&pattern.predicate, stats);
                    ((stats.triple_count as f64 * pred_sel)
                        / stats.distinct_objects.max(1) as f64) as u64
                }
                // Predicate unbound: triples per subject, per distinct object.
                (_, Term::Variable(_), _) => {
                    (stats.triple_count / stats.distinct_subjects.max(1))
                        / stats.distinct_objects.max(1)
                }
                // Object unbound: triples of this predicate, per distinct subject.
                (_, _, Term::Variable(_)) => {
                    let pred_sel =
                        Self::predicate_selectivity_from_term(&pattern.predicate, stats);
                    (stats.triple_count as f64 * pred_sel
                        / stats.distinct_subjects.max(1) as f64) as u64
                }
                // With exactly two bound positions one of the arms above always matches.
                _ => 1,
            };
            let expected = expected.max(1);
            // Saturate instead of overflowing: a very large recorded predicate count can
            // push `expected` past `u64::MAX / 2`.
            CardinalityEstimate::new(0, expected.saturating_mul(2), expected)
        }
        1 => {
            let expected = match (&pattern.subject, &pattern.predicate, &pattern.object) {
                // Only the subject is bound: average triples per subject.
                (_, Term::Variable(_), Term::Variable(_)) => {
                    stats.triple_count / stats.distinct_subjects.max(1)
                }
                // Only the predicate is bound: all triples of that predicate.
                (Term::Variable(_), _, Term::Variable(_)) => {
                    let pred_sel =
                        Self::predicate_selectivity_from_term(&pattern.predicate, stats);
                    (stats.triple_count as f64 * pred_sel) as u64
                }
                // Only the object is bound: average triples per object.
                (Term::Variable(_), Term::Variable(_), _) => {
                    stats.triple_count / stats.distinct_objects.max(1)
                }
                // With exactly one bound position one of the arms above always matches.
                _ => stats.triple_count / 3,
            };
            let expected = expected.max(1);
            CardinalityEstimate::new(0, stats.triple_count, expected)
        }
        _ => CardinalityEstimate::unknown(stats.triple_count),
    }
}
/// Computes the cost figures for `pattern` against `stats`.
///
/// The selectivity is the expected cardinality divided by the graph's triple count
/// (`1.0` for an empty graph), clamped to `1e-9..=1.0`.
pub fn pattern_cost(pattern: &TriplePattern, stats: &JoinGraphStats) -> PatternCost {
    let estimated_cardinality = Self::estimate_cardinality(pattern, stats).expected;
    let raw_selectivity: f64 = match stats.triple_count {
        0 => 1.0,
        total => (estimated_cardinality as f64) / (total as f64),
    };
    PatternCost {
        estimated_cardinality,
        selectivity: raw_selectivity.clamp(1e-9, 1.0),
        bound_count: Self::bound_variable_count(pattern),
    }
}
/// Returns the patterns reordered for evaluation.
///
/// Lists of zero or one pattern are returned unchanged. Lists no longer than the
/// configured threshold use the dynamic-programming strategy; longer lists use the
/// greedy strategy.
pub fn reorder_joins(
    &self,
    patterns: &[TriplePattern],
    stats: &JoinGraphStats,
) -> Vec<TriplePattern> {
    match patterns.len() {
        0 | 1 => patterns.to_vec(),
        count if count <= self.dp_threshold => self.dp_reorder(patterns, stats),
        _ => self.greedy_reorder(patterns, stats),
    }
}
/// Orders patterns by ascending ordering score.
///
/// The sort is stable, so patterns with equal (or incomparable) scores keep their
/// input order.
fn greedy_reorder(
    &self,
    patterns: &[TriplePattern],
    stats: &JoinGraphStats,
) -> Vec<TriplePattern> {
    let mut ranked: Vec<(f64, &TriplePattern)> = Vec::with_capacity(patterns.len());
    for pattern in patterns {
        let score = Self::pattern_cost(pattern, stats).ordering_score();
        ranked.push((score, pattern));
    }
    ranked.sort_by(|(left, _), (right, _)| {
        left.partial_cmp(right).unwrap_or(std::cmp::Ordering::Equal)
    });
    ranked
        .into_iter()
        .map(|(_, pattern)| pattern.clone())
        .collect()
}
/// Orders patterns with a dynamic program over subsets of the pattern list.
///
/// Placing a pattern at position `k` (0-based) of `n` costs its ordering score times
/// `n - k`, so patterns placed early weigh more. The order with the lowest total cost
/// is returned.
///
/// The tables hold `2^n` entries, so lists longer than `MAX_DP_PATTERNS` are handed to
/// the greedy strategy instead. The same fallback is used if no complete order can be
/// reconstructed, so the result always contains every input pattern.
fn dp_reorder(&self, patterns: &[TriplePattern], stats: &JoinGraphStats) -> Vec<TriplePattern> {
    // Keeps `1 << n` far away from overflow and the tables to about a megabyte each.
    const MAX_DP_PATTERNS: usize = 16;

    let n = patterns.len();
    if n == 0 {
        return Vec::new();
    }
    if n > MAX_DP_PATTERNS {
        return self.greedy_reorder(patterns, stats);
    }

    // Scores do not depend on the subset being extended; compute them once.
    let scores: Vec<f64> = patterns
        .iter()
        .map(|p| Self::pattern_cost(p, stats).ordering_score())
        .collect();

    let table_len = 1usize << n;
    let full_mask = table_len - 1;
    // best[mask]: lowest cost of placing exactly the patterns in `mask` first.
    let mut best = vec![f64::INFINITY; table_len];
    // parent[mask]: (previous mask, pattern index placed last) on the best path.
    let mut parent = vec![(usize::MAX, usize::MAX); table_len];
    best[0] = 0.0;

    for mask in 0..=full_mask {
        if best[mask] == f64::INFINITY && mask != 0 {
            continue;
        }
        let position = mask.count_ones() as usize;
        let weight = (n - position) as f64;
        for (i, score) in scores.iter().enumerate() {
            if mask & (1 << i) != 0 {
                continue;
            }
            let new_mask = mask | (1 << i);
            let new_cost = best[mask] + score * weight;
            if new_cost < best[new_mask] {
                best[new_mask] = new_cost;
                parent[new_mask] = (mask, i);
            }
        }
    }

    // Walk the parent links back from the full set; this yields the order in reverse.
    let mut order = Vec::with_capacity(n);
    let mut mask = full_mask;
    while mask != 0 {
        let (prev_mask, idx) = parent[mask];
        if idx == usize::MAX {
            break;
        }
        order.push(idx);
        mask = prev_mask;
    }

    // An incomplete walk (e.g. a non-comparable cost) must not drop patterns.
    if order.len() != n {
        return self.greedy_reorder(patterns, stats);
    }

    order.iter().rev().map(|&i| patterns[i].clone()).collect()
}
/// Returns the selectivity of the predicate position `term`.
///
/// An IRI is looked up in `stats`; any other term is assumed to hold an even share,
/// `1 / distinct_predicates` (with `distinct_predicates` treated as at least 1).
fn predicate_selectivity_from_term(term: &Term, stats: &JoinGraphStats) -> f64 {
    match term {
        Term::Iri(iri) => stats.predicate_selectivity(iri.as_str()),
        _ => 1.0 / stats.distinct_predicates.max(1) as f64,
    }
}
}
impl Default for JoinOrderOptimizer {
fn default() -> Self {
Self::new()
}
}
/// An ordered list of triple patterns with their cost estimates, which can be
/// re-planned when observed cardinalities diverge from the estimates.
#[derive(Debug, Clone)]
pub struct AdaptiveQueryPlan {
    /// Patterns in evaluation order.
    pub patterns: Vec<TriplePattern>,
    /// Cost estimate for each entry of `patterns`, in the same order.
    pub estimated_costs: Vec<PatternCost>,
    /// Ratio between actual and estimated cardinality at or above which
    /// [`should_reorder`](Self::should_reorder) reports `true`.
    pub reorder_threshold: u64,
    /// Index of the pattern the next call to `next_pattern` returns.
    current_index: usize,
    /// Moment the plan was built.
    created_at: Instant,
}

impl AdaptiveQueryPlan {
    /// Builds a plan by ordering `patterns` with a default [`JoinOrderOptimizer`] and
    /// costing each pattern against `stats`. The reorder threshold starts at 5.
    pub fn new(patterns: Vec<TriplePattern>, stats: &JoinGraphStats) -> Self {
        let ordered = JoinOrderOptimizer::new().reorder_joins(&patterns, stats);
        let mut estimated_costs = Vec::with_capacity(ordered.len());
        for pattern in &ordered {
            estimated_costs.push(JoinOrderOptimizer::pattern_cost(pattern, stats));
        }
        Self {
            patterns: ordered,
            estimated_costs,
            reorder_threshold: 5,
            current_index: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns the plan with its reorder threshold replaced by `threshold`.
    pub fn with_reorder_threshold(mut self, threshold: u64) -> Self {
        let slot = &mut self.reorder_threshold;
        *slot = threshold;
        self
    }

    /// Reports whether the observed cardinality of the pattern at `pattern_idx` is far
    /// enough from its estimate to justify re-planning.
    ///
    /// * An out-of-range index yields `false`.
    /// * An estimate of zero yields `true` exactly when any result was observed.
    /// * Otherwise the larger of the two values is divided (integer division) by the
    ///   smaller, with an observed zero treated as one, and compared against
    ///   `reorder_threshold`.
    pub fn should_reorder(&self, actual_cardinality: u64, pattern_idx: usize) -> bool {
        let estimated = match self.estimated_costs.get(pattern_idx) {
            Some(cost) => cost.estimated_cardinality,
            None => return false,
        };
        if estimated == 0 {
            return actual_cardinality > 0;
        }
        let (larger, smaller) = if actual_cardinality > estimated {
            (actual_cardinality, estimated)
        } else {
            (estimated, actual_cardinality.max(1))
        };
        larger / smaller >= self.reorder_threshold
    }

    /// Re-orders the patterns from `pattern_idx` to the end with a default
    /// [`JoinOrderOptimizer`] and refreshes their cost estimates.
    ///
    /// Patterns before `pattern_idx` are left untouched. An out-of-range index is a
    /// no-op.
    pub fn reorder_from(&mut self, pattern_idx: usize, stats: &JoinGraphStats) {
        if pattern_idx >= self.patterns.len() {
            return;
        }
        let tail = self.patterns[pattern_idx..].to_vec();
        let reordered = JoinOrderOptimizer::new().reorder_joins(&tail, stats);
        let mut new_costs: Vec<PatternCost> = Vec::with_capacity(reordered.len());
        for pattern in &reordered {
            new_costs.push(JoinOrderOptimizer::pattern_cost(pattern, stats));
        }
        self.patterns.splice(pattern_idx.., reordered);
        self.estimated_costs.splice(pattern_idx.., new_costs);
    }

    /// Returns the next pattern to evaluate and advances the cursor, or `None` once
    /// every pattern has been handed out.
    pub fn next_pattern(&mut self) -> Option<&TriplePattern> {
        let upcoming = self.patterns.get(self.current_index)?;
        self.current_index += 1;
        Some(upcoming)
    }

    /// Moves the cursor back to the first pattern.
    pub fn reset(&mut self) {
        self.current_index = 0;
    }

    /// Number of patterns not yet returned by `next_pattern`.
    pub fn remaining(&self) -> usize {
        let total = self.patterns.len();
        total.saturating_sub(self.current_index)
    }

    /// Time elapsed since the plan was built.
    pub fn elapsed(&self) -> std::time::Duration {
        self.created_at.elapsed()
    }

    /// Total number of patterns in the plan.
    pub fn len(&self) -> usize {
        self.patterns.len()
    }

    /// Whether the plan contains no patterns.
    pub fn is_empty(&self) -> bool {
        self.patterns.is_empty()
    }
}
// Unit tests for the cardinality estimator, the join-order optimizer and the
// adaptive query plan.
#[cfg(test)]
mod tests {
use super::*;
use crate::algebra::Term;
use oxirs_core::model::NamedNode;
// Builds an IRI term; panics if `s` is not a valid IRI.
fn iri(s: &str) -> Term {
Term::Iri(NamedNode::new(s).expect("valid IRI"))
}
// Builds a variable term; panics if `name` is not a valid variable name.
fn var(name: &str) -> Term {
use oxirs_core::model::Variable;
Term::Variable(Variable::new(name).expect("valid variable name"))
}
// Assembles a triple pattern from its three terms.
fn make_pattern(s: Term, p: Term, o: Term) -> TriplePattern {
TriplePattern {
subject: s,
predicate: p,
object: o,
}
}
// Shared fixture: a 10,000-triple graph with one mid-sized, one common and one rare
// predicate recorded.
fn default_stats() -> JoinGraphStats {
let mut stats = JoinGraphStats::new(10_000);
stats.add_predicate_count("http://ex.org/type", 500);
stats.add_predicate_count("http://ex.org/name", 8_000);
stats.add_predicate_count("http://ex.org/rare", 10);
stats
}
// --- CardinalityEstimate ---
// `exact` sets min, max and expected to the same count.
#[test]
fn test_cardinality_estimate_exact() {
let est = CardinalityEstimate::exact(42);
assert_eq!(est.min, 42);
assert_eq!(est.max, 42);
assert_eq!(est.expected, 42);
}
// `unknown` starts at zero and keeps a positive expected value within max.
#[test]
fn test_cardinality_estimate_unknown() {
let est = CardinalityEstimate::unknown(1_000);
assert_eq!(est.min, 0);
assert!(est.expected > 0);
assert!(est.max >= est.expected);
}
// The Display output contains the `min..max` range.
#[test]
fn test_cardinality_estimate_display() {
let est = CardinalityEstimate::new(1, 100, 50);
let s = format!("{est}");
assert!(s.contains("1..100"));
}
// --- JoinGraphStats ---
// `new` keeps the triple count and derives a non-zero subject count.
#[test]
fn test_graph_stats_new() {
let stats = JoinGraphStats::new(1_000);
assert_eq!(stats.triple_count, 1_000);
assert!(stats.distinct_subjects > 0);
}
// A recorded predicate count of 1,000 out of 10,000 triples gives selectivity 0.1.
#[test]
fn test_predicate_selectivity_known() {
let mut stats = JoinGraphStats::new(10_000);
stats.add_predicate_count("http://ex.org/p", 1_000);
let sel = stats.predicate_selectivity("http://ex.org/p");
assert!((sel - 0.1).abs() < 1e-9);
}
// A predicate without a recorded count still gets a selectivity in (0, 1].
#[test]
fn test_predicate_selectivity_unknown() {
let stats = JoinGraphStats::new(10_000);
let sel = stats.predicate_selectivity("http://ex.org/unknown");
assert!(sel > 0.0 && sel <= 1.0);
}
// An empty graph reports selectivity 1.0.
#[test]
fn test_predicate_selectivity_zero_triples() {
let stats = JoinGraphStats::new(0);
let sel = stats.predicate_selectivity("http://ex.org/p");
assert_eq!(sel, 1.0);
}
// --- bound_variable_count: 0, 1, 2 and 3 non-variable positions ---
#[test]
fn test_bound_count_all_variables() {
let p = make_pattern(var("s"), var("p"), var("o"));
assert_eq!(JoinOrderOptimizer::bound_variable_count(&p), 0);
}
#[test]
fn test_bound_count_one_bound() {
let p = make_pattern(iri("http://ex.org/s"), var("p"), var("o"));
assert_eq!(JoinOrderOptimizer::bound_variable_count(&p), 1);
}
#[test]
fn test_bound_count_two_bound() {
let p = make_pattern(iri("http://ex.org/s"), iri("http://ex.org/p"), var("o"));
assert_eq!(JoinOrderOptimizer::bound_variable_count(&p), 2);
}
#[test]
fn test_bound_count_all_bound() {
let p = make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/p"),
iri("http://ex.org/o"),
);
assert_eq!(JoinOrderOptimizer::bound_variable_count(&p), 3);
}
// --- estimate_cardinality ---
// A fully bound pattern matches at most one triple.
#[test]
fn test_estimate_fully_bound() {
let stats = default_stats();
let p = make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/p"),
iri("http://ex.org/o"),
);
let est = JoinOrderOptimizer::estimate_cardinality(&p, &stats);
assert_eq!(est.max, 1);
}
// Subject and predicate bound: the expected cardinality is positive.
#[test]
fn test_estimate_two_bound_spo() {
let stats = default_stats();
let p = make_pattern(iri("http://ex.org/s"), iri("http://ex.org/type"), var("o"));
let est = JoinOrderOptimizer::estimate_cardinality(&p, &stats);
assert!(est.expected > 0);
}
// An all-variable pattern can match up to the whole graph.
#[test]
fn test_estimate_full_scan() {
let stats = default_stats();
let p = make_pattern(var("s"), var("p"), var("o"));
let est = JoinOrderOptimizer::estimate_cardinality(&p, &stats);
assert_eq!(est.max, stats.triple_count);
}
// An empty graph yields an expected cardinality of zero.
#[test]
fn test_estimate_zero_graph() {
let stats = JoinGraphStats::new(0);
let p = make_pattern(var("s"), var("p"), var("o"));
let est = JoinOrderOptimizer::estimate_cardinality(&p, &stats);
assert_eq!(est.expected, 0);
}
// --- pattern_cost ---
// A fully bound pattern scores lower (runs earlier) than a full scan.
#[test]
fn test_pattern_cost_ordering_score_fully_bound_lowest() {
let stats = default_stats();
let fully_bound = make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/p"),
iri("http://ex.org/o"),
);
let full_scan = make_pattern(var("s"), var("p"), var("o"));
let cost_bound = JoinOrderOptimizer::pattern_cost(&fully_bound, &stats);
let cost_scan = JoinOrderOptimizer::pattern_cost(&full_scan, &stats);
assert!(
cost_bound.ordering_score() < cost_scan.ordering_score(),
"fully bound should score lower than full scan"
);
}
// Selectivity stays within (0, 1].
#[test]
fn test_pattern_cost_selectivity_range() {
let stats = default_stats();
let p = make_pattern(var("s"), iri("http://ex.org/type"), var("o"));
let cost = JoinOrderOptimizer::pattern_cost(&p, &stats);
assert!(cost.selectivity > 0.0);
assert!(cost.selectivity <= 1.0);
}
// --- reorder_joins ---
// An empty input gives an empty output.
#[test]
fn test_reorder_empty() {
let optimizer = JoinOrderOptimizer::new();
let stats = default_stats();
let result = optimizer.reorder_joins(&[], &stats);
assert!(result.is_empty());
}
// A single pattern is returned unchanged.
#[test]
fn test_reorder_single() {
let optimizer = JoinOrderOptimizer::new();
let stats = default_stats();
let p = make_pattern(var("s"), var("p"), var("o"));
let result = optimizer.reorder_joins(std::slice::from_ref(&p), &stats);
assert_eq!(result.len(), 1);
assert_eq!(result[0], p);
}
// With three patterns the fully bound one is moved to the front.
#[test]
fn test_reorder_places_selective_first() {
let optimizer = JoinOrderOptimizer::new();
let stats = default_stats();
let fully_bound = make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/type"),
iri("http://ex.org/o"),
);
let full_scan = make_pattern(var("s"), var("p"), var("o"));
let one_bound = make_pattern(iri("http://ex.org/s"), var("p"), var("o"));
let patterns = vec![full_scan.clone(), one_bound.clone(), fully_bound.clone()];
let result = optimizer.reorder_joins(&patterns, &stats);
assert_eq!(result.len(), 3);
assert_eq!(result[0], fully_bound, "fully bound should come first");
}
// Six patterns with a threshold of 2 take the greedy path; the first result must not
// score higher than the last.
#[test]
fn test_reorder_greedy_large_list() {
let optimizer = JoinOrderOptimizer::new().with_dp_threshold(2);
let stats = default_stats();
let patterns: Vec<TriplePattern> = (0..6)
.map(|i| {
if i % 2 == 0 {
make_pattern(
iri(&format!("http://ex.org/s{i}")),
iri("http://ex.org/type"),
var("o"),
)
} else {
make_pattern(var("s"), var("p"), var("o"))
}
})
.collect();
let result = optimizer.reorder_joins(&patterns, &stats);
assert_eq!(result.len(), 6);
let first_cost = JoinOrderOptimizer::pattern_cost(&result[0], &stats);
let last_cost = JoinOrderOptimizer::pattern_cost(&result[5], &stats);
assert!(first_cost.ordering_score() <= last_cost.ordering_score());
}
// Runs the same three patterns through a threshold of 5 and a threshold of 0.
// Only the result lengths are compared, not the resulting order.
#[test]
fn test_reorder_dp_vs_greedy_equivalence_small() {
let optimizer_dp = JoinOrderOptimizer::new().with_dp_threshold(5);
let optimizer_greedy = JoinOrderOptimizer::new().with_dp_threshold(0);
let stats = default_stats();
let patterns = vec![
make_pattern(var("s"), var("p"), var("o")),
make_pattern(iri("http://ex.org/s"), iri("http://ex.org/type"), var("o")),
make_pattern(iri("http://ex.org/s"), var("p"), var("o")),
];
let dp_result = optimizer_dp.reorder_joins(&patterns, &stats);
let greedy_result = optimizer_greedy.reorder_joins(&patterns, &stats);
assert_eq!(dp_result.len(), greedy_result.len());
}
// --- AdaptiveQueryPlan ---
// A plan built from two patterns reports length 2 and is not empty.
#[test]
fn test_adaptive_plan_creation() {
let stats = default_stats();
let patterns = vec![
make_pattern(var("s"), var("p"), var("o")),
make_pattern(iri("http://ex.org/s"), iri("http://ex.org/type"), var("o")),
];
let plan = AdaptiveQueryPlan::new(patterns, &stats);
assert_eq!(plan.len(), 2);
assert!(!plan.is_empty());
}
// An actual cardinality 100x the estimate exceeds a threshold of 5.
#[test]
fn test_adaptive_plan_should_reorder_large_divergence() {
let stats = default_stats();
let patterns = vec![make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/type"),
var("o"),
)];
let plan = AdaptiveQueryPlan::new(patterns, &stats).with_reorder_threshold(5);
let actual = plan.estimated_costs[0].estimated_cardinality * 100;
assert!(plan.should_reorder(actual, 0));
}
// An actual cardinality 2x the estimate stays below a threshold of 10.
#[test]
fn test_adaptive_plan_should_not_reorder_small_divergence() {
let stats = default_stats();
let patterns = vec![make_pattern(
iri("http://ex.org/s"),
iri("http://ex.org/type"),
var("o"),
)];
let plan = AdaptiveQueryPlan::new(patterns, &stats).with_reorder_threshold(10);
let estimated = plan.estimated_costs[0].estimated_cardinality;
let actual = estimated * 2;
assert!(!plan.should_reorder(actual, 0));
}
// Re-planning from index 1 keeps the plan length unchanged.
#[test]
fn test_adaptive_plan_reorder_from() {
let stats = default_stats();
let patterns = vec![
make_pattern(var("s"), var("p"), var("o")),
make_pattern(var("s2"), var("p2"), var("o2")),
make_pattern(iri("http://ex.org/s"), iri("http://ex.org/type"), var("o")),
];
let mut plan = AdaptiveQueryPlan::new(patterns, &stats);
plan.reorder_from(1, &stats);
assert_eq!(plan.len(), 3);
}
// `next_pattern` yields each pattern once, then `None`.
#[test]
fn test_adaptive_plan_next_pattern() {
let stats = default_stats();
let patterns = vec![
make_pattern(var("s"), var("p"), var("o")),
make_pattern(iri("http://ex.org/s"), var("p"), var("o")),
];
let mut plan = AdaptiveQueryPlan::new(patterns, &stats);
assert!(plan.next_pattern().is_some());
assert!(plan.next_pattern().is_some());
assert!(plan.next_pattern().is_none());
}
// `reset` rewinds the cursor so all patterns are remaining again.
#[test]
fn test_adaptive_plan_reset() {
let stats = default_stats();
let patterns = vec![make_pattern(var("s"), var("p"), var("o"))];
let mut plan = AdaptiveQueryPlan::new(patterns, &stats);
plan.next_pattern();
assert_eq!(plan.remaining(), 0);
plan.reset();
assert_eq!(plan.remaining(), 1);
}
// `remaining` drops by one per consumed pattern.
#[test]
fn test_adaptive_plan_remaining() {
let stats = default_stats();
let patterns = vec![
make_pattern(var("s"), var("p"), var("o")),
make_pattern(var("a"), var("b"), var("c")),
make_pattern(var("x"), var("y"), var("z")),
];
let mut plan = AdaptiveQueryPlan::new(patterns, &stats);
assert_eq!(plan.remaining(), 3);
plan.next_pattern();
assert_eq!(plan.remaining(), 2);
}
// A freshly built plan reports an elapsed time under five seconds.
#[test]
fn test_adaptive_plan_elapsed() {
let stats = default_stats();
let patterns = vec![make_pattern(var("s"), var("p"), var("o"))];
let plan = AdaptiveQueryPlan::new(patterns, &stats);
assert!(plan.elapsed().as_secs() < 5);
}
// Re-planning from an index past the end leaves the plan untouched.
#[test]
fn test_adaptive_plan_out_of_bounds_reorder() {
let stats = default_stats();
let patterns = vec![make_pattern(var("s"), var("p"), var("o"))];
let mut plan = AdaptiveQueryPlan::new(patterns, &stats);
plan.reorder_from(100, &stats);
assert_eq!(plan.len(), 1);
}
// With an estimate of zero (empty graph), any observed result triggers a reorder and
// zero observed results do not.
#[test]
fn test_adaptive_plan_should_reorder_zero_estimated() {
let stats = JoinGraphStats::new(0);
let patterns = vec![make_pattern(var("s"), var("p"), var("o"))];
let plan = AdaptiveQueryPlan::new(patterns, &stats);
assert!(plan.should_reorder(1, 0));
assert!(!plan.should_reorder(0, 0));
}
}