1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let stats = self.store.statistics();
244 let avg_degree = self.estimate_expand_degree(&stats, expand);
245 let estimate = input_estimate * avg_degree;
246 let id = format!("expand_{}", expand.to_variable);
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250 }
251 LogicalOperator::Join(join) => {
252 let left_est = self.estimate_cardinality(&join.left);
254 let right_est = self.estimate_cardinality(&join.right);
255 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261 }
262 LogicalOperator::Aggregate(agg) => {
263 let input_estimate = self.estimate_cardinality(&agg.input);
265 let estimate = if agg.group_by.is_empty() {
266 1.0 } else {
268 (input_estimate * 0.1).max(1.0) };
270 let id = format!("aggregate_{depth}");
271 ctx.set_estimate(&id, estimate);
272
273 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274 }
275 LogicalOperator::Distinct(distinct) => {
276 let input_estimate = self.estimate_cardinality(&distinct.input);
277 let estimate = (input_estimate * 0.5).max(1.0);
278 let id = format!("distinct_{depth}");
279 ctx.set_estimate(&id, estimate);
280
281 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282 }
283 LogicalOperator::Return(ret) => {
284 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285 }
286 LogicalOperator::Limit(limit) => {
287 let input_estimate = self.estimate_cardinality(&limit.input);
288 let estimate = (input_estimate).min(limit.count as f64);
289 let id = format!("limit_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293 }
294 LogicalOperator::Skip(skip) => {
295 let input_estimate = self.estimate_cardinality(&skip.input);
296 let estimate = (input_estimate - skip.count as f64).max(0.0);
297 let id = format!("skip_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301 }
302 LogicalOperator::Sort(sort) => {
303 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305 }
306 LogicalOperator::Union(union) => {
307 let estimate: f64 = union
308 .inputs
309 .iter()
310 .map(|input| self.estimate_cardinality(input))
311 .sum();
312 let id = format!("union_{depth}");
313 ctx.set_estimate(&id, estimate);
314
315 for input in &union.inputs {
316 self.collect_cardinality_estimates(input, ctx, depth + 1);
317 }
318 }
319 _ => {
320 }
322 }
323 }
324
325 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327 match op {
328 LogicalOperator::NodeScan(scan) => {
329 if let Some(label) = &scan.label {
330 self.store.nodes_by_label(label).len() as f64
331 } else {
332 self.store.node_count() as f64
333 }
334 }
335 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336 LogicalOperator::Expand(expand) => {
337 let stats = self.store.statistics();
338 let avg_degree = self.estimate_expand_degree(&stats, expand);
339 self.estimate_cardinality(&expand.input) * avg_degree
340 }
341 LogicalOperator::Join(join) => {
342 let left = self.estimate_cardinality(&join.left);
343 let right = self.estimate_cardinality(&join.right);
344 (left * right).sqrt()
345 }
346 LogicalOperator::Aggregate(agg) => {
347 if agg.group_by.is_empty() {
348 1.0
349 } else {
350 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351 }
352 }
353 LogicalOperator::Distinct(distinct) => {
354 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355 }
356 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357 LogicalOperator::Limit(limit) => self
358 .estimate_cardinality(&limit.input)
359 .min(limit.count as f64),
360 LogicalOperator::Skip(skip) => {
361 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362 }
363 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364 LogicalOperator::Union(union) => union
365 .inputs
366 .iter()
367 .map(|input| self.estimate_cardinality(input))
368 .sum(),
369 _ => 1000.0, }
371 }
372
373 fn estimate_expand_degree(
375 &self,
376 stats: &grafeo_core::statistics::Statistics,
377 expand: &ExpandOp,
378 ) -> f64 {
379 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380 if let Some(edge_type) = &expand.edge_type {
381 stats.estimate_avg_degree(edge_type, outgoing)
382 } else if stats.total_nodes > 0 {
383 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384 } else {
385 10.0 }
387 }
388
389 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391 match op {
392 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393 LogicalOperator::Expand(expand) => {
394 if self.factorized_execution {
396 let (chain_len, _base) = Self::count_expand_chain(op);
397 if chain_len >= 2 {
398 return self.plan_expand_chain(op);
400 }
401 }
402 self.plan_expand(expand)
403 }
404 LogicalOperator::Return(ret) => self.plan_return(ret),
405 LogicalOperator::Filter(filter) => self.plan_filter(filter),
406 LogicalOperator::Project(project) => self.plan_project(project),
407 LogicalOperator::Limit(limit) => self.plan_limit(limit),
408 LogicalOperator::Skip(skip) => self.plan_skip(skip),
409 LogicalOperator::Sort(sort) => self.plan_sort(sort),
410 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411 LogicalOperator::Join(join) => self.plan_join(join),
412 LogicalOperator::Union(union) => self.plan_union(union),
413 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421 LogicalOperator::Merge(merge) => self.plan_merge(merge),
422 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427 LogicalOperator::VectorScan(_) => Err(Error::Internal(
428 "VectorScan requires vector-index feature".to_string(),
429 )),
430 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431 "VectorJoin requires vector-index feature".to_string(),
432 )),
433 _ => Err(Error::Internal(format!(
434 "Unsupported operator: {:?}",
435 std::mem::discriminant(op)
436 ))),
437 }
438 }
439
440 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442 let scan_op = if let Some(label) = &scan.label {
443 ScanOperator::with_label(Arc::clone(&self.store), label)
444 } else {
445 ScanOperator::new(Arc::clone(&self.store))
446 };
447
448 let scan_operator: Box<dyn Operator> =
450 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452 if let Some(input) = &scan.input {
454 let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456 let mut output_schema: Vec<LogicalType> =
458 input_columns.iter().map(|_| LogicalType::Any).collect();
459 output_schema.push(LogicalType::Node);
460
461 input_columns.push(scan.variable.clone());
463
464 let join_op = Box::new(NestedLoopJoinOperator::new(
466 input_op,
467 scan_operator,
468 None, PhysicalJoinType::Cross,
470 output_schema,
471 ));
472
473 Ok((join_op, input_columns))
474 } else {
475 let columns = vec![scan.variable.clone()];
476 Ok((scan_operator, columns))
477 }
478 }
479
480 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485 let source_column = input_columns
487 .iter()
488 .position(|c| c == &expand.from_variable)
489 .ok_or_else(|| {
490 Error::Internal(format!(
491 "Source variable '{}' not found in input columns",
492 expand.from_variable
493 ))
494 })?;
495
496 let direction = match expand.direction {
498 ExpandDirection::Outgoing => Direction::Outgoing,
499 ExpandDirection::Incoming => Direction::Incoming,
500 ExpandDirection::Both => Direction::Both,
501 };
502
503 let is_variable_length =
505 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507 let operator: Box<dyn Operator> = if is_variable_length {
508 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
511 Arc::clone(&self.store),
512 input_op,
513 source_column,
514 direction,
515 expand.edge_type.clone(),
516 expand.min_hops,
517 max_hops,
518 )
519 .with_tx_context(self.viewing_epoch, self.tx_id);
520
521 if expand.path_alias.is_some() {
523 expand_op = expand_op
524 .with_path_length_output()
525 .with_path_detail_output();
526 }
527
528 Box::new(expand_op)
529 } else {
530 let expand_op = ExpandOperator::new(
532 Arc::clone(&self.store),
533 input_op,
534 source_column,
535 direction,
536 expand.edge_type.clone(),
537 )
538 .with_tx_context(self.viewing_epoch, self.tx_id);
539 Box::new(expand_op)
540 };
541
542 let mut columns = input_columns;
545
546 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
548 let count = self.anon_edge_counter.get();
549 self.anon_edge_counter.set(count + 1);
550 format!("_anon_edge_{}", count)
551 });
552 columns.push(edge_col_name);
553
554 columns.push(expand.to_variable.clone());
555
556 if let Some(ref path_alias) = expand.path_alias {
558 columns.push(format!("_path_length_{}", path_alias));
559 columns.push(format!("_path_nodes_{}", path_alias));
560 columns.push(format!("_path_edges_{}", path_alias));
561 }
562
563 Ok((operator, columns))
564 }
565
566 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
574 let expands = Self::collect_expand_chain(op);
575 if expands.is_empty() {
576 return Err(Error::Internal("Empty expand chain".to_string()));
577 }
578
579 let first_expand = expands[0];
581 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
582
583 let mut columns = base_columns.clone();
584 let mut steps = Vec::new();
585
586 let mut is_first = true;
591
592 for expand in &expands {
593 let source_column = if is_first {
595 base_columns
597 .iter()
598 .position(|c| c == &expand.from_variable)
599 .ok_or_else(|| {
600 Error::Internal(format!(
601 "Source variable '{}' not found in base columns",
602 expand.from_variable
603 ))
604 })?
605 } else {
606 1
609 };
610
611 let direction = match expand.direction {
613 ExpandDirection::Outgoing => Direction::Outgoing,
614 ExpandDirection::Incoming => Direction::Incoming,
615 ExpandDirection::Both => Direction::Both,
616 };
617
618 steps.push(ExpandStep {
620 source_column,
621 direction,
622 edge_type: expand.edge_type.clone(),
623 });
624
625 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
627 let count = self.anon_edge_counter.get();
628 self.anon_edge_counter.set(count + 1);
629 format!("_anon_edge_{}", count)
630 });
631 columns.push(edge_col_name);
632 columns.push(expand.to_variable.clone());
633
634 is_first = false;
635 }
636
637 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
639
640 if let Some(tx_id) = self.tx_id {
641 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
642 } else {
643 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
644 }
645
646 Ok((Box::new(lazy_op), columns))
647 }
648
649 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
651 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
653
654 let variable_columns: HashMap<String, usize> = input_columns
656 .iter()
657 .enumerate()
658 .map(|(i, name)| (name.clone(), i))
659 .collect();
660
661 let columns: Vec<String> = ret
663 .items
664 .iter()
665 .map(|item| {
666 item.alias.clone().unwrap_or_else(|| {
667 expression_to_string(&item.expression)
669 })
670 })
671 .collect();
672
673 let needs_project = ret
675 .items
676 .iter()
677 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
678
679 if needs_project {
680 let mut projections = Vec::with_capacity(ret.items.len());
682 let mut output_types = Vec::with_capacity(ret.items.len());
683
684 for item in &ret.items {
685 match &item.expression {
686 LogicalExpression::Variable(name) => {
687 let col_idx = *variable_columns.get(name).ok_or_else(|| {
688 Error::Internal(format!("Variable '{}' not found in input", name))
689 })?;
690 projections.push(ProjectExpr::Column(col_idx));
691 if name.starts_with("_path_nodes_")
693 || name.starts_with("_path_edges_")
694 || name.starts_with("_path_length_")
695 {
696 output_types.push(LogicalType::Any);
697 } else {
698 output_types.push(LogicalType::Node);
699 }
700 }
701 LogicalExpression::Property { variable, property } => {
702 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
703 Error::Internal(format!("Variable '{}' not found in input", variable))
704 })?;
705 projections.push(ProjectExpr::PropertyAccess {
706 column: col_idx,
707 property: property.clone(),
708 });
709 output_types.push(LogicalType::Any);
711 }
712 LogicalExpression::Literal(value) => {
713 projections.push(ProjectExpr::Constant(value.clone()));
714 output_types.push(value_to_logical_type(value));
715 }
716 LogicalExpression::FunctionCall { name, args, .. } => {
717 match name.to_lowercase().as_str() {
719 "type" => {
720 if args.len() != 1 {
722 return Err(Error::Internal(
723 "type() requires exactly one argument".to_string(),
724 ));
725 }
726 if let LogicalExpression::Variable(var_name) = &args[0] {
727 let col_idx =
728 *variable_columns.get(var_name).ok_or_else(|| {
729 Error::Internal(format!(
730 "Variable '{}' not found in input",
731 var_name
732 ))
733 })?;
734 projections.push(ProjectExpr::EdgeType { column: col_idx });
735 output_types.push(LogicalType::String);
736 } else {
737 return Err(Error::Internal(
738 "type() argument must be a variable".to_string(),
739 ));
740 }
741 }
742 "length" => {
743 if args.len() != 1 {
746 return Err(Error::Internal(
747 "length() requires exactly one argument".to_string(),
748 ));
749 }
750 if let LogicalExpression::Variable(var_name) = &args[0] {
751 let col_idx =
752 *variable_columns.get(var_name).ok_or_else(|| {
753 Error::Internal(format!(
754 "Variable '{}' not found in input",
755 var_name
756 ))
757 })?;
758 projections.push(ProjectExpr::Column(col_idx));
760 output_types.push(LogicalType::Int64);
761 } else {
762 return Err(Error::Internal(
763 "length() argument must be a variable".to_string(),
764 ));
765 }
766 }
767 "nodes" | "edges" => {
768 if args.len() != 1 {
770 return Err(Error::Internal(format!(
771 "{}() requires exactly one argument",
772 name
773 )));
774 }
775 if let LogicalExpression::Variable(var_name) = &args[0] {
776 let col_idx =
777 *variable_columns.get(var_name).ok_or_else(|| {
778 Error::Internal(format!(
779 "Variable '{var_name}' not found in input",
780 ))
781 })?;
782 projections.push(ProjectExpr::Column(col_idx));
783 output_types.push(LogicalType::Any);
784 } else {
785 return Err(Error::Internal(format!(
786 "{}() argument must be a variable",
787 name
788 )));
789 }
790 }
791 _ => {
793 let filter_expr = self.convert_expression(&item.expression)?;
794 projections.push(ProjectExpr::Expression {
795 expr: filter_expr,
796 variable_columns: variable_columns.clone(),
797 });
798 output_types.push(LogicalType::Any);
799 }
800 }
801 }
802 LogicalExpression::Case { .. } => {
803 let filter_expr = self.convert_expression(&item.expression)?;
805 projections.push(ProjectExpr::Expression {
806 expr: filter_expr,
807 variable_columns: variable_columns.clone(),
808 });
809 output_types.push(LogicalType::Any);
811 }
812 _ => {
813 return Err(Error::Internal(format!(
814 "Unsupported RETURN expression: {:?}",
815 item.expression
816 )));
817 }
818 }
819 }
820
821 let operator = Box::new(ProjectOperator::with_store(
822 input_op,
823 projections,
824 output_types,
825 Arc::clone(&self.store),
826 ));
827
828 Ok((operator, columns))
829 } else {
830 let mut projections = Vec::with_capacity(ret.items.len());
833 let mut output_types = Vec::with_capacity(ret.items.len());
834
835 for item in &ret.items {
836 if let LogicalExpression::Variable(name) = &item.expression {
837 let col_idx = *variable_columns.get(name).ok_or_else(|| {
838 Error::Internal(format!("Variable '{}' not found in input", name))
839 })?;
840 projections.push(ProjectExpr::Column(col_idx));
841 output_types.push(LogicalType::Node);
842 }
843 }
844
845 if projections.len() == input_columns.len()
847 && projections
848 .iter()
849 .enumerate()
850 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
851 {
852 Ok((input_op, columns))
854 } else {
855 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
856 Ok((operator, columns))
857 }
858 }
859 }
860
861 fn plan_project(
863 &self,
864 project: &crate::query::plan::ProjectOp,
865 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
866 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
868 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
869 let single_row_op: Box<dyn Operator> = Box::new(
871 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
872 );
873 (single_row_op, Vec::new())
874 } else {
875 self.plan_operator(&project.input)?
876 };
877
878 let variable_columns: HashMap<String, usize> = input_columns
880 .iter()
881 .enumerate()
882 .map(|(i, name)| (name.clone(), i))
883 .collect();
884
885 let mut projections = Vec::with_capacity(project.projections.len());
887 let mut output_types = Vec::with_capacity(project.projections.len());
888 let mut output_columns = Vec::with_capacity(project.projections.len());
889
890 for projection in &project.projections {
891 let col_name = projection
893 .alias
894 .clone()
895 .unwrap_or_else(|| expression_to_string(&projection.expression));
896 output_columns.push(col_name);
897
898 match &projection.expression {
899 LogicalExpression::Variable(name) => {
900 let col_idx = *variable_columns.get(name).ok_or_else(|| {
901 Error::Internal(format!("Variable '{}' not found in input", name))
902 })?;
903 projections.push(ProjectExpr::Column(col_idx));
904 output_types.push(LogicalType::Node);
905 }
906 LogicalExpression::Property { variable, property } => {
907 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
908 Error::Internal(format!("Variable '{}' not found in input", variable))
909 })?;
910 projections.push(ProjectExpr::PropertyAccess {
911 column: col_idx,
912 property: property.clone(),
913 });
914 output_types.push(LogicalType::Any);
915 }
916 LogicalExpression::Literal(value) => {
917 projections.push(ProjectExpr::Constant(value.clone()));
918 output_types.push(value_to_logical_type(value));
919 }
920 _ => {
921 let filter_expr = self.convert_expression(&projection.expression)?;
923 projections.push(ProjectExpr::Expression {
924 expr: filter_expr,
925 variable_columns: variable_columns.clone(),
926 });
927 output_types.push(LogicalType::Any);
928 }
929 }
930 }
931
932 let operator = Box::new(ProjectOperator::with_store(
933 input_op,
934 projections,
935 output_types,
936 Arc::clone(&self.store),
937 ));
938
939 Ok((operator, output_columns))
940 }
941
942 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
948 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
951 let (_, columns) = self.plan_operator(&filter.input)?;
953 let schema = self.derive_schema_from_columns(&columns);
954 let empty_op = Box::new(EmptyOperator::new(schema));
955 return Ok((empty_op, columns));
956 }
957
958 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
960 return Ok(result);
961 }
962
963 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
965 return Ok(result);
966 }
967
968 let (input_op, columns) = self.plan_operator(&filter.input)?;
970
971 let variable_columns: HashMap<String, usize> = columns
973 .iter()
974 .enumerate()
975 .map(|(i, name)| (name.clone(), i))
976 .collect();
977
978 let filter_expr = self.convert_expression(&filter.predicate)?;
980
981 let predicate =
983 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
984
985 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
987
988 Ok((operator, columns))
989 }
990
991 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
998 use grafeo_core::graph::lpg::CompareOp;
999
1000 match predicate {
1001 LogicalExpression::Binary { left, op, right } => {
1002 match op {
1004 BinaryOp::And => {
1005 let left_result = self.check_zone_map_for_predicate(left);
1006 let right_result = self.check_zone_map_for_predicate(right);
1007
1008 return match (left_result, right_result) {
1009 (Some(false), _) | (_, Some(false)) => Some(false),
1011 (Some(true), Some(true)) => Some(true),
1013 _ => None,
1015 };
1016 }
1017 BinaryOp::Or => {
1018 let left_result = self.check_zone_map_for_predicate(left);
1019 let right_result = self.check_zone_map_for_predicate(right);
1020
1021 return match (left_result, right_result) {
1022 (Some(false), Some(false)) => Some(false),
1024 (Some(true), _) | (_, Some(true)) => Some(true),
1026 _ => None,
1028 };
1029 }
1030 _ => {}
1031 }
1032
1033 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1035 (
1036 LogicalExpression::Property { property, .. },
1037 LogicalExpression::Literal(val),
1038 ) => {
1039 let cmp = match op {
1040 BinaryOp::Eq => CompareOp::Eq,
1041 BinaryOp::Ne => CompareOp::Ne,
1042 BinaryOp::Lt => CompareOp::Lt,
1043 BinaryOp::Le => CompareOp::Le,
1044 BinaryOp::Gt => CompareOp::Gt,
1045 BinaryOp::Ge => CompareOp::Ge,
1046 _ => return None,
1047 };
1048 (property.clone(), cmp, val.clone())
1049 }
1050 (
1051 LogicalExpression::Literal(val),
1052 LogicalExpression::Property { property, .. },
1053 ) => {
1054 let cmp = match op {
1056 BinaryOp::Eq => CompareOp::Eq,
1057 BinaryOp::Ne => CompareOp::Ne,
1058 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1060 BinaryOp::Gt => CompareOp::Lt,
1061 BinaryOp::Ge => CompareOp::Le,
1062 _ => return None,
1063 };
1064 (property.clone(), cmp, val.clone())
1065 }
1066 _ => return None,
1067 };
1068
1069 let might_match =
1071 self.store
1072 .node_property_might_match(&property.into(), compare_op, &value);
1073
1074 Some(might_match)
1075 }
1076
1077 _ => None,
1078 }
1079 }
1080
1081 fn try_plan_filter_with_property_index(
1090 &self,
1091 filter: &FilterOp,
1092 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1093 let (scan_variable, scan_label) = match filter.input.as_ref() {
1095 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1096 (scan.variable.clone(), scan.label.clone())
1097 }
1098 _ => return Ok(None),
1099 };
1100
1101 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1104
1105 if conditions.is_empty() {
1106 return Ok(None);
1107 }
1108
1109 let has_indexed_condition = conditions
1111 .iter()
1112 .any(|(prop, _)| self.store.has_property_index(prop));
1113
1114 if !has_indexed_condition {
1115 return Ok(None);
1116 }
1117
1118 let conditions_ref: Vec<(&str, Value)> = conditions
1120 .iter()
1121 .map(|(p, v)| (p.as_str(), v.clone()))
1122 .collect();
1123 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1124
1125 if let Some(label) = &scan_label {
1127 let label_nodes: std::collections::HashSet<_> =
1128 self.store.nodes_by_label(label).into_iter().collect();
1129 matching_nodes.retain(|n| label_nodes.contains(n));
1130 }
1131
1132 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1134 let columns = vec![scan_variable];
1135
1136 Ok(Some((node_list_op, columns)))
1137 }
1138
1139 fn extract_equality_conditions(
1145 &self,
1146 predicate: &LogicalExpression,
1147 target_variable: &str,
1148 ) -> Vec<(String, Value)> {
1149 let mut conditions = Vec::new();
1150 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1151 conditions
1152 }
1153
1154 fn collect_equality_conditions(
1156 &self,
1157 expr: &LogicalExpression,
1158 target_variable: &str,
1159 conditions: &mut Vec<(String, Value)>,
1160 ) {
1161 match expr {
1162 LogicalExpression::Binary {
1164 left,
1165 op: BinaryOp::And,
1166 right,
1167 } => {
1168 self.collect_equality_conditions(left, target_variable, conditions);
1169 self.collect_equality_conditions(right, target_variable, conditions);
1170 }
1171
1172 LogicalExpression::Binary {
1174 left,
1175 op: BinaryOp::Eq,
1176 right,
1177 } => {
1178 if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1179 && var == target_variable
1180 {
1181 conditions.push((prop, val));
1182 }
1183 }
1184
1185 _ => {}
1186 }
1187 }
1188
1189 fn extract_property_equality(
1191 &self,
1192 left: &LogicalExpression,
1193 right: &LogicalExpression,
1194 ) -> Option<(String, String, Value)> {
1195 match (left, right) {
1196 (
1197 LogicalExpression::Property { variable, property },
1198 LogicalExpression::Literal(val),
1199 ) => Some((variable.clone(), property.clone(), val.clone())),
1200 (
1201 LogicalExpression::Literal(val),
1202 LogicalExpression::Property { variable, property },
1203 ) => Some((variable.clone(), property.clone(), val.clone())),
1204 _ => None,
1205 }
1206 }
1207
1208 fn try_plan_filter_with_range_index(
1221 &self,
1222 filter: &FilterOp,
1223 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1224 let (scan_variable, scan_label) = match filter.input.as_ref() {
1226 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1227 (scan.variable.clone(), scan.label.clone())
1228 }
1229 _ => return Ok(None),
1230 };
1231
1232 if let Some((variable, property, min, max, min_inc, max_inc)) =
1234 self.extract_between_predicate(&filter.predicate)
1235 && variable == scan_variable
1236 {
1237 return self.plan_range_filter(
1238 &scan_variable,
1239 &scan_label,
1240 &property,
1241 RangeBounds {
1242 min: Some(&min),
1243 max: Some(&max),
1244 min_inclusive: min_inc,
1245 max_inclusive: max_inc,
1246 },
1247 );
1248 }
1249
1250 if let Some((variable, property, op, value)) =
1252 self.extract_range_predicate(&filter.predicate)
1253 && variable == scan_variable
1254 {
1255 let (min, max, min_inc, max_inc) = match op {
1256 BinaryOp::Lt => (None, Some(value), false, false),
1257 BinaryOp::Le => (None, Some(value), false, true),
1258 BinaryOp::Gt => (Some(value), None, false, false),
1259 BinaryOp::Ge => (Some(value), None, true, false),
1260 _ => return Ok(None),
1261 };
1262 return self.plan_range_filter(
1263 &scan_variable,
1264 &scan_label,
1265 &property,
1266 RangeBounds {
1267 min: min.as_ref(),
1268 max: max.as_ref(),
1269 min_inclusive: min_inc,
1270 max_inclusive: max_inc,
1271 },
1272 );
1273 }
1274
1275 Ok(None)
1276 }
1277
1278 fn plan_range_filter(
1280 &self,
1281 scan_variable: &str,
1282 scan_label: &Option<String>,
1283 property: &str,
1284 bounds: RangeBounds<'_>,
1285 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1286 let mut matching_nodes = self.store.find_nodes_in_range(
1288 property,
1289 bounds.min,
1290 bounds.max,
1291 bounds.min_inclusive,
1292 bounds.max_inclusive,
1293 );
1294
1295 if let Some(label) = scan_label {
1297 let label_nodes: std::collections::HashSet<_> =
1298 self.store.nodes_by_label(label).into_iter().collect();
1299 matching_nodes.retain(|n| label_nodes.contains(n));
1300 }
1301
1302 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1304 let columns = vec![scan_variable.to_string()];
1305
1306 Ok(Some((node_list_op, columns)))
1307 }
1308
1309 fn extract_range_predicate(
1313 &self,
1314 predicate: &LogicalExpression,
1315 ) -> Option<(String, String, BinaryOp, Value)> {
1316 match predicate {
1317 LogicalExpression::Binary { left, op, right } => {
1318 match op {
1319 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1320 if let (
1322 LogicalExpression::Property { variable, property },
1323 LogicalExpression::Literal(val),
1324 ) = (left.as_ref(), right.as_ref())
1325 {
1326 return Some((variable.clone(), property.clone(), *op, val.clone()));
1327 }
1328
1329 if let (
1331 LogicalExpression::Literal(val),
1332 LogicalExpression::Property { variable, property },
1333 ) = (left.as_ref(), right.as_ref())
1334 {
1335 let flipped_op = match op {
1336 BinaryOp::Lt => BinaryOp::Gt,
1337 BinaryOp::Le => BinaryOp::Ge,
1338 BinaryOp::Gt => BinaryOp::Lt,
1339 BinaryOp::Ge => BinaryOp::Le,
1340 _ => return None,
1341 };
1342 return Some((
1343 variable.clone(),
1344 property.clone(),
1345 flipped_op,
1346 val.clone(),
1347 ));
1348 }
1349 }
1350 _ => {}
1351 }
1352 }
1353 _ => {}
1354 }
1355 None
1356 }
1357
1358 fn extract_between_predicate(
1366 &self,
1367 predicate: &LogicalExpression,
1368 ) -> Option<(String, String, Value, Value, bool, bool)> {
1369 let (left, right) = match predicate {
1371 LogicalExpression::Binary {
1372 left,
1373 op: BinaryOp::And,
1374 right,
1375 } => (left.as_ref(), right.as_ref()),
1376 _ => return None,
1377 };
1378
1379 let left_range = self.extract_range_predicate(left);
1381 let right_range = self.extract_range_predicate(right);
1382
1383 let (left_var, left_prop, left_op, left_val) = left_range?;
1384 let (right_var, right_prop, right_op, right_val) = right_range?;
1385
1386 if left_var != right_var || left_prop != right_prop {
1388 return None;
1389 }
1390
1391 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1393 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1395 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1397 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1399 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1401 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1403 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1405 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1407 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1409 _ => return None,
1410 };
1411
1412 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1413 }
1414
1415 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1417 let (input_op, columns) = self.plan_operator(&limit.input)?;
1418 let output_schema = self.derive_schema_from_columns(&columns);
1419 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1420 Ok((operator, columns))
1421 }
1422
1423 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1425 let (input_op, columns) = self.plan_operator(&skip.input)?;
1426 let output_schema = self.derive_schema_from_columns(&columns);
1427 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1428 Ok((operator, columns))
1429 }
1430
1431 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1433 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1434
1435 let mut variable_columns: HashMap<String, usize> = input_columns
1437 .iter()
1438 .enumerate()
1439 .map(|(i, name)| (name.clone(), i))
1440 .collect();
1441
1442 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1444 let mut next_col_idx = input_columns.len();
1445
1446 for key in &sort.keys {
1447 if let LogicalExpression::Property { variable, property } = &key.expression {
1448 let col_name = format!("{}_{}", variable, property);
1449 if !variable_columns.contains_key(&col_name) {
1450 property_projections.push((
1451 variable.clone(),
1452 property.clone(),
1453 col_name.clone(),
1454 ));
1455 variable_columns.insert(col_name, next_col_idx);
1456 next_col_idx += 1;
1457 }
1458 }
1459 }
1460
1461 let mut output_columns = input_columns.clone();
1463
1464 if !property_projections.is_empty() {
1466 let mut projections = Vec::new();
1467 let mut output_types = Vec::new();
1468
1469 for (i, _) in input_columns.iter().enumerate() {
1472 projections.push(ProjectExpr::Column(i));
1473 output_types.push(LogicalType::Node);
1474 }
1475
1476 for (variable, property, col_name) in &property_projections {
1478 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1479 Error::Internal(format!(
1480 "Variable '{}' not found for ORDER BY property projection",
1481 variable
1482 ))
1483 })?;
1484 projections.push(ProjectExpr::PropertyAccess {
1485 column: source_col,
1486 property: property.clone(),
1487 });
1488 output_types.push(LogicalType::Any);
1489 output_columns.push(col_name.clone());
1490 }
1491
1492 input_op = Box::new(ProjectOperator::with_store(
1493 input_op,
1494 projections,
1495 output_types,
1496 Arc::clone(&self.store),
1497 ));
1498 }
1499
1500 let physical_keys: Vec<PhysicalSortKey> = sort
1502 .keys
1503 .iter()
1504 .map(|key| {
1505 let col_idx = self
1506 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1507 Ok(PhysicalSortKey {
1508 column: col_idx,
1509 direction: match key.order {
1510 SortOrder::Ascending => SortDirection::Ascending,
1511 SortOrder::Descending => SortDirection::Descending,
1512 },
1513 null_order: NullOrder::NullsLast,
1514 })
1515 })
1516 .collect::<Result<Vec<_>>>()?;
1517
1518 let output_schema = self.derive_schema_from_columns(&output_columns);
1519 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1520 Ok((operator, output_columns))
1521 }
1522
1523 fn resolve_sort_expression_with_properties(
1525 &self,
1526 expr: &LogicalExpression,
1527 variable_columns: &HashMap<String, usize>,
1528 ) -> Result<usize> {
1529 match expr {
1530 LogicalExpression::Variable(name) => {
1531 variable_columns.get(name).copied().ok_or_else(|| {
1532 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1533 })
1534 }
1535 LogicalExpression::Property { variable, property } => {
1536 let col_name = format!("{}_{}", variable, property);
1538 variable_columns.get(&col_name).copied().ok_or_else(|| {
1539 Error::Internal(format!(
1540 "Property column '{}' not found for ORDER BY (from {}.{})",
1541 col_name, variable, property
1542 ))
1543 })
1544 }
1545 _ => Err(Error::Internal(format!(
1546 "Unsupported ORDER BY expression: {:?}",
1547 expr
1548 ))),
1549 }
1550 }
1551
1552 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1554 columns.iter().map(|_| LogicalType::Any).collect()
1555 }
1556
1557 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1559 if self.factorized_execution
1566 && agg.group_by.is_empty()
1567 && Self::count_expand_chain(&agg.input).0 >= 2
1568 && self.is_simple_aggregate(agg)
1569 && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1570 {
1571 return Ok((op, cols));
1572 }
1573 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1576
1577 let mut variable_columns: HashMap<String, usize> = input_columns
1579 .iter()
1580 .enumerate()
1581 .map(|(i, name)| (name.clone(), i))
1582 .collect();
1583
1584 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1587
1588 for expr in &agg.group_by {
1590 if let LogicalExpression::Property { variable, property } = expr {
1591 let col_name = format!("{}_{}", variable, property);
1592 if !variable_columns.contains_key(&col_name) {
1593 property_projections.push((
1594 variable.clone(),
1595 property.clone(),
1596 col_name.clone(),
1597 ));
1598 variable_columns.insert(col_name, next_col_idx);
1599 next_col_idx += 1;
1600 }
1601 }
1602 }
1603
1604 for agg_expr in &agg.aggregates {
1606 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1607 let col_name = format!("{}_{}", variable, property);
1608 if !variable_columns.contains_key(&col_name) {
1609 property_projections.push((
1610 variable.clone(),
1611 property.clone(),
1612 col_name.clone(),
1613 ));
1614 variable_columns.insert(col_name, next_col_idx);
1615 next_col_idx += 1;
1616 }
1617 }
1618 }
1619
1620 if !property_projections.is_empty() {
1622 let mut projections = Vec::new();
1623 let mut output_types = Vec::new();
1624
1625 for (i, _) in input_columns.iter().enumerate() {
1628 projections.push(ProjectExpr::Column(i));
1629 output_types.push(LogicalType::Node);
1630 }
1631
1632 for (variable, property, _col_name) in &property_projections {
1634 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1635 Error::Internal(format!(
1636 "Variable '{}' not found for property projection",
1637 variable
1638 ))
1639 })?;
1640 projections.push(ProjectExpr::PropertyAccess {
1641 column: source_col,
1642 property: property.clone(),
1643 });
1644 output_types.push(LogicalType::Any); }
1646
1647 input_op = Box::new(ProjectOperator::with_store(
1648 input_op,
1649 projections,
1650 output_types,
1651 Arc::clone(&self.store),
1652 ));
1653 }
1654
1655 let group_columns: Vec<usize> = agg
1657 .group_by
1658 .iter()
1659 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1660 .collect::<Result<Vec<_>>>()?;
1661
1662 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1664 .aggregates
1665 .iter()
1666 .map(|agg_expr| {
1667 let column = agg_expr
1668 .expression
1669 .as_ref()
1670 .map(|e| {
1671 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1672 })
1673 .transpose()?;
1674
1675 Ok(PhysicalAggregateExpr {
1676 function: convert_aggregate_function(agg_expr.function),
1677 column,
1678 distinct: agg_expr.distinct,
1679 alias: agg_expr.alias.clone(),
1680 percentile: agg_expr.percentile,
1681 })
1682 })
1683 .collect::<Result<Vec<_>>>()?;
1684
1685 let mut output_schema = Vec::new();
1687 let mut output_columns = Vec::new();
1688
1689 for expr in &agg.group_by {
1691 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1693 }
1694
1695 for agg_expr in &agg.aggregates {
1697 let result_type = match agg_expr.function {
1698 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1699 LogicalType::Int64
1700 }
1701 LogicalAggregateFunction::Sum => LogicalType::Int64,
1702 LogicalAggregateFunction::Avg => LogicalType::Float64,
1703 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1704 LogicalType::Int64
1708 }
1709 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1712 | LogicalAggregateFunction::StdDevPop
1713 | LogicalAggregateFunction::PercentileDisc
1714 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1715 };
1716 output_schema.push(result_type);
1717 output_columns.push(
1718 agg_expr
1719 .alias
1720 .clone()
1721 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1722 );
1723 }
1724
1725 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1727 Box::new(SimpleAggregateOperator::new(
1728 input_op,
1729 physical_aggregates,
1730 output_schema,
1731 ))
1732 } else {
1733 Box::new(HashAggregateOperator::new(
1734 input_op,
1735 group_columns,
1736 physical_aggregates,
1737 output_schema,
1738 ))
1739 };
1740
1741 if let Some(having_expr) = &agg.having {
1743 let having_var_columns: HashMap<String, usize> = output_columns
1745 .iter()
1746 .enumerate()
1747 .map(|(i, name)| (name.clone(), i))
1748 .collect();
1749
1750 let filter_expr = self.convert_expression(having_expr)?;
1751 let predicate =
1752 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1753 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1754 }
1755
1756 Ok((operator, output_columns))
1757 }
1758
1759 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1765 agg.aggregates.iter().all(|agg_expr| {
1766 match agg_expr.function {
1767 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1768 agg_expr.expression.is_none()
1770 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1771 }
1772 LogicalAggregateFunction::Sum
1773 | LogicalAggregateFunction::Avg
1774 | LogicalAggregateFunction::Min
1775 | LogicalAggregateFunction::Max => {
1776 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1779 }
1780 _ => false,
1782 }
1783 })
1784 }
1785
1786 fn plan_factorized_aggregate(
1790 &self,
1791 agg: &AggregateOp,
1792 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1793 let expands = Self::collect_expand_chain(&agg.input);
1795 if expands.is_empty() {
1796 return Err(Error::Internal(
1797 "Expected expand chain for factorized aggregate".to_string(),
1798 ));
1799 }
1800
1801 let first_expand = expands[0];
1803 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1804
1805 let mut columns = base_columns.clone();
1806 let mut steps = Vec::new();
1807 let mut is_first = true;
1808
1809 for expand in &expands {
1810 let source_column = if is_first {
1812 base_columns
1813 .iter()
1814 .position(|c| c == &expand.from_variable)
1815 .ok_or_else(|| {
1816 Error::Internal(format!(
1817 "Source variable '{}' not found in base columns",
1818 expand.from_variable
1819 ))
1820 })?
1821 } else {
1822 1 };
1824
1825 let direction = match expand.direction {
1826 ExpandDirection::Outgoing => Direction::Outgoing,
1827 ExpandDirection::Incoming => Direction::Incoming,
1828 ExpandDirection::Both => Direction::Both,
1829 };
1830
1831 steps.push(ExpandStep {
1832 source_column,
1833 direction,
1834 edge_type: expand.edge_type.clone(),
1835 });
1836
1837 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1838 let count = self.anon_edge_counter.get();
1839 self.anon_edge_counter.set(count + 1);
1840 format!("_anon_edge_{}", count)
1841 });
1842 columns.push(edge_col_name);
1843 columns.push(expand.to_variable.clone());
1844
1845 is_first = false;
1846 }
1847
1848 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1850
1851 if let Some(tx_id) = self.tx_id {
1852 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1853 } else {
1854 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1855 }
1856
1857 let factorized_aggs: Vec<FactorizedAggregate> = agg
1859 .aggregates
1860 .iter()
1861 .map(|agg_expr| {
1862 match agg_expr.function {
1863 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1864 if agg_expr.expression.is_none() {
1866 FactorizedAggregate::count()
1867 } else {
1868 FactorizedAggregate::count_column(1) }
1872 }
1873 LogicalAggregateFunction::Sum => {
1874 FactorizedAggregate::sum(1)
1876 }
1877 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1878 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1879 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1880 _ => {
1881 FactorizedAggregate::count()
1883 }
1884 }
1885 })
1886 .collect();
1887
1888 let output_columns: Vec<String> = agg
1890 .aggregates
1891 .iter()
1892 .map(|agg_expr| {
1893 agg_expr
1894 .alias
1895 .clone()
1896 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1897 })
1898 .collect();
1899
1900 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1902
1903 Ok((Box::new(factorized_agg_op), output_columns))
1904 }
1905
1906 #[allow(dead_code)]
1908 fn resolve_expression_to_column(
1909 &self,
1910 expr: &LogicalExpression,
1911 variable_columns: &HashMap<String, usize>,
1912 ) -> Result<usize> {
1913 match expr {
1914 LogicalExpression::Variable(name) => variable_columns
1915 .get(name)
1916 .copied()
1917 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1918 LogicalExpression::Property { variable, .. } => variable_columns
1919 .get(variable)
1920 .copied()
1921 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1922 _ => Err(Error::Internal(format!(
1923 "Cannot resolve expression to column: {:?}",
1924 expr
1925 ))),
1926 }
1927 }
1928
1929 fn resolve_expression_to_column_with_properties(
1933 &self,
1934 expr: &LogicalExpression,
1935 variable_columns: &HashMap<String, usize>,
1936 ) -> Result<usize> {
1937 match expr {
1938 LogicalExpression::Variable(name) => variable_columns
1939 .get(name)
1940 .copied()
1941 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1942 LogicalExpression::Property { variable, property } => {
1943 let col_name = format!("{}_{}", variable, property);
1945 variable_columns.get(&col_name).copied().ok_or_else(|| {
1946 Error::Internal(format!(
1947 "Property column '{}' not found (from {}.{})",
1948 col_name, variable, property
1949 ))
1950 })
1951 }
1952 _ => Err(Error::Internal(format!(
1953 "Cannot resolve expression to column: {:?}",
1954 expr
1955 ))),
1956 }
1957 }
1958
1959 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1961 match expr {
1962 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1963 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1964 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1965 variable: variable.clone(),
1966 property: property.clone(),
1967 }),
1968 LogicalExpression::Binary { left, op, right } => {
1969 let left_expr = self.convert_expression(left)?;
1970 let right_expr = self.convert_expression(right)?;
1971 let filter_op = convert_binary_op(*op)?;
1972 Ok(FilterExpression::Binary {
1973 left: Box::new(left_expr),
1974 op: filter_op,
1975 right: Box::new(right_expr),
1976 })
1977 }
1978 LogicalExpression::Unary { op, operand } => {
1979 let operand_expr = self.convert_expression(operand)?;
1980 let filter_op = convert_unary_op(*op)?;
1981 Ok(FilterExpression::Unary {
1982 op: filter_op,
1983 operand: Box::new(operand_expr),
1984 })
1985 }
1986 LogicalExpression::FunctionCall { name, args, .. } => {
1987 let filter_args: Vec<FilterExpression> = args
1988 .iter()
1989 .map(|a| self.convert_expression(a))
1990 .collect::<Result<Vec<_>>>()?;
1991 Ok(FilterExpression::FunctionCall {
1992 name: name.clone(),
1993 args: filter_args,
1994 })
1995 }
1996 LogicalExpression::Case {
1997 operand,
1998 when_clauses,
1999 else_clause,
2000 } => {
2001 let filter_operand = operand
2002 .as_ref()
2003 .map(|e| self.convert_expression(e))
2004 .transpose()?
2005 .map(Box::new);
2006 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2007 .iter()
2008 .map(|(cond, result)| {
2009 Ok((
2010 self.convert_expression(cond)?,
2011 self.convert_expression(result)?,
2012 ))
2013 })
2014 .collect::<Result<Vec<_>>>()?;
2015 let filter_else = else_clause
2016 .as_ref()
2017 .map(|e| self.convert_expression(e))
2018 .transpose()?
2019 .map(Box::new);
2020 Ok(FilterExpression::Case {
2021 operand: filter_operand,
2022 when_clauses: filter_when_clauses,
2023 else_clause: filter_else,
2024 })
2025 }
2026 LogicalExpression::List(items) => {
2027 let filter_items: Vec<FilterExpression> = items
2028 .iter()
2029 .map(|item| self.convert_expression(item))
2030 .collect::<Result<Vec<_>>>()?;
2031 Ok(FilterExpression::List(filter_items))
2032 }
2033 LogicalExpression::Map(pairs) => {
2034 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2035 .iter()
2036 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2037 .collect::<Result<Vec<_>>>()?;
2038 Ok(FilterExpression::Map(filter_pairs))
2039 }
2040 LogicalExpression::IndexAccess { base, index } => {
2041 let base_expr = self.convert_expression(base)?;
2042 let index_expr = self.convert_expression(index)?;
2043 Ok(FilterExpression::IndexAccess {
2044 base: Box::new(base_expr),
2045 index: Box::new(index_expr),
2046 })
2047 }
2048 LogicalExpression::SliceAccess { base, start, end } => {
2049 let base_expr = self.convert_expression(base)?;
2050 let start_expr = start
2051 .as_ref()
2052 .map(|s| self.convert_expression(s))
2053 .transpose()?
2054 .map(Box::new);
2055 let end_expr = end
2056 .as_ref()
2057 .map(|e| self.convert_expression(e))
2058 .transpose()?
2059 .map(Box::new);
2060 Ok(FilterExpression::SliceAccess {
2061 base: Box::new(base_expr),
2062 start: start_expr,
2063 end: end_expr,
2064 })
2065 }
2066 LogicalExpression::Parameter(_) => Err(Error::Internal(
2067 "Parameters not yet supported in filters".to_string(),
2068 )),
2069 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2070 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2071 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2072 LogicalExpression::ListComprehension {
2073 variable,
2074 list_expr,
2075 filter_expr,
2076 map_expr,
2077 } => {
2078 let list = self.convert_expression(list_expr)?;
2079 let filter = filter_expr
2080 .as_ref()
2081 .map(|f| self.convert_expression(f))
2082 .transpose()?
2083 .map(Box::new);
2084 let map = self.convert_expression(map_expr)?;
2085 Ok(FilterExpression::ListComprehension {
2086 variable: variable.clone(),
2087 list_expr: Box::new(list),
2088 filter_expr: filter,
2089 map_expr: Box::new(map),
2090 })
2091 }
2092 LogicalExpression::ExistsSubquery(subplan) => {
2093 let (start_var, direction, edge_type, end_labels) =
2096 self.extract_exists_pattern(subplan)?;
2097
2098 Ok(FilterExpression::ExistsSubquery {
2099 start_var,
2100 direction,
2101 edge_type,
2102 end_labels,
2103 min_hops: None,
2104 max_hops: None,
2105 })
2106 }
2107 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2108 "COUNT subqueries not yet supported".to_string(),
2109 )),
2110 }
2111 }
2112
2113 fn extract_exists_pattern(
2116 &self,
2117 subplan: &LogicalOperator,
2118 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2119 match subplan {
2120 LogicalOperator::Expand(expand) => {
2121 let end_labels = self.extract_end_labels_from_expand(expand);
2123 let direction = match expand.direction {
2124 ExpandDirection::Outgoing => Direction::Outgoing,
2125 ExpandDirection::Incoming => Direction::Incoming,
2126 ExpandDirection::Both => Direction::Both,
2127 };
2128 Ok((
2129 expand.from_variable.clone(),
2130 direction,
2131 expand.edge_type.clone(),
2132 end_labels,
2133 ))
2134 }
2135 LogicalOperator::NodeScan(scan) => {
2136 if let Some(input) = &scan.input {
2137 self.extract_exists_pattern(input)
2138 } else {
2139 Err(Error::Internal(
2140 "EXISTS subquery must contain an edge pattern".to_string(),
2141 ))
2142 }
2143 }
2144 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2145 _ => Err(Error::Internal(
2146 "Unsupported EXISTS subquery pattern".to_string(),
2147 )),
2148 }
2149 }
2150
2151 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2153 match expand.input.as_ref() {
2155 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2156 _ => None,
2157 }
2158 }
2159
2160 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2162 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2163 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2164
2165 let mut columns = left_columns.clone();
2167 columns.extend(right_columns.clone());
2168
2169 let physical_join_type = match join.join_type {
2171 JoinType::Inner => PhysicalJoinType::Inner,
2172 JoinType::Left => PhysicalJoinType::Left,
2173 JoinType::Right => PhysicalJoinType::Right,
2174 JoinType::Full => PhysicalJoinType::Full,
2175 JoinType::Cross => PhysicalJoinType::Cross,
2176 JoinType::Semi => PhysicalJoinType::Semi,
2177 JoinType::Anti => PhysicalJoinType::Anti,
2178 };
2179
2180 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2182 (vec![], vec![])
2184 } else {
2185 join.conditions
2186 .iter()
2187 .filter_map(|cond| {
2188 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2190 let right_idx = self
2191 .expression_to_column(&cond.right, &right_columns)
2192 .ok()?;
2193 Some((left_idx, right_idx))
2194 })
2195 .unzip()
2196 };
2197
2198 let output_schema = self.derive_schema_from_columns(&columns);
2199
2200 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2208 left_op,
2209 right_op,
2210 probe_keys,
2211 build_keys,
2212 physical_join_type,
2213 output_schema,
2214 ));
2215
2216 Ok((operator, columns))
2217 }
2218
2219 #[allow(dead_code)]
2228 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2229 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2231 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2232
2233 Self::collect_join_edges(
2235 &LogicalOperator::Join(join.clone()),
2236 &mut edges,
2237 &mut all_vars,
2238 );
2239
2240 if all_vars.len() < 3 {
2242 return false;
2243 }
2244
2245 Self::has_cycle(&edges, &all_vars)
2247 }
2248
2249 fn collect_join_edges(
2251 op: &LogicalOperator,
2252 edges: &mut HashMap<String, Vec<String>>,
2253 vars: &mut std::collections::HashSet<String>,
2254 ) {
2255 match op {
2256 LogicalOperator::Join(join) => {
2257 for cond in &join.conditions {
2259 if let (Some(left_var), Some(right_var)) = (
2260 Self::extract_join_variable(&cond.left),
2261 Self::extract_join_variable(&cond.right),
2262 ) && left_var != right_var
2263 {
2264 vars.insert(left_var.clone());
2265 vars.insert(right_var.clone());
2266
2267 edges
2269 .entry(left_var.clone())
2270 .or_default()
2271 .push(right_var.clone());
2272 edges.entry(right_var).or_default().push(left_var);
2273 }
2274 }
2275
2276 Self::collect_join_edges(&join.left, edges, vars);
2278 Self::collect_join_edges(&join.right, edges, vars);
2279 }
2280 LogicalOperator::Expand(expand) => {
2281 vars.insert(expand.from_variable.clone());
2283 vars.insert(expand.to_variable.clone());
2284
2285 edges
2286 .entry(expand.from_variable.clone())
2287 .or_default()
2288 .push(expand.to_variable.clone());
2289 edges
2290 .entry(expand.to_variable.clone())
2291 .or_default()
2292 .push(expand.from_variable.clone());
2293
2294 Self::collect_join_edges(&expand.input, edges, vars);
2295 }
2296 LogicalOperator::Filter(filter) => {
2297 Self::collect_join_edges(&filter.input, edges, vars);
2298 }
2299 LogicalOperator::NodeScan(scan) => {
2300 vars.insert(scan.variable.clone());
2301 }
2302 _ => {}
2303 }
2304 }
2305
2306 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2308 match expr {
2309 LogicalExpression::Variable(v) => Some(v.clone()),
2310 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2311 LogicalExpression::Id(v) => Some(v.clone()),
2312 _ => None,
2313 }
2314 }
2315
2316 fn has_cycle(
2320 edges: &HashMap<String, Vec<String>>,
2321 vars: &std::collections::HashSet<String>,
2322 ) -> bool {
2323 let mut color: HashMap<&String, u8> = HashMap::new();
2324
2325 for var in vars {
2326 color.insert(var, 0);
2327 }
2328
2329 for start in vars {
2330 if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2331 return true;
2332 }
2333 }
2334
2335 false
2336 }
2337
2338 fn dfs_cycle(
2340 node: &String,
2341 parent: Option<&String>,
2342 edges: &HashMap<String, Vec<String>>,
2343 color: &mut HashMap<&String, u8>,
2344 ) -> bool {
2345 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2348 for neighbor in neighbors {
2349 if parent == Some(neighbor) {
2351 continue;
2352 }
2353
2354 if let Some(&c) = color.get(neighbor) {
2355 if c == 1 {
2356 return true;
2358 }
2359 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2360 return true;
2361 }
2362 }
2363 }
2364 }
2365
2366 *color.get_mut(node).unwrap() = 2; false
2368 }
2369
2370 #[allow(dead_code)]
2372 fn count_relations(op: &LogicalOperator) -> usize {
2373 match op {
2374 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2375 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2376 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2377 LogicalOperator::Join(j) => {
2378 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2379 }
2380 _ => 0,
2381 }
2382 }
2383
2384 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2386 match expr {
2387 LogicalExpression::Variable(name) => columns
2388 .iter()
2389 .position(|c| c == name)
2390 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2391 _ => Err(Error::Internal(
2392 "Only variables supported in join conditions".to_string(),
2393 )),
2394 }
2395 }
2396
2397 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2399 if union.inputs.is_empty() {
2400 return Err(Error::Internal(
2401 "Union requires at least one input".to_string(),
2402 ));
2403 }
2404
2405 let mut inputs = Vec::with_capacity(union.inputs.len());
2406 let mut columns = Vec::new();
2407
2408 for (i, input) in union.inputs.iter().enumerate() {
2409 let (op, cols) = self.plan_operator(input)?;
2410 if i == 0 {
2411 columns = cols;
2412 }
2413 inputs.push(op);
2414 }
2415
2416 let output_schema = self.derive_schema_from_columns(&columns);
2417 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2418
2419 Ok((operator, columns))
2420 }
2421
2422 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2424 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2425 let output_schema = self.derive_schema_from_columns(&columns);
2426 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2427 Ok((operator, columns))
2428 }
2429
2430 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2432 let (input_op, mut columns) = if let Some(ref input) = create.input {
2434 let (op, cols) = self.plan_operator(input)?;
2435 (Some(op), cols)
2436 } else {
2437 (None, vec![])
2438 };
2439
2440 let output_column = columns.len();
2442 columns.push(create.variable.clone());
2443
2444 let properties: Vec<(String, PropertySource)> = create
2446 .properties
2447 .iter()
2448 .map(|(name, expr)| {
2449 let source = match Self::try_fold_expression(expr) {
2450 Some(value) => PropertySource::Constant(value),
2451 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2452 };
2453 (name.clone(), source)
2454 })
2455 .collect();
2456
2457 let output_schema = self.derive_schema_from_columns(&columns);
2458
2459 let operator = Box::new(
2460 CreateNodeOperator::new(
2461 Arc::clone(&self.store),
2462 input_op,
2463 create.labels.clone(),
2464 properties,
2465 output_schema,
2466 output_column,
2467 )
2468 .with_tx_context(self.viewing_epoch, self.tx_id),
2469 );
2470
2471 Ok((operator, columns))
2472 }
2473
2474 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2476 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2477
2478 let from_column = columns
2480 .iter()
2481 .position(|c| c == &create.from_variable)
2482 .ok_or_else(|| {
2483 Error::Internal(format!(
2484 "Source variable '{}' not found",
2485 create.from_variable
2486 ))
2487 })?;
2488
2489 let to_column = columns
2490 .iter()
2491 .position(|c| c == &create.to_variable)
2492 .ok_or_else(|| {
2493 Error::Internal(format!(
2494 "Target variable '{}' not found",
2495 create.to_variable
2496 ))
2497 })?;
2498
2499 let output_column = create.variable.as_ref().map(|v| {
2501 let idx = columns.len();
2502 columns.push(v.clone());
2503 idx
2504 });
2505
2506 let properties: Vec<(String, PropertySource)> = create
2508 .properties
2509 .iter()
2510 .map(|(name, expr)| {
2511 let source = match Self::try_fold_expression(expr) {
2512 Some(value) => PropertySource::Constant(value),
2513 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2514 };
2515 (name.clone(), source)
2516 })
2517 .collect();
2518
2519 let output_schema = self.derive_schema_from_columns(&columns);
2520
2521 let mut operator = CreateEdgeOperator::new(
2522 Arc::clone(&self.store),
2523 input_op,
2524 from_column,
2525 to_column,
2526 create.edge_type.clone(),
2527 output_schema,
2528 )
2529 .with_properties(properties)
2530 .with_tx_context(self.viewing_epoch, self.tx_id);
2531
2532 if let Some(col) = output_column {
2533 operator = operator.with_output_column(col);
2534 }
2535
2536 let operator = Box::new(operator);
2537
2538 Ok((operator, columns))
2539 }
2540
2541 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2543 let (input_op, columns) = self.plan_operator(&delete.input)?;
2544
2545 let node_column = columns
2546 .iter()
2547 .position(|c| c == &delete.variable)
2548 .ok_or_else(|| {
2549 Error::Internal(format!(
2550 "Variable '{}' not found for delete",
2551 delete.variable
2552 ))
2553 })?;
2554
2555 let output_schema = vec![LogicalType::Int64];
2557 let output_columns = vec!["deleted_count".to_string()];
2558
2559 let operator = Box::new(
2560 DeleteNodeOperator::new(
2561 Arc::clone(&self.store),
2562 input_op,
2563 node_column,
2564 output_schema,
2565 delete.detach, )
2567 .with_tx_context(self.viewing_epoch, self.tx_id),
2568 );
2569
2570 Ok((operator, output_columns))
2571 }
2572
2573 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2575 let (input_op, columns) = self.plan_operator(&delete.input)?;
2576
2577 let edge_column = columns
2578 .iter()
2579 .position(|c| c == &delete.variable)
2580 .ok_or_else(|| {
2581 Error::Internal(format!(
2582 "Variable '{}' not found for delete",
2583 delete.variable
2584 ))
2585 })?;
2586
2587 let output_schema = vec![LogicalType::Int64];
2589 let output_columns = vec!["deleted_count".to_string()];
2590
2591 let operator = Box::new(
2592 DeleteEdgeOperator::new(
2593 Arc::clone(&self.store),
2594 input_op,
2595 edge_column,
2596 output_schema,
2597 )
2598 .with_tx_context(self.viewing_epoch, self.tx_id),
2599 );
2600
2601 Ok((operator, output_columns))
2602 }
2603
2604 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2606 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2607 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2608
2609 let mut columns = left_columns.clone();
2611 columns.extend(right_columns.clone());
2612
2613 let mut probe_keys = Vec::new();
2615 let mut build_keys = Vec::new();
2616
2617 for (right_idx, right_col) in right_columns.iter().enumerate() {
2618 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2619 probe_keys.push(left_idx);
2620 build_keys.push(right_idx);
2621 }
2622 }
2623
2624 let output_schema = self.derive_schema_from_columns(&columns);
2625
2626 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2627 left_op,
2628 right_op,
2629 probe_keys,
2630 build_keys,
2631 PhysicalJoinType::Left,
2632 output_schema,
2633 ));
2634
2635 Ok((operator, columns))
2636 }
2637
2638 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2640 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2641 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2642
2643 let columns = left_columns.clone();
2645
2646 let mut probe_keys = Vec::new();
2648 let mut build_keys = Vec::new();
2649
2650 for (right_idx, right_col) in right_columns.iter().enumerate() {
2651 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2652 probe_keys.push(left_idx);
2653 build_keys.push(right_idx);
2654 }
2655 }
2656
2657 let output_schema = self.derive_schema_from_columns(&columns);
2658
2659 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2660 left_op,
2661 right_op,
2662 probe_keys,
2663 build_keys,
2664 PhysicalJoinType::Anti,
2665 output_schema,
2666 ));
2667
2668 Ok((operator, columns))
2669 }
2670
2671 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2673 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2676 if matches!(&*unwind.input, LogicalOperator::Empty) {
2677 let literal_list = self.convert_expression(&unwind.expression)?;
2682
2683 let single_row_op: Box<dyn Operator> = Box::new(
2685 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2686 );
2687 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2688 single_row_op,
2689 vec![ProjectExpr::Expression {
2690 expr: literal_list,
2691 variable_columns: HashMap::new(),
2692 }],
2693 vec![LogicalType::Any],
2694 Arc::clone(&self.store),
2695 ));
2696
2697 (project_op, vec!["__list__".to_string()])
2698 } else {
2699 self.plan_operator(&unwind.input)?
2700 };
2701
2702 let list_col_idx = match &unwind.expression {
2708 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2709 LogicalExpression::Property { variable, .. } => {
2710 input_columns.iter().position(|c| c == variable)
2713 }
2714 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2715 None
2717 }
2718 _ => None,
2719 };
2720
2721 let mut columns = input_columns.clone();
2723 columns.push(unwind.variable.clone());
2724
2725 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2727 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2732
2733 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2734 input_op,
2735 col_idx,
2736 unwind.variable.clone(),
2737 output_schema,
2738 ));
2739
2740 Ok((operator, columns))
2741 }
2742
2743 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2745 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2747 Vec::new()
2748 } else {
2749 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2750 cols
2751 };
2752
2753 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2755 .match_properties
2756 .iter()
2757 .filter_map(|(name, expr)| {
2758 if let LogicalExpression::Literal(v) = expr {
2759 Some((name.clone(), v.clone()))
2760 } else {
2761 None }
2763 })
2764 .collect();
2765
2766 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2768 .on_create
2769 .iter()
2770 .filter_map(|(name, expr)| {
2771 if let LogicalExpression::Literal(v) = expr {
2772 Some((name.clone(), v.clone()))
2773 } else {
2774 None
2775 }
2776 })
2777 .collect();
2778
2779 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2781 .on_match
2782 .iter()
2783 .filter_map(|(name, expr)| {
2784 if let LogicalExpression::Literal(v) = expr {
2785 Some((name.clone(), v.clone()))
2786 } else {
2787 None
2788 }
2789 })
2790 .collect();
2791
2792 columns.push(merge.variable.clone());
2794
2795 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2796 Arc::clone(&self.store),
2797 merge.variable.clone(),
2798 merge.labels.clone(),
2799 match_properties,
2800 on_create_properties,
2801 on_match_properties,
2802 ));
2803
2804 Ok((operator, columns))
2805 }
2806
2807 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2809 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2811
2812 let source_column = columns
2814 .iter()
2815 .position(|c| c == &sp.source_var)
2816 .ok_or_else(|| {
2817 Error::Internal(format!(
2818 "Source variable '{}' not found for shortestPath",
2819 sp.source_var
2820 ))
2821 })?;
2822
2823 let target_column = columns
2824 .iter()
2825 .position(|c| c == &sp.target_var)
2826 .ok_or_else(|| {
2827 Error::Internal(format!(
2828 "Target variable '{}' not found for shortestPath",
2829 sp.target_var
2830 ))
2831 })?;
2832
2833 let direction = match sp.direction {
2835 ExpandDirection::Outgoing => Direction::Outgoing,
2836 ExpandDirection::Incoming => Direction::Incoming,
2837 ExpandDirection::Both => Direction::Both,
2838 };
2839
2840 let operator: Box<dyn Operator> = Box::new(
2842 ShortestPathOperator::new(
2843 Arc::clone(&self.store),
2844 input_op,
2845 source_column,
2846 target_column,
2847 sp.edge_type.clone(),
2848 direction,
2849 )
2850 .with_all_paths(sp.all_paths),
2851 );
2852
2853 columns.push(format!("_path_length_{}", sp.path_alias));
2856
2857 Ok((operator, columns))
2858 }
2859
2860 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2862 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2863
2864 let node_column = columns
2866 .iter()
2867 .position(|c| c == &add_label.variable)
2868 .ok_or_else(|| {
2869 Error::Internal(format!(
2870 "Variable '{}' not found for ADD LABEL",
2871 add_label.variable
2872 ))
2873 })?;
2874
2875 let output_schema = vec![LogicalType::Int64];
2877 let output_columns = vec!["labels_added".to_string()];
2878
2879 let operator = Box::new(AddLabelOperator::new(
2880 Arc::clone(&self.store),
2881 input_op,
2882 node_column,
2883 add_label.labels.clone(),
2884 output_schema,
2885 ));
2886
2887 Ok((operator, output_columns))
2888 }
2889
2890 fn plan_remove_label(
2892 &self,
2893 remove_label: &RemoveLabelOp,
2894 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2895 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2896
2897 let node_column = columns
2899 .iter()
2900 .position(|c| c == &remove_label.variable)
2901 .ok_or_else(|| {
2902 Error::Internal(format!(
2903 "Variable '{}' not found for REMOVE LABEL",
2904 remove_label.variable
2905 ))
2906 })?;
2907
2908 let output_schema = vec![LogicalType::Int64];
2910 let output_columns = vec!["labels_removed".to_string()];
2911
2912 let operator = Box::new(RemoveLabelOperator::new(
2913 Arc::clone(&self.store),
2914 input_op,
2915 node_column,
2916 remove_label.labels.clone(),
2917 output_schema,
2918 ));
2919
2920 Ok((operator, output_columns))
2921 }
2922
2923 fn plan_set_property(
2925 &self,
2926 set_prop: &SetPropertyOp,
2927 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2928 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2929
2930 let entity_column = columns
2932 .iter()
2933 .position(|c| c == &set_prop.variable)
2934 .ok_or_else(|| {
2935 Error::Internal(format!(
2936 "Variable '{}' not found for SET",
2937 set_prop.variable
2938 ))
2939 })?;
2940
2941 let properties: Vec<(String, PropertySource)> = set_prop
2943 .properties
2944 .iter()
2945 .map(|(name, expr)| {
2946 let source = self.expression_to_property_source(expr, &columns)?;
2947 Ok((name.clone(), source))
2948 })
2949 .collect::<Result<Vec<_>>>()?;
2950
2951 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2953 let output_columns = columns.clone();
2954
2955 let operator = Box::new(SetPropertyOperator::new_for_node(
2957 Arc::clone(&self.store),
2958 input_op,
2959 entity_column,
2960 properties,
2961 output_schema,
2962 ));
2963
2964 Ok((operator, output_columns))
2965 }
2966
2967 fn expression_to_property_source(
2969 &self,
2970 expr: &LogicalExpression,
2971 columns: &[String],
2972 ) -> Result<PropertySource> {
2973 match expr {
2974 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2975 LogicalExpression::Variable(name) => {
2976 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2977 Error::Internal(format!("Variable '{}' not found for property source", name))
2978 })?;
2979 Ok(PropertySource::Column(col_idx))
2980 }
2981 LogicalExpression::Parameter(name) => {
2982 Ok(PropertySource::Constant(
2985 grafeo_common::types::Value::String(format!("${}", name).into()),
2986 ))
2987 }
2988 _ => {
2989 if let Some(value) = Self::try_fold_expression(expr) {
2990 Ok(PropertySource::Constant(value))
2991 } else {
2992 Err(Error::Internal(format!(
2993 "Unsupported expression type for property source: {:?}",
2994 expr
2995 )))
2996 }
2997 }
2998 }
2999 }
3000
3001 fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3007 match expr {
3008 LogicalExpression::Literal(v) => Some(v.clone()),
3009 LogicalExpression::List(items) => {
3010 let values: Option<Vec<Value>> =
3011 items.iter().map(Self::try_fold_expression).collect();
3012 let values = values?;
3013 let all_numeric = !values.is_empty()
3015 && values
3016 .iter()
3017 .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3018 if all_numeric {
3019 let floats: Vec<f32> = values
3020 .iter()
3021 .filter_map(|v| match v {
3022 Value::Float64(f) => Some(*f as f32),
3023 Value::Int64(i) => Some(*i as f32),
3024 _ => None,
3025 })
3026 .collect();
3027 Some(Value::Vector(floats.into()))
3028 } else {
3029 Some(Value::List(values.into()))
3030 }
3031 }
3032 LogicalExpression::FunctionCall { name, args, .. } => {
3033 match name.to_lowercase().as_str() {
3034 "vector" => {
3035 if args.len() != 1 {
3036 return None;
3037 }
3038 let val = Self::try_fold_expression(&args[0])?;
3039 match val {
3040 Value::List(items) => {
3041 let floats: Vec<f32> = items
3042 .iter()
3043 .filter_map(|v| match v {
3044 Value::Float64(f) => Some(*f as f32),
3045 Value::Int64(i) => Some(*i as f32),
3046 _ => None,
3047 })
3048 .collect();
3049 if floats.len() == items.len() {
3050 Some(Value::Vector(floats.into()))
3051 } else {
3052 None
3053 }
3054 }
3055 Value::Vector(v) => Some(Value::Vector(v)),
3057 _ => None,
3058 }
3059 }
3060 _ => None,
3061 }
3062 }
3063 _ => None,
3064 }
3065 }
3066}
3067
3068pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3070 match op {
3071 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3072 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3073 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3074 BinaryOp::Le => Ok(BinaryFilterOp::Le),
3075 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3076 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3077 BinaryOp::And => Ok(BinaryFilterOp::And),
3078 BinaryOp::Or => Ok(BinaryFilterOp::Or),
3079 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3080 BinaryOp::Add => Ok(BinaryFilterOp::Add),
3081 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3082 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3083 BinaryOp::Div => Ok(BinaryFilterOp::Div),
3084 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3085 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3086 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3087 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3088 BinaryOp::In => Ok(BinaryFilterOp::In),
3089 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3090 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3091 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3092 "Binary operator {:?} not yet supported in filters",
3093 op
3094 ))),
3095 }
3096}
3097
3098pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3100 match op {
3101 UnaryOp::Not => Ok(UnaryFilterOp::Not),
3102 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3103 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3104 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3105 }
3106}
3107
3108pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3110 match func {
3111 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3112 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3113 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3114 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3115 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3116 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3117 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3118 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3119 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3120 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3121 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3122 }
3123}
3124
3125pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3129 match expr {
3130 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3131 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3132 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3133 variable: variable.clone(),
3134 property: property.clone(),
3135 }),
3136 LogicalExpression::Binary { left, op, right } => {
3137 let left_expr = convert_filter_expression(left)?;
3138 let right_expr = convert_filter_expression(right)?;
3139 let filter_op = convert_binary_op(*op)?;
3140 Ok(FilterExpression::Binary {
3141 left: Box::new(left_expr),
3142 op: filter_op,
3143 right: Box::new(right_expr),
3144 })
3145 }
3146 LogicalExpression::Unary { op, operand } => {
3147 let operand_expr = convert_filter_expression(operand)?;
3148 let filter_op = convert_unary_op(*op)?;
3149 Ok(FilterExpression::Unary {
3150 op: filter_op,
3151 operand: Box::new(operand_expr),
3152 })
3153 }
3154 LogicalExpression::FunctionCall { name, args, .. } => {
3155 let filter_args: Vec<FilterExpression> = args
3156 .iter()
3157 .map(convert_filter_expression)
3158 .collect::<Result<Vec<_>>>()?;
3159 Ok(FilterExpression::FunctionCall {
3160 name: name.clone(),
3161 args: filter_args,
3162 })
3163 }
3164 LogicalExpression::Case {
3165 operand,
3166 when_clauses,
3167 else_clause,
3168 } => {
3169 let filter_operand = operand
3170 .as_ref()
3171 .map(|e| convert_filter_expression(e))
3172 .transpose()?
3173 .map(Box::new);
3174 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3175 .iter()
3176 .map(|(cond, result)| {
3177 Ok((
3178 convert_filter_expression(cond)?,
3179 convert_filter_expression(result)?,
3180 ))
3181 })
3182 .collect::<Result<Vec<_>>>()?;
3183 let filter_else = else_clause
3184 .as_ref()
3185 .map(|e| convert_filter_expression(e))
3186 .transpose()?
3187 .map(Box::new);
3188 Ok(FilterExpression::Case {
3189 operand: filter_operand,
3190 when_clauses: filter_when_clauses,
3191 else_clause: filter_else,
3192 })
3193 }
3194 LogicalExpression::List(items) => {
3195 let filter_items: Vec<FilterExpression> = items
3196 .iter()
3197 .map(convert_filter_expression)
3198 .collect::<Result<Vec<_>>>()?;
3199 Ok(FilterExpression::List(filter_items))
3200 }
3201 LogicalExpression::Map(pairs) => {
3202 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3203 .iter()
3204 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3205 .collect::<Result<Vec<_>>>()?;
3206 Ok(FilterExpression::Map(filter_pairs))
3207 }
3208 LogicalExpression::IndexAccess { base, index } => {
3209 let base_expr = convert_filter_expression(base)?;
3210 let index_expr = convert_filter_expression(index)?;
3211 Ok(FilterExpression::IndexAccess {
3212 base: Box::new(base_expr),
3213 index: Box::new(index_expr),
3214 })
3215 }
3216 LogicalExpression::SliceAccess { base, start, end } => {
3217 let base_expr = convert_filter_expression(base)?;
3218 let start_expr = start
3219 .as_ref()
3220 .map(|s| convert_filter_expression(s))
3221 .transpose()?
3222 .map(Box::new);
3223 let end_expr = end
3224 .as_ref()
3225 .map(|e| convert_filter_expression(e))
3226 .transpose()?
3227 .map(Box::new);
3228 Ok(FilterExpression::SliceAccess {
3229 base: Box::new(base_expr),
3230 start: start_expr,
3231 end: end_expr,
3232 })
3233 }
3234 LogicalExpression::Parameter(_) => Err(Error::Internal(
3235 "Parameters not yet supported in filters".to_string(),
3236 )),
3237 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3238 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3239 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3240 LogicalExpression::ListComprehension {
3241 variable,
3242 list_expr,
3243 filter_expr,
3244 map_expr,
3245 } => {
3246 let list = convert_filter_expression(list_expr)?;
3247 let filter = filter_expr
3248 .as_ref()
3249 .map(|f| convert_filter_expression(f))
3250 .transpose()?
3251 .map(Box::new);
3252 let map = convert_filter_expression(map_expr)?;
3253 Ok(FilterExpression::ListComprehension {
3254 variable: variable.clone(),
3255 list_expr: Box::new(list),
3256 filter_expr: filter,
3257 map_expr: Box::new(map),
3258 })
3259 }
3260 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3261 Error::Internal("Subqueries not yet supported in filters".to_string()),
3262 ),
3263 }
3264}
3265
3266fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3268 use grafeo_common::types::Value;
3269 match value {
3270 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3272 Value::Int64(_) => LogicalType::Int64,
3273 Value::Float64(_) => LogicalType::Float64,
3274 Value::String(_) => LogicalType::String,
3275 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3277 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3280 }
3281}
3282
3283fn expression_to_string(expr: &LogicalExpression) -> String {
3285 match expr {
3286 LogicalExpression::Variable(name) => name.clone(),
3287 LogicalExpression::Property { variable, property } => {
3288 format!("{variable}.{property}")
3289 }
3290 LogicalExpression::Literal(value) => format!("{value:?}"),
3291 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3292 _ => "expr".to_string(),
3293 }
3294}
3295
3296pub struct PhysicalPlan {
3298 pub operator: Box<dyn Operator>,
3300 pub columns: Vec<String>,
3302 pub adaptive_context: Option<AdaptiveContext>,
3308}
3309
3310impl PhysicalPlan {
3311 #[must_use]
3313 pub fn columns(&self) -> &[String] {
3314 &self.columns
3315 }
3316
3317 pub fn into_operator(self) -> Box<dyn Operator> {
3319 self.operator
3320 }
3321
3322 #[must_use]
3324 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3325 self.adaptive_context.as_ref()
3326 }
3327
3328 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3330 self.adaptive_context.take()
3331 }
3332}
3333
3334#[allow(dead_code)]
3338struct SingleResultOperator {
3339 result: Option<grafeo_core::execution::DataChunk>,
3340}
3341
3342impl SingleResultOperator {
3343 #[allow(dead_code)]
3344 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3345 Self { result }
3346 }
3347}
3348
3349impl Operator for SingleResultOperator {
3350 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3351 Ok(self.result.take())
3352 }
3353
3354 fn reset(&mut self) {
3355 }
3357
3358 fn name(&self) -> &'static str {
3359 "SingleResult"
3360 }
3361}
3362
3363#[cfg(test)]
3364mod tests {
3365 use super::*;
3366 use crate::query::plan::{
3367 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3368 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3369 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3370 SortKey, SortOp,
3371 };
3372 use grafeo_common::types::Value;
3373
3374 fn create_test_store() -> Arc<LpgStore> {
3375 let store = Arc::new(LpgStore::new());
3376 store.create_node(&["Person"]);
3377 store.create_node(&["Person"]);
3378 store.create_node(&["Company"]);
3379 store
3380 }
3381
/// RETURN n over a labelled node scan yields the single column `n`.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3406
/// RETURN n over an unlabelled node scan yields the single column `n`.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3429
/// An alias on a return item replaces the variable name as the column name.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: Some(String::from("person")),
        }],
        distinct: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
