use crate::algebra::{Binding, Solution, Term, Variable};
use anyhow::{bail, Result};
use std::collections::{HashMap, HashSet};
/// An inline table of solution data (the payload of a `VALUES` clause).
///
/// `variables` names the columns and every entry of `data` is one row. The
/// checked entry points (`with_data`, `add_row`) reject rows whose length differs
/// from the number of variables; because both fields are public, code that mutates
/// them directly bypasses that check.
#[derive(Debug, Clone, PartialEq)]
pub struct ValuesClause {
/// Column variables, in declaration order.
pub variables: Vec<Variable>,
/// Table rows; a `None` cell means the variable has no value in that row
/// (such cells are skipped by `to_solution` and counted as undef by `statistics`).
pub data: Vec<Vec<Option<Term>>>,
}
impl ValuesClause {
    /// Creates a clause over `variables` that holds no rows yet.
    pub fn new(variables: Vec<Variable>) -> Self {
        let data = Vec::new();
        Self { variables, data }
    }

    /// Creates a clause from already-built rows.
    ///
    /// # Errors
    /// Returns an error for the first row whose length differs from the number
    /// of variables.
    pub fn with_data(variables: Vec<Variable>, data: Vec<Vec<Option<Term>>>) -> Result<Self> {
        let width = variables.len();
        if let Some(bad_row) = data.iter().find(|row| row.len() != width) {
            bail!(
                "VALUES data row has {} values but {} variables defined",
                bad_row.len(),
                width
            );
        }
        Ok(Self { variables, data })
    }

    /// Appends one row.
    ///
    /// # Errors
    /// Returns an error (and leaves the clause untouched) when the row length
    /// differs from the number of variables.
    pub fn add_row(&mut self, row: Vec<Option<Term>>) -> Result<()> {
        let expected = self.variables.len();
        let actual = row.len();
        if actual == expected {
            self.data.push(row);
            Ok(())
        } else {
            bail!(
                "Row has {} values but {} variables defined",
                actual,
                expected
            )
        }
    }

    /// Number of data rows.
    pub fn row_count(&self) -> usize {
        self.data.len()
    }

    /// Number of column variables.
    pub fn variable_count(&self) -> usize {
        self.variables.len()
    }

    /// `true` when the clause has no rows (it may still declare variables).
    pub fn is_empty(&self) -> bool {
        self.row_count() == 0
    }

    /// Converts every row into a binding, in row order.
    ///
    /// `None` cells are left out, so a binding may cover only some of the
    /// declared variables.
    pub fn to_solution(&self) -> Solution {
        let mut solution = Solution::new();
        for row in &self.data {
            let mut binding = Binding::new();
            let bound_cells = self
                .variables
                .iter()
                .zip(row)
                .filter_map(|(var, cell)| cell.as_ref().map(|term| (var, term)));
            for (var, term) in bound_cells {
                binding.insert(var.clone(), term.clone());
            }
            solution.push(binding);
        }
        solution
    }

    /// Computes summary statistics over the table.
    ///
    /// Selectivity of a variable is the number of distinct terms in its column
    /// divided by the row count, or `1.0` when there are no rows.
    pub fn statistics(&self) -> ValuesStatistics {
        let total_rows = self.data.len();

        // Seed every variable up front so columns without any bound cell still
        // get a selectivity entry.
        let mut distinct_terms: HashMap<Variable, HashSet<Term>> = self
            .variables
            .iter()
            .map(|var| (var.clone(), HashSet::new()))
            .collect();

        let mut undef_count = 0;
        for row in &self.data {
            for (var, cell) in self.variables.iter().zip(row) {
                match cell {
                    Some(term) => {
                        distinct_terms
                            .get_mut(var)
                            .expect("variable should exist in initialized map")
                            .insert(term.clone());
                    }
                    None => undef_count += 1,
                }
            }
        }

        let selectivity: HashMap<Variable, f64> = distinct_terms
            .into_iter()
            .map(|(var, terms)| {
                let ratio = if total_rows == 0 {
                    1.0
                } else {
                    terms.len() as f64 / total_rows as f64
                };
                (var, ratio)
            })
            .collect();

        ValuesStatistics {
            row_count: total_rows,
            variable_count: self.variables.len(),
            undef_count,
            selectivity,
            estimated_memory: self.estimate_memory_usage(),
        }
    }

