1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10 FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11 LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let stats = self.store.statistics();
244 let avg_degree = self.estimate_expand_degree(&stats, expand);
245 let estimate = input_estimate * avg_degree;
246 let id = format!("expand_{}", expand.to_variable);
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250 }
251 LogicalOperator::Join(join) => {
252 let left_est = self.estimate_cardinality(&join.left);
254 let right_est = self.estimate_cardinality(&join.right);
255 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261 }
262 LogicalOperator::Aggregate(agg) => {
263 let input_estimate = self.estimate_cardinality(&agg.input);
265 let estimate = if agg.group_by.is_empty() {
266 1.0 } else {
268 (input_estimate * 0.1).max(1.0) };
270 let id = format!("aggregate_{depth}");
271 ctx.set_estimate(&id, estimate);
272
273 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274 }
275 LogicalOperator::Distinct(distinct) => {
276 let input_estimate = self.estimate_cardinality(&distinct.input);
277 let estimate = (input_estimate * 0.5).max(1.0);
278 let id = format!("distinct_{depth}");
279 ctx.set_estimate(&id, estimate);
280
281 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282 }
283 LogicalOperator::Return(ret) => {
284 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285 }
286 LogicalOperator::Limit(limit) => {
287 let input_estimate = self.estimate_cardinality(&limit.input);
288 let estimate = (input_estimate).min(limit.count as f64);
289 let id = format!("limit_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293 }
294 LogicalOperator::Skip(skip) => {
295 let input_estimate = self.estimate_cardinality(&skip.input);
296 let estimate = (input_estimate - skip.count as f64).max(0.0);
297 let id = format!("skip_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301 }
302 LogicalOperator::Sort(sort) => {
303 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305 }
306 LogicalOperator::Union(union) => {
307 let estimate: f64 = union
308 .inputs
309 .iter()
310 .map(|input| self.estimate_cardinality(input))
311 .sum();
312 let id = format!("union_{depth}");
313 ctx.set_estimate(&id, estimate);
314
315 for input in &union.inputs {
316 self.collect_cardinality_estimates(input, ctx, depth + 1);
317 }
318 }
319 _ => {
320 }
322 }
323 }
324
325 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327 match op {
328 LogicalOperator::NodeScan(scan) => {
329 if let Some(label) = &scan.label {
330 self.store.nodes_by_label(label).len() as f64
331 } else {
332 self.store.node_count() as f64
333 }
334 }
335 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336 LogicalOperator::Expand(expand) => {
337 let stats = self.store.statistics();
338 let avg_degree = self.estimate_expand_degree(&stats, expand);
339 self.estimate_cardinality(&expand.input) * avg_degree
340 }
341 LogicalOperator::Join(join) => {
342 let left = self.estimate_cardinality(&join.left);
343 let right = self.estimate_cardinality(&join.right);
344 (left * right).sqrt()
345 }
346 LogicalOperator::Aggregate(agg) => {
347 if agg.group_by.is_empty() {
348 1.0
349 } else {
350 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351 }
352 }
353 LogicalOperator::Distinct(distinct) => {
354 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355 }
356 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357 LogicalOperator::Limit(limit) => self
358 .estimate_cardinality(&limit.input)
359 .min(limit.count as f64),
360 LogicalOperator::Skip(skip) => {
361 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362 }
363 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364 LogicalOperator::Union(union) => union
365 .inputs
366 .iter()
367 .map(|input| self.estimate_cardinality(input))
368 .sum(),
369 _ => 1000.0, }
371 }
372
373 fn estimate_expand_degree(
375 &self,
376 stats: &grafeo_core::statistics::Statistics,
377 expand: &ExpandOp,
378 ) -> f64 {
379 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380 if let Some(edge_type) = &expand.edge_type {
381 stats.estimate_avg_degree(edge_type, outgoing)
382 } else if stats.total_nodes > 0 {
383 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384 } else {
385 10.0 }
387 }
388
389 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391 match op {
392 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393 LogicalOperator::Expand(expand) => {
394 if self.factorized_execution {
396 let (chain_len, _base) = Self::count_expand_chain(op);
397 if chain_len >= 2 {
398 return self.plan_expand_chain(op);
400 }
401 }
402 self.plan_expand(expand)
403 }
404 LogicalOperator::Return(ret) => self.plan_return(ret),
405 LogicalOperator::Filter(filter) => self.plan_filter(filter),
406 LogicalOperator::Project(project) => self.plan_project(project),
407 LogicalOperator::Limit(limit) => self.plan_limit(limit),
408 LogicalOperator::Skip(skip) => self.plan_skip(skip),
409 LogicalOperator::Sort(sort) => self.plan_sort(sort),
410 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411 LogicalOperator::Join(join) => self.plan_join(join),
412 LogicalOperator::Union(union) => self.plan_union(union),
413 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421 LogicalOperator::Merge(merge) => self.plan_merge(merge),
422 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427 LogicalOperator::VectorScan(_) => Err(Error::Internal(
428 "VectorScan requires vector-index feature".to_string(),
429 )),
430 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431 "VectorJoin requires vector-index feature".to_string(),
432 )),
433 _ => Err(Error::Internal(format!(
434 "Unsupported operator: {:?}",
435 std::mem::discriminant(op)
436 ))),
437 }
438 }
439
440 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442 let scan_op = if let Some(label) = &scan.label {
443 ScanOperator::with_label(Arc::clone(&self.store), label)
444 } else {
445 ScanOperator::new(Arc::clone(&self.store))
446 };
447
448 let scan_operator: Box<dyn Operator> =
450 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452 if let Some(input) = &scan.input {
454 let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456 let mut output_schema: Vec<LogicalType> =
458 input_columns.iter().map(|_| LogicalType::Any).collect();
459 output_schema.push(LogicalType::Node);
460
461 input_columns.push(scan.variable.clone());
463
464 let join_op = Box::new(NestedLoopJoinOperator::new(
466 input_op,
467 scan_operator,
468 None, PhysicalJoinType::Cross,
470 output_schema,
471 ));
472
473 Ok((join_op, input_columns))
474 } else {
475 let columns = vec![scan.variable.clone()];
476 Ok((scan_operator, columns))
477 }
478 }
479
480 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485 let source_column = input_columns
487 .iter()
488 .position(|c| c == &expand.from_variable)
489 .ok_or_else(|| {
490 Error::Internal(format!(
491 "Source variable '{}' not found in input columns",
492 expand.from_variable
493 ))
494 })?;
495
496 let direction = match expand.direction {
498 ExpandDirection::Outgoing => Direction::Outgoing,
499 ExpandDirection::Incoming => Direction::Incoming,
500 ExpandDirection::Both => Direction::Both,
501 };
502
503 let is_variable_length =
505 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507 let operator: Box<dyn Operator> = if is_variable_length {
508 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
511 Arc::clone(&self.store),
512 input_op,
513 source_column,
514 direction,
515 expand.edge_type.clone(),
516 expand.min_hops,
517 max_hops,
518 )
519 .with_tx_context(self.viewing_epoch, self.tx_id);
520
521 if expand.path_alias.is_some() {
523 expand_op = expand_op.with_path_length_output();
524 }
525
526 Box::new(expand_op)
527 } else {
528 let expand_op = ExpandOperator::new(
530 Arc::clone(&self.store),
531 input_op,
532 source_column,
533 direction,
534 expand.edge_type.clone(),
535 )
536 .with_tx_context(self.viewing_epoch, self.tx_id);
537 Box::new(expand_op)
538 };
539
540 let mut columns = input_columns;
543
544 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
546 let count = self.anon_edge_counter.get();
547 self.anon_edge_counter.set(count + 1);
548 format!("_anon_edge_{}", count)
549 });
550 columns.push(edge_col_name);
551
552 columns.push(expand.to_variable.clone());
553
554 if let Some(ref path_alias) = expand.path_alias {
556 columns.push(format!("_path_length_{}", path_alias));
557 }
558
559 Ok((operator, columns))
560 }
561
562 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
570 let expands = Self::collect_expand_chain(op);
571 if expands.is_empty() {
572 return Err(Error::Internal("Empty expand chain".to_string()));
573 }
574
575 let first_expand = expands[0];
577 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
578
579 let mut columns = base_columns.clone();
580 let mut steps = Vec::new();
581
582 let mut is_first = true;
587
588 for expand in &expands {
589 let source_column = if is_first {
591 base_columns
593 .iter()
594 .position(|c| c == &expand.from_variable)
595 .ok_or_else(|| {
596 Error::Internal(format!(
597 "Source variable '{}' not found in base columns",
598 expand.from_variable
599 ))
600 })?
601 } else {
602 1
605 };
606
607 let direction = match expand.direction {
609 ExpandDirection::Outgoing => Direction::Outgoing,
610 ExpandDirection::Incoming => Direction::Incoming,
611 ExpandDirection::Both => Direction::Both,
612 };
613
614 steps.push(ExpandStep {
616 source_column,
617 direction,
618 edge_type: expand.edge_type.clone(),
619 });
620
621 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
623 let count = self.anon_edge_counter.get();
624 self.anon_edge_counter.set(count + 1);
625 format!("_anon_edge_{}", count)
626 });
627 columns.push(edge_col_name);
628 columns.push(expand.to_variable.clone());
629
630 is_first = false;
631 }
632
633 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
635
636 if let Some(tx_id) = self.tx_id {
637 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
638 } else {
639 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
640 }
641
642 Ok((Box::new(lazy_op), columns))
643 }
644
645 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
647 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
649
650 let variable_columns: HashMap<String, usize> = input_columns
652 .iter()
653 .enumerate()
654 .map(|(i, name)| (name.clone(), i))
655 .collect();
656
657 let columns: Vec<String> = ret
659 .items
660 .iter()
661 .map(|item| {
662 item.alias.clone().unwrap_or_else(|| {
663 expression_to_string(&item.expression)
665 })
666 })
667 .collect();
668
669 let needs_project = ret
671 .items
672 .iter()
673 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
674
675 if needs_project {
676 let mut projections = Vec::with_capacity(ret.items.len());
678 let mut output_types = Vec::with_capacity(ret.items.len());
679
680 for item in &ret.items {
681 match &item.expression {
682 LogicalExpression::Variable(name) => {
683 let col_idx = *variable_columns.get(name).ok_or_else(|| {
684 Error::Internal(format!("Variable '{}' not found in input", name))
685 })?;
686 projections.push(ProjectExpr::Column(col_idx));
687 output_types.push(LogicalType::Node);
689 }
690 LogicalExpression::Property { variable, property } => {
691 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
692 Error::Internal(format!("Variable '{}' not found in input", variable))
693 })?;
694 projections.push(ProjectExpr::PropertyAccess {
695 column: col_idx,
696 property: property.clone(),
697 });
698 output_types.push(LogicalType::Any);
700 }
701 LogicalExpression::Literal(value) => {
702 projections.push(ProjectExpr::Constant(value.clone()));
703 output_types.push(value_to_logical_type(value));
704 }
705 LogicalExpression::FunctionCall { name, args, .. } => {
706 match name.to_lowercase().as_str() {
708 "type" => {
709 if args.len() != 1 {
711 return Err(Error::Internal(
712 "type() requires exactly one argument".to_string(),
713 ));
714 }
715 if let LogicalExpression::Variable(var_name) = &args[0] {
716 let col_idx =
717 *variable_columns.get(var_name).ok_or_else(|| {
718 Error::Internal(format!(
719 "Variable '{}' not found in input",
720 var_name
721 ))
722 })?;
723 projections.push(ProjectExpr::EdgeType { column: col_idx });
724 output_types.push(LogicalType::String);
725 } else {
726 return Err(Error::Internal(
727 "type() argument must be a variable".to_string(),
728 ));
729 }
730 }
731 "length" => {
732 if args.len() != 1 {
735 return Err(Error::Internal(
736 "length() requires exactly one argument".to_string(),
737 ));
738 }
739 if let LogicalExpression::Variable(var_name) = &args[0] {
740 let col_idx =
741 *variable_columns.get(var_name).ok_or_else(|| {
742 Error::Internal(format!(
743 "Variable '{}' not found in input",
744 var_name
745 ))
746 })?;
747 projections.push(ProjectExpr::Column(col_idx));
749 output_types.push(LogicalType::Int64);
750 } else {
751 return Err(Error::Internal(
752 "length() argument must be a variable".to_string(),
753 ));
754 }
755 }
756 _ => {
758 let filter_expr = self.convert_expression(&item.expression)?;
759 projections.push(ProjectExpr::Expression {
760 expr: filter_expr,
761 variable_columns: variable_columns.clone(),
762 });
763 output_types.push(LogicalType::Any);
764 }
765 }
766 }
767 LogicalExpression::Case { .. } => {
768 let filter_expr = self.convert_expression(&item.expression)?;
770 projections.push(ProjectExpr::Expression {
771 expr: filter_expr,
772 variable_columns: variable_columns.clone(),
773 });
774 output_types.push(LogicalType::Any);
776 }
777 _ => {
778 return Err(Error::Internal(format!(
779 "Unsupported RETURN expression: {:?}",
780 item.expression
781 )));
782 }
783 }
784 }
785
786 let operator = Box::new(ProjectOperator::with_store(
787 input_op,
788 projections,
789 output_types,
790 Arc::clone(&self.store),
791 ));
792
793 Ok((operator, columns))
794 } else {
795 let mut projections = Vec::with_capacity(ret.items.len());
798 let mut output_types = Vec::with_capacity(ret.items.len());
799
800 for item in &ret.items {
801 if let LogicalExpression::Variable(name) = &item.expression {
802 let col_idx = *variable_columns.get(name).ok_or_else(|| {
803 Error::Internal(format!("Variable '{}' not found in input", name))
804 })?;
805 projections.push(ProjectExpr::Column(col_idx));
806 output_types.push(LogicalType::Node);
807 }
808 }
809
810 if projections.len() == input_columns.len()
812 && projections
813 .iter()
814 .enumerate()
815 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
816 {
817 Ok((input_op, columns))
819 } else {
820 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
821 Ok((operator, columns))
822 }
823 }
824 }
825
826 fn plan_project(
828 &self,
829 project: &crate::query::plan::ProjectOp,
830 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
831 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
833 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
834 let single_row_op: Box<dyn Operator> = Box::new(
836 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
837 );
838 (single_row_op, Vec::new())
839 } else {
840 self.plan_operator(&project.input)?
841 };
842
843 let variable_columns: HashMap<String, usize> = input_columns
845 .iter()
846 .enumerate()
847 .map(|(i, name)| (name.clone(), i))
848 .collect();
849
850 let mut projections = Vec::with_capacity(project.projections.len());
852 let mut output_types = Vec::with_capacity(project.projections.len());
853 let mut output_columns = Vec::with_capacity(project.projections.len());
854
855 for projection in &project.projections {
856 let col_name = projection
858 .alias
859 .clone()
860 .unwrap_or_else(|| expression_to_string(&projection.expression));
861 output_columns.push(col_name);
862
863 match &projection.expression {
864 LogicalExpression::Variable(name) => {
865 let col_idx = *variable_columns.get(name).ok_or_else(|| {
866 Error::Internal(format!("Variable '{}' not found in input", name))
867 })?;
868 projections.push(ProjectExpr::Column(col_idx));
869 output_types.push(LogicalType::Node);
870 }
871 LogicalExpression::Property { variable, property } => {
872 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
873 Error::Internal(format!("Variable '{}' not found in input", variable))
874 })?;
875 projections.push(ProjectExpr::PropertyAccess {
876 column: col_idx,
877 property: property.clone(),
878 });
879 output_types.push(LogicalType::Any);
880 }
881 LogicalExpression::Literal(value) => {
882 projections.push(ProjectExpr::Constant(value.clone()));
883 output_types.push(value_to_logical_type(value));
884 }
885 _ => {
886 let filter_expr = self.convert_expression(&projection.expression)?;
888 projections.push(ProjectExpr::Expression {
889 expr: filter_expr,
890 variable_columns: variable_columns.clone(),
891 });
892 output_types.push(LogicalType::Any);
893 }
894 }
895 }
896
897 let operator = Box::new(ProjectOperator::with_store(
898 input_op,
899 projections,
900 output_types,
901 Arc::clone(&self.store),
902 ));
903
904 Ok((operator, output_columns))
905 }
906
907 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
913 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
916 let (_, columns) = self.plan_operator(&filter.input)?;
918 let schema = self.derive_schema_from_columns(&columns);
919 let empty_op = Box::new(EmptyOperator::new(schema));
920 return Ok((empty_op, columns));
921 }
922
923 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
925 return Ok(result);
926 }
927
928 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
930 return Ok(result);
931 }
932
933 let (input_op, columns) = self.plan_operator(&filter.input)?;
935
936 let variable_columns: HashMap<String, usize> = columns
938 .iter()
939 .enumerate()
940 .map(|(i, name)| (name.clone(), i))
941 .collect();
942
943 let filter_expr = self.convert_expression(&filter.predicate)?;
945
946 let predicate =
948 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
949
950 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
952
953 Ok((operator, columns))
954 }
955
956 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
963 use grafeo_core::graph::lpg::CompareOp;
964
965 match predicate {
966 LogicalExpression::Binary { left, op, right } => {
967 match op {
969 BinaryOp::And => {
970 let left_result = self.check_zone_map_for_predicate(left);
971 let right_result = self.check_zone_map_for_predicate(right);
972
973 return match (left_result, right_result) {
974 (Some(false), _) | (_, Some(false)) => Some(false),
976 (Some(true), Some(true)) => Some(true),
978 _ => None,
980 };
981 }
982 BinaryOp::Or => {
983 let left_result = self.check_zone_map_for_predicate(left);
984 let right_result = self.check_zone_map_for_predicate(right);
985
986 return match (left_result, right_result) {
987 (Some(false), Some(false)) => Some(false),
989 (Some(true), _) | (_, Some(true)) => Some(true),
991 _ => None,
993 };
994 }
995 _ => {}
996 }
997
998 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1000 (
1001 LogicalExpression::Property { property, .. },
1002 LogicalExpression::Literal(val),
1003 ) => {
1004 let cmp = match op {
1005 BinaryOp::Eq => CompareOp::Eq,
1006 BinaryOp::Ne => CompareOp::Ne,
1007 BinaryOp::Lt => CompareOp::Lt,
1008 BinaryOp::Le => CompareOp::Le,
1009 BinaryOp::Gt => CompareOp::Gt,
1010 BinaryOp::Ge => CompareOp::Ge,
1011 _ => return None,
1012 };
1013 (property.clone(), cmp, val.clone())
1014 }
1015 (
1016 LogicalExpression::Literal(val),
1017 LogicalExpression::Property { property, .. },
1018 ) => {
1019 let cmp = match op {
1021 BinaryOp::Eq => CompareOp::Eq,
1022 BinaryOp::Ne => CompareOp::Ne,
1023 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1025 BinaryOp::Gt => CompareOp::Lt,
1026 BinaryOp::Ge => CompareOp::Le,
1027 _ => return None,
1028 };
1029 (property.clone(), cmp, val.clone())
1030 }
1031 _ => return None,
1032 };
1033
1034 let might_match =
1036 self.store
1037 .node_property_might_match(&property.into(), compare_op, &value);
1038
1039 Some(might_match)
1040 }
1041
1042 _ => None,
1043 }
1044 }
1045
1046 fn try_plan_filter_with_property_index(
1055 &self,
1056 filter: &FilterOp,
1057 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1058 let (scan_variable, scan_label) = match filter.input.as_ref() {
1060 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1061 (scan.variable.clone(), scan.label.clone())
1062 }
1063 _ => return Ok(None),
1064 };
1065
1066 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1069
1070 if conditions.is_empty() {
1071 return Ok(None);
1072 }
1073
1074 let has_indexed_condition = conditions
1076 .iter()
1077 .any(|(prop, _)| self.store.has_property_index(prop));
1078
1079 if !has_indexed_condition {
1080 return Ok(None);
1081 }
1082
1083 let conditions_ref: Vec<(&str, Value)> = conditions
1085 .iter()
1086 .map(|(p, v)| (p.as_str(), v.clone()))
1087 .collect();
1088 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1089
1090 if let Some(label) = &scan_label {
1092 let label_nodes: std::collections::HashSet<_> =
1093 self.store.nodes_by_label(label).into_iter().collect();
1094 matching_nodes.retain(|n| label_nodes.contains(n));
1095 }
1096
1097 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1099 let columns = vec![scan_variable];
1100
1101 Ok(Some((node_list_op, columns)))
1102 }
1103
1104 fn extract_equality_conditions(
1110 &self,
1111 predicate: &LogicalExpression,
1112 target_variable: &str,
1113 ) -> Vec<(String, Value)> {
1114 let mut conditions = Vec::new();
1115 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1116 conditions
1117 }
1118
1119 fn collect_equality_conditions(
1121 &self,
1122 expr: &LogicalExpression,
1123 target_variable: &str,
1124 conditions: &mut Vec<(String, Value)>,
1125 ) {
1126 match expr {
1127 LogicalExpression::Binary {
1129 left,
1130 op: BinaryOp::And,
1131 right,
1132 } => {
1133 self.collect_equality_conditions(left, target_variable, conditions);
1134 self.collect_equality_conditions(right, target_variable, conditions);
1135 }
1136
1137 LogicalExpression::Binary {
1139 left,
1140 op: BinaryOp::Eq,
1141 right,
1142 } => {
1143 if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1144 && var == target_variable
1145 {
1146 conditions.push((prop, val));
1147 }
1148 }
1149
1150 _ => {}
1151 }
1152 }
1153
1154 fn extract_property_equality(
1156 &self,
1157 left: &LogicalExpression,
1158 right: &LogicalExpression,
1159 ) -> Option<(String, String, Value)> {
1160 match (left, right) {
1161 (
1162 LogicalExpression::Property { variable, property },
1163 LogicalExpression::Literal(val),
1164 ) => Some((variable.clone(), property.clone(), val.clone())),
1165 (
1166 LogicalExpression::Literal(val),
1167 LogicalExpression::Property { variable, property },
1168 ) => Some((variable.clone(), property.clone(), val.clone())),
1169 _ => None,
1170 }
1171 }
1172
1173 fn try_plan_filter_with_range_index(
1186 &self,
1187 filter: &FilterOp,
1188 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1189 let (scan_variable, scan_label) = match filter.input.as_ref() {
1191 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1192 (scan.variable.clone(), scan.label.clone())
1193 }
1194 _ => return Ok(None),
1195 };
1196
1197 if let Some((variable, property, min, max, min_inc, max_inc)) =
1199 self.extract_between_predicate(&filter.predicate)
1200 && variable == scan_variable
1201 {
1202 return self.plan_range_filter(
1203 &scan_variable,
1204 &scan_label,
1205 &property,
1206 RangeBounds {
1207 min: Some(&min),
1208 max: Some(&max),
1209 min_inclusive: min_inc,
1210 max_inclusive: max_inc,
1211 },
1212 );
1213 }
1214
1215 if let Some((variable, property, op, value)) =
1217 self.extract_range_predicate(&filter.predicate)
1218 && variable == scan_variable
1219 {
1220 let (min, max, min_inc, max_inc) = match op {
1221 BinaryOp::Lt => (None, Some(value), false, false),
1222 BinaryOp::Le => (None, Some(value), false, true),
1223 BinaryOp::Gt => (Some(value), None, false, false),
1224 BinaryOp::Ge => (Some(value), None, true, false),
1225 _ => return Ok(None),
1226 };
1227 return self.plan_range_filter(
1228 &scan_variable,
1229 &scan_label,
1230 &property,
1231 RangeBounds {
1232 min: min.as_ref(),
1233 max: max.as_ref(),
1234 min_inclusive: min_inc,
1235 max_inclusive: max_inc,
1236 },
1237 );
1238 }
1239
1240 Ok(None)
1241 }
1242
1243 fn plan_range_filter(
1245 &self,
1246 scan_variable: &str,
1247 scan_label: &Option<String>,
1248 property: &str,
1249 bounds: RangeBounds<'_>,
1250 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1251 let mut matching_nodes = self.store.find_nodes_in_range(
1253 property,
1254 bounds.min,
1255 bounds.max,
1256 bounds.min_inclusive,
1257 bounds.max_inclusive,
1258 );
1259
1260 if let Some(label) = scan_label {
1262 let label_nodes: std::collections::HashSet<_> =
1263 self.store.nodes_by_label(label).into_iter().collect();
1264 matching_nodes.retain(|n| label_nodes.contains(n));
1265 }
1266
1267 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1269 let columns = vec![scan_variable.to_string()];
1270
1271 Ok(Some((node_list_op, columns)))
1272 }
1273
1274 fn extract_range_predicate(
1278 &self,
1279 predicate: &LogicalExpression,
1280 ) -> Option<(String, String, BinaryOp, Value)> {
1281 match predicate {
1282 LogicalExpression::Binary { left, op, right } => {
1283 match op {
1284 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1285 if let (
1287 LogicalExpression::Property { variable, property },
1288 LogicalExpression::Literal(val),
1289 ) = (left.as_ref(), right.as_ref())
1290 {
1291 return Some((variable.clone(), property.clone(), *op, val.clone()));
1292 }
1293
1294 if let (
1296 LogicalExpression::Literal(val),
1297 LogicalExpression::Property { variable, property },
1298 ) = (left.as_ref(), right.as_ref())
1299 {
1300 let flipped_op = match op {
1301 BinaryOp::Lt => BinaryOp::Gt,
1302 BinaryOp::Le => BinaryOp::Ge,
1303 BinaryOp::Gt => BinaryOp::Lt,
1304 BinaryOp::Ge => BinaryOp::Le,
1305 _ => return None,
1306 };
1307 return Some((
1308 variable.clone(),
1309 property.clone(),
1310 flipped_op,
1311 val.clone(),
1312 ));
1313 }
1314 }
1315 _ => {}
1316 }
1317 }
1318 _ => {}
1319 }
1320 None
1321 }
1322
1323 fn extract_between_predicate(
1331 &self,
1332 predicate: &LogicalExpression,
1333 ) -> Option<(String, String, Value, Value, bool, bool)> {
1334 let (left, right) = match predicate {
1336 LogicalExpression::Binary {
1337 left,
1338 op: BinaryOp::And,
1339 right,
1340 } => (left.as_ref(), right.as_ref()),
1341 _ => return None,
1342 };
1343
1344 let left_range = self.extract_range_predicate(left);
1346 let right_range = self.extract_range_predicate(right);
1347
1348 let (left_var, left_prop, left_op, left_val) = left_range?;
1349 let (right_var, right_prop, right_op, right_val) = right_range?;
1350
1351 if left_var != right_var || left_prop != right_prop {
1353 return None;
1354 }
1355
1356 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1358 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1360 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1362 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1364 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1366 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1368 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1370 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1372 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1374 _ => return None,
1375 };
1376
1377 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1378 }
1379
1380 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1382 let (input_op, columns) = self.plan_operator(&limit.input)?;
1383 let output_schema = self.derive_schema_from_columns(&columns);
1384 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1385 Ok((operator, columns))
1386 }
1387
1388 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1390 let (input_op, columns) = self.plan_operator(&skip.input)?;
1391 let output_schema = self.derive_schema_from_columns(&columns);
1392 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1393 Ok((operator, columns))
1394 }
1395
1396 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1398 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1399
1400 let mut variable_columns: HashMap<String, usize> = input_columns
1402 .iter()
1403 .enumerate()
1404 .map(|(i, name)| (name.clone(), i))
1405 .collect();
1406
1407 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1409 let mut next_col_idx = input_columns.len();
1410
1411 for key in &sort.keys {
1412 if let LogicalExpression::Property { variable, property } = &key.expression {
1413 let col_name = format!("{}_{}", variable, property);
1414 if !variable_columns.contains_key(&col_name) {
1415 property_projections.push((
1416 variable.clone(),
1417 property.clone(),
1418 col_name.clone(),
1419 ));
1420 variable_columns.insert(col_name, next_col_idx);
1421 next_col_idx += 1;
1422 }
1423 }
1424 }
1425
1426 let mut output_columns = input_columns.clone();
1428
1429 if !property_projections.is_empty() {
1431 let mut projections = Vec::new();
1432 let mut output_types = Vec::new();
1433
1434 for (i, _) in input_columns.iter().enumerate() {
1437 projections.push(ProjectExpr::Column(i));
1438 output_types.push(LogicalType::Node);
1439 }
1440
1441 for (variable, property, col_name) in &property_projections {
1443 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1444 Error::Internal(format!(
1445 "Variable '{}' not found for ORDER BY property projection",
1446 variable
1447 ))
1448 })?;
1449 projections.push(ProjectExpr::PropertyAccess {
1450 column: source_col,
1451 property: property.clone(),
1452 });
1453 output_types.push(LogicalType::Any);
1454 output_columns.push(col_name.clone());
1455 }
1456
1457 input_op = Box::new(ProjectOperator::with_store(
1458 input_op,
1459 projections,
1460 output_types,
1461 Arc::clone(&self.store),
1462 ));
1463 }
1464
1465 let physical_keys: Vec<PhysicalSortKey> = sort
1467 .keys
1468 .iter()
1469 .map(|key| {
1470 let col_idx = self
1471 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1472 Ok(PhysicalSortKey {
1473 column: col_idx,
1474 direction: match key.order {
1475 SortOrder::Ascending => SortDirection::Ascending,
1476 SortOrder::Descending => SortDirection::Descending,
1477 },
1478 null_order: NullOrder::NullsLast,
1479 })
1480 })
1481 .collect::<Result<Vec<_>>>()?;
1482
1483 let output_schema = self.derive_schema_from_columns(&output_columns);
1484 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1485 Ok((operator, output_columns))
1486 }
1487
1488 fn resolve_sort_expression_with_properties(
1490 &self,
1491 expr: &LogicalExpression,
1492 variable_columns: &HashMap<String, usize>,
1493 ) -> Result<usize> {
1494 match expr {
1495 LogicalExpression::Variable(name) => {
1496 variable_columns.get(name).copied().ok_or_else(|| {
1497 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1498 })
1499 }
1500 LogicalExpression::Property { variable, property } => {
1501 let col_name = format!("{}_{}", variable, property);
1503 variable_columns.get(&col_name).copied().ok_or_else(|| {
1504 Error::Internal(format!(
1505 "Property column '{}' not found for ORDER BY (from {}.{})",
1506 col_name, variable, property
1507 ))
1508 })
1509 }
1510 _ => Err(Error::Internal(format!(
1511 "Unsupported ORDER BY expression: {:?}",
1512 expr
1513 ))),
1514 }
1515 }
1516
1517 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1519 columns.iter().map(|_| LogicalType::Any).collect()
1520 }
1521
1522 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1524 if self.factorized_execution
1531 && agg.group_by.is_empty()
1532 && Self::count_expand_chain(&agg.input).0 >= 2
1533 && self.is_simple_aggregate(agg)
1534 && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1535 {
1536 return Ok((op, cols));
1537 }
1538 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1541
1542 let mut variable_columns: HashMap<String, usize> = input_columns
1544 .iter()
1545 .enumerate()
1546 .map(|(i, name)| (name.clone(), i))
1547 .collect();
1548
1549 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1552
1553 for expr in &agg.group_by {
1555 if let LogicalExpression::Property { variable, property } = expr {
1556 let col_name = format!("{}_{}", variable, property);
1557 if !variable_columns.contains_key(&col_name) {
1558 property_projections.push((
1559 variable.clone(),
1560 property.clone(),
1561 col_name.clone(),
1562 ));
1563 variable_columns.insert(col_name, next_col_idx);
1564 next_col_idx += 1;
1565 }
1566 }
1567 }
1568
1569 for agg_expr in &agg.aggregates {
1571 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1572 let col_name = format!("{}_{}", variable, property);
1573 if !variable_columns.contains_key(&col_name) {
1574 property_projections.push((
1575 variable.clone(),
1576 property.clone(),
1577 col_name.clone(),
1578 ));
1579 variable_columns.insert(col_name, next_col_idx);
1580 next_col_idx += 1;
1581 }
1582 }
1583 }
1584
1585 if !property_projections.is_empty() {
1587 let mut projections = Vec::new();
1588 let mut output_types = Vec::new();
1589
1590 for (i, _) in input_columns.iter().enumerate() {
1593 projections.push(ProjectExpr::Column(i));
1594 output_types.push(LogicalType::Node);
1595 }
1596
1597 for (variable, property, _col_name) in &property_projections {
1599 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1600 Error::Internal(format!(
1601 "Variable '{}' not found for property projection",
1602 variable
1603 ))
1604 })?;
1605 projections.push(ProjectExpr::PropertyAccess {
1606 column: source_col,
1607 property: property.clone(),
1608 });
1609 output_types.push(LogicalType::Any); }
1611
1612 input_op = Box::new(ProjectOperator::with_store(
1613 input_op,
1614 projections,
1615 output_types,
1616 Arc::clone(&self.store),
1617 ));
1618 }
1619
1620 let group_columns: Vec<usize> = agg
1622 .group_by
1623 .iter()
1624 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1625 .collect::<Result<Vec<_>>>()?;
1626
1627 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1629 .aggregates
1630 .iter()
1631 .map(|agg_expr| {
1632 let column = agg_expr
1633 .expression
1634 .as_ref()
1635 .map(|e| {
1636 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1637 })
1638 .transpose()?;
1639
1640 Ok(PhysicalAggregateExpr {
1641 function: convert_aggregate_function(agg_expr.function),
1642 column,
1643 distinct: agg_expr.distinct,
1644 alias: agg_expr.alias.clone(),
1645 percentile: agg_expr.percentile,
1646 })
1647 })
1648 .collect::<Result<Vec<_>>>()?;
1649
1650 let mut output_schema = Vec::new();
1652 let mut output_columns = Vec::new();
1653
1654 for expr in &agg.group_by {
1656 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1658 }
1659
1660 for agg_expr in &agg.aggregates {
1662 let result_type = match agg_expr.function {
1663 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1664 LogicalType::Int64
1665 }
1666 LogicalAggregateFunction::Sum => LogicalType::Int64,
1667 LogicalAggregateFunction::Avg => LogicalType::Float64,
1668 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1669 LogicalType::Int64
1673 }
1674 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1677 | LogicalAggregateFunction::StdDevPop
1678 | LogicalAggregateFunction::PercentileDisc
1679 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1680 };
1681 output_schema.push(result_type);
1682 output_columns.push(
1683 agg_expr
1684 .alias
1685 .clone()
1686 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1687 );
1688 }
1689
1690 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1692 Box::new(SimpleAggregateOperator::new(
1693 input_op,
1694 physical_aggregates,
1695 output_schema,
1696 ))
1697 } else {
1698 Box::new(HashAggregateOperator::new(
1699 input_op,
1700 group_columns,
1701 physical_aggregates,
1702 output_schema,
1703 ))
1704 };
1705
1706 if let Some(having_expr) = &agg.having {
1708 let having_var_columns: HashMap<String, usize> = output_columns
1710 .iter()
1711 .enumerate()
1712 .map(|(i, name)| (name.clone(), i))
1713 .collect();
1714
1715 let filter_expr = self.convert_expression(having_expr)?;
1716 let predicate =
1717 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1718 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1719 }
1720
1721 Ok((operator, output_columns))
1722 }
1723
1724 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1730 agg.aggregates.iter().all(|agg_expr| {
1731 match agg_expr.function {
1732 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1733 agg_expr.expression.is_none()
1735 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1736 }
1737 LogicalAggregateFunction::Sum
1738 | LogicalAggregateFunction::Avg
1739 | LogicalAggregateFunction::Min
1740 | LogicalAggregateFunction::Max => {
1741 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1744 }
1745 _ => false,
1747 }
1748 })
1749 }
1750
1751 fn plan_factorized_aggregate(
1755 &self,
1756 agg: &AggregateOp,
1757 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1758 let expands = Self::collect_expand_chain(&agg.input);
1760 if expands.is_empty() {
1761 return Err(Error::Internal(
1762 "Expected expand chain for factorized aggregate".to_string(),
1763 ));
1764 }
1765
1766 let first_expand = expands[0];
1768 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1769
1770 let mut columns = base_columns.clone();
1771 let mut steps = Vec::new();
1772 let mut is_first = true;
1773
1774 for expand in &expands {
1775 let source_column = if is_first {
1777 base_columns
1778 .iter()
1779 .position(|c| c == &expand.from_variable)
1780 .ok_or_else(|| {
1781 Error::Internal(format!(
1782 "Source variable '{}' not found in base columns",
1783 expand.from_variable
1784 ))
1785 })?
1786 } else {
1787 1 };
1789
1790 let direction = match expand.direction {
1791 ExpandDirection::Outgoing => Direction::Outgoing,
1792 ExpandDirection::Incoming => Direction::Incoming,
1793 ExpandDirection::Both => Direction::Both,
1794 };
1795
1796 steps.push(ExpandStep {
1797 source_column,
1798 direction,
1799 edge_type: expand.edge_type.clone(),
1800 });
1801
1802 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1803 let count = self.anon_edge_counter.get();
1804 self.anon_edge_counter.set(count + 1);
1805 format!("_anon_edge_{}", count)
1806 });
1807 columns.push(edge_col_name);
1808 columns.push(expand.to_variable.clone());
1809
1810 is_first = false;
1811 }
1812
1813 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1815
1816 if let Some(tx_id) = self.tx_id {
1817 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1818 } else {
1819 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1820 }
1821
1822 let factorized_aggs: Vec<FactorizedAggregate> = agg
1824 .aggregates
1825 .iter()
1826 .map(|agg_expr| {
1827 match agg_expr.function {
1828 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1829 if agg_expr.expression.is_none() {
1831 FactorizedAggregate::count()
1832 } else {
1833 FactorizedAggregate::count_column(1) }
1837 }
1838 LogicalAggregateFunction::Sum => {
1839 FactorizedAggregate::sum(1)
1841 }
1842 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1843 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1844 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1845 _ => {
1846 FactorizedAggregate::count()
1848 }
1849 }
1850 })
1851 .collect();
1852
1853 let output_columns: Vec<String> = agg
1855 .aggregates
1856 .iter()
1857 .map(|agg_expr| {
1858 agg_expr
1859 .alias
1860 .clone()
1861 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1862 })
1863 .collect();
1864
1865 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1867
1868 Ok((Box::new(factorized_agg_op), output_columns))
1869 }
1870
1871 #[allow(dead_code)]
1873 fn resolve_expression_to_column(
1874 &self,
1875 expr: &LogicalExpression,
1876 variable_columns: &HashMap<String, usize>,
1877 ) -> Result<usize> {
1878 match expr {
1879 LogicalExpression::Variable(name) => variable_columns
1880 .get(name)
1881 .copied()
1882 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1883 LogicalExpression::Property { variable, .. } => variable_columns
1884 .get(variable)
1885 .copied()
1886 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1887 _ => Err(Error::Internal(format!(
1888 "Cannot resolve expression to column: {:?}",
1889 expr
1890 ))),
1891 }
1892 }
1893
1894 fn resolve_expression_to_column_with_properties(
1898 &self,
1899 expr: &LogicalExpression,
1900 variable_columns: &HashMap<String, usize>,
1901 ) -> Result<usize> {
1902 match expr {
1903 LogicalExpression::Variable(name) => variable_columns
1904 .get(name)
1905 .copied()
1906 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1907 LogicalExpression::Property { variable, property } => {
1908 let col_name = format!("{}_{}", variable, property);
1910 variable_columns.get(&col_name).copied().ok_or_else(|| {
1911 Error::Internal(format!(
1912 "Property column '{}' not found (from {}.{})",
1913 col_name, variable, property
1914 ))
1915 })
1916 }
1917 _ => Err(Error::Internal(format!(
1918 "Cannot resolve expression to column: {:?}",
1919 expr
1920 ))),
1921 }
1922 }
1923
1924 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1926 match expr {
1927 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1928 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1929 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1930 variable: variable.clone(),
1931 property: property.clone(),
1932 }),
1933 LogicalExpression::Binary { left, op, right } => {
1934 let left_expr = self.convert_expression(left)?;
1935 let right_expr = self.convert_expression(right)?;
1936 let filter_op = convert_binary_op(*op)?;
1937 Ok(FilterExpression::Binary {
1938 left: Box::new(left_expr),
1939 op: filter_op,
1940 right: Box::new(right_expr),
1941 })
1942 }
1943 LogicalExpression::Unary { op, operand } => {
1944 let operand_expr = self.convert_expression(operand)?;
1945 let filter_op = convert_unary_op(*op)?;
1946 Ok(FilterExpression::Unary {
1947 op: filter_op,
1948 operand: Box::new(operand_expr),
1949 })
1950 }
1951 LogicalExpression::FunctionCall { name, args, .. } => {
1952 let filter_args: Vec<FilterExpression> = args
1953 .iter()
1954 .map(|a| self.convert_expression(a))
1955 .collect::<Result<Vec<_>>>()?;
1956 Ok(FilterExpression::FunctionCall {
1957 name: name.clone(),
1958 args: filter_args,
1959 })
1960 }
1961 LogicalExpression::Case {
1962 operand,
1963 when_clauses,
1964 else_clause,
1965 } => {
1966 let filter_operand = operand
1967 .as_ref()
1968 .map(|e| self.convert_expression(e))
1969 .transpose()?
1970 .map(Box::new);
1971 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1972 .iter()
1973 .map(|(cond, result)| {
1974 Ok((
1975 self.convert_expression(cond)?,
1976 self.convert_expression(result)?,
1977 ))
1978 })
1979 .collect::<Result<Vec<_>>>()?;
1980 let filter_else = else_clause
1981 .as_ref()
1982 .map(|e| self.convert_expression(e))
1983 .transpose()?
1984 .map(Box::new);
1985 Ok(FilterExpression::Case {
1986 operand: filter_operand,
1987 when_clauses: filter_when_clauses,
1988 else_clause: filter_else,
1989 })
1990 }
1991 LogicalExpression::List(items) => {
1992 let filter_items: Vec<FilterExpression> = items
1993 .iter()
1994 .map(|item| self.convert_expression(item))
1995 .collect::<Result<Vec<_>>>()?;
1996 Ok(FilterExpression::List(filter_items))
1997 }
1998 LogicalExpression::Map(pairs) => {
1999 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2000 .iter()
2001 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2002 .collect::<Result<Vec<_>>>()?;
2003 Ok(FilterExpression::Map(filter_pairs))
2004 }
2005 LogicalExpression::IndexAccess { base, index } => {
2006 let base_expr = self.convert_expression(base)?;
2007 let index_expr = self.convert_expression(index)?;
2008 Ok(FilterExpression::IndexAccess {
2009 base: Box::new(base_expr),
2010 index: Box::new(index_expr),
2011 })
2012 }
2013 LogicalExpression::SliceAccess { base, start, end } => {
2014 let base_expr = self.convert_expression(base)?;
2015 let start_expr = start
2016 .as_ref()
2017 .map(|s| self.convert_expression(s))
2018 .transpose()?
2019 .map(Box::new);
2020 let end_expr = end
2021 .as_ref()
2022 .map(|e| self.convert_expression(e))
2023 .transpose()?
2024 .map(Box::new);
2025 Ok(FilterExpression::SliceAccess {
2026 base: Box::new(base_expr),
2027 start: start_expr,
2028 end: end_expr,
2029 })
2030 }
2031 LogicalExpression::Parameter(_) => Err(Error::Internal(
2032 "Parameters not yet supported in filters".to_string(),
2033 )),
2034 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2035 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2036 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2037 LogicalExpression::ListComprehension {
2038 variable,
2039 list_expr,
2040 filter_expr,
2041 map_expr,
2042 } => {
2043 let list = self.convert_expression(list_expr)?;
2044 let filter = filter_expr
2045 .as_ref()
2046 .map(|f| self.convert_expression(f))
2047 .transpose()?
2048 .map(Box::new);
2049 let map = self.convert_expression(map_expr)?;
2050 Ok(FilterExpression::ListComprehension {
2051 variable: variable.clone(),
2052 list_expr: Box::new(list),
2053 filter_expr: filter,
2054 map_expr: Box::new(map),
2055 })
2056 }
2057 LogicalExpression::ExistsSubquery(subplan) => {
2058 let (start_var, direction, edge_type, end_labels) =
2061 self.extract_exists_pattern(subplan)?;
2062
2063 Ok(FilterExpression::ExistsSubquery {
2064 start_var,
2065 direction,
2066 edge_type,
2067 end_labels,
2068 min_hops: None,
2069 max_hops: None,
2070 })
2071 }
2072 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2073 "COUNT subqueries not yet supported".to_string(),
2074 )),
2075 }
2076 }
2077
2078 fn extract_exists_pattern(
2081 &self,
2082 subplan: &LogicalOperator,
2083 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2084 match subplan {
2085 LogicalOperator::Expand(expand) => {
2086 let end_labels = self.extract_end_labels_from_expand(expand);
2088 let direction = match expand.direction {
2089 ExpandDirection::Outgoing => Direction::Outgoing,
2090 ExpandDirection::Incoming => Direction::Incoming,
2091 ExpandDirection::Both => Direction::Both,
2092 };
2093 Ok((
2094 expand.from_variable.clone(),
2095 direction,
2096 expand.edge_type.clone(),
2097 end_labels,
2098 ))
2099 }
2100 LogicalOperator::NodeScan(scan) => {
2101 if let Some(input) = &scan.input {
2102 self.extract_exists_pattern(input)
2103 } else {
2104 Err(Error::Internal(
2105 "EXISTS subquery must contain an edge pattern".to_string(),
2106 ))
2107 }
2108 }
2109 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2110 _ => Err(Error::Internal(
2111 "Unsupported EXISTS subquery pattern".to_string(),
2112 )),
2113 }
2114 }
2115
2116 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2118 match expand.input.as_ref() {
2120 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2121 _ => None,
2122 }
2123 }
2124
2125 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2127 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2128 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2129
2130 let mut columns = left_columns.clone();
2132 columns.extend(right_columns.clone());
2133
2134 let physical_join_type = match join.join_type {
2136 JoinType::Inner => PhysicalJoinType::Inner,
2137 JoinType::Left => PhysicalJoinType::Left,
2138 JoinType::Right => PhysicalJoinType::Right,
2139 JoinType::Full => PhysicalJoinType::Full,
2140 JoinType::Cross => PhysicalJoinType::Cross,
2141 JoinType::Semi => PhysicalJoinType::Semi,
2142 JoinType::Anti => PhysicalJoinType::Anti,
2143 };
2144
2145 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2147 (vec![], vec![])
2149 } else {
2150 join.conditions
2151 .iter()
2152 .filter_map(|cond| {
2153 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2155 let right_idx = self
2156 .expression_to_column(&cond.right, &right_columns)
2157 .ok()?;
2158 Some((left_idx, right_idx))
2159 })
2160 .unzip()
2161 };
2162
2163 let output_schema = self.derive_schema_from_columns(&columns);
2164
2165 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2173 left_op,
2174 right_op,
2175 probe_keys,
2176 build_keys,
2177 physical_join_type,
2178 output_schema,
2179 ));
2180
2181 Ok((operator, columns))
2182 }
2183
2184 #[allow(dead_code)]
2193 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2194 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2196 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2197
2198 Self::collect_join_edges(
2200 &LogicalOperator::Join(join.clone()),
2201 &mut edges,
2202 &mut all_vars,
2203 );
2204
2205 if all_vars.len() < 3 {
2207 return false;
2208 }
2209
2210 Self::has_cycle(&edges, &all_vars)
2212 }
2213
2214 fn collect_join_edges(
2216 op: &LogicalOperator,
2217 edges: &mut HashMap<String, Vec<String>>,
2218 vars: &mut std::collections::HashSet<String>,
2219 ) {
2220 match op {
2221 LogicalOperator::Join(join) => {
2222 for cond in &join.conditions {
2224 if let (Some(left_var), Some(right_var)) = (
2225 Self::extract_join_variable(&cond.left),
2226 Self::extract_join_variable(&cond.right),
2227 ) && left_var != right_var
2228 {
2229 vars.insert(left_var.clone());
2230 vars.insert(right_var.clone());
2231
2232 edges
2234 .entry(left_var.clone())
2235 .or_default()
2236 .push(right_var.clone());
2237 edges.entry(right_var).or_default().push(left_var);
2238 }
2239 }
2240
2241 Self::collect_join_edges(&join.left, edges, vars);
2243 Self::collect_join_edges(&join.right, edges, vars);
2244 }
2245 LogicalOperator::Expand(expand) => {
2246 vars.insert(expand.from_variable.clone());
2248 vars.insert(expand.to_variable.clone());
2249
2250 edges
2251 .entry(expand.from_variable.clone())
2252 .or_default()
2253 .push(expand.to_variable.clone());
2254 edges
2255 .entry(expand.to_variable.clone())
2256 .or_default()
2257 .push(expand.from_variable.clone());
2258
2259 Self::collect_join_edges(&expand.input, edges, vars);
2260 }
2261 LogicalOperator::Filter(filter) => {
2262 Self::collect_join_edges(&filter.input, edges, vars);
2263 }
2264 LogicalOperator::NodeScan(scan) => {
2265 vars.insert(scan.variable.clone());
2266 }
2267 _ => {}
2268 }
2269 }
2270
2271 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2273 match expr {
2274 LogicalExpression::Variable(v) => Some(v.clone()),
2275 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2276 LogicalExpression::Id(v) => Some(v.clone()),
2277 _ => None,
2278 }
2279 }
2280
2281 fn has_cycle(
2285 edges: &HashMap<String, Vec<String>>,
2286 vars: &std::collections::HashSet<String>,
2287 ) -> bool {
2288 let mut color: HashMap<&String, u8> = HashMap::new();
2289
2290 for var in vars {
2291 color.insert(var, 0);
2292 }
2293
2294 for start in vars {
2295 if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2296 return true;
2297 }
2298 }
2299
2300 false
2301 }
2302
2303 fn dfs_cycle(
2305 node: &String,
2306 parent: Option<&String>,
2307 edges: &HashMap<String, Vec<String>>,
2308 color: &mut HashMap<&String, u8>,
2309 ) -> bool {
2310 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2313 for neighbor in neighbors {
2314 if parent == Some(neighbor) {
2316 continue;
2317 }
2318
2319 if let Some(&c) = color.get(neighbor) {
2320 if c == 1 {
2321 return true;
2323 }
2324 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2325 return true;
2326 }
2327 }
2328 }
2329 }
2330
2331 *color.get_mut(node).unwrap() = 2; false
2333 }
2334
2335 #[allow(dead_code)]
2337 fn count_relations(op: &LogicalOperator) -> usize {
2338 match op {
2339 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2340 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2341 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2342 LogicalOperator::Join(j) => {
2343 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2344 }
2345 _ => 0,
2346 }
2347 }
2348
2349 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2351 match expr {
2352 LogicalExpression::Variable(name) => columns
2353 .iter()
2354 .position(|c| c == name)
2355 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2356 _ => Err(Error::Internal(
2357 "Only variables supported in join conditions".to_string(),
2358 )),
2359 }
2360 }
2361
2362 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2364 if union.inputs.is_empty() {
2365 return Err(Error::Internal(
2366 "Union requires at least one input".to_string(),
2367 ));
2368 }
2369
2370 let mut inputs = Vec::with_capacity(union.inputs.len());
2371 let mut columns = Vec::new();
2372
2373 for (i, input) in union.inputs.iter().enumerate() {
2374 let (op, cols) = self.plan_operator(input)?;
2375 if i == 0 {
2376 columns = cols;
2377 }
2378 inputs.push(op);
2379 }
2380
2381 let output_schema = self.derive_schema_from_columns(&columns);
2382 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2383
2384 Ok((operator, columns))
2385 }
2386
2387 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2389 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2390 let output_schema = self.derive_schema_from_columns(&columns);
2391 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2392 Ok((operator, columns))
2393 }
2394
2395 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2397 let (input_op, mut columns) = if let Some(ref input) = create.input {
2399 let (op, cols) = self.plan_operator(input)?;
2400 (Some(op), cols)
2401 } else {
2402 (None, vec![])
2403 };
2404
2405 let output_column = columns.len();
2407 columns.push(create.variable.clone());
2408
2409 let properties: Vec<(String, PropertySource)> = create
2411 .properties
2412 .iter()
2413 .map(|(name, expr)| {
2414 let source = match Self::try_fold_expression(expr) {
2415 Some(value) => PropertySource::Constant(value),
2416 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2417 };
2418 (name.clone(), source)
2419 })
2420 .collect();
2421
2422 let output_schema = self.derive_schema_from_columns(&columns);
2423
2424 let operator = Box::new(
2425 CreateNodeOperator::new(
2426 Arc::clone(&self.store),
2427 input_op,
2428 create.labels.clone(),
2429 properties,
2430 output_schema,
2431 output_column,
2432 )
2433 .with_tx_context(self.viewing_epoch, self.tx_id),
2434 );
2435
2436 Ok((operator, columns))
2437 }
2438
2439 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2441 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2442
2443 let from_column = columns
2445 .iter()
2446 .position(|c| c == &create.from_variable)
2447 .ok_or_else(|| {
2448 Error::Internal(format!(
2449 "Source variable '{}' not found",
2450 create.from_variable
2451 ))
2452 })?;
2453
2454 let to_column = columns
2455 .iter()
2456 .position(|c| c == &create.to_variable)
2457 .ok_or_else(|| {
2458 Error::Internal(format!(
2459 "Target variable '{}' not found",
2460 create.to_variable
2461 ))
2462 })?;
2463
2464 let output_column = create.variable.as_ref().map(|v| {
2466 let idx = columns.len();
2467 columns.push(v.clone());
2468 idx
2469 });
2470
2471 let properties: Vec<(String, PropertySource)> = create
2473 .properties
2474 .iter()
2475 .map(|(name, expr)| {
2476 let source = match Self::try_fold_expression(expr) {
2477 Some(value) => PropertySource::Constant(value),
2478 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2479 };
2480 (name.clone(), source)
2481 })
2482 .collect();
2483
2484 let output_schema = self.derive_schema_from_columns(&columns);
2485
2486 let mut operator = CreateEdgeOperator::new(
2487 Arc::clone(&self.store),
2488 input_op,
2489 from_column,
2490 to_column,
2491 create.edge_type.clone(),
2492 output_schema,
2493 )
2494 .with_properties(properties)
2495 .with_tx_context(self.viewing_epoch, self.tx_id);
2496
2497 if let Some(col) = output_column {
2498 operator = operator.with_output_column(col);
2499 }
2500
2501 let operator = Box::new(operator);
2502
2503 Ok((operator, columns))
2504 }
2505
2506 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2508 let (input_op, columns) = self.plan_operator(&delete.input)?;
2509
2510 let node_column = columns
2511 .iter()
2512 .position(|c| c == &delete.variable)
2513 .ok_or_else(|| {
2514 Error::Internal(format!(
2515 "Variable '{}' not found for delete",
2516 delete.variable
2517 ))
2518 })?;
2519
2520 let output_schema = vec![LogicalType::Int64];
2522 let output_columns = vec!["deleted_count".to_string()];
2523
2524 let operator = Box::new(
2525 DeleteNodeOperator::new(
2526 Arc::clone(&self.store),
2527 input_op,
2528 node_column,
2529 output_schema,
2530 delete.detach, )
2532 .with_tx_context(self.viewing_epoch, self.tx_id),
2533 );
2534
2535 Ok((operator, output_columns))
2536 }
2537
2538 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2540 let (input_op, columns) = self.plan_operator(&delete.input)?;
2541
2542 let edge_column = columns
2543 .iter()
2544 .position(|c| c == &delete.variable)
2545 .ok_or_else(|| {
2546 Error::Internal(format!(
2547 "Variable '{}' not found for delete",
2548 delete.variable
2549 ))
2550 })?;
2551
2552 let output_schema = vec![LogicalType::Int64];
2554 let output_columns = vec!["deleted_count".to_string()];
2555
2556 let operator = Box::new(
2557 DeleteEdgeOperator::new(
2558 Arc::clone(&self.store),
2559 input_op,
2560 edge_column,
2561 output_schema,
2562 )
2563 .with_tx_context(self.viewing_epoch, self.tx_id),
2564 );
2565
2566 Ok((operator, output_columns))
2567 }
2568
2569 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2571 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2572 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2573
2574 let mut columns = left_columns.clone();
2576 columns.extend(right_columns.clone());
2577
2578 let mut probe_keys = Vec::new();
2580 let mut build_keys = Vec::new();
2581
2582 for (right_idx, right_col) in right_columns.iter().enumerate() {
2583 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2584 probe_keys.push(left_idx);
2585 build_keys.push(right_idx);
2586 }
2587 }
2588
2589 let output_schema = self.derive_schema_from_columns(&columns);
2590
2591 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2592 left_op,
2593 right_op,
2594 probe_keys,
2595 build_keys,
2596 PhysicalJoinType::Left,
2597 output_schema,
2598 ));
2599
2600 Ok((operator, columns))
2601 }
2602
2603 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2605 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2606 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2607
2608 let columns = left_columns.clone();
2610
2611 let mut probe_keys = Vec::new();
2613 let mut build_keys = Vec::new();
2614
2615 for (right_idx, right_col) in right_columns.iter().enumerate() {
2616 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2617 probe_keys.push(left_idx);
2618 build_keys.push(right_idx);
2619 }
2620 }
2621
2622 let output_schema = self.derive_schema_from_columns(&columns);
2623
2624 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2625 left_op,
2626 right_op,
2627 probe_keys,
2628 build_keys,
2629 PhysicalJoinType::Anti,
2630 output_schema,
2631 ));
2632
2633 Ok((operator, columns))
2634 }
2635
2636 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2638 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2641 if matches!(&*unwind.input, LogicalOperator::Empty) {
2642 let literal_list = self.convert_expression(&unwind.expression)?;
2647
2648 let single_row_op: Box<dyn Operator> = Box::new(
2650 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2651 );
2652 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2653 single_row_op,
2654 vec![ProjectExpr::Expression {
2655 expr: literal_list,
2656 variable_columns: HashMap::new(),
2657 }],
2658 vec![LogicalType::Any],
2659 Arc::clone(&self.store),
2660 ));
2661
2662 (project_op, vec!["__list__".to_string()])
2663 } else {
2664 self.plan_operator(&unwind.input)?
2665 };
2666
2667 let list_col_idx = match &unwind.expression {
2673 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2674 LogicalExpression::Property { variable, .. } => {
2675 input_columns.iter().position(|c| c == variable)
2678 }
2679 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2680 None
2682 }
2683 _ => None,
2684 };
2685
2686 let mut columns = input_columns.clone();
2688 columns.push(unwind.variable.clone());
2689
2690 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2692 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2697
2698 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2699 input_op,
2700 col_idx,
2701 unwind.variable.clone(),
2702 output_schema,
2703 ));
2704
2705 Ok((operator, columns))
2706 }
2707
2708 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2710 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2712 Vec::new()
2713 } else {
2714 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2715 cols
2716 };
2717
2718 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2720 .match_properties
2721 .iter()
2722 .filter_map(|(name, expr)| {
2723 if let LogicalExpression::Literal(v) = expr {
2724 Some((name.clone(), v.clone()))
2725 } else {
2726 None }
2728 })
2729 .collect();
2730
2731 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2733 .on_create
2734 .iter()
2735 .filter_map(|(name, expr)| {
2736 if let LogicalExpression::Literal(v) = expr {
2737 Some((name.clone(), v.clone()))
2738 } else {
2739 None
2740 }
2741 })
2742 .collect();
2743
2744 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2746 .on_match
2747 .iter()
2748 .filter_map(|(name, expr)| {
2749 if let LogicalExpression::Literal(v) = expr {
2750 Some((name.clone(), v.clone()))
2751 } else {
2752 None
2753 }
2754 })
2755 .collect();
2756
2757 columns.push(merge.variable.clone());
2759
2760 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2761 Arc::clone(&self.store),
2762 merge.variable.clone(),
2763 merge.labels.clone(),
2764 match_properties,
2765 on_create_properties,
2766 on_match_properties,
2767 ));
2768
2769 Ok((operator, columns))
2770 }
2771
2772 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2774 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2776
2777 let source_column = columns
2779 .iter()
2780 .position(|c| c == &sp.source_var)
2781 .ok_or_else(|| {
2782 Error::Internal(format!(
2783 "Source variable '{}' not found for shortestPath",
2784 sp.source_var
2785 ))
2786 })?;
2787
2788 let target_column = columns
2789 .iter()
2790 .position(|c| c == &sp.target_var)
2791 .ok_or_else(|| {
2792 Error::Internal(format!(
2793 "Target variable '{}' not found for shortestPath",
2794 sp.target_var
2795 ))
2796 })?;
2797
2798 let direction = match sp.direction {
2800 ExpandDirection::Outgoing => Direction::Outgoing,
2801 ExpandDirection::Incoming => Direction::Incoming,
2802 ExpandDirection::Both => Direction::Both,
2803 };
2804
2805 let operator: Box<dyn Operator> = Box::new(
2807 ShortestPathOperator::new(
2808 Arc::clone(&self.store),
2809 input_op,
2810 source_column,
2811 target_column,
2812 sp.edge_type.clone(),
2813 direction,
2814 )
2815 .with_all_paths(sp.all_paths),
2816 );
2817
2818 columns.push(format!("_path_length_{}", sp.path_alias));
2821
2822 Ok((operator, columns))
2823 }
2824
2825 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2827 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2828
2829 let node_column = columns
2831 .iter()
2832 .position(|c| c == &add_label.variable)
2833 .ok_or_else(|| {
2834 Error::Internal(format!(
2835 "Variable '{}' not found for ADD LABEL",
2836 add_label.variable
2837 ))
2838 })?;
2839
2840 let output_schema = vec![LogicalType::Int64];
2842 let output_columns = vec!["labels_added".to_string()];
2843
2844 let operator = Box::new(AddLabelOperator::new(
2845 Arc::clone(&self.store),
2846 input_op,
2847 node_column,
2848 add_label.labels.clone(),
2849 output_schema,
2850 ));
2851
2852 Ok((operator, output_columns))
2853 }
2854
2855 fn plan_remove_label(
2857 &self,
2858 remove_label: &RemoveLabelOp,
2859 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2860 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2861
2862 let node_column = columns
2864 .iter()
2865 .position(|c| c == &remove_label.variable)
2866 .ok_or_else(|| {
2867 Error::Internal(format!(
2868 "Variable '{}' not found for REMOVE LABEL",
2869 remove_label.variable
2870 ))
2871 })?;
2872
2873 let output_schema = vec![LogicalType::Int64];
2875 let output_columns = vec!["labels_removed".to_string()];
2876
2877 let operator = Box::new(RemoveLabelOperator::new(
2878 Arc::clone(&self.store),
2879 input_op,
2880 node_column,
2881 remove_label.labels.clone(),
2882 output_schema,
2883 ));
2884
2885 Ok((operator, output_columns))
2886 }
2887
2888 fn plan_set_property(
2890 &self,
2891 set_prop: &SetPropertyOp,
2892 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2893 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2894
2895 let entity_column = columns
2897 .iter()
2898 .position(|c| c == &set_prop.variable)
2899 .ok_or_else(|| {
2900 Error::Internal(format!(
2901 "Variable '{}' not found for SET",
2902 set_prop.variable
2903 ))
2904 })?;
2905
2906 let properties: Vec<(String, PropertySource)> = set_prop
2908 .properties
2909 .iter()
2910 .map(|(name, expr)| {
2911 let source = self.expression_to_property_source(expr, &columns)?;
2912 Ok((name.clone(), source))
2913 })
2914 .collect::<Result<Vec<_>>>()?;
2915
2916 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2918 let output_columns = columns.clone();
2919
2920 let operator = Box::new(SetPropertyOperator::new_for_node(
2922 Arc::clone(&self.store),
2923 input_op,
2924 entity_column,
2925 properties,
2926 output_schema,
2927 ));
2928
2929 Ok((operator, output_columns))
2930 }
2931
2932 fn expression_to_property_source(
2934 &self,
2935 expr: &LogicalExpression,
2936 columns: &[String],
2937 ) -> Result<PropertySource> {
2938 match expr {
2939 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2940 LogicalExpression::Variable(name) => {
2941 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2942 Error::Internal(format!("Variable '{}' not found for property source", name))
2943 })?;
2944 Ok(PropertySource::Column(col_idx))
2945 }
2946 LogicalExpression::Parameter(name) => {
2947 Ok(PropertySource::Constant(
2950 grafeo_common::types::Value::String(format!("${}", name).into()),
2951 ))
2952 }
2953 _ => {
2954 if let Some(value) = Self::try_fold_expression(expr) {
2955 Ok(PropertySource::Constant(value))
2956 } else {
2957 Err(Error::Internal(format!(
2958 "Unsupported expression type for property source: {:?}",
2959 expr
2960 )))
2961 }
2962 }
2963 }
2964 }
2965
2966 fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
2972 match expr {
2973 LogicalExpression::Literal(v) => Some(v.clone()),
2974 LogicalExpression::List(items) => {
2975 let values: Option<Vec<Value>> =
2976 items.iter().map(Self::try_fold_expression).collect();
2977 let values = values?;
2978 let all_numeric = !values.is_empty()
2980 && values
2981 .iter()
2982 .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
2983 if all_numeric {
2984 let floats: Vec<f32> = values
2985 .iter()
2986 .filter_map(|v| match v {
2987 Value::Float64(f) => Some(*f as f32),
2988 Value::Int64(i) => Some(*i as f32),
2989 _ => None,
2990 })
2991 .collect();
2992 Some(Value::Vector(floats.into()))
2993 } else {
2994 Some(Value::List(values.into()))
2995 }
2996 }
2997 LogicalExpression::FunctionCall { name, args, .. } => {
2998 match name.to_lowercase().as_str() {
2999 "vector" => {
3000 if args.len() != 1 {
3001 return None;
3002 }
3003 let val = Self::try_fold_expression(&args[0])?;
3004 match val {
3005 Value::List(items) => {
3006 let floats: Vec<f32> = items
3007 .iter()
3008 .filter_map(|v| match v {
3009 Value::Float64(f) => Some(*f as f32),
3010 Value::Int64(i) => Some(*i as f32),
3011 _ => None,
3012 })
3013 .collect();
3014 if floats.len() == items.len() {
3015 Some(Value::Vector(floats.into()))
3016 } else {
3017 None
3018 }
3019 }
3020 Value::Vector(v) => Some(Value::Vector(v)),
3022 _ => None,
3023 }
3024 }
3025 _ => None,
3026 }
3027 }
3028 _ => None,
3029 }
3030 }
3031}
3032
3033pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3035 match op {
3036 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3037 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3038 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3039 BinaryOp::Le => Ok(BinaryFilterOp::Le),
3040 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3041 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3042 BinaryOp::And => Ok(BinaryFilterOp::And),
3043 BinaryOp::Or => Ok(BinaryFilterOp::Or),
3044 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3045 BinaryOp::Add => Ok(BinaryFilterOp::Add),
3046 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3047 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3048 BinaryOp::Div => Ok(BinaryFilterOp::Div),
3049 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3050 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3051 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3052 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3053 BinaryOp::In => Ok(BinaryFilterOp::In),
3054 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3055 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3056 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3057 "Binary operator {:?} not yet supported in filters",
3058 op
3059 ))),
3060 }
3061}
3062
3063pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3065 match op {
3066 UnaryOp::Not => Ok(UnaryFilterOp::Not),
3067 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3068 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3069 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3070 }
3071}
3072
3073pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3075 match func {
3076 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3077 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3078 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3079 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3080 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3081 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3082 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3083 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3084 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3085 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3086 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3087 }
3088}
3089
3090pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3094 match expr {
3095 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3096 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3097 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3098 variable: variable.clone(),
3099 property: property.clone(),
3100 }),
3101 LogicalExpression::Binary { left, op, right } => {
3102 let left_expr = convert_filter_expression(left)?;
3103 let right_expr = convert_filter_expression(right)?;
3104 let filter_op = convert_binary_op(*op)?;
3105 Ok(FilterExpression::Binary {
3106 left: Box::new(left_expr),
3107 op: filter_op,
3108 right: Box::new(right_expr),
3109 })
3110 }
3111 LogicalExpression::Unary { op, operand } => {
3112 let operand_expr = convert_filter_expression(operand)?;
3113 let filter_op = convert_unary_op(*op)?;
3114 Ok(FilterExpression::Unary {
3115 op: filter_op,
3116 operand: Box::new(operand_expr),
3117 })
3118 }
3119 LogicalExpression::FunctionCall { name, args, .. } => {
3120 let filter_args: Vec<FilterExpression> = args
3121 .iter()
3122 .map(convert_filter_expression)
3123 .collect::<Result<Vec<_>>>()?;
3124 Ok(FilterExpression::FunctionCall {
3125 name: name.clone(),
3126 args: filter_args,
3127 })
3128 }
3129 LogicalExpression::Case {
3130 operand,
3131 when_clauses,
3132 else_clause,
3133 } => {
3134 let filter_operand = operand
3135 .as_ref()
3136 .map(|e| convert_filter_expression(e))
3137 .transpose()?
3138 .map(Box::new);
3139 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3140 .iter()
3141 .map(|(cond, result)| {
3142 Ok((
3143 convert_filter_expression(cond)?,
3144 convert_filter_expression(result)?,
3145 ))
3146 })
3147 .collect::<Result<Vec<_>>>()?;
3148 let filter_else = else_clause
3149 .as_ref()
3150 .map(|e| convert_filter_expression(e))
3151 .transpose()?
3152 .map(Box::new);
3153 Ok(FilterExpression::Case {
3154 operand: filter_operand,
3155 when_clauses: filter_when_clauses,
3156 else_clause: filter_else,
3157 })
3158 }
3159 LogicalExpression::List(items) => {
3160 let filter_items: Vec<FilterExpression> = items
3161 .iter()
3162 .map(convert_filter_expression)
3163 .collect::<Result<Vec<_>>>()?;
3164 Ok(FilterExpression::List(filter_items))
3165 }
3166 LogicalExpression::Map(pairs) => {
3167 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3168 .iter()
3169 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3170 .collect::<Result<Vec<_>>>()?;
3171 Ok(FilterExpression::Map(filter_pairs))
3172 }
3173 LogicalExpression::IndexAccess { base, index } => {
3174 let base_expr = convert_filter_expression(base)?;
3175 let index_expr = convert_filter_expression(index)?;
3176 Ok(FilterExpression::IndexAccess {
3177 base: Box::new(base_expr),
3178 index: Box::new(index_expr),
3179 })
3180 }
3181 LogicalExpression::SliceAccess { base, start, end } => {
3182 let base_expr = convert_filter_expression(base)?;
3183 let start_expr = start
3184 .as_ref()
3185 .map(|s| convert_filter_expression(s))
3186 .transpose()?
3187 .map(Box::new);
3188 let end_expr = end
3189 .as_ref()
3190 .map(|e| convert_filter_expression(e))
3191 .transpose()?
3192 .map(Box::new);
3193 Ok(FilterExpression::SliceAccess {
3194 base: Box::new(base_expr),
3195 start: start_expr,
3196 end: end_expr,
3197 })
3198 }
3199 LogicalExpression::Parameter(_) => Err(Error::Internal(
3200 "Parameters not yet supported in filters".to_string(),
3201 )),
3202 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3203 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3204 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3205 LogicalExpression::ListComprehension {
3206 variable,
3207 list_expr,
3208 filter_expr,
3209 map_expr,
3210 } => {
3211 let list = convert_filter_expression(list_expr)?;
3212 let filter = filter_expr
3213 .as_ref()
3214 .map(|f| convert_filter_expression(f))
3215 .transpose()?
3216 .map(Box::new);
3217 let map = convert_filter_expression(map_expr)?;
3218 Ok(FilterExpression::ListComprehension {
3219 variable: variable.clone(),
3220 list_expr: Box::new(list),
3221 filter_expr: filter,
3222 map_expr: Box::new(map),
3223 })
3224 }
3225 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3226 Error::Internal("Subqueries not yet supported in filters".to_string()),
3227 ),
3228 }
3229}
3230
3231fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3233 use grafeo_common::types::Value;
3234 match value {
3235 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3237 Value::Int64(_) => LogicalType::Int64,
3238 Value::Float64(_) => LogicalType::Float64,
3239 Value::String(_) => LogicalType::String,
3240 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3242 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3245 }
3246}
3247
3248fn expression_to_string(expr: &LogicalExpression) -> String {
3250 match expr {
3251 LogicalExpression::Variable(name) => name.clone(),
3252 LogicalExpression::Property { variable, property } => {
3253 format!("{variable}.{property}")
3254 }
3255 LogicalExpression::Literal(value) => format!("{value:?}"),
3256 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3257 _ => "expr".to_string(),
3258 }
3259}
3260
3261pub struct PhysicalPlan {
3263 pub operator: Box<dyn Operator>,
3265 pub columns: Vec<String>,
3267 pub adaptive_context: Option<AdaptiveContext>,
3273}
3274
3275impl PhysicalPlan {
3276 #[must_use]
3278 pub fn columns(&self) -> &[String] {
3279 &self.columns
3280 }
3281
3282 pub fn into_operator(self) -> Box<dyn Operator> {
3284 self.operator
3285 }
3286
3287 #[must_use]
3289 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3290 self.adaptive_context.as_ref()
3291 }
3292
3293 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3295 self.adaptive_context.take()
3296 }
3297}
3298
3299#[allow(dead_code)]
3303struct SingleResultOperator {
3304 result: Option<grafeo_core::execution::DataChunk>,
3305}
3306
3307impl SingleResultOperator {
3308 #[allow(dead_code)]
3309 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3310 Self { result }
3311 }
3312}
3313
3314impl Operator for SingleResultOperator {
3315 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3316 Ok(self.result.take())
3317 }
3318
3319 fn reset(&mut self) {
3320 }
3322
3323 fn name(&self) -> &'static str {
3324 "SingleResult"
3325 }
3326}
3327
3328#[cfg(test)]
3329mod tests {
3330 use super::*;
3331 use crate::query::plan::{
3332 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3333 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3334 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3335 SortKey, SortOp,
3336 };
3337 use grafeo_common::types::Value;
3338
3339 fn create_test_store() -> Arc<LpgStore> {
3340 let store = Arc::new(LpgStore::new());
3341 store.create_node(&["Person"]);
3342 store.create_node(&["Person"]);
3343 store.create_node(&["Company"]);
3344 store
3345 }
3346
3347 #[test]
3350 fn test_plan_simple_scan() {
3351 let store = create_test_store();
3352 let planner = Planner::new(store);
3353
3354 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3356 items: vec![ReturnItem {
3357 expression: LogicalExpression::Variable("n".to_string()),
3358 alias: None,
3359 }],
3360 distinct: false,
3361 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3362 variable: "n".to_string(),
3363 label: Some("Person".to_string()),
3364 input: None,
3365 })),
3366 }));
3367
3368 let physical = planner.plan(&logical).unwrap();
3369 assert_eq!(physical.columns(), &["n"]);
3370 }
3371
3372 #[test]
3373 fn test_plan_scan_without_label() {
3374 let store = create_test_store();
3375 let planner = Planner::new(store);
3376
3377 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3379 items: vec![ReturnItem {
3380 expression: LogicalExpression::Variable("n".to_string()),
3381 alias: None,
3382 }],
3383 distinct: false,
3384 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3385 variable: "n".to_string(),
3386 label: None,
3387 input: None,
3388 })),
3389 }));
3390
3391 let physical = planner.plan(&logical).unwrap();
3392 assert_eq!(physical.columns(), &["n"]);
3393 }
3394
3395 #[test]
3396 fn test_plan_return_with_alias() {
3397 let store = create_test_store();
3398 let planner = Planner::new(store);
3399
3400 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3402 items: vec![ReturnItem {
3403 expression: LogicalExpression::Variable("n".to_string()),
3404 alias: Some("person".to_string()),
3405 }],
3406 distinct: false,
3407 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3408 variable: "n".to_string(),
3409 label: Some("Person".to_string()),
3410 input: None,
3411 })),
3412 }));
3413
3414 let physical = planner.plan(&logical).unwrap();
3415 assert_eq!(physical.columns(), &["person"]);
3416 }
3417
3418 #[test]
3419 fn test_plan_return_property() {
3420 let store = create_test_store();
3421 let planner = Planner::new(store);
3422
3423 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3425 items: vec![ReturnItem {
3426 expression: LogicalExpression::Property {
3427 variable: "n".to_string(),
3428 property: "name".to_string(),
3429 },
3430 alias: None,
3431 }],
3432 distinct: false,
3433 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3434 variable: "n".to_string(),
3435 label: Some("Person".to_string()),
3436 input: None,
3437 })),
3438 }));
3439
3440 let physical = planner.plan(&logical).unwrap();
3441 assert_eq!(physical.columns(), &["n.name"]);
3442 }
3443
3444 #[test]
3445 fn test_plan_return_literal() {
3446 let store = create_test_store();
3447 let planner = Planner::new(store);
3448
3449 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3451 items: vec![ReturnItem {
3452 expression: LogicalExpression::Literal(Value::Int64(42)),
3453 alias: Some("answer".to_string()),
3454 }],
3455 distinct: false,
3456 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3457 variable: "n".to_string(),
3458 label: None,
3459 input: None,
3460 })),
3461 }));
3462
3463 let physical = planner.plan(&logical).unwrap();
3464 assert_eq!(physical.columns(), &["answer"]);
3465 }
3466
3467 #[test]
3470 fn test_plan_filter_equality() {
3471 let store = create_test_store();
3472 let planner = Planner::new(store);
3473
3474 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3476 items: vec![ReturnItem {
3477 expression: LogicalExpression::Variable("n".to_string()),
3478 alias: None,
3479 }],
3480 distinct: false,
3481 input: Box::new(LogicalOperator::Filter(FilterOp {
3482 predicate: LogicalExpression::Binary {
3483 left: Box::new(LogicalExpression::Property {
3484 variable: "n".to_string(),
3485 property: "age".to_string(),
3486 }),
3487 op: BinaryOp::Eq,
3488 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3489 },
3490 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3491 variable: "n".to_string(),
3492 label: Some("Person".to_string()),
3493 input: None,
3494 })),
3495 })),
3496 }));
3497
3498 let physical = planner.plan(&logical).unwrap();
3499 assert_eq!(physical.columns(), &["n"]);
3500 }
3501
3502 #[test]
3503 fn test_plan_filter_compound_and() {
3504 let store = create_test_store();
3505 let planner = Planner::new(store);
3506
3507 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3509 items: vec![ReturnItem {
3510 expression: LogicalExpression::Variable("n".to_string()),
3511 alias: None,
3512 }],
3513 distinct: false,
3514 input: Box::new(LogicalOperator::Filter(FilterOp {
3515 predicate: LogicalExpression::Binary {
3516 left: Box::new(LogicalExpression::Binary {
3517 left: Box::new(LogicalExpression::Property {
3518 variable: "n".to_string(),
3519 property: "age".to_string(),
3520 }),
3521 op: BinaryOp::Gt,
3522 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3523 }),
3524 op: BinaryOp::And,
3525 right: Box::new(LogicalExpression::Binary {
3526 left: Box::new(LogicalExpression::Property {
3527 variable: "n".to_string(),
3528 property: "age".to_string(),
3529 }),
3530 op: BinaryOp::Lt,
3531 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3532 }),
3533 },
3534 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3535 variable: "n".to_string(),
3536 label: None,
3537 input: None,
3538 })),
3539 })),
3540 }));
3541
3542 let physical = planner.plan(&logical).unwrap();
3543 assert_eq!(physical.columns(), &["n"]);
3544 }
3545
3546 #[test]
3547 fn test_plan_filter_unary_not() {
3548 let store = create_test_store();
3549 let planner = Planner::new(store);
3550
3551 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3553 items: vec![ReturnItem {
3554 expression: LogicalExpression::Variable("n".to_string()),
3555 alias: None,
3556 }],
3557 distinct: false,
3558 input: Box::new(LogicalOperator::Filter(FilterOp {
3559 predicate: LogicalExpression::Unary {
3560 op: UnaryOp::Not,
3561 operand: Box::new(LogicalExpression::Property {
3562 variable: "n".to_string(),
3563 property: "active".to_string(),
3564 }),
3565 },
3566 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3567 variable: "n".to_string(),
3568 label: None,
3569 input: None,
3570 })),
3571 })),
3572 }));
3573
3574 let physical = planner.plan(&logical).unwrap();
3575 assert_eq!(physical.columns(), &["n"]);
3576 }
3577
3578 #[test]
3579 fn test_plan_filter_is_null() {
3580 let store = create_test_store();
3581 let planner = Planner::new(store);
3582
3583 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3585 items: vec![ReturnItem {
3586 expression: LogicalExpression::Variable("n".to_string()),
3587 alias: None,
3588 }],
3589 distinct: false,
3590 input: Box::new(LogicalOperator::Filter(FilterOp {
3591 predicate: LogicalExpression::Unary {
3592 op: UnaryOp::IsNull,
3593 operand: Box::new(LogicalExpression::Property {
3594 variable: "n".to_string(),
3595 property: "email".to_string(),
3596 }),
3597 },
3598 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3599 variable: "n".to_string(),
3600 label: None,
3601 input: None,
3602 })),
3603 })),
3604 }));
3605
3606 let physical = planner.plan(&logical).unwrap();
3607 assert_eq!(physical.columns(), &["n"]);
3608 }
3609
#[test]
fn test_plan_filter_function_call() {
    // Plan shape: Return(n) <- Filter(size(n.friends) > 0) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3646
#[test]
fn test_plan_expand_outgoing() {
    // Plan shape: Return(a, b) <- Expand(a -[:KNOWS]-> b, exactly 1 hop) <- NodeScan(a:Person).
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(people),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Both endpoints of the expansion must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
3689
#[test]
fn test_plan_expand_with_edge_variable() {
    // Plan shape: Return(a, r, b) <- Expand(a -[r:KNOWS]-> b, exactly 1 hop) <- NodeScan(a).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("r"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Source node, edge and target node must all appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
3734
#[test]
fn test_plan_limit() {
    // Plan shape: Return(n) <- Limit(10) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3762
#[test]
fn test_plan_skip() {
    // Plan shape: Return(n) <- Skip(5) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3788
#[test]
fn test_plan_sort() {
    // Plan shape: Return(n) <- Sort(n ascending) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3817
#[test]
fn test_plan_sort_descending() {
    // Plan shape: Return(n) <- Sort(n descending) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3846
#[test]
fn test_plan_distinct() {
    // Plan shape: Return(n) <- Distinct(all columns) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    // Planning must succeed and expose exactly the returned variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3872
#[test]
fn test_plan_aggregate_count() {
    // Plan shape: Return(cnt) <- Aggregate(count(n) AS cnt, no grouping) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    // The aggregate alias must be visible as an output column.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("cnt")));
}
3908
#[test]
fn test_plan_aggregate_with_group_by() {
    // Plan shape: Aggregate(group by n.city; count(n) AS cnt) <- NodeScan(n:Person).
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_nodes],
        input: Box::new(people),
        having: None,
    }));

    // One grouping key plus one aggregate: two output columns expected.
    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
3938
#[test]
fn test_plan_aggregate_sum() {
    // Plan shape: Aggregate(sum(n.value) AS total, no grouping) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_values = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_of_values],
        input: Box::new(scan),
        having: None,
    }));

    // The aggregate alias must be visible as an output column.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("total")));
}
3968
#[test]
fn test_plan_aggregate_avg() {
    // Plan shape: Aggregate(avg(n.score) AS average, no grouping) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let mean_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![mean_score],
        input: Box::new(scan),
        having: None,
    }));

    // The aggregate alias must be visible as an output column.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("average")));
}
3998
#[test]
fn test_plan_aggregate_min_max() {
    // Plan shape: Aggregate(min(n.age) AS youngest, max(n.age) AS oldest) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Both aggregates read the same property; only function and alias differ.
    let over_age = |function, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(alias.to_owned()),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    // Each aggregate alias must be visible as an output column.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
4041
#[test]
fn test_plan_inner_join() {
    // Plan shape: Return(a, b) <- InnerJoin(a = b) over NodeScan(a:Person) and NodeScan(b:Company).
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    let a_equals_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_equals_b],
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(joined),
    }));

    // Variables from both join sides must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4085