3452
/// Returning a property without an alias names the column `variable.property`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
3478
/// Returning an aliased literal yields a column named after the alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Literal(Value::Int64(42)),
            alias: Some(String::from("answer")),
        }],
        distinct: false,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
3501
/// A filter `n.age = 30` between scan and return plans and keeps column `n`.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_thirty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_thirty,
        input: Box::new(scan),
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3536
/// A filter `n.age > 20 AND n.age < 40` plans and keeps column `n`.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let older_than_twenty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    let younger_than_forty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };
    let in_range = LogicalExpression::Binary {
        left: Box::new(older_than_twenty),
        op: BinaryOp::And,
        right: Box::new(younger_than_forty),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(scan),
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3580
/// A filter `NOT n.active` plans and keeps column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3612
/// Plans `Return(n) <- Filter(IsNull(n.email)) <- NodeScan(n)` and checks that the
/// physical plan exposes exactly the column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let email = LogicalExpression::Property {
        variable: "n".to_owned(),
        property: "email".to_owned(),
    };
    let source = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filter = FilterOp {
        predicate: LogicalExpression::Unary {
            op: UnaryOp::IsNull,
            operand: Box::new(email),
        },
        input: Box::new(source),
    };
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(LogicalOperator::Filter(filter)),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3644
/// Plans `Return(n) <- Filter(size(n.friends) > 0) <- NodeScan(n)`, i.e. a filter
/// whose predicate contains a function call, and checks the output column.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3681
/// Plans a single-hop outgoing `KNOWS` expand from a `Person` scan (`a` -> `b`)
/// and checks that both endpoint variables appear in the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let knows = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(people),
        path_alias: None,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(name)),
            alias: None,
        })
        .collect();
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(knows),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
3724
/// Plans a single-hop outgoing `KNOWS` expand that binds the edge to `r` and
/// checks that `a`, `r` and `b` all appear in the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Small factory for the three plain-variable return items.
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: Some("r".to_owned()),
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("r"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "r", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
3769
/// Plans `Return(n) <- Limit(10) <- NodeScan(n)` and checks the output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3797
/// Plans `Return(n) <- Skip(5) <- NodeScan(n)` and checks the output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3823
/// Plans `Return(n) <- Sort(n ascending) <- NodeScan(n)` and checks the output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let by_n_ascending = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_n_ascending],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3852
/// Same shape as the ascending sort test, but the single sort key uses
/// `SortOrder::Descending`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let by_n_descending = SortKey {
        expression: LogicalExpression::Variable("n".to_owned()),
        order: SortOrder::Descending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_n_descending],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3881
