1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22 ExpandStep, ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator,
23 FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
24 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator, LimitOperator,
25 MergeOperator, NestedLoopJoinOperator, NullOrder, Operator, ProjectExpr, ProjectOperator,
26 PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
27 SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
28 UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
29};
30use grafeo_core::graph::{Direction, lpg::LpgStore};
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use crate::transaction::TransactionManager;
35
36pub struct Planner {
38 store: Arc<LpgStore>,
40 tx_manager: Option<Arc<TransactionManager>>,
42 tx_id: Option<TxId>,
44 viewing_epoch: EpochId,
46 anon_edge_counter: std::cell::Cell<u32>,
48 factorized_execution: bool,
50}
51
52impl Planner {
53 #[must_use]
58 pub fn new(store: Arc<LpgStore>) -> Self {
59 let epoch = store.current_epoch();
60 Self {
61 store,
62 tx_manager: None,
63 tx_id: None,
64 viewing_epoch: epoch,
65 anon_edge_counter: std::cell::Cell::new(0),
66 factorized_execution: true,
67 }
68 }
69
70 #[must_use]
79 pub fn with_context(
80 store: Arc<LpgStore>,
81 tx_manager: Arc<TransactionManager>,
82 tx_id: Option<TxId>,
83 viewing_epoch: EpochId,
84 ) -> Self {
85 Self {
86 store,
87 tx_manager: Some(tx_manager),
88 tx_id,
89 viewing_epoch,
90 anon_edge_counter: std::cell::Cell::new(0),
91 factorized_execution: true,
92 }
93 }
94
95 #[must_use]
97 pub fn viewing_epoch(&self) -> EpochId {
98 self.viewing_epoch
99 }
100
101 #[must_use]
103 pub fn tx_id(&self) -> Option<TxId> {
104 self.tx_id
105 }
106
107 #[must_use]
109 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
110 self.tx_manager.as_ref()
111 }
112
113 #[must_use]
115 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
116 self.factorized_execution = enabled;
117 self
118 }
119
120 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
124 match op {
125 LogicalOperator::Expand(expand) => {
126 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
128
129 if is_single_hop {
130 let (inner_count, base) = Self::count_expand_chain(&expand.input);
131 (inner_count + 1, base)
132 } else {
133 (0, op)
135 }
136 }
137 _ => (0, op),
138 }
139 }
140
141 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
145 let mut chain = Vec::new();
146 let mut current = op;
147
148 while let LogicalOperator::Expand(expand) = current {
149 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
151 if !is_single_hop {
152 break;
153 }
154 chain.push(expand);
155 current = &expand.input;
156 }
157
158 chain.reverse();
160 chain
161 }
162
163 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
169 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
170 Ok(PhysicalPlan {
171 operator,
172 columns,
173 adaptive_context: None,
174 })
175 }
176
177 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
186 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
187
188 let mut adaptive_context = AdaptiveContext::new();
190 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
191
192 Ok(PhysicalPlan {
193 operator,
194 columns,
195 adaptive_context: Some(adaptive_context),
196 })
197 }
198
199 fn collect_cardinality_estimates(
201 &self,
202 op: &LogicalOperator,
203 ctx: &mut AdaptiveContext,
204 depth: usize,
205 ) {
206 match op {
207 LogicalOperator::NodeScan(scan) => {
208 let estimate = if let Some(label) = &scan.label {
210 self.store.nodes_by_label(label).len() as f64
211 } else {
212 self.store.node_count() as f64
213 };
214 let id = format!("scan_{}", scan.variable);
215 ctx.set_estimate(&id, estimate);
216
217 if let Some(input) = &scan.input {
219 self.collect_cardinality_estimates(input, ctx, depth + 1);
220 }
221 }
222 LogicalOperator::Filter(filter) => {
223 let input_estimate = self.estimate_cardinality(&filter.input);
225 let estimate = input_estimate * 0.3;
226 let id = format!("filter_{depth}");
227 ctx.set_estimate(&id, estimate);
228
229 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
230 }
231 LogicalOperator::Expand(expand) => {
232 let input_estimate = self.estimate_cardinality(&expand.input);
234 let avg_degree = 10.0; let estimate = input_estimate * avg_degree;
236 let id = format!("expand_{}", expand.to_variable);
237 ctx.set_estimate(&id, estimate);
238
239 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
240 }
241 LogicalOperator::Join(join) => {
242 let left_est = self.estimate_cardinality(&join.left);
244 let right_est = self.estimate_cardinality(&join.right);
245 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
250 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
251 }
252 LogicalOperator::Aggregate(agg) => {
253 let input_estimate = self.estimate_cardinality(&agg.input);
255 let estimate = if agg.group_by.is_empty() {
256 1.0 } else {
258 (input_estimate * 0.1).max(1.0) };
260 let id = format!("aggregate_{depth}");
261 ctx.set_estimate(&id, estimate);
262
263 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
264 }
265 LogicalOperator::Distinct(distinct) => {
266 let input_estimate = self.estimate_cardinality(&distinct.input);
267 let estimate = (input_estimate * 0.5).max(1.0);
268 let id = format!("distinct_{depth}");
269 ctx.set_estimate(&id, estimate);
270
271 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
272 }
273 LogicalOperator::Return(ret) => {
274 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
275 }
276 LogicalOperator::Limit(limit) => {
277 let input_estimate = self.estimate_cardinality(&limit.input);
278 let estimate = (input_estimate).min(limit.count as f64);
279 let id = format!("limit_{depth}");
280 ctx.set_estimate(&id, estimate);
281
282 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
283 }
284 LogicalOperator::Skip(skip) => {
285 let input_estimate = self.estimate_cardinality(&skip.input);
286 let estimate = (input_estimate - skip.count as f64).max(0.0);
287 let id = format!("skip_{depth}");
288 ctx.set_estimate(&id, estimate);
289
290 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
291 }
292 LogicalOperator::Sort(sort) => {
293 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
295 }
296 LogicalOperator::Union(union) => {
297 let estimate: f64 = union
298 .inputs
299 .iter()
300 .map(|input| self.estimate_cardinality(input))
301 .sum();
302 let id = format!("union_{depth}");
303 ctx.set_estimate(&id, estimate);
304
305 for input in &union.inputs {
306 self.collect_cardinality_estimates(input, ctx, depth + 1);
307 }
308 }
309 _ => {
310 }
312 }
313 }
314
315 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
317 match op {
318 LogicalOperator::NodeScan(scan) => {
319 if let Some(label) = &scan.label {
320 self.store.nodes_by_label(label).len() as f64
321 } else {
322 self.store.node_count() as f64
323 }
324 }
325 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
326 LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
327 LogicalOperator::Join(join) => {
328 let left = self.estimate_cardinality(&join.left);
329 let right = self.estimate_cardinality(&join.right);
330 (left * right).sqrt()
331 }
332 LogicalOperator::Aggregate(agg) => {
333 if agg.group_by.is_empty() {
334 1.0
335 } else {
336 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
337 }
338 }
339 LogicalOperator::Distinct(distinct) => {
340 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
341 }
342 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
343 LogicalOperator::Limit(limit) => self
344 .estimate_cardinality(&limit.input)
345 .min(limit.count as f64),
346 LogicalOperator::Skip(skip) => {
347 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
348 }
349 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
350 LogicalOperator::Union(union) => union
351 .inputs
352 .iter()
353 .map(|input| self.estimate_cardinality(input))
354 .sum(),
355 _ => 1000.0, }
357 }
358
359 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
361 match op {
362 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
363 LogicalOperator::Expand(expand) => {
364 if self.factorized_execution {
366 let (chain_len, _base) = Self::count_expand_chain(op);
367 if chain_len >= 2 {
368 return self.plan_expand_chain(op);
370 }
371 }
372 self.plan_expand(expand)
373 }
374 LogicalOperator::Return(ret) => self.plan_return(ret),
375 LogicalOperator::Filter(filter) => self.plan_filter(filter),
376 LogicalOperator::Project(project) => self.plan_project(project),
377 LogicalOperator::Limit(limit) => self.plan_limit(limit),
378 LogicalOperator::Skip(skip) => self.plan_skip(skip),
379 LogicalOperator::Sort(sort) => self.plan_sort(sort),
380 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
381 LogicalOperator::Join(join) => self.plan_join(join),
382 LogicalOperator::Union(union) => self.plan_union(union),
383 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
384 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
385 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
386 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
387 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
388 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
389 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
390 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
391 LogicalOperator::Merge(merge) => self.plan_merge(merge),
392 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
393 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
394 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
395 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
396 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
397 _ => Err(Error::Internal(format!(
398 "Unsupported operator: {:?}",
399 std::mem::discriminant(op)
400 ))),
401 }
402 }
403
404 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
406 let scan_op = if let Some(label) = &scan.label {
407 ScanOperator::with_label(Arc::clone(&self.store), label)
408 } else {
409 ScanOperator::new(Arc::clone(&self.store))
410 };
411
412 let scan_operator: Box<dyn Operator> =
414 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
415
416 if let Some(input) = &scan.input {
418 let (input_op, mut input_columns) = self.plan_operator(input)?;
419
420 let mut output_schema: Vec<LogicalType> =
422 input_columns.iter().map(|_| LogicalType::Any).collect();
423 output_schema.push(LogicalType::Node);
424
425 input_columns.push(scan.variable.clone());
427
428 let join_op = Box::new(NestedLoopJoinOperator::new(
430 input_op,
431 scan_operator,
432 None, PhysicalJoinType::Cross,
434 output_schema,
435 ));
436
437 Ok((join_op, input_columns))
438 } else {
439 let columns = vec![scan.variable.clone()];
440 Ok((scan_operator, columns))
441 }
442 }
443
444 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
446 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
448
449 let source_column = input_columns
451 .iter()
452 .position(|c| c == &expand.from_variable)
453 .ok_or_else(|| {
454 Error::Internal(format!(
455 "Source variable '{}' not found in input columns",
456 expand.from_variable
457 ))
458 })?;
459
460 let direction = match expand.direction {
462 ExpandDirection::Outgoing => Direction::Outgoing,
463 ExpandDirection::Incoming => Direction::Incoming,
464 ExpandDirection::Both => Direction::Both,
465 };
466
467 let is_variable_length =
469 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
470
471 let operator: Box<dyn Operator> = if is_variable_length {
472 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
475 Arc::clone(&self.store),
476 input_op,
477 source_column,
478 direction,
479 expand.edge_type.clone(),
480 expand.min_hops,
481 max_hops,
482 )
483 .with_tx_context(self.viewing_epoch, self.tx_id);
484
485 if expand.path_alias.is_some() {
487 expand_op = expand_op.with_path_length_output();
488 }
489
490 Box::new(expand_op)
491 } else {
492 let expand_op = ExpandOperator::new(
494 Arc::clone(&self.store),
495 input_op,
496 source_column,
497 direction,
498 expand.edge_type.clone(),
499 )
500 .with_tx_context(self.viewing_epoch, self.tx_id);
501 Box::new(expand_op)
502 };
503
504 let mut columns = input_columns;
507
508 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
510 let count = self.anon_edge_counter.get();
511 self.anon_edge_counter.set(count + 1);
512 format!("_anon_edge_{}", count)
513 });
514 columns.push(edge_col_name);
515
516 columns.push(expand.to_variable.clone());
517
518 if let Some(ref path_alias) = expand.path_alias {
520 columns.push(format!("_path_length_{}", path_alias));
521 }
522
523 Ok((operator, columns))
524 }
525
526 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
534 let expands = Self::collect_expand_chain(op);
535 if expands.is_empty() {
536 return Err(Error::Internal("Empty expand chain".to_string()));
537 }
538
539 let first_expand = expands[0];
541 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
542
543 let mut columns = base_columns.clone();
544 let mut steps = Vec::new();
545
546 let mut is_first = true;
551
552 for expand in &expands {
553 let source_column = if is_first {
555 base_columns
557 .iter()
558 .position(|c| c == &expand.from_variable)
559 .ok_or_else(|| {
560 Error::Internal(format!(
561 "Source variable '{}' not found in base columns",
562 expand.from_variable
563 ))
564 })?
565 } else {
566 1
569 };
570
571 let direction = match expand.direction {
573 ExpandDirection::Outgoing => Direction::Outgoing,
574 ExpandDirection::Incoming => Direction::Incoming,
575 ExpandDirection::Both => Direction::Both,
576 };
577
578 steps.push(ExpandStep {
580 source_column,
581 direction,
582 edge_type: expand.edge_type.clone(),
583 });
584
585 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
587 let count = self.anon_edge_counter.get();
588 self.anon_edge_counter.set(count + 1);
589 format!("_anon_edge_{}", count)
590 });
591 columns.push(edge_col_name);
592 columns.push(expand.to_variable.clone());
593
594 is_first = false;
595 }
596
597 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
599
600 if let Some(tx_id) = self.tx_id {
601 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
602 } else {
603 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
604 }
605
606 Ok((Box::new(lazy_op), columns))
607 }
608
609 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
611 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
613
614 let variable_columns: HashMap<String, usize> = input_columns
616 .iter()
617 .enumerate()
618 .map(|(i, name)| (name.clone(), i))
619 .collect();
620
621 let columns: Vec<String> = ret
623 .items
624 .iter()
625 .map(|item| {
626 item.alias.clone().unwrap_or_else(|| {
627 expression_to_string(&item.expression)
629 })
630 })
631 .collect();
632
633 let needs_project = ret
635 .items
636 .iter()
637 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
638
639 if needs_project {
640 let mut projections = Vec::with_capacity(ret.items.len());
642 let mut output_types = Vec::with_capacity(ret.items.len());
643
644 for item in &ret.items {
645 match &item.expression {
646 LogicalExpression::Variable(name) => {
647 let col_idx = *variable_columns.get(name).ok_or_else(|| {
648 Error::Internal(format!("Variable '{}' not found in input", name))
649 })?;
650 projections.push(ProjectExpr::Column(col_idx));
651 output_types.push(LogicalType::Node);
653 }
654 LogicalExpression::Property { variable, property } => {
655 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
656 Error::Internal(format!("Variable '{}' not found in input", variable))
657 })?;
658 projections.push(ProjectExpr::PropertyAccess {
659 column: col_idx,
660 property: property.clone(),
661 });
662 output_types.push(LogicalType::Any);
664 }
665 LogicalExpression::Literal(value) => {
666 projections.push(ProjectExpr::Constant(value.clone()));
667 output_types.push(value_to_logical_type(value));
668 }
669 LogicalExpression::FunctionCall { name, args, .. } => {
670 match name.to_lowercase().as_str() {
672 "type" => {
673 if args.len() != 1 {
675 return Err(Error::Internal(
676 "type() requires exactly one argument".to_string(),
677 ));
678 }
679 if let LogicalExpression::Variable(var_name) = &args[0] {
680 let col_idx =
681 *variable_columns.get(var_name).ok_or_else(|| {
682 Error::Internal(format!(
683 "Variable '{}' not found in input",
684 var_name
685 ))
686 })?;
687 projections.push(ProjectExpr::EdgeType { column: col_idx });
688 output_types.push(LogicalType::String);
689 } else {
690 return Err(Error::Internal(
691 "type() argument must be a variable".to_string(),
692 ));
693 }
694 }
695 "length" => {
696 if args.len() != 1 {
699 return Err(Error::Internal(
700 "length() requires exactly one argument".to_string(),
701 ));
702 }
703 if let LogicalExpression::Variable(var_name) = &args[0] {
704 let col_idx =
705 *variable_columns.get(var_name).ok_or_else(|| {
706 Error::Internal(format!(
707 "Variable '{}' not found in input",
708 var_name
709 ))
710 })?;
711 projections.push(ProjectExpr::Column(col_idx));
713 output_types.push(LogicalType::Int64);
714 } else {
715 return Err(Error::Internal(
716 "length() argument must be a variable".to_string(),
717 ));
718 }
719 }
720 _ => {
722 let filter_expr = self.convert_expression(&item.expression)?;
723 projections.push(ProjectExpr::Expression {
724 expr: filter_expr,
725 variable_columns: variable_columns.clone(),
726 });
727 output_types.push(LogicalType::Any);
728 }
729 }
730 }
731 LogicalExpression::Case { .. } => {
732 let filter_expr = self.convert_expression(&item.expression)?;
734 projections.push(ProjectExpr::Expression {
735 expr: filter_expr,
736 variable_columns: variable_columns.clone(),
737 });
738 output_types.push(LogicalType::Any);
740 }
741 _ => {
742 return Err(Error::Internal(format!(
743 "Unsupported RETURN expression: {:?}",
744 item.expression
745 )));
746 }
747 }
748 }
749
750 let operator = Box::new(ProjectOperator::with_store(
751 input_op,
752 projections,
753 output_types,
754 Arc::clone(&self.store),
755 ));
756
757 Ok((operator, columns))
758 } else {
759 let mut projections = Vec::with_capacity(ret.items.len());
762 let mut output_types = Vec::with_capacity(ret.items.len());
763
764 for item in &ret.items {
765 if let LogicalExpression::Variable(name) = &item.expression {
766 let col_idx = *variable_columns.get(name).ok_or_else(|| {
767 Error::Internal(format!("Variable '{}' not found in input", name))
768 })?;
769 projections.push(ProjectExpr::Column(col_idx));
770 output_types.push(LogicalType::Node);
771 }
772 }
773
774 if projections.len() == input_columns.len()
776 && projections
777 .iter()
778 .enumerate()
779 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
780 {
781 Ok((input_op, columns))
783 } else {
784 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
785 Ok((operator, columns))
786 }
787 }
788 }
789
790 fn plan_project(
792 &self,
793 project: &crate::query::plan::ProjectOp,
794 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
795 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
797 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
798 let single_row_op: Box<dyn Operator> = Box::new(
800 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
801 );
802 (single_row_op, Vec::new())
803 } else {
804 self.plan_operator(&project.input)?
805 };
806
807 let variable_columns: HashMap<String, usize> = input_columns
809 .iter()
810 .enumerate()
811 .map(|(i, name)| (name.clone(), i))
812 .collect();
813
814 let mut projections = Vec::with_capacity(project.projections.len());
816 let mut output_types = Vec::with_capacity(project.projections.len());
817 let mut output_columns = Vec::with_capacity(project.projections.len());
818
819 for projection in &project.projections {
820 let col_name = projection
822 .alias
823 .clone()
824 .unwrap_or_else(|| expression_to_string(&projection.expression));
825 output_columns.push(col_name);
826
827 match &projection.expression {
828 LogicalExpression::Variable(name) => {
829 let col_idx = *variable_columns.get(name).ok_or_else(|| {
830 Error::Internal(format!("Variable '{}' not found in input", name))
831 })?;
832 projections.push(ProjectExpr::Column(col_idx));
833 output_types.push(LogicalType::Node);
834 }
835 LogicalExpression::Property { variable, property } => {
836 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
837 Error::Internal(format!("Variable '{}' not found in input", variable))
838 })?;
839 projections.push(ProjectExpr::PropertyAccess {
840 column: col_idx,
841 property: property.clone(),
842 });
843 output_types.push(LogicalType::Any);
844 }
845 LogicalExpression::Literal(value) => {
846 projections.push(ProjectExpr::Constant(value.clone()));
847 output_types.push(value_to_logical_type(value));
848 }
849 _ => {
850 let filter_expr = self.convert_expression(&projection.expression)?;
852 projections.push(ProjectExpr::Expression {
853 expr: filter_expr,
854 variable_columns: variable_columns.clone(),
855 });
856 output_types.push(LogicalType::Any);
857 }
858 }
859 }
860
861 let operator = Box::new(ProjectOperator::with_store(
862 input_op,
863 projections,
864 output_types,
865 Arc::clone(&self.store),
866 ));
867
868 Ok((operator, output_columns))
869 }
870
871 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
873 let (input_op, columns) = self.plan_operator(&filter.input)?;
875
876 let variable_columns: HashMap<String, usize> = columns
878 .iter()
879 .enumerate()
880 .map(|(i, name)| (name.clone(), i))
881 .collect();
882
883 let filter_expr = self.convert_expression(&filter.predicate)?;
885
886 let predicate =
888 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
889
890 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
892
893 Ok((operator, columns))
894 }
895
896 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
898 let (input_op, columns) = self.plan_operator(&limit.input)?;
899 let output_schema = self.derive_schema_from_columns(&columns);
900 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
901 Ok((operator, columns))
902 }
903
904 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
906 let (input_op, columns) = self.plan_operator(&skip.input)?;
907 let output_schema = self.derive_schema_from_columns(&columns);
908 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
909 Ok((operator, columns))
910 }
911
912 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
914 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
915
916 let mut variable_columns: HashMap<String, usize> = input_columns
918 .iter()
919 .enumerate()
920 .map(|(i, name)| (name.clone(), i))
921 .collect();
922
923 let mut property_projections: Vec<(String, String, String)> = Vec::new();
925 let mut next_col_idx = input_columns.len();
926
927 for key in &sort.keys {
928 if let LogicalExpression::Property { variable, property } = &key.expression {
929 let col_name = format!("{}_{}", variable, property);
930 if !variable_columns.contains_key(&col_name) {
931 property_projections.push((
932 variable.clone(),
933 property.clone(),
934 col_name.clone(),
935 ));
936 variable_columns.insert(col_name, next_col_idx);
937 next_col_idx += 1;
938 }
939 }
940 }
941
942 let mut output_columns = input_columns.clone();
944
945 if !property_projections.is_empty() {
947 let mut projections = Vec::new();
948 let mut output_types = Vec::new();
949
950 for (i, _) in input_columns.iter().enumerate() {
953 projections.push(ProjectExpr::Column(i));
954 output_types.push(LogicalType::Node);
955 }
956
957 for (variable, property, col_name) in &property_projections {
959 let source_col = *variable_columns.get(variable).ok_or_else(|| {
960 Error::Internal(format!(
961 "Variable '{}' not found for ORDER BY property projection",
962 variable
963 ))
964 })?;
965 projections.push(ProjectExpr::PropertyAccess {
966 column: source_col,
967 property: property.clone(),
968 });
969 output_types.push(LogicalType::Any);
970 output_columns.push(col_name.clone());
971 }
972
973 input_op = Box::new(ProjectOperator::with_store(
974 input_op,
975 projections,
976 output_types,
977 Arc::clone(&self.store),
978 ));
979 }
980
981 let physical_keys: Vec<PhysicalSortKey> = sort
983 .keys
984 .iter()
985 .map(|key| {
986 let col_idx = self
987 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
988 Ok(PhysicalSortKey {
989 column: col_idx,
990 direction: match key.order {
991 SortOrder::Ascending => SortDirection::Ascending,
992 SortOrder::Descending => SortDirection::Descending,
993 },
994 null_order: NullOrder::NullsLast,
995 })
996 })
997 .collect::<Result<Vec<_>>>()?;
998
999 let output_schema = self.derive_schema_from_columns(&output_columns);
1000 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1001 Ok((operator, output_columns))
1002 }
1003
1004 fn resolve_sort_expression_with_properties(
1006 &self,
1007 expr: &LogicalExpression,
1008 variable_columns: &HashMap<String, usize>,
1009 ) -> Result<usize> {
1010 match expr {
1011 LogicalExpression::Variable(name) => {
1012 variable_columns.get(name).copied().ok_or_else(|| {
1013 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1014 })
1015 }
1016 LogicalExpression::Property { variable, property } => {
1017 let col_name = format!("{}_{}", variable, property);
1019 variable_columns.get(&col_name).copied().ok_or_else(|| {
1020 Error::Internal(format!(
1021 "Property column '{}' not found for ORDER BY (from {}.{})",
1022 col_name, variable, property
1023 ))
1024 })
1025 }
1026 _ => Err(Error::Internal(format!(
1027 "Unsupported ORDER BY expression: {:?}",
1028 expr
1029 ))),
1030 }
1031 }
1032
1033 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1035 columns.iter().map(|_| LogicalType::Any).collect()
1036 }
1037
1038 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1040 if self.factorized_execution
1047 && agg.group_by.is_empty()
1048 && Self::count_expand_chain(&agg.input).0 >= 2
1049 && self.is_simple_aggregate(agg)
1050 {
1051 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1052 return Ok((op, cols));
1053 }
1054 }
1056
1057 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1058
1059 let mut variable_columns: HashMap<String, usize> = input_columns
1061 .iter()
1062 .enumerate()
1063 .map(|(i, name)| (name.clone(), i))
1064 .collect();
1065
1066 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1069
1070 for expr in &agg.group_by {
1072 if let LogicalExpression::Property { variable, property } = expr {
1073 let col_name = format!("{}_{}", variable, property);
1074 if !variable_columns.contains_key(&col_name) {
1075 property_projections.push((
1076 variable.clone(),
1077 property.clone(),
1078 col_name.clone(),
1079 ));
1080 variable_columns.insert(col_name, next_col_idx);
1081 next_col_idx += 1;
1082 }
1083 }
1084 }
1085
1086 for agg_expr in &agg.aggregates {
1088 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1089 let col_name = format!("{}_{}", variable, property);
1090 if !variable_columns.contains_key(&col_name) {
1091 property_projections.push((
1092 variable.clone(),
1093 property.clone(),
1094 col_name.clone(),
1095 ));
1096 variable_columns.insert(col_name, next_col_idx);
1097 next_col_idx += 1;
1098 }
1099 }
1100 }
1101
1102 if !property_projections.is_empty() {
1104 let mut projections = Vec::new();
1105 let mut output_types = Vec::new();
1106
1107 for (i, _) in input_columns.iter().enumerate() {
1110 projections.push(ProjectExpr::Column(i));
1111 output_types.push(LogicalType::Node);
1112 }
1113
1114 for (variable, property, _col_name) in &property_projections {
1116 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1117 Error::Internal(format!(
1118 "Variable '{}' not found for property projection",
1119 variable
1120 ))
1121 })?;
1122 projections.push(ProjectExpr::PropertyAccess {
1123 column: source_col,
1124 property: property.clone(),
1125 });
1126 output_types.push(LogicalType::Any); }
1128
1129 input_op = Box::new(ProjectOperator::with_store(
1130 input_op,
1131 projections,
1132 output_types,
1133 Arc::clone(&self.store),
1134 ));
1135 }
1136
1137 let group_columns: Vec<usize> = agg
1139 .group_by
1140 .iter()
1141 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1142 .collect::<Result<Vec<_>>>()?;
1143
1144 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1146 .aggregates
1147 .iter()
1148 .map(|agg_expr| {
1149 let column = agg_expr
1150 .expression
1151 .as_ref()
1152 .map(|e| {
1153 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1154 })
1155 .transpose()?;
1156
1157 Ok(PhysicalAggregateExpr {
1158 function: convert_aggregate_function(agg_expr.function),
1159 column,
1160 distinct: agg_expr.distinct,
1161 alias: agg_expr.alias.clone(),
1162 percentile: agg_expr.percentile,
1163 })
1164 })
1165 .collect::<Result<Vec<_>>>()?;
1166
1167 let mut output_schema = Vec::new();
1169 let mut output_columns = Vec::new();
1170
1171 for expr in &agg.group_by {
1173 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1175 }
1176
1177 for agg_expr in &agg.aggregates {
1179 let result_type = match agg_expr.function {
1180 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1181 LogicalType::Int64
1182 }
1183 LogicalAggregateFunction::Sum => LogicalType::Int64,
1184 LogicalAggregateFunction::Avg => LogicalType::Float64,
1185 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1186 LogicalType::Int64
1190 }
1191 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1194 | LogicalAggregateFunction::StdDevPop
1195 | LogicalAggregateFunction::PercentileDisc
1196 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1197 };
1198 output_schema.push(result_type);
1199 output_columns.push(
1200 agg_expr
1201 .alias
1202 .clone()
1203 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1204 );
1205 }
1206
1207 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1209 Box::new(SimpleAggregateOperator::new(
1210 input_op,
1211 physical_aggregates,
1212 output_schema,
1213 ))
1214 } else {
1215 Box::new(HashAggregateOperator::new(
1216 input_op,
1217 group_columns,
1218 physical_aggregates,
1219 output_schema,
1220 ))
1221 };
1222
1223 if let Some(having_expr) = &agg.having {
1225 let having_var_columns: HashMap<String, usize> = output_columns
1227 .iter()
1228 .enumerate()
1229 .map(|(i, name)| (name.clone(), i))
1230 .collect();
1231
1232 let filter_expr = self.convert_expression(having_expr)?;
1233 let predicate =
1234 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1235 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1236 }
1237
1238 Ok((operator, output_columns))
1239 }
1240
1241 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1247 agg.aggregates.iter().all(|agg_expr| {
1248 match agg_expr.function {
1249 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1250 agg_expr.expression.is_none()
1252 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1253 }
1254 LogicalAggregateFunction::Sum
1255 | LogicalAggregateFunction::Avg
1256 | LogicalAggregateFunction::Min
1257 | LogicalAggregateFunction::Max => {
1258 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1261 }
1262 _ => false,
1264 }
1265 })
1266 }
1267
1268 fn plan_factorized_aggregate(
1272 &self,
1273 agg: &AggregateOp,
1274 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1275 let expands = Self::collect_expand_chain(&agg.input);
1277 if expands.is_empty() {
1278 return Err(Error::Internal(
1279 "Expected expand chain for factorized aggregate".to_string(),
1280 ));
1281 }
1282
1283 let first_expand = expands[0];
1285 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1286
1287 let mut columns = base_columns.clone();
1288 let mut steps = Vec::new();
1289 let mut is_first = true;
1290
1291 for expand in &expands {
1292 let source_column = if is_first {
1294 base_columns
1295 .iter()
1296 .position(|c| c == &expand.from_variable)
1297 .ok_or_else(|| {
1298 Error::Internal(format!(
1299 "Source variable '{}' not found in base columns",
1300 expand.from_variable
1301 ))
1302 })?
1303 } else {
1304 1 };
1306
1307 let direction = match expand.direction {
1308 ExpandDirection::Outgoing => Direction::Outgoing,
1309 ExpandDirection::Incoming => Direction::Incoming,
1310 ExpandDirection::Both => Direction::Both,
1311 };
1312
1313 steps.push(ExpandStep {
1314 source_column,
1315 direction,
1316 edge_type: expand.edge_type.clone(),
1317 });
1318
1319 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1320 let count = self.anon_edge_counter.get();
1321 self.anon_edge_counter.set(count + 1);
1322 format!("_anon_edge_{}", count)
1323 });
1324 columns.push(edge_col_name);
1325 columns.push(expand.to_variable.clone());
1326
1327 is_first = false;
1328 }
1329
1330 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1332
1333 if let Some(tx_id) = self.tx_id {
1334 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1335 } else {
1336 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1337 }
1338
1339 let factorized_aggs: Vec<FactorizedAggregate> = agg
1341 .aggregates
1342 .iter()
1343 .map(|agg_expr| {
1344 match agg_expr.function {
1345 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1346 if agg_expr.expression.is_none() {
1348 FactorizedAggregate::count()
1349 } else {
1350 FactorizedAggregate::count_column(1) }
1354 }
1355 LogicalAggregateFunction::Sum => {
1356 FactorizedAggregate::sum(1)
1358 }
1359 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1360 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1361 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1362 _ => {
1363 FactorizedAggregate::count()
1365 }
1366 }
1367 })
1368 .collect();
1369
1370 let output_columns: Vec<String> = agg
1372 .aggregates
1373 .iter()
1374 .map(|agg_expr| {
1375 agg_expr
1376 .alias
1377 .clone()
1378 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1379 })
1380 .collect();
1381
1382 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1384
1385 Ok((Box::new(factorized_agg_op), output_columns))
1386 }
1387
1388 #[allow(dead_code)]
1390 fn resolve_expression_to_column(
1391 &self,
1392 expr: &LogicalExpression,
1393 variable_columns: &HashMap<String, usize>,
1394 ) -> Result<usize> {
1395 match expr {
1396 LogicalExpression::Variable(name) => variable_columns
1397 .get(name)
1398 .copied()
1399 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1400 LogicalExpression::Property { variable, .. } => variable_columns
1401 .get(variable)
1402 .copied()
1403 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1404 _ => Err(Error::Internal(format!(
1405 "Cannot resolve expression to column: {:?}",
1406 expr
1407 ))),
1408 }
1409 }
1410
1411 fn resolve_expression_to_column_with_properties(
1415 &self,
1416 expr: &LogicalExpression,
1417 variable_columns: &HashMap<String, usize>,
1418 ) -> Result<usize> {
1419 match expr {
1420 LogicalExpression::Variable(name) => variable_columns
1421 .get(name)
1422 .copied()
1423 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1424 LogicalExpression::Property { variable, property } => {
1425 let col_name = format!("{}_{}", variable, property);
1427 variable_columns.get(&col_name).copied().ok_or_else(|| {
1428 Error::Internal(format!(
1429 "Property column '{}' not found (from {}.{})",
1430 col_name, variable, property
1431 ))
1432 })
1433 }
1434 _ => Err(Error::Internal(format!(
1435 "Cannot resolve expression to column: {:?}",
1436 expr
1437 ))),
1438 }
1439 }
1440
1441 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1443 match expr {
1444 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1445 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1446 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1447 variable: variable.clone(),
1448 property: property.clone(),
1449 }),
1450 LogicalExpression::Binary { left, op, right } => {
1451 let left_expr = self.convert_expression(left)?;
1452 let right_expr = self.convert_expression(right)?;
1453 let filter_op = convert_binary_op(*op)?;
1454 Ok(FilterExpression::Binary {
1455 left: Box::new(left_expr),
1456 op: filter_op,
1457 right: Box::new(right_expr),
1458 })
1459 }
1460 LogicalExpression::Unary { op, operand } => {
1461 let operand_expr = self.convert_expression(operand)?;
1462 let filter_op = convert_unary_op(*op)?;
1463 Ok(FilterExpression::Unary {
1464 op: filter_op,
1465 operand: Box::new(operand_expr),
1466 })
1467 }
1468 LogicalExpression::FunctionCall { name, args, .. } => {
1469 let filter_args: Vec<FilterExpression> = args
1470 .iter()
1471 .map(|a| self.convert_expression(a))
1472 .collect::<Result<Vec<_>>>()?;
1473 Ok(FilterExpression::FunctionCall {
1474 name: name.clone(),
1475 args: filter_args,
1476 })
1477 }
1478 LogicalExpression::Case {
1479 operand,
1480 when_clauses,
1481 else_clause,
1482 } => {
1483 let filter_operand = operand
1484 .as_ref()
1485 .map(|e| self.convert_expression(e))
1486 .transpose()?
1487 .map(Box::new);
1488 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1489 .iter()
1490 .map(|(cond, result)| {
1491 Ok((
1492 self.convert_expression(cond)?,
1493 self.convert_expression(result)?,
1494 ))
1495 })
1496 .collect::<Result<Vec<_>>>()?;
1497 let filter_else = else_clause
1498 .as_ref()
1499 .map(|e| self.convert_expression(e))
1500 .transpose()?
1501 .map(Box::new);
1502 Ok(FilterExpression::Case {
1503 operand: filter_operand,
1504 when_clauses: filter_when_clauses,
1505 else_clause: filter_else,
1506 })
1507 }
1508 LogicalExpression::List(items) => {
1509 let filter_items: Vec<FilterExpression> = items
1510 .iter()
1511 .map(|item| self.convert_expression(item))
1512 .collect::<Result<Vec<_>>>()?;
1513 Ok(FilterExpression::List(filter_items))
1514 }
1515 LogicalExpression::Map(pairs) => {
1516 let filter_pairs: Vec<(String, FilterExpression)> = pairs
1517 .iter()
1518 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1519 .collect::<Result<Vec<_>>>()?;
1520 Ok(FilterExpression::Map(filter_pairs))
1521 }
1522 LogicalExpression::IndexAccess { base, index } => {
1523 let base_expr = self.convert_expression(base)?;
1524 let index_expr = self.convert_expression(index)?;
1525 Ok(FilterExpression::IndexAccess {
1526 base: Box::new(base_expr),
1527 index: Box::new(index_expr),
1528 })
1529 }
1530 LogicalExpression::SliceAccess { base, start, end } => {
1531 let base_expr = self.convert_expression(base)?;
1532 let start_expr = start
1533 .as_ref()
1534 .map(|s| self.convert_expression(s))
1535 .transpose()?
1536 .map(Box::new);
1537 let end_expr = end
1538 .as_ref()
1539 .map(|e| self.convert_expression(e))
1540 .transpose()?
1541 .map(Box::new);
1542 Ok(FilterExpression::SliceAccess {
1543 base: Box::new(base_expr),
1544 start: start_expr,
1545 end: end_expr,
1546 })
1547 }
1548 LogicalExpression::Parameter(_) => Err(Error::Internal(
1549 "Parameters not yet supported in filters".to_string(),
1550 )),
1551 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1552 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1553 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1554 LogicalExpression::ListComprehension {
1555 variable,
1556 list_expr,
1557 filter_expr,
1558 map_expr,
1559 } => {
1560 let list = self.convert_expression(list_expr)?;
1561 let filter = filter_expr
1562 .as_ref()
1563 .map(|f| self.convert_expression(f))
1564 .transpose()?
1565 .map(Box::new);
1566 let map = self.convert_expression(map_expr)?;
1567 Ok(FilterExpression::ListComprehension {
1568 variable: variable.clone(),
1569 list_expr: Box::new(list),
1570 filter_expr: filter,
1571 map_expr: Box::new(map),
1572 })
1573 }
1574 LogicalExpression::ExistsSubquery(subplan) => {
1575 let (start_var, direction, edge_type, end_labels) =
1578 self.extract_exists_pattern(subplan)?;
1579
1580 Ok(FilterExpression::ExistsSubquery {
1581 start_var,
1582 direction,
1583 edge_type,
1584 end_labels,
1585 min_hops: None,
1586 max_hops: None,
1587 })
1588 }
1589 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1590 "COUNT subqueries not yet supported".to_string(),
1591 )),
1592 }
1593 }
1594
1595 fn extract_exists_pattern(
1598 &self,
1599 subplan: &LogicalOperator,
1600 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1601 match subplan {
1602 LogicalOperator::Expand(expand) => {
1603 let end_labels = self.extract_end_labels_from_expand(expand);
1605 let direction = match expand.direction {
1606 ExpandDirection::Outgoing => Direction::Outgoing,
1607 ExpandDirection::Incoming => Direction::Incoming,
1608 ExpandDirection::Both => Direction::Both,
1609 };
1610 Ok((
1611 expand.from_variable.clone(),
1612 direction,
1613 expand.edge_type.clone(),
1614 end_labels,
1615 ))
1616 }
1617 LogicalOperator::NodeScan(scan) => {
1618 if let Some(input) = &scan.input {
1619 self.extract_exists_pattern(input)
1620 } else {
1621 Err(Error::Internal(
1622 "EXISTS subquery must contain an edge pattern".to_string(),
1623 ))
1624 }
1625 }
1626 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1627 _ => Err(Error::Internal(
1628 "Unsupported EXISTS subquery pattern".to_string(),
1629 )),
1630 }
1631 }
1632
1633 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1635 match expand.input.as_ref() {
1637 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1638 _ => None,
1639 }
1640 }
1641
1642 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1644 let (left_op, left_columns) = self.plan_operator(&join.left)?;
1645 let (right_op, right_columns) = self.plan_operator(&join.right)?;
1646
1647 let mut columns = left_columns.clone();
1649 columns.extend(right_columns.clone());
1650
1651 let physical_join_type = match join.join_type {
1653 JoinType::Inner => PhysicalJoinType::Inner,
1654 JoinType::Left => PhysicalJoinType::Left,
1655 JoinType::Right => PhysicalJoinType::Right,
1656 JoinType::Full => PhysicalJoinType::Full,
1657 JoinType::Cross => PhysicalJoinType::Cross,
1658 JoinType::Semi => PhysicalJoinType::Semi,
1659 JoinType::Anti => PhysicalJoinType::Anti,
1660 };
1661
1662 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1664 (vec![], vec![])
1666 } else {
1667 join.conditions
1668 .iter()
1669 .filter_map(|cond| {
1670 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1672 let right_idx = self
1673 .expression_to_column(&cond.right, &right_columns)
1674 .ok()?;
1675 Some((left_idx, right_idx))
1676 })
1677 .unzip()
1678 };
1679
1680 let output_schema = self.derive_schema_from_columns(&columns);
1681
1682 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1690 left_op,
1691 right_op,
1692 probe_keys,
1693 build_keys,
1694 physical_join_type,
1695 output_schema,
1696 ));
1697
1698 Ok((operator, columns))
1699 }
1700
1701 #[allow(dead_code)]
1710 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
1711 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
1713 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
1714
1715 Self::collect_join_edges(
1717 &LogicalOperator::Join(join.clone()),
1718 &mut edges,
1719 &mut all_vars,
1720 );
1721
1722 if all_vars.len() < 3 {
1724 return false;
1725 }
1726
1727 Self::has_cycle(&edges, &all_vars)
1729 }
1730
1731 fn collect_join_edges(
1733 op: &LogicalOperator,
1734 edges: &mut HashMap<String, Vec<String>>,
1735 vars: &mut std::collections::HashSet<String>,
1736 ) {
1737 match op {
1738 LogicalOperator::Join(join) => {
1739 for cond in &join.conditions {
1741 if let (Some(left_var), Some(right_var)) = (
1742 Self::extract_join_variable(&cond.left),
1743 Self::extract_join_variable(&cond.right),
1744 ) {
1745 if left_var != right_var {
1746 vars.insert(left_var.clone());
1747 vars.insert(right_var.clone());
1748
1749 edges
1751 .entry(left_var.clone())
1752 .or_default()
1753 .push(right_var.clone());
1754 edges.entry(right_var).or_default().push(left_var);
1755 }
1756 }
1757 }
1758
1759 Self::collect_join_edges(&join.left, edges, vars);
1761 Self::collect_join_edges(&join.right, edges, vars);
1762 }
1763 LogicalOperator::Expand(expand) => {
1764 vars.insert(expand.from_variable.clone());
1766 vars.insert(expand.to_variable.clone());
1767
1768 edges
1769 .entry(expand.from_variable.clone())
1770 .or_default()
1771 .push(expand.to_variable.clone());
1772 edges
1773 .entry(expand.to_variable.clone())
1774 .or_default()
1775 .push(expand.from_variable.clone());
1776
1777 Self::collect_join_edges(&expand.input, edges, vars);
1778 }
1779 LogicalOperator::Filter(filter) => {
1780 Self::collect_join_edges(&filter.input, edges, vars);
1781 }
1782 LogicalOperator::NodeScan(scan) => {
1783 vars.insert(scan.variable.clone());
1784 }
1785 _ => {}
1786 }
1787 }
1788
1789 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
1791 match expr {
1792 LogicalExpression::Variable(v) => Some(v.clone()),
1793 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
1794 LogicalExpression::Id(v) => Some(v.clone()),
1795 _ => None,
1796 }
1797 }
1798
1799 fn has_cycle(
1803 edges: &HashMap<String, Vec<String>>,
1804 vars: &std::collections::HashSet<String>,
1805 ) -> bool {
1806 let mut color: HashMap<&String, u8> = HashMap::new();
1807
1808 for var in vars {
1809 color.insert(var, 0);
1810 }
1811
1812 for start in vars {
1813 if color[start] == 0 {
1814 if Self::dfs_cycle(start, None, edges, &mut color) {
1815 return true;
1816 }
1817 }
1818 }
1819
1820 false
1821 }
1822
1823 fn dfs_cycle(
1825 node: &String,
1826 parent: Option<&String>,
1827 edges: &HashMap<String, Vec<String>>,
1828 color: &mut HashMap<&String, u8>,
1829 ) -> bool {
1830 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
1833 for neighbor in neighbors {
1834 if parent == Some(neighbor) {
1836 continue;
1837 }
1838
1839 if let Some(&c) = color.get(neighbor) {
1840 if c == 1 {
1841 return true;
1843 }
1844 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
1845 return true;
1846 }
1847 }
1848 }
1849 }
1850
1851 *color.get_mut(node).unwrap() = 2; false
1853 }
1854
1855 #[allow(dead_code)]
1857 fn count_relations(op: &LogicalOperator) -> usize {
1858 match op {
1859 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
1860 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
1861 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
1862 LogicalOperator::Join(j) => {
1863 Self::count_relations(&j.left) + Self::count_relations(&j.right)
1864 }
1865 _ => 0,
1866 }
1867 }
1868
1869 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1871 match expr {
1872 LogicalExpression::Variable(name) => columns
1873 .iter()
1874 .position(|c| c == name)
1875 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1876 _ => Err(Error::Internal(
1877 "Only variables supported in join conditions".to_string(),
1878 )),
1879 }
1880 }
1881
1882 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1884 if union.inputs.is_empty() {
1885 return Err(Error::Internal(
1886 "Union requires at least one input".to_string(),
1887 ));
1888 }
1889
1890 let mut inputs = Vec::with_capacity(union.inputs.len());
1891 let mut columns = Vec::new();
1892
1893 for (i, input) in union.inputs.iter().enumerate() {
1894 let (op, cols) = self.plan_operator(input)?;
1895 if i == 0 {
1896 columns = cols;
1897 }
1898 inputs.push(op);
1899 }
1900
1901 let output_schema = self.derive_schema_from_columns(&columns);
1902 let operator = Box::new(UnionOperator::new(inputs, output_schema));
1903
1904 Ok((operator, columns))
1905 }
1906
1907 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1909 let (input_op, columns) = self.plan_operator(&distinct.input)?;
1910 let output_schema = self.derive_schema_from_columns(&columns);
1911 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1912 Ok((operator, columns))
1913 }
1914
1915 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1917 let (input_op, mut columns) = if let Some(ref input) = create.input {
1919 let (op, cols) = self.plan_operator(input)?;
1920 (Some(op), cols)
1921 } else {
1922 (None, vec![])
1923 };
1924
1925 let output_column = columns.len();
1927 columns.push(create.variable.clone());
1928
1929 let properties: Vec<(String, PropertySource)> = create
1931 .properties
1932 .iter()
1933 .map(|(name, expr)| {
1934 let source = match expr {
1935 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1936 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1937 };
1938 (name.clone(), source)
1939 })
1940 .collect();
1941
1942 let output_schema = self.derive_schema_from_columns(&columns);
1943
1944 let operator = Box::new(
1945 CreateNodeOperator::new(
1946 Arc::clone(&self.store),
1947 input_op,
1948 create.labels.clone(),
1949 properties,
1950 output_schema,
1951 output_column,
1952 )
1953 .with_tx_context(self.viewing_epoch, self.tx_id),
1954 );
1955
1956 Ok((operator, columns))
1957 }
1958
1959 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1961 let (input_op, mut columns) = self.plan_operator(&create.input)?;
1962
1963 let from_column = columns
1965 .iter()
1966 .position(|c| c == &create.from_variable)
1967 .ok_or_else(|| {
1968 Error::Internal(format!(
1969 "Source variable '{}' not found",
1970 create.from_variable
1971 ))
1972 })?;
1973
1974 let to_column = columns
1975 .iter()
1976 .position(|c| c == &create.to_variable)
1977 .ok_or_else(|| {
1978 Error::Internal(format!(
1979 "Target variable '{}' not found",
1980 create.to_variable
1981 ))
1982 })?;
1983
1984 let output_column = create.variable.as_ref().map(|v| {
1986 let idx = columns.len();
1987 columns.push(v.clone());
1988 idx
1989 });
1990
1991 let properties: Vec<(String, PropertySource)> = create
1993 .properties
1994 .iter()
1995 .map(|(name, expr)| {
1996 let source = match expr {
1997 LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1998 _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1999 };
2000 (name.clone(), source)
2001 })
2002 .collect();
2003
2004 let output_schema = self.derive_schema_from_columns(&columns);
2005
2006 let operator = Box::new(
2007 CreateEdgeOperator::new(
2008 Arc::clone(&self.store),
2009 input_op,
2010 from_column,
2011 to_column,
2012 create.edge_type.clone(),
2013 properties,
2014 output_schema,
2015 output_column,
2016 )
2017 .with_tx_context(self.viewing_epoch, self.tx_id),
2018 );
2019
2020 Ok((operator, columns))
2021 }
2022
2023 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2025 let (input_op, columns) = self.plan_operator(&delete.input)?;
2026
2027 let node_column = columns
2028 .iter()
2029 .position(|c| c == &delete.variable)
2030 .ok_or_else(|| {
2031 Error::Internal(format!(
2032 "Variable '{}' not found for delete",
2033 delete.variable
2034 ))
2035 })?;
2036
2037 let output_schema = vec![LogicalType::Int64];
2039 let output_columns = vec!["deleted_count".to_string()];
2040
2041 let operator = Box::new(
2042 DeleteNodeOperator::new(
2043 Arc::clone(&self.store),
2044 input_op,
2045 node_column,
2046 output_schema,
2047 delete.detach, )
2049 .with_tx_context(self.viewing_epoch, self.tx_id),
2050 );
2051
2052 Ok((operator, output_columns))
2053 }
2054
2055 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2057 let (input_op, columns) = self.plan_operator(&delete.input)?;
2058
2059 let edge_column = columns
2060 .iter()
2061 .position(|c| c == &delete.variable)
2062 .ok_or_else(|| {
2063 Error::Internal(format!(
2064 "Variable '{}' not found for delete",
2065 delete.variable
2066 ))
2067 })?;
2068
2069 let output_schema = vec![LogicalType::Int64];
2071 let output_columns = vec!["deleted_count".to_string()];
2072
2073 let operator = Box::new(
2074 DeleteEdgeOperator::new(
2075 Arc::clone(&self.store),
2076 input_op,
2077 edge_column,
2078 output_schema,
2079 )
2080 .with_tx_context(self.viewing_epoch, self.tx_id),
2081 );
2082
2083 Ok((operator, output_columns))
2084 }
2085
2086 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2088 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2089 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2090
2091 let mut columns = left_columns.clone();
2093 columns.extend(right_columns.clone());
2094
2095 let mut probe_keys = Vec::new();
2097 let mut build_keys = Vec::new();
2098
2099 for (right_idx, right_col) in right_columns.iter().enumerate() {
2100 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2101 probe_keys.push(left_idx);
2102 build_keys.push(right_idx);
2103 }
2104 }
2105
2106 let output_schema = self.derive_schema_from_columns(&columns);
2107
2108 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2109 left_op,
2110 right_op,
2111 probe_keys,
2112 build_keys,
2113 PhysicalJoinType::Left,
2114 output_schema,
2115 ));
2116
2117 Ok((operator, columns))
2118 }
2119
2120 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2122 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2123 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2124
2125 let columns = left_columns.clone();
2127
2128 let mut probe_keys = Vec::new();
2130 let mut build_keys = Vec::new();
2131
2132 for (right_idx, right_col) in right_columns.iter().enumerate() {
2133 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2134 probe_keys.push(left_idx);
2135 build_keys.push(right_idx);
2136 }
2137 }
2138
2139 let output_schema = self.derive_schema_from_columns(&columns);
2140
2141 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2142 left_op,
2143 right_op,
2144 probe_keys,
2145 build_keys,
2146 PhysicalJoinType::Anti,
2147 output_schema,
2148 ));
2149
2150 Ok((operator, columns))
2151 }
2152
2153 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2155 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2158 if matches!(&*unwind.input, LogicalOperator::Empty) {
2159 let literal_list = self.convert_expression(&unwind.expression)?;
2164
2165 let single_row_op: Box<dyn Operator> = Box::new(
2167 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2168 );
2169 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2170 single_row_op,
2171 vec![ProjectExpr::Expression {
2172 expr: literal_list,
2173 variable_columns: HashMap::new(),
2174 }],
2175 vec![LogicalType::Any],
2176 Arc::clone(&self.store),
2177 ));
2178
2179 (project_op, vec!["__list__".to_string()])
2180 } else {
2181 self.plan_operator(&unwind.input)?
2182 };
2183
2184 let list_col_idx = match &unwind.expression {
2190 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2191 LogicalExpression::Property { variable, .. } => {
2192 input_columns.iter().position(|c| c == variable)
2195 }
2196 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2197 None
2199 }
2200 _ => None,
2201 };
2202
2203 let mut columns = input_columns.clone();
2205 columns.push(unwind.variable.clone());
2206
2207 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2209 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2214
2215 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2216 input_op,
2217 col_idx,
2218 unwind.variable.clone(),
2219 output_schema,
2220 ));
2221
2222 Ok((operator, columns))
2223 }
2224
2225 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2227 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2229 Vec::new()
2230 } else {
2231 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2232 cols
2233 };
2234
2235 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2237 .match_properties
2238 .iter()
2239 .filter_map(|(name, expr)| {
2240 if let LogicalExpression::Literal(v) = expr {
2241 Some((name.clone(), v.clone()))
2242 } else {
2243 None }
2245 })
2246 .collect();
2247
2248 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2250 .on_create
2251 .iter()
2252 .filter_map(|(name, expr)| {
2253 if let LogicalExpression::Literal(v) = expr {
2254 Some((name.clone(), v.clone()))
2255 } else {
2256 None
2257 }
2258 })
2259 .collect();
2260
2261 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2263 .on_match
2264 .iter()
2265 .filter_map(|(name, expr)| {
2266 if let LogicalExpression::Literal(v) = expr {
2267 Some((name.clone(), v.clone()))
2268 } else {
2269 None
2270 }
2271 })
2272 .collect();
2273
2274 columns.push(merge.variable.clone());
2276
2277 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2278 Arc::clone(&self.store),
2279 merge.variable.clone(),
2280 merge.labels.clone(),
2281 match_properties,
2282 on_create_properties,
2283 on_match_properties,
2284 ));
2285
2286 Ok((operator, columns))
2287 }
2288
2289 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2291 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2293
2294 let source_column = columns
2296 .iter()
2297 .position(|c| c == &sp.source_var)
2298 .ok_or_else(|| {
2299 Error::Internal(format!(
2300 "Source variable '{}' not found for shortestPath",
2301 sp.source_var
2302 ))
2303 })?;
2304
2305 let target_column = columns
2306 .iter()
2307 .position(|c| c == &sp.target_var)
2308 .ok_or_else(|| {
2309 Error::Internal(format!(
2310 "Target variable '{}' not found for shortestPath",
2311 sp.target_var
2312 ))
2313 })?;
2314
2315 let direction = match sp.direction {
2317 ExpandDirection::Outgoing => Direction::Outgoing,
2318 ExpandDirection::Incoming => Direction::Incoming,
2319 ExpandDirection::Both => Direction::Both,
2320 };
2321
2322 let operator: Box<dyn Operator> = Box::new(
2324 ShortestPathOperator::new(
2325 Arc::clone(&self.store),
2326 input_op,
2327 source_column,
2328 target_column,
2329 sp.edge_type.clone(),
2330 direction,
2331 )
2332 .with_all_paths(sp.all_paths),
2333 );
2334
2335 columns.push(format!("_path_length_{}", sp.path_alias));
2338
2339 Ok((operator, columns))
2340 }
2341
2342 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2344 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2345
2346 let node_column = columns
2348 .iter()
2349 .position(|c| c == &add_label.variable)
2350 .ok_or_else(|| {
2351 Error::Internal(format!(
2352 "Variable '{}' not found for ADD LABEL",
2353 add_label.variable
2354 ))
2355 })?;
2356
2357 let output_schema = vec![LogicalType::Int64];
2359 let output_columns = vec!["labels_added".to_string()];
2360
2361 let operator = Box::new(AddLabelOperator::new(
2362 Arc::clone(&self.store),
2363 input_op,
2364 node_column,
2365 add_label.labels.clone(),
2366 output_schema,
2367 ));
2368
2369 Ok((operator, output_columns))
2370 }
2371
2372 fn plan_remove_label(
2374 &self,
2375 remove_label: &RemoveLabelOp,
2376 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2377 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2378
2379 let node_column = columns
2381 .iter()
2382 .position(|c| c == &remove_label.variable)
2383 .ok_or_else(|| {
2384 Error::Internal(format!(
2385 "Variable '{}' not found for REMOVE LABEL",
2386 remove_label.variable
2387 ))
2388 })?;
2389
2390 let output_schema = vec![LogicalType::Int64];
2392 let output_columns = vec!["labels_removed".to_string()];
2393
2394 let operator = Box::new(RemoveLabelOperator::new(
2395 Arc::clone(&self.store),
2396 input_op,
2397 node_column,
2398 remove_label.labels.clone(),
2399 output_schema,
2400 ));
2401
2402 Ok((operator, output_columns))
2403 }
2404
2405 fn plan_set_property(
2407 &self,
2408 set_prop: &SetPropertyOp,
2409 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2410 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2411
2412 let entity_column = columns
2414 .iter()
2415 .position(|c| c == &set_prop.variable)
2416 .ok_or_else(|| {
2417 Error::Internal(format!(
2418 "Variable '{}' not found for SET",
2419 set_prop.variable
2420 ))
2421 })?;
2422
2423 let properties: Vec<(String, PropertySource)> = set_prop
2425 .properties
2426 .iter()
2427 .map(|(name, expr)| {
2428 let source = self.expression_to_property_source(expr, &columns)?;
2429 Ok((name.clone(), source))
2430 })
2431 .collect::<Result<Vec<_>>>()?;
2432
2433 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2435 let output_columns = columns.clone();
2436
2437 let operator = Box::new(SetPropertyOperator::new_for_node(
2439 Arc::clone(&self.store),
2440 input_op,
2441 entity_column,
2442 properties,
2443 output_schema,
2444 ));
2445
2446 Ok((operator, output_columns))
2447 }
2448
2449 fn expression_to_property_source(
2451 &self,
2452 expr: &LogicalExpression,
2453 columns: &[String],
2454 ) -> Result<PropertySource> {
2455 match expr {
2456 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2457 LogicalExpression::Variable(name) => {
2458 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2459 Error::Internal(format!("Variable '{}' not found for property source", name))
2460 })?;
2461 Ok(PropertySource::Column(col_idx))
2462 }
2463 LogicalExpression::Parameter(name) => {
2464 Ok(PropertySource::Constant(
2467 grafeo_common::types::Value::String(format!("${}", name).into()),
2468 ))
2469 }
2470 _ => Err(Error::Internal(format!(
2471 "Unsupported expression type for property source: {:?}",
2472 expr
2473 ))),
2474 }
2475 }
2476}
2477
2478pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2480 match op {
2481 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2482 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2483 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2484 BinaryOp::Le => Ok(BinaryFilterOp::Le),
2485 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2486 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2487 BinaryOp::And => Ok(BinaryFilterOp::And),
2488 BinaryOp::Or => Ok(BinaryFilterOp::Or),
2489 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2490 BinaryOp::Add => Ok(BinaryFilterOp::Add),
2491 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2492 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2493 BinaryOp::Div => Ok(BinaryFilterOp::Div),
2494 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2495 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2496 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2497 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2498 BinaryOp::In => Ok(BinaryFilterOp::In),
2499 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2500 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2501 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2502 "Binary operator {:?} not yet supported in filters",
2503 op
2504 ))),
2505 }
2506}
2507
2508pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2510 match op {
2511 UnaryOp::Not => Ok(UnaryFilterOp::Not),
2512 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2513 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2514 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2515 }
2516}
2517
2518pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2520 match func {
2521 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2522 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2523 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2524 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2525 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2526 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2527 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2528 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2529 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2530 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2531 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2532 }
2533}
2534
2535pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2539 match expr {
2540 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2541 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2542 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2543 variable: variable.clone(),
2544 property: property.clone(),
2545 }),
2546 LogicalExpression::Binary { left, op, right } => {
2547 let left_expr = convert_filter_expression(left)?;
2548 let right_expr = convert_filter_expression(right)?;
2549 let filter_op = convert_binary_op(*op)?;
2550 Ok(FilterExpression::Binary {
2551 left: Box::new(left_expr),
2552 op: filter_op,
2553 right: Box::new(right_expr),
2554 })
2555 }
2556 LogicalExpression::Unary { op, operand } => {
2557 let operand_expr = convert_filter_expression(operand)?;
2558 let filter_op = convert_unary_op(*op)?;
2559 Ok(FilterExpression::Unary {
2560 op: filter_op,
2561 operand: Box::new(operand_expr),
2562 })
2563 }
2564 LogicalExpression::FunctionCall { name, args, .. } => {
2565 let filter_args: Vec<FilterExpression> = args
2566 .iter()
2567 .map(|a| convert_filter_expression(a))
2568 .collect::<Result<Vec<_>>>()?;
2569 Ok(FilterExpression::FunctionCall {
2570 name: name.clone(),
2571 args: filter_args,
2572 })
2573 }
2574 LogicalExpression::Case {
2575 operand,
2576 when_clauses,
2577 else_clause,
2578 } => {
2579 let filter_operand = operand
2580 .as_ref()
2581 .map(|e| convert_filter_expression(e))
2582 .transpose()?
2583 .map(Box::new);
2584 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2585 .iter()
2586 .map(|(cond, result)| {
2587 Ok((
2588 convert_filter_expression(cond)?,
2589 convert_filter_expression(result)?,
2590 ))
2591 })
2592 .collect::<Result<Vec<_>>>()?;
2593 let filter_else = else_clause
2594 .as_ref()
2595 .map(|e| convert_filter_expression(e))
2596 .transpose()?
2597 .map(Box::new);
2598 Ok(FilterExpression::Case {
2599 operand: filter_operand,
2600 when_clauses: filter_when_clauses,
2601 else_clause: filter_else,
2602 })
2603 }
2604 LogicalExpression::List(items) => {
2605 let filter_items: Vec<FilterExpression> = items
2606 .iter()
2607 .map(|item| convert_filter_expression(item))
2608 .collect::<Result<Vec<_>>>()?;
2609 Ok(FilterExpression::List(filter_items))
2610 }
2611 LogicalExpression::Map(pairs) => {
2612 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2613 .iter()
2614 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2615 .collect::<Result<Vec<_>>>()?;
2616 Ok(FilterExpression::Map(filter_pairs))
2617 }
2618 LogicalExpression::IndexAccess { base, index } => {
2619 let base_expr = convert_filter_expression(base)?;
2620 let index_expr = convert_filter_expression(index)?;
2621 Ok(FilterExpression::IndexAccess {
2622 base: Box::new(base_expr),
2623 index: Box::new(index_expr),
2624 })
2625 }
2626 LogicalExpression::SliceAccess { base, start, end } => {
2627 let base_expr = convert_filter_expression(base)?;
2628 let start_expr = start
2629 .as_ref()
2630 .map(|s| convert_filter_expression(s))
2631 .transpose()?
2632 .map(Box::new);
2633 let end_expr = end
2634 .as_ref()
2635 .map(|e| convert_filter_expression(e))
2636 .transpose()?
2637 .map(Box::new);
2638 Ok(FilterExpression::SliceAccess {
2639 base: Box::new(base_expr),
2640 start: start_expr,
2641 end: end_expr,
2642 })
2643 }
2644 LogicalExpression::Parameter(_) => Err(Error::Internal(
2645 "Parameters not yet supported in filters".to_string(),
2646 )),
2647 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2648 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2649 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2650 LogicalExpression::ListComprehension {
2651 variable,
2652 list_expr,
2653 filter_expr,
2654 map_expr,
2655 } => {
2656 let list = convert_filter_expression(list_expr)?;
2657 let filter = filter_expr
2658 .as_ref()
2659 .map(|f| convert_filter_expression(f))
2660 .transpose()?
2661 .map(Box::new);
2662 let map = convert_filter_expression(map_expr)?;
2663 Ok(FilterExpression::ListComprehension {
2664 variable: variable.clone(),
2665 list_expr: Box::new(list),
2666 filter_expr: filter,
2667 map_expr: Box::new(map),
2668 })
2669 }
2670 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2671 Error::Internal("Subqueries not yet supported in filters".to_string()),
2672 ),
2673 }
2674}
2675
2676fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2678 use grafeo_common::types::Value;
2679 match value {
2680 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
2682 Value::Int64(_) => LogicalType::Int64,
2683 Value::Float64(_) => LogicalType::Float64,
2684 Value::String(_) => LogicalType::String,
2685 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
2687 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, }
2690}
2691
2692fn expression_to_string(expr: &LogicalExpression) -> String {
2694 match expr {
2695 LogicalExpression::Variable(name) => name.clone(),
2696 LogicalExpression::Property { variable, property } => {
2697 format!("{variable}.{property}")
2698 }
2699 LogicalExpression::Literal(value) => format!("{value:?}"),
2700 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2701 _ => "expr".to_string(),
2702 }
2703}
2704
2705pub struct PhysicalPlan {
2707 pub operator: Box<dyn Operator>,
2709 pub columns: Vec<String>,
2711 pub adaptive_context: Option<AdaptiveContext>,
2717}
2718
2719impl PhysicalPlan {
2720 #[must_use]
2722 pub fn columns(&self) -> &[String] {
2723 &self.columns
2724 }
2725
2726 pub fn into_operator(self) -> Box<dyn Operator> {
2728 self.operator
2729 }
2730
2731 #[must_use]
2733 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2734 self.adaptive_context.as_ref()
2735 }
2736
2737 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2739 self.adaptive_context.take()
2740 }
2741}
2742
2743#[allow(dead_code)]
2747struct SingleResultOperator {
2748 result: Option<grafeo_core::execution::DataChunk>,
2749}
2750
2751impl SingleResultOperator {
2752 #[allow(dead_code)]
2753 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2754 Self { result }
2755 }
2756}
2757
2758impl Operator for SingleResultOperator {
2759 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2760 Ok(self.result.take())
2761 }
2762
2763 fn reset(&mut self) {
2764 }
2766
2767 fn name(&self) -> &'static str {
2768 "SingleResult"
2769 }
2770}
2771
2772#[cfg(test)]
2773mod tests {
2774 use super::*;
2775 use crate::query::plan::{
2776 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2777 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2778 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2779 SortKey, SortOp,
2780 };
2781 use grafeo_common::types::Value;
2782
2783 fn create_test_store() -> Arc<LpgStore> {
2784 let store = Arc::new(LpgStore::new());
2785 store.create_node(&["Person"]);
2786 store.create_node(&["Person"]);
2787 store.create_node(&["Company"]);
2788 store
2789 }
2790
2791 #[test]
2794 fn test_plan_simple_scan() {
2795 let store = create_test_store();
2796 let planner = Planner::new(store);
2797
2798 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2800 items: vec![ReturnItem {
2801 expression: LogicalExpression::Variable("n".to_string()),
2802 alias: None,
2803 }],
2804 distinct: false,
2805 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2806 variable: "n".to_string(),
2807 label: Some("Person".to_string()),
2808 input: None,
2809 })),
2810 }));
2811
2812 let physical = planner.plan(&logical).unwrap();
2813 assert_eq!(physical.columns(), &["n"]);
2814 }
2815
2816 #[test]
2817 fn test_plan_scan_without_label() {
2818 let store = create_test_store();
2819 let planner = Planner::new(store);
2820
2821 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2823 items: vec![ReturnItem {
2824 expression: LogicalExpression::Variable("n".to_string()),
2825 alias: None,
2826 }],
2827 distinct: false,
2828 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2829 variable: "n".to_string(),
2830 label: None,
2831 input: None,
2832 })),
2833 }));
2834
2835 let physical = planner.plan(&logical).unwrap();
2836 assert_eq!(physical.columns(), &["n"]);
2837 }
2838
2839 #[test]
2840 fn test_plan_return_with_alias() {
2841 let store = create_test_store();
2842 let planner = Planner::new(store);
2843
2844 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2846 items: vec![ReturnItem {
2847 expression: LogicalExpression::Variable("n".to_string()),
2848 alias: Some("person".to_string()),
2849 }],
2850 distinct: false,
2851 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2852 variable: "n".to_string(),
2853 label: Some("Person".to_string()),
2854 input: None,
2855 })),
2856 }));
2857
2858 let physical = planner.plan(&logical).unwrap();
2859 assert_eq!(physical.columns(), &["person"]);
2860 }
2861
2862 #[test]
2863 fn test_plan_return_property() {
2864 let store = create_test_store();
2865 let planner = Planner::new(store);
2866
2867 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2869 items: vec![ReturnItem {
2870 expression: LogicalExpression::Property {
2871 variable: "n".to_string(),
2872 property: "name".to_string(),
2873 },
2874 alias: None,
2875 }],
2876 distinct: false,
2877 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2878 variable: "n".to_string(),
2879 label: Some("Person".to_string()),
2880 input: None,
2881 })),
2882 }));
2883
2884 let physical = planner.plan(&logical).unwrap();
2885 assert_eq!(physical.columns(), &["n.name"]);
2886 }
2887
2888 #[test]
2889 fn test_plan_return_literal() {
2890 let store = create_test_store();
2891 let planner = Planner::new(store);
2892
2893 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2895 items: vec![ReturnItem {
2896 expression: LogicalExpression::Literal(Value::Int64(42)),
2897 alias: Some("answer".to_string()),
2898 }],
2899 distinct: false,
2900 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2901 variable: "n".to_string(),
2902 label: None,
2903 input: None,
2904 })),
2905 }));
2906
2907 let physical = planner.plan(&logical).unwrap();
2908 assert_eq!(physical.columns(), &["answer"]);
2909 }
2910
2911 #[test]
2914 fn test_plan_filter_equality() {
2915 let store = create_test_store();
2916 let planner = Planner::new(store);
2917
2918 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2920 items: vec![ReturnItem {
2921 expression: LogicalExpression::Variable("n".to_string()),
2922 alias: None,
2923 }],
2924 distinct: false,
2925 input: Box::new(LogicalOperator::Filter(FilterOp {
2926 predicate: LogicalExpression::Binary {
2927 left: Box::new(LogicalExpression::Property {
2928 variable: "n".to_string(),
2929 property: "age".to_string(),
2930 }),
2931 op: BinaryOp::Eq,
2932 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2933 },
2934 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2935 variable: "n".to_string(),
2936 label: Some("Person".to_string()),
2937 input: None,
2938 })),
2939 })),
2940 }));
2941
2942 let physical = planner.plan(&logical).unwrap();
2943 assert_eq!(physical.columns(), &["n"]);
2944 }
2945
2946 #[test]
2947 fn test_plan_filter_compound_and() {
2948 let store = create_test_store();
2949 let planner = Planner::new(store);
2950
2951 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2953 items: vec![ReturnItem {
2954 expression: LogicalExpression::Variable("n".to_string()),
2955 alias: None,
2956 }],
2957 distinct: false,
2958 input: Box::new(LogicalOperator::Filter(FilterOp {
2959 predicate: LogicalExpression::Binary {
2960 left: Box::new(LogicalExpression::Binary {
2961 left: Box::new(LogicalExpression::Property {
2962 variable: "n".to_string(),
2963 property: "age".to_string(),
2964 }),
2965 op: BinaryOp::Gt,
2966 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2967 }),
2968 op: BinaryOp::And,
2969 right: Box::new(LogicalExpression::Binary {
2970 left: Box::new(LogicalExpression::Property {
2971 variable: "n".to_string(),
2972 property: "age".to_string(),
2973 }),
2974 op: BinaryOp::Lt,
2975 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2976 }),
2977 },
2978 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2979 variable: "n".to_string(),
2980 label: None,
2981 input: None,
2982 })),
2983 })),
2984 }));
2985
2986 let physical = planner.plan(&logical).unwrap();
2987 assert_eq!(physical.columns(), &["n"]);
2988 }
2989
2990 #[test]
2991 fn test_plan_filter_unary_not() {
2992 let store = create_test_store();
2993 let planner = Planner::new(store);
2994
2995 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2997 items: vec![ReturnItem {
2998 expression: LogicalExpression::Variable("n".to_string()),
2999 alias: None,
3000 }],
3001 distinct: false,
3002 input: Box::new(LogicalOperator::Filter(FilterOp {
3003 predicate: LogicalExpression::Unary {
3004 op: UnaryOp::Not,
3005 operand: Box::new(LogicalExpression::Property {
3006 variable: "n".to_string(),
3007 property: "active".to_string(),
3008 }),
3009 },
3010 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3011 variable: "n".to_string(),
3012 label: None,
3013 input: None,
3014 })),
3015 })),
3016 }));
3017
3018 let physical = planner.plan(&logical).unwrap();
3019 assert_eq!(physical.columns(), &["n"]);
3020 }
3021
3022 #[test]
3023 fn test_plan_filter_is_null() {
3024 let store = create_test_store();
3025 let planner = Planner::new(store);
3026
3027 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3029 items: vec![ReturnItem {
3030 expression: LogicalExpression::Variable("n".to_string()),
3031 alias: None,
3032 }],
3033 distinct: false,
3034 input: Box::new(LogicalOperator::Filter(FilterOp {
3035 predicate: LogicalExpression::Unary {
3036 op: UnaryOp::IsNull,
3037 operand: Box::new(LogicalExpression::Property {
3038 variable: "n".to_string(),
3039 property: "email".to_string(),
3040 }),
3041 },
3042 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3043 variable: "n".to_string(),
3044 label: None,
3045 input: None,
3046 })),
3047 })),
3048 }));
3049
3050 let physical = planner.plan(&logical).unwrap();
3051 assert_eq!(physical.columns(), &["n"]);
3052 }
3053
3054 #[test]
3055 fn test_plan_filter_function_call() {
3056 let store = create_test_store();
3057 let planner = Planner::new(store);
3058
3059 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3061 items: vec![ReturnItem {
3062 expression: LogicalExpression::Variable("n".to_string()),
3063 alias: None,
3064 }],
3065 distinct: false,
3066 input: Box::new(LogicalOperator::Filter(FilterOp {
3067 predicate: LogicalExpression::Binary {
3068 left: Box::new(LogicalExpression::FunctionCall {
3069 name: "size".to_string(),
3070 args: vec![LogicalExpression::Property {
3071 variable: "n".to_string(),
3072 property: "friends".to_string(),
3073 }],
3074 distinct: false,
3075 }),
3076 op: BinaryOp::Gt,
3077 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3078 },
3079 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3080 variable: "n".to_string(),
3081 label: None,
3082 input: None,
3083 })),
3084 })),
3085 }));
3086
3087 let physical = planner.plan(&logical).unwrap();
3088 assert_eq!(physical.columns(), &["n"]);
3089 }
3090
3091 #[test]
3094 fn test_plan_expand_outgoing() {
3095 let store = create_test_store();
3096 let planner = Planner::new(store);
3097
3098 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3100 items: vec![
3101 ReturnItem {
3102 expression: LogicalExpression::Variable("a".to_string()),
3103 alias: None,
3104 },
3105 ReturnItem {
3106 expression: LogicalExpression::Variable("b".to_string()),
3107 alias: None,
3108 },
3109 ],
3110 distinct: false,
3111 input: Box::new(LogicalOperator::Expand(ExpandOp {
3112 from_variable: "a".to_string(),
3113 to_variable: "b".to_string(),
3114 edge_variable: None,
3115 direction: ExpandDirection::Outgoing,
3116 edge_type: Some("KNOWS".to_string()),
3117 min_hops: 1,
3118 max_hops: Some(1),
3119 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3120 variable: "a".to_string(),
3121 label: Some("Person".to_string()),
3122 input: None,
3123 })),
3124 path_alias: None,
3125 })),
3126 }));
3127
3128 let physical = planner.plan(&logical).unwrap();
3129 assert!(physical.columns().contains(&"a".to_string()));
3131 assert!(physical.columns().contains(&"b".to_string()));
3132 }
3133
3134 #[test]
3135 fn test_plan_expand_with_edge_variable() {
3136 let store = create_test_store();
3137 let planner = Planner::new(store);
3138
3139 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3141 items: vec![
3142 ReturnItem {
3143 expression: LogicalExpression::Variable("a".to_string()),
3144 alias: None,
3145 },
3146 ReturnItem {
3147 expression: LogicalExpression::Variable("r".to_string()),
3148 alias: None,
3149 },
3150 ReturnItem {
3151 expression: LogicalExpression::Variable("b".to_string()),
3152 alias: None,
3153 },
3154 ],
3155 distinct: false,
3156 input: Box::new(LogicalOperator::Expand(ExpandOp {
3157 from_variable: "a".to_string(),
3158 to_variable: "b".to_string(),
3159 edge_variable: Some("r".to_string()),
3160 direction: ExpandDirection::Outgoing,
3161 edge_type: Some("KNOWS".to_string()),
3162 min_hops: 1,
3163 max_hops: Some(1),
3164 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3165 variable: "a".to_string(),
3166 label: None,
3167 input: None,
3168 })),
3169 path_alias: None,
3170 })),
3171 }));
3172
3173 let physical = planner.plan(&logical).unwrap();
3174 assert!(physical.columns().contains(&"a".to_string()));
3175 assert!(physical.columns().contains(&"r".to_string()));
3176 assert!(physical.columns().contains(&"b".to_string()));
3177 }
3178
3179 #[test]
3182 fn test_plan_limit() {
3183 let store = create_test_store();
3184 let planner = Planner::new(store);
3185
3186 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3188 items: vec![ReturnItem {
3189 expression: LogicalExpression::Variable("n".to_string()),
3190 alias: None,
3191 }],
3192 distinct: false,
3193 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3194 count: 10,
3195 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3196 variable: "n".to_string(),
3197 label: None,
3198 input: None,
3199 })),
3200 })),
3201 }));
3202
3203 let physical = planner.plan(&logical).unwrap();
3204 assert_eq!(physical.columns(), &["n"]);
3205 }
3206
3207 #[test]
3208 fn test_plan_skip() {
3209 let store = create_test_store();
3210 let planner = Planner::new(store);
3211
3212 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3214 items: vec![ReturnItem {
3215 expression: LogicalExpression::Variable("n".to_string()),
3216 alias: None,
3217 }],
3218 distinct: false,
3219 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3220 count: 5,
3221 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3222 variable: "n".to_string(),
3223 label: None,
3224 input: None,
3225 })),
3226 })),
3227 }));
3228
3229 let physical = planner.plan(&logical).unwrap();
3230 assert_eq!(physical.columns(), &["n"]);
3231 }
3232
3233 #[test]
3234 fn test_plan_sort() {
3235 let store = create_test_store();
3236 let planner = Planner::new(store);
3237
3238 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3240 items: vec![ReturnItem {
3241 expression: LogicalExpression::Variable("n".to_string()),
3242 alias: None,
3243 }],
3244 distinct: false,
3245 input: Box::new(LogicalOperator::Sort(SortOp {
3246 keys: vec![SortKey {
3247 expression: LogicalExpression::Variable("n".to_string()),
3248 order: SortOrder::Ascending,
3249 }],
3250 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3251 variable: "n".to_string(),
3252 label: None,
3253 input: None,
3254 })),
3255 })),
3256 }));
3257
3258 let physical = planner.plan(&logical).unwrap();
3259 assert_eq!(physical.columns(), &["n"]);
3260 }
3261
3262 #[test]
3263 fn test_plan_sort_descending() {
3264 let store = create_test_store();
3265 let planner = Planner::new(store);
3266
3267 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3269 items: vec![ReturnItem {
3270 expression: LogicalExpression::Variable("n".to_string()),
3271 alias: None,
3272 }],
3273 distinct: false,
3274 input: Box::new(LogicalOperator::Sort(SortOp {
3275 keys: vec![SortKey {
3276 expression: LogicalExpression::Variable("n".to_string()),
3277 order: SortOrder::Descending,
3278 }],
3279 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3280 variable: "n".to_string(),
3281 label: None,
3282 input: None,
3283 })),
3284 })),
3285 }));
3286
3287 let physical = planner.plan(&logical).unwrap();
3288 assert_eq!(physical.columns(), &["n"]);
3289 }
3290
3291 #[test]
3292 fn test_plan_distinct() {
3293 let store = create_test_store();
3294 let planner = Planner::new(store);
3295
3296 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3298 items: vec![ReturnItem {
3299 expression: LogicalExpression::Variable("n".to_string()),
3300 alias: None,
3301 }],
3302 distinct: false,
3303 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3304 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3305 variable: "n".to_string(),
3306 label: None,
3307 input: None,
3308 })),
3309 columns: None,
3310 })),
3311 }));
3312
3313 let physical = planner.plan(&logical).unwrap();
3314 assert_eq!(physical.columns(), &["n"]);
3315 }
3316
3317 #[test]
3320 fn test_plan_aggregate_count() {
3321 let store = create_test_store();
3322 let planner = Planner::new(store);
3323
3324 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3326 items: vec![ReturnItem {
3327 expression: LogicalExpression::Variable("cnt".to_string()),
3328 alias: None,
3329 }],
3330 distinct: false,
3331 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3332 group_by: vec![],
3333 aggregates: vec![LogicalAggregateExpr {
3334 function: LogicalAggregateFunction::Count,
3335 expression: Some(LogicalExpression::Variable("n".to_string())),
3336 distinct: false,
3337 alias: Some("cnt".to_string()),
3338 percentile: None,
3339 }],
3340 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3341 variable: "n".to_string(),
3342 label: None,
3343 input: None,
3344 })),
3345 having: None,
3346 })),
3347 }));
3348
3349 let physical = planner.plan(&logical).unwrap();
3350 assert!(physical.columns().contains(&"cnt".to_string()));
3351 }
3352
3353 #[test]
3354 fn test_plan_aggregate_with_group_by() {
3355 let store = create_test_store();
3356 let planner = Planner::new(store);
3357
3358 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3360 group_by: vec![LogicalExpression::Property {
3361 variable: "n".to_string(),
3362 property: "city".to_string(),
3363 }],
3364 aggregates: vec![LogicalAggregateExpr {
3365 function: LogicalAggregateFunction::Count,
3366 expression: Some(LogicalExpression::Variable("n".to_string())),
3367 distinct: false,
3368 alias: Some("cnt".to_string()),
3369 percentile: None,
3370 }],
3371 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3372 variable: "n".to_string(),
3373 label: Some("Person".to_string()),
3374 input: None,
3375 })),
3376 having: None,
3377 }));
3378
3379 let physical = planner.plan(&logical).unwrap();
3380 assert_eq!(physical.columns().len(), 2);
3381 }
3382
3383 #[test]
3384 fn test_plan_aggregate_sum() {
3385 let store = create_test_store();
3386 let planner = Planner::new(store);
3387
3388 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3390 group_by: vec![],
3391 aggregates: vec![LogicalAggregateExpr {
3392 function: LogicalAggregateFunction::Sum,
3393 expression: Some(LogicalExpression::Property {
3394 variable: "n".to_string(),
3395 property: "value".to_string(),
3396 }),
3397 distinct: false,
3398 alias: Some("total".to_string()),
3399 percentile: None,
3400 }],
3401 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3402 variable: "n".to_string(),
3403 label: None,
3404 input: None,
3405 })),
3406 having: None,
3407 }));
3408
3409 let physical = planner.plan(&logical).unwrap();
3410 assert!(physical.columns().contains(&"total".to_string()));
3411 }
3412
3413 #[test]
3414 fn test_plan_aggregate_avg() {
3415 let store = create_test_store();
3416 let planner = Planner::new(store);
3417
3418 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3420 group_by: vec![],
3421 aggregates: vec![LogicalAggregateExpr {
3422 function: LogicalAggregateFunction::Avg,
3423 expression: Some(LogicalExpression::Property {
3424 variable: "n".to_string(),
3425 property: "score".to_string(),
3426 }),
3427 distinct: false,
3428 alias: Some("average".to_string()),
3429 percentile: None,
3430 }],
3431 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3432 variable: "n".to_string(),
3433 label: None,
3434 input: None,
3435 })),
3436 having: None,
3437 }));
3438
3439 let physical = planner.plan(&logical).unwrap();
3440 assert!(physical.columns().contains(&"average".to_string()));
3441 }
3442
3443 #[test]
3444 fn test_plan_aggregate_min_max() {
3445 let store = create_test_store();
3446 let planner = Planner::new(store);
3447
3448 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3450 group_by: vec![],
3451 aggregates: vec![
3452 LogicalAggregateExpr {
3453 function: LogicalAggregateFunction::Min,
3454 expression: Some(LogicalExpression::Property {
3455 variable: "n".to_string(),
3456 property: "age".to_string(),
3457 }),
3458 distinct: false,
3459 alias: Some("youngest".to_string()),
3460 percentile: None,
3461 },
3462 LogicalAggregateExpr {
3463 function: LogicalAggregateFunction::Max,
3464 expression: Some(LogicalExpression::Property {
3465 variable: "n".to_string(),
3466 property: "age".to_string(),
3467 }),
3468 distinct: false,
3469 alias: Some("oldest".to_string()),
3470 percentile: None,
3471 },
3472 ],
3473 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3474 variable: "n".to_string(),
3475 label: None,
3476 input: None,
3477 })),
3478 having: None,
3479 }));
3480
3481 let physical = planner.plan(&logical).unwrap();
3482 assert!(physical.columns().contains(&"youngest".to_string()));
3483 assert!(physical.columns().contains(&"oldest".to_string()));
3484 }
3485
3486 #[test]
3489 fn test_plan_inner_join() {
3490 let store = create_test_store();
3491 let planner = Planner::new(store);
3492
3493 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3495 items: vec![
3496 ReturnItem {
3497 expression: LogicalExpression::Variable("a".to_string()),
3498 alias: None,
3499 },
3500 ReturnItem {
3501 expression: LogicalExpression::Variable("b".to_string()),
3502 alias: None,
3503 },
3504 ],
3505 distinct: false,
3506 input: Box::new(LogicalOperator::Join(JoinOp {
3507 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3508 variable: "a".to_string(),
3509 label: Some("Person".to_string()),
3510 input: None,
3511 })),
3512 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3513 variable: "b".to_string(),
3514 label: Some("Company".to_string()),
3515 input: None,
3516 })),
3517 join_type: JoinType::Inner,
3518 conditions: vec![JoinCondition {
3519 left: LogicalExpression::Variable("a".to_string()),
3520 right: LogicalExpression::Variable("b".to_string()),
3521 }],
3522 })),
3523 }));
3524
3525 let physical = planner.plan(&logical).unwrap();
3526 assert!(physical.columns().contains(&"a".to_string()));
3527 assert!(physical.columns().contains(&"b".to_string()));
3528 }
3529
3530 #[test]
3531 fn test_plan_cross_join() {
3532 let store = create_test_store();
3533 let planner = Planner::new(store);
3534
3535 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3537 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3538 variable: "a".to_string(),
3539 label: None,
3540 input: None,
3541 })),
3542 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3543 variable: "b".to_string(),
3544 label: None,
3545 input: None,
3546 })),
3547 join_type: JoinType::Cross,
3548 conditions: vec![],
3549 }));
3550
3551 let physical = planner.plan(&logical).unwrap();
3552 assert_eq!(physical.columns().len(), 2);
3553 }
3554
3555 #[test]
3556 fn test_plan_left_join() {
3557 let store = create_test_store();
3558 let planner = Planner::new(store);
3559
3560 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3561 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3562 variable: "a".to_string(),
3563 label: None,
3564 input: None,
3565 })),
3566 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3567 variable: "b".to_string(),
3568 label: None,
3569 input: None,
3570 })),
3571 join_type: JoinType::Left,
3572 conditions: vec![],
3573 }));
3574
3575 let physical = planner.plan(&logical).unwrap();
3576 assert_eq!(physical.columns().len(), 2);
3577 }
3578
3579 #[test]
3582 fn test_plan_create_node() {
3583 let store = create_test_store();
3584 let planner = Planner::new(store);
3585
3586 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3588 variable: "n".to_string(),
3589 labels: vec!["Person".to_string()],
3590 properties: vec![(
3591 "name".to_string(),
3592 LogicalExpression::Literal(Value::String("Alice".into())),
3593 )],
3594 input: None,
3595 }));
3596
3597 let physical = planner.plan(&logical).unwrap();
3598 assert!(physical.columns().contains(&"n".to_string()));
3599 }
3600
3601 #[test]
3602 fn test_plan_create_edge() {
3603 let store = create_test_store();
3604 let planner = Planner::new(store);
3605
3606 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3608 variable: Some("r".to_string()),
3609 from_variable: "a".to_string(),
3610 to_variable: "b".to_string(),
3611 edge_type: "KNOWS".to_string(),
3612 properties: vec![],
3613 input: Box::new(LogicalOperator::Join(JoinOp {
3614 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3615 variable: "a".to_string(),
3616 label: None,
3617 input: None,
3618 })),
3619 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3620 variable: "b".to_string(),
3621 label: None,
3622 input: None,
3623 })),
3624 join_type: JoinType::Cross,
3625 conditions: vec![],
3626 })),
3627 }));
3628
3629 let physical = planner.plan(&logical).unwrap();
3630 assert!(physical.columns().contains(&"r".to_string()));
3631 }
3632
3633 #[test]
3634 fn test_plan_delete_node() {
3635 let store = create_test_store();
3636 let planner = Planner::new(store);
3637
3638 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3640 variable: "n".to_string(),
3641 detach: false,
3642 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3643 variable: "n".to_string(),
3644 label: None,
3645 input: None,
3646 })),
3647 }));
3648
3649 let physical = planner.plan(&logical).unwrap();
3650 assert!(physical.columns().contains(&"deleted_count".to_string()));
3651 }
3652
3653 #[test]
3656 fn test_plan_empty_errors() {
3657 let store = create_test_store();
3658 let planner = Planner::new(store);
3659
3660 let logical = LogicalPlan::new(LogicalOperator::Empty);
3661 let result = planner.plan(&logical);
3662 assert!(result.is_err());
3663 }
3664
3665 #[test]
3666 fn test_plan_missing_variable_in_return() {
3667 let store = create_test_store();
3668 let planner = Planner::new(store);
3669
3670 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3672 items: vec![ReturnItem {
3673 expression: LogicalExpression::Variable("missing".to_string()),
3674 alias: None,
3675 }],
3676 distinct: false,
3677 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3678 variable: "n".to_string(),
3679 label: None,
3680 input: None,
3681 })),
3682 }));
3683
3684 let result = planner.plan(&logical);
3685 assert!(result.is_err());
3686 }
3687
3688 #[test]
3691 fn test_convert_binary_ops() {
3692 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3693 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3694 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3695 assert!(convert_binary_op(BinaryOp::Le).is_ok());
3696 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3697 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3698 assert!(convert_binary_op(BinaryOp::And).is_ok());
3699 assert!(convert_binary_op(BinaryOp::Or).is_ok());
3700 assert!(convert_binary_op(BinaryOp::Add).is_ok());
3701 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3702 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3703 assert!(convert_binary_op(BinaryOp::Div).is_ok());
3704 }
3705
3706 #[test]
3707 fn test_convert_unary_ops() {
3708 assert!(convert_unary_op(UnaryOp::Not).is_ok());
3709 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3710 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3711 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3712 }
3713
3714 #[test]
3715 fn test_convert_aggregate_functions() {
3716 assert!(matches!(
3717 convert_aggregate_function(LogicalAggregateFunction::Count),
3718 PhysicalAggregateFunction::Count
3719 ));
3720 assert!(matches!(
3721 convert_aggregate_function(LogicalAggregateFunction::Sum),
3722 PhysicalAggregateFunction::Sum
3723 ));
3724 assert!(matches!(
3725 convert_aggregate_function(LogicalAggregateFunction::Avg),
3726 PhysicalAggregateFunction::Avg
3727 ));
3728 assert!(matches!(
3729 convert_aggregate_function(LogicalAggregateFunction::Min),
3730 PhysicalAggregateFunction::Min
3731 ));
3732 assert!(matches!(
3733 convert_aggregate_function(LogicalAggregateFunction::Max),
3734 PhysicalAggregateFunction::Max
3735 ));
3736 }
3737
3738 #[test]
3739 fn test_planner_accessors() {
3740 let store = create_test_store();
3741 let planner = Planner::new(Arc::clone(&store));
3742
3743 assert!(planner.tx_id().is_none());
3744 assert!(planner.tx_manager().is_none());
3745 let _ = planner.viewing_epoch(); }
3747
3748 #[test]
3749 fn test_physical_plan_accessors() {
3750 let store = create_test_store();
3751 let planner = Planner::new(store);
3752
3753 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3754 variable: "n".to_string(),
3755 label: None,
3756 input: None,
3757 }));
3758
3759 let physical = planner.plan(&logical).unwrap();
3760 assert_eq!(physical.columns(), &["n"]);
3761
3762 let _ = physical.into_operator();
3764 }
3765
3766 #[test]
3769 fn test_plan_adaptive_with_scan() {
3770 let store = create_test_store();
3771 let planner = Planner::new(store);
3772
3773 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3775 items: vec![ReturnItem {
3776 expression: LogicalExpression::Variable("n".to_string()),
3777 alias: None,
3778 }],
3779 distinct: false,
3780 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3781 variable: "n".to_string(),
3782 label: Some("Person".to_string()),
3783 input: None,
3784 })),
3785 }));
3786
3787 let physical = planner.plan_adaptive(&logical).unwrap();
3788 assert_eq!(physical.columns(), &["n"]);
3789 assert!(physical.adaptive_context.is_some());
3791 }
3792
3793 #[test]
3794 fn test_plan_adaptive_with_filter() {
3795 let store = create_test_store();
3796 let planner = Planner::new(store);
3797
3798 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3800 items: vec![ReturnItem {
3801 expression: LogicalExpression::Variable("n".to_string()),
3802 alias: None,
3803 }],
3804 distinct: false,
3805 input: Box::new(LogicalOperator::Filter(FilterOp {
3806 predicate: LogicalExpression::Binary {
3807 left: Box::new(LogicalExpression::Property {
3808 variable: "n".to_string(),
3809 property: "age".to_string(),
3810 }),
3811 op: BinaryOp::Gt,
3812 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3813 },
3814 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3815 variable: "n".to_string(),
3816 label: None,
3817 input: None,
3818 })),
3819 })),
3820 }));
3821
3822 let physical = planner.plan_adaptive(&logical).unwrap();
3823 assert!(physical.adaptive_context.is_some());
3824 }
3825
3826 #[test]
3827 fn test_plan_adaptive_with_expand() {
3828 let store = create_test_store();
3829 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
3830
3831 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3833 items: vec![
3834 ReturnItem {
3835 expression: LogicalExpression::Variable("a".to_string()),
3836 alias: None,
3837 },
3838 ReturnItem {
3839 expression: LogicalExpression::Variable("b".to_string()),
3840 alias: None,
3841 },
3842 ],
3843 distinct: false,
3844 input: Box::new(LogicalOperator::Expand(ExpandOp {
3845 from_variable: "a".to_string(),
3846 to_variable: "b".to_string(),
3847 edge_variable: None,
3848 direction: ExpandDirection::Outgoing,
3849 edge_type: Some("KNOWS".to_string()),
3850 min_hops: 1,
3851 max_hops: Some(1),
3852 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3853 variable: "a".to_string(),
3854 label: None,
3855 input: None,
3856 })),
3857 path_alias: None,
3858 })),
3859 }));
3860
3861 let physical = planner.plan_adaptive(&logical).unwrap();
3862 assert!(physical.adaptive_context.is_some());
3863 }
3864
3865 #[test]
3866 fn test_plan_adaptive_with_join() {
3867 let store = create_test_store();
3868 let planner = Planner::new(store);
3869
3870 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3871 items: vec![
3872 ReturnItem {
3873 expression: LogicalExpression::Variable("a".to_string()),
3874 alias: None,
3875 },
3876 ReturnItem {
3877 expression: LogicalExpression::Variable("b".to_string()),
3878 alias: None,
3879 },
3880 ],
3881 distinct: false,
3882 input: Box::new(LogicalOperator::Join(JoinOp {
3883 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3884 variable: "a".to_string(),
3885 label: None,
3886 input: None,
3887 })),
3888 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3889 variable: "b".to_string(),
3890 label: None,
3891 input: None,
3892 })),
3893 join_type: JoinType::Cross,
3894 conditions: vec![],
3895 })),
3896 }));
3897
3898 let physical = planner.plan_adaptive(&logical).unwrap();
3899 assert!(physical.adaptive_context.is_some());
3900 }
3901
3902 #[test]
3903 fn test_plan_adaptive_with_aggregate() {
3904 let store = create_test_store();
3905 let planner = Planner::new(store);
3906
3907 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3908 group_by: vec![],
3909 aggregates: vec![LogicalAggregateExpr {
3910 function: LogicalAggregateFunction::Count,
3911 expression: Some(LogicalExpression::Variable("n".to_string())),
3912 distinct: false,
3913 alias: Some("cnt".to_string()),
3914 percentile: None,
3915 }],
3916 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3917 variable: "n".to_string(),
3918 label: None,
3919 input: None,
3920 })),
3921 having: None,
3922 }));
3923
3924 let physical = planner.plan_adaptive(&logical).unwrap();
3925 assert!(physical.adaptive_context.is_some());
3926 }
3927
3928 #[test]
3929 fn test_plan_adaptive_with_distinct() {
3930 let store = create_test_store();
3931 let planner = Planner::new(store);
3932
3933 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3934 items: vec![ReturnItem {
3935 expression: LogicalExpression::Variable("n".to_string()),
3936 alias: None,
3937 }],
3938 distinct: false,
3939 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3940 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3941 variable: "n".to_string(),
3942 label: None,
3943 input: None,
3944 })),
3945 columns: None,
3946 })),
3947 }));
3948
3949 let physical = planner.plan_adaptive(&logical).unwrap();
3950 assert!(physical.adaptive_context.is_some());
3951 }
3952
3953 #[test]
3954 fn test_plan_adaptive_with_limit() {
3955 let store = create_test_store();
3956 let planner = Planner::new(store);
3957
3958 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3959 items: vec![ReturnItem {
3960 expression: LogicalExpression::Variable("n".to_string()),
3961 alias: None,
3962 }],
3963 distinct: false,
3964 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3965 count: 10,
3966 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3967 variable: "n".to_string(),
3968 label: None,
3969 input: None,
3970 })),
3971 })),
3972 }));
3973
3974 let physical = planner.plan_adaptive(&logical).unwrap();
3975 assert!(physical.adaptive_context.is_some());
3976 }
3977
3978 #[test]
3979 fn test_plan_adaptive_with_skip() {
3980 let store = create_test_store();
3981 let planner = Planner::new(store);
3982
3983 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3984 items: vec![ReturnItem {
3985 expression: LogicalExpression::Variable("n".to_string()),
3986 alias: None,
3987 }],
3988 distinct: false,
3989 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3990 count: 5,
3991 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3992 variable: "n".to_string(),
3993 label: None,
3994 input: None,
3995 })),
3996 })),
3997 }));
3998
3999 let physical = planner.plan_adaptive(&logical).unwrap();
4000 assert!(physical.adaptive_context.is_some());
4001 }
4002
4003 #[test]
4004 fn test_plan_adaptive_with_sort() {
4005 let store = create_test_store();
4006 let planner = Planner::new(store);
4007
4008 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4009 items: vec![ReturnItem {
4010 expression: LogicalExpression::Variable("n".to_string()),
4011 alias: None,
4012 }],
4013 distinct: false,
4014 input: Box::new(LogicalOperator::Sort(SortOp {
4015 keys: vec![SortKey {
4016 expression: LogicalExpression::Variable("n".to_string()),
4017 order: SortOrder::Ascending,
4018 }],
4019 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4020 variable: "n".to_string(),
4021 label: None,
4022 input: None,
4023 })),
4024 })),
4025 }));
4026
4027 let physical = planner.plan_adaptive(&logical).unwrap();
4028 assert!(physical.adaptive_context.is_some());
4029 }
4030
4031 #[test]
4032 fn test_plan_adaptive_with_union() {
4033 let store = create_test_store();
4034 let planner = Planner::new(store);
4035
4036 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4037 items: vec![ReturnItem {
4038 expression: LogicalExpression::Variable("n".to_string()),
4039 alias: None,
4040 }],
4041 distinct: false,
4042 input: Box::new(LogicalOperator::Union(UnionOp {
4043 inputs: vec![
4044 LogicalOperator::NodeScan(NodeScanOp {
4045 variable: "n".to_string(),
4046 label: Some("Person".to_string()),
4047 input: None,
4048 }),
4049 LogicalOperator::NodeScan(NodeScanOp {
4050 variable: "n".to_string(),
4051 label: Some("Company".to_string()),
4052 input: None,
4053 }),
4054 ],
4055 })),
4056 }));
4057
4058 let physical = planner.plan_adaptive(&logical).unwrap();
4059 assert!(physical.adaptive_context.is_some());
4060 }
4061
4062 #[test]
4065 fn test_plan_expand_variable_length() {
4066 let store = create_test_store();
4067 let planner = Planner::new(store);
4068
4069 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4071 items: vec![
4072 ReturnItem {
4073 expression: LogicalExpression::Variable("a".to_string()),
4074 alias: None,
4075 },
4076 ReturnItem {
4077 expression: LogicalExpression::Variable("b".to_string()),
4078 alias: None,
4079 },
4080 ],
4081 distinct: false,
4082 input: Box::new(LogicalOperator::Expand(ExpandOp {
4083 from_variable: "a".to_string(),
4084 to_variable: "b".to_string(),
4085 edge_variable: None,
4086 direction: ExpandDirection::Outgoing,
4087 edge_type: Some("KNOWS".to_string()),
4088 min_hops: 1,
4089 max_hops: Some(3),
4090 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4091 variable: "a".to_string(),
4092 label: None,
4093 input: None,
4094 })),
4095 path_alias: None,
4096 })),
4097 }));
4098
4099 let physical = planner.plan(&logical).unwrap();
4100 assert!(physical.columns().contains(&"a".to_string()));
4101 assert!(physical.columns().contains(&"b".to_string()));
4102 }
4103
4104 #[test]
4105 fn test_plan_expand_with_path_alias() {
4106 let store = create_test_store();
4107 let planner = Planner::new(store);
4108
4109 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4111 items: vec![
4112 ReturnItem {
4113 expression: LogicalExpression::Variable("a".to_string()),
4114 alias: None,
4115 },
4116 ReturnItem {
4117 expression: LogicalExpression::Variable("b".to_string()),
4118 alias: None,
4119 },
4120 ],
4121 distinct: false,
4122 input: Box::new(LogicalOperator::Expand(ExpandOp {
4123 from_variable: "a".to_string(),
4124 to_variable: "b".to_string(),
4125 edge_variable: None,
4126 direction: ExpandDirection::Outgoing,
4127 edge_type: Some("KNOWS".to_string()),
4128 min_hops: 1,
4129 max_hops: Some(3),
4130 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4131 variable: "a".to_string(),
4132 label: None,
4133 input: None,
4134 })),
4135 path_alias: Some("p".to_string()),
4136 })),
4137 }));
4138
4139 let physical = planner.plan(&logical).unwrap();
4140 assert!(physical.columns().contains(&"a".to_string()));
4142 assert!(physical.columns().contains(&"b".to_string()));
4143 }
4144
4145 #[test]
4146 fn test_plan_expand_incoming() {
4147 let store = create_test_store();
4148 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4149
4150 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4152 items: vec![
4153 ReturnItem {
4154 expression: LogicalExpression::Variable("a".to_string()),
4155 alias: None,
4156 },
4157 ReturnItem {
4158 expression: LogicalExpression::Variable("b".to_string()),
4159 alias: None,
4160 },
4161 ],
4162 distinct: false,
4163 input: Box::new(LogicalOperator::Expand(ExpandOp {
4164 from_variable: "a".to_string(),
4165 to_variable: "b".to_string(),
4166 edge_variable: None,
4167 direction: ExpandDirection::Incoming,
4168 edge_type: Some("KNOWS".to_string()),
4169 min_hops: 1,
4170 max_hops: Some(1),
4171 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4172 variable: "a".to_string(),
4173 label: None,
4174 input: None,
4175 })),
4176 path_alias: None,
4177 })),
4178 }));
4179
4180 let physical = planner.plan(&logical).unwrap();
4181 assert!(physical.columns().contains(&"a".to_string()));
4182 assert!(physical.columns().contains(&"b".to_string()));
4183 }
4184
4185 #[test]
4186 fn test_plan_expand_both_directions() {
4187 let store = create_test_store();
4188 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4189
4190 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4192 items: vec![
4193 ReturnItem {
4194 expression: LogicalExpression::Variable("a".to_string()),
4195 alias: None,
4196 },
4197 ReturnItem {
4198 expression: LogicalExpression::Variable("b".to_string()),
4199 alias: None,
4200 },
4201 ],
4202 distinct: false,
4203 input: Box::new(LogicalOperator::Expand(ExpandOp {
4204 from_variable: "a".to_string(),
4205 to_variable: "b".to_string(),
4206 edge_variable: None,
4207 direction: ExpandDirection::Both,
4208 edge_type: Some("KNOWS".to_string()),
4209 min_hops: 1,
4210 max_hops: Some(1),
4211 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4212 variable: "a".to_string(),
4213 label: None,
4214 input: None,
4215 })),
4216 path_alias: None,
4217 })),
4218 }));
4219
4220 let physical = planner.plan(&logical).unwrap();
4221 assert!(physical.columns().contains(&"a".to_string()));
4222 assert!(physical.columns().contains(&"b".to_string()));
4223 }
4224
4225 #[test]
4228 fn test_planner_with_context() {
4229 use crate::transaction::TransactionManager;
4230
4231 let store = create_test_store();
4232 let tx_manager = Arc::new(TransactionManager::new());
4233 let tx_id = tx_manager.begin();
4234 let epoch = tx_manager.current_epoch();
4235
4236 let planner = Planner::with_context(
4237 Arc::clone(&store),
4238 Arc::clone(&tx_manager),
4239 Some(tx_id),
4240 epoch,
4241 );
4242
4243 assert_eq!(planner.tx_id(), Some(tx_id));
4244 assert!(planner.tx_manager().is_some());
4245 assert_eq!(planner.viewing_epoch(), epoch);
4246 }
4247
4248 #[test]
4249 fn test_planner_with_factorized_execution_disabled() {
4250 let store = create_test_store();
4251 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4252
4253 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4255 items: vec![
4256 ReturnItem {
4257 expression: LogicalExpression::Variable("a".to_string()),
4258 alias: None,
4259 },
4260 ReturnItem {
4261 expression: LogicalExpression::Variable("c".to_string()),
4262 alias: None,
4263 },
4264 ],
4265 distinct: false,
4266 input: Box::new(LogicalOperator::Expand(ExpandOp {
4267 from_variable: "b".to_string(),
4268 to_variable: "c".to_string(),
4269 edge_variable: None,
4270 direction: ExpandDirection::Outgoing,
4271 edge_type: None,
4272 min_hops: 1,
4273 max_hops: Some(1),
4274 input: Box::new(LogicalOperator::Expand(ExpandOp {
4275 from_variable: "a".to_string(),
4276 to_variable: "b".to_string(),
4277 edge_variable: None,
4278 direction: ExpandDirection::Outgoing,
4279 edge_type: None,
4280 min_hops: 1,
4281 max_hops: Some(1),
4282 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4283 variable: "a".to_string(),
4284 label: None,
4285 input: None,
4286 })),
4287 path_alias: None,
4288 })),
4289 path_alias: None,
4290 })),
4291 }));
4292
4293 let physical = planner.plan(&logical).unwrap();
4294 assert!(physical.columns().contains(&"a".to_string()));
4295 assert!(physical.columns().contains(&"c".to_string()));
4296 }
4297
4298 #[test]
4301 fn test_plan_sort_by_property() {
4302 let store = create_test_store();
4303 let planner = Planner::new(store);
4304
4305 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4307 items: vec![ReturnItem {
4308 expression: LogicalExpression::Variable("n".to_string()),
4309 alias: None,
4310 }],
4311 distinct: false,
4312 input: Box::new(LogicalOperator::Sort(SortOp {
4313 keys: vec![SortKey {
4314 expression: LogicalExpression::Property {
4315 variable: "n".to_string(),
4316 property: "name".to_string(),
4317 },
4318 order: SortOrder::Ascending,
4319 }],
4320 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4321 variable: "n".to_string(),
4322 label: None,
4323 input: None,
4324 })),
4325 })),
4326 }));
4327
4328 let physical = planner.plan(&logical).unwrap();
4329 assert!(physical.columns().contains(&"n".to_string()));
4331 }
4332
4333 #[test]
4336 fn test_plan_scan_with_input() {
4337 let store = create_test_store();
4338 let planner = Planner::new(store);
4339
4340 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4342 items: vec![
4343 ReturnItem {
4344 expression: LogicalExpression::Variable("a".to_string()),
4345 alias: None,
4346 },
4347 ReturnItem {
4348 expression: LogicalExpression::Variable("b".to_string()),
4349 alias: None,
4350 },
4351 ],
4352 distinct: false,
4353 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4354 variable: "b".to_string(),
4355 label: Some("Company".to_string()),
4356 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4357 variable: "a".to_string(),
4358 label: Some("Person".to_string()),
4359 input: None,
4360 }))),
4361 })),
4362 }));
4363
4364 let physical = planner.plan(&logical).unwrap();
4365 assert!(physical.columns().contains(&"a".to_string()));
4366 assert!(physical.columns().contains(&"b".to_string()));
4367 }
4368}