1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43 VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53 convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
/// Borrowed lower and upper bounds of a value range.
///
/// Either bound may be absent (`None`). The bounds borrow their `Value`s for `'a`,
/// so this struct never owns or clones the values it describes.
// NOTE(review): not referenced in the visible part of this file — presumably consumed by
// one of the planner submodules (scan/filter) for range predicates; confirm.
struct RangeBounds<'a> {
    /// Lower bound, if any.
    min: Option<&'a Value>,
    /// Upper bound, if any.
    max: Option<&'a Value>,
    /// Whether `min` itself counts as inside the range (going by the name; usage is not visible here).
    min_inclusive: bool,
    /// Whether `max` itself counts as inside the range (going by the name; usage is not visible here).
    max_inclusive: bool,
}
64
/// Turns a [`LogicalPlan`] into a tree of physical [`Operator`]s.
///
/// Construct with [`Planner::new`] or [`Planner::with_context`], optionally refine with the
/// `with_*` builder methods, then call [`Planner::plan`], [`Planner::plan_profiled`] or
/// [`Planner::plan_adaptive`].
///
/// Several fields use `Cell`/`RefCell` so that planning can update bookkeeping through `&self`.
pub struct Planner {
    /// Read-side graph store. Used in this file for the current epoch, node counts and
    /// statistics, and handed to operators that need to read the graph.
    pub(super) store: Arc<dyn GraphStore>,
    /// Mutable graph store. When `None`, `write_store()` fails with a `ReadOnly`
    /// transaction error.
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager; `None` from `new`, always `Some` from `with_context`.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Transaction this planner works within, if any. `with_context` only creates a
    /// write tracker when this is `Some`.
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch the planner views the store at. `new` uses the store's current epoch.
    pub(super) viewing_epoch: EpochId,
    /// Counter starting at 0.
    // NOTE(review): presumably used to generate names for anonymous edges — confirm in `expand`.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true` (the default), chains of two or more single-hop expands are planned
    /// through `plan_expand_chain` instead of `plan_expand`.
    pub(super) factorized_execution: bool,
    /// Column names registered during planning. In this file, horizontal aggregates and
    /// map-collect register their output alias here.
    // NOTE(review): presumably "columns holding plain values rather than entity ids" — confirm.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names registered during planning.
    // NOTE(review): presumably columns that hold edge ids; populated outside this view — confirm.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator, installed via `with_validator`.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog, installed via `with_catalog`.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Parameter state for a correlated `Apply`. Planning a `ParameterScan` reads this and
    /// fails with an internal error when it is `None`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variable names registered during planning.
    // NOTE(review): presumably variables bound to group lists; populated outside this view — confirm.
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// `true` only while `plan_profiled` is running; makes `maybe_profile` wrap operators.
    profiling: std::cell::Cell<bool>,
    /// Profile entries pushed by `maybe_profile` and handed out by `plan_profiled`.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Write tracker; created by `with_context` when a transaction id is supplied.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context; defaults to `SessionContext::default()`, replaced via `with_session_context`.
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// Read-only flag; `false` by default, set via `with_read_only`.
    // NOTE(review): where this flag is enforced is outside this view.
    pub(super) read_only: bool,
}
112
113impl Planner {
114 #[must_use]
119 pub fn new(store: Arc<dyn GraphStore>) -> Self {
120 let epoch = store.current_epoch();
121 Self {
122 store,
123 write_store: None,
124 transaction_manager: None,
125 transaction_id: None,
126 viewing_epoch: epoch,
127 anon_edge_counter: std::cell::Cell::new(0),
128 factorized_execution: true,
129 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131 validator: None,
132 catalog: None,
133 correlated_param_state: std::cell::RefCell::new(None),
134 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135 profiling: std::cell::Cell::new(false),
136 profile_entries: std::cell::RefCell::new(Vec::new()),
137 write_tracker: None,
138 session_context: grafeo_core::execution::operators::SessionContext::default(),
139 read_only: false,
140 }
141 }
142
143 #[must_use]
145 pub fn with_context(
146 store: Arc<dyn GraphStore>,
147 write_store: Option<Arc<dyn GraphStoreMut>>,
148 transaction_manager: Arc<TransactionManager>,
149 transaction_id: Option<TransactionId>,
150 viewing_epoch: EpochId,
151 ) -> Self {
152 use crate::transaction::TransactionWriteTracker;
153
154 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156 if transaction_id.is_some() {
157 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158 &transaction_manager,
159 ))))
160 } else {
161 None
162 };
163
164 Self {
165 store,
166 write_store,
167 transaction_manager: Some(transaction_manager),
168 transaction_id,
169 viewing_epoch,
170 anon_edge_counter: std::cell::Cell::new(0),
171 factorized_execution: true,
172 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174 validator: None,
175 catalog: None,
176 correlated_param_state: std::cell::RefCell::new(None),
177 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178 profiling: std::cell::Cell::new(false),
179 profile_entries: std::cell::RefCell::new(Vec::new()),
180 write_tracker,
181 session_context: grafeo_core::execution::operators::SessionContext::default(),
182 read_only: false,
183 }
184 }
185
/// Builder: stores `read_only` on the planner and returns it.
// NOTE(review): this method only records the flag; where it is enforced is outside this view.
#[must_use]
pub fn with_read_only(mut self, read_only: bool) -> Self {
    self.read_only = read_only;
    self
}
193
194 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196 self.write_store
197 .as_ref()
198 .map(Arc::clone)
199 .ok_or(Error::Transaction(
200 grafeo_common::utils::error::TransactionError::ReadOnly,
201 ))
202 }
203
/// Returns the epoch this planner views the store at.
#[must_use]
pub fn viewing_epoch(&self) -> EpochId {
    self.viewing_epoch
}
209
/// Returns the transaction id the planner was created with, if any.
#[must_use]
pub fn transaction_id(&self) -> Option<TransactionId> {
    self.transaction_id
}
215
/// Returns a reference to the transaction manager, or `None` when the planner was
/// built with [`Planner::new`].
#[must_use]
pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
    self.transaction_manager.as_ref()
}
221
/// Builder: enables or disables factorized execution.
///
/// When enabled, `plan_operator` routes chains of two or more single-hop expands
/// through `plan_expand_chain`.
#[must_use]
pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
    self.factorized_execution = enabled;
    self
}
228
/// Builder: installs a constraint validator, replacing any previous one.
// NOTE(review): the validator is only stored here; its consumers are outside this view.
#[must_use]
pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
    self.validator = Some(validator);
    self
}
235
/// Builder: installs a catalog, replacing any previous one.
// NOTE(review): the catalog is only stored here; its consumers are outside this view.
#[must_use]
pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
    self.catalog = Some(catalog);
    self
}
242
/// Builder: replaces the default session context with `context`.
#[must_use]
pub fn with_session_context(
    mut self,
    context: grafeo_core::execution::operators::SessionContext,
) -> Self {
    self.session_context = context;
    self
}
252
253 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
257 match op {
258 LogicalOperator::Expand(expand) => {
259 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
260
261 if is_single_hop {
262 let (inner_count, base) = Self::count_expand_chain(&expand.input);
263 (inner_count + 1, base)
264 } else {
265 (0, op)
266 }
267 }
268 _ => (0, op),
269 }
270 }
271
272 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
276 let mut chain = Vec::new();
277 let mut current = op;
278
279 while let LogicalOperator::Expand(expand) = current {
280 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
281 if !is_single_hop {
282 break;
283 }
284 chain.push(expand);
285 current = &expand.input;
286 }
287
288 chain.reverse();
289 chain
290 }
291
292 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
299 let _span = grafeo_debug_span!("grafeo::query::plan");
300 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
301 Ok(PhysicalPlan {
302 operator,
303 columns,
304 adaptive_context: None,
305 })
306 }
307
308 pub fn plan_profiled(
319 &self,
320 logical_plan: &LogicalPlan,
321 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
322 self.profiling.set(true);
323 self.profile_entries.borrow_mut().clear();
324
325 let result = self.plan_operator(&logical_plan.root);
326
327 self.profiling.set(false);
328 let (operator, columns) = result?;
329 let entries = self.profile_entries.borrow_mut().drain(..).collect();
330
331 Ok((
332 PhysicalPlan {
333 operator,
334 columns,
335 adaptive_context: None,
336 },
337 entries,
338 ))
339 }
340
341 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
348 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
349
350 let mut adaptive_context = AdaptiveContext::new();
351 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
352
353 Ok(PhysicalPlan {
354 operator,
355 columns,
356 adaptive_context: Some(adaptive_context),
357 })
358 }
359
360 fn collect_cardinality_estimates(
362 &self,
363 op: &LogicalOperator,
364 ctx: &mut AdaptiveContext,
365 depth: usize,
366 ) {
367 match op {
368 LogicalOperator::NodeScan(scan) => {
369 let estimate = if let Some(label) = &scan.label {
370 self.store.nodes_by_label(label).len() as f64
371 } else {
372 self.store.node_count() as f64
373 };
374 let id = format!("scan_{}", scan.variable);
375 ctx.set_estimate(&id, estimate);
376
377 if let Some(input) = &scan.input {
378 self.collect_cardinality_estimates(input, ctx, depth + 1);
379 }
380 }
381 LogicalOperator::Filter(filter) => {
382 let input_estimate = self.estimate_cardinality(&filter.input);
383 let estimate = input_estimate * 0.3;
384 let id = format!("filter_{depth}");
385 ctx.set_estimate(&id, estimate);
386
387 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
388 }
389 LogicalOperator::Expand(expand) => {
390 let input_estimate = self.estimate_cardinality(&expand.input);
391 let stats = self.store.statistics();
392 let avg_degree = self.estimate_expand_degree(&stats, expand);
393 let estimate = input_estimate * avg_degree;
394 let id = format!("expand_{}", expand.to_variable);
395 ctx.set_estimate(&id, estimate);
396
397 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
398 }
399 LogicalOperator::Join(join) => {
400 let left_est = self.estimate_cardinality(&join.left);
401 let right_est = self.estimate_cardinality(&join.right);
402 let estimate = (left_est * right_est).sqrt();
403 let id = format!("join_{depth}");
404 ctx.set_estimate(&id, estimate);
405
406 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
407 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
408 }
409 LogicalOperator::Aggregate(agg) => {
410 let input_estimate = self.estimate_cardinality(&agg.input);
411 let estimate = if agg.group_by.is_empty() {
412 1.0
413 } else {
414 (input_estimate * 0.1).max(1.0)
415 };
416 let id = format!("aggregate_{depth}");
417 ctx.set_estimate(&id, estimate);
418
419 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
420 }
421 LogicalOperator::Distinct(distinct) => {
422 let input_estimate = self.estimate_cardinality(&distinct.input);
423 let estimate = (input_estimate * 0.5).max(1.0);
424 let id = format!("distinct_{depth}");
425 ctx.set_estimate(&id, estimate);
426
427 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
428 }
429 LogicalOperator::Return(ret) => {
430 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
431 }
432 LogicalOperator::Limit(limit) => {
433 let input_estimate = self.estimate_cardinality(&limit.input);
434 let estimate = (input_estimate).min(limit.count.estimate());
435 let id = format!("limit_{depth}");
436 ctx.set_estimate(&id, estimate);
437
438 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
439 }
440 LogicalOperator::Skip(skip) => {
441 let input_estimate = self.estimate_cardinality(&skip.input);
442 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
443 let id = format!("skip_{depth}");
444 ctx.set_estimate(&id, estimate);
445
446 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
447 }
448 LogicalOperator::Sort(sort) => {
449 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
450 }
451 LogicalOperator::Union(union) => {
452 let estimate: f64 = union
453 .inputs
454 .iter()
455 .map(|input| self.estimate_cardinality(input))
456 .sum();
457 let id = format!("union_{depth}");
458 ctx.set_estimate(&id, estimate);
459
460 for input in &union.inputs {
461 self.collect_cardinality_estimates(input, ctx, depth + 1);
462 }
463 }
464 _ => {
465 }
467 }
468 }
469
470 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
472 match op {
473 LogicalOperator::NodeScan(scan) => {
474 if let Some(label) = &scan.label {
475 self.store.nodes_by_label(label).len() as f64
476 } else {
477 self.store.node_count() as f64
478 }
479 }
480 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
481 LogicalOperator::Expand(expand) => {
482 let stats = self.store.statistics();
483 let avg_degree = self.estimate_expand_degree(&stats, expand);
484 self.estimate_cardinality(&expand.input) * avg_degree
485 }
486 LogicalOperator::Join(join) => {
487 let left = self.estimate_cardinality(&join.left);
488 let right = self.estimate_cardinality(&join.right);
489 (left * right).sqrt()
490 }
491 LogicalOperator::Aggregate(agg) => {
492 if agg.group_by.is_empty() {
493 1.0
494 } else {
495 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
496 }
497 }
498 LogicalOperator::Distinct(distinct) => {
499 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
500 }
501 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
502 LogicalOperator::Limit(limit) => self
503 .estimate_cardinality(&limit.input)
504 .min(limit.count.estimate()),
505 LogicalOperator::Skip(skip) => {
506 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
507 }
508 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
509 LogicalOperator::Union(union) => union
510 .inputs
511 .iter()
512 .map(|input| self.estimate_cardinality(input))
513 .sum(),
514 LogicalOperator::Except(except) => {
515 let left = self.estimate_cardinality(&except.left);
516 let right = self.estimate_cardinality(&except.right);
517 (left - right).max(0.0)
518 }
519 LogicalOperator::Intersect(intersect) => {
520 let left = self.estimate_cardinality(&intersect.left);
521 let right = self.estimate_cardinality(&intersect.right);
522 left.min(right)
523 }
524 LogicalOperator::Otherwise(otherwise) => self
525 .estimate_cardinality(&otherwise.left)
526 .max(self.estimate_cardinality(&otherwise.right)),
527 _ => 1000.0,
528 }
529 }
530
531 fn estimate_expand_degree(
533 &self,
534 stats: &grafeo_core::statistics::Statistics,
535 expand: &ExpandOp,
536 ) -> f64 {
537 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
538 if expand.edge_types.len() == 1 {
539 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
540 } else if stats.total_nodes > 0 {
541 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
542 } else {
543 10.0
544 }
545 }
546
547 fn maybe_profile(
550 &self,
551 result: Result<(Box<dyn Operator>, Vec<String>)>,
552 op: &LogicalOperator,
553 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
554 if self.profiling.get() {
555 let (physical, columns) = result?;
556 let (entry, stats) =
557 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
558 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
559 self.profile_entries.borrow_mut().push(entry);
560 Ok((Box::new(profiled), columns))
561 } else {
562 result
563 }
564 }
565
/// Plans a single logical operator (and, through the per-kind planners, its inputs).
///
/// Returns the physical operator together with the names of its output columns.
/// Every result passes through `maybe_profile`, so operators are wrapped for
/// profiling when profiling mode is on.
///
/// # Errors
///
/// Returns `Error::Internal` for `Empty`, for `VectorScan`/`VectorJoin`, for a
/// `ParameterScan` outside a correlated `Apply`, for `CallProcedure` when the `algos`
/// feature is disabled, and for any operator kind not listed here. Errors from the
/// per-kind planners are propagated.
fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
    let result = match op {
        LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
        LogicalOperator::Expand(expand) => {
            // With factorized execution, a chain of two or more single-hop expands is
            // planned as one unit. This path returns early, so it calls
            // `maybe_profile` itself.
            if self.factorized_execution {
                let (chain_len, _base) = Self::count_expand_chain(op);
                if chain_len >= 2 {
                    return self.maybe_profile(self.plan_expand_chain(op), op);
                }
            }
            self.plan_expand(expand)
        }
        LogicalOperator::Return(ret) => self.plan_return(ret),
        LogicalOperator::Filter(filter) => self.plan_filter(filter),
        LogicalOperator::Project(project) => self.plan_project(project),
        LogicalOperator::Limit(limit) => self.plan_limit(limit),
        LogicalOperator::Skip(skip) => self.plan_skip(skip),
        LogicalOperator::Sort(sort) => self.plan_sort(sort),
        LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
        LogicalOperator::Join(join) => self.plan_join(join),
        LogicalOperator::Union(union) => self.plan_union(union),
        LogicalOperator::Except(except) => self.plan_except(except),
        LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
        LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
        LogicalOperator::Apply(apply) => self.plan_apply(apply),
        LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
        LogicalOperator::CreateNode(create) => self.plan_create_node(create),
        LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
        LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
        LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
        LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
        LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
        LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
        LogicalOperator::Merge(merge) => self.plan_merge(merge),
        LogicalOperator::MergeRelationship(merge_rel) => {
            self.plan_merge_relationship(merge_rel)
        }
        LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
        LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
        LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
        LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
        LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
        // Exactly one of the two `CallProcedure` arms is compiled, depending on `algos`.
        #[cfg(feature = "algos")]
        LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
        #[cfg(not(feature = "algos"))]
        LogicalOperator::CallProcedure(_) => Err(Error::Internal(
            "CALL procedures require the 'algos' feature".to_string(),
        )),
        LogicalOperator::ParameterScan(_param_scan) => {
            // The parameter state must have been installed beforehand; the scan's
            // output columns are taken from that state.
            let state = self
                .correlated_param_state
                .borrow()
                .clone()
                .ok_or_else(|| {
                    Error::Internal(
                        "ParameterScan without correlated Apply context".to_string(),
                    )
                })?;
            let columns = state.columns.clone();
            let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
            Ok((operator, columns))
        }
        LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
        LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
        LogicalOperator::LoadData(load) => {
            // Produces a single column named after the load variable.
            let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
                load.path.clone(),
                load.format,
                load.with_headers,
                load.field_terminator,
                load.variable.clone(),
            ));
            Ok((operator, vec![load.variable.clone()]))
        }
        LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
        LogicalOperator::VectorScan(_) => Err(Error::Internal(
            "VectorScan requires vector-index feature".to_string(),
        )),
        LogicalOperator::VectorJoin(_) => Err(Error::Internal(
            "VectorJoin requires vector-index feature".to_string(),
        )),
        // Any remaining kind is reported by its enum discriminant.
        _ => Err(Error::Internal(format!(
            "Unsupported operator: {:?}",
            std::mem::discriminant(op)
        ))),
    };
    self.maybe_profile(result, op)
}
657
658 fn plan_horizontal_aggregate(
660 &self,
661 ha: &HorizontalAggregateOp,
662 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
663 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
664
665 let list_col_idx = child_columns
666 .iter()
667 .position(|c| c == &ha.list_column)
668 .ok_or_else(|| {
669 Error::Internal(format!(
670 "HorizontalAggregate list column '{}' not found in {:?}",
671 ha.list_column, child_columns
672 ))
673 })?;
674
675 let entity_kind = match ha.entity_kind {
676 LogicalEntityKind::Edge => EntityKind::Edge,
677 LogicalEntityKind::Node => EntityKind::Node,
678 };
679
680 let function = convert_aggregate_function(ha.function);
681 let input_column_count = child_columns.len();
682
683 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
684 child_op,
685 list_col_idx,
686 entity_kind,
687 function,
688 ha.property.clone(),
689 Arc::clone(&self.store) as Arc<dyn GraphStore>,
690 input_column_count,
691 ));
692
693 let mut columns = child_columns;
694 columns.push(ha.alias.clone());
695 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
697
698 Ok((operator, columns))
699 }
700
701 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
703 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
704 let key_idx = child_columns
705 .iter()
706 .position(|c| c == &mc.key_var)
707 .ok_or_else(|| {
708 Error::Internal(format!(
709 "MapCollect key '{}' not in columns {:?}",
710 mc.key_var, child_columns
711 ))
712 })?;
713 let value_idx = child_columns
714 .iter()
715 .position(|c| c == &mc.value_var)
716 .ok_or_else(|| {
717 Error::Internal(format!(
718 "MapCollect value '{}' not in columns {:?}",
719 mc.value_var, child_columns
720 ))
721 })?;
722 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
723 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
724 Ok((operator, vec![mc.alias.clone()]))
725 }
726}
727
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
// NOTE(review): presumably used to surface CALL-procedure results; the construction site
// is outside this view — confirm.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// All rows to emit; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a row to read the value from.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; `reset` sets it back to 0.
    row_index: usize,
}
735
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once all rows were emitted.
    ///
    /// Each output column reads the row position given by `column_indices`; a row that
    /// is too short for that position contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        let batch = (total - start).min(1024);
        let end = start + batch;

        // Every output column is typed `Any`.
        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch);

        for row in &self.rows[start..end] {
            for (dst, &src) in self.column_indices.iter().enumerate() {
                let value = match row.get(src) {
                    Some(found) => found.clone(),
                    None => Value::Null,
                };
                if let Some(column) = chunk.column_mut(dst) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Static operator name reported by this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
775
776#[cfg(test)]
777mod tests {
778 use super::*;
779 use crate::query::plan::{
780 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
781 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
782 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
783 SkipOp as LogicalSkipOp, SortKey, SortOp,
784 };
785 use grafeo_common::types::Value;
786 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
787 use grafeo_core::graph::GraphStoreMut;
788 use grafeo_core::graph::lpg::LpgStore;
789
790 fn create_test_store() -> Arc<LpgStore> {
791 let store = Arc::new(LpgStore::new().unwrap());
792 store.create_node(&["Person"]);
793 store.create_node(&["Person"]);
794 store.create_node(&["Company"]);
795 store
796 }
797
/// Returning the variable of a labelled node scan yields one column named after it.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n:Person) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
822
/// An unlabelled node scan plans just like a labelled one.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
845
/// An aliased return item names the output column after the alias.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n:Person) RETURN n AS person
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: Some("person".to_string()),
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
868
/// An unaliased property access names the output column `<variable>.<property>`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n:Person) RETURN n.name
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        alias: None,
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
894
/// An aliased literal return item names the output column after the alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) RETURN 42 AS answer
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some("answer".to_string()),
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
917
/// A property-equals-literal filter over a labelled scan plans and keeps the scan's column.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n:Person) WHERE n.age = 30 RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
953
/// A conjunction of two comparisons on the same property plans as a filter.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) WHERE n.age > 20 AND n.age < 40 RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let older_than_20 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    let younger_than_40 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: LogicalExpression::Binary {
            left: Box::new(older_than_20),
            op: BinaryOp::And,
            right: Box::new(younger_than_40),
        },
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
998
/// A negated property predicate plans as a filter.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) WHERE NOT n.active RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "active".to_string(),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1031
/// An `IsNull` check on a property plans as a filter.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) WHERE n.email IS NULL RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let email_is_null = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "email".to_string(),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: email_is_null,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1064
/// A predicate comparing a function call's result to a literal plans as a filter.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Roughly: MATCH (n) WHERE size(n.friends) > 0 RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1102
/// Plans a single-hop outgoing `KNOWS` expansion from `a:Person` to `b` and
/// expects both `a` and `b` among the physical plan's columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(people),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for name in ["a", "b"] {
        assert!(planned.columns().contains(&name.to_string()));
    }
}
1146
/// Plans a single-hop expansion that binds the edge to `r` and expects `a`,
/// `r` and `b` among the physical plan's columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("r")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for name in ["a", "r", "b"] {
        assert!(planned.columns().contains(&name.to_string()));
    }
}
1192
/// Plans `RETURN n` over a limit of 10 on an unlabeled node scan and expects
/// the single output column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1220
/// Plans `RETURN n` over a skip of 5 on an unlabeled node scan and expects
/// the single output column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1246
/// Plans `RETURN n` over an ascending sort keyed on `n` and expects the
/// single output column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1276
/// Plans `RETURN n` over a descending sort keyed on `n` and expects the
/// single output column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1306
/// Plans `RETURN n` over a distinct operator with no explicit column list
/// and expects the single output column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1332
/// Plans `RETURN n` over a distinct operator restricted to the column `n`
/// and expects the single output column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("n")]),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1358
/// Plans a distinct operator whose column list names `nonexistent`, which
/// the scan does not produce. The test expects planning to succeed anyway
/// and the output column to be `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("nonexistent")]),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1385
/// Plans `RETURN cnt` over an ungrouped `count(n)` aliased as `cnt` and
/// expects `cnt` among the physical plan's columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("cnt")));
}
1423
/// Plans `count(n)` grouped by `n.city` over a `Person` scan and expects
/// exactly two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let by_city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![by_city],
        aggregates: vec![count_nodes],
        input: Box::new(people),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1455
