1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33 EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34 FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35 HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37 LoadCsvOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42 VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52 convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
/// Optional lower and upper limits of a value range, each with its own
/// inclusivity flag.
///
/// Both limits borrow their [`Value`] for the lifetime `'a`.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; presumably it is consumed by the planner submodules — confirm.
struct RangeBounds<'a> {
    /// Lower limit of the range, if one is present.
    min: Option<&'a Value>,
    /// Upper limit of the range, if one is present.
    max: Option<&'a Value>,
    /// Whether `min` itself belongs to the range.
    min_inclusive: bool,
    /// Whether `max` itself belongs to the range.
    max_inclusive: bool,
}
63
/// Turns a [`LogicalPlan`] into a tree of physical operators.
///
/// Planning methods take `&self`; the bookkeeping they need to update while
/// walking the plan lives in `Cell`/`RefCell` fields.
pub struct Planner {
    /// Graph store queried for epochs, counts and statistics, and shared
    /// with the operators that need it.
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager; `None` when built via [`Planner::new`].
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Transaction the plan belongs to, if any.
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch used for visibility; [`Planner::new`] takes it from
    /// `store.current_epoch()`.
    pub(super) viewing_epoch: EpochId,
    /// Counter starting at 0.
    /// NOTE(review): presumably used to name anonymous edges — confirm in the
    /// expand submodule.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true` (the default), chains of two or more single-hop expands
    /// are routed to `plan_expand_chain`.
    pub(super) factorized_execution: bool,
    /// Names of columns registered as scalar; `plan_horizontal_aggregate`
    /// and `plan_map_collect` add their output aliases here.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// NOTE(review): presumably the names of columns holding edges; not
    /// written in this part of the file — confirm.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator installed via `with_validator`.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog installed via `with_catalog`.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state required when planning a `ParameterScan`;
    /// planning one while this is `None` is an internal error.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// NOTE(review): not touched in this part of the file; presumably the
    /// variables bound to group lists — confirm.
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Set for the duration of `plan_profiled`; makes `maybe_profile` wrap
    /// every planned operator.
    profiling: std::cell::Cell<bool>,
    /// Profile entries accumulated by `maybe_profile` and handed out by
    /// `plan_profiled`.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
}
101
102impl Planner {
103 #[must_use]
108 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
109 let epoch = store.current_epoch();
110 Self {
111 store,
112 transaction_manager: None,
113 transaction_id: None,
114 viewing_epoch: epoch,
115 anon_edge_counter: std::cell::Cell::new(0),
116 factorized_execution: true,
117 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
118 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119 validator: None,
120 catalog: None,
121 correlated_param_state: std::cell::RefCell::new(None),
122 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
123 profiling: std::cell::Cell::new(false),
124 profile_entries: std::cell::RefCell::new(Vec::new()),
125 }
126 }
127
128 #[must_use]
130 pub fn with_context(
131 store: Arc<dyn GraphStoreMut>,
132 transaction_manager: Arc<TransactionManager>,
133 transaction_id: Option<TransactionId>,
134 viewing_epoch: EpochId,
135 ) -> Self {
136 Self {
137 store,
138 transaction_manager: Some(transaction_manager),
139 transaction_id,
140 viewing_epoch,
141 anon_edge_counter: std::cell::Cell::new(0),
142 factorized_execution: true,
143 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
144 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
145 validator: None,
146 catalog: None,
147 correlated_param_state: std::cell::RefCell::new(None),
148 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
149 profiling: std::cell::Cell::new(false),
150 profile_entries: std::cell::RefCell::new(Vec::new()),
151 }
152 }
153
154 #[must_use]
156 pub fn viewing_epoch(&self) -> EpochId {
157 self.viewing_epoch
158 }
159
160 #[must_use]
162 pub fn transaction_id(&self) -> Option<TransactionId> {
163 self.transaction_id
164 }
165
166 #[must_use]
168 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
169 self.transaction_manager.as_ref()
170 }
171
172 #[must_use]
174 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
175 self.factorized_execution = enabled;
176 self
177 }
178
179 #[must_use]
181 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
182 self.validator = Some(validator);
183 self
184 }
185
186 #[must_use]
188 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
189 self.catalog = Some(catalog);
190 self
191 }
192
193 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
197 match op {
198 LogicalOperator::Expand(expand) => {
199 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
200
201 if is_single_hop {
202 let (inner_count, base) = Self::count_expand_chain(&expand.input);
203 (inner_count + 1, base)
204 } else {
205 (0, op)
206 }
207 }
208 _ => (0, op),
209 }
210 }
211
212 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
216 let mut chain = Vec::new();
217 let mut current = op;
218
219 while let LogicalOperator::Expand(expand) = current {
220 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
221 if !is_single_hop {
222 break;
223 }
224 chain.push(expand);
225 current = &expand.input;
226 }
227
228 chain.reverse();
229 chain
230 }
231
232 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
234 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
235 Ok(PhysicalPlan {
236 operator,
237 columns,
238 adaptive_context: None,
239 })
240 }
241
242 pub fn plan_profiled(
248 &self,
249 logical_plan: &LogicalPlan,
250 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
251 self.profiling.set(true);
252 self.profile_entries.borrow_mut().clear();
253
254 let result = self.plan_operator(&logical_plan.root);
255
256 self.profiling.set(false);
257 let (operator, columns) = result?;
258 let entries = self.profile_entries.borrow_mut().drain(..).collect();
259
260 Ok((
261 PhysicalPlan {
262 operator,
263 columns,
264 adaptive_context: None,
265 },
266 entries,
267 ))
268 }
269
270 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
272 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
273
274 let mut adaptive_context = AdaptiveContext::new();
275 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
276
277 Ok(PhysicalPlan {
278 operator,
279 columns,
280 adaptive_context: Some(adaptive_context),
281 })
282 }
283
284 fn collect_cardinality_estimates(
286 &self,
287 op: &LogicalOperator,
288 ctx: &mut AdaptiveContext,
289 depth: usize,
290 ) {
291 match op {
292 LogicalOperator::NodeScan(scan) => {
293 let estimate = if let Some(label) = &scan.label {
294 self.store.nodes_by_label(label).len() as f64
295 } else {
296 self.store.node_count() as f64
297 };
298 let id = format!("scan_{}", scan.variable);
299 ctx.set_estimate(&id, estimate);
300
301 if let Some(input) = &scan.input {
302 self.collect_cardinality_estimates(input, ctx, depth + 1);
303 }
304 }
305 LogicalOperator::Filter(filter) => {
306 let input_estimate = self.estimate_cardinality(&filter.input);
307 let estimate = input_estimate * 0.3;
308 let id = format!("filter_{depth}");
309 ctx.set_estimate(&id, estimate);
310
311 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
312 }
313 LogicalOperator::Expand(expand) => {
314 let input_estimate = self.estimate_cardinality(&expand.input);
315 let stats = self.store.statistics();
316 let avg_degree = self.estimate_expand_degree(&stats, expand);
317 let estimate = input_estimate * avg_degree;
318 let id = format!("expand_{}", expand.to_variable);
319 ctx.set_estimate(&id, estimate);
320
321 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
322 }
323 LogicalOperator::Join(join) => {
324 let left_est = self.estimate_cardinality(&join.left);
325 let right_est = self.estimate_cardinality(&join.right);
326 let estimate = (left_est * right_est).sqrt();
327 let id = format!("join_{depth}");
328 ctx.set_estimate(&id, estimate);
329
330 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
331 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
332 }
333 LogicalOperator::Aggregate(agg) => {
334 let input_estimate = self.estimate_cardinality(&agg.input);
335 let estimate = if agg.group_by.is_empty() {
336 1.0
337 } else {
338 (input_estimate * 0.1).max(1.0)
339 };
340 let id = format!("aggregate_{depth}");
341 ctx.set_estimate(&id, estimate);
342
343 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
344 }
345 LogicalOperator::Distinct(distinct) => {
346 let input_estimate = self.estimate_cardinality(&distinct.input);
347 let estimate = (input_estimate * 0.5).max(1.0);
348 let id = format!("distinct_{depth}");
349 ctx.set_estimate(&id, estimate);
350
351 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
352 }
353 LogicalOperator::Return(ret) => {
354 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
355 }
356 LogicalOperator::Limit(limit) => {
357 let input_estimate = self.estimate_cardinality(&limit.input);
358 let estimate = (input_estimate).min(limit.count.estimate());
359 let id = format!("limit_{depth}");
360 ctx.set_estimate(&id, estimate);
361
362 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
363 }
364 LogicalOperator::Skip(skip) => {
365 let input_estimate = self.estimate_cardinality(&skip.input);
366 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
367 let id = format!("skip_{depth}");
368 ctx.set_estimate(&id, estimate);
369
370 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
371 }
372 LogicalOperator::Sort(sort) => {
373 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
374 }
375 LogicalOperator::Union(union) => {
376 let estimate: f64 = union
377 .inputs
378 .iter()
379 .map(|input| self.estimate_cardinality(input))
380 .sum();
381 let id = format!("union_{depth}");
382 ctx.set_estimate(&id, estimate);
383
384 for input in &union.inputs {
385 self.collect_cardinality_estimates(input, ctx, depth + 1);
386 }
387 }
388 _ => {
389 }
391 }
392 }
393
394 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
396 match op {
397 LogicalOperator::NodeScan(scan) => {
398 if let Some(label) = &scan.label {
399 self.store.nodes_by_label(label).len() as f64
400 } else {
401 self.store.node_count() as f64
402 }
403 }
404 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
405 LogicalOperator::Expand(expand) => {
406 let stats = self.store.statistics();
407 let avg_degree = self.estimate_expand_degree(&stats, expand);
408 self.estimate_cardinality(&expand.input) * avg_degree
409 }
410 LogicalOperator::Join(join) => {
411 let left = self.estimate_cardinality(&join.left);
412 let right = self.estimate_cardinality(&join.right);
413 (left * right).sqrt()
414 }
415 LogicalOperator::Aggregate(agg) => {
416 if agg.group_by.is_empty() {
417 1.0
418 } else {
419 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
420 }
421 }
422 LogicalOperator::Distinct(distinct) => {
423 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
424 }
425 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
426 LogicalOperator::Limit(limit) => self
427 .estimate_cardinality(&limit.input)
428 .min(limit.count.estimate()),
429 LogicalOperator::Skip(skip) => {
430 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
431 }
432 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
433 LogicalOperator::Union(union) => union
434 .inputs
435 .iter()
436 .map(|input| self.estimate_cardinality(input))
437 .sum(),
438 LogicalOperator::Except(except) => {
439 let left = self.estimate_cardinality(&except.left);
440 let right = self.estimate_cardinality(&except.right);
441 (left - right).max(0.0)
442 }
443 LogicalOperator::Intersect(intersect) => {
444 let left = self.estimate_cardinality(&intersect.left);
445 let right = self.estimate_cardinality(&intersect.right);
446 left.min(right)
447 }
448 LogicalOperator::Otherwise(otherwise) => self
449 .estimate_cardinality(&otherwise.left)
450 .max(self.estimate_cardinality(&otherwise.right)),
451 _ => 1000.0,
452 }
453 }
454
455 fn estimate_expand_degree(
457 &self,
458 stats: &grafeo_core::statistics::Statistics,
459 expand: &ExpandOp,
460 ) -> f64 {
461 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
462 if expand.edge_types.len() == 1 {
463 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
464 } else if stats.total_nodes > 0 {
465 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
466 } else {
467 10.0
468 }
469 }
470
471 fn maybe_profile(
474 &self,
475 result: Result<(Box<dyn Operator>, Vec<String>)>,
476 op: &LogicalOperator,
477 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
478 if self.profiling.get() {
479 let (physical, columns) = result?;
480 let (entry, stats) =
481 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
482 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
483 self.profile_entries.borrow_mut().push(entry);
484 Ok((Box::new(profiled), columns))
485 } else {
486 result
487 }
488 }
489
490 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
492 let result = match op {
493 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
494 LogicalOperator::Expand(expand) => {
495 if self.factorized_execution {
496 let (chain_len, _base) = Self::count_expand_chain(op);
497 if chain_len >= 2 {
498 return self.maybe_profile(self.plan_expand_chain(op), op);
499 }
500 }
501 self.plan_expand(expand)
502 }
503 LogicalOperator::Return(ret) => self.plan_return(ret),
504 LogicalOperator::Filter(filter) => self.plan_filter(filter),
505 LogicalOperator::Project(project) => self.plan_project(project),
506 LogicalOperator::Limit(limit) => self.plan_limit(limit),
507 LogicalOperator::Skip(skip) => self.plan_skip(skip),
508 LogicalOperator::Sort(sort) => self.plan_sort(sort),
509 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
510 LogicalOperator::Join(join) => self.plan_join(join),
511 LogicalOperator::Union(union) => self.plan_union(union),
512 LogicalOperator::Except(except) => self.plan_except(except),
513 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
514 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
515 LogicalOperator::Apply(apply) => self.plan_apply(apply),
516 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
517 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
518 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
519 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
520 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
521 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
522 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
523 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
524 LogicalOperator::Merge(merge) => self.plan_merge(merge),
525 LogicalOperator::MergeRelationship(merge_rel) => {
526 self.plan_merge_relationship(merge_rel)
527 }
528 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
529 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
530 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
531 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
532 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
533 #[cfg(feature = "algos")]
534 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
535 #[cfg(not(feature = "algos"))]
536 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
537 "CALL procedures require the 'algos' feature".to_string(),
538 )),
539 LogicalOperator::ParameterScan(param_scan) => {
540 let state = self
541 .correlated_param_state
542 .borrow()
543 .clone()
544 .ok_or_else(|| {
545 Error::Internal(
546 "ParameterScan without correlated Apply context".to_string(),
547 )
548 })?;
549 let columns = param_scan.columns.clone();
550 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
551 Ok((operator, columns))
552 }
553 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
554 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
555 LogicalOperator::LoadCsv(load) => {
556 let operator: Box<dyn Operator> = Box::new(LoadCsvOperator::new(
557 load.path.clone(),
558 load.with_headers,
559 load.field_terminator,
560 load.variable.clone(),
561 ));
562 Ok((operator, vec![load.variable.clone()]))
563 }
564 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
565 LogicalOperator::VectorScan(_) => Err(Error::Internal(
566 "VectorScan requires vector-index feature".to_string(),
567 )),
568 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
569 "VectorJoin requires vector-index feature".to_string(),
570 )),
571 _ => Err(Error::Internal(format!(
572 "Unsupported operator: {:?}",
573 std::mem::discriminant(op)
574 ))),
575 };
576 self.maybe_profile(result, op)
577 }
578
579 fn plan_horizontal_aggregate(
581 &self,
582 ha: &HorizontalAggregateOp,
583 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
584 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
585
586 let list_col_idx = child_columns
587 .iter()
588 .position(|c| c == &ha.list_column)
589 .ok_or_else(|| {
590 Error::Internal(format!(
591 "HorizontalAggregate list column '{}' not found in {:?}",
592 ha.list_column, child_columns
593 ))
594 })?;
595
596 let entity_kind = match ha.entity_kind {
597 LogicalEntityKind::Edge => EntityKind::Edge,
598 LogicalEntityKind::Node => EntityKind::Node,
599 };
600
601 let function = convert_aggregate_function(ha.function);
602 let input_column_count = child_columns.len();
603
604 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
605 child_op,
606 list_col_idx,
607 entity_kind,
608 function,
609 ha.property.clone(),
610 Arc::clone(&self.store) as Arc<dyn GraphStore>,
611 input_column_count,
612 ));
613
614 let mut columns = child_columns;
615 columns.push(ha.alias.clone());
616 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
618
619 Ok((operator, columns))
620 }
621
622 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
624 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
625 let key_idx = child_columns
626 .iter()
627 .position(|c| c == &mc.key_var)
628 .ok_or_else(|| {
629 Error::Internal(format!(
630 "MapCollect key '{}' not in columns {:?}",
631 mc.key_var, child_columns
632 ))
633 })?;
634 let value_idx = child_columns
635 .iter()
636 .position(|c| c == &mc.value_var)
637 .ok_or_else(|| {
638 Error::Internal(format!(
639 "MapCollect value '{}' not in columns {:?}",
640 mc.value_var, child_columns
641 ))
642 })?;
643 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
644 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
645 Ok((operator, vec![mc.alias.clone()]))
646 }
647}
648
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled with the `algos` feature.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// Rows to emit, each a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row to emit.
    row_index: usize,
}

