1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
245 let id = format!("expand_{}", expand.to_variable);
246 ctx.set_estimate(&id, estimate);
247
248 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
249 }
250 LogicalOperator::Join(join) => {
251 let left_est = self.estimate_cardinality(&join.left);
253 let right_est = self.estimate_cardinality(&join.right);
254 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
256 ctx.set_estimate(&id, estimate);
257
258 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
259 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
260 }
261 LogicalOperator::Aggregate(agg) => {
262 let input_estimate = self.estimate_cardinality(&agg.input);
264 let estimate = if agg.group_by.is_empty() {
265 1.0 } else {
267 (input_estimate * 0.1).max(1.0) };
269 let id = format!("aggregate_{depth}");
270 ctx.set_estimate(&id, estimate);
271
272 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
273 }
274 LogicalOperator::Distinct(distinct) => {
275 let input_estimate = self.estimate_cardinality(&distinct.input);
276 let estimate = (input_estimate * 0.5).max(1.0);
277 let id = format!("distinct_{depth}");
278 ctx.set_estimate(&id, estimate);
279
280 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
281 }
282 LogicalOperator::Return(ret) => {
283 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
284 }
285 LogicalOperator::Limit(limit) => {
286 let input_estimate = self.estimate_cardinality(&limit.input);
287 let estimate = (input_estimate).min(limit.count as f64);
288 let id = format!("limit_{depth}");
289 ctx.set_estimate(&id, estimate);
290
291 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
292 }
293 LogicalOperator::Skip(skip) => {
294 let input_estimate = self.estimate_cardinality(&skip.input);
295 let estimate = (input_estimate - skip.count as f64).max(0.0);
296 let id = format!("skip_{depth}");
297 ctx.set_estimate(&id, estimate);
298
299 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
300 }
301 LogicalOperator::Sort(sort) => {
302 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
304 }
305 LogicalOperator::Union(union) => {
306 let estimate: f64 = union
307 .inputs
308 .iter()
309 .map(|input| self.estimate_cardinality(input))
310 .sum();
311 let id = format!("union_{depth}");
312 ctx.set_estimate(&id, estimate);
313
314 for input in &union.inputs {
315 self.collect_cardinality_estimates(input, ctx, depth + 1);
316 }
317 }
318 _ => {
319 }
321 }
322 }
323
324 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
326 match op {
327 LogicalOperator::NodeScan(scan) => {
328 if let Some(label) = &scan.label {
329 self.store.nodes_by_label(label).len() as f64
330 } else {
331 self.store.node_count() as f64
332 }
333 }
334 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
335 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
336 LogicalOperator::Join(join) => {
337 let left = self.estimate_cardinality(&join.left);
338 let right = self.estimate_cardinality(&join.right);
339 (left * right).sqrt()
340 }
341 LogicalOperator::Aggregate(agg) => {
342 if agg.group_by.is_empty() {
343 1.0
344 } else {
345 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
346 }
347 }
348 LogicalOperator::Distinct(distinct) => {
349 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
350 }
351 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
352 LogicalOperator::Limit(limit) => self
353 .estimate_cardinality(&limit.input)
354 .min(limit.count as f64),
355 LogicalOperator::Skip(skip) => {
356 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
357 }
358 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
359 LogicalOperator::Union(union) => union
360 .inputs
361 .iter()
362 .map(|input| self.estimate_cardinality(input))
363 .sum(),
364 _ => 1000.0, }
366 }
367
368 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
370 match op {
371 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
372 LogicalOperator::Expand(expand) => {
373 if self.factorized_execution {
375 let (chain_len, _base) = Self::count_expand_chain(op);
376 if chain_len >= 2 {
377 return self.plan_expand_chain(op);
379 }
380 }
381 self.plan_expand(expand)
382 }
383 LogicalOperator::Return(ret) => self.plan_return(ret),
384 LogicalOperator::Filter(filter) => self.plan_filter(filter),
385 LogicalOperator::Project(project) => self.plan_project(project),
386 LogicalOperator::Limit(limit) => self.plan_limit(limit),
387 LogicalOperator::Skip(skip) => self.plan_skip(skip),
388 LogicalOperator::Sort(sort) => self.plan_sort(sort),
389 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
390 LogicalOperator::Join(join) => self.plan_join(join),
391 LogicalOperator::Union(union) => self.plan_union(union),
392 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
393 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
394 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
395 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
396 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
397 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
398 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
399 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
400 LogicalOperator::Merge(merge) => self.plan_merge(merge),
401 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
402 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
403 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
404 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
405 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
406 LogicalOperator::VectorScan(_) => Err(Error::Internal(
407 "VectorScan requires vector-index feature".to_string(),
408 )),
409 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
410 "VectorJoin requires vector-index feature".to_string(),
411 )),
412 _ => Err(Error::Internal(format!(
413 "Unsupported operator: {:?}",
414 std::mem::discriminant(op)
415 ))),
416 }
417 }
418
419 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
421 let scan_op = if let Some(label) = &scan.label {
422 ScanOperator::with_label(Arc::clone(&self.store), label)
423 } else {
424 ScanOperator::new(Arc::clone(&self.store))
425 };
426
427 let scan_operator: Box<dyn Operator> =
429 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
430
431 if let Some(input) = &scan.input {
433 let (input_op, mut input_columns) = self.plan_operator(input)?;
434
435 let mut output_schema: Vec<LogicalType> =
437 input_columns.iter().map(|_| LogicalType::Any).collect();
438 output_schema.push(LogicalType::Node);
439
440 input_columns.push(scan.variable.clone());
442
443 let join_op = Box::new(NestedLoopJoinOperator::new(
445 input_op,
446 scan_operator,
447 None, PhysicalJoinType::Cross,
449 output_schema,
450 ));
451
452 Ok((join_op, input_columns))
453 } else {
454 let columns = vec![scan.variable.clone()];
455 Ok((scan_operator, columns))
456 }
457 }
458
459 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
461 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
463
464 let source_column = input_columns
466 .iter()
467 .position(|c| c == &expand.from_variable)
468 .ok_or_else(|| {
469 Error::Internal(format!(
470 "Source variable '{}' not found in input columns",
471 expand.from_variable
472 ))
473 })?;
474
475 let direction = match expand.direction {
477 ExpandDirection::Outgoing => Direction::Outgoing,
478 ExpandDirection::Incoming => Direction::Incoming,
479 ExpandDirection::Both => Direction::Both,
480 };
481
482 let is_variable_length =
484 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
485
486 let operator: Box<dyn Operator> = if is_variable_length {
487 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
490 Arc::clone(&self.store),
491 input_op,
492 source_column,
493 direction,
494 expand.edge_type.clone(),
495 expand.min_hops,
496 max_hops,
497 )
498 .with_tx_context(self.viewing_epoch, self.tx_id);
499
500 if expand.path_alias.is_some() {
502 expand_op = expand_op.with_path_length_output();
503 }
504
505 Box::new(expand_op)
506 } else {
507 let expand_op = ExpandOperator::new(
509 Arc::clone(&self.store),
510 input_op,
511 source_column,
512 direction,
513 expand.edge_type.clone(),
514 )
515 .with_tx_context(self.viewing_epoch, self.tx_id);
516 Box::new(expand_op)
517 };
518
519 let mut columns = input_columns;
522
523 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
525 let count = self.anon_edge_counter.get();
526 self.anon_edge_counter.set(count + 1);
527 format!("_anon_edge_{}", count)
528 });
529 columns.push(edge_col_name);
530
531 columns.push(expand.to_variable.clone());
532
533 if let Some(ref path_alias) = expand.path_alias {
535 columns.push(format!("_path_length_{}", path_alias));
536 }
537
538 Ok((operator, columns))
539 }
540
541 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
549 let expands = Self::collect_expand_chain(op);
550 if expands.is_empty() {
551 return Err(Error::Internal("Empty expand chain".to_string()));
552 }
553
554 let first_expand = expands[0];
556 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
557
558 let mut columns = base_columns.clone();
559 let mut steps = Vec::new();
560
561 let mut is_first = true;
566
567 for expand in &expands {
568 let source_column = if is_first {
570 base_columns
572 .iter()
573 .position(|c| c == &expand.from_variable)
574 .ok_or_else(|| {
575 Error::Internal(format!(
576 "Source variable '{}' not found in base columns",
577 expand.from_variable
578 ))
579 })?
580 } else {
581 1
584 };
585
586 let direction = match expand.direction {
588 ExpandDirection::Outgoing => Direction::Outgoing,
589 ExpandDirection::Incoming => Direction::Incoming,
590 ExpandDirection::Both => Direction::Both,
591 };
592
593 steps.push(ExpandStep {
595 source_column,
596 direction,
597 edge_type: expand.edge_type.clone(),
598 });
599
600 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
602 let count = self.anon_edge_counter.get();
603 self.anon_edge_counter.set(count + 1);
604 format!("_anon_edge_{}", count)
605 });
606 columns.push(edge_col_name);
607 columns.push(expand.to_variable.clone());
608
609 is_first = false;
610 }
611
612 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
614
615 if let Some(tx_id) = self.tx_id {
616 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
617 } else {
618 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
619 }
620
621 Ok((Box::new(lazy_op), columns))
622 }
623
624 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
626 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
628
629 let variable_columns: HashMap<String, usize> = input_columns
631 .iter()
632 .enumerate()
633 .map(|(i, name)| (name.clone(), i))
634 .collect();
635
636 let columns: Vec<String> = ret
638 .items
639 .iter()
640 .map(|item| {
641 item.alias.clone().unwrap_or_else(|| {
642 expression_to_string(&item.expression)
644 })
645 })
646 .collect();
647
648 let needs_project = ret
650 .items
651 .iter()
652 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
653
654 if needs_project {
655 let mut projections = Vec::with_capacity(ret.items.len());
657 let mut output_types = Vec::with_capacity(ret.items.len());
658
659 for item in &ret.items {
660 match &item.expression {
661 LogicalExpression::Variable(name) => {
662 let col_idx = *variable_columns.get(name).ok_or_else(|| {
663 Error::Internal(format!("Variable '{}' not found in input", name))
664 })?;
665 projections.push(ProjectExpr::Column(col_idx));
666 output_types.push(LogicalType::Node);
668 }
669 LogicalExpression::Property { variable, property } => {
670 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
671 Error::Internal(format!("Variable '{}' not found in input", variable))
672 })?;
673 projections.push(ProjectExpr::PropertyAccess {
674 column: col_idx,
675 property: property.clone(),
676 });
677 output_types.push(LogicalType::Any);
679 }
680 LogicalExpression::Literal(value) => {
681 projections.push(ProjectExpr::Constant(value.clone()));
682 output_types.push(value_to_logical_type(value));
683 }
684 LogicalExpression::FunctionCall { name, args, .. } => {
685 match name.to_lowercase().as_str() {
687 "type" => {
688 if args.len() != 1 {
690 return Err(Error::Internal(
691 "type() requires exactly one argument".to_string(),
692 ));
693 }
694 if let LogicalExpression::Variable(var_name) = &args[0] {
695 let col_idx =
696 *variable_columns.get(var_name).ok_or_else(|| {
697 Error::Internal(format!(
698 "Variable '{}' not found in input",
699 var_name
700 ))
701 })?;
702 projections.push(ProjectExpr::EdgeType { column: col_idx });
703 output_types.push(LogicalType::String);
704 } else {
705 return Err(Error::Internal(
706 "type() argument must be a variable".to_string(),
707 ));
708 }
709 }
710 "length" => {
711 if args.len() != 1 {
714 return Err(Error::Internal(
715 "length() requires exactly one argument".to_string(),
716 ));
717 }
718 if let LogicalExpression::Variable(var_name) = &args[0] {
719 let col_idx =
720 *variable_columns.get(var_name).ok_or_else(|| {
721 Error::Internal(format!(
722 "Variable '{}' not found in input",
723 var_name
724 ))
725 })?;
726 projections.push(ProjectExpr::Column(col_idx));
728 output_types.push(LogicalType::Int64);
729 } else {
730 return Err(Error::Internal(
731 "length() argument must be a variable".to_string(),
732 ));
733 }
734 }
735 _ => {
737 let filter_expr = self.convert_expression(&item.expression)?;
738 projections.push(ProjectExpr::Expression {
739 expr: filter_expr,
740 variable_columns: variable_columns.clone(),
741 });
742 output_types.push(LogicalType::Any);
743 }
744 }
745 }
746 LogicalExpression::Case { .. } => {
747 let filter_expr = self.convert_expression(&item.expression)?;
749 projections.push(ProjectExpr::Expression {
750 expr: filter_expr,
751 variable_columns: variable_columns.clone(),
752 });
753 output_types.push(LogicalType::Any);
755 }
756 _ => {
757 return Err(Error::Internal(format!(
758 "Unsupported RETURN expression: {:?}",
759 item.expression
760 )));
761 }
762 }
763 }
764
765 let operator = Box::new(ProjectOperator::with_store(
766 input_op,
767 projections,
768 output_types,
769 Arc::clone(&self.store),
770 ));
771
772 Ok((operator, columns))
773 } else {
774 let mut projections = Vec::with_capacity(ret.items.len());
777 let mut output_types = Vec::with_capacity(ret.items.len());
778
779 for item in &ret.items {
780 if let LogicalExpression::Variable(name) = &item.expression {
781 let col_idx = *variable_columns.get(name).ok_or_else(|| {
782 Error::Internal(format!("Variable '{}' not found in input", name))
783 })?;
784 projections.push(ProjectExpr::Column(col_idx));
785 output_types.push(LogicalType::Node);
786 }
787 }
788
789 if projections.len() == input_columns.len()
791 && projections
792 .iter()
793 .enumerate()
794 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
795 {
796 Ok((input_op, columns))
798 } else {
799 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
800 Ok((operator, columns))
801 }
802 }
803 }
804
805 fn plan_project(
807 &self,
808 project: &crate::query::plan::ProjectOp,
809 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
810 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
812 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
813 let single_row_op: Box<dyn Operator> = Box::new(
815 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
816 );
817 (single_row_op, Vec::new())
818 } else {
819 self.plan_operator(&project.input)?
820 };
821
822 let variable_columns: HashMap<String, usize> = input_columns
824 .iter()
825 .enumerate()
826 .map(|(i, name)| (name.clone(), i))
827 .collect();
828
829 let mut projections = Vec::with_capacity(project.projections.len());
831 let mut output_types = Vec::with_capacity(project.projections.len());
832 let mut output_columns = Vec::with_capacity(project.projections.len());
833
834 for projection in &project.projections {
835 let col_name = projection
837 .alias
838 .clone()
839 .unwrap_or_else(|| expression_to_string(&projection.expression));
840 output_columns.push(col_name);
841
842 match &projection.expression {
843 LogicalExpression::Variable(name) => {
844 let col_idx = *variable_columns.get(name).ok_or_else(|| {
845 Error::Internal(format!("Variable '{}' not found in input", name))
846 })?;
847 projections.push(ProjectExpr::Column(col_idx));
848 output_types.push(LogicalType::Node);
849 }
850 LogicalExpression::Property { variable, property } => {
851 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
852 Error::Internal(format!("Variable '{}' not found in input", variable))
853 })?;
854 projections.push(ProjectExpr::PropertyAccess {
855 column: col_idx,
856 property: property.clone(),
857 });
858 output_types.push(LogicalType::Any);
859 }
860 LogicalExpression::Literal(value) => {
861 projections.push(ProjectExpr::Constant(value.clone()));
862 output_types.push(value_to_logical_type(value));
863 }
864 _ => {
865 let filter_expr = self.convert_expression(&projection.expression)?;
867 projections.push(ProjectExpr::Expression {
868 expr: filter_expr,
869 variable_columns: variable_columns.clone(),
870 });
871 output_types.push(LogicalType::Any);
872 }
873 }
874 }
875
876 let operator = Box::new(ProjectOperator::with_store(
877 input_op,
878 projections,
879 output_types,
880 Arc::clone(&self.store),
881 ));
882
883 Ok((operator, output_columns))
884 }
885
886 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
892 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
895 let (_, columns) = self.plan_operator(&filter.input)?;
897 let schema = self.derive_schema_from_columns(&columns);
898 let empty_op = Box::new(EmptyOperator::new(schema));
899 return Ok((empty_op, columns));
900 }
901
902 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
904 return Ok(result);
905 }
906
907 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
909 return Ok(result);
910 }
911
912 let (input_op, columns) = self.plan_operator(&filter.input)?;
914
915 let variable_columns: HashMap<String, usize> = columns
917 .iter()
918 .enumerate()
919 .map(|(i, name)| (name.clone(), i))
920 .collect();
921
922 let filter_expr = self.convert_expression(&filter.predicate)?;
924
925 let predicate =
927 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
928
929 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
931
932 Ok((operator, columns))
933 }
934
935 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
942 use grafeo_core::graph::lpg::CompareOp;
943
944 match predicate {
945 LogicalExpression::Binary { left, op, right } => {
946 match op {
948 BinaryOp::And => {
949 let left_result = self.check_zone_map_for_predicate(left);
950 let right_result = self.check_zone_map_for_predicate(right);
951
952 return match (left_result, right_result) {
953 (Some(false), _) | (_, Some(false)) => Some(false),
955 (Some(true), Some(true)) => Some(true),
957 _ => None,
959 };
960 }
961 BinaryOp::Or => {
962 let left_result = self.check_zone_map_for_predicate(left);
963 let right_result = self.check_zone_map_for_predicate(right);
964
965 return match (left_result, right_result) {
966 (Some(false), Some(false)) => Some(false),
968 (Some(true), _) | (_, Some(true)) => Some(true),
970 _ => None,
972 };
973 }
974 _ => {}
975 }
976
977 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
979 (
980 LogicalExpression::Property { property, .. },
981 LogicalExpression::Literal(val),
982 ) => {
983 let cmp = match op {
984 BinaryOp::Eq => CompareOp::Eq,
985 BinaryOp::Ne => CompareOp::Ne,
986 BinaryOp::Lt => CompareOp::Lt,
987 BinaryOp::Le => CompareOp::Le,
988 BinaryOp::Gt => CompareOp::Gt,
989 BinaryOp::Ge => CompareOp::Ge,
990 _ => return None,
991 };
992 (property.clone(), cmp, val.clone())
993 }
994 (
995 LogicalExpression::Literal(val),
996 LogicalExpression::Property { property, .. },
997 ) => {
998 let cmp = match op {
1000 BinaryOp::Eq => CompareOp::Eq,
1001 BinaryOp::Ne => CompareOp::Ne,
1002 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1004 BinaryOp::Gt => CompareOp::Lt,
1005 BinaryOp::Ge => CompareOp::Le,
1006 _ => return None,
1007 };
1008 (property.clone(), cmp, val.clone())
1009 }
1010 _ => return None,
1011 };
1012
1013 let might_match =
1015 self.store
1016 .node_property_might_match(&property.into(), compare_op, &value);
1017
1018 Some(might_match)
1019 }
1020
1021 _ => None,
1022 }
1023 }
1024
1025 fn try_plan_filter_with_property_index(
1034 &self,
1035 filter: &FilterOp,
1036 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1037 let (scan_variable, scan_label) = match filter.input.as_ref() {
1039 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1040 (scan.variable.clone(), scan.label.clone())
1041 }
1042 _ => return Ok(None),
1043 };
1044
1045 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1048
1049 if conditions.is_empty() {
1050 return Ok(None);
1051 }
1052
1053 let has_indexed_condition = conditions
1055 .iter()
1056 .any(|(prop, _)| self.store.has_property_index(prop));
1057
1058 if !has_indexed_condition {
1059 return Ok(None);
1060 }
1061
1062 let conditions_ref: Vec<(&str, Value)> = conditions
1064 .iter()
1065 .map(|(p, v)| (p.as_str(), v.clone()))
1066 .collect();
1067 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1068
1069 if let Some(label) = &scan_label {
1071 let label_nodes: std::collections::HashSet<_> =
1072 self.store.nodes_by_label(label).into_iter().collect();
1073 matching_nodes.retain(|n| label_nodes.contains(n));
1074 }
1075
1076 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1078 let columns = vec![scan_variable];
1079
1080 Ok(Some((node_list_op, columns)))
1081 }
1082
1083 fn extract_equality_conditions(
1089 &self,
1090 predicate: &LogicalExpression,
1091 target_variable: &str,
1092 ) -> Vec<(String, Value)> {
1093 let mut conditions = Vec::new();
1094 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1095 conditions
1096 }
1097
1098 fn collect_equality_conditions(
1100 &self,
1101 expr: &LogicalExpression,
1102 target_variable: &str,
1103 conditions: &mut Vec<(String, Value)>,
1104 ) {
1105 match expr {
1106 LogicalExpression::Binary {
1108 left,
1109 op: BinaryOp::And,
1110 right,
1111 } => {
1112 self.collect_equality_conditions(left, target_variable, conditions);
1113 self.collect_equality_conditions(right, target_variable, conditions);
1114 }
1115
1116 LogicalExpression::Binary {
1118 left,
1119 op: BinaryOp::Eq,
1120 right,
1121 } => {
1122 if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1123 if var == target_variable {
1124 conditions.push((prop, val));
1125 }
1126 }
1127 }
1128
1129 _ => {}
1130 }
1131 }
1132
1133 fn extract_property_equality(
1135 &self,
1136 left: &LogicalExpression,
1137 right: &LogicalExpression,
1138 ) -> Option<(String, String, Value)> {
1139 match (left, right) {
1140 (
1141 LogicalExpression::Property { variable, property },
1142 LogicalExpression::Literal(val),
1143 ) => Some((variable.clone(), property.clone(), val.clone())),
1144 (
1145 LogicalExpression::Literal(val),
1146 LogicalExpression::Property { variable, property },
1147 ) => Some((variable.clone(), property.clone(), val.clone())),
1148 _ => None,
1149 }
1150 }
1151
1152 fn try_plan_filter_with_range_index(
1165 &self,
1166 filter: &FilterOp,
1167 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1168 let (scan_variable, scan_label) = match filter.input.as_ref() {
1170 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1171 (scan.variable.clone(), scan.label.clone())
1172 }
1173 _ => return Ok(None),
1174 };
1175
1176 if let Some((variable, property, min, max, min_inc, max_inc)) =
1178 self.extract_between_predicate(&filter.predicate)
1179 {
1180 if variable == scan_variable {
1181 return self.plan_range_filter(
1182 &scan_variable,
1183 &scan_label,
1184 &property,
1185 RangeBounds {
1186 min: Some(&min),
1187 max: Some(&max),
1188 min_inclusive: min_inc,
1189 max_inclusive: max_inc,
1190 },
1191 );
1192 }
1193 }
1194
1195 if let Some((variable, property, op, value)) =
1197 self.extract_range_predicate(&filter.predicate)
1198 {
1199 if variable == scan_variable {
1200 let (min, max, min_inc, max_inc) = match op {
1201 BinaryOp::Lt => (None, Some(value), false, false),
1202 BinaryOp::Le => (None, Some(value), false, true),
1203 BinaryOp::Gt => (Some(value), None, false, false),
1204 BinaryOp::Ge => (Some(value), None, true, false),
1205 _ => return Ok(None),
1206 };
1207 return self.plan_range_filter(
1208 &scan_variable,
1209 &scan_label,
1210 &property,
1211 RangeBounds {
1212 min: min.as_ref(),
1213 max: max.as_ref(),
1214 min_inclusive: min_inc,
1215 max_inclusive: max_inc,
1216 },
1217 );
1218 }
1219 }
1220
1221 Ok(None)
1222 }
1223
1224 fn plan_range_filter(
1226 &self,
1227 scan_variable: &str,
1228 scan_label: &Option<String>,
1229 property: &str,
1230 bounds: RangeBounds<'_>,
1231 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1232 let mut matching_nodes = self.store.find_nodes_in_range(
1234 property,
1235 bounds.min,
1236 bounds.max,
1237 bounds.min_inclusive,
1238 bounds.max_inclusive,
1239 );
1240
1241 if let Some(label) = scan_label {
1243 let label_nodes: std::collections::HashSet<_> =
1244 self.store.nodes_by_label(label).into_iter().collect();
1245 matching_nodes.retain(|n| label_nodes.contains(n));
1246 }
1247
1248 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1250 let columns = vec![scan_variable.to_string()];
1251
1252 Ok(Some((node_list_op, columns)))
1253 }
1254
1255 fn extract_range_predicate(
1259 &self,
1260 predicate: &LogicalExpression,
1261 ) -> Option<(String, String, BinaryOp, Value)> {
1262 match predicate {
1263 LogicalExpression::Binary { left, op, right } => {
1264 match op {
1265 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1266 if let (
1268 LogicalExpression::Property { variable, property },
1269 LogicalExpression::Literal(val),
1270 ) = (left.as_ref(), right.as_ref())
1271 {
1272 return Some((variable.clone(), property.clone(), *op, val.clone()));
1273 }
1274
1275 if let (
1277 LogicalExpression::Literal(val),
1278 LogicalExpression::Property { variable, property },
1279 ) = (left.as_ref(), right.as_ref())
1280 {
1281 let flipped_op = match op {
1282 BinaryOp::Lt => BinaryOp::Gt,
1283 BinaryOp::Le => BinaryOp::Ge,
1284 BinaryOp::Gt => BinaryOp::Lt,
1285 BinaryOp::Ge => BinaryOp::Le,
1286 _ => return None,
1287 };
1288 return Some((
1289 variable.clone(),
1290 property.clone(),
1291 flipped_op,
1292 val.clone(),
1293 ));
1294 }
1295 }
1296 _ => {}
1297 }
1298 }
1299 _ => {}
1300 }
1301 None
1302 }
1303
1304 fn extract_between_predicate(
1312 &self,
1313 predicate: &LogicalExpression,
1314 ) -> Option<(String, String, Value, Value, bool, bool)> {
1315 let (left, right) = match predicate {
1317 LogicalExpression::Binary {
1318 left,
1319 op: BinaryOp::And,
1320 right,
1321 } => (left.as_ref(), right.as_ref()),
1322 _ => return None,
1323 };
1324
1325 let left_range = self.extract_range_predicate(left);
1327 let right_range = self.extract_range_predicate(right);
1328
1329 let (left_var, left_prop, left_op, left_val) = left_range?;
1330 let (right_var, right_prop, right_op, right_val) = right_range?;
1331
1332 if left_var != right_var || left_prop != right_prop {
1334 return None;
1335 }
1336
1337 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1339 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1341 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1343 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1345 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1347 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1349 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1351 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1353 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1355 _ => return None,
1356 };
1357
1358 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1359 }
1360
1361 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1363 let (input_op, columns) = self.plan_operator(&limit.input)?;
1364 let output_schema = self.derive_schema_from_columns(&columns);
1365 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1366 Ok((operator, columns))
1367 }
1368
1369 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1371 let (input_op, columns) = self.plan_operator(&skip.input)?;
1372 let output_schema = self.derive_schema_from_columns(&columns);
1373 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1374 Ok((operator, columns))
1375 }
1376
1377 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1379 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1380
1381 let mut variable_columns: HashMap<String, usize> = input_columns
1383 .iter()
1384 .enumerate()
1385 .map(|(i, name)| (name.clone(), i))
1386 .collect();
1387
1388 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1390 let mut next_col_idx = input_columns.len();
1391
1392 for key in &sort.keys {
1393 if let LogicalExpression::Property { variable, property } = &key.expression {
1394 let col_name = format!("{}_{}", variable, property);
1395 if !variable_columns.contains_key(&col_name) {
1396 property_projections.push((
1397 variable.clone(),
1398 property.clone(),
1399 col_name.clone(),
1400 ));
1401 variable_columns.insert(col_name, next_col_idx);
1402 next_col_idx += 1;
1403 }
1404 }
1405 }
1406
1407 let mut output_columns = input_columns.clone();
1409
1410 if !property_projections.is_empty() {
1412 let mut projections = Vec::new();
1413 let mut output_types = Vec::new();
1414
1415 for (i, _) in input_columns.iter().enumerate() {
1418 projections.push(ProjectExpr::Column(i));
1419 output_types.push(LogicalType::Node);
1420 }
1421
1422 for (variable, property, col_name) in &property_projections {
1424 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1425 Error::Internal(format!(
1426 "Variable '{}' not found for ORDER BY property projection",
1427 variable
1428 ))
1429 })?;
1430 projections.push(ProjectExpr::PropertyAccess {
1431 column: source_col,
1432 property: property.clone(),
1433 });
1434 output_types.push(LogicalType::Any);
1435 output_columns.push(col_name.clone());
1436 }
1437
1438 input_op = Box::new(ProjectOperator::with_store(
1439 input_op,
1440 projections,
1441 output_types,
1442 Arc::clone(&self.store),
1443 ));
1444 }
1445
1446 let physical_keys: Vec<PhysicalSortKey> = sort
1448 .keys
1449 .iter()
1450 .map(|key| {
1451 let col_idx = self
1452 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1453 Ok(PhysicalSortKey {
1454 column: col_idx,
1455 direction: match key.order {
1456 SortOrder::Ascending => SortDirection::Ascending,
1457 SortOrder::Descending => SortDirection::Descending,
1458 },
1459 null_order: NullOrder::NullsLast,
1460 })
1461 })
1462 .collect::<Result<Vec<_>>>()?;
1463
1464 let output_schema = self.derive_schema_from_columns(&output_columns);
1465 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1466 Ok((operator, output_columns))
1467 }
1468
1469 fn resolve_sort_expression_with_properties(
1471 &self,
1472 expr: &LogicalExpression,
1473 variable_columns: &HashMap<String, usize>,
1474 ) -> Result<usize> {
1475 match expr {
1476 LogicalExpression::Variable(name) => {
1477 variable_columns.get(name).copied().ok_or_else(|| {
1478 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1479 })
1480 }
1481 LogicalExpression::Property { variable, property } => {
1482 let col_name = format!("{}_{}", variable, property);
1484 variable_columns.get(&col_name).copied().ok_or_else(|| {
1485 Error::Internal(format!(
1486 "Property column '{}' not found for ORDER BY (from {}.{})",
1487 col_name, variable, property
1488 ))
1489 })
1490 }
1491 _ => Err(Error::Internal(format!(
1492 "Unsupported ORDER BY expression: {:?}",
1493 expr
1494 ))),
1495 }
1496 }
1497
1498 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1500 columns.iter().map(|_| LogicalType::Any).collect()
1501 }
1502
1503 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1505 if self.factorized_execution
1512 && agg.group_by.is_empty()
1513 && Self::count_expand_chain(&agg.input).0 >= 2
1514 && self.is_simple_aggregate(agg)
1515 {
1516 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1517 return Ok((op, cols));
1518 }
1519 }
1521
1522 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1523
1524 let mut variable_columns: HashMap<String, usize> = input_columns
1526 .iter()
1527 .enumerate()
1528 .map(|(i, name)| (name.clone(), i))
1529 .collect();
1530
1531 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1534
1535 for expr in &agg.group_by {
1537 if let LogicalExpression::Property { variable, property } = expr {
1538 let col_name = format!("{}_{}", variable, property);
1539 if !variable_columns.contains_key(&col_name) {
1540 property_projections.push((
1541 variable.clone(),
1542 property.clone(),
1543 col_name.clone(),
1544 ));
1545 variable_columns.insert(col_name, next_col_idx);
1546 next_col_idx += 1;
1547 }
1548 }
1549 }
1550
1551 for agg_expr in &agg.aggregates {
1553 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1554 let col_name = format!("{}_{}", variable, property);
1555 if !variable_columns.contains_key(&col_name) {
1556 property_projections.push((
1557 variable.clone(),
1558 property.clone(),
1559 col_name.clone(),
1560 ));
1561 variable_columns.insert(col_name, next_col_idx);
1562 next_col_idx += 1;
1563 }
1564 }
1565 }
1566
1567 if !property_projections.is_empty() {
1569 let mut projections = Vec::new();
1570 let mut output_types = Vec::new();
1571
1572 for (i, _) in input_columns.iter().enumerate() {
1575 projections.push(ProjectExpr::Column(i));
1576 output_types.push(LogicalType::Node);
1577 }
1578
1579 for (variable, property, _col_name) in &property_projections {
1581 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1582 Error::Internal(format!(
1583 "Variable '{}' not found for property projection",
1584 variable
1585 ))
1586 })?;
1587 projections.push(ProjectExpr::PropertyAccess {
1588 column: source_col,
1589 property: property.clone(),
1590 });
1591 output_types.push(LogicalType::Any); }
1593
1594 input_op = Box::new(ProjectOperator::with_store(
1595 input_op,
1596 projections,
1597 output_types,
1598 Arc::clone(&self.store),
1599 ));
1600 }
1601
1602 let group_columns: Vec<usize> = agg
1604 .group_by
1605 .iter()
1606 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1607 .collect::<Result<Vec<_>>>()?;
1608
1609 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1611 .aggregates
1612 .iter()
1613 .map(|agg_expr| {
1614 let column = agg_expr
1615 .expression
1616 .as_ref()
1617 .map(|e| {
1618 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1619 })
1620 .transpose()?;
1621
1622 Ok(PhysicalAggregateExpr {
1623 function: convert_aggregate_function(agg_expr.function),
1624 column,
1625 distinct: agg_expr.distinct,
1626 alias: agg_expr.alias.clone(),
1627 percentile: agg_expr.percentile,
1628 })
1629 })
1630 .collect::<Result<Vec<_>>>()?;
1631
1632 let mut output_schema = Vec::new();
1634 let mut output_columns = Vec::new();
1635
1636 for expr in &agg.group_by {
1638 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1640 }
1641
1642 for agg_expr in &agg.aggregates {
1644 let result_type = match agg_expr.function {
1645 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1646 LogicalType::Int64
1647 }
1648 LogicalAggregateFunction::Sum => LogicalType::Int64,
1649 LogicalAggregateFunction::Avg => LogicalType::Float64,
1650 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1651 LogicalType::Int64
1655 }
1656 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1659 | LogicalAggregateFunction::StdDevPop
1660 | LogicalAggregateFunction::PercentileDisc
1661 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1662 };
1663 output_schema.push(result_type);
1664 output_columns.push(
1665 agg_expr
1666 .alias
1667 .clone()
1668 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1669 );
1670 }
1671
1672 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1674 Box::new(SimpleAggregateOperator::new(
1675 input_op,
1676 physical_aggregates,
1677 output_schema,
1678 ))
1679 } else {
1680 Box::new(HashAggregateOperator::new(
1681 input_op,
1682 group_columns,
1683 physical_aggregates,
1684 output_schema,
1685 ))
1686 };
1687
1688 if let Some(having_expr) = &agg.having {
1690 let having_var_columns: HashMap<String, usize> = output_columns
1692 .iter()
1693 .enumerate()
1694 .map(|(i, name)| (name.clone(), i))
1695 .collect();
1696
1697 let filter_expr = self.convert_expression(having_expr)?;
1698 let predicate =
1699 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1700 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1701 }
1702
1703 Ok((operator, output_columns))
1704 }
1705
1706 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1712 agg.aggregates.iter().all(|agg_expr| {
1713 match agg_expr.function {
1714 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1715 agg_expr.expression.is_none()
1717 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1718 }
1719 LogicalAggregateFunction::Sum
1720 | LogicalAggregateFunction::Avg
1721 | LogicalAggregateFunction::Min
1722 | LogicalAggregateFunction::Max => {
1723 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1726 }
1727 _ => false,
1729 }
1730 })
1731 }
1732
1733 fn plan_factorized_aggregate(
1737 &self,
1738 agg: &AggregateOp,
1739 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1740 let expands = Self::collect_expand_chain(&agg.input);
1742 if expands.is_empty() {
1743 return Err(Error::Internal(
1744 "Expected expand chain for factorized aggregate".to_string(),
1745 ));
1746 }
1747
1748 let first_expand = expands[0];
1750 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1751
1752 let mut columns = base_columns.clone();
1753 let mut steps = Vec::new();
1754 let mut is_first = true;
1755
1756 for expand in &expands {
1757 let source_column = if is_first {
1759 base_columns
1760 .iter()
1761 .position(|c| c == &expand.from_variable)
1762 .ok_or_else(|| {
1763 Error::Internal(format!(
1764 "Source variable '{}' not found in base columns",
1765 expand.from_variable
1766 ))
1767 })?
1768 } else {
1769 1 };
1771
1772 let direction = match expand.direction {
1773 ExpandDirection::Outgoing => Direction::Outgoing,
1774 ExpandDirection::Incoming => Direction::Incoming,
1775 ExpandDirection::Both => Direction::Both,
1776 };
1777
1778 steps.push(ExpandStep {
1779 source_column,
1780 direction,
1781 edge_type: expand.edge_type.clone(),
1782 });
1783
1784 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1785 let count = self.anon_edge_counter.get();
1786 self.anon_edge_counter.set(count + 1);
1787 format!("_anon_edge_{}", count)
1788 });
1789 columns.push(edge_col_name);
1790 columns.push(expand.to_variable.clone());
1791
1792 is_first = false;
1793 }
1794
1795 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1797
1798 if let Some(tx_id) = self.tx_id {
1799 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1800 } else {
1801 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1802 }
1803
1804 let factorized_aggs: Vec<FactorizedAggregate> = agg
1806 .aggregates
1807 .iter()
1808 .map(|agg_expr| {
1809 match agg_expr.function {
1810 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1811 if agg_expr.expression.is_none() {
1813 FactorizedAggregate::count()
1814 } else {
1815 FactorizedAggregate::count_column(1) }
1819 }
1820 LogicalAggregateFunction::Sum => {
1821 FactorizedAggregate::sum(1)
1823 }
1824 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1825 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1826 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1827 _ => {
1828 FactorizedAggregate::count()
1830 }
1831 }
1832 })
1833 .collect();
1834
1835 let output_columns: Vec<String> = agg
1837 .aggregates
1838 .iter()
1839 .map(|agg_expr| {
1840 agg_expr
1841 .alias
1842 .clone()
1843 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1844 })
1845 .collect();
1846
1847 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1849
1850 Ok((Box::new(factorized_agg_op), output_columns))
1851 }
1852
1853 #[allow(dead_code)]
1855 fn resolve_expression_to_column(
1856 &self,
1857 expr: &LogicalExpression,
1858 variable_columns: &HashMap<String, usize>,
1859 ) -> Result<usize> {
1860 match expr {
1861 LogicalExpression::Variable(name) => variable_columns
1862 .get(name)
1863 .copied()
1864 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1865 LogicalExpression::Property { variable, .. } => variable_columns
1866 .get(variable)
1867 .copied()
1868 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1869 _ => Err(Error::Internal(format!(
1870 "Cannot resolve expression to column: {:?}",
1871 expr
1872 ))),
1873 }
1874 }
1875
1876 fn resolve_expression_to_column_with_properties(
1880 &self,
1881 expr: &LogicalExpression,
1882 variable_columns: &HashMap<String, usize>,
1883 ) -> Result<usize> {
1884 match expr {
1885 LogicalExpression::Variable(name) => variable_columns
1886 .get(name)
1887 .copied()
1888 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1889 LogicalExpression::Property { variable, property } => {
1890 let col_name = format!("{}_{}", variable, property);
1892 variable_columns.get(&col_name).copied().ok_or_else(|| {
1893 Error::Internal(format!(
1894 "Property column '{}' not found (from {}.{})",
1895 col_name, variable, property
1896 ))
1897 })
1898 }
1899 _ => Err(Error::Internal(format!(
1900 "Cannot resolve expression to column: {:?}",
1901 expr
1902 ))),
1903 }
1904 }
1905
1906 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1908 match expr {
1909 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1910 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1911 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1912 variable: variable.clone(),
1913 property: property.clone(),
1914 }),
1915 LogicalExpression::Binary { left, op, right } => {
1916 let left_expr = self.convert_expression(left)?;
1917 let right_expr = self.convert_expression(right)?;
1918 let filter_op = convert_binary_op(*op)?;
1919 Ok(FilterExpression::Binary {
1920 left: Box::new(left_expr),
1921 op: filter_op,
1922 right: Box::new(right_expr),
1923 })
1924 }
1925 LogicalExpression::Unary { op, operand } => {
1926 let operand_expr = self.convert_expression(operand)?;
1927 let filter_op = convert_unary_op(*op)?;
1928 Ok(FilterExpression::Unary {
1929 op: filter_op,
1930 operand: Box::new(operand_expr),
1931 })
1932 }
1933 LogicalExpression::FunctionCall { name, args, .. } => {
1934 let filter_args: Vec<FilterExpression> = args
1935 .iter()
1936 .map(|a| self.convert_expression(a))
1937 .collect::<Result<Vec<_>>>()?;
1938 Ok(FilterExpression::FunctionCall {
1939 name: name.clone(),
1940 args: filter_args,
1941 })
1942 }
1943 LogicalExpression::Case {
1944 operand,
1945 when_clauses,
1946 else_clause,
1947 } => {
1948 let filter_operand = operand
1949 .as_ref()
1950 .map(|e| self.convert_expression(e))
1951 .transpose()?
1952 .map(Box::new);
1953 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1954 .iter()
1955 .map(|(cond, result)| {
1956 Ok((
1957 self.convert_expression(cond)?,
1958 self.convert_expression(result)?,
1959 ))
1960 })
1961 .collect::<Result<Vec<_>>>()?;
1962 let filter_else = else_clause
1963 .as_ref()
1964 .map(|e| self.convert_expression(e))
1965 .transpose()?
1966 .map(Box::new);
1967 Ok(FilterExpression::Case {
1968 operand: filter_operand,
1969 when_clauses: filter_when_clauses,
1970 else_clause: filter_else,
1971 })
1972 }
1973 LogicalExpression::List(items) => {
1974 let filter_items: Vec<FilterExpression> = items
1975 .iter()
1976 .map(|item| self.convert_expression(item))
1977 .collect::<Result<Vec<_>>>()?;
1978 Ok(FilterExpression::List(filter_items))
1979 }
1980 LogicalExpression::Map(pairs) => {
1981 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1982 .iter()
1983 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1984 .collect::<Result<Vec<_>>>()?;
1985 Ok(FilterExpression::Map(filter_pairs))
1986 }
1987 LogicalExpression::IndexAccess { base, index } => {
1988 let base_expr = self.convert_expression(base)?;
1989 let index_expr = self.convert_expression(index)?;
1990 Ok(FilterExpression::IndexAccess {
1991 base: Box::new(base_expr),
1992 index: Box::new(index_expr),
1993 })
1994 }
1995 LogicalExpression::SliceAccess { base, start, end } => {
1996 let base_expr = self.convert_expression(base)?;
1997 let start_expr = start
1998 .as_ref()
1999 .map(|s| self.convert_expression(s))
2000 .transpose()?
2001 .map(Box::new);
2002 let end_expr = end
2003 .as_ref()
2004 .map(|e| self.convert_expression(e))
2005 .transpose()?
2006 .map(Box::new);
2007 Ok(FilterExpression::SliceAccess {
2008 base: Box::new(base_expr),
2009 start: start_expr,
2010 end: end_expr,
2011 })
2012 }
2013 LogicalExpression::Parameter(_) => Err(Error::Internal(
2014 "Parameters not yet supported in filters".to_string(),
2015 )),
2016 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2017 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2018 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2019 LogicalExpression::ListComprehension {
2020 variable,
2021 list_expr,
2022 filter_expr,
2023 map_expr,
2024 } => {
2025 let list = self.convert_expression(list_expr)?;
2026 let filter = filter_expr
2027 .as_ref()
2028 .map(|f| self.convert_expression(f))
2029 .transpose()?
2030 .map(Box::new);
2031 let map = self.convert_expression(map_expr)?;
2032 Ok(FilterExpression::ListComprehension {
2033 variable: variable.clone(),
2034 list_expr: Box::new(list),
2035 filter_expr: filter,
2036 map_expr: Box::new(map),
2037 })
2038 }
2039 LogicalExpression::ExistsSubquery(subplan) => {
2040 let (start_var, direction, edge_type, end_labels) =
2043 self.extract_exists_pattern(subplan)?;
2044
2045 Ok(FilterExpression::ExistsSubquery {
2046 start_var,
2047 direction,
2048 edge_type,
2049 end_labels,
2050 min_hops: None,
2051 max_hops: None,
2052 })
2053 }
2054 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2055 "COUNT subqueries not yet supported".to_string(),
2056 )),
2057 }
2058 }
2059
2060 fn extract_exists_pattern(
2063 &self,
2064 subplan: &LogicalOperator,
2065 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2066 match subplan {
2067 LogicalOperator::Expand(expand) => {
2068 let end_labels = self.extract_end_labels_from_expand(expand);
2070 let direction = match expand.direction {
2071 ExpandDirection::Outgoing => Direction::Outgoing,
2072 ExpandDirection::Incoming => Direction::Incoming,
2073 ExpandDirection::Both => Direction::Both,
2074 };
2075 Ok((
2076 expand.from_variable.clone(),
2077 direction,
2078 expand.edge_type.clone(),
2079 end_labels,
2080 ))
2081 }
2082 LogicalOperator::NodeScan(scan) => {
2083 if let Some(input) = &scan.input {
2084 self.extract_exists_pattern(input)
2085 } else {
2086 Err(Error::Internal(
2087 "EXISTS subquery must contain an edge pattern".to_string(),
2088 ))
2089 }
2090 }
2091 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2092 _ => Err(Error::Internal(
2093 "Unsupported EXISTS subquery pattern".to_string(),
2094 )),
2095 }
2096 }
2097
2098 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2100 match expand.input.as_ref() {
2102 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2103 _ => None,
2104 }
2105 }
2106
2107 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2109 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2110 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2111
2112 let mut columns = left_columns.clone();
2114 columns.extend(right_columns.clone());
2115
2116 let physical_join_type = match join.join_type {
2118 JoinType::Inner => PhysicalJoinType::Inner,
2119 JoinType::Left => PhysicalJoinType::Left,
2120 JoinType::Right => PhysicalJoinType::Right,
2121 JoinType::Full => PhysicalJoinType::Full,
2122 JoinType::Cross => PhysicalJoinType::Cross,
2123 JoinType::Semi => PhysicalJoinType::Semi,
2124 JoinType::Anti => PhysicalJoinType::Anti,
2125 };
2126
2127 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2129 (vec![], vec![])
2131 } else {
2132 join.conditions
2133 .iter()
2134 .filter_map(|cond| {
2135 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2137 let right_idx = self
2138 .expression_to_column(&cond.right, &right_columns)
2139 .ok()?;
2140 Some((left_idx, right_idx))
2141 })
2142 .unzip()
2143 };
2144
2145 let output_schema = self.derive_schema_from_columns(&columns);
2146
2147 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2155 left_op,
2156 right_op,
2157 probe_keys,
2158 build_keys,
2159 physical_join_type,
2160 output_schema,
2161 ));
2162
2163 Ok((operator, columns))
2164 }
2165
2166 #[allow(dead_code)]
2175 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2176 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2178 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2179
2180 Self::collect_join_edges(
2182 &LogicalOperator::Join(join.clone()),
2183 &mut edges,
2184 &mut all_vars,
2185 );
2186
2187 if all_vars.len() < 3 {
2189 return false;
2190 }
2191
2192 Self::has_cycle(&edges, &all_vars)
2194 }
2195
2196 fn collect_join_edges(
2198 op: &LogicalOperator,
2199 edges: &mut HashMap<String, Vec<String>>,
2200 vars: &mut std::collections::HashSet<String>,
2201 ) {
2202 match op {
2203 LogicalOperator::Join(join) => {
2204 for cond in &join.conditions {
2206 if let (Some(left_var), Some(right_var)) = (
2207 Self::extract_join_variable(&cond.left),
2208 Self::extract_join_variable(&cond.right),
2209 ) {
2210 if left_var != right_var {
2211 vars.insert(left_var.clone());
2212 vars.insert(right_var.clone());
2213
2214 edges
2216 .entry(left_var.clone())
2217 .or_default()
2218 .push(right_var.clone());
2219 edges.entry(right_var).or_default().push(left_var);
2220 }
2221 }
2222 }
2223
2224 Self::collect_join_edges(&join.left, edges, vars);
2226 Self::collect_join_edges(&join.right, edges, vars);
2227 }
2228 LogicalOperator::Expand(expand) => {
2229 vars.insert(expand.from_variable.clone());
2231 vars.insert(expand.to_variable.clone());
2232
2233 edges
2234 .entry(expand.from_variable.clone())
2235 .or_default()
2236 .push(expand.to_variable.clone());
2237 edges
2238 .entry(expand.to_variable.clone())
2239 .or_default()
2240 .push(expand.from_variable.clone());
2241
2242 Self::collect_join_edges(&expand.input, edges, vars);
2243 }
2244 LogicalOperator::Filter(filter) => {
2245 Self::collect_join_edges(&filter.input, edges, vars);
2246 }
2247 LogicalOperator::NodeScan(scan) => {
2248 vars.insert(scan.variable.clone());
2249 }
2250 _ => {}
2251 }
2252 }
2253
2254 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2256 match expr {
2257 LogicalExpression::Variable(v) => Some(v.clone()),
2258 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2259 LogicalExpression::Id(v) => Some(v.clone()),
2260 _ => None,
2261 }
2262 }
2263
2264 fn has_cycle(
2268 edges: &HashMap<String, Vec<String>>,
2269 vars: &std::collections::HashSet<String>,
2270 ) -> bool {
2271 let mut color: HashMap<&String, u8> = HashMap::new();
2272
2273 for var in vars {
2274 color.insert(var, 0);
2275 }
2276
2277 for start in vars {
2278 if color[start] == 0 {
2279 if Self::dfs_cycle(start, None, edges, &mut color) {
2280 return true;
2281 }
2282 }
2283 }
2284
2285 false
2286 }
2287
2288 fn dfs_cycle(
2290 node: &String,
2291 parent: Option<&String>,
2292 edges: &HashMap<String, Vec<String>>,
2293 color: &mut HashMap<&String, u8>,
2294 ) -> bool {
2295 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2298 for neighbor in neighbors {
2299 if parent == Some(neighbor) {
2301 continue;
2302 }
2303
2304 if let Some(&c) = color.get(neighbor) {
2305 if c == 1 {
2306 return true;
2308 }
2309 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2310 return true;
2311 }
2312 }
2313 }
2314 }
2315
2316 *color.get_mut(node).unwrap() = 2; false
2318 }
2319
2320 #[allow(dead_code)]
2322 fn count_relations(op: &LogicalOperator) -> usize {
2323 match op {
2324 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2325 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2326 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2327 LogicalOperator::Join(j) => {
2328 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2329 }
2330 _ => 0,
2331 }
2332 }
2333
2334 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2336 match expr {
2337 LogicalExpression::Variable(name) => columns
2338 .iter()
2339 .position(|c| c == name)
2340 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2341 _ => Err(Error::Internal(
2342 "Only variables supported in join conditions".to_string(),
2343 )),
2344 }
2345 }
2346
2347 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2349 if union.inputs.is_empty() {
2350 return Err(Error::Internal(
2351 "Union requires at least one input".to_string(),
2352 ));
2353 }
2354
2355 let mut inputs = Vec::with_capacity(union.inputs.len());
2356 let mut columns = Vec::new();
2357
2358 for (i, input) in union.inputs.iter().enumerate() {
2359 let (op, cols) = self.plan_operator(input)?;
2360 if i == 0 {
2361 columns = cols;
2362 }
2363 inputs.push(op);
2364 }
2365
2366 let output_schema = self.derive_schema_from_columns(&columns);
2367 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2368
2369 Ok((operator, columns))
2370 }
2371
2372 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2374 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2375 let output_schema = self.derive_schema_from_columns(&columns);
2376 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2377 Ok((operator, columns))
2378 }
2379
2380 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2382 let (input_op, mut columns) = if let Some(ref input) = create.input {
2384 let (op, cols) = self.plan_operator(input)?;
2385 (Some(op), cols)
2386 } else {
2387 (None, vec![])
2388 };
2389
2390 let output_column = columns.len();
2392 columns.push(create.variable.clone());
2393
2394 let properties: Vec<(String, PropertySource)> = create
2396 .properties
2397 .iter()
2398 .map(|(name, expr)| {
2399 let source = match expr {
2400 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2401 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2402 };
2403 (name.clone(), source)
2404 })
2405 .collect();
2406
2407 let output_schema = self.derive_schema_from_columns(&columns);
2408
2409 let operator = Box::new(
2410 CreateNodeOperator::new(
2411 Arc::clone(&self.store),
2412 input_op,
2413 create.labels.clone(),
2414 properties,
2415 output_schema,
2416 output_column,
2417 )
2418 .with_tx_context(self.viewing_epoch, self.tx_id),
2419 );
2420
2421 Ok((operator, columns))
2422 }
2423
2424 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2426 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2427
2428 let from_column = columns
2430 .iter()
2431 .position(|c| c == &create.from_variable)
2432 .ok_or_else(|| {
2433 Error::Internal(format!(
2434 "Source variable '{}' not found",
2435 create.from_variable
2436 ))
2437 })?;
2438
2439 let to_column = columns
2440 .iter()
2441 .position(|c| c == &create.to_variable)
2442 .ok_or_else(|| {
2443 Error::Internal(format!(
2444 "Target variable '{}' not found",
2445 create.to_variable
2446 ))
2447 })?;
2448
2449 let output_column = create.variable.as_ref().map(|v| {
2451 let idx = columns.len();
2452 columns.push(v.clone());
2453 idx
2454 });
2455
2456 let properties: Vec<(String, PropertySource)> = create
2458 .properties
2459 .iter()
2460 .map(|(name, expr)| {
2461 let source = match expr {
2462 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2463 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2464 };
2465 (name.clone(), source)
2466 })
2467 .collect();
2468
2469 let output_schema = self.derive_schema_from_columns(&columns);
2470
2471 let mut operator = CreateEdgeOperator::new(
2472 Arc::clone(&self.store),
2473 input_op,
2474 from_column,
2475 to_column,
2476 create.edge_type.clone(),
2477 output_schema,
2478 )
2479 .with_properties(properties)
2480 .with_tx_context(self.viewing_epoch, self.tx_id);
2481
2482 if let Some(col) = output_column {
2483 operator = operator.with_output_column(col);
2484 }
2485
2486 let operator = Box::new(operator);
2487
2488 Ok((operator, columns))
2489 }
2490
2491 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2493 let (input_op, columns) = self.plan_operator(&delete.input)?;
2494
2495 let node_column = columns
2496 .iter()
2497 .position(|c| c == &delete.variable)
2498 .ok_or_else(|| {
2499 Error::Internal(format!(
2500 "Variable '{}' not found for delete",
2501 delete.variable
2502 ))
2503 })?;
2504
2505 let output_schema = vec![LogicalType::Int64];
2507 let output_columns = vec!["deleted_count".to_string()];
2508
2509 let operator = Box::new(
2510 DeleteNodeOperator::new(
2511 Arc::clone(&self.store),
2512 input_op,
2513 node_column,
2514 output_schema,
2515 delete.detach, )
2517 .with_tx_context(self.viewing_epoch, self.tx_id),
2518 );
2519
2520 Ok((operator, output_columns))
2521 }
2522
2523 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2525 let (input_op, columns) = self.plan_operator(&delete.input)?;
2526
2527 let edge_column = columns
2528 .iter()
2529 .position(|c| c == &delete.variable)
2530 .ok_or_else(|| {
2531 Error::Internal(format!(
2532 "Variable '{}' not found for delete",
2533 delete.variable
2534 ))
2535 })?;
2536
2537 let output_schema = vec![LogicalType::Int64];
2539 let output_columns = vec!["deleted_count".to_string()];
2540
2541 let operator = Box::new(
2542 DeleteEdgeOperator::new(
2543 Arc::clone(&self.store),
2544 input_op,
2545 edge_column,
2546 output_schema,
2547 )
2548 .with_tx_context(self.viewing_epoch, self.tx_id),
2549 );
2550
2551 Ok((operator, output_columns))
2552 }
2553
2554 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2556 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2557 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2558
2559 let mut columns = left_columns.clone();
2561 columns.extend(right_columns.clone());
2562
2563 let mut probe_keys = Vec::new();
2565 let mut build_keys = Vec::new();
2566
2567 for (right_idx, right_col) in right_columns.iter().enumerate() {
2568 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2569 probe_keys.push(left_idx);
2570 build_keys.push(right_idx);
2571 }
2572 }
2573
2574 let output_schema = self.derive_schema_from_columns(&columns);
2575
2576 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2577 left_op,
2578 right_op,
2579 probe_keys,
2580 build_keys,
2581 PhysicalJoinType::Left,
2582 output_schema,
2583 ));
2584
2585 Ok((operator, columns))
2586 }
2587
2588 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2590 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2591 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2592
2593 let columns = left_columns.clone();
2595
2596 let mut probe_keys = Vec::new();
2598 let mut build_keys = Vec::new();
2599
2600 for (right_idx, right_col) in right_columns.iter().enumerate() {
2601 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2602 probe_keys.push(left_idx);
2603 build_keys.push(right_idx);
2604 }
2605 }
2606
2607 let output_schema = self.derive_schema_from_columns(&columns);
2608
2609 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2610 left_op,
2611 right_op,
2612 probe_keys,
2613 build_keys,
2614 PhysicalJoinType::Anti,
2615 output_schema,
2616 ));
2617
2618 Ok((operator, columns))
2619 }
2620
2621 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2623 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2626 if matches!(&*unwind.input, LogicalOperator::Empty) {
2627 let literal_list = self.convert_expression(&unwind.expression)?;
2632
2633 let single_row_op: Box<dyn Operator> = Box::new(
2635 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2636 );
2637 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2638 single_row_op,
2639 vec![ProjectExpr::Expression {
2640 expr: literal_list,
2641 variable_columns: HashMap::new(),
2642 }],
2643 vec![LogicalType::Any],
2644 Arc::clone(&self.store),
2645 ));
2646
2647 (project_op, vec!["__list__".to_string()])
2648 } else {
2649 self.plan_operator(&unwind.input)?
2650 };
2651
2652 let list_col_idx = match &unwind.expression {
2658 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2659 LogicalExpression::Property { variable, .. } => {
2660 input_columns.iter().position(|c| c == variable)
2663 }
2664 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2665 None
2667 }
2668 _ => None,
2669 };
2670
2671 let mut columns = input_columns.clone();
2673 columns.push(unwind.variable.clone());
2674
2675 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2677 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2682
2683 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2684 input_op,
2685 col_idx,
2686 unwind.variable.clone(),
2687 output_schema,
2688 ));
2689
2690 Ok((operator, columns))
2691 }
2692
2693 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2695 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2697 Vec::new()
2698 } else {
2699 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2700 cols
2701 };
2702
2703 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2705 .match_properties
2706 .iter()
2707 .filter_map(|(name, expr)| {
2708 if let LogicalExpression::Literal(v) = expr {
2709 Some((name.clone(), v.clone()))
2710 } else {
2711 None }
2713 })
2714 .collect();
2715
2716 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2718 .on_create
2719 .iter()
2720 .filter_map(|(name, expr)| {
2721 if let LogicalExpression::Literal(v) = expr {
2722 Some((name.clone(), v.clone()))
2723 } else {
2724 None
2725 }
2726 })
2727 .collect();
2728
2729 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2731 .on_match
2732 .iter()
2733 .filter_map(|(name, expr)| {
2734 if let LogicalExpression::Literal(v) = expr {
2735 Some((name.clone(), v.clone()))
2736 } else {
2737 None
2738 }
2739 })
2740 .collect();
2741
2742 columns.push(merge.variable.clone());
2744
2745 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2746 Arc::clone(&self.store),
2747 merge.variable.clone(),
2748 merge.labels.clone(),
2749 match_properties,
2750 on_create_properties,
2751 on_match_properties,
2752 ));
2753
2754 Ok((operator, columns))
2755 }
2756
2757 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2759 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2761
2762 let source_column = columns
2764 .iter()
2765 .position(|c| c == &sp.source_var)
2766 .ok_or_else(|| {
2767 Error::Internal(format!(
2768 "Source variable '{}' not found for shortestPath",
2769 sp.source_var
2770 ))
2771 })?;
2772
2773 let target_column = columns
2774 .iter()
2775 .position(|c| c == &sp.target_var)
2776 .ok_or_else(|| {
2777 Error::Internal(format!(
2778 "Target variable '{}' not found for shortestPath",
2779 sp.target_var
2780 ))
2781 })?;
2782
2783 let direction = match sp.direction {
2785 ExpandDirection::Outgoing => Direction::Outgoing,
2786 ExpandDirection::Incoming => Direction::Incoming,
2787 ExpandDirection::Both => Direction::Both,
2788 };
2789
2790 let operator: Box<dyn Operator> = Box::new(
2792 ShortestPathOperator::new(
2793 Arc::clone(&self.store),
2794 input_op,
2795 source_column,
2796 target_column,
2797 sp.edge_type.clone(),
2798 direction,
2799 )
2800 .with_all_paths(sp.all_paths),
2801 );
2802
2803 columns.push(format!("_path_length_{}", sp.path_alias));
2806
2807 Ok((operator, columns))
2808 }
2809
2810 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2812 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2813
2814 let node_column = columns
2816 .iter()
2817 .position(|c| c == &add_label.variable)
2818 .ok_or_else(|| {
2819 Error::Internal(format!(
2820 "Variable '{}' not found for ADD LABEL",
2821 add_label.variable
2822 ))
2823 })?;
2824
2825 let output_schema = vec![LogicalType::Int64];
2827 let output_columns = vec!["labels_added".to_string()];
2828
2829 let operator = Box::new(AddLabelOperator::new(
2830 Arc::clone(&self.store),
2831 input_op,
2832 node_column,
2833 add_label.labels.clone(),
2834 output_schema,
2835 ));
2836
2837 Ok((operator, output_columns))
2838 }
2839
2840 fn plan_remove_label(
2842 &self,
2843 remove_label: &RemoveLabelOp,
2844 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2845 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2846
2847 let node_column = columns
2849 .iter()
2850 .position(|c| c == &remove_label.variable)
2851 .ok_or_else(|| {
2852 Error::Internal(format!(
2853 "Variable '{}' not found for REMOVE LABEL",
2854 remove_label.variable
2855 ))
2856 })?;
2857
2858 let output_schema = vec![LogicalType::Int64];
2860 let output_columns = vec!["labels_removed".to_string()];
2861
2862 let operator = Box::new(RemoveLabelOperator::new(
2863 Arc::clone(&self.store),
2864 input_op,
2865 node_column,
2866 remove_label.labels.clone(),
2867 output_schema,
2868 ));
2869
2870 Ok((operator, output_columns))
2871 }
2872
2873 fn plan_set_property(
2875 &self,
2876 set_prop: &SetPropertyOp,
2877 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2878 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2879
2880 let entity_column = columns
2882 .iter()
2883 .position(|c| c == &set_prop.variable)
2884 .ok_or_else(|| {
2885 Error::Internal(format!(
2886 "Variable '{}' not found for SET",
2887 set_prop.variable
2888 ))
2889 })?;
2890
2891 let properties: Vec<(String, PropertySource)> = set_prop
2893 .properties
2894 .iter()
2895 .map(|(name, expr)| {
2896 let source = self.expression_to_property_source(expr, &columns)?;
2897 Ok((name.clone(), source))
2898 })
2899 .collect::<Result<Vec<_>>>()?;
2900
2901 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2903 let output_columns = columns.clone();
2904
2905 let operator = Box::new(SetPropertyOperator::new_for_node(
2907 Arc::clone(&self.store),
2908 input_op,
2909 entity_column,
2910 properties,
2911 output_schema,
2912 ));
2913
2914 Ok((operator, output_columns))
2915 }
2916
2917 fn expression_to_property_source(
2919 &self,
2920 expr: &LogicalExpression,
2921 columns: &[String],
2922 ) -> Result<PropertySource> {
2923 match expr {
2924 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2925 LogicalExpression::Variable(name) => {
2926 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2927 Error::Internal(format!("Variable '{}' not found for property source", name))
2928 })?;
2929 Ok(PropertySource::Column(col_idx))
2930 }
2931 LogicalExpression::Parameter(name) => {
2932 Ok(PropertySource::Constant(
2935 grafeo_common::types::Value::String(format!("${}", name).into()),
2936 ))
2937 }
2938 _ => Err(Error::Internal(format!(
2939 "Unsupported expression type for property source: {:?}",
2940 expr
2941 ))),
2942 }
2943 }
2944}
2945
2946pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2948 match op {
2949 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2950 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2951 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2952 BinaryOp::Le => Ok(BinaryFilterOp::Le),
2953 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2954 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2955 BinaryOp::And => Ok(BinaryFilterOp::And),
2956 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2957 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2958 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2959 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2960 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2961 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2962 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2963 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2964 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2965 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2966 BinaryOp::In => Ok(BinaryFilterOp::In),
2967 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2968 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2969 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2970 "Binary operator {:?} not yet supported in filters",
2971 op
2972 ))),
2973 }
2974}
2975
2976pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2978 match op {
2979 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2980 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2981 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2982 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2983 }
2984}
2985
2986pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2988 match func {
2989 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2990 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2991 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2992 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2993 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2994 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2995 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2996 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2997 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2998 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2999 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3000 }
3001}
3002
3003pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3007 match expr {
3008 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3009 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3010 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3011 variable: variable.clone(),
3012 property: property.clone(),
3013 }),
3014 LogicalExpression::Binary { left, op, right } => {
3015 let left_expr = convert_filter_expression(left)?;
3016 let right_expr = convert_filter_expression(right)?;
3017 let filter_op = convert_binary_op(*op)?;
3018 Ok(FilterExpression::Binary {
3019 left: Box::new(left_expr),
3020 op: filter_op,
3021 right: Box::new(right_expr),
3022 })
3023 }
3024 LogicalExpression::Unary { op, operand } => {
3025 let operand_expr = convert_filter_expression(operand)?;
3026 let filter_op = convert_unary_op(*op)?;
3027 Ok(FilterExpression::Unary {
3028 op: filter_op,
3029 operand: Box::new(operand_expr),
3030 })
3031 }
3032 LogicalExpression::FunctionCall { name, args, .. } => {
3033 let filter_args: Vec<FilterExpression> = args
3034 .iter()
3035 .map(|a| convert_filter_expression(a))
3036 .collect::<Result<Vec<_>>>()?;
3037 Ok(FilterExpression::FunctionCall {
3038 name: name.clone(),
3039 args: filter_args,
3040 })
3041 }
3042 LogicalExpression::Case {
3043 operand,
3044 when_clauses,
3045 else_clause,
3046 } => {
3047 let filter_operand = operand
3048 .as_ref()
3049 .map(|e| convert_filter_expression(e))
3050 .transpose()?
3051 .map(Box::new);
3052 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3053 .iter()
3054 .map(|(cond, result)| {
3055 Ok((
3056 convert_filter_expression(cond)?,
3057 convert_filter_expression(result)?,
3058 ))
3059 })
3060 .collect::<Result<Vec<_>>>()?;
3061 let filter_else = else_clause
3062 .as_ref()
3063 .map(|e| convert_filter_expression(e))
3064 .transpose()?
3065 .map(Box::new);
3066 Ok(FilterExpression::Case {
3067 operand: filter_operand,
3068 when_clauses: filter_when_clauses,
3069 else_clause: filter_else,
3070 })
3071 }
3072 LogicalExpression::List(items) => {
3073 let filter_items: Vec<FilterExpression> = items
3074 .iter()
3075 .map(|item| convert_filter_expression(item))
3076 .collect::<Result<Vec<_>>>()?;
3077 Ok(FilterExpression::List(filter_items))
3078 }
3079 LogicalExpression::Map(pairs) => {
3080 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3081 .iter()
3082 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3083 .collect::<Result<Vec<_>>>()?;
3084 Ok(FilterExpression::Map(filter_pairs))
3085 }
3086 LogicalExpression::IndexAccess { base, index } => {
3087 let base_expr = convert_filter_expression(base)?;
3088 let index_expr = convert_filter_expression(index)?;
3089 Ok(FilterExpression::IndexAccess {
3090 base: Box::new(base_expr),
3091 index: Box::new(index_expr),
3092 })
3093 }
3094 LogicalExpression::SliceAccess { base, start, end } => {
3095 let base_expr = convert_filter_expression(base)?;
3096 let start_expr = start
3097 .as_ref()
3098 .map(|s| convert_filter_expression(s))
3099 .transpose()?
3100 .map(Box::new);
3101 let end_expr = end
3102 .as_ref()
3103 .map(|e| convert_filter_expression(e))
3104 .transpose()?
3105 .map(Box::new);
3106 Ok(FilterExpression::SliceAccess {
3107 base: Box::new(base_expr),
3108 start: start_expr,
3109 end: end_expr,
3110 })
3111 }
3112 LogicalExpression::Parameter(_) => Err(Error::Internal(
3113 "Parameters not yet supported in filters".to_string(),
3114 )),
3115 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3116 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3117 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3118 LogicalExpression::ListComprehension {
3119 variable,
3120 list_expr,
3121 filter_expr,
3122 map_expr,
3123 } => {
3124 let list = convert_filter_expression(list_expr)?;
3125 let filter = filter_expr
3126 .as_ref()
3127 .map(|f| convert_filter_expression(f))
3128 .transpose()?
3129 .map(Box::new);
3130 let map = convert_filter_expression(map_expr)?;
3131 Ok(FilterExpression::ListComprehension {
3132 variable: variable.clone(),
3133 list_expr: Box::new(list),
3134 filter_expr: filter,
3135 map_expr: Box::new(map),
3136 })
3137 }
3138 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3139 Error::Internal("Subqueries not yet supported in filters".to_string()),
3140 ),
3141 }
3142}
3143
3144fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3146 use grafeo_common::types::Value;
3147 match value {
3148 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3150 Value::Int64(_) => LogicalType::Int64,
3151 Value::Float64(_) => LogicalType::Float64,
3152 Value::String(_) => LogicalType::String,
3153 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3155 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3158 }
3159}
3160
3161fn expression_to_string(expr: &LogicalExpression) -> String {
3163 match expr {
3164 LogicalExpression::Variable(name) => name.clone(),
3165 LogicalExpression::Property { variable, property } => {
3166 format!("{variable}.{property}")
3167 }
3168 LogicalExpression::Literal(value) => format!("{value:?}"),
3169 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3170 _ => "expr".to_string(),
3171 }
3172}
3173
3174pub struct PhysicalPlan {
3176 pub operator: Box<dyn Operator>,
3178 pub columns: Vec<String>,
3180 pub adaptive_context: Option<AdaptiveContext>,
3186}
3187
3188impl PhysicalPlan {
3189 #[must_use]
3191 pub fn columns(&self) -> &[String] {
3192 &self.columns
3193 }
3194
3195 pub fn into_operator(self) -> Box<dyn Operator> {
3197 self.operator
3198 }
3199
3200 #[must_use]
3202 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3203 self.adaptive_context.as_ref()
3204 }
3205
3206 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3208 self.adaptive_context.take()
3209 }
3210}
3211
3212#[allow(dead_code)]
3216struct SingleResultOperator {
3217 result: Option<grafeo_core::execution::DataChunk>,
3218}
3219
3220impl SingleResultOperator {
3221 #[allow(dead_code)]
3222 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3223 Self { result }
3224 }
3225}
3226
3227impl Operator for SingleResultOperator {
3228 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3229 Ok(self.result.take())
3230 }
3231
3232 fn reset(&mut self) {
3233 }
3235
3236 fn name(&self) -> &'static str {
3237 "SingleResult"
3238 }
3239}
3240
3241#[cfg(test)]
3242mod tests {
3243 use super::*;
3244 use crate::query::plan::{
3245 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3246 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3247 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3248 SortKey, SortOp,
3249 };
3250 use grafeo_common::types::Value;
3251
3252 fn create_test_store() -> Arc<LpgStore> {
3253 let store = Arc::new(LpgStore::new());
3254 store.create_node(&["Person"]);
3255 store.create_node(&["Person"]);
3256 store.create_node(&["Company"]);
3257 store
3258 }
3259
3260 #[test]
3263 fn test_plan_simple_scan() {
3264 let store = create_test_store();
3265 let planner = Planner::new(store);
3266
3267 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3269 items: vec![ReturnItem {
3270 expression: LogicalExpression::Variable("n".to_string()),
3271 alias: None,
3272 }],
3273 distinct: false,
3274 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3275 variable: "n".to_string(),
3276 label: Some("Person".to_string()),
3277 input: None,
3278 })),
3279 }));
3280
3281 let physical = planner.plan(&logical).unwrap();
3282 assert_eq!(physical.columns(), &["n"]);
3283 }
3284
3285 #[test]
3286 fn test_plan_scan_without_label() {
3287 let store = create_test_store();
3288 let planner = Planner::new(store);
3289
3290 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3292 items: vec![ReturnItem {
3293 expression: LogicalExpression::Variable("n".to_string()),
3294 alias: None,
3295 }],
3296 distinct: false,
3297 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3298 variable: "n".to_string(),
3299 label: None,
3300 input: None,
3301 })),
3302 }));
3303
3304 let physical = planner.plan(&logical).unwrap();
3305 assert_eq!(physical.columns(), &["n"]);
3306 }
3307
3308 #[test]
3309 fn test_plan_return_with_alias() {
3310 let store = create_test_store();
3311 let planner = Planner::new(store);
3312
3313 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3315 items: vec![ReturnItem {
3316 expression: LogicalExpression::Variable("n".to_string()),
3317 alias: Some("person".to_string()),
3318 }],
3319 distinct: false,
3320 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3321 variable: "n".to_string(),
3322 label: Some("Person".to_string()),
3323 input: None,
3324 })),
3325 }));
3326
3327 let physical = planner.plan(&logical).unwrap();
3328 assert_eq!(physical.columns(), &["person"]);
3329 }
3330
3331 #[test]
3332 fn test_plan_return_property() {
3333 let store = create_test_store();
3334 let planner = Planner::new(store);
3335
3336 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3338 items: vec![ReturnItem {
3339 expression: LogicalExpression::Property {
3340 variable: "n".to_string(),
3341 property: "name".to_string(),
3342 },
3343 alias: None,
3344 }],
3345 distinct: false,
3346 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3347 variable: "n".to_string(),
3348 label: Some("Person".to_string()),
3349 input: None,
3350 })),
3351 }));
3352
3353 let physical = planner.plan(&logical).unwrap();
3354 assert_eq!(physical.columns(), &["n.name"]);
3355 }
3356
3357 #[test]
3358 fn test_plan_return_literal() {
3359 let store = create_test_store();
3360 let planner = Planner::new(store);
3361
3362 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3364 items: vec![ReturnItem {
3365 expression: LogicalExpression::Literal(Value::Int64(42)),
3366 alias: Some("answer".to_string()),
3367 }],
3368 distinct: false,
3369 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3370 variable: "n".to_string(),
3371 label: None,
3372 input: None,
3373 })),
3374 }));
3375
3376 let physical = planner.plan(&logical).unwrap();
3377 assert_eq!(physical.columns(), &["answer"]);
3378 }
3379
3380 #[test]
3383 fn test_plan_filter_equality() {
3384 let store = create_test_store();
3385 let planner = Planner::new(store);
3386
3387 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3389 items: vec![ReturnItem {
3390 expression: LogicalExpression::Variable("n".to_string()),
3391 alias: None,
3392 }],
3393 distinct: false,
3394 input: Box::new(LogicalOperator::Filter(FilterOp {
3395 predicate: LogicalExpression::Binary {
3396 left: Box::new(LogicalExpression::Property {
3397 variable: "n".to_string(),
3398 property: "age".to_string(),
3399 }),
3400 op: BinaryOp::Eq,
3401 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3402 },
3403 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3404 variable: "n".to_string(),
3405 label: Some("Person".to_string()),
3406 input: None,
3407 })),
3408 })),
3409 }));
3410
3411 let physical = planner.plan(&logical).unwrap();
3412 assert_eq!(physical.columns(), &["n"]);
3413 }
3414
3415 #[test]
3416 fn test_plan_filter_compound_and() {
3417 let store = create_test_store();
3418 let planner = Planner::new(store);
3419
3420 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3422 items: vec![ReturnItem {
3423 expression: LogicalExpression::Variable("n".to_string()),
3424 alias: None,
3425 }],
3426 distinct: false,
3427 input: Box::new(LogicalOperator::Filter(FilterOp {
3428 predicate: LogicalExpression::Binary {
3429 left: Box::new(LogicalExpression::Binary {
3430 left: Box::new(LogicalExpression::Property {
3431 variable: "n".to_string(),
3432 property: "age".to_string(),
3433 }),
3434 op: BinaryOp::Gt,
3435 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3436 }),
3437 op: BinaryOp::And,
3438 right: Box::new(LogicalExpression::Binary {
3439 left: Box::new(LogicalExpression::Property {
3440 variable: "n".to_string(),
3441 property: "age".to_string(),
3442 }),
3443 op: BinaryOp::Lt,
3444 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3445 }),
3446 },
3447 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3448 variable: "n".to_string(),
3449 label: None,
3450 input: None,
3451 })),
3452 })),
3453 }));
3454
3455 let physical = planner.plan(&logical).unwrap();
3456 assert_eq!(physical.columns(), &["n"]);
3457 }
3458
3459 #[test]
3460 fn test_plan_filter_unary_not() {
3461 let store = create_test_store();
3462 let planner = Planner::new(store);
3463
3464 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3466 items: vec![ReturnItem {
3467 expression: LogicalExpression::Variable("n".to_string()),
3468 alias: None,
3469 }],
3470 distinct: false,
3471 input: Box::new(LogicalOperator::Filter(FilterOp {
3472 predicate: LogicalExpression::Unary {
3473 op: UnaryOp::Not,
3474 operand: Box::new(LogicalExpression::Property {
3475 variable: "n".to_string(),
3476 property: "active".to_string(),
3477 }),
3478 },
3479 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3480 variable: "n".to_string(),
3481 label: None,
3482 input: None,
3483 })),
3484 })),
3485 }));
3486
3487 let physical = planner.plan(&logical).unwrap();
3488 assert_eq!(physical.columns(), &["n"]);
3489 }
3490
3491 #[test]
3492 fn test_plan_filter_is_null() {
3493 let store = create_test_store();
3494 let planner = Planner::new(store);
3495
3496 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3498 items: vec![ReturnItem {
3499 expression: LogicalExpression::Variable("n".to_string()),
3500 alias: None,
3501 }],
3502 distinct: false,
3503 input: Box::new(LogicalOperator::Filter(FilterOp {
3504 predicate: LogicalExpression::Unary {
3505 op: UnaryOp::IsNull,
3506 operand: Box::new(LogicalExpression::Property {
3507 variable: "n".to_string(),
3508 property: "email".to_string(),
3509 }),
3510 },
3511 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3512 variable: "n".to_string(),
3513 label: None,
3514 input: None,
3515 })),
3516 })),
3517 }));
3518
3519 let physical = planner.plan(&logical).unwrap();
3520 assert_eq!(physical.columns(), &["n"]);
3521 }
3522
3523 #[test]
3524 fn test_plan_filter_function_call() {
3525 let store = create_test_store();
3526 let planner = Planner::new(store);
3527
3528 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3530 items: vec![ReturnItem {
3531 expression: LogicalExpression::Variable("n".to_string()),
3532 alias: None,
3533 }],
3534 distinct: false,
3535 input: Box::new(LogicalOperator::Filter(FilterOp {
3536 predicate: LogicalExpression::Binary {
3537 left: Box::new(LogicalExpression::FunctionCall {
3538 name: "size".to_string(),
3539 args: vec![LogicalExpression::Property {
3540 variable: "n".to_string(),
3541 property: "friends".to_string(),
3542 }],
3543 distinct: false,
3544 }),
3545 op: BinaryOp::Gt,
3546 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3547 },
3548 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3549 variable: "n".to_string(),
3550 label: None,
3551 input: None,
3552 })),
3553 })),
3554 }));
3555
3556 let physical = planner.plan(&logical).unwrap();
3557 assert_eq!(physical.columns(), &["n"]);
3558 }
3559
3560 #[test]
3563 fn test_plan_expand_outgoing() {
3564 let store = create_test_store();
3565 let planner = Planner::new(store);
3566
3567 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3569 items: vec![
3570 ReturnItem {
3571 expression: LogicalExpression::Variable("a".to_string()),
3572 alias: None,
3573 },
3574 ReturnItem {
3575 expression: LogicalExpression::Variable("b".to_string()),
3576 alias: None,
3577 },
3578 ],
3579 distinct: false,
3580 input: Box::new(LogicalOperator::Expand(ExpandOp {
3581 from_variable: "a".to_string(),
3582 to_variable: "b".to_string(),
3583 edge_variable: None,
3584 direction: ExpandDirection::Outgoing,
3585 edge_type: Some("KNOWS".to_string()),
3586 min_hops: 1,
3587 max_hops: Some(1),
3588 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3589 variable: "a".to_string(),
3590 label: Some("Person".to_string()),
3591 input: None,
3592 })),
3593 path_alias: None,
3594 })),
3595 }));
3596
3597 let physical = planner.plan(&logical).unwrap();
3598 assert!(physical.columns().contains(&"a".to_string()));
3600 assert!(physical.columns().contains(&"b".to_string()));
3601 }
3602
3603 #[test]
3604 fn test_plan_expand_with_edge_variable() {
3605 let store = create_test_store();
3606 let planner = Planner::new(store);
3607
3608 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3610 items: vec![
3611 ReturnItem {
3612 expression: LogicalExpression::Variable("a".to_string()),
3613 alias: None,
3614 },
3615 ReturnItem {
3616 expression: LogicalExpression::Variable("r".to_string()),
3617 alias: None,
3618 },
3619 ReturnItem {
3620 expression: LogicalExpression::Variable("b".to_string()),
3621 alias: None,
3622 },
3623 ],
3624 distinct: false,
3625 input: Box::new(LogicalOperator::Expand(ExpandOp {
3626 from_variable: "a".to_string(),
3627 to_variable: "b".to_string(),
3628 edge_variable: Some("r".to_string()),
3629 direction: ExpandDirection::Outgoing,
3630 edge_type: Some("KNOWS".to_string()),
3631 min_hops: 1,
3632 max_hops: Some(1),
3633 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3634 variable: "a".to_string(),
3635 label: None,
3636 input: None,
3637 })),
3638 path_alias: None,
3639 })),
3640 }));
3641
3642 let physical = planner.plan(&logical).unwrap();
3643 assert!(physical.columns().contains(&"a".to_string()));
3644 assert!(physical.columns().contains(&"r".to_string()));
3645 assert!(physical.columns().contains(&"b".to_string()));
3646 }
3647
3648 #[test]
3651 fn test_plan_limit() {
3652 let store = create_test_store();
3653 let planner = Planner::new(store);
3654
3655 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3657 items: vec![ReturnItem {
3658 expression: LogicalExpression::Variable("n".to_string()),
3659 alias: None,
3660 }],
3661 distinct: false,
3662 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3663 count: 10,
3664 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3665 variable: "n".to_string(),
3666 label: None,
3667 input: None,
3668 })),
3669 })),
3670 }));
3671
3672 let physical = planner.plan(&logical).unwrap();
3673 assert_eq!(physical.columns(), &["n"]);
3674 }
3675
3676 #[test]
3677 fn test_plan_skip() {
3678 let store = create_test_store();
3679 let planner = Planner::new(store);
3680
3681 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3683 items: vec![ReturnItem {
3684 expression: LogicalExpression::Variable("n".to_string()),
3685 alias: None,
3686 }],
3687 distinct: false,
3688 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3689 count: 5,
3690 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3691 variable: "n".to_string(),
3692 label: None,
3693 input: None,
3694 })),
3695 })),
3696 }));
3697
3698 let physical = planner.plan(&logical).unwrap();
3699 assert_eq!(physical.columns(), &["n"]);
3700 }
3701
3702 #[test]
3703 fn test_plan_sort() {
3704 let store = create_test_store();
3705 let planner = Planner::new(store);
3706
3707 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3709 items: vec![ReturnItem {
3710 expression: LogicalExpression::Variable("n".to_string()),
3711 alias: None,
3712 }],
3713 distinct: false,
3714 input: Box::new(LogicalOperator::Sort(SortOp {
3715 keys: vec![SortKey {
3716 expression: LogicalExpression::Variable("n".to_string()),
3717 order: SortOrder::Ascending,
3718 }],
3719 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3720 variable: "n".to_string(),
3721 label: None,
3722 input: None,
3723 })),
3724 })),
3725 }));
3726
3727 let physical = planner.plan(&logical).unwrap();
3728 assert_eq!(physical.columns(), &["n"]);
3729 }
3730
3731 #[test]
3732 fn test_plan_sort_descending() {
3733 let store = create_test_store();
3734 let planner = Planner::new(store);
3735
3736 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3738 items: vec![ReturnItem {
3739 expression: LogicalExpression::Variable("n".to_string()),
3740 alias: None,
3741 }],
3742 distinct: false,
3743 input: Box::new(LogicalOperator::Sort(SortOp {
3744 keys: vec![SortKey {
3745 expression: LogicalExpression::Variable("n".to_string()),
3746 order: SortOrder::Descending,
3747 }],
3748 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3749 variable: "n".to_string(),
3750 label: None,
3751 input: None,
3752 })),
3753 })),
3754 }));
3755
3756 let physical = planner.plan(&logical).unwrap();
3757 assert_eq!(physical.columns(), &["n"]);
3758 }
3759
3760 #[test]
3761 fn test_plan_distinct() {
3762 let store = create_test_store();
3763 let planner = Planner::new(store);
3764
3765 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3767 items: vec![ReturnItem {
3768 expression: LogicalExpression::Variable("n".to_string()),
3769 alias: None,
3770 }],
3771 distinct: false,
3772 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3773 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3774 variable: "n".to_string(),
3775 label: None,
3776 input: None,
3777 })),
3778 columns: None,
3779 })),
3780 }));
3781
3782 let physical = planner.plan(&logical).unwrap();
3783 assert_eq!(physical.columns(), &["n"]);
3784 }
3785
3786 #[test]
3789 fn test_plan_aggregate_count() {
3790 let store = create_test_store();
3791 let planner = Planner::new(store);
3792
3793 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3795 items: vec![ReturnItem {
3796 expression: LogicalExpression::Variable("cnt".to_string()),
3797 alias: None,
3798 }],
3799 distinct: false,
3800 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3801 group_by: vec![],
3802 aggregates: vec![LogicalAggregateExpr {
3803 function: LogicalAggregateFunction::Count,
3804 expression: Some(LogicalExpression::Variable("n".to_string())),
3805 distinct: false,
3806 alias: Some("cnt".to_string()),
3807 percentile: None,
3808 }],
3809 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3810 variable: "n".to_string(),
3811 label: None,
3812 input: None,
3813 })),
3814 having: None,
3815 })),
3816 }));
3817
3818 let physical = planner.plan(&logical).unwrap();
3819 assert!(physical.columns().contains(&"cnt".to_string()));
3820 }
3821
3822 #[test]
3823 fn test_plan_aggregate_with_group_by() {
3824 let store = create_test_store();
3825 let planner = Planner::new(store);
3826
3827 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3829 group_by: vec![LogicalExpression::Property {
3830 variable: "n".to_string(),
3831 property: "city".to_string(),
3832 }],
3833 aggregates: vec![LogicalAggregateExpr {
3834 function: LogicalAggregateFunction::Count,
3835 expression: Some(LogicalExpression::Variable("n".to_string())),
3836 distinct: false,
3837 alias: Some("cnt".to_string()),
3838 percentile: None,
3839 }],
3840 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3841 variable: "n".to_string(),
3842 label: Some("Person".to_string()),
3843 input: None,
3844 })),
3845 having: None,
3846 }));
3847
3848 let physical = planner.plan(&logical).unwrap();
3849 assert_eq!(physical.columns().len(), 2);
3850 }
3851
3852 #[test]
3853 fn test_plan_aggregate_sum() {
3854 let store = create_test_store();
3855 let planner = Planner::new(store);
3856
3857 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3859 group_by: vec![],
3860 aggregates: vec![LogicalAggregateExpr {
3861 function: LogicalAggregateFunction::Sum,
3862 expression: Some(LogicalExpression::Property {
3863 variable: "n".to_string(),
3864 property: "value".to_string(),
3865 }),
3866 distinct: false,
3867 alias: Some("total".to_string()),
3868 percentile: None,
3869 }],
3870 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3871 variable: "n".to_string(),
3872 label: None,
3873 input: None,
3874 })),
3875 having: None,
3876 }));
3877
3878 let physical = planner.plan(&logical).unwrap();
3879 assert!(physical.columns().contains(&"total".to_string()));
3880 }
3881
3882 #[test]
3883 fn test_plan_aggregate_avg() {
3884 let store = create_test_store();
3885 let planner = Planner::new(store);
3886
3887 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3889 group_by: vec![],
3890 aggregates: vec![LogicalAggregateExpr {
3891 function: LogicalAggregateFunction::Avg,
3892 expression: Some(LogicalExpression::Property {
3893 variable: "n".to_string(),
3894 property: "score".to_string(),
3895 }),
3896 distinct: false,
3897 alias: Some("average".to_string()),
3898 percentile: None,
3899 }],
3900 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3901 variable: "n".to_string(),
3902 label: None,
3903 input: None,
3904 })),
3905 having: None,
3906 }));
3907
3908 let physical = planner.plan(&logical).unwrap();
3909 assert!(physical.columns().contains(&"average".to_string()));
3910 }
3911
3912 #[test]
3913 fn test_plan_aggregate_min_max() {
3914 let store = create_test_store();
3915 let planner = Planner::new(store);
3916
3917 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3919 group_by: vec![],
3920 aggregates: vec![
3921 LogicalAggregateExpr {
3922 function: LogicalAggregateFunction::Min,
3923 expression: Some(LogicalExpression::Property {
3924 variable: "n".to_string(),
3925 property: "age".to_string(),
3926 }),
3927 distinct: false,
3928 alias: Some("youngest".to_string()),
3929 percentile: None,
3930 },
3931 LogicalAggregateExpr {
3932 function: LogicalAggregateFunction::Max,
3933 expression: Some(LogicalExpression::Property {
3934 variable: "n".to_string(),
3935 property: "age".to_string(),
3936 }),
3937 distinct: false,
3938 alias: Some("oldest".to_string()),
3939 percentile: None,
3940 },
3941 ],
3942 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3943 variable: "n".to_string(),
3944 label: None,
3945 input: None,
3946 })),
3947 having: None,
3948 }));
3949
3950 let physical = planner.plan(&logical).unwrap();
3951 assert!(physical.columns().contains(&"youngest".to_string()));
3952 assert!(physical.columns().contains(&"oldest".to_string()));
3953 }
3954
3955 #[test]
3958 fn test_plan_inner_join() {
3959 let store = create_test_store();
3960 let planner = Planner::new(store);
3961
3962 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3964 items: vec![
3965 ReturnItem {
3966 expression: LogicalExpression::Variable("a".to_string()),
3967 alias: None,
3968 },
3969 ReturnItem {
3970 expression: LogicalExpression::Variable("b".to_string()),
3971 alias: None,
3972 },
3973 ],
3974 distinct: false,
3975 input: Box::new(LogicalOperator::Join(JoinOp {
3976 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3977 variable: "a".to_string(),
3978 label: Some("Person".to_string()),
3979 input: None,
3980 })),
3981 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3982 variable: "b".to_string(),
3983 label: Some("Company".to_string()),
3984 input: None,
3985 })),
3986 join_type: JoinType::Inner,
3987 conditions: vec![JoinCondition {
3988 left: LogicalExpression::Variable("a".to_string()),
3989 right: LogicalExpression::Variable("b".to_string()),
3990 }],
3991 })),
3992 }));
3993
3994 let physical = planner.plan(&logical).unwrap();
3995 assert!(physical.columns().contains(&"a".to_string()));
3996 assert!(physical.columns().contains(&"b".to_string()));
3997 }
3998
3999 #[test]
4000 fn test_plan_cross_join() {
4001 let store = create_test_store();
4002 let planner = Planner::new(store);
4003
4004 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4006 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4007 variable: "a".to_string(),
4008 label: None,
4009 input: None,
4010 })),
4011 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4012 variable: "b".to_string(),
4013 label: None,
4014 input: None,
4015 })),
4016 join_type: JoinType::Cross,
4017 conditions: vec![],
4018 }));
4019
4020 let physical = planner.plan(&logical).unwrap();
4021 assert_eq!(physical.columns().len(), 2);
4022 }
4023
4024 #[test]
4025 fn test_plan_left_join() {
4026 let store = create_test_store();
4027 let planner = Planner::new(store);
4028
4029 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4030 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4031 variable: "a".to_string(),
4032 label: None,
4033 input: None,
4034 })),
4035 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4036 variable: "b".to_string(),
4037 label: None,
4038 input: None,
4039 })),
4040 join_type: JoinType::Left,
4041 conditions: vec![],
4042 }));
4043
4044 let physical = planner.plan(&logical).unwrap();
4045 assert_eq!(physical.columns().len(), 2);
4046 }
4047
4048 #[test]
4051 fn test_plan_create_node() {
4052 let store = create_test_store();
4053 let planner = Planner::new(store);
4054
4055 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4057 variable: "n".to_string(),
4058 labels: vec!["Person".to_string()],
4059 properties: vec![(
4060 "name".to_string(),
4061 LogicalExpression::Literal(Value::String("Alice".into())),
4062 )],
4063 input: None,
4064 }));
4065
4066 let physical = planner.plan(&logical).unwrap();
4067 assert!(physical.columns().contains(&"n".to_string()));
4068 }
4069
4070 #[test]
4071 fn test_plan_create_edge() {
4072 let store = create_test_store();
4073 let planner = Planner::new(store);
4074
4075 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4077 variable: Some("r".to_string()),
4078 from_variable: "a".to_string(),
4079 to_variable: "b".to_string(),
4080 edge_type: "KNOWS".to_string(),
4081 properties: vec![],
4082 input: Box::new(LogicalOperator::Join(JoinOp {
4083 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4084 variable: "a".to_string(),
4085 label: None,
4086 input: None,
4087 })),
4088 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4089 variable: "b".to_string(),
4090 label: None,
4091 input: None,
4092 })),
4093 join_type: JoinType::Cross,
4094 conditions: vec![],
4095 })),
4096 }));
4097
4098 let physical = planner.plan(&logical).unwrap();
4099 assert!(physical.columns().contains(&"r".to_string()));
4100 }
4101
4102 #[test]
4103 fn test_plan_delete_node() {
4104 let store = create_test_store();
4105 let planner = Planner::new(store);
4106
4107 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4109 variable: "n".to_string(),
4110 detach: false,
4111 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4112 variable: "n".to_string(),
4113 label: None,
4114 input: None,
4115 })),
4116 }));
4117
4118 let physical = planner.plan(&logical).unwrap();
4119 assert!(physical.columns().contains(&"deleted_count".to_string()));
4120 }
4121
4122 #[test]
4125 fn test_plan_empty_errors() {
4126 let store = create_test_store();
4127 let planner = Planner::new(store);
4128
4129 let logical = LogicalPlan::new(LogicalOperator::Empty);
4130 let result = planner.plan(&logical);
4131 assert!(result.is_err());
4132 }
4133
4134 #[test]
4135 fn test_plan_missing_variable_in_return() {
4136 let store = create_test_store();
4137 let planner = Planner::new(store);
4138
4139 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4141 items: vec![ReturnItem {
4142 expression: LogicalExpression::Variable("missing".to_string()),
4143 alias: None,
4144 }],
4145 distinct: false,
4146 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4147 variable: "n".to_string(),
4148 label: None,
4149 input: None,
4150 })),
4151 }));
4152
4153 let result = planner.plan(&logical);
4154 assert!(result.is_err());
4155 }
4156
4157 #[test]
4160 fn test_convert_binary_ops() {
4161 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4162 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4163 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4164 assert!(convert_binary_op(BinaryOp::Le).is_ok());
4165 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4166 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4167 assert!(convert_binary_op(BinaryOp::And).is_ok());
4168 assert!(convert_binary_op(BinaryOp::Or).is_ok());
4169 assert!(convert_binary_op(BinaryOp::Add).is_ok());
4170 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4171 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4172 assert!(convert_binary_op(BinaryOp::Div).is_ok());
4173 }
4174
4175 #[test]
4176 fn test_convert_unary_ops() {
4177 assert!(convert_unary_op(UnaryOp::Not).is_ok());
4178 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4179 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4180 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4181 }
4182
4183 #[test]
4184 fn test_convert_aggregate_functions() {
4185 assert!(matches!(
4186 convert_aggregate_function(LogicalAggregateFunction::Count),
4187 PhysicalAggregateFunction::Count
4188 ));
4189 assert!(matches!(
4190 convert_aggregate_function(LogicalAggregateFunction::Sum),
4191 PhysicalAggregateFunction::Sum
4192 ));
4193 assert!(matches!(
4194 convert_aggregate_function(LogicalAggregateFunction::Avg),
4195 PhysicalAggregateFunction::Avg
4196 ));
4197 assert!(matches!(
4198 convert_aggregate_function(LogicalAggregateFunction::Min),
4199 PhysicalAggregateFunction::Min
4200 ));
4201 assert!(matches!(
4202 convert_aggregate_function(LogicalAggregateFunction::Max),
4203 PhysicalAggregateFunction::Max
4204 ));
4205 }
4206
4207 #[test]
4208 fn test_planner_accessors() {
4209 let store = create_test_store();
4210 let planner = Planner::new(Arc::clone(&store));
4211
4212 assert!(planner.tx_id().is_none());
4213 assert!(planner.tx_manager().is_none());
4214 let _ = planner.viewing_epoch(); }
4216
4217 #[test]
4218 fn test_physical_plan_accessors() {
4219 let store = create_test_store();
4220 let planner = Planner::new(store);
4221
4222 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4223 variable: "n".to_string(),
4224 label: None,
4225 input: None,
4226 }));
4227
4228 let physical = planner.plan(&logical).unwrap();
4229 assert_eq!(physical.columns(), &["n"]);
4230
4231 let _ = physical.into_operator();
4233 }
4234
4235 #[test]
4238 fn test_plan_adaptive_with_scan() {
4239 let store = create_test_store();
4240 let planner = Planner::new(store);
4241
4242 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4244 items: vec![ReturnItem {
4245 expression: LogicalExpression::Variable("n".to_string()),
4246 alias: None,
4247 }],
4248 distinct: false,
4249 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4250 variable: "n".to_string(),
4251 label: Some("Person".to_string()),
4252 input: None,
4253 })),
4254 }));
4255
4256 let physical = planner.plan_adaptive(&logical).unwrap();
4257 assert_eq!(physical.columns(), &["n"]);
4258 assert!(physical.adaptive_context.is_some());
4260 }
4261
4262 #[test]
4263 fn test_plan_adaptive_with_filter() {
4264 let store = create_test_store();
4265 let planner = Planner::new(store);
4266
4267 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4269 items: vec![ReturnItem {
4270 expression: LogicalExpression::Variable("n".to_string()),
4271 alias: None,
4272 }],
4273 distinct: false,
4274 input: Box::new(LogicalOperator::Filter(FilterOp {
4275 predicate: LogicalExpression::Binary {
4276 left: Box::new(LogicalExpression::Property {
4277 variable: "n".to_string(),
4278 property: "age".to_string(),
4279 }),
4280 op: BinaryOp::Gt,
4281 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4282 },
4283 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4284 variable: "n".to_string(),
4285 label: None,
4286 input: None,
4287 })),
4288 })),
4289 }));
4290
4291 let physical = planner.plan_adaptive(&logical).unwrap();
4292 assert!(physical.adaptive_context.is_some());
4293 }
4294
4295 #[test]
4296 fn test_plan_adaptive_with_expand() {
4297 let store = create_test_store();
4298 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4299
4300 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4302 items: vec![
4303 ReturnItem {
4304 expression: LogicalExpression::Variable("a".to_string()),
4305 alias: None,
4306 },
4307 ReturnItem {
4308 expression: LogicalExpression::Variable("b".to_string()),
4309 alias: None,
4310 },
4311 ],
4312 distinct: false,
4313 input: Box::new(LogicalOperator::Expand(ExpandOp {
4314 from_variable: "a".to_string(),
4315 to_variable: "b".to_string(),
4316 edge_variable: None,
4317 direction: ExpandDirection::Outgoing,
4318 edge_type: Some("KNOWS".to_string()),
4319 min_hops: 1,
4320 max_hops: Some(1),
4321 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4322 variable: "a".to_string(),
4323 label: None,
4324 input: None,
4325 })),
4326 path_alias: None,
4327 })),
4328 }));
4329
4330 let physical = planner.plan_adaptive(&logical).unwrap();
4331 assert!(physical.adaptive_context.is_some());
4332 }
4333
4334 #[test]
4335 fn test_plan_adaptive_with_join() {
4336 let store = create_test_store();
4337 let planner = Planner::new(store);
4338
4339 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4340 items: vec![
4341 ReturnItem {
4342 expression: LogicalExpression::Variable("a".to_string()),
4343 alias: None,
4344 },
4345 ReturnItem {
4346 expression: LogicalExpression::Variable("b".to_string()),
4347 alias: None,
4348 },
4349 ],
4350 distinct: false,
4351 input: Box::new(LogicalOperator::Join(JoinOp {
4352 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4353 variable: "a".to_string(),
4354 label: None,
4355 input: None,
4356 })),
4357 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4358 variable: "b".to_string(),
4359 label: None,
4360 input: None,
4361 })),
4362 join_type: JoinType::Cross,
4363 conditions: vec![],
4364 })),
4365 }));
4366
4367 let physical = planner.plan_adaptive(&logical).unwrap();
4368 assert!(physical.adaptive_context.is_some());
4369 }
4370
4371 #[test]
4372 fn test_plan_adaptive_with_aggregate() {
4373 let store = create_test_store();
4374 let planner = Planner::new(store);
4375
4376 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4377 group_by: vec![],
4378 aggregates: vec![LogicalAggregateExpr {
4379 function: LogicalAggregateFunction::Count,
4380 expression: Some(LogicalExpression::Variable("n".to_string())),
4381 distinct: false,
4382 alias: Some("cnt".to_string()),
4383 percentile: None,
4384 }],
4385 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4386 variable: "n".to_string(),
4387 label: None,
4388 input: None,
4389 })),
4390 having: None,
4391 }));
4392
4393 let physical = planner.plan_adaptive(&logical).unwrap();
4394 assert!(physical.adaptive_context.is_some());
4395 }
4396
4397 #[test]
4398 fn test_plan_adaptive_with_distinct() {
4399 let store = create_test_store();
4400 let planner = Planner::new(store);
4401
4402 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4403 items: vec![ReturnItem {
4404 expression: LogicalExpression::Variable("n".to_string()),
4405 alias: None,
4406 }],
4407 distinct: false,
4408 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4409 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4410 variable: "n".to_string(),
4411 label: None,
4412 input: None,
4413 })),
4414 columns: None,
4415 })),
4416 }));
4417
4418 let physical = planner.plan_adaptive(&logical).unwrap();
4419 assert!(physical.adaptive_context.is_some());
4420 }
4421
4422 #[test]
4423 fn test_plan_adaptive_with_limit() {
4424 let store = create_test_store();
4425 let planner = Planner::new(store);
4426
4427 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4428 items: vec![ReturnItem {
4429 expression: LogicalExpression::Variable("n".to_string()),
4430 alias: None,
4431 }],
4432 distinct: false,
4433 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4434 count: 10,
4435 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4436 variable: "n".to_string(),
4437 label: None,
4438 input: None,
4439 })),
4440 })),
4441 }));
4442
4443 let physical = planner.plan_adaptive(&logical).unwrap();
4444 assert!(physical.adaptive_context.is_some());
4445 }
4446
4447 #[test]
4448 fn test_plan_adaptive_with_skip() {
4449 let store = create_test_store();
4450 let planner = Planner::new(store);
4451
4452 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4453 items: vec![ReturnItem {
4454 expression: LogicalExpression::Variable("n".to_string()),
4455 alias: None,
4456 }],
4457 distinct: false,
4458 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4459 count: 5,
4460 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4461 variable: "n".to_string(),
4462 label: None,
4463 input: None,
4464 })),
4465 })),
4466 }));
4467
4468 let physical = planner.plan_adaptive(&logical).unwrap();
4469 assert!(physical.adaptive_context.is_some());
4470 }
4471
4472 #[test]
4473 fn test_plan_adaptive_with_sort() {
4474 let store = create_test_store();
4475 let planner = Planner::new(store);
4476
4477 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4478 items: vec![ReturnItem {
4479 expression: LogicalExpression::Variable("n".to_string()),
4480 alias: None,
4481 }],
4482 distinct: false,
4483 input: Box::new(LogicalOperator::Sort(SortOp {
4484 keys: vec![SortKey {
4485 expression: LogicalExpression::Variable("n".to_string()),
4486 order: SortOrder::Ascending,
4487 }],
4488 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4489 variable: "n".to_string(),
4490 label: None,
4491 input: None,
4492 })),
4493 })),
4494 }));
4495
4496 let physical = planner.plan_adaptive(&logical).unwrap();
4497 assert!(physical.adaptive_context.is_some());
4498 }
4499
4500 #[test]
4501 fn test_plan_adaptive_with_union() {
4502 let store = create_test_store();
4503 let planner = Planner::new(store);
4504
4505 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4506 items: vec![ReturnItem {
4507 expression: LogicalExpression::Variable("n".to_string()),
4508 alias: None,
4509 }],
4510 distinct: false,
4511 input: Box::new(LogicalOperator::Union(UnionOp {
4512 inputs: vec![
4513 LogicalOperator::NodeScan(NodeScanOp {
4514 variable: "n".to_string(),
4515 label: Some("Person".to_string()),
4516 input: None,
4517 }),
4518 LogicalOperator::NodeScan(NodeScanOp {
4519 variable: "n".to_string(),
4520 label: Some("Company".to_string()),
4521 input: None,
4522 }),
4523 ],
4524 })),
4525 }));
4526
4527 let physical = planner.plan_adaptive(&logical).unwrap();
4528 assert!(physical.adaptive_context.is_some());
4529 }
4530
4531 #[test]
4534 fn test_plan_expand_variable_length() {
4535 let store = create_test_store();
4536 let planner = Planner::new(store);
4537
4538 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4540 items: vec![
4541 ReturnItem {
4542 expression: LogicalExpression::Variable("a".to_string()),
4543 alias: None,
4544 },
4545 ReturnItem {
4546 expression: LogicalExpression::Variable("b".to_string()),
4547 alias: None,
4548 },
4549 ],
4550 distinct: false,
4551 input: Box::new(LogicalOperator::Expand(ExpandOp {
4552 from_variable: "a".to_string(),
4553 to_variable: "b".to_string(),
4554 edge_variable: None,
4555 direction: ExpandDirection::Outgoing,
4556 edge_type: Some("KNOWS".to_string()),
4557 min_hops: 1,
4558 max_hops: Some(3),
4559 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4560 variable: "a".to_string(),
4561 label: None,
4562 input: None,
4563 })),
4564 path_alias: None,
4565 })),
4566 }));
4567
4568 let physical = planner.plan(&logical).unwrap();
4569 assert!(physical.columns().contains(&"a".to_string()));
4570 assert!(physical.columns().contains(&"b".to_string()));
4571 }
4572
4573 #[test]
4574 fn test_plan_expand_with_path_alias() {
4575 let store = create_test_store();
4576 let planner = Planner::new(store);
4577
4578 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4580 items: vec![
4581 ReturnItem {
4582 expression: LogicalExpression::Variable("a".to_string()),
4583 alias: None,
4584 },
4585 ReturnItem {
4586 expression: LogicalExpression::Variable("b".to_string()),
4587 alias: None,
4588 },
4589 ],
4590 distinct: false,
4591 input: Box::new(LogicalOperator::Expand(ExpandOp {
4592 from_variable: "a".to_string(),
4593 to_variable: "b".to_string(),
4594 edge_variable: None,
4595 direction: ExpandDirection::Outgoing,
4596 edge_type: Some("KNOWS".to_string()),
4597 min_hops: 1,
4598 max_hops: Some(3),
4599 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4600 variable: "a".to_string(),
4601 label: None,
4602 input: None,
4603 })),
4604 path_alias: Some("p".to_string()),
4605 })),
4606 }));
4607
4608 let physical = planner.plan(&logical).unwrap();
4609 assert!(physical.columns().contains(&"a".to_string()));
4611 assert!(physical.columns().contains(&"b".to_string()));
4612 }
4613
4614 #[test]
4615 fn test_plan_expand_incoming() {
4616 let store = create_test_store();
4617 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4618
4619 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4621 items: vec![
4622 ReturnItem {
4623 expression: LogicalExpression::Variable("a".to_string()),
4624 alias: None,
4625 },
4626 ReturnItem {
4627 expression: LogicalExpression::Variable("b".to_string()),
4628 alias: None,
4629 },
4630 ],
4631 distinct: false,
4632 input: Box::new(LogicalOperator::Expand(ExpandOp {
4633 from_variable: "a".to_string(),
4634 to_variable: "b".to_string(),
4635 edge_variable: None,
4636 direction: ExpandDirection::Incoming,
4637 edge_type: Some("KNOWS".to_string()),
4638 min_hops: 1,
4639 max_hops: Some(1),
4640 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4641 variable: "a".to_string(),
4642 label: None,
4643 input: None,
4644 })),
4645 path_alias: None,
4646 })),
4647 }));
4648
4649 let physical = planner.plan(&logical).unwrap();
4650 assert!(physical.columns().contains(&"a".to_string()));
4651 assert!(physical.columns().contains(&"b".to_string()));
4652 }
4653
4654 #[test]
4655 fn test_plan_expand_both_directions() {
4656 let store = create_test_store();
4657 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4658
4659 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4661 items: vec![
4662 ReturnItem {
4663 expression: LogicalExpression::Variable("a".to_string()),
4664 alias: None,
4665 },
4666 ReturnItem {
4667 expression: LogicalExpression::Variable("b".to_string()),
4668 alias: None,
4669 },
4670 ],
4671 distinct: false,
4672 input: Box::new(LogicalOperator::Expand(ExpandOp {
4673 from_variable: "a".to_string(),
4674 to_variable: "b".to_string(),
4675 edge_variable: None,
4676 direction: ExpandDirection::Both,
4677 edge_type: Some("KNOWS".to_string()),
4678 min_hops: 1,
4679 max_hops: Some(1),
4680 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4681 variable: "a".to_string(),
4682 label: None,
4683 input: None,
4684 })),
4685 path_alias: None,
4686 })),
4687 }));
4688
4689 let physical = planner.plan(&logical).unwrap();
4690 assert!(physical.columns().contains(&"a".to_string()));
4691 assert!(physical.columns().contains(&"b".to_string()));
4692 }
4693
4694 #[test]
4697 fn test_planner_with_context() {
4698 use crate::transaction::TransactionManager;
4699
4700 let store = create_test_store();
4701 let tx_manager = Arc::new(TransactionManager::new());
4702 let tx_id = tx_manager.begin();
4703 let epoch = tx_manager.current_epoch();
4704
4705 let planner = Planner::with_context(
4706 Arc::clone(&store),
4707 Arc::clone(&tx_manager),
4708 Some(tx_id),
4709 epoch,
4710 );
4711
4712 assert_eq!(planner.tx_id(), Some(tx_id));
4713 assert!(planner.tx_manager().is_some());
4714 assert_eq!(planner.viewing_epoch(), epoch);
4715 }
4716
4717 #[test]
4718 fn test_planner_with_factorized_execution_disabled() {
4719 let store = create_test_store();
4720 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4721
4722 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4724 items: vec![
4725 ReturnItem {
4726 expression: LogicalExpression::Variable("a".to_string()),
4727 alias: None,
4728 },
4729 ReturnItem {
4730 expression: LogicalExpression::Variable("c".to_string()),
4731 alias: None,
4732 },
4733 ],
4734 distinct: false,
4735 input: Box::new(LogicalOperator::Expand(ExpandOp {
4736 from_variable: "b".to_string(),
4737 to_variable: "c".to_string(),
4738 edge_variable: None,
4739 direction: ExpandDirection::Outgoing,
4740 edge_type: None,
4741 min_hops: 1,
4742 max_hops: Some(1),
4743 input: Box::new(LogicalOperator::Expand(ExpandOp {
4744 from_variable: "a".to_string(),
4745 to_variable: "b".to_string(),
4746 edge_variable: None,
4747 direction: ExpandDirection::Outgoing,
4748 edge_type: None,
4749 min_hops: 1,
4750 max_hops: Some(1),
4751 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4752 variable: "a".to_string(),
4753 label: None,
4754 input: None,
4755 })),
4756 path_alias: None,
4757 })),
4758 path_alias: None,
4759 })),
4760 }));
4761
4762 let physical = planner.plan(&logical).unwrap();
4763 assert!(physical.columns().contains(&"a".to_string()));
4764 assert!(physical.columns().contains(&"c".to_string()));
4765 }
4766
4767 #[test]
4770 fn test_plan_sort_by_property() {
4771 let store = create_test_store();
4772 let planner = Planner::new(store);
4773
4774 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4776 items: vec![ReturnItem {
4777 expression: LogicalExpression::Variable("n".to_string()),
4778 alias: None,
4779 }],
4780 distinct: false,
4781 input: Box::new(LogicalOperator::Sort(SortOp {
4782 keys: vec![SortKey {
4783 expression: LogicalExpression::Property {
4784 variable: "n".to_string(),
4785 property: "name".to_string(),
4786 },
4787 order: SortOrder::Ascending,
4788 }],
4789 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4790 variable: "n".to_string(),
4791 label: None,
4792 input: None,
4793 })),
4794 })),
4795 }));
4796
4797 let physical = planner.plan(&logical).unwrap();
4798 assert!(physical.columns().contains(&"n".to_string()));
4800 }
4801
4802 #[test]
4805 fn test_plan_scan_with_input() {
4806 let store = create_test_store();
4807 let planner = Planner::new(store);
4808
4809 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4811 items: vec![
4812 ReturnItem {
4813 expression: LogicalExpression::Variable("a".to_string()),
4814 alias: None,
4815 },
4816 ReturnItem {
4817 expression: LogicalExpression::Variable("b".to_string()),
4818 alias: None,
4819 },
4820 ],
4821 distinct: false,
4822 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4823 variable: "b".to_string(),
4824 label: Some("Company".to_string()),
4825 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4826 variable: "a".to_string(),
4827 label: Some("Person".to_string()),
4828 input: None,
4829 }))),
4830 })),
4831 }));
4832
4833 let physical = planner.plan(&logical).unwrap();
4834 assert!(physical.columns().contains(&"a".to_string()));
4835 assert!(physical.columns().contains(&"b".to_string()));
4836 }
4837}