1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43 VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53 convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
/// Borrowed lower/upper limits of a value range.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// struct; presumably it carries range-predicate bounds for the scan/filter
/// planning submodules — confirm against its users.
struct RangeBounds<'a> {
    /// Optional lower limit, borrowed for `'a`.
    min: Option<&'a Value>,
    /// Optional upper limit, borrowed for `'a`.
    max: Option<&'a Value>,
    /// Presumably whether `min` itself belongs to the range — confirm with users.
    min_inclusive: bool,
    /// Presumably whether `max` itself belongs to the range — confirm with users.
    max_inclusive: bool,
}
64
/// Turns a [`LogicalPlan`] into a [`PhysicalPlan`] of executable operators.
///
/// Entry points are [`Planner::plan`], [`Planner::plan_profiled`] and
/// [`Planner::plan_adaptive`]. Several fields use `Cell`/`RefCell` so that the
/// planning methods, which take `&self`, can still record state.
pub struct Planner {
    /// Read handle to the graph; queried for the current epoch, node counts,
    /// label membership and statistics, and handed to operators that read data.
    pub(super) store: Arc<dyn GraphStore>,
    /// Mutable graph handle. When `None`, `write_store()` fails with
    /// `TransactionError::ReadOnly`.
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager supplied through `with_context`; `None` for planners
    /// built with `new`.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Transaction this planner works for, if any. `with_context` creates a
    /// write tracker only when this is `Some`.
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch associated with this planner: the store's current epoch for `new`,
    /// or the value passed to `with_context`.
    pub(super) viewing_epoch: EpochId,
    /// Next number used by `register_edge_column` to name an edge that has no
    /// variable (`_anon_edge_<n>`).
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true` (the default), chains of two or more single-hop expands are
    /// planned through `plan_expand_chain` instead of `plan_expand`.
    pub(super) factorized_execution: bool,
    /// Column names recorded as scalar results; the horizontal-aggregate and
    /// map-collect planners insert their output alias here.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names recorded by `register_edge_column`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator set via `with_validator`.
    /// NOTE(review): its consumers are outside this part of the file.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog set via `with_catalog`.
    /// NOTE(review): its consumers are outside this part of the file.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state read when planning a `ParameterScan`; planning
    /// that operator fails with an internal error while this is `None`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Set of variable names; starts empty.
    /// NOTE(review): no reader or writer is visible here — presumably maintained
    /// by the aggregate/expand submodules; confirm.
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// `true` only while `plan_profiled` is running; makes `maybe_profile` wrap
    /// each planned operator.
    profiling: std::cell::Cell<bool>,
    /// Profile entries pushed by `maybe_profile` and handed out by `plan_profiled`.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Write tracker built in `with_context` when a transaction id is present.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context; `SessionContext::default()` unless replaced through
    /// `with_session_context`.
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// Read-only flag set via `with_read_only` (default `false`).
    /// NOTE(review): where this flag is enforced is not visible in this part of the file.
    pub(super) read_only: bool,
}
112
113impl Planner {
114 #[must_use]
119 pub fn new(store: Arc<dyn GraphStore>) -> Self {
120 let epoch = store.current_epoch();
121 Self {
122 store,
123 write_store: None,
124 transaction_manager: None,
125 transaction_id: None,
126 viewing_epoch: epoch,
127 anon_edge_counter: std::cell::Cell::new(0),
128 factorized_execution: true,
129 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131 validator: None,
132 catalog: None,
133 correlated_param_state: std::cell::RefCell::new(None),
134 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135 profiling: std::cell::Cell::new(false),
136 profile_entries: std::cell::RefCell::new(Vec::new()),
137 write_tracker: None,
138 session_context: grafeo_core::execution::operators::SessionContext::default(),
139 read_only: false,
140 }
141 }
142
143 #[must_use]
145 pub fn with_context(
146 store: Arc<dyn GraphStore>,
147 write_store: Option<Arc<dyn GraphStoreMut>>,
148 transaction_manager: Arc<TransactionManager>,
149 transaction_id: Option<TransactionId>,
150 viewing_epoch: EpochId,
151 ) -> Self {
152 use crate::transaction::TransactionWriteTracker;
153
154 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156 if transaction_id.is_some() {
157 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158 &transaction_manager,
159 ))))
160 } else {
161 None
162 };
163
164 Self {
165 store,
166 write_store,
167 transaction_manager: Some(transaction_manager),
168 transaction_id,
169 viewing_epoch,
170 anon_edge_counter: std::cell::Cell::new(0),
171 factorized_execution: true,
172 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174 validator: None,
175 catalog: None,
176 correlated_param_state: std::cell::RefCell::new(None),
177 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178 profiling: std::cell::Cell::new(false),
179 profile_entries: std::cell::RefCell::new(Vec::new()),
180 write_tracker,
181 session_context: grafeo_core::execution::operators::SessionContext::default(),
182 read_only: false,
183 }
184 }
185
    /// Builder-style setter: stores `read_only` in the planner and returns it.
    ///
    /// NOTE(review): the code that consults this flag is outside this part of
    /// the file.
    #[must_use]
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }
193
194 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196 self.write_store
197 .as_ref()
198 .map(Arc::clone)
199 .ok_or(Error::Transaction(
200 grafeo_common::utils::error::TransactionError::ReadOnly,
201 ))
202 }
203
    /// Returns the planner's viewing epoch: the store's current epoch at
    /// construction for `new`, or the value passed to `with_context`.
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
209
    /// Returns the transaction id given to `with_context`, or `None` when the
    /// planner has no transaction.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
215
    /// Returns a reference to the transaction manager, or `None` for planners
    /// created with `new`.
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
221
    /// Builder-style setter for factorized execution.
    ///
    /// When enabled (the constructors' default), `plan_operator` sends chains of
    /// two or more single-hop expands to `plan_expand_chain`.
    #[must_use]
    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
        self.factorized_execution = enabled;
        self
    }
228
    /// Builder-style setter: attaches a constraint validator and returns the
    /// planner.
    ///
    /// NOTE(review): the operators that receive the validator are planned
    /// outside this part of the file.
    #[must_use]
    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
        self.validator = Some(validator);
        self
    }
235
    /// Builder-style setter: attaches a catalog and returns the planner.
    ///
    /// NOTE(review): the code that reads the catalog is outside this part of
    /// the file.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
        self.catalog = Some(catalog);
        self
    }
242
    /// Builder-style setter: replaces the default session context with
    /// `context` and returns the planner.
    #[must_use]
    pub fn with_session_context(
        mut self,
        context: grafeo_core::execution::operators::SessionContext,
    ) -> Self {
        self.session_context = context;
        self
    }
252
253 pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
259 let name = edge_variable.clone().unwrap_or_else(|| {
260 let count = self.anon_edge_counter.get();
261 self.anon_edge_counter.set(count + 1);
262 format!("_anon_edge_{}", count)
263 });
264 self.edge_columns.borrow_mut().insert(name.clone());
265 name
266 }
267
268 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
272 match op {
273 LogicalOperator::Expand(expand) => {
274 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
275
276 if is_single_hop {
277 let (inner_count, base) = Self::count_expand_chain(&expand.input);
278 (inner_count + 1, base)
279 } else {
280 (0, op)
281 }
282 }
283 _ => (0, op),
284 }
285 }
286
287 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
291 let mut chain = Vec::new();
292 let mut current = op;
293
294 while let LogicalOperator::Expand(expand) = current {
295 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
296 if !is_single_hop {
297 break;
298 }
299 chain.push(expand);
300 current = &expand.input;
301 }
302
303 chain.reverse();
304 chain
305 }
306
307 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
314 let _span = grafeo_debug_span!("grafeo::query::plan");
315 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
316 Ok(PhysicalPlan {
317 operator,
318 columns,
319 adaptive_context: None,
320 })
321 }
322
323 pub fn plan_profiled(
334 &self,
335 logical_plan: &LogicalPlan,
336 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
337 self.profiling.set(true);
338 self.profile_entries.borrow_mut().clear();
339
340 let result = self.plan_operator(&logical_plan.root);
341
342 self.profiling.set(false);
343 let (operator, columns) = result?;
344 let entries = self.profile_entries.borrow_mut().drain(..).collect();
345
346 Ok((
347 PhysicalPlan {
348 operator,
349 columns,
350 adaptive_context: None,
351 },
352 entries,
353 ))
354 }
355
356 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
363 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
364
365 let mut adaptive_context = AdaptiveContext::new();
366 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
367
368 Ok(PhysicalPlan {
369 operator,
370 columns,
371 adaptive_context: Some(adaptive_context),
372 })
373 }
374
375 fn collect_cardinality_estimates(
377 &self,
378 op: &LogicalOperator,
379 ctx: &mut AdaptiveContext,
380 depth: usize,
381 ) {
382 match op {
383 LogicalOperator::NodeScan(scan) => {
384 let estimate = if let Some(label) = &scan.label {
385 self.store.nodes_by_label(label).len() as f64
386 } else {
387 self.store.node_count() as f64
388 };
389 let id = format!("scan_{}", scan.variable);
390 ctx.set_estimate(&id, estimate);
391
392 if let Some(input) = &scan.input {
393 self.collect_cardinality_estimates(input, ctx, depth + 1);
394 }
395 }
396 LogicalOperator::Filter(filter) => {
397 let input_estimate = self.estimate_cardinality(&filter.input);
398 let estimate = input_estimate * 0.3;
399 let id = format!("filter_{depth}");
400 ctx.set_estimate(&id, estimate);
401
402 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
403 }
404 LogicalOperator::Expand(expand) => {
405 let input_estimate = self.estimate_cardinality(&expand.input);
406 let stats = self.store.statistics();
407 let avg_degree = self.estimate_expand_degree(&stats, expand);
408 let estimate = input_estimate * avg_degree;
409 let id = format!("expand_{}", expand.to_variable);
410 ctx.set_estimate(&id, estimate);
411
412 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
413 }
414 LogicalOperator::Join(join) => {
415 let left_est = self.estimate_cardinality(&join.left);
416 let right_est = self.estimate_cardinality(&join.right);
417 let estimate = (left_est * right_est).sqrt();
418 let id = format!("join_{depth}");
419 ctx.set_estimate(&id, estimate);
420
421 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
422 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
423 }
424 LogicalOperator::Aggregate(agg) => {
425 let input_estimate = self.estimate_cardinality(&agg.input);
426 let estimate = if agg.group_by.is_empty() {
427 1.0
428 } else {
429 (input_estimate * 0.1).max(1.0)
430 };
431 let id = format!("aggregate_{depth}");
432 ctx.set_estimate(&id, estimate);
433
434 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
435 }
436 LogicalOperator::Distinct(distinct) => {
437 let input_estimate = self.estimate_cardinality(&distinct.input);
438 let estimate = (input_estimate * 0.5).max(1.0);
439 let id = format!("distinct_{depth}");
440 ctx.set_estimate(&id, estimate);
441
442 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
443 }
444 LogicalOperator::Return(ret) => {
445 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
446 }
447 LogicalOperator::Limit(limit) => {
448 let input_estimate = self.estimate_cardinality(&limit.input);
449 let estimate = (input_estimate).min(limit.count.estimate());
450 let id = format!("limit_{depth}");
451 ctx.set_estimate(&id, estimate);
452
453 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
454 }
455 LogicalOperator::Skip(skip) => {
456 let input_estimate = self.estimate_cardinality(&skip.input);
457 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
458 let id = format!("skip_{depth}");
459 ctx.set_estimate(&id, estimate);
460
461 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
462 }
463 LogicalOperator::Sort(sort) => {
464 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
465 }
466 LogicalOperator::Union(union) => {
467 let estimate: f64 = union
468 .inputs
469 .iter()
470 .map(|input| self.estimate_cardinality(input))
471 .sum();
472 let id = format!("union_{depth}");
473 ctx.set_estimate(&id, estimate);
474
475 for input in &union.inputs {
476 self.collect_cardinality_estimates(input, ctx, depth + 1);
477 }
478 }
479 _ => {
480 }
482 }
483 }
484
485 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
487 match op {
488 LogicalOperator::NodeScan(scan) => {
489 if let Some(label) = &scan.label {
490 self.store.nodes_by_label(label).len() as f64
491 } else {
492 self.store.node_count() as f64
493 }
494 }
495 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
496 LogicalOperator::Expand(expand) => {
497 let stats = self.store.statistics();
498 let avg_degree = self.estimate_expand_degree(&stats, expand);
499 self.estimate_cardinality(&expand.input) * avg_degree
500 }
501 LogicalOperator::Join(join) => {
502 let left = self.estimate_cardinality(&join.left);
503 let right = self.estimate_cardinality(&join.right);
504 (left * right).sqrt()
505 }
506 LogicalOperator::Aggregate(agg) => {
507 if agg.group_by.is_empty() {
508 1.0
509 } else {
510 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
511 }
512 }
513 LogicalOperator::Distinct(distinct) => {
514 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
515 }
516 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
517 LogicalOperator::Limit(limit) => self
518 .estimate_cardinality(&limit.input)
519 .min(limit.count.estimate()),
520 LogicalOperator::Skip(skip) => {
521 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
522 }
523 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
524 LogicalOperator::Union(union) => union
525 .inputs
526 .iter()
527 .map(|input| self.estimate_cardinality(input))
528 .sum(),
529 LogicalOperator::Except(except) => {
530 let left = self.estimate_cardinality(&except.left);
531 let right = self.estimate_cardinality(&except.right);
532 (left - right).max(0.0)
533 }
534 LogicalOperator::Intersect(intersect) => {
535 let left = self.estimate_cardinality(&intersect.left);
536 let right = self.estimate_cardinality(&intersect.right);
537 left.min(right)
538 }
539 LogicalOperator::Otherwise(otherwise) => self
540 .estimate_cardinality(&otherwise.left)
541 .max(self.estimate_cardinality(&otherwise.right)),
542 _ => 1000.0,
543 }
544 }
545
546 fn estimate_expand_degree(
548 &self,
549 stats: &grafeo_core::statistics::Statistics,
550 expand: &ExpandOp,
551 ) -> f64 {
552 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
553 if expand.edge_types.len() == 1 {
554 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
555 } else if stats.total_nodes > 0 {
556 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
557 } else {
558 10.0
559 }
560 }
561
562 fn maybe_profile(
565 &self,
566 result: Result<(Box<dyn Operator>, Vec<String>)>,
567 op: &LogicalOperator,
568 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
569 if self.profiling.get() {
570 let (physical, columns) = result?;
571 let (entry, stats) =
572 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
573 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
574 self.profile_entries.borrow_mut().push(entry);
575 Ok((Box::new(profiled), columns))
576 } else {
577 result
578 }
579 }
580
581 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
583 let result = match op {
584 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
585 LogicalOperator::Expand(expand) => {
586 if self.factorized_execution {
587 let (chain_len, _base) = Self::count_expand_chain(op);
588 if chain_len >= 2 {
589 return self.maybe_profile(self.plan_expand_chain(op), op);
590 }
591 }
592 self.plan_expand(expand)
593 }
594 LogicalOperator::Return(ret) => self.plan_return(ret),
595 LogicalOperator::Filter(filter) => self.plan_filter(filter),
596 LogicalOperator::Project(project) => self.plan_project(project),
597 LogicalOperator::Limit(limit) => self.plan_limit(limit),
598 LogicalOperator::Skip(skip) => self.plan_skip(skip),
599 LogicalOperator::Sort(sort) => self.plan_sort(sort),
600 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
601 LogicalOperator::Join(join) => self.plan_join(join),
602 LogicalOperator::Union(union) => self.plan_union(union),
603 LogicalOperator::Except(except) => self.plan_except(except),
604 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
605 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
606 LogicalOperator::Apply(apply) => self.plan_apply(apply),
607 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
608 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
609 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
610 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
611 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
612 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
613 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
614 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
615 LogicalOperator::Merge(merge) => self.plan_merge(merge),
616 LogicalOperator::MergeRelationship(merge_rel) => {
617 self.plan_merge_relationship(merge_rel)
618 }
619 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
620 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
621 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
622 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
623 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
624 #[cfg(feature = "algos")]
625 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
626 #[cfg(not(feature = "algos"))]
627 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
628 "CALL procedures require the 'algos' feature".to_string(),
629 )),
630 LogicalOperator::ParameterScan(_param_scan) => {
631 let state = self
632 .correlated_param_state
633 .borrow()
634 .clone()
635 .ok_or_else(|| {
636 Error::Internal(
637 "ParameterScan without correlated Apply context".to_string(),
638 )
639 })?;
640 let columns = state.columns.clone();
643 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
644 Ok((operator, columns))
645 }
646 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
647 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
648 LogicalOperator::LoadData(load) => {
649 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
650 load.path.clone(),
651 load.format,
652 load.with_headers,
653 load.field_terminator,
654 load.variable.clone(),
655 ));
656 Ok((operator, vec![load.variable.clone()]))
657 }
658 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
659 LogicalOperator::VectorScan(_) => Err(Error::Internal(
660 "VectorScan requires vector-index feature".to_string(),
661 )),
662 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
663 "VectorJoin requires vector-index feature".to_string(),
664 )),
665 _ => Err(Error::Internal(format!(
666 "Unsupported operator: {:?}",
667 std::mem::discriminant(op)
668 ))),
669 };
670 self.maybe_profile(result, op)
671 }
672
673 fn plan_horizontal_aggregate(
675 &self,
676 ha: &HorizontalAggregateOp,
677 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
678 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
679
680 let list_col_idx = child_columns
681 .iter()
682 .position(|c| c == &ha.list_column)
683 .ok_or_else(|| {
684 Error::Internal(format!(
685 "HorizontalAggregate list column '{}' not found in {:?}",
686 ha.list_column, child_columns
687 ))
688 })?;
689
690 let entity_kind = match ha.entity_kind {
691 LogicalEntityKind::Edge => EntityKind::Edge,
692 LogicalEntityKind::Node => EntityKind::Node,
693 };
694
695 let function = convert_aggregate_function(ha.function);
696 let input_column_count = child_columns.len();
697
698 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
699 child_op,
700 list_col_idx,
701 entity_kind,
702 function,
703 ha.property.clone(),
704 Arc::clone(&self.store) as Arc<dyn GraphStore>,
705 input_column_count,
706 ));
707
708 let mut columns = child_columns;
709 columns.push(ha.alias.clone());
710 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
712
713 Ok((operator, columns))
714 }
715
716 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
718 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
719 let key_idx = child_columns
720 .iter()
721 .position(|c| c == &mc.key_var)
722 .ok_or_else(|| {
723 Error::Internal(format!(
724 "MapCollect key '{}' not in columns {:?}",
725 mc.key_var, child_columns
726 ))
727 })?;
728 let value_idx = child_columns
729 .iter()
730 .position(|c| c == &mc.value_var)
731 .ok_or_else(|| {
732 Error::Internal(format!(
733 "MapCollect value '{}' not in columns {:?}",
734 mc.value_var, child_columns
735 ))
736 })?;
737 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
738 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
739 Ok((operator, vec![mc.alias.clone()]))
740 }
741}
742
/// Operator that replays a fixed, in-memory set of rows (only compiled with the
/// `algos` feature).
///
/// NOTE(review): presumably used to return procedure-call results; the code
/// that constructs it is outside this part of the file.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// All rows to emit; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position inside a row to read from.
    /// Positions past the end of a row yield `Value::Null`.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; reset to 0 by `reset`.
    row_index: usize,
}
750
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once every row has
    /// been returned.
    ///
    /// Every output column is typed `LogicalType::Any`. A row that is too short
    /// for a requested source position contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total_rows = self.rows.len();
        if start >= total_rows {
            return Ok(None);
        }

        let batch_len = (total_rows - start).min(1024);
        let end = start + batch_len;

        let column_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&column_types, batch_len);

        for row in &self.rows[start..end] {
            for (dest_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = match row.get(src_idx) {
                    Some(found) => found.clone(),
                    None => Value::Null,
                };
                // A missing destination column is skipped silently.
                if let Some(column) = chunk.column_mut(dest_idx) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Name reported for this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }

    /// Converts the boxed operator into a boxed `Any` for downcasting.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
794
795#[cfg(test)]
796mod tests {
797 use super::*;
798 use crate::query::plan::{
799 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
800 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
801 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
802 SkipOp as LogicalSkipOp, SortKey, SortOp,
803 };
804 use grafeo_common::types::Value;
805 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
806 use grafeo_core::graph::GraphStoreMut;
807 use grafeo_core::graph::lpg::LpgStore;
808
809 fn create_test_store() -> Arc<LpgStore> {
810 let store = Arc::new(LpgStore::new().unwrap());
811 store.create_node(&["Person"]);
812 store.create_node(&["Person"]);
813 store.create_node(&["Company"]);
814 store
815 }
816
817 #[test]
820 fn test_plan_simple_scan() {
821 let store = create_test_store();
822 let planner = Planner::new(store);
823
824 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
826 items: vec![ReturnItem {
827 expression: LogicalExpression::Variable("n".to_string()),
828 alias: None,
829 }],
830 distinct: false,
831 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
832 variable: "n".to_string(),
833 label: Some("Person".to_string()),
834 input: None,
835 })),
836 }));
837
838 let physical = planner.plan(&logical).unwrap();
839 assert_eq!(physical.columns(), &["n"]);
840 }
841
842 #[test]
843 fn test_plan_scan_without_label() {
844 let store = create_test_store();
845 let planner = Planner::new(store);
846
847 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
849 items: vec![ReturnItem {
850 expression: LogicalExpression::Variable("n".to_string()),
851 alias: None,
852 }],
853 distinct: false,
854 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
855 variable: "n".to_string(),
856 label: None,
857 input: None,
858 })),
859 }));
860
861 let physical = planner.plan(&logical).unwrap();
862 assert_eq!(physical.columns(), &["n"]);
863 }
864
865 #[test]
866 fn test_plan_return_with_alias() {
867 let store = create_test_store();
868 let planner = Planner::new(store);
869
870 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
872 items: vec![ReturnItem {
873 expression: LogicalExpression::Variable("n".to_string()),
874 alias: Some("person".to_string()),
875 }],
876 distinct: false,
877 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
878 variable: "n".to_string(),
879 label: Some("Person".to_string()),
880 input: None,
881 })),
882 }));
883
884 let physical = planner.plan(&logical).unwrap();
885 assert_eq!(physical.columns(), &["person"]);
886 }
887
888 #[test]
889 fn test_plan_return_property() {
890 let store = create_test_store();
891 let planner = Planner::new(store);
892
893 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
895 items: vec![ReturnItem {
896 expression: LogicalExpression::Property {
897 variable: "n".to_string(),
898 property: "name".to_string(),
899 },
900 alias: None,
901 }],
902 distinct: false,
903 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
904 variable: "n".to_string(),
905 label: Some("Person".to_string()),
906 input: None,
907 })),
908 }));
909
910 let physical = planner.plan(&logical).unwrap();
911 assert_eq!(physical.columns(), &["n.name"]);
912 }
913
914 #[test]
915 fn test_plan_return_literal() {
916 let store = create_test_store();
917 let planner = Planner::new(store);
918
919 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
921 items: vec![ReturnItem {
922 expression: LogicalExpression::Literal(Value::Int64(42)),
923 alias: Some("answer".to_string()),
924 }],
925 distinct: false,
926 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
927 variable: "n".to_string(),
928 label: None,
929 input: None,
930 })),
931 }));
932
933 let physical = planner.plan(&logical).unwrap();
934 assert_eq!(physical.columns(), &["answer"]);
935 }
936
937 #[test]
940 fn test_plan_filter_equality() {
941 let store = create_test_store();
942 let planner = Planner::new(store);
943
944 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
946 items: vec![ReturnItem {
947 expression: LogicalExpression::Variable("n".to_string()),
948 alias: None,
949 }],
950 distinct: false,
951 input: Box::new(LogicalOperator::Filter(FilterOp {
952 predicate: LogicalExpression::Binary {
953 left: Box::new(LogicalExpression::Property {
954 variable: "n".to_string(),
955 property: "age".to_string(),
956 }),
957 op: BinaryOp::Eq,
958 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
959 },
960 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
961 variable: "n".to_string(),
962 label: Some("Person".to_string()),
963 input: None,
964 })),
965 pushdown_hint: None,
966 })),
967 }));
968
969 let physical = planner.plan(&logical).unwrap();
970 assert_eq!(physical.columns(), &["n"]);
971 }
972
973 #[test]
974 fn test_plan_filter_compound_and() {
975 let store = create_test_store();
976 let planner = Planner::new(store);
977
978 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
980 items: vec![ReturnItem {
981 expression: LogicalExpression::Variable("n".to_string()),
982 alias: None,
983 }],
984 distinct: false,
985 input: Box::new(LogicalOperator::Filter(FilterOp {
986 predicate: LogicalExpression::Binary {
987 left: Box::new(LogicalExpression::Binary {
988 left: Box::new(LogicalExpression::Property {
989 variable: "n".to_string(),
990 property: "age".to_string(),
991 }),
992 op: BinaryOp::Gt,
993 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
994 }),
995 op: BinaryOp::And,
996 right: Box::new(LogicalExpression::Binary {
997 left: Box::new(LogicalExpression::Property {
998 variable: "n".to_string(),
999 property: "age".to_string(),
1000 }),
1001 op: BinaryOp::Lt,
1002 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1003 }),
1004 },
1005 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1006 variable: "n".to_string(),
1007 label: None,
1008 input: None,
1009 })),
1010 pushdown_hint: None,
1011 })),
1012 }));
1013
1014 let physical = planner.plan(&logical).unwrap();
1015 assert_eq!(physical.columns(), &["n"]);
1016 }
1017
1018 #[test]
1019 fn test_plan_filter_unary_not() {
1020 let store = create_test_store();
1021 let planner = Planner::new(store);
1022
1023 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1025 items: vec![ReturnItem {
1026 expression: LogicalExpression::Variable("n".to_string()),
1027 alias: None,
1028 }],
1029 distinct: false,
1030 input: Box::new(LogicalOperator::Filter(FilterOp {
1031 predicate: LogicalExpression::Unary {
1032 op: UnaryOp::Not,
1033 operand: Box::new(LogicalExpression::Property {
1034 variable: "n".to_string(),
1035 property: "active".to_string(),
1036 }),
1037 },
1038 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1039 variable: "n".to_string(),
1040 label: None,
1041 input: None,
1042 })),
1043 pushdown_hint: None,
1044 })),
1045 }));
1046
1047 let physical = planner.plan(&logical).unwrap();
1048 assert_eq!(physical.columns(), &["n"]);
1049 }
1050
/// Planning `RETURN n` over an `n.email IS NULL` filter on an unlabeled scan
/// must succeed and expose exactly one output column, `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Predicate: n.email IS NULL
    let email = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "email".to_string(),
    };
    let email_is_null = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(email),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate: email_is_null,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1083
