1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43 VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53 convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57struct RangeBounds<'a> {
59 min: Option<&'a Value>,
60 max: Option<&'a Value>,
61 min_inclusive: bool,
62 max_inclusive: bool,
63}
64
65pub struct Planner {
67 pub(super) store: Arc<dyn GraphStore>,
69 pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
71 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
73 pub(super) transaction_id: Option<TransactionId>,
75 pub(super) viewing_epoch: EpochId,
77 pub(super) anon_edge_counter: std::cell::Cell<u32>,
79 pub(super) factorized_execution: bool,
81 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
87 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
89 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
91 pub(super) correlated_param_state:
95 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
96 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
99 profiling: std::cell::Cell<bool>,
101 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
103 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
105 pub(super) session_context: grafeo_core::execution::operators::SessionContext,
107 pub(super) read_only: bool,
111}
112
113impl Planner {
114 #[must_use]
119 pub fn new(store: Arc<dyn GraphStore>) -> Self {
120 let epoch = store.current_epoch();
121 Self {
122 store,
123 write_store: None,
124 transaction_manager: None,
125 transaction_id: None,
126 viewing_epoch: epoch,
127 anon_edge_counter: std::cell::Cell::new(0),
128 factorized_execution: true,
129 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131 validator: None,
132 catalog: None,
133 correlated_param_state: std::cell::RefCell::new(None),
134 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135 profiling: std::cell::Cell::new(false),
136 profile_entries: std::cell::RefCell::new(Vec::new()),
137 write_tracker: None,
138 session_context: grafeo_core::execution::operators::SessionContext::default(),
139 read_only: false,
140 }
141 }
142
143 #[must_use]
145 pub fn with_context(
146 store: Arc<dyn GraphStore>,
147 write_store: Option<Arc<dyn GraphStoreMut>>,
148 transaction_manager: Arc<TransactionManager>,
149 transaction_id: Option<TransactionId>,
150 viewing_epoch: EpochId,
151 ) -> Self {
152 use crate::transaction::TransactionWriteTracker;
153
154 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156 if transaction_id.is_some() {
157 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158 &transaction_manager,
159 ))))
160 } else {
161 None
162 };
163
164 Self {
165 store,
166 write_store,
167 transaction_manager: Some(transaction_manager),
168 transaction_id,
169 viewing_epoch,
170 anon_edge_counter: std::cell::Cell::new(0),
171 factorized_execution: true,
172 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174 validator: None,
175 catalog: None,
176 correlated_param_state: std::cell::RefCell::new(None),
177 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178 profiling: std::cell::Cell::new(false),
179 profile_entries: std::cell::RefCell::new(Vec::new()),
180 write_tracker,
181 session_context: grafeo_core::execution::operators::SessionContext::default(),
182 read_only: false,
183 }
184 }
185
186 #[must_use]
189 pub fn with_read_only(mut self, read_only: bool) -> Self {
190 self.read_only = read_only;
191 self
192 }
193
194 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196 self.write_store
197 .as_ref()
198 .map(Arc::clone)
199 .ok_or(Error::Transaction(
200 grafeo_common::utils::error::TransactionError::ReadOnly,
201 ))
202 }
203
204 #[must_use]
206 pub fn viewing_epoch(&self) -> EpochId {
207 self.viewing_epoch
208 }
209
210 #[must_use]
212 pub fn transaction_id(&self) -> Option<TransactionId> {
213 self.transaction_id
214 }
215
216 #[must_use]
218 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
219 self.transaction_manager.as_ref()
220 }
221
222 #[must_use]
224 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225 self.factorized_execution = enabled;
226 self
227 }
228
229 #[must_use]
231 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232 self.validator = Some(validator);
233 self
234 }
235
236 #[must_use]
238 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239 self.catalog = Some(catalog);
240 self
241 }
242
243 #[must_use]
245 pub fn with_session_context(
246 mut self,
247 context: grafeo_core::execution::operators::SessionContext,
248 ) -> Self {
249 self.session_context = context;
250 self
251 }
252
253 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
257 match op {
258 LogicalOperator::Expand(expand) => {
259 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
260
261 if is_single_hop {
262 let (inner_count, base) = Self::count_expand_chain(&expand.input);
263 (inner_count + 1, base)
264 } else {
265 (0, op)
266 }
267 }
268 _ => (0, op),
269 }
270 }
271
272 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
276 let mut chain = Vec::new();
277 let mut current = op;
278
279 while let LogicalOperator::Expand(expand) = current {
280 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
281 if !is_single_hop {
282 break;
283 }
284 chain.push(expand);
285 current = &expand.input;
286 }
287
288 chain.reverse();
289 chain
290 }
291
292 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
294 let _span = grafeo_debug_span!("grafeo::query::plan");
295 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
296 Ok(PhysicalPlan {
297 operator,
298 columns,
299 adaptive_context: None,
300 })
301 }
302
303 pub fn plan_profiled(
309 &self,
310 logical_plan: &LogicalPlan,
311 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
312 self.profiling.set(true);
313 self.profile_entries.borrow_mut().clear();
314
315 let result = self.plan_operator(&logical_plan.root);
316
317 self.profiling.set(false);
318 let (operator, columns) = result?;
319 let entries = self.profile_entries.borrow_mut().drain(..).collect();
320
321 Ok((
322 PhysicalPlan {
323 operator,
324 columns,
325 adaptive_context: None,
326 },
327 entries,
328 ))
329 }
330
331 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
333 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
334
335 let mut adaptive_context = AdaptiveContext::new();
336 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
337
338 Ok(PhysicalPlan {
339 operator,
340 columns,
341 adaptive_context: Some(adaptive_context),
342 })
343 }
344
345 fn collect_cardinality_estimates(
347 &self,
348 op: &LogicalOperator,
349 ctx: &mut AdaptiveContext,
350 depth: usize,
351 ) {
352 match op {
353 LogicalOperator::NodeScan(scan) => {
354 let estimate = if let Some(label) = &scan.label {
355 self.store.nodes_by_label(label).len() as f64
356 } else {
357 self.store.node_count() as f64
358 };
359 let id = format!("scan_{}", scan.variable);
360 ctx.set_estimate(&id, estimate);
361
362 if let Some(input) = &scan.input {
363 self.collect_cardinality_estimates(input, ctx, depth + 1);
364 }
365 }
366 LogicalOperator::Filter(filter) => {
367 let input_estimate = self.estimate_cardinality(&filter.input);
368 let estimate = input_estimate * 0.3;
369 let id = format!("filter_{depth}");
370 ctx.set_estimate(&id, estimate);
371
372 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
373 }
374 LogicalOperator::Expand(expand) => {
375 let input_estimate = self.estimate_cardinality(&expand.input);
376 let stats = self.store.statistics();
377 let avg_degree = self.estimate_expand_degree(&stats, expand);
378 let estimate = input_estimate * avg_degree;
379 let id = format!("expand_{}", expand.to_variable);
380 ctx.set_estimate(&id, estimate);
381
382 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
383 }
384 LogicalOperator::Join(join) => {
385 let left_est = self.estimate_cardinality(&join.left);
386 let right_est = self.estimate_cardinality(&join.right);
387 let estimate = (left_est * right_est).sqrt();
388 let id = format!("join_{depth}");
389 ctx.set_estimate(&id, estimate);
390
391 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
392 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
393 }
394 LogicalOperator::Aggregate(agg) => {
395 let input_estimate = self.estimate_cardinality(&agg.input);
396 let estimate = if agg.group_by.is_empty() {
397 1.0
398 } else {
399 (input_estimate * 0.1).max(1.0)
400 };
401 let id = format!("aggregate_{depth}");
402 ctx.set_estimate(&id, estimate);
403
404 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
405 }
406 LogicalOperator::Distinct(distinct) => {
407 let input_estimate = self.estimate_cardinality(&distinct.input);
408 let estimate = (input_estimate * 0.5).max(1.0);
409 let id = format!("distinct_{depth}");
410 ctx.set_estimate(&id, estimate);
411
412 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
413 }
414 LogicalOperator::Return(ret) => {
415 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
416 }
417 LogicalOperator::Limit(limit) => {
418 let input_estimate = self.estimate_cardinality(&limit.input);
419 let estimate = (input_estimate).min(limit.count.estimate());
420 let id = format!("limit_{depth}");
421 ctx.set_estimate(&id, estimate);
422
423 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
424 }
425 LogicalOperator::Skip(skip) => {
426 let input_estimate = self.estimate_cardinality(&skip.input);
427 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
428 let id = format!("skip_{depth}");
429 ctx.set_estimate(&id, estimate);
430
431 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
432 }
433 LogicalOperator::Sort(sort) => {
434 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
435 }
436 LogicalOperator::Union(union) => {
437 let estimate: f64 = union
438 .inputs
439 .iter()
440 .map(|input| self.estimate_cardinality(input))
441 .sum();
442 let id = format!("union_{depth}");
443 ctx.set_estimate(&id, estimate);
444
445 for input in &union.inputs {
446 self.collect_cardinality_estimates(input, ctx, depth + 1);
447 }
448 }
449 _ => {
450 }
452 }
453 }
454
455 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
457 match op {
458 LogicalOperator::NodeScan(scan) => {
459 if let Some(label) = &scan.label {
460 self.store.nodes_by_label(label).len() as f64
461 } else {
462 self.store.node_count() as f64
463 }
464 }
465 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
466 LogicalOperator::Expand(expand) => {
467 let stats = self.store.statistics();
468 let avg_degree = self.estimate_expand_degree(&stats, expand);
469 self.estimate_cardinality(&expand.input) * avg_degree
470 }
471 LogicalOperator::Join(join) => {
472 let left = self.estimate_cardinality(&join.left);
473 let right = self.estimate_cardinality(&join.right);
474 (left * right).sqrt()
475 }
476 LogicalOperator::Aggregate(agg) => {
477 if agg.group_by.is_empty() {
478 1.0
479 } else {
480 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
481 }
482 }
483 LogicalOperator::Distinct(distinct) => {
484 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
485 }
486 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
487 LogicalOperator::Limit(limit) => self
488 .estimate_cardinality(&limit.input)
489 .min(limit.count.estimate()),
490 LogicalOperator::Skip(skip) => {
491 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
492 }
493 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
494 LogicalOperator::Union(union) => union
495 .inputs
496 .iter()
497 .map(|input| self.estimate_cardinality(input))
498 .sum(),
499 LogicalOperator::Except(except) => {
500 let left = self.estimate_cardinality(&except.left);
501 let right = self.estimate_cardinality(&except.right);
502 (left - right).max(0.0)
503 }
504 LogicalOperator::Intersect(intersect) => {
505 let left = self.estimate_cardinality(&intersect.left);
506 let right = self.estimate_cardinality(&intersect.right);
507 left.min(right)
508 }
509 LogicalOperator::Otherwise(otherwise) => self
510 .estimate_cardinality(&otherwise.left)
511 .max(self.estimate_cardinality(&otherwise.right)),
512 _ => 1000.0,
513 }
514 }
515
516 fn estimate_expand_degree(
518 &self,
519 stats: &grafeo_core::statistics::Statistics,
520 expand: &ExpandOp,
521 ) -> f64 {
522 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
523 if expand.edge_types.len() == 1 {
524 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
525 } else if stats.total_nodes > 0 {
526 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
527 } else {
528 10.0
529 }
530 }
531
532 fn maybe_profile(
535 &self,
536 result: Result<(Box<dyn Operator>, Vec<String>)>,
537 op: &LogicalOperator,
538 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
539 if self.profiling.get() {
540 let (physical, columns) = result?;
541 let (entry, stats) =
542 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
543 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
544 self.profile_entries.borrow_mut().push(entry);
545 Ok((Box::new(profiled), columns))
546 } else {
547 result
548 }
549 }
550
551 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
553 let result = match op {
554 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
555 LogicalOperator::Expand(expand) => {
556 if self.factorized_execution {
557 let (chain_len, _base) = Self::count_expand_chain(op);
558 if chain_len >= 2 {
559 return self.maybe_profile(self.plan_expand_chain(op), op);
560 }
561 }
562 self.plan_expand(expand)
563 }
564 LogicalOperator::Return(ret) => self.plan_return(ret),
565 LogicalOperator::Filter(filter) => self.plan_filter(filter),
566 LogicalOperator::Project(project) => self.plan_project(project),
567 LogicalOperator::Limit(limit) => self.plan_limit(limit),
568 LogicalOperator::Skip(skip) => self.plan_skip(skip),
569 LogicalOperator::Sort(sort) => self.plan_sort(sort),
570 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
571 LogicalOperator::Join(join) => self.plan_join(join),
572 LogicalOperator::Union(union) => self.plan_union(union),
573 LogicalOperator::Except(except) => self.plan_except(except),
574 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
575 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
576 LogicalOperator::Apply(apply) => self.plan_apply(apply),
577 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
578 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
579 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
580 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
581 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
582 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
583 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
584 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
585 LogicalOperator::Merge(merge) => self.plan_merge(merge),
586 LogicalOperator::MergeRelationship(merge_rel) => {
587 self.plan_merge_relationship(merge_rel)
588 }
589 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
590 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
591 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
592 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
593 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
594 #[cfg(feature = "algos")]
595 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
596 #[cfg(not(feature = "algos"))]
597 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
598 "CALL procedures require the 'algos' feature".to_string(),
599 )),
600 LogicalOperator::ParameterScan(_param_scan) => {
601 let state = self
602 .correlated_param_state
603 .borrow()
604 .clone()
605 .ok_or_else(|| {
606 Error::Internal(
607 "ParameterScan without correlated Apply context".to_string(),
608 )
609 })?;
610 let columns = state.columns.clone();
613 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
614 Ok((operator, columns))
615 }
616 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
617 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
618 LogicalOperator::LoadData(load) => {
619 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
620 load.path.clone(),
621 load.format,
622 load.with_headers,
623 load.field_terminator,
624 load.variable.clone(),
625 ));
626 Ok((operator, vec![load.variable.clone()]))
627 }
628 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
629 LogicalOperator::VectorScan(_) => Err(Error::Internal(
630 "VectorScan requires vector-index feature".to_string(),
631 )),
632 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
633 "VectorJoin requires vector-index feature".to_string(),
634 )),
635 _ => Err(Error::Internal(format!(
636 "Unsupported operator: {:?}",
637 std::mem::discriminant(op)
638 ))),
639 };
640 self.maybe_profile(result, op)
641 }
642
643 fn plan_horizontal_aggregate(
645 &self,
646 ha: &HorizontalAggregateOp,
647 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
648 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
649
650 let list_col_idx = child_columns
651 .iter()
652 .position(|c| c == &ha.list_column)
653 .ok_or_else(|| {
654 Error::Internal(format!(
655 "HorizontalAggregate list column '{}' not found in {:?}",
656 ha.list_column, child_columns
657 ))
658 })?;
659
660 let entity_kind = match ha.entity_kind {
661 LogicalEntityKind::Edge => EntityKind::Edge,
662 LogicalEntityKind::Node => EntityKind::Node,
663 };
664
665 let function = convert_aggregate_function(ha.function);
666 let input_column_count = child_columns.len();
667
668 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
669 child_op,
670 list_col_idx,
671 entity_kind,
672 function,
673 ha.property.clone(),
674 Arc::clone(&self.store) as Arc<dyn GraphStore>,
675 input_column_count,
676 ));
677
678 let mut columns = child_columns;
679 columns.push(ha.alias.clone());
680 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
682
683 Ok((operator, columns))
684 }
685
686 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
688 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
689 let key_idx = child_columns
690 .iter()
691 .position(|c| c == &mc.key_var)
692 .ok_or_else(|| {
693 Error::Internal(format!(
694 "MapCollect key '{}' not in columns {:?}",
695 mc.key_var, child_columns
696 ))
697 })?;
698 let value_idx = child_columns
699 .iter()
700 .position(|c| c == &mc.value_var)
701 .ok_or_else(|| {
702 Error::Internal(format!(
703 "MapCollect value '{}' not in columns {:?}",
704 mc.value_var, child_columns
705 ))
706 })?;
707 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
708 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
709 Ok((operator, vec![mc.alias.clone()]))
710 }
711}
712
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, each a list of values.
    rows: Vec<Vec<Value>>,
    /// For every output column, the position inside a row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row that has not been emitted yet.
    row_index: usize,
}
720
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once every row
    /// has been produced.
    ///
    /// All output columns are typed `LogicalType::Any`. A source index that
    /// lies outside a row yields `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let total = self.rows.len();
        if self.row_index >= total {
            return Ok(None);
        }

        let batch_len = (total - self.row_index).min(1024);
        let schema: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&schema, batch_len);

        let batch = &self.rows[self.row_index..self.row_index + batch_len];
        for row in batch {
            for (dst, &src) in self.column_indices.iter().enumerate() {
                let cell = row.get(src).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(dst) {
                    column.push_value(cell);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index += batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Static display name of this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
760
761#[cfg(test)]
762mod tests {
763 use super::*;
764 use crate::query::plan::{
765 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
766 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
767 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
768 SkipOp as LogicalSkipOp, SortKey, SortOp,
769 };
770 use grafeo_common::types::Value;
771 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
772 use grafeo_core::graph::GraphStoreMut;
773 use grafeo_core::graph::lpg::LpgStore;
774
775 fn create_test_store() -> Arc<LpgStore> {
776 let store = Arc::new(LpgStore::new().unwrap());
777 store.create_node(&["Person"]);
778 store.create_node(&["Person"]);
779 store.create_node(&["Company"]);
780 store
781 }
782
783 #[test]
786 fn test_plan_simple_scan() {
787 let store = create_test_store();
788 let planner = Planner::new(store);
789
790 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
792 items: vec![ReturnItem {
793 expression: LogicalExpression::Variable("n".to_string()),
794 alias: None,
795 }],
796 distinct: false,
797 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
798 variable: "n".to_string(),
799 label: Some("Person".to_string()),
800 input: None,
801 })),
802 }));
803
804 let physical = planner.plan(&logical).unwrap();
805 assert_eq!(physical.columns(), &["n"]);
806 }
807
808 #[test]
809 fn test_plan_scan_without_label() {
810 let store = create_test_store();
811 let planner = Planner::new(store);
812
813 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
815 items: vec![ReturnItem {
816 expression: LogicalExpression::Variable("n".to_string()),
817 alias: None,
818 }],
819 distinct: false,
820 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
821 variable: "n".to_string(),
822 label: None,
823 input: None,
824 })),
825 }));
826
827 let physical = planner.plan(&logical).unwrap();
828 assert_eq!(physical.columns(), &["n"]);
829 }
830
831 #[test]
832 fn test_plan_return_with_alias() {
833 let store = create_test_store();
834 let planner = Planner::new(store);
835
836 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
838 items: vec![ReturnItem {
839 expression: LogicalExpression::Variable("n".to_string()),
840 alias: Some("person".to_string()),
841 }],
842 distinct: false,
843 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
844 variable: "n".to_string(),
845 label: Some("Person".to_string()),
846 input: None,
847 })),
848 }));
849
850 let physical = planner.plan(&logical).unwrap();
851 assert_eq!(physical.columns(), &["person"]);
852 }
853
854 #[test]
855 fn test_plan_return_property() {
856 let store = create_test_store();
857 let planner = Planner::new(store);
858
859 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
861 items: vec![ReturnItem {
862 expression: LogicalExpression::Property {
863 variable: "n".to_string(),
864 property: "name".to_string(),
865 },
866 alias: None,
867 }],
868 distinct: false,
869 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
870 variable: "n".to_string(),
871 label: Some("Person".to_string()),
872 input: None,
873 })),
874 }));
875
876 let physical = planner.plan(&logical).unwrap();
877 assert_eq!(physical.columns(), &["n.name"]);
878 }
879
880 #[test]
881 fn test_plan_return_literal() {
882 let store = create_test_store();
883 let planner = Planner::new(store);
884
885 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
887 items: vec![ReturnItem {
888 expression: LogicalExpression::Literal(Value::Int64(42)),
889 alias: Some("answer".to_string()),
890 }],
891 distinct: false,
892 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
893 variable: "n".to_string(),
894 label: None,
895 input: None,
896 })),
897 }));
898
899 let physical = planner.plan(&logical).unwrap();
900 assert_eq!(physical.columns(), &["answer"]);
901 }
902
903 #[test]
906 fn test_plan_filter_equality() {
907 let store = create_test_store();
908 let planner = Planner::new(store);
909
910 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
912 items: vec![ReturnItem {
913 expression: LogicalExpression::Variable("n".to_string()),
914 alias: None,
915 }],
916 distinct: false,
917 input: Box::new(LogicalOperator::Filter(FilterOp {
918 predicate: LogicalExpression::Binary {
919 left: Box::new(LogicalExpression::Property {
920 variable: "n".to_string(),
921 property: "age".to_string(),
922 }),
923 op: BinaryOp::Eq,
924 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
925 },
926 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
927 variable: "n".to_string(),
928 label: Some("Person".to_string()),
929 input: None,
930 })),
931 pushdown_hint: None,
932 })),
933 }));
934
935 let physical = planner.plan(&logical).unwrap();
936 assert_eq!(physical.columns(), &["n"]);
937 }
938
939 #[test]
940 fn test_plan_filter_compound_and() {
941 let store = create_test_store();
942 let planner = Planner::new(store);
943
944 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
946 items: vec![ReturnItem {
947 expression: LogicalExpression::Variable("n".to_string()),
948 alias: None,
949 }],
950 distinct: false,
951 input: Box::new(LogicalOperator::Filter(FilterOp {
952 predicate: LogicalExpression::Binary {
953 left: Box::new(LogicalExpression::Binary {
954 left: Box::new(LogicalExpression::Property {
955 variable: "n".to_string(),
956 property: "age".to_string(),
957 }),
958 op: BinaryOp::Gt,
959 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
960 }),
961 op: BinaryOp::And,
962 right: Box::new(LogicalExpression::Binary {
963 left: Box::new(LogicalExpression::Property {
964 variable: "n".to_string(),
965 property: "age".to_string(),
966 }),
967 op: BinaryOp::Lt,
968 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
969 }),
970 },
971 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
972 variable: "n".to_string(),
973 label: None,
974 input: None,
975 })),
976 pushdown_hint: None,
977 })),
978 }));
979
980 let physical = planner.plan(&logical).unwrap();
981 assert_eq!(physical.columns(), &["n"]);
982 }
983
984 #[test]
985 fn test_plan_filter_unary_not() {
986 let store = create_test_store();
987 let planner = Planner::new(store);
988
989 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
991 items: vec![ReturnItem {
992 expression: LogicalExpression::Variable("n".to_string()),
993 alias: None,
994 }],
995 distinct: false,
996 input: Box::new(LogicalOperator::Filter(FilterOp {
997 predicate: LogicalExpression::Unary {
998 op: UnaryOp::Not,
999 operand: Box::new(LogicalExpression::Property {
1000 variable: "n".to_string(),
1001 property: "active".to_string(),
1002 }),
1003 },
1004 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1005 variable: "n".to_string(),
1006 label: None,
1007 input: None,
1008 })),
1009 pushdown_hint: None,
1010 })),
1011 }));
1012
1013 let physical = planner.plan(&logical).unwrap();
1014 assert_eq!(physical.columns(), &["n"]);
1015 }
1016
1017 #[test]
1018 fn test_plan_filter_is_null() {
1019 let store = create_test_store();
1020 let planner = Planner::new(store);
1021
1022 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1024 items: vec![ReturnItem {
1025 expression: LogicalExpression::Variable("n".to_string()),
1026 alias: None,
1027 }],
1028 distinct: false,
1029 input: Box::new(LogicalOperator::Filter(FilterOp {
1030 predicate: LogicalExpression::Unary {
1031 op: UnaryOp::IsNull,
1032 operand: Box::new(LogicalExpression::Property {
1033 variable: "n".to_string(),
1034 property: "email".to_string(),
1035 }),
1036 },
1037 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1038 variable: "n".to_string(),
1039 label: None,
1040 input: None,
1041 })),
1042 pushdown_hint: None,
1043 })),
1044 }));
1045
1046 let physical = planner.plan(&logical).unwrap();
1047 assert_eq!(physical.columns(), &["n"]);
1048 }
1049
/// Plans `RETURN n` over a filter comparing `size(n.friends) > 0` and checks
/// that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left-hand side of the comparison: a function call over a property.
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1087
/// Plans a single-hop outgoing `KNOWS` expansion from `a:Person` to `b` and
/// checks that both endpoint variables appear among the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // Return both endpoints, un-aliased.
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1131
/// Plans a single-hop outgoing `KNOWS` expansion that binds the edge to `r`
/// and checks that `a`, `r` and `b` all appear among the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: Some("r".to_string()),
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // Return source node, edge and target node, in that order.
    let items: Vec<ReturnItem> = ["a", "r", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "r", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1177
/// Plans `RETURN n` over a limit of 10 applied to an unlabeled node scan and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1205
/// Plans `RETURN n` over a skip of 5 applied to an unlabeled node scan and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1231
/// Plans `RETURN n` over an ascending sort keyed on `n` and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1261
/// Plans `RETURN n` over a descending sort keyed on `n` and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1291
/// Plans `RETURN n` over a distinct operator with no explicit column list and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1317
/// Plans `RETURN n` over a distinct operator restricted to the column `n` and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["n".to_string()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1343
/// Plans `RETURN n` over a distinct operator whose column list names a column
/// the scan does not produce; planning is expected to succeed and the output
/// must still be exactly the column `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    // The scan only binds `n`, so this column name matches nothing.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["nonexistent".to_string()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1370
/// Plans `RETURN cnt` over an ungrouped `Count(n)` aggregate aliased `cnt`
/// and checks that `cnt` appears among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"cnt".to_string()));
}
1408
/// Plans a `Count(n)` aggregate grouped by `n.city` over a `Person` scan and
/// checks that the physical plan has two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let city_key = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "city".to_string(),
    };
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_key],
        aggregates: vec![count_nodes],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    // One grouping key plus one aggregate.
    assert_eq!(physical.columns().len(), 2);
}
1440
/// Plans an ungrouped `Sum(n.value)` aggregate aliased `total` and checks
/// that `total` appears among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "value".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("total".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"total".to_string()));
}
1472
/// Plans an ungrouped `Avg(n.score)` aggregate aliased `average` and checks
/// that `average` appears among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let mean_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "score".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("average".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![mean_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"average".to_string()));
}
1504
/// Plans an ungrouped aggregate computing both `Min(n.age)` (aliased
/// `youngest`) and `Max(n.age)` (aliased `oldest`) and checks that both
/// aliases appear among the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates read `n.age`; only the function and alias differ.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
        separator: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["youngest", "oldest"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1551
/// Plans `RETURN a, b` over an inner join of a `Person` scan and a `Company`
/// scan with the single condition `a = b`, and checks that both variables
/// appear among the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });
    let a_equals_b = JoinCondition {
        left: LogicalExpression::Variable("a".to_string()),
        right: LogicalExpression::Variable("b".to_string()),
    };
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_equals_b],
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1595
/// Plans a condition-free cross join of two unlabeled scans (`a` and `b`)
/// and checks that the physical plan has two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1620
/// Plans a condition-free left join of two unlabeled scans (`a` and `b`)
/// and checks that the physical plan has two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1644
1645 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1648 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1649 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1650 p
1651 }
1652
/// Plans the creation of a `Person` node bound to `n` with a literal `name`
/// property and checks that `n` appears among the output columns.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let name_property = (
        "name".to_string(),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let create = CreateNodeOp {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"n".to_string()));
}
1672
/// Plans the creation of a `KNOWS` edge bound to `r` between `a` and `b`,
/// fed by a cross join of two unlabeled scans, and checks that `r` appears
/// among the output columns.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    // Input rows: every (a, b) pair from a cross join of two scans.
    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let target_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(source_scan),
        right: Box::new(target_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_string()),
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"r".to_string()));
}
1704
/// Plans a non-detaching delete of `n` over an unlabeled scan and checks
/// that `n` appears among the output columns.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: "n".to_string(),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"n".to_string()));
}
1724
/// Planning a bare `LogicalOperator::Empty` must return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
1736
/// Returning a variable (`missing`) that the input scan does not bind must
/// make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan binds `n`, but the return clause asks for `missing`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable("missing".to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
1759
/// Every comparison, boolean and arithmetic operator listed here must be
/// accepted by `convert_binary_op`.
#[test]
fn test_convert_binary_ops() {
    let supported = [
        // Comparisons
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Boolean connectives
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
1777
/// Every unary operator listed here must be accepted by `convert_unary_op`.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
1785
/// Each logical aggregate function must convert to the physical variant of
/// the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1809
/// A planner built with `Planner::new` reports no transaction id and no
/// transaction manager; `viewing_epoch` is only checked to be callable.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);

    let transaction_id = planner.transaction_id();
    assert!(transaction_id.is_none());

    let transaction_manager = planner.transaction_manager();
    assert!(transaction_manager.is_none());

    // Only exercised for coverage; the value itself is not inspected.
    let _ = planner.viewing_epoch();
}
1819
/// Plans a bare unlabeled node scan, checks its column list, and then
/// consumes the plan through `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consuming the plan must hand back its root operator; the operator
    // itself is discarded immediately.
    drop(physical.into_operator());
}
1837
/// Adaptive planning of `RETURN n` over a `Person` scan must yield the
/// column `n` and attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1864
/// Adaptive planning of `RETURN n` over a filter `n.age > 30` must attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let older_than_thirty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: older_than_thirty,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1898
/// Adaptive planning of a single-hop outgoing `KNOWS` expansion, with
/// factorized execution switched off, must attach an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
        .with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1939
