use super::binding::{Binding, Value, Var};
use super::iterator::{
BindingIterator, IterError, QueryIter, QueryIterBase, QueryIterDistinct, QueryIterFilter,
QueryIterJoin, QueryIterProject, QueryIterSlice, QueryIterSort, QueryIterUnion, SortKey,
};
use super::op::*;
use super::transform::{transform_op, OpStats, TransformPushFilter};
use crate::storage::query::executors::{
Aggregator, AvgAggregator, CountAggregator, CountDistinctAggregator, GroupConcatAggregator,
MaxAggregator, MinAggregator, SampleAggregator, SumAggregator,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Per-query execution settings handed to a `QueryEngine`.
#[derive(Debug, Clone)]
pub struct QueryContext {
/// Optional time budget for the query; `QueryContext::new` sets it to 60 seconds.
/// NOTE(review): nothing visible in this file reads this field — confirm where it is enforced.
pub timeout: Option<Duration>,
/// Optional cap on the number of result rows; `None` means uncapped.
pub limit: Option<u64>,
/// Optimization level; `0` disables plan optimization, `QueryContext::new` uses `1`.
pub optimization_level: u8,
/// When `true`, the engine attaches `ExecutionStats` to the returned `QueryResult`.
pub collect_stats: bool,
/// Named parameter values registered through `with_param`.
pub params: HashMap<String, Value>,
}
impl QueryContext {
    /// Builds a context with the stock settings: a 60 second timeout, no row
    /// limit, optimization level 1, statistics disabled and no parameters.
    pub fn new() -> Self {
        let default_timeout = Duration::from_secs(60);
        Self {
            params: HashMap::new(),
            collect_stats: false,
            optimization_level: 1,
            limit: None,
            timeout: Some(default_timeout),
        }
    }

    /// Returns the context with its timeout replaced by `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Returns the context with its row limit set to `limit`.
    pub fn with_limit(self, limit: u64) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Returns the context with its optimization level set to `level`.
    pub fn with_optimization(self, level: u8) -> Self {
        Self {
            optimization_level: level,
            ..self
        }
    }

    /// Returns the context with statistics collection switched on.
    pub fn with_stats(self) -> Self {
        Self {
            collect_stats: true,
            ..self
        }
    }

    /// Returns the context with `value` stored under `name`; an existing
    /// parameter with the same name is overwritten.
    pub fn with_param(self, name: &str, value: Value) -> Self {
        let mut updated = self;
        updated.params.insert(name.to_string(), value);
        updated
    }
}
impl Default for QueryContext {
fn default() -> Self {
Self::new()
}
}
/// Timings and counters describing one query execution.
///
/// All fields start at zero (`Default`). Only the fields documented as
/// populated below are written by code visible in this file; the remaining
/// counters are presumably filled in by other components — confirm.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStats {
    /// Time spent optimizing the plan before execution.
    pub planning_time: Duration,
    /// Time spent building the result iterator.
    pub execution_time: Duration,
    /// Number of result rows (not populated in this file).
    pub result_count: u64,
    /// Number of bindings processed (not populated in this file).
    pub bindings_processed: u64,
    /// Number of join operators in the executed plan.
    pub join_count: u64,
    /// Number of filter operators in the executed plan.
    pub filter_count: u64,
    /// Number of cache hits (not populated in this file).
    pub cache_hits: u64,
    /// Number of index lookups (not populated in this file).
    pub index_lookups: u64,
}

impl ExecutionStats {
    /// Creates an all-zero statistics record.
    pub fn new() -> Self {
        Self::default()
    }