/// Planning `RETURN n` over a `size(n.friends) > 0` filter must succeed and
/// expose exactly one output column, `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Left-hand side of the comparison: size(n.friends)
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };

    // Predicate: size(n.friends) > 0
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1121
/// A single-hop outgoing `KNOWS` expansion from `a:Person` to `b`, wrapped in
/// `RETURN a, b`, must plan successfully with both `a` and `b` among the
/// output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    // Exactly one hop: min_hops == max_hops == 1.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
1165
/// A single-hop outgoing `KNOWS` expansion that binds the edge to `r`,
/// wrapped in `RETURN a, r, b`, must plan successfully with `a`, `r` and `b`
/// all present among the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        // The traversed edge is exposed under the name `r`.
        edge_variable: Some("r".to_string()),
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("r"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "r", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
1211
/// A `Limit` of 10 placed between the scan and the `Return` must plan
/// successfully and leave `n` as the only output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1239
/// A `Skip` of 5 placed between the scan and the `Return` must plan
/// successfully and leave `n` as the only output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1265
/// Sorting the scan output ascending by `n` before returning it must plan
/// successfully and leave `n` as the only output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n` ascending, no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1295
/// Sorting the scan output descending by `n` before returning it must plan
/// successfully and leave `n` as the only output column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n` descending, no explicit null ordering.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Descending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1325
/// A `Distinct` operator with no explicit column list, sitting between the
/// scan and the `Return`, must plan successfully with `n` as the only column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1351
/// A `Distinct` operator restricted to the column list `["n"]` must plan
/// successfully with `n` as the only output column.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        // Explicit column list naming the one column the scan produces.
        columns: Some(vec!["n".to_string()]),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1377
/// A `Distinct` operator whose column list names a column the input does not
/// produce (`"nonexistent"`) is still expected to plan successfully, with
/// `n` as the only output column.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        // Deliberately refers to a column the scan never emits.
        columns: Some(vec!["nonexistent".to_string()]),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1404
/// An ungrouped `count(n) AS cnt` aggregate returned through `RETURN cnt`
/// must plan successfully with `cnt` among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // count(n) AS cnt
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };

    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "cnt"));
}
1442
/// `count(n) AS cnt` grouped by `n.city` over a `Person` scan must plan
/// successfully and produce exactly two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    // Grouping key: n.city
    let city = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "city".to_string(),
    };

    // count(n) AS cnt
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_nodes],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1474
/// An ungrouped `sum(n.value) AS total` aggregate must plan successfully
/// with `total` among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // sum(n.value) AS total
    let sum_of_values = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "value".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("total".to_string()),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_values],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "total"));
}
1506
/// An ungrouped `avg(n.score) AS average` aggregate must plan successfully
/// with `average` among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // avg(n.score) AS average
    let mean_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "score".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("average".to_string()),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![mean_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "average"));
}
1538
/// Two ungrouped aggregates over the same property, `min(n.age) AS youngest`
/// and `max(n.age) AS oldest`, must plan successfully with both aliases among
/// the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates differ only in function and alias; the input is n.age.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
        separator: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["youngest", "oldest"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
1585
/// An inner join of an `a:Person` scan with a `b:Company` scan on the
/// condition `a = b`, wrapped in `RETURN a, b`, must plan successfully with
/// both `a` and `b` among the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });

    // Join condition pairing the two scan variables.
    let a_matches_b = JoinCondition {
        left: LogicalExpression::Variable("a".to_string()),
        right: LogicalExpression::Variable("b".to_string()),
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_matches_b],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
1629
/// A condition-free cross join of two unlabeled scans (`a` and `b`) must plan
/// successfully and produce exactly two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1654
/// A `JoinType::Left` join of two unlabeled scans (`a` and `b`) with no join
/// conditions must plan successfully and produce exactly two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1678
1679 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1682 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1683 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1684 p
1685 }
1686
/// Creating a `Person` node bound to `n` with a literal `name` property must
/// plan successfully on a writable planner, with `n` among the output columns.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    // Single property assignment: name = "Alix"
    let name_property = (
        "name".to_string(),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );

    let create = LogicalOperator::CreateNode(CreateNodeOp {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        properties: vec![name_property],
        input: None,
    });
    let logical = LogicalPlan::new(create);

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "n"));
}
1706
/// Creating a `KNOWS` edge bound to `r` between `a` and `b`, where both
/// endpoints come from a cross join of two unlabeled scans, must plan
/// successfully on a writable planner with `r` among the output columns.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let target_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });

    // Input to the edge creation: a cross join of the two scans.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(source_scan),
        right: Box::new(target_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_string()),
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "r"));
}
1738
/// A non-detaching delete of `n` over an unlabeled scan must plan
/// successfully on a writable planner, with `n` among the output columns.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let delete = LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: "n".to_string(),
        detach: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(delete);

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|column| column == "n"));
}
1758
/// Planning a plan whose root is `LogicalOperator::Empty` must return an
/// error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
1770
/// Returning a variable (`missing`) that the input scan does not bind (it
/// only binds `n`) must make planning fail with an error.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan only introduces `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // The return clause refers to a name nothing upstream defines.
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable("missing".to_string()),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
1793
/// `convert_binary_op` must return `Ok` for each comparison, boolean and
/// arithmetic operator listed below.
#[test]
fn test_convert_binary_ops() {
    let supported = [
        // Comparisons
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Boolean connectives
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
1811
/// `convert_unary_op` must return `Ok` for `Not`, `IsNull`, `IsNotNull` and
/// `Neg`.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
1819
/// `convert_aggregate_function` must map each of `Count`, `Sum`, `Avg`, `Min`
/// and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1843
/// A planner built with `Planner::new` alone reports no transaction id and
/// no transaction manager; `viewing_epoch` is merely called, its value is
/// not checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph);

    assert!(planner.transaction_id().is_none());
    assert!(planner.transaction_manager().is_none());

    // Only exercised for the call itself; the result is discarded.
    let _ = planner.viewing_epoch();
}
1853
/// For a bare unlabeled scan of `n`, the physical plan reports `n` as its
/// only column, and `into_operator` can then be called on it.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(scan);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the returned operator itself is not inspected.
    let _ = physical.into_operator();
}
1871
/// `plan_adaptive` on `RETURN n` over a `Person` scan must succeed, report
/// `n` as the only column, and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1898
/// `plan_adaptive` on `RETURN n` over an `n.age > 30` filter must succeed
/// and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Predicate: n.age > 30
    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };

    let filter = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1932