    /// Rough size estimate: a flat per-cell cost times the number of cells.
    fn estimate_memory_usage(&self) -> usize {
        const COST_PER_CELL: usize = 32;
        let cells = self.data.len() * self.variables.len();
        cells * COST_PER_CELL
    }
}
/// Summary figures computed by `ValuesClause::statistics`.
#[derive(Debug, Clone)]
pub struct ValuesStatistics {
/// Number of data rows.
pub row_count: usize,
/// Number of declared variables (columns).
pub variable_count: usize,
/// Total number of `None` cells across all rows.
pub undef_count: usize,
/// Per variable: distinct bound terms divided by row count (`1.0` when there are no rows).
pub selectivity: HashMap<Variable, f64>,
/// Rough estimate computed as rows × variables × 32 (presumably bytes — TODO confirm the unit).
pub estimated_memory: usize,
}
/// Stateless collection of heuristics for planning the execution of a [`ValuesClause`].
pub struct ValuesOptimizer;

impl ValuesOptimizer {
    /// Picks an execution strategy and gathers planning hints for `values`.
    ///
    /// * Strategy: at most 10 rows → `Inline`, at most 1000 rows → `HashIndex`,
    ///   otherwise `Materialized`.
    /// * `pushable`: fewer than 100 rows and no undef cells.
    /// * `most_selective_var`: the variable with the lowest selectivity ratio;
    ///   ties go to the variable declared first. `None` when the clause declares
    ///   no variables.
    pub fn optimize(values: &ValuesClause) -> OptimizedValues {
        let stats = values.statistics();
        let strategy = if values.row_count() <= 10 {
            ValuesExecutionStrategy::Inline
        } else if values.row_count() <= 1000 {
            ValuesExecutionStrategy::HashIndex
        } else {
            ValuesExecutionStrategy::Materialized
        };
        let pushable = values.row_count() < 100 && stats.undef_count == 0;

        // Walk the variables in declaration order rather than iterating the
        // selectivity map: `min_by` keeps the first minimum it sees, so ties are
        // broken deterministically instead of by hash-map iteration order.
        let most_selective = values
            .variables
            .iter()
            .filter_map(|var| stats.selectivity.get(var).map(|sel| (var, *sel)))
            .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(var, _)| var.clone());

        OptimizedValues {
            original: values.clone(),
            strategy,
            pushable,
            most_selective_var: most_selective,
            statistics: stats,
        }
    }

    /// Returns the clause's variables ordered by ascending selectivity ratio.
    ///
    /// Each distinct variable appears once. Variables with equal selectivity keep
    /// their declaration order, so the result is deterministic.
    pub fn suggest_variable_order(values: &ValuesClause) -> Vec<Variable> {
        let stats = values.statistics();

        // Start from declaration order (skipping repeats) so that the stable sort
        // below yields a reproducible order for ties.
        let mut seen: HashSet<&Variable> = HashSet::new();
        let mut vars_with_selectivity: Vec<(Variable, f64)> = values
            .variables
            .iter()
            .filter(|var| seen.insert(*var))
            .filter_map(|var| stats.selectivity.get(var).map(|sel| (var.clone(), *sel)))
            .collect();

        vars_with_selectivity
            .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));

        vars_with_selectivity
            .into_iter()
            .map(|(var, _)| var)
            .collect()
    }
}
/// Result of `ValuesOptimizer::optimize`: the clause plus the planning hints derived from it.
#[derive(Debug, Clone)]
pub struct OptimizedValues {
/// A copy of the clause that was analysed.
pub original: ValuesClause,
/// Execution strategy chosen from the row count.
pub strategy: ValuesExecutionStrategy,
/// `true` when the clause has fewer than 100 rows and no undef cells.
pub pushable: bool,
/// Variable with the lowest selectivity ratio, if the clause declares any variable.
pub most_selective_var: Option<Variable>,
/// Statistics the decisions above were based on.
pub statistics: ValuesStatistics,
}
/// Execution strategy selected by `ValuesOptimizer::optimize` from the row count.
///
/// How each strategy is carried out is not defined in this part of the file;
/// only the selection thresholds are.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValuesExecutionStrategy {
/// Chosen for clauses with at most 10 rows.
Inline,
/// Chosen for clauses with 11 to 1000 rows.
HashIndex,
/// Chosen for clauses with more than 1000 rows.
Materialized,
}
/// A `ValuesClause` together with per-variable lookup tables.
pub struct IndexedValues {
// The clause the indexes were built from.
values: ValuesClause,
// variable -> term -> indices of the rows holding that term in the variable's column.
// `None` cells are not indexed.
indexes: HashMap<Variable, HashMap<Term, Vec<usize>>>,
}
impl IndexedValues {
    /// Builds one `term -> row indices` index per variable of `values`.
    ///
    /// `None` cells are not indexed, so rows with an undefined value for a
    /// variable can never be found through that variable's index.
    pub fn new(values: ValuesClause) -> Self {
        let mut indexes = HashMap::new();
        for (var_idx, var) in values.variables.iter().enumerate() {
            let mut var_index: HashMap<Term, Vec<usize>> = HashMap::new();
            for (row_idx, row) in values.data.iter().enumerate() {
                // `row.get` tolerates rows shorter than the variable list.
                if let Some(Some(term)) = row.get(var_idx) {
                    var_index.entry(term.clone()).or_default().push(row_idx);
                }
            }
            indexes.insert(var.clone(), var_index);
        }
        Self { values, indexes }
    }

