1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43 VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53 convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
/// Lower and upper limits of a value range, borrowed for lifetime `'a`.
///
/// Each side is optional and carries its own inclusivity flag.
// NOTE(review): nothing in the visible part of this file constructs or reads
// this struct; presumably the planner submodules use it for range
// predicates — confirm against the callers.
struct RangeBounds<'a> {
    /// Lower limit; `None` presumably means "unbounded below" (confirm at use site).
    min: Option<&'a Value>,
    /// Upper limit; `None` presumably means "unbounded above" (confirm at use site).
    max: Option<&'a Value>,
    /// Whether `min` itself belongs to the range.
    min_inclusive: bool,
    /// Whether `max` itself belongs to the range.
    max_inclusive: bool,
}
64
/// Turns a [`LogicalPlan`] into a [`PhysicalPlan`] of executable operators.
///
/// Planning methods take `&self`; the bookkeeping that changes while a plan is
/// being built lives in `Cell` / `RefCell` fields.
pub struct Planner {
    /// Read access to the graph. Also supplies the epoch used by `Planner::new`
    /// and the counts and statistics behind cardinality estimates.
    pub(super) store: Arc<dyn GraphStore>,
    /// Mutable graph store. When `None`, the private `write_store()` accessor
    /// fails with `TransactionError::ReadOnly`.
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager supplied through `with_context`; `None` for planners
    /// built with `new`.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Transaction this planner works for, if any.
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch the planner views the graph at.
    pub(super) viewing_epoch: EpochId,
    /// Next index used by `register_edge_column` for `_anon_edge_{n}` names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When set, `plan_operator` routes chains of two or more single-hop
    /// expands to `plan_expand_chain`. Defaults to `true`.
    pub(super) factorized_execution: bool,
    /// Column names recorded as scalar. In this file the output aliases of
    /// `HorizontalAggregate` and `MapCollect` are inserted here.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names registered through `register_edge_column`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator, installed with `with_validator`.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog, installed with `with_catalog`.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Parameter state read when planning a `ParameterScan`. Planning one while
    /// this is `None` is reported as an internal error.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    // NOTE(review): not read or written in the visible part of this file;
    // presumably names of variables bound to grouped lists — confirm.
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// `true` only while `plan_profiled` is planning; makes `maybe_profile`
    /// wrap each planned operator.
    profiling: std::cell::Cell<bool>,
    /// Profile entries collected by `maybe_profile` during `plan_profiled`.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Write tracker created by `with_context` when a transaction id is given;
    /// `None` otherwise.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context; starts as `SessionContext::default()` and is replaced
    /// by `with_session_context`.
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// Read-only flag set by `with_read_only`; defaults to `false`.
    pub(super) read_only: bool,
}
112
113impl Planner {
114 #[must_use]
119 pub fn new(store: Arc<dyn GraphStore>) -> Self {
120 let epoch = store.current_epoch();
121 Self {
122 store,
123 write_store: None,
124 transaction_manager: None,
125 transaction_id: None,
126 viewing_epoch: epoch,
127 anon_edge_counter: std::cell::Cell::new(0),
128 factorized_execution: true,
129 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131 validator: None,
132 catalog: None,
133 correlated_param_state: std::cell::RefCell::new(None),
134 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135 profiling: std::cell::Cell::new(false),
136 profile_entries: std::cell::RefCell::new(Vec::new()),
137 write_tracker: None,
138 session_context: grafeo_core::execution::operators::SessionContext::default(),
139 read_only: false,
140 }
141 }
142
143 #[must_use]
145 pub fn with_context(
146 store: Arc<dyn GraphStore>,
147 write_store: Option<Arc<dyn GraphStoreMut>>,
148 transaction_manager: Arc<TransactionManager>,
149 transaction_id: Option<TransactionId>,
150 viewing_epoch: EpochId,
151 ) -> Self {
152 use crate::transaction::TransactionWriteTracker;
153
154 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156 if transaction_id.is_some() {
157 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158 &transaction_manager,
159 ))))
160 } else {
161 None
162 };
163
164 Self {
165 store,
166 write_store,
167 transaction_manager: Some(transaction_manager),
168 transaction_id,
169 viewing_epoch,
170 anon_edge_counter: std::cell::Cell::new(0),
171 factorized_execution: true,
172 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174 validator: None,
175 catalog: None,
176 correlated_param_state: std::cell::RefCell::new(None),
177 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178 profiling: std::cell::Cell::new(false),
179 profile_entries: std::cell::RefCell::new(Vec::new()),
180 write_tracker,
181 session_context: grafeo_core::execution::operators::SessionContext::default(),
182 read_only: false,
183 }
184 }
185
186 #[must_use]
189 pub fn with_read_only(mut self, read_only: bool) -> Self {
190 self.read_only = read_only;
191 self
192 }
193
194 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196 self.write_store
197 .as_ref()
198 .map(Arc::clone)
199 .ok_or(Error::Transaction(
200 grafeo_common::utils::error::TransactionError::ReadOnly,
201 ))
202 }
203
/// Returns the epoch this planner views the graph at.
#[must_use]
pub fn viewing_epoch(&self) -> EpochId {
    self.viewing_epoch
}
209
/// Returns the id of the transaction this planner was created for, or `None`
/// when it has no transaction.
#[must_use]
pub fn transaction_id(&self) -> Option<TransactionId> {
    self.transaction_id
}
215
/// Borrows the transaction manager, or returns `None` when the planner was
/// built without one.
#[must_use]
pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
    self.transaction_manager.as_ref()
}
221
222 #[must_use]
224 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225 self.factorized_execution = enabled;
226 self
227 }
228
229 #[must_use]
231 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232 self.validator = Some(validator);
233 self
234 }
235
236 #[must_use]
238 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239 self.catalog = Some(catalog);
240 self
241 }
242
243 #[must_use]
245 pub fn with_session_context(
246 mut self,
247 context: grafeo_core::execution::operators::SessionContext,
248 ) -> Self {
249 self.session_context = context;
250 self
251 }
252
253 pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
259 let name = edge_variable.clone().unwrap_or_else(|| {
260 let count = self.anon_edge_counter.get();
261 self.anon_edge_counter.set(count + 1);
262 format!("_anon_edge_{}", count)
263 });
264 self.edge_columns.borrow_mut().insert(name.clone());
265 name
266 }
267
268 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
272 match op {
273 LogicalOperator::Expand(expand) => {
274 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
275
276 if is_single_hop {
277 let (inner_count, base) = Self::count_expand_chain(&expand.input);
278 (inner_count + 1, base)
279 } else {
280 (0, op)
281 }
282 }
283 _ => (0, op),
284 }
285 }
286
287 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
291 let mut chain = Vec::new();
292 let mut current = op;
293
294 while let LogicalOperator::Expand(expand) = current {
295 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
296 if !is_single_hop {
297 break;
298 }
299 chain.push(expand);
300 current = &expand.input;
301 }
302
303 chain.reverse();
304 chain
305 }
306
307 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
314 let _span = grafeo_debug_span!("grafeo::query::plan");
315 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
316 Ok(PhysicalPlan {
317 operator,
318 columns,
319 adaptive_context: None,
320 })
321 }
322
323 pub fn plan_profiled(
334 &self,
335 logical_plan: &LogicalPlan,
336 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
337 self.profiling.set(true);
338 self.profile_entries.borrow_mut().clear();
339
340 let result = self.plan_operator(&logical_plan.root);
341
342 self.profiling.set(false);
343 let (operator, columns) = result?;
344 let entries = self.profile_entries.borrow_mut().drain(..).collect();
345
346 Ok((
347 PhysicalPlan {
348 operator,
349 columns,
350 adaptive_context: None,
351 },
352 entries,
353 ))
354 }
355
356 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
363 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
364
365 let mut adaptive_context = AdaptiveContext::new();
366 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
367
368 Ok(PhysicalPlan {
369 operator,
370 columns,
371 adaptive_context: Some(adaptive_context),
372 })
373 }
374
375 fn collect_cardinality_estimates(
377 &self,
378 op: &LogicalOperator,
379 ctx: &mut AdaptiveContext,
380 depth: usize,
381 ) {
382 match op {
383 LogicalOperator::NodeScan(scan) => {
384 let estimate = if let Some(label) = &scan.label {
385 self.store.nodes_by_label(label).len() as f64
386 } else {
387 self.store.node_count() as f64
388 };
389 let id = format!("scan_{}", scan.variable);
390 ctx.set_estimate(&id, estimate);
391
392 if let Some(input) = &scan.input {
393 self.collect_cardinality_estimates(input, ctx, depth + 1);
394 }
395 }
396 LogicalOperator::Filter(filter) => {
397 let input_estimate = self.estimate_cardinality(&filter.input);
398 let estimate = input_estimate * 0.3;
399 let id = format!("filter_{depth}");
400 ctx.set_estimate(&id, estimate);
401
402 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
403 }
404 LogicalOperator::Expand(expand) => {
405 let input_estimate = self.estimate_cardinality(&expand.input);
406 let stats = self.store.statistics();
407 let avg_degree = self.estimate_expand_degree(&stats, expand);
408 let estimate = input_estimate * avg_degree;
409 let id = format!("expand_{}", expand.to_variable);
410 ctx.set_estimate(&id, estimate);
411
412 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
413 }
414 LogicalOperator::Join(join) => {
415 let left_est = self.estimate_cardinality(&join.left);
416 let right_est = self.estimate_cardinality(&join.right);
417 let estimate = (left_est * right_est).sqrt();
418 let id = format!("join_{depth}");
419 ctx.set_estimate(&id, estimate);
420
421 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
422 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
423 }
424 LogicalOperator::Aggregate(agg) => {
425 let input_estimate = self.estimate_cardinality(&agg.input);
426 let estimate = if agg.group_by.is_empty() {
427 1.0
428 } else {
429 (input_estimate * 0.1).max(1.0)
430 };
431 let id = format!("aggregate_{depth}");
432 ctx.set_estimate(&id, estimate);
433
434 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
435 }
436 LogicalOperator::Distinct(distinct) => {
437 let input_estimate = self.estimate_cardinality(&distinct.input);
438 let estimate = (input_estimate * 0.5).max(1.0);
439 let id = format!("distinct_{depth}");
440 ctx.set_estimate(&id, estimate);
441
442 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
443 }
444 LogicalOperator::Return(ret) => {
445 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
446 }
447 LogicalOperator::Limit(limit) => {
448 let input_estimate = self.estimate_cardinality(&limit.input);
449 let estimate = (input_estimate).min(limit.count.estimate());
450 let id = format!("limit_{depth}");
451 ctx.set_estimate(&id, estimate);
452
453 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
454 }
455 LogicalOperator::Skip(skip) => {
456 let input_estimate = self.estimate_cardinality(&skip.input);
457 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
458 let id = format!("skip_{depth}");
459 ctx.set_estimate(&id, estimate);
460
461 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
462 }
463 LogicalOperator::Sort(sort) => {
464 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
465 }
466 LogicalOperator::Union(union) => {
467 let estimate: f64 = union
468 .inputs
469 .iter()
470 .map(|input| self.estimate_cardinality(input))
471 .sum();
472 let id = format!("union_{depth}");
473 ctx.set_estimate(&id, estimate);
474
475 for input in &union.inputs {
476 self.collect_cardinality_estimates(input, ctx, depth + 1);
477 }
478 }
479 _ => {
480 }
482 }
483 }
484
485 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
487 match op {
488 LogicalOperator::NodeScan(scan) => {
489 if let Some(label) = &scan.label {
490 self.store.nodes_by_label(label).len() as f64
491 } else {
492 self.store.node_count() as f64
493 }
494 }
495 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
496 LogicalOperator::Expand(expand) => {
497 let stats = self.store.statistics();
498 let avg_degree = self.estimate_expand_degree(&stats, expand);
499 self.estimate_cardinality(&expand.input) * avg_degree
500 }
501 LogicalOperator::Join(join) => {
502 let left = self.estimate_cardinality(&join.left);
503 let right = self.estimate_cardinality(&join.right);
504 (left * right).sqrt()
505 }
506 LogicalOperator::Aggregate(agg) => {
507 if agg.group_by.is_empty() {
508 1.0
509 } else {
510 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
511 }
512 }
513 LogicalOperator::Distinct(distinct) => {
514 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
515 }
516 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
517 LogicalOperator::Limit(limit) => self
518 .estimate_cardinality(&limit.input)
519 .min(limit.count.estimate()),
520 LogicalOperator::Skip(skip) => {
521 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
522 }
523 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
524 LogicalOperator::Union(union) => union
525 .inputs
526 .iter()
527 .map(|input| self.estimate_cardinality(input))
528 .sum(),
529 LogicalOperator::Except(except) => {
530 let left = self.estimate_cardinality(&except.left);
531 let right = self.estimate_cardinality(&except.right);
532 (left - right).max(0.0)
533 }
534 LogicalOperator::Intersect(intersect) => {
535 let left = self.estimate_cardinality(&intersect.left);
536 let right = self.estimate_cardinality(&intersect.right);
537 left.min(right)
538 }
539 LogicalOperator::Otherwise(otherwise) => self
540 .estimate_cardinality(&otherwise.left)
541 .max(self.estimate_cardinality(&otherwise.right)),
542 _ => 1000.0,
543 }
544 }
545
546 fn estimate_expand_degree(
548 &self,
549 stats: &grafeo_core::statistics::Statistics,
550 expand: &ExpandOp,
551 ) -> f64 {
552 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
553 if expand.edge_types.len() == 1 {
554 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
555 } else if stats.total_nodes > 0 {
556 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
557 } else {
558 10.0
559 }
560 }
561
562 fn maybe_profile(
565 &self,
566 result: Result<(Box<dyn Operator>, Vec<String>)>,
567 op: &LogicalOperator,
568 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
569 if self.profiling.get() {
570 let (physical, columns) = result?;
571 let (entry, stats) =
572 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
573 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
574 self.profile_entries.borrow_mut().push(entry);
575 Ok((Box::new(profiled), columns))
576 } else {
577 result
578 }
579 }
580
581 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
583 let result = match op {
584 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
585 LogicalOperator::Expand(expand) => {
586 if self.factorized_execution {
587 let (chain_len, _base) = Self::count_expand_chain(op);
588 if chain_len >= 2 {
589 return self.maybe_profile(self.plan_expand_chain(op), op);
590 }
591 }
592 self.plan_expand(expand)
593 }
594 LogicalOperator::Return(ret) => self.plan_return(ret),
595 LogicalOperator::Filter(filter) => self.plan_filter(filter),
596 LogicalOperator::Project(project) => self.plan_project(project),
597 LogicalOperator::Limit(limit) => self.plan_limit(limit),
598 LogicalOperator::Skip(skip) => self.plan_skip(skip),
599 LogicalOperator::Sort(sort) => self.plan_sort(sort),
600 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
601 LogicalOperator::Join(join) => self.plan_join(join),
602 LogicalOperator::Union(union) => self.plan_union(union),
603 LogicalOperator::Except(except) => self.plan_except(except),
604 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
605 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
606 LogicalOperator::Apply(apply) => self.plan_apply(apply),
607 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
608 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
609 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
610 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
611 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
612 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
613 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
614 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
615 LogicalOperator::Merge(merge) => self.plan_merge(merge),
616 LogicalOperator::MergeRelationship(merge_rel) => {
617 self.plan_merge_relationship(merge_rel)
618 }
619 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
620 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
621 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
622 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
623 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
624 #[cfg(feature = "algos")]
625 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
626 #[cfg(not(feature = "algos"))]
627 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
628 "CALL procedures require the 'algos' feature".to_string(),
629 )),
630 LogicalOperator::ParameterScan(_param_scan) => {
631 let state = self
632 .correlated_param_state
633 .borrow()
634 .clone()
635 .ok_or_else(|| {
636 Error::Internal(
637 "ParameterScan without correlated Apply context".to_string(),
638 )
639 })?;
640 let columns = state.columns.clone();
643 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
644 Ok((operator, columns))
645 }
646 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
647 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
648 LogicalOperator::LoadData(load) => {
649 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
650 load.path.clone(),
651 load.format,
652 load.with_headers,
653 load.field_terminator,
654 load.variable.clone(),
655 ));
656 Ok((operator, vec![load.variable.clone()]))
657 }
658 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
659 LogicalOperator::VectorScan(_) => Err(Error::Internal(
660 "VectorScan requires vector-index feature".to_string(),
661 )),
662 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
663 "VectorJoin requires vector-index feature".to_string(),
664 )),
665 _ => Err(Error::Internal(format!(
666 "Unsupported operator: {:?}",
667 std::mem::discriminant(op)
668 ))),
669 };
670 self.maybe_profile(result, op)
671 }
672
673 fn plan_horizontal_aggregate(
675 &self,
676 ha: &HorizontalAggregateOp,
677 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
678 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
679
680 let list_col_idx = child_columns
681 .iter()
682 .position(|c| c == &ha.list_column)
683 .ok_or_else(|| {
684 Error::Internal(format!(
685 "HorizontalAggregate list column '{}' not found in {:?}",
686 ha.list_column, child_columns
687 ))
688 })?;
689
690 let entity_kind = match ha.entity_kind {
691 LogicalEntityKind::Edge => EntityKind::Edge,
692 LogicalEntityKind::Node => EntityKind::Node,
693 };
694
695 let function = convert_aggregate_function(ha.function);
696 let input_column_count = child_columns.len();
697
698 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
699 child_op,
700 list_col_idx,
701 entity_kind,
702 function,
703 ha.property.clone(),
704 Arc::clone(&self.store) as Arc<dyn GraphStore>,
705 input_column_count,
706 ));
707
708 let mut columns = child_columns;
709 columns.push(ha.alias.clone());
710 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
712
713 Ok((operator, columns))
714 }
715
716 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
718 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
719 let key_idx = child_columns
720 .iter()
721 .position(|c| c == &mc.key_var)
722 .ok_or_else(|| {
723 Error::Internal(format!(
724 "MapCollect key '{}' not in columns {:?}",
725 mc.key_var, child_columns
726 ))
727 })?;
728 let value_idx = child_columns
729 .iter()
730 .position(|c| c == &mc.value_var)
731 .ok_or_else(|| {
732 Error::Internal(format!(
733 "MapCollect value '{}' not in columns {:?}",
734 mc.value_var, child_columns
735 ))
736 })?;
737 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
738 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
739 Ok((operator, vec![mc.alias.clone()]))
740 }
741}
742
/// Operator that replays a fixed, already materialized set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, each a list of values.
    rows: Vec<Vec<Value>>,
    /// For every output column, the position within a row to read from.
    /// Positions missing from a row are emitted as `Value::Null`.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; `reset` puts it back to 0.
    row_index: usize,
}
750
/// Streams the stored rows out in chunks of up to 1024 rows.
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next chunk of rows, or `Ok(None)` once every row was emitted.
    ///
    /// Each output column takes its value from the row position given by
    /// `column_indices`; a position the row does not have yields `Value::Null`.
    /// All columns are typed `LogicalType::Any`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        // Upper bound on the number of rows placed in one chunk.
        const MAX_CHUNK_ROWS: usize = 1024;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }
        let end = start + (self.rows.len() - start).min(MAX_CHUNK_ROWS);
        let batch = &self.rows[start..end];

        let column_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&column_types, batch.len());

        for row in batch {
            for (dest, &src) in self.column_indices.iter().enumerate() {
                let value = row.get(src).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(dest) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch.len());

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Name under which this operator reports itself.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
790
791#[cfg(test)]
792mod tests {
793 use super::*;
794 use crate::query::plan::{
795 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
796 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
797 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
798 SkipOp as LogicalSkipOp, SortKey, SortOp,
799 };
800 use grafeo_common::types::Value;
801 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
802 use grafeo_core::graph::GraphStoreMut;
803 use grafeo_core::graph::lpg::LpgStore;
804
805 fn create_test_store() -> Arc<LpgStore> {
806 let store = Arc::new(LpgStore::new().unwrap());
807 store.create_node(&["Person"]);
808 store.create_node(&["Person"]);
809 store.create_node(&["Company"]);
810 store
811 }
812
813 #[test]
816 fn test_plan_simple_scan() {
817 let store = create_test_store();
818 let planner = Planner::new(store);
819
820 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
822 items: vec![ReturnItem {
823 expression: LogicalExpression::Variable("n".to_string()),
824 alias: None,
825 }],
826 distinct: false,
827 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
828 variable: "n".to_string(),
829 label: Some("Person".to_string()),
830 input: None,
831 })),
832 }));
833
834 let physical = planner.plan(&logical).unwrap();
835 assert_eq!(physical.columns(), &["n"]);
836 }
837
838 #[test]
839 fn test_plan_scan_without_label() {
840 let store = create_test_store();
841 let planner = Planner::new(store);
842
843 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
845 items: vec![ReturnItem {
846 expression: LogicalExpression::Variable("n".to_string()),
847 alias: None,
848 }],
849 distinct: false,
850 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
851 variable: "n".to_string(),
852 label: None,
853 input: None,
854 })),
855 }));
856
857 let physical = planner.plan(&logical).unwrap();
858 assert_eq!(physical.columns(), &["n"]);
859 }
860
861 #[test]
862 fn test_plan_return_with_alias() {
863 let store = create_test_store();
864 let planner = Planner::new(store);
865
866 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
868 items: vec![ReturnItem {
869 expression: LogicalExpression::Variable("n".to_string()),
870 alias: Some("person".to_string()),
871 }],
872 distinct: false,
873 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
874 variable: "n".to_string(),
875 label: Some("Person".to_string()),
876 input: None,
877 })),
878 }));
879
880 let physical = planner.plan(&logical).unwrap();
881 assert_eq!(physical.columns(), &["person"]);
882 }
883
884 #[test]
885 fn test_plan_return_property() {
886 let store = create_test_store();
887 let planner = Planner::new(store);
888
889 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
891 items: vec![ReturnItem {
892 expression: LogicalExpression::Property {
893 variable: "n".to_string(),
894 property: "name".to_string(),
895 },
896 alias: None,
897 }],
898 distinct: false,
899 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
900 variable: "n".to_string(),
901 label: Some("Person".to_string()),
902 input: None,
903 })),
904 }));
905
906 let physical = planner.plan(&logical).unwrap();
907 assert_eq!(physical.columns(), &["n.name"]);
908 }
909
910 #[test]
911 fn test_plan_return_literal() {
912 let store = create_test_store();
913 let planner = Planner::new(store);
914
915 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
917 items: vec![ReturnItem {
918 expression: LogicalExpression::Literal(Value::Int64(42)),
919 alias: Some("answer".to_string()),
920 }],
921 distinct: false,
922 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
923 variable: "n".to_string(),
924 label: None,
925 input: None,
926 })),
927 }));
928
929 let physical = planner.plan(&logical).unwrap();
930 assert_eq!(physical.columns(), &["answer"]);
931 }
932
933 #[test]
936 fn test_plan_filter_equality() {
937 let store = create_test_store();
938 let planner = Planner::new(store);
939
940 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
942 items: vec![ReturnItem {
943 expression: LogicalExpression::Variable("n".to_string()),
944 alias: None,
945 }],
946 distinct: false,
947 input: Box::new(LogicalOperator::Filter(FilterOp {
948 predicate: LogicalExpression::Binary {
949 left: Box::new(LogicalExpression::Property {
950 variable: "n".to_string(),
951 property: "age".to_string(),
952 }),
953 op: BinaryOp::Eq,
954 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
955 },
956 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
957 variable: "n".to_string(),
958 label: Some("Person".to_string()),
959 input: None,
960 })),
961 pushdown_hint: None,
962 })),
963 }));
964
965 let physical = planner.plan(&logical).unwrap();
966 assert_eq!(physical.columns(), &["n"]);
967 }
968
969 #[test]
970 fn test_plan_filter_compound_and() {
971 let store = create_test_store();
972 let planner = Planner::new(store);
973
974 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
976 items: vec![ReturnItem {
977 expression: LogicalExpression::Variable("n".to_string()),
978 alias: None,
979 }],
980 distinct: false,
981 input: Box::new(LogicalOperator::Filter(FilterOp {
982 predicate: LogicalExpression::Binary {
983 left: Box::new(LogicalExpression::Binary {
984 left: Box::new(LogicalExpression::Property {
985 variable: "n".to_string(),
986 property: "age".to_string(),
987 }),
988 op: BinaryOp::Gt,
989 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
990 }),
991 op: BinaryOp::And,
992 right: Box::new(LogicalExpression::Binary {
993 left: Box::new(LogicalExpression::Property {
994 variable: "n".to_string(),
995 property: "age".to_string(),
996 }),
997 op: BinaryOp::Lt,
998 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
999 }),
1000 },
1001 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1002 variable: "n".to_string(),
1003 label: None,
1004 input: None,
1005 })),
1006 pushdown_hint: None,
1007 })),
1008 }));
1009
1010 let physical = planner.plan(&logical).unwrap();
1011 assert_eq!(physical.columns(), &["n"]);
1012 }
1013
1014 #[test]
1015 fn test_plan_filter_unary_not() {
1016 let store = create_test_store();
1017 let planner = Planner::new(store);
1018
1019 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1021 items: vec![ReturnItem {
1022 expression: LogicalExpression::Variable("n".to_string()),
1023 alias: None,
1024 }],
1025 distinct: false,
1026 input: Box::new(LogicalOperator::Filter(FilterOp {
1027 predicate: LogicalExpression::Unary {
1028 op: UnaryOp::Not,
1029 operand: Box::new(LogicalExpression::Property {
1030 variable: "n".to_string(),
1031 property: "active".to_string(),
1032 }),
1033 },
1034 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1035 variable: "n".to_string(),
1036 label: None,
1037 input: None,
1038 })),
1039 pushdown_hint: None,
1040 })),
1041 }));
1042
1043 let physical = planner.plan(&logical).unwrap();
1044 assert_eq!(physical.columns(), &["n"]);
1045 }
1046
/// Plans `Return(n) <- Filter(n.email IS NULL) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Predicate: n.email IS NULL
    let predicate = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1079