    /// Accumulates every field of `other` into `self`.
    ///
    /// Sums saturate at the maximum representable value instead of
    /// overflowing: plain `+=` on `Duration` panics on overflow and on `u64`
    /// panics in debug builds / wraps in release builds, and bookkeeping must
    /// never abort a query or report a wrapped-around counter.
    pub fn merge(&mut self, other: &ExecutionStats) {
        self.planning_time = self.planning_time.saturating_add(other.planning_time);
        self.execution_time = self.execution_time.saturating_add(other.execution_time);
        self.result_count = self.result_count.saturating_add(other.result_count);
        self.bindings_processed = self
            .bindings_processed
            .saturating_add(other.bindings_processed);
        self.join_count = self.join_count.saturating_add(other.join_count);
        self.filter_count = self.filter_count.saturating_add(other.filter_count);
        self.cache_hits = self.cache_hits.saturating_add(other.cache_hits);
        self.index_lookups = self.index_lookups.saturating_add(other.index_lookups);
    }
}
/// Outcome of executing a query: the binding iterator plus optional statistics.
pub struct QueryResult {
/// Iterator that yields the result bindings.
pub iter: QueryIter,
/// Execution statistics; `None` when the result was built with `QueryResult::new`.
pub stats: Option<ExecutionStats>,
}
impl QueryResult {
    /// Wraps `iter` in a result that carries no statistics.
    pub fn new(iter: QueryIter) -> Self {
        let stats = None;
        Self { iter, stats }
    }

    /// Wraps `iter` in a result that carries the supplied `stats`.
    pub fn with_stats(iter: QueryIter, stats: ExecutionStats) -> Self {
        let stats = Some(stats);
        Self { iter, stats }
    }

    /// Consumes the result and gathers every binding, or returns the
    /// iterator's error.
    pub fn collect(self) -> Result<Vec<Binding>, IterError> {
        let Self { iter, .. } = self;
        iter.collect()
    }

    /// Consumes the result and returns only its first binding.
    ///
    /// Yields `Ok(None)` when the iterator is empty and `Err` when the first
    /// item is an error.
    pub fn first(mut self) -> Result<Option<Binding>, IterError> {
        match self.iter.next() {
            Some(Ok(binding)) => Ok(Some(binding)),
            Some(Err(err)) => Err(err),
            None => Ok(None),
        }
    }

    /// Borrows the statistics attached to this result, if any.
    pub fn statistics(&self) -> Option<&ExecutionStats> {
        let attached = &self.stats;
        attached.as_ref()
    }
}
/// Contract implemented by every query execution backend.
pub trait QueryEngine: Send + Sync {
    /// Short identifier of the engine.
    fn name(&self) -> &str;

    /// Runs the algebra expression `op` under the settings in `ctx`.
    fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError>;

    /// Rewrites `op` before execution.
    ///
    /// Level `0` hands the plan back untouched; any other level applies the
    /// `TransformPushFilter` rewrite.
    fn optimize(&self, op: Op, level: u8) -> Op {
        match level {
            0 => op,
            _ => {
                let mut pusher = TransformPushFilter::new();
                transform_op(&mut pusher, op)
            }
        }
    }

    /// Feature flags of the engine; the default advertises nothing.
    fn capabilities(&self) -> EngineCapabilities {
        EngineCapabilities::default()
    }
}
/// Feature flags an engine advertises through `QueryEngine::capabilities`.
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Clone, Default)]
pub struct EngineCapabilities {
/// Graph pattern matching is supported.
pub graph_patterns: bool,
/// Aggregation is supported.
pub aggregation: bool,
/// Subqueries are supported.
pub subqueries: bool,
/// Property paths are supported.
pub property_paths: bool,
/// Update operations are supported.
pub updates: bool,
/// Transactions are supported.
pub transactions: bool,
}
/// Errors a query engine can report.
#[derive(Debug, Clone)]
pub enum EngineError {
    /// The requested operation is not supported; the payload names it.
    Unsupported(String),
    /// Execution failed; the payload describes the failure.
    Execution(String),
    /// The query ran out of time.
    Timeout,
    /// The query itself is invalid; the payload explains why.
    InvalidQuery(String),
    /// A resource limit was hit; the payload names the limit.
    ResourceLimit(String),
}

/// Renders each variant as a fixed prefix followed by its detail message.
impl std::fmt::Display for EngineError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_fmt(format_args!("Query timeout")),
            Self::Unsupported(detail) => {
                f.write_fmt(format_args!("Unsupported operation: {}", detail))
            }
            Self::Execution(detail) => f.write_fmt(format_args!("Execution error: {}", detail)),
            Self::InvalidQuery(detail) => f.write_fmt(format_args!("Invalid query: {}", detail)),
            Self::ResourceLimit(detail) => {
                f.write_fmt(format_args!("Resource limit: {}", detail))
            }
        }
    }
}