    /// Returns the bindings of all rows whose `var` column holds exactly `term`,
    /// in row order. Empty when `var` is not part of the clause or no row matches.
    pub fn lookup(&self, var: &Variable, term: &Term) -> Vec<Binding> {
        self.indexes
            .get(var)
            .and_then(|var_index| var_index.get(term))
            .map(|row_indices| {
                row_indices
                    .iter()
                    .filter_map(|&idx| self.row_to_binding(idx))
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns the bindings of the rows that agree with `binding` on every
    /// variable the two have in common, in ascending row order.
    ///
    /// Variables of `binding` that this clause does not declare are ignored. If
    /// no variable is shared, every row is returned.
    ///
    /// A shared variable whose term does not occur in the clause yields an empty
    /// result. (Previously such a variable was silently ignored, so a probe with
    /// a non-matching term could return every row.)
    ///
    /// NOTE(review): rows with an undefined (`None`) cell for a shared variable
    /// are not in the index and therefore never match a binding that binds that
    /// variable. Confirm whether such rows should count as compatible.
    pub fn probe(&self, binding: &Binding) -> Vec<Binding> {
        let mut candidates: Option<HashSet<usize>> = None;

        for (var, term) in binding {
            // Variables this clause does not declare place no constraint on its rows.
            let var_index = match self.indexes.get(var) {
                Some(var_index) => var_index,
                None => continue,
            };

            // A declared variable always constrains the result: a term missing
            // from the index means no row can match.
            let matching: HashSet<usize> = var_index
                .get(term)
                .map(|row_indices| row_indices.iter().copied().collect())
                .unwrap_or_default();

            let narrowed: HashSet<usize> = match candidates.take() {
                Some(existing) => existing.intersection(&matching).copied().collect(),
                None => matching,
            };

            // Intersections can only shrink, so stop as soon as nothing is left.
            if narrowed.is_empty() {
                return Vec::new();
            }
            candidates = Some(narrowed);
        }

        match candidates {
            Some(candidate_rows) => {
                // Sort so the output does not depend on hash-set iteration order.
                let mut ordered: Vec<usize> = candidate_rows.into_iter().collect();
                ordered.sort_unstable();
                ordered
                    .into_iter()
                    .filter_map(|idx| self.row_to_binding(idx))
                    .collect()
            }
            // No shared variable: every row is a candidate.
            None => self.all_bindings(),
        }
    }

    /// Converts row `row_idx` into a binding, skipping `None` cells.
    /// Returns `None` when the index is out of range.
    fn row_to_binding(&self, row_idx: usize) -> Option<Binding> {
        let row = self.values.data.get(row_idx)?;
        let mut binding = Binding::new();
        for (var, value) in self.values.variables.iter().zip(row.iter()) {
            if let Some(term) = value {
                binding.insert(var.clone(), term.clone());
            }
        }
        Some(binding)
    }

    /// Returns the bindings of all rows, in row order.
    pub fn all_bindings(&self) -> Vec<Binding> {
        (0..self.values.data.len())
            .filter_map(|idx| self.row_to_binding(idx))
            .collect()
    }

    /// Statistics of the underlying clause.
    pub fn statistics(&self) -> ValuesStatistics {
        self.values.statistics()
    }
}
/// Stateless collection of heuristics for joining a [`ValuesClause`] with other patterns.
pub struct ValuesJoinOptimizer;

impl ValuesJoinOptimizer {
    /// Chooses how to join `values` with a pattern that binds `other_variables`.
    ///
    /// * no shared variable → `CrossProduct`
    /// * at most 10 rows → `NestedLoop`
    /// * exactly one shared variable → `IndexNestedLoop` on that variable
    /// * otherwise → `HashJoin` on all shared variables
    ///
    /// Shared variables are reported in the clause's declaration order, each at
    /// most once, so the returned strategy is deterministic and can be compared
    /// with `==`.
    pub fn optimize_join(
        values: &ValuesClause,
        other_variables: &HashSet<Variable>,
    ) -> JoinStrategy {
        // Iterate the clause's own variable list instead of intersecting two hash
        // sets, whose iteration order is unspecified.
        let mut seen: HashSet<&Variable> = HashSet::new();
        let shared_vars: Vec<Variable> = values
            .variables
            .iter()
            .filter(|var| other_variables.contains(*var) && seen.insert(*var))
            .cloned()
            .collect();

        if shared_vars.is_empty() {
            return JoinStrategy::CrossProduct;
        }
        if values.row_count() <= 10 {
            JoinStrategy::NestedLoop
        } else if shared_vars.len() == 1 {
            JoinStrategy::IndexNestedLoop {
                index_var: shared_vars[0].clone(),
            }
        } else {
            JoinStrategy::HashJoin {
                join_vars: shared_vars,
            }
        }
    }

    /// Reports whether `values` may be pushed into a join on `join_vars`.
    ///
    /// Requires fewer than 100 rows, no undefined cell, and at least one clause
    /// variable among `join_vars`.
    pub fn can_push_values(values: &ValuesClause, join_vars: &[Variable]) -> bool {
        if values.row_count() >= 100 {
            return false;
        }

        // Short-circuit scan for an undefined cell instead of building full
        // statistics. Only the cells paired with a declared variable count,
        // matching how `statistics` derives `undef_count`.
        let width = values.variables.len();
        let has_undef = values
            .data
            .iter()
            .any(|row| row.iter().take(width).any(|cell| cell.is_none()));
        if has_undef {
            return false;
        }

        values.variables.iter().any(|v| join_vars.contains(v))
    }
}
/// Join strategy selected by `ValuesJoinOptimizer::optimize_join`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinStrategy {
/// Chosen when the clause shares no variable with the other side.
CrossProduct,
/// Chosen when variables are shared and the clause has at most 10 rows.
NestedLoop,
/// Chosen for larger clauses sharing exactly one variable, which is recorded here.
IndexNestedLoop { index_var: Variable },
/// Chosen for larger clauses sharing several variables, all recorded here.
HashJoin { join_vars: Vec<Variable> },
}
/// Executes `ValuesClause`s and caches the indexed form of clauses it has seen.
pub struct ValuesExecutor {
// Indexed clauses keyed by the clause's `Debug` rendering (see `compute_cache_key`).
// Nothing in this type removes entries from the cache.
indexed_cache: HashMap<String, IndexedValues>,
}
impl ValuesExecutor {
    /// Creates an executor with an empty index cache.
    pub fn new() -> Self {
        Self {
            indexed_cache: HashMap::new(),
        }
    }

    /// Materialises `values` as a solution (one binding per row). Never fails;
    /// the `Result` is part of the established interface.
    pub fn execute(&mut self, values: &ValuesClause) -> Result<Solution> {
        Ok(values.to_solution())
    }

    /// Returns an indexed copy of `values`, building and caching it on first use.
    ///
    /// Both cache hits and misses hand out an independent copy of the index.
    pub fn execute_indexed(&mut self, values: &ValuesClause) -> Result<IndexedValues> {
        let cache_key = self.compute_cache_key(values);
        if let Some(indexed) = self.indexed_cache.get(&cache_key) {
            return Ok(indexed.clone_structure());
        }
        let indexed = IndexedValues::new(values.clone());
        self.indexed_cache
            .insert(cache_key, indexed.clone_structure());
        Ok(indexed)
    }

    /// Joins `solution` with `values` using the clause's index.
    ///
    /// For each incoming binding, every probed row that is compatible with it
    /// (no shared variable bound to a different term) contributes one merged
    /// binding. Rows that conflict with the incoming binding contribute nothing.
    ///
    /// # Errors
    /// Propagates errors from `execute_indexed`.
    pub fn join_indexed(&mut self, values: &ValuesClause, solution: Solution) -> Result<Solution> {
        let indexed = self.execute_indexed(values)?;
        let mut result = Solution::new();
        for binding in solution {
            let matches = indexed.probe(&binding);
            for values_binding in &matches {
                let mut merged = binding.clone();
                // Tracks whether this row agrees with the incoming binding on
                // every shared variable. A plain `continue` here would only skip
                // to the next variable and still emit the conflicting row.
                let mut compatible = true;
                for (var, term) in values_binding {
                    if let Some(existing) = merged.get(var) {
                        if existing != term {
                            compatible = false;
                            break;
                        }
                    } else {
                        merged.insert(var.clone(), term.clone());
                    }
                }
                if compatible {
                    result.push(merged);
                }
            }
            // The binding is passed through unchanged only when the probe found
            // no row at all and it shares no variable with the clause.
            // NOTE(review): given how `probe` treats bindings without shared
            // variables, this presumably only fires for a clause with no rows —
            // confirm that passing the binding through is intended in that case.
            if matches.is_empty() && !has_shared_vars(values, &binding) {
                result.push(binding);
            }
        }
        Ok(result)
    }

    /// Cache key: the clause's full `Debug` rendering, so its cost grows with
    /// the size of the clause.
    fn compute_cache_key(&self, values: &ValuesClause) -> String {
        format!("{:?}", values)
    }
}
impl Default for ValuesExecutor {
fn default() -> Self {
Self::new()
}
}
impl IndexedValues {
    /// Produces an independent deep copy of the clause and its indexes.
    fn clone_structure(&self) -> Self {
        let values = self.values.clone();
        let indexes = self.indexes.clone();
        Self { values, indexes }
    }
}
/// Returns `true` when `binding` binds at least one variable declared by `values`.
fn has_shared_vars(values: &ValuesClause, binding: &Binding) -> bool {
    for declared in &values.variables {
        if binding.contains_key(declared) {
            return true;
        }
    }
    false
}
/// Step-by-step builder for a `ValuesClause`.
pub struct ValuesBuilder {
// Variables added so far, in insertion order.
variables: Vec<Variable>,
// Rows added so far; each was length-checked against `variables` at the time it was added.
rows: Vec<Vec<Option<Term>>>,
}
impl ValuesBuilder {
    /// Starts a builder with no variables and no rows.
    pub fn new() -> Self {
        let variables = Vec::new();
        let rows = Vec::new();
        Self { variables, rows }
    }

    /// Appends one variable (column) and returns the builder.
    pub fn add_variable(self, var: Variable) -> Self {
        let Self {
            mut variables,
            rows,
        } = self;
        variables.push(var);
        Self { variables, rows }
    }

    /// Appends several variables, preserving their order.
    pub fn add_variables(mut self, vars: Vec<Variable>) -> Self {
        for var in vars {
            self.variables.push(var);
        }
        self
    }

    /// Appends one row.
    ///
    /// # Errors
    /// Fails when the row length differs from the number of variables added so far.
    pub fn add_row(mut self, values: Vec<Option<Term>>) -> Result<Self> {
        let provided = values.len();
        let declared = self.variables.len();
        if provided == declared {
            self.rows.push(values);
            Ok(self)
        } else {
            bail!(
                "Row has {} values but {} variables defined",
                provided,
                declared
            )
        }
    }

    /// Finishes the builder.
    ///
    /// # Errors
    /// Re-validates every row through `ValuesClause::with_data`, so rows that no
    /// longer fit (for example because variables were added after them) are rejected.
    pub fn build(self) -> Result<ValuesClause> {
        let Self { variables, rows } = self;
        ValuesClause::with_data(variables, rows)
    }
}
impl Default for ValuesBuilder {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the VALUES clause types, optimizers, index, executor and builder.
#[cfg(test)]
mod tests {
use super::*;
use oxirs_core::model::NamedNode;
// Builds a variable, panicking if `Variable::new` rejects the name.
fn create_test_variable(name: &str) -> Variable {
Variable::new(name).unwrap()
}
// Builds an IRI term, panicking if `NamedNode::new` rejects the IRI.
fn create_test_iri(iri: &str) -> Term {
Term::Iri(NamedNode::new(iri).unwrap())
}
// A new clause has its variables but no rows; adding a well-formed row makes it non-empty.
#[test]
fn test_values_clause_creation() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let mut values = ValuesClause::new(vec![var_x.clone(), var_y.clone()]);
assert_eq!(values.variable_count(), 2);
assert_eq!(values.row_count(), 0);
assert!(values.is_empty());
values
.add_row(vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/b")),
])
.unwrap();
assert_eq!(values.row_count(), 1);
assert!(!values.is_empty());
}
// A `None` cell leaves its variable unbound in the corresponding binding.
#[test]
fn test_values_with_undef() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let values = ValuesClause::with_data(
vec![var_x.clone(), var_y.clone()],
vec![
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/b")),
],
// Second row: ?y has no value.
vec![
Some(create_test_iri("http://example.org/c")),
None, ],
],
)
.unwrap();
let solution = values.to_solution();
assert_eq!(solution.len(), 2);
assert_eq!(solution[0].len(), 2);
assert_eq!(solution[1].len(), 1);
assert!(solution[1].contains_key(&var_x));
assert!(!solution[1].contains_key(&var_y));
}
// Two distinct terms over three rows give a selectivity of 2/3.
#[test]
fn test_values_statistics() {
let var_x = create_test_variable("x");
let values = ValuesClause::with_data(
vec![var_x.clone()],
vec![
vec![Some(create_test_iri("http://example.org/a"))],
vec![Some(create_test_iri("http://example.org/a"))],
vec![Some(create_test_iri("http://example.org/b"))],
],
)
.unwrap();
let stats = values.statistics();
assert_eq!(stats.row_count, 3);
assert_eq!(stats.variable_count, 1);
assert_eq!(stats.undef_count, 0);
let selectivity = stats.selectivity.get(&var_x).unwrap();
assert!((selectivity - 0.666).abs() < 0.01);
}
// A two-row clause without undef cells is executed inline and is pushable.
#[test]
fn test_values_optimizer() {
let var_x = create_test_variable("x");
let values = ValuesClause::with_data(
vec![var_x],
vec![
vec![Some(create_test_iri("http://example.org/a"))],
vec![Some(create_test_iri("http://example.org/b"))],
],
)
.unwrap();
let optimized = ValuesOptimizer::optimize(&values);
assert_eq!(optimized.strategy, ValuesExecutionStrategy::Inline);
assert!(optimized.pushable);
}
// `lookup` finds the single row holding the term; `all_bindings` returns every row.
#[test]
fn test_indexed_values() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let values = ValuesClause::with_data(
vec![var_x.clone(), var_y.clone()],
vec![
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/1")),
],
vec![
Some(create_test_iri("http://example.org/b")),
Some(create_test_iri("http://example.org/2")),
],
],
)
.unwrap();
let indexed = IndexedValues::new(values);
let results = indexed.lookup(&var_x, &create_test_iri("http://example.org/a"));
assert_eq!(results.len(), 1);
assert_eq!(results[0].len(), 2);
let all = indexed.all_bindings();
assert_eq!(all.len(), 2);
}
// Probing with ?x bound to a term shared by both rows returns both rows.
#[test]
fn test_indexed_values_probe() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let values = ValuesClause::with_data(
vec![var_x.clone(), var_y.clone()],
vec![
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/1")),
],
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/2")),
],
],
)
.unwrap();
let indexed = IndexedValues::new(values);
let mut probe_binding = Binding::new();
probe_binding.insert(var_x.clone(), create_test_iri("http://example.org/a"));
let results = indexed.probe(&probe_binding);
assert_eq!(results.len(), 2); }
// Fifteen rows (above the nested-loop limit of 10) with one shared variable
// select an index nested-loop join on that variable.
#[test]
fn test_values_join_optimizer() {
let var_x = create_test_variable("x");
let data: Vec<Vec<Option<Term>>> = (0..15)
.map(|i| {
vec![Some(create_test_iri(&format!(
"http://example.org/item{}",
i
)))]
})
.collect();
let values = ValuesClause::with_data(vec![var_x.clone()], data).unwrap();
let other_vars = [var_x.clone()].into_iter().collect();
let strategy = ValuesJoinOptimizer::optimize_join(&values, &other_vars);
assert_eq!(strategy, JoinStrategy::IndexNestedLoop { index_var: var_x });
}
// Plain execution yields one binding per row.
#[test]
fn test_values_executor() {
let var_x = create_test_variable("x");
let values = ValuesClause::with_data(
vec![var_x],
vec![
vec![Some(create_test_iri("http://example.org/a"))],
vec![Some(create_test_iri("http://example.org/b"))],
],
)
.unwrap();
let mut executor = ValuesExecutor::new();
let solution = executor.execute(&values).unwrap();
assert_eq!(solution.len(), 2);
}
// The builder assembles variables and rows into a clause.
#[test]
fn test_values_builder() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let values = ValuesBuilder::new()
.add_variable(var_x)
.add_variable(var_y)
.add_row(vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/1")),
])
.unwrap()
.build()
.unwrap();
assert_eq!(values.variable_count(), 2);
assert_eq!(values.row_count(), 1);
}
// A row with more cells than declared variables is rejected.
#[test]
fn test_values_invalid_row() {
let var_x = create_test_variable("x");
let mut values = ValuesClause::new(vec![var_x]);
let result = values.add_row(vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/b")),
]);
assert!(result.is_err());
}
// ?x has 2 distinct terms over 3 rows and ?y has 3, so ?x is ordered first.
#[test]
fn test_variable_reordering() {
let var_x = create_test_variable("x");
let var_y = create_test_variable("y");
let values = ValuesClause::with_data(
vec![var_x.clone(), var_y.clone()],
vec![
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/1")),
],
vec![
Some(create_test_iri("http://example.org/a")),
Some(create_test_iri("http://example.org/2")),
],
vec![
Some(create_test_iri("http://example.org/b")),
Some(create_test_iri("http://example.org/3")),
],
],
)
.unwrap();
let ordered = ValuesOptimizer::suggest_variable_order(&values);
assert_eq!(ordered.len(), 2);
assert_eq!(ordered[0], var_x);
}
// A one-row clause without undef cells can be pushed into a join on its own variable.
#[test]
fn test_can_push_values() {
let var_x = create_test_variable("x");
let values = ValuesClause::with_data(
vec![var_x.clone()],
vec![vec![Some(create_test_iri("http://example.org/a"))]],
)
.unwrap();
assert!(ValuesJoinOptimizer::can_push_values(&values, &[var_x]));
}
// 2000 rows exceed both the hash-index limit (1000) and the pushdown limit (100).
#[test]
fn test_large_values_strategy() {
let var_x = create_test_variable("x");
let mut data = Vec::new();
for i in 0..2000 {
data.push(vec![Some(create_test_iri(&format!(
"http://example.org/{}",
i
)))]);
}
let values = ValuesClause::with_data(vec![var_x], data).unwrap();
let optimized = ValuesOptimizer::optimize(&values);
assert_eq!(optimized.strategy, ValuesExecutionStrategy::Materialized);
assert!(!optimized.pushable); }
}