/// Plans `Return(n) <- Filter(size(n.friends) > 0) <- NodeScan(n)` and expects column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Left-hand side of the comparison: size(n.friends)
    let size_of_friends = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };

    // Predicate: size(n.friends) > 0
    let predicate = LogicalExpression::Binary {
        left: Box::new(size_of_friends),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1117
/// Plans a single-hop outgoing `KNOWS` expand from `a:Person` to `b` and expects both
/// `a` and `b` among the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Leaf operator: scan of `Person` nodes bound to `a`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });

    // (a)-[:KNOWS]->(b), exactly one hop, no edge variable.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
1161
/// Plans a single-hop outgoing `KNOWS` expand that binds the edge to `r` and expects
/// `a`, `r` and `b` among the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Leaf operator: unlabeled scan binding `a`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // (a)-[r:KNOWS]->(b), exactly one hop.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("r"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
1207
/// Plans `Return(n) <- Limit(10) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let limit = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limit),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1235
/// Plans `Return(n) <- Skip(5) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let skip = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skip),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1261
/// Plans `Return(n) <- Sort(n ASC) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single ascending sort key on the variable `n`, no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1291
/// Plans `Return(n) <- Sort(n DESC) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single descending sort key on the variable `n`, no explicit null ordering.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };

    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1321
/// Plans `Return(n) <- Distinct(all columns) <- NodeScan(n)` and expects the single column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // `columns: None` leaves the distinct column set unspecified.
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1347
/// Plans a `Distinct` restricted to the explicit column list `["n"]` and expects the
/// single output column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("n")]),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1373
/// Plans a `Distinct` whose column list names a column the input does not produce;
/// planning is still expected to succeed and report the single column `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The requested distinct column is not bound by the scan above.
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("nonexistent")]),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
}
1400
/// Plans `Return(cnt) <- Aggregate(count(n) AS cnt) <- NodeScan(n)` and expects `cnt`
/// among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };

    // Ungrouped aggregation without a HAVING clause.
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("cnt")));
}
1438
/// Plans `Aggregate(group by n.city; count(n) AS cnt) <- NodeScan(n:Person)` and expects
/// two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Grouping key: n.city
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };

    // count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1470
