1use super::binding::{Binding, Value, Var};
12use super::iterator::{
13 BindingIterator, IterError, QueryIter, QueryIterBase, QueryIterDistinct, QueryIterFilter,
14 QueryIterJoin, QueryIterProject, QueryIterSlice, QueryIterSort, QueryIterUnion, SortKey,
15};
16use super::op::*;
17use super::transform::{transform_op, OpStats, TransformPushFilter};
18use crate::storage::query::executors::{
19 Aggregator, AvgAggregator, CountAggregator, CountDistinctAggregator, GroupConcatAggregator,
20 MaxAggregator, MinAggregator, SampleAggregator, SumAggregator,
21};
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26#[derive(Debug, Clone)]
28pub struct QueryContext {
29 pub timeout: Option<Duration>,
31 pub limit: Option<u64>,
33 pub optimization_level: u8,
35 pub collect_stats: bool,
37 pub params: HashMap<String, Value>,
39}
40
41impl QueryContext {
42 pub fn new() -> Self {
44 Self {
45 timeout: Some(Duration::from_secs(60)),
46 limit: None,
47 optimization_level: 1,
48 collect_stats: false,
49 params: HashMap::new(),
50 }
51 }
52
53 pub fn with_timeout(mut self, timeout: Duration) -> Self {
55 self.timeout = Some(timeout);
56 self
57 }
58
59 pub fn with_limit(mut self, limit: u64) -> Self {
61 self.limit = Some(limit);
62 self
63 }
64
65 pub fn with_optimization(mut self, level: u8) -> Self {
67 self.optimization_level = level;
68 self
69 }
70
71 pub fn with_stats(mut self) -> Self {
73 self.collect_stats = true;
74 self
75 }
76
77 pub fn with_param(mut self, name: &str, value: Value) -> Self {
79 self.params.insert(name.to_string(), value);
80 self
81 }
82}
83
84impl Default for QueryContext {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
/// Timings and counters describing one (or, after [`ExecutionStats::merge`],
/// several) query executions. Every field starts at zero.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStats {
    /// Time spent before execution (plan optimization).
    pub planning_time: Duration,
    /// Time spent in the execution phase as measured by the engine.
    pub execution_time: Duration,
    /// Number of result rows (not filled in by `InMemoryEngine` in this file).
    pub result_count: u64,
    /// Number of intermediate bindings processed.
    pub bindings_processed: u64,
    /// Number of join operators in the executed plan.
    pub join_count: u64,
    /// Number of filter operators in the executed plan.
    pub filter_count: u64,
    /// Cache hits recorded during execution.
    pub cache_hits: u64,
    /// Index lookups recorded during execution.
    pub index_lookups: u64,
}

impl ExecutionStats {
    /// Returns an all-zero statistics record.
    pub fn new() -> Self {
        ExecutionStats::default()
    }

