1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33 EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34 FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35 HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42 VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52 convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
56struct RangeBounds<'a> {
58 min: Option<&'a Value>,
59 max: Option<&'a Value>,
60 min_inclusive: bool,
61 max_inclusive: bool,
62}
63
64pub struct Planner {
66 pub(super) store: Arc<dyn GraphStoreMut>,
68 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
70 pub(super) transaction_id: Option<TransactionId>,
72 pub(super) viewing_epoch: EpochId,
74 pub(super) anon_edge_counter: std::cell::Cell<u32>,
76 pub(super) factorized_execution: bool,
78 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
81 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
86 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
88 pub(super) correlated_param_state:
92 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
93 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
96 profiling: std::cell::Cell<bool>,
98 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
100 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
102}
103
104impl Planner {
105 #[must_use]
110 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
111 let epoch = store.current_epoch();
112 Self {
113 store,
114 transaction_manager: None,
115 transaction_id: None,
116 viewing_epoch: epoch,
117 anon_edge_counter: std::cell::Cell::new(0),
118 factorized_execution: true,
119 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
121 validator: None,
122 catalog: None,
123 correlated_param_state: std::cell::RefCell::new(None),
124 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
125 profiling: std::cell::Cell::new(false),
126 profile_entries: std::cell::RefCell::new(Vec::new()),
127 write_tracker: None,
128 }
129 }
130
131 #[must_use]
133 pub fn with_context(
134 store: Arc<dyn GraphStoreMut>,
135 transaction_manager: Arc<TransactionManager>,
136 transaction_id: Option<TransactionId>,
137 viewing_epoch: EpochId,
138 ) -> Self {
139 use crate::transaction::TransactionWriteTracker;
140
141 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
143 if transaction_id.is_some() {
144 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
145 &transaction_manager,
146 ))))
147 } else {
148 None
149 };
150
151 Self {
152 store,
153 transaction_manager: Some(transaction_manager),
154 transaction_id,
155 viewing_epoch,
156 anon_edge_counter: std::cell::Cell::new(0),
157 factorized_execution: true,
158 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
159 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
160 validator: None,
161 catalog: None,
162 correlated_param_state: std::cell::RefCell::new(None),
163 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
164 profiling: std::cell::Cell::new(false),
165 profile_entries: std::cell::RefCell::new(Vec::new()),
166 write_tracker,
167 }
168 }
169
170 #[must_use]
172 pub fn viewing_epoch(&self) -> EpochId {
173 self.viewing_epoch
174 }
175
176 #[must_use]
178 pub fn transaction_id(&self) -> Option<TransactionId> {
179 self.transaction_id
180 }
181
182 #[must_use]
184 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
185 self.transaction_manager.as_ref()
186 }
187
188 #[must_use]
190 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
191 self.factorized_execution = enabled;
192 self
193 }
194
195 #[must_use]
197 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
198 self.validator = Some(validator);
199 self
200 }
201
202 #[must_use]
204 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
205 self.catalog = Some(catalog);
206 self
207 }
208
209 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
213 match op {
214 LogicalOperator::Expand(expand) => {
215 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
216
217 if is_single_hop {
218 let (inner_count, base) = Self::count_expand_chain(&expand.input);
219 (inner_count + 1, base)
220 } else {
221 (0, op)
222 }
223 }
224 _ => (0, op),
225 }
226 }
227
228 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
232 let mut chain = Vec::new();
233 let mut current = op;
234
235 while let LogicalOperator::Expand(expand) = current {
236 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
237 if !is_single_hop {
238 break;
239 }
240 chain.push(expand);
241 current = &expand.input;
242 }
243
244 chain.reverse();
245 chain
246 }
247
248 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
250 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
251 Ok(PhysicalPlan {
252 operator,
253 columns,
254 adaptive_context: None,
255 })
256 }
257
258 pub fn plan_profiled(
264 &self,
265 logical_plan: &LogicalPlan,
266 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
267 self.profiling.set(true);
268 self.profile_entries.borrow_mut().clear();
269
270 let result = self.plan_operator(&logical_plan.root);
271
272 self.profiling.set(false);
273 let (operator, columns) = result?;
274 let entries = self.profile_entries.borrow_mut().drain(..).collect();
275
276 Ok((
277 PhysicalPlan {
278 operator,
279 columns,
280 adaptive_context: None,
281 },
282 entries,
283 ))
284 }
285
286 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
288 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
289
290 let mut adaptive_context = AdaptiveContext::new();
291 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
292
293 Ok(PhysicalPlan {
294 operator,
295 columns,
296 adaptive_context: Some(adaptive_context),
297 })
298 }
299
300 fn collect_cardinality_estimates(
302 &self,
303 op: &LogicalOperator,
304 ctx: &mut AdaptiveContext,
305 depth: usize,
306 ) {
307 match op {
308 LogicalOperator::NodeScan(scan) => {
309 let estimate = if let Some(label) = &scan.label {
310 self.store.nodes_by_label(label).len() as f64
311 } else {
312 self.store.node_count() as f64
313 };
314 let id = format!("scan_{}", scan.variable);
315 ctx.set_estimate(&id, estimate);
316
317 if let Some(input) = &scan.input {
318 self.collect_cardinality_estimates(input, ctx, depth + 1);
319 }
320 }
321 LogicalOperator::Filter(filter) => {
322 let input_estimate = self.estimate_cardinality(&filter.input);
323 let estimate = input_estimate * 0.3;
324 let id = format!("filter_{depth}");
325 ctx.set_estimate(&id, estimate);
326
327 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
328 }
329 LogicalOperator::Expand(expand) => {
330 let input_estimate = self.estimate_cardinality(&expand.input);
331 let stats = self.store.statistics();
332 let avg_degree = self.estimate_expand_degree(&stats, expand);
333 let estimate = input_estimate * avg_degree;
334 let id = format!("expand_{}", expand.to_variable);
335 ctx.set_estimate(&id, estimate);
336
337 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
338 }
339 LogicalOperator::Join(join) => {
340 let left_est = self.estimate_cardinality(&join.left);
341 let right_est = self.estimate_cardinality(&join.right);
342 let estimate = (left_est * right_est).sqrt();
343 let id = format!("join_{depth}");
344 ctx.set_estimate(&id, estimate);
345
346 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
347 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
348 }
349 LogicalOperator::Aggregate(agg) => {
350 let input_estimate = self.estimate_cardinality(&agg.input);
351 let estimate = if agg.group_by.is_empty() {
352 1.0
353 } else {
354 (input_estimate * 0.1).max(1.0)
355 };
356 let id = format!("aggregate_{depth}");
357 ctx.set_estimate(&id, estimate);
358
359 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
360 }
361 LogicalOperator::Distinct(distinct) => {
362 let input_estimate = self.estimate_cardinality(&distinct.input);
363 let estimate = (input_estimate * 0.5).max(1.0);
364 let id = format!("distinct_{depth}");
365 ctx.set_estimate(&id, estimate);
366
367 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
368 }
369 LogicalOperator::Return(ret) => {
370 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
371 }
372 LogicalOperator::Limit(limit) => {
373 let input_estimate = self.estimate_cardinality(&limit.input);
374 let estimate = (input_estimate).min(limit.count.estimate());
375 let id = format!("limit_{depth}");
376 ctx.set_estimate(&id, estimate);
377
378 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
379 }
380 LogicalOperator::Skip(skip) => {
381 let input_estimate = self.estimate_cardinality(&skip.input);
382 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
383 let id = format!("skip_{depth}");
384 ctx.set_estimate(&id, estimate);
385
386 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
387 }
388 LogicalOperator::Sort(sort) => {
389 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
390 }
391 LogicalOperator::Union(union) => {
392 let estimate: f64 = union
393 .inputs
394 .iter()
395 .map(|input| self.estimate_cardinality(input))
396 .sum();
397 let id = format!("union_{depth}");
398 ctx.set_estimate(&id, estimate);
399
400 for input in &union.inputs {
401 self.collect_cardinality_estimates(input, ctx, depth + 1);
402 }
403 }
404 _ => {
405 }
407 }
408 }
409
410 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
412 match op {
413 LogicalOperator::NodeScan(scan) => {
414 if let Some(label) = &scan.label {
415 self.store.nodes_by_label(label).len() as f64
416 } else {
417 self.store.node_count() as f64
418 }
419 }
420 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
421 LogicalOperator::Expand(expand) => {
422 let stats = self.store.statistics();
423 let avg_degree = self.estimate_expand_degree(&stats, expand);
424 self.estimate_cardinality(&expand.input) * avg_degree
425 }
426 LogicalOperator::Join(join) => {
427 let left = self.estimate_cardinality(&join.left);
428 let right = self.estimate_cardinality(&join.right);
429 (left * right).sqrt()
430 }
431 LogicalOperator::Aggregate(agg) => {
432 if agg.group_by.is_empty() {
433 1.0
434 } else {
435 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
436 }
437 }
438 LogicalOperator::Distinct(distinct) => {
439 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
440 }
441 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
442 LogicalOperator::Limit(limit) => self
443 .estimate_cardinality(&limit.input)
444 .min(limit.count.estimate()),
445 LogicalOperator::Skip(skip) => {
446 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
447 }
448 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
449 LogicalOperator::Union(union) => union
450 .inputs
451 .iter()
452 .map(|input| self.estimate_cardinality(input))
453 .sum(),
454 LogicalOperator::Except(except) => {
455 let left = self.estimate_cardinality(&except.left);
456 let right = self.estimate_cardinality(&except.right);
457 (left - right).max(0.0)
458 }
459 LogicalOperator::Intersect(intersect) => {
460 let left = self.estimate_cardinality(&intersect.left);
461 let right = self.estimate_cardinality(&intersect.right);
462 left.min(right)
463 }
464 LogicalOperator::Otherwise(otherwise) => self
465 .estimate_cardinality(&otherwise.left)
466 .max(self.estimate_cardinality(&otherwise.right)),
467 _ => 1000.0,
468 }
469 }
470
471 fn estimate_expand_degree(
473 &self,
474 stats: &grafeo_core::statistics::Statistics,
475 expand: &ExpandOp,
476 ) -> f64 {
477 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
478 if expand.edge_types.len() == 1 {
479 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
480 } else if stats.total_nodes > 0 {
481 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
482 } else {
483 10.0
484 }
485 }
486
487 fn maybe_profile(
490 &self,
491 result: Result<(Box<dyn Operator>, Vec<String>)>,
492 op: &LogicalOperator,
493 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
494 if self.profiling.get() {
495 let (physical, columns) = result?;
496 let (entry, stats) =
497 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
498 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
499 self.profile_entries.borrow_mut().push(entry);
500 Ok((Box::new(profiled), columns))
501 } else {
502 result
503 }
504 }
505
506 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
508 let result = match op {
509 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
510 LogicalOperator::Expand(expand) => {
511 if self.factorized_execution {
512 let (chain_len, _base) = Self::count_expand_chain(op);
513 if chain_len >= 2 {
514 return self.maybe_profile(self.plan_expand_chain(op), op);
515 }
516 }
517 self.plan_expand(expand)
518 }
519 LogicalOperator::Return(ret) => self.plan_return(ret),
520 LogicalOperator::Filter(filter) => self.plan_filter(filter),
521 LogicalOperator::Project(project) => self.plan_project(project),
522 LogicalOperator::Limit(limit) => self.plan_limit(limit),
523 LogicalOperator::Skip(skip) => self.plan_skip(skip),
524 LogicalOperator::Sort(sort) => self.plan_sort(sort),
525 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
526 LogicalOperator::Join(join) => self.plan_join(join),
527 LogicalOperator::Union(union) => self.plan_union(union),
528 LogicalOperator::Except(except) => self.plan_except(except),
529 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
530 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
531 LogicalOperator::Apply(apply) => self.plan_apply(apply),
532 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
533 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
534 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
535 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
536 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
537 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
538 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
539 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
540 LogicalOperator::Merge(merge) => self.plan_merge(merge),
541 LogicalOperator::MergeRelationship(merge_rel) => {
542 self.plan_merge_relationship(merge_rel)
543 }
544 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
545 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
546 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
547 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
548 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
549 #[cfg(feature = "algos")]
550 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
551 #[cfg(not(feature = "algos"))]
552 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
553 "CALL procedures require the 'algos' feature".to_string(),
554 )),
555 LogicalOperator::ParameterScan(_param_scan) => {
556 let state = self
557 .correlated_param_state
558 .borrow()
559 .clone()
560 .ok_or_else(|| {
561 Error::Internal(
562 "ParameterScan without correlated Apply context".to_string(),
563 )
564 })?;
565 let columns = state.columns.clone();
568 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
569 Ok((operator, columns))
570 }
571 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
572 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
573 LogicalOperator::LoadData(load) => {
574 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
575 load.path.clone(),
576 load.format,
577 load.with_headers,
578 load.field_terminator,
579 load.variable.clone(),
580 ));
581 Ok((operator, vec![load.variable.clone()]))
582 }
583 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
584 LogicalOperator::VectorScan(_) => Err(Error::Internal(
585 "VectorScan requires vector-index feature".to_string(),
586 )),
587 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
588 "VectorJoin requires vector-index feature".to_string(),
589 )),
590 _ => Err(Error::Internal(format!(
591 "Unsupported operator: {:?}",
592 std::mem::discriminant(op)
593 ))),
594 };
595 self.maybe_profile(result, op)
596 }
597
598 fn plan_horizontal_aggregate(
600 &self,
601 ha: &HorizontalAggregateOp,
602 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
603 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
604
605 let list_col_idx = child_columns
606 .iter()
607 .position(|c| c == &ha.list_column)
608 .ok_or_else(|| {
609 Error::Internal(format!(
610 "HorizontalAggregate list column '{}' not found in {:?}",
611 ha.list_column, child_columns
612 ))
613 })?;
614
615 let entity_kind = match ha.entity_kind {
616 LogicalEntityKind::Edge => EntityKind::Edge,
617 LogicalEntityKind::Node => EntityKind::Node,
618 };
619
620 let function = convert_aggregate_function(ha.function);
621 let input_column_count = child_columns.len();
622
623 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
624 child_op,
625 list_col_idx,
626 entity_kind,
627 function,
628 ha.property.clone(),
629 Arc::clone(&self.store) as Arc<dyn GraphStore>,
630 input_column_count,
631 ));
632
633 let mut columns = child_columns;
634 columns.push(ha.alias.clone());
635 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
637
638 Ok((operator, columns))
639 }
640
641 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
643 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
644 let key_idx = child_columns
645 .iter()
646 .position(|c| c == &mc.key_var)
647 .ok_or_else(|| {
648 Error::Internal(format!(
649 "MapCollect key '{}' not in columns {:?}",
650 mc.key_var, child_columns
651 ))
652 })?;
653 let value_idx = child_columns
654 .iter()
655 .position(|c| c == &mc.value_var)
656 .ok_or_else(|| {
657 Error::Internal(format!(
658 "MapCollect value '{}' not in columns {:?}",
659 mc.value_var, child_columns
660 ))
661 })?;
662 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
663 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
664 Ok((operator, vec![mc.alias.clone()]))
665 }
666}
667
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, each a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position within a row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row to emit.
    row_index: usize,
}
675
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `Ok(None)` once every
    /// row has been emitted.
    ///
    /// Each output column reads the row position given by `column_indices`;
    /// a row too short to have that position contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }

        let batch_len = (self.rows.len() - start).min(1024);
        let batch = &self.rows[start..start + batch_len];

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        // Fill column by column; each column receives the rows in their original order.
        for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
            if let Some(col) = chunk.column_mut(col_idx) {
                for row in batch {
                    col.push_value(row.get(src_idx).cloned().unwrap_or(Value::Null));
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Operator name, as passed to profile entries.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
715
716#[cfg(test)]
717mod tests {
718 use super::*;
719 use crate::query::plan::{
720 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
721 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
722 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
723 SkipOp as LogicalSkipOp, SortKey, SortOp,
724 };
725 use grafeo_common::types::Value;
726 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
727 use grafeo_core::graph::GraphStoreMut;
728 use grafeo_core::graph::lpg::LpgStore;
729
730 fn create_test_store() -> Arc<dyn GraphStoreMut> {
731 let store = Arc::new(LpgStore::new().unwrap());
732 store.create_node(&["Person"]);
733 store.create_node(&["Person"]);
734 store.create_node(&["Company"]);
735 store
736 }
737
/// A labelled scan returned as-is yields a single column named after the variable.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
762
/// An unlabelled scan plans just like a labelled one.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
785
/// A return item's alias becomes the output column name.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: Some(String::from("person")),
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
808
/// An unaliased property access is named `<variable>.<property>`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
834
/// A literal return item takes its column name from the alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Literal(Value::Int64(42)),
            alias: Some(String::from("answer")),
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
857
/// A property-equals-literal filter over a scan plans and keeps the scan's column.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
893
/// A filter combining two comparisons with `And` plans successfully.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Both sides compare the same property, so build it on demand.
    let age = || LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let older_than_20 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    let younger_than_40 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: LogicalExpression::Binary {
            left: Box::new(older_than_20),
            op: BinaryOp::And,
            right: Box::new(younger_than_40),
        },
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
938
/// A filter whose predicate is a unary `Not` plans successfully.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
971
/// A filter whose predicate is a unary `IsNull` plans successfully.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let email_is_null = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: email_is_null,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1004
/// A filter comparing a function call result against a literal plans successfully.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let ret = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ret));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1042
/// A single outgoing hop exposes both the source and the target variable as columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("b")),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"a".to_string()));
    assert!(physical.columns().contains(&"b".to_string()));
}
1086
#[test]
fn test_plan_expand_with_edge_variable() {
    // Same single-hop expansion, but the edge is bound to `r` and returned too.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: Some("r".to_string()),
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "r", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
1132
#[test]
fn test_plan_limit() {
    // `RETURN n` over a scan capped by a limit of 10.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1160
#[test]
fn test_plan_skip() {
    // `RETURN n` over a scan with the first 5 rows skipped.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1186
#[test]
fn test_plan_sort() {
    // `RETURN n` over a scan sorted ascending on the variable `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1216
#[test]
fn test_plan_sort_descending() {
    // `RETURN n` over a scan sorted descending on the variable `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1246
#[test]
fn test_plan_distinct() {
    // `RETURN n` over a Distinct operator with no explicit column list.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1272
#[test]
fn test_plan_distinct_with_columns() {
    // `RETURN n` over a Distinct operator restricted to the column `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["n".to_string()]),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1298
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    // The Distinct column list names a column the scan does not bind; planning is
    // still expected to succeed and expose `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["nonexistent".to_string()]),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
1325
#[test]
fn test_plan_aggregate_count() {
    // Ungrouped `count(n)` aliased as `cnt`, returned through a Return operator.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("cnt")));
}
1363
#[test]
fn test_plan_aggregate_with_group_by() {
    // `count(n)` grouped by `n.city` over a `Person` scan: one group column plus
    // one aggregate column are expected.
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let city = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "city".to_string(),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1395
#[test]
fn test_plan_aggregate_sum() {
    // Ungrouped `sum(n.value)` aliased as `total`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "value".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("total".to_string()),
        percentile: None,
        separator: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("total")));
}
1427
#[test]
fn test_plan_aggregate_avg() {
    // Ungrouped `avg(n.score)` aliased as `average`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "score".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("average".to_string()),
        percentile: None,
        separator: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("average")));
}
1459
#[test]
fn test_plan_aggregate_min_max() {
    // Two ungrouped aggregates over `n.age`: min as `youngest`, max as `oldest`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let min_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Min,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("youngest".to_string()),
        percentile: None,
        separator: None,
    };
    let max_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Max,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        expression2: None,
        distinct: false,
        alias: Some("oldest".to_string()),
        percentile: None,
        separator: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![min_age, max_age],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
1506
#[test]
fn test_plan_inner_join() {
    // Inner join of a `Person` scan (`a`) with a `Company` scan (`b`) on `a = b`.
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable("a".to_string()),
            right: LogicalExpression::Variable("b".to_string()),
        }],
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(joined),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
1550
#[test]
fn test_plan_cross_join() {
    // Cross join of two unlabeled scans with no join conditions.
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1575
#[test]
fn test_plan_left_join() {
    // Left join of two unlabeled scans with no join conditions.
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
1599
#[test]
fn test_plan_create_node() {
    // Creates a `Person` node bound to `n` with a single `name` property and no input.
    let planner = Planner::new(create_test_store());

    let name_property = (
        "name".to_string(),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let logical_plan = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        properties: vec![name_property],
        input: None,
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("n")));
}
1621
#[test]
fn test_plan_create_edge() {
    // Creates a KNOWS edge bound to `r` between `a` and `b`, fed by a cross join
    // of two unlabeled scans.
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let target_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(source_scan),
        right: Box::new(target_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_string()),
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("r")));
}
1653
#[test]
fn test_plan_delete_node() {
    // Non-detaching delete of every node produced by an unlabeled scan.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: "n".to_string(),
        detach: false,
        input: Box::new(scan),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    assert!(planned.columns().contains(&String::from("n")));
}
1673
#[test]
fn test_plan_empty_errors() {
    // Planning a bare `Empty` operator is expected to be rejected.
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    let outcome = planner.plan(&empty_plan);
    assert!(outcome.is_err());
}
1685
#[test]
fn test_plan_missing_variable_in_return() {
    // The Return item references `missing`, which the scan (binding `n`) never
    // produces, so planning is expected to fail.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable("missing".to_string()),
        alias: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    let outcome = planner.plan(&logical_plan);
    assert!(outcome.is_err());
}
1708
#[test]
fn test_convert_binary_ops() {
    // Every comparison, logical and arithmetic operator listed here must convert.
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparisons.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));
    // Boolean connectives.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));
    // Arithmetic.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
1726
#[test]
fn test_convert_unary_ops() {
    // Each unary operator listed here must convert successfully.
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
1734
#[test]
fn test_convert_aggregate_functions() {
    // Each logical aggregate function must map onto the same-named physical one.
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1758
#[test]
fn test_planner_accessors() {
    // A planner built with `Planner::new` carries no transaction id or manager.
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    let transaction_id = planner.transaction_id();
    assert!(transaction_id.is_none());

    let transaction_manager = planner.transaction_manager();
    assert!(transaction_manager.is_none());

    // Only checks that the accessor can be called; the value is not inspected.
    let _ = planner.viewing_epoch();
}
1768
#[test]
fn test_physical_plan_accessors() {
    // Exercises `columns()` and the consuming `into_operator()` on a plain scan plan.
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let planned = planner.plan(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);

    // Only checks that the plan can be consumed; the operator is discarded.
    let _ = planned.into_operator();
}
1786
#[test]
fn test_plan_adaptive_with_scan() {
    // Adaptive planning of `RETURN n` over a `Person` scan must attach a context.
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
    assert!(planned.adaptive_context.is_some());
}
1813
#[test]
fn test_plan_adaptive_with_filter() {
    // Adaptive planning of a scan filtered by `n.age > 30`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1847
#[test]
fn test_plan_adaptive_with_expand() {
    // Adaptive planning of a single-hop outgoing expansion, with factorized
    // execution switched off on the planner.
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1887
#[test]
fn test_plan_adaptive_with_join() {
    // Adaptive planning of a cross join between two unlabeled scans.
    let planner = Planner::new(create_test_store());

    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: None,
        input: None,
    });
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(joined),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1924
#[test]
fn test_plan_adaptive_with_aggregate() {
    // Adaptive planning of an ungrouped `count(n)` aliased as `cnt`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        expression2: None,
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
        separator: None,
    };
    let logical_plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1952
#[test]
fn test_plan_adaptive_with_distinct() {
    // Adaptive planning of `RETURN n` over a Distinct with no column list.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
1977
#[test]
fn test_plan_adaptive_with_limit() {
    // Adaptive planning of `RETURN n` over a scan limited to 10.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2002
#[test]
fn test_plan_adaptive_with_skip() {
    // Adaptive planning of `RETURN n` over a scan with 5 rows skipped.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2027
#[test]
fn test_plan_adaptive_with_sort() {
    // Adaptive planning of `RETURN n` over a scan sorted ascending on `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2056
#[test]
fn test_plan_adaptive_with_union() {
    // Adaptive planning of `RETURN n` over the union of a `Person` scan and a
    // `Company` scan, both binding `n`.
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Company".to_string()),
        input: None,
    });
    let combined = LogicalOperator::Union(UnionOp {
        inputs: vec![people, companies],
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(combined),
    }));

    let planned = planner.plan_adaptive(&logical_plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
2087
#[test]
fn test_plan_expand_variable_length() {
    // Outgoing KNOWS expansion of 1 to 3 hops from an unlabeled scan.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2130
#[test]
fn test_plan_expand_with_path_alias() {
    // 1-to-3-hop outgoing expansion with the path aliased as `p`; only the
    // endpoint columns `a` and `b` are asserted.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some("p".to_string()),
        path_mode: PathMode::Walk,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2172
#[test]
fn test_plan_expand_incoming() {
    // Single-hop incoming KNOWS expansion, with factorized execution switched
    // off on the planner.
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec!["KNOWS".to_string()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical_plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: ["a", "b"]
            .iter()
            .map(|&name| ReturnItem {
                expression: LogicalExpression::Variable(name.to_string()),
                alias: None,
            })
            .collect(),
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&logical_plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2213
2214 #[test]
2215 fn test_plan_expand_both_directions() {
2216 let store = create_test_store();
2217 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2218
2219 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2221 items: vec![
2222 ReturnItem {
2223 expression: LogicalExpression::Variable("a".to_string()),
2224 alias: None,
2225 },
2226 ReturnItem {
2227 expression: LogicalExpression::Variable("b".to_string()),
2228 alias: None,
2229 },
2230 ],
2231 distinct: false,
2232 input: Box::new(LogicalOperator::Expand(ExpandOp {
2233 from_variable: "a".to_string(),
2234 to_variable: "b".to_string(),
2235 edge_variable: None,
2236 direction: ExpandDirection::Both,
2237 edge_types: vec!["KNOWS".to_string()],
2238 min_hops: 1,
2239 max_hops: Some(1),
2240 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2241 variable: "a".to_string(),
2242 label: None,
2243 input: None,
2244 })),
2245 path_alias: None,
2246 path_mode: PathMode::Walk,
2247 })),
2248 }));
2249
2250 let physical = planner.plan(&logical).unwrap();
2251 assert!(physical.columns().contains(&"a".to_string()));
2252 assert!(physical.columns().contains(&"b".to_string()));
2253 }
2254
2255 #[test]
2258 fn test_planner_with_context() {
2259 use crate::transaction::TransactionManager;
2260
2261 let store = create_test_store();
2262 let transaction_manager = Arc::new(TransactionManager::new());
2263 let transaction_id = transaction_manager.begin();
2264 let epoch = transaction_manager.current_epoch();
2265
2266 let planner = Planner::with_context(
2267 Arc::clone(&store),
2268 Arc::clone(&transaction_manager),
2269 Some(transaction_id),
2270 epoch,
2271 );
2272
2273 assert_eq!(planner.transaction_id(), Some(transaction_id));
2274 assert!(planner.transaction_manager().is_some());
2275 assert_eq!(planner.viewing_epoch(), epoch);
2276 }
2277
2278 #[test]
2279 fn test_planner_with_factorized_execution_disabled() {
2280 let store = create_test_store();
2281 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2282
2283 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2285 items: vec![
2286 ReturnItem {
2287 expression: LogicalExpression::Variable("a".to_string()),
2288 alias: None,
2289 },
2290 ReturnItem {
2291 expression: LogicalExpression::Variable("c".to_string()),
2292 alias: None,
2293 },
2294 ],
2295 distinct: false,
2296 input: Box::new(LogicalOperator::Expand(ExpandOp {
2297 from_variable: "b".to_string(),
2298 to_variable: "c".to_string(),
2299 edge_variable: None,
2300 direction: ExpandDirection::Outgoing,
2301 edge_types: vec![],
2302 min_hops: 1,
2303 max_hops: Some(1),
2304 input: Box::new(LogicalOperator::Expand(ExpandOp {
2305 from_variable: "a".to_string(),
2306 to_variable: "b".to_string(),
2307 edge_variable: None,
2308 direction: ExpandDirection::Outgoing,
2309 edge_types: vec![],
2310 min_hops: 1,
2311 max_hops: Some(1),
2312 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2313 variable: "a".to_string(),
2314 label: None,
2315 input: None,
2316 })),
2317 path_alias: None,
2318 path_mode: PathMode::Walk,
2319 })),
2320 path_alias: None,
2321 path_mode: PathMode::Walk,
2322 })),
2323 }));
2324
2325 let physical = planner.plan(&logical).unwrap();
2326 assert!(physical.columns().contains(&"a".to_string()));
2327 assert!(physical.columns().contains(&"c".to_string()));
2328 }
2329
2330 #[test]
2333 fn test_plan_sort_by_property() {
2334 let store = create_test_store();
2335 let planner = Planner::new(store);
2336
2337 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2339 items: vec![ReturnItem {
2340 expression: LogicalExpression::Variable("n".to_string()),
2341 alias: None,
2342 }],
2343 distinct: false,
2344 input: Box::new(LogicalOperator::Sort(SortOp {
2345 keys: vec![SortKey {
2346 expression: LogicalExpression::Property {
2347 variable: "n".to_string(),
2348 property: "name".to_string(),
2349 },
2350 order: SortOrder::Ascending,
2351 nulls: None,
2352 }],
2353 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2354 variable: "n".to_string(),
2355 label: None,
2356 input: None,
2357 })),
2358 })),
2359 }));
2360
2361 let physical = planner.plan(&logical).unwrap();
2362 assert!(physical.columns().contains(&"n".to_string()));
2364 }
2365
2366 #[test]
2369 fn test_plan_scan_with_input() {
2370 let store = create_test_store();
2371 let planner = Planner::new(store);
2372
2373 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2375 items: vec![
2376 ReturnItem {
2377 expression: LogicalExpression::Variable("a".to_string()),
2378 alias: None,
2379 },
2380 ReturnItem {
2381 expression: LogicalExpression::Variable("b".to_string()),
2382 alias: None,
2383 },
2384 ],
2385 distinct: false,
2386 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2387 variable: "b".to_string(),
2388 label: Some("Company".to_string()),
2389 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2390 variable: "a".to_string(),
2391 label: Some("Person".to_string()),
2392 input: None,
2393 }))),
2394 })),
2395 }));
2396
2397 let physical = planner.plan(&logical).unwrap();
2398 assert!(physical.columns().contains(&"a".to_string()));
2399 assert!(physical.columns().contains(&"b".to_string()));
2400 }
2401}