/// Plans `Return(n) <- Distinct <- NodeScan(n)` (no explicit distinct columns)
/// and checks the output column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3907
/// Plans `Return(cnt) <- Aggregate(Count(n) as cnt) <- NodeScan(n)` with no
/// grouping and checks that `cnt` is among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "cnt"));
}
3943
/// Plans a top-level aggregate that groups a `Person` scan by `n.city` and counts
/// `n` as `cnt`; the physical plan is expected to expose two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let city = LogicalExpression::Property {
        variable: "n".to_owned(),
        property: "city".to_owned(),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_owned())),
        distinct: false,
        alias: Some("cnt".to_owned()),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(people),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
3973
/// Plans a top-level, ungrouped `Sum(n.value) as total` over a node scan and
/// checks that `total` is among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "total"));
}
4003
/// Plans a top-level, ungrouped `Avg(n.score) as average` over a node scan and
/// checks that `average` is among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "score".to_owned(),
        }),
        distinct: false,
        alias: Some("average".to_owned()),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "average"));
}
4033
/// Plans a top-level, ungrouped aggregate with `Min(n.age) as youngest` and
/// `Max(n.age) as oldest`, and checks that both aliases are output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates read `n.age`; only the function and alias differ.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(String::from(alias)),
        percentile: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["youngest", "oldest"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
4076
/// Plans an inner join between a `Person` scan (`a`) and a `Company` scan (`b`)
/// with the condition `a = b`, and checks that both variables are output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: Some(label.to_owned()),
            input: None,
        })
    };
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable("a".to_owned()),
            right: LogicalExpression::Variable("b".to_owned()),
        }],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