/// Plans `Aggregate(sum(n.value) AS total) <- NodeScan(n)` and expects `total` among the
/// output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // sum(n.value) AS total
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("total")));
}
1502
/// Plans `Aggregate(avg(n.score) AS average) <- NodeScan(n)` and expects `average` among
/// the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // avg(n.score) AS average
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("average")));
}
1534
/// Plans one aggregation computing both `min(n.age) AS youngest` and
/// `max(n.age) AS oldest`, and expects both aliases among the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // min(n.age) AS youngest
    let min_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Min,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("youngest")),
        percentile: None,
        separator: None,
    };

    // max(n.age) AS oldest
    let max_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Max,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("oldest")),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![min_age, max_age],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
1581
/// Plans an inner join of `a:Person` and `b:Company` on the condition `a = b` and
/// expects both `a` and `b` among the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Join inputs: one labeled scan per side.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });

    // Join condition pairing variable `a` with variable `b`.
    let a_matches_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_matches_b],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
1625
/// Plans a condition-free cross join of two unlabeled scans and expects two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Join inputs: unlabeled scans binding `a` and `b`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1650
/// Plans a condition-free left join of two unlabeled scans and expects two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Join inputs: unlabeled scans binding `a` and `b`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1674
1675 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1678 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1679 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1680 p
1681 }
1682
/// Plans a standalone `CreateNode` for a `Person` with a `name` property and expects the
/// created variable `n` among the output columns.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    // Property assignment: name = "Alix"
    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );

    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