/// Adaptive planning of `RETURN a, b` over a cross join of two unlabeled
/// scans must attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1976
/// Adaptive planning of an ungrouped `Count(n)` aggregate must attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2004
/// Adaptive planning of `RETURN n` over a distinct operator must attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2029
/// Adaptive planning of `RETURN n` over a limit of 10 must attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2054
/// Adaptive planning of `RETURN n` over a skip of 5 must attach an adaptive
/// context to the physical plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2079
/// Adaptive planning of `RETURN n` over an ascending sort keyed on `n` must
/// attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2108
/// Adaptive planning of `RETURN n` over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) must attach an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both branches bind the same variable so the union has one column.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });
    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![people, companies],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2139
/// Plans a variable-length (1 to 3 hops) outgoing `KNOWS` expansion from `a`
/// to `b` and checks that both endpoints appear among the output columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        // Upper bound above the lower bound makes this a variable-length hop.
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2182
/// Plans a 1 to 3 hop outgoing `KNOWS` expansion that names its path `p`,
/// returning only `a` and `b`, and checks that both endpoints appear among
/// the output columns.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some("p".to_string()),
        path_mode: PathMode::Walk,
    });

    // The path alias itself is not returned; only the endpoints are.
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2224
/// A single-hop `KNOWS` expand in the `Incoming` direction must plan
/// successfully and expose both `a` and `b` as columns.
///
/// The planner is built with `with_factorized_execution(false)`.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
        .with_factorized_execution(false);

    // Leaf operator: unlabeled scan into `a`.
    let scan = NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    };

    // Exactly one hop (min == max == 1), traversed against edge direction.
    let incoming = ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(LogicalOperator::NodeScan(scan)),
        path_alias: None,
        path_mode: PathMode::Walk,
    };

    // Return both endpoints, un-aliased and without DISTINCT.
    let projection = ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable("a".to_string()),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable("b".to_string()),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(LogicalOperator::Expand(incoming)),
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2266
/// A single-hop `KNOWS` expand with `ExpandDirection::Both` must plan
/// successfully and expose both `a` and `b` as columns.
///
/// The planner is built with `with_factorized_execution(false)`.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
        .with_factorized_execution(false);

    // Builds a bare, un-aliased return item for the named variable.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Leaf operator: unlabeled scan into `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Exactly one hop (min == max == 1), direction `Both`.
    let undirected = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(undirected),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2308
