1use crate::query::plan::{
6 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
7 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
8 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
9 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, SkipOp, SortOp,
10 SortOrder, UnaryOp, UnionOp, UnwindOp,
11};
12use graphos_common::types::LogicalType;
13use graphos_common::types::{EpochId, TxId};
14use graphos_common::utils::error::{Error, Result};
15use graphos_core::execution::operators::{
16 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
17 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
18 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
19 ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
20 JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NullOrder, Operator, ProjectExpr,
21 ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator,
22 SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
23 UnaryFilterOp, UnionOperator, UnwindOperator,
24};
25use graphos_core::graph::{Direction, lpg::LpgStore};
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use crate::transaction::TransactionManager;
30
31pub struct Planner {
33 store: Arc<LpgStore>,
35 tx_manager: Option<Arc<TransactionManager>>,
37 tx_id: Option<TxId>,
39 viewing_epoch: EpochId,
41 anon_edge_counter: std::cell::Cell<u32>,
43}
44
45impl Planner {
46 #[must_use]
51 pub fn new(store: Arc<LpgStore>) -> Self {
52 let epoch = store.current_epoch();
53 Self {
54 store,
55 tx_manager: None,
56 tx_id: None,
57 viewing_epoch: epoch,
58 anon_edge_counter: std::cell::Cell::new(0),
59 }
60 }
61
62 #[must_use]
71 pub fn with_context(
72 store: Arc<LpgStore>,
73 tx_manager: Arc<TransactionManager>,
74 tx_id: Option<TxId>,
75 viewing_epoch: EpochId,
76 ) -> Self {
77 Self {
78 store,
79 tx_manager: Some(tx_manager),
80 tx_id,
81 viewing_epoch,
82 anon_edge_counter: std::cell::Cell::new(0),
83 }
84 }
85
86 #[must_use]
88 pub fn viewing_epoch(&self) -> EpochId {
89 self.viewing_epoch
90 }
91
92 #[must_use]
94 pub fn tx_id(&self) -> Option<TxId> {
95 self.tx_id
96 }
97
98 #[must_use]
100 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
101 self.tx_manager.as_ref()
102 }
103
104 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
110 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
111 Ok(PhysicalPlan { operator, columns })
112 }
113
114 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
116 match op {
117 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
118 LogicalOperator::Expand(expand) => self.plan_expand(expand),
119 LogicalOperator::Return(ret) => self.plan_return(ret),
120 LogicalOperator::Filter(filter) => self.plan_filter(filter),
121 LogicalOperator::Project(project) => {
122 self.plan_operator(&project.input)
124 }
125 LogicalOperator::Limit(limit) => self.plan_limit(limit),
126 LogicalOperator::Skip(skip) => self.plan_skip(skip),
127 LogicalOperator::Sort(sort) => self.plan_sort(sort),
128 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
129 LogicalOperator::Join(join) => self.plan_join(join),
130 LogicalOperator::Union(union) => self.plan_union(union),
131 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
132 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
133 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
134 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
135 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
136 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
137 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
138 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
139 LogicalOperator::Merge(merge) => self.plan_merge(merge),
140 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
141 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
142 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
143 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
144 _ => Err(Error::Internal(format!(
145 "Unsupported operator: {:?}",
146 std::mem::discriminant(op)
147 ))),
148 }
149 }
150
151 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
153 let scan_op = if let Some(label) = &scan.label {
154 ScanOperator::with_label(Arc::clone(&self.store), label)
155 } else {
156 ScanOperator::new(Arc::clone(&self.store))
157 };
158
159 let operator: Box<dyn Operator> =
161 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
162
163 let columns = vec![scan.variable.clone()];
164
165 Ok((operator, columns))
168 }
169
170 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
172 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
174
175 let source_column = input_columns
177 .iter()
178 .position(|c| c == &expand.from_variable)
179 .ok_or_else(|| {
180 Error::Internal(format!(
181 "Source variable '{}' not found in input columns",
182 expand.from_variable
183 ))
184 })?;
185
186 let direction = match expand.direction {
188 ExpandDirection::Outgoing => Direction::Outgoing,
189 ExpandDirection::Incoming => Direction::Incoming,
190 ExpandDirection::Both => Direction::Both,
191 };
192
193 let expand_op = ExpandOperator::new(
195 Arc::clone(&self.store),
196 input_op,
197 source_column,
198 direction,
199 expand.edge_type.clone(),
200 )
201 .with_tx_context(self.viewing_epoch, self.tx_id);
202
203 let operator: Box<dyn Operator> = Box::new(expand_op);
204
205 let mut columns = input_columns;
208
209 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
211 let count = self.anon_edge_counter.get();
212 self.anon_edge_counter.set(count + 1);
213 format!("_anon_edge_{}", count)
214 });
215 columns.push(edge_col_name);
216
217 columns.push(expand.to_variable.clone());
218
219 Ok((operator, columns))
220 }
221
222 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
224 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
226
227 let variable_columns: HashMap<String, usize> = input_columns
229 .iter()
230 .enumerate()
231 .map(|(i, name)| (name.clone(), i))
232 .collect();
233
234 let columns: Vec<String> = ret
236 .items
237 .iter()
238 .map(|item| {
239 item.alias.clone().unwrap_or_else(|| {
240 expression_to_string(&item.expression)
242 })
243 })
244 .collect();
245
246 let needs_project = ret
248 .items
249 .iter()
250 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
251
252 if needs_project {
253 let mut projections = Vec::with_capacity(ret.items.len());
255 let mut output_types = Vec::with_capacity(ret.items.len());
256
257 for item in &ret.items {
258 match &item.expression {
259 LogicalExpression::Variable(name) => {
260 let col_idx = *variable_columns.get(name).ok_or_else(|| {
261 Error::Internal(format!("Variable '{}' not found in input", name))
262 })?;
263 projections.push(ProjectExpr::Column(col_idx));
264 output_types.push(LogicalType::Node);
266 }
267 LogicalExpression::Property { variable, property } => {
268 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
269 Error::Internal(format!("Variable '{}' not found in input", variable))
270 })?;
271 projections.push(ProjectExpr::PropertyAccess {
272 column: col_idx,
273 property: property.clone(),
274 });
275 output_types.push(LogicalType::String);
277 }
278 LogicalExpression::Literal(value) => {
279 projections.push(ProjectExpr::Constant(value.clone()));
280 output_types.push(value_to_logical_type(value));
281 }
282 _ => {
283 return Err(Error::Internal(format!(
284 "Unsupported RETURN expression: {:?}",
285 item.expression
286 )));
287 }
288 }
289 }
290
291 let operator = Box::new(ProjectOperator::with_store(
292 input_op,
293 projections,
294 output_types,
295 Arc::clone(&self.store),
296 ));
297
298 Ok((operator, columns))
299 } else {
300 let mut projections = Vec::with_capacity(ret.items.len());
303 let mut output_types = Vec::with_capacity(ret.items.len());
304
305 for item in &ret.items {
306 if let LogicalExpression::Variable(name) = &item.expression {
307 let col_idx = *variable_columns.get(name).ok_or_else(|| {
308 Error::Internal(format!("Variable '{}' not found in input", name))
309 })?;
310 projections.push(ProjectExpr::Column(col_idx));
311 output_types.push(LogicalType::Node);
312 }
313 }
314
315 if projections.len() == input_columns.len()
317 && projections
318 .iter()
319 .enumerate()
320 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
321 {
322 Ok((input_op, columns))
324 } else {
325 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
326 Ok((operator, columns))
327 }
328 }
329 }
330
331 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
333 let (input_op, columns) = self.plan_operator(&filter.input)?;
335
336 let variable_columns: HashMap<String, usize> = columns
338 .iter()
339 .enumerate()
340 .map(|(i, name)| (name.clone(), i))
341 .collect();
342
343 let filter_expr = self.convert_expression(&filter.predicate)?;
345
346 let predicate =
348 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
349
350 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
352
353 Ok((operator, columns))
354 }
355
356 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
358 let (input_op, columns) = self.plan_operator(&limit.input)?;
359 let output_schema = self.derive_schema_from_columns(&columns);
360 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
361 Ok((operator, columns))
362 }
363
364 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
366 let (input_op, columns) = self.plan_operator(&skip.input)?;
367 let output_schema = self.derive_schema_from_columns(&columns);
368 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
369 Ok((operator, columns))
370 }
371
372 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
374 let (input_op, columns) = self.plan_operator(&sort.input)?;
375
376 let variable_columns: HashMap<String, usize> = columns
378 .iter()
379 .enumerate()
380 .map(|(i, name)| (name.clone(), i))
381 .collect();
382
383 let physical_keys: Vec<PhysicalSortKey> = sort
385 .keys
386 .iter()
387 .map(|key| {
388 let col_idx = self.resolve_sort_expression(&key.expression, &variable_columns)?;
389 Ok(PhysicalSortKey {
390 column: col_idx,
391 direction: match key.order {
392 SortOrder::Ascending => SortDirection::Ascending,
393 SortOrder::Descending => SortDirection::Descending,
394 },
395 null_order: NullOrder::NullsLast,
396 })
397 })
398 .collect::<Result<Vec<_>>>()?;
399
400 let output_schema = self.derive_schema_from_columns(&columns);
401 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
402 Ok((operator, columns))
403 }
404
405 fn resolve_sort_expression(
407 &self,
408 expr: &LogicalExpression,
409 variable_columns: &HashMap<String, usize>,
410 ) -> Result<usize> {
411 match expr {
412 LogicalExpression::Variable(name) => {
413 variable_columns.get(name).copied().ok_or_else(|| {
414 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
415 })
416 }
417 LogicalExpression::Property { variable, .. } => {
418 variable_columns.get(variable).copied().ok_or_else(|| {
420 Error::Internal(format!("Variable '{}' not found for ORDER BY", variable))
421 })
422 }
423 _ => Err(Error::Internal(format!(
424 "Unsupported ORDER BY expression: {:?}",
425 expr
426 ))),
427 }
428 }
429
430 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
432 columns.iter().map(|_| LogicalType::Node).collect()
433 }
434
435 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
437 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
438
439 let mut variable_columns: HashMap<String, usize> = input_columns
441 .iter()
442 .enumerate()
443 .map(|(i, name)| (name.clone(), i))
444 .collect();
445
446 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
449
450 for expr in &agg.group_by {
452 if let LogicalExpression::Property { variable, property } = expr {
453 let col_name = format!("{}_{}", variable, property);
454 if !variable_columns.contains_key(&col_name) {
455 property_projections.push((
456 variable.clone(),
457 property.clone(),
458 col_name.clone(),
459 ));
460 variable_columns.insert(col_name, next_col_idx);
461 next_col_idx += 1;
462 }
463 }
464 }
465
466 for agg_expr in &agg.aggregates {
468 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
469 let col_name = format!("{}_{}", variable, property);
470 if !variable_columns.contains_key(&col_name) {
471 property_projections.push((
472 variable.clone(),
473 property.clone(),
474 col_name.clone(),
475 ));
476 variable_columns.insert(col_name, next_col_idx);
477 next_col_idx += 1;
478 }
479 }
480 }
481
482 if !property_projections.is_empty() {
484 let mut projections = Vec::new();
485 let mut output_types = Vec::new();
486
487 for (i, _) in input_columns.iter().enumerate() {
489 projections.push(ProjectExpr::Column(i));
490 output_types.push(LogicalType::Node);
491 }
492
493 for (variable, property, _col_name) in &property_projections {
495 let source_col = *variable_columns.get(variable).ok_or_else(|| {
496 Error::Internal(format!(
497 "Variable '{}' not found for property projection",
498 variable
499 ))
500 })?;
501 projections.push(ProjectExpr::PropertyAccess {
502 column: source_col,
503 property: property.clone(),
504 });
505 output_types.push(LogicalType::Int64); }
507
508 input_op = Box::new(ProjectOperator::with_store(
509 input_op,
510 projections,
511 output_types,
512 Arc::clone(&self.store),
513 ));
514 }
515
516 let group_columns: Vec<usize> = agg
518 .group_by
519 .iter()
520 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
521 .collect::<Result<Vec<_>>>()?;
522
523 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
525 .aggregates
526 .iter()
527 .map(|agg_expr| {
528 let column = agg_expr
529 .expression
530 .as_ref()
531 .map(|e| {
532 self.resolve_expression_to_column_with_properties(e, &variable_columns)
533 })
534 .transpose()?;
535
536 Ok(PhysicalAggregateExpr {
537 function: convert_aggregate_function(agg_expr.function),
538 column,
539 distinct: agg_expr.distinct,
540 alias: agg_expr.alias.clone(),
541 })
542 })
543 .collect::<Result<Vec<_>>>()?;
544
545 let mut output_schema = Vec::new();
547 let mut output_columns = Vec::new();
548
549 for (idx, expr) in agg.group_by.iter().enumerate() {
551 output_schema.push(LogicalType::Node); output_columns.push(expression_to_string(expr));
553 if idx < group_columns.len() {
555 }
557 }
558
559 for agg_expr in &agg.aggregates {
561 let result_type = match agg_expr.function {
562 LogicalAggregateFunction::Count => LogicalType::Int64,
563 LogicalAggregateFunction::Sum => LogicalType::Int64,
564 LogicalAggregateFunction::Avg => LogicalType::Float64,
565 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
566 LogicalType::Int64
570 }
571 LogicalAggregateFunction::Collect => LogicalType::String, };
573 output_schema.push(result_type);
574 output_columns.push(
575 agg_expr
576 .alias
577 .clone()
578 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
579 );
580 }
581
582 let operator: Box<dyn Operator> = if group_columns.is_empty() {
584 Box::new(SimpleAggregateOperator::new(
585 input_op,
586 physical_aggregates,
587 output_schema,
588 ))
589 } else {
590 Box::new(HashAggregateOperator::new(
591 input_op,
592 group_columns,
593 physical_aggregates,
594 output_schema,
595 ))
596 };
597
598 Ok((operator, output_columns))
599 }
600
601 #[allow(dead_code)]
603 fn resolve_expression_to_column(
604 &self,
605 expr: &LogicalExpression,
606 variable_columns: &HashMap<String, usize>,
607 ) -> Result<usize> {
608 match expr {
609 LogicalExpression::Variable(name) => variable_columns
610 .get(name)
611 .copied()
612 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
613 LogicalExpression::Property { variable, .. } => variable_columns
614 .get(variable)
615 .copied()
616 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
617 _ => Err(Error::Internal(format!(
618 "Cannot resolve expression to column: {:?}",
619 expr
620 ))),
621 }
622 }
623
624 fn resolve_expression_to_column_with_properties(
628 &self,
629 expr: &LogicalExpression,
630 variable_columns: &HashMap<String, usize>,
631 ) -> Result<usize> {
632 match expr {
633 LogicalExpression::Variable(name) => variable_columns
634 .get(name)
635 .copied()
636 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
637 LogicalExpression::Property { variable, property } => {
638 let col_name = format!("{}_{}", variable, property);
640 variable_columns.get(&col_name).copied().ok_or_else(|| {
641 Error::Internal(format!(
642 "Property column '{}' not found (from {}.{})",
643 col_name, variable, property
644 ))
645 })
646 }
647 _ => Err(Error::Internal(format!(
648 "Cannot resolve expression to column: {:?}",
649 expr
650 ))),
651 }
652 }
653
654 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
656 match expr {
657 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
658 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
659 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
660 variable: variable.clone(),
661 property: property.clone(),
662 }),
663 LogicalExpression::Binary { left, op, right } => {
664 let left_expr = self.convert_expression(left)?;
665 let right_expr = self.convert_expression(right)?;
666 let filter_op = convert_binary_op(*op)?;
667 Ok(FilterExpression::Binary {
668 left: Box::new(left_expr),
669 op: filter_op,
670 right: Box::new(right_expr),
671 })
672 }
673 LogicalExpression::Unary { op, operand } => {
674 let operand_expr = self.convert_expression(operand)?;
675 let filter_op = convert_unary_op(*op)?;
676 Ok(FilterExpression::Unary {
677 op: filter_op,
678 operand: Box::new(operand_expr),
679 })
680 }
681 LogicalExpression::FunctionCall { name, args } => {
682 let filter_args: Vec<FilterExpression> = args
683 .iter()
684 .map(|a| self.convert_expression(a))
685 .collect::<Result<Vec<_>>>()?;
686 Ok(FilterExpression::FunctionCall {
687 name: name.clone(),
688 args: filter_args,
689 })
690 }
691 LogicalExpression::Case {
692 operand,
693 when_clauses,
694 else_clause,
695 } => {
696 let filter_operand = operand
697 .as_ref()
698 .map(|e| self.convert_expression(e))
699 .transpose()?
700 .map(Box::new);
701 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
702 .iter()
703 .map(|(cond, result)| {
704 Ok((
705 self.convert_expression(cond)?,
706 self.convert_expression(result)?,
707 ))
708 })
709 .collect::<Result<Vec<_>>>()?;
710 let filter_else = else_clause
711 .as_ref()
712 .map(|e| self.convert_expression(e))
713 .transpose()?
714 .map(Box::new);
715 Ok(FilterExpression::Case {
716 operand: filter_operand,
717 when_clauses: filter_when_clauses,
718 else_clause: filter_else,
719 })
720 }
721 LogicalExpression::List(items) => {
722 let filter_items: Vec<FilterExpression> = items
723 .iter()
724 .map(|item| self.convert_expression(item))
725 .collect::<Result<Vec<_>>>()?;
726 Ok(FilterExpression::List(filter_items))
727 }
728 LogicalExpression::Map(pairs) => {
729 let filter_pairs: Vec<(String, FilterExpression)> = pairs
730 .iter()
731 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
732 .collect::<Result<Vec<_>>>()?;
733 Ok(FilterExpression::Map(filter_pairs))
734 }
735 LogicalExpression::IndexAccess { base, index } => {
736 let base_expr = self.convert_expression(base)?;
737 let index_expr = self.convert_expression(index)?;
738 Ok(FilterExpression::IndexAccess {
739 base: Box::new(base_expr),
740 index: Box::new(index_expr),
741 })
742 }
743 LogicalExpression::SliceAccess { base, start, end } => {
744 let base_expr = self.convert_expression(base)?;
745 let start_expr = start
746 .as_ref()
747 .map(|s| self.convert_expression(s))
748 .transpose()?
749 .map(Box::new);
750 let end_expr = end
751 .as_ref()
752 .map(|e| self.convert_expression(e))
753 .transpose()?
754 .map(Box::new);
755 Ok(FilterExpression::SliceAccess {
756 base: Box::new(base_expr),
757 start: start_expr,
758 end: end_expr,
759 })
760 }
761 LogicalExpression::Parameter(_) => Err(Error::Internal(
762 "Parameters not yet supported in filters".to_string(),
763 )),
764 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
765 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
766 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
767 LogicalExpression::ListComprehension {
768 variable,
769 list_expr,
770 filter_expr,
771 map_expr,
772 } => {
773 let list = self.convert_expression(list_expr)?;
774 let filter = filter_expr
775 .as_ref()
776 .map(|f| self.convert_expression(f))
777 .transpose()?
778 .map(Box::new);
779 let map = self.convert_expression(map_expr)?;
780 Ok(FilterExpression::ListComprehension {
781 variable: variable.clone(),
782 list_expr: Box::new(list),
783 filter_expr: filter,
784 map_expr: Box::new(map),
785 })
786 }
787 LogicalExpression::ExistsSubquery(subplan) => {
788 let (start_var, direction, edge_type, end_labels) =
791 self.extract_exists_pattern(subplan)?;
792
793 Ok(FilterExpression::ExistsSubquery {
794 start_var,
795 direction,
796 edge_type,
797 end_labels,
798 min_hops: None,
799 max_hops: None,
800 })
801 }
802 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
803 "COUNT subqueries not yet supported".to_string(),
804 )),
805 }
806 }
807
808 fn extract_exists_pattern(
811 &self,
812 subplan: &LogicalOperator,
813 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
814 match subplan {
815 LogicalOperator::Expand(expand) => {
816 let end_labels = self.extract_end_labels_from_expand(expand);
818 let direction = match expand.direction {
819 ExpandDirection::Outgoing => Direction::Outgoing,
820 ExpandDirection::Incoming => Direction::Incoming,
821 ExpandDirection::Both => Direction::Both,
822 };
823 Ok((
824 expand.from_variable.clone(),
825 direction,
826 expand.edge_type.clone(),
827 end_labels,
828 ))
829 }
830 LogicalOperator::NodeScan(scan) => {
831 if let Some(input) = &scan.input {
832 self.extract_exists_pattern(input)
833 } else {
834 Err(Error::Internal(
835 "EXISTS subquery must contain an edge pattern".to_string(),
836 ))
837 }
838 }
839 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
840 _ => Err(Error::Internal(
841 "Unsupported EXISTS subquery pattern".to_string(),
842 )),
843 }
844 }
845
846 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
848 match expand.input.as_ref() {
850 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
851 _ => None,
852 }
853 }
854
855 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
857 let (left_op, left_columns) = self.plan_operator(&join.left)?;
858 let (right_op, right_columns) = self.plan_operator(&join.right)?;
859
860 let mut columns = left_columns.clone();
862 columns.extend(right_columns.clone());
863
864 let physical_join_type = match join.join_type {
866 JoinType::Inner => PhysicalJoinType::Inner,
867 JoinType::Left => PhysicalJoinType::Left,
868 JoinType::Right => PhysicalJoinType::Right,
869 JoinType::Full => PhysicalJoinType::Full,
870 JoinType::Cross => PhysicalJoinType::Cross,
871 JoinType::Semi => PhysicalJoinType::Semi,
872 JoinType::Anti => PhysicalJoinType::Anti,
873 };
874
875 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
877 (vec![], vec![])
879 } else {
880 join.conditions
881 .iter()
882 .filter_map(|cond| {
883 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
885 let right_idx = self
886 .expression_to_column(&cond.right, &right_columns)
887 .ok()?;
888 Some((left_idx, right_idx))
889 })
890 .unzip()
891 };
892
893 let output_schema = self.derive_schema_from_columns(&columns);
894
895 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
896 left_op,
897 right_op,
898 probe_keys,
899 build_keys,
900 physical_join_type,
901 output_schema,
902 ));
903
904 Ok((operator, columns))
905 }
906
907 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
909 match expr {
910 LogicalExpression::Variable(name) => columns
911 .iter()
912 .position(|c| c == name)
913 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
914 _ => Err(Error::Internal(
915 "Only variables supported in join conditions".to_string(),
916 )),
917 }
918 }
919
920 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
922 if union.inputs.is_empty() {
923 return Err(Error::Internal(
924 "Union requires at least one input".to_string(),
925 ));
926 }
927
928 let mut inputs = Vec::with_capacity(union.inputs.len());
929 let mut columns = Vec::new();
930
931 for (i, input) in union.inputs.iter().enumerate() {
932 let (op, cols) = self.plan_operator(input)?;
933 if i == 0 {
934 columns = cols;
935 }
936 inputs.push(op);
937 }
938
939 let output_schema = self.derive_schema_from_columns(&columns);
940 let operator = Box::new(UnionOperator::new(inputs, output_schema));
941
942 Ok((operator, columns))
943 }
944
945 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
947 let (input_op, columns) = self.plan_operator(&distinct.input)?;
948 let output_schema = self.derive_schema_from_columns(&columns);
949 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
950 Ok((operator, columns))
951 }
952
953 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
955 let (input_op, mut columns) = if let Some(ref input) = create.input {
957 let (op, cols) = self.plan_operator(input)?;
958 (Some(op), cols)
959 } else {
960 (None, vec![])
961 };
962
963 let output_column = columns.len();
965 columns.push(create.variable.clone());
966
967 let properties: Vec<(String, PropertySource)> = create
969 .properties
970 .iter()
971 .map(|(name, expr)| {
972 let source = match expr {
973 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
974 _ => PropertySource::Constant(graphos_common::types::Value::Null),
975 };
976 (name.clone(), source)
977 })
978 .collect();
979
980 let output_schema = self.derive_schema_from_columns(&columns);
981
982 let operator = Box::new(
983 CreateNodeOperator::new(
984 Arc::clone(&self.store),
985 input_op,
986 create.labels.clone(),
987 properties,
988 output_schema,
989 output_column,
990 )
991 .with_tx_context(self.viewing_epoch, self.tx_id),
992 );
993
994 Ok((operator, columns))
995 }
996
997 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
999 let (input_op, mut columns) = self.plan_operator(&create.input)?;
1000
1001 let from_column = columns
1003 .iter()
1004 .position(|c| c == &create.from_variable)
1005 .ok_or_else(|| {
1006 Error::Internal(format!(
1007 "Source variable '{}' not found",
1008 create.from_variable
1009 ))
1010 })?;
1011
1012 let to_column = columns
1013 .iter()
1014 .position(|c| c == &create.to_variable)
1015 .ok_or_else(|| {
1016 Error::Internal(format!(
1017 "Target variable '{}' not found",
1018 create.to_variable
1019 ))
1020 })?;
1021
1022 let output_column = create.variable.as_ref().map(|v| {
1024 let idx = columns.len();
1025 columns.push(v.clone());
1026 idx
1027 });
1028
1029 let properties: Vec<(String, PropertySource)> = create
1031 .properties
1032 .iter()
1033 .map(|(name, expr)| {
1034 let source = match expr {
1035 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1036 _ => PropertySource::Constant(graphos_common::types::Value::Null),
1037 };
1038 (name.clone(), source)
1039 })
1040 .collect();
1041
1042 let output_schema = self.derive_schema_from_columns(&columns);
1043
1044 let operator = Box::new(
1045 CreateEdgeOperator::new(
1046 Arc::clone(&self.store),
1047 input_op,
1048 from_column,
1049 to_column,
1050 create.edge_type.clone(),
1051 properties,
1052 output_schema,
1053 output_column,
1054 )
1055 .with_tx_context(self.viewing_epoch, self.tx_id),
1056 );
1057
1058 Ok((operator, columns))
1059 }
1060
1061 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1063 let (input_op, columns) = self.plan_operator(&delete.input)?;
1064
1065 let node_column = columns
1066 .iter()
1067 .position(|c| c == &delete.variable)
1068 .ok_or_else(|| {
1069 Error::Internal(format!(
1070 "Variable '{}' not found for delete",
1071 delete.variable
1072 ))
1073 })?;
1074
1075 let output_schema = vec![LogicalType::Int64];
1077 let output_columns = vec!["deleted_count".to_string()];
1078
1079 let operator = Box::new(
1080 DeleteNodeOperator::new(
1081 Arc::clone(&self.store),
1082 input_op,
1083 node_column,
1084 output_schema,
1085 true, )
1087 .with_tx_context(self.viewing_epoch, self.tx_id),
1088 );
1089
1090 Ok((operator, output_columns))
1091 }
1092
1093 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1095 let (input_op, columns) = self.plan_operator(&delete.input)?;
1096
1097 let edge_column = columns
1098 .iter()
1099 .position(|c| c == &delete.variable)
1100 .ok_or_else(|| {
1101 Error::Internal(format!(
1102 "Variable '{}' not found for delete",
1103 delete.variable
1104 ))
1105 })?;
1106
1107 let output_schema = vec![LogicalType::Int64];
1109 let output_columns = vec!["deleted_count".to_string()];
1110
1111 let operator = Box::new(
1112 DeleteEdgeOperator::new(
1113 Arc::clone(&self.store),
1114 input_op,
1115 edge_column,
1116 output_schema,
1117 )
1118 .with_tx_context(self.viewing_epoch, self.tx_id),
1119 );
1120
1121 Ok((operator, output_columns))
1122 }
1123
1124 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1126 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1127 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1128
1129 let mut columns = left_columns.clone();
1131 columns.extend(right_columns.clone());
1132
1133 let mut probe_keys = Vec::new();
1135 let mut build_keys = Vec::new();
1136
1137 for (right_idx, right_col) in right_columns.iter().enumerate() {
1138 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1139 probe_keys.push(left_idx);
1140 build_keys.push(right_idx);
1141 }
1142 }
1143
1144 let output_schema = self.derive_schema_from_columns(&columns);
1145
1146 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1147 left_op,
1148 right_op,
1149 probe_keys,
1150 build_keys,
1151 PhysicalJoinType::Left,
1152 output_schema,
1153 ));
1154
1155 Ok((operator, columns))
1156 }
1157
1158 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1160 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1161 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1162
1163 let columns = left_columns.clone();
1165
1166 let mut probe_keys = Vec::new();
1168 let mut build_keys = Vec::new();
1169
1170 for (right_idx, right_col) in right_columns.iter().enumerate() {
1171 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1172 probe_keys.push(left_idx);
1173 build_keys.push(right_idx);
1174 }
1175 }
1176
1177 let output_schema = self.derive_schema_from_columns(&columns);
1178
1179 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1180 left_op,
1181 right_op,
1182 probe_keys,
1183 build_keys,
1184 PhysicalJoinType::Anti,
1185 output_schema,
1186 ));
1187
1188 Ok((operator, columns))
1189 }
1190
1191 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1193 let (input_op, input_columns) = self.plan_operator(&unwind.input)?;
1195
1196 let list_col_idx = match &unwind.expression {
1202 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1203 LogicalExpression::Property { variable, .. } => {
1204 input_columns.iter().position(|c| c == variable)
1207 }
1208 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1209 None
1211 }
1212 _ => None,
1213 };
1214
1215 let mut columns = input_columns.clone();
1217 columns.push(unwind.variable.clone());
1218
1219 let mut output_schema = self.derive_schema_from_columns(&input_columns);
1221 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
1226
1227 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1228 input_op,
1229 col_idx,
1230 unwind.variable.clone(),
1231 output_schema,
1232 ));
1233
1234 Ok((operator, columns))
1235 }
1236
1237 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1239 let (_input_op, mut columns) = self.plan_operator(&merge.input)?;
1241
1242 let match_properties: Vec<(String, graphos_common::types::Value)> = merge
1244 .match_properties
1245 .iter()
1246 .filter_map(|(name, expr)| {
1247 if let LogicalExpression::Literal(v) = expr {
1248 Some((name.clone(), v.clone()))
1249 } else {
1250 None }
1252 })
1253 .collect();
1254
1255 let on_create_properties: Vec<(String, graphos_common::types::Value)> = merge
1257 .on_create
1258 .iter()
1259 .filter_map(|(name, expr)| {
1260 if let LogicalExpression::Literal(v) = expr {
1261 Some((name.clone(), v.clone()))
1262 } else {
1263 None
1264 }
1265 })
1266 .collect();
1267
1268 let on_match_properties: Vec<(String, graphos_common::types::Value)> = merge
1270 .on_match
1271 .iter()
1272 .filter_map(|(name, expr)| {
1273 if let LogicalExpression::Literal(v) = expr {
1274 Some((name.clone(), v.clone()))
1275 } else {
1276 None
1277 }
1278 })
1279 .collect();
1280
1281 columns.push(merge.variable.clone());
1283
1284 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1285 Arc::clone(&self.store),
1286 merge.variable.clone(),
1287 merge.labels.clone(),
1288 match_properties,
1289 on_create_properties,
1290 on_match_properties,
1291 ));
1292
1293 Ok((operator, columns))
1294 }
1295
1296 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1298 let (input_op, columns) = self.plan_operator(&add_label.input)?;
1299
1300 let node_column = columns
1302 .iter()
1303 .position(|c| c == &add_label.variable)
1304 .ok_or_else(|| {
1305 Error::Internal(format!(
1306 "Variable '{}' not found for ADD LABEL",
1307 add_label.variable
1308 ))
1309 })?;
1310
1311 let output_schema = vec![LogicalType::Int64];
1313 let output_columns = vec!["labels_added".to_string()];
1314
1315 let operator = Box::new(AddLabelOperator::new(
1316 Arc::clone(&self.store),
1317 input_op,
1318 node_column,
1319 add_label.labels.clone(),
1320 output_schema,
1321 ));
1322
1323 Ok((operator, output_columns))
1324 }
1325
1326 fn plan_remove_label(
1328 &self,
1329 remove_label: &RemoveLabelOp,
1330 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1331 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1332
1333 let node_column = columns
1335 .iter()
1336 .position(|c| c == &remove_label.variable)
1337 .ok_or_else(|| {
1338 Error::Internal(format!(
1339 "Variable '{}' not found for REMOVE LABEL",
1340 remove_label.variable
1341 ))
1342 })?;
1343
1344 let output_schema = vec![LogicalType::Int64];
1346 let output_columns = vec!["labels_removed".to_string()];
1347
1348 let operator = Box::new(RemoveLabelOperator::new(
1349 Arc::clone(&self.store),
1350 input_op,
1351 node_column,
1352 remove_label.labels.clone(),
1353 output_schema,
1354 ));
1355
1356 Ok((operator, output_columns))
1357 }
1358
1359 fn plan_set_property(
1361 &self,
1362 set_prop: &SetPropertyOp,
1363 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1364 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1365
1366 let entity_column = columns
1368 .iter()
1369 .position(|c| c == &set_prop.variable)
1370 .ok_or_else(|| {
1371 Error::Internal(format!(
1372 "Variable '{}' not found for SET",
1373 set_prop.variable
1374 ))
1375 })?;
1376
1377 let properties: Vec<(String, PropertySource)> = set_prop
1379 .properties
1380 .iter()
1381 .map(|(name, expr)| {
1382 let source = self.expression_to_property_source(expr, &columns)?;
1383 Ok((name.clone(), source))
1384 })
1385 .collect::<Result<Vec<_>>>()?;
1386
1387 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1389 let output_columns = columns.clone();
1390
1391 let operator = Box::new(SetPropertyOperator::new_for_node(
1393 Arc::clone(&self.store),
1394 input_op,
1395 entity_column,
1396 properties,
1397 output_schema,
1398 ));
1399
1400 Ok((operator, output_columns))
1401 }
1402
1403 fn expression_to_property_source(
1405 &self,
1406 expr: &LogicalExpression,
1407 columns: &[String],
1408 ) -> Result<PropertySource> {
1409 match expr {
1410 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1411 LogicalExpression::Variable(name) => {
1412 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1413 Error::Internal(format!("Variable '{}' not found for property source", name))
1414 })?;
1415 Ok(PropertySource::Column(col_idx))
1416 }
1417 LogicalExpression::Parameter(name) => {
1418 Ok(PropertySource::Constant(
1421 graphos_common::types::Value::String(format!("${}", name).into()),
1422 ))
1423 }
1424 _ => Err(Error::Internal(format!(
1425 "Unsupported expression type for property source: {:?}",
1426 expr
1427 ))),
1428 }
1429 }
1430}
1431
1432pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1434 match op {
1435 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1436 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1437 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1438 BinaryOp::Le => Ok(BinaryFilterOp::Le),
1439 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1440 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
1441 BinaryOp::And => Ok(BinaryFilterOp::And),
1442 BinaryOp::Or => Ok(BinaryFilterOp::Or),
1443 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
1444 BinaryOp::Add => Ok(BinaryFilterOp::Add),
1445 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
1446 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
1447 BinaryOp::Div => Ok(BinaryFilterOp::Div),
1448 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
1449 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
1450 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
1451 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
1452 BinaryOp::In => Ok(BinaryFilterOp::In),
1453 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
1454 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
1455 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
1456 "Binary operator {:?} not yet supported in filters",
1457 op
1458 ))),
1459 }
1460}
1461
1462pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
1464 match op {
1465 UnaryOp::Not => Ok(UnaryFilterOp::Not),
1466 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
1467 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
1468 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
1469 }
1470}
1471
1472pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
1474 match func {
1475 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
1476 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
1477 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
1478 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
1479 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
1480 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
1481 }
1482}
1483
1484pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
1488 match expr {
1489 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1490 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1491 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1492 variable: variable.clone(),
1493 property: property.clone(),
1494 }),
1495 LogicalExpression::Binary { left, op, right } => {
1496 let left_expr = convert_filter_expression(left)?;
1497 let right_expr = convert_filter_expression(right)?;
1498 let filter_op = convert_binary_op(*op)?;
1499 Ok(FilterExpression::Binary {
1500 left: Box::new(left_expr),
1501 op: filter_op,
1502 right: Box::new(right_expr),
1503 })
1504 }
1505 LogicalExpression::Unary { op, operand } => {
1506 let operand_expr = convert_filter_expression(operand)?;
1507 let filter_op = convert_unary_op(*op)?;
1508 Ok(FilterExpression::Unary {
1509 op: filter_op,
1510 operand: Box::new(operand_expr),
1511 })
1512 }
1513 LogicalExpression::FunctionCall { name, args } => {
1514 let filter_args: Vec<FilterExpression> = args
1515 .iter()
1516 .map(|a| convert_filter_expression(a))
1517 .collect::<Result<Vec<_>>>()?;
1518 Ok(FilterExpression::FunctionCall {
1519 name: name.clone(),
1520 args: filter_args,
1521 })
1522 }
1523 LogicalExpression::Case {
1524 operand,
1525 when_clauses,
1526 else_clause,
1527 } => {
1528 let filter_operand = operand
1529 .as_ref()
1530 .map(|e| convert_filter_expression(e))
1531 .transpose()?
1532 .map(Box::new);
1533 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1534 .iter()
1535 .map(|(cond, result)| {
1536 Ok((
1537 convert_filter_expression(cond)?,
1538 convert_filter_expression(result)?,
1539 ))
1540 })
1541 .collect::<Result<Vec<_>>>()?;
1542 let filter_else = else_clause
1543 .as_ref()
1544 .map(|e| convert_filter_expression(e))
1545 .transpose()?
1546 .map(Box::new);
1547 Ok(FilterExpression::Case {
1548 operand: filter_operand,
1549 when_clauses: filter_when_clauses,
1550 else_clause: filter_else,
1551 })
1552 }
1553 LogicalExpression::List(items) => {
1554 let filter_items: Vec<FilterExpression> = items
1555 .iter()
1556 .map(|item| convert_filter_expression(item))
1557 .collect::<Result<Vec<_>>>()?;
1558 Ok(FilterExpression::List(filter_items))
1559 }
1560 LogicalExpression::Map(pairs) => {
1561 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1562 .iter()
1563 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
1564 .collect::<Result<Vec<_>>>()?;
1565 Ok(FilterExpression::Map(filter_pairs))
1566 }
1567 LogicalExpression::IndexAccess { base, index } => {
1568 let base_expr = convert_filter_expression(base)?;
1569 let index_expr = convert_filter_expression(index)?;
1570 Ok(FilterExpression::IndexAccess {
1571 base: Box::new(base_expr),
1572 index: Box::new(index_expr),
1573 })
1574 }
1575 LogicalExpression::SliceAccess { base, start, end } => {
1576 let base_expr = convert_filter_expression(base)?;
1577 let start_expr = start
1578 .as_ref()
1579 .map(|s| convert_filter_expression(s))
1580 .transpose()?
1581 .map(Box::new);
1582 let end_expr = end
1583 .as_ref()
1584 .map(|e| convert_filter_expression(e))
1585 .transpose()?
1586 .map(Box::new);
1587 Ok(FilterExpression::SliceAccess {
1588 base: Box::new(base_expr),
1589 start: start_expr,
1590 end: end_expr,
1591 })
1592 }
1593 LogicalExpression::Parameter(_) => Err(Error::Internal(
1594 "Parameters not yet supported in filters".to_string(),
1595 )),
1596 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1597 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1598 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1599 LogicalExpression::ListComprehension {
1600 variable,
1601 list_expr,
1602 filter_expr,
1603 map_expr,
1604 } => {
1605 let list = convert_filter_expression(list_expr)?;
1606 let filter = filter_expr
1607 .as_ref()
1608 .map(|f| convert_filter_expression(f))
1609 .transpose()?
1610 .map(Box::new);
1611 let map = convert_filter_expression(map_expr)?;
1612 Ok(FilterExpression::ListComprehension {
1613 variable: variable.clone(),
1614 list_expr: Box::new(list),
1615 filter_expr: filter,
1616 map_expr: Box::new(map),
1617 })
1618 }
1619 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
1620 Error::Internal("Subqueries not yet supported in filters".to_string()),
1621 ),
1622 }
1623}
1624
1625fn value_to_logical_type(value: &graphos_common::types::Value) -> LogicalType {
1627 use graphos_common::types::Value;
1628 match value {
1629 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
1631 Value::Int64(_) => LogicalType::Int64,
1632 Value::Float64(_) => LogicalType::Float64,
1633 Value::String(_) => LogicalType::String,
1634 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
1636 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
1639}
1640
1641fn expression_to_string(expr: &LogicalExpression) -> String {
1643 match expr {
1644 LogicalExpression::Variable(name) => name.clone(),
1645 LogicalExpression::Property { variable, property } => {
1646 format!("{variable}.{property}")
1647 }
1648 LogicalExpression::Literal(value) => format!("{value:?}"),
1649 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
1650 _ => "expr".to_string(),
1651 }
1652}
1653
/// A physical operator paired with the names of its columns.
pub struct PhysicalPlan {
    /// The physical operator of this plan.
    pub operator: Box<dyn Operator>,
    /// Column names associated with the plan (also exposed via `columns()`).
    pub columns: Vec<String>,
}
1661
1662impl PhysicalPlan {
1663 #[must_use]
1665 pub fn columns(&self) -> &[String] {
1666 &self.columns
1667 }
1668
1669 pub fn into_operator(self) -> Box<dyn Operator> {
1671 self.operator
1672 }
1673}
1674
1675#[cfg(test)]
1676mod tests {
1677 use super::*;
1678 use crate::query::plan::{
1679 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1680 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1681 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
1682 SortKey, SortOp,
1683 };
1684 use graphos_common::types::Value;
1685
1686 fn create_test_store() -> Arc<LpgStore> {
1687 let store = Arc::new(LpgStore::new());
1688 store.create_node(&["Person"]);
1689 store.create_node(&["Person"]);
1690 store.create_node(&["Company"]);
1691 store
1692 }
1693
1694 #[test]
1697 fn test_plan_simple_scan() {
1698 let store = create_test_store();
1699 let planner = Planner::new(store);
1700
1701 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1703 items: vec![ReturnItem {
1704 expression: LogicalExpression::Variable("n".to_string()),
1705 alias: None,
1706 }],
1707 distinct: false,
1708 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1709 variable: "n".to_string(),
1710 label: Some("Person".to_string()),
1711 input: None,
1712 })),
1713 }));
1714
1715 let physical = planner.plan(&logical).unwrap();
1716 assert_eq!(physical.columns(), &["n"]);
1717 }
1718
1719 #[test]
1720 fn test_plan_scan_without_label() {
1721 let store = create_test_store();
1722 let planner = Planner::new(store);
1723
1724 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1726 items: vec![ReturnItem {
1727 expression: LogicalExpression::Variable("n".to_string()),
1728 alias: None,
1729 }],
1730 distinct: false,
1731 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1732 variable: "n".to_string(),
1733 label: None,
1734 input: None,
1735 })),
1736 }));
1737
1738 let physical = planner.plan(&logical).unwrap();
1739 assert_eq!(physical.columns(), &["n"]);
1740 }
1741
1742 #[test]
1743 fn test_plan_return_with_alias() {
1744 let store = create_test_store();
1745 let planner = Planner::new(store);
1746
1747 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1749 items: vec![ReturnItem {
1750 expression: LogicalExpression::Variable("n".to_string()),
1751 alias: Some("person".to_string()),
1752 }],
1753 distinct: false,
1754 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1755 variable: "n".to_string(),
1756 label: Some("Person".to_string()),
1757 input: None,
1758 })),
1759 }));
1760
1761 let physical = planner.plan(&logical).unwrap();
1762 assert_eq!(physical.columns(), &["person"]);
1763 }
1764
1765 #[test]
1766 fn test_plan_return_property() {
1767 let store = create_test_store();
1768 let planner = Planner::new(store);
1769
1770 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1772 items: vec![ReturnItem {
1773 expression: LogicalExpression::Property {
1774 variable: "n".to_string(),
1775 property: "name".to_string(),
1776 },
1777 alias: None,
1778 }],
1779 distinct: false,
1780 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1781 variable: "n".to_string(),
1782 label: Some("Person".to_string()),
1783 input: None,
1784 })),
1785 }));
1786
1787 let physical = planner.plan(&logical).unwrap();
1788 assert_eq!(physical.columns(), &["n.name"]);
1789 }
1790
1791 #[test]
1792 fn test_plan_return_literal() {
1793 let store = create_test_store();
1794 let planner = Planner::new(store);
1795
1796 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1798 items: vec![ReturnItem {
1799 expression: LogicalExpression::Literal(Value::Int64(42)),
1800 alias: Some("answer".to_string()),
1801 }],
1802 distinct: false,
1803 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1804 variable: "n".to_string(),
1805 label: None,
1806 input: None,
1807 })),
1808 }));
1809
1810 let physical = planner.plan(&logical).unwrap();
1811 assert_eq!(physical.columns(), &["answer"]);
1812 }
1813
1814 #[test]
1817 fn test_plan_filter_equality() {
1818 let store = create_test_store();
1819 let planner = Planner::new(store);
1820
1821 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1823 items: vec![ReturnItem {
1824 expression: LogicalExpression::Variable("n".to_string()),
1825 alias: None,
1826 }],
1827 distinct: false,
1828 input: Box::new(LogicalOperator::Filter(FilterOp {
1829 predicate: LogicalExpression::Binary {
1830 left: Box::new(LogicalExpression::Property {
1831 variable: "n".to_string(),
1832 property: "age".to_string(),
1833 }),
1834 op: BinaryOp::Eq,
1835 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1836 },
1837 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1838 variable: "n".to_string(),
1839 label: Some("Person".to_string()),
1840 input: None,
1841 })),
1842 })),
1843 }));
1844
1845 let physical = planner.plan(&logical).unwrap();
1846 assert_eq!(physical.columns(), &["n"]);
1847 }
1848
1849 #[test]
1850 fn test_plan_filter_compound_and() {
1851 let store = create_test_store();
1852 let planner = Planner::new(store);
1853
1854 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1856 items: vec![ReturnItem {
1857 expression: LogicalExpression::Variable("n".to_string()),
1858 alias: None,
1859 }],
1860 distinct: false,
1861 input: Box::new(LogicalOperator::Filter(FilterOp {
1862 predicate: LogicalExpression::Binary {
1863 left: Box::new(LogicalExpression::Binary {
1864 left: Box::new(LogicalExpression::Property {
1865 variable: "n".to_string(),
1866 property: "age".to_string(),
1867 }),
1868 op: BinaryOp::Gt,
1869 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1870 }),
1871 op: BinaryOp::And,
1872 right: Box::new(LogicalExpression::Binary {
1873 left: Box::new(LogicalExpression::Property {
1874 variable: "n".to_string(),
1875 property: "age".to_string(),
1876 }),
1877 op: BinaryOp::Lt,
1878 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1879 }),
1880 },
1881 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1882 variable: "n".to_string(),
1883 label: None,
1884 input: None,
1885 })),
1886 })),
1887 }));
1888
1889 let physical = planner.plan(&logical).unwrap();
1890 assert_eq!(physical.columns(), &["n"]);
1891 }
1892
1893 #[test]
1894 fn test_plan_filter_unary_not() {
1895 let store = create_test_store();
1896 let planner = Planner::new(store);
1897
1898 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1900 items: vec![ReturnItem {
1901 expression: LogicalExpression::Variable("n".to_string()),
1902 alias: None,
1903 }],
1904 distinct: false,
1905 input: Box::new(LogicalOperator::Filter(FilterOp {
1906 predicate: LogicalExpression::Unary {
1907 op: UnaryOp::Not,
1908 operand: Box::new(LogicalExpression::Property {
1909 variable: "n".to_string(),
1910 property: "active".to_string(),
1911 }),
1912 },
1913 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1914 variable: "n".to_string(),
1915 label: None,
1916 input: None,
1917 })),
1918 })),
1919 }));
1920
1921 let physical = planner.plan(&logical).unwrap();
1922 assert_eq!(physical.columns(), &["n"]);
1923 }
1924
1925 #[test]
1926 fn test_plan_filter_is_null() {
1927 let store = create_test_store();
1928 let planner = Planner::new(store);
1929
1930 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1932 items: vec![ReturnItem {
1933 expression: LogicalExpression::Variable("n".to_string()),
1934 alias: None,
1935 }],
1936 distinct: false,
1937 input: Box::new(LogicalOperator::Filter(FilterOp {
1938 predicate: LogicalExpression::Unary {
1939 op: UnaryOp::IsNull,
1940 operand: Box::new(LogicalExpression::Property {
1941 variable: "n".to_string(),
1942 property: "email".to_string(),
1943 }),
1944 },
1945 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1946 variable: "n".to_string(),
1947 label: None,
1948 input: None,
1949 })),
1950 })),
1951 }));
1952
1953 let physical = planner.plan(&logical).unwrap();
1954 assert_eq!(physical.columns(), &["n"]);
1955 }
1956
1957 #[test]
1958 fn test_plan_filter_function_call() {
1959 let store = create_test_store();
1960 let planner = Planner::new(store);
1961
1962 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1964 items: vec![ReturnItem {
1965 expression: LogicalExpression::Variable("n".to_string()),
1966 alias: None,
1967 }],
1968 distinct: false,
1969 input: Box::new(LogicalOperator::Filter(FilterOp {
1970 predicate: LogicalExpression::Binary {
1971 left: Box::new(LogicalExpression::FunctionCall {
1972 name: "size".to_string(),
1973 args: vec![LogicalExpression::Property {
1974 variable: "n".to_string(),
1975 property: "friends".to_string(),
1976 }],
1977 }),
1978 op: BinaryOp::Gt,
1979 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1980 },
1981 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1982 variable: "n".to_string(),
1983 label: None,
1984 input: None,
1985 })),
1986 })),
1987 }));
1988
1989 let physical = planner.plan(&logical).unwrap();
1990 assert_eq!(physical.columns(), &["n"]);
1991 }
1992
1993 #[test]
1996 fn test_plan_expand_outgoing() {
1997 let store = create_test_store();
1998 let planner = Planner::new(store);
1999
2000 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2002 items: vec![
2003 ReturnItem {
2004 expression: LogicalExpression::Variable("a".to_string()),
2005 alias: None,
2006 },
2007 ReturnItem {
2008 expression: LogicalExpression::Variable("b".to_string()),
2009 alias: None,
2010 },
2011 ],
2012 distinct: false,
2013 input: Box::new(LogicalOperator::Expand(ExpandOp {
2014 from_variable: "a".to_string(),
2015 to_variable: "b".to_string(),
2016 edge_variable: None,
2017 direction: ExpandDirection::Outgoing,
2018 edge_type: Some("KNOWS".to_string()),
2019 min_hops: 1,
2020 max_hops: Some(1),
2021 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2022 variable: "a".to_string(),
2023 label: Some("Person".to_string()),
2024 input: None,
2025 })),
2026 })),
2027 }));
2028
2029 let physical = planner.plan(&logical).unwrap();
2030 assert!(physical.columns().contains(&"a".to_string()));
2032 assert!(physical.columns().contains(&"b".to_string()));
2033 }
2034
2035 #[test]
2036 fn test_plan_expand_with_edge_variable() {
2037 let store = create_test_store();
2038 let planner = Planner::new(store);
2039
2040 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2042 items: vec![
2043 ReturnItem {
2044 expression: LogicalExpression::Variable("a".to_string()),
2045 alias: None,
2046 },
2047 ReturnItem {
2048 expression: LogicalExpression::Variable("r".to_string()),
2049 alias: None,
2050 },
2051 ReturnItem {
2052 expression: LogicalExpression::Variable("b".to_string()),
2053 alias: None,
2054 },
2055 ],
2056 distinct: false,
2057 input: Box::new(LogicalOperator::Expand(ExpandOp {
2058 from_variable: "a".to_string(),
2059 to_variable: "b".to_string(),
2060 edge_variable: Some("r".to_string()),
2061 direction: ExpandDirection::Outgoing,
2062 edge_type: Some("KNOWS".to_string()),
2063 min_hops: 1,
2064 max_hops: Some(1),
2065 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2066 variable: "a".to_string(),
2067 label: None,
2068 input: None,
2069 })),
2070 })),
2071 }));
2072
2073 let physical = planner.plan(&logical).unwrap();
2074 assert!(physical.columns().contains(&"a".to_string()));
2075 assert!(physical.columns().contains(&"r".to_string()));
2076 assert!(physical.columns().contains(&"b".to_string()));
2077 }
2078
2079 #[test]
2082 fn test_plan_limit() {
2083 let store = create_test_store();
2084 let planner = Planner::new(store);
2085
2086 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2088 items: vec![ReturnItem {
2089 expression: LogicalExpression::Variable("n".to_string()),
2090 alias: None,
2091 }],
2092 distinct: false,
2093 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2094 count: 10,
2095 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2096 variable: "n".to_string(),
2097 label: None,
2098 input: None,
2099 })),
2100 })),
2101 }));
2102
2103 let physical = planner.plan(&logical).unwrap();
2104 assert_eq!(physical.columns(), &["n"]);
2105 }
2106
2107 #[test]
2108 fn test_plan_skip() {
2109 let store = create_test_store();
2110 let planner = Planner::new(store);
2111
2112 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2114 items: vec![ReturnItem {
2115 expression: LogicalExpression::Variable("n".to_string()),
2116 alias: None,
2117 }],
2118 distinct: false,
2119 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2120 count: 5,
2121 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2122 variable: "n".to_string(),
2123 label: None,
2124 input: None,
2125 })),
2126 })),
2127 }));
2128
2129 let physical = planner.plan(&logical).unwrap();
2130 assert_eq!(physical.columns(), &["n"]);
2131 }
2132
2133 #[test]
2134 fn test_plan_sort() {
2135 let store = create_test_store();
2136 let planner = Planner::new(store);
2137
2138 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2140 items: vec![ReturnItem {
2141 expression: LogicalExpression::Variable("n".to_string()),
2142 alias: None,
2143 }],
2144 distinct: false,
2145 input: Box::new(LogicalOperator::Sort(SortOp {
2146 keys: vec![SortKey {
2147 expression: LogicalExpression::Variable("n".to_string()),
2148 order: SortOrder::Ascending,
2149 }],
2150 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2151 variable: "n".to_string(),
2152 label: None,
2153 input: None,
2154 })),
2155 })),
2156 }));
2157
2158 let physical = planner.plan(&logical).unwrap();
2159 assert_eq!(physical.columns(), &["n"]);
2160 }
2161
2162 #[test]
2163 fn test_plan_sort_descending() {
2164 let store = create_test_store();
2165 let planner = Planner::new(store);
2166
2167 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2169 items: vec![ReturnItem {
2170 expression: LogicalExpression::Variable("n".to_string()),
2171 alias: None,
2172 }],
2173 distinct: false,
2174 input: Box::new(LogicalOperator::Sort(SortOp {
2175 keys: vec![SortKey {
2176 expression: LogicalExpression::Variable("n".to_string()),
2177 order: SortOrder::Descending,
2178 }],
2179 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2180 variable: "n".to_string(),
2181 label: None,
2182 input: None,
2183 })),
2184 })),
2185 }));
2186
2187 let physical = planner.plan(&logical).unwrap();
2188 assert_eq!(physical.columns(), &["n"]);
2189 }
2190
2191 #[test]
2192 fn test_plan_distinct() {
2193 let store = create_test_store();
2194 let planner = Planner::new(store);
2195
2196 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2198 items: vec![ReturnItem {
2199 expression: LogicalExpression::Variable("n".to_string()),
2200 alias: None,
2201 }],
2202 distinct: false,
2203 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2204 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2205 variable: "n".to_string(),
2206 label: None,
2207 input: None,
2208 })),
2209 })),
2210 }));
2211
2212 let physical = planner.plan(&logical).unwrap();
2213 assert_eq!(physical.columns(), &["n"]);
2214 }
2215
2216 #[test]
2219 fn test_plan_aggregate_count() {
2220 let store = create_test_store();
2221 let planner = Planner::new(store);
2222
2223 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2225 items: vec![ReturnItem {
2226 expression: LogicalExpression::Variable("cnt".to_string()),
2227 alias: None,
2228 }],
2229 distinct: false,
2230 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
2231 group_by: vec![],
2232 aggregates: vec![LogicalAggregateExpr {
2233 function: LogicalAggregateFunction::Count,
2234 expression: Some(LogicalExpression::Variable("n".to_string())),
2235 distinct: false,
2236 alias: Some("cnt".to_string()),
2237 }],
2238 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2239 variable: "n".to_string(),
2240 label: None,
2241 input: None,
2242 })),
2243 })),
2244 }));
2245
2246 let physical = planner.plan(&logical).unwrap();
2247 assert!(physical.columns().contains(&"cnt".to_string()));
2248 }
2249
2250 #[test]
2251 fn test_plan_aggregate_with_group_by() {
2252 let store = create_test_store();
2253 let planner = Planner::new(store);
2254
2255 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2257 group_by: vec![LogicalExpression::Property {
2258 variable: "n".to_string(),
2259 property: "city".to_string(),
2260 }],
2261 aggregates: vec![LogicalAggregateExpr {
2262 function: LogicalAggregateFunction::Count,
2263 expression: Some(LogicalExpression::Variable("n".to_string())),
2264 distinct: false,
2265 alias: Some("cnt".to_string()),
2266 }],
2267 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2268 variable: "n".to_string(),
2269 label: Some("Person".to_string()),
2270 input: None,
2271 })),
2272 }));
2273
2274 let physical = planner.plan(&logical).unwrap();
2275 assert_eq!(physical.columns().len(), 2);
2276 }
2277
/// Plans an ungrouped `Sum` over the `n.value` property of an unlabeled node
/// scan and checks that the aggregate alias `total` is among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_of_value],
        input: Box::new(all_nodes),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "total"));
}
2305
/// Plans an ungrouped `Avg` over the `n.score` property of an unlabeled node
/// scan and checks that the aggregate alias `average` is among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let avg_of_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_of_score],
        input: Box::new(all_nodes),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "average"));
}
2333
/// Plans `Min` and `Max` aggregates over the `n.age` property in a single
/// ungrouped aggregate and checks that both aliases (`youngest`, `oldest`)
/// are among the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates differ only in the function and the alias.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(String::from(alias)),
    };

    let all_nodes = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(all_nodes),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "youngest"));
    assert!(planned.columns().iter().any(|column| column == "oldest"));
}
2373
/// Plans a `Return` of `a` and `b` over an inner join between a `Person`
/// scan (`a`) and a `Company` scan (`b`) with the condition `a = b`, and
/// checks that both variables appear in the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let labeled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: Some(String::from(label)),
            input: None,
        })
    };
    let return_variable = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labeled_scan("a", "Person")),
        right: Box::new(labeled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable(String::from("a")),
            right: LogicalExpression::Variable(String::from("b")),
        }],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_variable("a"), return_variable("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "a"));
    assert!(planned.columns().iter().any(|column| column == "b"));
}
2417
/// Plans a condition-free `Cross` join of two unlabeled node scans (`a`, `b`)
/// and checks that the physical plan reports exactly two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        })
    };
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
2442
/// Plans a condition-free `Left` join of two unlabeled node scans (`a`, `b`)
/// and checks that the physical plan reports exactly two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        })
    };
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
2466
/// Plans a standalone `CreateNode` for a `Person` node with a `name`
/// property and checks that the created variable `n` is an output column.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let plan = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "n"));
}
2488
/// Plans a `CreateEdge` of type `KNOWS` from `a` to `b`, fed by a cross join
/// of two unlabeled node scans, and checks that the edge variable `r` is an
/// output column.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        })
    };
    // The endpoints come from a cross join of the two scans.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: Vec::new(),
        input: Box::new(endpoints),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().iter().any(|column| column == "r"));
}
2520
/// Plans a `DeleteNode` of `n` over an unlabeled node scan and checks that
/// the physical plan exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let targets = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
        variable: String::from("n"),
        input: Box::new(targets),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned
        .columns()
        .iter()
        .any(|column| column == "deleted_count"));
}
2539
/// Planning a plan whose root is `LogicalOperator::Empty` is expected to
/// return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
2551
/// Returning a variable (`missing`) that the input scan does not bind (the
/// scan only binds `n`) is expected to make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan_n),
    }));

    assert!(planner.plan(&plan).is_err());
}
2574
/// Checks that `convert_binary_op` succeeds for each comparison, logical and
/// arithmetic operator listed below.
#[test]
fn test_convert_binary_ops() {
    // One assertion per operator so a failure still points at a single line.
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparison operators.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Logical operators.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic operators.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
2592
/// Checks that `convert_unary_op` succeeds for `Not`, `IsNull`, `IsNotNull`
/// and `Neg`.
#[test]
fn test_convert_unary_ops() {
    // One assertion per operator so a failure still points at a single line.
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
2600
/// Checks that `convert_aggregate_function` maps each of `Count`, `Sum`,
/// `Avg`, `Min` and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
2624
/// Checks the accessors of a planner built with `Planner::new`: no
/// transaction id, no transaction manager, and a callable `viewing_epoch`.
#[test]
fn test_planner_accessors() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store));

    // `Planner::new` sets both transaction fields to `None`.
    let tx_id = planner.tx_id();
    assert!(tx_id.is_none());
    let tx_manager = planner.tx_manager();
    assert!(tx_manager.is_none());

    // Only exercised for accessibility; the value itself is not checked.
    let _ = planner.viewing_epoch();
}
2634
/// Plans a bare unlabeled node scan of `n`, checks that the physical plan's
/// columns are exactly `["n"]`, and then consumes the plan via `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(scan_n);

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);

    // Consumes the plan; the returned operator is intentionally discarded.
    let _ = planned.into_operator();
}
2652}