1use crate::query::plan::{
6 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
7 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
8 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
9 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, SkipOp, SortOp,
10 SortOrder, UnaryOp, UnionOp, UnwindOp,
11};
12use grafeo_common::types::LogicalType;
13use grafeo_common::types::{EpochId, TxId};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::execution::operators::{
16 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
17 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
18 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
19 ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
20 JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NullOrder, Operator, ProjectExpr,
21 ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator,
22 SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
23 UnaryFilterOp, UnionOperator, UnwindOperator,
24};
25use grafeo_core::graph::{Direction, lpg::LpgStore};
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use crate::transaction::TransactionManager;
30
31pub struct Planner {
33 store: Arc<LpgStore>,
35 tx_manager: Option<Arc<TransactionManager>>,
37 tx_id: Option<TxId>,
39 viewing_epoch: EpochId,
41 anon_edge_counter: std::cell::Cell<u32>,
43}
44
45impl Planner {
46 #[must_use]
51 pub fn new(store: Arc<LpgStore>) -> Self {
52 let epoch = store.current_epoch();
53 Self {
54 store,
55 tx_manager: None,
56 tx_id: None,
57 viewing_epoch: epoch,
58 anon_edge_counter: std::cell::Cell::new(0),
59 }
60 }
61
62 #[must_use]
71 pub fn with_context(
72 store: Arc<LpgStore>,
73 tx_manager: Arc<TransactionManager>,
74 tx_id: Option<TxId>,
75 viewing_epoch: EpochId,
76 ) -> Self {
77 Self {
78 store,
79 tx_manager: Some(tx_manager),
80 tx_id,
81 viewing_epoch,
82 anon_edge_counter: std::cell::Cell::new(0),
83 }
84 }
85
86 #[must_use]
88 pub fn viewing_epoch(&self) -> EpochId {
89 self.viewing_epoch
90 }
91
92 #[must_use]
94 pub fn tx_id(&self) -> Option<TxId> {
95 self.tx_id
96 }
97
98 #[must_use]
100 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
101 self.tx_manager.as_ref()
102 }
103
104 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
110 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
111 Ok(PhysicalPlan { operator, columns })
112 }
113
114 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
116 match op {
117 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
118 LogicalOperator::Expand(expand) => self.plan_expand(expand),
119 LogicalOperator::Return(ret) => self.plan_return(ret),
120 LogicalOperator::Filter(filter) => self.plan_filter(filter),
121 LogicalOperator::Project(project) => {
122 self.plan_operator(&project.input)
124 }
125 LogicalOperator::Limit(limit) => self.plan_limit(limit),
126 LogicalOperator::Skip(skip) => self.plan_skip(skip),
127 LogicalOperator::Sort(sort) => self.plan_sort(sort),
128 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
129 LogicalOperator::Join(join) => self.plan_join(join),
130 LogicalOperator::Union(union) => self.plan_union(union),
131 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
132 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
133 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
134 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
135 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
136 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
137 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
138 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
139 LogicalOperator::Merge(merge) => self.plan_merge(merge),
140 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
141 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
142 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
143 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
144 _ => Err(Error::Internal(format!(
145 "Unsupported operator: {:?}",
146 std::mem::discriminant(op)
147 ))),
148 }
149 }
150
151 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
153 let scan_op = if let Some(label) = &scan.label {
154 ScanOperator::with_label(Arc::clone(&self.store), label)
155 } else {
156 ScanOperator::new(Arc::clone(&self.store))
157 };
158
159 let operator: Box<dyn Operator> =
161 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
162
163 let columns = vec![scan.variable.clone()];
164
165 Ok((operator, columns))
168 }
169
170 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
172 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
174
175 let source_column = input_columns
177 .iter()
178 .position(|c| c == &expand.from_variable)
179 .ok_or_else(|| {
180 Error::Internal(format!(
181 "Source variable '{}' not found in input columns",
182 expand.from_variable
183 ))
184 })?;
185
186 let direction = match expand.direction {
188 ExpandDirection::Outgoing => Direction::Outgoing,
189 ExpandDirection::Incoming => Direction::Incoming,
190 ExpandDirection::Both => Direction::Both,
191 };
192
193 let expand_op = ExpandOperator::new(
195 Arc::clone(&self.store),
196 input_op,
197 source_column,
198 direction,
199 expand.edge_type.clone(),
200 )
201 .with_tx_context(self.viewing_epoch, self.tx_id);
202
203 let operator: Box<dyn Operator> = Box::new(expand_op);
204
205 let mut columns = input_columns;
208
209 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
211 let count = self.anon_edge_counter.get();
212 self.anon_edge_counter.set(count + 1);
213 format!("_anon_edge_{}", count)
214 });
215 columns.push(edge_col_name);
216
217 columns.push(expand.to_variable.clone());
218
219 Ok((operator, columns))
220 }
221
222 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
224 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
226
227 let variable_columns: HashMap<String, usize> = input_columns
229 .iter()
230 .enumerate()
231 .map(|(i, name)| (name.clone(), i))
232 .collect();
233
234 let columns: Vec<String> = ret
236 .items
237 .iter()
238 .map(|item| {
239 item.alias.clone().unwrap_or_else(|| {
240 expression_to_string(&item.expression)
242 })
243 })
244 .collect();
245
246 let needs_project = ret
248 .items
249 .iter()
250 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
251
252 if needs_project {
253 let mut projections = Vec::with_capacity(ret.items.len());
255 let mut output_types = Vec::with_capacity(ret.items.len());
256
257 for item in &ret.items {
258 match &item.expression {
259 LogicalExpression::Variable(name) => {
260 let col_idx = *variable_columns.get(name).ok_or_else(|| {
261 Error::Internal(format!("Variable '{}' not found in input", name))
262 })?;
263 projections.push(ProjectExpr::Column(col_idx));
264 output_types.push(LogicalType::Node);
266 }
267 LogicalExpression::Property { variable, property } => {
268 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
269 Error::Internal(format!("Variable '{}' not found in input", variable))
270 })?;
271 projections.push(ProjectExpr::PropertyAccess {
272 column: col_idx,
273 property: property.clone(),
274 });
275 output_types.push(LogicalType::String);
277 }
278 LogicalExpression::Literal(value) => {
279 projections.push(ProjectExpr::Constant(value.clone()));
280 output_types.push(value_to_logical_type(value));
281 }
282 _ => {
283 return Err(Error::Internal(format!(
284 "Unsupported RETURN expression: {:?}",
285 item.expression
286 )));
287 }
288 }
289 }
290
291 let operator = Box::new(ProjectOperator::with_store(
292 input_op,
293 projections,
294 output_types,
295 Arc::clone(&self.store),
296 ));
297
298 Ok((operator, columns))
299 } else {
300 let mut projections = Vec::with_capacity(ret.items.len());
303 let mut output_types = Vec::with_capacity(ret.items.len());
304
305 for item in &ret.items {
306 if let LogicalExpression::Variable(name) = &item.expression {
307 let col_idx = *variable_columns.get(name).ok_or_else(|| {
308 Error::Internal(format!("Variable '{}' not found in input", name))
309 })?;
310 projections.push(ProjectExpr::Column(col_idx));
311 output_types.push(LogicalType::Node);
312 }
313 }
314
315 if projections.len() == input_columns.len()
317 && projections
318 .iter()
319 .enumerate()
320 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
321 {
322 Ok((input_op, columns))
324 } else {
325 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
326 Ok((operator, columns))
327 }
328 }
329 }
330
331 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
333 let (input_op, columns) = self.plan_operator(&filter.input)?;
335
336 let variable_columns: HashMap<String, usize> = columns
338 .iter()
339 .enumerate()
340 .map(|(i, name)| (name.clone(), i))
341 .collect();
342
343 let filter_expr = self.convert_expression(&filter.predicate)?;
345
346 let predicate =
348 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
349
350 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
352
353 Ok((operator, columns))
354 }
355
356 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
358 let (input_op, columns) = self.plan_operator(&limit.input)?;
359 let output_schema = self.derive_schema_from_columns(&columns);
360 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
361 Ok((operator, columns))
362 }
363
364 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
366 let (input_op, columns) = self.plan_operator(&skip.input)?;
367 let output_schema = self.derive_schema_from_columns(&columns);
368 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
369 Ok((operator, columns))
370 }
371
372 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
374 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
375
376 let mut variable_columns: HashMap<String, usize> = input_columns
378 .iter()
379 .enumerate()
380 .map(|(i, name)| (name.clone(), i))
381 .collect();
382
383 let mut property_projections: Vec<(String, String, String)> = Vec::new();
385 let mut next_col_idx = input_columns.len();
386
387 for key in &sort.keys {
388 if let LogicalExpression::Property { variable, property } = &key.expression {
389 let col_name = format!("{}_{}", variable, property);
390 if !variable_columns.contains_key(&col_name) {
391 property_projections.push((
392 variable.clone(),
393 property.clone(),
394 col_name.clone(),
395 ));
396 variable_columns.insert(col_name, next_col_idx);
397 next_col_idx += 1;
398 }
399 }
400 }
401
402 let mut output_columns = input_columns.clone();
404
405 if !property_projections.is_empty() {
407 let mut projections = Vec::new();
408 let mut output_types = Vec::new();
409
410 for (i, _) in input_columns.iter().enumerate() {
412 projections.push(ProjectExpr::Column(i));
413 output_types.push(LogicalType::Node);
414 }
415
416 for (variable, property, col_name) in &property_projections {
418 let source_col = *variable_columns.get(variable).ok_or_else(|| {
419 Error::Internal(format!(
420 "Variable '{}' not found for ORDER BY property projection",
421 variable
422 ))
423 })?;
424 projections.push(ProjectExpr::PropertyAccess {
425 column: source_col,
426 property: property.clone(),
427 });
428 output_types.push(LogicalType::Any);
429 output_columns.push(col_name.clone());
430 }
431
432 input_op = Box::new(ProjectOperator::with_store(
433 input_op,
434 projections,
435 output_types,
436 Arc::clone(&self.store),
437 ));
438 }
439
440 let physical_keys: Vec<PhysicalSortKey> = sort
442 .keys
443 .iter()
444 .map(|key| {
445 let col_idx =
446 self.resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
447 Ok(PhysicalSortKey {
448 column: col_idx,
449 direction: match key.order {
450 SortOrder::Ascending => SortDirection::Ascending,
451 SortOrder::Descending => SortDirection::Descending,
452 },
453 null_order: NullOrder::NullsLast,
454 })
455 })
456 .collect::<Result<Vec<_>>>()?;
457
458 let output_schema = self.derive_schema_from_columns(&output_columns);
459 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
460 Ok((operator, output_columns))
461 }
462
463 fn resolve_sort_expression_with_properties(
465 &self,
466 expr: &LogicalExpression,
467 variable_columns: &HashMap<String, usize>,
468 ) -> Result<usize> {
469 match expr {
470 LogicalExpression::Variable(name) => variable_columns
471 .get(name)
472 .copied()
473 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found for ORDER BY", name))),
474 LogicalExpression::Property { variable, property } => {
475 let col_name = format!("{}_{}", variable, property);
477 variable_columns.get(&col_name).copied().ok_or_else(|| {
478 Error::Internal(format!(
479 "Property column '{}' not found for ORDER BY (from {}.{})",
480 col_name, variable, property
481 ))
482 })
483 }
484 _ => Err(Error::Internal(format!(
485 "Unsupported ORDER BY expression: {:?}",
486 expr
487 ))),
488 }
489 }
490
491 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
493 columns.iter().map(|_| LogicalType::Node).collect()
494 }
495
496 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
498 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
499
500 let mut variable_columns: HashMap<String, usize> = input_columns
502 .iter()
503 .enumerate()
504 .map(|(i, name)| (name.clone(), i))
505 .collect();
506
507 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
510
511 for expr in &agg.group_by {
513 if let LogicalExpression::Property { variable, property } = expr {
514 let col_name = format!("{}_{}", variable, property);
515 if !variable_columns.contains_key(&col_name) {
516 property_projections.push((
517 variable.clone(),
518 property.clone(),
519 col_name.clone(),
520 ));
521 variable_columns.insert(col_name, next_col_idx);
522 next_col_idx += 1;
523 }
524 }
525 }
526
527 for agg_expr in &agg.aggregates {
529 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
530 let col_name = format!("{}_{}", variable, property);
531 if !variable_columns.contains_key(&col_name) {
532 property_projections.push((
533 variable.clone(),
534 property.clone(),
535 col_name.clone(),
536 ));
537 variable_columns.insert(col_name, next_col_idx);
538 next_col_idx += 1;
539 }
540 }
541 }
542
543 if !property_projections.is_empty() {
545 let mut projections = Vec::new();
546 let mut output_types = Vec::new();
547
548 for (i, _) in input_columns.iter().enumerate() {
550 projections.push(ProjectExpr::Column(i));
551 output_types.push(LogicalType::Node);
552 }
553
554 for (variable, property, _col_name) in &property_projections {
556 let source_col = *variable_columns.get(variable).ok_or_else(|| {
557 Error::Internal(format!(
558 "Variable '{}' not found for property projection",
559 variable
560 ))
561 })?;
562 projections.push(ProjectExpr::PropertyAccess {
563 column: source_col,
564 property: property.clone(),
565 });
566 output_types.push(LogicalType::Int64); }
568
569 input_op = Box::new(ProjectOperator::with_store(
570 input_op,
571 projections,
572 output_types,
573 Arc::clone(&self.store),
574 ));
575 }
576
577 let group_columns: Vec<usize> = agg
579 .group_by
580 .iter()
581 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
582 .collect::<Result<Vec<_>>>()?;
583
584 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
586 .aggregates
587 .iter()
588 .map(|agg_expr| {
589 let column = agg_expr
590 .expression
591 .as_ref()
592 .map(|e| {
593 self.resolve_expression_to_column_with_properties(e, &variable_columns)
594 })
595 .transpose()?;
596
597 Ok(PhysicalAggregateExpr {
598 function: convert_aggregate_function(agg_expr.function),
599 column,
600 distinct: agg_expr.distinct,
601 alias: agg_expr.alias.clone(),
602 })
603 })
604 .collect::<Result<Vec<_>>>()?;
605
606 let mut output_schema = Vec::new();
608 let mut output_columns = Vec::new();
609
610 for (idx, expr) in agg.group_by.iter().enumerate() {
612 output_schema.push(LogicalType::Node); output_columns.push(expression_to_string(expr));
614 if idx < group_columns.len() {
616 }
618 }
619
620 for agg_expr in &agg.aggregates {
622 let result_type = match agg_expr.function {
623 LogicalAggregateFunction::Count => LogicalType::Int64,
624 LogicalAggregateFunction::Sum => LogicalType::Int64,
625 LogicalAggregateFunction::Avg => LogicalType::Float64,
626 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
627 LogicalType::Int64
631 }
632 LogicalAggregateFunction::Collect => LogicalType::String, };
634 output_schema.push(result_type);
635 output_columns.push(
636 agg_expr
637 .alias
638 .clone()
639 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
640 );
641 }
642
643 let operator: Box<dyn Operator> = if group_columns.is_empty() {
645 Box::new(SimpleAggregateOperator::new(
646 input_op,
647 physical_aggregates,
648 output_schema,
649 ))
650 } else {
651 Box::new(HashAggregateOperator::new(
652 input_op,
653 group_columns,
654 physical_aggregates,
655 output_schema,
656 ))
657 };
658
659 Ok((operator, output_columns))
660 }
661
662 #[allow(dead_code)]
664 fn resolve_expression_to_column(
665 &self,
666 expr: &LogicalExpression,
667 variable_columns: &HashMap<String, usize>,
668 ) -> Result<usize> {
669 match expr {
670 LogicalExpression::Variable(name) => variable_columns
671 .get(name)
672 .copied()
673 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
674 LogicalExpression::Property { variable, .. } => variable_columns
675 .get(variable)
676 .copied()
677 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
678 _ => Err(Error::Internal(format!(
679 "Cannot resolve expression to column: {:?}",
680 expr
681 ))),
682 }
683 }
684
685 fn resolve_expression_to_column_with_properties(
689 &self,
690 expr: &LogicalExpression,
691 variable_columns: &HashMap<String, usize>,
692 ) -> Result<usize> {
693 match expr {
694 LogicalExpression::Variable(name) => variable_columns
695 .get(name)
696 .copied()
697 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
698 LogicalExpression::Property { variable, property } => {
699 let col_name = format!("{}_{}", variable, property);
701 variable_columns.get(&col_name).copied().ok_or_else(|| {
702 Error::Internal(format!(
703 "Property column '{}' not found (from {}.{})",
704 col_name, variable, property
705 ))
706 })
707 }
708 _ => Err(Error::Internal(format!(
709 "Cannot resolve expression to column: {:?}",
710 expr
711 ))),
712 }
713 }
714
715 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
717 match expr {
718 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
719 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
720 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
721 variable: variable.clone(),
722 property: property.clone(),
723 }),
724 LogicalExpression::Binary { left, op, right } => {
725 let left_expr = self.convert_expression(left)?;
726 let right_expr = self.convert_expression(right)?;
727 let filter_op = convert_binary_op(*op)?;
728 Ok(FilterExpression::Binary {
729 left: Box::new(left_expr),
730 op: filter_op,
731 right: Box::new(right_expr),
732 })
733 }
734 LogicalExpression::Unary { op, operand } => {
735 let operand_expr = self.convert_expression(operand)?;
736 let filter_op = convert_unary_op(*op)?;
737 Ok(FilterExpression::Unary {
738 op: filter_op,
739 operand: Box::new(operand_expr),
740 })
741 }
742 LogicalExpression::FunctionCall { name, args } => {
743 let filter_args: Vec<FilterExpression> = args
744 .iter()
745 .map(|a| self.convert_expression(a))
746 .collect::<Result<Vec<_>>>()?;
747 Ok(FilterExpression::FunctionCall {
748 name: name.clone(),
749 args: filter_args,
750 })
751 }
752 LogicalExpression::Case {
753 operand,
754 when_clauses,
755 else_clause,
756 } => {
757 let filter_operand = operand
758 .as_ref()
759 .map(|e| self.convert_expression(e))
760 .transpose()?
761 .map(Box::new);
762 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
763 .iter()
764 .map(|(cond, result)| {
765 Ok((
766 self.convert_expression(cond)?,
767 self.convert_expression(result)?,
768 ))
769 })
770 .collect::<Result<Vec<_>>>()?;
771 let filter_else = else_clause
772 .as_ref()
773 .map(|e| self.convert_expression(e))
774 .transpose()?
775 .map(Box::new);
776 Ok(FilterExpression::Case {
777 operand: filter_operand,
778 when_clauses: filter_when_clauses,
779 else_clause: filter_else,
780 })
781 }
782 LogicalExpression::List(items) => {
783 let filter_items: Vec<FilterExpression> = items
784 .iter()
785 .map(|item| self.convert_expression(item))
786 .collect::<Result<Vec<_>>>()?;
787 Ok(FilterExpression::List(filter_items))
788 }
789 LogicalExpression::Map(pairs) => {
790 let filter_pairs: Vec<(String, FilterExpression)> = pairs
791 .iter()
792 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
793 .collect::<Result<Vec<_>>>()?;
794 Ok(FilterExpression::Map(filter_pairs))
795 }
796 LogicalExpression::IndexAccess { base, index } => {
797 let base_expr = self.convert_expression(base)?;
798 let index_expr = self.convert_expression(index)?;
799 Ok(FilterExpression::IndexAccess {
800 base: Box::new(base_expr),
801 index: Box::new(index_expr),
802 })
803 }
804 LogicalExpression::SliceAccess { base, start, end } => {
805 let base_expr = self.convert_expression(base)?;
806 let start_expr = start
807 .as_ref()
808 .map(|s| self.convert_expression(s))
809 .transpose()?
810 .map(Box::new);
811 let end_expr = end
812 .as_ref()
813 .map(|e| self.convert_expression(e))
814 .transpose()?
815 .map(Box::new);
816 Ok(FilterExpression::SliceAccess {
817 base: Box::new(base_expr),
818 start: start_expr,
819 end: end_expr,
820 })
821 }
822 LogicalExpression::Parameter(_) => Err(Error::Internal(
823 "Parameters not yet supported in filters".to_string(),
824 )),
825 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
826 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
827 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
828 LogicalExpression::ListComprehension {
829 variable,
830 list_expr,
831 filter_expr,
832 map_expr,
833 } => {
834 let list = self.convert_expression(list_expr)?;
835 let filter = filter_expr
836 .as_ref()
837 .map(|f| self.convert_expression(f))
838 .transpose()?
839 .map(Box::new);
840 let map = self.convert_expression(map_expr)?;
841 Ok(FilterExpression::ListComprehension {
842 variable: variable.clone(),
843 list_expr: Box::new(list),
844 filter_expr: filter,
845 map_expr: Box::new(map),
846 })
847 }
848 LogicalExpression::ExistsSubquery(subplan) => {
849 let (start_var, direction, edge_type, end_labels) =
852 self.extract_exists_pattern(subplan)?;
853
854 Ok(FilterExpression::ExistsSubquery {
855 start_var,
856 direction,
857 edge_type,
858 end_labels,
859 min_hops: None,
860 max_hops: None,
861 })
862 }
863 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
864 "COUNT subqueries not yet supported".to_string(),
865 )),
866 }
867 }
868
869 fn extract_exists_pattern(
872 &self,
873 subplan: &LogicalOperator,
874 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
875 match subplan {
876 LogicalOperator::Expand(expand) => {
877 let end_labels = self.extract_end_labels_from_expand(expand);
879 let direction = match expand.direction {
880 ExpandDirection::Outgoing => Direction::Outgoing,
881 ExpandDirection::Incoming => Direction::Incoming,
882 ExpandDirection::Both => Direction::Both,
883 };
884 Ok((
885 expand.from_variable.clone(),
886 direction,
887 expand.edge_type.clone(),
888 end_labels,
889 ))
890 }
891 LogicalOperator::NodeScan(scan) => {
892 if let Some(input) = &scan.input {
893 self.extract_exists_pattern(input)
894 } else {
895 Err(Error::Internal(
896 "EXISTS subquery must contain an edge pattern".to_string(),
897 ))
898 }
899 }
900 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
901 _ => Err(Error::Internal(
902 "Unsupported EXISTS subquery pattern".to_string(),
903 )),
904 }
905 }
906
907 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
909 match expand.input.as_ref() {
911 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
912 _ => None,
913 }
914 }
915
916 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
918 let (left_op, left_columns) = self.plan_operator(&join.left)?;
919 let (right_op, right_columns) = self.plan_operator(&join.right)?;
920
921 let mut columns = left_columns.clone();
923 columns.extend(right_columns.clone());
924
925 let physical_join_type = match join.join_type {
927 JoinType::Inner => PhysicalJoinType::Inner,
928 JoinType::Left => PhysicalJoinType::Left,
929 JoinType::Right => PhysicalJoinType::Right,
930 JoinType::Full => PhysicalJoinType::Full,
931 JoinType::Cross => PhysicalJoinType::Cross,
932 JoinType::Semi => PhysicalJoinType::Semi,
933 JoinType::Anti => PhysicalJoinType::Anti,
934 };
935
936 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
938 (vec![], vec![])
940 } else {
941 join.conditions
942 .iter()
943 .filter_map(|cond| {
944 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
946 let right_idx = self
947 .expression_to_column(&cond.right, &right_columns)
948 .ok()?;
949 Some((left_idx, right_idx))
950 })
951 .unzip()
952 };
953
954 let output_schema = self.derive_schema_from_columns(&columns);
955
956 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
957 left_op,
958 right_op,
959 probe_keys,
960 build_keys,
961 physical_join_type,
962 output_schema,
963 ));
964
965 Ok((operator, columns))
966 }
967
968 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
970 match expr {
971 LogicalExpression::Variable(name) => columns
972 .iter()
973 .position(|c| c == name)
974 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
975 _ => Err(Error::Internal(
976 "Only variables supported in join conditions".to_string(),
977 )),
978 }
979 }
980
981 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
983 if union.inputs.is_empty() {
984 return Err(Error::Internal(
985 "Union requires at least one input".to_string(),
986 ));
987 }
988
989 let mut inputs = Vec::with_capacity(union.inputs.len());
990 let mut columns = Vec::new();
991
992 for (i, input) in union.inputs.iter().enumerate() {
993 let (op, cols) = self.plan_operator(input)?;
994 if i == 0 {
995 columns = cols;
996 }
997 inputs.push(op);
998 }
999
1000 let output_schema = self.derive_schema_from_columns(&columns);
1001 let operator = Box::new(UnionOperator::new(inputs, output_schema));
1002
1003 Ok((operator, columns))
1004 }
1005
1006 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1008 let (input_op, columns) = self.plan_operator(&distinct.input)?;
1009 let output_schema = self.derive_schema_from_columns(&columns);
1010 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1011 Ok((operator, columns))
1012 }
1013
1014 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1016 let (input_op, mut columns) = if let Some(ref input) = create.input {
1018 let (op, cols) = self.plan_operator(input)?;
1019 (Some(op), cols)
1020 } else {
1021 (None, vec![])
1022 };
1023
1024 let output_column = columns.len();
1026 columns.push(create.variable.clone());
1027
1028 let properties: Vec<(String, PropertySource)> = create
1030 .properties
1031 .iter()
1032 .map(|(name, expr)| {
1033 let source = match expr {
1034 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1035 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1036 };
1037 (name.clone(), source)
1038 })
1039 .collect();
1040
1041 let output_schema = self.derive_schema_from_columns(&columns);
1042
1043 let operator = Box::new(
1044 CreateNodeOperator::new(
1045 Arc::clone(&self.store),
1046 input_op,
1047 create.labels.clone(),
1048 properties,
1049 output_schema,
1050 output_column,
1051 )
1052 .with_tx_context(self.viewing_epoch, self.tx_id),
1053 );
1054
1055 Ok((operator, columns))
1056 }
1057
1058 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1060 let (input_op, mut columns) = self.plan_operator(&create.input)?;
1061
1062 let from_column = columns
1064 .iter()
1065 .position(|c| c == &create.from_variable)
1066 .ok_or_else(|| {
1067 Error::Internal(format!(
1068 "Source variable '{}' not found",
1069 create.from_variable
1070 ))
1071 })?;
1072
1073 let to_column = columns
1074 .iter()
1075 .position(|c| c == &create.to_variable)
1076 .ok_or_else(|| {
1077 Error::Internal(format!(
1078 "Target variable '{}' not found",
1079 create.to_variable
1080 ))
1081 })?;
1082
1083 let output_column = create.variable.as_ref().map(|v| {
1085 let idx = columns.len();
1086 columns.push(v.clone());
1087 idx
1088 });
1089
1090 let properties: Vec<(String, PropertySource)> = create
1092 .properties
1093 .iter()
1094 .map(|(name, expr)| {
1095 let source = match expr {
1096 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1097 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1098 };
1099 (name.clone(), source)
1100 })
1101 .collect();
1102
1103 let output_schema = self.derive_schema_from_columns(&columns);
1104
1105 let operator = Box::new(
1106 CreateEdgeOperator::new(
1107 Arc::clone(&self.store),
1108 input_op,
1109 from_column,
1110 to_column,
1111 create.edge_type.clone(),
1112 properties,
1113 output_schema,
1114 output_column,
1115 )
1116 .with_tx_context(self.viewing_epoch, self.tx_id),
1117 );
1118
1119 Ok((operator, columns))
1120 }
1121
1122 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1124 let (input_op, columns) = self.plan_operator(&delete.input)?;
1125
1126 let node_column = columns
1127 .iter()
1128 .position(|c| c == &delete.variable)
1129 .ok_or_else(|| {
1130 Error::Internal(format!(
1131 "Variable '{}' not found for delete",
1132 delete.variable
1133 ))
1134 })?;
1135
1136 let output_schema = vec![LogicalType::Int64];
1138 let output_columns = vec!["deleted_count".to_string()];
1139
1140 let operator = Box::new(
1141 DeleteNodeOperator::new(
1142 Arc::clone(&self.store),
1143 input_op,
1144 node_column,
1145 output_schema,
1146 true, )
1148 .with_tx_context(self.viewing_epoch, self.tx_id),
1149 );
1150
1151 Ok((operator, output_columns))
1152 }
1153
1154 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1156 let (input_op, columns) = self.plan_operator(&delete.input)?;
1157
1158 let edge_column = columns
1159 .iter()
1160 .position(|c| c == &delete.variable)
1161 .ok_or_else(|| {
1162 Error::Internal(format!(
1163 "Variable '{}' not found for delete",
1164 delete.variable
1165 ))
1166 })?;
1167
1168 let output_schema = vec![LogicalType::Int64];
1170 let output_columns = vec!["deleted_count".to_string()];
1171
1172 let operator = Box::new(
1173 DeleteEdgeOperator::new(
1174 Arc::clone(&self.store),
1175 input_op,
1176 edge_column,
1177 output_schema,
1178 )
1179 .with_tx_context(self.viewing_epoch, self.tx_id),
1180 );
1181
1182 Ok((operator, output_columns))
1183 }
1184
1185 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1187 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1188 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1189
1190 let mut columns = left_columns.clone();
1192 columns.extend(right_columns.clone());
1193
1194 let mut probe_keys = Vec::new();
1196 let mut build_keys = Vec::new();
1197
1198 for (right_idx, right_col) in right_columns.iter().enumerate() {
1199 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1200 probe_keys.push(left_idx);
1201 build_keys.push(right_idx);
1202 }
1203 }
1204
1205 let output_schema = self.derive_schema_from_columns(&columns);
1206
1207 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1208 left_op,
1209 right_op,
1210 probe_keys,
1211 build_keys,
1212 PhysicalJoinType::Left,
1213 output_schema,
1214 ));
1215
1216 Ok((operator, columns))
1217 }
1218
1219 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1221 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1222 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1223
1224 let columns = left_columns.clone();
1226
1227 let mut probe_keys = Vec::new();
1229 let mut build_keys = Vec::new();
1230
1231 for (right_idx, right_col) in right_columns.iter().enumerate() {
1232 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1233 probe_keys.push(left_idx);
1234 build_keys.push(right_idx);
1235 }
1236 }
1237
1238 let output_schema = self.derive_schema_from_columns(&columns);
1239
1240 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1241 left_op,
1242 right_op,
1243 probe_keys,
1244 build_keys,
1245 PhysicalJoinType::Anti,
1246 output_schema,
1247 ));
1248
1249 Ok((operator, columns))
1250 }
1251
1252 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1254 let (input_op, input_columns) = self.plan_operator(&unwind.input)?;
1256
1257 let list_col_idx = match &unwind.expression {
1263 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1264 LogicalExpression::Property { variable, .. } => {
1265 input_columns.iter().position(|c| c == variable)
1268 }
1269 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1270 None
1272 }
1273 _ => None,
1274 };
1275
1276 let mut columns = input_columns.clone();
1278 columns.push(unwind.variable.clone());
1279
1280 let mut output_schema = self.derive_schema_from_columns(&input_columns);
1282 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
1287
1288 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1289 input_op,
1290 col_idx,
1291 unwind.variable.clone(),
1292 output_schema,
1293 ));
1294
1295 Ok((operator, columns))
1296 }
1297
1298 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1300 let (_input_op, mut columns) = self.plan_operator(&merge.input)?;
1302
1303 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1305 .match_properties
1306 .iter()
1307 .filter_map(|(name, expr)| {
1308 if let LogicalExpression::Literal(v) = expr {
1309 Some((name.clone(), v.clone()))
1310 } else {
1311 None }
1313 })
1314 .collect();
1315
1316 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
1318 .on_create
1319 .iter()
1320 .filter_map(|(name, expr)| {
1321 if let LogicalExpression::Literal(v) = expr {
1322 Some((name.clone(), v.clone()))
1323 } else {
1324 None
1325 }
1326 })
1327 .collect();
1328
1329 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1331 .on_match
1332 .iter()
1333 .filter_map(|(name, expr)| {
1334 if let LogicalExpression::Literal(v) = expr {
1335 Some((name.clone(), v.clone()))
1336 } else {
1337 None
1338 }
1339 })
1340 .collect();
1341
1342 columns.push(merge.variable.clone());
1344
1345 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1346 Arc::clone(&self.store),
1347 merge.variable.clone(),
1348 merge.labels.clone(),
1349 match_properties,
1350 on_create_properties,
1351 on_match_properties,
1352 ));
1353
1354 Ok((operator, columns))
1355 }
1356
1357 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1359 let (input_op, columns) = self.plan_operator(&add_label.input)?;
1360
1361 let node_column = columns
1363 .iter()
1364 .position(|c| c == &add_label.variable)
1365 .ok_or_else(|| {
1366 Error::Internal(format!(
1367 "Variable '{}' not found for ADD LABEL",
1368 add_label.variable
1369 ))
1370 })?;
1371
1372 let output_schema = vec![LogicalType::Int64];
1374 let output_columns = vec!["labels_added".to_string()];
1375
1376 let operator = Box::new(AddLabelOperator::new(
1377 Arc::clone(&self.store),
1378 input_op,
1379 node_column,
1380 add_label.labels.clone(),
1381 output_schema,
1382 ));
1383
1384 Ok((operator, output_columns))
1385 }
1386
1387 fn plan_remove_label(
1389 &self,
1390 remove_label: &RemoveLabelOp,
1391 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1392 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1393
1394 let node_column = columns
1396 .iter()
1397 .position(|c| c == &remove_label.variable)
1398 .ok_or_else(|| {
1399 Error::Internal(format!(
1400 "Variable '{}' not found for REMOVE LABEL",
1401 remove_label.variable
1402 ))
1403 })?;
1404
1405 let output_schema = vec![LogicalType::Int64];
1407 let output_columns = vec!["labels_removed".to_string()];
1408
1409 let operator = Box::new(RemoveLabelOperator::new(
1410 Arc::clone(&self.store),
1411 input_op,
1412 node_column,
1413 remove_label.labels.clone(),
1414 output_schema,
1415 ));
1416
1417 Ok((operator, output_columns))
1418 }
1419
1420 fn plan_set_property(
1422 &self,
1423 set_prop: &SetPropertyOp,
1424 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1425 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1426
1427 let entity_column = columns
1429 .iter()
1430 .position(|c| c == &set_prop.variable)
1431 .ok_or_else(|| {
1432 Error::Internal(format!(
1433 "Variable '{}' not found for SET",
1434 set_prop.variable
1435 ))
1436 })?;
1437
1438 let properties: Vec<(String, PropertySource)> = set_prop
1440 .properties
1441 .iter()
1442 .map(|(name, expr)| {
1443 let source = self.expression_to_property_source(expr, &columns)?;
1444 Ok((name.clone(), source))
1445 })
1446 .collect::<Result<Vec<_>>>()?;
1447
1448 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1450 let output_columns = columns.clone();
1451
1452 let operator = Box::new(SetPropertyOperator::new_for_node(
1454 Arc::clone(&self.store),
1455 input_op,
1456 entity_column,
1457 properties,
1458 output_schema,
1459 ));
1460
1461 Ok((operator, output_columns))
1462 }
1463
1464 fn expression_to_property_source(
1466 &self,
1467 expr: &LogicalExpression,
1468 columns: &[String],
1469 ) -> Result<PropertySource> {
1470 match expr {
1471 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1472 LogicalExpression::Variable(name) => {
1473 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1474 Error::Internal(format!("Variable '{}' not found for property source", name))
1475 })?;
1476 Ok(PropertySource::Column(col_idx))
1477 }
1478 LogicalExpression::Parameter(name) => {
1479 Ok(PropertySource::Constant(
1482 grafeo_common::types::Value::String(format!("${}", name).into()),
1483 ))
1484 }
1485 _ => Err(Error::Internal(format!(
1486 "Unsupported expression type for property source: {:?}",
1487 expr
1488 ))),
1489 }
1490 }
1491}
1492
1493pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1495 match op {
1496 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1497 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1498 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1499 BinaryOp::Le => Ok(BinaryFilterOp::Le),
1500 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1501 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
1502 BinaryOp::And => Ok(BinaryFilterOp::And),
1503 BinaryOp::Or => Ok(BinaryFilterOp::Or),
1504 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
1505 BinaryOp::Add => Ok(BinaryFilterOp::Add),
1506 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
1507 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
1508 BinaryOp::Div => Ok(BinaryFilterOp::Div),
1509 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
1510 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
1511 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
1512 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
1513 BinaryOp::In => Ok(BinaryFilterOp::In),
1514 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
1515 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
1516 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
1517 "Binary operator {:?} not yet supported in filters",
1518 op
1519 ))),
1520 }
1521}
1522
1523pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
1525 match op {
1526 UnaryOp::Not => Ok(UnaryFilterOp::Not),
1527 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
1528 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
1529 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
1530 }
1531}
1532
1533pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
1535 match func {
1536 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
1537 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
1538 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
1539 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
1540 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
1541 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
1542 }
1543}
1544
1545pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
1549 match expr {
1550 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1551 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1552 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1553 variable: variable.clone(),
1554 property: property.clone(),
1555 }),
1556 LogicalExpression::Binary { left, op, right } => {
1557 let left_expr = convert_filter_expression(left)?;
1558 let right_expr = convert_filter_expression(right)?;
1559 let filter_op = convert_binary_op(*op)?;
1560 Ok(FilterExpression::Binary {
1561 left: Box::new(left_expr),
1562 op: filter_op,
1563 right: Box::new(right_expr),
1564 })
1565 }
1566 LogicalExpression::Unary { op, operand } => {
1567 let operand_expr = convert_filter_expression(operand)?;
1568 let filter_op = convert_unary_op(*op)?;
1569 Ok(FilterExpression::Unary {
1570 op: filter_op,
1571 operand: Box::new(operand_expr),
1572 })
1573 }
1574 LogicalExpression::FunctionCall { name, args } => {
1575 let filter_args: Vec<FilterExpression> = args
1576 .iter()
1577 .map(|a| convert_filter_expression(a))
1578 .collect::<Result<Vec<_>>>()?;
1579 Ok(FilterExpression::FunctionCall {
1580 name: name.clone(),
1581 args: filter_args,
1582 })
1583 }
1584 LogicalExpression::Case {
1585 operand,
1586 when_clauses,
1587 else_clause,
1588 } => {
1589 let filter_operand = operand
1590 .as_ref()
1591 .map(|e| convert_filter_expression(e))
1592 .transpose()?
1593 .map(Box::new);
1594 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1595 .iter()
1596 .map(|(cond, result)| {
1597 Ok((
1598 convert_filter_expression(cond)?,
1599 convert_filter_expression(result)?,
1600 ))
1601 })
1602 .collect::<Result<Vec<_>>>()?;
1603 let filter_else = else_clause
1604 .as_ref()
1605 .map(|e| convert_filter_expression(e))
1606 .transpose()?
1607 .map(Box::new);
1608 Ok(FilterExpression::Case {
1609 operand: filter_operand,
1610 when_clauses: filter_when_clauses,
1611 else_clause: filter_else,
1612 })
1613 }
1614 LogicalExpression::List(items) => {
1615 let filter_items: Vec<FilterExpression> = items
1616 .iter()
1617 .map(|item| convert_filter_expression(item))
1618 .collect::<Result<Vec<_>>>()?;
1619 Ok(FilterExpression::List(filter_items))
1620 }
1621 LogicalExpression::Map(pairs) => {
1622 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1623 .iter()
1624 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
1625 .collect::<Result<Vec<_>>>()?;
1626 Ok(FilterExpression::Map(filter_pairs))
1627 }
1628 LogicalExpression::IndexAccess { base, index } => {
1629 let base_expr = convert_filter_expression(base)?;
1630 let index_expr = convert_filter_expression(index)?;
1631 Ok(FilterExpression::IndexAccess {
1632 base: Box::new(base_expr),
1633 index: Box::new(index_expr),
1634 })
1635 }
1636 LogicalExpression::SliceAccess { base, start, end } => {
1637 let base_expr = convert_filter_expression(base)?;
1638 let start_expr = start
1639 .as_ref()
1640 .map(|s| convert_filter_expression(s))
1641 .transpose()?
1642 .map(Box::new);
1643 let end_expr = end
1644 .as_ref()
1645 .map(|e| convert_filter_expression(e))
1646 .transpose()?
1647 .map(Box::new);
1648 Ok(FilterExpression::SliceAccess {
1649 base: Box::new(base_expr),
1650 start: start_expr,
1651 end: end_expr,
1652 })
1653 }
1654 LogicalExpression::Parameter(_) => Err(Error::Internal(
1655 "Parameters not yet supported in filters".to_string(),
1656 )),
1657 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1658 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1659 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1660 LogicalExpression::ListComprehension {
1661 variable,
1662 list_expr,
1663 filter_expr,
1664 map_expr,
1665 } => {
1666 let list = convert_filter_expression(list_expr)?;
1667 let filter = filter_expr
1668 .as_ref()
1669 .map(|f| convert_filter_expression(f))
1670 .transpose()?
1671 .map(Box::new);
1672 let map = convert_filter_expression(map_expr)?;
1673 Ok(FilterExpression::ListComprehension {
1674 variable: variable.clone(),
1675 list_expr: Box::new(list),
1676 filter_expr: filter,
1677 map_expr: Box::new(map),
1678 })
1679 }
1680 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
1681 Error::Internal("Subqueries not yet supported in filters".to_string()),
1682 ),
1683 }
1684}
1685
1686fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
1688 use grafeo_common::types::Value;
1689 match value {
1690 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
1692 Value::Int64(_) => LogicalType::Int64,
1693 Value::Float64(_) => LogicalType::Float64,
1694 Value::String(_) => LogicalType::String,
1695 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
1697 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
1700}
1701
1702fn expression_to_string(expr: &LogicalExpression) -> String {
1704 match expr {
1705 LogicalExpression::Variable(name) => name.clone(),
1706 LogicalExpression::Property { variable, property } => {
1707 format!("{variable}.{property}")
1708 }
1709 LogicalExpression::Literal(value) => format!("{value:?}"),
1710 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
1711 _ => "expr".to_string(),
1712 }
1713}
1714
1715pub struct PhysicalPlan {
1717 pub operator: Box<dyn Operator>,
1719 pub columns: Vec<String>,
1721}
1722
1723impl PhysicalPlan {
1724 #[must_use]
1726 pub fn columns(&self) -> &[String] {
1727 &self.columns
1728 }
1729
1730 pub fn into_operator(self) -> Box<dyn Operator> {
1732 self.operator
1733 }
1734}
1735
1736#[cfg(test)]
1737mod tests {
1738 use super::*;
1739 use crate::query::plan::{
1740 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1741 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1742 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
1743 SortKey, SortOp,
1744 };
1745 use grafeo_common::types::Value;
1746
1747 fn create_test_store() -> Arc<LpgStore> {
1748 let store = Arc::new(LpgStore::new());
1749 store.create_node(&["Person"]);
1750 store.create_node(&["Person"]);
1751 store.create_node(&["Company"]);
1752 store
1753 }
1754
/// RETURN n over a labelled node scan yields the single column `n`.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1779
/// RETURN n over an unlabelled node scan yields the single column `n`.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1802
/// An aliased return item is exposed under its alias.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let aliased = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: Some(String::from("person")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
1825
/// An un-aliased property access is exposed as `variable.property`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
1851
/// An aliased literal return item is exposed under its alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let answer = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
1874
/// A filter on `n.age = 30` plans and keeps the single column `n`.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1909
/// A filter on `n.age > 20 AND n.age < 40` plans and keeps column `n`.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Fresh `n.age` property expression for each comparison.
    let age = || LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let older_than_20 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    let younger_than_40 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: LogicalExpression::Binary {
            left: Box::new(older_than_20),
            op: BinaryOp::And,
            right: Box::new(younger_than_40),
        },
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1953
/// A filter on `NOT n.active` plans and keeps column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1985
/// A filter on `n.email IS NULL` plans and keeps column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2017
/// A filter on `size(n.friends) > 0` plans and keeps column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2053
/// A single-hop outgoing expand exposes both endpoint variables.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
    });
    let items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("b")),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|c| c == "a"));
    assert!(physical.columns().iter().any(|c| c == "b"));
}
2095
/// An expand with a named edge exposes the edge variable alongside both
/// endpoint variables.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
    });
    let items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("r")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("b")),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|c| c == "a"));
    assert!(physical.columns().iter().any(|c| c == "r"));
    assert!(physical.columns().iter().any(|c| c == "b"));
}
2139
/// A LIMIT 10 over a node scan plans and keeps column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2167
/// A SKIP 5 over a node scan plans and keeps column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2193
/// An ascending sort on `n` plans and keeps column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: LogicalExpression::Variable(String::from("n")),
            order: SortOrder::Ascending,
        }],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2222