#[test]
fn test_plan_cross_join() {
    // Plan shape: CrossJoin(no conditions) over NodeScan(a) and NodeScan(b).
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    // One column per scanned variable is expected.
    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4110
#[test]
fn test_plan_left_join() {
    // Plan shape: LeftJoin(no conditions) over NodeScan(a) and NodeScan(b).
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    // One column per scanned variable is expected.
    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
4134
#[test]
fn test_plan_create_node() {
    // Plan shape: CreateNode(n:Person {name: "Alice"}) with no input operator.
    let planner = Planner::new(create_test_store());

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    // The created node's variable must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
4156
#[test]
fn test_plan_create_edge() {
    // Plan shape: CreateEdge(a -[r:KNOWS]-> b) <- CrossJoin(NodeScan(a), NodeScan(b)).
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: Vec::new(),
        input: Box::new(endpoints),
    }));

    // The created edge's variable must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("r")));
}
4188
#[test]
fn test_plan_delete_node() {
    // Plan shape: DeleteNode(n, detach = false) <- NodeScan(n).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    // The planned delete must report a `deleted_count` column.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("deleted_count")));
}
4208
#[test]
fn test_plan_empty_errors() {
    // Planning a plan whose root is `LogicalOperator::Empty` must yield an error.
    let planner = Planner::new(create_test_store());
    let logical = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&logical).is_err());
}
4220
#[test]
fn test_plan_missing_variable_in_return() {
    // The Return references `missing`, while the scan underneath only binds `n`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    // Planning is expected to fail rather than produce a plan.
    let outcome = planner.plan(&logical);
    assert!(outcome.is_err());
}
4243
#[test]
fn test_convert_binary_ops() {
    // Each operator below must convert without error; checked in the listed order.
    let operators = [
        // Comparison
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        // Logical
        BinaryOp::And,
        BinaryOp::Or,
        // Arithmetic
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in operators {
        assert!(convert_binary_op(op).is_ok());
    }
}
4261
#[test]
fn test_convert_unary_ops() {
    // Each operator below must convert without error; checked in the listed order.
    let operators = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in operators {
        assert!(convert_unary_op(op).is_ok());
    }
}
4269
#[test]
fn test_convert_aggregate_functions() {
    // Each logical aggregate function must map to the physical variant of the same name.
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
4293
#[test]
fn test_planner_accessors() {
    // A planner built with `Planner::new` reports no transaction id and no
    // transaction manager.
    let store = create_test_store();
    let shared_store = Arc::clone(&store);
    let planner = Planner::new(shared_store);

    assert!(planner.tx_id().is_none());
    assert!(planner.tx_manager().is_none());

    // Only exercises the accessor; the returned epoch is not inspected.
    let _ = planner.viewing_epoch();
}
4303
#[test]
fn test_physical_plan_accessors() {
    // Plans a bare NodeScan(n) and exercises the physical plan's accessors.
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the resulting operator is discarded immediately.
    let _ = physical.into_operator();
}
4321
#[test]
fn test_plan_adaptive_with_scan() {
    // Plan shape: Return(n) <- NodeScan(n:Person), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(people),
    }));

    // Adaptive planning must keep the column and attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
