use crate::frozen::{FrozenIndexedDataset, TermId};
use crate::native_exec;
use crate::profile;
use oxigraph::sparql::{PreparedSparqlQuery, QueryResults, SparqlEvaluator};
use oxigraph::store::Store;
use oxrdf::{
Graph, GraphName, GraphNameRef, Literal, NamedNode, NamedOrBlankNode, Quad, QuadRef, Term,
Triple, Variable,
};
use shifty_algebra::{Path, SparqlConstraint};
use shifty_opt::{NativeQueryPlan, QueryForm, lower_query_with_stats};
use spargebra::algebra::{
AggregateExpression, Expression, GraphPattern, OrderExpression, PropertyPathExpression,
};
use spargebra::term::{NamedNodePattern, TermPattern, TriplePattern};
use spargebra::{Query, SparqlParser};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
pub(crate) use crate::frozen::SHAPES_GRAPH_IRI;
/// Evaluates SHACL-SPARQL queries (targets, constraints, rules, functions) against either a
/// mutable oxigraph [`Store`] or a read-only [`FrozenIndexedDataset`].
///
/// Parsed, prepared and compiled queries are memoized in per-executor `RefCell` caches.
pub(crate) struct SparqlExecutor {
// Mutable store; `None` for executors created with `from_frozen`.
store: Option<Store>,
// Frozen dataset; when present, most query paths evaluate against it instead of `store`.
frozen: Option<FrozenIndexedDataset>,
// Prepared-query cache keyed by query text (used by `target_nodes`).
prepared: RefCell<HashMap<String, PreparedSparqlQuery>>,
// Parsed-query cache keyed by query text.
parsed: RefCell<HashMap<String, Query>>,
// Compiled constraint queries keyed by query text, rendered path and shape.
compiled: RefCell<HashMap<CompileKey, Rc<Compiled>>>,
// Compiled CONSTRUCT rules keyed by query text.
constructs: RefCell<HashMap<String, Rc<CompiledConstruct>>>,
// IRI substituted for `$shapesGraph` when a shapes graph is available.
shapes_graph: Option<NamedNode>,
}
/// Cache key for compiled constraint queries: the same query text compiles differently
/// depending on the substituted `$PATH` and `$currentShape`.
#[derive(PartialEq, Eq, Hash)]
struct CompileKey {
// Raw SPARQL text of the constraint.
query: String,
// Rendered SHACL path (see `path_key`), if the constraint has one.
path: Option<String>,
// String form of the current shape term, if any.
shape: Option<String>,
}
/// A constraint query after `$PATH` / `$currentShape` / `$shapesGraph` substitution.
struct Compiled {
// Native execution plan; `None` without a frozen dataset or when lowering failed.
plan: Option<NativeQueryPlan>,
// Substituted query used by the fallback evaluator (and as a reference in debug builds).
query: Query,
// Results precomputed for many foci by `prefetch_constraint`, if any.
batched: RefCell<Option<BatchedResults>>,
}
/// Violations computed by one batched evaluation over many focus nodes.
struct BatchedResults {
// Foci whose results are fully represented here; a covered focus absent from `map`
// has no violations.
covered: HashSet<Term>,
// Violations grouped by focus node.
map: HashMap<Term, Vec<SparqlViolation>>,
}
/// A compiled SHACL CONSTRUCT rule.
struct CompiledConstruct {
// Native plan for the WHERE clause; `None` when the rule must use the fallback evaluator
// (FROM/FROM NAMED dataset, blank nodes in the template, or lowering failure).
plan: Option<NativeQueryPlan>,
// CONSTRUCT template triples instantiated for each solution.
template: Vec<TriplePattern>,
}
/// One result row of a SHACL-SPARQL constraint query.
///
/// A `true` ASK result is represented as a single violation whose fields are all empty.
#[derive(Debug, Clone)]
pub(crate) struct SparqlViolation {
/// Binding of `value`, if present in the row.
pub value: Option<Term>,
/// Binding of `path`, if present in the row.
pub path: Option<Term>,
/// Binding of `message`, if present in the row.
pub message: Option<Term>,
/// Every bound variable of the row, keyed by variable name (the batched path omits `this`).
pub bindings: HashMap<String, Term>,
}
impl SparqlExecutor {
/// Builds an executor over a fresh mutable store holding `graph` in its default graph,
/// without a shapes graph.
///
/// # Errors
/// Returns the store's error message if the store cannot be created or populated.
pub fn new(graph: &Graph) -> Result<Self, String> {
    let no_shapes: Option<&Graph> = None;
    Self::build(graph, no_shapes)
}
/// Creates a store, loads `context` into its default graph and, when given, `shapes` into
/// the named graph `SHAPES_GRAPH_IRI` (which then becomes `$shapesGraph`).
///
/// # Errors
/// Returns the store's error message on creation or load failure.
fn build(context: &Graph, shapes: Option<&Graph>) -> Result<Self, String> {
    let store = Store::new().map_err(|e| e.to_string())?;
    let default_quads = context.iter().map(|t| {
        Quad::new(
            t.subject.into_owned(),
            t.predicate.into_owned(),
            t.object.into_owned(),
            GraphName::DefaultGraph,
        )
    });
    store.extend(default_quads).map_err(|e| e.to_string())?;

    let mut shapes_graph = None;
    if let Some(shapes) = shapes {
        let iri = NamedNode::new(SHAPES_GRAPH_IRI).expect("static IRI is valid");
        let named_quads = shapes.iter().map(|t| {
            Quad::new(
                t.subject.into_owned(),
                t.predicate.into_owned(),
                t.object.into_owned(),
                GraphName::NamedNode(iri.clone()),
            )
        });
        store.extend(named_quads).map_err(|e| e.to_string())?;
        shapes_graph = Some(iri);
    }

    Ok(Self {
        store: Some(store),
        frozen: None,
        prepared: RefCell::default(),
        parsed: RefCell::default(),
        compiled: RefCell::default(),
        constructs: RefCell::default(),
        shapes_graph,
    })
}
/// Wraps a frozen dataset for read-only evaluation.
///
/// No mutable store is allocated, so operations needing one (e.g. `insert`) return an error.
/// When `has_shapes_graph` is set, `$shapesGraph` is bound to `SHAPES_GRAPH_IRI`.
pub fn from_frozen(frozen: FrozenIndexedDataset, has_shapes_graph: bool) -> Self {
    let shapes_graph = if has_shapes_graph {
        Some(NamedNode::new(SHAPES_GRAPH_IRI).expect("static IRI is valid"))
    } else {
        None
    };
    Self {
        store: None,
        frozen: Some(frozen),
        prepared: RefCell::default(),
        parsed: RefCell::default(),
        compiled: RefCell::default(),
        constructs: RefCell::default(),
        shapes_graph,
    }
}
/// The frozen dataset backing this executor, if it was built with `from_frozen`.
pub(crate) fn frozen(&self) -> Option<&FrozenIndexedDataset> {
    Option::as_ref(&self.frozen)
}
/// Adds `triple` to the default graph of the mutable store.
///
/// # Errors
/// Fails when the executor has no mutable store or the store rejects the write.
pub fn insert(&self, triple: &Triple) -> Result<(), String> {
    let store = self.store()?;
    let quad = QuadRef::new(
        triple.subject.as_ref(),
        triple.predicate.as_ref(),
        triple.object.as_ref(),
        GraphNameRef::DefaultGraph,
    );
    store.insert(quad).map_err(|e| e.to_string())
}
/// Runs a SPARQL-based target query and returns every binding of `this`.
///
/// Rows that leave `this` unbound are skipped. The run is recorded in `profile` as a
/// fallback execution.
///
/// # Errors
/// Fails on parse/evaluation errors, or if the query is not a SELECT.
pub fn target_nodes(&self, query: &str) -> Result<Vec<Term>, String> {
    let fingerprint = profile::fingerprint(query);
    let started = std::time::Instant::now();
    let prepared = self.prepared(query)?;
    let results = match &self.frozen {
        Some(frozen) => prepared.on_queryable_dataset(frozen).execute().map_err(err)?,
        None => prepared.on_store(self.store()?).execute().map_err(err)?,
    };
    let QueryResults::Solutions(rows) = results else {
        return Err("SPARQL target did not produce SELECT solutions".to_string());
    };
    let mut nodes = Vec::new();
    for row in rows {
        let row = row.map_err(err)?;
        nodes.extend(row.get("this").cloned());
    }
    profile::record(
        &fingerprint,
        started.elapsed().as_micros() as u64,
        profile::ExecutorKind::Fallback { reason: None },
    );
    Ok(nodes)
}
/// Evaluates a SPARQL constraint for one focus node.
///
/// Resolution order: batched results from `prefetch_constraint` when they cover `focus`,
/// then the native plan (cross-checked against the fallback evaluator in debug builds),
/// otherwise the fallback evaluator. Each path records its timing in `profile`.
pub fn constraint_violations(
    &self,
    constraint: &SparqlConstraint,
    focus: &Term,
) -> Result<Vec<SparqlViolation>, String> {
    let compiled = self.compile_constraint(constraint)?;
    let fingerprint = profile::fingerprint(&constraint.query);
    let started = std::time::Instant::now();

    let cached = compiled
        .batched
        .borrow()
        .as_ref()
        .filter(|batch| batch.covered.contains(focus))
        .map(|batch| batch.map.get(focus).cloned().unwrap_or_default());
    if let Some(violations) = cached {
        profile::record(
            &fingerprint,
            started.elapsed().as_micros() as u64,
            profile::ExecutorKind::Fallback { reason: None },
        );
        return Ok(violations);
    }

    let Some(plan) = &compiled.plan else {
        let violations = self.run_fallback(&compiled.query, focus)?;
        profile::record(
            &fingerprint,
            started.elapsed().as_micros() as u64,
            profile::ExecutorKind::Fallback { reason: None },
        );
        return Ok(violations);
    };

    let violations = self.run_native(plan, focus);
    #[cfg(debug_assertions)]
    {
        // Debug builds verify the native engine against the reference evaluator.
        let reference = self.run_fallback(&compiled.query, focus)?;
        assert_violations_match(&constraint.query, focus, &violations, &reference);
    }
    profile::record(
        &fingerprint,
        started.elapsed().as_micros() as u64,
        profile::ExecutorKind::Native,
    );
    Ok(violations)
}
/// Parses a constraint query, substitutes `$PATH`, `$currentShape` and `$shapesGraph`,
/// lowers it to a native plan when a frozen dataset exists, and caches the result.
///
/// # Errors
/// Fails on parse errors or when a complex SHACL path has no SPARQL property-path form.
fn compile_constraint(&self, constraint: &SparqlConstraint) -> Result<Rc<Compiled>, String> {
    let key = CompileKey {
        query: constraint.query.clone(),
        path: constraint.path.as_ref().map(path_key),
        shape: constraint.shape.as_ref().map(|s| s.to_string()),
    };
    let hit = self.compiled.borrow().get(&key).cloned();
    if let Some(hit) = hit {
        return Ok(hit);
    }

    let mut query = self.parse(&constraint.query)?;
    match &constraint.path {
        // A single predicate is substituted directly as an IRI.
        Some(Path::Pred(predicate)) => substitute_query(
            &mut query,
            &variable("PATH"),
            &Term::NamedNode(predicate.clone()),
        ),
        // Anything else becomes a property-path pattern.
        Some(complex) => {
            let Some(sparql_path) = path_to_property_path(complex) else {
                return Err(format!(
                    "SHACL path cannot be expressed as a SPARQL property path: {complex:?}"
                ));
            };
            query = rewrite_path_query(query, &sparql_path);
        }
        None => {}
    }
    if let Some(shape) = &constraint.shape {
        substitute_query(&mut query, &variable("currentShape"), shape);
    }
    if let Some(graph) = self.shapes_graph.as_ref() {
        substitute_query(
            &mut query,
            &variable("shapesGraph"),
            &Term::NamedNode(graph.clone()),
        );
    }

    let plan = self
        .frozen
        .as_ref()
        .and_then(|frozen| lower_query_with_stats(&query, Some(&frozen.plan_stats())).ok());
    let compiled = Rc::new(Compiled {
        plan,
        query,
        batched: RefCell::new(None),
    });
    self.compiled.borrow_mut().insert(key, Rc::clone(&compiled));
    Ok(compiled)
}
/// Runs a native plan for a single focus and converts its solutions into violations.
///
/// ASK plans yield at most one empty violation; SELECT plans yield one per solution.
///
/// # Panics
/// Panics if the executor has no frozen dataset (native plans are only built with one).
fn run_native(&self, plan: &NativeQueryPlan, focus: &Term) -> Vec<SparqlViolation> {
    let frozen = self
        .frozen
        .as_ref()
        .expect("native plan implies a frozen dataset (compile_constraint guards this)");
    let result = native_exec::execute(plan, frozen, std::slice::from_ref(focus));
    // One focus in, so the solutions for it are in the first slot.
    let rows = &result.solutions[0];
    match result.form {
        QueryForm::Ask if rows.is_empty() => Vec::new(),
        QueryForm::Ask => vec![SparqlViolation {
            value: None,
            path: None,
            message: None,
            bindings: HashMap::new(),
        }],
        QueryForm::Select => rows
            .iter()
            .map(|row| SparqlViolation {
                value: row.get("value").cloned(),
                path: row.get("path").cloned(),
                message: row.get("message").cloned(),
                bindings: row.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
            })
            .collect(),
    }
}
/// Evaluates `query` with `$this` substituted by `focus` using the reference evaluator.
///
/// # Errors
/// Fails on evaluation errors, or when the query produces graph (CONSTRUCT/DESCRIBE) results.
fn run_fallback(&self, query: &Query, focus: &Term) -> Result<Vec<SparqlViolation>, String> {
    let mut bound = query.clone();
    substitute_query(&mut bound, &variable("this"), focus);
    let prepared = SparqlEvaluator::new().for_query(bound);
    let outcome = match &self.frozen {
        Some(frozen) => prepared.on_queryable_dataset(frozen).execute().map_err(err)?,
        None => prepared.on_store(self.store()?).execute().map_err(err)?,
    };
    match outcome {
        QueryResults::Boolean(false) => Ok(Vec::new()),
        QueryResults::Boolean(true) => Ok(vec![SparqlViolation {
            value: None,
            path: None,
            message: None,
            bindings: HashMap::new(),
        }]),
        QueryResults::Graph(_) => {
            Err("SPARQL constraint unexpectedly produced graph results".to_string())
        }
        QueryResults::Solutions(solutions) => {
            let names: Vec<String> = solutions
                .variables()
                .iter()
                .map(|v| v.as_str().to_string())
                .collect();
            let mut violations = Vec::new();
            for solution in solutions {
                let solution = solution.map_err(err)?;
                let bindings = names
                    .iter()
                    .filter_map(|name| {
                        solution
                            .get(name.as_str())
                            .map(|term| (name.clone(), term.clone()))
                    })
                    .collect();
                violations.push(SparqlViolation {
                    value: solution.get("value").cloned(),
                    path: solution.get("path").cloned(),
                    message: solution.get("message").cloned(),
                    bindings,
                });
            }
            Ok(violations)
        }
    }
}
/// Precomputes fallback results for many foci in one query so later
/// `constraint_violations` calls can be answered from the cache.
///
/// Does nothing when a native plan exists, when results are already batched, when fewer
/// than two foci are given, or when the query cannot be batched.
pub fn prefetch_constraint(
    &self,
    constraint: &SparqlConstraint,
    foci: &[Term],
) -> Result<(), String> {
    let compiled = self.compile_constraint(constraint)?;
    let already_handled = compiled.plan.is_some() || compiled.batched.borrow().is_some();
    if already_handled || foci.len() < 2 {
        return Ok(());
    }
    let fingerprint = profile::fingerprint(&constraint.query);
    let started = std::time::Instant::now();
    let Some(batched) = self.run_fallback_batch(&compiled.query, foci)? else {
        return Ok(());
    };
    *compiled.batched.borrow_mut() = Some(batched);
    profile::record(
        &fingerprint,
        started.elapsed().as_micros() as u64,
        profile::ExecutorKind::Fallback { reason: None },
    );
    Ok(())
}
/// Runs the batched form of `query` (see `plan_batch_query`) and groups rows by `this`.
///
/// Returns `Ok(None)` when the query cannot be batched or does not yield solutions. Rows
/// whose `this` is unbound or not among the covered foci are ignored.
fn run_fallback_batch(
    &self,
    query: &Query,
    foci: &[Term],
) -> Result<Option<BatchedResults>, String> {
    let (batched_query, covered_list) = match plan_batch_query(query, foci) {
        Some(plan) => plan,
        None => return Ok(None),
    };
    let covered = covered_list.into_iter().collect::<HashSet<Term>>();
    let prepared = SparqlEvaluator::new().for_query(batched_query);
    let outcome = match &self.frozen {
        Some(frozen) => prepared.on_queryable_dataset(frozen).execute().map_err(err)?,
        None => prepared.on_store(self.store()?).execute().map_err(err)?,
    };
    let solutions = match outcome {
        QueryResults::Solutions(solutions) => solutions,
        _ => return Ok(None),
    };
    let names: Vec<String> = solutions
        .variables()
        .iter()
        .map(|v| v.as_str().to_string())
        .collect();
    let mut map: HashMap<Term, Vec<SparqlViolation>> = HashMap::new();
    for solution in solutions {
        let solution = solution.map_err(err)?;
        let this = match solution.get("this") {
            Some(term) if covered.contains(term) => term,
            _ => continue,
        };
        // `this` is excluded to mirror per-focus runs, where it is substituted away.
        let bindings = names
            .iter()
            .filter(|name| name.as_str() != "this")
            .filter_map(|name| {
                solution
                    .get(name.as_str())
                    .map(|term| (name.clone(), term.clone()))
            })
            .collect();
        let violation = SparqlViolation {
            value: solution.get("value").cloned(),
            path: solution.get("path").cloned(),
            message: solution.get("message").cloned(),
            bindings,
        };
        map.entry(this.clone()).or_default().push(violation);
    }
    Ok(Some(BatchedResults { covered, map }))
}
pub fn construct_many(
&self,
query: &str,
foci: &[Term],
frozen: Option<&FrozenIndexedDataset>,
) -> Result<Vec<Triple>, String> {
const BATCH_SIZE: usize = 2048;
if foci.is_empty() {
return Ok(Vec::new());
}
let fp = profile::fingerprint(query);
let start = std::time::Instant::now();
let mut triples = Vec::new();
let compiled = self.compile_construct(query)?;
let executor = if let (Some(plan), Some(frozen)) = (&compiled.plan, frozen) {
for chunk in foci.chunks(BATCH_SIZE) {
let result = native_exec::execute_ids(plan, frozen, chunk);
for (focus_index, solutions) in result.solutions.into_iter().enumerate() {
for bindings in solutions {
instantiate_template(
&compiled.template,
plan,
frozen,
&chunk[focus_index],
&bindings,
&mut triples,
);
}
}
}
profile::ExecutorKind::Native
} else {
for focus in foci.iter().filter(|f| !matches!(f, Term::BlankNode(_))) {
triples.extend(self.construct_one(query, focus)?);
}
let blank_foci: Vec<&Term> = foci
.iter()
.filter(|f| matches!(f, Term::BlankNode(_)))
.collect();
if !blank_foci.is_empty() {
let foci_set: HashSet<Term> = blank_foci.iter().map(|&f| f.clone()).collect();
let store = self.store()?;
let raw_triples: Vec<Triple> = match SparqlEvaluator::new()
.for_query(self.parse(query)?)
.on_store(store)
.execute()
.map_err(err)?
{
QueryResults::Graph(iter) => iter
.filter_map(|t| t.ok())
.filter(|t| foci_set.contains(&Term::from(t.subject.clone())))
.collect(),
_ => return Err("SPARQL rule did not produce CONSTRUCT graph results".into()),
};
triples.extend(raw_triples);
}
let store = self.store()?;
triples.retain(|triple| {
!store
.contains(QuadRef::new(
triple.subject.as_ref(),
triple.predicate.as_ref(),
triple.object.as_ref(),
GraphNameRef::DefaultGraph,
))
.unwrap_or(false)
});
profile::ExecutorKind::Fallback { reason: None }
};
profile::record(&fp, start.elapsed().as_micros() as u64, executor);
Ok(triples)
}
/// Asks the native engine which foci a CONSTRUCT rule should be re-run for, given the
/// newly added `delta` triples.
///
/// Returns `Ok(None)` when no native plan or frozen dataset is available, or when the native
/// engine itself declines (its `None`). Focus IDs that cannot be externalized are dropped.
pub fn construct_delta_foci(
    &self,
    query: &str,
    delta: &[Triple],
    frozen: Option<&FrozenIndexedDataset>,
) -> Result<Option<HashSet<Term>>, String> {
    let compiled = self.compile_construct(query)?;
    match (&compiled.plan, frozen) {
        (Some(plan), Some(frozen)) => {
            let ids = native_exec::delta_focus_ids(plan, frozen, delta);
            Ok(ids.map(|ids| {
                ids.into_iter()
                    .filter_map(|id| frozen.externalize(id))
                    .collect()
            }))
        }
        _ => Ok(None),
    }
}
fn compile_construct(&self, query: &str) -> Result<Rc<CompiledConstruct>, String> {
if let Some(compiled) = self.constructs.borrow().get(query) {
return Ok(compiled.clone());
}
let parsed = self.parse(query)?;
let Query::Construct {
template,
dataset,
pattern,
base_iri,
} = parsed
else {
return Err("SPARQL rule did not contain a CONSTRUCT query".to_string());
};
let plan = if dataset.is_none() && !template.iter().any(triple_has_blank_node) {
let stats = self.frozen.as_ref().map(|f| f.plan_stats());
lower_query_with_stats(
&Query::Select {
dataset: None,
pattern,
base_iri,
},
stats.as_ref(),
)
.ok()
} else {
None
};
let compiled = Rc::new(CompiledConstruct { plan, template });
self.constructs
.borrow_mut()
.insert(query.to_string(), compiled.clone());
Ok(compiled)
}
/// Runs a CONSTRUCT rule against the mutable store with `$this` bound to `focus`.
///
/// # Errors
/// Fails without a store, on parse/evaluation errors, or if no graph results are produced.
fn construct_one(&self, query: &str, focus: &Term) -> Result<Vec<Triple>, String> {
    let mut bound = self.parse(query)?;
    substitute_query(&mut bound, &variable("this"), focus);
    let results = SparqlEvaluator::new()
        .for_query(bound)
        .on_store(self.store()?)
        .execute()
        .map_err(err)?;
    if let QueryResults::Graph(triples) = results {
        triples.map(|t| t.map_err(err)).collect()
    } else {
        Err("SPARQL rule did not produce CONSTRUCT graph results".to_string())
    }
}
/// Returns a prepared query for `query`, parsing it on first use and caching the result.
fn prepared(&self, query: &str) -> Result<PreparedSparqlQuery, String> {
    let cached = self.prepared.borrow().get(query).cloned();
    match cached {
        Some(hit) => Ok(hit),
        None => {
            let fresh = SparqlEvaluator::new().parse_query(query).map_err(err)?;
            self.prepared
                .borrow_mut()
                .insert(query.to_owned(), fresh.clone());
            Ok(fresh)
        }
    }
}
/// Returns the parsed algebra for `query`, parsing on first use and caching the result.
fn parse(&self, query: &str) -> Result<Query, String> {
    let cached = self.parsed.borrow().get(query).cloned();
    match cached {
        Some(hit) => Ok(hit),
        None => {
            let fresh = SparqlParser::new().parse_query(query).map_err(err)?;
            self.parsed
                .borrow_mut()
                .insert(query.to_owned(), fresh.clone());
            Ok(fresh)
        }
    }
}
/// Evaluates an `sh:SPARQLFunction` body with each parameter variable bound to its value.
///
/// SELECT bodies return every binding of `result`. ASK bodies return `[true]` or nothing.
///
/// # Errors
/// Fails on invalid parameter names, parse/evaluation errors, or graph results.
pub fn call_sparql_function(
    &self,
    raw_query: &str,
    params: &[(String, Term)],
) -> Result<Vec<Term>, String> {
    let mut query = self.parse(raw_query)?;
    for (name, value) in params.iter() {
        let parameter = Variable::new(name).map_err(err)?;
        substitute_query(&mut query, &parameter, value);
    }
    let prepared = SparqlEvaluator::new().for_query(query);
    let outcome = match &self.frozen {
        Some(frozen) => prepared.on_queryable_dataset(frozen).execute().map_err(err)?,
        None => prepared.on_store(self.store()?).execute().map_err(err)?,
    };
    match outcome {
        QueryResults::Solutions(rows) => {
            let mut values = Vec::new();
            for row in rows {
                values.extend(row.map_err(err)?.get("result").cloned());
            }
            Ok(values)
        }
        QueryResults::Boolean(true) => Ok(vec![Term::Literal(Literal::from(true))]),
        QueryResults::Boolean(false) => Ok(vec![]),
        QueryResults::Graph(_) => Err("sh:SPARQLFunction produced graph results".to_string()),
    }
}
/// Borrows the mutable store, or errors when the executor was built from a frozen dataset.
fn store(&self) -> Result<&Store, String> {
    match &self.store {
        Some(store) => Ok(store),
        None => Err("mutable SPARQL store is unavailable during validation".to_string()),
    }
}
/// Test hook: whether a mutable store was allocated.
#[cfg(test)]
fn has_store(&self) -> bool {
    self.store.as_ref().is_some()
}
}
/// Whether a template triple has a blank node in subject or object position.
fn triple_has_blank_node(triple: &TriplePattern) -> bool {
    [&triple.subject, &triple.object]
        .into_iter()
        .any(|term| matches!(term, TermPattern::BlankNode(_)))
}
/// Instantiates each template triple for one native solution and appends the new ones.
///
/// A triple is skipped when any position cannot be resolved, when it already exists in
/// `frozen`, or when it would be ill-formed (literal subject, non-IRI predicate).
fn instantiate_template(
    template: &[TriplePattern],
    plan: &NativeQueryPlan,
    frozen: &FrozenIndexedDataset,
    focus: &Term,
    bindings: &native_exec::NativeIdBindings,
    out: &mut Vec<Triple>,
) {
    for pattern in template {
        // Resolve positions in order, stopping at the first unresolvable one.
        let ids = resolve_template_id(&pattern.subject, plan, frozen, focus, bindings)
            .and_then(|s| {
                resolve_template_predicate_id(&pattern.predicate, plan, frozen, focus, bindings)
                    .map(|p| (s, p))
            })
            .and_then(|(s, p)| {
                resolve_template_id(&pattern.object, plan, frozen, focus, bindings)
                    .map(|o| (s, p, o))
            });
        let Some((s, p, o)) = ids else { continue };
        if frozen.contains_ids(s, p, o) {
            continue;
        }
        let subject = match frozen.externalize(s) {
            Some(Term::NamedNode(node)) => NamedOrBlankNode::NamedNode(node),
            Some(Term::BlankNode(node)) => NamedOrBlankNode::BlankNode(node),
            _ => continue,
        };
        let predicate = match frozen.externalize(p) {
            Some(Term::NamedNode(node)) => node,
            _ => continue,
        };
        let Some(object) = frozen.externalize(o) else { continue };
        out.push(Triple::new(subject, predicate, object));
    }
}
/// Resolves a template subject/object to a term ID: constants are interned, `$this` maps
/// to `focus`, and other variables are looked up in the solution `bindings`.
fn resolve_template_id(
    pattern: &TermPattern,
    plan: &NativeQueryPlan,
    frozen: &FrozenIndexedDataset,
    focus: &Term,
    bindings: &native_exec::NativeIdBindings,
) -> Option<TermId> {
    let constant = match pattern {
        TermPattern::NamedNode(node) => Term::NamedNode(node.clone()),
        TermPattern::BlankNode(node) => Term::BlankNode(node.clone()),
        TermPattern::Literal(literal) => Term::Literal(literal.clone()),
        TermPattern::Variable(v) if v.as_str() == "this" => return Some(frozen.intern(focus)),
        TermPattern::Variable(v) => {
            let id = plan.var_id(v.as_str())?;
            return bindings.get(&id).copied();
        }
        // NOTE(review): presumably covers extra variants (e.g. quoted triples) under
        // optional crate features; unreachable otherwise.
        #[allow(unreachable_patterns)]
        _ => return None,
    };
    Some(frozen.intern(&constant))
}
/// Resolves a template predicate to a term ID, using the same rules as
/// `resolve_template_id`.
fn resolve_template_predicate_id(
    pattern: &NamedNodePattern,
    plan: &NativeQueryPlan,
    frozen: &FrozenIndexedDataset,
    focus: &Term,
    bindings: &native_exec::NativeIdBindings,
) -> Option<TermId> {
    let variable = match pattern {
        NamedNodePattern::NamedNode(node) => {
            return Some(frozen.intern(&Term::NamedNode(node.clone())));
        }
        NamedNodePattern::Variable(variable) => variable,
    };
    if variable.as_str() == "this" {
        return Some(frozen.intern(focus));
    }
    let id = plan.var_id(variable.as_str())?;
    bindings.get(&id).copied()
}
/// Translates a SHACL path into a SPARQL 1.1 property-path expression.
///
/// Returns `None` when there is no property-path equivalent: the bare identity path
/// (`Path::Id`), compositions that reduce to it, or an alternative with no non-identity
/// branch.
fn path_to_property_path(path: &Path) -> Option<PropertyPathExpression> {
    match path {
        // A zero-length path cannot be written as a property path on its own.
        Path::Id => None,
        Path::Pred(n) => Some(PropertyPathExpression::NamedNode(n.clone())),
        Path::Inverse(inner) => {
            path_to_property_path(inner).map(|p| PropertyPathExpression::Reverse(Box::new(p)))
        }
        Path::Seq(parts) => {
            // Identity is neutral for sequence composition (`p / Id / q` == `p / q`), so
            // identity steps are dropped instead of making the whole path unconvertible.
            let sparql: Vec<_> = parts
                .iter()
                .filter(|p| !matches!(p, Path::Id))
                .map(path_to_property_path)
                .collect::<Option<Vec<_>>>()?;
            sparql
                .into_iter()
                .reduce(|a, b| PropertyPathExpression::Sequence(Box::new(a), Box::new(b)))
        }
        Path::Alt(parts) => {
            let has_id = parts.iter().any(|p| matches!(p, Path::Id));
            let sparql: Vec<_> = parts
                .iter()
                .filter(|p| !matches!(p, Path::Id))
                .map(path_to_property_path)
                .collect::<Option<Vec<_>>>()?;
            let base = sparql
                .into_iter()
                .reduce(|a, b| PropertyPathExpression::Alternative(Box::new(a), Box::new(b)))?;
            // `p | Id` is exactly `p?`.
            if has_id {
                Some(PropertyPathExpression::ZeroOrOne(Box::new(base)))
            } else {
                Some(base)
            }
        }
        Path::Star(inner) => {
            path_to_property_path(inner).map(|p| PropertyPathExpression::ZeroOrMore(Box::new(p)))
        }
    }
}
fn rewrite_path_query(query: Query, path: &PropertyPathExpression) -> Query {
match query {
Query::Select {
dataset,
pattern,
base_iri,
} => Query::Select {
dataset,
pattern: rewrite_path_pattern(pattern, path),
base_iri,
},
Query::Ask {
dataset,
pattern,
base_iri,
} => Query::Ask {
dataset,
pattern: rewrite_path_pattern(pattern, path),
base_iri,
},
other => other,
}
}
fn rewrite_path_pattern(pattern: GraphPattern, path: &PropertyPathExpression) -> GraphPattern {
match pattern {
GraphPattern::Bgp { patterns } => {
let mut result = GraphPattern::Bgp { patterns: vec![] };
let mut remaining = Vec::new();
for triple in patterns {
if matches!(&triple.predicate, NamedNodePattern::Variable(v) if v.as_str() == "PATH")
{
let path_gp = GraphPattern::Path {
subject: triple.subject,
path: path.clone(),
object: triple.object,
};
result = GraphPattern::Join {
left: Box::new(result),
right: Box::new(path_gp),
};
} else {
remaining.push(triple);
}
}
if remaining.is_empty() {
result
} else {
let bgp = GraphPattern::Bgp {
patterns: remaining,
};
GraphPattern::Join {
left: Box::new(bgp),
right: Box::new(result),
}
}
}
GraphPattern::Path { .. } => pattern,
GraphPattern::Join { left, right } => GraphPattern::Join {
left: Box::new(rewrite_path_pattern(*left, path)),
right: Box::new(rewrite_path_pattern(*right, path)),
},
GraphPattern::Union { left, right } => GraphPattern::Union {
left: Box::new(rewrite_path_pattern(*left, path)),
right: Box::new(rewrite_path_pattern(*right, path)),
},
GraphPattern::Minus { left, right } => GraphPattern::Minus {
left: Box::new(rewrite_path_pattern(*left, path)),
right: Box::new(rewrite_path_pattern(*right, path)),
},
GraphPattern::Lateral { left, right } => GraphPattern::Lateral {
left: Box::new(rewrite_path_pattern(*left, path)),
right: Box::new(rewrite_path_pattern(*right, path)),
},
GraphPattern::LeftJoin {
left,
right,
expression,
} => GraphPattern::LeftJoin {
left: Box::new(rewrite_path_pattern(*left, path)),
right: Box::new(rewrite_path_pattern(*right, path)),
expression,
},
GraphPattern::Filter { expr, inner } => GraphPattern::Filter {
expr,
inner: Box::new(rewrite_path_pattern(*inner, path)),
},
GraphPattern::Graph { name, inner } => GraphPattern::Graph {
name,
inner: Box::new(rewrite_path_pattern(*inner, path)),
},
GraphPattern::Extend {
inner,
variable,
expression,
} => GraphPattern::Extend {
inner: Box::new(rewrite_path_pattern(*inner, path)),
variable,
expression,
},
GraphPattern::OrderBy { inner, expression } => GraphPattern::OrderBy {
inner: Box::new(rewrite_path_pattern(*inner, path)),
expression,
},
GraphPattern::Project { inner, variables } => GraphPattern::Project {
inner: Box::new(rewrite_path_pattern(*inner, path)),
variables,
},
GraphPattern::Distinct { inner } => GraphPattern::Distinct {
inner: Box::new(rewrite_path_pattern(*inner, path)),
},
GraphPattern::Reduced { inner } => GraphPattern::Reduced {
inner: Box::new(rewrite_path_pattern(*inner, path)),
},
GraphPattern::Slice {
inner,
start,
length,
} => GraphPattern::Slice {
inner: Box::new(rewrite_path_pattern(*inner, path)),
start,
length,
},
GraphPattern::Group {
inner,
variables,
aggregates,
} => GraphPattern::Group {
inner: Box::new(rewrite_path_pattern(*inner, path)),
variables,
aggregates,
},
GraphPattern::Service {
name,
inner,
silent,
} => GraphPattern::Service {
name,
inner: Box::new(rewrite_path_pattern(*inner, path)),
silent,
},
GraphPattern::Values { .. } => pattern,
}
}
fn plan_batch_query(query: &Query, foci: &[Term]) -> Option<(Query, Vec<Term>)> {
let this = Variable::new("this").ok()?;
let (pattern, is_ask) = match query {
Query::Select { pattern, .. } => (pattern, false),
Query::Ask { pattern, .. } => (pattern, true),
_ => return None,
};
if pattern_has_group_or_slice(pattern) || !this_required(pattern, &this) {
return None;
}
let covered: Vec<Term> = foci
.iter()
.filter(|f| matches!(f, Term::NamedNode(_)))
.cloned()
.collect();
if covered.len() < 2 {
return None;
}
let (dataset, base_iri) = match query {
Query::Select {
dataset, base_iri, ..
}
| Query::Ask {
dataset, base_iri, ..
} => (dataset.clone(), base_iri.clone()),
_ => unreachable!(),
};
let rewritten = if is_ask {
Query::Select {
dataset,
pattern: GraphPattern::Project {
inner: Box::new(GraphPattern::Distinct {
inner: Box::new(pattern.clone()),
}),
variables: vec![this],
},
base_iri,
}
} else {
Query::Select {
dataset,
pattern: ensure_projected(pattern.clone(), &this)?,
base_iri,
}
};
Some((rewritten, covered))
}
fn this_required(p: &GraphPattern, var: &Variable) -> bool {
match p {
GraphPattern::Bgp { patterns } => patterns
.iter()
.any(|t| term_pattern_is(&t.subject, var) || term_pattern_is(&t.object, var)),
GraphPattern::Path {
subject, object, ..
} => term_pattern_is(subject, var) || term_pattern_is(object, var),
GraphPattern::Join { left, right } | GraphPattern::Lateral { left, right } => {
this_required(left, var) || this_required(right, var)
}
GraphPattern::Union { left, right } => {
this_required(left, var) && this_required(right, var)
}
GraphPattern::LeftJoin { left, .. } | GraphPattern::Minus { left, .. } => {
this_required(left, var)
}
GraphPattern::Filter { inner, .. }
| GraphPattern::Graph { inner, .. }
| GraphPattern::Extend { inner, .. }
| GraphPattern::OrderBy { inner, .. }
| GraphPattern::Project { inner, .. }
| GraphPattern::Distinct { inner }
| GraphPattern::Reduced { inner }
| GraphPattern::Slice { inner, .. }
| GraphPattern::Group { inner, .. }
| GraphPattern::Service { inner, .. } => this_required(inner, var),
GraphPattern::Values {
variables,
bindings,
} => {
variables.iter().position(|v| v == var).is_some_and(|col| {
bindings
.iter()
.all(|row| row.get(col).is_some_and(Option::is_some))
})
}
}
}
/// Adds `this` to the top-level projection of a SELECT pattern, looking through
/// `DISTINCT`, `REDUCED` and `ORDER BY` wrappers. Returns `None` if no projection is found.
fn ensure_projected(pattern: GraphPattern, this: &Variable) -> Option<GraphPattern> {
    Some(match pattern {
        GraphPattern::Project {
            inner,
            mut variables,
        } => {
            if !variables.contains(this) {
                variables.push(this.clone());
            }
            GraphPattern::Project { inner, variables }
        }
        GraphPattern::Distinct { inner } => {
            let inner = Box::new(ensure_projected(*inner, this)?);
            GraphPattern::Distinct { inner }
        }
        GraphPattern::Reduced { inner } => {
            let inner = Box::new(ensure_projected(*inner, this)?);
            GraphPattern::Reduced { inner }
        }
        GraphPattern::OrderBy { inner, expression } => {
            let inner = Box::new(ensure_projected(*inner, this)?);
            GraphPattern::OrderBy { inner, expression }
        }
        _ => return None,
    })
}
/// Whether `p` contains a GROUP BY (`Group`) or LIMIT/OFFSET (`Slice`) anywhere. Such
/// patterns are not safe to batch across foci.
fn pattern_has_group_or_slice(p: &GraphPattern) -> bool {
    match p {
        GraphPattern::Bgp { .. } | GraphPattern::Path { .. } | GraphPattern::Values { .. } => false,
        GraphPattern::Group { .. } | GraphPattern::Slice { .. } => true,
        GraphPattern::Filter { inner, .. }
        | GraphPattern::Graph { inner, .. }
        | GraphPattern::Extend { inner, .. }
        | GraphPattern::OrderBy { inner, .. }
        | GraphPattern::Project { inner, .. }
        | GraphPattern::Distinct { inner }
        | GraphPattern::Reduced { inner }
        | GraphPattern::Service { inner, .. } => pattern_has_group_or_slice(inner),
        GraphPattern::Join { left, right }
        | GraphPattern::Union { left, right }
        | GraphPattern::Minus { left, right }
        | GraphPattern::Lateral { left, right }
        | GraphPattern::LeftJoin { left, right, .. } => [left, right]
            .into_iter()
            .any(|side| pattern_has_group_or_slice(side)),
    }
}
/// Whether the term pattern is exactly the variable `var`.
fn term_pattern_is(t: &TermPattern, var: &Variable) -> bool {
    if let TermPattern::Variable(candidate) = t {
        candidate == var
    } else {
        false
    }
}
/// Replaces `var` with the constant `value` throughout `query`, including a CONSTRUCT
/// template (rewritten before the WHERE clause).
fn substitute_query(query: &mut Query, var: &Variable, value: &Term) {
    if let Query::Construct { template, .. } = query {
        template
            .iter_mut()
            .for_each(|triple| substitute_triple(triple, var, value));
    }
    let (Query::Select { pattern, .. }
    | Query::Construct { pattern, .. }
    | Query::Describe { pattern, .. }
    | Query::Ask { pattern, .. }) = query;
    substitute_pattern(pattern, var, value);
}
/// Replaces `var` with the constant `value` throughout `pattern`, in place.
///
/// Recurses into every nested graph pattern (including sub-SELECTs under `Project`) and
/// into FILTER, OPTIONAL-condition, BIND, ORDER BY and aggregate expressions. `VALUES`
/// blocks and projected/grouped variable lists are left untouched. `GRAPH`/`SERVICE` names
/// are replaced only when `value` is an IRI (see `substitute_named_node_pattern`).
fn substitute_pattern(pattern: &mut GraphPattern, var: &Variable, value: &Term) {
match pattern {
GraphPattern::Bgp { patterns } => {
for triple in patterns {
substitute_triple(triple, var, value);
}
}
// Property-path patterns: only the endpoints can be variables.
GraphPattern::Path {
subject, object, ..
} => {
substitute_term_pattern(subject, var, value);
substitute_term_pattern(object, var, value);
}
GraphPattern::Join { left, right }
| GraphPattern::Union { left, right }
| GraphPattern::Minus { left, right }
| GraphPattern::Lateral { left, right } => {
substitute_pattern(left, var, value);
substitute_pattern(right, var, value);
}
GraphPattern::LeftJoin {
left,
right,
expression,
} => {
substitute_pattern(left, var, value);
substitute_pattern(right, var, value);
// OPTIONAL { ... FILTER(...) } condition, if any.
if let Some(expression) = expression {
substitute_expr(expression, var, value);
}
}
GraphPattern::Filter { expr, inner } => {
substitute_expr(expr, var, value);
substitute_pattern(inner, var, value);
}
GraphPattern::Graph { name, inner } => {
substitute_named_node_pattern(name, var, value);
substitute_pattern(inner, var, value);
}
// BIND: the bound variable itself is not renamed, only the expression.
GraphPattern::Extend {
inner, expression, ..
} => {
substitute_pattern(inner, var, value);
substitute_expr(expression, var, value);
}
GraphPattern::OrderBy { inner, expression } => {
substitute_pattern(inner, var, value);
for order in expression {
match order {
OrderExpression::Asc(e) | OrderExpression::Desc(e) => {
substitute_expr(e, var, value)
}
}
}
}
// Projection lists are not rewritten, only the patterns beneath them.
GraphPattern::Project { inner, .. }
| GraphPattern::Distinct { inner }
| GraphPattern::Reduced { inner }
| GraphPattern::Slice { inner, .. } => substitute_pattern(inner, var, value),
GraphPattern::Group {
inner, aggregates, ..
} => {
substitute_pattern(inner, var, value);
// COUNT(*)-style aggregates carry no expression; only function-call ones do.
for (_, aggregate) in aggregates {
if let AggregateExpression::FunctionCall { expr, .. } = aggregate {
substitute_expr(expr, var, value);
}
}
}
GraphPattern::Service { name, inner, .. } => {
substitute_named_node_pattern(name, var, value);
substitute_pattern(inner, var, value);
}
// VALUES rows are left as written.
GraphPattern::Values { .. } => {}
}
}
fn substitute_triple(triple: &mut TriplePattern, var: &Variable, value: &Term) {
substitute_term_pattern(&mut triple.subject, var, value);
substitute_named_node_pattern(&mut triple.predicate, var, value);
substitute_term_pattern(&mut triple.object, var, value);
}
/// Replaces the term pattern with `value` when it is exactly the variable `var` and `value`
/// has a term-pattern form.
fn substitute_term_pattern(pattern: &mut TermPattern, var: &Variable, value: &Term) {
    let is_target = matches!(pattern, TermPattern::Variable(current) if current == var);
    if !is_target {
        return;
    }
    if let Some(replacement) = term_to_term_pattern(value) {
        *pattern = replacement;
    }
}
/// Replaces a predicate/graph-name variable `var` with `value`, but only when `value` is an
/// IRI (the only term allowed in these positions).
fn substitute_named_node_pattern(pattern: &mut NamedNodePattern, var: &Variable, value: &Term) {
    if !matches!(pattern, NamedNodePattern::Variable(current) if current == var) {
        return;
    }
    if let Term::NamedNode(node) = value {
        *pattern = NamedNodePattern::NamedNode(node.clone());
    }
}
/// Replaces `var` with the constant `value` inside `expr`, in place.
///
/// References to `var` become the constant when it is an IRI or literal (see
/// `term_to_expr`); otherwise (e.g. a blank node) they are left as variables.
/// `BOUND(var)` becomes the literal `true` whatever kind `value` is. `EXISTS` patterns
/// are rewritten through `substitute_pattern`.
fn substitute_expr(expr: &mut Expression, var: &Variable, value: &Term) {
match expr {
Expression::Variable(v) if v == var => {
if let Some(replacement) = term_to_expr(value) {
*expr = replacement;
}
}
// The variable is now pre-bound, so BOUND(?var) is always true.
Expression::Bound(v) if v == var => *expr = Expression::Literal(Literal::from(true)),
// Leaves: other variables and constants are unchanged.
Expression::Variable(_)
| Expression::Bound(_)
| Expression::NamedNode(_)
| Expression::Literal(_) => {}
Expression::Or(a, b)
| Expression::And(a, b)
| Expression::Equal(a, b)
| Expression::SameTerm(a, b)
| Expression::Greater(a, b)
| Expression::GreaterOrEqual(a, b)
| Expression::Less(a, b)
| Expression::LessOrEqual(a, b)
| Expression::Add(a, b)
| Expression::Subtract(a, b)
| Expression::Multiply(a, b)
| Expression::Divide(a, b) => {
substitute_expr(a, var, value);
substitute_expr(b, var, value);
}
Expression::In(a, list) => {
substitute_expr(a, var, value);
for e in list {
substitute_expr(e, var, value);
}
}
Expression::UnaryPlus(a) | Expression::UnaryMinus(a) | Expression::Not(a) => {
substitute_expr(a, var, value)
}
// (NOT) EXISTS embeds a whole graph pattern.
Expression::Exists(pattern) => substitute_pattern(pattern, var, value),
Expression::If(a, b, c) => {
substitute_expr(a, var, value);
substitute_expr(b, var, value);
substitute_expr(c, var, value);
}
Expression::Coalesce(list) | Expression::FunctionCall(_, list) => {
for e in list {
substitute_expr(e, var, value);
}
}
}
}
/// Converts an RDF term into the equivalent constant term pattern.
fn term_to_term_pattern(value: &Term) -> Option<TermPattern> {
    let pattern = match value {
        Term::Literal(l) => TermPattern::Literal(l.clone()),
        Term::BlankNode(b) => TermPattern::BlankNode(b.clone()),
        Term::NamedNode(n) => TermPattern::NamedNode(n.clone()),
        // NOTE(review): presumably guards extra `Term` variants under optional crate
        // features; unreachable otherwise.
        #[allow(unreachable_patterns)]
        _ => return None,
    };
    Some(pattern)
}
/// Converts an IRI or literal into a constant expression; other terms (e.g. blank nodes)
/// have no expression form here and yield `None`.
fn term_to_expr(value: &Term) -> Option<Expression> {
    if let Term::NamedNode(n) = value {
        Some(Expression::NamedNode(n.clone()))
    } else if let Term::Literal(l) = value {
        Some(Expression::Literal(l.clone()))
    } else {
        None
    }
}
/// Builds a SPARQL variable from a compile-time-known name.
///
/// # Panics
/// Panics if `name` is not a valid variable name; callers only pass static names.
fn variable(name: &str) -> Variable {
    let parsed = Variable::new(name);
    parsed.expect("static SPARQL variable name")
}
/// Renders any displayable error as the `String` error type used throughout this module.
fn err(error: impl std::fmt::Display) -> String {
    format!("{error}")
}
/// Canonical string form of a SHACL path, used as part of the compile-cache key.
fn path_key(path: &Path) -> String {
    use shifty_algebra::render::path_to_string;
    path_to_string(path)
}
/// Debug-only cross-check: the native engine and the reference evaluator must report the
/// same multiset of `(value, path)` pairs for a focus.
///
/// # Panics
/// Panics with both result sets when they differ.
#[cfg(debug_assertions)]
fn assert_violations_match(
    query: &str,
    focus: &Term,
    native: &[SparqlViolation],
    reference: &[SparqlViolation],
) {
    /// Order-insensitive key list; unbound positions render as empty strings.
    fn canonical(violations: &[SparqlViolation]) -> Vec<(String, String)> {
        let render = |term: &Option<Term>| term.as_ref().map(Term::to_string).unwrap_or_default();
        let mut keyed: Vec<(String, String)> = violations
            .iter()
            .map(|v| (render(&v.value), render(&v.path)))
            .collect();
        keyed.sort();
        keyed
    }
    let (native, reference) = (canonical(native), canonical(reference));
    assert_eq!(
        native, reference,
        "native vs Spareval disagreement for focus {focus}\n query: {query}\n native: {native:?}\n spareval: {reference:?}",
    );
}
#[cfg(test)]
mod storage_tests {
    //! Storage-backend checks for `SparqlExecutor`.
    use super::*;

