1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33 EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34 FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35 HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42 VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52 convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
56struct RangeBounds<'a> {
58 min: Option<&'a Value>,
59 max: Option<&'a Value>,
60 min_inclusive: bool,
61 max_inclusive: bool,
62}
63
64pub struct Planner {
66 pub(super) store: Arc<dyn GraphStoreMut>,
68 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
70 pub(super) transaction_id: Option<TransactionId>,
72 pub(super) viewing_epoch: EpochId,
74 pub(super) anon_edge_counter: std::cell::Cell<u32>,
76 pub(super) factorized_execution: bool,
78 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
81 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
86 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
88 pub(super) correlated_param_state:
92 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
93 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
96 profiling: std::cell::Cell<bool>,
98 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
100 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
102 pub(super) session_context: grafeo_core::execution::operators::SessionContext,
104}
105
106impl Planner {
107 #[must_use]
112 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
113 let epoch = store.current_epoch();
114 Self {
115 store,
116 transaction_manager: None,
117 transaction_id: None,
118 viewing_epoch: epoch,
119 anon_edge_counter: std::cell::Cell::new(0),
120 factorized_execution: true,
121 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
122 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
123 validator: None,
124 catalog: None,
125 correlated_param_state: std::cell::RefCell::new(None),
126 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
127 profiling: std::cell::Cell::new(false),
128 profile_entries: std::cell::RefCell::new(Vec::new()),
129 write_tracker: None,
130 session_context: grafeo_core::execution::operators::SessionContext::default(),
131 }
132 }
133
134 #[must_use]
136 pub fn with_context(
137 store: Arc<dyn GraphStoreMut>,
138 transaction_manager: Arc<TransactionManager>,
139 transaction_id: Option<TransactionId>,
140 viewing_epoch: EpochId,
141 ) -> Self {
142 use crate::transaction::TransactionWriteTracker;
143
144 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
146 if transaction_id.is_some() {
147 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
148 &transaction_manager,
149 ))))
150 } else {
151 None
152 };
153
154 Self {
155 store,
156 transaction_manager: Some(transaction_manager),
157 transaction_id,
158 viewing_epoch,
159 anon_edge_counter: std::cell::Cell::new(0),
160 factorized_execution: true,
161 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
162 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
163 validator: None,
164 catalog: None,
165 correlated_param_state: std::cell::RefCell::new(None),
166 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
167 profiling: std::cell::Cell::new(false),
168 profile_entries: std::cell::RefCell::new(Vec::new()),
169 write_tracker,
170 session_context: grafeo_core::execution::operators::SessionContext::default(),
171 }
172 }
173
174 #[must_use]
176 pub fn viewing_epoch(&self) -> EpochId {
177 self.viewing_epoch
178 }
179
180 #[must_use]
182 pub fn transaction_id(&self) -> Option<TransactionId> {
183 self.transaction_id
184 }
185
186 #[must_use]
188 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
189 self.transaction_manager.as_ref()
190 }
191
192 #[must_use]
194 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
195 self.factorized_execution = enabled;
196 self
197 }
198
199 #[must_use]
201 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
202 self.validator = Some(validator);
203 self
204 }
205
206 #[must_use]
208 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
209 self.catalog = Some(catalog);
210 self
211 }
212
213 #[must_use]
215 pub fn with_session_context(
216 mut self,
217 context: grafeo_core::execution::operators::SessionContext,
218 ) -> Self {
219 self.session_context = context;
220 self
221 }
222
223 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
227 match op {
228 LogicalOperator::Expand(expand) => {
229 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
230
231 if is_single_hop {
232 let (inner_count, base) = Self::count_expand_chain(&expand.input);
233 (inner_count + 1, base)
234 } else {
235 (0, op)
236 }
237 }
238 _ => (0, op),
239 }
240 }
241
242 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
246 let mut chain = Vec::new();
247 let mut current = op;
248
249 while let LogicalOperator::Expand(expand) = current {
250 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
251 if !is_single_hop {
252 break;
253 }
254 chain.push(expand);
255 current = &expand.input;
256 }
257
258 chain.reverse();
259 chain
260 }
261
262 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
264 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
265 Ok(PhysicalPlan {
266 operator,
267 columns,
268 adaptive_context: None,
269 })
270 }
271
272 pub fn plan_profiled(
278 &self,
279 logical_plan: &LogicalPlan,
280 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
281 self.profiling.set(true);
282 self.profile_entries.borrow_mut().clear();
283
284 let result = self.plan_operator(&logical_plan.root);
285
286 self.profiling.set(false);
287 let (operator, columns) = result?;
288 let entries = self.profile_entries.borrow_mut().drain(..).collect();
289
290 Ok((
291 PhysicalPlan {
292 operator,
293 columns,
294 adaptive_context: None,
295 },
296 entries,
297 ))
298 }
299
300 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
302 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
303
304 let mut adaptive_context = AdaptiveContext::new();
305 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
306
307 Ok(PhysicalPlan {
308 operator,
309 columns,
310 adaptive_context: Some(adaptive_context),
311 })
312 }
313
314 fn collect_cardinality_estimates(
316 &self,
317 op: &LogicalOperator,
318 ctx: &mut AdaptiveContext,
319 depth: usize,
320 ) {
321 match op {
322 LogicalOperator::NodeScan(scan) => {
323 let estimate = if let Some(label) = &scan.label {
324 self.store.nodes_by_label(label).len() as f64
325 } else {
326 self.store.node_count() as f64
327 };
328 let id = format!("scan_{}", scan.variable);
329 ctx.set_estimate(&id, estimate);
330
331 if let Some(input) = &scan.input {
332 self.collect_cardinality_estimates(input, ctx, depth + 1);
333 }
334 }
335 LogicalOperator::Filter(filter) => {
336 let input_estimate = self.estimate_cardinality(&filter.input);
337 let estimate = input_estimate * 0.3;
338 let id = format!("filter_{depth}");
339 ctx.set_estimate(&id, estimate);
340
341 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
342 }
343 LogicalOperator::Expand(expand) => {
344 let input_estimate = self.estimate_cardinality(&expand.input);
345 let stats = self.store.statistics();
346 let avg_degree = self.estimate_expand_degree(&stats, expand);
347 let estimate = input_estimate * avg_degree;
348 let id = format!("expand_{}", expand.to_variable);
349 ctx.set_estimate(&id, estimate);
350
351 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
352 }
353 LogicalOperator::Join(join) => {
354 let left_est = self.estimate_cardinality(&join.left);
355 let right_est = self.estimate_cardinality(&join.right);
356 let estimate = (left_est * right_est).sqrt();
357 let id = format!("join_{depth}");
358 ctx.set_estimate(&id, estimate);
359
360 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
361 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
362 }
363 LogicalOperator::Aggregate(agg) => {
364 let input_estimate = self.estimate_cardinality(&agg.input);
365 let estimate = if agg.group_by.is_empty() {
366 1.0
367 } else {
368 (input_estimate * 0.1).max(1.0)
369 };
370 let id = format!("aggregate_{depth}");
371 ctx.set_estimate(&id, estimate);
372
373 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
374 }
375 LogicalOperator::Distinct(distinct) => {
376 let input_estimate = self.estimate_cardinality(&distinct.input);
377 let estimate = (input_estimate * 0.5).max(1.0);
378 let id = format!("distinct_{depth}");
379 ctx.set_estimate(&id, estimate);
380
381 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
382 }
383 LogicalOperator::Return(ret) => {
384 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
385 }
386 LogicalOperator::Limit(limit) => {
387 let input_estimate = self.estimate_cardinality(&limit.input);
388 let estimate = (input_estimate).min(limit.count.estimate());
389 let id = format!("limit_{depth}");
390 ctx.set_estimate(&id, estimate);
391
392 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
393 }
394 LogicalOperator::Skip(skip) => {
395 let input_estimate = self.estimate_cardinality(&skip.input);
396 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
397 let id = format!("skip_{depth}");
398 ctx.set_estimate(&id, estimate);
399
400 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
401 }
402 LogicalOperator::Sort(sort) => {
403 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
404 }
405 LogicalOperator::Union(union) => {
406 let estimate: f64 = union
407 .inputs
408 .iter()
409 .map(|input| self.estimate_cardinality(input))
410 .sum();
411 let id = format!("union_{depth}");
412 ctx.set_estimate(&id, estimate);
413
414 for input in &union.inputs {
415 self.collect_cardinality_estimates(input, ctx, depth + 1);
416 }
417 }
418 _ => {
419 }
421 }
422 }
423
424 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
426 match op {
427 LogicalOperator::NodeScan(scan) => {
428 if let Some(label) = &scan.label {
429 self.store.nodes_by_label(label).len() as f64
430 } else {
431 self.store.node_count() as f64
432 }
433 }
434 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
435 LogicalOperator::Expand(expand) => {
436 let stats = self.store.statistics();
437 let avg_degree = self.estimate_expand_degree(&stats, expand);
438 self.estimate_cardinality(&expand.input) * avg_degree
439 }
440 LogicalOperator::Join(join) => {
441 let left = self.estimate_cardinality(&join.left);
442 let right = self.estimate_cardinality(&join.right);
443 (left * right).sqrt()
444 }
445 LogicalOperator::Aggregate(agg) => {
446 if agg.group_by.is_empty() {
447 1.0
448 } else {
449 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
450 }
451 }
452 LogicalOperator::Distinct(distinct) => {
453 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
454 }
455 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
456 LogicalOperator::Limit(limit) => self
457 .estimate_cardinality(&limit.input)
458 .min(limit.count.estimate()),
459 LogicalOperator::Skip(skip) => {
460 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
461 }
462 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
463 LogicalOperator::Union(union) => union
464 .inputs
465 .iter()
466 .map(|input| self.estimate_cardinality(input))
467 .sum(),
468 LogicalOperator::Except(except) => {
469 let left = self.estimate_cardinality(&except.left);
470 let right = self.estimate_cardinality(&except.right);
471 (left - right).max(0.0)
472 }
473 LogicalOperator::Intersect(intersect) => {
474 let left = self.estimate_cardinality(&intersect.left);
475 let right = self.estimate_cardinality(&intersect.right);
476 left.min(right)
477 }
478 LogicalOperator::Otherwise(otherwise) => self
479 .estimate_cardinality(&otherwise.left)
480 .max(self.estimate_cardinality(&otherwise.right)),
481 _ => 1000.0,
482 }
483 }
484
485 fn estimate_expand_degree(
487 &self,
488 stats: &grafeo_core::statistics::Statistics,
489 expand: &ExpandOp,
490 ) -> f64 {
491 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
492 if expand.edge_types.len() == 1 {
493 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
494 } else if stats.total_nodes > 0 {
495 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
496 } else {
497 10.0
498 }
499 }
500
501 fn maybe_profile(
504 &self,
505 result: Result<(Box<dyn Operator>, Vec<String>)>,
506 op: &LogicalOperator,
507 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
508 if self.profiling.get() {
509 let (physical, columns) = result?;
510 let (entry, stats) =
511 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
512 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
513 self.profile_entries.borrow_mut().push(entry);
514 Ok((Box::new(profiled), columns))
515 } else {
516 result
517 }
518 }
519
520 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
522 let result = match op {
523 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
524 LogicalOperator::Expand(expand) => {
525 if self.factorized_execution {
526 let (chain_len, _base) = Self::count_expand_chain(op);
527 if chain_len >= 2 {
528 return self.maybe_profile(self.plan_expand_chain(op), op);
529 }
530 }
531 self.plan_expand(expand)
532 }
533 LogicalOperator::Return(ret) => self.plan_return(ret),
534 LogicalOperator::Filter(filter) => self.plan_filter(filter),
535 LogicalOperator::Project(project) => self.plan_project(project),
536 LogicalOperator::Limit(limit) => self.plan_limit(limit),
537 LogicalOperator::Skip(skip) => self.plan_skip(skip),
538 LogicalOperator::Sort(sort) => self.plan_sort(sort),
539 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
540 LogicalOperator::Join(join) => self.plan_join(join),
541 LogicalOperator::Union(union) => self.plan_union(union),
542 LogicalOperator::Except(except) => self.plan_except(except),
543 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
544 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
545 LogicalOperator::Apply(apply) => self.plan_apply(apply),
546 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
547 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
548 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
549 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
550 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
551 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
552 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
553 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
554 LogicalOperator::Merge(merge) => self.plan_merge(merge),
555 LogicalOperator::MergeRelationship(merge_rel) => {
556 self.plan_merge_relationship(merge_rel)
557 }
558 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
559 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
560 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
561 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
562 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
563 #[cfg(feature = "algos")]
564 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
565 #[cfg(not(feature = "algos"))]
566 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
567 "CALL procedures require the 'algos' feature".to_string(),
568 )),
569 LogicalOperator::ParameterScan(_param_scan) => {
570 let state = self
571 .correlated_param_state
572 .borrow()
573 .clone()
574 .ok_or_else(|| {
575 Error::Internal(
576 "ParameterScan without correlated Apply context".to_string(),
577 )
578 })?;
579 let columns = state.columns.clone();
582 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
583 Ok((operator, columns))
584 }
585 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
586 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
587 LogicalOperator::LoadData(load) => {
588 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
589 load.path.clone(),
590 load.format,
591 load.with_headers,
592 load.field_terminator,
593 load.variable.clone(),
594 ));
595 Ok((operator, vec![load.variable.clone()]))
596 }
597 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
598 LogicalOperator::VectorScan(_) => Err(Error::Internal(
599 "VectorScan requires vector-index feature".to_string(),
600 )),
601 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
602 "VectorJoin requires vector-index feature".to_string(),
603 )),
604 _ => Err(Error::Internal(format!(
605 "Unsupported operator: {:?}",
606 std::mem::discriminant(op)
607 ))),
608 };
609 self.maybe_profile(result, op)
610 }
611
612 fn plan_horizontal_aggregate(
614 &self,
615 ha: &HorizontalAggregateOp,
616 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
617 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
618
619 let list_col_idx = child_columns
620 .iter()
621 .position(|c| c == &ha.list_column)
622 .ok_or_else(|| {
623 Error::Internal(format!(
624 "HorizontalAggregate list column '{}' not found in {:?}",
625 ha.list_column, child_columns
626 ))
627 })?;
628
629 let entity_kind = match ha.entity_kind {
630 LogicalEntityKind::Edge => EntityKind::Edge,
631 LogicalEntityKind::Node => EntityKind::Node,
632 };
633
634 let function = convert_aggregate_function(ha.function);
635 let input_column_count = child_columns.len();
636
637 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
638 child_op,
639 list_col_idx,
640 entity_kind,
641 function,
642 ha.property.clone(),
643 Arc::clone(&self.store) as Arc<dyn GraphStore>,
644 input_column_count,
645 ));
646
647 let mut columns = child_columns;
648 columns.push(ha.alias.clone());
649 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
651
652 Ok((operator, columns))
653 }
654
655 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
657 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
658 let key_idx = child_columns
659 .iter()
660 .position(|c| c == &mc.key_var)
661 .ok_or_else(|| {
662 Error::Internal(format!(
663 "MapCollect key '{}' not in columns {:?}",
664 mc.key_var, child_columns
665 ))
666 })?;
667 let value_idx = child_columns
668 .iter()
669 .position(|c| c == &mc.value_var)
670 .ok_or_else(|| {
671 Error::Internal(format!(
672 "MapCollect value '{}' not in columns {:?}",
673 mc.value_var, child_columns
674 ))
675 })?;
676 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
677 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
678 Ok((operator, vec![mc.alias.clone()]))
679 }
680}
681
682#[cfg(feature = "algos")]
684struct StaticResultOperator {
685 rows: Vec<Vec<Value>>,
686 column_indices: Vec<usize>,
687 row_index: usize,
688}
689
690#[cfg(feature = "algos")]
691impl Operator for StaticResultOperator {
692 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
693 use grafeo_core::execution::DataChunk;
694
695 if self.row_index >= self.rows.len() {
696 return Ok(None);
697 }
698
699 let remaining = self.rows.len() - self.row_index;
700 let chunk_rows = remaining.min(1024);
701 let col_count = self.column_indices.len();
702
703 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
704 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
705
706 for row_offset in 0..chunk_rows {
707 let row = &self.rows[self.row_index + row_offset];
708 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
709 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
710 if let Some(col) = chunk.column_mut(col_idx) {
711 col.push_value(value);
712 }
713 }
714 }
715 chunk.set_count(chunk_rows);
716
717 self.row_index += chunk_rows;
718 Ok(Some(chunk))
719 }
720
721 fn reset(&mut self) {
722 self.row_index = 0;
723 }
724
725 fn name(&self) -> &'static str {
726 "StaticResult"
727 }
728}
729
730#[cfg(test)]
731mod tests {
732 use super::*;
733 use crate::query::plan::{
734 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
735 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
736 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
737 SkipOp as LogicalSkipOp, SortKey, SortOp,
738 };
739 use grafeo_common::types::Value;
740 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
741 use grafeo_core::graph::GraphStoreMut;
742 use grafeo_core::graph::lpg::LpgStore;
743
744 fn create_test_store() -> Arc<dyn GraphStoreMut> {
745 let store = Arc::new(LpgStore::new().unwrap());
746 store.create_node(&["Person"]);
747 store.create_node(&["Person"]);
748 store.create_node(&["Company"]);
749 store
750 }
751
752 #[test]
755 fn test_plan_simple_scan() {
756 let store = create_test_store();
757 let planner = Planner::new(store);
758
759 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
761 items: vec![ReturnItem {
762 expression: LogicalExpression::Variable("n".to_string()),
763 alias: None,
764 }],
765 distinct: false,
766 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
767 variable: "n".to_string(),
768 label: Some("Person".to_string()),
769 input: None,
770 })),
771 }));
772
773 let physical = planner.plan(&logical).unwrap();
774 assert_eq!(physical.columns(), &["n"]);
775 }
776
777 #[test]
778 fn test_plan_scan_without_label() {
779 let store = create_test_store();
780 let planner = Planner::new(store);
781
782 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
784 items: vec![ReturnItem {
785 expression: LogicalExpression::Variable("n".to_string()),
786 alias: None,
787 }],
788 distinct: false,
789 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
790 variable: "n".to_string(),
791 label: None,
792 input: None,
793 })),
794 }));
795
796 let physical = planner.plan(&logical).unwrap();
797 assert_eq!(physical.columns(), &["n"]);
798 }
799
800 #[test]
801 fn test_plan_return_with_alias() {
802 let store = create_test_store();
803 let planner = Planner::new(store);
804
805 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
807 items: vec![ReturnItem {
808 expression: LogicalExpression::Variable("n".to_string()),
809 alias: Some("person".to_string()),
810 }],
811 distinct: false,
812 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
813 variable: "n".to_string(),
814 label: Some("Person".to_string()),
815 input: None,
816 })),
817 }));
818
819 let physical = planner.plan(&logical).unwrap();
820 assert_eq!(physical.columns(), &["person"]);
821 }
822
823 #[test]
824 fn test_plan_return_property() {
825 let store = create_test_store();
826 let planner = Planner::new(store);
827
828 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
830 items: vec![ReturnItem {
831 expression: LogicalExpression::Property {
832 variable: "n".to_string(),
833 property: "name".to_string(),
834 },
835 alias: None,
836 }],
837 distinct: false,
838 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
839 variable: "n".to_string(),
840 label: Some("Person".to_string()),
841 input: None,
842 })),
843 }));
844
845 let physical = planner.plan(&logical).unwrap();
846 assert_eq!(physical.columns(), &["n.name"]);
847 }
848
849 #[test]
850 fn test_plan_return_literal() {
851 let store = create_test_store();
852 let planner = Planner::new(store);
853
854 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
856 items: vec![ReturnItem {
857 expression: LogicalExpression::Literal(Value::Int64(42)),
858 alias: Some("answer".to_string()),
859 }],
860 distinct: false,
861 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
862 variable: "n".to_string(),
863 label: None,
864 input: None,
865 })),
866 }));
867
868 let physical = planner.plan(&logical).unwrap();
869 assert_eq!(physical.columns(), &["answer"]);
870 }
871
872 #[test]
875 fn test_plan_filter_equality() {
876 let store = create_test_store();
877 let planner = Planner::new(store);
878
879 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
881 items: vec![ReturnItem {
882 expression: LogicalExpression::Variable("n".to_string()),
883 alias: None,
884 }],
885 distinct: false,
886 input: Box::new(LogicalOperator::Filter(FilterOp {
887 predicate: LogicalExpression::Binary {
888 left: Box::new(LogicalExpression::Property {
889 variable: "n".to_string(),
890 property: "age".to_string(),
891 }),
892 op: BinaryOp::Eq,
893 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
894 },
895 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
896 variable: "n".to_string(),
897 label: Some("Person".to_string()),
898 input: None,
899 })),
900 pushdown_hint: None,
901 })),
902 }));
903
904 let physical = planner.plan(&logical).unwrap();
905 assert_eq!(physical.columns(), &["n"]);
906 }
907
908 #[test]
909 fn test_plan_filter_compound_and() {
910 let store = create_test_store();
911 let planner = Planner::new(store);
912
913 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
915 items: vec![ReturnItem {
916 expression: LogicalExpression::Variable("n".to_string()),
917 alias: None,
918 }],
919 distinct: false,
920 input: Box::new(LogicalOperator::Filter(FilterOp {
921 predicate: LogicalExpression::Binary {
922 left: Box::new(LogicalExpression::Binary {
923 left: Box::new(LogicalExpression::Property {
924 variable: "n".to_string(),
925 property: "age".to_string(),
926 }),
927 op: BinaryOp::Gt,
928 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
929 }),
930 op: BinaryOp::And,
931 right: Box::new(LogicalExpression::Binary {
932 left: Box::new(LogicalExpression::Property {
933 variable: "n".to_string(),
934 property: "age".to_string(),
935 }),
936 op: BinaryOp::Lt,
937 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
938 }),
939 },
940 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
941 variable: "n".to_string(),
942 label: None,
943 input: None,
944 })),
945 pushdown_hint: None,
946 })),
947 }));
948
949 let physical = planner.plan(&logical).unwrap();
950 assert_eq!(physical.columns(), &["n"]);
951 }
952
953 #[test]
954 fn test_plan_filter_unary_not() {
955 let store = create_test_store();
956 let planner = Planner::new(store);
957
958 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
960 items: vec![ReturnItem {
961 expression: LogicalExpression::Variable("n".to_string()),
962 alias: None,
963 }],
964 distinct: false,
965 input: Box::new(LogicalOperator::Filter(FilterOp {
966 predicate: LogicalExpression::Unary {
967 op: UnaryOp::Not,
968 operand: Box::new(LogicalExpression::Property {
969 variable: "n".to_string(),
970 property: "active".to_string(),
971 }),
972 },
973 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
974 variable: "n".to_string(),
975 label: None,
976 input: None,
977 })),
978 pushdown_hint: None,
979 })),
980 }));
981
982 let physical = planner.plan(&logical).unwrap();
983 assert_eq!(physical.columns(), &["n"]);
984 }
985
986 #[test]
987 fn test_plan_filter_is_null() {
988 let store = create_test_store();
989 let planner = Planner::new(store);
990
991 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
993 items: vec![ReturnItem {
994 expression: LogicalExpression::Variable("n".to_string()),
995 alias: None,
996 }],
997 distinct: false,
998 input: Box::new(LogicalOperator::Filter(FilterOp {
999 predicate: LogicalExpression::Unary {
1000 op: UnaryOp::IsNull,
1001 operand: Box::new(LogicalExpression::Property {
1002 variable: "n".to_string(),
1003 property: "email".to_string(),
1004 }),
1005 },
1006 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1007 variable: "n".to_string(),
1008 label: None,
1009 input: None,
1010 })),
1011 pushdown_hint: None,
1012 })),
1013 }));
1014
1015 let physical = planner.plan(&logical).unwrap();
1016 assert_eq!(physical.columns(), &["n"]);
1017 }
1018
1019 #[test]
1020 fn test_plan_filter_function_call() {
1021 let store = create_test_store();
1022 let planner = Planner::new(store);
1023
1024 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1026 items: vec![ReturnItem {
1027 expression: LogicalExpression::Variable("n".to_string()),
1028 alias: None,
1029 }],
1030 distinct: false,
1031 input: Box::new(LogicalOperator::Filter(FilterOp {
1032 predicate: LogicalExpression::Binary {
1033 left: Box::new(LogicalExpression::FunctionCall {
1034 name: "size".to_string(),
1035 args: vec![LogicalExpression::Property {
1036 variable: "n".to_string(),
1037 property: "friends".to_string(),
1038 }],
1039 distinct: false,
1040 }),
1041 op: BinaryOp::Gt,
1042 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1043 },
1044 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1045 variable: "n".to_string(),
1046 label: None,
1047 input: None,
1048 })),
1049 pushdown_hint: None,
1050 })),
1051 }));
1052
1053 let physical = planner.plan(&logical).unwrap();
1054 assert_eq!(physical.columns(), &["n"]);
1055 }
1056
1057 #[test]
1060 fn test_plan_expand_outgoing() {
1061 let store = create_test_store();
1062 let planner = Planner::new(store);
1063
1064 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1066 items: vec![
1067 ReturnItem {
1068 expression: LogicalExpression::Variable("a".to_string()),
1069 alias: None,
1070 },
1071 ReturnItem {
1072 expression: LogicalExpression::Variable("b".to_string()),
1073 alias: None,
1074 },
1075 ],
1076 distinct: false,
1077 input: Box::new(LogicalOperator::Expand(ExpandOp {
1078 from_variable: "a".to_string(),
1079 to_variable: "b".to_string(),
1080 edge_variable: None,
1081 direction: ExpandDirection::Outgoing,
1082 edge_types: vec!["KNOWS".to_string()],
1083 min_hops: 1,
1084 max_hops: Some(1),
1085 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1086 variable: "a".to_string(),
1087 label: Some("Person".to_string()),
1088 input: None,
1089 })),
1090 path_alias: None,
1091 path_mode: PathMode::Walk,
1092 })),
1093 }));
1094
1095 let physical = planner.plan(&logical).unwrap();
1096 assert!(physical.columns().contains(&"a".to_string()));
1098 assert!(physical.columns().contains(&"b".to_string()));
1099 }
1100
1101 #[test]
1102 fn test_plan_expand_with_edge_variable() {
1103 let store = create_test_store();
1104 let planner = Planner::new(store);
1105
1106 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1108 items: vec![
1109 ReturnItem {
1110 expression: LogicalExpression::Variable("a".to_string()),
1111 alias: None,
1112 },
1113 ReturnItem {
1114 expression: LogicalExpression::Variable("r".to_string()),
1115 alias: None,
1116 },
1117 ReturnItem {
1118 expression: LogicalExpression::Variable("b".to_string()),
1119 alias: None,
1120 },
1121 ],
1122 distinct: false,
1123 input: Box::new(LogicalOperator::Expand(ExpandOp {
1124 from_variable: "a".to_string(),
1125 to_variable: "b".to_string(),
1126 edge_variable: Some("r".to_string()),
1127 direction: ExpandDirection::Outgoing,
1128 edge_types: vec!["KNOWS".to_string()],
1129 min_hops: 1,
1130 max_hops: Some(1),
1131 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1132 variable: "a".to_string(),
1133 label: None,
1134 input: None,
1135 })),
1136 path_alias: None,
1137 path_mode: PathMode::Walk,
1138 })),
1139 }));
1140
1141 let physical = planner.plan(&logical).unwrap();
1142 assert!(physical.columns().contains(&"a".to_string()));
1143 assert!(physical.columns().contains(&"r".to_string()));
1144 assert!(physical.columns().contains(&"b".to_string()));
1145 }
1146
1147 #[test]
1150 fn test_plan_limit() {
1151 let store = create_test_store();
1152 let planner = Planner::new(store);
1153
1154 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1156 items: vec![ReturnItem {
1157 expression: LogicalExpression::Variable("n".to_string()),
1158 alias: None,
1159 }],
1160 distinct: false,
1161 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1162 count: 10.into(),
1163 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1164 variable: "n".to_string(),
1165 label: None,
1166 input: None,
1167 })),
1168 })),
1169 }));
1170
1171 let physical = planner.plan(&logical).unwrap();
1172 assert_eq!(physical.columns(), &["n"]);
1173 }
1174
1175 #[test]
1176 fn test_plan_skip() {
1177 let store = create_test_store();
1178 let planner = Planner::new(store);
1179
1180 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1182 items: vec![ReturnItem {
1183 expression: LogicalExpression::Variable("n".to_string()),
1184 alias: None,
1185 }],
1186 distinct: false,
1187 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1188 count: 5.into(),
1189 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1190 variable: "n".to_string(),
1191 label: None,
1192 input: None,
1193 })),
1194 })),
1195 }));
1196
1197 let physical = planner.plan(&logical).unwrap();
1198 assert_eq!(physical.columns(), &["n"]);
1199 }
1200
1201 #[test]
1202 fn test_plan_sort() {
1203 let store = create_test_store();
1204 let planner = Planner::new(store);
1205
1206 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1208 items: vec![ReturnItem {
1209 expression: LogicalExpression::Variable("n".to_string()),
1210 alias: None,
1211 }],
1212 distinct: false,
1213 input: Box::new(LogicalOperator::Sort(SortOp {
1214 keys: vec![SortKey {
1215 expression: LogicalExpression::Variable("n".to_string()),
1216 order: SortOrder::Ascending,
1217 nulls: None,
1218 }],
1219 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1220 variable: "n".to_string(),
1221 label: None,
1222 input: None,
1223 })),
1224 })),
1225 }));
1226
1227 let physical = planner.plan(&logical).unwrap();
1228 assert_eq!(physical.columns(), &["n"]);
1229 }
1230
1231 #[test]
1232 fn test_plan_sort_descending() {
1233 let store = create_test_store();
1234 let planner = Planner::new(store);
1235
1236 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1238 items: vec![ReturnItem {
1239 expression: LogicalExpression::Variable("n".to_string()),
1240 alias: None,
1241 }],
1242 distinct: false,
1243 input: Box::new(LogicalOperator::Sort(SortOp {
1244 keys: vec![SortKey {
1245 expression: LogicalExpression::Variable("n".to_string()),
1246 order: SortOrder::Descending,
1247 nulls: None,
1248 }],
1249 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1250 variable: "n".to_string(),
1251 label: None,
1252 input: None,
1253 })),
1254 })),
1255 }));
1256
1257 let physical = planner.plan(&logical).unwrap();
1258 assert_eq!(physical.columns(), &["n"]);
1259 }
1260
1261 #[test]
1262 fn test_plan_distinct() {
1263 let store = create_test_store();
1264 let planner = Planner::new(store);
1265
1266 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1268 items: vec![ReturnItem {
1269 expression: LogicalExpression::Variable("n".to_string()),
1270 alias: None,
1271 }],
1272 distinct: false,
1273 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1274 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1275 variable: "n".to_string(),
1276 label: None,
1277 input: None,
1278 })),
1279 columns: None,
1280 })),
1281 }));
1282
1283 let physical = planner.plan(&logical).unwrap();
1284 assert_eq!(physical.columns(), &["n"]);
1285 }
1286
1287 #[test]
1288 fn test_plan_distinct_with_columns() {
1289 let store = create_test_store();
1290 let planner = Planner::new(store);
1291
1292 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1294 items: vec![ReturnItem {
1295 expression: LogicalExpression::Variable("n".to_string()),
1296 alias: None,
1297 }],
1298 distinct: false,
1299 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1300 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1301 variable: "n".to_string(),
1302 label: None,
1303 input: None,
1304 })),
1305 columns: Some(vec!["n".to_string()]),
1306 })),
1307 }));
1308
1309 let physical = planner.plan(&logical).unwrap();
1310 assert_eq!(physical.columns(), &["n"]);
1311 }
1312
1313 #[test]
1314 fn test_plan_distinct_with_nonexistent_columns() {
1315 let store = create_test_store();
1316 let planner = Planner::new(store);
1317
1318 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1321 items: vec![ReturnItem {
1322 expression: LogicalExpression::Variable("n".to_string()),
1323 alias: None,
1324 }],
1325 distinct: false,
1326 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1327 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1328 variable: "n".to_string(),
1329 label: None,
1330 input: None,
1331 })),
1332 columns: Some(vec!["nonexistent".to_string()]),
1333 })),
1334 }));
1335
1336 let physical = planner.plan(&logical).unwrap();
1337 assert_eq!(physical.columns(), &["n"]);
1338 }
1339
1340 #[test]
1343 fn test_plan_aggregate_count() {
1344 let store = create_test_store();
1345 let planner = Planner::new(store);
1346
1347 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1349 items: vec![ReturnItem {
1350 expression: LogicalExpression::Variable("cnt".to_string()),
1351 alias: None,
1352 }],
1353 distinct: false,
1354 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1355 group_by: vec![],
1356 aggregates: vec![LogicalAggregateExpr {
1357 function: LogicalAggregateFunction::Count,
1358 expression: Some(LogicalExpression::Variable("n".to_string())),
1359 expression2: None,
1360 distinct: false,
1361 alias: Some("cnt".to_string()),
1362 percentile: None,
1363 separator: None,
1364 }],
1365 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1366 variable: "n".to_string(),
1367 label: None,
1368 input: None,
1369 })),
1370 having: None,
1371 })),
1372 }));
1373
1374 let physical = planner.plan(&logical).unwrap();
1375 assert!(physical.columns().contains(&"cnt".to_string()));
1376 }
1377
1378 #[test]
1379 fn test_plan_aggregate_with_group_by() {
1380 let store = create_test_store();
1381 let planner = Planner::new(store);
1382
1383 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1385 group_by: vec![LogicalExpression::Property {
1386 variable: "n".to_string(),
1387 property: "city".to_string(),
1388 }],
1389 aggregates: vec![LogicalAggregateExpr {
1390 function: LogicalAggregateFunction::Count,
1391 expression: Some(LogicalExpression::Variable("n".to_string())),
1392 expression2: None,
1393 distinct: false,
1394 alias: Some("cnt".to_string()),
1395 percentile: None,
1396 separator: None,
1397 }],
1398 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1399 variable: "n".to_string(),
1400 label: Some("Person".to_string()),
1401 input: None,
1402 })),
1403 having: None,
1404 }));
1405
1406 let physical = planner.plan(&logical).unwrap();
1407 assert_eq!(physical.columns().len(), 2);
1408 }
1409
1410 #[test]
1411 fn test_plan_aggregate_sum() {
1412 let store = create_test_store();
1413 let planner = Planner::new(store);
1414
1415 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1417 group_by: vec![],
1418 aggregates: vec![LogicalAggregateExpr {
1419 function: LogicalAggregateFunction::Sum,
1420 expression: Some(LogicalExpression::Property {
1421 variable: "n".to_string(),
1422 property: "value".to_string(),
1423 }),
1424 expression2: None,
1425 distinct: false,
1426 alias: Some("total".to_string()),
1427 percentile: None,
1428 separator: None,
1429 }],
1430 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1431 variable: "n".to_string(),
1432 label: None,
1433 input: None,
1434 })),
1435 having: None,
1436 }));
1437
1438 let physical = planner.plan(&logical).unwrap();
1439 assert!(physical.columns().contains(&"total".to_string()));
1440 }
1441
1442 #[test]
1443 fn test_plan_aggregate_avg() {
1444 let store = create_test_store();
1445 let planner = Planner::new(store);
1446
1447 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1449 group_by: vec![],
1450 aggregates: vec![LogicalAggregateExpr {
1451 function: LogicalAggregateFunction::Avg,
1452 expression: Some(LogicalExpression::Property {
1453 variable: "n".to_string(),
1454 property: "score".to_string(),
1455 }),
1456 expression2: None,
1457 distinct: false,
1458 alias: Some("average".to_string()),
1459 percentile: None,
1460 separator: None,
1461 }],
1462 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1463 variable: "n".to_string(),
1464 label: None,
1465 input: None,
1466 })),
1467 having: None,
1468 }));
1469
1470 let physical = planner.plan(&logical).unwrap();
1471 assert!(physical.columns().contains(&"average".to_string()));
1472 }
1473
1474 #[test]
1475 fn test_plan_aggregate_min_max() {
1476 let store = create_test_store();
1477 let planner = Planner::new(store);
1478
1479 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1481 group_by: vec![],
1482 aggregates: vec![
1483 LogicalAggregateExpr {
1484 function: LogicalAggregateFunction::Min,
1485 expression: Some(LogicalExpression::Property {
1486 variable: "n".to_string(),
1487 property: "age".to_string(),
1488 }),
1489 expression2: None,
1490 distinct: false,
1491 alias: Some("youngest".to_string()),
1492 percentile: None,
1493 separator: None,
1494 },
1495 LogicalAggregateExpr {
1496 function: LogicalAggregateFunction::Max,
1497 expression: Some(LogicalExpression::Property {
1498 variable: "n".to_string(),
1499 property: "age".to_string(),
1500 }),
1501 expression2: None,
1502 distinct: false,
1503 alias: Some("oldest".to_string()),
1504 percentile: None,
1505 separator: None,
1506 },
1507 ],
1508 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509 variable: "n".to_string(),
1510 label: None,
1511 input: None,
1512 })),
1513 having: None,
1514 }));
1515
1516 let physical = planner.plan(&logical).unwrap();
1517 assert!(physical.columns().contains(&"youngest".to_string()));
1518 assert!(physical.columns().contains(&"oldest".to_string()));
1519 }
1520
1521 #[test]
1524 fn test_plan_inner_join() {
1525 let store = create_test_store();
1526 let planner = Planner::new(store);
1527
1528 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1530 items: vec![
1531 ReturnItem {
1532 expression: LogicalExpression::Variable("a".to_string()),
1533 alias: None,
1534 },
1535 ReturnItem {
1536 expression: LogicalExpression::Variable("b".to_string()),
1537 alias: None,
1538 },
1539 ],
1540 distinct: false,
1541 input: Box::new(LogicalOperator::Join(JoinOp {
1542 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1543 variable: "a".to_string(),
1544 label: Some("Person".to_string()),
1545 input: None,
1546 })),
1547 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1548 variable: "b".to_string(),
1549 label: Some("Company".to_string()),
1550 input: None,
1551 })),
1552 join_type: JoinType::Inner,
1553 conditions: vec![JoinCondition {
1554 left: LogicalExpression::Variable("a".to_string()),
1555 right: LogicalExpression::Variable("b".to_string()),
1556 }],
1557 })),
1558 }));
1559
1560 let physical = planner.plan(&logical).unwrap();
1561 assert!(physical.columns().contains(&"a".to_string()));
1562 assert!(physical.columns().contains(&"b".to_string()));
1563 }
1564
1565 #[test]
1566 fn test_plan_cross_join() {
1567 let store = create_test_store();
1568 let planner = Planner::new(store);
1569
1570 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1572 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573 variable: "a".to_string(),
1574 label: None,
1575 input: None,
1576 })),
1577 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1578 variable: "b".to_string(),
1579 label: None,
1580 input: None,
1581 })),
1582 join_type: JoinType::Cross,
1583 conditions: vec![],
1584 }));
1585
1586 let physical = planner.plan(&logical).unwrap();
1587 assert_eq!(physical.columns().len(), 2);
1588 }
1589
1590 #[test]
1591 fn test_plan_left_join() {
1592 let store = create_test_store();
1593 let planner = Planner::new(store);
1594
1595 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1596 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1597 variable: "a".to_string(),
1598 label: None,
1599 input: None,
1600 })),
1601 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1602 variable: "b".to_string(),
1603 label: None,
1604 input: None,
1605 })),
1606 join_type: JoinType::Left,
1607 conditions: vec![],
1608 }));
1609
1610 let physical = planner.plan(&logical).unwrap();
1611 assert_eq!(physical.columns().len(), 2);
1612 }
1613
1614 #[test]
1617 fn test_plan_create_node() {
1618 let store = create_test_store();
1619 let planner = Planner::new(store);
1620
1621 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1623 variable: "n".to_string(),
1624 labels: vec!["Person".to_string()],
1625 properties: vec![(
1626 "name".to_string(),
1627 LogicalExpression::Literal(Value::String("Alix".into())),
1628 )],
1629 input: None,
1630 }));
1631
1632 let physical = planner.plan(&logical).unwrap();
1633 assert!(physical.columns().contains(&"n".to_string()));
1634 }
1635
1636 #[test]
1637 fn test_plan_create_edge() {
1638 let store = create_test_store();
1639 let planner = Planner::new(store);
1640
1641 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1643 variable: Some("r".to_string()),
1644 from_variable: "a".to_string(),
1645 to_variable: "b".to_string(),
1646 edge_type: "KNOWS".to_string(),
1647 properties: vec![],
1648 input: Box::new(LogicalOperator::Join(JoinOp {
1649 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1650 variable: "a".to_string(),
1651 label: None,
1652 input: None,
1653 })),
1654 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1655 variable: "b".to_string(),
1656 label: None,
1657 input: None,
1658 })),
1659 join_type: JoinType::Cross,
1660 conditions: vec![],
1661 })),
1662 }));
1663
1664 let physical = planner.plan(&logical).unwrap();
1665 assert!(physical.columns().contains(&"r".to_string()));
1666 }
1667
1668 #[test]
1669 fn test_plan_delete_node() {
1670 let store = create_test_store();
1671 let planner = Planner::new(store);
1672
1673 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1675 variable: "n".to_string(),
1676 detach: false,
1677 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1678 variable: "n".to_string(),
1679 label: None,
1680 input: None,
1681 })),
1682 }));
1683
1684 let physical = planner.plan(&logical).unwrap();
1685 assert!(physical.columns().contains(&"n".to_string()));
1686 }
1687
1688 #[test]
1691 fn test_plan_empty_errors() {
1692 let store = create_test_store();
1693 let planner = Planner::new(store);
1694
1695 let logical = LogicalPlan::new(LogicalOperator::Empty);
1696 let result = planner.plan(&logical);
1697 assert!(result.is_err());
1698 }
1699
1700 #[test]
1701 fn test_plan_missing_variable_in_return() {
1702 let store = create_test_store();
1703 let planner = Planner::new(store);
1704
1705 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1707 items: vec![ReturnItem {
1708 expression: LogicalExpression::Variable("missing".to_string()),
1709 alias: None,
1710 }],
1711 distinct: false,
1712 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1713 variable: "n".to_string(),
1714 label: None,
1715 input: None,
1716 })),
1717 }));
1718
1719 let result = planner.plan(&logical);
1720 assert!(result.is_err());
1721 }
1722
1723 #[test]
1726 fn test_convert_binary_ops() {
1727 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1728 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1729 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1730 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1731 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1732 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1733 assert!(convert_binary_op(BinaryOp::And).is_ok());
1734 assert!(convert_binary_op(BinaryOp::Or).is_ok());
1735 assert!(convert_binary_op(BinaryOp::Add).is_ok());
1736 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1737 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1738 assert!(convert_binary_op(BinaryOp::Div).is_ok());
1739 }
1740
1741 #[test]
1742 fn test_convert_unary_ops() {
1743 assert!(convert_unary_op(UnaryOp::Not).is_ok());
1744 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1745 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1746 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1747 }
1748
1749 #[test]
1750 fn test_convert_aggregate_functions() {
1751 assert!(matches!(
1752 convert_aggregate_function(LogicalAggregateFunction::Count),
1753 PhysicalAggregateFunction::Count
1754 ));
1755 assert!(matches!(
1756 convert_aggregate_function(LogicalAggregateFunction::Sum),
1757 PhysicalAggregateFunction::Sum
1758 ));
1759 assert!(matches!(
1760 convert_aggregate_function(LogicalAggregateFunction::Avg),
1761 PhysicalAggregateFunction::Avg
1762 ));
1763 assert!(matches!(
1764 convert_aggregate_function(LogicalAggregateFunction::Min),
1765 PhysicalAggregateFunction::Min
1766 ));
1767 assert!(matches!(
1768 convert_aggregate_function(LogicalAggregateFunction::Max),
1769 PhysicalAggregateFunction::Max
1770 ));
1771 }
1772
1773 #[test]
1774 fn test_planner_accessors() {
1775 let store = create_test_store();
1776 let planner = Planner::new(Arc::clone(&store));
1777
1778 assert!(planner.transaction_id().is_none());
1779 assert!(planner.transaction_manager().is_none());
1780 let _ = planner.viewing_epoch(); }
1782
1783 #[test]
1784 fn test_physical_plan_accessors() {
1785 let store = create_test_store();
1786 let planner = Planner::new(store);
1787
1788 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1789 variable: "n".to_string(),
1790 label: None,
1791 input: None,
1792 }));
1793
1794 let physical = planner.plan(&logical).unwrap();
1795 assert_eq!(physical.columns(), &["n"]);
1796
1797 let _ = physical.into_operator();
1799 }
1800
1801 #[test]
1804 fn test_plan_adaptive_with_scan() {
1805 let store = create_test_store();
1806 let planner = Planner::new(store);
1807
1808 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1810 items: vec![ReturnItem {
1811 expression: LogicalExpression::Variable("n".to_string()),
1812 alias: None,
1813 }],
1814 distinct: false,
1815 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816 variable: "n".to_string(),
1817 label: Some("Person".to_string()),
1818 input: None,
1819 })),
1820 }));
1821
1822 let physical = planner.plan_adaptive(&logical).unwrap();
1823 assert_eq!(physical.columns(), &["n"]);
1824 assert!(physical.adaptive_context.is_some());
1826 }
1827
1828 #[test]
1829 fn test_plan_adaptive_with_filter() {
1830 let store = create_test_store();
1831 let planner = Planner::new(store);
1832
1833 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1835 items: vec![ReturnItem {
1836 expression: LogicalExpression::Variable("n".to_string()),
1837 alias: None,
1838 }],
1839 distinct: false,
1840 input: Box::new(LogicalOperator::Filter(FilterOp {
1841 predicate: LogicalExpression::Binary {
1842 left: Box::new(LogicalExpression::Property {
1843 variable: "n".to_string(),
1844 property: "age".to_string(),
1845 }),
1846 op: BinaryOp::Gt,
1847 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1848 },
1849 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1850 variable: "n".to_string(),
1851 label: None,
1852 input: None,
1853 })),
1854 pushdown_hint: None,
1855 })),
1856 }));
1857
1858 let physical = planner.plan_adaptive(&logical).unwrap();
1859 assert!(physical.adaptive_context.is_some());
1860 }
1861
1862 #[test]
1863 fn test_plan_adaptive_with_expand() {
1864 let store = create_test_store();
1865 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1866
1867 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1869 items: vec![
1870 ReturnItem {
1871 expression: LogicalExpression::Variable("a".to_string()),
1872 alias: None,
1873 },
1874 ReturnItem {
1875 expression: LogicalExpression::Variable("b".to_string()),
1876 alias: None,
1877 },
1878 ],
1879 distinct: false,
1880 input: Box::new(LogicalOperator::Expand(ExpandOp {
1881 from_variable: "a".to_string(),
1882 to_variable: "b".to_string(),
1883 edge_variable: None,
1884 direction: ExpandDirection::Outgoing,
1885 edge_types: vec!["KNOWS".to_string()],
1886 min_hops: 1,
1887 max_hops: Some(1),
1888 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1889 variable: "a".to_string(),
1890 label: None,
1891 input: None,
1892 })),
1893 path_alias: None,
1894 path_mode: PathMode::Walk,
1895 })),
1896 }));
1897
1898 let physical = planner.plan_adaptive(&logical).unwrap();
1899 assert!(physical.adaptive_context.is_some());
1900 }
1901
1902 #[test]
1903 fn test_plan_adaptive_with_join() {
1904 let store = create_test_store();
1905 let planner = Planner::new(store);
1906
1907 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1908 items: vec![
1909 ReturnItem {
1910 expression: LogicalExpression::Variable("a".to_string()),
1911 alias: None,
1912 },
1913 ReturnItem {
1914 expression: LogicalExpression::Variable("b".to_string()),
1915 alias: None,
1916 },
1917 ],
1918 distinct: false,
1919 input: Box::new(LogicalOperator::Join(JoinOp {
1920 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1921 variable: "a".to_string(),
1922 label: None,
1923 input: None,
1924 })),
1925 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1926 variable: "b".to_string(),
1927 label: None,
1928 input: None,
1929 })),
1930 join_type: JoinType::Cross,
1931 conditions: vec![],
1932 })),
1933 }));
1934
1935 let physical = planner.plan_adaptive(&logical).unwrap();
1936 assert!(physical.adaptive_context.is_some());
1937 }
1938
1939 #[test]
1940 fn test_plan_adaptive_with_aggregate() {
1941 let store = create_test_store();
1942 let planner = Planner::new(store);
1943
1944 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1945 group_by: vec![],
1946 aggregates: vec![LogicalAggregateExpr {
1947 function: LogicalAggregateFunction::Count,
1948 expression: Some(LogicalExpression::Variable("n".to_string())),
1949 expression2: None,
1950 distinct: false,
1951 alias: Some("cnt".to_string()),
1952 percentile: None,
1953 separator: None,
1954 }],
1955 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1956 variable: "n".to_string(),
1957 label: None,
1958 input: None,
1959 })),
1960 having: None,
1961 }));
1962
1963 let physical = planner.plan_adaptive(&logical).unwrap();
1964 assert!(physical.adaptive_context.is_some());
1965 }
1966
1967 #[test]
1968 fn test_plan_adaptive_with_distinct() {
1969 let store = create_test_store();
1970 let planner = Planner::new(store);
1971
1972 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1973 items: vec![ReturnItem {
1974 expression: LogicalExpression::Variable("n".to_string()),
1975 alias: None,
1976 }],
1977 distinct: false,
1978 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1979 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1980 variable: "n".to_string(),
1981 label: None,
1982 input: None,
1983 })),
1984 columns: None,
1985 })),
1986 }));
1987
1988 let physical = planner.plan_adaptive(&logical).unwrap();
1989 assert!(physical.adaptive_context.is_some());
1990 }
1991
1992 #[test]
1993 fn test_plan_adaptive_with_limit() {
1994 let store = create_test_store();
1995 let planner = Planner::new(store);
1996
1997 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1998 items: vec![ReturnItem {
1999 expression: LogicalExpression::Variable("n".to_string()),
2000 alias: None,
2001 }],
2002 distinct: false,
2003 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2004 count: 10.into(),
2005 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2006 variable: "n".to_string(),
2007 label: None,
2008 input: None,
2009 })),
2010 })),
2011 }));
2012
2013 let physical = planner.plan_adaptive(&logical).unwrap();
2014 assert!(physical.adaptive_context.is_some());
2015 }
2016
2017 #[test]
2018 fn test_plan_adaptive_with_skip() {
2019 let store = create_test_store();
2020 let planner = Planner::new(store);
2021
2022 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2023 items: vec![ReturnItem {
2024 expression: LogicalExpression::Variable("n".to_string()),
2025 alias: None,
2026 }],
2027 distinct: false,
2028 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2029 count: 5.into(),
2030 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2031 variable: "n".to_string(),
2032 label: None,
2033 input: None,
2034 })),
2035 })),
2036 }));
2037
2038 let physical = planner.plan_adaptive(&logical).unwrap();
2039 assert!(physical.adaptive_context.is_some());
2040 }
2041
2042 #[test]
2043 fn test_plan_adaptive_with_sort() {
2044 let store = create_test_store();
2045 let planner = Planner::new(store);
2046
2047 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2048 items: vec![ReturnItem {
2049 expression: LogicalExpression::Variable("n".to_string()),
2050 alias: None,
2051 }],
2052 distinct: false,
2053 input: Box::new(LogicalOperator::Sort(SortOp {
2054 keys: vec![SortKey {
2055 expression: LogicalExpression::Variable("n".to_string()),
2056 order: SortOrder::Ascending,
2057 nulls: None,
2058 }],
2059 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2060 variable: "n".to_string(),
2061 label: None,
2062 input: None,
2063 })),
2064 })),
2065 }));
2066
2067 let physical = planner.plan_adaptive(&logical).unwrap();
2068 assert!(physical.adaptive_context.is_some());
2069 }
2070
2071 #[test]
2072 fn test_plan_adaptive_with_union() {
2073 let store = create_test_store();
2074 let planner = Planner::new(store);
2075
2076 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2077 items: vec![ReturnItem {
2078 expression: LogicalExpression::Variable("n".to_string()),
2079 alias: None,
2080 }],
2081 distinct: false,
2082 input: Box::new(LogicalOperator::Union(UnionOp {
2083 inputs: vec![
2084 LogicalOperator::NodeScan(NodeScanOp {
2085 variable: "n".to_string(),
2086 label: Some("Person".to_string()),
2087 input: None,
2088 }),
2089 LogicalOperator::NodeScan(NodeScanOp {
2090 variable: "n".to_string(),
2091 label: Some("Company".to_string()),
2092 input: None,
2093 }),
2094 ],
2095 })),
2096 }));
2097
2098 let physical = planner.plan_adaptive(&logical).unwrap();
2099 assert!(physical.adaptive_context.is_some());
2100 }
2101
2102 #[test]
2105 fn test_plan_expand_variable_length() {
2106 let store = create_test_store();
2107 let planner = Planner::new(store);
2108
2109 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2111 items: vec![
2112 ReturnItem {
2113 expression: LogicalExpression::Variable("a".to_string()),
2114 alias: None,
2115 },
2116 ReturnItem {
2117 expression: LogicalExpression::Variable("b".to_string()),
2118 alias: None,
2119 },
2120 ],
2121 distinct: false,
2122 input: Box::new(LogicalOperator::Expand(ExpandOp {
2123 from_variable: "a".to_string(),
2124 to_variable: "b".to_string(),
2125 edge_variable: None,
2126 direction: ExpandDirection::Outgoing,
2127 edge_types: vec!["KNOWS".to_string()],
2128 min_hops: 1,
2129 max_hops: Some(3),
2130 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2131 variable: "a".to_string(),
2132 label: None,
2133 input: None,
2134 })),
2135 path_alias: None,
2136 path_mode: PathMode::Walk,
2137 })),
2138 }));
2139
2140 let physical = planner.plan(&logical).unwrap();
2141 assert!(physical.columns().contains(&"a".to_string()));
2142 assert!(physical.columns().contains(&"b".to_string()));
2143 }
2144
2145 #[test]
2146 fn test_plan_expand_with_path_alias() {
2147 let store = create_test_store();
2148 let planner = Planner::new(store);
2149
2150 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2152 items: vec![
2153 ReturnItem {
2154 expression: LogicalExpression::Variable("a".to_string()),
2155 alias: None,
2156 },
2157 ReturnItem {
2158 expression: LogicalExpression::Variable("b".to_string()),
2159 alias: None,
2160 },
2161 ],
2162 distinct: false,
2163 input: Box::new(LogicalOperator::Expand(ExpandOp {
2164 from_variable: "a".to_string(),
2165 to_variable: "b".to_string(),
2166 edge_variable: None,
2167 direction: ExpandDirection::Outgoing,
2168 edge_types: vec!["KNOWS".to_string()],
2169 min_hops: 1,
2170 max_hops: Some(3),
2171 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2172 variable: "a".to_string(),
2173 label: None,
2174 input: None,
2175 })),
2176 path_alias: Some("p".to_string()),
2177 path_mode: PathMode::Walk,
2178 })),
2179 }));
2180
2181 let physical = planner.plan(&logical).unwrap();
2182 assert!(physical.columns().contains(&"a".to_string()));
2184 assert!(physical.columns().contains(&"b".to_string()));
2185 }
2186
2187 #[test]
2188 fn test_plan_expand_incoming() {
2189 let store = create_test_store();
2190 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2191
2192 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2194 items: vec![
2195 ReturnItem {
2196 expression: LogicalExpression::Variable("a".to_string()),
2197 alias: None,
2198 },
2199 ReturnItem {
2200 expression: LogicalExpression::Variable("b".to_string()),
2201 alias: None,
2202 },
2203 ],
2204 distinct: false,
2205 input: Box::new(LogicalOperator::Expand(ExpandOp {
2206 from_variable: "a".to_string(),
2207 to_variable: "b".to_string(),
2208 edge_variable: None,
2209 direction: ExpandDirection::Incoming,
2210 edge_types: vec!["KNOWS".to_string()],
2211 min_hops: 1,
2212 max_hops: Some(1),
2213 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2214 variable: "a".to_string(),
2215 label: None,
2216 input: None,
2217 })),
2218 path_alias: None,
2219 path_mode: PathMode::Walk,
2220 })),
2221 }));
2222
2223 let physical = planner.plan(&logical).unwrap();
2224 assert!(physical.columns().contains(&"a".to_string()));
2225 assert!(physical.columns().contains(&"b".to_string()));
2226 }
2227
2228 #[test]
2229 fn test_plan_expand_both_directions() {
2230 let store = create_test_store();
2231 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2232
2233 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2235 items: vec![
2236 ReturnItem {
2237 expression: LogicalExpression::Variable("a".to_string()),
2238 alias: None,
2239 },
2240 ReturnItem {
2241 expression: LogicalExpression::Variable("b".to_string()),
2242 alias: None,
2243 },
2244 ],
2245 distinct: false,
2246 input: Box::new(LogicalOperator::Expand(ExpandOp {
2247 from_variable: "a".to_string(),
2248 to_variable: "b".to_string(),
2249 edge_variable: None,
2250 direction: ExpandDirection::Both,
2251 edge_types: vec!["KNOWS".to_string()],
2252 min_hops: 1,
2253 max_hops: Some(1),
2254 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2255 variable: "a".to_string(),
2256 label: None,
2257 input: None,
2258 })),
2259 path_alias: None,
2260 path_mode: PathMode::Walk,
2261 })),
2262 }));
2263
2264 let physical = planner.plan(&logical).unwrap();
2265 assert!(physical.columns().contains(&"a".to_string()));
2266 assert!(physical.columns().contains(&"b".to_string()));
2267 }
2268
2269 #[test]
2272 fn test_planner_with_context() {
2273 use crate::transaction::TransactionManager;
2274
2275 let store = create_test_store();
2276 let transaction_manager = Arc::new(TransactionManager::new());
2277 let transaction_id = transaction_manager.begin();
2278 let epoch = transaction_manager.current_epoch();
2279
2280 let planner = Planner::with_context(
2281 Arc::clone(&store),
2282 Arc::clone(&transaction_manager),
2283 Some(transaction_id),
2284 epoch,
2285 );
2286
2287 assert_eq!(planner.transaction_id(), Some(transaction_id));
2288 assert!(planner.transaction_manager().is_some());
2289 assert_eq!(planner.viewing_epoch(), epoch);
2290 }
2291
2292 #[test]
2293 fn test_planner_with_factorized_execution_disabled() {
2294 let store = create_test_store();
2295 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2296
2297 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2299 items: vec![
2300 ReturnItem {
2301 expression: LogicalExpression::Variable("a".to_string()),
2302 alias: None,
2303 },
2304 ReturnItem {
2305 expression: LogicalExpression::Variable("c".to_string()),
2306 alias: None,
2307 },
2308 ],
2309 distinct: false,
2310 input: Box::new(LogicalOperator::Expand(ExpandOp {
2311 from_variable: "b".to_string(),
2312 to_variable: "c".to_string(),
2313 edge_variable: None,
2314 direction: ExpandDirection::Outgoing,
2315 edge_types: vec![],
2316 min_hops: 1,
2317 max_hops: Some(1),
2318 input: Box::new(LogicalOperator::Expand(ExpandOp {
2319 from_variable: "a".to_string(),
2320 to_variable: "b".to_string(),
2321 edge_variable: None,
2322 direction: ExpandDirection::Outgoing,
2323 edge_types: vec![],
2324 min_hops: 1,
2325 max_hops: Some(1),
2326 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2327 variable: "a".to_string(),
2328 label: None,
2329 input: None,
2330 })),
2331 path_alias: None,
2332 path_mode: PathMode::Walk,
2333 })),
2334 path_alias: None,
2335 path_mode: PathMode::Walk,
2336 })),
2337 }));
2338
2339 let physical = planner.plan(&logical).unwrap();
2340 assert!(physical.columns().contains(&"a".to_string()));
2341 assert!(physical.columns().contains(&"c".to_string()));
2342 }
2343
2344 #[test]
2347 fn test_plan_sort_by_property() {
2348 let store = create_test_store();
2349 let planner = Planner::new(store);
2350
2351 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2353 items: vec![ReturnItem {
2354 expression: LogicalExpression::Variable("n".to_string()),
2355 alias: None,
2356 }],
2357 distinct: false,
2358 input: Box::new(LogicalOperator::Sort(SortOp {
2359 keys: vec![SortKey {
2360 expression: LogicalExpression::Property {
2361 variable: "n".to_string(),
2362 property: "name".to_string(),
2363 },
2364 order: SortOrder::Ascending,
2365 nulls: None,
2366 }],
2367 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2368 variable: "n".to_string(),
2369 label: None,
2370 input: None,
2371 })),
2372 })),
2373 }));
2374
2375 let physical = planner.plan(&logical).unwrap();
2376 assert!(physical.columns().contains(&"n".to_string()));
2378 }
2379
2380 #[test]
2383 fn test_plan_scan_with_input() {
2384 let store = create_test_store();
2385 let planner = Planner::new(store);
2386
2387 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2389 items: vec![
2390 ReturnItem {
2391 expression: LogicalExpression::Variable("a".to_string()),
2392 alias: None,
2393 },
2394 ReturnItem {
2395 expression: LogicalExpression::Variable("b".to_string()),
2396 alias: None,
2397 },
2398 ],
2399 distinct: false,
2400 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2401 variable: "b".to_string(),
2402 label: Some("Company".to_string()),
2403 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2404 variable: "a".to_string(),
2405 label: Some("Person".to_string()),
2406 input: None,
2407 }))),
2408 })),
2409 }));
2410
2411 let physical = planner.plan(&logical).unwrap();
2412 assert!(physical.columns().contains(&"a".to_string()));
2413 assert!(physical.columns().contains(&"b".to_string()));
2414 }
2415}