1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TxId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33 EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34 FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35 HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37 MapCollectOperator, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator,
38 NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator, ParameterScanOperator,
39 ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator,
40 SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SortDirection,
41 SortKey as PhysicalSortKey, SortOperator, UnwindOperator, VariableLengthExpandOperator,
42};
43use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use crate::query::planner::common;
48use crate::query::planner::common::expression_to_string;
49use crate::query::planner::{
50 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
51 convert_unary_op, value_to_logical_type,
52};
53use crate::transaction::TransactionManager;
54
55struct RangeBounds<'a> {
57 min: Option<&'a Value>,
58 max: Option<&'a Value>,
59 min_inclusive: bool,
60 max_inclusive: bool,
61}
62
63pub struct Planner {
65 pub(super) store: Arc<dyn GraphStoreMut>,
67 pub(super) tx_manager: Option<Arc<TransactionManager>>,
69 pub(super) tx_id: Option<TxId>,
71 pub(super) viewing_epoch: EpochId,
73 pub(super) anon_edge_counter: std::cell::Cell<u32>,
75 pub(super) factorized_execution: bool,
77 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
80 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
83 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
85 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
87 pub(super) correlated_param_state:
91 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
92 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
95}
96
97impl Planner {
98 #[must_use]
103 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
104 let epoch = store.current_epoch();
105 Self {
106 store,
107 tx_manager: None,
108 tx_id: None,
109 viewing_epoch: epoch,
110 anon_edge_counter: std::cell::Cell::new(0),
111 factorized_execution: true,
112 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
113 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
114 validator: None,
115 catalog: None,
116 correlated_param_state: std::cell::RefCell::new(None),
117 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
118 }
119 }
120
121 #[must_use]
123 pub fn with_context(
124 store: Arc<dyn GraphStoreMut>,
125 tx_manager: Arc<TransactionManager>,
126 tx_id: Option<TxId>,
127 viewing_epoch: EpochId,
128 ) -> Self {
129 Self {
130 store,
131 tx_manager: Some(tx_manager),
132 tx_id,
133 viewing_epoch,
134 anon_edge_counter: std::cell::Cell::new(0),
135 factorized_execution: true,
136 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
137 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
138 validator: None,
139 catalog: None,
140 correlated_param_state: std::cell::RefCell::new(None),
141 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
142 }
143 }
144
145 #[must_use]
147 pub fn viewing_epoch(&self) -> EpochId {
148 self.viewing_epoch
149 }
150
151 #[must_use]
153 pub fn tx_id(&self) -> Option<TxId> {
154 self.tx_id
155 }
156
157 #[must_use]
159 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
160 self.tx_manager.as_ref()
161 }
162
163 #[must_use]
165 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
166 self.factorized_execution = enabled;
167 self
168 }
169
170 #[must_use]
172 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
173 self.validator = Some(validator);
174 self
175 }
176
177 #[must_use]
179 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
180 self.catalog = Some(catalog);
181 self
182 }
183
184 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
188 match op {
189 LogicalOperator::Expand(expand) => {
190 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
191
192 if is_single_hop {
193 let (inner_count, base) = Self::count_expand_chain(&expand.input);
194 (inner_count + 1, base)
195 } else {
196 (0, op)
197 }
198 }
199 _ => (0, op),
200 }
201 }
202
203 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
207 let mut chain = Vec::new();
208 let mut current = op;
209
210 while let LogicalOperator::Expand(expand) = current {
211 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
212 if !is_single_hop {
213 break;
214 }
215 chain.push(expand);
216 current = &expand.input;
217 }
218
219 chain.reverse();
220 chain
221 }
222
223 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
225 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
226 Ok(PhysicalPlan {
227 operator,
228 columns,
229 adaptive_context: None,
230 })
231 }
232
233 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
235 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
236
237 let mut adaptive_context = AdaptiveContext::new();
238 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
239
240 Ok(PhysicalPlan {
241 operator,
242 columns,
243 adaptive_context: Some(adaptive_context),
244 })
245 }
246
247 fn collect_cardinality_estimates(
249 &self,
250 op: &LogicalOperator,
251 ctx: &mut AdaptiveContext,
252 depth: usize,
253 ) {
254 match op {
255 LogicalOperator::NodeScan(scan) => {
256 let estimate = if let Some(label) = &scan.label {
257 self.store.nodes_by_label(label).len() as f64
258 } else {
259 self.store.node_count() as f64
260 };
261 let id = format!("scan_{}", scan.variable);
262 ctx.set_estimate(&id, estimate);
263
264 if let Some(input) = &scan.input {
265 self.collect_cardinality_estimates(input, ctx, depth + 1);
266 }
267 }
268 LogicalOperator::Filter(filter) => {
269 let input_estimate = self.estimate_cardinality(&filter.input);
270 let estimate = input_estimate * 0.3;
271 let id = format!("filter_{depth}");
272 ctx.set_estimate(&id, estimate);
273
274 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
275 }
276 LogicalOperator::Expand(expand) => {
277 let input_estimate = self.estimate_cardinality(&expand.input);
278 let stats = self.store.statistics();
279 let avg_degree = self.estimate_expand_degree(&stats, expand);
280 let estimate = input_estimate * avg_degree;
281 let id = format!("expand_{}", expand.to_variable);
282 ctx.set_estimate(&id, estimate);
283
284 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
285 }
286 LogicalOperator::Join(join) => {
287 let left_est = self.estimate_cardinality(&join.left);
288 let right_est = self.estimate_cardinality(&join.right);
289 let estimate = (left_est * right_est).sqrt();
290 let id = format!("join_{depth}");
291 ctx.set_estimate(&id, estimate);
292
293 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
294 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
295 }
296 LogicalOperator::Aggregate(agg) => {
297 let input_estimate = self.estimate_cardinality(&agg.input);
298 let estimate = if agg.group_by.is_empty() {
299 1.0
300 } else {
301 (input_estimate * 0.1).max(1.0)
302 };
303 let id = format!("aggregate_{depth}");
304 ctx.set_estimate(&id, estimate);
305
306 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
307 }
308 LogicalOperator::Distinct(distinct) => {
309 let input_estimate = self.estimate_cardinality(&distinct.input);
310 let estimate = (input_estimate * 0.5).max(1.0);
311 let id = format!("distinct_{depth}");
312 ctx.set_estimate(&id, estimate);
313
314 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
315 }
316 LogicalOperator::Return(ret) => {
317 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
318 }
319 LogicalOperator::Limit(limit) => {
320 let input_estimate = self.estimate_cardinality(&limit.input);
321 let estimate = (input_estimate).min(limit.count as f64);
322 let id = format!("limit_{depth}");
323 ctx.set_estimate(&id, estimate);
324
325 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
326 }
327 LogicalOperator::Skip(skip) => {
328 let input_estimate = self.estimate_cardinality(&skip.input);
329 let estimate = (input_estimate - skip.count as f64).max(0.0);
330 let id = format!("skip_{depth}");
331 ctx.set_estimate(&id, estimate);
332
333 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
334 }
335 LogicalOperator::Sort(sort) => {
336 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
337 }
338 LogicalOperator::Union(union) => {
339 let estimate: f64 = union
340 .inputs
341 .iter()
342 .map(|input| self.estimate_cardinality(input))
343 .sum();
344 let id = format!("union_{depth}");
345 ctx.set_estimate(&id, estimate);
346
347 for input in &union.inputs {
348 self.collect_cardinality_estimates(input, ctx, depth + 1);
349 }
350 }
351 _ => {
352 }
354 }
355 }
356
357 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
359 match op {
360 LogicalOperator::NodeScan(scan) => {
361 if let Some(label) = &scan.label {
362 self.store.nodes_by_label(label).len() as f64
363 } else {
364 self.store.node_count() as f64
365 }
366 }
367 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
368 LogicalOperator::Expand(expand) => {
369 let stats = self.store.statistics();
370 let avg_degree = self.estimate_expand_degree(&stats, expand);
371 self.estimate_cardinality(&expand.input) * avg_degree
372 }
373 LogicalOperator::Join(join) => {
374 let left = self.estimate_cardinality(&join.left);
375 let right = self.estimate_cardinality(&join.right);
376 (left * right).sqrt()
377 }
378 LogicalOperator::Aggregate(agg) => {
379 if agg.group_by.is_empty() {
380 1.0
381 } else {
382 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
383 }
384 }
385 LogicalOperator::Distinct(distinct) => {
386 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
387 }
388 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
389 LogicalOperator::Limit(limit) => self
390 .estimate_cardinality(&limit.input)
391 .min(limit.count as f64),
392 LogicalOperator::Skip(skip) => {
393 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
394 }
395 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
396 LogicalOperator::Union(union) => union
397 .inputs
398 .iter()
399 .map(|input| self.estimate_cardinality(input))
400 .sum(),
401 LogicalOperator::Except(except) => {
402 let left = self.estimate_cardinality(&except.left);
403 let right = self.estimate_cardinality(&except.right);
404 (left - right).max(0.0)
405 }
406 LogicalOperator::Intersect(intersect) => {
407 let left = self.estimate_cardinality(&intersect.left);
408 let right = self.estimate_cardinality(&intersect.right);
409 left.min(right)
410 }
411 LogicalOperator::Otherwise(otherwise) => self
412 .estimate_cardinality(&otherwise.left)
413 .max(self.estimate_cardinality(&otherwise.right)),
414 _ => 1000.0,
415 }
416 }
417
418 fn estimate_expand_degree(
420 &self,
421 stats: &grafeo_core::statistics::Statistics,
422 expand: &ExpandOp,
423 ) -> f64 {
424 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
425 if expand.edge_types.len() == 1 {
426 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
427 } else if stats.total_nodes > 0 {
428 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
429 } else {
430 10.0
431 }
432 }
433
434 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
436 match op {
437 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
438 LogicalOperator::Expand(expand) => {
439 if self.factorized_execution {
440 let (chain_len, _base) = Self::count_expand_chain(op);
441 if chain_len >= 2 {
442 return self.plan_expand_chain(op);
443 }
444 }
445 self.plan_expand(expand)
446 }
447 LogicalOperator::Return(ret) => self.plan_return(ret),
448 LogicalOperator::Filter(filter) => self.plan_filter(filter),
449 LogicalOperator::Project(project) => self.plan_project(project),
450 LogicalOperator::Limit(limit) => self.plan_limit(limit),
451 LogicalOperator::Skip(skip) => self.plan_skip(skip),
452 LogicalOperator::Sort(sort) => self.plan_sort(sort),
453 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
454 LogicalOperator::Join(join) => self.plan_join(join),
455 LogicalOperator::Union(union) => self.plan_union(union),
456 LogicalOperator::Except(except) => self.plan_except(except),
457 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
458 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
459 LogicalOperator::Apply(apply) => self.plan_apply(apply),
460 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
461 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
462 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
463 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
464 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
465 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
466 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
467 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
468 LogicalOperator::Merge(merge) => self.plan_merge(merge),
469 LogicalOperator::MergeRelationship(merge_rel) => {
470 self.plan_merge_relationship(merge_rel)
471 }
472 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
473 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
474 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
475 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
476 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
477 #[cfg(feature = "algos")]
478 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
479 #[cfg(not(feature = "algos"))]
480 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
481 "CALL procedures require the 'algos' feature".to_string(),
482 )),
483 LogicalOperator::ParameterScan(param_scan) => {
484 let state = self
485 .correlated_param_state
486 .borrow()
487 .clone()
488 .ok_or_else(|| {
489 Error::Internal(
490 "ParameterScan without correlated Apply context".to_string(),
491 )
492 })?;
493 let columns = param_scan.columns.clone();
494 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
495 Ok((operator, columns))
496 }
497 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
498 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
499 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
500 LogicalOperator::VectorScan(_) => Err(Error::Internal(
501 "VectorScan requires vector-index feature".to_string(),
502 )),
503 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
504 "VectorJoin requires vector-index feature".to_string(),
505 )),
506 _ => Err(Error::Internal(format!(
507 "Unsupported operator: {:?}",
508 std::mem::discriminant(op)
509 ))),
510 }
511 }
512
513 fn plan_horizontal_aggregate(
515 &self,
516 ha: &HorizontalAggregateOp,
517 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
518 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
519
520 let list_col_idx = child_columns
521 .iter()
522 .position(|c| c == &ha.list_column)
523 .ok_or_else(|| {
524 Error::Internal(format!(
525 "HorizontalAggregate list column '{}' not found in {:?}",
526 ha.list_column, child_columns
527 ))
528 })?;
529
530 let entity_kind = match ha.entity_kind {
531 LogicalEntityKind::Edge => EntityKind::Edge,
532 LogicalEntityKind::Node => EntityKind::Node,
533 };
534
535 let function = convert_aggregate_function(ha.function);
536 let input_column_count = child_columns.len();
537
538 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
539 child_op,
540 list_col_idx,
541 entity_kind,
542 function,
543 ha.property.clone(),
544 Arc::clone(&self.store) as Arc<dyn GraphStore>,
545 input_column_count,
546 ));
547
548 let mut columns = child_columns;
549 columns.push(ha.alias.clone());
550 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
552
553 Ok((operator, columns))
554 }
555
556 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
558 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
559 let key_idx = child_columns
560 .iter()
561 .position(|c| c == &mc.key_var)
562 .ok_or_else(|| {
563 Error::Internal(format!(
564 "MapCollect key '{}' not in columns {:?}",
565 mc.key_var, child_columns
566 ))
567 })?;
568 let value_idx = child_columns
569 .iter()
570 .position(|c| c == &mc.value_var)
571 .ok_or_else(|| {
572 Error::Internal(format!(
573 "MapCollect value '{}' not in columns {:?}",
574 mc.value_var, child_columns
575 ))
576 })?;
577 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
578 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
579 Ok((operator, vec![mc.alias.clone()]))
580 }
581}
582
583#[cfg(feature = "algos")]
585struct StaticResultOperator {
586 rows: Vec<Vec<Value>>,
587 column_indices: Vec<usize>,
588 row_index: usize,
589}
590
591#[cfg(feature = "algos")]
592impl Operator for StaticResultOperator {
593 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
594 use grafeo_core::execution::DataChunk;
595
596 if self.row_index >= self.rows.len() {
597 return Ok(None);
598 }
599
600 let remaining = self.rows.len() - self.row_index;
601 let chunk_rows = remaining.min(1024);
602 let col_count = self.column_indices.len();
603
604 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
605 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
606
607 for row_offset in 0..chunk_rows {
608 let row = &self.rows[self.row_index + row_offset];
609 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
610 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
611 if let Some(col) = chunk.column_mut(col_idx) {
612 col.push_value(value);
613 }
614 }
615 }
616 chunk.set_count(chunk_rows);
617
618 self.row_index += chunk_rows;
619 Ok(Some(chunk))
620 }
621
622 fn reset(&mut self) {
623 self.row_index = 0;
624 }
625
626 fn name(&self) -> &'static str {
627 "StaticResult"
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::query::plan::{
635 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
636 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
637 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
638 SkipOp as LogicalSkipOp, SortKey, SortOp,
639 };
640 use grafeo_common::types::Value;
641 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
642 use grafeo_core::graph::GraphStoreMut;
643 use grafeo_core::graph::lpg::LpgStore;
644
645 fn create_test_store() -> Arc<dyn GraphStoreMut> {
646 let store = Arc::new(LpgStore::new().unwrap());
647 store.create_node(&["Person"]);
648 store.create_node(&["Person"]);
649 store.create_node(&["Company"]);
650 store
651 }
652
653 #[test]
656 fn test_plan_simple_scan() {
657 let store = create_test_store();
658 let planner = Planner::new(store);
659
660 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
662 items: vec![ReturnItem {
663 expression: LogicalExpression::Variable("n".to_string()),
664 alias: None,
665 }],
666 distinct: false,
667 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
668 variable: "n".to_string(),
669 label: Some("Person".to_string()),
670 input: None,
671 })),
672 }));
673
674 let physical = planner.plan(&logical).unwrap();
675 assert_eq!(physical.columns(), &["n"]);
676 }
677
678 #[test]
679 fn test_plan_scan_without_label() {
680 let store = create_test_store();
681 let planner = Planner::new(store);
682
683 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
685 items: vec![ReturnItem {
686 expression: LogicalExpression::Variable("n".to_string()),
687 alias: None,
688 }],
689 distinct: false,
690 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
691 variable: "n".to_string(),
692 label: None,
693 input: None,
694 })),
695 }));
696
697 let physical = planner.plan(&logical).unwrap();
698 assert_eq!(physical.columns(), &["n"]);
699 }
700
701 #[test]
702 fn test_plan_return_with_alias() {
703 let store = create_test_store();
704 let planner = Planner::new(store);
705
706 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
708 items: vec![ReturnItem {
709 expression: LogicalExpression::Variable("n".to_string()),
710 alias: Some("person".to_string()),
711 }],
712 distinct: false,
713 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
714 variable: "n".to_string(),
715 label: Some("Person".to_string()),
716 input: None,
717 })),
718 }));
719
720 let physical = planner.plan(&logical).unwrap();
721 assert_eq!(physical.columns(), &["person"]);
722 }
723
724 #[test]
725 fn test_plan_return_property() {
726 let store = create_test_store();
727 let planner = Planner::new(store);
728
729 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
731 items: vec![ReturnItem {
732 expression: LogicalExpression::Property {
733 variable: "n".to_string(),
734 property: "name".to_string(),
735 },
736 alias: None,
737 }],
738 distinct: false,
739 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
740 variable: "n".to_string(),
741 label: Some("Person".to_string()),
742 input: None,
743 })),
744 }));
745
746 let physical = planner.plan(&logical).unwrap();
747 assert_eq!(physical.columns(), &["n.name"]);
748 }
749
750 #[test]
751 fn test_plan_return_literal() {
752 let store = create_test_store();
753 let planner = Planner::new(store);
754
755 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
757 items: vec![ReturnItem {
758 expression: LogicalExpression::Literal(Value::Int64(42)),
759 alias: Some("answer".to_string()),
760 }],
761 distinct: false,
762 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
763 variable: "n".to_string(),
764 label: None,
765 input: None,
766 })),
767 }));
768
769 let physical = planner.plan(&logical).unwrap();
770 assert_eq!(physical.columns(), &["answer"]);
771 }
772
773 #[test]
776 fn test_plan_filter_equality() {
777 let store = create_test_store();
778 let planner = Planner::new(store);
779
780 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
782 items: vec![ReturnItem {
783 expression: LogicalExpression::Variable("n".to_string()),
784 alias: None,
785 }],
786 distinct: false,
787 input: Box::new(LogicalOperator::Filter(FilterOp {
788 predicate: LogicalExpression::Binary {
789 left: Box::new(LogicalExpression::Property {
790 variable: "n".to_string(),
791 property: "age".to_string(),
792 }),
793 op: BinaryOp::Eq,
794 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
795 },
796 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
797 variable: "n".to_string(),
798 label: Some("Person".to_string()),
799 input: None,
800 })),
801 pushdown_hint: None,
802 })),
803 }));
804
805 let physical = planner.plan(&logical).unwrap();
806 assert_eq!(physical.columns(), &["n"]);
807 }
808
809 #[test]
810 fn test_plan_filter_compound_and() {
811 let store = create_test_store();
812 let planner = Planner::new(store);
813
814 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
816 items: vec![ReturnItem {
817 expression: LogicalExpression::Variable("n".to_string()),
818 alias: None,
819 }],
820 distinct: false,
821 input: Box::new(LogicalOperator::Filter(FilterOp {
822 predicate: LogicalExpression::Binary {
823 left: Box::new(LogicalExpression::Binary {
824 left: Box::new(LogicalExpression::Property {
825 variable: "n".to_string(),
826 property: "age".to_string(),
827 }),
828 op: BinaryOp::Gt,
829 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
830 }),
831 op: BinaryOp::And,
832 right: Box::new(LogicalExpression::Binary {
833 left: Box::new(LogicalExpression::Property {
834 variable: "n".to_string(),
835 property: "age".to_string(),
836 }),
837 op: BinaryOp::Lt,
838 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
839 }),
840 },
841 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
842 variable: "n".to_string(),
843 label: None,
844 input: None,
845 })),
846 pushdown_hint: None,
847 })),
848 }));
849
850 let physical = planner.plan(&logical).unwrap();
851 assert_eq!(physical.columns(), &["n"]);
852 }
853
854 #[test]
855 fn test_plan_filter_unary_not() {
856 let store = create_test_store();
857 let planner = Planner::new(store);
858
859 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
861 items: vec![ReturnItem {
862 expression: LogicalExpression::Variable("n".to_string()),
863 alias: None,
864 }],
865 distinct: false,
866 input: Box::new(LogicalOperator::Filter(FilterOp {
867 predicate: LogicalExpression::Unary {
868 op: UnaryOp::Not,
869 operand: Box::new(LogicalExpression::Property {
870 variable: "n".to_string(),
871 property: "active".to_string(),
872 }),
873 },
874 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
875 variable: "n".to_string(),
876 label: None,
877 input: None,
878 })),
879 pushdown_hint: None,
880 })),
881 }));
882
883 let physical = planner.plan(&logical).unwrap();
884 assert_eq!(physical.columns(), &["n"]);
885 }
886
887 #[test]
888 fn test_plan_filter_is_null() {
889 let store = create_test_store();
890 let planner = Planner::new(store);
891
892 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
894 items: vec![ReturnItem {
895 expression: LogicalExpression::Variable("n".to_string()),
896 alias: None,
897 }],
898 distinct: false,
899 input: Box::new(LogicalOperator::Filter(FilterOp {
900 predicate: LogicalExpression::Unary {
901 op: UnaryOp::IsNull,
902 operand: Box::new(LogicalExpression::Property {
903 variable: "n".to_string(),
904 property: "email".to_string(),
905 }),
906 },
907 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
908 variable: "n".to_string(),
909 label: None,
910 input: None,
911 })),
912 pushdown_hint: None,
913 })),
914 }));
915
916 let physical = planner.plan(&logical).unwrap();
917 assert_eq!(physical.columns(), &["n"]);
918 }
919
920 #[test]
921 fn test_plan_filter_function_call() {
922 let store = create_test_store();
923 let planner = Planner::new(store);
924
925 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
927 items: vec![ReturnItem {
928 expression: LogicalExpression::Variable("n".to_string()),
929 alias: None,
930 }],
931 distinct: false,
932 input: Box::new(LogicalOperator::Filter(FilterOp {
933 predicate: LogicalExpression::Binary {
934 left: Box::new(LogicalExpression::FunctionCall {
935 name: "size".to_string(),
936 args: vec![LogicalExpression::Property {
937 variable: "n".to_string(),
938 property: "friends".to_string(),
939 }],
940 distinct: false,
941 }),
942 op: BinaryOp::Gt,
943 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
944 },
945 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
946 variable: "n".to_string(),
947 label: None,
948 input: None,
949 })),
950 pushdown_hint: None,
951 })),
952 }));
953
954 let physical = planner.plan(&logical).unwrap();
955 assert_eq!(physical.columns(), &["n"]);
956 }
957
958 #[test]
961 fn test_plan_expand_outgoing() {
962 let store = create_test_store();
963 let planner = Planner::new(store);
964
965 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
967 items: vec![
968 ReturnItem {
969 expression: LogicalExpression::Variable("a".to_string()),
970 alias: None,
971 },
972 ReturnItem {
973 expression: LogicalExpression::Variable("b".to_string()),
974 alias: None,
975 },
976 ],
977 distinct: false,
978 input: Box::new(LogicalOperator::Expand(ExpandOp {
979 from_variable: "a".to_string(),
980 to_variable: "b".to_string(),
981 edge_variable: None,
982 direction: ExpandDirection::Outgoing,
983 edge_types: vec!["KNOWS".to_string()],
984 min_hops: 1,
985 max_hops: Some(1),
986 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
987 variable: "a".to_string(),
988 label: Some("Person".to_string()),
989 input: None,
990 })),
991 path_alias: None,
992 path_mode: PathMode::Walk,
993 })),
994 }));
995
996 let physical = planner.plan(&logical).unwrap();
997 assert!(physical.columns().contains(&"a".to_string()));
999 assert!(physical.columns().contains(&"b".to_string()));
1000 }
1001
1002 #[test]
1003 fn test_plan_expand_with_edge_variable() {
1004 let store = create_test_store();
1005 let planner = Planner::new(store);
1006
1007 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1009 items: vec![
1010 ReturnItem {
1011 expression: LogicalExpression::Variable("a".to_string()),
1012 alias: None,
1013 },
1014 ReturnItem {
1015 expression: LogicalExpression::Variable("r".to_string()),
1016 alias: None,
1017 },
1018 ReturnItem {
1019 expression: LogicalExpression::Variable("b".to_string()),
1020 alias: None,
1021 },
1022 ],
1023 distinct: false,
1024 input: Box::new(LogicalOperator::Expand(ExpandOp {
1025 from_variable: "a".to_string(),
1026 to_variable: "b".to_string(),
1027 edge_variable: Some("r".to_string()),
1028 direction: ExpandDirection::Outgoing,
1029 edge_types: vec!["KNOWS".to_string()],
1030 min_hops: 1,
1031 max_hops: Some(1),
1032 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1033 variable: "a".to_string(),
1034 label: None,
1035 input: None,
1036 })),
1037 path_alias: None,
1038 path_mode: PathMode::Walk,
1039 })),
1040 }));
1041
1042 let physical = planner.plan(&logical).unwrap();
1043 assert!(physical.columns().contains(&"a".to_string()));
1044 assert!(physical.columns().contains(&"r".to_string()));
1045 assert!(physical.columns().contains(&"b".to_string()));
1046 }
1047
1048 #[test]
1051 fn test_plan_limit() {
1052 let store = create_test_store();
1053 let planner = Planner::new(store);
1054
1055 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1057 items: vec![ReturnItem {
1058 expression: LogicalExpression::Variable("n".to_string()),
1059 alias: None,
1060 }],
1061 distinct: false,
1062 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1063 count: 10,
1064 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1065 variable: "n".to_string(),
1066 label: None,
1067 input: None,
1068 })),
1069 })),
1070 }));
1071
1072 let physical = planner.plan(&logical).unwrap();
1073 assert_eq!(physical.columns(), &["n"]);
1074 }
1075
1076 #[test]
1077 fn test_plan_skip() {
1078 let store = create_test_store();
1079 let planner = Planner::new(store);
1080
1081 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1083 items: vec![ReturnItem {
1084 expression: LogicalExpression::Variable("n".to_string()),
1085 alias: None,
1086 }],
1087 distinct: false,
1088 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1089 count: 5,
1090 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1091 variable: "n".to_string(),
1092 label: None,
1093 input: None,
1094 })),
1095 })),
1096 }));
1097
1098 let physical = planner.plan(&logical).unwrap();
1099 assert_eq!(physical.columns(), &["n"]);
1100 }
1101
1102 #[test]
1103 fn test_plan_sort() {
1104 let store = create_test_store();
1105 let planner = Planner::new(store);
1106
1107 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1109 items: vec![ReturnItem {
1110 expression: LogicalExpression::Variable("n".to_string()),
1111 alias: None,
1112 }],
1113 distinct: false,
1114 input: Box::new(LogicalOperator::Sort(SortOp {
1115 keys: vec![SortKey {
1116 expression: LogicalExpression::Variable("n".to_string()),
1117 order: SortOrder::Ascending,
1118 nulls: None,
1119 }],
1120 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1121 variable: "n".to_string(),
1122 label: None,
1123 input: None,
1124 })),
1125 })),
1126 }));
1127
1128 let physical = planner.plan(&logical).unwrap();
1129 assert_eq!(physical.columns(), &["n"]);
1130 }
1131
1132 #[test]
1133 fn test_plan_sort_descending() {
1134 let store = create_test_store();
1135 let planner = Planner::new(store);
1136
1137 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1139 items: vec![ReturnItem {
1140 expression: LogicalExpression::Variable("n".to_string()),
1141 alias: None,
1142 }],
1143 distinct: false,
1144 input: Box::new(LogicalOperator::Sort(SortOp {
1145 keys: vec![SortKey {
1146 expression: LogicalExpression::Variable("n".to_string()),
1147 order: SortOrder::Descending,
1148 nulls: None,
1149 }],
1150 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1151 variable: "n".to_string(),
1152 label: None,
1153 input: None,
1154 })),
1155 })),
1156 }));
1157
1158 let physical = planner.plan(&logical).unwrap();
1159 assert_eq!(physical.columns(), &["n"]);
1160 }
1161
1162 #[test]
1163 fn test_plan_distinct() {
1164 let store = create_test_store();
1165 let planner = Planner::new(store);
1166
1167 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1169 items: vec![ReturnItem {
1170 expression: LogicalExpression::Variable("n".to_string()),
1171 alias: None,
1172 }],
1173 distinct: false,
1174 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1175 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1176 variable: "n".to_string(),
1177 label: None,
1178 input: None,
1179 })),
1180 columns: None,
1181 })),
1182 }));
1183
1184 let physical = planner.plan(&logical).unwrap();
1185 assert_eq!(physical.columns(), &["n"]);
1186 }
1187
1188 #[test]
1189 fn test_plan_distinct_with_columns() {
1190 let store = create_test_store();
1191 let planner = Planner::new(store);
1192
1193 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1195 items: vec![ReturnItem {
1196 expression: LogicalExpression::Variable("n".to_string()),
1197 alias: None,
1198 }],
1199 distinct: false,
1200 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1201 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1202 variable: "n".to_string(),
1203 label: None,
1204 input: None,
1205 })),
1206 columns: Some(vec!["n".to_string()]),
1207 })),
1208 }));
1209
1210 let physical = planner.plan(&logical).unwrap();
1211 assert_eq!(physical.columns(), &["n"]);
1212 }
1213
1214 #[test]
1215 fn test_plan_distinct_with_nonexistent_columns() {
1216 let store = create_test_store();
1217 let planner = Planner::new(store);
1218
1219 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1222 items: vec![ReturnItem {
1223 expression: LogicalExpression::Variable("n".to_string()),
1224 alias: None,
1225 }],
1226 distinct: false,
1227 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1228 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1229 variable: "n".to_string(),
1230 label: None,
1231 input: None,
1232 })),
1233 columns: Some(vec!["nonexistent".to_string()]),
1234 })),
1235 }));
1236
1237 let physical = planner.plan(&logical).unwrap();
1238 assert_eq!(physical.columns(), &["n"]);
1239 }
1240
1241 #[test]
1244 fn test_plan_aggregate_count() {
1245 let store = create_test_store();
1246 let planner = Planner::new(store);
1247
1248 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1250 items: vec![ReturnItem {
1251 expression: LogicalExpression::Variable("cnt".to_string()),
1252 alias: None,
1253 }],
1254 distinct: false,
1255 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1256 group_by: vec![],
1257 aggregates: vec![LogicalAggregateExpr {
1258 function: LogicalAggregateFunction::Count,
1259 expression: Some(LogicalExpression::Variable("n".to_string())),
1260 expression2: None,
1261 distinct: false,
1262 alias: Some("cnt".to_string()),
1263 percentile: None,
1264 }],
1265 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1266 variable: "n".to_string(),
1267 label: None,
1268 input: None,
1269 })),
1270 having: None,
1271 })),
1272 }));
1273
1274 let physical = planner.plan(&logical).unwrap();
1275 assert!(physical.columns().contains(&"cnt".to_string()));
1276 }
1277
1278 #[test]
1279 fn test_plan_aggregate_with_group_by() {
1280 let store = create_test_store();
1281 let planner = Planner::new(store);
1282
1283 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1285 group_by: vec![LogicalExpression::Property {
1286 variable: "n".to_string(),
1287 property: "city".to_string(),
1288 }],
1289 aggregates: vec![LogicalAggregateExpr {
1290 function: LogicalAggregateFunction::Count,
1291 expression: Some(LogicalExpression::Variable("n".to_string())),
1292 expression2: None,
1293 distinct: false,
1294 alias: Some("cnt".to_string()),
1295 percentile: None,
1296 }],
1297 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1298 variable: "n".to_string(),
1299 label: Some("Person".to_string()),
1300 input: None,
1301 })),
1302 having: None,
1303 }));
1304
1305 let physical = planner.plan(&logical).unwrap();
1306 assert_eq!(physical.columns().len(), 2);
1307 }
1308
1309 #[test]
1310 fn test_plan_aggregate_sum() {
1311 let store = create_test_store();
1312 let planner = Planner::new(store);
1313
1314 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1316 group_by: vec![],
1317 aggregates: vec![LogicalAggregateExpr {
1318 function: LogicalAggregateFunction::Sum,
1319 expression: Some(LogicalExpression::Property {
1320 variable: "n".to_string(),
1321 property: "value".to_string(),
1322 }),
1323 expression2: None,
1324 distinct: false,
1325 alias: Some("total".to_string()),
1326 percentile: None,
1327 }],
1328 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1329 variable: "n".to_string(),
1330 label: None,
1331 input: None,
1332 })),
1333 having: None,
1334 }));
1335
1336 let physical = planner.plan(&logical).unwrap();
1337 assert!(physical.columns().contains(&"total".to_string()));
1338 }
1339
1340 #[test]
1341 fn test_plan_aggregate_avg() {
1342 let store = create_test_store();
1343 let planner = Planner::new(store);
1344
1345 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1347 group_by: vec![],
1348 aggregates: vec![LogicalAggregateExpr {
1349 function: LogicalAggregateFunction::Avg,
1350 expression: Some(LogicalExpression::Property {
1351 variable: "n".to_string(),
1352 property: "score".to_string(),
1353 }),
1354 expression2: None,
1355 distinct: false,
1356 alias: Some("average".to_string()),
1357 percentile: None,
1358 }],
1359 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1360 variable: "n".to_string(),
1361 label: None,
1362 input: None,
1363 })),
1364 having: None,
1365 }));
1366
1367 let physical = planner.plan(&logical).unwrap();
1368 assert!(physical.columns().contains(&"average".to_string()));
1369 }
1370
1371 #[test]
1372 fn test_plan_aggregate_min_max() {
1373 let store = create_test_store();
1374 let planner = Planner::new(store);
1375
1376 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1378 group_by: vec![],
1379 aggregates: vec![
1380 LogicalAggregateExpr {
1381 function: LogicalAggregateFunction::Min,
1382 expression: Some(LogicalExpression::Property {
1383 variable: "n".to_string(),
1384 property: "age".to_string(),
1385 }),
1386 expression2: None,
1387 distinct: false,
1388 alias: Some("youngest".to_string()),
1389 percentile: None,
1390 },
1391 LogicalAggregateExpr {
1392 function: LogicalAggregateFunction::Max,
1393 expression: Some(LogicalExpression::Property {
1394 variable: "n".to_string(),
1395 property: "age".to_string(),
1396 }),
1397 expression2: None,
1398 distinct: false,
1399 alias: Some("oldest".to_string()),
1400 percentile: None,
1401 },
1402 ],
1403 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1404 variable: "n".to_string(),
1405 label: None,
1406 input: None,
1407 })),
1408 having: None,
1409 }));
1410
1411 let physical = planner.plan(&logical).unwrap();
1412 assert!(physical.columns().contains(&"youngest".to_string()));
1413 assert!(physical.columns().contains(&"oldest".to_string()));
1414 }
1415
1416 #[test]
1419 fn test_plan_inner_join() {
1420 let store = create_test_store();
1421 let planner = Planner::new(store);
1422
1423 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1425 items: vec![
1426 ReturnItem {
1427 expression: LogicalExpression::Variable("a".to_string()),
1428 alias: None,
1429 },
1430 ReturnItem {
1431 expression: LogicalExpression::Variable("b".to_string()),
1432 alias: None,
1433 },
1434 ],
1435 distinct: false,
1436 input: Box::new(LogicalOperator::Join(JoinOp {
1437 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1438 variable: "a".to_string(),
1439 label: Some("Person".to_string()),
1440 input: None,
1441 })),
1442 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1443 variable: "b".to_string(),
1444 label: Some("Company".to_string()),
1445 input: None,
1446 })),
1447 join_type: JoinType::Inner,
1448 conditions: vec![JoinCondition {
1449 left: LogicalExpression::Variable("a".to_string()),
1450 right: LogicalExpression::Variable("b".to_string()),
1451 }],
1452 })),
1453 }));
1454
1455 let physical = planner.plan(&logical).unwrap();
1456 assert!(physical.columns().contains(&"a".to_string()));
1457 assert!(physical.columns().contains(&"b".to_string()));
1458 }
1459
1460 #[test]
1461 fn test_plan_cross_join() {
1462 let store = create_test_store();
1463 let planner = Planner::new(store);
1464
1465 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1467 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1468 variable: "a".to_string(),
1469 label: None,
1470 input: None,
1471 })),
1472 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1473 variable: "b".to_string(),
1474 label: None,
1475 input: None,
1476 })),
1477 join_type: JoinType::Cross,
1478 conditions: vec![],
1479 }));
1480
1481 let physical = planner.plan(&logical).unwrap();
1482 assert_eq!(physical.columns().len(), 2);
1483 }
1484
1485 #[test]
1486 fn test_plan_left_join() {
1487 let store = create_test_store();
1488 let planner = Planner::new(store);
1489
1490 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1491 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1492 variable: "a".to_string(),
1493 label: None,
1494 input: None,
1495 })),
1496 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1497 variable: "b".to_string(),
1498 label: None,
1499 input: None,
1500 })),
1501 join_type: JoinType::Left,
1502 conditions: vec![],
1503 }));
1504
1505 let physical = planner.plan(&logical).unwrap();
1506 assert_eq!(physical.columns().len(), 2);
1507 }
1508
1509 #[test]
1512 fn test_plan_create_node() {
1513 let store = create_test_store();
1514 let planner = Planner::new(store);
1515
1516 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1518 variable: "n".to_string(),
1519 labels: vec!["Person".to_string()],
1520 properties: vec![(
1521 "name".to_string(),
1522 LogicalExpression::Literal(Value::String("Alix".into())),
1523 )],
1524 input: None,
1525 }));
1526
1527 let physical = planner.plan(&logical).unwrap();
1528 assert!(physical.columns().contains(&"n".to_string()));
1529 }
1530
1531 #[test]
1532 fn test_plan_create_edge() {
1533 let store = create_test_store();
1534 let planner = Planner::new(store);
1535
1536 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1538 variable: Some("r".to_string()),
1539 from_variable: "a".to_string(),
1540 to_variable: "b".to_string(),
1541 edge_type: "KNOWS".to_string(),
1542 properties: vec![],
1543 input: Box::new(LogicalOperator::Join(JoinOp {
1544 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1545 variable: "a".to_string(),
1546 label: None,
1547 input: None,
1548 })),
1549 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1550 variable: "b".to_string(),
1551 label: None,
1552 input: None,
1553 })),
1554 join_type: JoinType::Cross,
1555 conditions: vec![],
1556 })),
1557 }));
1558
1559 let physical = planner.plan(&logical).unwrap();
1560 assert!(physical.columns().contains(&"r".to_string()));
1561 }
1562
1563 #[test]
1564 fn test_plan_delete_node() {
1565 let store = create_test_store();
1566 let planner = Planner::new(store);
1567
1568 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1570 variable: "n".to_string(),
1571 detach: false,
1572 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573 variable: "n".to_string(),
1574 label: None,
1575 input: None,
1576 })),
1577 }));
1578
1579 let physical = planner.plan(&logical).unwrap();
1580 assert!(physical.columns().contains(&"deleted_count".to_string()));
1581 }
1582
1583 #[test]
1586 fn test_plan_empty_errors() {
1587 let store = create_test_store();
1588 let planner = Planner::new(store);
1589
1590 let logical = LogicalPlan::new(LogicalOperator::Empty);
1591 let result = planner.plan(&logical);
1592 assert!(result.is_err());
1593 }
1594
1595 #[test]
1596 fn test_plan_missing_variable_in_return() {
1597 let store = create_test_store();
1598 let planner = Planner::new(store);
1599
1600 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1602 items: vec![ReturnItem {
1603 expression: LogicalExpression::Variable("missing".to_string()),
1604 alias: None,
1605 }],
1606 distinct: false,
1607 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1608 variable: "n".to_string(),
1609 label: None,
1610 input: None,
1611 })),
1612 }));
1613
1614 let result = planner.plan(&logical);
1615 assert!(result.is_err());
1616 }
1617
1618 #[test]
1621 fn test_convert_binary_ops() {
1622 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1623 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1624 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1625 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1626 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1627 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1628 assert!(convert_binary_op(BinaryOp::And).is_ok());
1629 assert!(convert_binary_op(BinaryOp::Or).is_ok());
1630 assert!(convert_binary_op(BinaryOp::Add).is_ok());
1631 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1632 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1633 assert!(convert_binary_op(BinaryOp::Div).is_ok());
1634 }
1635
1636 #[test]
1637 fn test_convert_unary_ops() {
1638 assert!(convert_unary_op(UnaryOp::Not).is_ok());
1639 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1640 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1641 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1642 }
1643
1644 #[test]
1645 fn test_convert_aggregate_functions() {
1646 assert!(matches!(
1647 convert_aggregate_function(LogicalAggregateFunction::Count),
1648 PhysicalAggregateFunction::Count
1649 ));
1650 assert!(matches!(
1651 convert_aggregate_function(LogicalAggregateFunction::Sum),
1652 PhysicalAggregateFunction::Sum
1653 ));
1654 assert!(matches!(
1655 convert_aggregate_function(LogicalAggregateFunction::Avg),
1656 PhysicalAggregateFunction::Avg
1657 ));
1658 assert!(matches!(
1659 convert_aggregate_function(LogicalAggregateFunction::Min),
1660 PhysicalAggregateFunction::Min
1661 ));
1662 assert!(matches!(
1663 convert_aggregate_function(LogicalAggregateFunction::Max),
1664 PhysicalAggregateFunction::Max
1665 ));
1666 }
1667
1668 #[test]
1669 fn test_planner_accessors() {
1670 let store = create_test_store();
1671 let planner = Planner::new(Arc::clone(&store));
1672
1673 assert!(planner.tx_id().is_none());
1674 assert!(planner.tx_manager().is_none());
1675 let _ = planner.viewing_epoch(); }
1677
1678 #[test]
1679 fn test_physical_plan_accessors() {
1680 let store = create_test_store();
1681 let planner = Planner::new(store);
1682
1683 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1684 variable: "n".to_string(),
1685 label: None,
1686 input: None,
1687 }));
1688
1689 let physical = planner.plan(&logical).unwrap();
1690 assert_eq!(physical.columns(), &["n"]);
1691
1692 let _ = physical.into_operator();
1694 }
1695
1696 #[test]
1699 fn test_plan_adaptive_with_scan() {
1700 let store = create_test_store();
1701 let planner = Planner::new(store);
1702
1703 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1705 items: vec![ReturnItem {
1706 expression: LogicalExpression::Variable("n".to_string()),
1707 alias: None,
1708 }],
1709 distinct: false,
1710 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1711 variable: "n".to_string(),
1712 label: Some("Person".to_string()),
1713 input: None,
1714 })),
1715 }));
1716
1717 let physical = planner.plan_adaptive(&logical).unwrap();
1718 assert_eq!(physical.columns(), &["n"]);
1719 assert!(physical.adaptive_context.is_some());
1721 }
1722
1723 #[test]
1724 fn test_plan_adaptive_with_filter() {
1725 let store = create_test_store();
1726 let planner = Planner::new(store);
1727
1728 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1730 items: vec![ReturnItem {
1731 expression: LogicalExpression::Variable("n".to_string()),
1732 alias: None,
1733 }],
1734 distinct: false,
1735 input: Box::new(LogicalOperator::Filter(FilterOp {
1736 predicate: LogicalExpression::Binary {
1737 left: Box::new(LogicalExpression::Property {
1738 variable: "n".to_string(),
1739 property: "age".to_string(),
1740 }),
1741 op: BinaryOp::Gt,
1742 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1743 },
1744 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1745 variable: "n".to_string(),
1746 label: None,
1747 input: None,
1748 })),
1749 pushdown_hint: None,
1750 })),
1751 }));
1752
1753 let physical = planner.plan_adaptive(&logical).unwrap();
1754 assert!(physical.adaptive_context.is_some());
1755 }
1756
1757 #[test]
1758 fn test_plan_adaptive_with_expand() {
1759 let store = create_test_store();
1760 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1761
1762 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1764 items: vec![
1765 ReturnItem {
1766 expression: LogicalExpression::Variable("a".to_string()),
1767 alias: None,
1768 },
1769 ReturnItem {
1770 expression: LogicalExpression::Variable("b".to_string()),
1771 alias: None,
1772 },
1773 ],
1774 distinct: false,
1775 input: Box::new(LogicalOperator::Expand(ExpandOp {
1776 from_variable: "a".to_string(),
1777 to_variable: "b".to_string(),
1778 edge_variable: None,
1779 direction: ExpandDirection::Outgoing,
1780 edge_types: vec!["KNOWS".to_string()],
1781 min_hops: 1,
1782 max_hops: Some(1),
1783 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1784 variable: "a".to_string(),
1785 label: None,
1786 input: None,
1787 })),
1788 path_alias: None,
1789 path_mode: PathMode::Walk,
1790 })),
1791 }));
1792
1793 let physical = planner.plan_adaptive(&logical).unwrap();
1794 assert!(physical.adaptive_context.is_some());
1795 }
1796
1797 #[test]
1798 fn test_plan_adaptive_with_join() {
1799 let store = create_test_store();
1800 let planner = Planner::new(store);
1801
1802 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1803 items: vec![
1804 ReturnItem {
1805 expression: LogicalExpression::Variable("a".to_string()),
1806 alias: None,
1807 },
1808 ReturnItem {
1809 expression: LogicalExpression::Variable("b".to_string()),
1810 alias: None,
1811 },
1812 ],
1813 distinct: false,
1814 input: Box::new(LogicalOperator::Join(JoinOp {
1815 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816 variable: "a".to_string(),
1817 label: None,
1818 input: None,
1819 })),
1820 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1821 variable: "b".to_string(),
1822 label: None,
1823 input: None,
1824 })),
1825 join_type: JoinType::Cross,
1826 conditions: vec![],
1827 })),
1828 }));
1829
1830 let physical = planner.plan_adaptive(&logical).unwrap();
1831 assert!(physical.adaptive_context.is_some());
1832 }
1833
1834 #[test]
1835 fn test_plan_adaptive_with_aggregate() {
1836 let store = create_test_store();
1837 let planner = Planner::new(store);
1838
1839 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1840 group_by: vec![],
1841 aggregates: vec![LogicalAggregateExpr {
1842 function: LogicalAggregateFunction::Count,
1843 expression: Some(LogicalExpression::Variable("n".to_string())),
1844 expression2: None,
1845 distinct: false,
1846 alias: Some("cnt".to_string()),
1847 percentile: None,
1848 }],
1849 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1850 variable: "n".to_string(),
1851 label: None,
1852 input: None,
1853 })),
1854 having: None,
1855 }));
1856
1857 let physical = planner.plan_adaptive(&logical).unwrap();
1858 assert!(physical.adaptive_context.is_some());
1859 }
1860
1861 #[test]
1862 fn test_plan_adaptive_with_distinct() {
1863 let store = create_test_store();
1864 let planner = Planner::new(store);
1865
1866 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1867 items: vec![ReturnItem {
1868 expression: LogicalExpression::Variable("n".to_string()),
1869 alias: None,
1870 }],
1871 distinct: false,
1872 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1873 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1874 variable: "n".to_string(),
1875 label: None,
1876 input: None,
1877 })),
1878 columns: None,
1879 })),
1880 }));
1881
1882 let physical = planner.plan_adaptive(&logical).unwrap();
1883 assert!(physical.adaptive_context.is_some());
1884 }
1885
1886 #[test]
1887 fn test_plan_adaptive_with_limit() {
1888 let store = create_test_store();
1889 let planner = Planner::new(store);
1890
1891 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1892 items: vec![ReturnItem {
1893 expression: LogicalExpression::Variable("n".to_string()),
1894 alias: None,
1895 }],
1896 distinct: false,
1897 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1898 count: 10,
1899 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1900 variable: "n".to_string(),
1901 label: None,
1902 input: None,
1903 })),
1904 })),
1905 }));
1906
1907 let physical = planner.plan_adaptive(&logical).unwrap();
1908 assert!(physical.adaptive_context.is_some());
1909 }
1910
1911 #[test]
1912 fn test_plan_adaptive_with_skip() {
1913 let store = create_test_store();
1914 let planner = Planner::new(store);
1915
1916 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1917 items: vec![ReturnItem {
1918 expression: LogicalExpression::Variable("n".to_string()),
1919 alias: None,
1920 }],
1921 distinct: false,
1922 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1923 count: 5,
1924 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1925 variable: "n".to_string(),
1926 label: None,
1927 input: None,
1928 })),
1929 })),
1930 }));
1931
1932 let physical = planner.plan_adaptive(&logical).unwrap();
1933 assert!(physical.adaptive_context.is_some());
1934 }
1935
1936 #[test]
1937 fn test_plan_adaptive_with_sort() {
1938 let store = create_test_store();
1939 let planner = Planner::new(store);
1940
1941 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1942 items: vec![ReturnItem {
1943 expression: LogicalExpression::Variable("n".to_string()),
1944 alias: None,
1945 }],
1946 distinct: false,
1947 input: Box::new(LogicalOperator::Sort(SortOp {
1948 keys: vec![SortKey {
1949 expression: LogicalExpression::Variable("n".to_string()),
1950 order: SortOrder::Ascending,
1951 nulls: None,
1952 }],
1953 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1954 variable: "n".to_string(),
1955 label: None,
1956 input: None,
1957 })),
1958 })),
1959 }));
1960
1961 let physical = planner.plan_adaptive(&logical).unwrap();
1962 assert!(physical.adaptive_context.is_some());
1963 }
1964
1965 #[test]
1966 fn test_plan_adaptive_with_union() {
1967 let store = create_test_store();
1968 let planner = Planner::new(store);
1969
1970 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1971 items: vec![ReturnItem {
1972 expression: LogicalExpression::Variable("n".to_string()),
1973 alias: None,
1974 }],
1975 distinct: false,
1976 input: Box::new(LogicalOperator::Union(UnionOp {
1977 inputs: vec![
1978 LogicalOperator::NodeScan(NodeScanOp {
1979 variable: "n".to_string(),
1980 label: Some("Person".to_string()),
1981 input: None,
1982 }),
1983 LogicalOperator::NodeScan(NodeScanOp {
1984 variable: "n".to_string(),
1985 label: Some("Company".to_string()),
1986 input: None,
1987 }),
1988 ],
1989 })),
1990 }));
1991
1992 let physical = planner.plan_adaptive(&logical).unwrap();
1993 assert!(physical.adaptive_context.is_some());
1994 }
1995
1996 #[test]
1999 fn test_plan_expand_variable_length() {
2000 let store = create_test_store();
2001 let planner = Planner::new(store);
2002
2003 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2005 items: vec![
2006 ReturnItem {
2007 expression: LogicalExpression::Variable("a".to_string()),
2008 alias: None,
2009 },
2010 ReturnItem {
2011 expression: LogicalExpression::Variable("b".to_string()),
2012 alias: None,
2013 },
2014 ],
2015 distinct: false,
2016 input: Box::new(LogicalOperator::Expand(ExpandOp {
2017 from_variable: "a".to_string(),
2018 to_variable: "b".to_string(),
2019 edge_variable: None,
2020 direction: ExpandDirection::Outgoing,
2021 edge_types: vec!["KNOWS".to_string()],
2022 min_hops: 1,
2023 max_hops: Some(3),
2024 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2025 variable: "a".to_string(),
2026 label: None,
2027 input: None,
2028 })),
2029 path_alias: None,
2030 path_mode: PathMode::Walk,
2031 })),
2032 }));
2033
2034 let physical = planner.plan(&logical).unwrap();
2035 assert!(physical.columns().contains(&"a".to_string()));
2036 assert!(physical.columns().contains(&"b".to_string()));
2037 }
2038
2039 #[test]
2040 fn test_plan_expand_with_path_alias() {
2041 let store = create_test_store();
2042 let planner = Planner::new(store);
2043
2044 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2046 items: vec![
2047 ReturnItem {
2048 expression: LogicalExpression::Variable("a".to_string()),
2049 alias: None,
2050 },
2051 ReturnItem {
2052 expression: LogicalExpression::Variable("b".to_string()),
2053 alias: None,
2054 },
2055 ],
2056 distinct: false,
2057 input: Box::new(LogicalOperator::Expand(ExpandOp {
2058 from_variable: "a".to_string(),
2059 to_variable: "b".to_string(),
2060 edge_variable: None,
2061 direction: ExpandDirection::Outgoing,
2062 edge_types: vec!["KNOWS".to_string()],
2063 min_hops: 1,
2064 max_hops: Some(3),
2065 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2066 variable: "a".to_string(),
2067 label: None,
2068 input: None,
2069 })),
2070 path_alias: Some("p".to_string()),
2071 path_mode: PathMode::Walk,
2072 })),
2073 }));
2074
2075 let physical = planner.plan(&logical).unwrap();
2076 assert!(physical.columns().contains(&"a".to_string()));
2078 assert!(physical.columns().contains(&"b".to_string()));
2079 }
2080
2081 #[test]
2082 fn test_plan_expand_incoming() {
2083 let store = create_test_store();
2084 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2085
2086 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2088 items: vec![
2089 ReturnItem {
2090 expression: LogicalExpression::Variable("a".to_string()),
2091 alias: None,
2092 },
2093 ReturnItem {
2094 expression: LogicalExpression::Variable("b".to_string()),
2095 alias: None,
2096 },
2097 ],
2098 distinct: false,
2099 input: Box::new(LogicalOperator::Expand(ExpandOp {
2100 from_variable: "a".to_string(),
2101 to_variable: "b".to_string(),
2102 edge_variable: None,
2103 direction: ExpandDirection::Incoming,
2104 edge_types: vec!["KNOWS".to_string()],
2105 min_hops: 1,
2106 max_hops: Some(1),
2107 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2108 variable: "a".to_string(),
2109 label: None,
2110 input: None,
2111 })),
2112 path_alias: None,
2113 path_mode: PathMode::Walk,
2114 })),
2115 }));
2116
2117 let physical = planner.plan(&logical).unwrap();
2118 assert!(physical.columns().contains(&"a".to_string()));
2119 assert!(physical.columns().contains(&"b".to_string()));
2120 }
2121
2122 #[test]
2123 fn test_plan_expand_both_directions() {
2124 let store = create_test_store();
2125 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2126
2127 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2129 items: vec![
2130 ReturnItem {
2131 expression: LogicalExpression::Variable("a".to_string()),
2132 alias: None,
2133 },
2134 ReturnItem {
2135 expression: LogicalExpression::Variable("b".to_string()),
2136 alias: None,
2137 },
2138 ],
2139 distinct: false,
2140 input: Box::new(LogicalOperator::Expand(ExpandOp {
2141 from_variable: "a".to_string(),
2142 to_variable: "b".to_string(),
2143 edge_variable: None,
2144 direction: ExpandDirection::Both,
2145 edge_types: vec!["KNOWS".to_string()],
2146 min_hops: 1,
2147 max_hops: Some(1),
2148 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2149 variable: "a".to_string(),
2150 label: None,
2151 input: None,
2152 })),
2153 path_alias: None,
2154 path_mode: PathMode::Walk,
2155 })),
2156 }));
2157
2158 let physical = planner.plan(&logical).unwrap();
2159 assert!(physical.columns().contains(&"a".to_string()));
2160 assert!(physical.columns().contains(&"b".to_string()));
2161 }
2162
2163 #[test]
2166 fn test_planner_with_context() {
2167 use crate::transaction::TransactionManager;
2168
2169 let store = create_test_store();
2170 let tx_manager = Arc::new(TransactionManager::new());
2171 let tx_id = tx_manager.begin();
2172 let epoch = tx_manager.current_epoch();
2173
2174 let planner = Planner::with_context(
2175 Arc::clone(&store),
2176 Arc::clone(&tx_manager),
2177 Some(tx_id),
2178 epoch,
2179 );
2180
2181 assert_eq!(planner.tx_id(), Some(tx_id));
2182 assert!(planner.tx_manager().is_some());
2183 assert_eq!(planner.viewing_epoch(), epoch);
2184 }
2185
2186 #[test]
2187 fn test_planner_with_factorized_execution_disabled() {
2188 let store = create_test_store();
2189 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2190
2191 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2193 items: vec![
2194 ReturnItem {
2195 expression: LogicalExpression::Variable("a".to_string()),
2196 alias: None,
2197 },
2198 ReturnItem {
2199 expression: LogicalExpression::Variable("c".to_string()),
2200 alias: None,
2201 },
2202 ],
2203 distinct: false,
2204 input: Box::new(LogicalOperator::Expand(ExpandOp {
2205 from_variable: "b".to_string(),
2206 to_variable: "c".to_string(),
2207 edge_variable: None,
2208 direction: ExpandDirection::Outgoing,
2209 edge_types: vec![],
2210 min_hops: 1,
2211 max_hops: Some(1),
2212 input: Box::new(LogicalOperator::Expand(ExpandOp {
2213 from_variable: "a".to_string(),
2214 to_variable: "b".to_string(),
2215 edge_variable: None,
2216 direction: ExpandDirection::Outgoing,
2217 edge_types: vec![],
2218 min_hops: 1,
2219 max_hops: Some(1),
2220 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2221 variable: "a".to_string(),
2222 label: None,
2223 input: None,
2224 })),
2225 path_alias: None,
2226 path_mode: PathMode::Walk,
2227 })),
2228 path_alias: None,
2229 path_mode: PathMode::Walk,
2230 })),
2231 }));
2232
2233 let physical = planner.plan(&logical).unwrap();
2234 assert!(physical.columns().contains(&"a".to_string()));
2235 assert!(physical.columns().contains(&"c".to_string()));
2236 }
2237
2238 #[test]
2241 fn test_plan_sort_by_property() {
2242 let store = create_test_store();
2243 let planner = Planner::new(store);
2244
2245 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2247 items: vec![ReturnItem {
2248 expression: LogicalExpression::Variable("n".to_string()),
2249 alias: None,
2250 }],
2251 distinct: false,
2252 input: Box::new(LogicalOperator::Sort(SortOp {
2253 keys: vec![SortKey {
2254 expression: LogicalExpression::Property {
2255 variable: "n".to_string(),
2256 property: "name".to_string(),
2257 },
2258 order: SortOrder::Ascending,
2259 nulls: None,
2260 }],
2261 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2262 variable: "n".to_string(),
2263 label: None,
2264 input: None,
2265 })),
2266 })),
2267 }));
2268
2269 let physical = planner.plan(&logical).unwrap();
2270 assert!(physical.columns().contains(&"n".to_string()));
2272 }
2273
2274 #[test]
2277 fn test_plan_scan_with_input() {
2278 let store = create_test_store();
2279 let planner = Planner::new(store);
2280
2281 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2283 items: vec![
2284 ReturnItem {
2285 expression: LogicalExpression::Variable("a".to_string()),
2286 alias: None,
2287 },
2288 ReturnItem {
2289 expression: LogicalExpression::Variable("b".to_string()),
2290 alias: None,
2291 },
2292 ],
2293 distinct: false,
2294 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2295 variable: "b".to_string(),
2296 label: Some("Company".to_string()),
2297 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2298 variable: "a".to_string(),
2299 label: Some("Person".to_string()),
2300 input: None,
2301 }))),
2302 })),
2303 }));
2304
2305 let physical = planner.plan(&logical).unwrap();
2306 assert!(physical.columns().contains(&"a".to_string()));
2307 assert!(physical.columns().contains(&"b".to_string()));
2308 }
2309}