    /// An executor built with `from_frozen` over an empty graph must report
    /// `has_store() == false`.
    #[test]
    fn validation_executor_does_not_allocate_mutable_store() {
        let dataset = FrozenIndexedDataset::from_graph(&Graph::new());
        let executor = SparqlExecutor::from_frozen(dataset, false);
        assert!(!executor.has_store());
    }
}
#[cfg(test)]
mod batch_tests {
    //! Checks that evaluating a constraint after `prefetch_constraint` gives the
    //! same violations as evaluating each focus node on a fresh executor.
    use super::*;
    use shifty_algebra::SparqlQueryKind;

    fn nn(s: &str) -> NamedNode {
        NamedNode::new(s).unwrap()
    }

    /// The named node `http://ex/{s}` as a `Term`.
    fn subject(s: &str) -> Term {
        Term::NamedNode(nn(&format!("http://ex/{s}")))
    }

    /// Subjects `a`, `b`, `c`, each with exactly one `ex:value` (5, 15, 25).
    fn sample_graph() -> Graph {
        let mut g = Graph::new();
        for (s, v) in [("a", 5i64), ("b", 15), ("c", 25)] {
            g.insert(&Triple::new(
                nn(&format!("http://ex/{s}")),
                nn("http://ex/value"),
                Literal::from(v),
            ));
        }
        g
    }

