pub mod advanced_statistics;
pub mod algebra;
pub mod binding_optimizer;
pub mod cost_based_optimizer;
pub mod cost_estimator;
pub mod distributed;
pub mod exec;
pub mod functions;
pub mod gpu;
pub mod jit;
pub mod ml_optimizer;
pub mod optimizer;
pub mod parser;
pub mod pattern_optimizer;
pub mod pattern_unification;
pub mod plan;
pub mod plan_cache;
pub mod profiled_plan_builder;
pub mod property_function_registry;
pub mod property_paths;
pub mod query_plan_visualizer;
pub mod query_profiler;
pub mod result_cache;
pub mod sparql_algebra;
pub mod sparql_query;
pub mod statistics;
pub mod streaming_results;
pub mod update;
pub mod wasm;
pub use property_function_registry::{
PropertyFunction, PropertyFunctionArg, PropertyFunctionBinding, PropertyFunctionFactory,
PropertyFunctionMetadata, PropertyFunctionRegistry, PropertyFunctionResult,
};
pub use crate::{GraphName, Triple};
pub use sparql_algebra::{
Expression as SparqlExpression, GraphPattern as SparqlGraphPattern, NamedNodePattern,
PropertyPathExpression, TermPattern as SparqlTermPattern, TriplePattern as SparqlTriplePattern,
};
pub use sparql_query::*;
pub use plan::ExecutionPlan;
pub use advanced_statistics::{AdvancedStatistics, AdvancedStatisticsCollector, PatternExecution};
pub use algebra::{
AlgebraTriplePattern, Expression as AlgebraExpression, GraphPattern as AlgebraGraphPattern,
PropertyPath, Query as AlgebraQuery, TermPattern as AlgebraTermPattern,
};
pub use binding_optimizer::{BindingIterator, BindingOptimizer, BindingSet, Constraint, TermType};
pub use cost_based_optimizer::{
CostBasedOptimizer, CostConfiguration, Optimization, OptimizedPlan, OptimizerStats,
};
pub use distributed::{DistributedConfig, DistributedQueryEngine, FederatedEndpoint};
pub use gpu::{GpuBackend, GpuQueryExecutor};
pub use jit::{JitCompiler, JitConfig};
pub use ml_optimizer::{
MLOptimizationResult, MLOptimizerConfig, MLQueryOptimizer, PatternFeatures, PerformanceMetrics,
TrainingStats,
};
pub use optimizer::{AIQueryOptimizer, MultiQueryOptimizer};
pub use parser::*;
pub use pattern_optimizer::{IndexType, OptimizedPatternPlan, PatternExecutor, PatternOptimizer};
pub use pattern_unification::{
PatternConverter, PatternOptimizer as UnifiedPatternOptimizer, UnifiedTermPattern,
UnifiedTriplePattern,
};
pub use plan_cache::{
CacheConfig, CacheStatistics, CachedPlan, LruQueryPlanCache, PlanCacheStats, QueryPlan,
QueryPlanCache, SerializablePlan,
};
pub use profiled_plan_builder::{
CacheEffectiveness, ExecutionComparison, ImprovementLevel, PerformanceAnalysis,
PerformanceGrade, ProfiledPlanBuilder, ProfilingReport,
};
pub use query_plan_visualizer::{
HintSeverity, OptimizationHint, QueryPlanNode, QueryPlanSummary, QueryPlanVisualizer,
};
pub use query_profiler::{
ProfiledQuery, ProfilerConfig, ProfilingStatistics, QueryProfiler, QueryProfilingSession,
QueryStatistics,
};
pub use result_cache::{CacheConfig as ResultCacheConfig, CacheStats, QueryResultCache};
pub use statistics::{
GraphStatistics, PredicateStatistics, QueryExecutionStats, SelectivityInfo, StatisticsSummary,
};
pub use streaming_results::{
ConstructResults, SelectResults, Solution as StreamingSolution, SolutionMetadata,
StreamingConfig, StreamingProgress, StreamingQueryResults, StreamingResultBuilder,
};
pub use update::{UpdateExecutor, UpdateParser};
pub use wasm::{OptimizationLevel, WasmQueryCompiler, WasmTarget};
pub use exec::{QueryExecutor, QueryResults, Solution};
use crate::model::{Object, Predicate, Subject, Term, Variable};
use crate::OxirsError;
use crate::Store;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use algebra::TermPattern;
type FederatedQueryFuture<'a> =
Pin<Box<dyn Future<Output = Result<Vec<HashMap<String, Term>>, OxirsError>> + 'a>>;
#[derive(Debug, Clone)]
pub enum QueryResult {
Select {
variables: Vec<String>,
bindings: Vec<HashMap<String, Term>>,
},
Ask(bool),
Construct(Vec<crate::model::Triple>),
}
pub struct QueryEngine {
parser: parser::SparqlParser,
executor_config: QueryExecutorConfig,
federation_executor: Option<crate::federation::FederationExecutor>,
}
impl Default for QueryEngine {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct QueryExecutorConfig {
pub max_results: usize,
pub timeout_ms: Option<u64>,
pub optimize: bool,
}
impl Default for QueryExecutorConfig {
fn default() -> Self {
Self {
max_results: 10000,
timeout_ms: Some(30000),
optimize: true,
}
}
}
impl QueryEngine {
pub fn new() -> Self {
Self {
parser: parser::SparqlParser::new(),
executor_config: QueryExecutorConfig::default(),
federation_executor: crate::federation::FederationExecutor::new().ok(),
}
}
pub fn with_config(config: QueryExecutorConfig) -> Self {
Self {
parser: parser::SparqlParser::new(),
executor_config: config,
federation_executor: crate::federation::FederationExecutor::new().ok(),
}
}
pub fn with_federation(mut self) -> Self {
self.federation_executor = crate::federation::FederationExecutor::new().ok();
self
}
pub fn without_federation(mut self) -> Self {
self.federation_executor = None;
self
}
pub fn query(&self, query_str: &str, store: &dyn Store) -> Result<QueryResult, OxirsError> {
let parsed_query = self.parser.parse(query_str)?;
self.execute_query(&parsed_query, store)
}
pub fn execute_query(
&self,
query: &sparql_query::Query,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
match query {
sparql_query::Query::Select {
pattern, dataset, ..
} => self.execute_select_query(pattern, dataset.as_ref(), store),
sparql_query::Query::Ask {
pattern, dataset, ..
} => self.execute_ask_query(pattern, dataset.as_ref(), store),
sparql_query::Query::Construct {
template,
pattern,
dataset,
..
} => self.execute_construct_query(template, pattern, dataset.as_ref(), store),
sparql_query::Query::Describe {
pattern, dataset, ..
} => self.execute_describe_query(pattern, dataset.as_ref(), store),
}
}
fn execute_select_query(
&self,
pattern: &SparqlGraphPattern,
_dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let executor = QueryExecutor::new(store);
let plan = self.pattern_to_plan(pattern)?;
let solutions = executor.execute(&plan)?;
let variables = self.extract_variables(pattern);
let bindings: Vec<HashMap<String, Term>> = solutions
.into_iter()
.take(self.executor_config.max_results)
.map(|sol| {
let mut binding = HashMap::new();
for var in &variables {
if let Some(term) = sol.get(var) {
binding.insert(var.name().to_string(), term.clone());
}
}
binding
})
.collect();
Ok(QueryResult::Select {
variables: variables
.into_iter()
.map(|v| v.name().to_string())
.collect(),
bindings,
})
}
fn execute_ask_query(
&self,
pattern: &SparqlGraphPattern,
_dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let executor = QueryExecutor::new(store);
let plan = self.pattern_to_plan(pattern)?;
let solutions = executor.execute(&plan)?;
Ok(QueryResult::Ask(!solutions.is_empty()))
}
fn execute_construct_query(
&self,
template: &[SparqlTriplePattern],
pattern: &SparqlGraphPattern,
_dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let executor = QueryExecutor::new(store);
let plan = self.pattern_to_plan(pattern)?;
let solutions = executor.execute(&plan)?;
let mut triples = Vec::new();
for solution in solutions.into_iter().take(self.executor_config.max_results) {
for triple_pattern in template {
if let Some(triple) = self.instantiate_triple_pattern(triple_pattern, &solution)? {
triples.push(triple);
}
}
}
Ok(QueryResult::Construct(triples))
}
fn execute_describe_query(
&self,
pattern: &SparqlGraphPattern,
_dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let executor = QueryExecutor::new(store);
let plan = self.pattern_to_plan(pattern)?;
let solutions = executor.execute(&plan)?;
let mut triples = Vec::new();
for solution in solutions.into_iter().take(self.executor_config.max_results) {
for (_, term) in solution.iter() {
if let Ok(store_quads) =
store.find_quads(None, None, None, Some(&GraphName::DefaultGraph))
{
for quad in store_quads {
let triple = Triple::new(
quad.subject().clone(),
quad.predicate().clone(),
quad.object().clone(),
);
if self.triple_involves_term(&triple, term) {
triples.push(triple);
}
}
}
}
}
triples.dedup();
Ok(QueryResult::Construct(triples))
}
fn pattern_to_plan(&self, pattern: &SparqlGraphPattern) -> Result<ExecutionPlan, OxirsError> {
match pattern {
SparqlGraphPattern::Bgp { patterns } => {
if patterns.len() == 1 {
Ok(ExecutionPlan::TripleScan {
pattern: self.convert_sparql_triple_pattern(&patterns[0])?,
})
} else {
let mut plan = ExecutionPlan::TripleScan {
pattern: self.convert_sparql_triple_pattern(&patterns[0])?,
};
for triple_pattern in &patterns[1..] {
let right_plan = ExecutionPlan::TripleScan {
pattern: self.convert_sparql_triple_pattern(triple_pattern)?,
};
let join_vars = self.find_join_variables(&plan, &right_plan);
plan = ExecutionPlan::HashJoin {
left: Box::new(plan),
right: Box::new(right_plan),
join_vars,
};
}
Ok(plan)
}
}
SparqlGraphPattern::Join { left, right } => {
let left_plan = self.pattern_to_plan(left)?;
let right_plan = self.pattern_to_plan(right)?;
let join_vars = self.find_join_variables(&left_plan, &right_plan);
Ok(ExecutionPlan::HashJoin {
left: Box::new(left_plan),
right: Box::new(right_plan),
join_vars,
})
}
SparqlGraphPattern::Filter { expr, inner } => {
let input_plan = self.pattern_to_plan(inner)?;
let condition = self.convert_expression(expr.clone())?;
Ok(ExecutionPlan::Filter {
input: Box::new(input_plan),
condition,
})
}
SparqlGraphPattern::Union { left, right } => {
let left_plan = self.pattern_to_plan(left)?;
let right_plan = self.pattern_to_plan(right)?;
Ok(ExecutionPlan::Union {
left: Box::new(left_plan),
right: Box::new(right_plan),
})
}
SparqlGraphPattern::Project { inner, variables } => {
let input_plan = self.pattern_to_plan(inner)?;
Ok(ExecutionPlan::Project {
input: Box::new(input_plan),
vars: variables.clone(),
})
}
SparqlGraphPattern::Distinct { inner } => {
let input_plan = self.pattern_to_plan(inner)?;
Ok(ExecutionPlan::Distinct {
input: Box::new(input_plan),
})
}
SparqlGraphPattern::Slice {
inner,
start,
length,
} => {
let input_plan = self.pattern_to_plan(inner)?;
Ok(ExecutionPlan::Limit {
input: Box::new(input_plan),
limit: length.unwrap_or(usize::MAX),
offset: *start,
})
}
_ => {
Err(OxirsError::Query(format!(
"Unsupported graph pattern type: {pattern:?}"
)))
}
}
}
fn convert_sparql_triple_pattern(
&self,
pattern: &SparqlTriplePattern,
) -> Result<crate::model::pattern::TriplePattern, OxirsError> {
use crate::model::pattern::*;
let subject = match &pattern.subject {
SparqlTermPattern::Variable(v) => Some(SubjectPattern::Variable(v.clone())),
SparqlTermPattern::NamedNode(n) => Some(SubjectPattern::NamedNode(n.clone())),
SparqlTermPattern::BlankNode(b) => Some(SubjectPattern::BlankNode(b.clone())),
_ => None,
};
let predicate = match &pattern.predicate {
SparqlTermPattern::Variable(v) => Some(PredicatePattern::Variable(v.clone())),
SparqlTermPattern::NamedNode(n) => Some(PredicatePattern::NamedNode(n.clone())),
_ => None,
};
let object = match &pattern.object {
SparqlTermPattern::Variable(v) => Some(ObjectPattern::Variable(v.clone())),
SparqlTermPattern::NamedNode(n) => Some(ObjectPattern::NamedNode(n.clone())),
SparqlTermPattern::BlankNode(b) => Some(ObjectPattern::BlankNode(b.clone())),
SparqlTermPattern::Literal(l) => Some(ObjectPattern::Literal(l.clone())),
#[cfg(feature = "sparql-12")]
SparqlTermPattern::Triple(_) => {
None
}
};
Ok(crate::model::pattern::TriplePattern {
subject,
predicate,
object,
})
}
#[allow(dead_code)]
fn convert_triple_pattern(
&self,
pattern: &AlgebraTriplePattern,
) -> Result<crate::model::pattern::TriplePattern, OxirsError> {
use crate::model::pattern::*;
let subject = match &pattern.subject {
TermPattern::Variable(v) => Some(SubjectPattern::Variable(v.clone())),
TermPattern::NamedNode(n) => Some(SubjectPattern::NamedNode(n.clone())),
TermPattern::BlankNode(b) => Some(SubjectPattern::BlankNode(b.clone())),
_ => None,
};
let predicate = match &pattern.predicate {
TermPattern::Variable(v) => Some(PredicatePattern::Variable(v.clone())),
TermPattern::NamedNode(n) => Some(PredicatePattern::NamedNode(n.clone())),
_ => None,
};
let object = match &pattern.object {
TermPattern::Variable(v) => Some(ObjectPattern::Variable(v.clone())),
TermPattern::NamedNode(n) => Some(ObjectPattern::NamedNode(n.clone())),
TermPattern::BlankNode(b) => Some(ObjectPattern::BlankNode(b.clone())),
TermPattern::Literal(l) => Some(ObjectPattern::Literal(l.clone())),
TermPattern::QuotedTriple(_) => {
panic!("RDF-star quoted triples not yet fully supported in query module")
}
};
Ok(crate::model::pattern::TriplePattern {
subject,
predicate,
object,
})
}
fn find_join_variables(&self, _left: &ExecutionPlan, _right: &ExecutionPlan) -> Vec<Variable> {
Vec::new()
}
#[allow(clippy::only_used_in_recursion)]
fn convert_expression(
&self,
expr: sparql_algebra::Expression,
) -> Result<AlgebraExpression, OxirsError> {
use sparql_algebra::Expression as SparqlExpr;
use AlgebraExpression as AlgebraExpr;
match expr {
SparqlExpr::NamedNode(n) => Ok(AlgebraExpr::Term(crate::model::Term::NamedNode(n))),
SparqlExpr::Literal(l) => Ok(AlgebraExpr::Term(crate::model::Term::Literal(l))),
SparqlExpr::Variable(v) => Ok(AlgebraExpr::Variable(v)),
SparqlExpr::Or(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::Or(Box::new(left_expr), Box::new(right_expr)))
}
SparqlExpr::And(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::And(Box::new(left_expr), Box::new(right_expr)))
}
SparqlExpr::Equal(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::Equal(
Box::new(left_expr),
Box::new(right_expr),
))
}
SparqlExpr::SameTerm(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::Equal(
Box::new(left_expr),
Box::new(right_expr),
)) }
SparqlExpr::Greater(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::Greater(
Box::new(left_expr),
Box::new(right_expr),
))
}
SparqlExpr::GreaterOrEqual(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::GreaterOrEqual(
Box::new(left_expr),
Box::new(right_expr),
))
}
SparqlExpr::Less(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::Less(Box::new(left_expr), Box::new(right_expr)))
}
SparqlExpr::LessOrEqual(left, right) => {
let left_expr = self.convert_expression(*left)?;
let right_expr = self.convert_expression(*right)?;
Ok(AlgebraExpr::LessOrEqual(
Box::new(left_expr),
Box::new(right_expr),
))
}
SparqlExpr::Not(inner) => {
let inner_expr = self.convert_expression(*inner)?;
Ok(AlgebraExpr::Not(Box::new(inner_expr)))
}
_ => {
Err(OxirsError::Query(format!(
"Expression type not yet supported in conversion: {expr:?}"
)))
}
}
}
fn extract_variables(&self, pattern: &SparqlGraphPattern) -> Vec<Variable> {
let mut variables = Vec::new();
self.collect_variables_from_pattern(pattern, &mut variables);
variables.sort_by_key(|v: &Variable| v.name().to_owned());
variables.dedup();
variables
}
fn collect_variables_from_pattern(
&self,
pattern: &SparqlGraphPattern,
variables: &mut Vec<Variable>,
) {
match pattern {
SparqlGraphPattern::Bgp { patterns } => {
for triple_pattern in patterns {
self.collect_variables_from_triple_pattern(triple_pattern, variables);
}
}
SparqlGraphPattern::Join { left, right } => {
self.collect_variables_from_pattern(left, variables);
self.collect_variables_from_pattern(right, variables);
}
SparqlGraphPattern::Filter { inner, .. } => {
self.collect_variables_from_pattern(inner, variables);
}
SparqlGraphPattern::Union { left, right } => {
self.collect_variables_from_pattern(left, variables);
self.collect_variables_from_pattern(right, variables);
}
SparqlGraphPattern::Project {
inner,
variables: proj_vars,
} => {
self.collect_variables_from_pattern(inner, variables);
variables.extend(proj_vars.iter().cloned());
}
SparqlGraphPattern::Distinct { inner } => {
self.collect_variables_from_pattern(inner, variables);
}
SparqlGraphPattern::Slice { inner, .. } => {
self.collect_variables_from_pattern(inner, variables);
}
_ => {
}
}
}
fn collect_variables_from_triple_pattern(
&self,
pattern: &SparqlTriplePattern,
variables: &mut Vec<Variable>,
) {
if let SparqlTermPattern::Variable(v) = &pattern.subject {
variables.push(v.clone());
}
if let SparqlTermPattern::Variable(v) = &pattern.predicate {
variables.push(v.clone());
}
if let SparqlTermPattern::Variable(v) = &pattern.object {
variables.push(v.clone());
}
}
fn instantiate_triple_pattern(
&self,
pattern: &SparqlTriplePattern,
solution: &Solution,
) -> Result<Option<crate::model::Triple>, OxirsError> {
use crate::model::*;
let subject = match &pattern.subject {
SparqlTermPattern::Variable(v) => {
if let Some(term) = solution.get(v) {
match term {
Term::NamedNode(n) => Subject::NamedNode(n.clone()),
Term::BlankNode(b) => Subject::BlankNode(b.clone()),
_ => return Ok(None), }
} else {
return Ok(None); }
}
SparqlTermPattern::NamedNode(n) => Subject::NamedNode(n.clone()),
SparqlTermPattern::BlankNode(b) => Subject::BlankNode(b.clone()),
_ => return Ok(None), };
let predicate = match &pattern.predicate {
SparqlTermPattern::Variable(v) => {
if let Some(Term::NamedNode(n)) = solution.get(v) {
Predicate::NamedNode(n.clone())
} else {
return Ok(None); }
}
SparqlTermPattern::NamedNode(n) => Predicate::NamedNode(n.clone()),
_ => return Ok(None), };
let object = match &pattern.object {
SparqlTermPattern::Variable(v) => {
if let Some(term) = solution.get(v) {
match term {
Term::NamedNode(n) => Object::NamedNode(n.clone()),
Term::BlankNode(b) => Object::BlankNode(b.clone()),
Term::Literal(l) => Object::Literal(l.clone()),
_ => return Ok(None), }
} else {
return Ok(None); }
}
SparqlTermPattern::NamedNode(n) => Object::NamedNode(n.clone()),
SparqlTermPattern::BlankNode(b) => Object::BlankNode(b.clone()),
SparqlTermPattern::Literal(l) => Object::Literal(l.clone()),
#[cfg(feature = "sparql-12")]
SparqlTermPattern::Triple(_) => {
return Ok(None);
}
};
Ok(Some(Triple::new(subject, predicate, object)))
}
pub async fn query_async(
&self,
query_str: &str,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let parsed_query = self.parser.parse(query_str)?;
self.execute_query_async(&parsed_query, store).await
}
pub async fn execute_query_async(
&self,
query: &sparql_query::Query,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
match query {
sparql_query::Query::Select {
pattern, dataset, ..
} => {
self.execute_select_query_async(pattern, dataset.as_ref(), store)
.await
}
sparql_query::Query::Ask {
pattern, dataset, ..
} => {
self.execute_ask_query_async(pattern, dataset.as_ref(), store)
.await
}
sparql_query::Query::Construct {
template,
pattern,
dataset,
..
} => {
self.execute_construct_query_async(template, pattern, dataset.as_ref(), store)
.await
}
sparql_query::Query::Describe {
pattern, dataset, ..
} => {
self.execute_describe_query_async(pattern, dataset.as_ref(), store)
.await
}
}
}
async fn execute_select_query_async(
&self,
pattern: &SparqlGraphPattern,
_dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
if self.contains_service_clause(pattern) {
return self.execute_federated_select(pattern, store).await;
}
self.execute_select_query(pattern, _dataset, store)
}
async fn execute_ask_query_async(
&self,
pattern: &SparqlGraphPattern,
dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
if self.contains_service_clause(pattern) {
let result = self.execute_federated_select(pattern, store).await?;
if let QueryResult::Select { bindings, .. } = result {
return Ok(QueryResult::Ask(!bindings.is_empty()));
}
}
self.execute_ask_query(pattern, dataset, store)
}
async fn execute_construct_query_async(
&self,
template: &[SparqlTriplePattern],
pattern: &SparqlGraphPattern,
dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
if self.contains_service_clause(pattern) {
return Err(OxirsError::Federation(
"CONSTRUCT with SERVICE is not yet fully supported".to_string(),
));
}
self.execute_construct_query(template, pattern, dataset, store)
}
async fn execute_describe_query_async(
&self,
pattern: &SparqlGraphPattern,
dataset: Option<&QueryDataset>,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
if self.contains_service_clause(pattern) {
return Err(OxirsError::Federation(
"DESCRIBE with SERVICE is not yet fully supported".to_string(),
));
}
self.execute_describe_query(pattern, dataset, store)
}
fn contains_service_clause(&self, pattern: &SparqlGraphPattern) -> bool {
matches!(pattern, SparqlGraphPattern::Service { .. })
|| Self::pattern_contains_service_recursive(pattern)
}
fn pattern_contains_service_recursive(pattern: &SparqlGraphPattern) -> bool {
match pattern {
SparqlGraphPattern::Service { .. } => true,
SparqlGraphPattern::Join { left, right }
| SparqlGraphPattern::Union { left, right } => {
Self::pattern_contains_service_recursive(left)
|| Self::pattern_contains_service_recursive(right)
}
SparqlGraphPattern::Filter { inner, .. }
| SparqlGraphPattern::Distinct { inner }
| SparqlGraphPattern::Reduced { inner }
| SparqlGraphPattern::Project { inner, .. } => {
Self::pattern_contains_service_recursive(inner)
}
SparqlGraphPattern::LeftJoin { left, right, .. } => {
Self::pattern_contains_service_recursive(left)
|| Self::pattern_contains_service_recursive(right)
}
_ => false,
}
}
async fn execute_federated_select(
&self,
pattern: &SparqlGraphPattern,
store: &dyn Store,
) -> Result<QueryResult, OxirsError> {
let federation_executor = self.federation_executor.as_ref().ok_or_else(|| {
OxirsError::Federation("Federation executor not available".to_string())
})?;
let local_bindings = Vec::new();
let bindings = self
.execute_pattern_with_federation(pattern, local_bindings, federation_executor, store)
.await?;
let variables = self
.extract_variables(pattern)
.into_iter()
.map(|v| v.name().to_string())
.collect();
Ok(QueryResult::Select {
variables,
bindings,
})
}
fn execute_pattern_with_federation<'a>(
&'a self,
pattern: &'a SparqlGraphPattern,
current_bindings: Vec<HashMap<String, Term>>,
federation_executor: &'a crate::federation::FederationExecutor,
store: &'a dyn Store,
) -> FederatedQueryFuture<'a> {
Box::pin(async move {
let current_bindings = current_bindings;
match pattern {
SparqlGraphPattern::Service {
name,
inner,
silent,
} => {
let remote_bindings = federation_executor
.execute_service(name, inner, *silent, ¤t_bindings)
.await?;
if current_bindings.is_empty() {
Ok(remote_bindings)
} else {
Ok(federation_executor.merge_bindings(current_bindings, remote_bindings))
}
}
SparqlGraphPattern::Join { left, right } => {
let left_bindings = self
.execute_pattern_with_federation(
left,
current_bindings,
federation_executor,
store,
)
.await?;
self.execute_pattern_with_federation(
right,
left_bindings,
federation_executor,
store,
)
.await
}
_ => {
let executor = QueryExecutor::new(store);
let plan = self.pattern_to_plan(pattern)?;
let solutions = executor.execute(&plan)?;
let bindings: Vec<HashMap<String, Term>> = solutions
.into_iter()
.take(self.executor_config.max_results)
.map(|sol| {
sol.iter()
.map(|(var, term)| (var.name().to_string(), term.clone()))
.collect()
})
.collect();
if current_bindings.is_empty() {
Ok(bindings)
} else {
Ok(federation_executor.merge_bindings(current_bindings, bindings))
}
}
}
})
}
fn triple_involves_term(&self, triple: &crate::model::Triple, term: &Term) -> bool {
match term {
Term::NamedNode(n) => {
matches!(triple.subject(), Subject::NamedNode(sn) if sn == n)
|| matches!(triple.predicate(), Predicate::NamedNode(pn) if pn == n)
|| matches!(triple.object(), Object::NamedNode(on) if on == n)
}
Term::BlankNode(b) => {
matches!(triple.subject(), Subject::BlankNode(sb) if sb == b)
|| matches!(triple.object(), Object::BlankNode(ob) if ob == b)
}
Term::Literal(l) => {
matches!(triple.object(), Object::Literal(ol) if ol == l)
}
_ => false,
}
}
}