1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33 EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34 FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35 HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42 VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52 convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
56struct RangeBounds<'a> {
58 min: Option<&'a Value>,
59 max: Option<&'a Value>,
60 min_inclusive: bool,
61 max_inclusive: bool,
62}
63
64pub struct Planner {
66 pub(super) store: Arc<dyn GraphStoreMut>,
68 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
70 pub(super) transaction_id: Option<TransactionId>,
72 pub(super) viewing_epoch: EpochId,
74 pub(super) anon_edge_counter: std::cell::Cell<u32>,
76 pub(super) factorized_execution: bool,
78 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
81 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
86 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
88 pub(super) correlated_param_state:
92 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
93 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
96 profiling: std::cell::Cell<bool>,
98 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
100 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
102 pub(super) session_context: grafeo_core::execution::operators::SessionContext,
104 pub(super) read_only: bool,
108}
109
110impl Planner {
111 #[must_use]
116 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
117 let epoch = store.current_epoch();
118 Self {
119 store,
120 transaction_manager: None,
121 transaction_id: None,
122 viewing_epoch: epoch,
123 anon_edge_counter: std::cell::Cell::new(0),
124 factorized_execution: true,
125 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
126 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
127 validator: None,
128 catalog: None,
129 correlated_param_state: std::cell::RefCell::new(None),
130 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
131 profiling: std::cell::Cell::new(false),
132 profile_entries: std::cell::RefCell::new(Vec::new()),
133 write_tracker: None,
134 session_context: grafeo_core::execution::operators::SessionContext::default(),
135 read_only: false,
136 }
137 }
138
139 #[must_use]
141 pub fn with_context(
142 store: Arc<dyn GraphStoreMut>,
143 transaction_manager: Arc<TransactionManager>,
144 transaction_id: Option<TransactionId>,
145 viewing_epoch: EpochId,
146 ) -> Self {
147 use crate::transaction::TransactionWriteTracker;
148
149 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
151 if transaction_id.is_some() {
152 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
153 &transaction_manager,
154 ))))
155 } else {
156 None
157 };
158
159 Self {
160 store,
161 transaction_manager: Some(transaction_manager),
162 transaction_id,
163 viewing_epoch,
164 anon_edge_counter: std::cell::Cell::new(0),
165 factorized_execution: true,
166 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
167 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
168 validator: None,
169 catalog: None,
170 correlated_param_state: std::cell::RefCell::new(None),
171 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
172 profiling: std::cell::Cell::new(false),
173 profile_entries: std::cell::RefCell::new(Vec::new()),
174 write_tracker,
175 session_context: grafeo_core::execution::operators::SessionContext::default(),
176 read_only: false,
177 }
178 }
179
180 #[must_use]
183 pub fn with_read_only(mut self, read_only: bool) -> Self {
184 self.read_only = read_only;
185 self
186 }
187
188 #[must_use]
190 pub fn viewing_epoch(&self) -> EpochId {
191 self.viewing_epoch
192 }
193
194 #[must_use]
196 pub fn transaction_id(&self) -> Option<TransactionId> {
197 self.transaction_id
198 }
199
200 #[must_use]
202 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
203 self.transaction_manager.as_ref()
204 }
205
206 #[must_use]
208 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
209 self.factorized_execution = enabled;
210 self
211 }
212
213 #[must_use]
215 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
216 self.validator = Some(validator);
217 self
218 }
219
220 #[must_use]
222 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
223 self.catalog = Some(catalog);
224 self
225 }
226
227 #[must_use]
229 pub fn with_session_context(
230 mut self,
231 context: grafeo_core::execution::operators::SessionContext,
232 ) -> Self {
233 self.session_context = context;
234 self
235 }
236
237 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
241 match op {
242 LogicalOperator::Expand(expand) => {
243 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
244
245 if is_single_hop {
246 let (inner_count, base) = Self::count_expand_chain(&expand.input);
247 (inner_count + 1, base)
248 } else {
249 (0, op)
250 }
251 }
252 _ => (0, op),
253 }
254 }
255
256 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
260 let mut chain = Vec::new();
261 let mut current = op;
262
263 while let LogicalOperator::Expand(expand) = current {
264 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
265 if !is_single_hop {
266 break;
267 }
268 chain.push(expand);
269 current = &expand.input;
270 }
271
272 chain.reverse();
273 chain
274 }
275
276 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
278 let _span = tracing::debug_span!("grafeo::query::plan").entered();
279 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
280 Ok(PhysicalPlan {
281 operator,
282 columns,
283 adaptive_context: None,
284 })
285 }
286
287 pub fn plan_profiled(
293 &self,
294 logical_plan: &LogicalPlan,
295 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
296 self.profiling.set(true);
297 self.profile_entries.borrow_mut().clear();
298
299 let result = self.plan_operator(&logical_plan.root);
300
301 self.profiling.set(false);
302 let (operator, columns) = result?;
303 let entries = self.profile_entries.borrow_mut().drain(..).collect();
304
305 Ok((
306 PhysicalPlan {
307 operator,
308 columns,
309 adaptive_context: None,
310 },
311 entries,
312 ))
313 }
314
315 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
317 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
318
319 let mut adaptive_context = AdaptiveContext::new();
320 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
321
322 Ok(PhysicalPlan {
323 operator,
324 columns,
325 adaptive_context: Some(adaptive_context),
326 })
327 }
328
329 fn collect_cardinality_estimates(
331 &self,
332 op: &LogicalOperator,
333 ctx: &mut AdaptiveContext,
334 depth: usize,
335 ) {
336 match op {
337 LogicalOperator::NodeScan(scan) => {
338 let estimate = if let Some(label) = &scan.label {
339 self.store.nodes_by_label(label).len() as f64
340 } else {
341 self.store.node_count() as f64
342 };
343 let id = format!("scan_{}", scan.variable);
344 ctx.set_estimate(&id, estimate);
345
346 if let Some(input) = &scan.input {
347 self.collect_cardinality_estimates(input, ctx, depth + 1);
348 }
349 }
350 LogicalOperator::Filter(filter) => {
351 let input_estimate = self.estimate_cardinality(&filter.input);
352 let estimate = input_estimate * 0.3;
353 let id = format!("filter_{depth}");
354 ctx.set_estimate(&id, estimate);
355
356 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
357 }
358 LogicalOperator::Expand(expand) => {
359 let input_estimate = self.estimate_cardinality(&expand.input);
360 let stats = self.store.statistics();
361 let avg_degree = self.estimate_expand_degree(&stats, expand);
362 let estimate = input_estimate * avg_degree;
363 let id = format!("expand_{}", expand.to_variable);
364 ctx.set_estimate(&id, estimate);
365
366 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
367 }
368 LogicalOperator::Join(join) => {
369 let left_est = self.estimate_cardinality(&join.left);
370 let right_est = self.estimate_cardinality(&join.right);
371 let estimate = (left_est * right_est).sqrt();
372 let id = format!("join_{depth}");
373 ctx.set_estimate(&id, estimate);
374
375 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
376 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
377 }
378 LogicalOperator::Aggregate(agg) => {
379 let input_estimate = self.estimate_cardinality(&agg.input);
380 let estimate = if agg.group_by.is_empty() {
381 1.0
382 } else {
383 (input_estimate * 0.1).max(1.0)
384 };
385 let id = format!("aggregate_{depth}");
386 ctx.set_estimate(&id, estimate);
387
388 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
389 }
390 LogicalOperator::Distinct(distinct) => {
391 let input_estimate = self.estimate_cardinality(&distinct.input);
392 let estimate = (input_estimate * 0.5).max(1.0);
393 let id = format!("distinct_{depth}");
394 ctx.set_estimate(&id, estimate);
395
396 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
397 }
398 LogicalOperator::Return(ret) => {
399 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
400 }
401 LogicalOperator::Limit(limit) => {
402 let input_estimate = self.estimate_cardinality(&limit.input);
403 let estimate = (input_estimate).min(limit.count.estimate());
404 let id = format!("limit_{depth}");
405 ctx.set_estimate(&id, estimate);
406
407 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
408 }
409 LogicalOperator::Skip(skip) => {
410 let input_estimate = self.estimate_cardinality(&skip.input);
411 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
412 let id = format!("skip_{depth}");
413 ctx.set_estimate(&id, estimate);
414
415 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
416 }
417 LogicalOperator::Sort(sort) => {
418 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
419 }
420 LogicalOperator::Union(union) => {
421 let estimate: f64 = union
422 .inputs
423 .iter()
424 .map(|input| self.estimate_cardinality(input))
425 .sum();
426 let id = format!("union_{depth}");
427 ctx.set_estimate(&id, estimate);
428
429 for input in &union.inputs {
430 self.collect_cardinality_estimates(input, ctx, depth + 1);
431 }
432 }
433 _ => {
434 }
436 }
437 }
438
439 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
441 match op {
442 LogicalOperator::NodeScan(scan) => {
443 if let Some(label) = &scan.label {
444 self.store.nodes_by_label(label).len() as f64
445 } else {
446 self.store.node_count() as f64
447 }
448 }
449 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
450 LogicalOperator::Expand(expand) => {
451 let stats = self.store.statistics();
452 let avg_degree = self.estimate_expand_degree(&stats, expand);
453 self.estimate_cardinality(&expand.input) * avg_degree
454 }
455 LogicalOperator::Join(join) => {
456 let left = self.estimate_cardinality(&join.left);
457 let right = self.estimate_cardinality(&join.right);
458 (left * right).sqrt()
459 }
460 LogicalOperator::Aggregate(agg) => {
461 if agg.group_by.is_empty() {
462 1.0
463 } else {
464 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
465 }
466 }
467 LogicalOperator::Distinct(distinct) => {
468 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
469 }
470 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
471 LogicalOperator::Limit(limit) => self
472 .estimate_cardinality(&limit.input)
473 .min(limit.count.estimate()),
474 LogicalOperator::Skip(skip) => {
475 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
476 }
477 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
478 LogicalOperator::Union(union) => union
479 .inputs
480 .iter()
481 .map(|input| self.estimate_cardinality(input))
482 .sum(),
483 LogicalOperator::Except(except) => {
484 let left = self.estimate_cardinality(&except.left);
485 let right = self.estimate_cardinality(&except.right);
486 (left - right).max(0.0)
487 }
488 LogicalOperator::Intersect(intersect) => {
489 let left = self.estimate_cardinality(&intersect.left);
490 let right = self.estimate_cardinality(&intersect.right);
491 left.min(right)
492 }
493 LogicalOperator::Otherwise(otherwise) => self
494 .estimate_cardinality(&otherwise.left)
495 .max(self.estimate_cardinality(&otherwise.right)),
496 _ => 1000.0,
497 }
498 }
499
500 fn estimate_expand_degree(
502 &self,
503 stats: &grafeo_core::statistics::Statistics,
504 expand: &ExpandOp,
505 ) -> f64 {
506 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
507 if expand.edge_types.len() == 1 {
508 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
509 } else if stats.total_nodes > 0 {
510 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
511 } else {
512 10.0
513 }
514 }
515
516 fn maybe_profile(
519 &self,
520 result: Result<(Box<dyn Operator>, Vec<String>)>,
521 op: &LogicalOperator,
522 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
523 if self.profiling.get() {
524 let (physical, columns) = result?;
525 let (entry, stats) =
526 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
527 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
528 self.profile_entries.borrow_mut().push(entry);
529 Ok((Box::new(profiled), columns))
530 } else {
531 result
532 }
533 }
534
535 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
537 let result = match op {
538 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
539 LogicalOperator::Expand(expand) => {
540 if self.factorized_execution {
541 let (chain_len, _base) = Self::count_expand_chain(op);
542 if chain_len >= 2 {
543 return self.maybe_profile(self.plan_expand_chain(op), op);
544 }
545 }
546 self.plan_expand(expand)
547 }
548 LogicalOperator::Return(ret) => self.plan_return(ret),
549 LogicalOperator::Filter(filter) => self.plan_filter(filter),
550 LogicalOperator::Project(project) => self.plan_project(project),
551 LogicalOperator::Limit(limit) => self.plan_limit(limit),
552 LogicalOperator::Skip(skip) => self.plan_skip(skip),
553 LogicalOperator::Sort(sort) => self.plan_sort(sort),
554 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
555 LogicalOperator::Join(join) => self.plan_join(join),
556 LogicalOperator::Union(union) => self.plan_union(union),
557 LogicalOperator::Except(except) => self.plan_except(except),
558 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
559 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
560 LogicalOperator::Apply(apply) => self.plan_apply(apply),
561 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
562 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
563 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
564 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
565 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
566 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
567 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
568 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
569 LogicalOperator::Merge(merge) => self.plan_merge(merge),
570 LogicalOperator::MergeRelationship(merge_rel) => {
571 self.plan_merge_relationship(merge_rel)
572 }
573 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
574 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
575 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
576 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
577 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
578 #[cfg(feature = "algos")]
579 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
580 #[cfg(not(feature = "algos"))]
581 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
582 "CALL procedures require the 'algos' feature".to_string(),
583 )),
584 LogicalOperator::ParameterScan(_param_scan) => {
585 let state = self
586 .correlated_param_state
587 .borrow()
588 .clone()
589 .ok_or_else(|| {
590 Error::Internal(
591 "ParameterScan without correlated Apply context".to_string(),
592 )
593 })?;
594 let columns = state.columns.clone();
597 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
598 Ok((operator, columns))
599 }
600 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
601 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
602 LogicalOperator::LoadData(load) => {
603 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
604 load.path.clone(),
605 load.format,
606 load.with_headers,
607 load.field_terminator,
608 load.variable.clone(),
609 ));
610 Ok((operator, vec![load.variable.clone()]))
611 }
612 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
613 LogicalOperator::VectorScan(_) => Err(Error::Internal(
614 "VectorScan requires vector-index feature".to_string(),
615 )),
616 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
617 "VectorJoin requires vector-index feature".to_string(),
618 )),
619 _ => Err(Error::Internal(format!(
620 "Unsupported operator: {:?}",
621 std::mem::discriminant(op)
622 ))),
623 };
624 self.maybe_profile(result, op)
625 }
626
627 fn plan_horizontal_aggregate(
629 &self,
630 ha: &HorizontalAggregateOp,
631 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
632 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
633
634 let list_col_idx = child_columns
635 .iter()
636 .position(|c| c == &ha.list_column)
637 .ok_or_else(|| {
638 Error::Internal(format!(
639 "HorizontalAggregate list column '{}' not found in {:?}",
640 ha.list_column, child_columns
641 ))
642 })?;
643
644 let entity_kind = match ha.entity_kind {
645 LogicalEntityKind::Edge => EntityKind::Edge,
646 LogicalEntityKind::Node => EntityKind::Node,
647 };
648
649 let function = convert_aggregate_function(ha.function);
650 let input_column_count = child_columns.len();
651
652 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
653 child_op,
654 list_col_idx,
655 entity_kind,
656 function,
657 ha.property.clone(),
658 Arc::clone(&self.store) as Arc<dyn GraphStore>,
659 input_column_count,
660 ));
661
662 let mut columns = child_columns;
663 columns.push(ha.alias.clone());
664 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
666
667 Ok((operator, columns))
668 }
669
670 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
672 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
673 let key_idx = child_columns
674 .iter()
675 .position(|c| c == &mc.key_var)
676 .ok_or_else(|| {
677 Error::Internal(format!(
678 "MapCollect key '{}' not in columns {:?}",
679 mc.key_var, child_columns
680 ))
681 })?;
682 let value_idx = child_columns
683 .iter()
684 .position(|c| c == &mc.value_var)
685 .ok_or_else(|| {
686 Error::Internal(format!(
687 "MapCollect value '{}' not in columns {:?}",
688 mc.value_var, child_columns
689 ))
690 })?;
691 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
692 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
693 Ok((operator, vec![mc.alias.clone()]))
694 }
695}
696
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, in order.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position within a row to read from.
    /// Positions past the end of a row yield `Value::Null`.
    column_indices: Vec<usize>,
    /// Index of the next row that has not been emitted yet.
    row_index: usize,
}
704
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next chunk of at most 1024 rows, or `None` once every row
    /// has been emitted.
    ///
    /// Every output column is typed `LogicalType::Any`. A row that is too
    /// short for a requested position contributes `Value::Null` there.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        /// Upper bound on the number of rows placed in one chunk.
        const MAX_ROWS_PER_CHUNK: usize = 1024;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }

        let batch_len = (self.rows.len() - start).min(MAX_ROWS_PER_CHUNK);
        let batch = &self.rows[start..start + batch_len];

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in batch {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Name of this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748 use crate::query::plan::{
749 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
750 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
751 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
752 SkipOp as LogicalSkipOp, SortKey, SortOp,
753 };
754 use grafeo_common::types::Value;
755 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
756 use grafeo_core::graph::GraphStoreMut;
757 use grafeo_core::graph::lpg::LpgStore;
758
759 fn create_test_store() -> Arc<dyn GraphStoreMut> {
760 let store = Arc::new(LpgStore::new().unwrap());
761 store.create_node(&["Person"]);
762 store.create_node(&["Person"]);
763 store.create_node(&["Company"]);
764 store
765 }
766
/// A labeled node scan returned as-is yields one column named after the
/// variable.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // Scan of `n` restricted to the `Person` label.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    // Return `n` without an alias.
    let returned = vec![ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: returned,
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
791
/// An unlabeled node scan plans just like a labeled one: one column named
/// after the variable.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    // Scan of `n` over all nodes.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let returned = vec![ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: returned,
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
814
/// An aliased return item names the output column after the alias rather
/// than the variable.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    // Return `n` under the alias `person`.
    let returned = vec![ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: Some("person".to_string()),
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: returned,
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
837
/// Returning a property without an alias names the column
/// `<variable>.<property>`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    // Return the `name` property of `n`.
    let name_of_n = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "name".to_string(),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
863
/// Returning an aliased literal names the output column after the alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    // Return the constant 42 under the alias `answer`.
    let answer = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some("answer".to_string()),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
886
/// A property-equals-literal filter between scan and return plans
/// successfully and leaves the output columns unchanged.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    // Predicate: n.age = 30
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
922
/// A filter whose predicate is an `AND` of two comparisons plans
/// successfully and leaves the output columns unchanged.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Builds `n.age <op> <bound>`.
    let age_cmp = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };

    // Predicate: n.age > 20 AND n.age < 40
    let age_in_range = LogicalExpression::Binary {
        left: Box::new(age_cmp(BinaryOp::Gt, 20)),
        op: BinaryOp::And,
        right: Box::new(age_cmp(BinaryOp::Lt, 40)),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_in_range,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
967
/// A filter on a negated property plans successfully and leaves the output
/// columns unchanged.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    // Predicate: NOT n.active
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "active".to_string(),
        }),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1000
/// A filter using `IsNull` on a property plans successfully and leaves the
/// output columns unchanged.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    // Predicate: n.email IS NULL
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "email".to_string(),
        }),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1033