    fn foci() -> Vec<Term> {
        vec![subject("a"), subject("b"), subject("c")]
    }

    /// A fresh executor over a frozen copy of `g`. The baseline and batched
    /// sides are built identically, so they differ only in prefetching.
    fn executor(g: &Graph) -> SparqlExecutor {
        SparqlExecutor::from_frozen(FrozenIndexedDataset::from_graph(g), false)
    }

    /// A constraint of the given kind with no path, shape or messages.
    fn constraint(kind: SparqlQueryKind, query: &str) -> SparqlConstraint {
        SparqlConstraint {
            kind,
            query: query.to_string(),
            path: None,
            shape: None,
            messages: Vec::new(),
        }
    }

    /// Evaluates `c` for every focus and returns one sorted
    /// `"{focus:?}|{value:?}"` row per violation.
    fn per_focus(exec: &SparqlExecutor, c: &SparqlConstraint, foci: &[Term]) -> Vec<String> {
        let mut out = Vec::new();
        for f in foci {
            for v in exec.constraint_violations(c, f).unwrap() {
                out.push(format!("{f:?}|{:?}", v.value));
            }
        }
        out.sort();
        out
    }

    /// Evaluates `c` without prefetching and again after `prefetch_constraint`
    /// on a second executor. Asserts both produce identical rows and returns
    /// the baseline rows so callers can also check them for emptiness.
    fn assert_prefetch_matches(g: &Graph, c: &SparqlConstraint, foci: &[Term]) -> Vec<String> {
        let want = per_focus(&executor(g), c, foci);
        let batched = executor(g);
        batched.prefetch_constraint(c, foci).unwrap();
        let got = per_focus(&batched, c, foci);
        assert_eq!(want, got);
        want
    }