#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once every row
    /// has been emitted. Source indices missing from a row yield
    /// `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        let batch_len = (total - start).min(1024);
        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in &self.rows[start..start + batch_len] {
            for (dst_col, &src_col) in self.column_indices.iter().enumerate() {
                let value = row.get(src_col).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(dst_col) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Name under which this operator is reported.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
696
697#[cfg(test)]
698mod tests {
699 use super::*;
700 use crate::query::plan::{
701 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
702 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
703 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
704 SkipOp as LogicalSkipOp, SortKey, SortOp,
705 };
706 use grafeo_common::types::Value;
707 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
708 use grafeo_core::graph::GraphStoreMut;
709 use grafeo_core::graph::lpg::LpgStore;
710
711 fn create_test_store() -> Arc<dyn GraphStoreMut> {
712 let store = Arc::new(LpgStore::new().unwrap());
713 store.create_node(&["Person"]);
714 store.create_node(&["Person"]);
715 store.create_node(&["Company"]);
716 store
717 }
718
719 #[test]
722 fn test_plan_simple_scan() {
723 let store = create_test_store();
724 let planner = Planner::new(store);
725
726 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
728 items: vec![ReturnItem {
729 expression: LogicalExpression::Variable("n".to_string()),
730 alias: None,
731 }],
732 distinct: false,
733 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
734 variable: "n".to_string(),
735 label: Some("Person".to_string()),
736 input: None,
737 })),
738 }));
739
740 let physical = planner.plan(&logical).unwrap();
741 assert_eq!(physical.columns(), &["n"]);
742 }
743
744 #[test]
745 fn test_plan_scan_without_label() {
746 let store = create_test_store();
747 let planner = Planner::new(store);
748
749 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
751 items: vec![ReturnItem {
752 expression: LogicalExpression::Variable("n".to_string()),
753 alias: None,
754 }],
755 distinct: false,
756 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
757 variable: "n".to_string(),
758 label: None,
759 input: None,
760 })),
761 }));
762
763 let physical = planner.plan(&logical).unwrap();
764 assert_eq!(physical.columns(), &["n"]);
765 }
766
767 #[test]
768 fn test_plan_return_with_alias() {
769 let store = create_test_store();
770 let planner = Planner::new(store);
771
772 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
774 items: vec![ReturnItem {
775 expression: LogicalExpression::Variable("n".to_string()),
776 alias: Some("person".to_string()),
777 }],
778 distinct: false,
779 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
780 variable: "n".to_string(),
781 label: Some("Person".to_string()),
782 input: None,
783 })),
784 }));
785
786 let physical = planner.plan(&logical).unwrap();
787 assert_eq!(physical.columns(), &["person"]);
788 }
789
790 #[test]
791 fn test_plan_return_property() {
792 let store = create_test_store();
793 let planner = Planner::new(store);
794
795 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
797 items: vec![ReturnItem {
798 expression: LogicalExpression::Property {
799 variable: "n".to_string(),
800 property: "name".to_string(),
801 },
802 alias: None,
803 }],
804 distinct: false,
805 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
806 variable: "n".to_string(),
807 label: Some("Person".to_string()),
808 input: None,
809 })),
810 }));
811
812 let physical = planner.plan(&logical).unwrap();
813 assert_eq!(physical.columns(), &["n.name"]);
814 }
815
816 #[test]
817 fn test_plan_return_literal() {
818 let store = create_test_store();
819 let planner = Planner::new(store);
820
821 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
823 items: vec![ReturnItem {
824 expression: LogicalExpression::Literal(Value::Int64(42)),
825 alias: Some("answer".to_string()),
826 }],
827 distinct: false,
828 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
829 variable: "n".to_string(),
830 label: None,
831 input: None,
832 })),
833 }));
834
835 let physical = planner.plan(&logical).unwrap();
836 assert_eq!(physical.columns(), &["answer"]);
837 }
838
839 #[test]
842 fn test_plan_filter_equality() {
843 let store = create_test_store();
844 let planner = Planner::new(store);
845
846 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
848 items: vec![ReturnItem {
849 expression: LogicalExpression::Variable("n".to_string()),
850 alias: None,
851 }],
852 distinct: false,
853 input: Box::new(LogicalOperator::Filter(FilterOp {
854 predicate: LogicalExpression::Binary {
855 left: Box::new(LogicalExpression::Property {
856 variable: "n".to_string(),
857 property: "age".to_string(),
858 }),
859 op: BinaryOp::Eq,
860 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
861 },
862 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
863 variable: "n".to_string(),
864 label: Some("Person".to_string()),
865 input: None,
866 })),
867 pushdown_hint: None,
868 })),
869 }));
870
871 let physical = planner.plan(&logical).unwrap();
872 assert_eq!(physical.columns(), &["n"]);
873 }
874
875 #[test]
876 fn test_plan_filter_compound_and() {
877 let store = create_test_store();
878 let planner = Planner::new(store);
879
880 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
882 items: vec![ReturnItem {
883 expression: LogicalExpression::Variable("n".to_string()),
884 alias: None,
885 }],
886 distinct: false,
887 input: Box::new(LogicalOperator::Filter(FilterOp {
888 predicate: LogicalExpression::Binary {
889 left: Box::new(LogicalExpression::Binary {
890 left: Box::new(LogicalExpression::Property {
891 variable: "n".to_string(),
892 property: "age".to_string(),
893 }),
894 op: BinaryOp::Gt,
895 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
896 }),
897 op: BinaryOp::And,
898 right: Box::new(LogicalExpression::Binary {
899 left: Box::new(LogicalExpression::Property {
900 variable: "n".to_string(),
901 property: "age".to_string(),
902 }),
903 op: BinaryOp::Lt,
904 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
905 }),
906 },
907 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
908 variable: "n".to_string(),
909 label: None,
910 input: None,
911 })),
912 pushdown_hint: None,
913 })),
914 }));
915
916 let physical = planner.plan(&logical).unwrap();
917 assert_eq!(physical.columns(), &["n"]);
918 }
919
920 #[test]
921 fn test_plan_filter_unary_not() {
922 let store = create_test_store();
923 let planner = Planner::new(store);
924
925 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
927 items: vec![ReturnItem {
928 expression: LogicalExpression::Variable("n".to_string()),
929 alias: None,
930 }],
931 distinct: false,
932 input: Box::new(LogicalOperator::Filter(FilterOp {
933 predicate: LogicalExpression::Unary {
934 op: UnaryOp::Not,
935 operand: Box::new(LogicalExpression::Property {
936 variable: "n".to_string(),
937 property: "active".to_string(),
938 }),
939 },
940 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
941 variable: "n".to_string(),
942 label: None,
943 input: None,
944 })),
945 pushdown_hint: None,
946 })),
947 }));
948
949 let physical = planner.plan(&logical).unwrap();
950 assert_eq!(physical.columns(), &["n"]);
951 }
952
953 #[test]
954 fn test_plan_filter_is_null() {
955 let store = create_test_store();
956 let planner = Planner::new(store);
957
958 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
960 items: vec![ReturnItem {
961 expression: LogicalExpression::Variable("n".to_string()),
962 alias: None,
963 }],
964 distinct: false,
965 input: Box::new(LogicalOperator::Filter(FilterOp {
966 predicate: LogicalExpression::Unary {
967 op: UnaryOp::IsNull,
968 operand: Box::new(LogicalExpression::Property {
969 variable: "n".to_string(),
970 property: "email".to_string(),
971 }),
972 },
973 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
974 variable: "n".to_string(),
975 label: None,
976 input: None,
977 })),
978 pushdown_hint: None,
979 })),
980 }));
981
982 let physical = planner.plan(&logical).unwrap();
983 assert_eq!(physical.columns(), &["n"]);
984 }
985
986 #[test]
987 fn test_plan_filter_function_call() {
988 let store = create_test_store();
989 let planner = Planner::new(store);
990
991 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
993 items: vec![ReturnItem {
994 expression: LogicalExpression::Variable("n".to_string()),
995 alias: None,
996 }],
997 distinct: false,
998 input: Box::new(LogicalOperator::Filter(FilterOp {
999 predicate: LogicalExpression::Binary {
1000 left: Box::new(LogicalExpression::FunctionCall {
1001 name: "size".to_string(),
1002 args: vec![LogicalExpression::Property {
1003 variable: "n".to_string(),
1004 property: "friends".to_string(),
1005 }],
1006 distinct: false,
1007 }),
1008 op: BinaryOp::Gt,
1009 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1010 },
1011 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1012 variable: "n".to_string(),
1013 label: None,
1014 input: None,
1015 })),
1016 pushdown_hint: None,
1017 })),
1018 }));
1019
1020 let physical = planner.plan(&logical).unwrap();
1021 assert_eq!(physical.columns(), &["n"]);
1022 }
1023
1024 #[test]
1027 fn test_plan_expand_outgoing() {
1028 let store = create_test_store();
1029 let planner = Planner::new(store);
1030
1031 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1033 items: vec![
1034 ReturnItem {
1035 expression: LogicalExpression::Variable("a".to_string()),
1036 alias: None,
1037 },
1038 ReturnItem {
1039 expression: LogicalExpression::Variable("b".to_string()),
1040 alias: None,
1041 },
1042 ],
1043 distinct: false,
1044 input: Box::new(LogicalOperator::Expand(ExpandOp {
1045 from_variable: "a".to_string(),
1046 to_variable: "b".to_string(),
1047 edge_variable: None,
1048 direction: ExpandDirection::Outgoing,
1049 edge_types: vec!["KNOWS".to_string()],
1050 min_hops: 1,
1051 max_hops: Some(1),
1052 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1053 variable: "a".to_string(),
1054 label: Some("Person".to_string()),
1055 input: None,
1056 })),
1057 path_alias: None,
1058 path_mode: PathMode::Walk,
1059 })),
1060 }));
1061
1062 let physical = planner.plan(&logical).unwrap();
1063 assert!(physical.columns().contains(&"a".to_string()));
1065 assert!(physical.columns().contains(&"b".to_string()));
1066 }
1067
1068 #[test]
1069 fn test_plan_expand_with_edge_variable() {
1070 let store = create_test_store();
1071 let planner = Planner::new(store);
1072
1073 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1075 items: vec![
1076 ReturnItem {
1077 expression: LogicalExpression::Variable("a".to_string()),
1078 alias: None,
1079 },
1080 ReturnItem {
1081 expression: LogicalExpression::Variable("r".to_string()),
1082 alias: None,
1083 },
1084 ReturnItem {
1085 expression: LogicalExpression::Variable("b".to_string()),
1086 alias: None,
1087 },
1088 ],
1089 distinct: false,
1090 input: Box::new(LogicalOperator::Expand(ExpandOp {
1091 from_variable: "a".to_string(),
1092 to_variable: "b".to_string(),
1093 edge_variable: Some("r".to_string()),
1094 direction: ExpandDirection::Outgoing,
1095 edge_types: vec!["KNOWS".to_string()],
1096 min_hops: 1,
1097 max_hops: Some(1),
1098 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1099 variable: "a".to_string(),
1100 label: None,
1101 input: None,
1102 })),
1103 path_alias: None,
1104 path_mode: PathMode::Walk,
1105 })),
1106 }));
1107
1108 let physical = planner.plan(&logical).unwrap();
1109 assert!(physical.columns().contains(&"a".to_string()));
1110 assert!(physical.columns().contains(&"r".to_string()));
1111 assert!(physical.columns().contains(&"b".to_string()));
1112 }
1113
1114 #[test]
1117 fn test_plan_limit() {
1118 let store = create_test_store();
1119 let planner = Planner::new(store);
1120
1121 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1123 items: vec![ReturnItem {
1124 expression: LogicalExpression::Variable("n".to_string()),
1125 alias: None,
1126 }],
1127 distinct: false,
1128 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1129 count: 10.into(),
1130 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1131 variable: "n".to_string(),
1132 label: None,
1133 input: None,
1134 })),
1135 })),
1136 }));
1137
1138 let physical = planner.plan(&logical).unwrap();
1139 assert_eq!(physical.columns(), &["n"]);
1140 }
1141
1142 #[test]
1143 fn test_plan_skip() {
1144 let store = create_test_store();
1145 let planner = Planner::new(store);
1146
1147 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1149 items: vec![ReturnItem {
1150 expression: LogicalExpression::Variable("n".to_string()),
1151 alias: None,
1152 }],
1153 distinct: false,
1154 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1155 count: 5.into(),
1156 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1157 variable: "n".to_string(),
1158 label: None,
1159 input: None,
1160 })),
1161 })),
1162 }));
1163
1164 let physical = planner.plan(&logical).unwrap();
1165 assert_eq!(physical.columns(), &["n"]);
1166 }
1167
1168 #[test]
1169 fn test_plan_sort() {
1170 let store = create_test_store();
1171 let planner = Planner::new(store);
1172
1173 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1175 items: vec![ReturnItem {
1176 expression: LogicalExpression::Variable("n".to_string()),
1177 alias: None,
1178 }],
1179 distinct: false,
1180 input: Box::new(LogicalOperator::Sort(SortOp {
1181 keys: vec![SortKey {
1182 expression: LogicalExpression::Variable("n".to_string()),
1183 order: SortOrder::Ascending,
1184 nulls: None,
1185 }],
1186 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1187 variable: "n".to_string(),
1188 label: None,
1189 input: None,
1190 })),
1191 })),
1192 }));
1193
1194 let physical = planner.plan(&logical).unwrap();
1195 assert_eq!(physical.columns(), &["n"]);
1196 }
1197
1198 #[test]
1199 fn test_plan_sort_descending() {
1200 let store = create_test_store();
1201 let planner = Planner::new(store);
1202
1203 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1205 items: vec![ReturnItem {
1206 expression: LogicalExpression::Variable("n".to_string()),
1207 alias: None,
1208 }],
1209 distinct: false,
1210 input: Box::new(LogicalOperator::Sort(SortOp {
1211 keys: vec![SortKey {
1212 expression: LogicalExpression::Variable("n".to_string()),
1213 order: SortOrder::Descending,
1214 nulls: None,
1215 }],
1216 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1217 variable: "n".to_string(),
1218 label: None,
1219 input: None,
1220 })),
1221 })),
1222 }));
1223
1224 let physical = planner.plan(&logical).unwrap();
1225 assert_eq!(physical.columns(), &["n"]);
1226 }
1227
1228 #[test]
1229 fn test_plan_distinct() {
1230 let store = create_test_store();
1231 let planner = Planner::new(store);
1232
1233 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1235 items: vec![ReturnItem {
1236 expression: LogicalExpression::Variable("n".to_string()),
1237 alias: None,
1238 }],
1239 distinct: false,
1240 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1241 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1242 variable: "n".to_string(),
1243 label: None,
1244 input: None,
1245 })),
1246 columns: None,
1247 })),
1248 }));
1249
1250 let physical = planner.plan(&logical).unwrap();
1251 assert_eq!(physical.columns(), &["n"]);
1252 }
1253
1254 #[test]
1255 fn test_plan_distinct_with_columns() {
1256 let store = create_test_store();
1257 let planner = Planner::new(store);
1258
1259 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1261 items: vec![ReturnItem {
1262 expression: LogicalExpression::Variable("n".to_string()),
1263 alias: None,
1264 }],
1265 distinct: false,
1266 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1267 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1268 variable: "n".to_string(),
1269 label: None,
1270 input: None,
1271 })),
1272 columns: Some(vec!["n".to_string()]),
1273 })),
1274 }));
1275
1276 let physical = planner.plan(&logical).unwrap();
1277 assert_eq!(physical.columns(), &["n"]);
1278 }
1279
1280 #[test]
1281 fn test_plan_distinct_with_nonexistent_columns() {
1282 let store = create_test_store();
1283 let planner = Planner::new(store);
1284
1285 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1288 items: vec![ReturnItem {
1289 expression: LogicalExpression::Variable("n".to_string()),
1290 alias: None,
1291 }],
1292 distinct: false,
1293 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1294 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1295 variable: "n".to_string(),
1296 label: None,
1297 input: None,
1298 })),
1299 columns: Some(vec!["nonexistent".to_string()]),
1300 })),
1301 }));
1302
1303 let physical = planner.plan(&logical).unwrap();
1304 assert_eq!(physical.columns(), &["n"]);
1305 }
1306
1307 #[test]
1310 fn test_plan_aggregate_count() {
1311 let store = create_test_store();
1312 let planner = Planner::new(store);
1313
1314 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1316 items: vec![ReturnItem {
1317 expression: LogicalExpression::Variable("cnt".to_string()),
1318 alias: None,
1319 }],
1320 distinct: false,
1321 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1322 group_by: vec![],
1323 aggregates: vec![LogicalAggregateExpr {
1324 function: LogicalAggregateFunction::Count,
1325 expression: Some(LogicalExpression::Variable("n".to_string())),
1326 expression2: None,
1327 distinct: false,
1328 alias: Some("cnt".to_string()),
1329 percentile: None,
1330 separator: None,
1331 }],
1332 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1333 variable: "n".to_string(),
1334 label: None,
1335 input: None,
1336 })),
1337 having: None,
1338 })),
1339 }));
1340
1341 let physical = planner.plan(&logical).unwrap();
1342 assert!(physical.columns().contains(&"cnt".to_string()));
1343 }
1344
1345 #[test]
1346 fn test_plan_aggregate_with_group_by() {
1347 let store = create_test_store();
1348 let planner = Planner::new(store);
1349
1350 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1352 group_by: vec![LogicalExpression::Property {
1353 variable: "n".to_string(),
1354 property: "city".to_string(),
1355 }],
1356 aggregates: vec![LogicalAggregateExpr {
1357 function: LogicalAggregateFunction::Count,
1358 expression: Some(LogicalExpression::Variable("n".to_string())),
1359 expression2: None,
1360 distinct: false,
1361 alias: Some("cnt".to_string()),
1362 percentile: None,
1363 separator: None,
1364 }],
1365 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1366 variable: "n".to_string(),
1367 label: Some("Person".to_string()),
1368 input: None,
1369 })),
1370 having: None,
1371 }));
1372
1373 let physical = planner.plan(&logical).unwrap();
1374 assert_eq!(physical.columns().len(), 2);
1375 }
1376
1377 #[test]
1378 fn test_plan_aggregate_sum() {
1379 let store = create_test_store();
1380 let planner = Planner::new(store);
1381
1382 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1384 group_by: vec![],
1385 aggregates: vec![LogicalAggregateExpr {
1386 function: LogicalAggregateFunction::Sum,
1387 expression: Some(LogicalExpression::Property {
1388 variable: "n".to_string(),
1389 property: "value".to_string(),
1390 }),
1391 expression2: None,
1392 distinct: false,
1393 alias: Some("total".to_string()),
1394 percentile: None,
1395 separator: None,
1396 }],
1397 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1398 variable: "n".to_string(),
1399 label: None,
1400 input: None,
1401 })),
1402 having: None,
1403 }));
1404
1405 let physical = planner.plan(&logical).unwrap();
1406 assert!(physical.columns().contains(&"total".to_string()));
1407 }
1408
1409 #[test]
1410 fn test_plan_aggregate_avg() {
1411 let store = create_test_store();
1412 let planner = Planner::new(store);
1413
1414 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1416 group_by: vec![],
1417 aggregates: vec![LogicalAggregateExpr {
1418 function: LogicalAggregateFunction::Avg,
1419 expression: Some(LogicalExpression::Property {
1420 variable: "n".to_string(),
1421 property: "score".to_string(),
1422 }),
1423 expression2: None,
1424 distinct: false,
1425 alias: Some("average".to_string()),
1426 percentile: None,
1427 separator: None,
1428 }],
1429 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1430 variable: "n".to_string(),
1431 label: None,
1432 input: None,
1433 })),
1434 having: None,
1435 }));
1436
1437 let physical = planner.plan(&logical).unwrap();
1438 assert!(physical.columns().contains(&"average".to_string()));
1439 }
1440
1441 #[test]
1442 fn test_plan_aggregate_min_max() {
1443 let store = create_test_store();
1444 let planner = Planner::new(store);
1445
1446 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1448 group_by: vec![],
1449 aggregates: vec![
1450 LogicalAggregateExpr {
1451 function: LogicalAggregateFunction::Min,
1452 expression: Some(LogicalExpression::Property {
1453 variable: "n".to_string(),
1454 property: "age".to_string(),
1455 }),
1456 expression2: None,
1457 distinct: false,
1458 alias: Some("youngest".to_string()),
1459 percentile: None,
1460 separator: None,
1461 },
1462 LogicalAggregateExpr {
1463 function: LogicalAggregateFunction::Max,
1464 expression: Some(LogicalExpression::Property {
1465 variable: "n".to_string(),
1466 property: "age".to_string(),
1467 }),
1468 expression2: None,
1469 distinct: false,
1470 alias: Some("oldest".to_string()),
1471 percentile: None,
1472 separator: None,
1473 },
1474 ],
1475 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1476 variable: "n".to_string(),
1477 label: None,
1478 input: None,
1479 })),
1480 having: None,
1481 }));
1482
1483 let physical = planner.plan(&logical).unwrap();
1484 assert!(physical.columns().contains(&"youngest".to_string()));
1485 assert!(physical.columns().contains(&"oldest".to_string()));
1486 }
1487
1488 #[test]
1491 fn test_plan_inner_join() {
1492 let store = create_test_store();
1493 let planner = Planner::new(store);
1494
1495 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1497 items: vec![
1498 ReturnItem {
1499 expression: LogicalExpression::Variable("a".to_string()),
1500 alias: None,
1501 },
1502 ReturnItem {
1503 expression: LogicalExpression::Variable("b".to_string()),
1504 alias: None,
1505 },
1506 ],
1507 distinct: false,
1508 input: Box::new(LogicalOperator::Join(JoinOp {
1509 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1510 variable: "a".to_string(),
1511 label: Some("Person".to_string()),
1512 input: None,
1513 })),
1514 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1515 variable: "b".to_string(),
1516 label: Some("Company".to_string()),
1517 input: None,
1518 })),
1519 join_type: JoinType::Inner,
1520 conditions: vec![JoinCondition {
1521 left: LogicalExpression::Variable("a".to_string()),
1522 right: LogicalExpression::Variable("b".to_string()),
1523 }],
1524 })),
1525 }));
1526
1527 let physical = planner.plan(&logical).unwrap();
1528 assert!(physical.columns().contains(&"a".to_string()));
1529 assert!(physical.columns().contains(&"b".to_string()));
1530 }
1531
1532 #[test]
1533 fn test_plan_cross_join() {
1534 let store = create_test_store();
1535 let planner = Planner::new(store);
1536
1537 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1539 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1540 variable: "a".to_string(),
1541 label: None,
1542 input: None,
1543 })),
1544 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1545 variable: "b".to_string(),
1546 label: None,
1547 input: None,
1548 })),
1549 join_type: JoinType::Cross,
1550 conditions: vec![],
1551 }));
1552
1553 let physical = planner.plan(&logical).unwrap();
1554 assert_eq!(physical.columns().len(), 2);
1555 }
1556
1557 #[test]
1558 fn test_plan_left_join() {
1559 let store = create_test_store();
1560 let planner = Planner::new(store);
1561
1562 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1563 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1564 variable: "a".to_string(),
1565 label: None,
1566 input: None,
1567 })),
1568 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1569 variable: "b".to_string(),
1570 label: None,
1571 input: None,
1572 })),
1573 join_type: JoinType::Left,
1574 conditions: vec![],
1575 }));
1576
1577 let physical = planner.plan(&logical).unwrap();
1578 assert_eq!(physical.columns().len(), 2);
1579 }
1580
1581 #[test]
1584 fn test_plan_create_node() {
1585 let store = create_test_store();
1586 let planner = Planner::new(store);
1587
1588 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1590 variable: "n".to_string(),
1591 labels: vec!["Person".to_string()],
1592 properties: vec![(
1593 "name".to_string(),
1594 LogicalExpression::Literal(Value::String("Alix".into())),
1595 )],
1596 input: None,
1597 }));
1598
1599 let physical = planner.plan(&logical).unwrap();
1600 assert!(physical.columns().contains(&"n".to_string()));
1601 }
1602
1603 #[test]
1604 fn test_plan_create_edge() {
1605 let store = create_test_store();
1606 let planner = Planner::new(store);
1607
1608 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1610 variable: Some("r".to_string()),
1611 from_variable: "a".to_string(),
1612 to_variable: "b".to_string(),
1613 edge_type: "KNOWS".to_string(),
1614 properties: vec![],
1615 input: Box::new(LogicalOperator::Join(JoinOp {
1616 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1617 variable: "a".to_string(),
1618 label: None,
1619 input: None,
1620 })),
1621 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1622 variable: "b".to_string(),
1623 label: None,
1624 input: None,
1625 })),
1626 join_type: JoinType::Cross,
1627 conditions: vec![],
1628 })),
1629 }));
1630
1631 let physical = planner.plan(&logical).unwrap();
1632 assert!(physical.columns().contains(&"r".to_string()));
1633 }
1634
1635 #[test]
1636 fn test_plan_delete_node() {
1637 let store = create_test_store();
1638 let planner = Planner::new(store);
1639
1640 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1642 variable: "n".to_string(),
1643 detach: false,
1644 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1645 variable: "n".to_string(),
1646 label: None,
1647 input: None,
1648 })),
1649 }));
1650
1651 let physical = planner.plan(&logical).unwrap();
1652 assert!(physical.columns().contains(&"n".to_string()));
1653 }
1654
1655 #[test]
1658 fn test_plan_empty_errors() {
1659 let store = create_test_store();
1660 let planner = Planner::new(store);
1661
1662 let logical = LogicalPlan::new(LogicalOperator::Empty);
1663 let result = planner.plan(&logical);
1664 assert!(result.is_err());
1665 }
1666
1667 #[test]
1668 fn test_plan_missing_variable_in_return() {
1669 let store = create_test_store();
1670 let planner = Planner::new(store);
1671
1672 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1674 items: vec![ReturnItem {
1675 expression: LogicalExpression::Variable("missing".to_string()),
1676 alias: None,
1677 }],
1678 distinct: false,
1679 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1680 variable: "n".to_string(),
1681 label: None,
1682 input: None,
1683 })),
1684 }));
1685
1686 let result = planner.plan(&logical);
1687 assert!(result.is_err());
1688 }
1689
1690 #[test]
1693 fn test_convert_binary_ops() {
1694 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1695 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1696 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1697 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1698 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1699 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1700 assert!(convert_binary_op(BinaryOp::And).is_ok());
1701 assert!(convert_binary_op(BinaryOp::Or).is_ok());
1702 assert!(convert_binary_op(BinaryOp::Add).is_ok());
1703 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1704 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1705 assert!(convert_binary_op(BinaryOp::Div).is_ok());
1706 }
1707
1708 #[test]
1709 fn test_convert_unary_ops() {
1710 assert!(convert_unary_op(UnaryOp::Not).is_ok());
1711 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1712 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1713 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1714 }
1715
1716 #[test]
1717 fn test_convert_aggregate_functions() {
1718 assert!(matches!(
1719 convert_aggregate_function(LogicalAggregateFunction::Count),
1720 PhysicalAggregateFunction::Count
1721 ));
1722 assert!(matches!(
1723 convert_aggregate_function(LogicalAggregateFunction::Sum),
1724 PhysicalAggregateFunction::Sum
1725 ));
1726 assert!(matches!(
1727 convert_aggregate_function(LogicalAggregateFunction::Avg),
1728 PhysicalAggregateFunction::Avg
1729 ));
1730 assert!(matches!(
1731 convert_aggregate_function(LogicalAggregateFunction::Min),
1732 PhysicalAggregateFunction::Min
1733 ));
1734 assert!(matches!(
1735 convert_aggregate_function(LogicalAggregateFunction::Max),
1736 PhysicalAggregateFunction::Max
1737 ));
1738 }
1739
1740 #[test]
1741 fn test_planner_accessors() {
1742 let store = create_test_store();
1743 let planner = Planner::new(Arc::clone(&store));
1744
1745 assert!(planner.transaction_id().is_none());
1746 assert!(planner.transaction_manager().is_none());
1747 let _ = planner.viewing_epoch(); }
1749
1750 #[test]
1751 fn test_physical_plan_accessors() {
1752 let store = create_test_store();
1753 let planner = Planner::new(store);
1754
1755 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1756 variable: "n".to_string(),
1757 label: None,
1758 input: None,
1759 }));
1760
1761 let physical = planner.plan(&logical).unwrap();
1762 assert_eq!(physical.columns(), &["n"]);
1763
1764 let _ = physical.into_operator();
1766 }
1767
1768 #[test]
1771 fn test_plan_adaptive_with_scan() {
1772 let store = create_test_store();
1773 let planner = Planner::new(store);
1774
1775 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1777 items: vec![ReturnItem {
1778 expression: LogicalExpression::Variable("n".to_string()),
1779 alias: None,
1780 }],
1781 distinct: false,
1782 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1783 variable: "n".to_string(),
1784 label: Some("Person".to_string()),
1785 input: None,
1786 })),
1787 }));
1788
1789 let physical = planner.plan_adaptive(&logical).unwrap();
1790 assert_eq!(physical.columns(), &["n"]);
1791 assert!(physical.adaptive_context.is_some());
1793 }
1794
1795 #[test]
1796 fn test_plan_adaptive_with_filter() {
1797 let store = create_test_store();
1798 let planner = Planner::new(store);
1799
1800 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1802 items: vec![ReturnItem {
1803 expression: LogicalExpression::Variable("n".to_string()),
1804 alias: None,
1805 }],
1806 distinct: false,
1807 input: Box::new(LogicalOperator::Filter(FilterOp {
1808 predicate: LogicalExpression::Binary {
1809 left: Box::new(LogicalExpression::Property {
1810 variable: "n".to_string(),
1811 property: "age".to_string(),
1812 }),
1813 op: BinaryOp::Gt,
1814 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1815 },
1816 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1817 variable: "n".to_string(),
1818 label: None,
1819 input: None,
1820 })),
1821 pushdown_hint: None,
1822 })),
1823 }));
1824
1825 let physical = planner.plan_adaptive(&logical).unwrap();
1826 assert!(physical.adaptive_context.is_some());
1827 }
1828
1829 #[test]
1830 fn test_plan_adaptive_with_expand() {
1831 let store = create_test_store();
1832 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1833
1834 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1836 items: vec![
1837 ReturnItem {
1838 expression: LogicalExpression::Variable("a".to_string()),
1839 alias: None,
1840 },
1841 ReturnItem {
1842 expression: LogicalExpression::Variable("b".to_string()),
1843 alias: None,
1844 },
1845 ],
1846 distinct: false,
1847 input: Box::new(LogicalOperator::Expand(ExpandOp {
1848 from_variable: "a".to_string(),
1849 to_variable: "b".to_string(),
1850 edge_variable: None,
1851 direction: ExpandDirection::Outgoing,
1852 edge_types: vec!["KNOWS".to_string()],
1853 min_hops: 1,
1854 max_hops: Some(1),
1855 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1856 variable: "a".to_string(),
1857 label: None,
1858 input: None,
1859 })),
1860 path_alias: None,
1861 path_mode: PathMode::Walk,
1862 })),
1863 }));
1864
1865 let physical = planner.plan_adaptive(&logical).unwrap();
1866 assert!(physical.adaptive_context.is_some());
1867 }
1868
1869 #[test]
1870 fn test_plan_adaptive_with_join() {
1871 let store = create_test_store();
1872 let planner = Planner::new(store);
1873
1874 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1875 items: vec![
1876 ReturnItem {
1877 expression: LogicalExpression::Variable("a".to_string()),
1878 alias: None,
1879 },
1880 ReturnItem {
1881 expression: LogicalExpression::Variable("b".to_string()),
1882 alias: None,
1883 },
1884 ],
1885 distinct: false,
1886 input: Box::new(LogicalOperator::Join(JoinOp {
1887 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1888 variable: "a".to_string(),
1889 label: None,
1890 input: None,
1891 })),
1892 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1893 variable: "b".to_string(),
1894 label: None,
1895 input: None,
1896 })),
1897 join_type: JoinType::Cross,
1898 conditions: vec![],
1899 })),
1900 }));
1901
1902 let physical = planner.plan_adaptive(&logical).unwrap();
1903 assert!(physical.adaptive_context.is_some());
1904 }
1905
1906 #[test]
1907 fn test_plan_adaptive_with_aggregate() {
1908 let store = create_test_store();
1909 let planner = Planner::new(store);
1910
1911 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1912 group_by: vec![],
1913 aggregates: vec![LogicalAggregateExpr {
1914 function: LogicalAggregateFunction::Count,
1915 expression: Some(LogicalExpression::Variable("n".to_string())),
1916 expression2: None,
1917 distinct: false,
1918 alias: Some("cnt".to_string()),
1919 percentile: None,
1920 separator: None,
1921 }],
1922 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1923 variable: "n".to_string(),
1924 label: None,
1925 input: None,
1926 })),
1927 having: None,
1928 }));
1929
1930 let physical = planner.plan_adaptive(&logical).unwrap();
1931 assert!(physical.adaptive_context.is_some());
1932 }
1933
1934 #[test]
1935 fn test_plan_adaptive_with_distinct() {
1936 let store = create_test_store();
1937 let planner = Planner::new(store);
1938
1939 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1940 items: vec![ReturnItem {
1941 expression: LogicalExpression::Variable("n".to_string()),
1942 alias: None,
1943 }],
1944 distinct: false,
1945 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1946 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1947 variable: "n".to_string(),
1948 label: None,
1949 input: None,
1950 })),
1951 columns: None,
1952 })),
1953 }));
1954
1955 let physical = planner.plan_adaptive(&logical).unwrap();
1956 assert!(physical.adaptive_context.is_some());
1957 }
1958
1959 #[test]
1960 fn test_plan_adaptive_with_limit() {
1961 let store = create_test_store();
1962 let planner = Planner::new(store);
1963
1964 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1965 items: vec![ReturnItem {
1966 expression: LogicalExpression::Variable("n".to_string()),
1967 alias: None,
1968 }],
1969 distinct: false,
1970 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1971 count: 10.into(),
1972 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1973 variable: "n".to_string(),
1974 label: None,
1975 input: None,
1976 })),
1977 })),
1978 }));
1979
1980 let physical = planner.plan_adaptive(&logical).unwrap();
1981 assert!(physical.adaptive_context.is_some());
1982 }
1983
1984 #[test]
1985 fn test_plan_adaptive_with_skip() {
1986 let store = create_test_store();
1987 let planner = Planner::new(store);
1988
1989 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1990 items: vec![ReturnItem {
1991 expression: LogicalExpression::Variable("n".to_string()),
1992 alias: None,
1993 }],
1994 distinct: false,
1995 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1996 count: 5.into(),
1997 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1998 variable: "n".to_string(),
1999 label: None,
2000 input: None,
2001 })),
2002 })),
2003 }));
2004
2005 let physical = planner.plan_adaptive(&logical).unwrap();
2006 assert!(physical.adaptive_context.is_some());
2007 }
2008
2009 #[test]
2010 fn test_plan_adaptive_with_sort() {
2011 let store = create_test_store();
2012 let planner = Planner::new(store);
2013
2014 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2015 items: vec![ReturnItem {
2016 expression: LogicalExpression::Variable("n".to_string()),
2017 alias: None,
2018 }],
2019 distinct: false,
2020 input: Box::new(LogicalOperator::Sort(SortOp {
2021 keys: vec![SortKey {
2022 expression: LogicalExpression::Variable("n".to_string()),
2023 order: SortOrder::Ascending,
2024 nulls: None,
2025 }],
2026 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2027 variable: "n".to_string(),
2028 label: None,
2029 input: None,
2030 })),
2031 })),
2032 }));
2033
2034 let physical = planner.plan_adaptive(&logical).unwrap();
2035 assert!(physical.adaptive_context.is_some());
2036 }
2037
2038 #[test]
2039 fn test_plan_adaptive_with_union() {
2040 let store = create_test_store();
2041 let planner = Planner::new(store);
2042
2043 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2044 items: vec![ReturnItem {
2045 expression: LogicalExpression::Variable("n".to_string()),
2046 alias: None,
2047 }],
2048 distinct: false,
2049 input: Box::new(LogicalOperator::Union(UnionOp {
2050 inputs: vec![
2051 LogicalOperator::NodeScan(NodeScanOp {
2052 variable: "n".to_string(),
2053 label: Some("Person".to_string()),
2054 input: None,
2055 }),
2056 LogicalOperator::NodeScan(NodeScanOp {
2057 variable: "n".to_string(),
2058 label: Some("Company".to_string()),
2059 input: None,
2060 }),
2061 ],
2062 })),
2063 }));
2064
2065 let physical = planner.plan_adaptive(&logical).unwrap();
2066 assert!(physical.adaptive_context.is_some());
2067 }
2068
2069 #[test]
2072 fn test_plan_expand_variable_length() {
2073 let store = create_test_store();
2074 let planner = Planner::new(store);
2075
2076 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2078 items: vec![
2079 ReturnItem {
2080 expression: LogicalExpression::Variable("a".to_string()),
2081 alias: None,
2082 },
2083 ReturnItem {
2084 expression: LogicalExpression::Variable("b".to_string()),
2085 alias: None,
2086 },
2087 ],
2088 distinct: false,
2089 input: Box::new(LogicalOperator::Expand(ExpandOp {
2090 from_variable: "a".to_string(),
2091 to_variable: "b".to_string(),
2092 edge_variable: None,
2093 direction: ExpandDirection::Outgoing,
2094 edge_types: vec!["KNOWS".to_string()],
2095 min_hops: 1,
2096 max_hops: Some(3),
2097 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2098 variable: "a".to_string(),
2099 label: None,
2100 input: None,
2101 })),
2102 path_alias: None,
2103 path_mode: PathMode::Walk,
2104 })),
2105 }));
2106
2107 let physical = planner.plan(&logical).unwrap();
2108 assert!(physical.columns().contains(&"a".to_string()));
2109 assert!(physical.columns().contains(&"b".to_string()));
2110 }
2111
2112 #[test]
2113 fn test_plan_expand_with_path_alias() {
2114 let store = create_test_store();
2115 let planner = Planner::new(store);
2116
2117 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2119 items: vec![
2120 ReturnItem {
2121 expression: LogicalExpression::Variable("a".to_string()),
2122 alias: None,
2123 },
2124 ReturnItem {
2125 expression: LogicalExpression::Variable("b".to_string()),
2126 alias: None,
2127 },
2128 ],
2129 distinct: false,
2130 input: Box::new(LogicalOperator::Expand(ExpandOp {
2131 from_variable: "a".to_string(),
2132 to_variable: "b".to_string(),
2133 edge_variable: None,
2134 direction: ExpandDirection::Outgoing,
2135 edge_types: vec!["KNOWS".to_string()],
2136 min_hops: 1,
2137 max_hops: Some(3),
2138 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2139 variable: "a".to_string(),
2140 label: None,
2141 input: None,
2142 })),
2143 path_alias: Some("p".to_string()),
2144 path_mode: PathMode::Walk,
2145 })),
2146 }));
2147
2148 let physical = planner.plan(&logical).unwrap();
2149 assert!(physical.columns().contains(&"a".to_string()));
2151 assert!(physical.columns().contains(&"b".to_string()));
2152 }
2153
#[test]
/// Plans a single-hop `KNOWS` expansion using `ExpandDirection::Incoming`
/// (factorized execution turned off) and checks that both endpoint
/// variables show up as output columns.
fn test_plan_expand_incoming() {
    let planner = Planner::new(create_test_store()).with_factorized_execution(false);

    // Leaf operator: unlabeled node scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Exactly one hop from `a` to `b`, restricted to `KNOWS` edges.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // RETURN a, b (no aliases).
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2194
#[test]
/// Plans a single-hop `KNOWS` expansion using `ExpandDirection::Both`
/// (factorized execution turned off) and checks that both endpoint
/// variables show up as output columns.
fn test_plan_expand_both_directions() {
    let planner = Planner::new(create_test_store()).with_factorized_execution(false);

    // Leaf operator: unlabeled node scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Exactly one hop between `a` and `b`, restricted to `KNOWS` edges.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // RETURN a, b (no aliases).
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2235
#[test]
/// Builds a planner through `Planner::with_context` and checks that the
/// transaction id, transaction manager, and viewing epoch passed in are the
/// ones reported back by the planner's accessors.
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();

    // Open a transaction and capture the manager's current epoch.
    let manager = Arc::new(TransactionManager::new());
    let tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(store, Arc::clone(&manager), Some(tx), snapshot_epoch);

    // Each accessor must echo what was supplied at construction.
    assert_eq!(planner.transaction_id(), Some(tx));
    assert!(planner.transaction_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
2258
#[test]
/// Plans two chained single-hop expansions (`a` to `b`, then `b` to `c`)
/// with factorized execution switched off and checks that the returned
/// variables `a` and `c` are present in the physical plan's columns.
fn test_planner_with_factorized_execution_disabled() {
    let planner = Planner::new(create_test_store()).with_factorized_execution(false);

    // Leaf operator: unlabeled node scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // First hop: a -> b, with an empty edge type list.
    let first_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: Vec::new(),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // Second hop: b -> c, stacked on top of the first.
    let second_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("b"),
        to_variable: String::from("c"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: Vec::new(),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(first_hop),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    // RETURN a, c (the intermediate `b` is not returned).
    let items: Vec<ReturnItem> = ["a", "c"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
2310
#[test]
/// Plans `RETURN n` over a node scan sorted ascending by the `name`
/// property of `n`, and checks that `n` is among the output columns.
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    // Leaf operator: unlabeled node scan binding `n`.
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single sort key: n.name, ascending, no explicit null ordering.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
        nulls: None,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2346
#[test]
/// Plans a `Company` node scan (`b`) whose input is a `Person` node scan
/// (`a`) and checks that both variables are present in the output columns.
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Inner scan: `a` over label `Person`, with no upstream operator.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Outer scan: `b` over label `Company`, fed by the inner scan.
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(people)),
    });

    // RETURN a, b (no aliases).
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(companies),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2382}