4120
/// Plans a top-level cross join (no conditions) of two unlabelled scans and
/// checks that the physical plan exposes two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4145
/// Plans a top-level `JoinType::Left` join (no conditions) of two unlabelled
/// scans and checks that the physical plan exposes two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let left_side = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let right_side = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_side),
        right: Box::new(right_side),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&plan).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4169
/// Plans a standalone `CreateNode` (label `Person`, property `name = "Alice"`)
/// and checks that the created variable `n` is among the output columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "n"));
}
4191
/// Plans a `CreateEdge` of type `KNOWS` from `a` to `b`, bound to `r`, fed by a
/// cross join of two scans, and checks that `r` is among the output columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_owned()),
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_type: "KNOWS".to_owned(),
        properties: Vec::new(),
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "r"));
}
4223
/// Plans a non-detaching `DeleteNode` of `n` over a node scan and checks that
/// the physical plan reports a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let plan = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&plan).unwrap();
    assert!(physical.columns().iter().any(|col| col == "deleted_count"));
}
4243
/// Planning a bare `LogicalOperator::Empty` is expected to return an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
4255
/// Returning a variable (`missing`) that the input scan (`n`) does not bind is
/// expected to make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&plan).is_err());
}
4278
/// Checks that `convert_binary_op` succeeds for the comparison, boolean and
/// arithmetic operators listed below.
#[test]
fn test_convert_binary_ops() {
    // Each entry pairs the variant name (for the failure message) with the
    // outcome of converting that variant.
    let conversions = [
        ("Eq", convert_binary_op(BinaryOp::Eq).is_ok()),
        ("Ne", convert_binary_op(BinaryOp::Ne).is_ok()),
        ("Lt", convert_binary_op(BinaryOp::Lt).is_ok()),
        ("Le", convert_binary_op(BinaryOp::Le).is_ok()),
        ("Gt", convert_binary_op(BinaryOp::Gt).is_ok()),
        ("Ge", convert_binary_op(BinaryOp::Ge).is_ok()),
        ("And", convert_binary_op(BinaryOp::And).is_ok()),
        ("Or", convert_binary_op(BinaryOp::Or).is_ok()),
        ("Add", convert_binary_op(BinaryOp::Add).is_ok()),
        ("Sub", convert_binary_op(BinaryOp::Sub).is_ok()),
        ("Mul", convert_binary_op(BinaryOp::Mul).is_ok()),
        ("Div", convert_binary_op(BinaryOp::Div).is_ok()),
    ];

    for (variant, converted) in conversions {
        assert!(converted, "BinaryOp::{} failed to convert", variant);
    }
}
4296
/// Checks that `convert_unary_op` succeeds for `Not`, `IsNull`, `IsNotNull` and `Neg`.
#[test]
fn test_convert_unary_ops() {
    let conversions = [
        ("Not", convert_unary_op(UnaryOp::Not).is_ok()),
        ("IsNull", convert_unary_op(UnaryOp::IsNull).is_ok()),
        ("IsNotNull", convert_unary_op(UnaryOp::IsNotNull).is_ok()),
        ("Neg", convert_unary_op(UnaryOp::Neg).is_ok()),
    ];

    for (variant, converted) in conversions {
        assert!(converted, "UnaryOp::{} failed to convert", variant);
    }
}
4304
/// Checks that `convert_aggregate_function` maps each of `Count`, `Sum`, `Avg`,
/// `Min` and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
4328
/// A planner built with `Planner::new` reports no transaction id and no
/// transaction manager; `viewing_epoch` is only called, its value is not checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    assert!(matches!(planner.tx_id(), None));
    assert!(matches!(planner.tx_manager(), None));

    // Smoke call only: the returned epoch is intentionally discarded.
    let _ = planner.viewing_epoch();
}
4338
/// Plans a bare node scan, checks `columns()`, then consumes the physical plan
/// through `into_operator()` (the operator itself is discarded).
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&plan).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Only verifies that the plan can be turned into its operator.
    let _ = physical.into_operator();
}
4356
/// `plan_adaptive` over `Return(n) <- NodeScan(n:Person)` should yield the column
/// `n` and attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(people),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert_eq!(adaptive_plan.columns(), &["n"]);
    assert!(adaptive_plan.adaptive_context.is_some());
}
4383
/// `plan_adaptive` over `Return(n) <- Filter(n.age > 30) <- NodeScan(n)` should
/// attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "age".to_owned(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4416
/// `plan_adaptive` over a single-hop outgoing `KNOWS` expand should attach an
/// adaptive context. The planner is built with `with_factorized_execution(false)`.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4455
/// `plan_adaptive` over `Return(a, b) <- CrossJoin(NodeScan(a), NodeScan(b))`
/// should attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };
    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4492
