1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
/// Lower and upper limits of a property range lookup.
///
/// Built by `Planner::try_plan_filter_with_range_index` and handed to
/// `Planner::plan_range_filter`, which forwards every field to the store.
struct RangeBounds<'a> {
    /// Lower limit, or `None` when the range is open at the bottom.
    min: Option<&'a Value>,
    /// Upper limit, or `None` when the range is open at the top.
    max: Option<&'a Value>,
    /// Whether a value equal to `min` is part of the range.
    min_inclusive: bool,
    /// Whether a value equal to `max` is part of the range.
    max_inclusive: bool,
}
44
/// Turns a `LogicalPlan` into a tree of physical operators.
pub struct Planner {
    /// Graph store handed to every physical operator that reads or writes data.
    store: Arc<LpgStore>,
    /// Transaction manager, present only when built via `with_context`.
    tx_manager: Option<Arc<TransactionManager>>,
    /// Transaction the plan runs in, if any; passed to operators with `viewing_epoch`.
    tx_id: Option<TxId>,
    /// Epoch passed to operators through `with_tx_context`.
    viewing_epoch: EpochId,
    /// Counter used to name edge columns for expands without an edge variable
    /// (`_anon_edge_<n>`). A `Cell` so it can be bumped through `&self`.
    anon_edge_counter: std::cell::Cell<u32>,
    /// When true, chains of two or more single-hop expands are planned as one
    /// `LazyFactorizedChainOperator`.
    factorized_execution: bool,
}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
/// Returns the epoch this planner passes to the operators it builds.
#[must_use]
pub fn viewing_epoch(&self) -> EpochId {
    self.viewing_epoch
}
109
/// Returns the transaction id this planner was configured with, if any.
#[must_use]
pub fn tx_id(&self) -> Option<TxId> {
    self.tx_id
}
115
/// Returns the transaction manager, or `None` for a planner built with `new`.
#[must_use]
pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
    self.tx_manager.as_ref()
}
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
245 let id = format!("expand_{}", expand.to_variable);
246 ctx.set_estimate(&id, estimate);
247
248 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
249 }
250 LogicalOperator::Join(join) => {
251 let left_est = self.estimate_cardinality(&join.left);
253 let right_est = self.estimate_cardinality(&join.right);
254 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
256 ctx.set_estimate(&id, estimate);
257
258 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
259 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
260 }
261 LogicalOperator::Aggregate(agg) => {
262 let input_estimate = self.estimate_cardinality(&agg.input);
264 let estimate = if agg.group_by.is_empty() {
265 1.0 } else {
267 (input_estimate * 0.1).max(1.0) };
269 let id = format!("aggregate_{depth}");
270 ctx.set_estimate(&id, estimate);
271
272 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
273 }
274 LogicalOperator::Distinct(distinct) => {
275 let input_estimate = self.estimate_cardinality(&distinct.input);
276 let estimate = (input_estimate * 0.5).max(1.0);
277 let id = format!("distinct_{depth}");
278 ctx.set_estimate(&id, estimate);
279
280 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
281 }
282 LogicalOperator::Return(ret) => {
283 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
284 }
285 LogicalOperator::Limit(limit) => {
286 let input_estimate = self.estimate_cardinality(&limit.input);
287 let estimate = (input_estimate).min(limit.count as f64);
288 let id = format!("limit_{depth}");
289 ctx.set_estimate(&id, estimate);
290
291 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
292 }
293 LogicalOperator::Skip(skip) => {
294 let input_estimate = self.estimate_cardinality(&skip.input);
295 let estimate = (input_estimate - skip.count as f64).max(0.0);
296 let id = format!("skip_{depth}");
297 ctx.set_estimate(&id, estimate);
298
299 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
300 }
301 LogicalOperator::Sort(sort) => {
302 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
304 }
305 LogicalOperator::Union(union) => {
306 let estimate: f64 = union
307 .inputs
308 .iter()
309 .map(|input| self.estimate_cardinality(input))
310 .sum();
311 let id = format!("union_{depth}");
312 ctx.set_estimate(&id, estimate);
313
314 for input in &union.inputs {
315 self.collect_cardinality_estimates(input, ctx, depth + 1);
316 }
317 }
318 _ => {
319 }
321 }
322 }
323
324 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
326 match op {
327 LogicalOperator::NodeScan(scan) => {
328 if let Some(label) = &scan.label {
329 self.store.nodes_by_label(label).len() as f64
330 } else {
331 self.store.node_count() as f64
332 }
333 }
334 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
335 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
336 LogicalOperator::Join(join) => {
337 let left = self.estimate_cardinality(&join.left);
338 let right = self.estimate_cardinality(&join.right);
339 (left * right).sqrt()
340 }
341 LogicalOperator::Aggregate(agg) => {
342 if agg.group_by.is_empty() {
343 1.0
344 } else {
345 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
346 }
347 }
348 LogicalOperator::Distinct(distinct) => {
349 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
350 }
351 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
352 LogicalOperator::Limit(limit) => self
353 .estimate_cardinality(&limit.input)
354 .min(limit.count as f64),
355 LogicalOperator::Skip(skip) => {
356 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
357 }
358 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
359 LogicalOperator::Union(union) => union
360 .inputs
361 .iter()
362 .map(|input| self.estimate_cardinality(input))
363 .sum(),
364 _ => 1000.0, }
366 }
367
368 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
370 match op {
371 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
372 LogicalOperator::Expand(expand) => {
373 if self.factorized_execution {
375 let (chain_len, _base) = Self::count_expand_chain(op);
376 if chain_len >= 2 {
377 return self.plan_expand_chain(op);
379 }
380 }
381 self.plan_expand(expand)
382 }
383 LogicalOperator::Return(ret) => self.plan_return(ret),
384 LogicalOperator::Filter(filter) => self.plan_filter(filter),
385 LogicalOperator::Project(project) => self.plan_project(project),
386 LogicalOperator::Limit(limit) => self.plan_limit(limit),
387 LogicalOperator::Skip(skip) => self.plan_skip(skip),
388 LogicalOperator::Sort(sort) => self.plan_sort(sort),
389 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
390 LogicalOperator::Join(join) => self.plan_join(join),
391 LogicalOperator::Union(union) => self.plan_union(union),
392 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
393 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
394 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
395 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
396 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
397 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
398 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
399 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
400 LogicalOperator::Merge(merge) => self.plan_merge(merge),
401 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
402 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
403 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
404 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
405 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
406 _ => Err(Error::Internal(format!(
407 "Unsupported operator: {:?}",
408 std::mem::discriminant(op)
409 ))),
410 }
411 }
412
413 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
415 let scan_op = if let Some(label) = &scan.label {
416 ScanOperator::with_label(Arc::clone(&self.store), label)
417 } else {
418 ScanOperator::new(Arc::clone(&self.store))
419 };
420
421 let scan_operator: Box<dyn Operator> =
423 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
424
425 if let Some(input) = &scan.input {
427 let (input_op, mut input_columns) = self.plan_operator(input)?;
428
429 let mut output_schema: Vec<LogicalType> =
431 input_columns.iter().map(|_| LogicalType::Any).collect();
432 output_schema.push(LogicalType::Node);
433
434 input_columns.push(scan.variable.clone());
436
437 let join_op = Box::new(NestedLoopJoinOperator::new(
439 input_op,
440 scan_operator,
441 None, PhysicalJoinType::Cross,
443 output_schema,
444 ));
445
446 Ok((join_op, input_columns))
447 } else {
448 let columns = vec![scan.variable.clone()];
449 Ok((scan_operator, columns))
450 }
451 }
452
453 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
455 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
457
458 let source_column = input_columns
460 .iter()
461 .position(|c| c == &expand.from_variable)
462 .ok_or_else(|| {
463 Error::Internal(format!(
464 "Source variable '{}' not found in input columns",
465 expand.from_variable
466 ))
467 })?;
468
469 let direction = match expand.direction {
471 ExpandDirection::Outgoing => Direction::Outgoing,
472 ExpandDirection::Incoming => Direction::Incoming,
473 ExpandDirection::Both => Direction::Both,
474 };
475
476 let is_variable_length =
478 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
479
480 let operator: Box<dyn Operator> = if is_variable_length {
481 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
484 Arc::clone(&self.store),
485 input_op,
486 source_column,
487 direction,
488 expand.edge_type.clone(),
489 expand.min_hops,
490 max_hops,
491 )
492 .with_tx_context(self.viewing_epoch, self.tx_id);
493
494 if expand.path_alias.is_some() {
496 expand_op = expand_op.with_path_length_output();
497 }
498
499 Box::new(expand_op)
500 } else {
501 let expand_op = ExpandOperator::new(
503 Arc::clone(&self.store),
504 input_op,
505 source_column,
506 direction,
507 expand.edge_type.clone(),
508 )
509 .with_tx_context(self.viewing_epoch, self.tx_id);
510 Box::new(expand_op)
511 };
512
513 let mut columns = input_columns;
516
517 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
519 let count = self.anon_edge_counter.get();
520 self.anon_edge_counter.set(count + 1);
521 format!("_anon_edge_{}", count)
522 });
523 columns.push(edge_col_name);
524
525 columns.push(expand.to_variable.clone());
526
527 if let Some(ref path_alias) = expand.path_alias {
529 columns.push(format!("_path_length_{}", path_alias));
530 }
531
532 Ok((operator, columns))
533 }
534
535 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
543 let expands = Self::collect_expand_chain(op);
544 if expands.is_empty() {
545 return Err(Error::Internal("Empty expand chain".to_string()));
546 }
547
548 let first_expand = expands[0];
550 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
551
552 let mut columns = base_columns.clone();
553 let mut steps = Vec::new();
554
555 let mut is_first = true;
560
561 for expand in &expands {
562 let source_column = if is_first {
564 base_columns
566 .iter()
567 .position(|c| c == &expand.from_variable)
568 .ok_or_else(|| {
569 Error::Internal(format!(
570 "Source variable '{}' not found in base columns",
571 expand.from_variable
572 ))
573 })?
574 } else {
575 1
578 };
579
580 let direction = match expand.direction {
582 ExpandDirection::Outgoing => Direction::Outgoing,
583 ExpandDirection::Incoming => Direction::Incoming,
584 ExpandDirection::Both => Direction::Both,
585 };
586
587 steps.push(ExpandStep {
589 source_column,
590 direction,
591 edge_type: expand.edge_type.clone(),
592 });
593
594 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
596 let count = self.anon_edge_counter.get();
597 self.anon_edge_counter.set(count + 1);
598 format!("_anon_edge_{}", count)
599 });
600 columns.push(edge_col_name);
601 columns.push(expand.to_variable.clone());
602
603 is_first = false;
604 }
605
606 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
608
609 if let Some(tx_id) = self.tx_id {
610 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
611 } else {
612 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
613 }
614
615 Ok((Box::new(lazy_op), columns))
616 }
617
618 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
620 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
622
623 let variable_columns: HashMap<String, usize> = input_columns
625 .iter()
626 .enumerate()
627 .map(|(i, name)| (name.clone(), i))
628 .collect();
629
630 let columns: Vec<String> = ret
632 .items
633 .iter()
634 .map(|item| {
635 item.alias.clone().unwrap_or_else(|| {
636 expression_to_string(&item.expression)
638 })
639 })
640 .collect();
641
642 let needs_project = ret
644 .items
645 .iter()
646 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
647
648 if needs_project {
649 let mut projections = Vec::with_capacity(ret.items.len());
651 let mut output_types = Vec::with_capacity(ret.items.len());
652
653 for item in &ret.items {
654 match &item.expression {
655 LogicalExpression::Variable(name) => {
656 let col_idx = *variable_columns.get(name).ok_or_else(|| {
657 Error::Internal(format!("Variable '{}' not found in input", name))
658 })?;
659 projections.push(ProjectExpr::Column(col_idx));
660 output_types.push(LogicalType::Node);
662 }
663 LogicalExpression::Property { variable, property } => {
664 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
665 Error::Internal(format!("Variable '{}' not found in input", variable))
666 })?;
667 projections.push(ProjectExpr::PropertyAccess {
668 column: col_idx,
669 property: property.clone(),
670 });
671 output_types.push(LogicalType::Any);
673 }
674 LogicalExpression::Literal(value) => {
675 projections.push(ProjectExpr::Constant(value.clone()));
676 output_types.push(value_to_logical_type(value));
677 }
678 LogicalExpression::FunctionCall { name, args, .. } => {
679 match name.to_lowercase().as_str() {
681 "type" => {
682 if args.len() != 1 {
684 return Err(Error::Internal(
685 "type() requires exactly one argument".to_string(),
686 ));
687 }
688 if let LogicalExpression::Variable(var_name) = &args[0] {
689 let col_idx =
690 *variable_columns.get(var_name).ok_or_else(|| {
691 Error::Internal(format!(
692 "Variable '{}' not found in input",
693 var_name
694 ))
695 })?;
696 projections.push(ProjectExpr::EdgeType { column: col_idx });
697 output_types.push(LogicalType::String);
698 } else {
699 return Err(Error::Internal(
700 "type() argument must be a variable".to_string(),
701 ));
702 }
703 }
704 "length" => {
705 if args.len() != 1 {
708 return Err(Error::Internal(
709 "length() requires exactly one argument".to_string(),
710 ));
711 }
712 if let LogicalExpression::Variable(var_name) = &args[0] {
713 let col_idx =
714 *variable_columns.get(var_name).ok_or_else(|| {
715 Error::Internal(format!(
716 "Variable '{}' not found in input",
717 var_name
718 ))
719 })?;
720 projections.push(ProjectExpr::Column(col_idx));
722 output_types.push(LogicalType::Int64);
723 } else {
724 return Err(Error::Internal(
725 "length() argument must be a variable".to_string(),
726 ));
727 }
728 }
729 _ => {
731 let filter_expr = self.convert_expression(&item.expression)?;
732 projections.push(ProjectExpr::Expression {
733 expr: filter_expr,
734 variable_columns: variable_columns.clone(),
735 });
736 output_types.push(LogicalType::Any);
737 }
738 }
739 }
740 LogicalExpression::Case { .. } => {
741 let filter_expr = self.convert_expression(&item.expression)?;
743 projections.push(ProjectExpr::Expression {
744 expr: filter_expr,
745 variable_columns: variable_columns.clone(),
746 });
747 output_types.push(LogicalType::Any);
749 }
750 _ => {
751 return Err(Error::Internal(format!(
752 "Unsupported RETURN expression: {:?}",
753 item.expression
754 )));
755 }
756 }
757 }
758
759 let operator = Box::new(ProjectOperator::with_store(
760 input_op,
761 projections,
762 output_types,
763 Arc::clone(&self.store),
764 ));
765
766 Ok((operator, columns))
767 } else {
768 let mut projections = Vec::with_capacity(ret.items.len());
771 let mut output_types = Vec::with_capacity(ret.items.len());
772
773 for item in &ret.items {
774 if let LogicalExpression::Variable(name) = &item.expression {
775 let col_idx = *variable_columns.get(name).ok_or_else(|| {
776 Error::Internal(format!("Variable '{}' not found in input", name))
777 })?;
778 projections.push(ProjectExpr::Column(col_idx));
779 output_types.push(LogicalType::Node);
780 }
781 }
782
783 if projections.len() == input_columns.len()
785 && projections
786 .iter()
787 .enumerate()
788 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
789 {
790 Ok((input_op, columns))
792 } else {
793 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
794 Ok((operator, columns))
795 }
796 }
797 }
798
799 fn plan_project(
801 &self,
802 project: &crate::query::plan::ProjectOp,
803 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
804 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
806 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
807 let single_row_op: Box<dyn Operator> = Box::new(
809 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
810 );
811 (single_row_op, Vec::new())
812 } else {
813 self.plan_operator(&project.input)?
814 };
815
816 let variable_columns: HashMap<String, usize> = input_columns
818 .iter()
819 .enumerate()
820 .map(|(i, name)| (name.clone(), i))
821 .collect();
822
823 let mut projections = Vec::with_capacity(project.projections.len());
825 let mut output_types = Vec::with_capacity(project.projections.len());
826 let mut output_columns = Vec::with_capacity(project.projections.len());
827
828 for projection in &project.projections {
829 let col_name = projection
831 .alias
832 .clone()
833 .unwrap_or_else(|| expression_to_string(&projection.expression));
834 output_columns.push(col_name);
835
836 match &projection.expression {
837 LogicalExpression::Variable(name) => {
838 let col_idx = *variable_columns.get(name).ok_or_else(|| {
839 Error::Internal(format!("Variable '{}' not found in input", name))
840 })?;
841 projections.push(ProjectExpr::Column(col_idx));
842 output_types.push(LogicalType::Node);
843 }
844 LogicalExpression::Property { variable, property } => {
845 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
846 Error::Internal(format!("Variable '{}' not found in input", variable))
847 })?;
848 projections.push(ProjectExpr::PropertyAccess {
849 column: col_idx,
850 property: property.clone(),
851 });
852 output_types.push(LogicalType::Any);
853 }
854 LogicalExpression::Literal(value) => {
855 projections.push(ProjectExpr::Constant(value.clone()));
856 output_types.push(value_to_logical_type(value));
857 }
858 _ => {
859 let filter_expr = self.convert_expression(&projection.expression)?;
861 projections.push(ProjectExpr::Expression {
862 expr: filter_expr,
863 variable_columns: variable_columns.clone(),
864 });
865 output_types.push(LogicalType::Any);
866 }
867 }
868 }
869
870 let operator = Box::new(ProjectOperator::with_store(
871 input_op,
872 projections,
873 output_types,
874 Arc::clone(&self.store),
875 ));
876
877 Ok((operator, output_columns))
878 }
879
880 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
886 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
889 let (_, columns) = self.plan_operator(&filter.input)?;
891 let schema = self.derive_schema_from_columns(&columns);
892 let empty_op = Box::new(EmptyOperator::new(schema));
893 return Ok((empty_op, columns));
894 }
895
896 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
898 return Ok(result);
899 }
900
901 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
903 return Ok(result);
904 }
905
906 let (input_op, columns) = self.plan_operator(&filter.input)?;
908
909 let variable_columns: HashMap<String, usize> = columns
911 .iter()
912 .enumerate()
913 .map(|(i, name)| (name.clone(), i))
914 .collect();
915
916 let filter_expr = self.convert_expression(&filter.predicate)?;
918
919 let predicate =
921 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
922
923 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
925
926 Ok((operator, columns))
927 }
928
929 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
936 use grafeo_core::graph::lpg::CompareOp;
937
938 match predicate {
939 LogicalExpression::Binary { left, op, right } => {
940 match op {
942 BinaryOp::And => {
943 let left_result = self.check_zone_map_for_predicate(left);
944 let right_result = self.check_zone_map_for_predicate(right);
945
946 return match (left_result, right_result) {
947 (Some(false), _) | (_, Some(false)) => Some(false),
949 (Some(true), Some(true)) => Some(true),
951 _ => None,
953 };
954 }
955 BinaryOp::Or => {
956 let left_result = self.check_zone_map_for_predicate(left);
957 let right_result = self.check_zone_map_for_predicate(right);
958
959 return match (left_result, right_result) {
960 (Some(false), Some(false)) => Some(false),
962 (Some(true), _) | (_, Some(true)) => Some(true),
964 _ => None,
966 };
967 }
968 _ => {}
969 }
970
971 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
973 (
974 LogicalExpression::Property { property, .. },
975 LogicalExpression::Literal(val),
976 ) => {
977 let cmp = match op {
978 BinaryOp::Eq => CompareOp::Eq,
979 BinaryOp::Ne => CompareOp::Ne,
980 BinaryOp::Lt => CompareOp::Lt,
981 BinaryOp::Le => CompareOp::Le,
982 BinaryOp::Gt => CompareOp::Gt,
983 BinaryOp::Ge => CompareOp::Ge,
984 _ => return None,
985 };
986 (property.clone(), cmp, val.clone())
987 }
988 (
989 LogicalExpression::Literal(val),
990 LogicalExpression::Property { property, .. },
991 ) => {
992 let cmp = match op {
994 BinaryOp::Eq => CompareOp::Eq,
995 BinaryOp::Ne => CompareOp::Ne,
996 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
998 BinaryOp::Gt => CompareOp::Lt,
999 BinaryOp::Ge => CompareOp::Le,
1000 _ => return None,
1001 };
1002 (property.clone(), cmp, val.clone())
1003 }
1004 _ => return None,
1005 };
1006
1007 let might_match =
1009 self.store
1010 .node_property_might_match(&property.into(), compare_op, &value);
1011
1012 Some(might_match)
1013 }
1014
1015 _ => None,
1016 }
1017 }
1018
1019 fn try_plan_filter_with_property_index(
1028 &self,
1029 filter: &FilterOp,
1030 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1031 let (scan_variable, scan_label) = match filter.input.as_ref() {
1033 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1034 (scan.variable.clone(), scan.label.clone())
1035 }
1036 _ => return Ok(None),
1037 };
1038
1039 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1042
1043 if conditions.is_empty() {
1044 return Ok(None);
1045 }
1046
1047 let has_indexed_condition = conditions
1049 .iter()
1050 .any(|(prop, _)| self.store.has_property_index(prop));
1051
1052 if !has_indexed_condition {
1053 return Ok(None);
1054 }
1055
1056 let conditions_ref: Vec<(&str, Value)> = conditions
1058 .iter()
1059 .map(|(p, v)| (p.as_str(), v.clone()))
1060 .collect();
1061 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1062
1063 if let Some(label) = &scan_label {
1065 let label_nodes: std::collections::HashSet<_> =
1066 self.store.nodes_by_label(label).into_iter().collect();
1067 matching_nodes.retain(|n| label_nodes.contains(n));
1068 }
1069
1070 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1072 let columns = vec![scan_variable];
1073
1074 Ok(Some((node_list_op, columns)))
1075 }
1076
1077 fn extract_equality_conditions(
1083 &self,
1084 predicate: &LogicalExpression,
1085 target_variable: &str,
1086 ) -> Vec<(String, Value)> {
1087 let mut conditions = Vec::new();
1088 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1089 conditions
1090 }
1091
1092 fn collect_equality_conditions(
1094 &self,
1095 expr: &LogicalExpression,
1096 target_variable: &str,
1097 conditions: &mut Vec<(String, Value)>,
1098 ) {
1099 match expr {
1100 LogicalExpression::Binary {
1102 left,
1103 op: BinaryOp::And,
1104 right,
1105 } => {
1106 self.collect_equality_conditions(left, target_variable, conditions);
1107 self.collect_equality_conditions(right, target_variable, conditions);
1108 }
1109
1110 LogicalExpression::Binary {
1112 left,
1113 op: BinaryOp::Eq,
1114 right,
1115 } => {
1116 if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1117 if var == target_variable {
1118 conditions.push((prop, val));
1119 }
1120 }
1121 }
1122
1123 _ => {}
1124 }
1125 }
1126
1127 fn extract_property_equality(
1129 &self,
1130 left: &LogicalExpression,
1131 right: &LogicalExpression,
1132 ) -> Option<(String, String, Value)> {
1133 match (left, right) {
1134 (
1135 LogicalExpression::Property { variable, property },
1136 LogicalExpression::Literal(val),
1137 ) => Some((variable.clone(), property.clone(), val.clone())),
1138 (
1139 LogicalExpression::Literal(val),
1140 LogicalExpression::Property { variable, property },
1141 ) => Some((variable.clone(), property.clone(), val.clone())),
1142 _ => None,
1143 }
1144 }
1145
1146 fn try_plan_filter_with_range_index(
1159 &self,
1160 filter: &FilterOp,
1161 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1162 let (scan_variable, scan_label) = match filter.input.as_ref() {
1164 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1165 (scan.variable.clone(), scan.label.clone())
1166 }
1167 _ => return Ok(None),
1168 };
1169
1170 if let Some((variable, property, min, max, min_inc, max_inc)) =
1172 self.extract_between_predicate(&filter.predicate)
1173 {
1174 if variable == scan_variable {
1175 return self.plan_range_filter(
1176 &scan_variable,
1177 &scan_label,
1178 &property,
1179 RangeBounds {
1180 min: Some(&min),
1181 max: Some(&max),
1182 min_inclusive: min_inc,
1183 max_inclusive: max_inc,
1184 },
1185 );
1186 }
1187 }
1188
1189 if let Some((variable, property, op, value)) =
1191 self.extract_range_predicate(&filter.predicate)
1192 {
1193 if variable == scan_variable {
1194 let (min, max, min_inc, max_inc) = match op {
1195 BinaryOp::Lt => (None, Some(value), false, false),
1196 BinaryOp::Le => (None, Some(value), false, true),
1197 BinaryOp::Gt => (Some(value), None, false, false),
1198 BinaryOp::Ge => (Some(value), None, true, false),
1199 _ => return Ok(None),
1200 };
1201 return self.plan_range_filter(
1202 &scan_variable,
1203 &scan_label,
1204 &property,
1205 RangeBounds {
1206 min: min.as_ref(),
1207 max: max.as_ref(),
1208 min_inclusive: min_inc,
1209 max_inclusive: max_inc,
1210 },
1211 );
1212 }
1213 }
1214
1215 Ok(None)
1216 }
1217
1218 fn plan_range_filter(
1220 &self,
1221 scan_variable: &str,
1222 scan_label: &Option<String>,
1223 property: &str,
1224 bounds: RangeBounds<'_>,
1225 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1226 let mut matching_nodes = self.store.find_nodes_in_range(
1228 property,
1229 bounds.min,
1230 bounds.max,
1231 bounds.min_inclusive,
1232 bounds.max_inclusive,
1233 );
1234
1235 if let Some(label) = scan_label {
1237 let label_nodes: std::collections::HashSet<_> =
1238 self.store.nodes_by_label(label).into_iter().collect();
1239 matching_nodes.retain(|n| label_nodes.contains(n));
1240 }
1241
1242 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1244 let columns = vec![scan_variable.to_string()];
1245
1246 Ok(Some((node_list_op, columns)))
1247 }
1248
1249 fn extract_range_predicate(
1253 &self,
1254 predicate: &LogicalExpression,
1255 ) -> Option<(String, String, BinaryOp, Value)> {
1256 match predicate {
1257 LogicalExpression::Binary { left, op, right } => {
1258 match op {
1259 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1260 if let (
1262 LogicalExpression::Property { variable, property },
1263 LogicalExpression::Literal(val),
1264 ) = (left.as_ref(), right.as_ref())
1265 {
1266 return Some((variable.clone(), property.clone(), *op, val.clone()));
1267 }
1268
1269 if let (
1271 LogicalExpression::Literal(val),
1272 LogicalExpression::Property { variable, property },
1273 ) = (left.as_ref(), right.as_ref())
1274 {
1275 let flipped_op = match op {
1276 BinaryOp::Lt => BinaryOp::Gt,
1277 BinaryOp::Le => BinaryOp::Ge,
1278 BinaryOp::Gt => BinaryOp::Lt,
1279 BinaryOp::Ge => BinaryOp::Le,
1280 _ => return None,
1281 };
1282 return Some((
1283 variable.clone(),
1284 property.clone(),
1285 flipped_op,
1286 val.clone(),
1287 ));
1288 }
1289 }
1290 _ => {}
1291 }
1292 }
1293 _ => {}
1294 }
1295 None
1296 }
1297
1298 fn extract_between_predicate(
1306 &self,
1307 predicate: &LogicalExpression,
1308 ) -> Option<(String, String, Value, Value, bool, bool)> {
1309 let (left, right) = match predicate {
1311 LogicalExpression::Binary {
1312 left,
1313 op: BinaryOp::And,
1314 right,
1315 } => (left.as_ref(), right.as_ref()),
1316 _ => return None,
1317 };
1318
1319 let left_range = self.extract_range_predicate(left);
1321 let right_range = self.extract_range_predicate(right);
1322
1323 let (left_var, left_prop, left_op, left_val) = left_range?;
1324 let (right_var, right_prop, right_op, right_val) = right_range?;
1325
1326 if left_var != right_var || left_prop != right_prop {
1328 return None;
1329 }
1330
1331 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1333 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1335 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1337 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1339 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1341 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1343 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1345 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1347 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1349 _ => return None,
1350 };
1351
1352 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1353 }
1354
1355 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1357 let (input_op, columns) = self.plan_operator(&limit.input)?;
1358 let output_schema = self.derive_schema_from_columns(&columns);
1359 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1360 Ok((operator, columns))
1361 }
1362
1363 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1365 let (input_op, columns) = self.plan_operator(&skip.input)?;
1366 let output_schema = self.derive_schema_from_columns(&columns);
1367 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1368 Ok((operator, columns))
1369 }
1370
1371 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1373 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1374
1375 let mut variable_columns: HashMap<String, usize> = input_columns
1377 .iter()
1378 .enumerate()
1379 .map(|(i, name)| (name.clone(), i))
1380 .collect();
1381
1382 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1384 let mut next_col_idx = input_columns.len();
1385
1386 for key in &sort.keys {
1387 if let LogicalExpression::Property { variable, property } = &key.expression {
1388 let col_name = format!("{}_{}", variable, property);
1389 if !variable_columns.contains_key(&col_name) {
1390 property_projections.push((
1391 variable.clone(),
1392 property.clone(),
1393 col_name.clone(),
1394 ));
1395 variable_columns.insert(col_name, next_col_idx);
1396 next_col_idx += 1;
1397 }
1398 }
1399 }
1400
1401 let mut output_columns = input_columns.clone();
1403
1404 if !property_projections.is_empty() {
1406 let mut projections = Vec::new();
1407 let mut output_types = Vec::new();
1408
1409 for (i, _) in input_columns.iter().enumerate() {
1412 projections.push(ProjectExpr::Column(i));
1413 output_types.push(LogicalType::Node);
1414 }
1415
1416 for (variable, property, col_name) in &property_projections {
1418 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1419 Error::Internal(format!(
1420 "Variable '{}' not found for ORDER BY property projection",
1421 variable
1422 ))
1423 })?;
1424 projections.push(ProjectExpr::PropertyAccess {
1425 column: source_col,
1426 property: property.clone(),
1427 });
1428 output_types.push(LogicalType::Any);
1429 output_columns.push(col_name.clone());
1430 }
1431
1432 input_op = Box::new(ProjectOperator::with_store(
1433 input_op,
1434 projections,
1435 output_types,
1436 Arc::clone(&self.store),
1437 ));
1438 }
1439
1440 let physical_keys: Vec<PhysicalSortKey> = sort
1442 .keys
1443 .iter()
1444 .map(|key| {
1445 let col_idx = self
1446 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1447 Ok(PhysicalSortKey {
1448 column: col_idx,
1449 direction: match key.order {
1450 SortOrder::Ascending => SortDirection::Ascending,
1451 SortOrder::Descending => SortDirection::Descending,
1452 },
1453 null_order: NullOrder::NullsLast,
1454 })
1455 })
1456 .collect::<Result<Vec<_>>>()?;
1457
1458 let output_schema = self.derive_schema_from_columns(&output_columns);
1459 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1460 Ok((operator, output_columns))
1461 }
1462
1463 fn resolve_sort_expression_with_properties(
1465 &self,
1466 expr: &LogicalExpression,
1467 variable_columns: &HashMap<String, usize>,
1468 ) -> Result<usize> {
1469 match expr {
1470 LogicalExpression::Variable(name) => {
1471 variable_columns.get(name).copied().ok_or_else(|| {
1472 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1473 })
1474 }
1475 LogicalExpression::Property { variable, property } => {
1476 let col_name = format!("{}_{}", variable, property);
1478 variable_columns.get(&col_name).copied().ok_or_else(|| {
1479 Error::Internal(format!(
1480 "Property column '{}' not found for ORDER BY (from {}.{})",
1481 col_name, variable, property
1482 ))
1483 })
1484 }
1485 _ => Err(Error::Internal(format!(
1486 "Unsupported ORDER BY expression: {:?}",
1487 expr
1488 ))),
1489 }
1490 }
1491
1492 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1494 columns.iter().map(|_| LogicalType::Any).collect()
1495 }
1496
1497 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1499 if self.factorized_execution
1506 && agg.group_by.is_empty()
1507 && Self::count_expand_chain(&agg.input).0 >= 2
1508 && self.is_simple_aggregate(agg)
1509 {
1510 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1511 return Ok((op, cols));
1512 }
1513 }
1515
1516 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1517
1518 let mut variable_columns: HashMap<String, usize> = input_columns
1520 .iter()
1521 .enumerate()
1522 .map(|(i, name)| (name.clone(), i))
1523 .collect();
1524
1525 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1528
1529 for expr in &agg.group_by {
1531 if let LogicalExpression::Property { variable, property } = expr {
1532 let col_name = format!("{}_{}", variable, property);
1533 if !variable_columns.contains_key(&col_name) {
1534 property_projections.push((
1535 variable.clone(),
1536 property.clone(),
1537 col_name.clone(),
1538 ));
1539 variable_columns.insert(col_name, next_col_idx);
1540 next_col_idx += 1;
1541 }
1542 }
1543 }
1544
1545 for agg_expr in &agg.aggregates {
1547 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1548 let col_name = format!("{}_{}", variable, property);
1549 if !variable_columns.contains_key(&col_name) {
1550 property_projections.push((
1551 variable.clone(),
1552 property.clone(),
1553 col_name.clone(),
1554 ));
1555 variable_columns.insert(col_name, next_col_idx);
1556 next_col_idx += 1;
1557 }
1558 }
1559 }
1560
1561 if !property_projections.is_empty() {
1563 let mut projections = Vec::new();
1564 let mut output_types = Vec::new();
1565
1566 for (i, _) in input_columns.iter().enumerate() {
1569 projections.push(ProjectExpr::Column(i));
1570 output_types.push(LogicalType::Node);
1571 }
1572
1573 for (variable, property, _col_name) in &property_projections {
1575 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1576 Error::Internal(format!(
1577 "Variable '{}' not found for property projection",
1578 variable
1579 ))
1580 })?;
1581 projections.push(ProjectExpr::PropertyAccess {
1582 column: source_col,
1583 property: property.clone(),
1584 });
1585 output_types.push(LogicalType::Any); }
1587
1588 input_op = Box::new(ProjectOperator::with_store(
1589 input_op,
1590 projections,
1591 output_types,
1592 Arc::clone(&self.store),
1593 ));
1594 }
1595
1596 let group_columns: Vec<usize> = agg
1598 .group_by
1599 .iter()
1600 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1601 .collect::<Result<Vec<_>>>()?;
1602
1603 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1605 .aggregates
1606 .iter()
1607 .map(|agg_expr| {
1608 let column = agg_expr
1609 .expression
1610 .as_ref()
1611 .map(|e| {
1612 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1613 })
1614 .transpose()?;
1615
1616 Ok(PhysicalAggregateExpr {
1617 function: convert_aggregate_function(agg_expr.function),
1618 column,
1619 distinct: agg_expr.distinct,
1620 alias: agg_expr.alias.clone(),
1621 percentile: agg_expr.percentile,
1622 })
1623 })
1624 .collect::<Result<Vec<_>>>()?;
1625
1626 let mut output_schema = Vec::new();
1628 let mut output_columns = Vec::new();
1629
1630 for expr in &agg.group_by {
1632 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1634 }
1635
1636 for agg_expr in &agg.aggregates {
1638 let result_type = match agg_expr.function {
1639 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1640 LogicalType::Int64
1641 }
1642 LogicalAggregateFunction::Sum => LogicalType::Int64,
1643 LogicalAggregateFunction::Avg => LogicalType::Float64,
1644 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1645 LogicalType::Int64
1649 }
1650 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1653 | LogicalAggregateFunction::StdDevPop
1654 | LogicalAggregateFunction::PercentileDisc
1655 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1656 };
1657 output_schema.push(result_type);
1658 output_columns.push(
1659 agg_expr
1660 .alias
1661 .clone()
1662 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1663 );
1664 }
1665
1666 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1668 Box::new(SimpleAggregateOperator::new(
1669 input_op,
1670 physical_aggregates,
1671 output_schema,
1672 ))
1673 } else {
1674 Box::new(HashAggregateOperator::new(
1675 input_op,
1676 group_columns,
1677 physical_aggregates,
1678 output_schema,
1679 ))
1680 };
1681
1682 if let Some(having_expr) = &agg.having {
1684 let having_var_columns: HashMap<String, usize> = output_columns
1686 .iter()
1687 .enumerate()
1688 .map(|(i, name)| (name.clone(), i))
1689 .collect();
1690
1691 let filter_expr = self.convert_expression(having_expr)?;
1692 let predicate =
1693 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1694 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1695 }
1696
1697 Ok((operator, output_columns))
1698 }
1699
1700 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1706 agg.aggregates.iter().all(|agg_expr| {
1707 match agg_expr.function {
1708 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1709 agg_expr.expression.is_none()
1711 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1712 }
1713 LogicalAggregateFunction::Sum
1714 | LogicalAggregateFunction::Avg
1715 | LogicalAggregateFunction::Min
1716 | LogicalAggregateFunction::Max => {
1717 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1720 }
1721 _ => false,
1723 }
1724 })
1725 }
1726
1727 fn plan_factorized_aggregate(
1731 &self,
1732 agg: &AggregateOp,
1733 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1734 let expands = Self::collect_expand_chain(&agg.input);
1736 if expands.is_empty() {
1737 return Err(Error::Internal(
1738 "Expected expand chain for factorized aggregate".to_string(),
1739 ));
1740 }
1741
1742 let first_expand = expands[0];
1744 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1745
1746 let mut columns = base_columns.clone();
1747 let mut steps = Vec::new();
1748 let mut is_first = true;
1749
1750 for expand in &expands {
1751 let source_column = if is_first {
1753 base_columns
1754 .iter()
1755 .position(|c| c == &expand.from_variable)
1756 .ok_or_else(|| {
1757 Error::Internal(format!(
1758 "Source variable '{}' not found in base columns",
1759 expand.from_variable
1760 ))
1761 })?
1762 } else {
1763 1 };
1765
1766 let direction = match expand.direction {
1767 ExpandDirection::Outgoing => Direction::Outgoing,
1768 ExpandDirection::Incoming => Direction::Incoming,
1769 ExpandDirection::Both => Direction::Both,
1770 };
1771
1772 steps.push(ExpandStep {
1773 source_column,
1774 direction,
1775 edge_type: expand.edge_type.clone(),
1776 });
1777
1778 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1779 let count = self.anon_edge_counter.get();
1780 self.anon_edge_counter.set(count + 1);
1781 format!("_anon_edge_{}", count)
1782 });
1783 columns.push(edge_col_name);
1784 columns.push(expand.to_variable.clone());
1785
1786 is_first = false;
1787 }
1788
1789 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1791
1792 if let Some(tx_id) = self.tx_id {
1793 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1794 } else {
1795 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1796 }
1797
1798 let factorized_aggs: Vec<FactorizedAggregate> = agg
1800 .aggregates
1801 .iter()
1802 .map(|agg_expr| {
1803 match agg_expr.function {
1804 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1805 if agg_expr.expression.is_none() {
1807 FactorizedAggregate::count()
1808 } else {
1809 FactorizedAggregate::count_column(1) }
1813 }
1814 LogicalAggregateFunction::Sum => {
1815 FactorizedAggregate::sum(1)
1817 }
1818 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1819 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1820 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1821 _ => {
1822 FactorizedAggregate::count()
1824 }
1825 }
1826 })
1827 .collect();
1828
1829 let output_columns: Vec<String> = agg
1831 .aggregates
1832 .iter()
1833 .map(|agg_expr| {
1834 agg_expr
1835 .alias
1836 .clone()
1837 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1838 })
1839 .collect();
1840
1841 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1843
1844 Ok((Box::new(factorized_agg_op), output_columns))
1845 }
1846
1847 #[allow(dead_code)]
1849 fn resolve_expression_to_column(
1850 &self,
1851 expr: &LogicalExpression,
1852 variable_columns: &HashMap<String, usize>,
1853 ) -> Result<usize> {
1854 match expr {
1855 LogicalExpression::Variable(name) => variable_columns
1856 .get(name)
1857 .copied()
1858 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1859 LogicalExpression::Property { variable, .. } => variable_columns
1860 .get(variable)
1861 .copied()
1862 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1863 _ => Err(Error::Internal(format!(
1864 "Cannot resolve expression to column: {:?}",
1865 expr
1866 ))),
1867 }
1868 }
1869
1870 fn resolve_expression_to_column_with_properties(
1874 &self,
1875 expr: &LogicalExpression,
1876 variable_columns: &HashMap<String, usize>,
1877 ) -> Result<usize> {
1878 match expr {
1879 LogicalExpression::Variable(name) => variable_columns
1880 .get(name)
1881 .copied()
1882 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1883 LogicalExpression::Property { variable, property } => {
1884 let col_name = format!("{}_{}", variable, property);
1886 variable_columns.get(&col_name).copied().ok_or_else(|| {
1887 Error::Internal(format!(
1888 "Property column '{}' not found (from {}.{})",
1889 col_name, variable, property
1890 ))
1891 })
1892 }
1893 _ => Err(Error::Internal(format!(
1894 "Cannot resolve expression to column: {:?}",
1895 expr
1896 ))),
1897 }
1898 }
1899
1900 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1902 match expr {
1903 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1904 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1905 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1906 variable: variable.clone(),
1907 property: property.clone(),
1908 }),
1909 LogicalExpression::Binary { left, op, right } => {
1910 let left_expr = self.convert_expression(left)?;
1911 let right_expr = self.convert_expression(right)?;
1912 let filter_op = convert_binary_op(*op)?;
1913 Ok(FilterExpression::Binary {
1914 left: Box::new(left_expr),
1915 op: filter_op,
1916 right: Box::new(right_expr),
1917 })
1918 }
1919 LogicalExpression::Unary { op, operand } => {
1920 let operand_expr = self.convert_expression(operand)?;
1921 let filter_op = convert_unary_op(*op)?;
1922 Ok(FilterExpression::Unary {
1923 op: filter_op,
1924 operand: Box::new(operand_expr),
1925 })
1926 }
1927 LogicalExpression::FunctionCall { name, args, .. } => {
1928 let filter_args: Vec<FilterExpression> = args
1929 .iter()
1930 .map(|a| self.convert_expression(a))
1931 .collect::<Result<Vec<_>>>()?;
1932 Ok(FilterExpression::FunctionCall {
1933 name: name.clone(),
1934 args: filter_args,
1935 })
1936 }
1937 LogicalExpression::Case {
1938 operand,
1939 when_clauses,
1940 else_clause,
1941 } => {
1942 let filter_operand = operand
1943 .as_ref()
1944 .map(|e| self.convert_expression(e))
1945 .transpose()?
1946 .map(Box::new);
1947 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1948 .iter()
1949 .map(|(cond, result)| {
1950 Ok((
1951 self.convert_expression(cond)?,
1952 self.convert_expression(result)?,
1953 ))
1954 })
1955 .collect::<Result<Vec<_>>>()?;
1956 let filter_else = else_clause
1957 .as_ref()
1958 .map(|e| self.convert_expression(e))
1959 .transpose()?
1960 .map(Box::new);
1961 Ok(FilterExpression::Case {
1962 operand: filter_operand,
1963 when_clauses: filter_when_clauses,
1964 else_clause: filter_else,
1965 })
1966 }
1967 LogicalExpression::List(items) => {
1968 let filter_items: Vec<FilterExpression> = items
1969 .iter()
1970 .map(|item| self.convert_expression(item))
1971 .collect::<Result<Vec<_>>>()?;
1972 Ok(FilterExpression::List(filter_items))
1973 }
1974 LogicalExpression::Map(pairs) => {
1975 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1976 .iter()
1977 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1978 .collect::<Result<Vec<_>>>()?;
1979 Ok(FilterExpression::Map(filter_pairs))
1980 }
1981 LogicalExpression::IndexAccess { base, index } => {
1982 let base_expr = self.convert_expression(base)?;
1983 let index_expr = self.convert_expression(index)?;
1984 Ok(FilterExpression::IndexAccess {
1985 base: Box::new(base_expr),
1986 index: Box::new(index_expr),
1987 })
1988 }
1989 LogicalExpression::SliceAccess { base, start, end } => {
1990 let base_expr = self.convert_expression(base)?;
1991 let start_expr = start
1992 .as_ref()
1993 .map(|s| self.convert_expression(s))
1994 .transpose()?
1995 .map(Box::new);
1996 let end_expr = end
1997 .as_ref()
1998 .map(|e| self.convert_expression(e))
1999 .transpose()?
2000 .map(Box::new);
2001 Ok(FilterExpression::SliceAccess {
2002 base: Box::new(base_expr),
2003 start: start_expr,
2004 end: end_expr,
2005 })
2006 }
2007 LogicalExpression::Parameter(_) => Err(Error::Internal(
2008 "Parameters not yet supported in filters".to_string(),
2009 )),
2010 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2011 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2012 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2013 LogicalExpression::ListComprehension {
2014 variable,
2015 list_expr,
2016 filter_expr,
2017 map_expr,
2018 } => {
2019 let list = self.convert_expression(list_expr)?;
2020 let filter = filter_expr
2021 .as_ref()
2022 .map(|f| self.convert_expression(f))
2023 .transpose()?
2024 .map(Box::new);
2025 let map = self.convert_expression(map_expr)?;
2026 Ok(FilterExpression::ListComprehension {
2027 variable: variable.clone(),
2028 list_expr: Box::new(list),
2029 filter_expr: filter,
2030 map_expr: Box::new(map),
2031 })
2032 }
2033 LogicalExpression::ExistsSubquery(subplan) => {
2034 let (start_var, direction, edge_type, end_labels) =
2037 self.extract_exists_pattern(subplan)?;
2038
2039 Ok(FilterExpression::ExistsSubquery {
2040 start_var,
2041 direction,
2042 edge_type,
2043 end_labels,
2044 min_hops: None,
2045 max_hops: None,
2046 })
2047 }
2048 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2049 "COUNT subqueries not yet supported".to_string(),
2050 )),
2051 }
2052 }
2053
2054 fn extract_exists_pattern(
2057 &self,
2058 subplan: &LogicalOperator,
2059 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2060 match subplan {
2061 LogicalOperator::Expand(expand) => {
2062 let end_labels = self.extract_end_labels_from_expand(expand);
2064 let direction = match expand.direction {
2065 ExpandDirection::Outgoing => Direction::Outgoing,
2066 ExpandDirection::Incoming => Direction::Incoming,
2067 ExpandDirection::Both => Direction::Both,
2068 };
2069 Ok((
2070 expand.from_variable.clone(),
2071 direction,
2072 expand.edge_type.clone(),
2073 end_labels,
2074 ))
2075 }
2076 LogicalOperator::NodeScan(scan) => {
2077 if let Some(input) = &scan.input {
2078 self.extract_exists_pattern(input)
2079 } else {
2080 Err(Error::Internal(
2081 "EXISTS subquery must contain an edge pattern".to_string(),
2082 ))
2083 }
2084 }
2085 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2086 _ => Err(Error::Internal(
2087 "Unsupported EXISTS subquery pattern".to_string(),
2088 )),
2089 }
2090 }
2091
2092 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2094 match expand.input.as_ref() {
2096 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2097 _ => None,
2098 }
2099 }
2100
2101 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2103 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2104 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2105
2106 let mut columns = left_columns.clone();
2108 columns.extend(right_columns.clone());
2109
2110 let physical_join_type = match join.join_type {
2112 JoinType::Inner => PhysicalJoinType::Inner,
2113 JoinType::Left => PhysicalJoinType::Left,
2114 JoinType::Right => PhysicalJoinType::Right,
2115 JoinType::Full => PhysicalJoinType::Full,
2116 JoinType::Cross => PhysicalJoinType::Cross,
2117 JoinType::Semi => PhysicalJoinType::Semi,
2118 JoinType::Anti => PhysicalJoinType::Anti,
2119 };
2120
2121 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2123 (vec![], vec![])
2125 } else {
2126 join.conditions
2127 .iter()
2128 .filter_map(|cond| {
2129 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2131 let right_idx = self
2132 .expression_to_column(&cond.right, &right_columns)
2133 .ok()?;
2134 Some((left_idx, right_idx))
2135 })
2136 .unzip()
2137 };
2138
2139 let output_schema = self.derive_schema_from_columns(&columns);
2140
2141 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2149 left_op,
2150 right_op,
2151 probe_keys,
2152 build_keys,
2153 physical_join_type,
2154 output_schema,
2155 ));
2156
2157 Ok((operator, columns))
2158 }
2159
2160 #[allow(dead_code)]
2169 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2170 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2172 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2173
2174 Self::collect_join_edges(
2176 &LogicalOperator::Join(join.clone()),
2177 &mut edges,
2178 &mut all_vars,
2179 );
2180
2181 if all_vars.len() < 3 {
2183 return false;
2184 }
2185
2186 Self::has_cycle(&edges, &all_vars)
2188 }
2189
2190 fn collect_join_edges(
2192 op: &LogicalOperator,
2193 edges: &mut HashMap<String, Vec<String>>,
2194 vars: &mut std::collections::HashSet<String>,
2195 ) {
2196 match op {
2197 LogicalOperator::Join(join) => {
2198 for cond in &join.conditions {
2200 if let (Some(left_var), Some(right_var)) = (
2201 Self::extract_join_variable(&cond.left),
2202 Self::extract_join_variable(&cond.right),
2203 ) {
2204 if left_var != right_var {
2205 vars.insert(left_var.clone());
2206 vars.insert(right_var.clone());
2207
2208 edges
2210 .entry(left_var.clone())
2211 .or_default()
2212 .push(right_var.clone());
2213 edges.entry(right_var).or_default().push(left_var);
2214 }
2215 }
2216 }
2217
2218 Self::collect_join_edges(&join.left, edges, vars);
2220 Self::collect_join_edges(&join.right, edges, vars);
2221 }
2222 LogicalOperator::Expand(expand) => {
2223 vars.insert(expand.from_variable.clone());
2225 vars.insert(expand.to_variable.clone());
2226
2227 edges
2228 .entry(expand.from_variable.clone())
2229 .or_default()
2230 .push(expand.to_variable.clone());
2231 edges
2232 .entry(expand.to_variable.clone())
2233 .or_default()
2234 .push(expand.from_variable.clone());
2235
2236 Self::collect_join_edges(&expand.input, edges, vars);
2237 }
2238 LogicalOperator::Filter(filter) => {
2239 Self::collect_join_edges(&filter.input, edges, vars);
2240 }
2241 LogicalOperator::NodeScan(scan) => {
2242 vars.insert(scan.variable.clone());
2243 }
2244 _ => {}
2245 }
2246 }
2247
2248 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2250 match expr {
2251 LogicalExpression::Variable(v) => Some(v.clone()),
2252 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2253 LogicalExpression::Id(v) => Some(v.clone()),
2254 _ => None,
2255 }
2256 }
2257
2258 fn has_cycle(
2262 edges: &HashMap<String, Vec<String>>,
2263 vars: &std::collections::HashSet<String>,
2264 ) -> bool {
2265 let mut color: HashMap<&String, u8> = HashMap::new();
2266
2267 for var in vars {
2268 color.insert(var, 0);
2269 }
2270
2271 for start in vars {
2272 if color[start] == 0 {
2273 if Self::dfs_cycle(start, None, edges, &mut color) {
2274 return true;
2275 }
2276 }
2277 }
2278
2279 false
2280 }
2281
2282 fn dfs_cycle(
2284 node: &String,
2285 parent: Option<&String>,
2286 edges: &HashMap<String, Vec<String>>,
2287 color: &mut HashMap<&String, u8>,
2288 ) -> bool {
2289 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2292 for neighbor in neighbors {
2293 if parent == Some(neighbor) {
2295 continue;
2296 }
2297
2298 if let Some(&c) = color.get(neighbor) {
2299 if c == 1 {
2300 return true;
2302 }
2303 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2304 return true;
2305 }
2306 }
2307 }
2308 }
2309
2310 *color.get_mut(node).unwrap() = 2; false
2312 }
2313
2314 #[allow(dead_code)]
2316 fn count_relations(op: &LogicalOperator) -> usize {
2317 match op {
2318 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2319 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2320 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2321 LogicalOperator::Join(j) => {
2322 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2323 }
2324 _ => 0,
2325 }
2326 }
2327
2328 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2330 match expr {
2331 LogicalExpression::Variable(name) => columns
2332 .iter()
2333 .position(|c| c == name)
2334 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2335 _ => Err(Error::Internal(
2336 "Only variables supported in join conditions".to_string(),
2337 )),
2338 }
2339 }
2340
2341 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2343 if union.inputs.is_empty() {
2344 return Err(Error::Internal(
2345 "Union requires at least one input".to_string(),
2346 ));
2347 }
2348
2349 let mut inputs = Vec::with_capacity(union.inputs.len());
2350 let mut columns = Vec::new();
2351
2352 for (i, input) in union.inputs.iter().enumerate() {
2353 let (op, cols) = self.plan_operator(input)?;
2354 if i == 0 {
2355 columns = cols;
2356 }
2357 inputs.push(op);
2358 }
2359
2360 let output_schema = self.derive_schema_from_columns(&columns);
2361 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2362
2363 Ok((operator, columns))
2364 }
2365
2366 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2368 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2369 let output_schema = self.derive_schema_from_columns(&columns);
2370 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2371 Ok((operator, columns))
2372 }
2373
2374 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2376 let (input_op, mut columns) = if let Some(ref input) = create.input {
2378 let (op, cols) = self.plan_operator(input)?;
2379 (Some(op), cols)
2380 } else {
2381 (None, vec![])
2382 };
2383
2384 let output_column = columns.len();
2386 columns.push(create.variable.clone());
2387
2388 let properties: Vec<(String, PropertySource)> = create
2390 .properties
2391 .iter()
2392 .map(|(name, expr)| {
2393 let source = match expr {
2394 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2395 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2396 };
2397 (name.clone(), source)
2398 })
2399 .collect();
2400
2401 let output_schema = self.derive_schema_from_columns(&columns);
2402
2403 let operator = Box::new(
2404 CreateNodeOperator::new(
2405 Arc::clone(&self.store),
2406 input_op,
2407 create.labels.clone(),
2408 properties,
2409 output_schema,
2410 output_column,
2411 )
2412 .with_tx_context(self.viewing_epoch, self.tx_id),
2413 );
2414
2415 Ok((operator, columns))
2416 }
2417
2418 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2420 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2421
2422 let from_column = columns
2424 .iter()
2425 .position(|c| c == &create.from_variable)
2426 .ok_or_else(|| {
2427 Error::Internal(format!(
2428 "Source variable '{}' not found",
2429 create.from_variable
2430 ))
2431 })?;
2432
2433 let to_column = columns
2434 .iter()
2435 .position(|c| c == &create.to_variable)
2436 .ok_or_else(|| {
2437 Error::Internal(format!(
2438 "Target variable '{}' not found",
2439 create.to_variable
2440 ))
2441 })?;
2442
2443 let output_column = create.variable.as_ref().map(|v| {
2445 let idx = columns.len();
2446 columns.push(v.clone());
2447 idx
2448 });
2449
2450 let properties: Vec<(String, PropertySource)> = create
2452 .properties
2453 .iter()
2454 .map(|(name, expr)| {
2455 let source = match expr {
2456 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2457 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2458 };
2459 (name.clone(), source)
2460 })
2461 .collect();
2462
2463 let output_schema = self.derive_schema_from_columns(&columns);
2464
2465 let operator = Box::new(
2466 CreateEdgeOperator::new(
2467 Arc::clone(&self.store),
2468 input_op,
2469 from_column,
2470 to_column,
2471 create.edge_type.clone(),
2472 properties,
2473 output_schema,
2474 output_column,
2475 )
2476 .with_tx_context(self.viewing_epoch, self.tx_id),
2477 );
2478
2479 Ok((operator, columns))
2480 }
2481
2482 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2484 let (input_op, columns) = self.plan_operator(&delete.input)?;
2485
2486 let node_column = columns
2487 .iter()
2488 .position(|c| c == &delete.variable)
2489 .ok_or_else(|| {
2490 Error::Internal(format!(
2491 "Variable '{}' not found for delete",
2492 delete.variable
2493 ))
2494 })?;
2495
2496 let output_schema = vec![LogicalType::Int64];
2498 let output_columns = vec!["deleted_count".to_string()];
2499
2500 let operator = Box::new(
2501 DeleteNodeOperator::new(
2502 Arc::clone(&self.store),
2503 input_op,
2504 node_column,
2505 output_schema,
2506 delete.detach, )
2508 .with_tx_context(self.viewing_epoch, self.tx_id),
2509 );
2510
2511 Ok((operator, output_columns))
2512 }
2513
2514 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2516 let (input_op, columns) = self.plan_operator(&delete.input)?;
2517
2518 let edge_column = columns
2519 .iter()
2520 .position(|c| c == &delete.variable)
2521 .ok_or_else(|| {
2522 Error::Internal(format!(
2523 "Variable '{}' not found for delete",
2524 delete.variable
2525 ))
2526 })?;
2527
2528 let output_schema = vec![LogicalType::Int64];
2530 let output_columns = vec!["deleted_count".to_string()];
2531
2532 let operator = Box::new(
2533 DeleteEdgeOperator::new(
2534 Arc::clone(&self.store),
2535 input_op,
2536 edge_column,
2537 output_schema,
2538 )
2539 .with_tx_context(self.viewing_epoch, self.tx_id),
2540 );
2541
2542 Ok((operator, output_columns))
2543 }
2544
2545 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2547 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2548 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2549
2550 let mut columns = left_columns.clone();
2552 columns.extend(right_columns.clone());
2553
2554 let mut probe_keys = Vec::new();
2556 let mut build_keys = Vec::new();
2557
2558 for (right_idx, right_col) in right_columns.iter().enumerate() {
2559 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2560 probe_keys.push(left_idx);
2561 build_keys.push(right_idx);
2562 }
2563 }
2564
2565 let output_schema = self.derive_schema_from_columns(&columns);
2566
2567 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2568 left_op,
2569 right_op,
2570 probe_keys,
2571 build_keys,
2572 PhysicalJoinType::Left,
2573 output_schema,
2574 ));
2575
2576 Ok((operator, columns))
2577 }
2578
2579 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2581 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2582 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2583
2584 let columns = left_columns.clone();
2586
2587 let mut probe_keys = Vec::new();
2589 let mut build_keys = Vec::new();
2590
2591 for (right_idx, right_col) in right_columns.iter().enumerate() {
2592 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2593 probe_keys.push(left_idx);
2594 build_keys.push(right_idx);
2595 }
2596 }
2597
2598 let output_schema = self.derive_schema_from_columns(&columns);
2599
2600 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2601 left_op,
2602 right_op,
2603 probe_keys,
2604 build_keys,
2605 PhysicalJoinType::Anti,
2606 output_schema,
2607 ));
2608
2609 Ok((operator, columns))
2610 }
2611
2612 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2614 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2617 if matches!(&*unwind.input, LogicalOperator::Empty) {
2618 let literal_list = self.convert_expression(&unwind.expression)?;
2623
2624 let single_row_op: Box<dyn Operator> = Box::new(
2626 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2627 );
2628 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2629 single_row_op,
2630 vec![ProjectExpr::Expression {
2631 expr: literal_list,
2632 variable_columns: HashMap::new(),
2633 }],
2634 vec![LogicalType::Any],
2635 Arc::clone(&self.store),
2636 ));
2637
2638 (project_op, vec!["__list__".to_string()])
2639 } else {
2640 self.plan_operator(&unwind.input)?
2641 };
2642
2643 let list_col_idx = match &unwind.expression {
2649 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2650 LogicalExpression::Property { variable, .. } => {
2651 input_columns.iter().position(|c| c == variable)
2654 }
2655 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2656 None
2658 }
2659 _ => None,
2660 };
2661
2662 let mut columns = input_columns.clone();
2664 columns.push(unwind.variable.clone());
2665
2666 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2668 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2673
2674 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2675 input_op,
2676 col_idx,
2677 unwind.variable.clone(),
2678 output_schema,
2679 ));
2680
2681 Ok((operator, columns))
2682 }
2683
2684 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2686 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2688 Vec::new()
2689 } else {
2690 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2691 cols
2692 };
2693
2694 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2696 .match_properties
2697 .iter()
2698 .filter_map(|(name, expr)| {
2699 if let LogicalExpression::Literal(v) = expr {
2700 Some((name.clone(), v.clone()))
2701 } else {
2702 None }
2704 })
2705 .collect();
2706
2707 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2709 .on_create
2710 .iter()
2711 .filter_map(|(name, expr)| {
2712 if let LogicalExpression::Literal(v) = expr {
2713 Some((name.clone(), v.clone()))
2714 } else {
2715 None
2716 }
2717 })
2718 .collect();
2719
2720 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2722 .on_match
2723 .iter()
2724 .filter_map(|(name, expr)| {
2725 if let LogicalExpression::Literal(v) = expr {
2726 Some((name.clone(), v.clone()))
2727 } else {
2728 None
2729 }
2730 })
2731 .collect();
2732
2733 columns.push(merge.variable.clone());
2735
2736 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2737 Arc::clone(&self.store),
2738 merge.variable.clone(),
2739 merge.labels.clone(),
2740 match_properties,
2741 on_create_properties,
2742 on_match_properties,
2743 ));
2744
2745 Ok((operator, columns))
2746 }
2747
2748 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2750 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2752
2753 let source_column = columns
2755 .iter()
2756 .position(|c| c == &sp.source_var)
2757 .ok_or_else(|| {
2758 Error::Internal(format!(
2759 "Source variable '{}' not found for shortestPath",
2760 sp.source_var
2761 ))
2762 })?;
2763
2764 let target_column = columns
2765 .iter()
2766 .position(|c| c == &sp.target_var)
2767 .ok_or_else(|| {
2768 Error::Internal(format!(
2769 "Target variable '{}' not found for shortestPath",
2770 sp.target_var
2771 ))
2772 })?;
2773
2774 let direction = match sp.direction {
2776 ExpandDirection::Outgoing => Direction::Outgoing,
2777 ExpandDirection::Incoming => Direction::Incoming,
2778 ExpandDirection::Both => Direction::Both,
2779 };
2780
2781 let operator: Box<dyn Operator> = Box::new(
2783 ShortestPathOperator::new(
2784 Arc::clone(&self.store),
2785 input_op,
2786 source_column,
2787 target_column,
2788 sp.edge_type.clone(),
2789 direction,
2790 )
2791 .with_all_paths(sp.all_paths),
2792 );
2793
2794 columns.push(format!("_path_length_{}", sp.path_alias));
2797
2798 Ok((operator, columns))
2799 }
2800
2801 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2803 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2804
2805 let node_column = columns
2807 .iter()
2808 .position(|c| c == &add_label.variable)
2809 .ok_or_else(|| {
2810 Error::Internal(format!(
2811 "Variable '{}' not found for ADD LABEL",
2812 add_label.variable
2813 ))
2814 })?;
2815
2816 let output_schema = vec![LogicalType::Int64];
2818 let output_columns = vec!["labels_added".to_string()];
2819
2820 let operator = Box::new(AddLabelOperator::new(
2821 Arc::clone(&self.store),
2822 input_op,
2823 node_column,
2824 add_label.labels.clone(),
2825 output_schema,
2826 ));
2827
2828 Ok((operator, output_columns))
2829 }
2830
2831 fn plan_remove_label(
2833 &self,
2834 remove_label: &RemoveLabelOp,
2835 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2836 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2837
2838 let node_column = columns
2840 .iter()
2841 .position(|c| c == &remove_label.variable)
2842 .ok_or_else(|| {
2843 Error::Internal(format!(
2844 "Variable '{}' not found for REMOVE LABEL",
2845 remove_label.variable
2846 ))
2847 })?;
2848
2849 let output_schema = vec![LogicalType::Int64];
2851 let output_columns = vec!["labels_removed".to_string()];
2852
2853 let operator = Box::new(RemoveLabelOperator::new(
2854 Arc::clone(&self.store),
2855 input_op,
2856 node_column,
2857 remove_label.labels.clone(),
2858 output_schema,
2859 ));
2860
2861 Ok((operator, output_columns))
2862 }
2863
2864 fn plan_set_property(
2866 &self,
2867 set_prop: &SetPropertyOp,
2868 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2869 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2870
2871 let entity_column = columns
2873 .iter()
2874 .position(|c| c == &set_prop.variable)
2875 .ok_or_else(|| {
2876 Error::Internal(format!(
2877 "Variable '{}' not found for SET",
2878 set_prop.variable
2879 ))
2880 })?;
2881
2882 let properties: Vec<(String, PropertySource)> = set_prop
2884 .properties
2885 .iter()
2886 .map(|(name, expr)| {
2887 let source = self.expression_to_property_source(expr, &columns)?;
2888 Ok((name.clone(), source))
2889 })
2890 .collect::<Result<Vec<_>>>()?;
2891
2892 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2894 let output_columns = columns.clone();
2895
2896 let operator = Box::new(SetPropertyOperator::new_for_node(
2898 Arc::clone(&self.store),
2899 input_op,
2900 entity_column,
2901 properties,
2902 output_schema,
2903 ));
2904
2905 Ok((operator, output_columns))
2906 }
2907
2908 fn expression_to_property_source(
2910 &self,
2911 expr: &LogicalExpression,
2912 columns: &[String],
2913 ) -> Result<PropertySource> {
2914 match expr {
2915 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2916 LogicalExpression::Variable(name) => {
2917 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2918 Error::Internal(format!("Variable '{}' not found for property source", name))
2919 })?;
2920 Ok(PropertySource::Column(col_idx))
2921 }
2922 LogicalExpression::Parameter(name) => {
2923 Ok(PropertySource::Constant(
2926 grafeo_common::types::Value::String(format!("${}", name).into()),
2927 ))
2928 }
2929 _ => Err(Error::Internal(format!(
2930 "Unsupported expression type for property source: {:?}",
2931 expr
2932 ))),
2933 }
2934 }
2935}
2936
2937pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2939 match op {
2940 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2941 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2942 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2943 BinaryOp::Le => Ok(BinaryFilterOp::Le),
2944 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2945 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2946 BinaryOp::And => Ok(BinaryFilterOp::And),
2947 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2948 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2949 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2950 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2951 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2952 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2953 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2954 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2955 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2956 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2957 BinaryOp::In => Ok(BinaryFilterOp::In),
2958 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2959 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2960 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2961 "Binary operator {:?} not yet supported in filters",
2962 op
2963 ))),
2964 }
2965}
2966
2967pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2969 match op {
2970 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2971 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2972 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2973 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2974 }
2975}
2976
2977pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2979 match func {
2980 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2981 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2982 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2983 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2984 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2985 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2986 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2987 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2988 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2989 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2990 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2991 }
2992}
2993
2994pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2998 match expr {
2999 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3000 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3001 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3002 variable: variable.clone(),
3003 property: property.clone(),
3004 }),
3005 LogicalExpression::Binary { left, op, right } => {
3006 let left_expr = convert_filter_expression(left)?;
3007 let right_expr = convert_filter_expression(right)?;
3008 let filter_op = convert_binary_op(*op)?;
3009 Ok(FilterExpression::Binary {
3010 left: Box::new(left_expr),
3011 op: filter_op,
3012 right: Box::new(right_expr),
3013 })
3014 }
3015 LogicalExpression::Unary { op, operand } => {
3016 let operand_expr = convert_filter_expression(operand)?;
3017 let filter_op = convert_unary_op(*op)?;
3018 Ok(FilterExpression::Unary {
3019 op: filter_op,
3020 operand: Box::new(operand_expr),
3021 })
3022 }
3023 LogicalExpression::FunctionCall { name, args, .. } => {
3024 let filter_args: Vec<FilterExpression> = args
3025 .iter()
3026 .map(|a| convert_filter_expression(a))
3027 .collect::<Result<Vec<_>>>()?;
3028 Ok(FilterExpression::FunctionCall {
3029 name: name.clone(),
3030 args: filter_args,
3031 })
3032 }
3033 LogicalExpression::Case {
3034 operand,
3035 when_clauses,
3036 else_clause,
3037 } => {
3038 let filter_operand = operand
3039 .as_ref()
3040 .map(|e| convert_filter_expression(e))
3041 .transpose()?
3042 .map(Box::new);
3043 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3044 .iter()
3045 .map(|(cond, result)| {
3046 Ok((
3047 convert_filter_expression(cond)?,
3048 convert_filter_expression(result)?,
3049 ))
3050 })
3051 .collect::<Result<Vec<_>>>()?;
3052 let filter_else = else_clause
3053 .as_ref()
3054 .map(|e| convert_filter_expression(e))
3055 .transpose()?
3056 .map(Box::new);
3057 Ok(FilterExpression::Case {
3058 operand: filter_operand,
3059 when_clauses: filter_when_clauses,
3060 else_clause: filter_else,
3061 })
3062 }
3063 LogicalExpression::List(items) => {
3064 let filter_items: Vec<FilterExpression> = items
3065 .iter()
3066 .map(|item| convert_filter_expression(item))
3067 .collect::<Result<Vec<_>>>()?;
3068 Ok(FilterExpression::List(filter_items))
3069 }
3070 LogicalExpression::Map(pairs) => {
3071 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3072 .iter()
3073 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3074 .collect::<Result<Vec<_>>>()?;
3075 Ok(FilterExpression::Map(filter_pairs))
3076 }
3077 LogicalExpression::IndexAccess { base, index } => {
3078 let base_expr = convert_filter_expression(base)?;
3079 let index_expr = convert_filter_expression(index)?;
3080 Ok(FilterExpression::IndexAccess {
3081 base: Box::new(base_expr),
3082 index: Box::new(index_expr),
3083 })
3084 }
3085 LogicalExpression::SliceAccess { base, start, end } => {
3086 let base_expr = convert_filter_expression(base)?;
3087 let start_expr = start
3088 .as_ref()
3089 .map(|s| convert_filter_expression(s))
3090 .transpose()?
3091 .map(Box::new);
3092 let end_expr = end
3093 .as_ref()
3094 .map(|e| convert_filter_expression(e))
3095 .transpose()?
3096 .map(Box::new);
3097 Ok(FilterExpression::SliceAccess {
3098 base: Box::new(base_expr),
3099 start: start_expr,
3100 end: end_expr,
3101 })
3102 }
3103 LogicalExpression::Parameter(_) => Err(Error::Internal(
3104 "Parameters not yet supported in filters".to_string(),
3105 )),
3106 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3107 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3108 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3109 LogicalExpression::ListComprehension {
3110 variable,
3111 list_expr,
3112 filter_expr,
3113 map_expr,
3114 } => {
3115 let list = convert_filter_expression(list_expr)?;
3116 let filter = filter_expr
3117 .as_ref()
3118 .map(|f| convert_filter_expression(f))
3119 .transpose()?
3120 .map(Box::new);
3121 let map = convert_filter_expression(map_expr)?;
3122 Ok(FilterExpression::ListComprehension {
3123 variable: variable.clone(),
3124 list_expr: Box::new(list),
3125 filter_expr: filter,
3126 map_expr: Box::new(map),
3127 })
3128 }
3129 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3130 Error::Internal("Subqueries not yet supported in filters".to_string()),
3131 ),
3132 }
3133}
3134
3135fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3137 use grafeo_common::types::Value;
3138 match value {
3139 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3141 Value::Int64(_) => LogicalType::Int64,
3142 Value::Float64(_) => LogicalType::Float64,
3143 Value::String(_) => LogicalType::String,
3144 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3146 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
3149}
3150
3151fn expression_to_string(expr: &LogicalExpression) -> String {
3153 match expr {
3154 LogicalExpression::Variable(name) => name.clone(),
3155 LogicalExpression::Property { variable, property } => {
3156 format!("{variable}.{property}")
3157 }
3158 LogicalExpression::Literal(value) => format!("{value:?}"),
3159 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3160 _ => "expr".to_string(),
3161 }
3162}
3163
3164pub struct PhysicalPlan {
3166 pub operator: Box<dyn Operator>,
3168 pub columns: Vec<String>,
3170 pub adaptive_context: Option<AdaptiveContext>,
3176}
3177
3178impl PhysicalPlan {
3179 #[must_use]
3181 pub fn columns(&self) -> &[String] {
3182 &self.columns
3183 }
3184
3185 pub fn into_operator(self) -> Box<dyn Operator> {
3187 self.operator
3188 }
3189
3190 #[must_use]
3192 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3193 self.adaptive_context.as_ref()
3194 }
3195
3196 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3198 self.adaptive_context.take()
3199 }
3200}
3201
3202#[allow(dead_code)]
3206struct SingleResultOperator {
3207 result: Option<grafeo_core::execution::DataChunk>,
3208}
3209
3210impl SingleResultOperator {
3211 #[allow(dead_code)]
3212 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3213 Self { result }
3214 }
3215}
3216
3217impl Operator for SingleResultOperator {
3218 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3219 Ok(self.result.take())
3220 }
3221
3222 fn reset(&mut self) {
3223 }
3225
3226 fn name(&self) -> &'static str {
3227 "SingleResult"
3228 }
3229}
3230
3231#[cfg(test)]
3232mod tests {
3233 use super::*;
3234 use crate::query::plan::{
3235 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3236 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3237 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3238 SortKey, SortOp,
3239 };
3240 use grafeo_common::types::Value;
3241
3242 fn create_test_store() -> Arc<LpgStore> {
3243 let store = Arc::new(LpgStore::new());
3244 store.create_node(&["Person"]);
3245 store.create_node(&["Person"]);
3246 store.create_node(&["Company"]);
3247 store
3248 }
3249
/// A labelled scan returned as-is yields the single column `n`.
#[test]
fn test_plan_simple_scan() {
    // Scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable("n".to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3274
/// An unlabelled scan returned as-is yields the single column `n`.
#[test]
fn test_plan_scan_without_label() {
    // Scan over all nodes, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable("n".to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3297
/// An aliased return item is reported under its alias, not the variable name.
#[test]
fn test_plan_return_with_alias() {
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    // `n` renamed to `person` in the output.
    let aliased = ReturnItem {
        expression: LogicalExpression::Variable("n".to_owned()),
        alias: Some("person".to_owned()),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased],
        distinct: false,
        input: Box::new(scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["person"]);
}
3320
/// An unaliased property access is reported as `variable.property`.
#[test]
fn test_plan_return_property() {
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    // Return item: n.name
    let name_of_n = ReturnItem {
        expression: LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "name".to_owned(),
        },
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![name_of_n],
        distinct: false,
        input: Box::new(scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n.name"]);
}
3346
/// An aliased literal return item is reported under its alias.
#[test]
fn test_plan_return_literal() {
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    // Return item: 42 AS answer
    let answer = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some("answer".to_owned()),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer],
        distinct: false,
        input: Box::new(scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["answer"]);
}
3369
/// A filter with an equality predicate plans and keeps the column `n`.
#[test]
fn test_plan_filter_equality() {
    // Predicate: n.age = 30
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "age".to_owned(),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3404
/// A filter combining two comparisons with `AND` plans and keeps the column `n`.
#[test]
fn test_plan_filter_compound_and() {
    // Builds `n.age <op> <bound>`.
    let age_cmp = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "age".to_owned(),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };

    // Predicate: n.age > 20 AND n.age < 40
    let in_range = LogicalExpression::Binary {
        left: Box::new(age_cmp(BinaryOp::Gt, 20)),
        op: BinaryOp::And,
        right: Box::new(age_cmp(BinaryOp::Lt, 40)),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3448
/// A filter with a `NOT` predicate plans and keeps the column `n`.
#[test]
fn test_plan_filter_unary_not() {
    // Predicate: NOT n.active
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "active".to_owned(),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3480
/// A filter with an `IS NULL` predicate plans and keeps the column `n`.
#[test]
fn test_plan_filter_is_null() {
    // Predicate: n.email IS NULL
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "email".to_owned(),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3512
/// A filter whose predicate contains a function call plans and keeps the
/// column `n`.
#[test]
fn test_plan_filter_function_call() {
    // Function call: size(n.friends)
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_owned(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "friends".to_owned(),
        }],
        distinct: false,
    };
    // Predicate: size(n.friends) > 0
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    assert_eq!(physical.columns(), &["n"]);
}
3549
/// A single-hop outgoing expansion exposes both endpoint variables.
#[test]
fn test_plan_expand_outgoing() {
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    // One outgoing `KNOWS` hop from `a` to `b`, edge not bound.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let items = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    let columns = physical.columns();
    assert!(columns.iter().any(|c| c == "a"));
    assert!(columns.iter().any(|c| c == "b"));
}
3592
/// An expansion with a bound edge exposes both endpoints and the edge variable.
#[test]
fn test_plan_expand_with_edge_variable() {
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    // One outgoing `KNOWS` hop from `a` to `b`, edge bound to `r`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: Some("r".to_owned()),
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let items = ["a", "r", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    let columns = physical.columns();
    assert!(columns.iter().any(|c| c == "a"));
    assert!(columns.iter().any(|c| c == "r"));
    assert!(columns.iter().any(|c| c == "b"));
}
3637
/// Plans a return of `n` over a limit of 10 applied to an unlabeled node
/// scan and expects `n` to be the only output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let first_ten = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(first_ten),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3665
/// Plans a return of `n` over a skip of 5 applied to an unlabeled node scan
/// and expects `n` to be the only output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let after_five = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(after_five),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3691
/// Plans an ascending sort on the variable `n` over an unlabeled node scan
/// and expects `n` to be the only output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3720
/// Plans a descending sort on the variable `n` over an unlabeled node scan
/// and expects `n` to be the only output column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3749
/// Plans a return of `n` over a distinct operator (no explicit column list)
/// on an unlabeled node scan and expects `n` to be the only output column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(all_nodes),
        columns: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3775
/// Plans an ungrouped count over `n`, aliased `cnt`, wrapped in a return of
/// `cnt`, and checks that `cnt` is among the physical columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(all_nodes),
        having: None,
    });
    let return_cnt = ReturnItem {
        expression: LogicalExpression::Variable(String::from("cnt")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_cnt],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("cnt")));
}
3811
/// Plans a count over `Person` nodes grouped by `n.city` and expects two
/// output columns from the resulting physical plan.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let city_key = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_key],
        aggregates: vec![count_n],
        input: Box::new(people),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
3841
/// Plans an ungrouped sum of `n.value`, aliased `total`, and checks that
/// `total` is among the physical columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(all_nodes),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("total")));
}
3871
/// Plans an ungrouped average of `n.score`, aliased `average`, and checks
/// that `average` is among the physical columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(all_nodes),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("average")));
}
3901
/// Plans ungrouped min and max aggregates over `n.age`, aliased `youngest`
/// and `oldest`, and checks that both aliases are physical columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Builds a non-distinct aggregate over `n.age` with the given alias.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
    };

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(all_nodes),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
3944
/// Plans an inner join between a `Person` scan (`a`) and a `Company` scan
/// (`b`) with one join condition on `a` and `b`, then checks that both
/// variables are physical columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    let a_matches_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_matches_b],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
3988
/// Plans a condition-free cross join of two unlabeled scans (`a` and `b`)
/// and expects the physical plan to expose two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled, input-less node scan bound to `name`.
    let scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_string(),
            label: None,
            input: None,
        })
    };

    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4013