impl std::error::Error for EngineError {}
/// Factory that produces boxed `QueryEngine` instances.
pub trait QueryEngineFactory: Send + Sync {
/// Name identifying the engines this factory creates.
fn name(&self) -> &str;
/// Creates a new engine instance.
fn create(&self) -> Box<dyn QueryEngine>;
/// Reports whether this factory is willing to serve the given context.
/// The default accepts every context.
fn accepts(&self, _ctx: &QueryContext) -> bool {
true
}
}
/// Collection of engine factories keyed by a string, with an optional default entry.
pub struct QueryEngineRegistry {
// Registered factories; presumably keyed by factory name — confirm in `query_registry_impl`.
factories: HashMap<String, Box<dyn QueryEngineFactory>>,
// Key of the engine used when none is requested explicitly, if any.
default_engine: Option<String>,
}
// Methods such as `with_default`, `get` and `get_default` are not defined in this
// file; presumably they live in this submodule — confirm.
mod query_registry_impl;
impl Default for QueryEngineRegistry {
fn default() -> Self {
Self::with_default()
}
}
/// Query engine backed by data held in memory.
pub struct InMemoryEngine {
// Shared map from a string key to a list of bindings; the `Arc` lets clones of the
// engine share the same map. Meaning of the key is not visible here — confirm.
data: Arc<HashMap<String, Vec<Binding>>>,
}
// `new` and `execute_op` are not defined in this file; presumably they live in this
// submodule — confirm.
mod in_memory_impl;
/// Returns `true` when at least one variable reported by `left.all_vars()`
/// is also contained in `right`.
fn bindings_share_vars(left: &Binding, right: &Binding) -> bool {
    for var in left.all_vars().iter() {
        if right.contains(var) {
            return true;
        }
    }
    false
}
/// Returns `true` when `left` and `right` agree on every variable they have
/// in common.
///
/// Variables of `left` that `right` does not contain never cause a conflict,
/// so bindings with no common variables are compatible.
fn bindings_compatible(left: &Binding, right: &Binding) -> bool {
    for var in left.all_vars().iter() {
        // Only a variable present on both sides can disagree.
        if right.contains(var) && left.get(var) != right.get(var) {
            return false;
        }
    }
    true
}
/// Cloning shares the underlying data map: only the `Arc` handle is
/// duplicated, not the map itself.
impl Clone for InMemoryEngine {
    fn clone(&self) -> Self {
        let data = Arc::clone(&self.data);
        Self { data }
    }
}
impl Default for InMemoryEngine {
fn default() -> Self {
Self::new()
}
}
impl QueryEngine for InMemoryEngine {
    /// Name under which this engine is identified.
    fn name(&self) -> &str {
        "memory"
    }

    /// Optimizes `op` (unless `ctx.optimization_level` is 0), builds the
    /// result iterator and applies `ctx.limit` when one is set.
    ///
    /// Statistics are gathered only when `ctx.collect_stats` is set, so the
    /// common path does not walk the plan a second time just to discard the
    /// counts. The recorded `execution_time` covers the work done inside this
    /// call; rows produced later, while the caller drains the iterator, are
    /// not included.
    ///
    /// NOTE(review): `ctx.timeout` is not consulted here — confirm whether it
    /// is enforced elsewhere.
    fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError> {
        let planning_started = Instant::now();
        let optimized = if ctx.optimization_level > 0 {
            self.optimize(op, ctx.optimization_level)
        } else {
            op
        };
        let planning_time = planning_started.elapsed();

        let exec_started = Instant::now();
        let iter = self.execute_op(&optimized);
        let iter: Box<dyn BindingIterator> = if let Some(limit) = ctx.limit {
            Box::new(QueryIterSlice::limit(iter, limit))
        } else {
            iter
        };
        let query_iter = QueryIter::new(iter);

        if !ctx.collect_stats {
            return Ok(QueryResult::new(query_iter));
        }

        // Take the timing before inspecting the plan so the statistics walk
        // itself is not counted as execution time.
        let execution_time = exec_started.elapsed();
        let op_stats = OpStats::collect(&optimized);
        let mut stats = ExecutionStats::new();
        stats.planning_time = planning_time;
        stats.execution_time = execution_time;
        stats.join_count = op_stats.join_count as u64;
        stats.filter_count = op_stats.filter_count as u64;
        Ok(QueryResult::with_stats(query_iter, stats))
    }

