1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43 VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53 convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
/// Optional lower and upper limits of a value range, each with its own
/// inclusive/exclusive flag.
///
/// The limits are borrowed (`&'a Value`), so this struct never owns the
/// values it describes.
struct RangeBounds<'a> {
    /// Lower limit; `None` presumably means "no lower limit" (confirm at the use site).
    min: Option<&'a Value>,
    /// Upper limit; `None` presumably means "no upper limit" (confirm at the use site).
    max: Option<&'a Value>,
    /// Whether a value equal to `min` counts as inside the range.
    min_inclusive: bool,
    /// Whether a value equal to `max` counts as inside the range.
    max_inclusive: bool,
}
64
/// Converts a [`LogicalPlan`] into an executable [`PhysicalPlan`].
///
/// Besides the graph store, the planner carries optional transaction,
/// validation and catalog context. Several fields are wrapped in
/// `Cell`/`RefCell` so that planning methods taking `&self` can still update
/// them.
pub struct Planner {
    /// Graph store used for cardinality/statistics lookups and handed to
    /// the operators that need it.
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager: `None` from [`Planner::new`], `Some` from
    /// [`Planner::with_context`].
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Transaction the plan runs in, if any.
    pub(super) transaction_id: Option<TransactionId>,
    /// Viewing epoch: `new` takes it from `store.current_epoch()`,
    /// `with_context` takes it from the caller.
    pub(super) viewing_epoch: EpochId,
    /// Counter starting at 0; presumably used to name anonymous edge
    /// variables (its users are outside this view).
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true`, `plan_operator` plans chains of two or more single-hop
    /// expands through `plan_expand_chain`.
    pub(super) factorized_execution: bool,
    /// Column names tracked as scalar. `plan_horizontal_aggregate` and
    /// `plan_map_collect` register their output aliases here.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names tracked as edges (presumably; populated outside this view).
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator, installed with `with_validator`.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog, installed with `with_catalog`.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Parameter state a `ParameterScan` is planned against. Planning a
    /// `ParameterScan` while this is `None` yields an internal error.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variable names tracked as group lists (presumably; populated outside
    /// this view).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Set to `true` by `plan_profiled` for the duration of planning; makes
    /// `maybe_profile` wrap each planned operator.
    profiling: std::cell::Cell<bool>,
    /// Profile entries accumulated by `maybe_profile` during a profiled run.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Write tracker; created by `with_context` only when a transaction ID
    /// is supplied, otherwise `None`.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context; defaulted by the constructors and replaceable with
    /// `with_session_context`.
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// Read-only flag, set through `with_read_only` (its consumers are
    /// outside this view).
    pub(super) read_only: bool,
}
110
111impl Planner {
112 #[must_use]
117 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
118 let epoch = store.current_epoch();
119 Self {
120 store,
121 transaction_manager: None,
122 transaction_id: None,
123 viewing_epoch: epoch,
124 anon_edge_counter: std::cell::Cell::new(0),
125 factorized_execution: true,
126 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
127 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
128 validator: None,
129 catalog: None,
130 correlated_param_state: std::cell::RefCell::new(None),
131 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
132 profiling: std::cell::Cell::new(false),
133 profile_entries: std::cell::RefCell::new(Vec::new()),
134 write_tracker: None,
135 session_context: grafeo_core::execution::operators::SessionContext::default(),
136 read_only: false,
137 }
138 }
139
140 #[must_use]
142 pub fn with_context(
143 store: Arc<dyn GraphStoreMut>,
144 transaction_manager: Arc<TransactionManager>,
145 transaction_id: Option<TransactionId>,
146 viewing_epoch: EpochId,
147 ) -> Self {
148 use crate::transaction::TransactionWriteTracker;
149
150 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
152 if transaction_id.is_some() {
153 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
154 &transaction_manager,
155 ))))
156 } else {
157 None
158 };
159
160 Self {
161 store,
162 transaction_manager: Some(transaction_manager),
163 transaction_id,
164 viewing_epoch,
165 anon_edge_counter: std::cell::Cell::new(0),
166 factorized_execution: true,
167 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
168 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
169 validator: None,
170 catalog: None,
171 correlated_param_state: std::cell::RefCell::new(None),
172 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
173 profiling: std::cell::Cell::new(false),
174 profile_entries: std::cell::RefCell::new(Vec::new()),
175 write_tracker,
176 session_context: grafeo_core::execution::operators::SessionContext::default(),
177 read_only: false,
178 }
179 }
180
181 #[must_use]
184 pub fn with_read_only(mut self, read_only: bool) -> Self {
185 self.read_only = read_only;
186 self
187 }
188
    /// Returns the planner's viewing epoch (the `viewing_epoch` field).
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
194
    /// Returns the transaction ID this planner was configured with, or
    /// `None` when it has no transaction.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
200
    /// Returns a reference to the transaction manager, or `None` when the
    /// planner was built without one.
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
206
207 #[must_use]
209 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
210 self.factorized_execution = enabled;
211 self
212 }
213
214 #[must_use]
216 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
217 self.validator = Some(validator);
218 self
219 }
220
221 #[must_use]
223 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
224 self.catalog = Some(catalog);
225 self
226 }
227
228 #[must_use]
230 pub fn with_session_context(
231 mut self,
232 context: grafeo_core::execution::operators::SessionContext,
233 ) -> Self {
234 self.session_context = context;
235 self
236 }
237
238 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
242 match op {
243 LogicalOperator::Expand(expand) => {
244 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
245
246 if is_single_hop {
247 let (inner_count, base) = Self::count_expand_chain(&expand.input);
248 (inner_count + 1, base)
249 } else {
250 (0, op)
251 }
252 }
253 _ => (0, op),
254 }
255 }
256
257 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
261 let mut chain = Vec::new();
262 let mut current = op;
263
264 while let LogicalOperator::Expand(expand) = current {
265 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
266 if !is_single_hop {
267 break;
268 }
269 chain.push(expand);
270 current = &expand.input;
271 }
272
273 chain.reverse();
274 chain
275 }
276
277 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
279 let _span = grafeo_debug_span!("grafeo::query::plan");
280 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
281 Ok(PhysicalPlan {
282 operator,
283 columns,
284 adaptive_context: None,
285 })
286 }
287
288 pub fn plan_profiled(
294 &self,
295 logical_plan: &LogicalPlan,
296 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
297 self.profiling.set(true);
298 self.profile_entries.borrow_mut().clear();
299
300 let result = self.plan_operator(&logical_plan.root);
301
302 self.profiling.set(false);
303 let (operator, columns) = result?;
304 let entries = self.profile_entries.borrow_mut().drain(..).collect();
305
306 Ok((
307 PhysicalPlan {
308 operator,
309 columns,
310 adaptive_context: None,
311 },
312 entries,
313 ))
314 }
315
316 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
318 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
319
320 let mut adaptive_context = AdaptiveContext::new();
321 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
322
323 Ok(PhysicalPlan {
324 operator,
325 columns,
326 adaptive_context: Some(adaptive_context),
327 })
328 }
329
330 fn collect_cardinality_estimates(
332 &self,
333 op: &LogicalOperator,
334 ctx: &mut AdaptiveContext,
335 depth: usize,
336 ) {
337 match op {
338 LogicalOperator::NodeScan(scan) => {
339 let estimate = if let Some(label) = &scan.label {
340 self.store.nodes_by_label(label).len() as f64
341 } else {
342 self.store.node_count() as f64
343 };
344 let id = format!("scan_{}", scan.variable);
345 ctx.set_estimate(&id, estimate);
346
347 if let Some(input) = &scan.input {
348 self.collect_cardinality_estimates(input, ctx, depth + 1);
349 }
350 }
351 LogicalOperator::Filter(filter) => {
352 let input_estimate = self.estimate_cardinality(&filter.input);
353 let estimate = input_estimate * 0.3;
354 let id = format!("filter_{depth}");
355 ctx.set_estimate(&id, estimate);
356
357 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
358 }
359 LogicalOperator::Expand(expand) => {
360 let input_estimate = self.estimate_cardinality(&expand.input);
361 let stats = self.store.statistics();
362 let avg_degree = self.estimate_expand_degree(&stats, expand);
363 let estimate = input_estimate * avg_degree;
364 let id = format!("expand_{}", expand.to_variable);
365 ctx.set_estimate(&id, estimate);
366
367 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
368 }
369 LogicalOperator::Join(join) => {
370 let left_est = self.estimate_cardinality(&join.left);
371 let right_est = self.estimate_cardinality(&join.right);
372 let estimate = (left_est * right_est).sqrt();
373 let id = format!("join_{depth}");
374 ctx.set_estimate(&id, estimate);
375
376 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
377 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
378 }
379 LogicalOperator::Aggregate(agg) => {
380 let input_estimate = self.estimate_cardinality(&agg.input);
381 let estimate = if agg.group_by.is_empty() {
382 1.0
383 } else {
384 (input_estimate * 0.1).max(1.0)
385 };
386 let id = format!("aggregate_{depth}");
387 ctx.set_estimate(&id, estimate);
388
389 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
390 }
391 LogicalOperator::Distinct(distinct) => {
392 let input_estimate = self.estimate_cardinality(&distinct.input);
393 let estimate = (input_estimate * 0.5).max(1.0);
394 let id = format!("distinct_{depth}");
395 ctx.set_estimate(&id, estimate);
396
397 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
398 }
399 LogicalOperator::Return(ret) => {
400 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
401 }
402 LogicalOperator::Limit(limit) => {
403 let input_estimate = self.estimate_cardinality(&limit.input);
404 let estimate = (input_estimate).min(limit.count.estimate());
405 let id = format!("limit_{depth}");
406 ctx.set_estimate(&id, estimate);
407
408 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
409 }
410 LogicalOperator::Skip(skip) => {
411 let input_estimate = self.estimate_cardinality(&skip.input);
412 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
413 let id = format!("skip_{depth}");
414 ctx.set_estimate(&id, estimate);
415
416 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
417 }
418 LogicalOperator::Sort(sort) => {
419 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
420 }
421 LogicalOperator::Union(union) => {
422 let estimate: f64 = union
423 .inputs
424 .iter()
425 .map(|input| self.estimate_cardinality(input))
426 .sum();
427 let id = format!("union_{depth}");
428 ctx.set_estimate(&id, estimate);
429
430 for input in &union.inputs {
431 self.collect_cardinality_estimates(input, ctx, depth + 1);
432 }
433 }
434 _ => {
435 }
437 }
438 }
439
440 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
442 match op {
443 LogicalOperator::NodeScan(scan) => {
444 if let Some(label) = &scan.label {
445 self.store.nodes_by_label(label).len() as f64
446 } else {
447 self.store.node_count() as f64
448 }
449 }
450 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
451 LogicalOperator::Expand(expand) => {
452 let stats = self.store.statistics();
453 let avg_degree = self.estimate_expand_degree(&stats, expand);
454 self.estimate_cardinality(&expand.input) * avg_degree
455 }
456 LogicalOperator::Join(join) => {
457 let left = self.estimate_cardinality(&join.left);
458 let right = self.estimate_cardinality(&join.right);
459 (left * right).sqrt()
460 }
461 LogicalOperator::Aggregate(agg) => {
462 if agg.group_by.is_empty() {
463 1.0
464 } else {
465 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
466 }
467 }
468 LogicalOperator::Distinct(distinct) => {
469 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
470 }
471 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
472 LogicalOperator::Limit(limit) => self
473 .estimate_cardinality(&limit.input)
474 .min(limit.count.estimate()),
475 LogicalOperator::Skip(skip) => {
476 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
477 }
478 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
479 LogicalOperator::Union(union) => union
480 .inputs
481 .iter()
482 .map(|input| self.estimate_cardinality(input))
483 .sum(),
484 LogicalOperator::Except(except) => {
485 let left = self.estimate_cardinality(&except.left);
486 let right = self.estimate_cardinality(&except.right);
487 (left - right).max(0.0)
488 }
489 LogicalOperator::Intersect(intersect) => {
490 let left = self.estimate_cardinality(&intersect.left);
491 let right = self.estimate_cardinality(&intersect.right);
492 left.min(right)
493 }
494 LogicalOperator::Otherwise(otherwise) => self
495 .estimate_cardinality(&otherwise.left)
496 .max(self.estimate_cardinality(&otherwise.right)),
497 _ => 1000.0,
498 }
499 }
500
501 fn estimate_expand_degree(
503 &self,
504 stats: &grafeo_core::statistics::Statistics,
505 expand: &ExpandOp,
506 ) -> f64 {
507 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
508 if expand.edge_types.len() == 1 {
509 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
510 } else if stats.total_nodes > 0 {
511 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
512 } else {
513 10.0
514 }
515 }
516
517 fn maybe_profile(
520 &self,
521 result: Result<(Box<dyn Operator>, Vec<String>)>,
522 op: &LogicalOperator,
523 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
524 if self.profiling.get() {
525 let (physical, columns) = result?;
526 let (entry, stats) =
527 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
528 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
529 self.profile_entries.borrow_mut().push(entry);
530 Ok((Box::new(profiled), columns))
531 } else {
532 result
533 }
534 }
535
536 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
538 let result = match op {
539 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
540 LogicalOperator::Expand(expand) => {
541 if self.factorized_execution {
542 let (chain_len, _base) = Self::count_expand_chain(op);
543 if chain_len >= 2 {
544 return self.maybe_profile(self.plan_expand_chain(op), op);
545 }
546 }
547 self.plan_expand(expand)
548 }
549 LogicalOperator::Return(ret) => self.plan_return(ret),
550 LogicalOperator::Filter(filter) => self.plan_filter(filter),
551 LogicalOperator::Project(project) => self.plan_project(project),
552 LogicalOperator::Limit(limit) => self.plan_limit(limit),
553 LogicalOperator::Skip(skip) => self.plan_skip(skip),
554 LogicalOperator::Sort(sort) => self.plan_sort(sort),
555 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
556 LogicalOperator::Join(join) => self.plan_join(join),
557 LogicalOperator::Union(union) => self.plan_union(union),
558 LogicalOperator::Except(except) => self.plan_except(except),
559 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
560 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
561 LogicalOperator::Apply(apply) => self.plan_apply(apply),
562 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
563 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
564 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
565 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
566 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
567 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
568 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
569 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
570 LogicalOperator::Merge(merge) => self.plan_merge(merge),
571 LogicalOperator::MergeRelationship(merge_rel) => {
572 self.plan_merge_relationship(merge_rel)
573 }
574 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
575 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
576 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
577 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
578 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
579 #[cfg(feature = "algos")]
580 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
581 #[cfg(not(feature = "algos"))]
582 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
583 "CALL procedures require the 'algos' feature".to_string(),
584 )),
585 LogicalOperator::ParameterScan(_param_scan) => {
586 let state = self
587 .correlated_param_state
588 .borrow()
589 .clone()
590 .ok_or_else(|| {
591 Error::Internal(
592 "ParameterScan without correlated Apply context".to_string(),
593 )
594 })?;
595 let columns = state.columns.clone();
598 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
599 Ok((operator, columns))
600 }
601 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
602 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
603 LogicalOperator::LoadData(load) => {
604 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
605 load.path.clone(),
606 load.format,
607 load.with_headers,
608 load.field_terminator,
609 load.variable.clone(),
610 ));
611 Ok((operator, vec![load.variable.clone()]))
612 }
613 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
614 LogicalOperator::VectorScan(_) => Err(Error::Internal(
615 "VectorScan requires vector-index feature".to_string(),
616 )),
617 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
618 "VectorJoin requires vector-index feature".to_string(),
619 )),
620 _ => Err(Error::Internal(format!(
621 "Unsupported operator: {:?}",
622 std::mem::discriminant(op)
623 ))),
624 };
625 self.maybe_profile(result, op)
626 }
627
628 fn plan_horizontal_aggregate(
630 &self,
631 ha: &HorizontalAggregateOp,
632 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
633 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
634
635 let list_col_idx = child_columns
636 .iter()
637 .position(|c| c == &ha.list_column)
638 .ok_or_else(|| {
639 Error::Internal(format!(
640 "HorizontalAggregate list column '{}' not found in {:?}",
641 ha.list_column, child_columns
642 ))
643 })?;
644
645 let entity_kind = match ha.entity_kind {
646 LogicalEntityKind::Edge => EntityKind::Edge,
647 LogicalEntityKind::Node => EntityKind::Node,
648 };
649
650 let function = convert_aggregate_function(ha.function);
651 let input_column_count = child_columns.len();
652
653 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
654 child_op,
655 list_col_idx,
656 entity_kind,
657 function,
658 ha.property.clone(),
659 Arc::clone(&self.store) as Arc<dyn GraphStore>,
660 input_column_count,
661 ));
662
663 let mut columns = child_columns;
664 columns.push(ha.alias.clone());
665 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
667
668 Ok((operator, columns))
669 }
670
671 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
673 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
674 let key_idx = child_columns
675 .iter()
676 .position(|c| c == &mc.key_var)
677 .ok_or_else(|| {
678 Error::Internal(format!(
679 "MapCollect key '{}' not in columns {:?}",
680 mc.key_var, child_columns
681 ))
682 })?;
683 let value_idx = child_columns
684 .iter()
685 .position(|c| c == &mc.value_var)
686 .ok_or_else(|| {
687 Error::Internal(format!(
688 "MapCollect value '{}' not in columns {:?}",
689 mc.value_var, child_columns
690 ))
691 })?;
692 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
693 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
694 Ok((operator, vec![mc.alias.clone()]))
695 }
696}
697
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index within a row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; `reset` puts it back to 0.
    row_index: usize,
}
705
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next chunk of at most 1024 rows, or `Ok(None)` once every
    /// row has been emitted.
    ///
    /// Each output column is typed `LogicalType::Any`. A row that is too
    /// short for a source index contributes `Value::Null` for that column.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }
        let end = self.rows.len().min(start + 1024);
        let emitted = end - start;

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, emitted);

        for row in &self.rows[start..end] {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(emitted);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Static operator name.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
745
746#[cfg(test)]
747mod tests {
748 use super::*;
749 use crate::query::plan::{
750 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
751 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
752 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
753 SkipOp as LogicalSkipOp, SortKey, SortOp,
754 };
755 use grafeo_common::types::Value;
756 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
757 use grafeo_core::graph::GraphStoreMut;
758 use grafeo_core::graph::lpg::LpgStore;
759
760 fn create_test_store() -> Arc<dyn GraphStoreMut> {
761 let store = Arc::new(LpgStore::new().unwrap());
762 store.create_node(&["Person"]);
763 store.create_node(&["Person"]);
764 store.create_node(&["Company"]);
765 store
766 }
767
768 #[test]
771 fn test_plan_simple_scan() {
772 let store = create_test_store();
773 let planner = Planner::new(store);
774
775 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
777 items: vec![ReturnItem {
778 expression: LogicalExpression::Variable("n".to_string()),
779 alias: None,
780 }],
781 distinct: false,
782 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
783 variable: "n".to_string(),
784 label: Some("Person".to_string()),
785 input: None,
786 })),
787 }));
788
789 let physical = planner.plan(&logical).unwrap();
790 assert_eq!(physical.columns(), &["n"]);
791 }
792
793 #[test]
794 fn test_plan_scan_without_label() {
795 let store = create_test_store();
796 let planner = Planner::new(store);
797
798 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
800 items: vec![ReturnItem {
801 expression: LogicalExpression::Variable("n".to_string()),
802 alias: None,
803 }],
804 distinct: false,
805 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
806 variable: "n".to_string(),
807 label: None,
808 input: None,
809 })),
810 }));
811
812 let physical = planner.plan(&logical).unwrap();
813 assert_eq!(physical.columns(), &["n"]);
814 }
815
816 #[test]
817 fn test_plan_return_with_alias() {
818 let store = create_test_store();
819 let planner = Planner::new(store);
820
821 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
823 items: vec![ReturnItem {
824 expression: LogicalExpression::Variable("n".to_string()),
825 alias: Some("person".to_string()),
826 }],
827 distinct: false,
828 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
829 variable: "n".to_string(),
830 label: Some("Person".to_string()),
831 input: None,
832 })),
833 }));
834
835 let physical = planner.plan(&logical).unwrap();
836 assert_eq!(physical.columns(), &["person"]);
837 }
838
839 #[test]
840 fn test_plan_return_property() {
841 let store = create_test_store();
842 let planner = Planner::new(store);
843
844 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
846 items: vec![ReturnItem {
847 expression: LogicalExpression::Property {
848 variable: "n".to_string(),
849 property: "name".to_string(),
850 },
851 alias: None,
852 }],
853 distinct: false,
854 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
855 variable: "n".to_string(),
856 label: Some("Person".to_string()),
857 input: None,
858 })),
859 }));
860
861 let physical = planner.plan(&logical).unwrap();
862 assert_eq!(physical.columns(), &["n.name"]);
863 }
864
865 #[test]
866 fn test_plan_return_literal() {
867 let store = create_test_store();
868 let planner = Planner::new(store);
869
870 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
872 items: vec![ReturnItem {
873 expression: LogicalExpression::Literal(Value::Int64(42)),
874 alias: Some("answer".to_string()),
875 }],
876 distinct: false,
877 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
878 variable: "n".to_string(),
879 label: None,
880 input: None,
881 })),
882 }));
883
884 let physical = planner.plan(&logical).unwrap();
885 assert_eq!(physical.columns(), &["answer"]);
886 }
887
888 #[test]
891 fn test_plan_filter_equality() {
892 let store = create_test_store();
893 let planner = Planner::new(store);
894
895 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
897 items: vec![ReturnItem {
898 expression: LogicalExpression::Variable("n".to_string()),
899 alias: None,
900 }],
901 distinct: false,
902 input: Box::new(LogicalOperator::Filter(FilterOp {
903 predicate: LogicalExpression::Binary {
904 left: Box::new(LogicalExpression::Property {
905 variable: "n".to_string(),
906 property: "age".to_string(),
907 }),
908 op: BinaryOp::Eq,
909 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
910 },
911 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
912 variable: "n".to_string(),
913 label: Some("Person".to_string()),
914 input: None,
915 })),
916 pushdown_hint: None,
917 })),
918 }));
919
920 let physical = planner.plan(&logical).unwrap();
921 assert_eq!(physical.columns(), &["n"]);
922 }
923
924 #[test]
925 fn test_plan_filter_compound_and() {
926 let store = create_test_store();
927 let planner = Planner::new(store);
928
929 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
931 items: vec![ReturnItem {
932 expression: LogicalExpression::Variable("n".to_string()),
933 alias: None,
934 }],
935 distinct: false,
936 input: Box::new(LogicalOperator::Filter(FilterOp {
937 predicate: LogicalExpression::Binary {
938 left: Box::new(LogicalExpression::Binary {
939 left: Box::new(LogicalExpression::Property {
940 variable: "n".to_string(),
941 property: "age".to_string(),
942 }),
943 op: BinaryOp::Gt,
944 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
945 }),
946 op: BinaryOp::And,
947 right: Box::new(LogicalExpression::Binary {
948 left: Box::new(LogicalExpression::Property {
949 variable: "n".to_string(),
950 property: "age".to_string(),
951 }),
952 op: BinaryOp::Lt,
953 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
954 }),
955 },
956 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
957 variable: "n".to_string(),
958 label: None,
959 input: None,
960 })),
961 pushdown_hint: None,
962 })),
963 }));
964
965 let physical = planner.plan(&logical).unwrap();
966 assert_eq!(physical.columns(), &["n"]);
967 }
968
969 #[test]
970 fn test_plan_filter_unary_not() {
971 let store = create_test_store();
972 let planner = Planner::new(store);
973
974 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
976 items: vec![ReturnItem {
977 expression: LogicalExpression::Variable("n".to_string()),
978 alias: None,
979 }],
980 distinct: false,
981 input: Box::new(LogicalOperator::Filter(FilterOp {
982 predicate: LogicalExpression::Unary {
983 op: UnaryOp::Not,
984 operand: Box::new(LogicalExpression::Property {
985 variable: "n".to_string(),
986 property: "active".to_string(),
987 }),
988 },
989 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
990 variable: "n".to_string(),
991 label: None,
992 input: None,
993 })),
994 pushdown_hint: None,
995 })),
996 }));
997
998 let physical = planner.plan(&logical).unwrap();
999 assert_eq!(physical.columns(), &["n"]);
1000 }
1001
1002 #[test]
1003 fn test_plan_filter_is_null() {
1004 let store = create_test_store();
1005 let planner = Planner::new(store);
1006
1007 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1009 items: vec![ReturnItem {
1010 expression: LogicalExpression::Variable("n".to_string()),
1011 alias: None,
1012 }],
1013 distinct: false,
1014 input: Box::new(LogicalOperator::Filter(FilterOp {
1015 predicate: LogicalExpression::Unary {
1016 op: UnaryOp::IsNull,
1017 operand: Box::new(LogicalExpression::Property {
1018 variable: "n".to_string(),
1019 property: "email".to_string(),
1020 }),
1021 },
1022 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1023 variable: "n".to_string(),
1024 label: None,
1025 input: None,
1026 })),
1027 pushdown_hint: None,
1028 })),
1029 }));
1030
1031 let physical = planner.plan(&logical).unwrap();
1032 assert_eq!(physical.columns(), &["n"]);
1033 }
1034
/// Plans a filter whose predicate is `size(n.friends) > 0` and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left-hand side of the comparison: a `size` call over the `friends` property.
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1072
/// Plans a single-hop outgoing `KNOWS` expansion from `Person` nodes and checks
/// that both endpoint variables `a` and `b` appear among the plan's columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(people),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // One un-aliased return item per endpoint variable, in order.
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expansion),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1116
/// Plans a single-hop outgoing `KNOWS` expansion that binds the edge to `r` and
/// checks that `a`, `r` and `b` all appear among the plan's columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: Some("r".to_string()),
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // Return source node, edge and target node, in that order.
    let items: Vec<ReturnItem> = ["a", "r", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expansion),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"r".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1162
/// Plans `RETURN n` over a limit of 10 on an unlabelled scan and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1190
/// Plans `RETURN n` over a skip of 5 on an unlabelled scan and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1216
/// Plans `RETURN n` over an ascending sort on `n` and checks that the physical
/// plan exposes exactly the column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1246
/// Plans `RETURN n` over a descending sort on `n` and checks that the physical
/// plan exposes exactly the column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1276
/// Plans `RETURN n` over a distinct operator with no explicit column list and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1302
/// Plans `RETURN n` over a distinct operator restricted to the column `n` and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["n".to_string()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1328
/// Plans a distinct operator whose column list names `nonexistent`, a column
/// the scan does not produce, and checks that planning still succeeds with the
/// single column `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["nonexistent".to_string()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1355
/// Plans `RETURN cnt` over an ungrouped `count(n)` aliased as `cnt` and checks
/// that `cnt` appears among the plan's columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"cnt".to_string()));
}
1393
/// Plans `count(n)` grouped by `n.city` over `Person` nodes and checks that the
/// physical plan has two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let city = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "city".to_string(),
    };
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_nodes],
        input: Box::new(people),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1425