    #[test]
    fn batched_select_matches_per_focus() {
        let c = constraint(
            SparqlQueryKind::Select,
            "SELECT $this ?value WHERE { \
             $this <http://ex/value> ?value . \
             OPTIONAL { $this <http://ex/other> ?o } \
             FILTER(!bound(?o)) }",
        );
        let want = assert_prefetch_matches(&sample_graph(), &c, &foci());
        assert_eq!(want.len(), 3, "every subject should violate");
    }

    #[test]
    fn batched_ask_matches_per_focus() {
        let c = constraint(
            SparqlQueryKind::Ask,
            "ASK { $this <http://ex/value> ?value . \
             OPTIONAL { $this <http://ex/other> ?o } \
             FILTER(?value > 10) }",
        );
        let g = sample_graph();
        let foci = foci();
        // ASK violations carry no per-row value, so compare counts per focus.
        let counts = |exec: &SparqlExecutor| -> Vec<usize> {
            foci.iter()
                .map(|f| exec.constraint_violations(&c, f).unwrap().len())
                .collect()
        };
        let want = counts(&executor(&g));
        let batched = executor(&g);
        batched.prefetch_constraint(&c, &foci).unwrap();
        let got = counts(&batched);
        assert_eq!(want, vec![0, 1, 1]);
        assert_eq!(want, got);
    }