    /// Adds every field of `other` into `self`.
    ///
    /// The exhaustive destructuring below makes the compiler flag this method
    /// whenever a new field is added to the struct.
    pub fn merge(&mut self, other: &ExecutionStats) {
        let ExecutionStats {
            planning_time,
            execution_time,
            result_count,
            bindings_processed,
            join_count,
            filter_count,
            cache_hits,
            index_lookups,
        } = other;

        self.planning_time += *planning_time;
        self.execution_time += *execution_time;
        self.result_count += *result_count;
        self.bindings_processed += *bindings_processed;
        self.join_count += *join_count;
        self.filter_count += *filter_count;
        self.cache_hits += *cache_hits;
        self.index_lookups += *index_lookups;
    }
}
129
130pub struct QueryResult {
132 pub iter: QueryIter,
134 pub stats: Option<ExecutionStats>,
136}
137
138impl QueryResult {
139 pub fn new(iter: QueryIter) -> Self {
141 Self { iter, stats: None }
142 }
143
144 pub fn with_stats(iter: QueryIter, stats: ExecutionStats) -> Self {
146 Self {
147 iter,
148 stats: Some(stats),
149 }
150 }
151
152 pub fn collect(self) -> Result<Vec<Binding>, IterError> {
154 self.iter.collect()
155 }
156
157 pub fn first(mut self) -> Result<Option<Binding>, IterError> {
159 self.iter.next().transpose()
160 }
161
162 pub fn statistics(&self) -> Option<&ExecutionStats> {
164 self.stats.as_ref()
165 }
166}
167
168pub trait QueryEngine: Send + Sync {
170 fn name(&self) -> &str;
172
173 fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError>;
175
176 fn optimize(&self, op: Op, level: u8) -> Op {
178 if level == 0 {
179 return op;
180 }
181
182 let mut push_filter = TransformPushFilter::new();
184 transform_op(&mut push_filter, op)
185 }
186
187 fn capabilities(&self) -> EngineCapabilities {
189 EngineCapabilities::default()
190 }
191}
192
/// Feature flags advertised by a [`QueryEngine`]; every flag defaults to `false`.
#[derive(Debug, Clone, Default)]
pub struct EngineCapabilities {
    /// Basic graph pattern matching.
    pub graph_patterns: bool,
    /// Grouping and aggregate functions.
    pub aggregation: bool,
    /// Nested queries.
    pub subqueries: bool,
    /// Property path expressions.
    pub property_paths: bool,
    /// Data modification.
    pub updates: bool,
    /// Transactional execution.
    pub transactions: bool,
}
209
/// Errors a [`QueryEngine`] can report.
#[derive(Debug, Clone)]
pub enum EngineError {
    /// The requested operation is not supported by the engine.
    Unsupported(String),
    /// Execution failed.
    Execution(String),
    /// The query ran out of time.
    Timeout,
    /// The query was rejected as invalid.
    InvalidQuery(String),
    /// A resource limit was exceeded.
    ResourceLimit(String),
}

impl std::fmt::Display for EngineError {
    /// Renders `"<category>: <detail>"`, or just `"Query timeout"` for `Timeout`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unsupported(detail) => write!(f, "Unsupported operation: {}", detail),
            Self::Execution(detail) => write!(f, "Execution error: {}", detail),
            Self::Timeout => f.write_str("Query timeout"),
            Self::InvalidQuery(detail) => write!(f, "Invalid query: {}", detail),
            Self::ResourceLimit(detail) => write!(f, "Resource limit: {}", detail),
        }
    }
}