/// A filter comparing a function call's result with a literal plans
/// successfully and leaves the output columns unchanged.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    // size(n.friends)
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };
    // Predicate: size(n.friends) > 0
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1071
/// Plans a single-hop outgoing `KNOWS` expansion from `a:Person` to `b` and
/// checks that both endpoint variables appear in the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Exactly one hop: min_hops == max_hops == 1.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1115
/// Plans a single-hop expansion that also binds the edge to `r` and checks
/// that `a`, `r` and `b` all appear in the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        // The traversed edge is bound so it can be returned.
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("r"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "r", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1161
/// Plans `RETURN n` over a limit of 10 on a node scan and checks that `n` is
/// the only output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1189
/// Plans `RETURN n` over a skip of 5 on a node scan and checks that `n` is
/// the only output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1215
/// Plans `RETURN n` over an ascending sort on `n` and checks that `n` is the
/// only output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    // Single sort key: the variable `n`, ascending, no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1245
/// Plans `RETURN n` over a descending sort on `n` and checks that `n` is the
/// only output column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    // Single sort key: the variable `n`, descending, no explicit null ordering.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1275
/// Plans `RETURN n` over a distinct operator with no explicit column list
/// and checks that `n` is the only output column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // `columns: None` leaves the de-duplication columns unspecified.
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1301
/// Plans `RETURN n` over a distinct operator restricted to the column `n`
/// and checks that `n` is the only output column.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The explicit column list names the scanned variable.
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("n")]),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1327
/// Plans a distinct operator whose column list names `nonexistent`, which
/// the underlying scan (binding only `n`) does not produce. Planning is
/// expected to succeed and still yield `n` as the only output column.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("nonexistent")]),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1354
/// Plans `RETURN cnt` over an ungrouped `Count(n)` aggregate aliased `cnt`
/// and checks that `cnt` appears in the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    // Count over the variable `n`, exposed under the alias `cnt`.
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("cnt");
    assert!(physical.columns().contains(&expected));
}
1392
/// Plans a `Count(n)` aggregate grouped by `n.city` over a `Person` scan and
/// checks that the physical plan exposes exactly two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    // Grouping key: the property n.city.
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1424
/// Plans an ungrouped `Sum(n.value)` aggregate aliased `total` and checks
/// that `total` appears in the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let value = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("value"),
    };
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(value),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("total");
    assert!(physical.columns().contains(&expected));
}
1456
/// Plans an ungrouped `Avg(n.score)` aggregate aliased `average` and checks
/// that `average` appears in the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let score = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("score"),
    };
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(score),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("average");
    assert!(physical.columns().contains(&expected));
}
1488
/// Plans two ungrouped aggregates over `n.age` (`Min` aliased `youngest`,
/// `Max` aliased `oldest`) and checks that both aliases appear in the output
/// columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates share the same shape; only function and alias differ.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
        separator: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["youngest", "oldest"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1535
/// Plans `RETURN a, b` over an inner join between an `a:Person` scan and a
/// `b:Company` scan with the single condition `a = b`, and checks that both
/// variables appear in the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Builds a labelled node scan binding `variable`.
    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };
    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let condition = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![condition],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1579