/// Plans a condition-free left join of two unlabeled scans (`a` and `b`)
/// and expects the physical plan to expose two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled, input-less node scan bound to `name`.
    let scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_string(),
            label: None,
            input: None,
        })
    };

    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4037
/// Plans the creation of a `Person` node bound to `n` with a `name`
/// property and checks that `n` is among the physical columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("n")));
}
4059
/// Plans the creation of a `KNOWS` edge bound to `r` between `a` and `b`,
/// fed by a cross join of two unlabeled scans, and checks that `r` is among
/// the physical columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled, input-less node scan bound to `name`.
    let scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_string(),
            label: None,
            input: None,
        })
    };

    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("r")));
}
4091
/// Plans a non-detaching delete of `n` over an unlabeled node scan and
/// checks that the physical plan exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(all_nodes),
    };
    let plan = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().contains(&String::from("deleted_count")));
}
4111
/// Planning a logical plan whose root is `LogicalOperator::Empty` must
/// return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
4123
/// Returning a variable (`missing`) that the input scan does not bind must
/// make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan only binds `n`; the return item refers to `missing`.
    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(all_nodes),
    }));

    let outcome = planner.plan(&plan);
    assert!(outcome.is_err());
}
4146
/// Every listed comparison, logical and arithmetic binary operator must
/// convert successfully via `convert_binary_op`.
#[test]
fn test_convert_binary_ops() {
    let supported = [
        // Comparisons
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Logical connectives
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
4164
/// Every listed unary operator must convert successfully via
/// `convert_unary_op`.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
4172
/// Each logical aggregate function checked here must map to the physical
/// variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
4196
/// A planner built with `Planner::new` reports no transaction id and no
/// transaction manager; `viewing_epoch` is only called, not checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    let tx_id = planner.tx_id();
    assert!(tx_id.is_none());

    let tx_manager = planner.tx_manager();
    assert!(tx_manager.is_none());

    // Only exercises the accessor; the returned epoch is not asserted on.
    let _viewing_epoch = planner.viewing_epoch();
}
4206
/// Plans a bare unlabeled scan of `n`, checks the reported columns, and
/// then consumes the physical plan through `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the extracted operator itself is not inspected.
    let _ = physical.into_operator();
}
4224
/// `plan_adaptive` on a return over a `Person` scan yields the column `n`
/// and attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(people),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
4251
/// `plan_adaptive` on a return over a filter (`n.age > 30`) over an
/// unlabeled scan attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let age = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let thirty = LogicalExpression::Literal(Value::Int64(30));
    let older_than_thirty = LogicalExpression::Binary {
        left: Box::new(age),
        op: BinaryOp::Gt,
        right: Box::new(thirty),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_thirty,
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4284
/// With factorized execution disabled, `plan_adaptive` on a return over a
/// single-hop outgoing `KNOWS` expansion attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4323
/// `plan_adaptive` on a return over a condition-free cross join of two
/// unlabeled scans attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    // Builds an unlabeled, input-less node scan bound to `name`.
    let scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_string(),
            label: None,
            input: None,
        })
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4360
/// `plan_adaptive` on an ungrouped count over an unlabeled scan attaches an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(all_nodes),
        having: None,
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4386
/// `plan_adaptive` on a return over a distinct operator over an unlabeled
/// scan attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(all_nodes),
        columns: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4411