impl std::error::Error for EngineError {}
238
239pub trait QueryEngineFactory: Send + Sync {
241 fn name(&self) -> &str;
243
244 fn create(&self) -> Box<dyn QueryEngine>;
246
247 fn accepts(&self, _ctx: &QueryContext) -> bool {
249 true
250 }
251}
252
253pub struct QueryEngineRegistry {
255 factories: HashMap<String, Box<dyn QueryEngineFactory>>,
257 default_engine: Option<String>,
259}
260
261mod query_registry_impl;
262
263impl Default for QueryEngineRegistry {
264 fn default() -> Self {
265 Self::with_default()
266 }
267}
268
269pub struct InMemoryEngine {
271 data: Arc<HashMap<String, Vec<Binding>>>,
273}
274
275mod in_memory_impl;
276fn bindings_share_vars(left: &Binding, right: &Binding) -> bool {
277 left.all_vars().iter().any(|var| right.contains(var))
278}
279
280fn bindings_compatible(left: &Binding, right: &Binding) -> bool {
281 left.all_vars().iter().all(|var| {
282 if right.contains(var) {
283 left.get(var) == right.get(var)
284 } else {
285 true
286 }
287 })
288}
289
290impl Clone for InMemoryEngine {
291 fn clone(&self) -> Self {
292 Self {
293 data: Arc::clone(&self.data),
294 }
295 }
296}
297
298impl Default for InMemoryEngine {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304impl QueryEngine for InMemoryEngine {
305 fn name(&self) -> &str {
306 "memory"
307 }
308
309 fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError> {
310 let start = Instant::now();
311
312 let optimized = if ctx.optimization_level > 0 {
314 self.optimize(op, ctx.optimization_level)
315 } else {
316 op
317 };
318
319 let planning_time = start.elapsed();
320
321 let exec_start = Instant::now();
323 let iter = self.execute_op(&optimized);
324
325 let iter: Box<dyn BindingIterator> = if let Some(limit) = ctx.limit {
327 Box::new(QueryIterSlice::limit(iter, limit))
328 } else {
329 iter
330 };
331
332 let query_iter = QueryIter::new(iter);
333
334 let mut stats = ExecutionStats::new();
335 stats.planning_time = planning_time;
336 stats.execution_time = exec_start.elapsed();
337
338 let op_stats = OpStats::collect(&optimized);
340 stats.join_count = op_stats.join_count as u64;
341 stats.filter_count = op_stats.filter_count as u64;
342
343 if ctx.collect_stats {
344 Ok(QueryResult::with_stats(query_iter, stats))
345 } else {
346 Ok(QueryResult::new(query_iter))
347 }
348 }
349
350 fn capabilities(&self) -> EngineCapabilities {
351 EngineCapabilities {
352 graph_patterns: true,
353 aggregation: false, subqueries: true,
355 property_paths: false,
356 updates: false,
357 transactions: false,
358 }
359 }
360}
361
362pub struct InMemoryEngineFactory;
364
365impl QueryEngineFactory for InMemoryEngineFactory {
366 fn name(&self) -> &str {
367 "memory"
368 }
369
370 fn create(&self) -> Box<dyn QueryEngine> {
371 Box::new(InMemoryEngine::new())
372 }
373}
374
375fn binding_hash(binding: &Binding) -> u64 {
377 use std::collections::hash_map::DefaultHasher;
378 use std::hash::{Hash, Hasher};
379
380 let mut hasher = DefaultHasher::new();
381
382 let mut vars: Vec<_> = binding.all_vars();
384 vars.sort_by_key(|v| v.name());
385
386 for var in vars {
387 var.name().hash(&mut hasher);
388 if let Some(value) = binding.get(var) {
390 match value {
391 Value::Node(id) => {
392 "node".hash(&mut hasher);
393 id.hash(&mut hasher);
394 }
395 Value::Edge(id) => {
396 "edge".hash(&mut hasher);
397 id.hash(&mut hasher);
398 }
399 Value::String(s) => {
400 "string".hash(&mut hasher);
401 s.hash(&mut hasher);
402 }
403 Value::Integer(i) => {
404 "int".hash(&mut hasher);
405 i.hash(&mut hasher);
406 }
407 Value::Float(f) => {
408 "float".hash(&mut hasher);
409 f.to_bits().hash(&mut hasher);
410 }
411 Value::Boolean(b) => {
412 "bool".hash(&mut hasher);
413 b.hash(&mut hasher);
414 }
415 Value::Uri(u) => {
416 "uri".hash(&mut hasher);
417 u.hash(&mut hasher);
418 }
419 Value::Null => {
420 "null".hash(&mut hasher);
421 }
422 }
423 } else {
424 "unbound".hash(&mut hasher);
425 }
426 }
427
428 hasher.finish()
429}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `op` on a fresh `InMemoryEngine` with a default `QueryContext` and
    /// returns every solution, panicking if execution or iteration fails.
    fn run(op: Op) -> Vec<Binding> {
        InMemoryEngine::new()
            .execute(op, &QueryContext::new())
            .unwrap()
            .collect()
            .unwrap()
    }

    /// Single-column table rows holding the given integers, in order.
    fn int_rows(values: impl IntoIterator<Item = i64>) -> Vec<Vec<Option<Value>>> {
        values
            .into_iter()
            .map(|i| vec![Some(Value::Integer(i))])
            .collect()
    }

    /// Integer bound to `name` in `binding`, or `None` if unbound / not an integer.
    fn int_of(binding: &Binding, name: &str) -> Option<i64> {
        match binding.get(&Var::new(name)) {
            Some(Value::Integer(i)) => Some(*i),
            _ => None,
        }
    }

    /// String bound to `name` in `binding`, or `None` if unbound / not a string.
    fn str_of<'a>(binding: &'a Binding, name: &str) -> Option<&'a str> {
        match binding.get(&Var::new(name)) {
            Some(Value::String(s)) => Some(s.as_str()),
            _ => None,
        }
    }

    #[test]
    fn test_query_context() {
        let ctx = QueryContext::new()
            .with_timeout(Duration::from_secs(30))
            .with_limit(100)
            .with_optimization(2)
            .with_stats();

        assert_eq!(ctx.timeout, Some(Duration::from_secs(30)));
        assert_eq!(ctx.limit, Some(100));
        assert_eq!(ctx.optimization_level, 2);
        assert!(ctx.collect_stats);
    }

    #[test]
    fn test_registry() {
        let registry = QueryEngineRegistry::with_default();
        assert!(registry.get("memory").is_some());
        assert!(registry.get_default().is_some());
    }

    #[test]
    fn test_in_memory_engine_empty() {
        let bindings = run(Op::BGP(OpBGP::new()));
        assert!(bindings.is_empty());
    }

    #[test]
    fn test_in_memory_engine_table() {
        let table = OpTable::new(
            vec![Var::new("x"), Var::new("y")],
            vec![
                vec![Some(Value::Integer(1)), Some(Value::Integer(2))],
                vec![Some(Value::Integer(3)), Some(Value::Integer(4))],
            ],
        );

        assert_eq!(run(Op::Table(table)).len(), 2);
    }

    #[test]
    fn test_in_memory_engine_filter() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![1, 5, 10]));
        let x_gt_3 = FilterExpr::Gt(
            ExprTerm::Var(Var::new("x")),
            ExprTerm::Const(Value::Integer(3)),
        );

        let bindings = run(Op::Filter(OpFilter::new(x_gt_3, Op::Table(table))));

        // Only 5 and 10 satisfy ?x > 3.
        assert_eq!(bindings.len(), 2);
    }

    #[test]
    fn test_in_memory_engine_group() {
        let row = |dept: &str, salary: i64| {
            vec![
                Some(Value::String(dept.to_string())),
                Some(Value::Integer(salary)),
            ]
        };
        let table = OpTable::new(
            vec![Var::new("dept"), Var::new("salary")],
            vec![row("A", 100), row("A", 200), row("B", 150)],
        );
        let group = OpGroup::new(Op::Table(table), vec![Var::new("dept")]).with_aggregate(
            Var::new("total"),
            Aggregate::Sum(ExprTerm::Var(Var::new("salary"))),
        );

        let mut bindings = run(Op::Group(group));
        bindings.sort_by(|a, b| {
            str_of(a, "dept")
                .unwrap_or("")
                .cmp(str_of(b, "dept").unwrap_or(""))
        });

        assert_eq!(bindings.len(), 2);
        let total = |b: &Binding| {
            b.get(&Var::new("total"))
                .cloned()
                .unwrap_or(Value::Null)
        };
        assert_eq!(total(&bindings[0]), Value::Integer(300));
        assert_eq!(total(&bindings[1]), Value::Integer(150));
    }

    #[test]
    fn test_in_memory_engine_extend() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![1, 2]));
        let extend = OpExtend::new(
            Op::Table(table),
            Var::new("xs"),
            ExprTerm::Str(Box::new(ExprTerm::Var(Var::new("x")))),
        );

        let bindings = run(Op::Extend(extend));

        assert_eq!(bindings.len(), 2);
        for (binding, expected) in bindings.iter().zip(vec!["1", "2"]) {
            assert_eq!(
                binding.get(&Var::new("xs")),
                Some(&Value::String(expected.to_string()))
            );
        }
    }

    #[test]
    fn test_in_memory_engine_minus() {
        let left = OpTable::new(vec![Var::new("x")], int_rows(vec![1, 2, 3]));
        let right = OpTable::new(vec![Var::new("x")], int_rows(vec![2]));

        let bindings = run(Op::Minus(OpMinus::new(Op::Table(left), Op::Table(right))));
        let mut values: Vec<i64> = bindings.iter().filter_map(|b| int_of(b, "x")).collect();
        values.sort();

        assert_eq!(values, vec![1, 3]);
    }

    #[test]
    fn test_in_memory_engine_minus_shared_vars() {
        let left = OpTable::new(
            vec![Var::new("x"), Var::new("y")],
            vec![
                vec![Some(Value::Integer(1)), Some(Value::Integer(10))],
                vec![Some(Value::Integer(2)), Some(Value::Integer(20))],
            ],
        );
        let right = OpTable::new(vec![Var::new("x")], int_rows(vec![1]));

        let bindings = run(Op::Minus(OpMinus::new(Op::Table(left), Op::Table(right))));

        assert_eq!(bindings.len(), 1);
        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(2)));
    }

    #[test]
    fn test_in_memory_engine_extend_conflict() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![1]));
        let extend = OpExtend::new(
            Op::Table(table),
            Var::new("x"),
            ExprTerm::Const(Value::Integer(2)),
        );

        // Extending an already-bound variable eliminates the row.
        assert!(run(Op::Extend(extend)).is_empty());
    }

    #[test]
    fn test_in_memory_engine_extend_unbound_keeps_row() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![1]));
        let extend = OpExtend::new(
            Op::Table(table),
            Var::new("z"),
            ExprTerm::Var(Var::new("missing")),
        );

        let bindings = run(Op::Extend(extend));

        assert_eq!(bindings.len(), 1);
        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(1)));
        assert_eq!(bindings[0].get(&Var::new("z")), None);
    }

    #[test]
    fn test_in_memory_engine_slice() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(1..=10));

        let bindings = run(Op::Slice(OpSlice::new(Op::Table(table), 2, Some(3))));

        assert_eq!(bindings.len(), 3);
        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(3)));
    }

    #[test]
    fn test_in_memory_engine_project() {
        let table = OpTable::new(
            vec![Var::new("x"), Var::new("y"), Var::new("z")],
            vec![vec![
                Some(Value::Integer(1)),
                Some(Value::Integer(2)),
                Some(Value::Integer(3)),
            ]],
        );
        let op = Op::Project(OpProject::new(
            vec![Var::new("x"), Var::new("z")],
            Op::Table(table),
        ));

        let bindings = run(op);

        assert_eq!(bindings.len(), 1);
        let row = &bindings[0];
        assert!(row.contains(&Var::new("x")));
        assert!(!row.contains(&Var::new("y")));
        assert!(row.contains(&Var::new("z")));
    }

    #[test]
    fn test_in_memory_engine_distinct() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![1, 2, 1, 3, 2]));

        let bindings = run(Op::Distinct(OpDistinct::new(Op::Table(table))));

        assert_eq!(bindings.len(), 3);
    }

    #[test]
    fn test_in_memory_engine_union() {
        let first = OpTable::new(vec![Var::new("x")], int_rows(vec![1, 2]));
        let second = OpTable::new(vec![Var::new("x")], int_rows(vec![3, 4]));

        let bindings = run(Op::Union(OpUnion::new(Op::Table(first), Op::Table(second))));

        assert_eq!(bindings.len(), 4);
    }

    #[test]
    fn test_in_memory_engine_order() {
        let table = OpTable::new(vec![Var::new("x")], int_rows(vec![3, 1, 2]));
        let op = Op::Order(OpOrder::new(
            Op::Table(table),
            vec![OrderKey::asc(ExprTerm::Var(Var::new("x")))],
        ));

        let xs: Vec<Option<i64>> = run(op).iter().map(|b| int_of(b, "x")).collect();

        assert_eq!(xs, vec![Some(1), Some(2), Some(3)]);
    }

    #[test]
    fn test_engine_with_stats() {
        let ctx = QueryContext::new().with_stats();

        let result = InMemoryEngine::new()
            .execute(Op::Table(OpTable::unit()), &ctx)
            .unwrap();

        assert!(result.stats.is_some());
    }

    #[test]
    fn test_engine_capabilities() {
        let caps = InMemoryEngine::new().capabilities();

        assert!(caps.graph_patterns);
        assert!(caps.subqueries);
        assert!(!caps.transactions);
    }
}