/// Plans a condition-less cross join of two unlabelled scans (`a`, `b`) and
/// checks that the physical plan exposes exactly two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabelled node scan binding `variable`.
    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1604
/// Plans a condition-less left join of two unlabelled scans (`a`, `b`) and
/// checks that the physical plan exposes exactly two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabelled node scan binding `variable`.
    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1628
/// Plans an input-less node creation (`n:Person` with a literal `name`
/// property) and checks that `n` appears in the output columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    // Single property assignment: name = "Alix".
    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("n");
    assert!(physical.columns().contains(&expected));
}
1650
/// Plans creation of a `KNOWS` edge bound to `r` between `a` and `b`, fed by
/// a cross join of two unlabelled scans, and checks that `r` appears in the
/// output columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabelled node scan binding `variable`.
    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    // Input supplying both endpoints: a x b.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("r");
    assert!(physical.columns().contains(&expected));
}
1682
/// Plans a non-detaching delete of `n` over an unlabelled scan and checks
/// that `n` appears in the output columns.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("n");
    assert!(physical.columns().contains(&expected));
}
1702
/// Planning a logical plan whose root is `LogicalOperator::Empty` must
/// return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
1714
/// Returning the variable `missing` over a scan that only binds `n` must
/// make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The returned name is never bound by the input.
    let unbound = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