/// With factorized execution switched off, `plan_adaptive` on a single-hop
/// outgoing `KNOWS` expansion wrapped in `RETURN a, b` must succeed and
/// populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1973
/// `plan_adaptive` on `RETURN a, b` over a condition-free cross join of two
/// unlabeled scans must succeed and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2010
/// `plan_adaptive` on an ungrouped `count(n) AS cnt` aggregate over an
/// unlabeled scan must succeed and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // count(n) AS cnt
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2038
/// `plan_adaptive` on `RETURN n` over a column-list-free `Distinct` must
/// succeed and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2063
/// `plan_adaptive` on `RETURN n` over a `Limit` of 10 must succeed and
/// populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2088
/// `plan_adaptive` on `RETURN n` over a `Skip` of 5 must succeed and
/// populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2113
/// `plan_adaptive` on `RETURN n` over an ascending sort by `n` must succeed
/// and populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n` ascending, no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2142
/// `plan_adaptive` on `RETURN n` over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) must succeed and populate
/// `adaptive_context`.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches bind the same variable and differ only by label.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![people, companies],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2173
/// An outgoing `KNOWS` expansion spanning 1 to 3 hops, wrapped in
/// `RETURN a, b`, must plan successfully with both `a` and `b` among the
/// output columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        // Variable-length range: min_hops differs from max_hops.
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["a", "b"] {
        assert!(columns.iter().any(|column| column == expected));
    }
}
2216
/// A 1-to-3 hop outgoing `KNOWS` expansion whose `path_alias` is `Some("p")`
/// must plan successfully; the test checks that `a` and `b` are among the
/// output columns.
#[test]
fn test_plan_expand_with_path_alias() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let variable = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    };
    let expand_to_b = ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(LogicalOperator::NodeScan(scan_a)),
        // The alias is the only difference from a plain variable-length expand.
        path_alias: Some(String::from("p")),
        path_mode: PathMode::Walk,
    };
    let root = ReturnOp {
        items: vec![variable("a"), variable("b")],
        distinct: false,
        input: Box::new(LogicalOperator::Expand(expand_to_b)),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(root));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2258