    /// Feature flags advertised by the in-memory engine.
    fn capabilities(&self) -> EngineCapabilities {
        EngineCapabilities {
            graph_patterns: true,
            // NOTE(review): the test suite runs `Op::Group` with a SUM
            // aggregate through this engine successfully, so this flag looks
            // stale — confirm whether it should be `true`.
            aggregation: false,
            subqueries: true,
            property_paths: false,
            updates: false,
            transactions: false,
        }
    }
}
/// Stateless factory that creates `InMemoryEngine` instances.
pub struct InMemoryEngineFactory;
impl QueryEngineFactory for InMemoryEngineFactory {
    /// Same identifier that `InMemoryEngine::name` reports.
    fn name(&self) -> &str {
        "memory"
    }

    /// Builds a new in-memory engine and hands it out as a trait object.
    fn create(&self) -> Box<dyn QueryEngine> {
        let engine = InMemoryEngine::new();
        Box::new(engine)
    }
}
/// Computes a 64-bit hash of `binding` that does not depend on the order in
/// which `all_vars()` reports the variables.
///
/// Variables are visited in name order. For each one the name is hashed,
/// followed by a type tag and the value, or by the `"unbound"` tag when the
/// binding has no value for it. Floats are hashed by their bit pattern.
fn binding_hash(binding: &Binding) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Sort by name so the same set of variables always hashes identically.
    let mut ordered: Vec<_> = binding.all_vars();
    ordered.sort_by_key(|v| v.name());

    let mut state = DefaultHasher::new();
    for var in ordered {
        var.name().hash(&mut state);
        match binding.get(var) {
            Some(Value::Node(id)) => {
                "node".hash(&mut state);
                id.hash(&mut state);
            }
            Some(Value::Edge(id)) => {
                "edge".hash(&mut state);
                id.hash(&mut state);
            }
            Some(Value::String(text)) => {
                "string".hash(&mut state);
                text.hash(&mut state);
            }
            Some(Value::Integer(number)) => {
                "int".hash(&mut state);
                number.hash(&mut state);
            }
            Some(Value::Float(number)) => {
                "float".hash(&mut state);
                number.to_bits().hash(&mut state);
            }
            Some(Value::Boolean(flag)) => {
                "bool".hash(&mut state);
                flag.hash(&mut state);
            }
            Some(Value::Uri(uri)) => {
                "uri".hash(&mut state);
                uri.hash(&mut state);
            }
            Some(Value::Null) => {
                "null".hash(&mut state);
            }
            None => {
                "unbound".hash(&mut state);
            }
        }
    }
    state.finish()
}
// Unit tests for the query context builder, the engine registry and the
// operators executed by `InMemoryEngine`.
#[cfg(test)]
mod tests {
use super::*;
// Builder methods override the values set by `QueryContext::new`.
#[test]
fn test_query_context() {
let ctx = QueryContext::new()
.with_timeout(Duration::from_secs(30))
.with_limit(100)
.with_optimization(2)
.with_stats();
assert_eq!(ctx.timeout, Some(Duration::from_secs(30)));
assert_eq!(ctx.limit, Some(100));
assert_eq!(ctx.optimization_level, 2);
assert!(ctx.collect_stats);
}
// The default registry exposes an engine named "memory" and has a default engine.
#[test]
fn test_registry() {
let registry = QueryEngineRegistry::with_default();
assert!(registry.get("memory").is_some());
assert!(registry.get_default().is_some());
}
// A freshly created BGP executed on a new engine yields no bindings.
#[test]
fn test_in_memory_engine_empty() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let bgp = OpBGP::new();
let result = engine.execute(Op::BGP(bgp), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert!(bindings.is_empty());
}
// A two-row inline table yields two bindings.
#[test]
fn test_in_memory_engine_table() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x"), Var::new("y")],
vec![
vec![Some(Value::Integer(1)), Some(Value::Integer(2))],
vec![Some(Value::Integer(3)), Some(Value::Integer(4))],
],
);
let result = engine.execute(Op::Table(table), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 2);
}
// Filtering x > 3 over {1, 5, 10} keeps two rows (5 and 10).
#[test]
fn test_in_memory_engine_filter() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x")],
vec![
vec![Some(Value::Integer(1))],
vec![Some(Value::Integer(5))],
vec![Some(Value::Integer(10))],
],
);
let filter = FilterExpr::Gt(
ExprTerm::Var(Var::new("x")),
ExprTerm::Const(Value::Integer(3)),
);
let op = Op::Filter(OpFilter::new(filter, Op::Table(table)));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 2); }
// Grouping by `dept` with a SUM over `salary` gives A = 300 and B = 150.
#[test]
fn test_in_memory_engine_group() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("dept"), Var::new("salary")],
vec![
vec![
Some(Value::String("A".to_string())),
Some(Value::Integer(100)),
],
vec![
Some(Value::String("A".to_string())),
Some(Value::Integer(200)),
],
vec![
Some(Value::String("B".to_string())),
Some(Value::Integer(150)),
],
],
);
let group = OpGroup::new(Op::Table(table), vec![Var::new("dept")]).with_aggregate(
Var::new("total"),
Aggregate::Sum(ExprTerm::Var(Var::new("salary"))),
);
let result = engine.execute(Op::Group(group), &ctx).unwrap();
let mut bindings: Vec<_> = result.collect().unwrap();
// Sort by department so the assertions do not depend on the order groups are emitted.
bindings.sort_by(|a, b| {
let a_val = a
.get(&Var::new("dept"))
.and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
.unwrap_or("");
let b_val = b
.get(&Var::new("dept"))
.and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
.unwrap_or("");
a_val.cmp(b_val)
});
assert_eq!(bindings.len(), 2);
let total_a = bindings[0]
.get(&Var::new("total"))
.cloned()
.unwrap_or(Value::Null);
let total_b = bindings[1]
.get(&Var::new("total"))
.cloned()
.unwrap_or(Value::Null);
assert_eq!(total_a, Value::Integer(300));
assert_eq!(total_b, Value::Integer(150));
}
// Extending each row with `ExprTerm::Str` over `x` binds `xs` to the string form of `x`.
#[test]
fn test_in_memory_engine_extend() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x")],
vec![vec![Some(Value::Integer(1))], vec![Some(Value::Integer(2))]],
);
let extend = OpExtend::new(
Op::Table(table),
Var::new("xs"),
ExprTerm::Str(Box::new(ExprTerm::Var(Var::new("x")))),
);
let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 2);
assert_eq!(
bindings[0].get(&Var::new("xs")),
Some(&Value::String("1".to_string()))
);
assert_eq!(
bindings[1].get(&Var::new("xs")),
Some(&Value::String("2".to_string()))
);
}
// MINUS removes the left row whose `x` matches the right side: {1, 2, 3} minus {2} = {1, 3}.
#[test]
fn test_in_memory_engine_minus() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let left = OpTable::new(
vec![Var::new("x")],
vec![
vec![Some(Value::Integer(1))],
vec![Some(Value::Integer(2))],
vec![Some(Value::Integer(3))],
],
);
let right = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(2))]]);
let minus = OpMinus::new(Op::Table(left), Op::Table(right));
let result = engine.execute(Op::Minus(minus), &ctx).unwrap();
let mut bindings: Vec<_> = result.collect().unwrap();
// Sort by `x` so the comparison below does not depend on output order.
bindings.sort_by(|a, b| {
let a_val = a
.get(&Var::new("x"))
.and_then(|v| match v {
Value::Integer(i) => Some(*i),
_ => None,
})
.unwrap_or(0);
let b_val = b
.get(&Var::new("x"))
.and_then(|v| match v {
Value::Integer(i) => Some(*i),
_ => None,
})
.unwrap_or(0);
a_val.cmp(&b_val)
});
let values: Vec<i64> = bindings
.iter()
.filter_map(|b| b.get(&Var::new("x")))
.filter_map(|v| match v {
Value::Integer(i) => Some(*i),
_ => None,
})
.collect();
assert_eq!(values, vec![1, 3]);
}
// MINUS matches on the shared variable only: the left row with x = 1 is removed even
// though it also binds `y`, which the right side does not mention.
#[test]
fn test_in_memory_engine_minus_shared_vars() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let left = OpTable::new(
vec![Var::new("x"), Var::new("y")],
vec![
vec![Some(Value::Integer(1)), Some(Value::Integer(10))],
vec![Some(Value::Integer(2)), Some(Value::Integer(20))],
],
);
let right = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
let minus = OpMinus::new(Op::Table(left), Op::Table(right));
let result = engine.execute(Op::Minus(minus), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 1);
assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(2)));
}
// Extending a row with a variable it already binds, using a different value, drops the row.
#[test]
fn test_in_memory_engine_extend_conflict() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
let extend = OpExtend::new(
Op::Table(table),
Var::new("x"),
ExprTerm::Const(Value::Integer(2)),
);
let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert!(bindings.is_empty());
}
// When the extend expression refers to an unbound variable, the row is kept and the
// target variable stays unbound.
#[test]
fn test_in_memory_engine_extend_unbound_keeps_row() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
let extend = OpExtend::new(
Op::Table(table),
Var::new("z"),
ExprTerm::Var(Var::new("missing")),
);
let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 1);
assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(1)));
assert_eq!(bindings[0].get(&Var::new("z")), None);
}
// Slicing rows 1..=10 with arguments (2, Some(3)) returns three rows starting at x = 3.
#[test]
fn test_in_memory_engine_slice() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x")],
(1..=10).map(|i| vec![Some(Value::Integer(i))]).collect(),
);
let op = Op::Slice(OpSlice::new(Op::Table(table), 2, Some(3)));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 3);
assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(3)));
}
// Projection onto `x` and `z` removes `y` from the binding.
#[test]
fn test_in_memory_engine_project() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x"), Var::new("y"), Var::new("z")],
vec![vec![
Some(Value::Integer(1)),
Some(Value::Integer(2)),
Some(Value::Integer(3)),
]],
);
let op = Op::Project(OpProject::new(
vec![Var::new("x"), Var::new("z")],
Op::Table(table),
));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 1);
assert!(bindings[0].contains(&Var::new("x")));
assert!(!bindings[0].contains(&Var::new("y")));
assert!(bindings[0].contains(&Var::new("z")));
}
// DISTINCT collapses {1, 2, 1, 3, 2} to three rows.
#[test]
fn test_in_memory_engine_distinct() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x")],
vec![
vec![Some(Value::Integer(1))],
vec![Some(Value::Integer(2))],
vec![Some(Value::Integer(1))],
vec![Some(Value::Integer(3))],
vec![Some(Value::Integer(2))],
],
);
let op = Op::Distinct(OpDistinct::new(Op::Table(table)));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 3);
}
// UNION of two two-row tables yields four rows.
#[test]
fn test_in_memory_engine_union() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table1 = OpTable::new(
vec![Var::new("x")],
vec![vec![Some(Value::Integer(1))], vec![Some(Value::Integer(2))]],
);
let table2 = OpTable::new(
vec![Var::new("x")],
vec![vec![Some(Value::Integer(3))], vec![Some(Value::Integer(4))]],
);
let op = Op::Union(OpUnion::new(Op::Table(table1), Op::Table(table2)));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 4);
}
// Ordering ascending by `x` turns {3, 1, 2} into 1, 2, 3.
#[test]
fn test_in_memory_engine_order() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new();
let table = OpTable::new(
vec![Var::new("x")],
vec![
vec![Some(Value::Integer(3))],
vec![Some(Value::Integer(1))],
vec![Some(Value::Integer(2))],
],
);
let op = Op::Order(OpOrder::new(
Op::Table(table),
vec![OrderKey::asc(ExprTerm::Var(Var::new("x")))],
));
let result = engine.execute(op, &ctx).unwrap();
let bindings: Vec<_> = result.collect().unwrap();
assert_eq!(bindings.len(), 3);
assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(1)));
assert_eq!(bindings[1].get(&Var::new("x")), Some(&Value::Integer(2)));
assert_eq!(bindings[2].get(&Var::new("x")), Some(&Value::Integer(3)));
}
// Requesting statistics through the context attaches them to the result.
#[test]
fn test_engine_with_stats() {
let engine = InMemoryEngine::new();
let ctx = QueryContext::new().with_stats();
let table = OpTable::unit();
let result = engine.execute(Op::Table(table), &ctx).unwrap();
assert!(result.stats.is_some());
}
// The in-memory engine advertises graph patterns and subqueries but not transactions.
#[test]
fn test_engine_capabilities() {
let engine = InMemoryEngine::new();
let caps = engine.capabilities();
assert!(caps.graph_patterns);
assert!(caps.subqueries);
assert!(!caps.transactions);
}
}