1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22 ExpandStep, ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator,
23 FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
24 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LimitOperator, MergeOperator,
25 NestedLoopJoinOperator, NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource,
26 RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
27 SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
28 UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
29};
30use grafeo_core::graph::{Direction, lpg::LpgStore};
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use crate::transaction::TransactionManager;
35
36pub struct Planner {
38 store: Arc<LpgStore>,
40 tx_manager: Option<Arc<TransactionManager>>,
42 tx_id: Option<TxId>,
44 viewing_epoch: EpochId,
46 anon_edge_counter: std::cell::Cell<u32>,
48 factorized_execution: bool,
50}
51
52impl Planner {
53 #[must_use]
58 pub fn new(store: Arc<LpgStore>) -> Self {
59 let epoch = store.current_epoch();
60 Self {
61 store,
62 tx_manager: None,
63 tx_id: None,
64 viewing_epoch: epoch,
65 anon_edge_counter: std::cell::Cell::new(0),
66 factorized_execution: true,
67 }
68 }
69
70 #[must_use]
79 pub fn with_context(
80 store: Arc<LpgStore>,
81 tx_manager: Arc<TransactionManager>,
82 tx_id: Option<TxId>,
83 viewing_epoch: EpochId,
84 ) -> Self {
85 Self {
86 store,
87 tx_manager: Some(tx_manager),
88 tx_id,
89 viewing_epoch,
90 anon_edge_counter: std::cell::Cell::new(0),
91 factorized_execution: true,
92 }
93 }
94
95 #[must_use]
97 pub fn viewing_epoch(&self) -> EpochId {
98 self.viewing_epoch
99 }
100
101 #[must_use]
103 pub fn tx_id(&self) -> Option<TxId> {
104 self.tx_id
105 }
106
107 #[must_use]
109 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
110 self.tx_manager.as_ref()
111 }
112
113 #[must_use]
115 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
116 self.factorized_execution = enabled;
117 self
118 }
119
120 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
124 match op {
125 LogicalOperator::Expand(expand) => {
126 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
128
129 if is_single_hop {
130 let (inner_count, base) = Self::count_expand_chain(&expand.input);
131 (inner_count + 1, base)
132 } else {
133 (0, op)
135 }
136 }
137 _ => (0, op),
138 }
139 }
140
141 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
145 let mut chain = Vec::new();
146 let mut current = op;
147
148 while let LogicalOperator::Expand(expand) = current {
149 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
151 if !is_single_hop {
152 break;
153 }
154 chain.push(expand);
155 current = &expand.input;
156 }
157
158 chain.reverse();
160 chain
161 }
162
163 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
169 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
170 Ok(PhysicalPlan {
171 operator,
172 columns,
173 adaptive_context: None,
174 })
175 }
176
177 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
186 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
187
188 let mut adaptive_context = AdaptiveContext::new();
190 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
191
192 Ok(PhysicalPlan {
193 operator,
194 columns,
195 adaptive_context: Some(adaptive_context),
196 })
197 }
198
199 fn collect_cardinality_estimates(
201 &self,
202 op: &LogicalOperator,
203 ctx: &mut AdaptiveContext,
204 depth: usize,
205 ) {
206 match op {
207 LogicalOperator::NodeScan(scan) => {
208 let estimate = if let Some(label) = &scan.label {
210 self.store.nodes_by_label(label).len() as f64
211 } else {
212 self.store.node_count() as f64
213 };
214 let id = format!("scan_{}", scan.variable);
215 ctx.set_estimate(&id, estimate);
216
217 if let Some(input) = &scan.input {
219 self.collect_cardinality_estimates(input, ctx, depth + 1);
220 }
221 }
222 LogicalOperator::Filter(filter) => {
223 let input_estimate = self.estimate_cardinality(&filter.input);
225 let estimate = input_estimate * 0.3;
226 let id = format!("filter_{depth}");
227 ctx.set_estimate(&id, estimate);
228
229 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
230 }
231 LogicalOperator::Expand(expand) => {
232 let input_estimate = self.estimate_cardinality(&expand.input);
234 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
236 let id = format!("expand_{}", expand.to_variable);
237 ctx.set_estimate(&id, estimate);
238
239 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
240 }
241 LogicalOperator::Join(join) => {
242 let left_est = self.estimate_cardinality(&join.left);
244 let right_est = self.estimate_cardinality(&join.right);
245 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
250 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
251 }
252 LogicalOperator::Aggregate(agg) => {
253 let input_estimate = self.estimate_cardinality(&agg.input);
255 let estimate = if agg.group_by.is_empty() {
256 1.0 } else {
258 (input_estimate * 0.1).max(1.0) };
260 let id = format!("aggregate_{depth}");
261 ctx.set_estimate(&id, estimate);
262
263 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
264 }
265 LogicalOperator::Distinct(distinct) => {
266 let input_estimate = self.estimate_cardinality(&distinct.input);
267 let estimate = (input_estimate * 0.5).max(1.0);
268 let id = format!("distinct_{depth}");
269 ctx.set_estimate(&id, estimate);
270
271 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
272 }
273 LogicalOperator::Return(ret) => {
274 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
275 }
276 LogicalOperator::Limit(limit) => {
277 let input_estimate = self.estimate_cardinality(&limit.input);
278 let estimate = (input_estimate).min(limit.count as f64);
279 let id = format!("limit_{depth}");
280 ctx.set_estimate(&id, estimate);
281
282 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
283 }
284 LogicalOperator::Skip(skip) => {
285 let input_estimate = self.estimate_cardinality(&skip.input);
286 let estimate = (input_estimate - skip.count as f64).max(0.0);
287 let id = format!("skip_{depth}");
288 ctx.set_estimate(&id, estimate);
289
290 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
291 }
292 LogicalOperator::Sort(sort) => {
293 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
295 }
296 LogicalOperator::Union(union) => {
297 let estimate: f64 = union
298 .inputs
299 .iter()
300 .map(|input| self.estimate_cardinality(input))
301 .sum();
302 let id = format!("union_{depth}");
303 ctx.set_estimate(&id, estimate);
304
305 for input in &union.inputs {
306 self.collect_cardinality_estimates(input, ctx, depth + 1);
307 }
308 }
309 _ => {
310 }
312 }
313 }
314
315 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
317 match op {
318 LogicalOperator::NodeScan(scan) => {
319 if let Some(label) = &scan.label {
320 self.store.nodes_by_label(label).len() as f64
321 } else {
322 self.store.node_count() as f64
323 }
324 }
325 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
326 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
327 LogicalOperator::Join(join) => {
328 let left = self.estimate_cardinality(&join.left);
329 let right = self.estimate_cardinality(&join.right);
330 (left * right).sqrt()
331 }
332 LogicalOperator::Aggregate(agg) => {
333 if agg.group_by.is_empty() {
334 1.0
335 } else {
336 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
337 }
338 }
339 LogicalOperator::Distinct(distinct) => {
340 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
341 }
342 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
343 LogicalOperator::Limit(limit) => self
344 .estimate_cardinality(&limit.input)
345 .min(limit.count as f64),
346 LogicalOperator::Skip(skip) => {
347 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
348 }
349 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
350 LogicalOperator::Union(union) => union
351 .inputs
352 .iter()
353 .map(|input| self.estimate_cardinality(input))
354 .sum(),
355 _ => 1000.0, }
357 }
358
359 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
361 match op {
362 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
363 LogicalOperator::Expand(expand) => {
364 if self.factorized_execution {
366 let (chain_len, _base) = Self::count_expand_chain(op);
367 if chain_len >= 2 {
368 return self.plan_expand_chain(op);
370 }
371 }
372 self.plan_expand(expand)
373 }
374 LogicalOperator::Return(ret) => self.plan_return(ret),
375 LogicalOperator::Filter(filter) => self.plan_filter(filter),
376 LogicalOperator::Project(project) => self.plan_project(project),
377 LogicalOperator::Limit(limit) => self.plan_limit(limit),
378 LogicalOperator::Skip(skip) => self.plan_skip(skip),
379 LogicalOperator::Sort(sort) => self.plan_sort(sort),
380 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
381 LogicalOperator::Join(join) => self.plan_join(join),
382 LogicalOperator::Union(union) => self.plan_union(union),
383 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
384 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
385 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
386 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
387 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
388 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
389 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
390 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
391 LogicalOperator::Merge(merge) => self.plan_merge(merge),
392 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
393 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
394 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
395 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
396 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
397 _ => Err(Error::Internal(format!(
398 "Unsupported operator: {:?}",
399 std::mem::discriminant(op)
400 ))),
401 }
402 }
403
404 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
406 let scan_op = if let Some(label) = &scan.label {
407 ScanOperator::with_label(Arc::clone(&self.store), label)
408 } else {
409 ScanOperator::new(Arc::clone(&self.store))
410 };
411
412 let scan_operator: Box<dyn Operator> =
414 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
415
416 if let Some(input) = &scan.input {
418 let (input_op, mut input_columns) = self.plan_operator(input)?;
419
420 let mut output_schema: Vec<LogicalType> =
422 input_columns.iter().map(|_| LogicalType::Any).collect();
423 output_schema.push(LogicalType::Node);
424
425 input_columns.push(scan.variable.clone());
427
428 let join_op = Box::new(NestedLoopJoinOperator::new(
430 input_op,
431 scan_operator,
432 None, PhysicalJoinType::Cross,
434 output_schema,
435 ));
436
437 Ok((join_op, input_columns))
438 } else {
439 let columns = vec![scan.variable.clone()];
440 Ok((scan_operator, columns))
441 }
442 }
443
444 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
446 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
448
449 let source_column = input_columns
451 .iter()
452 .position(|c| c == &expand.from_variable)
453 .ok_or_else(|| {
454 Error::Internal(format!(
455 "Source variable '{}' not found in input columns",
456 expand.from_variable
457 ))
458 })?;
459
460 let direction = match expand.direction {
462 ExpandDirection::Outgoing => Direction::Outgoing,
463 ExpandDirection::Incoming => Direction::Incoming,
464 ExpandDirection::Both => Direction::Both,
465 };
466
467 let is_variable_length =
469 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
470
471 let operator: Box<dyn Operator> = if is_variable_length {
472 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
475 Arc::clone(&self.store),
476 input_op,
477 source_column,
478 direction,
479 expand.edge_type.clone(),
480 expand.min_hops,
481 max_hops,
482 )
483 .with_tx_context(self.viewing_epoch, self.tx_id);
484
485 if expand.path_alias.is_some() {
487 expand_op = expand_op.with_path_length_output();
488 }
489
490 Box::new(expand_op)
491 } else {
492 let expand_op = ExpandOperator::new(
494 Arc::clone(&self.store),
495 input_op,
496 source_column,
497 direction,
498 expand.edge_type.clone(),
499 )
500 .with_tx_context(self.viewing_epoch, self.tx_id);
501 Box::new(expand_op)
502 };
503
504 let mut columns = input_columns;
507
508 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
510 let count = self.anon_edge_counter.get();
511 self.anon_edge_counter.set(count + 1);
512 format!("_anon_edge_{}", count)
513 });
514 columns.push(edge_col_name);
515
516 columns.push(expand.to_variable.clone());
517
518 if let Some(ref path_alias) = expand.path_alias {
520 columns.push(format!("_path_length_{}", path_alias));
521 }
522
523 Ok((operator, columns))
524 }
525
526 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
534 let expands = Self::collect_expand_chain(op);
535 if expands.is_empty() {
536 return Err(Error::Internal("Empty expand chain".to_string()));
537 }
538
539 let first_expand = expands[0];
541 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
542
543 let mut columns = base_columns.clone();
544 let mut steps = Vec::new();
545
546 let mut is_first = true;
551
552 for expand in &expands {
553 let source_column = if is_first {
555 base_columns
557 .iter()
558 .position(|c| c == &expand.from_variable)
559 .ok_or_else(|| {
560 Error::Internal(format!(
561 "Source variable '{}' not found in base columns",
562 expand.from_variable
563 ))
564 })?
565 } else {
566 1
569 };
570
571 let direction = match expand.direction {
573 ExpandDirection::Outgoing => Direction::Outgoing,
574 ExpandDirection::Incoming => Direction::Incoming,
575 ExpandDirection::Both => Direction::Both,
576 };
577
578 steps.push(ExpandStep {
580 source_column,
581 direction,
582 edge_type: expand.edge_type.clone(),
583 });
584
585 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
587 let count = self.anon_edge_counter.get();
588 self.anon_edge_counter.set(count + 1);
589 format!("_anon_edge_{}", count)
590 });
591 columns.push(edge_col_name);
592 columns.push(expand.to_variable.clone());
593
594 is_first = false;
595 }
596
597 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
599
600 if let Some(tx_id) = self.tx_id {
601 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
602 } else {
603 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
604 }
605
606 Ok((Box::new(lazy_op), columns))
607 }
608
609 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
611 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
613
614 let variable_columns: HashMap<String, usize> = input_columns
616 .iter()
617 .enumerate()
618 .map(|(i, name)| (name.clone(), i))
619 .collect();
620
621 let columns: Vec<String> = ret
623 .items
624 .iter()
625 .map(|item| {
626 item.alias.clone().unwrap_or_else(|| {
627 expression_to_string(&item.expression)
629 })
630 })
631 .collect();
632
633 let needs_project = ret
635 .items
636 .iter()
637 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
638
639 if needs_project {
640 let mut projections = Vec::with_capacity(ret.items.len());
642 let mut output_types = Vec::with_capacity(ret.items.len());
643
644 for item in &ret.items {
645 match &item.expression {
646 LogicalExpression::Variable(name) => {
647 let col_idx = *variable_columns.get(name).ok_or_else(|| {
648 Error::Internal(format!("Variable '{}' not found in input", name))
649 })?;
650 projections.push(ProjectExpr::Column(col_idx));
651 output_types.push(LogicalType::Node);
653 }
654 LogicalExpression::Property { variable, property } => {
655 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
656 Error::Internal(format!("Variable '{}' not found in input", variable))
657 })?;
658 projections.push(ProjectExpr::PropertyAccess {
659 column: col_idx,
660 property: property.clone(),
661 });
662 output_types.push(LogicalType::Any);
664 }
665 LogicalExpression::Literal(value) => {
666 projections.push(ProjectExpr::Constant(value.clone()));
667 output_types.push(value_to_logical_type(value));
668 }
669 LogicalExpression::FunctionCall { name, args, .. } => {
670 match name.to_lowercase().as_str() {
672 "type" => {
673 if args.len() != 1 {
675 return Err(Error::Internal(
676 "type() requires exactly one argument".to_string(),
677 ));
678 }
679 if let LogicalExpression::Variable(var_name) = &args[0] {
680 let col_idx =
681 *variable_columns.get(var_name).ok_or_else(|| {
682 Error::Internal(format!(
683 "Variable '{}' not found in input",
684 var_name
685 ))
686 })?;
687 projections.push(ProjectExpr::EdgeType { column: col_idx });
688 output_types.push(LogicalType::String);
689 } else {
690 return Err(Error::Internal(
691 "type() argument must be a variable".to_string(),
692 ));
693 }
694 }
695 "length" => {
696 if args.len() != 1 {
699 return Err(Error::Internal(
700 "length() requires exactly one argument".to_string(),
701 ));
702 }
703 if let LogicalExpression::Variable(var_name) = &args[0] {
704 let col_idx =
705 *variable_columns.get(var_name).ok_or_else(|| {
706 Error::Internal(format!(
707 "Variable '{}' not found in input",
708 var_name
709 ))
710 })?;
711 projections.push(ProjectExpr::Column(col_idx));
713 output_types.push(LogicalType::Int64);
714 } else {
715 return Err(Error::Internal(
716 "length() argument must be a variable".to_string(),
717 ));
718 }
719 }
720 _ => {
722 let filter_expr = self.convert_expression(&item.expression)?;
723 projections.push(ProjectExpr::Expression {
724 expr: filter_expr,
725 variable_columns: variable_columns.clone(),
726 });
727 output_types.push(LogicalType::Any);
728 }
729 }
730 }
731 LogicalExpression::Case { .. } => {
732 let filter_expr = self.convert_expression(&item.expression)?;
734 projections.push(ProjectExpr::Expression {
735 expr: filter_expr,
736 variable_columns: variable_columns.clone(),
737 });
738 output_types.push(LogicalType::Any);
740 }
741 _ => {
742 return Err(Error::Internal(format!(
743 "Unsupported RETURN expression: {:?}",
744 item.expression
745 )));
746 }
747 }
748 }
749
750 let operator = Box::new(ProjectOperator::with_store(
751 input_op,
752 projections,
753 output_types,
754 Arc::clone(&self.store),
755 ));
756
757 Ok((operator, columns))
758 } else {
759 let mut projections = Vec::with_capacity(ret.items.len());
762 let mut output_types = Vec::with_capacity(ret.items.len());
763
764 for item in &ret.items {
765 if let LogicalExpression::Variable(name) = &item.expression {
766 let col_idx = *variable_columns.get(name).ok_or_else(|| {
767 Error::Internal(format!("Variable '{}' not found in input", name))
768 })?;
769 projections.push(ProjectExpr::Column(col_idx));
770 output_types.push(LogicalType::Node);
771 }
772 }
773
774 if projections.len() == input_columns.len()
776 && projections
777 .iter()
778 .enumerate()
779 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
780 {
781 Ok((input_op, columns))
783 } else {
784 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
785 Ok((operator, columns))
786 }
787 }
788 }
789
790 fn plan_project(
792 &self,
793 project: &crate::query::plan::ProjectOp,
794 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
795 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
797 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
798 let single_row_op: Box<dyn Operator> = Box::new(
800 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
801 );
802 (single_row_op, Vec::new())
803 } else {
804 self.plan_operator(&project.input)?
805 };
806
807 let variable_columns: HashMap<String, usize> = input_columns
809 .iter()
810 .enumerate()
811 .map(|(i, name)| (name.clone(), i))
812 .collect();
813
814 let mut projections = Vec::with_capacity(project.projections.len());
816 let mut output_types = Vec::with_capacity(project.projections.len());
817 let mut output_columns = Vec::with_capacity(project.projections.len());
818
819 for projection in &project.projections {
820 let col_name = projection
822 .alias
823 .clone()
824 .unwrap_or_else(|| expression_to_string(&projection.expression));
825 output_columns.push(col_name);
826
827 match &projection.expression {
828 LogicalExpression::Variable(name) => {
829 let col_idx = *variable_columns.get(name).ok_or_else(|| {
830 Error::Internal(format!("Variable '{}' not found in input", name))
831 })?;
832 projections.push(ProjectExpr::Column(col_idx));
833 output_types.push(LogicalType::Node);
834 }
835 LogicalExpression::Property { variable, property } => {
836 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
837 Error::Internal(format!("Variable '{}' not found in input", variable))
838 })?;
839 projections.push(ProjectExpr::PropertyAccess {
840 column: col_idx,
841 property: property.clone(),
842 });
843 output_types.push(LogicalType::Any);
844 }
845 LogicalExpression::Literal(value) => {
846 projections.push(ProjectExpr::Constant(value.clone()));
847 output_types.push(value_to_logical_type(value));
848 }
849 _ => {
850 let filter_expr = self.convert_expression(&projection.expression)?;
852 projections.push(ProjectExpr::Expression {
853 expr: filter_expr,
854 variable_columns: variable_columns.clone(),
855 });
856 output_types.push(LogicalType::Any);
857 }
858 }
859 }
860
861 let operator = Box::new(ProjectOperator::with_store(
862 input_op,
863 projections,
864 output_types,
865 Arc::clone(&self.store),
866 ));
867
868 Ok((operator, output_columns))
869 }
870
871 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
873 let (input_op, columns) = self.plan_operator(&filter.input)?;
875
876 let variable_columns: HashMap<String, usize> = columns
878 .iter()
879 .enumerate()
880 .map(|(i, name)| (name.clone(), i))
881 .collect();
882
883 let filter_expr = self.convert_expression(&filter.predicate)?;
885
886 let predicate =
888 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
889
890 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
892
893 Ok((operator, columns))
894 }
895
896 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
898 let (input_op, columns) = self.plan_operator(&limit.input)?;
899 let output_schema = self.derive_schema_from_columns(&columns);
900 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
901 Ok((operator, columns))
902 }
903
904 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
906 let (input_op, columns) = self.plan_operator(&skip.input)?;
907 let output_schema = self.derive_schema_from_columns(&columns);
908 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
909 Ok((operator, columns))
910 }
911
912 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
914 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
915
916 let mut variable_columns: HashMap<String, usize> = input_columns
918 .iter()
919 .enumerate()
920 .map(|(i, name)| (name.clone(), i))
921 .collect();
922
923 let mut property_projections: Vec<(String, String, String)> = Vec::new();
925 let mut next_col_idx = input_columns.len();
926
927 for key in &sort.keys {
928 if let LogicalExpression::Property { variable, property } = &key.expression {
929 let col_name = format!("{}_{}", variable, property);
930 if !variable_columns.contains_key(&col_name) {
931 property_projections.push((
932 variable.clone(),
933 property.clone(),
934 col_name.clone(),
935 ));
936 variable_columns.insert(col_name, next_col_idx);
937 next_col_idx += 1;
938 }
939 }
940 }
941
942 let mut output_columns = input_columns.clone();
944
945 if !property_projections.is_empty() {
947 let mut projections = Vec::new();
948 let mut output_types = Vec::new();
949
950 for (i, _) in input_columns.iter().enumerate() {
953 projections.push(ProjectExpr::Column(i));
954 output_types.push(LogicalType::Node);
955 }
956
957 for (variable, property, col_name) in &property_projections {
959 let source_col = *variable_columns.get(variable).ok_or_else(|| {
960 Error::Internal(format!(
961 "Variable '{}' not found for ORDER BY property projection",
962 variable
963 ))
964 })?;
965 projections.push(ProjectExpr::PropertyAccess {
966 column: source_col,
967 property: property.clone(),
968 });
969 output_types.push(LogicalType::Any);
970 output_columns.push(col_name.clone());
971 }
972
973 input_op = Box::new(ProjectOperator::with_store(
974 input_op,
975 projections,
976 output_types,
977 Arc::clone(&self.store),
978 ));
979 }
980
981 let physical_keys: Vec<PhysicalSortKey> = sort
983 .keys
984 .iter()
985 .map(|key| {
986 let col_idx = self
987 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
988 Ok(PhysicalSortKey {
989 column: col_idx,
990 direction: match key.order {
991 SortOrder::Ascending => SortDirection::Ascending,
992 SortOrder::Descending => SortDirection::Descending,
993 },
994 null_order: NullOrder::NullsLast,
995 })
996 })
997 .collect::<Result<Vec<_>>>()?;
998
999 let output_schema = self.derive_schema_from_columns(&output_columns);
1000 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1001 Ok((operator, output_columns))
1002 }
1003
1004 fn resolve_sort_expression_with_properties(
1006 &self,
1007 expr: &LogicalExpression,
1008 variable_columns: &HashMap<String, usize>,
1009 ) -> Result<usize> {
1010 match expr {
1011 LogicalExpression::Variable(name) => {
1012 variable_columns.get(name).copied().ok_or_else(|| {
1013 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1014 })
1015 }
1016 LogicalExpression::Property { variable, property } => {
1017 let col_name = format!("{}_{}", variable, property);
1019 variable_columns.get(&col_name).copied().ok_or_else(|| {
1020 Error::Internal(format!(
1021 "Property column '{}' not found for ORDER BY (from {}.{})",
1022 col_name, variable, property
1023 ))
1024 })
1025 }
1026 _ => Err(Error::Internal(format!(
1027 "Unsupported ORDER BY expression: {:?}",
1028 expr
1029 ))),
1030 }
1031 }
1032
1033 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1035 columns.iter().map(|_| LogicalType::Any).collect()
1036 }
1037
1038 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1040 if self.factorized_execution
1047 && agg.group_by.is_empty()
1048 && Self::count_expand_chain(&agg.input).0 >= 2
1049 && self.is_simple_aggregate(agg)
1050 {
1051 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1052 return Ok((op, cols));
1053 }
1054 }
1056
1057 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1058
1059 let mut variable_columns: HashMap<String, usize> = input_columns
1061 .iter()
1062 .enumerate()
1063 .map(|(i, name)| (name.clone(), i))
1064 .collect();
1065
1066 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1069
1070 for expr in &agg.group_by {
1072 if let LogicalExpression::Property { variable, property } = expr {
1073 let col_name = format!("{}_{}", variable, property);
1074 if !variable_columns.contains_key(&col_name) {
1075 property_projections.push((
1076 variable.clone(),
1077 property.clone(),
1078 col_name.clone(),
1079 ));
1080 variable_columns.insert(col_name, next_col_idx);
1081 next_col_idx += 1;
1082 }
1083 }
1084 }
1085
1086 for agg_expr in &agg.aggregates {
1088 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1089 let col_name = format!("{}_{}", variable, property);
1090 if !variable_columns.contains_key(&col_name) {
1091 property_projections.push((
1092 variable.clone(),
1093 property.clone(),
1094 col_name.clone(),
1095 ));
1096 variable_columns.insert(col_name, next_col_idx);
1097 next_col_idx += 1;
1098 }
1099 }
1100 }
1101
1102 if !property_projections.is_empty() {
1104 let mut projections = Vec::new();
1105 let mut output_types = Vec::new();
1106
1107 for (i, _) in input_columns.iter().enumerate() {
1110 projections.push(ProjectExpr::Column(i));
1111 output_types.push(LogicalType::Node);
1112 }
1113
1114 for (variable, property, _col_name) in &property_projections {
1116 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1117 Error::Internal(format!(
1118 "Variable '{}' not found for property projection",
1119 variable
1120 ))
1121 })?;
1122 projections.push(ProjectExpr::PropertyAccess {
1123 column: source_col,
1124 property: property.clone(),
1125 });
1126 output_types.push(LogicalType::Any); }
1128
1129 input_op = Box::new(ProjectOperator::with_store(
1130 input_op,
1131 projections,
1132 output_types,
1133 Arc::clone(&self.store),
1134 ));
1135 }
1136
1137 let group_columns: Vec<usize> = agg
1139 .group_by
1140 .iter()
1141 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1142 .collect::<Result<Vec<_>>>()?;
1143
1144 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1146 .aggregates
1147 .iter()
1148 .map(|agg_expr| {
1149 let column = agg_expr
1150 .expression
1151 .as_ref()
1152 .map(|e| {
1153 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1154 })
1155 .transpose()?;
1156
1157 Ok(PhysicalAggregateExpr {
1158 function: convert_aggregate_function(agg_expr.function),
1159 column,
1160 distinct: agg_expr.distinct,
1161 alias: agg_expr.alias.clone(),
1162 percentile: agg_expr.percentile,
1163 })
1164 })
1165 .collect::<Result<Vec<_>>>()?;
1166
1167 let mut output_schema = Vec::new();
1169 let mut output_columns = Vec::new();
1170
1171 for expr in &agg.group_by {
1173 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1175 }
1176
1177 for agg_expr in &agg.aggregates {
1179 let result_type = match agg_expr.function {
1180 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1181 LogicalType::Int64
1182 }
1183 LogicalAggregateFunction::Sum => LogicalType::Int64,
1184 LogicalAggregateFunction::Avg => LogicalType::Float64,
1185 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1186 LogicalType::Int64
1190 }
1191 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1194 | LogicalAggregateFunction::StdDevPop
1195 | LogicalAggregateFunction::PercentileDisc
1196 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1197 };
1198 output_schema.push(result_type);
1199 output_columns.push(
1200 agg_expr
1201 .alias
1202 .clone()
1203 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1204 );
1205 }
1206
1207 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1209 Box::new(SimpleAggregateOperator::new(
1210 input_op,
1211 physical_aggregates,
1212 output_schema,
1213 ))
1214 } else {
1215 Box::new(HashAggregateOperator::new(
1216 input_op,
1217 group_columns,
1218 physical_aggregates,
1219 output_schema,
1220 ))
1221 };
1222
1223 if let Some(having_expr) = &agg.having {
1225 let having_var_columns: HashMap<String, usize> = output_columns
1227 .iter()
1228 .enumerate()
1229 .map(|(i, name)| (name.clone(), i))
1230 .collect();
1231
1232 let filter_expr = self.convert_expression(having_expr)?;
1233 let predicate =
1234 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1235 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1236 }
1237
1238 Ok((operator, output_columns))
1239 }
1240
1241 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1247 agg.aggregates.iter().all(|agg_expr| {
1248 match agg_expr.function {
1249 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1250 agg_expr.expression.is_none()
1252 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1253 }
1254 LogicalAggregateFunction::Sum
1255 | LogicalAggregateFunction::Avg
1256 | LogicalAggregateFunction::Min
1257 | LogicalAggregateFunction::Max => {
1258 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1261 }
1262 _ => false,
1264 }
1265 })
1266 }
1267
1268 fn plan_factorized_aggregate(
1272 &self,
1273 agg: &AggregateOp,
1274 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1275 let expands = Self::collect_expand_chain(&agg.input);
1277 if expands.is_empty() {
1278 return Err(Error::Internal(
1279 "Expected expand chain for factorized aggregate".to_string(),
1280 ));
1281 }
1282
1283 let first_expand = expands[0];
1285 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1286
1287 let mut columns = base_columns.clone();
1288 let mut steps = Vec::new();
1289 let mut is_first = true;
1290
1291 for expand in &expands {
1292 let source_column = if is_first {
1294 base_columns
1295 .iter()
1296 .position(|c| c == &expand.from_variable)
1297 .ok_or_else(|| {
1298 Error::Internal(format!(
1299 "Source variable '{}' not found in base columns",
1300 expand.from_variable
1301 ))
1302 })?
1303 } else {
1304 1 };
1306
1307 let direction = match expand.direction {
1308 ExpandDirection::Outgoing => Direction::Outgoing,
1309 ExpandDirection::Incoming => Direction::Incoming,
1310 ExpandDirection::Both => Direction::Both,
1311 };
1312
1313 steps.push(ExpandStep {
1314 source_column,
1315 direction,
1316 edge_type: expand.edge_type.clone(),
1317 });
1318
1319 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1320 let count = self.anon_edge_counter.get();
1321 self.anon_edge_counter.set(count + 1);
1322 format!("_anon_edge_{}", count)
1323 });
1324 columns.push(edge_col_name);
1325 columns.push(expand.to_variable.clone());
1326
1327 is_first = false;
1328 }
1329
1330 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1332
1333 if let Some(tx_id) = self.tx_id {
1334 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1335 } else {
1336 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1337 }
1338
1339 let factorized_aggs: Vec<FactorizedAggregate> = agg
1341 .aggregates
1342 .iter()
1343 .map(|agg_expr| {
1344 match agg_expr.function {
1345 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1346 if agg_expr.expression.is_none() {
1348 FactorizedAggregate::count()
1349 } else {
1350 FactorizedAggregate::count_column(1) }
1354 }
1355 LogicalAggregateFunction::Sum => {
1356 FactorizedAggregate::sum(1)
1358 }
1359 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1360 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1361 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1362 _ => {
1363 FactorizedAggregate::count()
1365 }
1366 }
1367 })
1368 .collect();
1369
1370 let output_columns: Vec<String> = agg
1372 .aggregates
1373 .iter()
1374 .map(|agg_expr| {
1375 agg_expr
1376 .alias
1377 .clone()
1378 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1379 })
1380 .collect();
1381
1382 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1384
1385 Ok((Box::new(factorized_agg_op), output_columns))
1386 }
1387
1388 #[allow(dead_code)]
1390 fn resolve_expression_to_column(
1391 &self,
1392 expr: &LogicalExpression,
1393 variable_columns: &HashMap<String, usize>,
1394 ) -> Result<usize> {
1395 match expr {
1396 LogicalExpression::Variable(name) => variable_columns
1397 .get(name)
1398 .copied()
1399 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1400 LogicalExpression::Property { variable, .. } => variable_columns
1401 .get(variable)
1402 .copied()
1403 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1404 _ => Err(Error::Internal(format!(
1405 "Cannot resolve expression to column: {:?}",
1406 expr
1407 ))),
1408 }
1409 }
1410
1411 fn resolve_expression_to_column_with_properties(
1415 &self,
1416 expr: &LogicalExpression,
1417 variable_columns: &HashMap<String, usize>,
1418 ) -> Result<usize> {
1419 match expr {
1420 LogicalExpression::Variable(name) => variable_columns
1421 .get(name)
1422 .copied()
1423 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1424 LogicalExpression::Property { variable, property } => {
1425 let col_name = format!("{}_{}", variable, property);
1427 variable_columns.get(&col_name).copied().ok_or_else(|| {
1428 Error::Internal(format!(
1429 "Property column '{}' not found (from {}.{})",
1430 col_name, variable, property
1431 ))
1432 })
1433 }
1434 _ => Err(Error::Internal(format!(
1435 "Cannot resolve expression to column: {:?}",
1436 expr
1437 ))),
1438 }
1439 }
1440
1441 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1443 match expr {
1444 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1445 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1446 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1447 variable: variable.clone(),
1448 property: property.clone(),
1449 }),
1450 LogicalExpression::Binary { left, op, right } => {
1451 let left_expr = self.convert_expression(left)?;
1452 let right_expr = self.convert_expression(right)?;
1453 let filter_op = convert_binary_op(*op)?;
1454 Ok(FilterExpression::Binary {
1455 left: Box::new(left_expr),
1456 op: filter_op,
1457 right: Box::new(right_expr),
1458 })
1459 }
1460 LogicalExpression::Unary { op, operand } => {
1461 let operand_expr = self.convert_expression(operand)?;
1462 let filter_op = convert_unary_op(*op)?;
1463 Ok(FilterExpression::Unary {
1464 op: filter_op,
1465 operand: Box::new(operand_expr),
1466 })
1467 }
1468 LogicalExpression::FunctionCall { name, args, .. } => {
1469 let filter_args: Vec<FilterExpression> = args
1470 .iter()
1471 .map(|a| self.convert_expression(a))
1472 .collect::<Result<Vec<_>>>()?;
1473 Ok(FilterExpression::FunctionCall {
1474 name: name.clone(),
1475 args: filter_args,
1476 })
1477 }
1478 LogicalExpression::Case {
1479 operand,
1480 when_clauses,
1481 else_clause,
1482 } => {
1483 let filter_operand = operand
1484 .as_ref()
1485 .map(|e| self.convert_expression(e))
1486 .transpose()?
1487 .map(Box::new);
1488 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1489 .iter()
1490 .map(|(cond, result)| {
1491 Ok((
1492 self.convert_expression(cond)?,
1493 self.convert_expression(result)?,
1494 ))
1495 })
1496 .collect::<Result<Vec<_>>>()?;
1497 let filter_else = else_clause
1498 .as_ref()
1499 .map(|e| self.convert_expression(e))
1500 .transpose()?
1501 .map(Box::new);
1502 Ok(FilterExpression::Case {
1503 operand: filter_operand,
1504 when_clauses: filter_when_clauses,
1505 else_clause: filter_else,
1506 })
1507 }
1508 LogicalExpression::List(items) => {
1509 let filter_items: Vec<FilterExpression> = items
1510 .iter()
1511 .map(|item| self.convert_expression(item))
1512 .collect::<Result<Vec<_>>>()?;
1513 Ok(FilterExpression::List(filter_items))
1514 }
1515 LogicalExpression::Map(pairs) => {
1516 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1517 .iter()
1518 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1519 .collect::<Result<Vec<_>>>()?;
1520 Ok(FilterExpression::Map(filter_pairs))
1521 }
1522 LogicalExpression::IndexAccess { base, index } => {
1523 let base_expr = self.convert_expression(base)?;
1524 let index_expr = self.convert_expression(index)?;
1525 Ok(FilterExpression::IndexAccess {
1526 base: Box::new(base_expr),
1527 index: Box::new(index_expr),
1528 })
1529 }
1530 LogicalExpression::SliceAccess { base, start, end } => {
1531 let base_expr = self.convert_expression(base)?;
1532 let start_expr = start
1533 .as_ref()
1534 .map(|s| self.convert_expression(s))
1535 .transpose()?
1536 .map(Box::new);
1537 let end_expr = end
1538 .as_ref()
1539 .map(|e| self.convert_expression(e))
1540 .transpose()?
1541 .map(Box::new);
1542 Ok(FilterExpression::SliceAccess {
1543 base: Box::new(base_expr),
1544 start: start_expr,
1545 end: end_expr,
1546 })
1547 }
1548 LogicalExpression::Parameter(_) => Err(Error::Internal(
1549 "Parameters not yet supported in filters".to_string(),
1550 )),
1551 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1552 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1553 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1554 LogicalExpression::ListComprehension {
1555 variable,
1556 list_expr,
1557 filter_expr,
1558 map_expr,
1559 } => {
1560 let list = self.convert_expression(list_expr)?;
1561 let filter = filter_expr
1562 .as_ref()
1563 .map(|f| self.convert_expression(f))
1564 .transpose()?
1565 .map(Box::new);
1566 let map = self.convert_expression(map_expr)?;
1567 Ok(FilterExpression::ListComprehension {
1568 variable: variable.clone(),
1569 list_expr: Box::new(list),
1570 filter_expr: filter,
1571 map_expr: Box::new(map),
1572 })
1573 }
1574 LogicalExpression::ExistsSubquery(subplan) => {
1575 let (start_var, direction, edge_type, end_labels) =
1578 self.extract_exists_pattern(subplan)?;
1579
1580 Ok(FilterExpression::ExistsSubquery {
1581 start_var,
1582 direction,
1583 edge_type,
1584 end_labels,
1585 min_hops: None,
1586 max_hops: None,
1587 })
1588 }
1589 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1590 "COUNT subqueries not yet supported".to_string(),
1591 )),
1592 }
1593 }
1594
1595 fn extract_exists_pattern(
1598 &self,
1599 subplan: &LogicalOperator,
1600 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1601 match subplan {
1602 LogicalOperator::Expand(expand) => {
1603 let end_labels = self.extract_end_labels_from_expand(expand);
1605 let direction = match expand.direction {
1606 ExpandDirection::Outgoing => Direction::Outgoing,
1607 ExpandDirection::Incoming => Direction::Incoming,
1608 ExpandDirection::Both => Direction::Both,
1609 };
1610 Ok((
1611 expand.from_variable.clone(),
1612 direction,
1613 expand.edge_type.clone(),
1614 end_labels,
1615 ))
1616 }
1617 LogicalOperator::NodeScan(scan) => {
1618 if let Some(input) = &scan.input {
1619 self.extract_exists_pattern(input)
1620 } else {
1621 Err(Error::Internal(
1622 "EXISTS subquery must contain an edge pattern".to_string(),
1623 ))
1624 }
1625 }
1626 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1627 _ => Err(Error::Internal(
1628 "Unsupported EXISTS subquery pattern".to_string(),
1629 )),
1630 }
1631 }
1632
1633 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1635 match expand.input.as_ref() {
1637 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1638 _ => None,
1639 }
1640 }
1641
1642 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1644 let (left_op, left_columns) = self.plan_operator(&join.left)?;
1645 let (right_op, right_columns) = self.plan_operator(&join.right)?;
1646
1647 let mut columns = left_columns.clone();
1649 columns.extend(right_columns.clone());
1650
1651 let physical_join_type = match join.join_type {
1653 JoinType::Inner => PhysicalJoinType::Inner,
1654 JoinType::Left => PhysicalJoinType::Left,
1655 JoinType::Right => PhysicalJoinType::Right,
1656 JoinType::Full => PhysicalJoinType::Full,
1657 JoinType::Cross => PhysicalJoinType::Cross,
1658 JoinType::Semi => PhysicalJoinType::Semi,
1659 JoinType::Anti => PhysicalJoinType::Anti,
1660 };
1661
1662 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1664 (vec![], vec![])
1666 } else {
1667 join.conditions
1668 .iter()
1669 .filter_map(|cond| {
1670 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1672 let right_idx = self
1673 .expression_to_column(&cond.right, &right_columns)
1674 .ok()?;
1675 Some((left_idx, right_idx))
1676 })
1677 .unzip()
1678 };
1679
1680 let output_schema = self.derive_schema_from_columns(&columns);
1681
1682 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1683 left_op,
1684 right_op,
1685 probe_keys,
1686 build_keys,
1687 physical_join_type,
1688 output_schema,
1689 ));
1690
1691 Ok((operator, columns))
1692 }
1693
1694 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1696 match expr {
1697 LogicalExpression::Variable(name) => columns
1698 .iter()
1699 .position(|c| c == name)
1700 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1701 _ => Err(Error::Internal(
1702 "Only variables supported in join conditions".to_string(),
1703 )),
1704 }
1705 }
1706
1707 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1709 if union.inputs.is_empty() {
1710 return Err(Error::Internal(
1711 "Union requires at least one input".to_string(),
1712 ));
1713 }
1714
1715 let mut inputs = Vec::with_capacity(union.inputs.len());
1716 let mut columns = Vec::new();
1717
1718 for (i, input) in union.inputs.iter().enumerate() {
1719 let (op, cols) = self.plan_operator(input)?;
1720 if i == 0 {
1721 columns = cols;
1722 }
1723 inputs.push(op);
1724 }
1725
1726 let output_schema = self.derive_schema_from_columns(&columns);
1727 let operator = Box::new(UnionOperator::new(inputs, output_schema));
1728
1729 Ok((operator, columns))
1730 }
1731
1732 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1734 let (input_op, columns) = self.plan_operator(&distinct.input)?;
1735 let output_schema = self.derive_schema_from_columns(&columns);
1736 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1737 Ok((operator, columns))
1738 }
1739
1740 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1742 let (input_op, mut columns) = if let Some(ref input) = create.input {
1744 let (op, cols) = self.plan_operator(input)?;
1745 (Some(op), cols)
1746 } else {
1747 (None, vec![])
1748 };
1749
1750 let output_column = columns.len();
1752 columns.push(create.variable.clone());
1753
1754 let properties: Vec<(String, PropertySource)> = create
1756 .properties
1757 .iter()
1758 .map(|(name, expr)| {
1759 let source = match expr {
1760 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1761 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1762 };
1763 (name.clone(), source)
1764 })
1765 .collect();
1766
1767 let output_schema = self.derive_schema_from_columns(&columns);
1768
1769 let operator = Box::new(
1770 CreateNodeOperator::new(
1771 Arc::clone(&self.store),
1772 input_op,
1773 create.labels.clone(),
1774 properties,
1775 output_schema,
1776 output_column,
1777 )
1778 .with_tx_context(self.viewing_epoch, self.tx_id),
1779 );
1780
1781 Ok((operator, columns))
1782 }
1783
1784 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1786 let (input_op, mut columns) = self.plan_operator(&create.input)?;
1787
1788 let from_column = columns
1790 .iter()
1791 .position(|c| c == &create.from_variable)
1792 .ok_or_else(|| {
1793 Error::Internal(format!(
1794 "Source variable '{}' not found",
1795 create.from_variable
1796 ))
1797 })?;
1798
1799 let to_column = columns
1800 .iter()
1801 .position(|c| c == &create.to_variable)
1802 .ok_or_else(|| {
1803 Error::Internal(format!(
1804 "Target variable '{}' not found",
1805 create.to_variable
1806 ))
1807 })?;
1808
1809 let output_column = create.variable.as_ref().map(|v| {
1811 let idx = columns.len();
1812 columns.push(v.clone());
1813 idx
1814 });
1815
1816 let properties: Vec<(String, PropertySource)> = create
1818 .properties
1819 .iter()
1820 .map(|(name, expr)| {
1821 let source = match expr {
1822 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1823 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1824 };
1825 (name.clone(), source)
1826 })
1827 .collect();
1828
1829 let output_schema = self.derive_schema_from_columns(&columns);
1830
1831 let operator = Box::new(
1832 CreateEdgeOperator::new(
1833 Arc::clone(&self.store),
1834 input_op,
1835 from_column,
1836 to_column,
1837 create.edge_type.clone(),
1838 properties,
1839 output_schema,
1840 output_column,
1841 )
1842 .with_tx_context(self.viewing_epoch, self.tx_id),
1843 );
1844
1845 Ok((operator, columns))
1846 }
1847
1848 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1850 let (input_op, columns) = self.plan_operator(&delete.input)?;
1851
1852 let node_column = columns
1853 .iter()
1854 .position(|c| c == &delete.variable)
1855 .ok_or_else(|| {
1856 Error::Internal(format!(
1857 "Variable '{}' not found for delete",
1858 delete.variable
1859 ))
1860 })?;
1861
1862 let output_schema = vec![LogicalType::Int64];
1864 let output_columns = vec!["deleted_count".to_string()];
1865
1866 let operator = Box::new(
1867 DeleteNodeOperator::new(
1868 Arc::clone(&self.store),
1869 input_op,
1870 node_column,
1871 output_schema,
1872 delete.detach, )
1874 .with_tx_context(self.viewing_epoch, self.tx_id),
1875 );
1876
1877 Ok((operator, output_columns))
1878 }
1879
1880 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1882 let (input_op, columns) = self.plan_operator(&delete.input)?;
1883
1884 let edge_column = columns
1885 .iter()
1886 .position(|c| c == &delete.variable)
1887 .ok_or_else(|| {
1888 Error::Internal(format!(
1889 "Variable '{}' not found for delete",
1890 delete.variable
1891 ))
1892 })?;
1893
1894 let output_schema = vec![LogicalType::Int64];
1896 let output_columns = vec!["deleted_count".to_string()];
1897
1898 let operator = Box::new(
1899 DeleteEdgeOperator::new(
1900 Arc::clone(&self.store),
1901 input_op,
1902 edge_column,
1903 output_schema,
1904 )
1905 .with_tx_context(self.viewing_epoch, self.tx_id),
1906 );
1907
1908 Ok((operator, output_columns))
1909 }
1910
1911 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1913 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1914 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1915
1916 let mut columns = left_columns.clone();
1918 columns.extend(right_columns.clone());
1919
1920 let mut probe_keys = Vec::new();
1922 let mut build_keys = Vec::new();
1923
1924 for (right_idx, right_col) in right_columns.iter().enumerate() {
1925 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1926 probe_keys.push(left_idx);
1927 build_keys.push(right_idx);
1928 }
1929 }
1930
1931 let output_schema = self.derive_schema_from_columns(&columns);
1932
1933 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1934 left_op,
1935 right_op,
1936 probe_keys,
1937 build_keys,
1938 PhysicalJoinType::Left,
1939 output_schema,
1940 ));
1941
1942 Ok((operator, columns))
1943 }
1944
1945 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1947 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1948 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1949
1950 let columns = left_columns.clone();
1952
1953 let mut probe_keys = Vec::new();
1955 let mut build_keys = Vec::new();
1956
1957 for (right_idx, right_col) in right_columns.iter().enumerate() {
1958 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1959 probe_keys.push(left_idx);
1960 build_keys.push(right_idx);
1961 }
1962 }
1963
1964 let output_schema = self.derive_schema_from_columns(&columns);
1965
1966 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1967 left_op,
1968 right_op,
1969 probe_keys,
1970 build_keys,
1971 PhysicalJoinType::Anti,
1972 output_schema,
1973 ));
1974
1975 Ok((operator, columns))
1976 }
1977
1978 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1980 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
1983 if matches!(&*unwind.input, LogicalOperator::Empty) {
1984 let literal_list = self.convert_expression(&unwind.expression)?;
1989
1990 let single_row_op: Box<dyn Operator> = Box::new(
1992 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
1993 );
1994 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
1995 single_row_op,
1996 vec![ProjectExpr::Expression {
1997 expr: literal_list,
1998 variable_columns: HashMap::new(),
1999 }],
2000 vec![LogicalType::Any],
2001 Arc::clone(&self.store),
2002 ));
2003
2004 (project_op, vec!["__list__".to_string()])
2005 } else {
2006 self.plan_operator(&unwind.input)?
2007 };
2008
2009 let list_col_idx = match &unwind.expression {
2015 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2016 LogicalExpression::Property { variable, .. } => {
2017 input_columns.iter().position(|c| c == variable)
2020 }
2021 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2022 None
2024 }
2025 _ => None,
2026 };
2027
2028 let mut columns = input_columns.clone();
2030 columns.push(unwind.variable.clone());
2031
2032 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2034 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2039
2040 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2041 input_op,
2042 col_idx,
2043 unwind.variable.clone(),
2044 output_schema,
2045 ));
2046
2047 Ok((operator, columns))
2048 }
2049
2050 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2052 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2054 Vec::new()
2055 } else {
2056 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2057 cols
2058 };
2059
2060 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2062 .match_properties
2063 .iter()
2064 .filter_map(|(name, expr)| {
2065 if let LogicalExpression::Literal(v) = expr {
2066 Some((name.clone(), v.clone()))
2067 } else {
2068 None }
2070 })
2071 .collect();
2072
2073 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2075 .on_create
2076 .iter()
2077 .filter_map(|(name, expr)| {
2078 if let LogicalExpression::Literal(v) = expr {
2079 Some((name.clone(), v.clone()))
2080 } else {
2081 None
2082 }
2083 })
2084 .collect();
2085
2086 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2088 .on_match
2089 .iter()
2090 .filter_map(|(name, expr)| {
2091 if let LogicalExpression::Literal(v) = expr {
2092 Some((name.clone(), v.clone()))
2093 } else {
2094 None
2095 }
2096 })
2097 .collect();
2098
2099 columns.push(merge.variable.clone());
2101
2102 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2103 Arc::clone(&self.store),
2104 merge.variable.clone(),
2105 merge.labels.clone(),
2106 match_properties,
2107 on_create_properties,
2108 on_match_properties,
2109 ));
2110
2111 Ok((operator, columns))
2112 }
2113
2114 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2116 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2118
2119 let source_column = columns
2121 .iter()
2122 .position(|c| c == &sp.source_var)
2123 .ok_or_else(|| {
2124 Error::Internal(format!(
2125 "Source variable '{}' not found for shortestPath",
2126 sp.source_var
2127 ))
2128 })?;
2129
2130 let target_column = columns
2131 .iter()
2132 .position(|c| c == &sp.target_var)
2133 .ok_or_else(|| {
2134 Error::Internal(format!(
2135 "Target variable '{}' not found for shortestPath",
2136 sp.target_var
2137 ))
2138 })?;
2139
2140 let direction = match sp.direction {
2142 ExpandDirection::Outgoing => Direction::Outgoing,
2143 ExpandDirection::Incoming => Direction::Incoming,
2144 ExpandDirection::Both => Direction::Both,
2145 };
2146
2147 let operator: Box<dyn Operator> = Box::new(
2149 ShortestPathOperator::new(
2150 Arc::clone(&self.store),
2151 input_op,
2152 source_column,
2153 target_column,
2154 sp.edge_type.clone(),
2155 direction,
2156 )
2157 .with_all_paths(sp.all_paths),
2158 );
2159
2160 columns.push(format!("_path_length_{}", sp.path_alias));
2163
2164 Ok((operator, columns))
2165 }
2166
2167 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2169 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2170
2171 let node_column = columns
2173 .iter()
2174 .position(|c| c == &add_label.variable)
2175 .ok_or_else(|| {
2176 Error::Internal(format!(
2177 "Variable '{}' not found for ADD LABEL",
2178 add_label.variable
2179 ))
2180 })?;
2181
2182 let output_schema = vec![LogicalType::Int64];
2184 let output_columns = vec!["labels_added".to_string()];
2185
2186 let operator = Box::new(AddLabelOperator::new(
2187 Arc::clone(&self.store),
2188 input_op,
2189 node_column,
2190 add_label.labels.clone(),
2191 output_schema,
2192 ));
2193
2194 Ok((operator, output_columns))
2195 }
2196
2197 fn plan_remove_label(
2199 &self,
2200 remove_label: &RemoveLabelOp,
2201 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2202 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2203
2204 let node_column = columns
2206 .iter()
2207 .position(|c| c == &remove_label.variable)
2208 .ok_or_else(|| {
2209 Error::Internal(format!(
2210 "Variable '{}' not found for REMOVE LABEL",
2211 remove_label.variable
2212 ))
2213 })?;
2214
2215 let output_schema = vec![LogicalType::Int64];
2217 let output_columns = vec!["labels_removed".to_string()];
2218
2219 let operator = Box::new(RemoveLabelOperator::new(
2220 Arc::clone(&self.store),
2221 input_op,
2222 node_column,
2223 remove_label.labels.clone(),
2224 output_schema,
2225 ));
2226
2227 Ok((operator, output_columns))
2228 }
2229
2230 fn plan_set_property(
2232 &self,
2233 set_prop: &SetPropertyOp,
2234 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2235 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2236
2237 let entity_column = columns
2239 .iter()
2240 .position(|c| c == &set_prop.variable)
2241 .ok_or_else(|| {
2242 Error::Internal(format!(
2243 "Variable '{}' not found for SET",
2244 set_prop.variable
2245 ))
2246 })?;
2247
2248 let properties: Vec<(String, PropertySource)> = set_prop
2250 .properties
2251 .iter()
2252 .map(|(name, expr)| {
2253 let source = self.expression_to_property_source(expr, &columns)?;
2254 Ok((name.clone(), source))
2255 })
2256 .collect::<Result<Vec<_>>>()?;
2257
2258 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2260 let output_columns = columns.clone();
2261
2262 let operator = Box::new(SetPropertyOperator::new_for_node(
2264 Arc::clone(&self.store),
2265 input_op,
2266 entity_column,
2267 properties,
2268 output_schema,
2269 ));
2270
2271 Ok((operator, output_columns))
2272 }
2273
2274 fn expression_to_property_source(
2276 &self,
2277 expr: &LogicalExpression,
2278 columns: &[String],
2279 ) -> Result<PropertySource> {
2280 match expr {
2281 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2282 LogicalExpression::Variable(name) => {
2283 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2284 Error::Internal(format!("Variable '{}' not found for property source", name))
2285 })?;
2286 Ok(PropertySource::Column(col_idx))
2287 }
2288 LogicalExpression::Parameter(name) => {
2289 Ok(PropertySource::Constant(
2292 grafeo_common::types::Value::String(format!("${}", name).into()),
2293 ))
2294 }
2295 _ => Err(Error::Internal(format!(
2296 "Unsupported expression type for property source: {:?}",
2297 expr
2298 ))),
2299 }
2300 }
2301}
2302
2303pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2305 match op {
2306 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2307 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2308 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2309 BinaryOp::Le => Ok(BinaryFilterOp::Le),
2310 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2311 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2312 BinaryOp::And => Ok(BinaryFilterOp::And),
2313 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2314 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2315 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2316 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2317 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2318 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2319 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2320 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2321 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2322 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2323 BinaryOp::In => Ok(BinaryFilterOp::In),
2324 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2325 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2326 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2327 "Binary operator {:?} not yet supported in filters",
2328 op
2329 ))),
2330 }
2331}
2332
2333pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2335 match op {
2336 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2337 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2338 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2339 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2340 }
2341}
2342
2343pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2345 match func {
2346 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2347 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2348 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2349 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2350 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2351 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2352 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2353 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2354 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2355 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2356 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2357 }
2358}
2359
2360pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2364 match expr {
2365 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2366 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2367 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2368 variable: variable.clone(),
2369 property: property.clone(),
2370 }),
2371 LogicalExpression::Binary { left, op, right } => {
2372 let left_expr = convert_filter_expression(left)?;
2373 let right_expr = convert_filter_expression(right)?;
2374 let filter_op = convert_binary_op(*op)?;
2375 Ok(FilterExpression::Binary {
2376 left: Box::new(left_expr),
2377 op: filter_op,
2378 right: Box::new(right_expr),
2379 })
2380 }
2381 LogicalExpression::Unary { op, operand } => {
2382 let operand_expr = convert_filter_expression(operand)?;
2383 let filter_op = convert_unary_op(*op)?;
2384 Ok(FilterExpression::Unary {
2385 op: filter_op,
2386 operand: Box::new(operand_expr),
2387 })
2388 }
2389 LogicalExpression::FunctionCall { name, args, .. } => {
2390 let filter_args: Vec<FilterExpression> = args
2391 .iter()
2392 .map(|a| convert_filter_expression(a))
2393 .collect::<Result<Vec<_>>>()?;
2394 Ok(FilterExpression::FunctionCall {
2395 name: name.clone(),
2396 args: filter_args,
2397 })
2398 }
2399 LogicalExpression::Case {
2400 operand,
2401 when_clauses,
2402 else_clause,
2403 } => {
2404 let filter_operand = operand
2405 .as_ref()
2406 .map(|e| convert_filter_expression(e))
2407 .transpose()?
2408 .map(Box::new);
2409 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2410 .iter()
2411 .map(|(cond, result)| {
2412 Ok((
2413 convert_filter_expression(cond)?,
2414 convert_filter_expression(result)?,
2415 ))
2416 })
2417 .collect::<Result<Vec<_>>>()?;
2418 let filter_else = else_clause
2419 .as_ref()
2420 .map(|e| convert_filter_expression(e))
2421 .transpose()?
2422 .map(Box::new);
2423 Ok(FilterExpression::Case {
2424 operand: filter_operand,
2425 when_clauses: filter_when_clauses,
2426 else_clause: filter_else,
2427 })
2428 }
2429 LogicalExpression::List(items) => {
2430 let filter_items: Vec<FilterExpression> = items
2431 .iter()
2432 .map(|item| convert_filter_expression(item))
2433 .collect::<Result<Vec<_>>>()?;
2434 Ok(FilterExpression::List(filter_items))
2435 }
2436 LogicalExpression::Map(pairs) => {
2437 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2438 .iter()
2439 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2440 .collect::<Result<Vec<_>>>()?;
2441 Ok(FilterExpression::Map(filter_pairs))
2442 }
2443 LogicalExpression::IndexAccess { base, index } => {
2444 let base_expr = convert_filter_expression(base)?;
2445 let index_expr = convert_filter_expression(index)?;
2446 Ok(FilterExpression::IndexAccess {
2447 base: Box::new(base_expr),
2448 index: Box::new(index_expr),
2449 })
2450 }
2451 LogicalExpression::SliceAccess { base, start, end } => {
2452 let base_expr = convert_filter_expression(base)?;
2453 let start_expr = start
2454 .as_ref()
2455 .map(|s| convert_filter_expression(s))
2456 .transpose()?
2457 .map(Box::new);
2458 let end_expr = end
2459 .as_ref()
2460 .map(|e| convert_filter_expression(e))
2461 .transpose()?
2462 .map(Box::new);
2463 Ok(FilterExpression::SliceAccess {
2464 base: Box::new(base_expr),
2465 start: start_expr,
2466 end: end_expr,
2467 })
2468 }
2469 LogicalExpression::Parameter(_) => Err(Error::Internal(
2470 "Parameters not yet supported in filters".to_string(),
2471 )),
2472 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2473 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2474 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2475 LogicalExpression::ListComprehension {
2476 variable,
2477 list_expr,
2478 filter_expr,
2479 map_expr,
2480 } => {
2481 let list = convert_filter_expression(list_expr)?;
2482 let filter = filter_expr
2483 .as_ref()
2484 .map(|f| convert_filter_expression(f))
2485 .transpose()?
2486 .map(Box::new);
2487 let map = convert_filter_expression(map_expr)?;
2488 Ok(FilterExpression::ListComprehension {
2489 variable: variable.clone(),
2490 list_expr: Box::new(list),
2491 filter_expr: filter,
2492 map_expr: Box::new(map),
2493 })
2494 }
2495 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2496 Error::Internal("Subqueries not yet supported in filters".to_string()),
2497 ),
2498 }
2499}
2500
2501fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2503 use grafeo_common::types::Value;
2504 match value {
2505 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
2507 Value::Int64(_) => LogicalType::Int64,
2508 Value::Float64(_) => LogicalType::Float64,
2509 Value::String(_) => LogicalType::String,
2510 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
2512 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
2515}
2516
2517fn expression_to_string(expr: &LogicalExpression) -> String {
2519 match expr {
2520 LogicalExpression::Variable(name) => name.clone(),
2521 LogicalExpression::Property { variable, property } => {
2522 format!("{variable}.{property}")
2523 }
2524 LogicalExpression::Literal(value) => format!("{value:?}"),
2525 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2526 _ => "expr".to_string(),
2527 }
2528}
2529
2530pub struct PhysicalPlan {
2532 pub operator: Box<dyn Operator>,
2534 pub columns: Vec<String>,
2536 pub adaptive_context: Option<AdaptiveContext>,
2542}
2543
2544impl PhysicalPlan {
2545 #[must_use]
2547 pub fn columns(&self) -> &[String] {
2548 &self.columns
2549 }
2550
2551 pub fn into_operator(self) -> Box<dyn Operator> {
2553 self.operator
2554 }
2555
2556 #[must_use]
2558 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2559 self.adaptive_context.as_ref()
2560 }
2561
2562 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2564 self.adaptive_context.take()
2565 }
2566}
2567
2568#[allow(dead_code)]
2572struct SingleResultOperator {
2573 result: Option<grafeo_core::execution::DataChunk>,
2574}
2575
2576impl SingleResultOperator {
2577 #[allow(dead_code)]
2578 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2579 Self { result }
2580 }
2581}
2582
2583impl Operator for SingleResultOperator {
2584 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2585 Ok(self.result.take())
2586 }
2587
2588 fn reset(&mut self) {
2589 }
2591
2592 fn name(&self) -> &'static str {
2593 "SingleResult"
2594 }
2595}
2596
2597#[cfg(test)]
2598mod tests {
2599 use super::*;
2600 use crate::query::plan::{
2601 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2602 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2603 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2604 SortKey, SortOp,
2605 };
2606 use grafeo_common::types::Value;
2607
2608 fn create_test_store() -> Arc<LpgStore> {
2609 let store = Arc::new(LpgStore::new());
2610 store.create_node(&["Person"]);
2611 store.create_node(&["Person"]);
2612 store.create_node(&["Company"]);
2613 store
2614 }
2615
2616 #[test]
2619 fn test_plan_simple_scan() {
2620 let store = create_test_store();
2621 let planner = Planner::new(store);
2622
2623 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2625 items: vec![ReturnItem {
2626 expression: LogicalExpression::Variable("n".to_string()),
2627 alias: None,
2628 }],
2629 distinct: false,
2630 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2631 variable: "n".to_string(),
2632 label: Some("Person".to_string()),
2633 input: None,
2634 })),
2635 }));
2636
2637 let physical = planner.plan(&logical).unwrap();
2638 assert_eq!(physical.columns(), &["n"]);
2639 }
2640
2641 #[test]
2642 fn test_plan_scan_without_label() {
2643 let store = create_test_store();
2644 let planner = Planner::new(store);
2645
2646 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2648 items: vec![ReturnItem {
2649 expression: LogicalExpression::Variable("n".to_string()),
2650 alias: None,
2651 }],
2652 distinct: false,
2653 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2654 variable: "n".to_string(),
2655 label: None,
2656 input: None,
2657 })),
2658 }));
2659
2660 let physical = planner.plan(&logical).unwrap();
2661 assert_eq!(physical.columns(), &["n"]);
2662 }
2663
2664 #[test]
2665 fn test_plan_return_with_alias() {
2666 let store = create_test_store();
2667 let planner = Planner::new(store);
2668
2669 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2671 items: vec![ReturnItem {
2672 expression: LogicalExpression::Variable("n".to_string()),
2673 alias: Some("person".to_string()),
2674 }],
2675 distinct: false,
2676 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2677 variable: "n".to_string(),
2678 label: Some("Person".to_string()),
2679 input: None,
2680 })),
2681 }));
2682
2683 let physical = planner.plan(&logical).unwrap();
2684 assert_eq!(physical.columns(), &["person"]);
2685 }
2686
2687 #[test]
2688 fn test_plan_return_property() {
2689 let store = create_test_store();
2690 let planner = Planner::new(store);
2691
2692 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2694 items: vec![ReturnItem {
2695 expression: LogicalExpression::Property {
2696 variable: "n".to_string(),
2697 property: "name".to_string(),
2698 },
2699 alias: None,
2700 }],
2701 distinct: false,
2702 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2703 variable: "n".to_string(),
2704 label: Some("Person".to_string()),
2705 input: None,
2706 })),
2707 }));
2708
2709 let physical = planner.plan(&logical).unwrap();
2710 assert_eq!(physical.columns(), &["n.name"]);
2711 }
2712
2713 #[test]
2714 fn test_plan_return_literal() {
2715 let store = create_test_store();
2716 let planner = Planner::new(store);
2717
2718 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2720 items: vec![ReturnItem {
2721 expression: LogicalExpression::Literal(Value::Int64(42)),
2722 alias: Some("answer".to_string()),
2723 }],
2724 distinct: false,
2725 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2726 variable: "n".to_string(),
2727 label: None,
2728 input: None,
2729 })),
2730 }));
2731
2732 let physical = planner.plan(&logical).unwrap();
2733 assert_eq!(physical.columns(), &["answer"]);
2734 }
2735
2736 #[test]
2739 fn test_plan_filter_equality() {
2740 let store = create_test_store();
2741 let planner = Planner::new(store);
2742
2743 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2745 items: vec![ReturnItem {
2746 expression: LogicalExpression::Variable("n".to_string()),
2747 alias: None,
2748 }],
2749 distinct: false,
2750 input: Box::new(LogicalOperator::Filter(FilterOp {
2751 predicate: LogicalExpression::Binary {
2752 left: Box::new(LogicalExpression::Property {
2753 variable: "n".to_string(),
2754 property: "age".to_string(),
2755 }),
2756 op: BinaryOp::Eq,
2757 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2758 },
2759 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2760 variable: "n".to_string(),
2761 label: Some("Person".to_string()),
2762 input: None,
2763 })),
2764 })),
2765 }));
2766
2767 let physical = planner.plan(&logical).unwrap();
2768 assert_eq!(physical.columns(), &["n"]);
2769 }
2770
2771 #[test]
2772 fn test_plan_filter_compound_and() {
2773 let store = create_test_store();
2774 let planner = Planner::new(store);
2775
2776 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2778 items: vec![ReturnItem {
2779 expression: LogicalExpression::Variable("n".to_string()),
2780 alias: None,
2781 }],
2782 distinct: false,
2783 input: Box::new(LogicalOperator::Filter(FilterOp {
2784 predicate: LogicalExpression::Binary {
2785 left: Box::new(LogicalExpression::Binary {
2786 left: Box::new(LogicalExpression::Property {
2787 variable: "n".to_string(),
2788 property: "age".to_string(),
2789 }),
2790 op: BinaryOp::Gt,
2791 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2792 }),
2793 op: BinaryOp::And,
2794 right: Box::new(LogicalExpression::Binary {
2795 left: Box::new(LogicalExpression::Property {
2796 variable: "n".to_string(),
2797 property: "age".to_string(),
2798 }),
2799 op: BinaryOp::Lt,
2800 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2801 }),
2802 },
2803 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2804 variable: "n".to_string(),
2805 label: None,
2806 input: None,
2807 })),
2808 })),
2809 }));
2810
2811 let physical = planner.plan(&logical).unwrap();
2812 assert_eq!(physical.columns(), &["n"]);
2813 }
2814
2815 #[test]
2816 fn test_plan_filter_unary_not() {
2817 let store = create_test_store();
2818 let planner = Planner::new(store);
2819
2820 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2822 items: vec![ReturnItem {
2823 expression: LogicalExpression::Variable("n".to_string()),
2824 alias: None,
2825 }],
2826 distinct: false,
2827 input: Box::new(LogicalOperator::Filter(FilterOp {
2828 predicate: LogicalExpression::Unary {
2829 op: UnaryOp::Not,
2830 operand: Box::new(LogicalExpression::Property {
2831 variable: "n".to_string(),
2832 property: "active".to_string(),
2833 }),
2834 },
2835 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2836 variable: "n".to_string(),
2837 label: None,
2838 input: None,
2839 })),
2840 })),
2841 }));
2842
2843 let physical = planner.plan(&logical).unwrap();
2844 assert_eq!(physical.columns(), &["n"]);
2845 }
2846
2847 #[test]
2848 fn test_plan_filter_is_null() {
2849 let store = create_test_store();
2850 let planner = Planner::new(store);
2851
2852 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2854 items: vec![ReturnItem {
2855 expression: LogicalExpression::Variable("n".to_string()),
2856 alias: None,
2857 }],
2858 distinct: false,
2859 input: Box::new(LogicalOperator::Filter(FilterOp {
2860 predicate: LogicalExpression::Unary {
2861 op: UnaryOp::IsNull,
2862 operand: Box::new(LogicalExpression::Property {
2863 variable: "n".to_string(),
2864 property: "email".to_string(),
2865 }),
2866 },
2867 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2868 variable: "n".to_string(),
2869 label: None,
2870 input: None,
2871 })),
2872 })),
2873 }));
2874
2875 let physical = planner.plan(&logical).unwrap();
2876 assert_eq!(physical.columns(), &["n"]);
2877 }
2878
2879 #[test]
2880 fn test_plan_filter_function_call() {
2881 let store = create_test_store();
2882 let planner = Planner::new(store);
2883
2884 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2886 items: vec![ReturnItem {
2887 expression: LogicalExpression::Variable("n".to_string()),
2888 alias: None,
2889 }],
2890 distinct: false,
2891 input: Box::new(LogicalOperator::Filter(FilterOp {
2892 predicate: LogicalExpression::Binary {
2893 left: Box::new(LogicalExpression::FunctionCall {
2894 name: "size".to_string(),
2895 args: vec![LogicalExpression::Property {
2896 variable: "n".to_string(),
2897 property: "friends".to_string(),
2898 }],
2899 distinct: false,
2900 }),
2901 op: BinaryOp::Gt,
2902 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
2903 },
2904 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2905 variable: "n".to_string(),
2906 label: None,
2907 input: None,
2908 })),
2909 })),
2910 }));
2911
2912 let physical = planner.plan(&logical).unwrap();
2913 assert_eq!(physical.columns(), &["n"]);
2914 }
2915
2916 #[test]
2919 fn test_plan_expand_outgoing() {
2920 let store = create_test_store();
2921 let planner = Planner::new(store);
2922
2923 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2925 items: vec![
2926 ReturnItem {
2927 expression: LogicalExpression::Variable("a".to_string()),
2928 alias: None,
2929 },
2930 ReturnItem {
2931 expression: LogicalExpression::Variable("b".to_string()),
2932 alias: None,
2933 },
2934 ],
2935 distinct: false,
2936 input: Box::new(LogicalOperator::Expand(ExpandOp {
2937 from_variable: "a".to_string(),
2938 to_variable: "b".to_string(),
2939 edge_variable: None,
2940 direction: ExpandDirection::Outgoing,
2941 edge_type: Some("KNOWS".to_string()),
2942 min_hops: 1,
2943 max_hops: Some(1),
2944 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2945 variable: "a".to_string(),
2946 label: Some("Person".to_string()),
2947 input: None,
2948 })),
2949 path_alias: None,
2950 })),
2951 }));
2952
2953 let physical = planner.plan(&logical).unwrap();
2954 assert!(physical.columns().contains(&"a".to_string()));
2956 assert!(physical.columns().contains(&"b".to_string()));
2957 }
2958
2959 #[test]
2960 fn test_plan_expand_with_edge_variable() {
2961 let store = create_test_store();
2962 let planner = Planner::new(store);
2963
2964 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2966 items: vec![
2967 ReturnItem {
2968 expression: LogicalExpression::Variable("a".to_string()),
2969 alias: None,
2970 },
2971 ReturnItem {
2972 expression: LogicalExpression::Variable("r".to_string()),
2973 alias: None,
2974 },
2975 ReturnItem {
2976 expression: LogicalExpression::Variable("b".to_string()),
2977 alias: None,
2978 },
2979 ],
2980 distinct: false,
2981 input: Box::new(LogicalOperator::Expand(ExpandOp {
2982 from_variable: "a".to_string(),
2983 to_variable: "b".to_string(),
2984 edge_variable: Some("r".to_string()),
2985 direction: ExpandDirection::Outgoing,
2986 edge_type: Some("KNOWS".to_string()),
2987 min_hops: 1,
2988 max_hops: Some(1),
2989 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2990 variable: "a".to_string(),
2991 label: None,
2992 input: None,
2993 })),
2994 path_alias: None,
2995 })),
2996 }));
2997
2998 let physical = planner.plan(&logical).unwrap();
2999 assert!(physical.columns().contains(&"a".to_string()));
3000 assert!(physical.columns().contains(&"r".to_string()));
3001 assert!(physical.columns().contains(&"b".to_string()));
3002 }
3003
3004 #[test]
3007 fn test_plan_limit() {
3008 let store = create_test_store();
3009 let planner = Planner::new(store);
3010
3011 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3013 items: vec![ReturnItem {
3014 expression: LogicalExpression::Variable("n".to_string()),
3015 alias: None,
3016 }],
3017 distinct: false,
3018 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3019 count: 10,
3020 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3021 variable: "n".to_string(),
3022 label: None,
3023 input: None,
3024 })),
3025 })),
3026 }));
3027
3028 let physical = planner.plan(&logical).unwrap();
3029 assert_eq!(physical.columns(), &["n"]);
3030 }
3031
3032 #[test]
3033 fn test_plan_skip() {
3034 let store = create_test_store();
3035 let planner = Planner::new(store);
3036
3037 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3039 items: vec![ReturnItem {
3040 expression: LogicalExpression::Variable("n".to_string()),
3041 alias: None,
3042 }],
3043 distinct: false,
3044 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3045 count: 5,
3046 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3047 variable: "n".to_string(),
3048 label: None,
3049 input: None,
3050 })),
3051 })),
3052 }));
3053
3054 let physical = planner.plan(&logical).unwrap();
3055 assert_eq!(physical.columns(), &["n"]);
3056 }
3057
3058 #[test]
3059 fn test_plan_sort() {
3060 let store = create_test_store();
3061 let planner = Planner::new(store);
3062
3063 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3065 items: vec![ReturnItem {
3066 expression: LogicalExpression::Variable("n".to_string()),
3067 alias: None,
3068 }],
3069 distinct: false,
3070 input: Box::new(LogicalOperator::Sort(SortOp {
3071 keys: vec![SortKey {
3072 expression: LogicalExpression::Variable("n".to_string()),
3073 order: SortOrder::Ascending,
3074 }],
3075 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3076 variable: "n".to_string(),
3077 label: None,
3078 input: None,
3079 })),
3080 })),
3081 }));
3082
3083 let physical = planner.plan(&logical).unwrap();
3084 assert_eq!(physical.columns(), &["n"]);
3085 }
3086
3087 #[test]
3088 fn test_plan_sort_descending() {
3089 let store = create_test_store();
3090 let planner = Planner::new(store);
3091
3092 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3094 items: vec![ReturnItem {
3095 expression: LogicalExpression::Variable("n".to_string()),
3096 alias: None,
3097 }],
3098 distinct: false,
3099 input: Box::new(LogicalOperator::Sort(SortOp {
3100 keys: vec![SortKey {
3101 expression: LogicalExpression::Variable("n".to_string()),
3102 order: SortOrder::Descending,
3103 }],
3104 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3105 variable: "n".to_string(),
3106 label: None,
3107 input: None,
3108 })),
3109 })),
3110 }));
3111
3112 let physical = planner.plan(&logical).unwrap();
3113 assert_eq!(physical.columns(), &["n"]);
3114 }
3115
3116 #[test]
3117 fn test_plan_distinct() {
3118 let store = create_test_store();
3119 let planner = Planner::new(store);
3120
3121 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3123 items: vec![ReturnItem {
3124 expression: LogicalExpression::Variable("n".to_string()),
3125 alias: None,
3126 }],
3127 distinct: false,
3128 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3129 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3130 variable: "n".to_string(),
3131 label: None,
3132 input: None,
3133 })),
3134 columns: None,
3135 })),
3136 }));
3137
3138 let physical = planner.plan(&logical).unwrap();
3139 assert_eq!(physical.columns(), &["n"]);
3140 }
3141
3142 #[test]
3145 fn test_plan_aggregate_count() {
3146 let store = create_test_store();
3147 let planner = Planner::new(store);
3148
3149 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3151 items: vec![ReturnItem {
3152 expression: LogicalExpression::Variable("cnt".to_string()),
3153 alias: None,
3154 }],
3155 distinct: false,
3156 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3157 group_by: vec![],
3158 aggregates: vec![LogicalAggregateExpr {
3159 function: LogicalAggregateFunction::Count,
3160 expression: Some(LogicalExpression::Variable("n".to_string())),
3161 distinct: false,
3162 alias: Some("cnt".to_string()),
3163 percentile: None,
3164 }],
3165 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3166 variable: "n".to_string(),
3167 label: None,
3168 input: None,
3169 })),
3170 having: None,
3171 })),
3172 }));
3173
3174 let physical = planner.plan(&logical).unwrap();
3175 assert!(physical.columns().contains(&"cnt".to_string()));
3176 }
3177
3178 #[test]
3179 fn test_plan_aggregate_with_group_by() {
3180 let store = create_test_store();
3181 let planner = Planner::new(store);
3182
3183 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3185 group_by: vec![LogicalExpression::Property {
3186 variable: "n".to_string(),
3187 property: "city".to_string(),
3188 }],
3189 aggregates: vec![LogicalAggregateExpr {
3190 function: LogicalAggregateFunction::Count,
3191 expression: Some(LogicalExpression::Variable("n".to_string())),
3192 distinct: false,
3193 alias: Some("cnt".to_string()),
3194 percentile: None,
3195 }],
3196 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3197 variable: "n".to_string(),
3198 label: Some("Person".to_string()),
3199 input: None,
3200 })),
3201 having: None,
3202 }));
3203
3204 let physical = planner.plan(&logical).unwrap();
3205 assert_eq!(physical.columns().len(), 2);
3206 }
3207
3208 #[test]
3209 fn test_plan_aggregate_sum() {
3210 let store = create_test_store();
3211 let planner = Planner::new(store);
3212
3213 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3215 group_by: vec![],
3216 aggregates: vec![LogicalAggregateExpr {
3217 function: LogicalAggregateFunction::Sum,
3218 expression: Some(LogicalExpression::Property {
3219 variable: "n".to_string(),
3220 property: "value".to_string(),
3221 }),
3222 distinct: false,
3223 alias: Some("total".to_string()),
3224 percentile: None,
3225 }],
3226 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3227 variable: "n".to_string(),
3228 label: None,
3229 input: None,
3230 })),
3231 having: None,
3232 }));
3233
3234 let physical = planner.plan(&logical).unwrap();
3235 assert!(physical.columns().contains(&"total".to_string()));
3236 }
3237
3238 #[test]
3239 fn test_plan_aggregate_avg() {
3240 let store = create_test_store();
3241 let planner = Planner::new(store);
3242
3243 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3245 group_by: vec![],
3246 aggregates: vec![LogicalAggregateExpr {
3247 function: LogicalAggregateFunction::Avg,
3248 expression: Some(LogicalExpression::Property {
3249 variable: "n".to_string(),
3250 property: "score".to_string(),
3251 }),
3252 distinct: false,
3253 alias: Some("average".to_string()),
3254 percentile: None,
3255 }],
3256 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3257 variable: "n".to_string(),
3258 label: None,
3259 input: None,
3260 })),
3261 having: None,
3262 }));
3263
3264 let physical = planner.plan(&logical).unwrap();
3265 assert!(physical.columns().contains(&"average".to_string()));
3266 }
3267
3268 #[test]
3269 fn test_plan_aggregate_min_max() {
3270 let store = create_test_store();
3271 let planner = Planner::new(store);
3272
3273 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3275 group_by: vec![],
3276 aggregates: vec![
3277 LogicalAggregateExpr {
3278 function: LogicalAggregateFunction::Min,
3279 expression: Some(LogicalExpression::Property {
3280 variable: "n".to_string(),
3281 property: "age".to_string(),
3282 }),
3283 distinct: false,
3284 alias: Some("youngest".to_string()),
3285 percentile: None,
3286 },
3287 LogicalAggregateExpr {
3288 function: LogicalAggregateFunction::Max,
3289 expression: Some(LogicalExpression::Property {
3290 variable: "n".to_string(),
3291 property: "age".to_string(),
3292 }),
3293 distinct: false,
3294 alias: Some("oldest".to_string()),
3295 percentile: None,
3296 },
3297 ],
3298 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3299 variable: "n".to_string(),
3300 label: None,
3301 input: None,
3302 })),
3303 having: None,
3304 }));
3305
3306 let physical = planner.plan(&logical).unwrap();
3307 assert!(physical.columns().contains(&"youngest".to_string()));
3308 assert!(physical.columns().contains(&"oldest".to_string()));
3309 }
3310
3311 #[test]
3314 fn test_plan_inner_join() {
3315 let store = create_test_store();
3316 let planner = Planner::new(store);
3317
3318 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3320 items: vec![
3321 ReturnItem {
3322 expression: LogicalExpression::Variable("a".to_string()),
3323 alias: None,
3324 },
3325 ReturnItem {
3326 expression: LogicalExpression::Variable("b".to_string()),
3327 alias: None,
3328 },
3329 ],
3330 distinct: false,
3331 input: Box::new(LogicalOperator::Join(JoinOp {
3332 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3333 variable: "a".to_string(),
3334 label: Some("Person".to_string()),
3335 input: None,
3336 })),
3337 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3338 variable: "b".to_string(),
3339 label: Some("Company".to_string()),
3340 input: None,
3341 })),
3342 join_type: JoinType::Inner,
3343 conditions: vec![JoinCondition {
3344 left: LogicalExpression::Variable("a".to_string()),
3345 right: LogicalExpression::Variable("b".to_string()),
3346 }],
3347 })),
3348 }));
3349
3350 let physical = planner.plan(&logical).unwrap();
3351 assert!(physical.columns().contains(&"a".to_string()));
3352 assert!(physical.columns().contains(&"b".to_string()));
3353 }
3354
3355 #[test]
3356 fn test_plan_cross_join() {
3357 let store = create_test_store();
3358 let planner = Planner::new(store);
3359
3360 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3362 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3363 variable: "a".to_string(),
3364 label: None,
3365 input: None,
3366 })),
3367 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3368 variable: "b".to_string(),
3369 label: None,
3370 input: None,
3371 })),
3372 join_type: JoinType::Cross,
3373 conditions: vec![],
3374 }));
3375
3376 let physical = planner.plan(&logical).unwrap();
3377 assert_eq!(physical.columns().len(), 2);
3378 }
3379
3380 #[test]
3381 fn test_plan_left_join() {
3382 let store = create_test_store();
3383 let planner = Planner::new(store);
3384
3385 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3386 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3387 variable: "a".to_string(),
3388 label: None,
3389 input: None,
3390 })),
3391 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3392 variable: "b".to_string(),
3393 label: None,
3394 input: None,
3395 })),
3396 join_type: JoinType::Left,
3397 conditions: vec![],
3398 }));
3399
3400 let physical = planner.plan(&logical).unwrap();
3401 assert_eq!(physical.columns().len(), 2);
3402 }
3403
3404 #[test]
3407 fn test_plan_create_node() {
3408 let store = create_test_store();
3409 let planner = Planner::new(store);
3410
3411 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3413 variable: "n".to_string(),
3414 labels: vec!["Person".to_string()],
3415 properties: vec![(
3416 "name".to_string(),
3417 LogicalExpression::Literal(Value::String("Alice".into())),
3418 )],
3419 input: None,
3420 }));
3421
3422 let physical = planner.plan(&logical).unwrap();
3423 assert!(physical.columns().contains(&"n".to_string()));
3424 }
3425
3426 #[test]
3427 fn test_plan_create_edge() {
3428 let store = create_test_store();
3429 let planner = Planner::new(store);
3430
3431 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3433 variable: Some("r".to_string()),
3434 from_variable: "a".to_string(),
3435 to_variable: "b".to_string(),
3436 edge_type: "KNOWS".to_string(),
3437 properties: vec![],
3438 input: Box::new(LogicalOperator::Join(JoinOp {
3439 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3440 variable: "a".to_string(),
3441 label: None,
3442 input: None,
3443 })),
3444 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3445 variable: "b".to_string(),
3446 label: None,
3447 input: None,
3448 })),
3449 join_type: JoinType::Cross,
3450 conditions: vec![],
3451 })),
3452 }));
3453
3454 let physical = planner.plan(&logical).unwrap();
3455 assert!(physical.columns().contains(&"r".to_string()));
3456 }
3457
3458 #[test]
3459 fn test_plan_delete_node() {
3460 let store = create_test_store();
3461 let planner = Planner::new(store);
3462
3463 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3465 variable: "n".to_string(),
3466 detach: false,
3467 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3468 variable: "n".to_string(),
3469 label: None,
3470 input: None,
3471 })),
3472 }));
3473
3474 let physical = planner.plan(&logical).unwrap();
3475 assert!(physical.columns().contains(&"deleted_count".to_string()));
3476 }
3477
3478 #[test]
3481 fn test_plan_empty_errors() {
3482 let store = create_test_store();
3483 let planner = Planner::new(store);
3484
3485 let logical = LogicalPlan::new(LogicalOperator::Empty);
3486 let result = planner.plan(&logical);
3487 assert!(result.is_err());
3488 }
3489
#[test]
fn test_plan_missing_variable_in_return() {
    // The RETURN item refers to `missing`, while the input scan only binds
    // `n`; planning is expected to fail.
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let return_missing = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("missing".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan_n),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(return_missing));

    assert!(planner.plan(&logical).is_err());
}
3512
#[test]
fn test_convert_binary_ops() {
    // Every comparison, boolean and arithmetic operator listed here must be
    // accepted by `convert_binary_op`.
    let supported_ops = [
        // Comparison
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Boolean
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported_ops {
        assert!(convert_binary_op(op).is_ok());
    }
}
3530
#[test]
fn test_convert_unary_ops() {
    // Each unary operator listed here must be accepted by `convert_unary_op`.
    let supported_ops = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported_ops {
        assert!(convert_unary_op(op).is_ok());
    }
}
3538
#[test]
fn test_convert_aggregate_functions() {
    // Asserts that a logical aggregate variant converts to the physical
    // variant of the same name. A macro is used because the expected value
    // is a pattern, which cannot be passed to a function or stored in a list.
    macro_rules! assert_same_variant {
        ($variant:ident) => {
            assert!(matches!(
                convert_aggregate_function(LogicalAggregateFunction::$variant),
                PhysicalAggregateFunction::$variant
            ));
        };
    }

    assert_same_variant!(Count);
    assert_same_variant!(Sum);
    assert_same_variant!(Avg);
    assert_same_variant!(Min);
    assert_same_variant!(Max);
}
3562
#[test]
fn test_planner_accessors() {
    // A planner created through `Planner::new` reports neither a transaction
    // manager nor a transaction id.
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store));

    assert!(planner.tx_manager().is_none());
    assert!(planner.tx_id().is_none());

    // Only exercises the accessor; the returned epoch is not inspected.
    let _ = planner.viewing_epoch();
}
3572
#[test]
fn test_physical_plan_accessors() {
    // Plans a bare scan of `n`, then checks the column list and that the
    // plan can be consumed into its operator.
    let planner = Planner::new(create_test_store());

    let scan_n = NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan_n));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Only exercises `into_operator`; the resulting operator is discarded.
    let _ = physical.into_operator();
}
3590}