use rayon::prelude::*;
use std::collections::HashMap;
use crate::{StarResult, StarTerm, StarTriple};
/// One solution row: maps a variable name to the term bound to that variable.
pub type StarBinding = HashMap<String, StarTerm>;
/// One position (subject, predicate or object) of a basic-graph-pattern triple.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StarBgpTerm {
/// A fixed term; `term_matches` accepts a candidate only if it is equal to it.
Concrete(StarTerm),
/// A named variable; the string is the name used as key in a `StarBinding`.
Variable(String),
}
impl StarBgpTerm {
pub fn var(name: impl Into<String>) -> Self {
Self::Variable(name.into())
}
pub fn term(t: StarTerm) -> Self {
Self::Concrete(t)
}
pub fn variable_name(&self) -> Option<&str> {
match self {
Self::Variable(name) => Some(name.as_str()),
Self::Concrete(_) => None,
}
}
}
/// A triple pattern: each of the three positions is either a concrete term or a variable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StarBgpTriple {
/// Pattern for the subject position.
pub subject: StarBgpTerm,
/// Pattern for the predicate position.
pub predicate: StarBgpTerm,
/// Pattern for the object position.
pub object: StarBgpTerm,
}
impl StarBgpTriple {
pub fn new(subject: StarBgpTerm, predicate: StarBgpTerm, object: StarBgpTerm) -> Self {
Self {
subject,
predicate,
object,
}
}
pub fn variables(&self) -> Vec<String> {
let mut vars = Vec::with_capacity(3);
if let Some(v) = self.subject.variable_name() {
vars.push(v.to_owned());
}
if let Some(v) = self.predicate.variable_name() {
vars.push(v.to_owned());
}
if let Some(v) = self.object.variable_name() {
vars.push(v.to_owned());
}
vars
}
}
/// A source of triples that can answer single triple-pattern lookups.
///
/// Implementors must be `Send + Sync`; `ParallelStarBgpEvaluator` calls
/// `match_pattern` from rayon worker threads.
pub trait StarTripleStore: Send + Sync {
/// Returns one binding per stored triple that matches `pattern`.
///
/// `seed`, when present, carries variables that are already bound; the
/// evaluator passes `None` for unconstrained lookups and `Some(row)` when
/// extending an existing solution row.
///
/// NOTE(review): the in-file `InMemoryStarStore` returns rows that also
/// contain the seed's own bindings; whether every implementor must do the
/// same is not specified here — confirm before relying on it.
fn match_pattern(
&self,
pattern: &StarBgpTriple,
seed: Option<&StarBinding>,
) -> StarResult<Vec<StarBinding>>;
/// Returns the number of triples held by the store.
fn triple_count(&self) -> usize;
}
/// Evaluates a list of triple patterns (a basic graph pattern) against a
/// `StarTripleStore`, fanning seeded lookups out over rayon.
pub struct ParallelStarBgpEvaluator {
// Target number of seed chunks per pattern; only used to size the chunks
// handed to rayon (always at least 1, see `new`).
parallelism: usize,
// Intermediate results with at most this many rows are joined with a
// nested loop; larger ones use the hash join.
nested_loop_threshold: usize,
}
impl ParallelStarBgpEvaluator {
pub fn new(parallelism: usize) -> Self {
Self {
parallelism: parallelism.max(1),
nested_loop_threshold: 1024,
}
}
pub fn with_nested_loop_threshold(mut self, threshold: usize) -> Self {
self.nested_loop_threshold = threshold;
self
}
pub fn evaluate(
&self,
patterns: &[StarBgpTriple],
store: &dyn StarTripleStore,
) -> StarResult<Vec<StarBinding>> {
if patterns.is_empty() {
return Ok(vec![StarBinding::new()]);
}
let mut ordered: Vec<&StarBgpTriple> = patterns.iter().collect();
ordered.sort_by_key(|p| p.variables().len());
let first = ordered[0];
let mut result = store.match_pattern(first, None)?;
for pattern in ordered.iter().skip(1) {
let join_vars: Vec<String> = shared_variables_with_bindings(pattern, &result);
if join_vars.is_empty() {
let right = store.match_pattern(pattern, None)?;
result = cross_product(result, right);
} else {
let right = self.evaluate_with_seeds(pattern, &result, store)?;
result = if result.len() <= self.nested_loop_threshold {
Self::nested_loop_join(result, right, &join_vars)
} else {
Self::hash_join(result, right, &join_vars)
};
}
}
Ok(result)
}
fn evaluate_with_seeds(
&self,
pattern: &StarBgpTriple,
seeds: &[StarBinding],
store: &dyn StarTripleStore,
) -> StarResult<Vec<StarBinding>> {
let chunk_size = ((seeds.len() / self.parallelism) + 1).max(16);
let chunks: Vec<&[StarBinding]> = seeds.chunks(chunk_size).collect();
let results: Vec<StarResult<Vec<StarBinding>>> = chunks
.par_iter()
.map(|chunk| {
let mut local: Vec<StarBinding> = Vec::new();
for seed in *chunk {
let rows = store.match_pattern(pattern, Some(seed))?;
local.extend(rows);
}
Ok(local)
})
.collect();
let mut flat: Vec<StarBinding> = Vec::new();
for r in results {
flat.extend(r?);
}
Ok(flat)
}
fn nested_loop_join(
left: Vec<StarBinding>,
right: Vec<StarBinding>,
join_vars: &[String],
) -> Vec<StarBinding> {
let mut out = Vec::new();
for lb in &left {
for rb in &right {
if let Some(merged) = merge_bindings_compatible(lb, rb, join_vars) {
out.push(merged);
}
}
}
out
}
fn hash_join(
left: Vec<StarBinding>,
right: Vec<StarBinding>,
join_vars: &[String],
) -> Vec<StarBinding> {
let mut table: HashMap<Vec<Option<StarTerm>>, Vec<StarBinding>> = HashMap::new();
for rb in right {
let key: Vec<Option<StarTerm>> = join_vars.iter().map(|v| rb.get(v).cloned()).collect();
table.entry(key).or_default().push(rb);
}
let mut out = Vec::new();
for lb in &left {
let key: Vec<Option<StarTerm>> = join_vars.iter().map(|v| lb.get(v).cloned()).collect();
if let Some(right_matches) = table.get(&key) {
for rb in right_matches {
let mut merged = lb.clone();
for (k, v) in rb {
if !merged.contains_key(k.as_str()) {
merged.insert(k.clone(), v.clone());
}
}
out.push(merged);
}
}
}
out
}
}
/// Returns the variables of `pattern` (in pattern order, repeats kept) that
/// are bound in at least one row of `bindings`.
///
/// Yields an empty list when there are no rows or the pattern has no
/// variables.
fn shared_variables_with_bindings(
    pattern: &StarBgpTriple,
    bindings: &[StarBinding],
) -> Vec<String> {
    pattern
        .variables()
        .into_iter()
        .filter(|name| bindings.iter().any(|row| row.contains_key(name.as_str())))
        .collect()
}
/// Returns the variables of `b` that also occur in `a`, in `b`'s position
/// order (a variable repeated in `b` is listed once per occurrence).
pub fn shared_variables(a: &StarBgpTriple, b: &StarBgpTriple) -> Vec<String> {
    // A pattern has at most three variables, so a linear membership test is enough.
    let in_a = a.variables();
    let mut common = b.variables();
    common.retain(|name| in_a.contains(name));
    common
}
/// Tells whether `candidate` is acceptable for the pattern position `pattern`
/// under the existing `bindings`.
///
/// A concrete position requires equality with its term; a variable that is
/// already bound requires equality with the bound term; an unbound variable
/// accepts anything.
pub fn term_matches(pattern: &StarBgpTerm, candidate: &StarTerm, bindings: &StarBinding) -> bool {
    let required = match pattern {
        StarBgpTerm::Concrete(fixed) => Some(fixed),
        StarBgpTerm::Variable(name) => bindings.get(name.as_str()),
    };
    required.map_or(true, |expected| expected == candidate)
}
/// Merges `a` and `b` unless they disagree on one of `join_vars`.
///
/// Only the listed variables are compared, and a variable missing from either
/// side never conflicts. In the merged row `a`'s values win: entries of `b`
/// are added only for variables `a` does not bind.
pub fn merge_bindings_compatible(
    a: &StarBinding,
    b: &StarBinding,
    join_vars: &[String],
) -> Option<StarBinding> {
    let conflict = join_vars.iter().any(|name| {
        matches!(
            (a.get(name.as_str()), b.get(name.as_str())),
            (Some(lhs), Some(rhs)) if lhs != rhs
        )
    });
    if conflict {
        return None;
    }

    let mut merged = a.clone();
    for (name, term) in b {
        if !merged.contains_key(name) {
            merged.insert(name.clone(), term.clone());
        }
    }
    Some(merged)
}
/// Combines every row of `left` with every row of `right`.
///
/// Output order is left-major. If both rows bind the same variable, the value
/// from the right row replaces the one from the left row.
fn cross_product(left: Vec<StarBinding>, right: Vec<StarBinding>) -> Vec<StarBinding> {
    let mut combined = Vec::with_capacity(left.len() * right.len());
    for l_row in left.iter() {
        combined.extend(right.iter().map(|r_row| {
            let mut row = l_row.clone();
            row.extend(r_row.iter().map(|(name, term)| (name.clone(), term.clone())));
            row
        }));
    }
    combined
}
/// A `StarTripleStore` backed by a plain vector of triples.
pub struct InMemoryStarStore {
// Stored triples in insertion order; duplicates are not removed.
triples: Vec<StarTriple>,
}
impl InMemoryStarStore {
pub fn new(triples: Vec<StarTriple>) -> Self {
Self { triples }
}
pub fn insert(&mut self, triple: StarTriple) {
self.triples.push(triple);
}
}
impl StarTripleStore for InMemoryStarStore {
    /// Scans every stored triple in parallel and returns one binding per
    /// triple that matches `pattern`.
    ///
    /// Each returned row starts as a copy of `seed` (or an empty binding when
    /// `seed` is `None`) and is extended with the pattern's variables. A
    /// variable that occurs in several positions of the pattern (for example
    /// `?x p ?x`) must match the same term in all of them.
    ///
    /// This implementation never returns `Err`.
    fn match_pattern(
        &self,
        pattern: &StarBgpTriple,
        seed: Option<&StarBinding>,
    ) -> StarResult<Vec<StarBinding>> {
        let empty_binding = StarBinding::new();
        let seed_ref = seed.unwrap_or(&empty_binding);
        let bindings: Vec<StarBinding> = self
            .triples
            .par_iter()
            .filter_map(|triple| {
                let positions = [
                    (&pattern.subject, &triple.subject),
                    (&pattern.predicate, &triple.predicate),
                    (&pattern.object, &triple.object),
                ];
                // Cheap rejection against the seed before cloning it.
                if !positions
                    .iter()
                    .all(|&(slot, term)| term_matches(slot, term, seed_ref))
                {
                    return None;
                }
                // Bind position by position, re-checking against the growing
                // row so a repeated variable cannot take two different terms.
                let mut row = seed_ref.clone();
                for &(slot, term) in &positions {
                    if !term_matches(slot, term, &row) {
                        return None;
                    }
                    bind_if_var(slot, term, &mut row);
                }
                Some(row)
            })
            .collect();
        Ok(bindings)
    }