/// Plans an ungrouped `sum(n.value)` aliased as `total` and checks that `total`
/// appears among the plan's columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "value".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("total".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"total".to_string()));
}
1457
/// Plans an ungrouped `avg(n.score)` aliased as `average` and checks that
/// `average` appears among the plan's columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let mean_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "score".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("average".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![mean_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"average".to_string()));
}
1489
/// Plans ungrouped `min(n.age)` and `max(n.age)` aliased as `youngest` and
/// `oldest`, and checks that both aliases appear among the plan's columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates differ only in their function and alias.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
        separator: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"youngest".to_string()));
    assert!(columns.contains(&"oldest".to_string()));
}
1536
/// Plans an inner join between `Person` nodes (`a`) and `Company` nodes (`b`)
/// on the condition `a = b`, and checks that both variables appear among the
/// plan's columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Labelled scan binding `variable` to nodes carrying `label`.
    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable("a".to_string()),
            right: LogicalExpression::Variable("b".to_string()),
        }],
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(joined),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1580
/// Plans a condition-less cross join of two unlabelled scans (`a` and `b`) and
/// checks that the physical plan has two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding the given variable.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1605
/// Plans a condition-less left join of two unlabelled scans (`a` and `b`) and
/// checks that the physical plan has two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding the given variable.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1629
/// Plans the creation of a `Person` node bound to `n` with a literal `name`
/// property and checks that `n` appears among the plan's columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    let name_property = (
        "name".to_string(),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let create = CreateNodeOp {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"n".to_string()));
}
1651
/// Plans the creation of a `KNOWS` edge bound to `r` between `a` and `b`, fed
/// by a cross join of two unlabelled scans, and checks that `r` appears among
/// the plan's columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding the given variable.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_string()),
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"r".to_string()));
}
1683
/// Plans a non-detaching delete of `n` over an unlabelled scan and checks that
/// `n` appears among the plan's columns.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: "n".to_string(),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"n".to_string()));
}
1703
/// Checks that planning a plan whose root is `LogicalOperator::Empty` returns
/// an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());

    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);
    let outcome = planner.plan(&empty_plan);

    assert!(outcome.is_err());
}
1715
/// Checks that returning the variable `missing`, which the underlying scan
/// (binding only `n`) does not produce, makes planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable("missing".to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    let outcome = planner.plan(&logical);
    assert!(outcome.is_err());
}
1738
/// Checks that `convert_binary_op` succeeds for the comparison, boolean and
/// arithmetic operators listed below.
#[test]
fn test_convert_binary_ops() {
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparison operators.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Boolean connectives.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic operators.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
1756
/// Checks that `convert_unary_op` succeeds for `Not`, `IsNull`, `IsNotNull`
/// and `Neg`.
#[test]
fn test_convert_unary_ops() {
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
1764
/// Checks that `convert_aggregate_function` maps `Count`, `Sum`, `Avg`, `Min`
/// and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1788
/// Checks that a planner built with `Planner::new` reports no transaction id
/// and no transaction manager, and that `viewing_epoch` can be called.
#[test]
fn test_planner_accessors() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store));

    assert!(planner.transaction_id().is_none());
    assert!(planner.transaction_manager().is_none());

    // Only exercised for being callable; the returned value is not inspected.
    let _ = planner.viewing_epoch();
}
1798
/// Plans a bare unlabelled scan, checks the plan's columns are exactly `n`,
/// and then consumes the plan through `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Only exercised for being callable; the returned operator is discarded.
    let _ = physical.into_operator();
}
1816
/// Runs `plan_adaptive` on `RETURN n` over a `Person` scan and checks that the
/// plan exposes exactly the column `n` and carries an adaptive context.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(people),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1843
/// Runs `plan_adaptive` on a plan filtering by `n.age > 30` and checks that the
/// resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let older_than_thirty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_thirty,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1877
/// Runs `plan_adaptive`, with factorized execution switched off, on a
/// single-hop outgoing `KNOWS` expansion and checks that the resulting plan
/// carries an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store)).with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expansion),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1917
/// Runs `plan_adaptive` on a cross join of two unlabelled scans and checks that
/// the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding the given variable.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(joined),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1954
/// Runs `plan_adaptive` on an ungrouped `count(n)` aggregate and checks that
/// the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1982
/// Runs `plan_adaptive` on a distinct operator over an unlabelled scan and
/// checks that the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2007
/// Runs `plan_adaptive` on a limit of 10 over an unlabelled scan and checks
/// that the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2032
/// Runs `plan_adaptive` on a skip of 5 over an unlabelled scan and checks that
/// the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2057
/// Runs `plan_adaptive` on an ascending sort by `n` over an unlabelled scan and
/// checks that the resulting plan carries an adaptive context.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2086
/// Runs `plan_adaptive` on a union of a `Person` scan and a `Company` scan
/// (both binding `n`) and checks that the resulting plan carries an adaptive
/// context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Scan binding `n` to nodes carrying the given label.
    let scan_labelled = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let combined = LogicalOperator::Union(UnionOp {
        inputs: vec![scan_labelled("Person"), scan_labelled("Company")],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(combined),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2117
/// Plans an outgoing `KNOWS` expansion of one to three hops and checks that
/// both endpoint variables `a` and `b` appear among the plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expansion),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2160
/// Plans an outgoing `KNOWS` expansion of one to three hops that sets the path
/// alias `p`, and checks that the endpoint variables `a` and `b` appear among
/// the plan's columns. The alias itself is not returned or asserted on.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some("p".to_string()),
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expansion),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2202
/// Plans a single-hop `KNOWS` expansion in the incoming direction, with
/// factorized execution switched off, and checks that `a` and `b` are both
/// present in the physical plan's output columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for an un-aliased `RETURN <variable>` item.
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Leaf of the plan: an unlabeled scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Exactly one hop (min == max == 1), traversed against edge direction.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2243
/// Plans a single-hop `KNOWS` expansion using `ExpandDirection::Both`, with
/// factorized execution switched off, and checks that `a` and `b` are both
/// present in the physical plan's output columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for an un-aliased `RETURN <variable>` item.
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Leaf of the plan: an unlabeled scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Exactly one hop (min == max == 1), direction set to `Both`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2284
/// Builds a planner through `Planner::with_context` and checks that it
/// reports the transaction id and epoch it was given and that a transaction
/// manager is set.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let txn_manager = Arc::new(TransactionManager::new());

    // Begin a transaction first, then read the manager's current epoch; both
    // values are handed to the planner and compared against its accessors.
    let txn_id = txn_manager.begin();
    let current_epoch = txn_manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&txn_manager),
        Some(txn_id),
        current_epoch,
    );

    assert_eq!(planner.transaction_id(), Some(txn_id));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), current_epoch);
}
2307
/// Plans two chained single-hop expansions (`a` -> `b` -> `c`, no edge-type
/// restriction) with factorized execution switched off, and checks that the
/// returned variables `a` and `c` are present in the output columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for an un-aliased `RETURN <variable>` item.
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Both hops share the same shape: outgoing, untyped, exactly one hop,
    // no edge variable and no path alias. Only endpoints and input differ.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: String::from(from),
            to_variable: String::from(to),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec![],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    // Leaf of the plan: an unlabeled scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
2359
/// Plans a scan of `n` sorted ascending by the property `n.name` and checks
/// that `n` is present in the physical plan's output columns.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    // Leaf of the plan: an unlabeled scan binding `n`.
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single sort key on a property access rather than on a plain variable;
    // null ordering is left unspecified.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2395
/// Plans a `Company` scan binding `b` whose `input` is a `Person` scan
/// binding `a`, and checks that both variables are present in the physical
/// plan's output columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Shorthand for an un-aliased `RETURN <variable>` item.
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Inner scan: has no input of its own.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Outer scan: takes the `Person` scan as its input operator.
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2431}