1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let stats = self.store.statistics();
244 let avg_degree = self.estimate_expand_degree(&stats, expand);
245 let estimate = input_estimate * avg_degree;
246 let id = format!("expand_{}", expand.to_variable);
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250 }
251 LogicalOperator::Join(join) => {
252 let left_est = self.estimate_cardinality(&join.left);
254 let right_est = self.estimate_cardinality(&join.right);
255 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261 }
262 LogicalOperator::Aggregate(agg) => {
263 let input_estimate = self.estimate_cardinality(&agg.input);
265 let estimate = if agg.group_by.is_empty() {
266 1.0 } else {
268 (input_estimate * 0.1).max(1.0) };
270 let id = format!("aggregate_{depth}");
271 ctx.set_estimate(&id, estimate);
272
273 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274 }
275 LogicalOperator::Distinct(distinct) => {
276 let input_estimate = self.estimate_cardinality(&distinct.input);
277 let estimate = (input_estimate * 0.5).max(1.0);
278 let id = format!("distinct_{depth}");
279 ctx.set_estimate(&id, estimate);
280
281 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282 }
283 LogicalOperator::Return(ret) => {
284 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285 }
286 LogicalOperator::Limit(limit) => {
287 let input_estimate = self.estimate_cardinality(&limit.input);
288 let estimate = (input_estimate).min(limit.count as f64);
289 let id = format!("limit_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293 }
294 LogicalOperator::Skip(skip) => {
295 let input_estimate = self.estimate_cardinality(&skip.input);
296 let estimate = (input_estimate - skip.count as f64).max(0.0);
297 let id = format!("skip_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301 }
302 LogicalOperator::Sort(sort) => {
303 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305 }
306 LogicalOperator::Union(union) => {
307 let estimate: f64 = union
308 .inputs
309 .iter()
310 .map(|input| self.estimate_cardinality(input))
311 .sum();
312 let id = format!("union_{depth}");
313 ctx.set_estimate(&id, estimate);
314
315 for input in &union.inputs {
316 self.collect_cardinality_estimates(input, ctx, depth + 1);
317 }
318 }
319 _ => {
320 }
322 }
323 }
324
325 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327 match op {
328 LogicalOperator::NodeScan(scan) => {
329 if let Some(label) = &scan.label {
330 self.store.nodes_by_label(label).len() as f64
331 } else {
332 self.store.node_count() as f64
333 }
334 }
335 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336 LogicalOperator::Expand(expand) => {
337 let stats = self.store.statistics();
338 let avg_degree = self.estimate_expand_degree(&stats, expand);
339 self.estimate_cardinality(&expand.input) * avg_degree
340 }
341 LogicalOperator::Join(join) => {
342 let left = self.estimate_cardinality(&join.left);
343 let right = self.estimate_cardinality(&join.right);
344 (left * right).sqrt()
345 }
346 LogicalOperator::Aggregate(agg) => {
347 if agg.group_by.is_empty() {
348 1.0
349 } else {
350 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351 }
352 }
353 LogicalOperator::Distinct(distinct) => {
354 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355 }
356 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357 LogicalOperator::Limit(limit) => self
358 .estimate_cardinality(&limit.input)
359 .min(limit.count as f64),
360 LogicalOperator::Skip(skip) => {
361 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362 }
363 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364 LogicalOperator::Union(union) => union
365 .inputs
366 .iter()
367 .map(|input| self.estimate_cardinality(input))
368 .sum(),
369 _ => 1000.0, }
371 }
372
373 fn estimate_expand_degree(
375 &self,
376 stats: &grafeo_core::statistics::Statistics,
377 expand: &ExpandOp,
378 ) -> f64 {
379 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380 if let Some(edge_type) = &expand.edge_type {
381 stats.estimate_avg_degree(edge_type, outgoing)
382 } else if stats.total_nodes > 0 {
383 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384 } else {
385 10.0 }
387 }
388
389 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391 match op {
392 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393 LogicalOperator::Expand(expand) => {
394 if self.factorized_execution {
396 let (chain_len, _base) = Self::count_expand_chain(op);
397 if chain_len >= 2 {
398 return self.plan_expand_chain(op);
400 }
401 }
402 self.plan_expand(expand)
403 }
404 LogicalOperator::Return(ret) => self.plan_return(ret),
405 LogicalOperator::Filter(filter) => self.plan_filter(filter),
406 LogicalOperator::Project(project) => self.plan_project(project),
407 LogicalOperator::Limit(limit) => self.plan_limit(limit),
408 LogicalOperator::Skip(skip) => self.plan_skip(skip),
409 LogicalOperator::Sort(sort) => self.plan_sort(sort),
410 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411 LogicalOperator::Join(join) => self.plan_join(join),
412 LogicalOperator::Union(union) => self.plan_union(union),
413 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421 LogicalOperator::Merge(merge) => self.plan_merge(merge),
422 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427 LogicalOperator::VectorScan(_) => Err(Error::Internal(
428 "VectorScan requires vector-index feature".to_string(),
429 )),
430 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431 "VectorJoin requires vector-index feature".to_string(),
432 )),
433 _ => Err(Error::Internal(format!(
434 "Unsupported operator: {:?}",
435 std::mem::discriminant(op)
436 ))),
437 }
438 }
439
440 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442 let scan_op = if let Some(label) = &scan.label {
443 ScanOperator::with_label(Arc::clone(&self.store), label)
444 } else {
445 ScanOperator::new(Arc::clone(&self.store))
446 };
447
448 let scan_operator: Box<dyn Operator> =
450 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452 if let Some(input) = &scan.input {
454 let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456 let mut output_schema: Vec<LogicalType> =
458 input_columns.iter().map(|_| LogicalType::Any).collect();
459 output_schema.push(LogicalType::Node);
460
461 input_columns.push(scan.variable.clone());
463
464 let join_op = Box::new(NestedLoopJoinOperator::new(
466 input_op,
467 scan_operator,
468 None, PhysicalJoinType::Cross,
470 output_schema,
471 ));
472
473 Ok((join_op, input_columns))
474 } else {
475 let columns = vec![scan.variable.clone()];
476 Ok((scan_operator, columns))
477 }
478 }
479
480 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485 let source_column = input_columns
487 .iter()
488 .position(|c| c == &expand.from_variable)
489 .ok_or_else(|| {
490 Error::Internal(format!(
491 "Source variable '{}' not found in input columns",
492 expand.from_variable
493 ))
494 })?;
495
496 let direction = match expand.direction {
498 ExpandDirection::Outgoing => Direction::Outgoing,
499 ExpandDirection::Incoming => Direction::Incoming,
500 ExpandDirection::Both => Direction::Both,
501 };
502
503 let is_variable_length =
505 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507 let operator: Box<dyn Operator> = if is_variable_length {
508 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
511 Arc::clone(&self.store),
512 input_op,
513 source_column,
514 direction,
515 expand.edge_type.clone(),
516 expand.min_hops,
517 max_hops,
518 )
519 .with_tx_context(self.viewing_epoch, self.tx_id);
520
521 if expand.path_alias.is_some() {
523 expand_op = expand_op.with_path_length_output();
524 }
525
526 Box::new(expand_op)
527 } else {
528 let expand_op = ExpandOperator::new(
530 Arc::clone(&self.store),
531 input_op,
532 source_column,
533 direction,
534 expand.edge_type.clone(),
535 )
536 .with_tx_context(self.viewing_epoch, self.tx_id);
537 Box::new(expand_op)
538 };
539
540 let mut columns = input_columns;
543
544 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
546 let count = self.anon_edge_counter.get();
547 self.anon_edge_counter.set(count + 1);
548 format!("_anon_edge_{}", count)
549 });
550 columns.push(edge_col_name);
551
552 columns.push(expand.to_variable.clone());
553
554 if let Some(ref path_alias) = expand.path_alias {
556 columns.push(format!("_path_length_{}", path_alias));
557 }
558
559 Ok((operator, columns))
560 }
561
562 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
570 let expands = Self::collect_expand_chain(op);
571 if expands.is_empty() {
572 return Err(Error::Internal("Empty expand chain".to_string()));
573 }
574
575 let first_expand = expands[0];
577 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
578
579 let mut columns = base_columns.clone();
580 let mut steps = Vec::new();
581
582 let mut is_first = true;
587
588 for expand in &expands {
589 let source_column = if is_first {
591 base_columns
593 .iter()
594 .position(|c| c == &expand.from_variable)
595 .ok_or_else(|| {
596 Error::Internal(format!(
597 "Source variable '{}' not found in base columns",
598 expand.from_variable
599 ))
600 })?
601 } else {
602 1
605 };
606
607 let direction = match expand.direction {
609 ExpandDirection::Outgoing => Direction::Outgoing,
610 ExpandDirection::Incoming => Direction::Incoming,
611 ExpandDirection::Both => Direction::Both,
612 };
613
614 steps.push(ExpandStep {
616 source_column,
617 direction,
618 edge_type: expand.edge_type.clone(),
619 });
620
621 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
623 let count = self.anon_edge_counter.get();
624 self.anon_edge_counter.set(count + 1);
625 format!("_anon_edge_{}", count)
626 });
627 columns.push(edge_col_name);
628 columns.push(expand.to_variable.clone());
629
630 is_first = false;
631 }
632
633 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
635
636 if let Some(tx_id) = self.tx_id {
637 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
638 } else {
639 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
640 }
641
642 Ok((Box::new(lazy_op), columns))
643 }
644
645 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
647 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
649
650 let variable_columns: HashMap<String, usize> = input_columns
652 .iter()
653 .enumerate()
654 .map(|(i, name)| (name.clone(), i))
655 .collect();
656
657 let columns: Vec<String> = ret
659 .items
660 .iter()
661 .map(|item| {
662 item.alias.clone().unwrap_or_else(|| {
663 expression_to_string(&item.expression)
665 })
666 })
667 .collect();
668
669 let needs_project = ret
671 .items
672 .iter()
673 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
674
675 if needs_project {
676 let mut projections = Vec::with_capacity(ret.items.len());
678 let mut output_types = Vec::with_capacity(ret.items.len());
679
680 for item in &ret.items {
681 match &item.expression {
682 LogicalExpression::Variable(name) => {
683 let col_idx = *variable_columns.get(name).ok_or_else(|| {
684 Error::Internal(format!("Variable '{}' not found in input", name))
685 })?;
686 projections.push(ProjectExpr::Column(col_idx));
687 output_types.push(LogicalType::Node);
689 }
690 LogicalExpression::Property { variable, property } => {
691 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
692 Error::Internal(format!("Variable '{}' not found in input", variable))
693 })?;
694 projections.push(ProjectExpr::PropertyAccess {
695 column: col_idx,
696 property: property.clone(),
697 });
698 output_types.push(LogicalType::Any);
700 }
701 LogicalExpression::Literal(value) => {
702 projections.push(ProjectExpr::Constant(value.clone()));
703 output_types.push(value_to_logical_type(value));
704 }
705 LogicalExpression::FunctionCall { name, args, .. } => {
706 match name.to_lowercase().as_str() {
708 "type" => {
709 if args.len() != 1 {
711 return Err(Error::Internal(
712 "type() requires exactly one argument".to_string(),
713 ));
714 }
715 if let LogicalExpression::Variable(var_name) = &args[0] {
716 let col_idx =
717 *variable_columns.get(var_name).ok_or_else(|| {
718 Error::Internal(format!(
719 "Variable '{}' not found in input",
720 var_name
721 ))
722 })?;
723 projections.push(ProjectExpr::EdgeType { column: col_idx });
724 output_types.push(LogicalType::String);
725 } else {
726 return Err(Error::Internal(
727 "type() argument must be a variable".to_string(),
728 ));
729 }
730 }
731 "length" => {
732 if args.len() != 1 {
735 return Err(Error::Internal(
736 "length() requires exactly one argument".to_string(),
737 ));
738 }
739 if let LogicalExpression::Variable(var_name) = &args[0] {
740 let col_idx =
741 *variable_columns.get(var_name).ok_or_else(|| {
742 Error::Internal(format!(
743 "Variable '{}' not found in input",
744 var_name
745 ))
746 })?;
747 projections.push(ProjectExpr::Column(col_idx));
749 output_types.push(LogicalType::Int64);
750 } else {
751 return Err(Error::Internal(
752 "length() argument must be a variable".to_string(),
753 ));
754 }
755 }
756 _ => {
758 let filter_expr = self.convert_expression(&item.expression)?;
759 projections.push(ProjectExpr::Expression {
760 expr: filter_expr,
761 variable_columns: variable_columns.clone(),
762 });
763 output_types.push(LogicalType::Any);
764 }
765 }
766 }
767 LogicalExpression::Case { .. } => {
768 let filter_expr = self.convert_expression(&item.expression)?;
770 projections.push(ProjectExpr::Expression {
771 expr: filter_expr,
772 variable_columns: variable_columns.clone(),
773 });
774 output_types.push(LogicalType::Any);
776 }
777 _ => {
778 return Err(Error::Internal(format!(
779 "Unsupported RETURN expression: {:?}",
780 item.expression
781 )));
782 }
783 }
784 }
785
786 let operator = Box::new(ProjectOperator::with_store(
787 input_op,
788 projections,
789 output_types,
790 Arc::clone(&self.store),
791 ));
792
793 Ok((operator, columns))
794 } else {
795 let mut projections = Vec::with_capacity(ret.items.len());
798 let mut output_types = Vec::with_capacity(ret.items.len());
799
800 for item in &ret.items {
801 if let LogicalExpression::Variable(name) = &item.expression {
802 let col_idx = *variable_columns.get(name).ok_or_else(|| {
803 Error::Internal(format!("Variable '{}' not found in input", name))
804 })?;
805 projections.push(ProjectExpr::Column(col_idx));
806 output_types.push(LogicalType::Node);
807 }
808 }
809
810 if projections.len() == input_columns.len()
812 && projections
813 .iter()
814 .enumerate()
815 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
816 {
817 Ok((input_op, columns))
819 } else {
820 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
821 Ok((operator, columns))
822 }
823 }
824 }
825
826 fn plan_project(
828 &self,
829 project: &crate::query::plan::ProjectOp,
830 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
831 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
833 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
834 let single_row_op: Box<dyn Operator> = Box::new(
836 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
837 );
838 (single_row_op, Vec::new())
839 } else {
840 self.plan_operator(&project.input)?
841 };
842
843 let variable_columns: HashMap<String, usize> = input_columns
845 .iter()
846 .enumerate()
847 .map(|(i, name)| (name.clone(), i))
848 .collect();
849
850 let mut projections = Vec::with_capacity(project.projections.len());
852 let mut output_types = Vec::with_capacity(project.projections.len());
853 let mut output_columns = Vec::with_capacity(project.projections.len());
854
855 for projection in &project.projections {
856 let col_name = projection
858 .alias
859 .clone()
860 .unwrap_or_else(|| expression_to_string(&projection.expression));
861 output_columns.push(col_name);
862
863 match &projection.expression {
864 LogicalExpression::Variable(name) => {
865 let col_idx = *variable_columns.get(name).ok_or_else(|| {
866 Error::Internal(format!("Variable '{}' not found in input", name))
867 })?;
868 projections.push(ProjectExpr::Column(col_idx));
869 output_types.push(LogicalType::Node);
870 }
871 LogicalExpression::Property { variable, property } => {
872 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
873 Error::Internal(format!("Variable '{}' not found in input", variable))
874 })?;
875 projections.push(ProjectExpr::PropertyAccess {
876 column: col_idx,
877 property: property.clone(),
878 });
879 output_types.push(LogicalType::Any);
880 }
881 LogicalExpression::Literal(value) => {
882 projections.push(ProjectExpr::Constant(value.clone()));
883 output_types.push(value_to_logical_type(value));
884 }
885 _ => {
886 let filter_expr = self.convert_expression(&projection.expression)?;
888 projections.push(ProjectExpr::Expression {
889 expr: filter_expr,
890 variable_columns: variable_columns.clone(),
891 });
892 output_types.push(LogicalType::Any);
893 }
894 }
895 }
896
897 let operator = Box::new(ProjectOperator::with_store(
898 input_op,
899 projections,
900 output_types,
901 Arc::clone(&self.store),
902 ));
903
904 Ok((operator, output_columns))
905 }
906
907 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
913 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
916 let (_, columns) = self.plan_operator(&filter.input)?;
918 let schema = self.derive_schema_from_columns(&columns);
919 let empty_op = Box::new(EmptyOperator::new(schema));
920 return Ok((empty_op, columns));
921 }
922
923 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
925 return Ok(result);
926 }
927
928 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
930 return Ok(result);
931 }
932
933 let (input_op, columns) = self.plan_operator(&filter.input)?;
935
936 let variable_columns: HashMap<String, usize> = columns
938 .iter()
939 .enumerate()
940 .map(|(i, name)| (name.clone(), i))
941 .collect();
942
943 let filter_expr = self.convert_expression(&filter.predicate)?;
945
946 let predicate =
948 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
949
950 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
952
953 Ok((operator, columns))
954 }
955
956 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
963 use grafeo_core::graph::lpg::CompareOp;
964
965 match predicate {
966 LogicalExpression::Binary { left, op, right } => {
967 match op {
969 BinaryOp::And => {
970 let left_result = self.check_zone_map_for_predicate(left);
971 let right_result = self.check_zone_map_for_predicate(right);
972
973 return match (left_result, right_result) {
974 (Some(false), _) | (_, Some(false)) => Some(false),
976 (Some(true), Some(true)) => Some(true),
978 _ => None,
980 };
981 }
982 BinaryOp::Or => {
983 let left_result = self.check_zone_map_for_predicate(left);
984 let right_result = self.check_zone_map_for_predicate(right);
985
986 return match (left_result, right_result) {
987 (Some(false), Some(false)) => Some(false),
989 (Some(true), _) | (_, Some(true)) => Some(true),
991 _ => None,
993 };
994 }
995 _ => {}
996 }
997
998 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1000 (
1001 LogicalExpression::Property { property, .. },
1002 LogicalExpression::Literal(val),
1003 ) => {
1004 let cmp = match op {
1005 BinaryOp::Eq => CompareOp::Eq,
1006 BinaryOp::Ne => CompareOp::Ne,
1007 BinaryOp::Lt => CompareOp::Lt,
1008 BinaryOp::Le => CompareOp::Le,
1009 BinaryOp::Gt => CompareOp::Gt,
1010 BinaryOp::Ge => CompareOp::Ge,
1011 _ => return None,
1012 };
1013 (property.clone(), cmp, val.clone())
1014 }
1015 (
1016 LogicalExpression::Literal(val),
1017 LogicalExpression::Property { property, .. },
1018 ) => {
1019 let cmp = match op {
1021 BinaryOp::Eq => CompareOp::Eq,
1022 BinaryOp::Ne => CompareOp::Ne,
1023 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1025 BinaryOp::Gt => CompareOp::Lt,
1026 BinaryOp::Ge => CompareOp::Le,
1027 _ => return None,
1028 };
1029 (property.clone(), cmp, val.clone())
1030 }
1031 _ => return None,
1032 };
1033
1034 let might_match =
1036 self.store
1037 .node_property_might_match(&property.into(), compare_op, &value);
1038
1039 Some(might_match)
1040 }
1041
1042 _ => None,
1043 }
1044 }
1045
1046 fn try_plan_filter_with_property_index(
1055 &self,
1056 filter: &FilterOp,
1057 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1058 let (scan_variable, scan_label) = match filter.input.as_ref() {
1060 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1061 (scan.variable.clone(), scan.label.clone())
1062 }
1063 _ => return Ok(None),
1064 };
1065
1066 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1069
1070 if conditions.is_empty() {
1071 return Ok(None);
1072 }
1073
1074 let has_indexed_condition = conditions
1076 .iter()
1077 .any(|(prop, _)| self.store.has_property_index(prop));
1078
1079 if !has_indexed_condition {
1080 return Ok(None);
1081 }
1082
1083 let conditions_ref: Vec<(&str, Value)> = conditions
1085 .iter()
1086 .map(|(p, v)| (p.as_str(), v.clone()))
1087 .collect();
1088 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1089
1090 if let Some(label) = &scan_label {
1092 let label_nodes: std::collections::HashSet<_> =
1093 self.store.nodes_by_label(label).into_iter().collect();
1094 matching_nodes.retain(|n| label_nodes.contains(n));
1095 }
1096
1097 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1099 let columns = vec![scan_variable];
1100
1101 Ok(Some((node_list_op, columns)))
1102 }
1103
1104 fn extract_equality_conditions(
1110 &self,
1111 predicate: &LogicalExpression,
1112 target_variable: &str,
1113 ) -> Vec<(String, Value)> {
1114 let mut conditions = Vec::new();
1115 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1116 conditions
1117 }
1118
1119 fn collect_equality_conditions(
1121 &self,
1122 expr: &LogicalExpression,
1123 target_variable: &str,
1124 conditions: &mut Vec<(String, Value)>,
1125 ) {
1126 match expr {
1127 LogicalExpression::Binary {
1129 left,
1130 op: BinaryOp::And,
1131 right,
1132 } => {
1133 self.collect_equality_conditions(left, target_variable, conditions);
1134 self.collect_equality_conditions(right, target_variable, conditions);
1135 }
1136
1137 LogicalExpression::Binary {
1139 left,
1140 op: BinaryOp::Eq,
1141 right,
1142 } => {
1143 if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1144 if var == target_variable {
1145 conditions.push((prop, val));
1146 }
1147 }
1148 }
1149
1150 _ => {}
1151 }
1152 }
1153
1154 fn extract_property_equality(
1156 &self,
1157 left: &LogicalExpression,
1158 right: &LogicalExpression,
1159 ) -> Option<(String, String, Value)> {
1160 match (left, right) {
1161 (
1162 LogicalExpression::Property { variable, property },
1163 LogicalExpression::Literal(val),
1164 ) => Some((variable.clone(), property.clone(), val.clone())),
1165 (
1166 LogicalExpression::Literal(val),
1167 LogicalExpression::Property { variable, property },
1168 ) => Some((variable.clone(), property.clone(), val.clone())),
1169 _ => None,
1170 }
1171 }
1172
1173 fn try_plan_filter_with_range_index(
1186 &self,
1187 filter: &FilterOp,
1188 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1189 let (scan_variable, scan_label) = match filter.input.as_ref() {
1191 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1192 (scan.variable.clone(), scan.label.clone())
1193 }
1194 _ => return Ok(None),
1195 };
1196
1197 if let Some((variable, property, min, max, min_inc, max_inc)) =
1199 self.extract_between_predicate(&filter.predicate)
1200 {
1201 if variable == scan_variable {
1202 return self.plan_range_filter(
1203 &scan_variable,
1204 &scan_label,
1205 &property,
1206 RangeBounds {
1207 min: Some(&min),
1208 max: Some(&max),
1209 min_inclusive: min_inc,
1210 max_inclusive: max_inc,
1211 },
1212 );
1213 }
1214 }
1215
1216 if let Some((variable, property, op, value)) =
1218 self.extract_range_predicate(&filter.predicate)
1219 {
1220 if variable == scan_variable {
1221 let (min, max, min_inc, max_inc) = match op {
1222 BinaryOp::Lt => (None, Some(value), false, false),
1223 BinaryOp::Le => (None, Some(value), false, true),
1224 BinaryOp::Gt => (Some(value), None, false, false),
1225 BinaryOp::Ge => (Some(value), None, true, false),
1226 _ => return Ok(None),
1227 };
1228 return self.plan_range_filter(
1229 &scan_variable,
1230 &scan_label,
1231 &property,
1232 RangeBounds {
1233 min: min.as_ref(),
1234 max: max.as_ref(),
1235 min_inclusive: min_inc,
1236 max_inclusive: max_inc,
1237 },
1238 );
1239 }
1240 }
1241
1242 Ok(None)
1243 }
1244
1245 fn plan_range_filter(
1247 &self,
1248 scan_variable: &str,
1249 scan_label: &Option<String>,
1250 property: &str,
1251 bounds: RangeBounds<'_>,
1252 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1253 let mut matching_nodes = self.store.find_nodes_in_range(
1255 property,
1256 bounds.min,
1257 bounds.max,
1258 bounds.min_inclusive,
1259 bounds.max_inclusive,
1260 );
1261
1262 if let Some(label) = scan_label {
1264 let label_nodes: std::collections::HashSet<_> =
1265 self.store.nodes_by_label(label).into_iter().collect();
1266 matching_nodes.retain(|n| label_nodes.contains(n));
1267 }
1268
1269 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1271 let columns = vec![scan_variable.to_string()];
1272
1273 Ok(Some((node_list_op, columns)))
1274 }
1275
1276 fn extract_range_predicate(
1280 &self,
1281 predicate: &LogicalExpression,
1282 ) -> Option<(String, String, BinaryOp, Value)> {
1283 match predicate {
1284 LogicalExpression::Binary { left, op, right } => {
1285 match op {
1286 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1287 if let (
1289 LogicalExpression::Property { variable, property },
1290 LogicalExpression::Literal(val),
1291 ) = (left.as_ref(), right.as_ref())
1292 {
1293 return Some((variable.clone(), property.clone(), *op, val.clone()));
1294 }
1295
1296 if let (
1298 LogicalExpression::Literal(val),
1299 LogicalExpression::Property { variable, property },
1300 ) = (left.as_ref(), right.as_ref())
1301 {
1302 let flipped_op = match op {
1303 BinaryOp::Lt => BinaryOp::Gt,
1304 BinaryOp::Le => BinaryOp::Ge,
1305 BinaryOp::Gt => BinaryOp::Lt,
1306 BinaryOp::Ge => BinaryOp::Le,
1307 _ => return None,
1308 };
1309 return Some((
1310 variable.clone(),
1311 property.clone(),
1312 flipped_op,
1313 val.clone(),
1314 ));
1315 }
1316 }
1317 _ => {}
1318 }
1319 }
1320 _ => {}
1321 }
1322 None
1323 }
1324
1325 fn extract_between_predicate(
1333 &self,
1334 predicate: &LogicalExpression,
1335 ) -> Option<(String, String, Value, Value, bool, bool)> {
1336 let (left, right) = match predicate {
1338 LogicalExpression::Binary {
1339 left,
1340 op: BinaryOp::And,
1341 right,
1342 } => (left.as_ref(), right.as_ref()),
1343 _ => return None,
1344 };
1345
1346 let left_range = self.extract_range_predicate(left);
1348 let right_range = self.extract_range_predicate(right);
1349
1350 let (left_var, left_prop, left_op, left_val) = left_range?;
1351 let (right_var, right_prop, right_op, right_val) = right_range?;
1352
1353 if left_var != right_var || left_prop != right_prop {
1355 return None;
1356 }
1357
1358 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1360 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1362 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1364 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1366 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1368 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1370 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1372 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1374 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1376 _ => return None,
1377 };
1378
1379 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1380 }
1381
1382 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1384 let (input_op, columns) = self.plan_operator(&limit.input)?;
1385 let output_schema = self.derive_schema_from_columns(&columns);
1386 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1387 Ok((operator, columns))
1388 }
1389
1390 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1392 let (input_op, columns) = self.plan_operator(&skip.input)?;
1393 let output_schema = self.derive_schema_from_columns(&columns);
1394 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1395 Ok((operator, columns))
1396 }
1397
1398 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1400 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1401
1402 let mut variable_columns: HashMap<String, usize> = input_columns
1404 .iter()
1405 .enumerate()
1406 .map(|(i, name)| (name.clone(), i))
1407 .collect();
1408
1409 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1411 let mut next_col_idx = input_columns.len();
1412
1413 for key in &sort.keys {
1414 if let LogicalExpression::Property { variable, property } = &key.expression {
1415 let col_name = format!("{}_{}", variable, property);
1416 if !variable_columns.contains_key(&col_name) {
1417 property_projections.push((
1418 variable.clone(),
1419 property.clone(),
1420 col_name.clone(),
1421 ));
1422 variable_columns.insert(col_name, next_col_idx);
1423 next_col_idx += 1;
1424 }
1425 }
1426 }
1427
1428 let mut output_columns = input_columns.clone();
1430
1431 if !property_projections.is_empty() {
1433 let mut projections = Vec::new();
1434 let mut output_types = Vec::new();
1435
1436 for (i, _) in input_columns.iter().enumerate() {
1439 projections.push(ProjectExpr::Column(i));
1440 output_types.push(LogicalType::Node);
1441 }
1442
1443 for (variable, property, col_name) in &property_projections {
1445 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1446 Error::Internal(format!(
1447 "Variable '{}' not found for ORDER BY property projection",
1448 variable
1449 ))
1450 })?;
1451 projections.push(ProjectExpr::PropertyAccess {
1452 column: source_col,
1453 property: property.clone(),
1454 });
1455 output_types.push(LogicalType::Any);
1456 output_columns.push(col_name.clone());
1457 }
1458
1459 input_op = Box::new(ProjectOperator::with_store(
1460 input_op,
1461 projections,
1462 output_types,
1463 Arc::clone(&self.store),
1464 ));
1465 }
1466
1467 let physical_keys: Vec<PhysicalSortKey> = sort
1469 .keys
1470 .iter()
1471 .map(|key| {
1472 let col_idx = self
1473 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1474 Ok(PhysicalSortKey {
1475 column: col_idx,
1476 direction: match key.order {
1477 SortOrder::Ascending => SortDirection::Ascending,
1478 SortOrder::Descending => SortDirection::Descending,
1479 },
1480 null_order: NullOrder::NullsLast,
1481 })
1482 })
1483 .collect::<Result<Vec<_>>>()?;
1484
1485 let output_schema = self.derive_schema_from_columns(&output_columns);
1486 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1487 Ok((operator, output_columns))
1488 }
1489
1490 fn resolve_sort_expression_with_properties(
1492 &self,
1493 expr: &LogicalExpression,
1494 variable_columns: &HashMap<String, usize>,
1495 ) -> Result<usize> {
1496 match expr {
1497 LogicalExpression::Variable(name) => {
1498 variable_columns.get(name).copied().ok_or_else(|| {
1499 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1500 })
1501 }
1502 LogicalExpression::Property { variable, property } => {
1503 let col_name = format!("{}_{}", variable, property);
1505 variable_columns.get(&col_name).copied().ok_or_else(|| {
1506 Error::Internal(format!(
1507 "Property column '{}' not found for ORDER BY (from {}.{})",
1508 col_name, variable, property
1509 ))
1510 })
1511 }
1512 _ => Err(Error::Internal(format!(
1513 "Unsupported ORDER BY expression: {:?}",
1514 expr
1515 ))),
1516 }
1517 }
1518
1519 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1521 columns.iter().map(|_| LogicalType::Any).collect()
1522 }
1523
1524 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1526 if self.factorized_execution
1533 && agg.group_by.is_empty()
1534 && Self::count_expand_chain(&agg.input).0 >= 2
1535 && self.is_simple_aggregate(agg)
1536 {
1537 if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1538 return Ok((op, cols));
1539 }
1540 }
1542
1543 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1544
1545 let mut variable_columns: HashMap<String, usize> = input_columns
1547 .iter()
1548 .enumerate()
1549 .map(|(i, name)| (name.clone(), i))
1550 .collect();
1551
1552 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1555
1556 for expr in &agg.group_by {
1558 if let LogicalExpression::Property { variable, property } = expr {
1559 let col_name = format!("{}_{}", variable, property);
1560 if !variable_columns.contains_key(&col_name) {
1561 property_projections.push((
1562 variable.clone(),
1563 property.clone(),
1564 col_name.clone(),
1565 ));
1566 variable_columns.insert(col_name, next_col_idx);
1567 next_col_idx += 1;
1568 }
1569 }
1570 }
1571
1572 for agg_expr in &agg.aggregates {
1574 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1575 let col_name = format!("{}_{}", variable, property);
1576 if !variable_columns.contains_key(&col_name) {
1577 property_projections.push((
1578 variable.clone(),
1579 property.clone(),
1580 col_name.clone(),
1581 ));
1582 variable_columns.insert(col_name, next_col_idx);
1583 next_col_idx += 1;
1584 }
1585 }
1586 }
1587
1588 if !property_projections.is_empty() {
1590 let mut projections = Vec::new();
1591 let mut output_types = Vec::new();
1592
1593 for (i, _) in input_columns.iter().enumerate() {
1596 projections.push(ProjectExpr::Column(i));
1597 output_types.push(LogicalType::Node);
1598 }
1599
1600 for (variable, property, _col_name) in &property_projections {
1602 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1603 Error::Internal(format!(
1604 "Variable '{}' not found for property projection",
1605 variable
1606 ))
1607 })?;
1608 projections.push(ProjectExpr::PropertyAccess {
1609 column: source_col,
1610 property: property.clone(),
1611 });
1612 output_types.push(LogicalType::Any); }
1614
1615 input_op = Box::new(ProjectOperator::with_store(
1616 input_op,
1617 projections,
1618 output_types,
1619 Arc::clone(&self.store),
1620 ));
1621 }
1622
1623 let group_columns: Vec<usize> = agg
1625 .group_by
1626 .iter()
1627 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1628 .collect::<Result<Vec<_>>>()?;
1629
1630 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1632 .aggregates
1633 .iter()
1634 .map(|agg_expr| {
1635 let column = agg_expr
1636 .expression
1637 .as_ref()
1638 .map(|e| {
1639 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1640 })
1641 .transpose()?;
1642
1643 Ok(PhysicalAggregateExpr {
1644 function: convert_aggregate_function(agg_expr.function),
1645 column,
1646 distinct: agg_expr.distinct,
1647 alias: agg_expr.alias.clone(),
1648 percentile: agg_expr.percentile,
1649 })
1650 })
1651 .collect::<Result<Vec<_>>>()?;
1652
1653 let mut output_schema = Vec::new();
1655 let mut output_columns = Vec::new();
1656
1657 for expr in &agg.group_by {
1659 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1661 }
1662
1663 for agg_expr in &agg.aggregates {
1665 let result_type = match agg_expr.function {
1666 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1667 LogicalType::Int64
1668 }
1669 LogicalAggregateFunction::Sum => LogicalType::Int64,
1670 LogicalAggregateFunction::Avg => LogicalType::Float64,
1671 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1672 LogicalType::Int64
1676 }
1677 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1680 | LogicalAggregateFunction::StdDevPop
1681 | LogicalAggregateFunction::PercentileDisc
1682 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1683 };
1684 output_schema.push(result_type);
1685 output_columns.push(
1686 agg_expr
1687 .alias
1688 .clone()
1689 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1690 );
1691 }
1692
1693 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1695 Box::new(SimpleAggregateOperator::new(
1696 input_op,
1697 physical_aggregates,
1698 output_schema,
1699 ))
1700 } else {
1701 Box::new(HashAggregateOperator::new(
1702 input_op,
1703 group_columns,
1704 physical_aggregates,
1705 output_schema,
1706 ))
1707 };
1708
1709 if let Some(having_expr) = &agg.having {
1711 let having_var_columns: HashMap<String, usize> = output_columns
1713 .iter()
1714 .enumerate()
1715 .map(|(i, name)| (name.clone(), i))
1716 .collect();
1717
1718 let filter_expr = self.convert_expression(having_expr)?;
1719 let predicate =
1720 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1721 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1722 }
1723
1724 Ok((operator, output_columns))
1725 }
1726
1727 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1733 agg.aggregates.iter().all(|agg_expr| {
1734 match agg_expr.function {
1735 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1736 agg_expr.expression.is_none()
1738 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1739 }
1740 LogicalAggregateFunction::Sum
1741 | LogicalAggregateFunction::Avg
1742 | LogicalAggregateFunction::Min
1743 | LogicalAggregateFunction::Max => {
1744 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1747 }
1748 _ => false,
1750 }
1751 })
1752 }
1753
1754 fn plan_factorized_aggregate(
1758 &self,
1759 agg: &AggregateOp,
1760 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1761 let expands = Self::collect_expand_chain(&agg.input);
1763 if expands.is_empty() {
1764 return Err(Error::Internal(
1765 "Expected expand chain for factorized aggregate".to_string(),
1766 ));
1767 }
1768
1769 let first_expand = expands[0];
1771 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1772
1773 let mut columns = base_columns.clone();
1774 let mut steps = Vec::new();
1775 let mut is_first = true;
1776
1777 for expand in &expands {
1778 let source_column = if is_first {
1780 base_columns
1781 .iter()
1782 .position(|c| c == &expand.from_variable)
1783 .ok_or_else(|| {
1784 Error::Internal(format!(
1785 "Source variable '{}' not found in base columns",
1786 expand.from_variable
1787 ))
1788 })?
1789 } else {
1790 1 };
1792
1793 let direction = match expand.direction {
1794 ExpandDirection::Outgoing => Direction::Outgoing,
1795 ExpandDirection::Incoming => Direction::Incoming,
1796 ExpandDirection::Both => Direction::Both,
1797 };
1798
1799 steps.push(ExpandStep {
1800 source_column,
1801 direction,
1802 edge_type: expand.edge_type.clone(),
1803 });
1804
1805 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1806 let count = self.anon_edge_counter.get();
1807 self.anon_edge_counter.set(count + 1);
1808 format!("_anon_edge_{}", count)
1809 });
1810 columns.push(edge_col_name);
1811 columns.push(expand.to_variable.clone());
1812
1813 is_first = false;
1814 }
1815
1816 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1818
1819 if let Some(tx_id) = self.tx_id {
1820 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1821 } else {
1822 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1823 }
1824
1825 let factorized_aggs: Vec<FactorizedAggregate> = agg
1827 .aggregates
1828 .iter()
1829 .map(|agg_expr| {
1830 match agg_expr.function {
1831 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1832 if agg_expr.expression.is_none() {
1834 FactorizedAggregate::count()
1835 } else {
1836 FactorizedAggregate::count_column(1) }
1840 }
1841 LogicalAggregateFunction::Sum => {
1842 FactorizedAggregate::sum(1)
1844 }
1845 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1846 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1847 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1848 _ => {
1849 FactorizedAggregate::count()
1851 }
1852 }
1853 })
1854 .collect();
1855
1856 let output_columns: Vec<String> = agg
1858 .aggregates
1859 .iter()
1860 .map(|agg_expr| {
1861 agg_expr
1862 .alias
1863 .clone()
1864 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1865 })
1866 .collect();
1867
1868 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1870
1871 Ok((Box::new(factorized_agg_op), output_columns))
1872 }
1873
1874 #[allow(dead_code)]
1876 fn resolve_expression_to_column(
1877 &self,
1878 expr: &LogicalExpression,
1879 variable_columns: &HashMap<String, usize>,
1880 ) -> Result<usize> {
1881 match expr {
1882 LogicalExpression::Variable(name) => variable_columns
1883 .get(name)
1884 .copied()
1885 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1886 LogicalExpression::Property { variable, .. } => variable_columns
1887 .get(variable)
1888 .copied()
1889 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1890 _ => Err(Error::Internal(format!(
1891 "Cannot resolve expression to column: {:?}",
1892 expr
1893 ))),
1894 }
1895 }
1896
1897 fn resolve_expression_to_column_with_properties(
1901 &self,
1902 expr: &LogicalExpression,
1903 variable_columns: &HashMap<String, usize>,
1904 ) -> Result<usize> {
1905 match expr {
1906 LogicalExpression::Variable(name) => variable_columns
1907 .get(name)
1908 .copied()
1909 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1910 LogicalExpression::Property { variable, property } => {
1911 let col_name = format!("{}_{}", variable, property);
1913 variable_columns.get(&col_name).copied().ok_or_else(|| {
1914 Error::Internal(format!(
1915 "Property column '{}' not found (from {}.{})",
1916 col_name, variable, property
1917 ))
1918 })
1919 }
1920 _ => Err(Error::Internal(format!(
1921 "Cannot resolve expression to column: {:?}",
1922 expr
1923 ))),
1924 }
1925 }
1926
1927 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1929 match expr {
1930 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1931 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1932 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1933 variable: variable.clone(),
1934 property: property.clone(),
1935 }),
1936 LogicalExpression::Binary { left, op, right } => {
1937 let left_expr = self.convert_expression(left)?;
1938 let right_expr = self.convert_expression(right)?;
1939 let filter_op = convert_binary_op(*op)?;
1940 Ok(FilterExpression::Binary {
1941 left: Box::new(left_expr),
1942 op: filter_op,
1943 right: Box::new(right_expr),
1944 })
1945 }
1946 LogicalExpression::Unary { op, operand } => {
1947 let operand_expr = self.convert_expression(operand)?;
1948 let filter_op = convert_unary_op(*op)?;
1949 Ok(FilterExpression::Unary {
1950 op: filter_op,
1951 operand: Box::new(operand_expr),
1952 })
1953 }
1954 LogicalExpression::FunctionCall { name, args, .. } => {
1955 let filter_args: Vec<FilterExpression> = args
1956 .iter()
1957 .map(|a| self.convert_expression(a))
1958 .collect::<Result<Vec<_>>>()?;
1959 Ok(FilterExpression::FunctionCall {
1960 name: name.clone(),
1961 args: filter_args,
1962 })
1963 }
1964 LogicalExpression::Case {
1965 operand,
1966 when_clauses,
1967 else_clause,
1968 } => {
1969 let filter_operand = operand
1970 .as_ref()
1971 .map(|e| self.convert_expression(e))
1972 .transpose()?
1973 .map(Box::new);
1974 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1975 .iter()
1976 .map(|(cond, result)| {
1977 Ok((
1978 self.convert_expression(cond)?,
1979 self.convert_expression(result)?,
1980 ))
1981 })
1982 .collect::<Result<Vec<_>>>()?;
1983 let filter_else = else_clause
1984 .as_ref()
1985 .map(|e| self.convert_expression(e))
1986 .transpose()?
1987 .map(Box::new);
1988 Ok(FilterExpression::Case {
1989 operand: filter_operand,
1990 when_clauses: filter_when_clauses,
1991 else_clause: filter_else,
1992 })
1993 }
1994 LogicalExpression::List(items) => {
1995 let filter_items: Vec<FilterExpression> = items
1996 .iter()
1997 .map(|item| self.convert_expression(item))
1998 .collect::<Result<Vec<_>>>()?;
1999 Ok(FilterExpression::List(filter_items))
2000 }
2001 LogicalExpression::Map(pairs) => {
2002 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2003 .iter()
2004 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2005 .collect::<Result<Vec<_>>>()?;
2006 Ok(FilterExpression::Map(filter_pairs))
2007 }
2008 LogicalExpression::IndexAccess { base, index } => {
2009 let base_expr = self.convert_expression(base)?;
2010 let index_expr = self.convert_expression(index)?;
2011 Ok(FilterExpression::IndexAccess {
2012 base: Box::new(base_expr),
2013 index: Box::new(index_expr),
2014 })
2015 }
2016 LogicalExpression::SliceAccess { base, start, end } => {
2017 let base_expr = self.convert_expression(base)?;
2018 let start_expr = start
2019 .as_ref()
2020 .map(|s| self.convert_expression(s))
2021 .transpose()?
2022 .map(Box::new);
2023 let end_expr = end
2024 .as_ref()
2025 .map(|e| self.convert_expression(e))
2026 .transpose()?
2027 .map(Box::new);
2028 Ok(FilterExpression::SliceAccess {
2029 base: Box::new(base_expr),
2030 start: start_expr,
2031 end: end_expr,
2032 })
2033 }
2034 LogicalExpression::Parameter(_) => Err(Error::Internal(
2035 "Parameters not yet supported in filters".to_string(),
2036 )),
2037 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2038 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2039 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2040 LogicalExpression::ListComprehension {
2041 variable,
2042 list_expr,
2043 filter_expr,
2044 map_expr,
2045 } => {
2046 let list = self.convert_expression(list_expr)?;
2047 let filter = filter_expr
2048 .as_ref()
2049 .map(|f| self.convert_expression(f))
2050 .transpose()?
2051 .map(Box::new);
2052 let map = self.convert_expression(map_expr)?;
2053 Ok(FilterExpression::ListComprehension {
2054 variable: variable.clone(),
2055 list_expr: Box::new(list),
2056 filter_expr: filter,
2057 map_expr: Box::new(map),
2058 })
2059 }
2060 LogicalExpression::ExistsSubquery(subplan) => {
2061 let (start_var, direction, edge_type, end_labels) =
2064 self.extract_exists_pattern(subplan)?;
2065
2066 Ok(FilterExpression::ExistsSubquery {
2067 start_var,
2068 direction,
2069 edge_type,
2070 end_labels,
2071 min_hops: None,
2072 max_hops: None,
2073 })
2074 }
2075 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2076 "COUNT subqueries not yet supported".to_string(),
2077 )),
2078 }
2079 }
2080
2081 fn extract_exists_pattern(
2084 &self,
2085 subplan: &LogicalOperator,
2086 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2087 match subplan {
2088 LogicalOperator::Expand(expand) => {
2089 let end_labels = self.extract_end_labels_from_expand(expand);
2091 let direction = match expand.direction {
2092 ExpandDirection::Outgoing => Direction::Outgoing,
2093 ExpandDirection::Incoming => Direction::Incoming,
2094 ExpandDirection::Both => Direction::Both,
2095 };
2096 Ok((
2097 expand.from_variable.clone(),
2098 direction,
2099 expand.edge_type.clone(),
2100 end_labels,
2101 ))
2102 }
2103 LogicalOperator::NodeScan(scan) => {
2104 if let Some(input) = &scan.input {
2105 self.extract_exists_pattern(input)
2106 } else {
2107 Err(Error::Internal(
2108 "EXISTS subquery must contain an edge pattern".to_string(),
2109 ))
2110 }
2111 }
2112 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2113 _ => Err(Error::Internal(
2114 "Unsupported EXISTS subquery pattern".to_string(),
2115 )),
2116 }
2117 }
2118
2119 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2121 match expand.input.as_ref() {
2123 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2124 _ => None,
2125 }
2126 }
2127
2128 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2130 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2131 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2132
2133 let mut columns = left_columns.clone();
2135 columns.extend(right_columns.clone());
2136
2137 let physical_join_type = match join.join_type {
2139 JoinType::Inner => PhysicalJoinType::Inner,
2140 JoinType::Left => PhysicalJoinType::Left,
2141 JoinType::Right => PhysicalJoinType::Right,
2142 JoinType::Full => PhysicalJoinType::Full,
2143 JoinType::Cross => PhysicalJoinType::Cross,
2144 JoinType::Semi => PhysicalJoinType::Semi,
2145 JoinType::Anti => PhysicalJoinType::Anti,
2146 };
2147
2148 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2150 (vec![], vec![])
2152 } else {
2153 join.conditions
2154 .iter()
2155 .filter_map(|cond| {
2156 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2158 let right_idx = self
2159 .expression_to_column(&cond.right, &right_columns)
2160 .ok()?;
2161 Some((left_idx, right_idx))
2162 })
2163 .unzip()
2164 };
2165
2166 let output_schema = self.derive_schema_from_columns(&columns);
2167
2168 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2176 left_op,
2177 right_op,
2178 probe_keys,
2179 build_keys,
2180 physical_join_type,
2181 output_schema,
2182 ));
2183
2184 Ok((operator, columns))
2185 }
2186
2187 #[allow(dead_code)]
2196 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2197 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2199 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2200
2201 Self::collect_join_edges(
2203 &LogicalOperator::Join(join.clone()),
2204 &mut edges,
2205 &mut all_vars,
2206 );
2207
2208 if all_vars.len() < 3 {
2210 return false;
2211 }
2212
2213 Self::has_cycle(&edges, &all_vars)
2215 }
2216
2217 fn collect_join_edges(
2219 op: &LogicalOperator,
2220 edges: &mut HashMap<String, Vec<String>>,
2221 vars: &mut std::collections::HashSet<String>,
2222 ) {
2223 match op {
2224 LogicalOperator::Join(join) => {
2225 for cond in &join.conditions {
2227 if let (Some(left_var), Some(right_var)) = (
2228 Self::extract_join_variable(&cond.left),
2229 Self::extract_join_variable(&cond.right),
2230 ) {
2231 if left_var != right_var {
2232 vars.insert(left_var.clone());
2233 vars.insert(right_var.clone());
2234
2235 edges
2237 .entry(left_var.clone())
2238 .or_default()
2239 .push(right_var.clone());
2240 edges.entry(right_var).or_default().push(left_var);
2241 }
2242 }
2243 }
2244
2245 Self::collect_join_edges(&join.left, edges, vars);
2247 Self::collect_join_edges(&join.right, edges, vars);
2248 }
2249 LogicalOperator::Expand(expand) => {
2250 vars.insert(expand.from_variable.clone());
2252 vars.insert(expand.to_variable.clone());
2253
2254 edges
2255 .entry(expand.from_variable.clone())
2256 .or_default()
2257 .push(expand.to_variable.clone());
2258 edges
2259 .entry(expand.to_variable.clone())
2260 .or_default()
2261 .push(expand.from_variable.clone());
2262
2263 Self::collect_join_edges(&expand.input, edges, vars);
2264 }
2265 LogicalOperator::Filter(filter) => {
2266 Self::collect_join_edges(&filter.input, edges, vars);
2267 }
2268 LogicalOperator::NodeScan(scan) => {
2269 vars.insert(scan.variable.clone());
2270 }
2271 _ => {}
2272 }
2273 }
2274
2275 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2277 match expr {
2278 LogicalExpression::Variable(v) => Some(v.clone()),
2279 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2280 LogicalExpression::Id(v) => Some(v.clone()),
2281 _ => None,
2282 }
2283 }
2284
2285 fn has_cycle(
2289 edges: &HashMap<String, Vec<String>>,
2290 vars: &std::collections::HashSet<String>,
2291 ) -> bool {
2292 let mut color: HashMap<&String, u8> = HashMap::new();
2293
2294 for var in vars {
2295 color.insert(var, 0);
2296 }
2297
2298 for start in vars {
2299 if color[start] == 0 {
2300 if Self::dfs_cycle(start, None, edges, &mut color) {
2301 return true;
2302 }
2303 }
2304 }
2305
2306 false
2307 }
2308
2309 fn dfs_cycle(
2311 node: &String,
2312 parent: Option<&String>,
2313 edges: &HashMap<String, Vec<String>>,
2314 color: &mut HashMap<&String, u8>,
2315 ) -> bool {
2316 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2319 for neighbor in neighbors {
2320 if parent == Some(neighbor) {
2322 continue;
2323 }
2324
2325 if let Some(&c) = color.get(neighbor) {
2326 if c == 1 {
2327 return true;
2329 }
2330 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2331 return true;
2332 }
2333 }
2334 }
2335 }
2336
2337 *color.get_mut(node).unwrap() = 2; false
2339 }
2340
2341 #[allow(dead_code)]
2343 fn count_relations(op: &LogicalOperator) -> usize {
2344 match op {
2345 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2346 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2347 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2348 LogicalOperator::Join(j) => {
2349 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2350 }
2351 _ => 0,
2352 }
2353 }
2354
2355 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2357 match expr {
2358 LogicalExpression::Variable(name) => columns
2359 .iter()
2360 .position(|c| c == name)
2361 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2362 _ => Err(Error::Internal(
2363 "Only variables supported in join conditions".to_string(),
2364 )),
2365 }
2366 }
2367
2368 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2370 if union.inputs.is_empty() {
2371 return Err(Error::Internal(
2372 "Union requires at least one input".to_string(),
2373 ));
2374 }
2375
2376 let mut inputs = Vec::with_capacity(union.inputs.len());
2377 let mut columns = Vec::new();
2378
2379 for (i, input) in union.inputs.iter().enumerate() {
2380 let (op, cols) = self.plan_operator(input)?;
2381 if i == 0 {
2382 columns = cols;
2383 }
2384 inputs.push(op);
2385 }
2386
2387 let output_schema = self.derive_schema_from_columns(&columns);
2388 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2389
2390 Ok((operator, columns))
2391 }
2392
2393 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2395 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2396 let output_schema = self.derive_schema_from_columns(&columns);
2397 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2398 Ok((operator, columns))
2399 }
2400
2401 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2403 let (input_op, mut columns) = if let Some(ref input) = create.input {
2405 let (op, cols) = self.plan_operator(input)?;
2406 (Some(op), cols)
2407 } else {
2408 (None, vec![])
2409 };
2410
2411 let output_column = columns.len();
2413 columns.push(create.variable.clone());
2414
2415 let properties: Vec<(String, PropertySource)> = create
2417 .properties
2418 .iter()
2419 .map(|(name, expr)| {
2420 let source = match Self::try_fold_expression(expr) {
2421 Some(value) => PropertySource::Constant(value),
2422 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2423 };
2424 (name.clone(), source)
2425 })
2426 .collect();
2427
2428 let output_schema = self.derive_schema_from_columns(&columns);
2429
2430 let operator = Box::new(
2431 CreateNodeOperator::new(
2432 Arc::clone(&self.store),
2433 input_op,
2434 create.labels.clone(),
2435 properties,
2436 output_schema,
2437 output_column,
2438 )
2439 .with_tx_context(self.viewing_epoch, self.tx_id),
2440 );
2441
2442 Ok((operator, columns))
2443 }
2444
2445 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2447 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2448
2449 let from_column = columns
2451 .iter()
2452 .position(|c| c == &create.from_variable)
2453 .ok_or_else(|| {
2454 Error::Internal(format!(
2455 "Source variable '{}' not found",
2456 create.from_variable
2457 ))
2458 })?;
2459
2460 let to_column = columns
2461 .iter()
2462 .position(|c| c == &create.to_variable)
2463 .ok_or_else(|| {
2464 Error::Internal(format!(
2465 "Target variable '{}' not found",
2466 create.to_variable
2467 ))
2468 })?;
2469
2470 let output_column = create.variable.as_ref().map(|v| {
2472 let idx = columns.len();
2473 columns.push(v.clone());
2474 idx
2475 });
2476
2477 let properties: Vec<(String, PropertySource)> = create
2479 .properties
2480 .iter()
2481 .map(|(name, expr)| {
2482 let source = match Self::try_fold_expression(expr) {
2483 Some(value) => PropertySource::Constant(value),
2484 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2485 };
2486 (name.clone(), source)
2487 })
2488 .collect();
2489
2490 let output_schema = self.derive_schema_from_columns(&columns);
2491
2492 let mut operator = CreateEdgeOperator::new(
2493 Arc::clone(&self.store),
2494 input_op,
2495 from_column,
2496 to_column,
2497 create.edge_type.clone(),
2498 output_schema,
2499 )
2500 .with_properties(properties)
2501 .with_tx_context(self.viewing_epoch, self.tx_id);
2502
2503 if let Some(col) = output_column {
2504 operator = operator.with_output_column(col);
2505 }
2506
2507 let operator = Box::new(operator);
2508
2509 Ok((operator, columns))
2510 }
2511
2512 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2514 let (input_op, columns) = self.plan_operator(&delete.input)?;
2515
2516 let node_column = columns
2517 .iter()
2518 .position(|c| c == &delete.variable)
2519 .ok_or_else(|| {
2520 Error::Internal(format!(
2521 "Variable '{}' not found for delete",
2522 delete.variable
2523 ))
2524 })?;
2525
2526 let output_schema = vec![LogicalType::Int64];
2528 let output_columns = vec!["deleted_count".to_string()];
2529
2530 let operator = Box::new(
2531 DeleteNodeOperator::new(
2532 Arc::clone(&self.store),
2533 input_op,
2534 node_column,
2535 output_schema,
2536 delete.detach, )
2538 .with_tx_context(self.viewing_epoch, self.tx_id),
2539 );
2540
2541 Ok((operator, output_columns))
2542 }
2543
2544 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2546 let (input_op, columns) = self.plan_operator(&delete.input)?;
2547
2548 let edge_column = columns
2549 .iter()
2550 .position(|c| c == &delete.variable)
2551 .ok_or_else(|| {
2552 Error::Internal(format!(
2553 "Variable '{}' not found for delete",
2554 delete.variable
2555 ))
2556 })?;
2557
2558 let output_schema = vec![LogicalType::Int64];
2560 let output_columns = vec!["deleted_count".to_string()];
2561
2562 let operator = Box::new(
2563 DeleteEdgeOperator::new(
2564 Arc::clone(&self.store),
2565 input_op,
2566 edge_column,
2567 output_schema,
2568 )
2569 .with_tx_context(self.viewing_epoch, self.tx_id),
2570 );
2571
2572 Ok((operator, output_columns))
2573 }
2574
2575 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2577 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2578 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2579
2580 let mut columns = left_columns.clone();
2582 columns.extend(right_columns.clone());
2583
2584 let mut probe_keys = Vec::new();
2586 let mut build_keys = Vec::new();
2587
2588 for (right_idx, right_col) in right_columns.iter().enumerate() {
2589 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2590 probe_keys.push(left_idx);
2591 build_keys.push(right_idx);
2592 }
2593 }
2594
2595 let output_schema = self.derive_schema_from_columns(&columns);
2596
2597 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2598 left_op,
2599 right_op,
2600 probe_keys,
2601 build_keys,
2602 PhysicalJoinType::Left,
2603 output_schema,
2604 ));
2605
2606 Ok((operator, columns))
2607 }
2608
2609 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2611 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2612 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2613
2614 let columns = left_columns.clone();
2616
2617 let mut probe_keys = Vec::new();
2619 let mut build_keys = Vec::new();
2620
2621 for (right_idx, right_col) in right_columns.iter().enumerate() {
2622 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2623 probe_keys.push(left_idx);
2624 build_keys.push(right_idx);
2625 }
2626 }
2627
2628 let output_schema = self.derive_schema_from_columns(&columns);
2629
2630 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2631 left_op,
2632 right_op,
2633 probe_keys,
2634 build_keys,
2635 PhysicalJoinType::Anti,
2636 output_schema,
2637 ));
2638
2639 Ok((operator, columns))
2640 }
2641
2642 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2644 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2647 if matches!(&*unwind.input, LogicalOperator::Empty) {
2648 let literal_list = self.convert_expression(&unwind.expression)?;
2653
2654 let single_row_op: Box<dyn Operator> = Box::new(
2656 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2657 );
2658 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2659 single_row_op,
2660 vec![ProjectExpr::Expression {
2661 expr: literal_list,
2662 variable_columns: HashMap::new(),
2663 }],
2664 vec![LogicalType::Any],
2665 Arc::clone(&self.store),
2666 ));
2667
2668 (project_op, vec!["__list__".to_string()])
2669 } else {
2670 self.plan_operator(&unwind.input)?
2671 };
2672
2673 let list_col_idx = match &unwind.expression {
2679 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2680 LogicalExpression::Property { variable, .. } => {
2681 input_columns.iter().position(|c| c == variable)
2684 }
2685 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2686 None
2688 }
2689 _ => None,
2690 };
2691
2692 let mut columns = input_columns.clone();
2694 columns.push(unwind.variable.clone());
2695
2696 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2698 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2703
2704 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2705 input_op,
2706 col_idx,
2707 unwind.variable.clone(),
2708 output_schema,
2709 ));
2710
2711 Ok((operator, columns))
2712 }
2713
2714 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2716 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2718 Vec::new()
2719 } else {
2720 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2721 cols
2722 };
2723
2724 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2726 .match_properties
2727 .iter()
2728 .filter_map(|(name, expr)| {
2729 if let LogicalExpression::Literal(v) = expr {
2730 Some((name.clone(), v.clone()))
2731 } else {
2732 None }
2734 })
2735 .collect();
2736
2737 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2739 .on_create
2740 .iter()
2741 .filter_map(|(name, expr)| {
2742 if let LogicalExpression::Literal(v) = expr {
2743 Some((name.clone(), v.clone()))
2744 } else {
2745 None
2746 }
2747 })
2748 .collect();
2749
2750 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2752 .on_match
2753 .iter()
2754 .filter_map(|(name, expr)| {
2755 if let LogicalExpression::Literal(v) = expr {
2756 Some((name.clone(), v.clone()))
2757 } else {
2758 None
2759 }
2760 })
2761 .collect();
2762
2763 columns.push(merge.variable.clone());
2765
2766 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2767 Arc::clone(&self.store),
2768 merge.variable.clone(),
2769 merge.labels.clone(),
2770 match_properties,
2771 on_create_properties,
2772 on_match_properties,
2773 ));
2774
2775 Ok((operator, columns))
2776 }
2777
2778 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2780 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2782
2783 let source_column = columns
2785 .iter()
2786 .position(|c| c == &sp.source_var)
2787 .ok_or_else(|| {
2788 Error::Internal(format!(
2789 "Source variable '{}' not found for shortestPath",
2790 sp.source_var
2791 ))
2792 })?;
2793
2794 let target_column = columns
2795 .iter()
2796 .position(|c| c == &sp.target_var)
2797 .ok_or_else(|| {
2798 Error::Internal(format!(
2799 "Target variable '{}' not found for shortestPath",
2800 sp.target_var
2801 ))
2802 })?;
2803
2804 let direction = match sp.direction {
2806 ExpandDirection::Outgoing => Direction::Outgoing,
2807 ExpandDirection::Incoming => Direction::Incoming,
2808 ExpandDirection::Both => Direction::Both,
2809 };
2810
2811 let operator: Box<dyn Operator> = Box::new(
2813 ShortestPathOperator::new(
2814 Arc::clone(&self.store),
2815 input_op,
2816 source_column,
2817 target_column,
2818 sp.edge_type.clone(),
2819 direction,
2820 )
2821 .with_all_paths(sp.all_paths),
2822 );
2823
2824 columns.push(format!("_path_length_{}", sp.path_alias));
2827
2828 Ok((operator, columns))
2829 }
2830
2831 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2833 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2834
2835 let node_column = columns
2837 .iter()
2838 .position(|c| c == &add_label.variable)
2839 .ok_or_else(|| {
2840 Error::Internal(format!(
2841 "Variable '{}' not found for ADD LABEL",
2842 add_label.variable
2843 ))
2844 })?;
2845
2846 let output_schema = vec![LogicalType::Int64];
2848 let output_columns = vec!["labels_added".to_string()];
2849
2850 let operator = Box::new(AddLabelOperator::new(
2851 Arc::clone(&self.store),
2852 input_op,
2853 node_column,
2854 add_label.labels.clone(),
2855 output_schema,
2856 ));
2857
2858 Ok((operator, output_columns))
2859 }
2860
2861 fn plan_remove_label(
2863 &self,
2864 remove_label: &RemoveLabelOp,
2865 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2866 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2867
2868 let node_column = columns
2870 .iter()
2871 .position(|c| c == &remove_label.variable)
2872 .ok_or_else(|| {
2873 Error::Internal(format!(
2874 "Variable '{}' not found for REMOVE LABEL",
2875 remove_label.variable
2876 ))
2877 })?;
2878
2879 let output_schema = vec![LogicalType::Int64];
2881 let output_columns = vec!["labels_removed".to_string()];
2882
2883 let operator = Box::new(RemoveLabelOperator::new(
2884 Arc::clone(&self.store),
2885 input_op,
2886 node_column,
2887 remove_label.labels.clone(),
2888 output_schema,
2889 ));
2890
2891 Ok((operator, output_columns))
2892 }
2893
2894 fn plan_set_property(
2896 &self,
2897 set_prop: &SetPropertyOp,
2898 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2899 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2900
2901 let entity_column = columns
2903 .iter()
2904 .position(|c| c == &set_prop.variable)
2905 .ok_or_else(|| {
2906 Error::Internal(format!(
2907 "Variable '{}' not found for SET",
2908 set_prop.variable
2909 ))
2910 })?;
2911
2912 let properties: Vec<(String, PropertySource)> = set_prop
2914 .properties
2915 .iter()
2916 .map(|(name, expr)| {
2917 let source = self.expression_to_property_source(expr, &columns)?;
2918 Ok((name.clone(), source))
2919 })
2920 .collect::<Result<Vec<_>>>()?;
2921
2922 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2924 let output_columns = columns.clone();
2925
2926 let operator = Box::new(SetPropertyOperator::new_for_node(
2928 Arc::clone(&self.store),
2929 input_op,
2930 entity_column,
2931 properties,
2932 output_schema,
2933 ));
2934
2935 Ok((operator, output_columns))
2936 }
2937
2938 fn expression_to_property_source(
2940 &self,
2941 expr: &LogicalExpression,
2942 columns: &[String],
2943 ) -> Result<PropertySource> {
2944 match expr {
2945 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2946 LogicalExpression::Variable(name) => {
2947 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2948 Error::Internal(format!("Variable '{}' not found for property source", name))
2949 })?;
2950 Ok(PropertySource::Column(col_idx))
2951 }
2952 LogicalExpression::Parameter(name) => {
2953 Ok(PropertySource::Constant(
2956 grafeo_common::types::Value::String(format!("${}", name).into()),
2957 ))
2958 }
2959 _ => {
2960 if let Some(value) = Self::try_fold_expression(expr) {
2961 Ok(PropertySource::Constant(value))
2962 } else {
2963 Err(Error::Internal(format!(
2964 "Unsupported expression type for property source: {:?}",
2965 expr
2966 )))
2967 }
2968 }
2969 }
2970 }
2971
2972 fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
2978 match expr {
2979 LogicalExpression::Literal(v) => Some(v.clone()),
2980 LogicalExpression::List(items) => {
2981 let values: Option<Vec<Value>> =
2982 items.iter().map(Self::try_fold_expression).collect();
2983 let values = values?;
2984 let all_numeric = !values.is_empty()
2986 && values
2987 .iter()
2988 .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
2989 if all_numeric {
2990 let floats: Vec<f32> = values
2991 .iter()
2992 .filter_map(|v| match v {
2993 Value::Float64(f) => Some(*f as f32),
2994 Value::Int64(i) => Some(*i as f32),
2995 _ => None,
2996 })
2997 .collect();
2998 Some(Value::Vector(floats.into()))
2999 } else {
3000 Some(Value::List(values.into()))
3001 }
3002 }
3003 LogicalExpression::FunctionCall { name, args, .. } => {
3004 match name.to_lowercase().as_str() {
3005 "vector" => {
3006 if args.len() != 1 {
3007 return None;
3008 }
3009 let val = Self::try_fold_expression(&args[0])?;
3010 match val {
3011 Value::List(items) => {
3012 let floats: Vec<f32> = items
3013 .iter()
3014 .filter_map(|v| match v {
3015 Value::Float64(f) => Some(*f as f32),
3016 Value::Int64(i) => Some(*i as f32),
3017 _ => None,
3018 })
3019 .collect();
3020 if floats.len() == items.len() {
3021 Some(Value::Vector(floats.into()))
3022 } else {
3023 None
3024 }
3025 }
3026 Value::Vector(v) => Some(Value::Vector(v)),
3028 _ => None,
3029 }
3030 }
3031 _ => None,
3032 }
3033 }
3034 _ => None,
3035 }
3036 }
3037}
3038
3039pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3041 match op {
3042 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3043 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3044 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3045 BinaryOp::Le => Ok(BinaryFilterOp::Le),
3046 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3047 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3048 BinaryOp::And => Ok(BinaryFilterOp::And),
3049 BinaryOp::Or => Ok(BinaryFilterOp::Or),
3050 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3051 BinaryOp::Add => Ok(BinaryFilterOp::Add),
3052 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3053 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3054 BinaryOp::Div => Ok(BinaryFilterOp::Div),
3055 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3056 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3057 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3058 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3059 BinaryOp::In => Ok(BinaryFilterOp::In),
3060 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3061 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3062 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3063 "Binary operator {:?} not yet supported in filters",
3064 op
3065 ))),
3066 }
3067}
3068
3069pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3071 match op {
3072 UnaryOp::Not => Ok(UnaryFilterOp::Not),
3073 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3074 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3075 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3076 }
3077}
3078
3079pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3081 match func {
3082 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3083 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3084 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3085 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3086 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3087 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3088 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3089 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3090 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3091 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3092 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3093 }
3094}
3095
3096pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3100 match expr {
3101 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3102 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3103 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3104 variable: variable.clone(),
3105 property: property.clone(),
3106 }),
3107 LogicalExpression::Binary { left, op, right } => {
3108 let left_expr = convert_filter_expression(left)?;
3109 let right_expr = convert_filter_expression(right)?;
3110 let filter_op = convert_binary_op(*op)?;
3111 Ok(FilterExpression::Binary {
3112 left: Box::new(left_expr),
3113 op: filter_op,
3114 right: Box::new(right_expr),
3115 })
3116 }
3117 LogicalExpression::Unary { op, operand } => {
3118 let operand_expr = convert_filter_expression(operand)?;
3119 let filter_op = convert_unary_op(*op)?;
3120 Ok(FilterExpression::Unary {
3121 op: filter_op,
3122 operand: Box::new(operand_expr),
3123 })
3124 }
3125 LogicalExpression::FunctionCall { name, args, .. } => {
3126 let filter_args: Vec<FilterExpression> = args
3127 .iter()
3128 .map(convert_filter_expression)
3129 .collect::<Result<Vec<_>>>()?;
3130 Ok(FilterExpression::FunctionCall {
3131 name: name.clone(),
3132 args: filter_args,
3133 })
3134 }
3135 LogicalExpression::Case {
3136 operand,
3137 when_clauses,
3138 else_clause,
3139 } => {
3140 let filter_operand = operand
3141 .as_ref()
3142 .map(|e| convert_filter_expression(e))
3143 .transpose()?
3144 .map(Box::new);
3145 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3146 .iter()
3147 .map(|(cond, result)| {
3148 Ok((
3149 convert_filter_expression(cond)?,
3150 convert_filter_expression(result)?,
3151 ))
3152 })
3153 .collect::<Result<Vec<_>>>()?;
3154 let filter_else = else_clause
3155 .as_ref()
3156 .map(|e| convert_filter_expression(e))
3157 .transpose()?
3158 .map(Box::new);
3159 Ok(FilterExpression::Case {
3160 operand: filter_operand,
3161 when_clauses: filter_when_clauses,
3162 else_clause: filter_else,
3163 })
3164 }
3165 LogicalExpression::List(items) => {
3166 let filter_items: Vec<FilterExpression> = items
3167 .iter()
3168 .map(convert_filter_expression)
3169 .collect::<Result<Vec<_>>>()?;
3170 Ok(FilterExpression::List(filter_items))
3171 }
3172 LogicalExpression::Map(pairs) => {
3173 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3174 .iter()
3175 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3176 .collect::<Result<Vec<_>>>()?;
3177 Ok(FilterExpression::Map(filter_pairs))
3178 }
3179 LogicalExpression::IndexAccess { base, index } => {
3180 let base_expr = convert_filter_expression(base)?;
3181 let index_expr = convert_filter_expression(index)?;
3182 Ok(FilterExpression::IndexAccess {
3183 base: Box::new(base_expr),
3184 index: Box::new(index_expr),
3185 })
3186 }
3187 LogicalExpression::SliceAccess { base, start, end } => {
3188 let base_expr = convert_filter_expression(base)?;
3189 let start_expr = start
3190 .as_ref()
3191 .map(|s| convert_filter_expression(s))
3192 .transpose()?
3193 .map(Box::new);
3194 let end_expr = end
3195 .as_ref()
3196 .map(|e| convert_filter_expression(e))
3197 .transpose()?
3198 .map(Box::new);
3199 Ok(FilterExpression::SliceAccess {
3200 base: Box::new(base_expr),
3201 start: start_expr,
3202 end: end_expr,
3203 })
3204 }
3205 LogicalExpression::Parameter(_) => Err(Error::Internal(
3206 "Parameters not yet supported in filters".to_string(),
3207 )),
3208 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3209 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3210 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3211 LogicalExpression::ListComprehension {
3212 variable,
3213 list_expr,
3214 filter_expr,
3215 map_expr,
3216 } => {
3217 let list = convert_filter_expression(list_expr)?;
3218 let filter = filter_expr
3219 .as_ref()
3220 .map(|f| convert_filter_expression(f))
3221 .transpose()?
3222 .map(Box::new);
3223 let map = convert_filter_expression(map_expr)?;
3224 Ok(FilterExpression::ListComprehension {
3225 variable: variable.clone(),
3226 list_expr: Box::new(list),
3227 filter_expr: filter,
3228 map_expr: Box::new(map),
3229 })
3230 }
3231 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3232 Error::Internal("Subqueries not yet supported in filters".to_string()),
3233 ),
3234 }
3235}
3236
3237fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3239 use grafeo_common::types::Value;
3240 match value {
3241 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3243 Value::Int64(_) => LogicalType::Int64,
3244 Value::Float64(_) => LogicalType::Float64,
3245 Value::String(_) => LogicalType::String,
3246 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3248 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3251 }
3252}
3253
3254fn expression_to_string(expr: &LogicalExpression) -> String {
3256 match expr {
3257 LogicalExpression::Variable(name) => name.clone(),
3258 LogicalExpression::Property { variable, property } => {
3259 format!("{variable}.{property}")
3260 }
3261 LogicalExpression::Literal(value) => format!("{value:?}"),
3262 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3263 _ => "expr".to_string(),
3264 }
3265}
3266
3267pub struct PhysicalPlan {
3269 pub operator: Box<dyn Operator>,
3271 pub columns: Vec<String>,
3273 pub adaptive_context: Option<AdaptiveContext>,
3279}
3280
3281impl PhysicalPlan {
3282 #[must_use]
3284 pub fn columns(&self) -> &[String] {
3285 &self.columns
3286 }
3287
3288 pub fn into_operator(self) -> Box<dyn Operator> {
3290 self.operator
3291 }
3292
3293 #[must_use]
3295 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3296 self.adaptive_context.as_ref()
3297 }
3298
3299 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3301 self.adaptive_context.take()
3302 }
3303}
3304
3305#[allow(dead_code)]
3309struct SingleResultOperator {
3310 result: Option<grafeo_core::execution::DataChunk>,
3311}
3312
3313impl SingleResultOperator {
3314 #[allow(dead_code)]
3315 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3316 Self { result }
3317 }
3318}
3319
3320impl Operator for SingleResultOperator {
3321 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3322 Ok(self.result.take())
3323 }
3324
3325 fn reset(&mut self) {
3326 }
3328
3329 fn name(&self) -> &'static str {
3330 "SingleResult"
3331 }
3332}
3333
3334#[cfg(test)]
3335mod tests {
3336 use super::*;
3337 use crate::query::plan::{
3338 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3339 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3340 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3341 SortKey, SortOp,
3342 };
3343 use grafeo_common::types::Value;
3344
3345 fn create_test_store() -> Arc<LpgStore> {
3346 let store = Arc::new(LpgStore::new());
3347 store.create_node(&["Person"]);
3348 store.create_node(&["Person"]);
3349 store.create_node(&["Company"]);
3350 store
3351 }
3352
3353 #[test]
3356 fn test_plan_simple_scan() {
3357 let store = create_test_store();
3358 let planner = Planner::new(store);
3359
3360 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3362 items: vec![ReturnItem {
3363 expression: LogicalExpression::Variable("n".to_string()),
3364 alias: None,
3365 }],
3366 distinct: false,
3367 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3368 variable: "n".to_string(),
3369 label: Some("Person".to_string()),
3370 input: None,
3371 })),
3372 }));
3373
3374 let physical = planner.plan(&logical).unwrap();
3375 assert_eq!(physical.columns(), &["n"]);
3376 }
3377
3378 #[test]
3379 fn test_plan_scan_without_label() {
3380 let store = create_test_store();
3381 let planner = Planner::new(store);
3382
3383 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3385 items: vec![ReturnItem {
3386 expression: LogicalExpression::Variable("n".to_string()),
3387 alias: None,
3388 }],
3389 distinct: false,
3390 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3391 variable: "n".to_string(),
3392 label: None,
3393 input: None,
3394 })),
3395 }));
3396
3397 let physical = planner.plan(&logical).unwrap();
3398 assert_eq!(physical.columns(), &["n"]);
3399 }
3400
3401 #[test]
3402 fn test_plan_return_with_alias() {
3403 let store = create_test_store();
3404 let planner = Planner::new(store);
3405
3406 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3408 items: vec![ReturnItem {
3409 expression: LogicalExpression::Variable("n".to_string()),
3410 alias: Some("person".to_string()),
3411 }],
3412 distinct: false,
3413 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3414 variable: "n".to_string(),
3415 label: Some("Person".to_string()),
3416 input: None,
3417 })),
3418 }));
3419
3420 let physical = planner.plan(&logical).unwrap();
3421 assert_eq!(physical.columns(), &["person"]);
3422 }
3423
3424 #[test]
3425 fn test_plan_return_property() {
3426 let store = create_test_store();
3427 let planner = Planner::new(store);
3428
3429 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3431 items: vec![ReturnItem {
3432 expression: LogicalExpression::Property {
3433 variable: "n".to_string(),
3434 property: "name".to_string(),
3435 },
3436 alias: None,
3437 }],
3438 distinct: false,
3439 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3440 variable: "n".to_string(),
3441 label: Some("Person".to_string()),
3442 input: None,
3443 })),
3444 }));
3445
3446 let physical = planner.plan(&logical).unwrap();
3447 assert_eq!(physical.columns(), &["n.name"]);
3448 }
3449
3450 #[test]
3451 fn test_plan_return_literal() {
3452 let store = create_test_store();
3453 let planner = Planner::new(store);
3454
3455 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3457 items: vec![ReturnItem {
3458 expression: LogicalExpression::Literal(Value::Int64(42)),
3459 alias: Some("answer".to_string()),
3460 }],
3461 distinct: false,
3462 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3463 variable: "n".to_string(),
3464 label: None,
3465 input: None,
3466 })),
3467 }));
3468
3469 let physical = planner.plan(&logical).unwrap();
3470 assert_eq!(physical.columns(), &["answer"]);
3471 }
3472
3473 #[test]
3476 fn test_plan_filter_equality() {
3477 let store = create_test_store();
3478 let planner = Planner::new(store);
3479
3480 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3482 items: vec![ReturnItem {
3483 expression: LogicalExpression::Variable("n".to_string()),
3484 alias: None,
3485 }],
3486 distinct: false,
3487 input: Box::new(LogicalOperator::Filter(FilterOp {
3488 predicate: LogicalExpression::Binary {
3489 left: Box::new(LogicalExpression::Property {
3490 variable: "n".to_string(),
3491 property: "age".to_string(),
3492 }),
3493 op: BinaryOp::Eq,
3494 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3495 },
3496 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3497 variable: "n".to_string(),
3498 label: Some("Person".to_string()),
3499 input: None,
3500 })),
3501 })),
3502 }));
3503
3504 let physical = planner.plan(&logical).unwrap();
3505 assert_eq!(physical.columns(), &["n"]);
3506 }
3507
3508 #[test]
3509 fn test_plan_filter_compound_and() {
3510 let store = create_test_store();
3511 let planner = Planner::new(store);
3512
3513 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3515 items: vec![ReturnItem {
3516 expression: LogicalExpression::Variable("n".to_string()),
3517 alias: None,
3518 }],
3519 distinct: false,
3520 input: Box::new(LogicalOperator::Filter(FilterOp {
3521 predicate: LogicalExpression::Binary {
3522 left: Box::new(LogicalExpression::Binary {
3523 left: Box::new(LogicalExpression::Property {
3524 variable: "n".to_string(),
3525 property: "age".to_string(),
3526 }),
3527 op: BinaryOp::Gt,
3528 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3529 }),
3530 op: BinaryOp::And,
3531 right: Box::new(LogicalExpression::Binary {
3532 left: Box::new(LogicalExpression::Property {
3533 variable: "n".to_string(),
3534 property: "age".to_string(),
3535 }),
3536 op: BinaryOp::Lt,
3537 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3538 }),
3539 },
3540 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3541 variable: "n".to_string(),
3542 label: None,
3543 input: None,
3544 })),
3545 })),
3546 }));
3547
3548 let physical = planner.plan(&logical).unwrap();
3549 assert_eq!(physical.columns(), &["n"]);
3550 }
3551
3552 #[test]
3553 fn test_plan_filter_unary_not() {
3554 let store = create_test_store();
3555 let planner = Planner::new(store);
3556
3557 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3559 items: vec![ReturnItem {
3560 expression: LogicalExpression::Variable("n".to_string()),
3561 alias: None,
3562 }],
3563 distinct: false,
3564 input: Box::new(LogicalOperator::Filter(FilterOp {
3565 predicate: LogicalExpression::Unary {
3566 op: UnaryOp::Not,
3567 operand: Box::new(LogicalExpression::Property {
3568 variable: "n".to_string(),
3569 property: "active".to_string(),
3570 }),
3571 },
3572 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3573 variable: "n".to_string(),
3574 label: None,
3575 input: None,
3576 })),
3577 })),
3578 }));
3579
3580 let physical = planner.plan(&logical).unwrap();
3581 assert_eq!(physical.columns(), &["n"]);
3582 }
3583
3584 #[test]
3585 fn test_plan_filter_is_null() {
3586 let store = create_test_store();
3587 let planner = Planner::new(store);
3588
3589 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3591 items: vec![ReturnItem {
3592 expression: LogicalExpression::Variable("n".to_string()),
3593 alias: None,
3594 }],
3595 distinct: false,
3596 input: Box::new(LogicalOperator::Filter(FilterOp {
3597 predicate: LogicalExpression::Unary {
3598 op: UnaryOp::IsNull,
3599 operand: Box::new(LogicalExpression::Property {
3600 variable: "n".to_string(),
3601 property: "email".to_string(),
3602 }),
3603 },
3604 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3605 variable: "n".to_string(),
3606 label: None,
3607 input: None,
3608 })),
3609 })),
3610 }));
3611
3612 let physical = planner.plan(&logical).unwrap();
3613 assert_eq!(physical.columns(), &["n"]);
3614 }
3615
3616 #[test]
3617 fn test_plan_filter_function_call() {
3618 let store = create_test_store();
3619 let planner = Planner::new(store);
3620
3621 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3623 items: vec![ReturnItem {
3624 expression: LogicalExpression::Variable("n".to_string()),
3625 alias: None,
3626 }],
3627 distinct: false,
3628 input: Box::new(LogicalOperator::Filter(FilterOp {
3629 predicate: LogicalExpression::Binary {
3630 left: Box::new(LogicalExpression::FunctionCall {
3631 name: "size".to_string(),
3632 args: vec![LogicalExpression::Property {
3633 variable: "n".to_string(),
3634 property: "friends".to_string(),
3635 }],
3636 distinct: false,
3637 }),
3638 op: BinaryOp::Gt,
3639 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3640 },
3641 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3642 variable: "n".to_string(),
3643 label: None,
3644 input: None,
3645 })),
3646 })),
3647 }));
3648
3649 let physical = planner.plan(&logical).unwrap();
3650 assert_eq!(physical.columns(), &["n"]);
3651 }
3652
3653 #[test]
3656 fn test_plan_expand_outgoing() {
3657 let store = create_test_store();
3658 let planner = Planner::new(store);
3659
3660 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3662 items: vec![
3663 ReturnItem {
3664 expression: LogicalExpression::Variable("a".to_string()),
3665 alias: None,
3666 },
3667 ReturnItem {
3668 expression: LogicalExpression::Variable("b".to_string()),
3669 alias: None,
3670 },
3671 ],
3672 distinct: false,
3673 input: Box::new(LogicalOperator::Expand(ExpandOp {
3674 from_variable: "a".to_string(),
3675 to_variable: "b".to_string(),
3676 edge_variable: None,
3677 direction: ExpandDirection::Outgoing,
3678 edge_type: Some("KNOWS".to_string()),
3679 min_hops: 1,
3680 max_hops: Some(1),
3681 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3682 variable: "a".to_string(),
3683 label: Some("Person".to_string()),
3684 input: None,
3685 })),
3686 path_alias: None,
3687 })),
3688 }));
3689
3690 let physical = planner.plan(&logical).unwrap();
3691 assert!(physical.columns().contains(&"a".to_string()));
3693 assert!(physical.columns().contains(&"b".to_string()));
3694 }
3695
3696 #[test]
3697 fn test_plan_expand_with_edge_variable() {
3698 let store = create_test_store();
3699 let planner = Planner::new(store);
3700
3701 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3703 items: vec![
3704 ReturnItem {
3705 expression: LogicalExpression::Variable("a".to_string()),
3706 alias: None,
3707 },
3708 ReturnItem {
3709 expression: LogicalExpression::Variable("r".to_string()),
3710 alias: None,
3711 },
3712 ReturnItem {
3713 expression: LogicalExpression::Variable("b".to_string()),
3714 alias: None,
3715 },
3716 ],
3717 distinct: false,
3718 input: Box::new(LogicalOperator::Expand(ExpandOp {
3719 from_variable: "a".to_string(),
3720 to_variable: "b".to_string(),
3721 edge_variable: Some("r".to_string()),
3722 direction: ExpandDirection::Outgoing,
3723 edge_type: Some("KNOWS".to_string()),
3724 min_hops: 1,
3725 max_hops: Some(1),
3726 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3727 variable: "a".to_string(),
3728 label: None,
3729 input: None,
3730 })),
3731 path_alias: None,
3732 })),
3733 }));
3734
3735 let physical = planner.plan(&logical).unwrap();
3736 assert!(physical.columns().contains(&"a".to_string()));
3737 assert!(physical.columns().contains(&"r".to_string()));
3738 assert!(physical.columns().contains(&"b".to_string()));
3739 }
3740
3741 #[test]
3744 fn test_plan_limit() {
3745 let store = create_test_store();
3746 let planner = Planner::new(store);
3747
3748 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3750 items: vec![ReturnItem {
3751 expression: LogicalExpression::Variable("n".to_string()),
3752 alias: None,
3753 }],
3754 distinct: false,
3755 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3756 count: 10,
3757 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3758 variable: "n".to_string(),
3759 label: None,
3760 input: None,
3761 })),
3762 })),
3763 }));
3764
3765 let physical = planner.plan(&logical).unwrap();
3766 assert_eq!(physical.columns(), &["n"]);
3767 }
3768
3769 #[test]
3770 fn test_plan_skip() {
3771 let store = create_test_store();
3772 let planner = Planner::new(store);
3773
3774 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3776 items: vec![ReturnItem {
3777 expression: LogicalExpression::Variable("n".to_string()),
3778 alias: None,
3779 }],
3780 distinct: false,
3781 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3782 count: 5,
3783 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3784 variable: "n".to_string(),
3785 label: None,
3786 input: None,
3787 })),
3788 })),
3789 }));
3790
3791 let physical = planner.plan(&logical).unwrap();
3792 assert_eq!(physical.columns(), &["n"]);
3793 }
3794
3795 #[test]
3796 fn test_plan_sort() {
3797 let store = create_test_store();
3798 let planner = Planner::new(store);
3799
3800 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3802 items: vec![ReturnItem {
3803 expression: LogicalExpression::Variable("n".to_string()),
3804 alias: None,
3805 }],
3806 distinct: false,
3807 input: Box::new(LogicalOperator::Sort(SortOp {
3808 keys: vec![SortKey {
3809 expression: LogicalExpression::Variable("n".to_string()),
3810 order: SortOrder::Ascending,
3811 }],
3812 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3813 variable: "n".to_string(),
3814 label: None,
3815 input: None,
3816 })),
3817 })),
3818 }));
3819
3820 let physical = planner.plan(&logical).unwrap();
3821 assert_eq!(physical.columns(), &["n"]);
3822 }
3823
3824 #[test]
3825 fn test_plan_sort_descending() {
3826 let store = create_test_store();
3827 let planner = Planner::new(store);
3828
3829 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3831 items: vec![ReturnItem {
3832 expression: LogicalExpression::Variable("n".to_string()),
3833 alias: None,
3834 }],
3835 distinct: false,
3836 input: Box::new(LogicalOperator::Sort(SortOp {
3837 keys: vec![SortKey {
3838 expression: LogicalExpression::Variable("n".to_string()),
3839 order: SortOrder::Descending,
3840 }],
3841 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3842 variable: "n".to_string(),
3843 label: None,
3844 input: None,
3845 })),
3846 })),
3847 }));
3848
3849 let physical = planner.plan(&logical).unwrap();
3850 assert_eq!(physical.columns(), &["n"]);
3851 }
3852
3853 #[test]
3854 fn test_plan_distinct() {
3855 let store = create_test_store();
3856 let planner = Planner::new(store);
3857
3858 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3860 items: vec![ReturnItem {
3861 expression: LogicalExpression::Variable("n".to_string()),
3862 alias: None,
3863 }],
3864 distinct: false,
3865 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3866 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3867 variable: "n".to_string(),
3868 label: None,
3869 input: None,
3870 })),
3871 columns: None,
3872 })),
3873 }));
3874
3875 let physical = planner.plan(&logical).unwrap();
3876 assert_eq!(physical.columns(), &["n"]);
3877 }
3878
3879 #[test]
3882 fn test_plan_aggregate_count() {
3883 let store = create_test_store();
3884 let planner = Planner::new(store);
3885
3886 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3888 items: vec![ReturnItem {
3889 expression: LogicalExpression::Variable("cnt".to_string()),
3890 alias: None,
3891 }],
3892 distinct: false,
3893 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3894 group_by: vec![],
3895 aggregates: vec![LogicalAggregateExpr {
3896 function: LogicalAggregateFunction::Count,
3897 expression: Some(LogicalExpression::Variable("n".to_string())),
3898 distinct: false,
3899 alias: Some("cnt".to_string()),
3900 percentile: None,
3901 }],
3902 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3903 variable: "n".to_string(),
3904 label: None,
3905 input: None,
3906 })),
3907 having: None,
3908 })),
3909 }));
3910
3911 let physical = planner.plan(&logical).unwrap();
3912 assert!(physical.columns().contains(&"cnt".to_string()));
3913 }
3914
3915 #[test]
3916 fn test_plan_aggregate_with_group_by() {
3917 let store = create_test_store();
3918 let planner = Planner::new(store);
3919
3920 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3922 group_by: vec![LogicalExpression::Property {
3923 variable: "n".to_string(),
3924 property: "city".to_string(),
3925 }],
3926 aggregates: vec![LogicalAggregateExpr {
3927 function: LogicalAggregateFunction::Count,
3928 expression: Some(LogicalExpression::Variable("n".to_string())),
3929 distinct: false,
3930 alias: Some("cnt".to_string()),
3931 percentile: None,
3932 }],
3933 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3934 variable: "n".to_string(),
3935 label: Some("Person".to_string()),
3936 input: None,
3937 })),
3938 having: None,
3939 }));
3940
3941 let physical = planner.plan(&logical).unwrap();
3942 assert_eq!(physical.columns().len(), 2);
3943 }
3944
3945 #[test]
3946 fn test_plan_aggregate_sum() {
3947 let store = create_test_store();
3948 let planner = Planner::new(store);
3949
3950 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3952 group_by: vec![],
3953 aggregates: vec![LogicalAggregateExpr {
3954 function: LogicalAggregateFunction::Sum,
3955 expression: Some(LogicalExpression::Property {
3956 variable: "n".to_string(),
3957 property: "value".to_string(),
3958 }),
3959 distinct: false,
3960 alias: Some("total".to_string()),
3961 percentile: None,
3962 }],
3963 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3964 variable: "n".to_string(),
3965 label: None,
3966 input: None,
3967 })),
3968 having: None,
3969 }));
3970
3971 let physical = planner.plan(&logical).unwrap();
3972 assert!(physical.columns().contains(&"total".to_string()));
3973 }
3974
3975 #[test]
3976 fn test_plan_aggregate_avg() {
3977 let store = create_test_store();
3978 let planner = Planner::new(store);
3979
3980 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3982 group_by: vec![],
3983 aggregates: vec![LogicalAggregateExpr {
3984 function: LogicalAggregateFunction::Avg,
3985 expression: Some(LogicalExpression::Property {
3986 variable: "n".to_string(),
3987 property: "score".to_string(),
3988 }),
3989 distinct: false,
3990 alias: Some("average".to_string()),
3991 percentile: None,
3992 }],
3993 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3994 variable: "n".to_string(),
3995 label: None,
3996 input: None,
3997 })),
3998 having: None,
3999 }));
4000
4001 let physical = planner.plan(&logical).unwrap();
4002 assert!(physical.columns().contains(&"average".to_string()));
4003 }
4004
4005 #[test]
4006 fn test_plan_aggregate_min_max() {
4007 let store = create_test_store();
4008 let planner = Planner::new(store);
4009
4010 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4012 group_by: vec![],
4013 aggregates: vec![
4014 LogicalAggregateExpr {
4015 function: LogicalAggregateFunction::Min,
4016 expression: Some(LogicalExpression::Property {
4017 variable: "n".to_string(),
4018 property: "age".to_string(),
4019 }),
4020 distinct: false,
4021 alias: Some("youngest".to_string()),
4022 percentile: None,
4023 },
4024 LogicalAggregateExpr {
4025 function: LogicalAggregateFunction::Max,
4026 expression: Some(LogicalExpression::Property {
4027 variable: "n".to_string(),
4028 property: "age".to_string(),
4029 }),
4030 distinct: false,
4031 alias: Some("oldest".to_string()),
4032 percentile: None,
4033 },
4034 ],
4035 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4036 variable: "n".to_string(),
4037 label: None,
4038 input: None,
4039 })),
4040 having: None,
4041 }));
4042
4043 let physical = planner.plan(&logical).unwrap();
4044 assert!(physical.columns().contains(&"youngest".to_string()));
4045 assert!(physical.columns().contains(&"oldest".to_string()));
4046 }
4047
4048 #[test]
4051 fn test_plan_inner_join() {
4052 let store = create_test_store();
4053 let planner = Planner::new(store);
4054
4055 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4057 items: vec![
4058 ReturnItem {
4059 expression: LogicalExpression::Variable("a".to_string()),
4060 alias: None,
4061 },
4062 ReturnItem {
4063 expression: LogicalExpression::Variable("b".to_string()),
4064 alias: None,
4065 },
4066 ],
4067 distinct: false,
4068 input: Box::new(LogicalOperator::Join(JoinOp {
4069 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4070 variable: "a".to_string(),
4071 label: Some("Person".to_string()),
4072 input: None,
4073 })),
4074 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4075 variable: "b".to_string(),
4076 label: Some("Company".to_string()),
4077 input: None,
4078 })),
4079 join_type: JoinType::Inner,
4080 conditions: vec![JoinCondition {
4081 left: LogicalExpression::Variable("a".to_string()),
4082 right: LogicalExpression::Variable("b".to_string()),
4083 }],
4084 })),
4085 }));
4086
4087 let physical = planner.plan(&logical).unwrap();
4088 assert!(physical.columns().contains(&"a".to_string()));
4089 assert!(physical.columns().contains(&"b".to_string()));
4090 }
4091
4092 #[test]
4093 fn test_plan_cross_join() {
4094 let store = create_test_store();
4095 let planner = Planner::new(store);
4096
4097 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4099 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4100 variable: "a".to_string(),
4101 label: None,
4102 input: None,
4103 })),
4104 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4105 variable: "b".to_string(),
4106 label: None,
4107 input: None,
4108 })),
4109 join_type: JoinType::Cross,
4110 conditions: vec![],
4111 }));
4112
4113 let physical = planner.plan(&logical).unwrap();
4114 assert_eq!(physical.columns().len(), 2);
4115 }
4116
4117 #[test]
4118 fn test_plan_left_join() {
4119 let store = create_test_store();
4120 let planner = Planner::new(store);
4121
4122 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4123 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4124 variable: "a".to_string(),
4125 label: None,
4126 input: None,
4127 })),
4128 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4129 variable: "b".to_string(),
4130 label: None,
4131 input: None,
4132 })),
4133 join_type: JoinType::Left,
4134 conditions: vec![],
4135 }));
4136
4137 let physical = planner.plan(&logical).unwrap();
4138 assert_eq!(physical.columns().len(), 2);
4139 }
4140
4141 #[test]
4144 fn test_plan_create_node() {
4145 let store = create_test_store();
4146 let planner = Planner::new(store);
4147
4148 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4150 variable: "n".to_string(),
4151 labels: vec!["Person".to_string()],
4152 properties: vec![(
4153 "name".to_string(),
4154 LogicalExpression::Literal(Value::String("Alice".into())),
4155 )],
4156 input: None,
4157 }));
4158
4159 let physical = planner.plan(&logical).unwrap();
4160 assert!(physical.columns().contains(&"n".to_string()));
4161 }
4162
4163 #[test]
4164 fn test_plan_create_edge() {
4165 let store = create_test_store();
4166 let planner = Planner::new(store);
4167
4168 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4170 variable: Some("r".to_string()),
4171 from_variable: "a".to_string(),
4172 to_variable: "b".to_string(),
4173 edge_type: "KNOWS".to_string(),
4174 properties: vec![],
4175 input: Box::new(LogicalOperator::Join(JoinOp {
4176 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4177 variable: "a".to_string(),
4178 label: None,
4179 input: None,
4180 })),
4181 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4182 variable: "b".to_string(),
4183 label: None,
4184 input: None,
4185 })),
4186 join_type: JoinType::Cross,
4187 conditions: vec![],
4188 })),
4189 }));
4190
4191 let physical = planner.plan(&logical).unwrap();
4192 assert!(physical.columns().contains(&"r".to_string()));
4193 }
4194
4195 #[test]
4196 fn test_plan_delete_node() {
4197 let store = create_test_store();
4198 let planner = Planner::new(store);
4199
4200 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4202 variable: "n".to_string(),
4203 detach: false,
4204 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4205 variable: "n".to_string(),
4206 label: None,
4207 input: None,
4208 })),
4209 }));
4210
4211 let physical = planner.plan(&logical).unwrap();
4212 assert!(physical.columns().contains(&"deleted_count".to_string()));
4213 }
4214
4215 #[test]
4218 fn test_plan_empty_errors() {
4219 let store = create_test_store();
4220 let planner = Planner::new(store);
4221
4222 let logical = LogicalPlan::new(LogicalOperator::Empty);
4223 let result = planner.plan(&logical);
4224 assert!(result.is_err());
4225 }
4226
4227 #[test]
4228 fn test_plan_missing_variable_in_return() {
4229 let store = create_test_store();
4230 let planner = Planner::new(store);
4231
4232 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4234 items: vec![ReturnItem {
4235 expression: LogicalExpression::Variable("missing".to_string()),
4236 alias: None,
4237 }],
4238 distinct: false,
4239 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4240 variable: "n".to_string(),
4241 label: None,
4242 input: None,
4243 })),
4244 }));
4245
4246 let result = planner.plan(&logical);
4247 assert!(result.is_err());
4248 }
4249
4250 #[test]
4253 fn test_convert_binary_ops() {
4254 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4255 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4256 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4257 assert!(convert_binary_op(BinaryOp::Le).is_ok());
4258 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4259 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4260 assert!(convert_binary_op(BinaryOp::And).is_ok());
4261 assert!(convert_binary_op(BinaryOp::Or).is_ok());
4262 assert!(convert_binary_op(BinaryOp::Add).is_ok());
4263 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4264 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4265 assert!(convert_binary_op(BinaryOp::Div).is_ok());
4266 }
4267
4268 #[test]
4269 fn test_convert_unary_ops() {
4270 assert!(convert_unary_op(UnaryOp::Not).is_ok());
4271 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4272 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4273 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4274 }
4275
4276 #[test]
4277 fn test_convert_aggregate_functions() {
4278 assert!(matches!(
4279 convert_aggregate_function(LogicalAggregateFunction::Count),
4280 PhysicalAggregateFunction::Count
4281 ));
4282 assert!(matches!(
4283 convert_aggregate_function(LogicalAggregateFunction::Sum),
4284 PhysicalAggregateFunction::Sum
4285 ));
4286 assert!(matches!(
4287 convert_aggregate_function(LogicalAggregateFunction::Avg),
4288 PhysicalAggregateFunction::Avg
4289 ));
4290 assert!(matches!(
4291 convert_aggregate_function(LogicalAggregateFunction::Min),
4292 PhysicalAggregateFunction::Min
4293 ));
4294 assert!(matches!(
4295 convert_aggregate_function(LogicalAggregateFunction::Max),
4296 PhysicalAggregateFunction::Max
4297 ));
4298 }
4299
4300 #[test]
4301 fn test_planner_accessors() {
4302 let store = create_test_store();
4303 let planner = Planner::new(Arc::clone(&store));
4304
4305 assert!(planner.tx_id().is_none());
4306 assert!(planner.tx_manager().is_none());
4307 let _ = planner.viewing_epoch(); }
4309
4310 #[test]
4311 fn test_physical_plan_accessors() {
4312 let store = create_test_store();
4313 let planner = Planner::new(store);
4314
4315 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4316 variable: "n".to_string(),
4317 label: None,
4318 input: None,
4319 }));
4320
4321 let physical = planner.plan(&logical).unwrap();
4322 assert_eq!(physical.columns(), &["n"]);
4323
4324 let _ = physical.into_operator();
4326 }
4327
4328 #[test]
4331 fn test_plan_adaptive_with_scan() {
4332 let store = create_test_store();
4333 let planner = Planner::new(store);
4334
4335 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4337 items: vec![ReturnItem {
4338 expression: LogicalExpression::Variable("n".to_string()),
4339 alias: None,
4340 }],
4341 distinct: false,
4342 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4343 variable: "n".to_string(),
4344 label: Some("Person".to_string()),
4345 input: None,
4346 })),
4347 }));
4348
4349 let physical = planner.plan_adaptive(&logical).unwrap();
4350 assert_eq!(physical.columns(), &["n"]);
4351 assert!(physical.adaptive_context.is_some());
4353 }
4354
4355 #[test]
4356 fn test_plan_adaptive_with_filter() {
4357 let store = create_test_store();
4358 let planner = Planner::new(store);
4359
4360 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4362 items: vec![ReturnItem {
4363 expression: LogicalExpression::Variable("n".to_string()),
4364 alias: None,
4365 }],
4366 distinct: false,
4367 input: Box::new(LogicalOperator::Filter(FilterOp {
4368 predicate: LogicalExpression::Binary {
4369 left: Box::new(LogicalExpression::Property {
4370 variable: "n".to_string(),
4371 property: "age".to_string(),
4372 }),
4373 op: BinaryOp::Gt,
4374 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4375 },
4376 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4377 variable: "n".to_string(),
4378 label: None,
4379 input: None,
4380 })),
4381 })),
4382 }));
4383
4384 let physical = planner.plan_adaptive(&logical).unwrap();
4385 assert!(physical.adaptive_context.is_some());
4386 }
4387
4388 #[test]
4389 fn test_plan_adaptive_with_expand() {
4390 let store = create_test_store();
4391 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4392
4393 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4395 items: vec![
4396 ReturnItem {
4397 expression: LogicalExpression::Variable("a".to_string()),
4398 alias: None,
4399 },
4400 ReturnItem {
4401 expression: LogicalExpression::Variable("b".to_string()),
4402 alias: None,
4403 },
4404 ],
4405 distinct: false,
4406 input: Box::new(LogicalOperator::Expand(ExpandOp {
4407 from_variable: "a".to_string(),
4408 to_variable: "b".to_string(),
4409 edge_variable: None,
4410 direction: ExpandDirection::Outgoing,
4411 edge_type: Some("KNOWS".to_string()),
4412 min_hops: 1,
4413 max_hops: Some(1),
4414 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4415 variable: "a".to_string(),
4416 label: None,
4417 input: None,
4418 })),
4419 path_alias: None,
4420 })),
4421 }));
4422
4423 let physical = planner.plan_adaptive(&logical).unwrap();
4424 assert!(physical.adaptive_context.is_some());
4425 }
4426
4427 #[test]
4428 fn test_plan_adaptive_with_join() {
4429 let store = create_test_store();
4430 let planner = Planner::new(store);
4431
4432 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4433 items: vec![
4434 ReturnItem {
4435 expression: LogicalExpression::Variable("a".to_string()),
4436 alias: None,
4437 },
4438 ReturnItem {
4439 expression: LogicalExpression::Variable("b".to_string()),
4440 alias: None,
4441 },
4442 ],
4443 distinct: false,
4444 input: Box::new(LogicalOperator::Join(JoinOp {
4445 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4446 variable: "a".to_string(),
4447 label: None,
4448 input: None,
4449 })),
4450 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4451 variable: "b".to_string(),
4452 label: None,
4453 input: None,
4454 })),
4455 join_type: JoinType::Cross,
4456 conditions: vec![],
4457 })),
4458 }));
4459
4460 let physical = planner.plan_adaptive(&logical).unwrap();
4461 assert!(physical.adaptive_context.is_some());
4462 }
4463
4464 #[test]
4465 fn test_plan_adaptive_with_aggregate() {
4466 let store = create_test_store();
4467 let planner = Planner::new(store);
4468
4469 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4470 group_by: vec![],
4471 aggregates: vec![LogicalAggregateExpr {
4472 function: LogicalAggregateFunction::Count,
4473 expression: Some(LogicalExpression::Variable("n".to_string())),
4474 distinct: false,
4475 alias: Some("cnt".to_string()),
4476 percentile: None,
4477 }],
4478 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4479 variable: "n".to_string(),
4480 label: None,
4481 input: None,
4482 })),
4483 having: None,
4484 }));
4485
4486 let physical = planner.plan_adaptive(&logical).unwrap();
4487 assert!(physical.adaptive_context.is_some());
4488 }
4489
4490 #[test]
4491 fn test_plan_adaptive_with_distinct() {
4492 let store = create_test_store();
4493 let planner = Planner::new(store);
4494
4495 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4496 items: vec![ReturnItem {
4497 expression: LogicalExpression::Variable("n".to_string()),
4498 alias: None,
4499 }],
4500 distinct: false,
4501 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4502 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4503 variable: "n".to_string(),
4504 label: None,
4505 input: None,
4506 })),
4507 columns: None,
4508 })),
4509 }));
4510
4511 let physical = planner.plan_adaptive(&logical).unwrap();
4512 assert!(physical.adaptive_context.is_some());
4513 }
4514
4515 #[test]
4516 fn test_plan_adaptive_with_limit() {
4517 let store = create_test_store();
4518 let planner = Planner::new(store);
4519
4520 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4521 items: vec![ReturnItem {
4522 expression: LogicalExpression::Variable("n".to_string()),
4523 alias: None,
4524 }],
4525 distinct: false,
4526 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4527 count: 10,
4528 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4529 variable: "n".to_string(),
4530 label: None,
4531 input: None,
4532 })),
4533 })),
4534 }));
4535
4536 let physical = planner.plan_adaptive(&logical).unwrap();
4537 assert!(physical.adaptive_context.is_some());
4538 }
4539
4540 #[test]
4541 fn test_plan_adaptive_with_skip() {
4542 let store = create_test_store();
4543 let planner = Planner::new(store);
4544
4545 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4546 items: vec![ReturnItem {
4547 expression: LogicalExpression::Variable("n".to_string()),
4548 alias: None,
4549 }],
4550 distinct: false,
4551 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4552 count: 5,
4553 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4554 variable: "n".to_string(),
4555 label: None,
4556 input: None,
4557 })),
4558 })),
4559 }));
4560
4561 let physical = planner.plan_adaptive(&logical).unwrap();
4562 assert!(physical.adaptive_context.is_some());
4563 }
4564
4565 #[test]
4566 fn test_plan_adaptive_with_sort() {
4567 let store = create_test_store();
4568 let planner = Planner::new(store);
4569
4570 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4571 items: vec![ReturnItem {
4572 expression: LogicalExpression::Variable("n".to_string()),
4573 alias: None,
4574 }],
4575 distinct: false,
4576 input: Box::new(LogicalOperator::Sort(SortOp {
4577 keys: vec![SortKey {
4578 expression: LogicalExpression::Variable("n".to_string()),
4579 order: SortOrder::Ascending,
4580 }],
4581 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4582 variable: "n".to_string(),
4583 label: None,
4584 input: None,
4585 })),
4586 })),
4587 }));
4588
4589 let physical = planner.plan_adaptive(&logical).unwrap();
4590 assert!(physical.adaptive_context.is_some());
4591 }
4592
4593 #[test]
4594 fn test_plan_adaptive_with_union() {
4595 let store = create_test_store();
4596 let planner = Planner::new(store);
4597
4598 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4599 items: vec![ReturnItem {
4600 expression: LogicalExpression::Variable("n".to_string()),
4601 alias: None,
4602 }],
4603 distinct: false,
4604 input: Box::new(LogicalOperator::Union(UnionOp {
4605 inputs: vec![
4606 LogicalOperator::NodeScan(NodeScanOp {
4607 variable: "n".to_string(),
4608 label: Some("Person".to_string()),
4609 input: None,
4610 }),
4611 LogicalOperator::NodeScan(NodeScanOp {
4612 variable: "n".to_string(),
4613 label: Some("Company".to_string()),
4614 input: None,
4615 }),
4616 ],
4617 })),
4618 }));
4619
4620 let physical = planner.plan_adaptive(&logical).unwrap();
4621 assert!(physical.adaptive_context.is_some());
4622 }
4623
4624 #[test]
4627 fn test_plan_expand_variable_length() {
4628 let store = create_test_store();
4629 let planner = Planner::new(store);
4630
4631 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4633 items: vec![
4634 ReturnItem {
4635 expression: LogicalExpression::Variable("a".to_string()),
4636 alias: None,
4637 },
4638 ReturnItem {
4639 expression: LogicalExpression::Variable("b".to_string()),
4640 alias: None,
4641 },
4642 ],
4643 distinct: false,
4644 input: Box::new(LogicalOperator::Expand(ExpandOp {
4645 from_variable: "a".to_string(),
4646 to_variable: "b".to_string(),
4647 edge_variable: None,
4648 direction: ExpandDirection::Outgoing,
4649 edge_type: Some("KNOWS".to_string()),
4650 min_hops: 1,
4651 max_hops: Some(3),
4652 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4653 variable: "a".to_string(),
4654 label: None,
4655 input: None,
4656 })),
4657 path_alias: None,
4658 })),
4659 }));
4660
4661 let physical = planner.plan(&logical).unwrap();
4662 assert!(physical.columns().contains(&"a".to_string()));
4663 assert!(physical.columns().contains(&"b".to_string()));
4664 }
4665
4666 #[test]
4667 fn test_plan_expand_with_path_alias() {
4668 let store = create_test_store();
4669 let planner = Planner::new(store);
4670
4671 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4673 items: vec![
4674 ReturnItem {
4675 expression: LogicalExpression::Variable("a".to_string()),
4676 alias: None,
4677 },
4678 ReturnItem {
4679 expression: LogicalExpression::Variable("b".to_string()),
4680 alias: None,
4681 },
4682 ],
4683 distinct: false,
4684 input: Box::new(LogicalOperator::Expand(ExpandOp {
4685 from_variable: "a".to_string(),
4686 to_variable: "b".to_string(),
4687 edge_variable: None,
4688 direction: ExpandDirection::Outgoing,
4689 edge_type: Some("KNOWS".to_string()),
4690 min_hops: 1,
4691 max_hops: Some(3),
4692 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4693 variable: "a".to_string(),
4694 label: None,
4695 input: None,
4696 })),
4697 path_alias: Some("p".to_string()),
4698 })),
4699 }));
4700
4701 let physical = planner.plan(&logical).unwrap();
4702 assert!(physical.columns().contains(&"a".to_string()));
4704 assert!(physical.columns().contains(&"b".to_string()));
4705 }
4706
4707 #[test]
4708 fn test_plan_expand_incoming() {
4709 let store = create_test_store();
4710 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4711
4712 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4714 items: vec![
4715 ReturnItem {
4716 expression: LogicalExpression::Variable("a".to_string()),
4717 alias: None,
4718 },
4719 ReturnItem {
4720 expression: LogicalExpression::Variable("b".to_string()),
4721 alias: None,
4722 },
4723 ],
4724 distinct: false,
4725 input: Box::new(LogicalOperator::Expand(ExpandOp {
4726 from_variable: "a".to_string(),
4727 to_variable: "b".to_string(),
4728 edge_variable: None,
4729 direction: ExpandDirection::Incoming,
4730 edge_type: Some("KNOWS".to_string()),
4731 min_hops: 1,
4732 max_hops: Some(1),
4733 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4734 variable: "a".to_string(),
4735 label: None,
4736 input: None,
4737 })),
4738 path_alias: None,
4739 })),
4740 }));
4741
4742 let physical = planner.plan(&logical).unwrap();
4743 assert!(physical.columns().contains(&"a".to_string()));
4744 assert!(physical.columns().contains(&"b".to_string()));
4745 }
4746
4747 #[test]
4748 fn test_plan_expand_both_directions() {
4749 let store = create_test_store();
4750 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4751
4752 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4754 items: vec![
4755 ReturnItem {
4756 expression: LogicalExpression::Variable("a".to_string()),
4757 alias: None,
4758 },
4759 ReturnItem {
4760 expression: LogicalExpression::Variable("b".to_string()),
4761 alias: None,
4762 },
4763 ],
4764 distinct: false,
4765 input: Box::new(LogicalOperator::Expand(ExpandOp {
4766 from_variable: "a".to_string(),
4767 to_variable: "b".to_string(),
4768 edge_variable: None,
4769 direction: ExpandDirection::Both,
4770 edge_type: Some("KNOWS".to_string()),
4771 min_hops: 1,
4772 max_hops: Some(1),
4773 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4774 variable: "a".to_string(),
4775 label: None,
4776 input: None,
4777 })),
4778 path_alias: None,
4779 })),
4780 }));
4781
4782 let physical = planner.plan(&logical).unwrap();
4783 assert!(physical.columns().contains(&"a".to_string()));
4784 assert!(physical.columns().contains(&"b".to_string()));
4785 }
4786
4787 #[test]
4790 fn test_planner_with_context() {
4791 use crate::transaction::TransactionManager;
4792
4793 let store = create_test_store();
4794 let tx_manager = Arc::new(TransactionManager::new());
4795 let tx_id = tx_manager.begin();
4796 let epoch = tx_manager.current_epoch();
4797
4798 let planner = Planner::with_context(
4799 Arc::clone(&store),
4800 Arc::clone(&tx_manager),
4801 Some(tx_id),
4802 epoch,
4803 );
4804
4805 assert_eq!(planner.tx_id(), Some(tx_id));
4806 assert!(planner.tx_manager().is_some());
4807 assert_eq!(planner.viewing_epoch(), epoch);
4808 }
4809
4810 #[test]
4811 fn test_planner_with_factorized_execution_disabled() {
4812 let store = create_test_store();
4813 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4814
4815 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4817 items: vec![
4818 ReturnItem {
4819 expression: LogicalExpression::Variable("a".to_string()),
4820 alias: None,
4821 },
4822 ReturnItem {
4823 expression: LogicalExpression::Variable("c".to_string()),
4824 alias: None,
4825 },
4826 ],
4827 distinct: false,
4828 input: Box::new(LogicalOperator::Expand(ExpandOp {
4829 from_variable: "b".to_string(),
4830 to_variable: "c".to_string(),
4831 edge_variable: None,
4832 direction: ExpandDirection::Outgoing,
4833 edge_type: None,
4834 min_hops: 1,
4835 max_hops: Some(1),
4836 input: Box::new(LogicalOperator::Expand(ExpandOp {
4837 from_variable: "a".to_string(),
4838 to_variable: "b".to_string(),
4839 edge_variable: None,
4840 direction: ExpandDirection::Outgoing,
4841 edge_type: None,
4842 min_hops: 1,
4843 max_hops: Some(1),
4844 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4845 variable: "a".to_string(),
4846 label: None,
4847 input: None,
4848 })),
4849 path_alias: None,
4850 })),
4851 path_alias: None,
4852 })),
4853 }));
4854
4855 let physical = planner.plan(&logical).unwrap();
4856 assert!(physical.columns().contains(&"a".to_string()));
4857 assert!(physical.columns().contains(&"c".to_string()));
4858 }
4859
4860 #[test]
4863 fn test_plan_sort_by_property() {
4864 let store = create_test_store();
4865 let planner = Planner::new(store);
4866
4867 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4869 items: vec![ReturnItem {
4870 expression: LogicalExpression::Variable("n".to_string()),
4871 alias: None,
4872 }],
4873 distinct: false,
4874 input: Box::new(LogicalOperator::Sort(SortOp {
4875 keys: vec![SortKey {
4876 expression: LogicalExpression::Property {
4877 variable: "n".to_string(),
4878 property: "name".to_string(),
4879 },
4880 order: SortOrder::Ascending,
4881 }],
4882 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4883 variable: "n".to_string(),
4884 label: None,
4885 input: None,
4886 })),
4887 })),
4888 }));
4889
4890 let physical = planner.plan(&logical).unwrap();
4891 assert!(physical.columns().contains(&"n".to_string()));
4893 }
4894
4895 #[test]
4898 fn test_plan_scan_with_input() {
4899 let store = create_test_store();
4900 let planner = Planner::new(store);
4901
4902 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4904 items: vec![
4905 ReturnItem {
4906 expression: LogicalExpression::Variable("a".to_string()),
4907 alias: None,
4908 },
4909 ReturnItem {
4910 expression: LogicalExpression::Variable("b".to_string()),
4911 alias: None,
4912 },
4913 ],
4914 distinct: false,
4915 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4916 variable: "b".to_string(),
4917 label: Some("Company".to_string()),
4918 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4919 variable: "a".to_string(),
4920 label: Some("Person".to_string()),
4921 input: None,
4922 }))),
4923 })),
4924 }));
4925
4926 let physical = planner.plan(&logical).unwrap();
4927 assert!(physical.columns().contains(&"a".to_string()));
4928 assert!(physical.columns().contains(&"b".to_string()));
4929 }
4930}