4348
#[test]
fn test_plan_adaptive_with_filter() {
    // Plan shape: Return(n) <- Filter(n.age > 30) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4381
#[test]
fn test_plan_adaptive_with_expand() {
    // Plan shape: Return(a, b) <- Expand(a -[:KNOWS]-> b, exactly 1 hop) <- NodeScan(a),
    // planned via `plan_adaptive` on a planner configured with
    // `with_factorized_execution(false)`.
    let store = create_test_store();
    let base_planner = Planner::new(Arc::clone(&store));
    let planner = base_planner.with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4420
#[test]
fn test_plan_adaptive_with_join() {
    // Plan shape: Return(a, b) <- CrossJoin(NodeScan(a), NodeScan(b)), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let unlabeled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabeled_scan("a")),
        right: Box::new(unlabeled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(joined),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4457
#[test]
fn test_plan_adaptive_with_aggregate() {
    // Plan shape: Aggregate(count(n) AS cnt, no grouping) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_nodes = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_nodes],
        input: Box::new(scan),
        having: None,
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4483
#[test]
fn test_plan_adaptive_with_distinct() {
    // Plan shape: Return(n) <- Distinct(all columns) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4508
#[test]
fn test_plan_adaptive_with_limit() {
    // Plan shape: Return(n) <- Limit(10) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4533
#[test]
fn test_plan_adaptive_with_skip() {
    // Plan shape: Return(n) <- Skip(5) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4558
#[test]
fn test_plan_adaptive_with_sort() {
    // Plan shape: Return(n) <- Sort(n ascending) <- NodeScan(n), planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4586
#[test]
fn test_plan_adaptive_with_union() {
    // Plan shape: Return(n) <- Union(NodeScan(n:Person), NodeScan(n:Company)),
    // planned via `plan_adaptive`.
    let planner = Planner::new(create_test_store());

    // Both branches bind the same variable `n`; only the label differs.
    let labeled_scan = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from("n"),
            label: Some(label.to_owned()),
            input: None,
        })
    };
    let united = LogicalOperator::Union(UnionOp {
        inputs: vec![labeled_scan("Person"), labeled_scan("Company")],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(united),
    }));

    // Adaptive planning must attach an adaptive context.
    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4617
#[test]
fn test_plan_expand_variable_length() {
    // Plan shape: Return(a, b) <- Expand(a -[:KNOWS*1..3]-> b) <- NodeScan(a).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Both endpoints of the expansion must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4659
#[test]
fn test_plan_expand_with_path_alias() {
    // Plan shape: Return(a, b) <- Expand(a -[:KNOWS*1..3]-> b, path alias `p`) <- NodeScan(a).
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some(String::from("p")),
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Only the two returned endpoints are checked; the path alias is not asserted.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4700
#[test]
fn test_plan_expand_incoming() {
    // Plan shape: Return(a, b) <- Expand(a <-[:KNOWS]- b, exactly 1 hop) <- NodeScan(a),
    // on a planner configured with `with_factorized_execution(false)`.
    let store = create_test_store();
    let base_planner = Planner::new(Arc::clone(&store));
    let planner = base_planner.with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Both endpoints of the expansion must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4740
#[test]
fn test_plan_expand_both_directions() {
    // Plan shape: Return(a, b) <- Expand(a -[:KNOWS]- b, direction Both, exactly 1 hop)
    // <- NodeScan(a), on a planner configured with `with_factorized_execution(false)`.
    let store = create_test_store();
    let base_planner = Planner::new(Arc::clone(&store));
    let planner = base_planner.with_factorized_execution(false);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    // Both endpoints of the expansion must appear among the output columns.
    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4780
/// Builds a planner through `Planner::with_context` and checks that the
/// accessors report back the transaction id, the presence of a transaction
/// manager, and the viewing epoch that were supplied.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let graph_store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    // Begin a transaction first, then read the manager's current epoch.
    let active_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&graph_store),
        Arc::clone(&manager),
        Some(active_tx),
        snapshot_epoch,
    );

    // The planner must hand back exactly the context it was constructed with.
    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
4803
/// With factorized execution switched off, a chain of two single-hop
/// outgoing expands (`a` to `b`, then `b` to `c`) must still plan
/// successfully and expose the returned variables `a` and `c` as columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds one untyped, outgoing, exactly-one-hop expand on top of `input`.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    // Only the endpoints of the chain are returned; `b` is not projected.
    let endpoint_items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable("a".to_string()),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable("c".to_string()),
            alias: None,
        },
    ];

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: endpoint_items,
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"c".to_string()));
}
4853
/// Plans `Return(n)` over a `Sort` keyed on the property `n.name` in
/// ascending order and checks that `n` is among the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Input to the sort: an unlabeled scan binding `n`.
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The sort key is a property access rather than a plain variable.
    let by_name_ascending = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name_ascending],
        input: Box::new(scan_n),
    });

    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_string()));
}
4888
/// Plans `Return(a, b)` over a `NodeScan` of `b:Company` whose `input` is
/// another `NodeScan` of `a:Person`, and checks that both variables show up
/// in the physical plan's columns.
#[test]
fn test_plan_scan_with_input() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Inner scan: `a` restricted to the `Person` label, with no input of its own.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".into(),
        label: Some("Person".into()),
        input: None,
    });

    // Outer scan: `b` restricted to `Company`, chained onto the inner scan.
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".into(),
        label: Some("Company".into()),
        input: Some(Box::new(person_scan)),
    });

    // Un-aliased return item that projects a plain variable.
    let variable_item = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![variable_item("a"), variable_item("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4924}