1737
/// `convert_binary_op` must succeed for the comparison, boolean and
/// arithmetic operators listed below.
#[test]
fn test_convert_binary_ops() {
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparisons
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Boolean connectives
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
1755
/// `convert_unary_op` must succeed for `Not`, `IsNull`, `IsNotNull` and `Neg`.
#[test]
fn test_convert_unary_ops() {
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
1763
/// Each logical aggregate function checked here must convert to the physical
/// variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1787
/// A planner built with `Planner::new` reports no transaction id and no
/// transaction manager; `viewing_epoch` is only exercised, not checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    let has_transaction_id = planner.transaction_id().is_some();
    assert!(!has_transaction_id);

    let has_transaction_manager = planner.transaction_manager().is_some();
    assert!(!has_transaction_manager);

    // Only verifies that the accessor can be called.
    let _ = planner.viewing_epoch();
}
1797
/// Exercises the physical plan accessors on a bare node scan: `columns`
/// must report `n`, and `into_operator` must be callable.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the returned operator is intentionally discarded.
    let _ = physical.into_operator();
}
1815
/// `plan_adaptive` on `RETURN n` over a `Person` scan must yield the column
/// `n` and attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1842
/// `plan_adaptive` on a plan containing the filter `n.age > 30` must attach
/// an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    // Predicate: n.age > 30
    let age = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let predicate = LogicalExpression::Binary {
        left: Box::new(age),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1876
/// `plan_adaptive` on a single-hop outgoing `KNOWS` expansion, with
/// factorized execution switched off on the planner, must attach an adaptive
/// context to the physical plan.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1916
/// `plan_adaptive` on `RETURN a, b` over a cross join of two unlabelled
/// scans must attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabelled node scan binding `variable`.
    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };
    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1953