/// With factorized execution switched off, a single-hop `Incoming` `KNOWS`
/// expansion must plan successfully and list `a` and `b` as output columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(read_store).with_factorized_execution(false);

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let incoming_expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec!["KNOWS".to_owned()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable("a".to_owned()),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable("b".to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: Box::new(incoming_expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
2300
/// With factorized execution switched off, a single-hop `KNOWS` expansion
/// using `ExpandDirection::Both` must plan successfully and list `a` and `b`
/// as output columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner =
        Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>).with_factorized_execution(false);

    let variable = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    };
    let expand_both = ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(LogicalOperator::NodeScan(scan_a)),
        path_alias: None,
        path_mode: PathMode::Walk,
    };
    let root = ReturnOp {
        items: vec![variable("a"), variable("b")],
        distinct: false,
        input: Box::new(LogicalOperator::Expand(expand_both)),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(root));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2342
/// A planner built through `Planner::with_context` must report the
/// transaction id and viewing epoch it was given, and expose a transaction
/// manager.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    let tx_id = manager.begin();
    let viewing_epoch = manager.current_epoch();

    // The same store backs both the read-only and the mutable handle.
    let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
    let write_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let planner = Planner::with_context(
        read_store,
        Some(write_store),
        Arc::clone(&manager),
        Some(tx_id),
        viewing_epoch,
    );

    assert_eq!(planner.transaction_id(), Some(tx_id));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), viewing_epoch);
}
2366
/// With factorized execution switched off, a chain of two single-hop outgoing
/// expansions (`a -> b -> c`, no edge-type filter) must plan successfully and
/// list `a` and `c` as output columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(read_store).with_factorized_execution(false);

    // Both hops share every setting except their endpoints and input.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_owned(),
            to_variable: to.to_owned(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec![],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let a_to_b = single_hop("a", "b", scan_a);
    let b_to_c = single_hop("b", "c", a_to_b);

    let items: Vec<ReturnItem> = ["a", "c"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(b_to_c),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"c".to_owned()));
}
2419
/// Sorting an unlabelled scan of `n` ascending by the property `n.name` must
/// plan successfully and keep `n` among the output columns.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2455
/// A `Company` scan of `b` whose `input` is a `Person` scan of `a` must plan
/// successfully and list both `a` and `b` as output columns.
#[test]
fn test_plan_scan_with_input() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // The inner scan feeds the outer one through `NodeScanOp::input`.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_owned(),
        label: Some("Company".to_owned()),
        input: Some(Box::new(person_scan)),
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
2491}