    /// Returns the number of stored triples.
    fn triple_count(&self) -> usize {
        self.triples.len()
    }
}
/// Records `concrete` as the value of `pattern`'s variable in `bindings`.
///
/// Does nothing for concrete positions, and never overwrites a variable that
/// is already bound.
fn bind_if_var(pattern: &StarBgpTerm, concrete: &StarTerm, bindings: &mut StarBinding) {
    match pattern {
        StarBgpTerm::Variable(name) => {
            if !bindings.contains_key(name.as_str()) {
                bindings.insert(name.clone(), concrete.clone());
            }
        }
        StarBgpTerm::Concrete(_) => {}
    }
}
// Unit tests for the BGP evaluator, the in-memory store and the helper functions.
#[cfg(test)]
mod tests {
use super::*;
// Builds a triple whose three terms are all IRIs.
fn make_triple(s: &str, p: &str, o: &str) -> StarTriple {
StarTriple::new(
StarTerm::iri(s).expect("valid IRI"),
StarTerm::iri(p).expect("valid IRI"),
StarTerm::iri(o).expect("valid IRI"),
)
}
// Four-triple fixture: alice knows bob, alice age 30, bob knows carol, carol age 25.
fn make_store() -> InMemoryStarStore {
InMemoryStarStore::new(vec![
make_triple(
"http://ex.org/alice",
"http://ex.org/knows",
"http://ex.org/bob",
),
make_triple(
"http://ex.org/alice",
"http://ex.org/age",
"http://ex.org/30",
),
make_triple(
"http://ex.org/bob",
"http://ex.org/knows",
"http://ex.org/carol",
),
make_triple(
"http://ex.org/carol",
"http://ex.org/age",
"http://ex.org/25",
),
])
}
// An empty pattern list has exactly one solution: the empty binding.
#[test]
fn test_empty_bgp_yields_one_empty_binding() {
let store = make_store();
let evaluator = ParallelStarBgpEvaluator::new(2);
let results = evaluator.evaluate(&[], &store).expect("evaluate ok");
assert_eq!(results.len(), 1);
assert!(results[0].is_empty());
}
// A fixed subject with variable predicate/object returns both alice triples.
#[test]
fn test_single_pattern_with_concrete_subject() {
let store = make_store();
let evaluator = ParallelStarBgpEvaluator::new(2);
let alice = StarTerm::iri("http://ex.org/alice").expect("valid IRI");
let pattern = StarBgpTriple::new(
StarBgpTerm::term(alice),
StarBgpTerm::var("p"),
StarBgpTerm::var("o"),
);
let results = evaluator.evaluate(&[pattern], &store).expect("evaluate ok");
assert_eq!(results.len(), 2);
}
// Two patterns joined on ?friend: every result row must bind all three variables.
#[test]
fn test_two_pattern_join_via_shared_variable() {
let store = make_store();
let evaluator = ParallelStarBgpEvaluator::new(2);
let knows = StarTerm::iri("http://ex.org/knows").expect("valid IRI");
let age = StarTerm::iri("http://ex.org/age").expect("valid IRI");
let p1 = StarBgpTriple::new(
StarBgpTerm::var("person"),
StarBgpTerm::term(knows),
StarBgpTerm::var("friend"),
);
let p2 = StarBgpTriple::new(
StarBgpTerm::var("friend"),
StarBgpTerm::term(age),
StarBgpTerm::var("age"),
);
let results = evaluator.evaluate(&[p1, p2], &store).expect("evaluate ok");
assert!(!results.is_empty(), "should have at least one result");
for row in &results {
assert!(row.contains_key("person"), "row should have ?person");
assert!(row.contains_key("friend"), "row should have ?friend");
assert!(row.contains_key("age"), "row should have ?age");
}
}
// A quoted triple used as a concrete subject matches by equality and binds ?c.
#[test]
fn test_quoted_triple_pattern_matching() {
let inner = StarTriple::new(
StarTerm::iri("http://ex.org/alice").expect("valid IRI"),
StarTerm::iri("http://ex.org/age").expect("valid IRI"),
StarTerm::literal("30").expect("valid literal"),
);
let outer = StarTriple::new(
StarTerm::quoted_triple(inner.clone()),
StarTerm::iri("http://ex.org/certainty").expect("valid IRI"),
StarTerm::literal("0.9").expect("valid literal"),
);
let store = InMemoryStarStore::new(vec![outer]);
let evaluator = ParallelStarBgpEvaluator::new(2);
let certainty = StarTerm::iri("http://ex.org/certainty").expect("valid IRI");
let pattern = StarBgpTriple::new(
StarBgpTerm::term(StarTerm::quoted_triple(inner)),
StarBgpTerm::term(certainty),
StarBgpTerm::var("c"),
);
let results = evaluator.evaluate(&[pattern], &store).expect("evaluate ok");
assert_eq!(results.len(), 1);
let c = results[0].get("c").expect("?c should be bound");
assert_eq!(*c, StarTerm::literal("0.9").expect("valid literal"));
}
// The first pattern yields 2000 rows, which exceeds the default nested-loop
// threshold of 1024, so the second pattern is joined via the hash join.
#[test]
fn test_hash_join_used_for_large_sets() {
let mut triples = Vec::new();
for i in 0..2000_usize {
triples.push(StarTriple::new(
StarTerm::iri(&format!("http://ex.org/s{i}")).expect("valid IRI"),
StarTerm::iri("http://ex.org/knows").expect("valid IRI"),
StarTerm::iri(&format!("http://ex.org/o{i}")).expect("valid IRI"),
));
triples.push(StarTriple::new(
StarTerm::iri(&format!("http://ex.org/o{i}")).expect("valid IRI"),
StarTerm::iri("http://ex.org/age").expect("valid IRI"),
StarTerm::literal(&format!("{i}")).expect("valid literal"),
));
}
let store = InMemoryStarStore::new(triples);
let evaluator = ParallelStarBgpEvaluator::new(4);
let knows = StarTerm::iri("http://ex.org/knows").expect("valid IRI");
let age = StarTerm::iri("http://ex.org/age").expect("valid IRI");
let p1 = StarBgpTriple::new(
StarBgpTerm::var("s"),
StarBgpTerm::term(knows),
StarBgpTerm::var("o"),
);
let p2 = StarBgpTriple::new(
StarBgpTerm::var("o"),
StarBgpTerm::term(age),
StarBgpTerm::var("a"),
);
let results = evaluator.evaluate(&[p1, p2], &store).expect("evaluate ok");
assert_eq!(results.len(), 2000);
}
// Only ?y occurs in both patterns.
#[test]
fn test_shared_variables_helper() {
let a = StarBgpTriple::new(
StarBgpTerm::var("x"),
StarBgpTerm::var("p"),
StarBgpTerm::var("y"),
);
let b = StarBgpTriple::new(
StarBgpTerm::var("y"),
StarBgpTerm::var("q"),
StarBgpTerm::var("z"),
);
let shared = shared_variables(&a, &b);
assert_eq!(shared, vec!["y".to_owned()]);
}
// An unbound variable matches any term; a bound one only matches its bound term.
#[test]
fn test_term_matches_variable() {
let alice = StarTerm::iri("http://ex.org/alice").expect("valid IRI");
let mut bindings = StarBinding::new();
assert!(term_matches(&StarBgpTerm::var("x"), &alice, &bindings));
bindings.insert("x".to_owned(), alice.clone());
assert!(term_matches(&StarBgpTerm::var("x"), &alice, &bindings));
let bob = StarTerm::iri("http://ex.org/bob").expect("valid IRI");
assert!(!term_matches(&StarBgpTerm::var("x"), &bob, &bindings));
}
}