1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22 ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
23 JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NestedLoopJoinOperator, NullOrder,
24 Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator,
25 SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
26 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
27 UnwindOperator, VariableLengthExpandOperator,
28};
29use grafeo_core::graph::{Direction, lpg::LpgStore};
30use std::collections::HashMap;
31use std::sync::Arc;
32
33use crate::transaction::TransactionManager;
34
35pub struct Planner {
37 store: Arc<LpgStore>,
39 tx_manager: Option<Arc<TransactionManager>>,
41 tx_id: Option<TxId>,
43 viewing_epoch: EpochId,
45 anon_edge_counter: std::cell::Cell<u32>,
47}
48
49impl Planner {
50 #[must_use]
55 pub fn new(store: Arc<LpgStore>) -> Self {
56 let epoch = store.current_epoch();
57 Self {
58 store,
59 tx_manager: None,
60 tx_id: None,
61 viewing_epoch: epoch,
62 anon_edge_counter: std::cell::Cell::new(0),
63 }
64 }
65
66 #[must_use]
75 pub fn with_context(
76 store: Arc<LpgStore>,
77 tx_manager: Arc<TransactionManager>,
78 tx_id: Option<TxId>,
79 viewing_epoch: EpochId,
80 ) -> Self {
81 Self {
82 store,
83 tx_manager: Some(tx_manager),
84 tx_id,
85 viewing_epoch,
86 anon_edge_counter: std::cell::Cell::new(0),
87 }
88 }
89
90 #[must_use]
92 pub fn viewing_epoch(&self) -> EpochId {
93 self.viewing_epoch
94 }
95
96 #[must_use]
98 pub fn tx_id(&self) -> Option<TxId> {
99 self.tx_id
100 }
101
102 #[must_use]
104 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
105 self.tx_manager.as_ref()
106 }
107
108 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
114 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
115 Ok(PhysicalPlan {
116 operator,
117 columns,
118 adaptive_context: None,
119 })
120 }
121
122 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
131 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
132
133 let mut adaptive_context = AdaptiveContext::new();
135 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
136
137 Ok(PhysicalPlan {
138 operator,
139 columns,
140 adaptive_context: Some(adaptive_context),
141 })
142 }
143
144 fn collect_cardinality_estimates(
146 &self,
147 op: &LogicalOperator,
148 ctx: &mut AdaptiveContext,
149 depth: usize,
150 ) {
151 match op {
152 LogicalOperator::NodeScan(scan) => {
153 let estimate = if let Some(label) = &scan.label {
155 self.store.nodes_by_label(label).len() as f64
156 } else {
157 self.store.node_count() as f64
158 };
159 let id = format!("scan_{}", scan.variable);
160 ctx.set_estimate(&id, estimate);
161
162 if let Some(input) = &scan.input {
164 self.collect_cardinality_estimates(input, ctx, depth + 1);
165 }
166 }
167 LogicalOperator::Filter(filter) => {
168 let input_estimate = self.estimate_cardinality(&filter.input);
170 let estimate = input_estimate * 0.3;
171 let id = format!("filter_{depth}");
172 ctx.set_estimate(&id, estimate);
173
174 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
175 }
176 LogicalOperator::Expand(expand) => {
177 let input_estimate = self.estimate_cardinality(&expand.input);
179 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
181 let id = format!("expand_{}", expand.to_variable);
182 ctx.set_estimate(&id, estimate);
183
184 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
185 }
186 LogicalOperator::Join(join) => {
187 let left_est = self.estimate_cardinality(&join.left);
189 let right_est = self.estimate_cardinality(&join.right);
190 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
192 ctx.set_estimate(&id, estimate);
193
194 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
195 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
196 }
197 LogicalOperator::Aggregate(agg) => {
198 let input_estimate = self.estimate_cardinality(&agg.input);
200 let estimate = if agg.group_by.is_empty() {
201 1.0 } else {
203 (input_estimate * 0.1).max(1.0) };
205 let id = format!("aggregate_{depth}");
206 ctx.set_estimate(&id, estimate);
207
208 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
209 }
210 LogicalOperator::Distinct(distinct) => {
211 let input_estimate = self.estimate_cardinality(&distinct.input);
212 let estimate = (input_estimate * 0.5).max(1.0);
213 let id = format!("distinct_{depth}");
214 ctx.set_estimate(&id, estimate);
215
216 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
217 }
218 LogicalOperator::Return(ret) => {
219 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
220 }
221 LogicalOperator::Limit(limit) => {
222 let input_estimate = self.estimate_cardinality(&limit.input);
223 let estimate = (input_estimate).min(limit.count as f64);
224 let id = format!("limit_{depth}");
225 ctx.set_estimate(&id, estimate);
226
227 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
228 }
229 LogicalOperator::Skip(skip) => {
230 let input_estimate = self.estimate_cardinality(&skip.input);
231 let estimate = (input_estimate - skip.count as f64).max(0.0);
232 let id = format!("skip_{depth}");
233 ctx.set_estimate(&id, estimate);
234
235 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
236 }
237 LogicalOperator::Sort(sort) => {
238 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
240 }
241 LogicalOperator::Union(union) => {
242 let estimate: f64 = union
243 .inputs
244 .iter()
245 .map(|input| self.estimate_cardinality(input))
246 .sum();
247 let id = format!("union_{depth}");
248 ctx.set_estimate(&id, estimate);
249
250 for input in &union.inputs {
251 self.collect_cardinality_estimates(input, ctx, depth + 1);
252 }
253 }
254 _ => {
255 }
257 }
258 }
259
260 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
262 match op {
263 LogicalOperator::NodeScan(scan) => {
264 if let Some(label) = &scan.label {
265 self.store.nodes_by_label(label).len() as f64
266 } else {
267 self.store.node_count() as f64
268 }
269 }
270 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
271 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
272 LogicalOperator::Join(join) => {
273 let left = self.estimate_cardinality(&join.left);
274 let right = self.estimate_cardinality(&join.right);
275 (left * right).sqrt()
276 }
277 LogicalOperator::Aggregate(agg) => {
278 if agg.group_by.is_empty() {
279 1.0
280 } else {
281 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
282 }
283 }
284 LogicalOperator::Distinct(distinct) => {
285 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
286 }
287 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
288 LogicalOperator::Limit(limit) => self
289 .estimate_cardinality(&limit.input)
290 .min(limit.count as f64),
291 LogicalOperator::Skip(skip) => {
292 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
293 }
294 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
295 LogicalOperator::Union(union) => union
296 .inputs
297 .iter()
298 .map(|input| self.estimate_cardinality(input))
299 .sum(),
300 _ => 1000.0, }
302 }
303
304 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
306 match op {
307 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
308 LogicalOperator::Expand(expand) => self.plan_expand(expand),
309 LogicalOperator::Return(ret) => self.plan_return(ret),
310 LogicalOperator::Filter(filter) => self.plan_filter(filter),
311 LogicalOperator::Project(project) => self.plan_project(project),
312 LogicalOperator::Limit(limit) => self.plan_limit(limit),
313 LogicalOperator::Skip(skip) => self.plan_skip(skip),
314 LogicalOperator::Sort(sort) => self.plan_sort(sort),
315 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
316 LogicalOperator::Join(join) => self.plan_join(join),
317 LogicalOperator::Union(union) => self.plan_union(union),
318 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
319 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
320 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
321 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
322 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
323 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
324 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
325 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
326 LogicalOperator::Merge(merge) => self.plan_merge(merge),
327 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
328 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
329 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
330 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
331 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
332 _ => Err(Error::Internal(format!(
333 "Unsupported operator: {:?}",
334 std::mem::discriminant(op)
335 ))),
336 }
337 }
338
339 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
341 let scan_op = if let Some(label) = &scan.label {
342 ScanOperator::with_label(Arc::clone(&self.store), label)
343 } else {
344 ScanOperator::new(Arc::clone(&self.store))
345 };
346
347 let scan_operator: Box<dyn Operator> =
349 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
350
351 if let Some(input) = &scan.input {
353 let (input_op, mut input_columns) = self.plan_operator(input)?;
354
355 let mut output_schema: Vec<LogicalType> =
357 input_columns.iter().map(|_| LogicalType::Any).collect();
358 output_schema.push(LogicalType::Node);
359
360 input_columns.push(scan.variable.clone());
362
363 let join_op = Box::new(NestedLoopJoinOperator::new(
365 input_op,
366 scan_operator,
367 None, PhysicalJoinType::Cross,
369 output_schema,
370 ));
371
372 Ok((join_op, input_columns))
373 } else {
374 let columns = vec![scan.variable.clone()];
375 Ok((scan_operator, columns))
376 }
377 }
378
379 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
381 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
383
384 let source_column = input_columns
386 .iter()
387 .position(|c| c == &expand.from_variable)
388 .ok_or_else(|| {
389 Error::Internal(format!(
390 "Source variable '{}' not found in input columns",
391 expand.from_variable
392 ))
393 })?;
394
395 let direction = match expand.direction {
397 ExpandDirection::Outgoing => Direction::Outgoing,
398 ExpandDirection::Incoming => Direction::Incoming,
399 ExpandDirection::Both => Direction::Both,
400 };
401
402 let is_variable_length =
404 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
405
406 let operator: Box<dyn Operator> = if is_variable_length {
407 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
410 Arc::clone(&self.store),
411 input_op,
412 source_column,
413 direction,
414 expand.edge_type.clone(),
415 expand.min_hops,
416 max_hops,
417 )
418 .with_tx_context(self.viewing_epoch, self.tx_id);
419
420 if expand.path_alias.is_some() {
422 expand_op = expand_op.with_path_length_output();
423 }
424
425 Box::new(expand_op)
426 } else {
427 let expand_op = ExpandOperator::new(
429 Arc::clone(&self.store),
430 input_op,
431 source_column,
432 direction,
433 expand.edge_type.clone(),
434 )
435 .with_tx_context(self.viewing_epoch, self.tx_id);
436 Box::new(expand_op)
437 };
438
439 let mut columns = input_columns;
442
443 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
445 let count = self.anon_edge_counter.get();
446 self.anon_edge_counter.set(count + 1);
447 format!("_anon_edge_{}", count)
448 });
449 columns.push(edge_col_name);
450
451 columns.push(expand.to_variable.clone());
452
453 if let Some(ref path_alias) = expand.path_alias {
455 columns.push(format!("_path_length_{}", path_alias));
456 }
457
458 Ok((operator, columns))
459 }
460
461 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
463 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
465
466 let variable_columns: HashMap<String, usize> = input_columns
468 .iter()
469 .enumerate()
470 .map(|(i, name)| (name.clone(), i))
471 .collect();
472
473 let columns: Vec<String> = ret
475 .items
476 .iter()
477 .map(|item| {
478 item.alias.clone().unwrap_or_else(|| {
479 expression_to_string(&item.expression)
481 })
482 })
483 .collect();
484
485 let needs_project = ret
487 .items
488 .iter()
489 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
490
491 if needs_project {
492 let mut projections = Vec::with_capacity(ret.items.len());
494 let mut output_types = Vec::with_capacity(ret.items.len());
495
496 for item in &ret.items {
497 match &item.expression {
498 LogicalExpression::Variable(name) => {
499 let col_idx = *variable_columns.get(name).ok_or_else(|| {
500 Error::Internal(format!("Variable '{}' not found in input", name))
501 })?;
502 projections.push(ProjectExpr::Column(col_idx));
503 output_types.push(LogicalType::Node);
505 }
506 LogicalExpression::Property { variable, property } => {
507 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
508 Error::Internal(format!("Variable '{}' not found in input", variable))
509 })?;
510 projections.push(ProjectExpr::PropertyAccess {
511 column: col_idx,
512 property: property.clone(),
513 });
514 output_types.push(LogicalType::Any);
516 }
517 LogicalExpression::Literal(value) => {
518 projections.push(ProjectExpr::Constant(value.clone()));
519 output_types.push(value_to_logical_type(value));
520 }
521 LogicalExpression::FunctionCall { name, args, .. } => {
522 match name.to_lowercase().as_str() {
524 "type" => {
525 if args.len() != 1 {
527 return Err(Error::Internal(
528 "type() requires exactly one argument".to_string(),
529 ));
530 }
531 if let LogicalExpression::Variable(var_name) = &args[0] {
532 let col_idx =
533 *variable_columns.get(var_name).ok_or_else(|| {
534 Error::Internal(format!(
535 "Variable '{}' not found in input",
536 var_name
537 ))
538 })?;
539 projections.push(ProjectExpr::EdgeType { column: col_idx });
540 output_types.push(LogicalType::String);
541 } else {
542 return Err(Error::Internal(
543 "type() argument must be a variable".to_string(),
544 ));
545 }
546 }
547 "length" => {
548 if args.len() != 1 {
551 return Err(Error::Internal(
552 "length() requires exactly one argument".to_string(),
553 ));
554 }
555 if let LogicalExpression::Variable(var_name) = &args[0] {
556 let col_idx =
557 *variable_columns.get(var_name).ok_or_else(|| {
558 Error::Internal(format!(
559 "Variable '{}' not found in input",
560 var_name
561 ))
562 })?;
563 projections.push(ProjectExpr::Column(col_idx));
565 output_types.push(LogicalType::Int64);
566 } else {
567 return Err(Error::Internal(
568 "length() argument must be a variable".to_string(),
569 ));
570 }
571 }
572 _ => {
574 let filter_expr = self.convert_expression(&item.expression)?;
575 projections.push(ProjectExpr::Expression {
576 expr: filter_expr,
577 variable_columns: variable_columns.clone(),
578 });
579 output_types.push(LogicalType::Any);
580 }
581 }
582 }
583 LogicalExpression::Case { .. } => {
584 let filter_expr = self.convert_expression(&item.expression)?;
586 projections.push(ProjectExpr::Expression {
587 expr: filter_expr,
588 variable_columns: variable_columns.clone(),
589 });
590 output_types.push(LogicalType::Any);
592 }
593 _ => {
594 return Err(Error::Internal(format!(
595 "Unsupported RETURN expression: {:?}",
596 item.expression
597 )));
598 }
599 }
600 }
601
602 let operator = Box::new(ProjectOperator::with_store(
603 input_op,
604 projections,
605 output_types,
606 Arc::clone(&self.store),
607 ));
608
609 Ok((operator, columns))
610 } else {
611 let mut projections = Vec::with_capacity(ret.items.len());
614 let mut output_types = Vec::with_capacity(ret.items.len());
615
616 for item in &ret.items {
617 if let LogicalExpression::Variable(name) = &item.expression {
618 let col_idx = *variable_columns.get(name).ok_or_else(|| {
619 Error::Internal(format!("Variable '{}' not found in input", name))
620 })?;
621 projections.push(ProjectExpr::Column(col_idx));
622 output_types.push(LogicalType::Node);
623 }
624 }
625
626 if projections.len() == input_columns.len()
628 && projections
629 .iter()
630 .enumerate()
631 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
632 {
633 Ok((input_op, columns))
635 } else {
636 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
637 Ok((operator, columns))
638 }
639 }
640 }
641
642 fn plan_project(
644 &self,
645 project: &crate::query::plan::ProjectOp,
646 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
647 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
649 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
650 let single_row_op: Box<dyn Operator> = Box::new(
652 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
653 );
654 (single_row_op, Vec::new())
655 } else {
656 self.plan_operator(&project.input)?
657 };
658
659 let variable_columns: HashMap<String, usize> = input_columns
661 .iter()
662 .enumerate()
663 .map(|(i, name)| (name.clone(), i))
664 .collect();
665
666 let mut projections = Vec::with_capacity(project.projections.len());
668 let mut output_types = Vec::with_capacity(project.projections.len());
669 let mut output_columns = Vec::with_capacity(project.projections.len());
670
671 for projection in &project.projections {
672 let col_name = projection
674 .alias
675 .clone()
676 .unwrap_or_else(|| expression_to_string(&projection.expression));
677 output_columns.push(col_name);
678
679 match &projection.expression {
680 LogicalExpression::Variable(name) => {
681 let col_idx = *variable_columns.get(name).ok_or_else(|| {
682 Error::Internal(format!("Variable '{}' not found in input", name))
683 })?;
684 projections.push(ProjectExpr::Column(col_idx));
685 output_types.push(LogicalType::Node);
686 }
687 LogicalExpression::Property { variable, property } => {
688 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
689 Error::Internal(format!("Variable '{}' not found in input", variable))
690 })?;
691 projections.push(ProjectExpr::PropertyAccess {
692 column: col_idx,
693 property: property.clone(),
694 });
695 output_types.push(LogicalType::Any);
696 }
697 LogicalExpression::Literal(value) => {
698 projections.push(ProjectExpr::Constant(value.clone()));
699 output_types.push(value_to_logical_type(value));
700 }
701 _ => {
702 let filter_expr = self.convert_expression(&projection.expression)?;
704 projections.push(ProjectExpr::Expression {
705 expr: filter_expr,
706 variable_columns: variable_columns.clone(),
707 });
708 output_types.push(LogicalType::Any);
709 }
710 }
711 }
712
713 let operator = Box::new(ProjectOperator::with_store(
714 input_op,
715 projections,
716 output_types,
717 Arc::clone(&self.store),
718 ));
719
720 Ok((operator, output_columns))
721 }
722
723 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
725 let (input_op, columns) = self.plan_operator(&filter.input)?;
727
728 let variable_columns: HashMap<String, usize> = columns
730 .iter()
731 .enumerate()
732 .map(|(i, name)| (name.clone(), i))
733 .collect();
734
735 let filter_expr = self.convert_expression(&filter.predicate)?;
737
738 let predicate =
740 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
741
742 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
744
745 Ok((operator, columns))
746 }
747
748 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
750 let (input_op, columns) = self.plan_operator(&limit.input)?;
751 let output_schema = self.derive_schema_from_columns(&columns);
752 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
753 Ok((operator, columns))
754 }
755
756 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
758 let (input_op, columns) = self.plan_operator(&skip.input)?;
759 let output_schema = self.derive_schema_from_columns(&columns);
760 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
761 Ok((operator, columns))
762 }
763
764 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
766 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
767
768 let mut variable_columns: HashMap<String, usize> = input_columns
770 .iter()
771 .enumerate()
772 .map(|(i, name)| (name.clone(), i))
773 .collect();
774
775 let mut property_projections: Vec<(String, String, String)> = Vec::new();
777 let mut next_col_idx = input_columns.len();
778
779 for key in &sort.keys {
780 if let LogicalExpression::Property { variable, property } = &key.expression {
781 let col_name = format!("{}_{}", variable, property);
782 if !variable_columns.contains_key(&col_name) {
783 property_projections.push((
784 variable.clone(),
785 property.clone(),
786 col_name.clone(),
787 ));
788 variable_columns.insert(col_name, next_col_idx);
789 next_col_idx += 1;
790 }
791 }
792 }
793
794 let mut output_columns = input_columns.clone();
796
797 if !property_projections.is_empty() {
799 let mut projections = Vec::new();
800 let mut output_types = Vec::new();
801
802 for (i, _) in input_columns.iter().enumerate() {
805 projections.push(ProjectExpr::Column(i));
806 output_types.push(LogicalType::Node);
807 }
808
809 for (variable, property, col_name) in &property_projections {
811 let source_col = *variable_columns.get(variable).ok_or_else(|| {
812 Error::Internal(format!(
813 "Variable '{}' not found for ORDER BY property projection",
814 variable
815 ))
816 })?;
817 projections.push(ProjectExpr::PropertyAccess {
818 column: source_col,
819 property: property.clone(),
820 });
821 output_types.push(LogicalType::Any);
822 output_columns.push(col_name.clone());
823 }
824
825 input_op = Box::new(ProjectOperator::with_store(
826 input_op,
827 projections,
828 output_types,
829 Arc::clone(&self.store),
830 ));
831 }
832
833 let physical_keys: Vec<PhysicalSortKey> = sort
835 .keys
836 .iter()
837 .map(|key| {
838 let col_idx = self
839 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
840 Ok(PhysicalSortKey {
841 column: col_idx,
842 direction: match key.order {
843 SortOrder::Ascending => SortDirection::Ascending,
844 SortOrder::Descending => SortDirection::Descending,
845 },
846 null_order: NullOrder::NullsLast,
847 })
848 })
849 .collect::<Result<Vec<_>>>()?;
850
851 let output_schema = self.derive_schema_from_columns(&output_columns);
852 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
853 Ok((operator, output_columns))
854 }
855
856 fn resolve_sort_expression_with_properties(
858 &self,
859 expr: &LogicalExpression,
860 variable_columns: &HashMap<String, usize>,
861 ) -> Result<usize> {
862 match expr {
863 LogicalExpression::Variable(name) => {
864 variable_columns.get(name).copied().ok_or_else(|| {
865 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
866 })
867 }
868 LogicalExpression::Property { variable, property } => {
869 let col_name = format!("{}_{}", variable, property);
871 variable_columns.get(&col_name).copied().ok_or_else(|| {
872 Error::Internal(format!(
873 "Property column '{}' not found for ORDER BY (from {}.{})",
874 col_name, variable, property
875 ))
876 })
877 }
878 _ => Err(Error::Internal(format!(
879 "Unsupported ORDER BY expression: {:?}",
880 expr
881 ))),
882 }
883 }
884
885 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
887 columns.iter().map(|_| LogicalType::Any).collect()
888 }
889
890 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
892 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
893
894 let mut variable_columns: HashMap<String, usize> = input_columns
896 .iter()
897 .enumerate()
898 .map(|(i, name)| (name.clone(), i))
899 .collect();
900
901 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
904
905 for expr in &agg.group_by {
907 if let LogicalExpression::Property { variable, property } = expr {
908 let col_name = format!("{}_{}", variable, property);
909 if !variable_columns.contains_key(&col_name) {
910 property_projections.push((
911 variable.clone(),
912 property.clone(),
913 col_name.clone(),
914 ));
915 variable_columns.insert(col_name, next_col_idx);
916 next_col_idx += 1;
917 }
918 }
919 }
920
921 for agg_expr in &agg.aggregates {
923 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
924 let col_name = format!("{}_{}", variable, property);
925 if !variable_columns.contains_key(&col_name) {
926 property_projections.push((
927 variable.clone(),
928 property.clone(),
929 col_name.clone(),
930 ));
931 variable_columns.insert(col_name, next_col_idx);
932 next_col_idx += 1;
933 }
934 }
935 }
936
937 if !property_projections.is_empty() {
939 let mut projections = Vec::new();
940 let mut output_types = Vec::new();
941
942 for (i, _) in input_columns.iter().enumerate() {
945 projections.push(ProjectExpr::Column(i));
946 output_types.push(LogicalType::Node);
947 }
948
949 for (variable, property, _col_name) in &property_projections {
951 let source_col = *variable_columns.get(variable).ok_or_else(|| {
952 Error::Internal(format!(
953 "Variable '{}' not found for property projection",
954 variable
955 ))
956 })?;
957 projections.push(ProjectExpr::PropertyAccess {
958 column: source_col,
959 property: property.clone(),
960 });
961 output_types.push(LogicalType::Any); }
963
964 input_op = Box::new(ProjectOperator::with_store(
965 input_op,
966 projections,
967 output_types,
968 Arc::clone(&self.store),
969 ));
970 }
971
972 let group_columns: Vec<usize> = agg
974 .group_by
975 .iter()
976 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
977 .collect::<Result<Vec<_>>>()?;
978
979 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
981 .aggregates
982 .iter()
983 .map(|agg_expr| {
984 let column = agg_expr
985 .expression
986 .as_ref()
987 .map(|e| {
988 self.resolve_expression_to_column_with_properties(e, &variable_columns)
989 })
990 .transpose()?;
991
992 Ok(PhysicalAggregateExpr {
993 function: convert_aggregate_function(agg_expr.function),
994 column,
995 distinct: agg_expr.distinct,
996 alias: agg_expr.alias.clone(),
997 percentile: agg_expr.percentile,
998 })
999 })
1000 .collect::<Result<Vec<_>>>()?;
1001
1002 let mut output_schema = Vec::new();
1004 let mut output_columns = Vec::new();
1005
1006 for expr in &agg.group_by {
1008 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1010 }
1011
1012 for agg_expr in &agg.aggregates {
1014 let result_type = match agg_expr.function {
1015 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1016 LogicalType::Int64
1017 }
1018 LogicalAggregateFunction::Sum => LogicalType::Int64,
1019 LogicalAggregateFunction::Avg => LogicalType::Float64,
1020 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1021 LogicalType::Int64
1025 }
1026 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1029 | LogicalAggregateFunction::StdDevPop
1030 | LogicalAggregateFunction::PercentileDisc
1031 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1032 };
1033 output_schema.push(result_type);
1034 output_columns.push(
1035 agg_expr
1036 .alias
1037 .clone()
1038 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1039 );
1040 }
1041
1042 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1044 Box::new(SimpleAggregateOperator::new(
1045 input_op,
1046 physical_aggregates,
1047 output_schema,
1048 ))
1049 } else {
1050 Box::new(HashAggregateOperator::new(
1051 input_op,
1052 group_columns,
1053 physical_aggregates,
1054 output_schema,
1055 ))
1056 };
1057
1058 if let Some(having_expr) = &agg.having {
1060 let having_var_columns: HashMap<String, usize> = output_columns
1062 .iter()
1063 .enumerate()
1064 .map(|(i, name)| (name.clone(), i))
1065 .collect();
1066
1067 let filter_expr = self.convert_expression(having_expr)?;
1068 let predicate =
1069 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1070 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1071 }
1072
1073 Ok((operator, output_columns))
1074 }
1075
1076 #[allow(dead_code)]
1078 fn resolve_expression_to_column(
1079 &self,
1080 expr: &LogicalExpression,
1081 variable_columns: &HashMap<String, usize>,
1082 ) -> Result<usize> {
1083 match expr {
1084 LogicalExpression::Variable(name) => variable_columns
1085 .get(name)
1086 .copied()
1087 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1088 LogicalExpression::Property { variable, .. } => variable_columns
1089 .get(variable)
1090 .copied()
1091 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1092 _ => Err(Error::Internal(format!(
1093 "Cannot resolve expression to column: {:?}",
1094 expr
1095 ))),
1096 }
1097 }
1098
1099 fn resolve_expression_to_column_with_properties(
1103 &self,
1104 expr: &LogicalExpression,
1105 variable_columns: &HashMap<String, usize>,
1106 ) -> Result<usize> {
1107 match expr {
1108 LogicalExpression::Variable(name) => variable_columns
1109 .get(name)
1110 .copied()
1111 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1112 LogicalExpression::Property { variable, property } => {
1113 let col_name = format!("{}_{}", variable, property);
1115 variable_columns.get(&col_name).copied().ok_or_else(|| {
1116 Error::Internal(format!(
1117 "Property column '{}' not found (from {}.{})",
1118 col_name, variable, property
1119 ))
1120 })
1121 }
1122 _ => Err(Error::Internal(format!(
1123 "Cannot resolve expression to column: {:?}",
1124 expr
1125 ))),
1126 }
1127 }
1128
1129 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1131 match expr {
1132 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1133 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1134 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1135 variable: variable.clone(),
1136 property: property.clone(),
1137 }),
1138 LogicalExpression::Binary { left, op, right } => {
1139 let left_expr = self.convert_expression(left)?;
1140 let right_expr = self.convert_expression(right)?;
1141 let filter_op = convert_binary_op(*op)?;
1142 Ok(FilterExpression::Binary {
1143 left: Box::new(left_expr),
1144 op: filter_op,
1145 right: Box::new(right_expr),
1146 })
1147 }
1148 LogicalExpression::Unary { op, operand } => {
1149 let operand_expr = self.convert_expression(operand)?;
1150 let filter_op = convert_unary_op(*op)?;
1151 Ok(FilterExpression::Unary {
1152 op: filter_op,
1153 operand: Box::new(operand_expr),
1154 })
1155 }
1156 LogicalExpression::FunctionCall { name, args, .. } => {
1157 let filter_args: Vec<FilterExpression> = args
1158 .iter()
1159 .map(|a| self.convert_expression(a))
1160 .collect::<Result<Vec<_>>>()?;
1161 Ok(FilterExpression::FunctionCall {
1162 name: name.clone(),
1163 args: filter_args,
1164 })
1165 }
1166 LogicalExpression::Case {
1167 operand,
1168 when_clauses,
1169 else_clause,
1170 } => {
1171 let filter_operand = operand
1172 .as_ref()
1173 .map(|e| self.convert_expression(e))
1174 .transpose()?
1175 .map(Box::new);
1176 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1177 .iter()
1178 .map(|(cond, result)| {
1179 Ok((
1180 self.convert_expression(cond)?,
1181 self.convert_expression(result)?,
1182 ))
1183 })
1184 .collect::<Result<Vec<_>>>()?;
1185 let filter_else = else_clause
1186 .as_ref()
1187 .map(|e| self.convert_expression(e))
1188 .transpose()?
1189 .map(Box::new);
1190 Ok(FilterExpression::Case {
1191 operand: filter_operand,
1192 when_clauses: filter_when_clauses,
1193 else_clause: filter_else,
1194 })
1195 }
1196 LogicalExpression::List(items) => {
1197 let filter_items: Vec<FilterExpression> = items
1198 .iter()
1199 .map(|item| self.convert_expression(item))
1200 .collect::<Result<Vec<_>>>()?;
1201 Ok(FilterExpression::List(filter_items))
1202 }
1203 LogicalExpression::Map(pairs) => {
1204 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1205 .iter()
1206 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1207 .collect::<Result<Vec<_>>>()?;
1208 Ok(FilterExpression::Map(filter_pairs))
1209 }
1210 LogicalExpression::IndexAccess { base, index } => {
1211 let base_expr = self.convert_expression(base)?;
1212 let index_expr = self.convert_expression(index)?;
1213 Ok(FilterExpression::IndexAccess {
1214 base: Box::new(base_expr),
1215 index: Box::new(index_expr),
1216 })
1217 }
1218 LogicalExpression::SliceAccess { base, start, end } => {
1219 let base_expr = self.convert_expression(base)?;
1220 let start_expr = start
1221 .as_ref()
1222 .map(|s| self.convert_expression(s))
1223 .transpose()?
1224 .map(Box::new);
1225 let end_expr = end
1226 .as_ref()
1227 .map(|e| self.convert_expression(e))
1228 .transpose()?
1229 .map(Box::new);
1230 Ok(FilterExpression::SliceAccess {
1231 base: Box::new(base_expr),
1232 start: start_expr,
1233 end: end_expr,
1234 })
1235 }
1236 LogicalExpression::Parameter(_) => Err(Error::Internal(
1237 "Parameters not yet supported in filters".to_string(),
1238 )),
1239 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1240 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1241 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1242 LogicalExpression::ListComprehension {
1243 variable,
1244 list_expr,
1245 filter_expr,
1246 map_expr,
1247 } => {
1248 let list = self.convert_expression(list_expr)?;
1249 let filter = filter_expr
1250 .as_ref()
1251 .map(|f| self.convert_expression(f))
1252 .transpose()?
1253 .map(Box::new);
1254 let map = self.convert_expression(map_expr)?;
1255 Ok(FilterExpression::ListComprehension {
1256 variable: variable.clone(),
1257 list_expr: Box::new(list),
1258 filter_expr: filter,
1259 map_expr: Box::new(map),
1260 })
1261 }
1262 LogicalExpression::ExistsSubquery(subplan) => {
1263 let (start_var, direction, edge_type, end_labels) =
1266 self.extract_exists_pattern(subplan)?;
1267
1268 Ok(FilterExpression::ExistsSubquery {
1269 start_var,
1270 direction,
1271 edge_type,
1272 end_labels,
1273 min_hops: None,
1274 max_hops: None,
1275 })
1276 }
1277 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1278 "COUNT subqueries not yet supported".to_string(),
1279 )),
1280 }
1281 }
1282
1283 fn extract_exists_pattern(
1286 &self,
1287 subplan: &LogicalOperator,
1288 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1289 match subplan {
1290 LogicalOperator::Expand(expand) => {
1291 let end_labels = self.extract_end_labels_from_expand(expand);
1293 let direction = match expand.direction {
1294 ExpandDirection::Outgoing => Direction::Outgoing,
1295 ExpandDirection::Incoming => Direction::Incoming,
1296 ExpandDirection::Both => Direction::Both,
1297 };
1298 Ok((
1299 expand.from_variable.clone(),
1300 direction,
1301 expand.edge_type.clone(),
1302 end_labels,
1303 ))
1304 }
1305 LogicalOperator::NodeScan(scan) => {
1306 if let Some(input) = &scan.input {
1307 self.extract_exists_pattern(input)
1308 } else {
1309 Err(Error::Internal(
1310 "EXISTS subquery must contain an edge pattern".to_string(),
1311 ))
1312 }
1313 }
1314 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1315 _ => Err(Error::Internal(
1316 "Unsupported EXISTS subquery pattern".to_string(),
1317 )),
1318 }
1319 }
1320
1321 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1323 match expand.input.as_ref() {
1325 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1326 _ => None,
1327 }
1328 }
1329
1330 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1332 let (left_op, left_columns) = self.plan_operator(&join.left)?;
1333 let (right_op, right_columns) = self.plan_operator(&join.right)?;
1334
1335 let mut columns = left_columns.clone();
1337 columns.extend(right_columns.clone());
1338
1339 let physical_join_type = match join.join_type {
1341 JoinType::Inner => PhysicalJoinType::Inner,
1342 JoinType::Left => PhysicalJoinType::Left,
1343 JoinType::Right => PhysicalJoinType::Right,
1344 JoinType::Full => PhysicalJoinType::Full,
1345 JoinType::Cross => PhysicalJoinType::Cross,
1346 JoinType::Semi => PhysicalJoinType::Semi,
1347 JoinType::Anti => PhysicalJoinType::Anti,
1348 };
1349
1350 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1352 (vec![], vec![])
1354 } else {
1355 join.conditions
1356 .iter()
1357 .filter_map(|cond| {
1358 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1360 let right_idx = self
1361 .expression_to_column(&cond.right, &right_columns)
1362 .ok()?;
1363 Some((left_idx, right_idx))
1364 })
1365 .unzip()
1366 };
1367
1368 let output_schema = self.derive_schema_from_columns(&columns);
1369
1370 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1371 left_op,
1372 right_op,
1373 probe_keys,
1374 build_keys,
1375 physical_join_type,
1376 output_schema,
1377 ));
1378
1379 Ok((operator, columns))
1380 }
1381
1382 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1384 match expr {
1385 LogicalExpression::Variable(name) => columns
1386 .iter()
1387 .position(|c| c == name)
1388 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1389 _ => Err(Error::Internal(
1390 "Only variables supported in join conditions".to_string(),
1391 )),
1392 }
1393 }
1394
1395 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1397 if union.inputs.is_empty() {
1398 return Err(Error::Internal(
1399 "Union requires at least one input".to_string(),
1400 ));
1401 }
1402
1403 let mut inputs = Vec::with_capacity(union.inputs.len());
1404 let mut columns = Vec::new();
1405
1406 for (i, input) in union.inputs.iter().enumerate() {
1407 let (op, cols) = self.plan_operator(input)?;
1408 if i == 0 {
1409 columns = cols;
1410 }
1411 inputs.push(op);
1412 }
1413
1414 let output_schema = self.derive_schema_from_columns(&columns);
1415 let operator = Box::new(UnionOperator::new(inputs, output_schema));
1416
1417 Ok((operator, columns))
1418 }
1419
1420 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1422 let (input_op, columns) = self.plan_operator(&distinct.input)?;
1423 let output_schema = self.derive_schema_from_columns(&columns);
1424 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1425 Ok((operator, columns))
1426 }
1427
1428 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1430 let (input_op, mut columns) = if let Some(ref input) = create.input {
1432 let (op, cols) = self.plan_operator(input)?;
1433 (Some(op), cols)
1434 } else {
1435 (None, vec![])
1436 };
1437
1438 let output_column = columns.len();
1440 columns.push(create.variable.clone());
1441
1442 let properties: Vec<(String, PropertySource)> = create
1444 .properties
1445 .iter()
1446 .map(|(name, expr)| {
1447 let source = match expr {
1448 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1449 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1450 };
1451 (name.clone(), source)
1452 })
1453 .collect();
1454
1455 let output_schema = self.derive_schema_from_columns(&columns);
1456
1457 let operator = Box::new(
1458 CreateNodeOperator::new(
1459 Arc::clone(&self.store),
1460 input_op,
1461 create.labels.clone(),
1462 properties,
1463 output_schema,
1464 output_column,
1465 )
1466 .with_tx_context(self.viewing_epoch, self.tx_id),
1467 );
1468
1469 Ok((operator, columns))
1470 }
1471
1472 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1474 let (input_op, mut columns) = self.plan_operator(&create.input)?;
1475
1476 let from_column = columns
1478 .iter()
1479 .position(|c| c == &create.from_variable)
1480 .ok_or_else(|| {
1481 Error::Internal(format!(
1482 "Source variable '{}' not found",
1483 create.from_variable
1484 ))
1485 })?;
1486
1487 let to_column = columns
1488 .iter()
1489 .position(|c| c == &create.to_variable)
1490 .ok_or_else(|| {
1491 Error::Internal(format!(
1492 "Target variable '{}' not found",
1493 create.to_variable
1494 ))
1495 })?;
1496
1497 let output_column = create.variable.as_ref().map(|v| {
1499 let idx = columns.len();
1500 columns.push(v.clone());
1501 idx
1502 });
1503
1504 let properties: Vec<(String, PropertySource)> = create
1506 .properties
1507 .iter()
1508 .map(|(name, expr)| {
1509 let source = match expr {
1510 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1511 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1512 };
1513 (name.clone(), source)
1514 })
1515 .collect();
1516
1517 let output_schema = self.derive_schema_from_columns(&columns);
1518
1519 let operator = Box::new(
1520 CreateEdgeOperator::new(
1521 Arc::clone(&self.store),
1522 input_op,
1523 from_column,
1524 to_column,
1525 create.edge_type.clone(),
1526 properties,
1527 output_schema,
1528 output_column,
1529 )
1530 .with_tx_context(self.viewing_epoch, self.tx_id),
1531 );
1532
1533 Ok((operator, columns))
1534 }
1535
1536 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1538 let (input_op, columns) = self.plan_operator(&delete.input)?;
1539
1540 let node_column = columns
1541 .iter()
1542 .position(|c| c == &delete.variable)
1543 .ok_or_else(|| {
1544 Error::Internal(format!(
1545 "Variable '{}' not found for delete",
1546 delete.variable
1547 ))
1548 })?;
1549
1550 let output_schema = vec![LogicalType::Int64];
1552 let output_columns = vec!["deleted_count".to_string()];
1553
1554 let operator = Box::new(
1555 DeleteNodeOperator::new(
1556 Arc::clone(&self.store),
1557 input_op,
1558 node_column,
1559 output_schema,
1560 delete.detach, )
1562 .with_tx_context(self.viewing_epoch, self.tx_id),
1563 );
1564
1565 Ok((operator, output_columns))
1566 }
1567
1568 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1570 let (input_op, columns) = self.plan_operator(&delete.input)?;
1571
1572 let edge_column = columns
1573 .iter()
1574 .position(|c| c == &delete.variable)
1575 .ok_or_else(|| {
1576 Error::Internal(format!(
1577 "Variable '{}' not found for delete",
1578 delete.variable
1579 ))
1580 })?;
1581
1582 let output_schema = vec![LogicalType::Int64];
1584 let output_columns = vec!["deleted_count".to_string()];
1585
1586 let operator = Box::new(
1587 DeleteEdgeOperator::new(
1588 Arc::clone(&self.store),
1589 input_op,
1590 edge_column,
1591 output_schema,
1592 )
1593 .with_tx_context(self.viewing_epoch, self.tx_id),
1594 );
1595
1596 Ok((operator, output_columns))
1597 }
1598
1599 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1601 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1602 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1603
1604 let mut columns = left_columns.clone();
1606 columns.extend(right_columns.clone());
1607
1608 let mut probe_keys = Vec::new();
1610 let mut build_keys = Vec::new();
1611
1612 for (right_idx, right_col) in right_columns.iter().enumerate() {
1613 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1614 probe_keys.push(left_idx);
1615 build_keys.push(right_idx);
1616 }
1617 }
1618
1619 let output_schema = self.derive_schema_from_columns(&columns);
1620
1621 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1622 left_op,
1623 right_op,
1624 probe_keys,
1625 build_keys,
1626 PhysicalJoinType::Left,
1627 output_schema,
1628 ));
1629
1630 Ok((operator, columns))
1631 }
1632
1633 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1635 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1636 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1637
1638 let columns = left_columns.clone();
1640
1641 let mut probe_keys = Vec::new();
1643 let mut build_keys = Vec::new();
1644
1645 for (right_idx, right_col) in right_columns.iter().enumerate() {
1646 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1647 probe_keys.push(left_idx);
1648 build_keys.push(right_idx);
1649 }
1650 }
1651
1652 let output_schema = self.derive_schema_from_columns(&columns);
1653
1654 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1655 left_op,
1656 right_op,
1657 probe_keys,
1658 build_keys,
1659 PhysicalJoinType::Anti,
1660 output_schema,
1661 ));
1662
1663 Ok((operator, columns))
1664 }
1665
1666 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1668 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
1671 if matches!(&*unwind.input, LogicalOperator::Empty) {
1672 let literal_list = self.convert_expression(&unwind.expression)?;
1677
1678 let single_row_op: Box<dyn Operator> = Box::new(
1680 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
1681 );
1682 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
1683 single_row_op,
1684 vec![ProjectExpr::Expression {
1685 expr: literal_list,
1686 variable_columns: HashMap::new(),
1687 }],
1688 vec![LogicalType::Any],
1689 Arc::clone(&self.store),
1690 ));
1691
1692 (project_op, vec!["__list__".to_string()])
1693 } else {
1694 self.plan_operator(&unwind.input)?
1695 };
1696
1697 let list_col_idx = match &unwind.expression {
1703 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1704 LogicalExpression::Property { variable, .. } => {
1705 input_columns.iter().position(|c| c == variable)
1708 }
1709 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1710 None
1712 }
1713 _ => None,
1714 };
1715
1716 let mut columns = input_columns.clone();
1718 columns.push(unwind.variable.clone());
1719
1720 let mut output_schema = self.derive_schema_from_columns(&input_columns);
1722 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
1727
1728 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1729 input_op,
1730 col_idx,
1731 unwind.variable.clone(),
1732 output_schema,
1733 ));
1734
1735 Ok((operator, columns))
1736 }
1737
1738 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1740 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
1742 Vec::new()
1743 } else {
1744 let (_input_op, cols) = self.plan_operator(&merge.input)?;
1745 cols
1746 };
1747
1748 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1750 .match_properties
1751 .iter()
1752 .filter_map(|(name, expr)| {
1753 if let LogicalExpression::Literal(v) = expr {
1754 Some((name.clone(), v.clone()))
1755 } else {
1756 None }
1758 })
1759 .collect();
1760
1761 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
1763 .on_create
1764 .iter()
1765 .filter_map(|(name, expr)| {
1766 if let LogicalExpression::Literal(v) = expr {
1767 Some((name.clone(), v.clone()))
1768 } else {
1769 None
1770 }
1771 })
1772 .collect();
1773
1774 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1776 .on_match
1777 .iter()
1778 .filter_map(|(name, expr)| {
1779 if let LogicalExpression::Literal(v) = expr {
1780 Some((name.clone(), v.clone()))
1781 } else {
1782 None
1783 }
1784 })
1785 .collect();
1786
1787 columns.push(merge.variable.clone());
1789
1790 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1791 Arc::clone(&self.store),
1792 merge.variable.clone(),
1793 merge.labels.clone(),
1794 match_properties,
1795 on_create_properties,
1796 on_match_properties,
1797 ));
1798
1799 Ok((operator, columns))
1800 }
1801
1802 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1804 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
1806
1807 let source_column = columns
1809 .iter()
1810 .position(|c| c == &sp.source_var)
1811 .ok_or_else(|| {
1812 Error::Internal(format!(
1813 "Source variable '{}' not found for shortestPath",
1814 sp.source_var
1815 ))
1816 })?;
1817
1818 let target_column = columns
1819 .iter()
1820 .position(|c| c == &sp.target_var)
1821 .ok_or_else(|| {
1822 Error::Internal(format!(
1823 "Target variable '{}' not found for shortestPath",
1824 sp.target_var
1825 ))
1826 })?;
1827
1828 let direction = match sp.direction {
1830 ExpandDirection::Outgoing => Direction::Outgoing,
1831 ExpandDirection::Incoming => Direction::Incoming,
1832 ExpandDirection::Both => Direction::Both,
1833 };
1834
1835 let operator: Box<dyn Operator> = Box::new(
1837 ShortestPathOperator::new(
1838 Arc::clone(&self.store),
1839 input_op,
1840 source_column,
1841 target_column,
1842 sp.edge_type.clone(),
1843 direction,
1844 )
1845 .with_all_paths(sp.all_paths),
1846 );
1847
1848 columns.push(format!("_path_length_{}", sp.path_alias));
1851
1852 Ok((operator, columns))
1853 }
1854
1855 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1857 let (input_op, columns) = self.plan_operator(&add_label.input)?;
1858
1859 let node_column = columns
1861 .iter()
1862 .position(|c| c == &add_label.variable)
1863 .ok_or_else(|| {
1864 Error::Internal(format!(
1865 "Variable '{}' not found for ADD LABEL",
1866 add_label.variable
1867 ))
1868 })?;
1869
1870 let output_schema = vec![LogicalType::Int64];
1872 let output_columns = vec!["labels_added".to_string()];
1873
1874 let operator = Box::new(AddLabelOperator::new(
1875 Arc::clone(&self.store),
1876 input_op,
1877 node_column,
1878 add_label.labels.clone(),
1879 output_schema,
1880 ));
1881
1882 Ok((operator, output_columns))
1883 }
1884
1885 fn plan_remove_label(
1887 &self,
1888 remove_label: &RemoveLabelOp,
1889 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1890 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1891
1892 let node_column = columns
1894 .iter()
1895 .position(|c| c == &remove_label.variable)
1896 .ok_or_else(|| {
1897 Error::Internal(format!(
1898 "Variable '{}' not found for REMOVE LABEL",
1899 remove_label.variable
1900 ))
1901 })?;
1902
1903 let output_schema = vec![LogicalType::Int64];
1905 let output_columns = vec!["labels_removed".to_string()];
1906
1907 let operator = Box::new(RemoveLabelOperator::new(
1908 Arc::clone(&self.store),
1909 input_op,
1910 node_column,
1911 remove_label.labels.clone(),
1912 output_schema,
1913 ));
1914
1915 Ok((operator, output_columns))
1916 }
1917
1918 fn plan_set_property(
1920 &self,
1921 set_prop: &SetPropertyOp,
1922 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1923 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1924
1925 let entity_column = columns
1927 .iter()
1928 .position(|c| c == &set_prop.variable)
1929 .ok_or_else(|| {
1930 Error::Internal(format!(
1931 "Variable '{}' not found for SET",
1932 set_prop.variable
1933 ))
1934 })?;
1935
1936 let properties: Vec<(String, PropertySource)> = set_prop
1938 .properties
1939 .iter()
1940 .map(|(name, expr)| {
1941 let source = self.expression_to_property_source(expr, &columns)?;
1942 Ok((name.clone(), source))
1943 })
1944 .collect::<Result<Vec<_>>>()?;
1945
1946 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1948 let output_columns = columns.clone();
1949
1950 let operator = Box::new(SetPropertyOperator::new_for_node(
1952 Arc::clone(&self.store),
1953 input_op,
1954 entity_column,
1955 properties,
1956 output_schema,
1957 ));
1958
1959 Ok((operator, output_columns))
1960 }
1961
1962 fn expression_to_property_source(
1964 &self,
1965 expr: &LogicalExpression,
1966 columns: &[String],
1967 ) -> Result<PropertySource> {
1968 match expr {
1969 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1970 LogicalExpression::Variable(name) => {
1971 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1972 Error::Internal(format!("Variable '{}' not found for property source", name))
1973 })?;
1974 Ok(PropertySource::Column(col_idx))
1975 }
1976 LogicalExpression::Parameter(name) => {
1977 Ok(PropertySource::Constant(
1980 grafeo_common::types::Value::String(format!("${}", name).into()),
1981 ))
1982 }
1983 _ => Err(Error::Internal(format!(
1984 "Unsupported expression type for property source: {:?}",
1985 expr
1986 ))),
1987 }
1988 }
1989}
1990
1991pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1993 match op {
1994 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1995 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1996 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1997 BinaryOp::Le => Ok(BinaryFilterOp::Le),
1998 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1999 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2000 BinaryOp::And => Ok(BinaryFilterOp::And),
2001 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2002 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2003 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2004 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2005 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2006 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2007 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2008 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2009 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2010 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2011 BinaryOp::In => Ok(BinaryFilterOp::In),
2012 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2013 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2014 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2015 "Binary operator {:?} not yet supported in filters",
2016 op
2017 ))),
2018 }
2019}
2020
2021pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2023 match op {
2024 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2025 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2026 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2027 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2028 }
2029}
2030
2031pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2033 match func {
2034 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2035 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2036 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2037 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2038 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2039 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2040 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2041 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2042 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2043 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2044 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2045 }
2046}
2047
2048pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2052 match expr {
2053 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2054 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2055 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2056 variable: variable.clone(),
2057 property: property.clone(),
2058 }),
2059 LogicalExpression::Binary { left, op, right } => {
2060 let left_expr = convert_filter_expression(left)?;
2061 let right_expr = convert_filter_expression(right)?;
2062 let filter_op = convert_binary_op(*op)?;
2063 Ok(FilterExpression::Binary {
2064 left: Box::new(left_expr),
2065 op: filter_op,
2066 right: Box::new(right_expr),
2067 })
2068 }
2069 LogicalExpression::Unary { op, operand } => {
2070 let operand_expr = convert_filter_expression(operand)?;
2071 let filter_op = convert_unary_op(*op)?;
2072 Ok(FilterExpression::Unary {
2073 op: filter_op,
2074 operand: Box::new(operand_expr),
2075 })
2076 }
2077 LogicalExpression::FunctionCall { name, args, .. } => {
2078 let filter_args: Vec<FilterExpression> = args
2079 .iter()
2080 .map(|a| convert_filter_expression(a))
2081 .collect::<Result<Vec<_>>>()?;
2082 Ok(FilterExpression::FunctionCall {
2083 name: name.clone(),
2084 args: filter_args,
2085 })
2086 }
2087 LogicalExpression::Case {
2088 operand,
2089 when_clauses,
2090 else_clause,
2091 } => {
2092 let filter_operand = operand
2093 .as_ref()
2094 .map(|e| convert_filter_expression(e))
2095 .transpose()?
2096 .map(Box::new);
2097 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2098 .iter()
2099 .map(|(cond, result)| {
2100 Ok((
2101 convert_filter_expression(cond)?,
2102 convert_filter_expression(result)?,
2103 ))
2104 })
2105 .collect::<Result<Vec<_>>>()?;
2106 let filter_else = else_clause
2107 .as_ref()
2108 .map(|e| convert_filter_expression(e))
2109 .transpose()?
2110 .map(Box::new);
2111 Ok(FilterExpression::Case {
2112 operand: filter_operand,
2113 when_clauses: filter_when_clauses,
2114 else_clause: filter_else,
2115 })
2116 }
2117 LogicalExpression::List(items) => {
2118 let filter_items: Vec<FilterExpression> = items
2119 .iter()
2120 .map(|item| convert_filter_expression(item))
2121 .collect::<Result<Vec<_>>>()?;
2122 Ok(FilterExpression::List(filter_items))
2123 }
2124 LogicalExpression::Map(pairs) => {
2125 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2126 .iter()
2127 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2128 .collect::<Result<Vec<_>>>()?;
2129 Ok(FilterExpression::Map(filter_pairs))
2130 }
2131 LogicalExpression::IndexAccess { base, index } => {
2132 let base_expr = convert_filter_expression(base)?;
2133 let index_expr = convert_filter_expression(index)?;
2134 Ok(FilterExpression::IndexAccess {
2135 base: Box::new(base_expr),
2136 index: Box::new(index_expr),
2137 })
2138 }
2139 LogicalExpression::SliceAccess { base, start, end } => {
2140 let base_expr = convert_filter_expression(base)?;
2141 let start_expr = start
2142 .as_ref()
2143 .map(|s| convert_filter_expression(s))
2144 .transpose()?
2145 .map(Box::new);
2146 let end_expr = end
2147 .as_ref()
2148 .map(|e| convert_filter_expression(e))
2149 .transpose()?
2150 .map(Box::new);
2151 Ok(FilterExpression::SliceAccess {
2152 base: Box::new(base_expr),
2153 start: start_expr,
2154 end: end_expr,
2155 })
2156 }
2157 LogicalExpression::Parameter(_) => Err(Error::Internal(
2158 "Parameters not yet supported in filters".to_string(),
2159 )),
2160 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2161 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2162 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2163 LogicalExpression::ListComprehension {
2164 variable,
2165 list_expr,
2166 filter_expr,
2167 map_expr,
2168 } => {
2169 let list = convert_filter_expression(list_expr)?;
2170 let filter = filter_expr
2171 .as_ref()
2172 .map(|f| convert_filter_expression(f))
2173 .transpose()?
2174 .map(Box::new);
2175 let map = convert_filter_expression(map_expr)?;
2176 Ok(FilterExpression::ListComprehension {
2177 variable: variable.clone(),
2178 list_expr: Box::new(list),
2179 filter_expr: filter,
2180 map_expr: Box::new(map),
2181 })
2182 }
2183 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2184 Error::Internal("Subqueries not yet supported in filters".to_string()),
2185 ),
2186 }
2187}
2188
2189fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2191 use grafeo_common::types::Value;
2192 match value {
2193 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
2195 Value::Int64(_) => LogicalType::Int64,
2196 Value::Float64(_) => LogicalType::Float64,
2197 Value::String(_) => LogicalType::String,
2198 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
2200 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
2203}
2204
2205fn expression_to_string(expr: &LogicalExpression) -> String {
2207 match expr {
2208 LogicalExpression::Variable(name) => name.clone(),
2209 LogicalExpression::Property { variable, property } => {
2210 format!("{variable}.{property}")
2211 }
2212 LogicalExpression::Literal(value) => format!("{value:?}"),
2213 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2214 _ => "expr".to_string(),
2215 }
2216}
2217
2218pub struct PhysicalPlan {
2220 pub operator: Box<dyn Operator>,
2222 pub columns: Vec<String>,
2224 pub adaptive_context: Option<AdaptiveContext>,
2230}
2231
2232impl PhysicalPlan {
2233 #[must_use]
2235 pub fn columns(&self) -> &[String] {
2236 &self.columns
2237 }
2238
2239 pub fn into_operator(self) -> Box<dyn Operator> {
2241 self.operator
2242 }
2243
2244 #[must_use]
2246 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2247 self.adaptive_context.as_ref()
2248 }
2249
2250 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2252 self.adaptive_context.take()
2253 }
2254}
2255
2256#[cfg(test)]
2257mod tests {
2258 use super::*;
2259 use crate::query::plan::{
2260 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2261 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2262 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2263 SortKey, SortOp,
2264 };
2265 use grafeo_common::types::Value;
2266
2267 fn create_test_store() -> Arc<LpgStore> {
2268 let store = Arc::new(LpgStore::new());
2269 store.create_node(&["Person"]);
2270 store.create_node(&["Person"]);
2271 store.create_node(&["Company"]);
2272 store
2273 }
2274
/// A labelled scan returned as-is yields a single column named after the variable.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
2299
2300 #[test]
2301 fn test_plan_scan_without_label() {
2302 let store = create_test_store();
2303 let planner = Planner::new(store);
2304
2305 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2307 items: vec![ReturnItem {
2308 expression: LogicalExpression::Variable("n".to_string()),
2309 alias: None,
2310 }],
2311 distinct: false,
2312 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2313 variable: "n".to_string(),
2314 label: None,
2315 input: None,
2316 })),
2317 }));
2318
2319 let physical = planner.plan(&logical).unwrap();
2320 assert_eq!(physical.columns(), &["n"]);
2321 }
2322
2323 #[test]
2324 fn test_plan_return_with_alias() {
2325 let store = create_test_store();
2326 let planner = Planner::new(store);
2327
2328 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2330 items: vec![ReturnItem {
2331 expression: LogicalExpression::Variable("n".to_string()),
2332 alias: Some("person".to_string()),
2333 }],
2334 distinct: false,
2335 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2336 variable: "n".to_string(),
2337 label: Some("Person".to_string()),
2338 input: None,
2339 })),
2340 }));
2341
2342 let physical = planner.plan(&logical).unwrap();
2343 assert_eq!(physical.columns(), &["person"]);
2344 }
2345
2346 #[test]
2347 fn test_plan_return_property() {
2348 let store = create_test_store();
2349 let planner = Planner::new(store);
2350
2351 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2353 items: vec![ReturnItem {
2354 expression: LogicalExpression::Property {
2355 variable: "n".to_string(),
2356 property: "name".to_string(),
2357 },
2358 alias: None,
2359 }],
2360 distinct: false,
2361 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2362 variable: "n".to_string(),
2363 label: Some("Person".to_string()),
2364 input: None,
2365 })),
2366 }));
2367
2368 let physical = planner.plan(&logical).unwrap();
2369 assert_eq!(physical.columns(), &["n.name"]);
2370 }
2371
2372 #[test]
2373 fn test_plan_return_literal() {
2374 let store = create_test_store();
2375 let planner = Planner::new(store);
2376
2377 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2379 items: vec![ReturnItem {
2380 expression: LogicalExpression::Literal(Value::Int64(42)),
2381 alias: Some("answer".to_string()),
2382 }],
2383 distinct: false,
2384 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2385 variable: "n".to_string(),
2386 label: None,
2387 input: None,
2388 })),
2389 }));
2390
2391 let physical = planner.plan(&logical).unwrap();
2392 assert_eq!(physical.columns(), &["answer"]);
2393 }
2394
2395 #[test]
2398 fn test_plan_filter_equality() {
2399 let store = create_test_store();
2400 let planner = Planner::new(store);
2401
2402 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2404 items: vec![ReturnItem {
2405 expression: LogicalExpression::Variable("n".to_string()),
2406 alias: None,
2407 }],
2408 distinct: false,
2409 input: Box::new(LogicalOperator::Filter(FilterOp {
2410 predicate: LogicalExpression::Binary {
2411 left: Box::new(LogicalExpression::Property {
2412 variable: "n".to_string(),
2413 property: "age".to_string(),
2414 }),
2415 op: BinaryOp::Eq,
2416 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2417 },
2418 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2419 variable: "n".to_string(),
2420 label: Some("Person".to_string()),
2421 input: None,
2422 })),
2423 })),
2424 }));
2425
2426 let physical = planner.plan(&logical).unwrap();
2427 assert_eq!(physical.columns(), &["n"]);
2428 }
2429
2430 #[test]
2431 fn test_plan_filter_compound_and() {
2432 let store = create_test_store();
2433 let planner = Planner::new(store);
2434
2435 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2437 items: vec![ReturnItem {
2438 expression: LogicalExpression::Variable("n".to_string()),
2439 alias: None,
2440 }],
2441 distinct: false,
2442 input: Box::new(LogicalOperator::Filter(FilterOp {
2443 predicate: LogicalExpression::Binary {
2444 left: Box::new(LogicalExpression::Binary {
2445 left: Box::new(LogicalExpression::Property {
2446 variable: "n".to_string(),
2447 property: "age".to_string(),
2448 }),
2449 op: BinaryOp::Gt,
2450 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2451 }),
2452 op: BinaryOp::And,
2453 right: Box::new(LogicalExpression::Binary {
2454 left: Box::new(LogicalExpression::Property {
2455 variable: "n".to_string(),
2456 property: "age".to_string(),
2457 }),
2458 op: BinaryOp::Lt,
2459 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2460 }),
2461 },
2462 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2463 variable: "n".to_string(),
2464 label: None,
2465 input: None,
2466 })),
2467 })),
2468 }));
2469
2470 let physical = planner.plan(&logical).unwrap();
2471 assert_eq!(physical.columns(), &["n"]);
2472 }
2473
2474 #[test]
2475 fn test_plan_filter_unary_not() {
2476 let store = create_test_store();
2477 let planner = Planner::new(store);
2478
2479 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2481 items: vec![ReturnItem {
2482 expression: LogicalExpression::Variable("n".to_string()),
2483 alias: None,
2484 }],
2485 distinct: false,
2486 input: Box::new(LogicalOperator::Filter(FilterOp {
2487 predicate: LogicalExpression::Unary {
2488 op: UnaryOp::Not,
2489 operand: Box::new(LogicalExpression::Property {
2490 variable: "n".to_string(),
2491 property: "active".to_string(),
2492 }),
2493 },
2494 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2495 variable: "n".to_string(),
2496 label: None,
2497 input: None,
2498 })),
2499 })),
2500 }));
2501
2502 let physical = planner.plan(&logical).unwrap();
2503 assert_eq!(physical.columns(), &["n"]);
2504 }
2505
2506 #[test]
2507 fn test_plan_filter_is_null() {
2508 let store = create_test_store();
2509 let planner = Planner::new(store);
2510
2511 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2513 items: vec![ReturnItem {
2514 expression: LogicalExpression::Variable("n".to_string()),
2515 alias: None,
2516 }],
2517 distinct: false,
2518 input: Box::new(LogicalOperator::Filter(FilterOp {
2519 predicate: LogicalExpression::Unary {
2520 op: UnaryOp::IsNull,
2521 operand: Box::new(LogicalExpression::Property {
2522 variable: "n".to_string(),
2523 property: "email".to_string(),
2524 }),
2525 },
2526 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2527 variable: "n".to_string(),
2528 label: None,
2529 input: None,
2530 })),
2531 })),
2532 }));
2533
2534 let physical = planner.plan(&logical).unwrap();
2535 assert_eq!(physical.columns(), &["n"]);
2536 }
2537
2538 #[test]
2539 fn test_plan_filter_function_call() {
2540 let store = create_test_store();
2541 let planner = Planner::new(store);
2542
2543 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2545 items: vec![ReturnItem {
2546 expression: LogicalExpression::Variable("n".to_string()),
2547 alias: None,
2548 }],
2549 distinct: false,
2550 input: Box::new(LogicalOperator::Filter(FilterOp {
2551 predicate: LogicalExpression::Binary {
2552 left: Box::new(LogicalExpression::FunctionCall {
2553 name: "size".to_string(),
2554 args: vec![LogicalExpression::Property {
2555 variable: "n".to_string(),
2556 property: "friends".to_string(),
2557 }],
2558 distinct: false,
2559 }),
2560 op: BinaryOp::Gt,
2561 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
2562 },
2563 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2564 variable: "n".to_string(),
2565 label: None,
2566 input: None,
2567 })),
2568 })),
2569 }));
2570
2571 let physical = planner.plan(&logical).unwrap();
2572 assert_eq!(physical.columns(), &["n"]);
2573 }
2574
2575 #[test]
2578 fn test_plan_expand_outgoing() {
2579 let store = create_test_store();
2580 let planner = Planner::new(store);
2581
2582 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2584 items: vec![
2585 ReturnItem {
2586 expression: LogicalExpression::Variable("a".to_string()),
2587 alias: None,
2588 },
2589 ReturnItem {
2590 expression: LogicalExpression::Variable("b".to_string()),
2591 alias: None,
2592 },
2593 ],
2594 distinct: false,
2595 input: Box::new(LogicalOperator::Expand(ExpandOp {
2596 from_variable: "a".to_string(),
2597 to_variable: "b".to_string(),
2598 edge_variable: None,
2599 direction: ExpandDirection::Outgoing,
2600 edge_type: Some("KNOWS".to_string()),
2601 min_hops: 1,
2602 max_hops: Some(1),
2603 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2604 variable: "a".to_string(),
2605 label: Some("Person".to_string()),
2606 input: None,
2607 })),
2608 path_alias: None,
2609 })),
2610 }));
2611
2612 let physical = planner.plan(&logical).unwrap();
2613 assert!(physical.columns().contains(&"a".to_string()));
2615 assert!(physical.columns().contains(&"b".to_string()));
2616 }
2617
2618 #[test]
2619 fn test_plan_expand_with_edge_variable() {
2620 let store = create_test_store();
2621 let planner = Planner::new(store);
2622
2623 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2625 items: vec![
2626 ReturnItem {
2627 expression: LogicalExpression::Variable("a".to_string()),
2628 alias: None,
2629 },
2630 ReturnItem {
2631 expression: LogicalExpression::Variable("r".to_string()),
2632 alias: None,
2633 },
2634 ReturnItem {
2635 expression: LogicalExpression::Variable("b".to_string()),
2636 alias: None,
2637 },
2638 ],
2639 distinct: false,
2640 input: Box::new(LogicalOperator::Expand(ExpandOp {
2641 from_variable: "a".to_string(),
2642 to_variable: "b".to_string(),
2643 edge_variable: Some("r".to_string()),
2644 direction: ExpandDirection::Outgoing,
2645 edge_type: Some("KNOWS".to_string()),
2646 min_hops: 1,
2647 max_hops: Some(1),
2648 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2649 variable: "a".to_string(),
2650 label: None,
2651 input: None,
2652 })),
2653 path_alias: None,
2654 })),
2655 }));
2656
2657 let physical = planner.plan(&logical).unwrap();
2658 assert!(physical.columns().contains(&"a".to_string()));
2659 assert!(physical.columns().contains(&"r".to_string()));
2660 assert!(physical.columns().contains(&"b".to_string()));
2661 }
2662
2663 #[test]
2666 fn test_plan_limit() {
2667 let store = create_test_store();
2668 let planner = Planner::new(store);
2669
2670 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2672 items: vec![ReturnItem {
2673 expression: LogicalExpression::Variable("n".to_string()),
2674 alias: None,
2675 }],
2676 distinct: false,
2677 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2678 count: 10,
2679 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2680 variable: "n".to_string(),
2681 label: None,
2682 input: None,
2683 })),
2684 })),
2685 }));
2686
2687 let physical = planner.plan(&logical).unwrap();
2688 assert_eq!(physical.columns(), &["n"]);
2689 }
2690
2691 #[test]
2692 fn test_plan_skip() {
2693 let store = create_test_store();
2694 let planner = Planner::new(store);
2695
2696 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2698 items: vec![ReturnItem {
2699 expression: LogicalExpression::Variable("n".to_string()),
2700 alias: None,
2701 }],
2702 distinct: false,
2703 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2704 count: 5,
2705 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2706 variable: "n".to_string(),
2707 label: None,
2708 input: None,
2709 })),
2710 })),
2711 }));
2712
2713 let physical = planner.plan(&logical).unwrap();
2714 assert_eq!(physical.columns(), &["n"]);
2715 }
2716
2717 #[test]
2718 fn test_plan_sort() {
2719 let store = create_test_store();
2720 let planner = Planner::new(store);
2721
2722 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2724 items: vec![ReturnItem {
2725 expression: LogicalExpression::Variable("n".to_string()),
2726 alias: None,
2727 }],
2728 distinct: false,
2729 input: Box::new(LogicalOperator::Sort(SortOp {
2730 keys: vec![SortKey {
2731 expression: LogicalExpression::Variable("n".to_string()),
2732 order: SortOrder::Ascending,
2733 }],
2734 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2735 variable: "n".to_string(),
2736 label: None,
2737 input: None,
2738 })),
2739 })),
2740 }));
2741
2742 let physical = planner.plan(&logical).unwrap();
2743 assert_eq!(physical.columns(), &["n"]);
2744 }
2745
2746 #[test]
2747 fn test_plan_sort_descending() {
2748 let store = create_test_store();
2749 let planner = Planner::new(store);
2750
2751 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2753 items: vec![ReturnItem {
2754 expression: LogicalExpression::Variable("n".to_string()),
2755 alias: None,
2756 }],
2757 distinct: false,
2758 input: Box::new(LogicalOperator::Sort(SortOp {
2759 keys: vec![SortKey {
2760 expression: LogicalExpression::Variable("n".to_string()),
2761 order: SortOrder::Descending,
2762 }],
2763 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2764 variable: "n".to_string(),
2765 label: None,
2766 input: None,
2767 })),
2768 })),
2769 }));
2770
2771 let physical = planner.plan(&logical).unwrap();
2772 assert_eq!(physical.columns(), &["n"]);
2773 }
2774
2775 #[test]
2776 fn test_plan_distinct() {
2777 let store = create_test_store();
2778 let planner = Planner::new(store);
2779
2780 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2782 items: vec![ReturnItem {
2783 expression: LogicalExpression::Variable("n".to_string()),
2784 alias: None,
2785 }],
2786 distinct: false,
2787 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2788 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2789 variable: "n".to_string(),
2790 label: None,
2791 input: None,
2792 })),
2793 columns: None,
2794 })),
2795 }));
2796
2797 let physical = planner.plan(&logical).unwrap();
2798 assert_eq!(physical.columns(), &["n"]);
2799 }
2800
2801 #[test]
2804 fn test_plan_aggregate_count() {
2805 let store = create_test_store();
2806 let planner = Planner::new(store);
2807
2808 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2810 items: vec![ReturnItem {
2811 expression: LogicalExpression::Variable("cnt".to_string()),
2812 alias: None,
2813 }],
2814 distinct: false,
2815 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
2816 group_by: vec![],
2817 aggregates: vec![LogicalAggregateExpr {
2818 function: LogicalAggregateFunction::Count,
2819 expression: Some(LogicalExpression::Variable("n".to_string())),
2820 distinct: false,
2821 alias: Some("cnt".to_string()),
2822 percentile: None,
2823 }],
2824 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2825 variable: "n".to_string(),
2826 label: None,
2827 input: None,
2828 })),
2829 having: None,
2830 })),
2831 }));
2832
2833 let physical = planner.plan(&logical).unwrap();
2834 assert!(physical.columns().contains(&"cnt".to_string()));
2835 }
2836
2837 #[test]
2838 fn test_plan_aggregate_with_group_by() {
2839 let store = create_test_store();
2840 let planner = Planner::new(store);
2841
2842 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2844 group_by: vec![LogicalExpression::Property {
2845 variable: "n".to_string(),
2846 property: "city".to_string(),
2847 }],
2848 aggregates: vec![LogicalAggregateExpr {
2849 function: LogicalAggregateFunction::Count,
2850 expression: Some(LogicalExpression::Variable("n".to_string())),
2851 distinct: false,
2852 alias: Some("cnt".to_string()),
2853 percentile: None,
2854 }],
2855 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2856 variable: "n".to_string(),
2857 label: Some("Person".to_string()),
2858 input: None,
2859 })),
2860 having: None,
2861 }));
2862
2863 let physical = planner.plan(&logical).unwrap();
2864 assert_eq!(physical.columns().len(), 2);
2865 }
2866
2867 #[test]
2868 fn test_plan_aggregate_sum() {
2869 let store = create_test_store();
2870 let planner = Planner::new(store);
2871
2872 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2874 group_by: vec![],
2875 aggregates: vec![LogicalAggregateExpr {
2876 function: LogicalAggregateFunction::Sum,
2877 expression: Some(LogicalExpression::Property {
2878 variable: "n".to_string(),
2879 property: "value".to_string(),
2880 }),
2881 distinct: false,
2882 alias: Some("total".to_string()),
2883 percentile: None,
2884 }],
2885 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2886 variable: "n".to_string(),
2887 label: None,
2888 input: None,
2889 })),
2890 having: None,
2891 }));
2892
2893 let physical = planner.plan(&logical).unwrap();
2894 assert!(physical.columns().contains(&"total".to_string()));
2895 }
2896
2897 #[test]
2898 fn test_plan_aggregate_avg() {
2899 let store = create_test_store();
2900 let planner = Planner::new(store);
2901
2902 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2904 group_by: vec![],
2905 aggregates: vec![LogicalAggregateExpr {
2906 function: LogicalAggregateFunction::Avg,
2907 expression: Some(LogicalExpression::Property {
2908 variable: "n".to_string(),
2909 property: "score".to_string(),
2910 }),
2911 distinct: false,
2912 alias: Some("average".to_string()),
2913 percentile: None,
2914 }],
2915 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2916 variable: "n".to_string(),
2917 label: None,
2918 input: None,
2919 })),
2920 having: None,
2921 }));
2922
2923 let physical = planner.plan(&logical).unwrap();
2924 assert!(physical.columns().contains(&"average".to_string()));
2925 }
2926
2927 #[test]
2928 fn test_plan_aggregate_min_max() {
2929 let store = create_test_store();
2930 let planner = Planner::new(store);
2931
2932 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2934 group_by: vec![],
2935 aggregates: vec![
2936 LogicalAggregateExpr {
2937 function: LogicalAggregateFunction::Min,
2938 expression: Some(LogicalExpression::Property {
2939 variable: "n".to_string(),
2940 property: "age".to_string(),
2941 }),
2942 distinct: false,
2943 alias: Some("youngest".to_string()),
2944 percentile: None,
2945 },
2946 LogicalAggregateExpr {
2947 function: LogicalAggregateFunction::Max,
2948 expression: Some(LogicalExpression::Property {
2949 variable: "n".to_string(),
2950 property: "age".to_string(),
2951 }),
2952 distinct: false,
2953 alias: Some("oldest".to_string()),
2954 percentile: None,
2955 },
2956 ],
2957 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2958 variable: "n".to_string(),
2959 label: None,
2960 input: None,
2961 })),
2962 having: None,
2963 }));
2964
2965 let physical = planner.plan(&logical).unwrap();
2966 assert!(physical.columns().contains(&"youngest".to_string()));
2967 assert!(physical.columns().contains(&"oldest".to_string()));
2968 }
2969
2970 #[test]
2973 fn test_plan_inner_join() {
2974 let store = create_test_store();
2975 let planner = Planner::new(store);
2976
2977 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2979 items: vec![
2980 ReturnItem {
2981 expression: LogicalExpression::Variable("a".to_string()),
2982 alias: None,
2983 },
2984 ReturnItem {
2985 expression: LogicalExpression::Variable("b".to_string()),
2986 alias: None,
2987 },
2988 ],
2989 distinct: false,
2990 input: Box::new(LogicalOperator::Join(JoinOp {
2991 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2992 variable: "a".to_string(),
2993 label: Some("Person".to_string()),
2994 input: None,
2995 })),
2996 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2997 variable: "b".to_string(),
2998 label: Some("Company".to_string()),
2999 input: None,
3000 })),
3001 join_type: JoinType::Inner,
3002 conditions: vec![JoinCondition {
3003 left: LogicalExpression::Variable("a".to_string()),
3004 right: LogicalExpression::Variable("b".to_string()),
3005 }],
3006 })),
3007 }));
3008
3009 let physical = planner.plan(&logical).unwrap();
3010 assert!(physical.columns().contains(&"a".to_string()));
3011 assert!(physical.columns().contains(&"b".to_string()));
3012 }
3013
3014 #[test]
3015 fn test_plan_cross_join() {
3016 let store = create_test_store();
3017 let planner = Planner::new(store);
3018
3019 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3021 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3022 variable: "a".to_string(),
3023 label: None,
3024 input: None,
3025 })),
3026 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3027 variable: "b".to_string(),
3028 label: None,
3029 input: None,
3030 })),
3031 join_type: JoinType::Cross,
3032 conditions: vec![],
3033 }));
3034
3035 let physical = planner.plan(&logical).unwrap();
3036 assert_eq!(physical.columns().len(), 2);
3037 }
3038
3039 #[test]
3040 fn test_plan_left_join() {
3041 let store = create_test_store();
3042 let planner = Planner::new(store);
3043
3044 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3045 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3046 variable: "a".to_string(),
3047 label: None,
3048 input: None,
3049 })),
3050 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3051 variable: "b".to_string(),
3052 label: None,
3053 input: None,
3054 })),
3055 join_type: JoinType::Left,
3056 conditions: vec![],
3057 }));
3058
3059 let physical = planner.plan(&logical).unwrap();
3060 assert_eq!(physical.columns().len(), 2);
3061 }
3062
3063 #[test]
3066 fn test_plan_create_node() {
3067 let store = create_test_store();
3068 let planner = Planner::new(store);
3069
3070 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3072 variable: "n".to_string(),
3073 labels: vec!["Person".to_string()],
3074 properties: vec![(
3075 "name".to_string(),
3076 LogicalExpression::Literal(Value::String("Alice".into())),
3077 )],
3078 input: None,
3079 }));
3080
3081 let physical = planner.plan(&logical).unwrap();
3082 assert!(physical.columns().contains(&"n".to_string()));
3083 }
3084
3085 #[test]
3086 fn test_plan_create_edge() {
3087 let store = create_test_store();
3088 let planner = Planner::new(store);
3089
3090 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3092 variable: Some("r".to_string()),
3093 from_variable: "a".to_string(),
3094 to_variable: "b".to_string(),
3095 edge_type: "KNOWS".to_string(),
3096 properties: vec![],
3097 input: Box::new(LogicalOperator::Join(JoinOp {
3098 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3099 variable: "a".to_string(),
3100 label: None,
3101 input: None,
3102 })),
3103 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3104 variable: "b".to_string(),
3105 label: None,
3106 input: None,
3107 })),
3108 join_type: JoinType::Cross,
3109 conditions: vec![],
3110 })),
3111 }));
3112
3113 let physical = planner.plan(&logical).unwrap();
3114 assert!(physical.columns().contains(&"r".to_string()));
3115 }
3116
3117 #[test]
3118 fn test_plan_delete_node() {
3119 let store = create_test_store();
3120 let planner = Planner::new(store);
3121
3122 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3124 variable: "n".to_string(),
3125 detach: false,
3126 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3127 variable: "n".to_string(),
3128 label: None,
3129 input: None,
3130 })),
3131 }));
3132
3133 let physical = planner.plan(&logical).unwrap();
3134 assert!(physical.columns().contains(&"deleted_count".to_string()));
3135 }
3136
3137 #[test]
3140 fn test_plan_empty_errors() {
3141 let store = create_test_store();
3142 let planner = Planner::new(store);
3143
3144 let logical = LogicalPlan::new(LogicalOperator::Empty);
3145 let result = planner.plan(&logical);
3146 assert!(result.is_err());
3147 }
3148
3149 #[test]
3150 fn test_plan_missing_variable_in_return() {
3151 let store = create_test_store();
3152 let planner = Planner::new(store);
3153
3154 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3156 items: vec![ReturnItem {
3157 expression: LogicalExpression::Variable("missing".to_string()),
3158 alias: None,
3159 }],
3160 distinct: false,
3161 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3162 variable: "n".to_string(),
3163 label: None,
3164 input: None,
3165 })),
3166 }));
3167
3168 let result = planner.plan(&logical);
3169 assert!(result.is_err());
3170 }
3171
3172 #[test]
3175 fn test_convert_binary_ops() {
3176 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3177 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3178 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3179 assert!(convert_binary_op(BinaryOp::Le).is_ok());
3180 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3181 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3182 assert!(convert_binary_op(BinaryOp::And).is_ok());
3183 assert!(convert_binary_op(BinaryOp::Or).is_ok());
3184 assert!(convert_binary_op(BinaryOp::Add).is_ok());
3185 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3186 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3187 assert!(convert_binary_op(BinaryOp::Div).is_ok());
3188 }
3189
3190 #[test]
3191 fn test_convert_unary_ops() {
3192 assert!(convert_unary_op(UnaryOp::Not).is_ok());
3193 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3194 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3195 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3196 }
3197
3198 #[test]
3199 fn test_convert_aggregate_functions() {
3200 assert!(matches!(
3201 convert_aggregate_function(LogicalAggregateFunction::Count),
3202 PhysicalAggregateFunction::Count
3203 ));
3204 assert!(matches!(
3205 convert_aggregate_function(LogicalAggregateFunction::Sum),
3206 PhysicalAggregateFunction::Sum
3207 ));
3208 assert!(matches!(
3209 convert_aggregate_function(LogicalAggregateFunction::Avg),
3210 PhysicalAggregateFunction::Avg
3211 ));
3212 assert!(matches!(
3213 convert_aggregate_function(LogicalAggregateFunction::Min),
3214 PhysicalAggregateFunction::Min
3215 ));
3216 assert!(matches!(
3217 convert_aggregate_function(LogicalAggregateFunction::Max),
3218 PhysicalAggregateFunction::Max
3219 ));
3220 }
3221
3222 #[test]
3223 fn test_planner_accessors() {
3224 let store = create_test_store();
3225 let planner = Planner::new(Arc::clone(&store));
3226
3227 assert!(planner.tx_id().is_none());
3228 assert!(planner.tx_manager().is_none());
3229 let _ = planner.viewing_epoch(); }
3231
3232 #[test]
3233 fn test_physical_plan_accessors() {
3234 let store = create_test_store();
3235 let planner = Planner::new(store);
3236
3237 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3238 variable: "n".to_string(),
3239 label: None,
3240 input: None,
3241 }));
3242
3243 let physical = planner.plan(&logical).unwrap();
3244 assert_eq!(physical.columns(), &["n"]);
3245
3246 let _ = physical.into_operator();
3248 }
3249}