/// `plan_adaptive` over a top-level, ungrouped `Count(n) as cnt` aggregate should
/// attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4518
/// `plan_adaptive` over `Return(n) <- Distinct <- NodeScan(n)` should attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4543
/// `plan_adaptive` over `Return(n) <- Limit(10) <- NodeScan(n)` should attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4568
/// `plan_adaptive` over `Return(n) <- Skip(5) <- NodeScan(n)` should attach an
/// adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4593
/// `plan_adaptive` over `Return(n) <- Sort(n ascending) <- NodeScan(n)` should
/// attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let by_n_ascending = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_n_ascending],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4621
/// `plan_adaptive` over `Return(n) <- Union(NodeScan(n:Person), NodeScan(n:Company))`
/// should attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches bind the same variable `n`, differing only by label.
    let scan_with_label = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_owned(),
            label: Some(label.to_owned()),
            input: None,
        })
    };
    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![scan_with_label("Person"), scan_with_label("Company")],
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let adaptive_plan = planner.plan_adaptive(&plan).unwrap();
    assert!(adaptive_plan.adaptive_context.is_some());
}
4652
/// Plans an outgoing `KNOWS` expand with `min_hops = 1` and `max_hops = Some(3)`
/// and checks that both `a` and `b` appear in the output columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
4694
/// Plans a 1-to-3-hop outgoing `KNOWS` expand that also sets `path_alias` to `p`.
/// Only `a` and `b` are returned and asserted; the path column is not checked.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some(String::from("p")),
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(name)),
            alias: None,
        })
        .collect();
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
4735
/// Plans a single-hop `ExpandDirection::Incoming` `KNOWS` expand and checks that
/// `a` and `b` are output columns. The planner is built with
/// `with_factorized_execution(false)`.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let var_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var_item("a"), var_item("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&plan).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().iter().any(|col| col == expected));
    }
}
4775
#[test]
fn test_plan_expand_both_directions() {
    // Single-hop KNOWS expansion from `a` to `b` with `ExpandDirection::Both`,
    // planned with factorized execution disabled.
    let variable_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let undirected_hop = ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: "a".to_string(),
            label: None,
            input: None,
        })),
        path_alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![variable_item("a"), variable_item("b")],
        distinct: false,
        input: Box::new(LogicalOperator::Expand(undirected_hop)),
    }));

    let store = create_test_store();
    let planner = Planner::new(store).with_factorized_execution(false);
    let physical = planner.plan(&logical).unwrap();

    // Planning must succeed and surface both bound variables.
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4815
#[test]
fn test_planner_with_context() {
    let store = create_test_store();

    // Open a transaction first, then capture the manager's current epoch;
    // both are handed to the planner as its transactional context.
    let manager = Arc::new(crate::transaction::TransactionManager::new());
    let tx_id = manager.begin();
    let epoch = manager.current_epoch();

    let planner = Planner::with_context(store, Arc::clone(&manager), Some(tx_id), epoch);

    // The accessors must report exactly the context that was supplied.
    assert_eq!(planner.tx_id(), Some(tx_id));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), epoch);
}
4838
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let planner = Planner::new(create_test_store()).with_factorized_execution(false);

    // Both hops share the same shape: one outgoing hop over any edge type,
    // with no edge variable and no path alias.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
        })
    };

    // Chain: scan(a) -> expand(a -> b) -> expand(b -> c).
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: LogicalExpression::Variable("a".to_string()),
                alias: None,
            },
            ReturnItem {
                expression: LogicalExpression::Variable("c".to_string()),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();

    // Only the chain's endpoints are returned; the intermediate `b` is not checked.
    for expected in ["a", "c"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4888
#[test]
fn test_plan_sort_by_property() {
    // Sort key: the `name` property of `n`, ascending.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        order: SortOrder::Ascending,
    };

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    // Sorting on a property must still leave `n` among the output columns.
    let expected = "n".to_string();
    assert!(physical.columns().contains(&expected));
}
4923
#[test]
fn test_plan_scan_with_input() {
    // Inner scan binds `a` to Person nodes; the outer scan binds `b` to
    // Company nodes and takes the inner scan as its input.
    // NOTE(review): presumably planned as a cross product of the two scans — confirm.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: Some(Box::new(person_scan)),
    });

    let return_items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: return_items,
        distinct: false,
        input: Box::new(company_scan),
    }));

    let planner = Planner::new(create_test_store());
    let physical = planner.plan(&logical).unwrap();

    // Variables from both scans must be visible in the physical plan.
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4959}