1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37pub struct Planner {
39 store: Arc<LpgStore>,
41 tx_manager: Option<Arc<TransactionManager>>,
43 tx_id: Option<TxId>,
45 viewing_epoch: EpochId,
47 anon_edge_counter: std::cell::Cell<u32>,
49 factorized_execution: bool,
51}
52
53impl Planner {
54 #[must_use]
59 pub fn new(store: Arc<LpgStore>) -> Self {
60 let epoch = store.current_epoch();
61 Self {
62 store,
63 tx_manager: None,
64 tx_id: None,
65 viewing_epoch: epoch,
66 anon_edge_counter: std::cell::Cell::new(0),
67 factorized_execution: true,
68 }
69 }
70
71 #[must_use]
80 pub fn with_context(
81 store: Arc<LpgStore>,
82 tx_manager: Arc<TransactionManager>,
83 tx_id: Option<TxId>,
84 viewing_epoch: EpochId,
85 ) -> Self {
86 Self {
87 store,
88 tx_manager: Some(tx_manager),
89 tx_id,
90 viewing_epoch,
91 anon_edge_counter: std::cell::Cell::new(0),
92 factorized_execution: true,
93 }
94 }
95
96 #[must_use]
98 pub fn viewing_epoch(&self) -> EpochId {
99 self.viewing_epoch
100 }
101
102 #[must_use]
104 pub fn tx_id(&self) -> Option<TxId> {
105 self.tx_id
106 }
107
108 #[must_use]
110 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
111 self.tx_manager.as_ref()
112 }
113
114 #[must_use]
116 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
117 self.factorized_execution = enabled;
118 self
119 }
120
121 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
125 match op {
126 LogicalOperator::Expand(expand) => {
127 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
129
130 if is_single_hop {
131 let (inner_count, base) = Self::count_expand_chain(&expand.input);
132 (inner_count + 1, base)
133 } else {
134 (0, op)
136 }
137 }
138 _ => (0, op),
139 }
140 }
141
142 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
146 let mut chain = Vec::new();
147 let mut current = op;
148
149 while let LogicalOperator::Expand(expand) = current {
150 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
152 if !is_single_hop {
153 break;
154 }
155 chain.push(expand);
156 current = &expand.input;
157 }
158
159 chain.reverse();
161 chain
162 }
163
164 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
170 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
171 Ok(PhysicalPlan {
172 operator,
173 columns,
174 adaptive_context: None,
175 })
176 }
177
178 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
187 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
188
189 let mut adaptive_context = AdaptiveContext::new();
191 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
192
193 Ok(PhysicalPlan {
194 operator,
195 columns,
196 adaptive_context: Some(adaptive_context),
197 })
198 }
199
200 fn collect_cardinality_estimates(
202 &self,
203 op: &LogicalOperator,
204 ctx: &mut AdaptiveContext,
205 depth: usize,
206 ) {
207 match op {
208 LogicalOperator::NodeScan(scan) => {
209 let estimate = if let Some(label) = &scan.label {
211 self.store.nodes_by_label(label).len() as f64
212 } else {
213 self.store.node_count() as f64
214 };
215 let id = format!("scan_{}", scan.variable);
216 ctx.set_estimate(&id, estimate);
217
218 if let Some(input) = &scan.input {
220 self.collect_cardinality_estimates(input, ctx, depth + 1);
221 }
222 }
223 LogicalOperator::Filter(filter) => {
224 let input_estimate = self.estimate_cardinality(&filter.input);
226 let estimate = input_estimate * 0.3;
227 let id = format!("filter_{depth}");
228 ctx.set_estimate(&id, estimate);
229
230 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
231 }
232 LogicalOperator::Expand(expand) => {
233 let input_estimate = self.estimate_cardinality(&expand.input);
235 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
237 let id = format!("expand_{}", expand.to_variable);
238 ctx.set_estimate(&id, estimate);
239
240 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
241 }
242 LogicalOperator::Join(join) => {
243 let left_est = self.estimate_cardinality(&join.left);
245 let right_est = self.estimate_cardinality(&join.right);
246 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
248 ctx.set_estimate(&id, estimate);
249
250 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
251 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
252 }
253 LogicalOperator::Aggregate(agg) => {
254 let input_estimate = self.estimate_cardinality(&agg.input);
256 let estimate = if agg.group_by.is_empty() {
257 1.0 } else {
259 (input_estimate * 0.1).max(1.0) };
261 let id = format!("aggregate_{depth}");
262 ctx.set_estimate(&id, estimate);
263
264 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
265 }
266 LogicalOperator::Distinct(distinct) => {
267 let input_estimate = self.estimate_cardinality(&distinct.input);
268 let estimate = (input_estimate * 0.5).max(1.0);
269 let id = format!("distinct_{depth}");
270 ctx.set_estimate(&id, estimate);
271
272 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
273 }
274 LogicalOperator::Return(ret) => {
275 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
276 }
277 LogicalOperator::Limit(limit) => {
278 let input_estimate = self.estimate_cardinality(&limit.input);
279 let estimate = (input_estimate).min(limit.count as f64);
280 let id = format!("limit_{depth}");
281 ctx.set_estimate(&id, estimate);
282
283 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
284 }
285 LogicalOperator::Skip(skip) => {
286 let input_estimate = self.estimate_cardinality(&skip.input);
287 let estimate = (input_estimate - skip.count as f64).max(0.0);
288 let id = format!("skip_{depth}");
289 ctx.set_estimate(&id, estimate);
290
291 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
292 }
293 LogicalOperator::Sort(sort) => {
294 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
296 }
297 LogicalOperator::Union(union) => {
298 let estimate: f64 = union
299 .inputs
300 .iter()
301 .map(|input| self.estimate_cardinality(input))
302 .sum();
303 let id = format!("union_{depth}");
304 ctx.set_estimate(&id, estimate);
305
306 for input in &union.inputs {
307 self.collect_cardinality_estimates(input, ctx, depth + 1);
308 }
309 }
310 _ => {
311 }
313 }
314 }
315
316 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
318 match op {
319 LogicalOperator::NodeScan(scan) => {
320 if let Some(label) = &scan.label {
321 self.store.nodes_by_label(label).len() as f64
322 } else {
323 self.store.node_count() as f64
324 }
325 }
326 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
327 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
328 LogicalOperator::Join(join) => {
329 let left = self.estimate_cardinality(&join.left);
330 let right = self.estimate_cardinality(&join.right);
331 (left * right).sqrt()
332 }
333 LogicalOperator::Aggregate(agg) => {
334 if agg.group_by.is_empty() {
335 1.0
336 } else {
337 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
338 }
339 }
340 LogicalOperator::Distinct(distinct) => {
341 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
342 }
343 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
344 LogicalOperator::Limit(limit) => self
345 .estimate_cardinality(&limit.input)
346 .min(limit.count as f64),
347 LogicalOperator::Skip(skip) => {
348 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
349 }
350 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
351 LogicalOperator::Union(union) => union
352 .inputs
353 .iter()
354 .map(|input| self.estimate_cardinality(input))
355 .sum(),
356 _ => 1000.0, }
358 }
359
360 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
362 match op {
363 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
364 LogicalOperator::Expand(expand) => {
365 if self.factorized_execution {
367 let (chain_len, _base) = Self::count_expand_chain(op);
368 if chain_len >= 2 {
369 return self.plan_expand_chain(op);
371 }
372 }
373 self.plan_expand(expand)
374 }
375 LogicalOperator::Return(ret) => self.plan_return(ret),
376 LogicalOperator::Filter(filter) => self.plan_filter(filter),
377 LogicalOperator::Project(project) => self.plan_project(project),
378 LogicalOperator::Limit(limit) => self.plan_limit(limit),
379 LogicalOperator::Skip(skip) => self.plan_skip(skip),
380 LogicalOperator::Sort(sort) => self.plan_sort(sort),
381 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
382 LogicalOperator::Join(join) => self.plan_join(join),
383 LogicalOperator::Union(union) => self.plan_union(union),
384 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
385 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
386 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
387 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
388 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
389 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
390 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
391 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
392 LogicalOperator::Merge(merge) => self.plan_merge(merge),
393 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
394 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
395 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
396 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
397 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
398 _ => Err(Error::Internal(format!(
399 "Unsupported operator: {:?}",
400 std::mem::discriminant(op)
401 ))),
402 }
403 }
404
405 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
407 let scan_op = if let Some(label) = &scan.label {
408 ScanOperator::with_label(Arc::clone(&self.store), label)
409 } else {
410 ScanOperator::new(Arc::clone(&self.store))
411 };
412
413 let scan_operator: Box<dyn Operator> =
415 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
416
417 if let Some(input) = &scan.input {
419 let (input_op, mut input_columns) = self.plan_operator(input)?;
420
421 let mut output_schema: Vec<LogicalType> =
423 input_columns.iter().map(|_| LogicalType::Any).collect();
424 output_schema.push(LogicalType::Node);
425
426 input_columns.push(scan.variable.clone());
428
429 let join_op = Box::new(NestedLoopJoinOperator::new(
431 input_op,
432 scan_operator,
433 None, PhysicalJoinType::Cross,
435 output_schema,
436 ));
437
438 Ok((join_op, input_columns))
439 } else {
440 let columns = vec![scan.variable.clone()];
441 Ok((scan_operator, columns))
442 }
443 }
444
445 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
447 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
449
450 let source_column = input_columns
452 .iter()
453 .position(|c| c == &expand.from_variable)
454 .ok_or_else(|| {
455 Error::Internal(format!(
456 "Source variable '{}' not found in input columns",
457 expand.from_variable
458 ))
459 })?;
460
461 let direction = match expand.direction {
463 ExpandDirection::Outgoing => Direction::Outgoing,
464 ExpandDirection::Incoming => Direction::Incoming,
465 ExpandDirection::Both => Direction::Both,
466 };
467
468 let is_variable_length =
470 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
471
472 let operator: Box<dyn Operator> = if is_variable_length {
473 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
476 Arc::clone(&self.store),
477 input_op,
478 source_column,
479 direction,
480 expand.edge_type.clone(),
481 expand.min_hops,
482 max_hops,
483 )
484 .with_tx_context(self.viewing_epoch, self.tx_id);
485
486 if expand.path_alias.is_some() {
488 expand_op = expand_op.with_path_length_output();
489 }
490
491 Box::new(expand_op)
492 } else {
493 let expand_op = ExpandOperator::new(
495 Arc::clone(&self.store),
496 input_op,
497 source_column,
498 direction,
499 expand.edge_type.clone(),
500 )
501 .with_tx_context(self.viewing_epoch, self.tx_id);
502 Box::new(expand_op)
503 };
504
505 let mut columns = input_columns;
508
509 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
511 let count = self.anon_edge_counter.get();
512 self.anon_edge_counter.set(count + 1);
513 format!("_anon_edge_{}", count)
514 });
515 columns.push(edge_col_name);
516
517 columns.push(expand.to_variable.clone());
518
519 if let Some(ref path_alias) = expand.path_alias {
521 columns.push(format!("_path_length_{}", path_alias));
522 }
523
524 Ok((operator, columns))
525 }
526
527 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
535 let expands = Self::collect_expand_chain(op);
536 if expands.is_empty() {
537 return Err(Error::Internal("Empty expand chain".to_string()));
538 }
539
540 let first_expand = expands[0];
542 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
543
544 let mut columns = base_columns.clone();
545 let mut steps = Vec::new();
546
547 let mut is_first = true;
552
553 for expand in &expands {
554 let source_column = if is_first {
556 base_columns
558 .iter()
559 .position(|c| c == &expand.from_variable)
560 .ok_or_else(|| {
561 Error::Internal(format!(
562 "Source variable '{}' not found in base columns",
563 expand.from_variable
564 ))
565 })?
566 } else {
567 1
570 };
571
572 let direction = match expand.direction {
574 ExpandDirection::Outgoing => Direction::Outgoing,
575 ExpandDirection::Incoming => Direction::Incoming,
576 ExpandDirection::Both => Direction::Both,
577 };
578
579 steps.push(ExpandStep {
581 source_column,
582 direction,
583 edge_type: expand.edge_type.clone(),
584 });
585
586 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
588 let count = self.anon_edge_counter.get();
589 self.anon_edge_counter.set(count + 1);
590 format!("_anon_edge_{}", count)
591 });
592 columns.push(edge_col_name);
593 columns.push(expand.to_variable.clone());
594
595 is_first = false;
596 }
597
598 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
600
601 if let Some(tx_id) = self.tx_id {
602 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
603 } else {
604 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
605 }
606
607 Ok((Box::new(lazy_op), columns))
608 }
609
610 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
612 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
614
615 let variable_columns: HashMap<String, usize> = input_columns
617 .iter()
618 .enumerate()
619 .map(|(i, name)| (name.clone(), i))
620 .collect();
621
622 let columns: Vec<String> = ret
624 .items
625 .iter()
626 .map(|item| {
627 item.alias.clone().unwrap_or_else(|| {
628 expression_to_string(&item.expression)
630 })
631 })
632 .collect();
633
634 let needs_project = ret
636 .items
637 .iter()
638 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
639
640 if needs_project {
641 let mut projections = Vec::with_capacity(ret.items.len());
643 let mut output_types = Vec::with_capacity(ret.items.len());
644
645 for item in &ret.items {
646 match &item.expression {
647 LogicalExpression::Variable(name) => {
648 let col_idx = *variable_columns.get(name).ok_or_else(|| {
649 Error::Internal(format!("Variable '{}' not found in input", name))
650 })?;
651 projections.push(ProjectExpr::Column(col_idx));
652 output_types.push(LogicalType::Node);
654 }
655 LogicalExpression::Property { variable, property } => {
656 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
657 Error::Internal(format!("Variable '{}' not found in input", variable))
658 })?;
659 projections.push(ProjectExpr::PropertyAccess {
660 column: col_idx,
661 property: property.clone(),
662 });
663 output_types.push(LogicalType::Any);
665 }
666 LogicalExpression::Literal(value) => {
667 projections.push(ProjectExpr::Constant(value.clone()));
668 output_types.push(value_to_logical_type(value));
669 }
670 LogicalExpression::FunctionCall { name, args, .. } => {
671 match name.to_lowercase().as_str() {
673 "type" => {
674 if args.len() != 1 {
676 return Err(Error::Internal(
677 "type() requires exactly one argument".to_string(),
678 ));
679 }
680 if let LogicalExpression::Variable(var_name) = &args[0] {
681 let col_idx =
682 *variable_columns.get(var_name).ok_or_else(|| {
683 Error::Internal(format!(
684 "Variable '{}' not found in input",
685 var_name
686 ))
687 })?;
688 projections.push(ProjectExpr::EdgeType { column: col_idx });
689 output_types.push(LogicalType::String);
690 } else {
691 return Err(Error::Internal(
692 "type() argument must be a variable".to_string(),
693 ));
694 }
695 }
696 "length" => {
697 if args.len() != 1 {
700 return Err(Error::Internal(
701 "length() requires exactly one argument".to_string(),
702 ));
703 }
704 if let LogicalExpression::Variable(var_name) = &args[0] {
705 let col_idx =
706 *variable_columns.get(var_name).ok_or_else(|| {
707 Error::Internal(format!(
708 "Variable '{}' not found in input",
709 var_name
710 ))
711 })?;
712 projections.push(ProjectExpr::Column(col_idx));
714 output_types.push(LogicalType::Int64);
715 } else {
716 return Err(Error::Internal(
717 "length() argument must be a variable".to_string(),
718 ));
719 }
720 }
721 _ => {
723 let filter_expr = self.convert_expression(&item.expression)?;
724 projections.push(ProjectExpr::Expression {
725 expr: filter_expr,
726 variable_columns: variable_columns.clone(),
727 });
728 output_types.push(LogicalType::Any);
729 }
730 }
731 }
732 LogicalExpression::Case { .. } => {
733 let filter_expr = self.convert_expression(&item.expression)?;
735 projections.push(ProjectExpr::Expression {
736 expr: filter_expr,
737 variable_columns: variable_columns.clone(),
738 });
739 output_types.push(LogicalType::Any);
741 }
742 _ => {
743 return Err(Error::Internal(format!(
744 "Unsupported RETURN expression: {:?}",
745 item.expression
746 )));
747 }
748 }
749 }
750
751 let operator = Box::new(ProjectOperator::with_store(
752 input_op,
753 projections,
754 output_types,
755 Arc::clone(&self.store),
756 ));
757
758 Ok((operator, columns))
759 } else {
760 let mut projections = Vec::with_capacity(ret.items.len());
763 let mut output_types = Vec::with_capacity(ret.items.len());
764
765 for item in &ret.items {
766 if let LogicalExpression::Variable(name) = &item.expression {
767 let col_idx = *variable_columns.get(name).ok_or_else(|| {
768 Error::Internal(format!("Variable '{}' not found in input", name))
769 })?;
770 projections.push(ProjectExpr::Column(col_idx));
771 output_types.push(LogicalType::Node);
772 }
773 }
774
775 if projections.len() == input_columns.len()
777 && projections
778 .iter()
779 .enumerate()
780 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
781 {
782 Ok((input_op, columns))
784 } else {
785 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
786 Ok((operator, columns))
787 }
788 }
789 }
790
791 fn plan_project(
793 &self,
794 project: &crate::query::plan::ProjectOp,
795 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
796 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
798 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
799 let single_row_op: Box<dyn Operator> = Box::new(
801 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
802 );
803 (single_row_op, Vec::new())
804 } else {
805 self.plan_operator(&project.input)?
806 };
807
808 let variable_columns: HashMap<String, usize> = input_columns
810 .iter()
811 .enumerate()
812 .map(|(i, name)| (name.clone(), i))
813 .collect();
814
815 let mut projections = Vec::with_capacity(project.projections.len());
817 let mut output_types = Vec::with_capacity(project.projections.len());
818 let mut output_columns = Vec::with_capacity(project.projections.len());
819
820 for projection in &project.projections {
821 let col_name = projection
823 .alias
824 .clone()
825 .unwrap_or_else(|| expression_to_string(&projection.expression));
826 output_columns.push(col_name);
827
828 match &projection.expression {
829 LogicalExpression::Variable(name) => {
830 let col_idx = *variable_columns.get(name).ok_or_else(|| {
831 Error::Internal(format!("Variable '{}' not found in input", name))
832 })?;
833 projections.push(ProjectExpr::Column(col_idx));
834 output_types.push(LogicalType::Node);
835 }
836 LogicalExpression::Property { variable, property } => {
837 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
838 Error::Internal(format!("Variable '{}' not found in input", variable))
839 })?;
840 projections.push(ProjectExpr::PropertyAccess {
841 column: col_idx,
842 property: property.clone(),
843 });
844 output_types.push(LogicalType::Any);
845 }
846 LogicalExpression::Literal(value) => {
847 projections.push(ProjectExpr::Constant(value.clone()));
848 output_types.push(value_to_logical_type(value));
849 }
850 _ => {
851 let filter_expr = self.convert_expression(&projection.expression)?;
853 projections.push(ProjectExpr::Expression {
854 expr: filter_expr,
855 variable_columns: variable_columns.clone(),
856 });
857 output_types.push(LogicalType::Any);
858 }
859 }
860 }
861
862 let operator = Box::new(ProjectOperator::with_store(
863 input_op,
864 projections,
865 output_types,
866 Arc::clone(&self.store),
867 ));
868
869 Ok((operator, output_columns))
870 }
871
872 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
878 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
881 let (_, columns) = self.plan_operator(&filter.input)?;
883 let schema = self.derive_schema_from_columns(&columns);
884 let empty_op = Box::new(EmptyOperator::new(schema));
885 return Ok((empty_op, columns));
886 }
887
888 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
890 return Ok(result);
891 }
892
893 let (input_op, columns) = self.plan_operator(&filter.input)?;
895
896 let variable_columns: HashMap<String, usize> = columns
898 .iter()
899 .enumerate()
900 .map(|(i, name)| (name.clone(), i))
901 .collect();
902
903 let filter_expr = self.convert_expression(&filter.predicate)?;
905
906 let predicate =
908 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
909
910 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
912
913 Ok((operator, columns))
914 }
915
916 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
923 use grafeo_core::graph::lpg::CompareOp;
924
925 match predicate {
926 LogicalExpression::Binary { left, op, right } => {
927 match op {
929 BinaryOp::And => {
930 let left_result = self.check_zone_map_for_predicate(left);
931 let right_result = self.check_zone_map_for_predicate(right);
932
933 return match (left_result, right_result) {
934 (Some(false), _) | (_, Some(false)) => Some(false),
936 (Some(true), Some(true)) => Some(true),
938 _ => None,
940 };
941 }
942 BinaryOp::Or => {
943 let left_result = self.check_zone_map_for_predicate(left);
944 let right_result = self.check_zone_map_for_predicate(right);
945
946 return match (left_result, right_result) {
947 (Some(false), Some(false)) => Some(false),
949 (Some(true), _) | (_, Some(true)) => Some(true),
951 _ => None,
953 };
954 }
955 _ => {}
956 }
957
958 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
960 (
961 LogicalExpression::Property { property, .. },
962 LogicalExpression::Literal(val),
963 ) => {
964 let cmp = match op {
965 BinaryOp::Eq => CompareOp::Eq,
966 BinaryOp::Ne => CompareOp::Ne,
967 BinaryOp::Lt => CompareOp::Lt,
968 BinaryOp::Le => CompareOp::Le,
969 BinaryOp::Gt => CompareOp::Gt,
970 BinaryOp::Ge => CompareOp::Ge,
971 _ => return None,
972 };
973 (property.clone(), cmp, val.clone())
974 }
975 (
976 LogicalExpression::Literal(val),
977 LogicalExpression::Property { property, .. },
978 ) => {
979 let cmp = match op {
981 BinaryOp::Eq => CompareOp::Eq,
982 BinaryOp::Ne => CompareOp::Ne,
983 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
985 BinaryOp::Gt => CompareOp::Lt,
986 BinaryOp::Ge => CompareOp::Le,
987 _ => return None,
988 };
989 (property.clone(), cmp, val.clone())
990 }
991 _ => return None,
992 };
993
994 let might_match =
996 self.store
997 .node_property_might_match(&property.into(), compare_op, &value);
998
999 Some(might_match)
1000 }
1001
1002 _ => None,
1003 }
1004 }
1005
1006 fn try_plan_filter_with_property_index(
1015 &self,
1016 filter: &FilterOp,
1017 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1018 let (scan_variable, scan_label) = match filter.input.as_ref() {
1020 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1021 (scan.variable.clone(), scan.label.clone())
1022 }
1023 _ => return Ok(None),
1024 };
1025
1026 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1029
1030 if conditions.is_empty() {
1031 return Ok(None);
1032 }
1033
1034 let has_indexed_condition = conditions
1036 .iter()
1037 .any(|(prop, _)| self.store.has_property_index(prop));
1038
1039 if !has_indexed_condition {
1040 return Ok(None);
1041 }
1042
1043 let conditions_ref: Vec<(&str, Value)> = conditions
1045 .iter()
1046 .map(|(p, v)| (p.as_str(), v.clone()))
1047 .collect();
1048 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1049
1050 if let Some(label) = &scan_label {
1052 let label_nodes: std::collections::HashSet<_> =
1053 self.store.nodes_by_label(label).into_iter().collect();
1054 matching_nodes.retain(|n| label_nodes.contains(n));
1055 }
1056
1057 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1059 let columns = vec![scan_variable];
1060
1061 Ok(Some((node_list_op, columns)))
1062 }
1063
1064 fn extract_equality_conditions(
1070 &self,
1071 predicate: &LogicalExpression,
1072 target_variable: &str,
1073 ) -> Vec<(String, Value)> {
1074 let mut conditions = Vec::new();
1075 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1076 conditions
1077 }
1078
1079 fn collect_equality_conditions(
1081 &self,
1082 expr: &LogicalExpression,
1083 target_variable: &str,
1084 conditions: &mut Vec<(String, Value)>,
1085 ) {
1086 match expr {
1087 LogicalExpression::Binary {
1089 left,
1090 op: BinaryOp::And,
1091 right,
1092 } => {
1093 self.collect_equality_conditions(left, target_variable, conditions);
1094 self.collect_equality_conditions(right, target_variable, conditions);
1095 }
1096
1097 LogicalExpression::Binary {
1099 left,
1100 op: BinaryOp::Eq,
1101 right,
1102 } => {
1103 if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1104 if var == target_variable {
1105 conditions.push((prop, val));
1106 }
1107 }
1108 }
1109
1110 _ => {}
1111 }
1112 }
1113
1114 fn extract_property_equality(
1116 &self,
1117 left: &LogicalExpression,
1118 right: &LogicalExpression,
1119 ) -> Option<(String, String, Value)> {
1120 match (left, right) {
1121 (
1122 LogicalExpression::Property { variable, property },
1123 LogicalExpression::Literal(val),
1124 ) => Some((variable.clone(), property.clone(), val.clone())),
1125 (
1126 LogicalExpression::Literal(val),
1127 LogicalExpression::Property { variable, property },
1128 ) => Some((variable.clone(), property.clone(), val.clone())),
1129 _ => None,
1130 }
1131 }
1132
1133 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1135 let (input_op, columns) = self.plan_operator(&limit.input)?;
1136 let output_schema = self.derive_schema_from_columns(&columns);
1137 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1138 Ok((operator, columns))
1139 }
1140
1141 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1143 let (input_op, columns) = self.plan_operator(&skip.input)?;
1144 let output_schema = self.derive_schema_from_columns(&columns);
1145 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1146 Ok((operator, columns))
1147 }
1148
1149 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1151 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1152
1153 let mut variable_columns: HashMap<String, usize> = input_columns
1155 .iter()
1156 .enumerate()
1157 .map(|(i, name)| (name.clone(), i))
1158 .collect();
1159
1160 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1162 let mut next_col_idx = input_columns.len();
1163
1164 for key in &sort.keys {
1165 if let LogicalExpression::Property { variable, property } = &key.expression {
1166 let col_name = format!("{}_{}", variable, property);
1167 if !variable_columns.contains_key(&col_name) {
1168 property_projections.push((
1169 variable.clone(),
1170 property.clone(),
1171 col_name.clone(),
1172 ));
1173 variable_columns.insert(col_name, next_col_idx);
1174 next_col_idx += 1;
1175 }
1176 }
1177 }
1178
1179 let mut output_columns = input_columns.clone();
1181
1182 if !property_projections.is_empty() {
1184 let mut projections = Vec::new();
1185 let mut output_types = Vec::new();
1186
1187 for (i, _) in input_columns.iter().enumerate() {
1190 projections.push(ProjectExpr::Column(i));
1191 output_types.push(LogicalType::Node);
1192 }
1193
1194 for (variable, property, col_name) in &property_projections {
1196 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1197 Error::Internal(format!(
1198 "Variable '{}' not found for ORDER BY property projection",
1199 variable
1200 ))
1201 })?;
1202 projections.push(ProjectExpr::PropertyAccess {
1203 column: source_col,
1204 property: property.clone(),
1205 });
1206 output_types.push(LogicalType::Any);
1207 output_columns.push(col_name.clone());
1208 }
1209
1210 input_op = Box::new(ProjectOperator::with_store(
1211 input_op,
1212 projections,
1213 output_types,
1214 Arc::clone(&self.store),
1215 ));
1216 }
1217
1218 let physical_keys: Vec<PhysicalSortKey> = sort
1220 .keys
1221 .iter()
1222 .map(|key| {
1223 let col_idx = self
1224 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1225 Ok(PhysicalSortKey {
1226 column: col_idx,
1227 direction: match key.order {
1228 SortOrder::Ascending => SortDirection::Ascending,
1229 SortOrder::Descending => SortDirection::Descending,
1230 },
1231 null_order: NullOrder::NullsLast,
1232 })
1233 })
1234 .collect::<Result<Vec<_>>>()?;
1235
1236 let output_schema = self.derive_schema_from_columns(&output_columns);
1237 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1238 Ok((operator, output_columns))
1239 }
1240
1241 fn resolve_sort_expression_with_properties(
1243 &self,
1244 expr: &LogicalExpression,
1245 variable_columns: &HashMap<String, usize>,
1246 ) -> Result<usize> {
1247 match expr {
1248 LogicalExpression::Variable(name) => {
1249 variable_columns.get(name).copied().ok_or_else(|| {
1250 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1251 })
1252 }
1253 LogicalExpression::Property { variable, property } => {
1254 let col_name = format!("{}_{}", variable, property);
1256 variable_columns.get(&col_name).copied().ok_or_else(|| {
1257 Error::Internal(format!(
1258 "Property column '{}' not found for ORDER BY (from {}.{})",
1259 col_name, variable, property
1260 ))
1261 })
1262 }
1263 _ => Err(Error::Internal(format!(
1264 "Unsupported ORDER BY expression: {:?}",
1265 expr
1266 ))),
1267 }
1268 }
1269
1270 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1272 columns.iter().map(|_| LogicalType::Any).collect()
1273 }
1274
1275 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1277 if self.factorized_execution
1284 && agg.group_by.is_empty()
1285 && Self::count_expand_chain(&agg.input).0 >= 2
1286 && self.is_simple_aggregate(agg)
1287 {
1288 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1289 return Ok((op, cols));
1290 }
1291 }
1293
1294 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1295
1296 let mut variable_columns: HashMap<String, usize> = input_columns
1298 .iter()
1299 .enumerate()
1300 .map(|(i, name)| (name.clone(), i))
1301 .collect();
1302
1303 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1306
1307 for expr in &agg.group_by {
1309 if let LogicalExpression::Property { variable, property } = expr {
1310 let col_name = format!("{}_{}", variable, property);
1311 if !variable_columns.contains_key(&col_name) {
1312 property_projections.push((
1313 variable.clone(),
1314 property.clone(),
1315 col_name.clone(),
1316 ));
1317 variable_columns.insert(col_name, next_col_idx);
1318 next_col_idx += 1;
1319 }
1320 }
1321 }
1322
1323 for agg_expr in &agg.aggregates {
1325 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1326 let col_name = format!("{}_{}", variable, property);
1327 if !variable_columns.contains_key(&col_name) {
1328 property_projections.push((
1329 variable.clone(),
1330 property.clone(),
1331 col_name.clone(),
1332 ));
1333 variable_columns.insert(col_name, next_col_idx);
1334 next_col_idx += 1;
1335 }
1336 }
1337 }
1338
1339 if !property_projections.is_empty() {
1341 let mut projections = Vec::new();
1342 let mut output_types = Vec::new();
1343
1344 for (i, _) in input_columns.iter().enumerate() {
1347 projections.push(ProjectExpr::Column(i));
1348 output_types.push(LogicalType::Node);
1349 }
1350
1351 for (variable, property, _col_name) in &property_projections {
1353 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1354 Error::Internal(format!(
1355 "Variable '{}' not found for property projection",
1356 variable
1357 ))
1358 })?;
1359 projections.push(ProjectExpr::PropertyAccess {
1360 column: source_col,
1361 property: property.clone(),
1362 });
1363 output_types.push(LogicalType::Any); }
1365
1366 input_op = Box::new(ProjectOperator::with_store(
1367 input_op,
1368 projections,
1369 output_types,
1370 Arc::clone(&self.store),
1371 ));
1372 }
1373
1374 let group_columns: Vec<usize> = agg
1376 .group_by
1377 .iter()
1378 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1379 .collect::<Result<Vec<_>>>()?;
1380
1381 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1383 .aggregates
1384 .iter()
1385 .map(|agg_expr| {
1386 let column = agg_expr
1387 .expression
1388 .as_ref()
1389 .map(|e| {
1390 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1391 })
1392 .transpose()?;
1393
1394 Ok(PhysicalAggregateExpr {
1395 function: convert_aggregate_function(agg_expr.function),
1396 column,
1397 distinct: agg_expr.distinct,
1398 alias: agg_expr.alias.clone(),
1399 percentile: agg_expr.percentile,
1400 })
1401 })
1402 .collect::<Result<Vec<_>>>()?;
1403
1404 let mut output_schema = Vec::new();
1406 let mut output_columns = Vec::new();
1407
1408 for expr in &agg.group_by {
1410 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1412 }
1413
1414 for agg_expr in &agg.aggregates {
1416 let result_type = match agg_expr.function {
1417 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1418 LogicalType::Int64
1419 }
1420 LogicalAggregateFunction::Sum => LogicalType::Int64,
1421 LogicalAggregateFunction::Avg => LogicalType::Float64,
1422 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1423 LogicalType::Int64
1427 }
1428 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1431 | LogicalAggregateFunction::StdDevPop
1432 | LogicalAggregateFunction::PercentileDisc
1433 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1434 };
1435 output_schema.push(result_type);
1436 output_columns.push(
1437 agg_expr
1438 .alias
1439 .clone()
1440 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1441 );
1442 }
1443
1444 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1446 Box::new(SimpleAggregateOperator::new(
1447 input_op,
1448 physical_aggregates,
1449 output_schema,
1450 ))
1451 } else {
1452 Box::new(HashAggregateOperator::new(
1453 input_op,
1454 group_columns,
1455 physical_aggregates,
1456 output_schema,
1457 ))
1458 };
1459
1460 if let Some(having_expr) = &agg.having {
1462 let having_var_columns: HashMap<String, usize> = output_columns
1464 .iter()
1465 .enumerate()
1466 .map(|(i, name)| (name.clone(), i))
1467 .collect();
1468
1469 let filter_expr = self.convert_expression(having_expr)?;
1470 let predicate =
1471 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1472 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1473 }
1474
1475 Ok((operator, output_columns))
1476 }
1477
1478 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1484 agg.aggregates.iter().all(|agg_expr| {
1485 match agg_expr.function {
1486 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1487 agg_expr.expression.is_none()
1489 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1490 }
1491 LogicalAggregateFunction::Sum
1492 | LogicalAggregateFunction::Avg
1493 | LogicalAggregateFunction::Min
1494 | LogicalAggregateFunction::Max => {
1495 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1498 }
1499 _ => false,
1501 }
1502 })
1503 }
1504
1505 fn plan_factorized_aggregate(
1509 &self,
1510 agg: &AggregateOp,
1511 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1512 let expands = Self::collect_expand_chain(&agg.input);
1514 if expands.is_empty() {
1515 return Err(Error::Internal(
1516 "Expected expand chain for factorized aggregate".to_string(),
1517 ));
1518 }
1519
1520 let first_expand = expands[0];
1522 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1523
1524 let mut columns = base_columns.clone();
1525 let mut steps = Vec::new();
1526 let mut is_first = true;
1527
1528 for expand in &expands {
1529 let source_column = if is_first {
1531 base_columns
1532 .iter()
1533 .position(|c| c == &expand.from_variable)
1534 .ok_or_else(|| {
1535 Error::Internal(format!(
1536 "Source variable '{}' not found in base columns",
1537 expand.from_variable
1538 ))
1539 })?
1540 } else {
1541 1 };
1543
1544 let direction = match expand.direction {
1545 ExpandDirection::Outgoing => Direction::Outgoing,
1546 ExpandDirection::Incoming => Direction::Incoming,
1547 ExpandDirection::Both => Direction::Both,
1548 };
1549
1550 steps.push(ExpandStep {
1551 source_column,
1552 direction,
1553 edge_type: expand.edge_type.clone(),
1554 });
1555
1556 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1557 let count = self.anon_edge_counter.get();
1558 self.anon_edge_counter.set(count + 1);
1559 format!("_anon_edge_{}", count)
1560 });
1561 columns.push(edge_col_name);
1562 columns.push(expand.to_variable.clone());
1563
1564 is_first = false;
1565 }
1566
1567 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1569
1570 if let Some(tx_id) = self.tx_id {
1571 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1572 } else {
1573 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1574 }
1575
1576 let factorized_aggs: Vec<FactorizedAggregate> = agg
1578 .aggregates
1579 .iter()
1580 .map(|agg_expr| {
1581 match agg_expr.function {
1582 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1583 if agg_expr.expression.is_none() {
1585 FactorizedAggregate::count()
1586 } else {
1587 FactorizedAggregate::count_column(1) }
1591 }
1592 LogicalAggregateFunction::Sum => {
1593 FactorizedAggregate::sum(1)
1595 }
1596 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1597 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1598 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1599 _ => {
1600 FactorizedAggregate::count()
1602 }
1603 }
1604 })
1605 .collect();
1606
1607 let output_columns: Vec<String> = agg
1609 .aggregates
1610 .iter()
1611 .map(|agg_expr| {
1612 agg_expr
1613 .alias
1614 .clone()
1615 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1616 })
1617 .collect();
1618
1619 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1621
1622 Ok((Box::new(factorized_agg_op), output_columns))
1623 }
1624
1625 #[allow(dead_code)]
1627 fn resolve_expression_to_column(
1628 &self,
1629 expr: &LogicalExpression,
1630 variable_columns: &HashMap<String, usize>,
1631 ) -> Result<usize> {
1632 match expr {
1633 LogicalExpression::Variable(name) => variable_columns
1634 .get(name)
1635 .copied()
1636 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1637 LogicalExpression::Property { variable, .. } => variable_columns
1638 .get(variable)
1639 .copied()
1640 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1641 _ => Err(Error::Internal(format!(
1642 "Cannot resolve expression to column: {:?}",
1643 expr
1644 ))),
1645 }
1646 }
1647
1648 fn resolve_expression_to_column_with_properties(
1652 &self,
1653 expr: &LogicalExpression,
1654 variable_columns: &HashMap<String, usize>,
1655 ) -> Result<usize> {
1656 match expr {
1657 LogicalExpression::Variable(name) => variable_columns
1658 .get(name)
1659 .copied()
1660 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1661 LogicalExpression::Property { variable, property } => {
1662 let col_name = format!("{}_{}", variable, property);
1664 variable_columns.get(&col_name).copied().ok_or_else(|| {
1665 Error::Internal(format!(
1666 "Property column '{}' not found (from {}.{})",
1667 col_name, variable, property
1668 ))
1669 })
1670 }
1671 _ => Err(Error::Internal(format!(
1672 "Cannot resolve expression to column: {:?}",
1673 expr
1674 ))),
1675 }
1676 }
1677
1678 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1680 match expr {
1681 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1682 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1683 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1684 variable: variable.clone(),
1685 property: property.clone(),
1686 }),
1687 LogicalExpression::Binary { left, op, right } => {
1688 let left_expr = self.convert_expression(left)?;
1689 let right_expr = self.convert_expression(right)?;
1690 let filter_op = convert_binary_op(*op)?;
1691 Ok(FilterExpression::Binary {
1692 left: Box::new(left_expr),
1693 op: filter_op,
1694 right: Box::new(right_expr),
1695 })
1696 }
1697 LogicalExpression::Unary { op, operand } => {
1698 let operand_expr = self.convert_expression(operand)?;
1699 let filter_op = convert_unary_op(*op)?;
1700 Ok(FilterExpression::Unary {
1701 op: filter_op,
1702 operand: Box::new(operand_expr),
1703 })
1704 }
1705 LogicalExpression::FunctionCall { name, args, .. } => {
1706 let filter_args: Vec<FilterExpression> = args
1707 .iter()
1708 .map(|a| self.convert_expression(a))
1709 .collect::<Result<Vec<_>>>()?;
1710 Ok(FilterExpression::FunctionCall {
1711 name: name.clone(),
1712 args: filter_args,
1713 })
1714 }
1715 LogicalExpression::Case {
1716 operand,
1717 when_clauses,
1718 else_clause,
1719 } => {
1720 let filter_operand = operand
1721 .as_ref()
1722 .map(|e| self.convert_expression(e))
1723 .transpose()?
1724 .map(Box::new);
1725 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1726 .iter()
1727 .map(|(cond, result)| {
1728 Ok((
1729 self.convert_expression(cond)?,
1730 self.convert_expression(result)?,
1731 ))
1732 })
1733 .collect::<Result<Vec<_>>>()?;
1734 let filter_else = else_clause
1735 .as_ref()
1736 .map(|e| self.convert_expression(e))
1737 .transpose()?
1738 .map(Box::new);
1739 Ok(FilterExpression::Case {
1740 operand: filter_operand,
1741 when_clauses: filter_when_clauses,
1742 else_clause: filter_else,
1743 })
1744 }
1745 LogicalExpression::List(items) => {
1746 let filter_items: Vec<FilterExpression> = items
1747 .iter()
1748 .map(|item| self.convert_expression(item))
1749 .collect::<Result<Vec<_>>>()?;
1750 Ok(FilterExpression::List(filter_items))
1751 }
1752 LogicalExpression::Map(pairs) => {
1753 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1754 .iter()
1755 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1756 .collect::<Result<Vec<_>>>()?;
1757 Ok(FilterExpression::Map(filter_pairs))
1758 }
1759 LogicalExpression::IndexAccess { base, index } => {
1760 let base_expr = self.convert_expression(base)?;
1761 let index_expr = self.convert_expression(index)?;
1762 Ok(FilterExpression::IndexAccess {
1763 base: Box::new(base_expr),
1764 index: Box::new(index_expr),
1765 })
1766 }
1767 LogicalExpression::SliceAccess { base, start, end } => {
1768 let base_expr = self.convert_expression(base)?;
1769 let start_expr = start
1770 .as_ref()
1771 .map(|s| self.convert_expression(s))
1772 .transpose()?
1773 .map(Box::new);
1774 let end_expr = end
1775 .as_ref()
1776 .map(|e| self.convert_expression(e))
1777 .transpose()?
1778 .map(Box::new);
1779 Ok(FilterExpression::SliceAccess {
1780 base: Box::new(base_expr),
1781 start: start_expr,
1782 end: end_expr,
1783 })
1784 }
1785 LogicalExpression::Parameter(_) => Err(Error::Internal(
1786 "Parameters not yet supported in filters".to_string(),
1787 )),
1788 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1789 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1790 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1791 LogicalExpression::ListComprehension {
1792 variable,
1793 list_expr,
1794 filter_expr,
1795 map_expr,
1796 } => {
1797 let list = self.convert_expression(list_expr)?;
1798 let filter = filter_expr
1799 .as_ref()
1800 .map(|f| self.convert_expression(f))
1801 .transpose()?
1802 .map(Box::new);
1803 let map = self.convert_expression(map_expr)?;
1804 Ok(FilterExpression::ListComprehension {
1805 variable: variable.clone(),
1806 list_expr: Box::new(list),
1807 filter_expr: filter,
1808 map_expr: Box::new(map),
1809 })
1810 }
1811 LogicalExpression::ExistsSubquery(subplan) => {
1812 let (start_var, direction, edge_type, end_labels) =
1815 self.extract_exists_pattern(subplan)?;
1816
1817 Ok(FilterExpression::ExistsSubquery {
1818 start_var,
1819 direction,
1820 edge_type,
1821 end_labels,
1822 min_hops: None,
1823 max_hops: None,
1824 })
1825 }
1826 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1827 "COUNT subqueries not yet supported".to_string(),
1828 )),
1829 }
1830 }
1831
1832 fn extract_exists_pattern(
1835 &self,
1836 subplan: &LogicalOperator,
1837 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1838 match subplan {
1839 LogicalOperator::Expand(expand) => {
1840 let end_labels = self.extract_end_labels_from_expand(expand);
1842 let direction = match expand.direction {
1843 ExpandDirection::Outgoing => Direction::Outgoing,
1844 ExpandDirection::Incoming => Direction::Incoming,
1845 ExpandDirection::Both => Direction::Both,
1846 };
1847 Ok((
1848 expand.from_variable.clone(),
1849 direction,
1850 expand.edge_type.clone(),
1851 end_labels,
1852 ))
1853 }
1854 LogicalOperator::NodeScan(scan) => {
1855 if let Some(input) = &scan.input {
1856 self.extract_exists_pattern(input)
1857 } else {
1858 Err(Error::Internal(
1859 "EXISTS subquery must contain an edge pattern".to_string(),
1860 ))
1861 }
1862 }
1863 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1864 _ => Err(Error::Internal(
1865 "Unsupported EXISTS subquery pattern".to_string(),
1866 )),
1867 }
1868 }
1869
1870 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1872 match expand.input.as_ref() {
1874 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1875 _ => None,
1876 }
1877 }
1878
1879 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1881 let (left_op, left_columns) = self.plan_operator(&join.left)?;
1882 let (right_op, right_columns) = self.plan_operator(&join.right)?;
1883
1884 let mut columns = left_columns.clone();
1886 columns.extend(right_columns.clone());
1887
1888 let physical_join_type = match join.join_type {
1890 JoinType::Inner => PhysicalJoinType::Inner,
1891 JoinType::Left => PhysicalJoinType::Left,
1892 JoinType::Right => PhysicalJoinType::Right,
1893 JoinType::Full => PhysicalJoinType::Full,
1894 JoinType::Cross => PhysicalJoinType::Cross,
1895 JoinType::Semi => PhysicalJoinType::Semi,
1896 JoinType::Anti => PhysicalJoinType::Anti,
1897 };
1898
1899 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1901 (vec![], vec![])
1903 } else {
1904 join.conditions
1905 .iter()
1906 .filter_map(|cond| {
1907 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1909 let right_idx = self
1910 .expression_to_column(&cond.right, &right_columns)
1911 .ok()?;
1912 Some((left_idx, right_idx))
1913 })
1914 .unzip()
1915 };
1916
1917 let output_schema = self.derive_schema_from_columns(&columns);
1918
1919 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1927 left_op,
1928 right_op,
1929 probe_keys,
1930 build_keys,
1931 physical_join_type,
1932 output_schema,
1933 ));
1934
1935 Ok((operator, columns))
1936 }
1937
1938 #[allow(dead_code)]
1947 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
1948 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
1950 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
1951
1952 Self::collect_join_edges(
1954 &LogicalOperator::Join(join.clone()),
1955 &mut edges,
1956 &mut all_vars,
1957 );
1958
1959 if all_vars.len() < 3 {
1961 return false;
1962 }
1963
1964 Self::has_cycle(&edges, &all_vars)
1966 }
1967
1968 fn collect_join_edges(
1970 op: &LogicalOperator,
1971 edges: &mut HashMap<String, Vec<String>>,
1972 vars: &mut std::collections::HashSet<String>,
1973 ) {
1974 match op {
1975 LogicalOperator::Join(join) => {
1976 for cond in &join.conditions {
1978 if let (Some(left_var), Some(right_var)) = (
1979 Self::extract_join_variable(&cond.left),
1980 Self::extract_join_variable(&cond.right),
1981 ) {
1982 if left_var != right_var {
1983 vars.insert(left_var.clone());
1984 vars.insert(right_var.clone());
1985
1986 edges
1988 .entry(left_var.clone())
1989 .or_default()
1990 .push(right_var.clone());
1991 edges.entry(right_var).or_default().push(left_var);
1992 }
1993 }
1994 }
1995
1996 Self::collect_join_edges(&join.left, edges, vars);
1998 Self::collect_join_edges(&join.right, edges, vars);
1999 }
2000 LogicalOperator::Expand(expand) => {
2001 vars.insert(expand.from_variable.clone());
2003 vars.insert(expand.to_variable.clone());
2004
2005 edges
2006 .entry(expand.from_variable.clone())
2007 .or_default()
2008 .push(expand.to_variable.clone());
2009 edges
2010 .entry(expand.to_variable.clone())
2011 .or_default()
2012 .push(expand.from_variable.clone());
2013
2014 Self::collect_join_edges(&expand.input, edges, vars);
2015 }
2016 LogicalOperator::Filter(filter) => {
2017 Self::collect_join_edges(&filter.input, edges, vars);
2018 }
2019 LogicalOperator::NodeScan(scan) => {
2020 vars.insert(scan.variable.clone());
2021 }
2022 _ => {}
2023 }
2024 }
2025
2026 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2028 match expr {
2029 LogicalExpression::Variable(v) => Some(v.clone()),
2030 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2031 LogicalExpression::Id(v) => Some(v.clone()),
2032 _ => None,
2033 }
2034 }
2035
2036 fn has_cycle(
2040 edges: &HashMap<String, Vec<String>>,
2041 vars: &std::collections::HashSet<String>,
2042 ) -> bool {
2043 let mut color: HashMap<&String, u8> = HashMap::new();
2044
2045 for var in vars {
2046 color.insert(var, 0);
2047 }
2048
2049 for start in vars {
2050 if color[start] == 0 {
2051 if Self::dfs_cycle(start, None, edges, &mut color) {
2052 return true;
2053 }
2054 }
2055 }
2056
2057 false
2058 }
2059
2060 fn dfs_cycle(
2062 node: &String,
2063 parent: Option<&String>,
2064 edges: &HashMap<String, Vec<String>>,
2065 color: &mut HashMap<&String, u8>,
2066 ) -> bool {
2067 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2070 for neighbor in neighbors {
2071 if parent == Some(neighbor) {
2073 continue;
2074 }
2075
2076 if let Some(&c) = color.get(neighbor) {
2077 if c == 1 {
2078 return true;
2080 }
2081 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2082 return true;
2083 }
2084 }
2085 }
2086 }
2087
2088 *color.get_mut(node).unwrap() = 2; false
2090 }
2091
2092 #[allow(dead_code)]
2094 fn count_relations(op: &LogicalOperator) -> usize {
2095 match op {
2096 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2097 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2098 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2099 LogicalOperator::Join(j) => {
2100 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2101 }
2102 _ => 0,
2103 }
2104 }
2105
2106 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2108 match expr {
2109 LogicalExpression::Variable(name) => columns
2110 .iter()
2111 .position(|c| c == name)
2112 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2113 _ => Err(Error::Internal(
2114 "Only variables supported in join conditions".to_string(),
2115 )),
2116 }
2117 }
2118
2119 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2121 if union.inputs.is_empty() {
2122 return Err(Error::Internal(
2123 "Union requires at least one input".to_string(),
2124 ));
2125 }
2126
2127 let mut inputs = Vec::with_capacity(union.inputs.len());
2128 let mut columns = Vec::new();
2129
2130 for (i, input) in union.inputs.iter().enumerate() {
2131 let (op, cols) = self.plan_operator(input)?;
2132 if i == 0 {
2133 columns = cols;
2134 }
2135 inputs.push(op);
2136 }
2137
2138 let output_schema = self.derive_schema_from_columns(&columns);
2139 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2140
2141 Ok((operator, columns))
2142 }
2143
2144 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2146 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2147 let output_schema = self.derive_schema_from_columns(&columns);
2148 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2149 Ok((operator, columns))
2150 }
2151
2152 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2154 let (input_op, mut columns) = if let Some(ref input) = create.input {
2156 let (op, cols) = self.plan_operator(input)?;
2157 (Some(op), cols)
2158 } else {
2159 (None, vec![])
2160 };
2161
2162 let output_column = columns.len();
2164 columns.push(create.variable.clone());
2165
2166 let properties: Vec<(String, PropertySource)> = create
2168 .properties
2169 .iter()
2170 .map(|(name, expr)| {
2171 let source = match expr {
2172 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2173 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2174 };
2175 (name.clone(), source)
2176 })
2177 .collect();
2178
2179 let output_schema = self.derive_schema_from_columns(&columns);
2180
2181 let operator = Box::new(
2182 CreateNodeOperator::new(
2183 Arc::clone(&self.store),
2184 input_op,
2185 create.labels.clone(),
2186 properties,
2187 output_schema,
2188 output_column,
2189 )
2190 .with_tx_context(self.viewing_epoch, self.tx_id),
2191 );
2192
2193 Ok((operator, columns))
2194 }
2195
2196 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2198 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2199
2200 let from_column = columns
2202 .iter()
2203 .position(|c| c == &create.from_variable)
2204 .ok_or_else(|| {
2205 Error::Internal(format!(
2206 "Source variable '{}' not found",
2207 create.from_variable
2208 ))
2209 })?;
2210
2211 let to_column = columns
2212 .iter()
2213 .position(|c| c == &create.to_variable)
2214 .ok_or_else(|| {
2215 Error::Internal(format!(
2216 "Target variable '{}' not found",
2217 create.to_variable
2218 ))
2219 })?;
2220
2221 let output_column = create.variable.as_ref().map(|v| {
2223 let idx = columns.len();
2224 columns.push(v.clone());
2225 idx
2226 });
2227
2228 let properties: Vec<(String, PropertySource)> = create
2230 .properties
2231 .iter()
2232 .map(|(name, expr)| {
2233 let source = match expr {
2234 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2235 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2236 };
2237 (name.clone(), source)
2238 })
2239 .collect();
2240
2241 let output_schema = self.derive_schema_from_columns(&columns);
2242
2243 let operator = Box::new(
2244 CreateEdgeOperator::new(
2245 Arc::clone(&self.store),
2246 input_op,
2247 from_column,
2248 to_column,
2249 create.edge_type.clone(),
2250 properties,
2251 output_schema,
2252 output_column,
2253 )
2254 .with_tx_context(self.viewing_epoch, self.tx_id),
2255 );
2256
2257 Ok((operator, columns))
2258 }
2259
2260 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2262 let (input_op, columns) = self.plan_operator(&delete.input)?;
2263
2264 let node_column = columns
2265 .iter()
2266 .position(|c| c == &delete.variable)
2267 .ok_or_else(|| {
2268 Error::Internal(format!(
2269 "Variable '{}' not found for delete",
2270 delete.variable
2271 ))
2272 })?;
2273
2274 let output_schema = vec![LogicalType::Int64];
2276 let output_columns = vec!["deleted_count".to_string()];
2277
2278 let operator = Box::new(
2279 DeleteNodeOperator::new(
2280 Arc::clone(&self.store),
2281 input_op,
2282 node_column,
2283 output_schema,
2284 delete.detach, )
2286 .with_tx_context(self.viewing_epoch, self.tx_id),
2287 );
2288
2289 Ok((operator, output_columns))
2290 }
2291
2292 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2294 let (input_op, columns) = self.plan_operator(&delete.input)?;
2295
2296 let edge_column = columns
2297 .iter()
2298 .position(|c| c == &delete.variable)
2299 .ok_or_else(|| {
2300 Error::Internal(format!(
2301 "Variable '{}' not found for delete",
2302 delete.variable
2303 ))
2304 })?;
2305
2306 let output_schema = vec![LogicalType::Int64];
2308 let output_columns = vec!["deleted_count".to_string()];
2309
2310 let operator = Box::new(
2311 DeleteEdgeOperator::new(
2312 Arc::clone(&self.store),
2313 input_op,
2314 edge_column,
2315 output_schema,
2316 )
2317 .with_tx_context(self.viewing_epoch, self.tx_id),
2318 );
2319
2320 Ok((operator, output_columns))
2321 }
2322
2323 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2325 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2326 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2327
2328 let mut columns = left_columns.clone();
2330 columns.extend(right_columns.clone());
2331
2332 let mut probe_keys = Vec::new();
2334 let mut build_keys = Vec::new();
2335
2336 for (right_idx, right_col) in right_columns.iter().enumerate() {
2337 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2338 probe_keys.push(left_idx);
2339 build_keys.push(right_idx);
2340 }
2341 }
2342
2343 let output_schema = self.derive_schema_from_columns(&columns);
2344
2345 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2346 left_op,
2347 right_op,
2348 probe_keys,
2349 build_keys,
2350 PhysicalJoinType::Left,
2351 output_schema,
2352 ));
2353
2354 Ok((operator, columns))
2355 }
2356
2357 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2359 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2360 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2361
2362 let columns = left_columns.clone();
2364
2365 let mut probe_keys = Vec::new();
2367 let mut build_keys = Vec::new();
2368
2369 for (right_idx, right_col) in right_columns.iter().enumerate() {
2370 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2371 probe_keys.push(left_idx);
2372 build_keys.push(right_idx);
2373 }
2374 }
2375
2376 let output_schema = self.derive_schema_from_columns(&columns);
2377
2378 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2379 left_op,
2380 right_op,
2381 probe_keys,
2382 build_keys,
2383 PhysicalJoinType::Anti,
2384 output_schema,
2385 ));
2386
2387 Ok((operator, columns))
2388 }
2389
2390 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2392 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2395 if matches!(&*unwind.input, LogicalOperator::Empty) {
2396 let literal_list = self.convert_expression(&unwind.expression)?;
2401
2402 let single_row_op: Box<dyn Operator> = Box::new(
2404 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2405 );
2406 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2407 single_row_op,
2408 vec![ProjectExpr::Expression {
2409 expr: literal_list,
2410 variable_columns: HashMap::new(),
2411 }],
2412 vec![LogicalType::Any],
2413 Arc::clone(&self.store),
2414 ));
2415
2416 (project_op, vec!["__list__".to_string()])
2417 } else {
2418 self.plan_operator(&unwind.input)?
2419 };
2420
2421 let list_col_idx = match &unwind.expression {
2427 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2428 LogicalExpression::Property { variable, .. } => {
2429 input_columns.iter().position(|c| c == variable)
2432 }
2433 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2434 None
2436 }
2437 _ => None,
2438 };
2439
2440 let mut columns = input_columns.clone();
2442 columns.push(unwind.variable.clone());
2443
2444 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2446 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2451
2452 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2453 input_op,
2454 col_idx,
2455 unwind.variable.clone(),
2456 output_schema,
2457 ));
2458
2459 Ok((operator, columns))
2460 }
2461
2462 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2464 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2466 Vec::new()
2467 } else {
2468 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2469 cols
2470 };
2471
2472 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2474 .match_properties
2475 .iter()
2476 .filter_map(|(name, expr)| {
2477 if let LogicalExpression::Literal(v) = expr {
2478 Some((name.clone(), v.clone()))
2479 } else {
2480 None }
2482 })
2483 .collect();
2484
2485 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2487 .on_create
2488 .iter()
2489 .filter_map(|(name, expr)| {
2490 if let LogicalExpression::Literal(v) = expr {
2491 Some((name.clone(), v.clone()))
2492 } else {
2493 None
2494 }
2495 })
2496 .collect();
2497
2498 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2500 .on_match
2501 .iter()
2502 .filter_map(|(name, expr)| {
2503 if let LogicalExpression::Literal(v) = expr {
2504 Some((name.clone(), v.clone()))
2505 } else {
2506 None
2507 }
2508 })
2509 .collect();
2510
2511 columns.push(merge.variable.clone());
2513
2514 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2515 Arc::clone(&self.store),
2516 merge.variable.clone(),
2517 merge.labels.clone(),
2518 match_properties,
2519 on_create_properties,
2520 on_match_properties,
2521 ));
2522
2523 Ok((operator, columns))
2524 }
2525
2526 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2528 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2530
2531 let source_column = columns
2533 .iter()
2534 .position(|c| c == &sp.source_var)
2535 .ok_or_else(|| {
2536 Error::Internal(format!(
2537 "Source variable '{}' not found for shortestPath",
2538 sp.source_var
2539 ))
2540 })?;
2541
2542 let target_column = columns
2543 .iter()
2544 .position(|c| c == &sp.target_var)
2545 .ok_or_else(|| {
2546 Error::Internal(format!(
2547 "Target variable '{}' not found for shortestPath",
2548 sp.target_var
2549 ))
2550 })?;
2551
2552 let direction = match sp.direction {
2554 ExpandDirection::Outgoing => Direction::Outgoing,
2555 ExpandDirection::Incoming => Direction::Incoming,
2556 ExpandDirection::Both => Direction::Both,
2557 };
2558
2559 let operator: Box<dyn Operator> = Box::new(
2561 ShortestPathOperator::new(
2562 Arc::clone(&self.store),
2563 input_op,
2564 source_column,
2565 target_column,
2566 sp.edge_type.clone(),
2567 direction,
2568 )
2569 .with_all_paths(sp.all_paths),
2570 );
2571
2572 columns.push(format!("_path_length_{}", sp.path_alias));
2575
2576 Ok((operator, columns))
2577 }
2578
2579 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2581 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2582
2583 let node_column = columns
2585 .iter()
2586 .position(|c| c == &add_label.variable)
2587 .ok_or_else(|| {
2588 Error::Internal(format!(
2589 "Variable '{}' not found for ADD LABEL",
2590 add_label.variable
2591 ))
2592 })?;
2593
2594 let output_schema = vec![LogicalType::Int64];
2596 let output_columns = vec!["labels_added".to_string()];
2597
2598 let operator = Box::new(AddLabelOperator::new(
2599 Arc::clone(&self.store),
2600 input_op,
2601 node_column,
2602 add_label.labels.clone(),
2603 output_schema,
2604 ));
2605
2606 Ok((operator, output_columns))
2607 }
2608
2609 fn plan_remove_label(
2611 &self,
2612 remove_label: &RemoveLabelOp,
2613 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2614 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2615
2616 let node_column = columns
2618 .iter()
2619 .position(|c| c == &remove_label.variable)
2620 .ok_or_else(|| {
2621 Error::Internal(format!(
2622 "Variable '{}' not found for REMOVE LABEL",
2623 remove_label.variable
2624 ))
2625 })?;
2626
2627 let output_schema = vec![LogicalType::Int64];
2629 let output_columns = vec!["labels_removed".to_string()];
2630
2631 let operator = Box::new(RemoveLabelOperator::new(
2632 Arc::clone(&self.store),
2633 input_op,
2634 node_column,
2635 remove_label.labels.clone(),
2636 output_schema,
2637 ));
2638
2639 Ok((operator, output_columns))
2640 }
2641
2642 fn plan_set_property(
2644 &self,
2645 set_prop: &SetPropertyOp,
2646 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2647 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2648
2649 let entity_column = columns
2651 .iter()
2652 .position(|c| c == &set_prop.variable)
2653 .ok_or_else(|| {
2654 Error::Internal(format!(
2655 "Variable '{}' not found for SET",
2656 set_prop.variable
2657 ))
2658 })?;
2659
2660 let properties: Vec<(String, PropertySource)> = set_prop
2662 .properties
2663 .iter()
2664 .map(|(name, expr)| {
2665 let source = self.expression_to_property_source(expr, &columns)?;
2666 Ok((name.clone(), source))
2667 })
2668 .collect::<Result<Vec<_>>>()?;
2669
2670 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2672 let output_columns = columns.clone();
2673
2674 let operator = Box::new(SetPropertyOperator::new_for_node(
2676 Arc::clone(&self.store),
2677 input_op,
2678 entity_column,
2679 properties,
2680 output_schema,
2681 ));
2682
2683 Ok((operator, output_columns))
2684 }
2685
2686 fn expression_to_property_source(
2688 &self,
2689 expr: &LogicalExpression,
2690 columns: &[String],
2691 ) -> Result<PropertySource> {
2692 match expr {
2693 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2694 LogicalExpression::Variable(name) => {
2695 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2696 Error::Internal(format!("Variable '{}' not found for property source", name))
2697 })?;
2698 Ok(PropertySource::Column(col_idx))
2699 }
2700 LogicalExpression::Parameter(name) => {
2701 Ok(PropertySource::Constant(
2704 grafeo_common::types::Value::String(format!("${}", name).into()),
2705 ))
2706 }
2707 _ => Err(Error::Internal(format!(
2708 "Unsupported expression type for property source: {:?}",
2709 expr
2710 ))),
2711 }
2712 }
2713}
2714
2715pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2717 match op {
2718 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2719 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2720 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2721 BinaryOp::Le => Ok(BinaryFilterOp::Le),
2722 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2723 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2724 BinaryOp::And => Ok(BinaryFilterOp::And),
2725 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2726 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2727 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2728 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2729 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2730 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2731 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2732 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2733 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2734 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2735 BinaryOp::In => Ok(BinaryFilterOp::In),
2736 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2737 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2738 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2739 "Binary operator {:?} not yet supported in filters",
2740 op
2741 ))),
2742 }
2743}
2744
2745pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2747 match op {
2748 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2749 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2750 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2751 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2752 }
2753}
2754
2755pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2757 match func {
2758 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2759 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2760 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2761 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2762 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2763 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2764 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2765 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2766 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2767 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2768 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2769 }
2770}
2771
2772pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2776 match expr {
2777 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2778 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2779 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2780 variable: variable.clone(),
2781 property: property.clone(),
2782 }),
2783 LogicalExpression::Binary { left, op, right } => {
2784 let left_expr = convert_filter_expression(left)?;
2785 let right_expr = convert_filter_expression(right)?;
2786 let filter_op = convert_binary_op(*op)?;
2787 Ok(FilterExpression::Binary {
2788 left: Box::new(left_expr),
2789 op: filter_op,
2790 right: Box::new(right_expr),
2791 })
2792 }
2793 LogicalExpression::Unary { op, operand } => {
2794 let operand_expr = convert_filter_expression(operand)?;
2795 let filter_op = convert_unary_op(*op)?;
2796 Ok(FilterExpression::Unary {
2797 op: filter_op,
2798 operand: Box::new(operand_expr),
2799 })
2800 }
2801 LogicalExpression::FunctionCall { name, args, .. } => {
2802 let filter_args: Vec<FilterExpression> = args
2803 .iter()
2804 .map(|a| convert_filter_expression(a))
2805 .collect::<Result<Vec<_>>>()?;
2806 Ok(FilterExpression::FunctionCall {
2807 name: name.clone(),
2808 args: filter_args,
2809 })
2810 }
2811 LogicalExpression::Case {
2812 operand,
2813 when_clauses,
2814 else_clause,
2815 } => {
2816 let filter_operand = operand
2817 .as_ref()
2818 .map(|e| convert_filter_expression(e))
2819 .transpose()?
2820 .map(Box::new);
2821 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2822 .iter()
2823 .map(|(cond, result)| {
2824 Ok((
2825 convert_filter_expression(cond)?,
2826 convert_filter_expression(result)?,
2827 ))
2828 })
2829 .collect::<Result<Vec<_>>>()?;
2830 let filter_else = else_clause
2831 .as_ref()
2832 .map(|e| convert_filter_expression(e))
2833 .transpose()?
2834 .map(Box::new);
2835 Ok(FilterExpression::Case {
2836 operand: filter_operand,
2837 when_clauses: filter_when_clauses,
2838 else_clause: filter_else,
2839 })
2840 }
2841 LogicalExpression::List(items) => {
2842 let filter_items: Vec<FilterExpression> = items
2843 .iter()
2844 .map(|item| convert_filter_expression(item))
2845 .collect::<Result<Vec<_>>>()?;
2846 Ok(FilterExpression::List(filter_items))
2847 }
2848 LogicalExpression::Map(pairs) => {
2849 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2850 .iter()
2851 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2852 .collect::<Result<Vec<_>>>()?;
2853 Ok(FilterExpression::Map(filter_pairs))
2854 }
2855 LogicalExpression::IndexAccess { base, index } => {
2856 let base_expr = convert_filter_expression(base)?;
2857 let index_expr = convert_filter_expression(index)?;
2858 Ok(FilterExpression::IndexAccess {
2859 base: Box::new(base_expr),
2860 index: Box::new(index_expr),
2861 })
2862 }
2863 LogicalExpression::SliceAccess { base, start, end } => {
2864 let base_expr = convert_filter_expression(base)?;
2865 let start_expr = start
2866 .as_ref()
2867 .map(|s| convert_filter_expression(s))
2868 .transpose()?
2869 .map(Box::new);
2870 let end_expr = end
2871 .as_ref()
2872 .map(|e| convert_filter_expression(e))
2873 .transpose()?
2874 .map(Box::new);
2875 Ok(FilterExpression::SliceAccess {
2876 base: Box::new(base_expr),
2877 start: start_expr,
2878 end: end_expr,
2879 })
2880 }
2881 LogicalExpression::Parameter(_) => Err(Error::Internal(
2882 "Parameters not yet supported in filters".to_string(),
2883 )),
2884 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2885 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2886 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2887 LogicalExpression::ListComprehension {
2888 variable,
2889 list_expr,
2890 filter_expr,
2891 map_expr,
2892 } => {
2893 let list = convert_filter_expression(list_expr)?;
2894 let filter = filter_expr
2895 .as_ref()
2896 .map(|f| convert_filter_expression(f))
2897 .transpose()?
2898 .map(Box::new);
2899 let map = convert_filter_expression(map_expr)?;
2900 Ok(FilterExpression::ListComprehension {
2901 variable: variable.clone(),
2902 list_expr: Box::new(list),
2903 filter_expr: filter,
2904 map_expr: Box::new(map),
2905 })
2906 }
2907 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2908 Error::Internal("Subqueries not yet supported in filters".to_string()),
2909 ),
2910 }
2911}
2912
2913fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2915 use grafeo_common::types::Value;
2916 match value {
2917 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
2919 Value::Int64(_) => LogicalType::Int64,
2920 Value::Float64(_) => LogicalType::Float64,
2921 Value::String(_) => LogicalType::String,
2922 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
2924 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
2927}
2928
2929fn expression_to_string(expr: &LogicalExpression) -> String {
2931 match expr {
2932 LogicalExpression::Variable(name) => name.clone(),
2933 LogicalExpression::Property { variable, property } => {
2934 format!("{variable}.{property}")
2935 }
2936 LogicalExpression::Literal(value) => format!("{value:?}"),
2937 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2938 _ => "expr".to_string(),
2939 }
2940}
2941
2942pub struct PhysicalPlan {
2944 pub operator: Box<dyn Operator>,
2946 pub columns: Vec<String>,
2948 pub adaptive_context: Option<AdaptiveContext>,
2954}
2955
2956impl PhysicalPlan {
2957 #[must_use]
2959 pub fn columns(&self) -> &[String] {
2960 &self.columns
2961 }
2962
2963 pub fn into_operator(self) -> Box<dyn Operator> {
2965 self.operator
2966 }
2967
2968 #[must_use]
2970 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2971 self.adaptive_context.as_ref()
2972 }
2973
2974 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2976 self.adaptive_context.take()
2977 }
2978}
2979
2980#[allow(dead_code)]
2984struct SingleResultOperator {
2985 result: Option<grafeo_core::execution::DataChunk>,
2986}
2987
2988impl SingleResultOperator {
2989 #[allow(dead_code)]
2990 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2991 Self { result }
2992 }
2993}
2994
2995impl Operator for SingleResultOperator {
2996 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2997 Ok(self.result.take())
2998 }
2999
3000 fn reset(&mut self) {
3001 }
3003
3004 fn name(&self) -> &'static str {
3005 "SingleResult"
3006 }
3007}
3008
3009#[cfg(test)]
3010mod tests {
3011 use super::*;
3012 use crate::query::plan::{
3013 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3014 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3015 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3016 SortKey, SortOp,
3017 };
3018 use grafeo_common::types::Value;
3019
3020 fn create_test_store() -> Arc<LpgStore> {
3021 let store = Arc::new(LpgStore::new());
3022 store.create_node(&["Person"]);
3023 store.create_node(&["Person"]);
3024 store.create_node(&["Company"]);
3025 store
3026 }
3027
3028 #[test]
3031 fn test_plan_simple_scan() {
3032 let store = create_test_store();
3033 let planner = Planner::new(store);
3034
3035 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3037 items: vec![ReturnItem {
3038 expression: LogicalExpression::Variable("n".to_string()),
3039 alias: None,
3040 }],
3041 distinct: false,
3042 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3043 variable: "n".to_string(),
3044 label: Some("Person".to_string()),
3045 input: None,
3046 })),
3047 }));
3048
3049 let physical = planner.plan(&logical).unwrap();
3050 assert_eq!(physical.columns(), &["n"]);
3051 }
3052
3053 #[test]
3054 fn test_plan_scan_without_label() {
3055 let store = create_test_store();
3056 let planner = Planner::new(store);
3057
3058 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3060 items: vec![ReturnItem {
3061 expression: LogicalExpression::Variable("n".to_string()),
3062 alias: None,
3063 }],
3064 distinct: false,
3065 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3066 variable: "n".to_string(),
3067 label: None,
3068 input: None,
3069 })),
3070 }));
3071
3072 let physical = planner.plan(&logical).unwrap();
3073 assert_eq!(physical.columns(), &["n"]);
3074 }
3075
3076 #[test]
3077 fn test_plan_return_with_alias() {
3078 let store = create_test_store();
3079 let planner = Planner::new(store);
3080
3081 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3083 items: vec![ReturnItem {
3084 expression: LogicalExpression::Variable("n".to_string()),
3085 alias: Some("person".to_string()),
3086 }],
3087 distinct: false,
3088 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3089 variable: "n".to_string(),
3090 label: Some("Person".to_string()),
3091 input: None,
3092 })),
3093 }));
3094
3095 let physical = planner.plan(&logical).unwrap();
3096 assert_eq!(physical.columns(), &["person"]);
3097 }
3098
3099 #[test]
3100 fn test_plan_return_property() {
3101 let store = create_test_store();
3102 let planner = Planner::new(store);
3103
3104 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3106 items: vec![ReturnItem {
3107 expression: LogicalExpression::Property {
3108 variable: "n".to_string(),
3109 property: "name".to_string(),
3110 },
3111 alias: None,
3112 }],
3113 distinct: false,
3114 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3115 variable: "n".to_string(),
3116 label: Some("Person".to_string()),
3117 input: None,
3118 })),
3119 }));
3120
3121 let physical = planner.plan(&logical).unwrap();
3122 assert_eq!(physical.columns(), &["n.name"]);
3123 }
3124
3125 #[test]
3126 fn test_plan_return_literal() {
3127 let store = create_test_store();
3128 let planner = Planner::new(store);
3129
3130 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3132 items: vec![ReturnItem {
3133 expression: LogicalExpression::Literal(Value::Int64(42)),
3134 alias: Some("answer".to_string()),
3135 }],
3136 distinct: false,
3137 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3138 variable: "n".to_string(),
3139 label: None,
3140 input: None,
3141 })),
3142 }));
3143
3144 let physical = planner.plan(&logical).unwrap();
3145 assert_eq!(physical.columns(), &["answer"]);
3146 }
3147
3148 #[test]
3151 fn test_plan_filter_equality() {
3152 let store = create_test_store();
3153 let planner = Planner::new(store);
3154
3155 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3157 items: vec![ReturnItem {
3158 expression: LogicalExpression::Variable("n".to_string()),
3159 alias: None,
3160 }],
3161 distinct: false,
3162 input: Box::new(LogicalOperator::Filter(FilterOp {
3163 predicate: LogicalExpression::Binary {
3164 left: Box::new(LogicalExpression::Property {
3165 variable: "n".to_string(),
3166 property: "age".to_string(),
3167 }),
3168 op: BinaryOp::Eq,
3169 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3170 },
3171 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3172 variable: "n".to_string(),
3173 label: Some("Person".to_string()),
3174 input: None,
3175 })),
3176 })),
3177 }));
3178
3179 let physical = planner.plan(&logical).unwrap();
3180 assert_eq!(physical.columns(), &["n"]);
3181 }
3182
3183 #[test]
3184 fn test_plan_filter_compound_and() {
3185 let store = create_test_store();
3186 let planner = Planner::new(store);
3187
3188 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3190 items: vec![ReturnItem {
3191 expression: LogicalExpression::Variable("n".to_string()),
3192 alias: None,
3193 }],
3194 distinct: false,
3195 input: Box::new(LogicalOperator::Filter(FilterOp {
3196 predicate: LogicalExpression::Binary {
3197 left: Box::new(LogicalExpression::Binary {
3198 left: Box::new(LogicalExpression::Property {
3199 variable: "n".to_string(),
3200 property: "age".to_string(),
3201 }),
3202 op: BinaryOp::Gt,
3203 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3204 }),
3205 op: BinaryOp::And,
3206 right: Box::new(LogicalExpression::Binary {
3207 left: Box::new(LogicalExpression::Property {
3208 variable: "n".to_string(),
3209 property: "age".to_string(),
3210 }),
3211 op: BinaryOp::Lt,
3212 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3213 }),
3214 },
3215 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3216 variable: "n".to_string(),
3217 label: None,
3218 input: None,
3219 })),
3220 })),
3221 }));
3222
3223 let physical = planner.plan(&logical).unwrap();
3224 assert_eq!(physical.columns(), &["n"]);
3225 }
3226
3227 #[test]
3228 fn test_plan_filter_unary_not() {
3229 let store = create_test_store();
3230 let planner = Planner::new(store);
3231
3232 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3234 items: vec![ReturnItem {
3235 expression: LogicalExpression::Variable("n".to_string()),
3236 alias: None,
3237 }],
3238 distinct: false,
3239 input: Box::new(LogicalOperator::Filter(FilterOp {
3240 predicate: LogicalExpression::Unary {
3241 op: UnaryOp::Not,
3242 operand: Box::new(LogicalExpression::Property {
3243 variable: "n".to_string(),
3244 property: "active".to_string(),
3245 }),
3246 },
3247 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3248 variable: "n".to_string(),
3249 label: None,
3250 input: None,
3251 })),
3252 })),
3253 }));
3254
3255 let physical = planner.plan(&logical).unwrap();
3256 assert_eq!(physical.columns(), &["n"]);
3257 }
3258
3259 #[test]
3260 fn test_plan_filter_is_null() {
3261 let store = create_test_store();
3262 let planner = Planner::new(store);
3263
3264 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3266 items: vec![ReturnItem {
3267 expression: LogicalExpression::Variable("n".to_string()),
3268 alias: None,
3269 }],
3270 distinct: false,
3271 input: Box::new(LogicalOperator::Filter(FilterOp {
3272 predicate: LogicalExpression::Unary {
3273 op: UnaryOp::IsNull,
3274 operand: Box::new(LogicalExpression::Property {
3275 variable: "n".to_string(),
3276 property: "email".to_string(),
3277 }),
3278 },
3279 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3280 variable: "n".to_string(),
3281 label: None,
3282 input: None,
3283 })),
3284 })),
3285 }));
3286
3287 let physical = planner.plan(&logical).unwrap();
3288 assert_eq!(physical.columns(), &["n"]);
3289 }
3290
3291 #[test]
3292 fn test_plan_filter_function_call() {
3293 let store = create_test_store();
3294 let planner = Planner::new(store);
3295
3296 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3298 items: vec![ReturnItem {
3299 expression: LogicalExpression::Variable("n".to_string()),
3300 alias: None,
3301 }],
3302 distinct: false,
3303 input: Box::new(LogicalOperator::Filter(FilterOp {
3304 predicate: LogicalExpression::Binary {
3305 left: Box::new(LogicalExpression::FunctionCall {
3306 name: "size".to_string(),
3307 args: vec![LogicalExpression::Property {
3308 variable: "n".to_string(),
3309 property: "friends".to_string(),
3310 }],
3311 distinct: false,
3312 }),
3313 op: BinaryOp::Gt,
3314 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3315 },
3316 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3317 variable: "n".to_string(),
3318 label: None,
3319 input: None,
3320 })),
3321 })),
3322 }));
3323
3324 let physical = planner.plan(&logical).unwrap();
3325 assert_eq!(physical.columns(), &["n"]);
3326 }
3327
3328 #[test]
3331 fn test_plan_expand_outgoing() {
3332 let store = create_test_store();
3333 let planner = Planner::new(store);
3334
3335 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3337 items: vec![
3338 ReturnItem {
3339 expression: LogicalExpression::Variable("a".to_string()),
3340 alias: None,
3341 },
3342 ReturnItem {
3343 expression: LogicalExpression::Variable("b".to_string()),
3344 alias: None,
3345 },
3346 ],
3347 distinct: false,
3348 input: Box::new(LogicalOperator::Expand(ExpandOp {
3349 from_variable: "a".to_string(),
3350 to_variable: "b".to_string(),
3351 edge_variable: None,
3352 direction: ExpandDirection::Outgoing,
3353 edge_type: Some("KNOWS".to_string()),
3354 min_hops: 1,
3355 max_hops: Some(1),
3356 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3357 variable: "a".to_string(),
3358 label: Some("Person".to_string()),
3359 input: None,
3360 })),
3361 path_alias: None,
3362 })),
3363 }));
3364
3365 let physical = planner.plan(&logical).unwrap();
3366 assert!(physical.columns().contains(&"a".to_string()));
3368 assert!(physical.columns().contains(&"b".to_string()));
3369 }
3370
3371 #[test]
3372 fn test_plan_expand_with_edge_variable() {
3373 let store = create_test_store();
3374 let planner = Planner::new(store);
3375
3376 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3378 items: vec![
3379 ReturnItem {
3380 expression: LogicalExpression::Variable("a".to_string()),
3381 alias: None,
3382 },
3383 ReturnItem {
3384 expression: LogicalExpression::Variable("r".to_string()),
3385 alias: None,
3386 },
3387 ReturnItem {
3388 expression: LogicalExpression::Variable("b".to_string()),
3389 alias: None,
3390 },
3391 ],
3392 distinct: false,
3393 input: Box::new(LogicalOperator::Expand(ExpandOp {
3394 from_variable: "a".to_string(),
3395 to_variable: "b".to_string(),
3396 edge_variable: Some("r".to_string()),
3397 direction: ExpandDirection::Outgoing,
3398 edge_type: Some("KNOWS".to_string()),
3399 min_hops: 1,
3400 max_hops: Some(1),
3401 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3402 variable: "a".to_string(),
3403 label: None,
3404 input: None,
3405 })),
3406 path_alias: None,
3407 })),
3408 }));
3409
3410 let physical = planner.plan(&logical).unwrap();
3411 assert!(physical.columns().contains(&"a".to_string()));
3412 assert!(physical.columns().contains(&"r".to_string()));
3413 assert!(physical.columns().contains(&"b".to_string()));
3414 }
3415
3416 #[test]
3419 fn test_plan_limit() {
3420 let store = create_test_store();
3421 let planner = Planner::new(store);
3422
3423 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3425 items: vec![ReturnItem {
3426 expression: LogicalExpression::Variable("n".to_string()),
3427 alias: None,
3428 }],
3429 distinct: false,
3430 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3431 count: 10,
3432 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3433 variable: "n".to_string(),
3434 label: None,
3435 input: None,
3436 })),
3437 })),
3438 }));
3439
3440 let physical = planner.plan(&logical).unwrap();
3441 assert_eq!(physical.columns(), &["n"]);
3442 }
3443
3444 #[test]
3445 fn test_plan_skip() {
3446 let store = create_test_store();
3447 let planner = Planner::new(store);
3448
3449 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3451 items: vec![ReturnItem {
3452 expression: LogicalExpression::Variable("n".to_string()),
3453 alias: None,
3454 }],
3455 distinct: false,
3456 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3457 count: 5,
3458 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3459 variable: "n".to_string(),
3460 label: None,
3461 input: None,
3462 })),
3463 })),
3464 }));
3465
3466 let physical = planner.plan(&logical).unwrap();
3467 assert_eq!(physical.columns(), &["n"]);
3468 }
3469
3470 #[test]
3471 fn test_plan_sort() {
3472 let store = create_test_store();
3473 let planner = Planner::new(store);
3474
3475 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3477 items: vec![ReturnItem {
3478 expression: LogicalExpression::Variable("n".to_string()),
3479 alias: None,
3480 }],
3481 distinct: false,
3482 input: Box::new(LogicalOperator::Sort(SortOp {
3483 keys: vec![SortKey {
3484 expression: LogicalExpression::Variable("n".to_string()),
3485 order: SortOrder::Ascending,
3486 }],
3487 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3488 variable: "n".to_string(),
3489 label: None,
3490 input: None,
3491 })),
3492 })),
3493 }));
3494
3495 let physical = planner.plan(&logical).unwrap();
3496 assert_eq!(physical.columns(), &["n"]);
3497 }
3498
3499 #[test]
3500 fn test_plan_sort_descending() {
3501 let store = create_test_store();
3502 let planner = Planner::new(store);
3503
3504 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3506 items: vec![ReturnItem {
3507 expression: LogicalExpression::Variable("n".to_string()),
3508 alias: None,
3509 }],
3510 distinct: false,
3511 input: Box::new(LogicalOperator::Sort(SortOp {
3512 keys: vec![SortKey {
3513 expression: LogicalExpression::Variable("n".to_string()),
3514 order: SortOrder::Descending,
3515 }],
3516 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3517 variable: "n".to_string(),
3518 label: None,
3519 input: None,
3520 })),
3521 })),
3522 }));
3523
3524 let physical = planner.plan(&logical).unwrap();
3525 assert_eq!(physical.columns(), &["n"]);
3526 }
3527
3528 #[test]
3529 fn test_plan_distinct() {
3530 let store = create_test_store();
3531 let planner = Planner::new(store);
3532
3533 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3535 items: vec![ReturnItem {
3536 expression: LogicalExpression::Variable("n".to_string()),
3537 alias: None,
3538 }],
3539 distinct: false,
3540 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3541 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3542 variable: "n".to_string(),
3543 label: None,
3544 input: None,
3545 })),
3546 columns: None,
3547 })),
3548 }));
3549
3550 let physical = planner.plan(&logical).unwrap();
3551 assert_eq!(physical.columns(), &["n"]);
3552 }
3553
3554 #[test]
3557 fn test_plan_aggregate_count() {
3558 let store = create_test_store();
3559 let planner = Planner::new(store);
3560
3561 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3563 items: vec![ReturnItem {
3564 expression: LogicalExpression::Variable("cnt".to_string()),
3565 alias: None,
3566 }],
3567 distinct: false,
3568 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3569 group_by: vec![],
3570 aggregates: vec![LogicalAggregateExpr {
3571 function: LogicalAggregateFunction::Count,
3572 expression: Some(LogicalExpression::Variable("n".to_string())),
3573 distinct: false,
3574 alias: Some("cnt".to_string()),
3575 percentile: None,
3576 }],
3577 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3578 variable: "n".to_string(),
3579 label: None,
3580 input: None,
3581 })),
3582 having: None,
3583 })),
3584 }));
3585
3586 let physical = planner.plan(&logical).unwrap();
3587 assert!(physical.columns().contains(&"cnt".to_string()));
3588 }
3589
3590 #[test]
3591 fn test_plan_aggregate_with_group_by() {
3592 let store = create_test_store();
3593 let planner = Planner::new(store);
3594
3595 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3597 group_by: vec![LogicalExpression::Property {
3598 variable: "n".to_string(),
3599 property: "city".to_string(),
3600 }],
3601 aggregates: vec![LogicalAggregateExpr {
3602 function: LogicalAggregateFunction::Count,
3603 expression: Some(LogicalExpression::Variable("n".to_string())),
3604 distinct: false,
3605 alias: Some("cnt".to_string()),
3606 percentile: None,
3607 }],
3608 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3609 variable: "n".to_string(),
3610 label: Some("Person".to_string()),
3611 input: None,
3612 })),
3613 having: None,
3614 }));
3615
3616 let physical = planner.plan(&logical).unwrap();
3617 assert_eq!(physical.columns().len(), 2);
3618 }
3619
3620 #[test]
3621 fn test_plan_aggregate_sum() {
3622 let store = create_test_store();
3623 let planner = Planner::new(store);
3624
3625 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3627 group_by: vec![],
3628 aggregates: vec![LogicalAggregateExpr {
3629 function: LogicalAggregateFunction::Sum,
3630 expression: Some(LogicalExpression::Property {
3631 variable: "n".to_string(),
3632 property: "value".to_string(),
3633 }),
3634 distinct: false,
3635 alias: Some("total".to_string()),
3636 percentile: None,
3637 }],
3638 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3639 variable: "n".to_string(),
3640 label: None,
3641 input: None,
3642 })),
3643 having: None,
3644 }));
3645
3646 let physical = planner.plan(&logical).unwrap();
3647 assert!(physical.columns().contains(&"total".to_string()));
3648 }
3649
3650 #[test]
3651 fn test_plan_aggregate_avg() {
3652 let store = create_test_store();
3653 let planner = Planner::new(store);
3654
3655 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3657 group_by: vec![],
3658 aggregates: vec![LogicalAggregateExpr {
3659 function: LogicalAggregateFunction::Avg,
3660 expression: Some(LogicalExpression::Property {
3661 variable: "n".to_string(),
3662 property: "score".to_string(),
3663 }),
3664 distinct: false,
3665 alias: Some("average".to_string()),
3666 percentile: None,
3667 }],
3668 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3669 variable: "n".to_string(),
3670 label: None,
3671 input: None,
3672 })),
3673 having: None,
3674 }));
3675
3676 let physical = planner.plan(&logical).unwrap();
3677 assert!(physical.columns().contains(&"average".to_string()));
3678 }
3679
3680 #[test]
3681 fn test_plan_aggregate_min_max() {
3682 let store = create_test_store();
3683 let planner = Planner::new(store);
3684
3685 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3687 group_by: vec![],
3688 aggregates: vec![
3689 LogicalAggregateExpr {
3690 function: LogicalAggregateFunction::Min,
3691 expression: Some(LogicalExpression::Property {
3692 variable: "n".to_string(),
3693 property: "age".to_string(),
3694 }),
3695 distinct: false,
3696 alias: Some("youngest".to_string()),
3697 percentile: None,
3698 },
3699 LogicalAggregateExpr {
3700 function: LogicalAggregateFunction::Max,
3701 expression: Some(LogicalExpression::Property {
3702 variable: "n".to_string(),
3703 property: "age".to_string(),
3704 }),
3705 distinct: false,
3706 alias: Some("oldest".to_string()),
3707 percentile: None,
3708 },
3709 ],
3710 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3711 variable: "n".to_string(),
3712 label: None,
3713 input: None,
3714 })),
3715 having: None,
3716 }));
3717
3718 let physical = planner.plan(&logical).unwrap();
3719 assert!(physical.columns().contains(&"youngest".to_string()));
3720 assert!(physical.columns().contains(&"oldest".to_string()));
3721 }
3722
3723 #[test]
3726 fn test_plan_inner_join() {
3727 let store = create_test_store();
3728 let planner = Planner::new(store);
3729
3730 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3732 items: vec![
3733 ReturnItem {
3734 expression: LogicalExpression::Variable("a".to_string()),
3735 alias: None,
3736 },
3737 ReturnItem {
3738 expression: LogicalExpression::Variable("b".to_string()),
3739 alias: None,
3740 },
3741 ],
3742 distinct: false,
3743 input: Box::new(LogicalOperator::Join(JoinOp {
3744 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3745 variable: "a".to_string(),
3746 label: Some("Person".to_string()),
3747 input: None,
3748 })),
3749 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3750 variable: "b".to_string(),
3751 label: Some("Company".to_string()),
3752 input: None,
3753 })),
3754 join_type: JoinType::Inner,
3755 conditions: vec![JoinCondition {
3756 left: LogicalExpression::Variable("a".to_string()),
3757 right: LogicalExpression::Variable("b".to_string()),
3758 }],
3759 })),
3760 }));
3761
3762 let physical = planner.plan(&logical).unwrap();
3763 assert!(physical.columns().contains(&"a".to_string()));
3764 assert!(physical.columns().contains(&"b".to_string()));
3765 }
3766
3767 #[test]
3768 fn test_plan_cross_join() {
3769 let store = create_test_store();
3770 let planner = Planner::new(store);
3771
3772 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3774 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3775 variable: "a".to_string(),
3776 label: None,
3777 input: None,
3778 })),
3779 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3780 variable: "b".to_string(),
3781 label: None,
3782 input: None,
3783 })),
3784 join_type: JoinType::Cross,
3785 conditions: vec![],
3786 }));
3787
3788 let physical = planner.plan(&logical).unwrap();
3789 assert_eq!(physical.columns().len(), 2);
3790 }
3791
3792 #[test]
3793 fn test_plan_left_join() {
3794 let store = create_test_store();
3795 let planner = Planner::new(store);
3796
3797 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3798 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3799 variable: "a".to_string(),
3800 label: None,
3801 input: None,
3802 })),
3803 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3804 variable: "b".to_string(),
3805 label: None,
3806 input: None,
3807 })),
3808 join_type: JoinType::Left,
3809 conditions: vec![],
3810 }));
3811
3812 let physical = planner.plan(&logical).unwrap();
3813 assert_eq!(physical.columns().len(), 2);
3814 }
3815
3816 #[test]
3819 fn test_plan_create_node() {
3820 let store = create_test_store();
3821 let planner = Planner::new(store);
3822
3823 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3825 variable: "n".to_string(),
3826 labels: vec!["Person".to_string()],
3827 properties: vec![(
3828 "name".to_string(),
3829 LogicalExpression::Literal(Value::String("Alice".into())),
3830 )],
3831 input: None,
3832 }));
3833
3834 let physical = planner.plan(&logical).unwrap();
3835 assert!(physical.columns().contains(&"n".to_string()));
3836 }
3837
3838 #[test]
3839 fn test_plan_create_edge() {
3840 let store = create_test_store();
3841 let planner = Planner::new(store);
3842
3843 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3845 variable: Some("r".to_string()),
3846 from_variable: "a".to_string(),
3847 to_variable: "b".to_string(),
3848 edge_type: "KNOWS".to_string(),
3849 properties: vec![],
3850 input: Box::new(LogicalOperator::Join(JoinOp {
3851 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3852 variable: "a".to_string(),
3853 label: None,
3854 input: None,
3855 })),
3856 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3857 variable: "b".to_string(),
3858 label: None,
3859 input: None,
3860 })),
3861 join_type: JoinType::Cross,
3862 conditions: vec![],
3863 })),
3864 }));
3865
3866 let physical = planner.plan(&logical).unwrap();
3867 assert!(physical.columns().contains(&"r".to_string()));
3868 }
3869
3870 #[test]
3871 fn test_plan_delete_node() {
3872 let store = create_test_store();
3873 let planner = Planner::new(store);
3874
3875 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3877 variable: "n".to_string(),
3878 detach: false,
3879 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3880 variable: "n".to_string(),
3881 label: None,
3882 input: None,
3883 })),
3884 }));
3885
3886 let physical = planner.plan(&logical).unwrap();
3887 assert!(physical.columns().contains(&"deleted_count".to_string()));
3888 }
3889
3890 #[test]
3893 fn test_plan_empty_errors() {
3894 let store = create_test_store();
3895 let planner = Planner::new(store);
3896
3897 let logical = LogicalPlan::new(LogicalOperator::Empty);
3898 let result = planner.plan(&logical);
3899 assert!(result.is_err());
3900 }
3901
3902 #[test]
3903 fn test_plan_missing_variable_in_return() {
3904 let store = create_test_store();
3905 let planner = Planner::new(store);
3906
3907 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3909 items: vec![ReturnItem {
3910 expression: LogicalExpression::Variable("missing".to_string()),
3911 alias: None,
3912 }],
3913 distinct: false,
3914 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3915 variable: "n".to_string(),
3916 label: None,
3917 input: None,
3918 })),
3919 }));
3920
3921 let result = planner.plan(&logical);
3922 assert!(result.is_err());
3923 }
3924
3925 #[test]
3928 fn test_convert_binary_ops() {
3929 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3930 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3931 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3932 assert!(convert_binary_op(BinaryOp::Le).is_ok());
3933 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3934 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3935 assert!(convert_binary_op(BinaryOp::And).is_ok());
3936 assert!(convert_binary_op(BinaryOp::Or).is_ok());
3937 assert!(convert_binary_op(BinaryOp::Add).is_ok());
3938 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3939 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3940 assert!(convert_binary_op(BinaryOp::Div).is_ok());
3941 }
3942
3943 #[test]
3944 fn test_convert_unary_ops() {
3945 assert!(convert_unary_op(UnaryOp::Not).is_ok());
3946 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3947 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3948 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3949 }
3950
3951 #[test]
3952 fn test_convert_aggregate_functions() {
3953 assert!(matches!(
3954 convert_aggregate_function(LogicalAggregateFunction::Count),
3955 PhysicalAggregateFunction::Count
3956 ));
3957 assert!(matches!(
3958 convert_aggregate_function(LogicalAggregateFunction::Sum),
3959 PhysicalAggregateFunction::Sum
3960 ));
3961 assert!(matches!(
3962 convert_aggregate_function(LogicalAggregateFunction::Avg),
3963 PhysicalAggregateFunction::Avg
3964 ));
3965 assert!(matches!(
3966 convert_aggregate_function(LogicalAggregateFunction::Min),
3967 PhysicalAggregateFunction::Min
3968 ));
3969 assert!(matches!(
3970 convert_aggregate_function(LogicalAggregateFunction::Max),
3971 PhysicalAggregateFunction::Max
3972 ));
3973 }
3974
3975 #[test]
3976 fn test_planner_accessors() {
3977 let store = create_test_store();
3978 let planner = Planner::new(Arc::clone(&store));
3979
3980 assert!(planner.tx_id().is_none());
3981 assert!(planner.tx_manager().is_none());
3982 let _ = planner.viewing_epoch(); }
3984
3985 #[test]
3986 fn test_physical_plan_accessors() {
3987 let store = create_test_store();
3988 let planner = Planner::new(store);
3989
3990 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3991 variable: "n".to_string(),
3992 label: None,
3993 input: None,
3994 }));
3995
3996 let physical = planner.plan(&logical).unwrap();
3997 assert_eq!(physical.columns(), &["n"]);
3998
3999 let _ = physical.into_operator();
4001 }
4002
4003 #[test]
4006 fn test_plan_adaptive_with_scan() {
4007 let store = create_test_store();
4008 let planner = Planner::new(store);
4009
4010 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4012 items: vec![ReturnItem {
4013 expression: LogicalExpression::Variable("n".to_string()),
4014 alias: None,
4015 }],
4016 distinct: false,
4017 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4018 variable: "n".to_string(),
4019 label: Some("Person".to_string()),
4020 input: None,
4021 })),
4022 }));
4023
4024 let physical = planner.plan_adaptive(&logical).unwrap();
4025 assert_eq!(physical.columns(), &["n"]);
4026 assert!(physical.adaptive_context.is_some());
4028 }
4029
4030 #[test]
4031 fn test_plan_adaptive_with_filter() {
4032 let store = create_test_store();
4033 let planner = Planner::new(store);
4034
4035 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4037 items: vec![ReturnItem {
4038 expression: LogicalExpression::Variable("n".to_string()),
4039 alias: None,
4040 }],
4041 distinct: false,
4042 input: Box::new(LogicalOperator::Filter(FilterOp {
4043 predicate: LogicalExpression::Binary {
4044 left: Box::new(LogicalExpression::Property {
4045 variable: "n".to_string(),
4046 property: "age".to_string(),
4047 }),
4048 op: BinaryOp::Gt,
4049 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4050 },
4051 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4052 variable: "n".to_string(),
4053 label: None,
4054 input: None,
4055 })),
4056 })),
4057 }));
4058
4059 let physical = planner.plan_adaptive(&logical).unwrap();
4060 assert!(physical.adaptive_context.is_some());
4061 }
4062
4063 #[test]
4064 fn test_plan_adaptive_with_expand() {
4065 let store = create_test_store();
4066 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4067
4068 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4070 items: vec![
4071 ReturnItem {
4072 expression: LogicalExpression::Variable("a".to_string()),
4073 alias: None,
4074 },
4075 ReturnItem {
4076 expression: LogicalExpression::Variable("b".to_string()),
4077 alias: None,
4078 },
4079 ],
4080 distinct: false,
4081 input: Box::new(LogicalOperator::Expand(ExpandOp {
4082 from_variable: "a".to_string(),
4083 to_variable: "b".to_string(),
4084 edge_variable: None,
4085 direction: ExpandDirection::Outgoing,
4086 edge_type: Some("KNOWS".to_string()),
4087 min_hops: 1,
4088 max_hops: Some(1),
4089 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4090 variable: "a".to_string(),
4091 label: None,
4092 input: None,
4093 })),
4094 path_alias: None,
4095 })),
4096 }));
4097
4098 let physical = planner.plan_adaptive(&logical).unwrap();
4099 assert!(physical.adaptive_context.is_some());
4100 }
4101
4102 #[test]
4103 fn test_plan_adaptive_with_join() {
4104 let store = create_test_store();
4105 let planner = Planner::new(store);
4106
4107 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4108 items: vec![
4109 ReturnItem {
4110 expression: LogicalExpression::Variable("a".to_string()),
4111 alias: None,
4112 },
4113 ReturnItem {
4114 expression: LogicalExpression::Variable("b".to_string()),
4115 alias: None,
4116 },
4117 ],
4118 distinct: false,
4119 input: Box::new(LogicalOperator::Join(JoinOp {
4120 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4121 variable: "a".to_string(),
4122 label: None,
4123 input: None,
4124 })),
4125 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4126 variable: "b".to_string(),
4127 label: None,
4128 input: None,
4129 })),
4130 join_type: JoinType::Cross,
4131 conditions: vec![],
4132 })),
4133 }));
4134
4135 let physical = planner.plan_adaptive(&logical).unwrap();
4136 assert!(physical.adaptive_context.is_some());
4137 }
4138
4139 #[test]
4140 fn test_plan_adaptive_with_aggregate() {
4141 let store = create_test_store();
4142 let planner = Planner::new(store);
4143
4144 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4145 group_by: vec![],
4146 aggregates: vec![LogicalAggregateExpr {
4147 function: LogicalAggregateFunction::Count,
4148 expression: Some(LogicalExpression::Variable("n".to_string())),
4149 distinct: false,
4150 alias: Some("cnt".to_string()),
4151 percentile: None,
4152 }],
4153 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4154 variable: "n".to_string(),
4155 label: None,
4156 input: None,
4157 })),
4158 having: None,
4159 }));
4160
4161 let physical = planner.plan_adaptive(&logical).unwrap();
4162 assert!(physical.adaptive_context.is_some());
4163 }
4164
4165 #[test]
4166 fn test_plan_adaptive_with_distinct() {
4167 let store = create_test_store();
4168 let planner = Planner::new(store);
4169
4170 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4171 items: vec![ReturnItem {
4172 expression: LogicalExpression::Variable("n".to_string()),
4173 alias: None,
4174 }],
4175 distinct: false,
4176 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4177 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4178 variable: "n".to_string(),
4179 label: None,
4180 input: None,
4181 })),
4182 columns: None,
4183 })),
4184 }));
4185
4186 let physical = planner.plan_adaptive(&logical).unwrap();
4187 assert!(physical.adaptive_context.is_some());
4188 }
4189
4190 #[test]
4191 fn test_plan_adaptive_with_limit() {
4192 let store = create_test_store();
4193 let planner = Planner::new(store);
4194
4195 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4196 items: vec![ReturnItem {
4197 expression: LogicalExpression::Variable("n".to_string()),
4198 alias: None,
4199 }],
4200 distinct: false,
4201 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4202 count: 10,
4203 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4204 variable: "n".to_string(),
4205 label: None,
4206 input: None,
4207 })),
4208 })),
4209 }));
4210
4211 let physical = planner.plan_adaptive(&logical).unwrap();
4212 assert!(physical.adaptive_context.is_some());
4213 }
4214
4215 #[test]
4216 fn test_plan_adaptive_with_skip() {
4217 let store = create_test_store();
4218 let planner = Planner::new(store);
4219
4220 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4221 items: vec![ReturnItem {
4222 expression: LogicalExpression::Variable("n".to_string()),
4223 alias: None,
4224 }],
4225 distinct: false,
4226 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4227 count: 5,
4228 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4229 variable: "n".to_string(),
4230 label: None,
4231 input: None,
4232 })),
4233 })),
4234 }));
4235
4236 let physical = planner.plan_adaptive(&logical).unwrap();
4237 assert!(physical.adaptive_context.is_some());
4238 }
4239
4240 #[test]
4241 fn test_plan_adaptive_with_sort() {
4242 let store = create_test_store();
4243 let planner = Planner::new(store);
4244
4245 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4246 items: vec![ReturnItem {
4247 expression: LogicalExpression::Variable("n".to_string()),
4248 alias: None,
4249 }],
4250 distinct: false,
4251 input: Box::new(LogicalOperator::Sort(SortOp {
4252 keys: vec![SortKey {
4253 expression: LogicalExpression::Variable("n".to_string()),
4254 order: SortOrder::Ascending,
4255 }],
4256 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4257 variable: "n".to_string(),
4258 label: None,
4259 input: None,
4260 })),
4261 })),
4262 }));
4263
4264 let physical = planner.plan_adaptive(&logical).unwrap();
4265 assert!(physical.adaptive_context.is_some());
4266 }
4267
4268 #[test]
4269 fn test_plan_adaptive_with_union() {
4270 let store = create_test_store();
4271 let planner = Planner::new(store);
4272
4273 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4274 items: vec![ReturnItem {
4275 expression: LogicalExpression::Variable("n".to_string()),
4276 alias: None,
4277 }],
4278 distinct: false,
4279 input: Box::new(LogicalOperator::Union(UnionOp {
4280 inputs: vec![
4281 LogicalOperator::NodeScan(NodeScanOp {
4282 variable: "n".to_string(),
4283 label: Some("Person".to_string()),
4284 input: None,
4285 }),
4286 LogicalOperator::NodeScan(NodeScanOp {
4287 variable: "n".to_string(),
4288 label: Some("Company".to_string()),
4289 input: None,
4290 }),
4291 ],
4292 })),
4293 }));
4294
4295 let physical = planner.plan_adaptive(&logical).unwrap();
4296 assert!(physical.adaptive_context.is_some());
4297 }
4298
4299 #[test]
4302 fn test_plan_expand_variable_length() {
4303 let store = create_test_store();
4304 let planner = Planner::new(store);
4305
4306 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4308 items: vec![
4309 ReturnItem {
4310 expression: LogicalExpression::Variable("a".to_string()),
4311 alias: None,
4312 },
4313 ReturnItem {
4314 expression: LogicalExpression::Variable("b".to_string()),
4315 alias: None,
4316 },
4317 ],
4318 distinct: false,
4319 input: Box::new(LogicalOperator::Expand(ExpandOp {
4320 from_variable: "a".to_string(),
4321 to_variable: "b".to_string(),
4322 edge_variable: None,
4323 direction: ExpandDirection::Outgoing,
4324 edge_type: Some("KNOWS".to_string()),
4325 min_hops: 1,
4326 max_hops: Some(3),
4327 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4328 variable: "a".to_string(),
4329 label: None,
4330 input: None,
4331 })),
4332 path_alias: None,
4333 })),
4334 }));
4335
4336 let physical = planner.plan(&logical).unwrap();
4337 assert!(physical.columns().contains(&"a".to_string()));
4338 assert!(physical.columns().contains(&"b".to_string()));
4339 }
4340
4341 #[test]
4342 fn test_plan_expand_with_path_alias() {
4343 let store = create_test_store();
4344 let planner = Planner::new(store);
4345
4346 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4348 items: vec![
4349 ReturnItem {
4350 expression: LogicalExpression::Variable("a".to_string()),
4351 alias: None,
4352 },
4353 ReturnItem {
4354 expression: LogicalExpression::Variable("b".to_string()),
4355 alias: None,
4356 },
4357 ],
4358 distinct: false,
4359 input: Box::new(LogicalOperator::Expand(ExpandOp {
4360 from_variable: "a".to_string(),
4361 to_variable: "b".to_string(),
4362 edge_variable: None,
4363 direction: ExpandDirection::Outgoing,
4364 edge_type: Some("KNOWS".to_string()),
4365 min_hops: 1,
4366 max_hops: Some(3),
4367 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4368 variable: "a".to_string(),
4369 label: None,
4370 input: None,
4371 })),
4372 path_alias: Some("p".to_string()),
4373 })),
4374 }));
4375
4376 let physical = planner.plan(&logical).unwrap();
4377 assert!(physical.columns().contains(&"a".to_string()));
4379 assert!(physical.columns().contains(&"b".to_string()));
4380 }
4381
4382 #[test]
4383 fn test_plan_expand_incoming() {
4384 let store = create_test_store();
4385 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4386
4387 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4389 items: vec![
4390 ReturnItem {
4391 expression: LogicalExpression::Variable("a".to_string()),
4392 alias: None,
4393 },
4394 ReturnItem {
4395 expression: LogicalExpression::Variable("b".to_string()),
4396 alias: None,
4397 },
4398 ],
4399 distinct: false,
4400 input: Box::new(LogicalOperator::Expand(ExpandOp {
4401 from_variable: "a".to_string(),
4402 to_variable: "b".to_string(),
4403 edge_variable: None,
4404 direction: ExpandDirection::Incoming,
4405 edge_type: Some("KNOWS".to_string()),
4406 min_hops: 1,
4407 max_hops: Some(1),
4408 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4409 variable: "a".to_string(),
4410 label: None,
4411 input: None,
4412 })),
4413 path_alias: None,
4414 })),
4415 }));
4416
4417 let physical = planner.plan(&logical).unwrap();
4418 assert!(physical.columns().contains(&"a".to_string()));
4419 assert!(physical.columns().contains(&"b".to_string()));
4420 }
4421
4422 #[test]
4423 fn test_plan_expand_both_directions() {
4424 let store = create_test_store();
4425 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4426
4427 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4429 items: vec![
4430 ReturnItem {
4431 expression: LogicalExpression::Variable("a".to_string()),
4432 alias: None,
4433 },
4434 ReturnItem {
4435 expression: LogicalExpression::Variable("b".to_string()),
4436 alias: None,
4437 },
4438 ],
4439 distinct: false,
4440 input: Box::new(LogicalOperator::Expand(ExpandOp {
4441 from_variable: "a".to_string(),
4442 to_variable: "b".to_string(),
4443 edge_variable: None,
4444 direction: ExpandDirection::Both,
4445 edge_type: Some("KNOWS".to_string()),
4446 min_hops: 1,
4447 max_hops: Some(1),
4448 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4449 variable: "a".to_string(),
4450 label: None,
4451 input: None,
4452 })),
4453 path_alias: None,
4454 })),
4455 }));
4456
4457 let physical = planner.plan(&logical).unwrap();
4458 assert!(physical.columns().contains(&"a".to_string()));
4459 assert!(physical.columns().contains(&"b".to_string()));
4460 }
4461
4462 #[test]
4465 fn test_planner_with_context() {
4466 use crate::transaction::TransactionManager;
4467
4468 let store = create_test_store();
4469 let tx_manager = Arc::new(TransactionManager::new());
4470 let tx_id = tx_manager.begin();
4471 let epoch = tx_manager.current_epoch();
4472
4473 let planner = Planner::with_context(
4474 Arc::clone(&store),
4475 Arc::clone(&tx_manager),
4476 Some(tx_id),
4477 epoch,
4478 );
4479
4480 assert_eq!(planner.tx_id(), Some(tx_id));
4481 assert!(planner.tx_manager().is_some());
4482 assert_eq!(planner.viewing_epoch(), epoch);
4483 }
4484
4485 #[test]
4486 fn test_planner_with_factorized_execution_disabled() {
4487 let store = create_test_store();
4488 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4489
4490 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4492 items: vec![
4493 ReturnItem {
4494 expression: LogicalExpression::Variable("a".to_string()),
4495 alias: None,
4496 },
4497 ReturnItem {
4498 expression: LogicalExpression::Variable("c".to_string()),
4499 alias: None,
4500 },
4501 ],
4502 distinct: false,
4503 input: Box::new(LogicalOperator::Expand(ExpandOp {
4504 from_variable: "b".to_string(),
4505 to_variable: "c".to_string(),
4506 edge_variable: None,
4507 direction: ExpandDirection::Outgoing,
4508 edge_type: None,
4509 min_hops: 1,
4510 max_hops: Some(1),
4511 input: Box::new(LogicalOperator::Expand(ExpandOp {
4512 from_variable: "a".to_string(),
4513 to_variable: "b".to_string(),
4514 edge_variable: None,
4515 direction: ExpandDirection::Outgoing,
4516 edge_type: None,
4517 min_hops: 1,
4518 max_hops: Some(1),
4519 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4520 variable: "a".to_string(),
4521 label: None,
4522 input: None,
4523 })),
4524 path_alias: None,
4525 })),
4526 path_alias: None,
4527 })),
4528 }));
4529
4530 let physical = planner.plan(&logical).unwrap();
4531 assert!(physical.columns().contains(&"a".to_string()));
4532 assert!(physical.columns().contains(&"c".to_string()));
4533 }
4534
4535 #[test]
4538 fn test_plan_sort_by_property() {
4539 let store = create_test_store();
4540 let planner = Planner::new(store);
4541
4542 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4544 items: vec![ReturnItem {
4545 expression: LogicalExpression::Variable("n".to_string()),
4546 alias: None,
4547 }],
4548 distinct: false,
4549 input: Box::new(LogicalOperator::Sort(SortOp {
4550 keys: vec![SortKey {
4551 expression: LogicalExpression::Property {
4552 variable: "n".to_string(),
4553 property: "name".to_string(),
4554 },
4555 order: SortOrder::Ascending,
4556 }],
4557 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4558 variable: "n".to_string(),
4559 label: None,
4560 input: None,
4561 })),
4562 })),
4563 }));
4564
4565 let physical = planner.plan(&logical).unwrap();
4566 assert!(physical.columns().contains(&"n".to_string()));
4568 }
4569
4570 #[test]
4573 fn test_plan_scan_with_input() {
4574 let store = create_test_store();
4575 let planner = Planner::new(store);
4576
4577 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4579 items: vec![
4580 ReturnItem {
4581 expression: LogicalExpression::Variable("a".to_string()),
4582 alias: None,
4583 },
4584 ReturnItem {
4585 expression: LogicalExpression::Variable("b".to_string()),
4586 alias: None,
4587 },
4588 ],
4589 distinct: false,
4590 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4591 variable: "b".to_string(),
4592 label: Some("Company".to_string()),
4593 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4594 variable: "a".to_string(),
4595 label: Some("Person".to_string()),
4596 input: None,
4597 }))),
4598 })),
4599 }));
4600
4601 let physical = planner.plan(&logical).unwrap();
4602 assert!(physical.columns().contains(&"a".to_string()));
4603 assert!(physical.columns().contains(&"b".to_string()));
4604 }
4605}