/// `plan_adaptive` on a return over a limit of 10 over an unlabeled scan
/// attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let first_ten = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(first_ten),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4436
/// `plan_adaptive` on a return over a skip of 5 over an unlabeled scan
/// attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let after_five = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(after_five),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4461
/// `plan_adaptive` on a return over an ascending sort on `n` over an
/// unlabeled scan attaches an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(all_nodes),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4489
/// `plan_adaptive` on a return over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Builds an input-less scan of `n` restricted to the given label.
    let labelled_scan = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![labelled_scan("Person"), labelled_scan("Company")],
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&plan).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4520
/// Plans an outgoing `KNOWS` expansion of 1 to 3 hops from an unlabeled
/// scan and checks that both endpoint variables are physical columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // min_hops = 1 with max_hops = 3 makes this a multi-hop expansion.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4562
/// Plans a 1 to 3 hop outgoing `KNOWS` expansion that carries the path
/// alias `p` and checks that both endpoint variables are physical columns.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: Some(String::from("p")),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Only the endpoints are asserted; the alias `p` is not checked here.
    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4603
/// With factorized execution disabled, plans a single-hop incoming `KNOWS`
/// expansion and checks that both endpoint variables are physical columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4643
/// With factorized execution disabled, plans a single-hop `KNOWS` expansion
/// in both directions and checks that both endpoint variables are columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4683
/// A planner built with `Planner::with_context` reports the transaction id,
/// transaction manager and viewing epoch it was constructed with.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    // Begin the transaction first, then read the manager's current epoch.
    let active_tx = manager.begin();
    let current_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        current_epoch,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), current_epoch);
}
4706
/// With factorized execution disabled, plans a two-step chain of untyped
/// outgoing expansions (`a` to `b`, then `b` to `c`) and checks that `a`
/// and `c` are physical columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds a return item that projects a bare variable without an alias.
    let project = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // First hop: a -> b.
    let first_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    // Second hop: b -> c, fed by the first hop.
    let second_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("b"),
        to_variable: String::from("c"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(first_hop),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![project("a"), project("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&plan).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
4756
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    // Assemble the plan bottom-up: scan(n) -> sort by n.name ascending -> return n.
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    // Sorting on a property expression must plan successfully and keep the
    // returned variable among the physical plan's columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_string()));
}
4791
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased return item that projects a plain variable.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Inner scan over `Person` nodes, bound to `a`.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    // Outer scan over `Company` nodes, bound to `b`, that takes the inner
    // scan as its input operator.
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: Some(Box::new(person_scan)),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    // Both scan variables must appear among the physical plan's columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
4827}