/// A descending sort on `n` plans and keeps column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: LogicalExpression::Variable(String::from("n")),
            order: SortOrder::Descending,
        }],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2251
/// A DISTINCT over a node scan plans and keeps column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2276
/// Returning an ungrouped `count(n) AS cnt` exposes a `cnt` column.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().iter().any(|c| c == "cnt"));
}
2310
/// Plans a `Count(n)` aggregate grouped by `n.city` over a `Person` scan and
/// checks that the physical plan has exactly two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let city_key = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_key],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
2338
/// Plans an ungrouped `Sum(n.value)` aggregate aliased as `total` and checks
/// that `total` appears among the physical plan's columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_value],
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("total")));
}
2366
/// Plans an ungrouped `Avg(n.score)` aggregate aliased as `average` and checks
/// that `average` appears among the physical plan's columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let avg_of_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_of_score],
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("average")));
}
2394
/// Plans a single ungrouped aggregate carrying both `Min(n.age)` (alias
/// `youngest`) and `Max(n.age)` (alias `oldest`), and checks that both aliases
/// appear among the physical plan's columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates read the same property; build a fresh expression for each.
    let age_of_n = || LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let youngest = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Min,
        expression: Some(age_of_n()),
        distinct: false,
        alias: Some(String::from("youngest")),
    };
    let oldest = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Max,
        expression: Some(age_of_n()),
        distinct: false,
        alias: Some(String::from("oldest")),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![youngest, oldest],
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
2434
/// Plans a `Return` of `a` and `b` over an inner join between a `Person` scan
/// (`a`) and a `Company` scan (`b`) with one `a`/`b` join condition, and checks
/// that both variables appear among the physical plan's columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Small local builders to keep the tree below readable.
    let var = |name: &str| LogicalExpression::Variable(name.to_string());
    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: var("a"),
            right: var("b"),
        }],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: var("a"),
                alias: None,
            },
            ReturnItem {
                expression: var("b"),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2478
/// Plans a condition-less `JoinType::Cross` join of two unlabelled scans (`a`
/// and `b`) and checks that the physical plan has exactly two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
2503
/// Plans a condition-less `JoinType::Left` join of two unlabelled scans (`a`
/// and `b`) and checks that the physical plan has exactly two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
2527
/// Plans a standalone `CreateNode` for variable `n` with label `Person` and a
/// literal `name` property, and checks that `n` appears among the physical
/// plan's columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2549
/// Plans a `CreateEdge` of type `KNOWS` from `a` to `b`, bound to variable `r`,
/// fed by a cross join of two unlabelled scans, and checks that `r` appears
/// among the physical plan's columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };
    // Input that binds both endpoints of the edge.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("r")));
}
2581
/// Plans a `DeleteNode` of variable `n` over an unlabelled scan and checks that
/// the physical plan exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("deleted_count")));
}
2600
/// Planning a bare `LogicalOperator::Empty` must yield an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let logical = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&logical).is_err());
}
2612
/// Planning a `Return` that references variable `missing`, while the input
/// scan only binds `n`, must yield an error.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
2635
/// Checks that `convert_binary_op` returns `Ok` for each comparison, logical,
/// and arithmetic operator listed below.
#[test]
fn test_convert_binary_ops() {
    // One assertion per line so a failure points at the offending operator.
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparison operators.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Logical operators.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic operators.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
2653
/// Checks that `convert_unary_op` returns `Ok` for `Not`, `IsNull`,
/// `IsNotNull`, and `Neg`.
#[test]
fn test_convert_unary_ops() {
    // One assertion per line so a failure points at the offending operator.
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
2661
/// Checks that `convert_aggregate_function` maps each of `Count`, `Sum`, `Avg`,
/// `Min`, and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
2685
/// Checks that a planner built with `Planner::new` reports no transaction id
/// and no transaction manager, and that `viewing_epoch` can be called.
#[test]
fn test_planner_accessors() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store));

    assert!(matches!(planner.tx_id(), None));
    assert!(matches!(planner.tx_manager(), None));

    // Only exercised for the call itself; the value is not inspected.
    let _ = planner.viewing_epoch();
}
2695
/// Plans a bare unlabelled node scan, checks that the physical plan's columns
/// are exactly `["n"]`, then consumes the plan through `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Only exercised for the call itself; the returned operator is discarded.
    let _ = physical.into_operator();
}
2713}