1702
/// Plans a `CreateEdge` of type `KNOWS` from `a` to `b` over a cross join of two scans
/// and expects the edge variable `r` among the output columns.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    // Endpoints come from a condition-free cross join of two unlabeled scans.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let scan_b = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_a),
        right: Box::new(scan_b),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });

    let create = CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: Vec::new(),
        input: Box::new(endpoints),
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("r")));
}
1734
/// Plans a non-detaching `DeleteNode(n) <- NodeScan(n)` and expects `n` among the
/// output columns.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
1754
/// Planning a plan whose root is `LogicalOperator::Empty` is expected to return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let logical = LogicalPlan::new(LogicalOperator::Empty);

    let outcome = planner.plan(&logical);
    assert!(outcome.is_err());
}
1766
/// Returning a variable (`missing`) that the input scan does not bind is expected to
/// make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan binds only `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The return clause references a variable the scan never introduced.
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("missing")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let outcome = planner.plan(&logical);
    assert!(outcome.is_err());
}
1789
/// Every comparison, logical and arithmetic operator listed below must convert successfully.
#[test]
fn test_convert_binary_ops() {
    let operators = [
        // Comparisons
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Logical connectives
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in operators {
        assert!(convert_binary_op(op).is_ok());
    }
}
1807
/// Every unary operator listed below must convert successfully.
#[test]
fn test_convert_unary_ops() {
    let operators = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in operators {
        assert!(convert_unary_op(op).is_ok());
    }
}
1815
/// Each logical aggregate function checked here must map to the physical variant of the
/// same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1839
/// A planner built with `Planner::new` reports no transaction id and no transaction
/// manager, and `viewing_epoch()` can be called on it.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let read_handle = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(read_handle);

    let transaction_id = planner.transaction_id();
    assert!(transaction_id.is_none());

    let transaction_manager = planner.transaction_manager();
    assert!(transaction_manager.is_none());

    // Only exercised for the call itself; the returned value is not checked.
    let _ = planner.viewing_epoch();
}
1849
/// A plan built from a bare node scan exposes its columns and can be consumed with
/// `into_operator()`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);

    // Consumes the plan; the resulting operator is discarded.
    let _ = physical.into_operator();
}
1867
/// `plan_adaptive` on `Return(n) <- NodeScan(n:Person)` yields column `n` and attaches
/// an adaptive context.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let columns = physical.columns();
    assert_eq!(columns, &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1894
/// `plan_adaptive` on `Return(n) <- Filter(n.age > 30) <- NodeScan(n)` attaches an
/// adaptive context.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Predicate: n.age > 30
    let predicate = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1928
/// With factorized execution switched off, `plan_adaptive` on a single-hop outgoing
/// `KNOWS` expand attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let read_handle = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(read_handle).with_factorized_execution(false);

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Leaf operator: unlabeled scan binding `a`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // (a)-[:KNOWS]->(b), exactly one hop, no edge variable.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1969
/// `plan_adaptive` on a `Return` over a condition-free cross join attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Join inputs: unlabeled scans binding `a` and `b`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2006
/// `plan_adaptive` on `Aggregate(count(n) AS cnt) <- NodeScan(n)` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2034
/// `plan_adaptive` on `Return(n) <- Distinct <- NodeScan(n)` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2059
/// `plan_adaptive` on `Return(n) <- Limit(10) <- NodeScan(n)` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let limit = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limit),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2084
/// `plan_adaptive` on `Return(n) <- Skip(5) <- NodeScan(n)` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    let skip = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skip),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2109
/// `plan_adaptive` on `Return(n) <- Sort(n ASC) <- NodeScan(n)` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single ascending sort key on the variable `n`, no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2138
/// `plan_adaptive` on a `Return` over the union of a `Person` scan and a `Company` scan
/// (both binding `n`) attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches bind the same variable name with different labels.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Company")),
        input: None,
    });

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![people, companies],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2169
/// Plans a 1..=3 hop outgoing `KNOWS` expand from `a` to `b` and expects both `a` and
/// `b` among the output columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Builds a `RETURN <name>` item without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Leaf operator: unlabeled scan binding `a`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // (a)-[:KNOWS*1..3]->(b), no edge variable and no path alias.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2212
#[test]
fn test_plan_expand_with_path_alias() {
    // Same shape as a variable-length expansion, but the expand operator also
    // names its path (`p`). Only `a` and `b` are returned, and both must be
    // present in the planned output columns.
    let planner = Planner::new(create_test_store());

    // Un-aliased projection of a single variable.
    let project_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: Some(String::from("p")),
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project_var("a"), project_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2254
#[test]
fn test_plan_expand_incoming() {
    // Single-hop KNOWS expansion in the Incoming direction, planned with
    // factorized execution turned off; `a` and `b` must both be output columns.
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Un-aliased projection of a single variable.
    let project_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project_var("a"), project_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2296
#[test]
fn test_plan_expand_both_directions() {
    // Single-hop KNOWS expansion with direction `Both`, planned with
    // factorized execution turned off; `a` and `b` must both be output columns.
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Un-aliased projection of a single variable.
    let project_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project_var("a"), project_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2338
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    // A planner built through `with_context` should report back the
    // transaction id and epoch it was handed, and expose a transaction manager.
    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    let txn = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    // The same underlying store backs both the read and the write handle.
    let read_handle = Arc::clone(&store) as Arc<dyn GraphStore>;
    let write_handle = Arc::clone(&store) as Arc<dyn GraphStoreMut>;

    let planner = Planner::with_context(
        read_handle,
        Some(write_handle),
        Arc::clone(&manager),
        Some(txn),
        snapshot_epoch,
    );

    assert_eq!(planner.transaction_id(), Some(txn));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
2362
#[test]
fn test_planner_with_factorized_execution_disabled() {
    // Two chained single-hop outgoing expansions (a -> b -> c) with no edge
    // type filter, planned with `with_factorized_execution(false)`. The plan
    // must still succeed and expose the returned variables `a` and `c`.
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // One outgoing single-hop expansion on top of `input`.
    let hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: String::from(from),
            to_variable: String::from(to),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: Vec::new(),
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };
    // Un-aliased projection of a single variable.
    let project_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let first_hop = hop("a", "b", source_scan);
    let second_hop = hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project_var("a"), project_var("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "c"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2415
#[test]
fn test_plan_sort_by_property() {
    // Sorting by a property expression (`n.name`, ascending, no explicit null
    // ordering) rather than by a plain variable must plan successfully and
    // keep `n` as an output column.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "n"));
}
2451
#[test]
fn test_plan_scan_with_input() {
    // A node scan (`b:Company`) whose `input` is another node scan
    // (`a:Person`) must plan successfully and expose both variables.
    let planner = Planner::new(create_test_store());

    // Un-aliased projection of a single variable.
    let project_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project_var("a"), project_var("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2487}