Skip to main content

reddb_server/storage/query/engine/
registry.rs

1//! Query Engine Registry
2//!
3//! Central registry for query execution engines.
4//!
5//! # Design
6//!
7//! - Factory pattern for creating engines
8//! - Registry for engine lookup by name
9//! - Engine abstraction over different backends
10
11use super::binding::{Binding, Value, Var};
12use super::iterator::{
13    BindingIterator, IterError, QueryIter, QueryIterBase, QueryIterDistinct, QueryIterFilter,
14    QueryIterJoin, QueryIterProject, QueryIterSlice, QueryIterSort, QueryIterUnion, SortKey,
15};
16use super::op::*;
17use super::transform::{transform_op, OpStats, TransformPushFilter};
18use crate::storage::query::executors::{
19    Aggregator, AvgAggregator, CountAggregator, CountDistinctAggregator, GroupConcatAggregator,
20    MaxAggregator, MinAggregator, SampleAggregator, SumAggregator,
21};
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26/// Query execution context
27#[derive(Debug, Clone)]
28pub struct QueryContext {
29    /// Query timeout
30    pub timeout: Option<Duration>,
31    /// Maximum results
32    pub limit: Option<u64>,
33    /// Optimization level (0 = none, 1 = basic, 2 = aggressive)
34    pub optimization_level: u8,
35    /// Collect statistics
36    pub collect_stats: bool,
37    /// Custom parameters
38    pub params: HashMap<String, Value>,
39}
40
41impl QueryContext {
42    /// Create default context
43    pub fn new() -> Self {
44        Self {
45            timeout: Some(Duration::from_secs(60)),
46            limit: None,
47            optimization_level: 1,
48            collect_stats: false,
49            params: HashMap::new(),
50        }
51    }
52
53    /// Set timeout
54    pub fn with_timeout(mut self, timeout: Duration) -> Self {
55        self.timeout = Some(timeout);
56        self
57    }
58
59    /// Set limit
60    pub fn with_limit(mut self, limit: u64) -> Self {
61        self.limit = Some(limit);
62        self
63    }
64
65    /// Set optimization level
66    pub fn with_optimization(mut self, level: u8) -> Self {
67        self.optimization_level = level;
68        self
69    }
70
71    /// Enable statistics collection
72    pub fn with_stats(mut self) -> Self {
73        self.collect_stats = true;
74        self
75    }
76
77    /// Add parameter
78    pub fn with_param(mut self, name: &str, value: Value) -> Self {
79        self.params.insert(name.to_string(), value);
80        self
81    }
82}
83
84impl Default for QueryContext {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90/// Query execution statistics
91#[derive(Debug, Clone, Default)]
92pub struct ExecutionStats {
93    /// Planning time
94    pub planning_time: Duration,
95    /// Execution time
96    pub execution_time: Duration,
97    /// Result count
98    pub result_count: u64,
99    /// Bindings processed
100    pub bindings_processed: u64,
101    /// Join operations
102    pub join_count: u64,
103    /// Filter operations
104    pub filter_count: u64,
105    /// Cache hits
106    pub cache_hits: u64,
107    /// Index lookups
108    pub index_lookups: u64,
109}
110
111impl ExecutionStats {
112    /// Create new stats
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    /// Merge with another stats
118    pub fn merge(&mut self, other: &ExecutionStats) {
119        self.planning_time += other.planning_time;
120        self.execution_time += other.execution_time;
121        self.result_count += other.result_count;
122        self.bindings_processed += other.bindings_processed;
123        self.join_count += other.join_count;
124        self.filter_count += other.filter_count;
125        self.cache_hits += other.cache_hits;
126        self.index_lookups += other.index_lookups;
127    }
128}
129
130/// Query execution result
131pub struct QueryResult {
132    /// Result iterator
133    pub iter: QueryIter,
134    /// Execution statistics
135    pub stats: Option<ExecutionStats>,
136}
137
138impl QueryResult {
139    /// Create result
140    pub fn new(iter: QueryIter) -> Self {
141        Self { iter, stats: None }
142    }
143
144    /// Create result with stats
145    pub fn with_stats(iter: QueryIter, stats: ExecutionStats) -> Self {
146        Self {
147            iter,
148            stats: Some(stats),
149        }
150    }
151
152    /// Collect all results
153    pub fn collect(self) -> Result<Vec<Binding>, IterError> {
154        self.iter.collect()
155    }
156
157    /// Get first result
158    pub fn first(mut self) -> Result<Option<Binding>, IterError> {
159        self.iter.next().transpose()
160    }
161
162    /// Get statistics
163    pub fn statistics(&self) -> Option<&ExecutionStats> {
164        self.stats.as_ref()
165    }
166}
167
168/// Query engine trait
169pub trait QueryEngine: Send + Sync {
170    /// Engine name
171    fn name(&self) -> &str;
172
173    /// Execute an Op tree
174    fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError>;
175
176    /// Optimize an Op tree
177    fn optimize(&self, op: Op, level: u8) -> Op {
178        if level == 0 {
179            return op;
180        }
181
182        // Apply standard optimizations
183        let mut push_filter = TransformPushFilter::new();
184        transform_op(&mut push_filter, op)
185    }
186
187    /// Get engine capabilities
188    fn capabilities(&self) -> EngineCapabilities {
189        EngineCapabilities::default()
190    }
191}
192
193/// Engine capabilities
194#[derive(Debug, Clone, Default)]
195pub struct EngineCapabilities {
196    /// Supports graph patterns
197    pub graph_patterns: bool,
198    /// Supports aggregation
199    pub aggregation: bool,
200    /// Supports subqueries
201    pub subqueries: bool,
202    /// Supports property paths
203    pub property_paths: bool,
204    /// Supports updates
205    pub updates: bool,
206    /// Supports transactions
207    pub transactions: bool,
208}
209
210/// Engine errors
211#[derive(Debug, Clone)]
212pub enum EngineError {
213    /// Unsupported operation
214    Unsupported(String),
215    /// Execution error
216    Execution(String),
217    /// Timeout
218    Timeout,
219    /// Invalid query
220    InvalidQuery(String),
221    /// Resource limit exceeded
222    ResourceLimit(String),
223}
224
225impl std::fmt::Display for EngineError {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        match self {
228            EngineError::Unsupported(msg) => write!(f, "Unsupported operation: {}", msg),
229            EngineError::Execution(msg) => write!(f, "Execution error: {}", msg),
230            EngineError::Timeout => write!(f, "Query timeout"),
231            EngineError::InvalidQuery(msg) => write!(f, "Invalid query: {}", msg),
232            EngineError::ResourceLimit(msg) => write!(f, "Resource limit: {}", msg),
233        }
234    }
235}
236
237impl std::error::Error for EngineError {}
238
239/// Factory for creating query engines
240pub trait QueryEngineFactory: Send + Sync {
241    /// Factory name
242    fn name(&self) -> &str;
243
244    /// Create engine instance
245    fn create(&self) -> Box<dyn QueryEngine>;
246
247    /// Check if factory can create engine for this context
248    fn accepts(&self, _ctx: &QueryContext) -> bool {
249        true
250    }
251}
252
253/// Engine registry
254pub struct QueryEngineRegistry {
255    /// Registered factories
256    factories: HashMap<String, Box<dyn QueryEngineFactory>>,
257    /// Default engine name
258    default_engine: Option<String>,
259}
260
261mod query_registry_impl;
262
263impl Default for QueryEngineRegistry {
264    fn default() -> Self {
265        Self::with_default()
266    }
267}
268
269/// In-memory query engine
270pub struct InMemoryEngine {
271    /// Data store (BGP patterns map to bindings)
272    data: Arc<HashMap<String, Vec<Binding>>>,
273}
274
275mod in_memory_impl;
276fn bindings_share_vars(left: &Binding, right: &Binding) -> bool {
277    left.all_vars().iter().any(|var| right.contains(var))
278}
279
280fn bindings_compatible(left: &Binding, right: &Binding) -> bool {
281    left.all_vars().iter().all(|var| {
282        if right.contains(var) {
283            left.get(var) == right.get(var)
284        } else {
285            true
286        }
287    })
288}
289
290impl Clone for InMemoryEngine {
291    fn clone(&self) -> Self {
292        Self {
293            data: Arc::clone(&self.data),
294        }
295    }
296}
297
298impl Default for InMemoryEngine {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304impl QueryEngine for InMemoryEngine {
305    fn name(&self) -> &str {
306        "memory"
307    }
308
309    fn execute(&self, op: Op, ctx: &QueryContext) -> Result<QueryResult, EngineError> {
310        let start = Instant::now();
311
312        // Optimize if requested
313        let optimized = if ctx.optimization_level > 0 {
314            self.optimize(op, ctx.optimization_level)
315        } else {
316            op
317        };
318
319        let planning_time = start.elapsed();
320
321        // Execute
322        let exec_start = Instant::now();
323        let iter = self.execute_op(&optimized);
324
325        // Apply context limit if set
326        let iter: Box<dyn BindingIterator> = if let Some(limit) = ctx.limit {
327            Box::new(QueryIterSlice::limit(iter, limit))
328        } else {
329            iter
330        };
331
332        let query_iter = QueryIter::new(iter);
333
334        let mut stats = ExecutionStats::new();
335        stats.planning_time = planning_time;
336        stats.execution_time = exec_start.elapsed();
337
338        // Collect op statistics
339        let op_stats = OpStats::collect(&optimized);
340        stats.join_count = op_stats.join_count as u64;
341        stats.filter_count = op_stats.filter_count as u64;
342
343        if ctx.collect_stats {
344            Ok(QueryResult::with_stats(query_iter, stats))
345        } else {
346            Ok(QueryResult::new(query_iter))
347        }
348    }
349
350    fn capabilities(&self) -> EngineCapabilities {
351        EngineCapabilities {
352            graph_patterns: true,
353            aggregation: false, // Not fully implemented
354            subqueries: true,
355            property_paths: false,
356            updates: false,
357            transactions: false,
358        }
359    }
360}
361
362/// Factory for in-memory engine
363pub struct InMemoryEngineFactory;
364
365impl QueryEngineFactory for InMemoryEngineFactory {
366    fn name(&self) -> &str {
367        "memory"
368    }
369
370    fn create(&self) -> Box<dyn QueryEngine> {
371        Box::new(InMemoryEngine::new())
372    }
373}
374
375/// Compute a hash for a binding (for set operations)
376fn binding_hash(binding: &Binding) -> u64 {
377    use std::collections::hash_map::DefaultHasher;
378    use std::hash::{Hash, Hasher};
379
380    let mut hasher = DefaultHasher::new();
381
382    // Get sorted keys for deterministic ordering
383    let mut vars: Vec<_> = binding.all_vars();
384    vars.sort_by_key(|v| v.name());
385
386    for var in vars {
387        var.name().hash(&mut hasher);
388        // Hash value based on type
389        if let Some(value) = binding.get(var) {
390            match value {
391                Value::Node(id) => {
392                    "node".hash(&mut hasher);
393                    id.hash(&mut hasher);
394                }
395                Value::Edge(id) => {
396                    "edge".hash(&mut hasher);
397                    id.hash(&mut hasher);
398                }
399                Value::String(s) => {
400                    "string".hash(&mut hasher);
401                    s.hash(&mut hasher);
402                }
403                Value::Integer(i) => {
404                    "int".hash(&mut hasher);
405                    i.hash(&mut hasher);
406                }
407                Value::Float(f) => {
408                    "float".hash(&mut hasher);
409                    f.to_bits().hash(&mut hasher);
410                }
411                Value::Boolean(b) => {
412                    "bool".hash(&mut hasher);
413                    b.hash(&mut hasher);
414                }
415                Value::Uri(u) => {
416                    "uri".hash(&mut hasher);
417                    u.hash(&mut hasher);
418                }
419                Value::Null => {
420                    "null".hash(&mut hasher);
421                }
422            }
423        } else {
424            "unbound".hash(&mut hasher);
425        }
426    }
427
428    hasher.finish()
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434
435    #[test]
436    fn test_query_context() {
437        let ctx = QueryContext::new()
438            .with_timeout(Duration::from_secs(30))
439            .with_limit(100)
440            .with_optimization(2)
441            .with_stats();
442
443        assert_eq!(ctx.timeout, Some(Duration::from_secs(30)));
444        assert_eq!(ctx.limit, Some(100));
445        assert_eq!(ctx.optimization_level, 2);
446        assert!(ctx.collect_stats);
447    }
448
449    #[test]
450    fn test_registry() {
451        let registry = QueryEngineRegistry::with_default();
452        assert!(registry.get("memory").is_some());
453        assert!(registry.get_default().is_some());
454    }
455
456    #[test]
457    fn test_in_memory_engine_empty() {
458        let engine = InMemoryEngine::new();
459        let ctx = QueryContext::new();
460
461        let bgp = OpBGP::new();
462        let result = engine.execute(Op::BGP(bgp), &ctx).unwrap();
463
464        let bindings: Vec<_> = result.collect().unwrap();
465        assert!(bindings.is_empty());
466    }
467
468    #[test]
469    fn test_in_memory_engine_table() {
470        let engine = InMemoryEngine::new();
471        let ctx = QueryContext::new();
472
473        let table = OpTable::new(
474            vec![Var::new("x"), Var::new("y")],
475            vec![
476                vec![Some(Value::Integer(1)), Some(Value::Integer(2))],
477                vec![Some(Value::Integer(3)), Some(Value::Integer(4))],
478            ],
479        );
480
481        let result = engine.execute(Op::Table(table), &ctx).unwrap();
482        let bindings: Vec<_> = result.collect().unwrap();
483
484        assert_eq!(bindings.len(), 2);
485    }
486
487    #[test]
488    fn test_in_memory_engine_filter() {
489        let engine = InMemoryEngine::new();
490        let ctx = QueryContext::new();
491
492        let table = OpTable::new(
493            vec![Var::new("x")],
494            vec![
495                vec![Some(Value::Integer(1))],
496                vec![Some(Value::Integer(5))],
497                vec![Some(Value::Integer(10))],
498            ],
499        );
500
501        let filter = FilterExpr::Gt(
502            ExprTerm::Var(Var::new("x")),
503            ExprTerm::Const(Value::Integer(3)),
504        );
505
506        let op = Op::Filter(OpFilter::new(filter, Op::Table(table)));
507
508        let result = engine.execute(op, &ctx).unwrap();
509        let bindings: Vec<_> = result.collect().unwrap();
510
511        assert_eq!(bindings.len(), 2); // 5 and 10
512    }
513
514    #[test]
515    fn test_in_memory_engine_group() {
516        let engine = InMemoryEngine::new();
517        let ctx = QueryContext::new();
518
519        let table = OpTable::new(
520            vec![Var::new("dept"), Var::new("salary")],
521            vec![
522                vec![
523                    Some(Value::String("A".to_string())),
524                    Some(Value::Integer(100)),
525                ],
526                vec![
527                    Some(Value::String("A".to_string())),
528                    Some(Value::Integer(200)),
529                ],
530                vec![
531                    Some(Value::String("B".to_string())),
532                    Some(Value::Integer(150)),
533                ],
534            ],
535        );
536
537        let group = OpGroup::new(Op::Table(table), vec![Var::new("dept")]).with_aggregate(
538            Var::new("total"),
539            Aggregate::Sum(ExprTerm::Var(Var::new("salary"))),
540        );
541
542        let result = engine.execute(Op::Group(group), &ctx).unwrap();
543        let mut bindings: Vec<_> = result.collect().unwrap();
544        bindings.sort_by(|a, b| {
545            let a_val = a
546                .get(&Var::new("dept"))
547                .and_then(|v| match v {
548                    Value::String(s) => Some(s.as_str()),
549                    _ => None,
550                })
551                .unwrap_or("");
552            let b_val = b
553                .get(&Var::new("dept"))
554                .and_then(|v| match v {
555                    Value::String(s) => Some(s.as_str()),
556                    _ => None,
557                })
558                .unwrap_or("");
559            a_val.cmp(b_val)
560        });
561
562        assert_eq!(bindings.len(), 2);
563        let total_a = bindings[0]
564            .get(&Var::new("total"))
565            .cloned()
566            .unwrap_or(Value::Null);
567        let total_b = bindings[1]
568            .get(&Var::new("total"))
569            .cloned()
570            .unwrap_or(Value::Null);
571        assert_eq!(total_a, Value::Integer(300));
572        assert_eq!(total_b, Value::Integer(150));
573    }
574
575    #[test]
576    fn test_in_memory_engine_extend() {
577        let engine = InMemoryEngine::new();
578        let ctx = QueryContext::new();
579
580        let table = OpTable::new(
581            vec![Var::new("x")],
582            vec![vec![Some(Value::Integer(1))], vec![Some(Value::Integer(2))]],
583        );
584
585        let extend = OpExtend::new(
586            Op::Table(table),
587            Var::new("xs"),
588            ExprTerm::Str(Box::new(ExprTerm::Var(Var::new("x")))),
589        );
590
591        let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
592        let bindings: Vec<_> = result.collect().unwrap();
593
594        assert_eq!(bindings.len(), 2);
595        assert_eq!(
596            bindings[0].get(&Var::new("xs")),
597            Some(&Value::String("1".to_string()))
598        );
599        assert_eq!(
600            bindings[1].get(&Var::new("xs")),
601            Some(&Value::String("2".to_string()))
602        );
603    }
604
605    #[test]
606    fn test_in_memory_engine_minus() {
607        let engine = InMemoryEngine::new();
608        let ctx = QueryContext::new();
609
610        let left = OpTable::new(
611            vec![Var::new("x")],
612            vec![
613                vec![Some(Value::Integer(1))],
614                vec![Some(Value::Integer(2))],
615                vec![Some(Value::Integer(3))],
616            ],
617        );
618
619        let right = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(2))]]);
620
621        let minus = OpMinus::new(Op::Table(left), Op::Table(right));
622        let result = engine.execute(Op::Minus(minus), &ctx).unwrap();
623        let mut bindings: Vec<_> = result.collect().unwrap();
624        bindings.sort_by(|a, b| {
625            let a_val = a
626                .get(&Var::new("x"))
627                .and_then(|v| match v {
628                    Value::Integer(i) => Some(*i),
629                    _ => None,
630                })
631                .unwrap_or(0);
632            let b_val = b
633                .get(&Var::new("x"))
634                .and_then(|v| match v {
635                    Value::Integer(i) => Some(*i),
636                    _ => None,
637                })
638                .unwrap_or(0);
639            a_val.cmp(&b_val)
640        });
641
642        let values: Vec<i64> = bindings
643            .iter()
644            .filter_map(|b| b.get(&Var::new("x")))
645            .filter_map(|v| match v {
646                Value::Integer(i) => Some(*i),
647                _ => None,
648            })
649            .collect();
650
651        assert_eq!(values, vec![1, 3]);
652    }
653
654    #[test]
655    fn test_in_memory_engine_minus_shared_vars() {
656        let engine = InMemoryEngine::new();
657        let ctx = QueryContext::new();
658
659        let left = OpTable::new(
660            vec![Var::new("x"), Var::new("y")],
661            vec![
662                vec![Some(Value::Integer(1)), Some(Value::Integer(10))],
663                vec![Some(Value::Integer(2)), Some(Value::Integer(20))],
664            ],
665        );
666
667        let right = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
668
669        let minus = OpMinus::new(Op::Table(left), Op::Table(right));
670        let result = engine.execute(Op::Minus(minus), &ctx).unwrap();
671        let bindings: Vec<_> = result.collect().unwrap();
672
673        assert_eq!(bindings.len(), 1);
674        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(2)));
675    }
676
677    #[test]
678    fn test_in_memory_engine_extend_conflict() {
679        let engine = InMemoryEngine::new();
680        let ctx = QueryContext::new();
681
682        let table = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
683
684        let extend = OpExtend::new(
685            Op::Table(table),
686            Var::new("x"),
687            ExprTerm::Const(Value::Integer(2)),
688        );
689
690        let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
691        let bindings: Vec<_> = result.collect().unwrap();
692
693        assert!(bindings.is_empty());
694    }
695
696    #[test]
697    fn test_in_memory_engine_extend_unbound_keeps_row() {
698        let engine = InMemoryEngine::new();
699        let ctx = QueryContext::new();
700
701        let table = OpTable::new(vec![Var::new("x")], vec![vec![Some(Value::Integer(1))]]);
702
703        let extend = OpExtend::new(
704            Op::Table(table),
705            Var::new("z"),
706            ExprTerm::Var(Var::new("missing")),
707        );
708
709        let result = engine.execute(Op::Extend(extend), &ctx).unwrap();
710        let bindings: Vec<_> = result.collect().unwrap();
711
712        assert_eq!(bindings.len(), 1);
713        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(1)));
714        assert_eq!(bindings[0].get(&Var::new("z")), None);
715    }
716
717    #[test]
718    fn test_in_memory_engine_slice() {
719        let engine = InMemoryEngine::new();
720        let ctx = QueryContext::new();
721
722        let table = OpTable::new(
723            vec![Var::new("x")],
724            (1..=10).map(|i| vec![Some(Value::Integer(i))]).collect(),
725        );
726
727        let op = Op::Slice(OpSlice::new(Op::Table(table), 2, Some(3)));
728
729        let result = engine.execute(op, &ctx).unwrap();
730        let bindings: Vec<_> = result.collect().unwrap();
731
732        assert_eq!(bindings.len(), 3);
733        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(3)));
734    }
735
736    #[test]
737    fn test_in_memory_engine_project() {
738        let engine = InMemoryEngine::new();
739        let ctx = QueryContext::new();
740
741        let table = OpTable::new(
742            vec![Var::new("x"), Var::new("y"), Var::new("z")],
743            vec![vec![
744                Some(Value::Integer(1)),
745                Some(Value::Integer(2)),
746                Some(Value::Integer(3)),
747            ]],
748        );
749
750        let op = Op::Project(OpProject::new(
751            vec![Var::new("x"), Var::new("z")],
752            Op::Table(table),
753        ));
754
755        let result = engine.execute(op, &ctx).unwrap();
756        let bindings: Vec<_> = result.collect().unwrap();
757
758        assert_eq!(bindings.len(), 1);
759        assert!(bindings[0].contains(&Var::new("x")));
760        assert!(!bindings[0].contains(&Var::new("y")));
761        assert!(bindings[0].contains(&Var::new("z")));
762    }
763
764    #[test]
765    fn test_in_memory_engine_distinct() {
766        let engine = InMemoryEngine::new();
767        let ctx = QueryContext::new();
768
769        let table = OpTable::new(
770            vec![Var::new("x")],
771            vec![
772                vec![Some(Value::Integer(1))],
773                vec![Some(Value::Integer(2))],
774                vec![Some(Value::Integer(1))],
775                vec![Some(Value::Integer(3))],
776                vec![Some(Value::Integer(2))],
777            ],
778        );
779
780        let op = Op::Distinct(OpDistinct::new(Op::Table(table)));
781
782        let result = engine.execute(op, &ctx).unwrap();
783        let bindings: Vec<_> = result.collect().unwrap();
784
785        assert_eq!(bindings.len(), 3);
786    }
787
788    #[test]
789    fn test_in_memory_engine_union() {
790        let engine = InMemoryEngine::new();
791        let ctx = QueryContext::new();
792
793        let table1 = OpTable::new(
794            vec![Var::new("x")],
795            vec![vec![Some(Value::Integer(1))], vec![Some(Value::Integer(2))]],
796        );
797
798        let table2 = OpTable::new(
799            vec![Var::new("x")],
800            vec![vec![Some(Value::Integer(3))], vec![Some(Value::Integer(4))]],
801        );
802
803        let op = Op::Union(OpUnion::new(Op::Table(table1), Op::Table(table2)));
804
805        let result = engine.execute(op, &ctx).unwrap();
806        let bindings: Vec<_> = result.collect().unwrap();
807
808        assert_eq!(bindings.len(), 4);
809    }
810
811    #[test]
812    fn test_in_memory_engine_order() {
813        let engine = InMemoryEngine::new();
814        let ctx = QueryContext::new();
815
816        let table = OpTable::new(
817            vec![Var::new("x")],
818            vec![
819                vec![Some(Value::Integer(3))],
820                vec![Some(Value::Integer(1))],
821                vec![Some(Value::Integer(2))],
822            ],
823        );
824
825        let op = Op::Order(OpOrder::new(
826            Op::Table(table),
827            vec![OrderKey::asc(ExprTerm::Var(Var::new("x")))],
828        ));
829
830        let result = engine.execute(op, &ctx).unwrap();
831        let bindings: Vec<_> = result.collect().unwrap();
832
833        assert_eq!(bindings.len(), 3);
834        assert_eq!(bindings[0].get(&Var::new("x")), Some(&Value::Integer(1)));
835        assert_eq!(bindings[1].get(&Var::new("x")), Some(&Value::Integer(2)));
836        assert_eq!(bindings[2].get(&Var::new("x")), Some(&Value::Integer(3)));
837    }
838
839    #[test]
840    fn test_engine_with_stats() {
841        let engine = InMemoryEngine::new();
842        let ctx = QueryContext::new().with_stats();
843
844        let table = OpTable::unit();
845        let result = engine.execute(Op::Table(table), &ctx).unwrap();
846
847        assert!(result.stats.is_some());
848    }
849
850    #[test]
851    fn test_engine_capabilities() {
852        let engine = InMemoryEngine::new();
853        let caps = engine.capabilities();
854
855        assert!(caps.graph_patterns);
856        assert!(caps.subqueries);
857        assert!(!caps.transactions);
858    }
859}