/// `plan_adaptive` on an ungrouped `Count(n)` aggregate must attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1981
/// `plan_adaptive` on `RETURN n` over a distinct operator (no column list)
/// must attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2006
/// `plan_adaptive` on `RETURN n` over a limit of 10 must attach an adaptive
/// context to the physical plan.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2031
/// `plan_adaptive` on `RETURN n` over a skip of 5 must attach an adaptive
/// context to the physical plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let items = vec![ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    }];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2056
/// `plan_adaptive` on `RETURN n` over an ascending sort on `n` must attach
/// an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2085
/// `plan_adaptive` on `RETURN n` over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) must attach an adaptive context to the
/// physical plan.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Builds a scan binding `n` restricted to `label`.
    let scan_with_label = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from("n"),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![scan_with_label("Person"), scan_with_label("Company")],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2116
/// Plans an outgoing `KNOWS` expansion of 1 to 3 hops from `a` to `b` and
/// checks that both endpoint variables appear in the output columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Hop range differs from the single-hop tests: min 1, max 3.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2159
/// Plans a 1 to 3 hop outgoing `KNOWS` expansion that carries the path alias
/// `p` and checks that `a` and `b` appear in the output columns.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased RETURN item for a variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        // The alias is set on the operator but `p` itself is not returned.
        path_alias: Some(String::from("p")),
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2201
/// Plans a single-hop `KNOWS` expansion in the `Incoming` direction with
/// factorized execution switched off, and checks that `a` and `b` are both
/// present in the resulting physical plan's columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Source of the expansion: an unlabeled scan bound to `a`.
    let source_scan = NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    };

    let incoming_hop = ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(LogicalOperator::NodeScan(source_scan)),
        path_alias: None,
        path_mode: PathMode::Walk,
    };

    let projected = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("b")),
            alias: None,
        },
    ];

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: projected,
        distinct: false,
        input: Box::new(LogicalOperator::Expand(incoming_hop)),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("a")));
    assert!(physical.columns().contains(&String::from("b")));
}
2242
/// Plans a single-hop `KNOWS` expansion using `ExpandDirection::Both` with
/// factorized execution switched off, and checks that the physical plan
/// exposes both `a` and `b` as columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Items projected by the RETURN: the two endpoint variables, unaliased.
    let return_items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let undirected_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let root = LogicalOperator::Return(ReturnOp {
        items: return_items,
        distinct: false,
        input: Box::new(undirected_hop),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
2283
/// Builds a planner through `Planner::with_context` and checks that the
/// transaction id, transaction manager and viewing epoch handed to the
/// constructor are reported back by the planner's accessors.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();

    // Start a transaction and capture the manager's current epoch.
    let manager = Arc::new(TransactionManager::new());
    let tx = manager.begin();
    let current_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(tx),
        current_epoch,
    );

    // Each accessor must reflect what was passed in above.
    assert_eq!(planner.transaction_id(), Some(tx));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), current_epoch);
}
2306
/// Plans two chained single-hop outgoing expansions (`a -> b -> c`, no edge
/// type filter) on a planner created with `with_factorized_execution(false)`,
/// and checks that the returned variables `a` and `c` are among the physical
/// plan's columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Both hops share every setting except their endpoints and input, so a
    // local builder keeps the two in lockstep.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec![],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable("a".to_string()),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable("c".to_string()),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "c"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2358
/// Plans a `Return` over a `Sort` whose single key is the property `n.name`
/// in ascending order (no explicit null ordering), and checks that `n` is
/// among the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Sort key: ascending on the `name` property of `n`.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("n")));
}
2394
/// Plans a `Company` node scan bound to `b` whose `input` is a `Person` node
/// scan bound to `a`, and checks that both variables appear in the physical
/// plan's columns.
#[test]
fn test_plan_scan_with_input() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Inner scan: feeds the outer scan through its `input` field.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: Some(Box::new(person_scan)),
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2430}