/// `Planner::with_context` must hand back, through its accessors, the
/// transaction id, transaction manager and viewing epoch it was constructed with.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();

    // Begin the transaction first, then read the epoch from the same manager.
    let manager = Arc::new(TransactionManager::new());
    let tx = manager.begin();
    let viewing_epoch = manager.current_epoch();

    // The same underlying store backs both the read and the write handle.
    let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
    let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);

    let planner = Planner::with_context(
        read_store,
        write_store,
        Arc::clone(&manager),
        Some(tx),
        viewing_epoch,
    );

    assert_eq!(planner.transaction_id(), Some(tx));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), viewing_epoch);
}
2332
/// With the planner built via `with_factorized_execution(false)`, a chain of
/// two single-hop outgoing expands (`a -> b -> c`, any edge type) must still
/// plan successfully and expose the returned variables `a` and `c` as columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
        .with_factorized_execution(false);

    // One outgoing hop with no edge-type filter, no edge variable and no path alias.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec![],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    // Build the chain bottom-up: scan `a`, hop to `b`, hop to `c`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let a_to_b = single_hop("a", "b", scan_a);
    let b_to_c = single_hop("b", "c", a_to_b);

    // Only the two ends of the chain are returned; `b` is intermediate.
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable("a".to_string()),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable("c".to_string()),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(b_to_c),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"c".to_string()));
}
2385
/// Sorting an unlabeled scan of `n` by the property `n.name` (ascending, no
/// explicit null ordering) must plan successfully and keep `n` as a column.
///
/// Logical shape: `Return(n) <- Sort(n.name ASC) <- NodeScan(n)`.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled scan into `n`.
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // The sort key is a property access, not a plain variable reference.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_string()));
}
2421
/// A node scan that has another node scan as its `input` (`b:Company` fed by
/// `a:Person`) must plan successfully and expose both `a` and `b` as columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Inner scan: `a` over label `Person`, with no upstream operator.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    // Outer scan: `b` over label `Company`, chained on top of the `Person` scan.
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: Some(Box::new(person_scan)),
    });

    // Return both scanned variables, un-aliased and without DISTINCT.
    let return_items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable("a".to_string()),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable("b".to_string()),
            alias: None,
        },
    ];

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: return_items,
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2456 }
2457}