    #[test]
    fn blank_node_foci_fall_through_to_per_focus() {
        use oxrdf::BlankNode;
        let mut g = Graph::new();
        let blank = BlankNode::default();
        g.insert(&Triple::new(
            nn("http://ex/a"),
            nn("http://ex/flag"),
            Literal::from(true),
        ));
        g.insert(&Triple::new(
            blank.clone(),
            nn("http://ex/flag"),
            Literal::from(true),
        ));
        g.insert(&Triple::new(
            nn("http://ex/x"),
            nn("http://ex/ref"),
            nn("http://ex/a"),
        ));
        g.insert(&Triple::new(
            nn("http://ex/y"),
            nn("http://ex/ref"),
            blank.clone(),
        ));
        let c = constraint(
            SparqlQueryKind::Select,
            "SELECT ?s $this WHERE { \
             $this <http://ex/flag> true . \
             ?s <http://ex/ref> $this . \
             OPTIONAL { $this <http://ex/other> ?o } }",
        );
        let foci = vec![
            Term::NamedNode(nn("http://ex/a")),
            Term::BlankNode(blank),
            subject("c"),
        ];
        let want = assert_prefetch_matches(&g, &c, &foci);
        assert!(!want.is_empty());
    }

    /// Aggregating queries must still agree with per-focus evaluation.
    ///
    /// Each fixture subject has exactly one value, so a `COUNT`-based `HAVING`
    /// can only select all or none of them. The previous `COUNT(?value) > 5`
    /// selected none, making the comparison a trivial `[] == []`. A `SUM`
    /// threshold of 10 selects only `b` (15) and `c` (25).
    #[test]
    fn aggregating_query_is_not_batched_but_still_correct() {
        let c = constraint(
            SparqlQueryKind::Select,
            "SELECT $this (SUM(?value) AS ?n) WHERE { \
             $this <http://ex/value> ?value } \
             GROUP BY $this HAVING (SUM(?value) > 10)",
        );
        let want = assert_prefetch_matches(&sample_graph(), &c, &foci());
        assert_eq!(want.len(), 2, "only b and c exceed the threshold");
    }
}