/// Plans an ungrouped `sum(n.value)` aliased as `total` and expects `total`
/// among the physical plan's columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_values = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_values],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("total")));
}
1487
/// Plans an ungrouped `avg(n.score)` aliased as `average` and expects
/// `average` among the physical plan's columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let mean_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![mean_score],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("average")));
}
1519
/// Plans two ungrouped aggregates over `n.age` (`min` as `youngest`, `max`
/// as `oldest`) and expects both aliases among the physical plan's columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let min_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Min,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("youngest")),
        percentile: None,
        separator: None,
    };
    let max_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Max,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("oldest")),
        percentile: None,
        separator: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![min_age, max_age],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    for alias in ["youngest", "oldest"] {
        assert!(planned.columns().contains(&alias.to_string()));
    }
}
1566
/// Plans an inner join between a `Person` scan (`a`) and a `Company` scan
/// (`b`) with the single condition `a = b`, and expects both variables
/// among the physical plan's columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable(String::from("a")),
            right: LogicalExpression::Variable(String::from("b")),
        }],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(joined),
    }));

    let planned = planner.plan(&plan).unwrap();
    for name in ["a", "b"] {
        assert!(planned.columns().contains(&name.to_string()));
    }
}
1610
/// Plans a condition-free cross join of two unlabeled scans and expects
/// exactly two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1635
/// Plans a condition-free left join of two unlabeled scans and expects
/// exactly two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1659
1660 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1663 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1664 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1665 p
1666 }
1667
/// Plans the creation of a `Person` node bound to `n` with a `name`
/// property, using a writable planner, and expects `n` among the columns.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let plan = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("n")));
}
1687
/// Plans the creation of a `KNOWS` edge `r` from `a` to `b`, fed by a cross
/// join of two unlabeled scans, and expects `r` among the columns.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let from_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let to_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(from_scan),
        right: Box::new(to_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("r")));
}
1719
/// Plans a non-detaching delete of `n` over an unlabeled scan, using a
/// writable planner, and expects `n` among the columns.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("n")));
}
1739
/// Planning a plan whose root is `LogicalOperator::Empty` is expected to
/// return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    let outcome = planner.plan(&empty_plan);

    assert!(outcome.is_err());
}
1751
/// Returning the variable `missing`, which the underlying scan (binding
/// only `n`) does not produce, is expected to make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("missing")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let outcome = planner.plan(&plan);
    assert!(outcome.is_err());
}
1774
/// Every comparison, boolean and arithmetic operator listed here is
/// expected to convert successfully via `convert_binary_op`.
#[test]
fn test_convert_binary_ops() {
    // Same operators, same order as the individual assertions they replace.
    let supported = [
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        BinaryOp::And,
        BinaryOp::Or,
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
1792
/// Each of the four unary operators listed here is expected to convert
/// successfully via `convert_unary_op`.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
1800
/// Each logical aggregate function checked here is expected to map to the
/// physical aggregate function of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1824
/// A planner built with `Planner::new` is expected to have no transaction
/// id and no transaction manager; `viewing_epoch` is only called, its value
/// is not checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph);

    assert!(planner.transaction_id().is_none());
    assert!(planner.transaction_manager().is_none());

    // Exercise the accessor; the result is intentionally discarded.
    let _ = planner.viewing_epoch();
}
1834
/// Plans a bare node scan, checks `columns()` on the physical plan, and
/// then consumes the plan with `into_operator()`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(scan);

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);

    // Consuming accessor: only checked for being callable.
    let _ = planned.into_operator();
}
1852
/// Adaptive planning of `RETURN n` over a `Person` scan is expected to
/// yield the column `n` and to populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(people),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
    assert!(planned.adaptive_context.is_some());
}
1879
/// Adaptive planning of a plan containing the filter `n.age > 30` is
/// expected to populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let older_than_thirty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_thirty,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1913
/// Adaptive planning of a single-hop outgoing `KNOWS` expansion, with
/// factorized execution switched off, is expected to populate
/// `adaptive_context`.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1954
/// Adaptive planning of `RETURN a, b` over a cross join of two unlabeled
/// scans is expected to populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(joined),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1991
/// Adaptive planning of an ungrouped `count(n)` aliased as `cnt` is
/// expected to populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2019
/// Adaptive planning of `RETURN n` over a distinct operator is expected to
/// populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2044
/// Adaptive planning of `RETURN n` over a limit of 10 is expected to
/// populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2069
/// Adaptive planning of `RETURN n` over a skip of 5 is expected to populate
/// `adaptive_context`.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2094
/// Adaptive planning of `RETURN n` over an ascending sort keyed on `n` is
/// expected to populate `adaptive_context`.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2123
/// Adaptive planning of `RETURN n` over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) is expected to populate
/// `adaptive_context`.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Company")),
        input: None,
    });
    let combined = LogicalOperator::Union(UnionOp {
        inputs: vec![people, companies],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(combined),
    }));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2154
/// Plans an outgoing `KNOWS` expansion of 1 to 3 hops from `a` to `b` and
/// expects both variables among the physical plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for name in ["a", "b"] {
        assert!(planned.columns().contains(&name.to_string()));
    }
}
2197
/// Plans a 1 to 3 hop outgoing `KNOWS` expansion that carries the path
/// alias `p` and expects `a` and `b` among the physical plan's columns.
/// No column derived from the path alias is asserted here.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: Some(String::from("p")),
        path_mode: PathMode::Walk,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("a")),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable(String::from("b")),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for name in ["a", "b"] {
        assert!(planned.columns().contains(&name.to_string()));
    }
}
2239
/// Plans a single-hop `KNOWS` expansion in the `Incoming` direction, using a
/// planner configured with `with_factorized_execution(false)`, and checks the
/// returned variables `a` and `b` are columns of the physical plan.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Builds an un-aliased return item that projects a single variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Exactly one hop (min == max == 1), following edges in the incoming direction.
    let expand_incoming = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand_incoming),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2281
/// Plans a single-hop `KNOWS` expansion with `ExpandDirection::Both`, using a
/// planner configured with `with_factorized_execution(false)`, and checks the
/// returned variables `a` and `b` are columns of the physical plan.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Builds an un-aliased return item that projects a single variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Exactly one hop (min == max == 1), with the direction left as `Both`.
    let expand_both = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand_both),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2323
/// Constructs a planner through `Planner::with_context` and checks that the
/// transaction id, transaction manager and viewing epoch handed to the
/// constructor are the ones reported back by the planner's accessors.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let tx_manager = Arc::new(TransactionManager::new());
    let tx_id = tx_manager.begin();
    let viewing_epoch = tx_manager.current_epoch();

    // The same underlying store is passed twice: once as the read-side
    // `GraphStore` handle and once as the optional `GraphStoreMut` handle.
    let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
    let write_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let planner = Planner::with_context(
        read_store,
        Some(write_store),
        Arc::clone(&tx_manager),
        Some(tx_id),
        viewing_epoch,
    );

    assert_eq!(planner.transaction_id(), Some(tx_id));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), viewing_epoch);
}
2347
/// Plans a two-hop chain `a -> b -> c` (two stacked single-hop outgoing
/// expansions with no edge-type restriction) on a planner configured with
/// `with_factorized_execution(false)`, and checks that the returned variables
/// `a` and `c` are columns of the physical plan.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let graph = Arc::clone(&store) as Arc<dyn GraphStore>;
    let planner = Planner::new(graph).with_factorized_execution(false);

    // Builds an un-aliased return item that projects a single variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    // One outgoing hop of any edge type from `from` to `to`, fed by `source`.
    let single_hop = |from: &str, to: &str, source: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_owned(),
            to_variable: to.to_owned(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: Vec::new(),
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(source),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // The inner expansion (a -> b) is the input of the outer one (b -> c).
    let a_to_b = single_hop("a", "b", scan_a);
    let b_to_c = single_hop("b", "c", a_to_b);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("c")],
        distinct: false,
        input: Box::new(b_to_c),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
2400
/// Plans a sort over an unlabeled scan of `n`, keyed on the property `n.name`
/// in ascending order with no explicit null ordering, and checks that the
/// returned variable `n` is a column of the physical plan.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The sort key is a property access rather than a plain variable.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2436
/// Plans a node scan that has another node scan as its `input`: the scan of
/// `b` (label `Company`) is chained on top of the scan of `a` (label
/// `Person`). Both variables are returned and must appear as plan columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased return item that projects a single variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    // The inner scan has no input of its own.
    let scan_person = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // The outer scan consumes the inner scan as its input.
    let scan_company = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(scan_person)),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(scan_company),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2472}