1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
10 ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
11 LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
12 ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let stats = self.store.statistics();
244 let avg_degree = self.estimate_expand_degree(&stats, expand);
245 let estimate = input_estimate * avg_degree;
246 let id = format!("expand_{}", expand.to_variable);
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250 }
251 LogicalOperator::Join(join) => {
252 let left_est = self.estimate_cardinality(&join.left);
254 let right_est = self.estimate_cardinality(&join.right);
255 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261 }
262 LogicalOperator::Aggregate(agg) => {
263 let input_estimate = self.estimate_cardinality(&agg.input);
265 let estimate = if agg.group_by.is_empty() {
266 1.0 } else {
268 (input_estimate * 0.1).max(1.0) };
270 let id = format!("aggregate_{depth}");
271 ctx.set_estimate(&id, estimate);
272
273 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274 }
275 LogicalOperator::Distinct(distinct) => {
276 let input_estimate = self.estimate_cardinality(&distinct.input);
277 let estimate = (input_estimate * 0.5).max(1.0);
278 let id = format!("distinct_{depth}");
279 ctx.set_estimate(&id, estimate);
280
281 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282 }
283 LogicalOperator::Return(ret) => {
284 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285 }
286 LogicalOperator::Limit(limit) => {
287 let input_estimate = self.estimate_cardinality(&limit.input);
288 let estimate = (input_estimate).min(limit.count as f64);
289 let id = format!("limit_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293 }
294 LogicalOperator::Skip(skip) => {
295 let input_estimate = self.estimate_cardinality(&skip.input);
296 let estimate = (input_estimate - skip.count as f64).max(0.0);
297 let id = format!("skip_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301 }
302 LogicalOperator::Sort(sort) => {
303 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305 }
306 LogicalOperator::Union(union) => {
307 let estimate: f64 = union
308 .inputs
309 .iter()
310 .map(|input| self.estimate_cardinality(input))
311 .sum();
312 let id = format!("union_{depth}");
313 ctx.set_estimate(&id, estimate);
314
315 for input in &union.inputs {
316 self.collect_cardinality_estimates(input, ctx, depth + 1);
317 }
318 }
319 _ => {
320 }
322 }
323 }
324
325 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327 match op {
328 LogicalOperator::NodeScan(scan) => {
329 if let Some(label) = &scan.label {
330 self.store.nodes_by_label(label).len() as f64
331 } else {
332 self.store.node_count() as f64
333 }
334 }
335 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336 LogicalOperator::Expand(expand) => {
337 let stats = self.store.statistics();
338 let avg_degree = self.estimate_expand_degree(&stats, expand);
339 self.estimate_cardinality(&expand.input) * avg_degree
340 }
341 LogicalOperator::Join(join) => {
342 let left = self.estimate_cardinality(&join.left);
343 let right = self.estimate_cardinality(&join.right);
344 (left * right).sqrt()
345 }
346 LogicalOperator::Aggregate(agg) => {
347 if agg.group_by.is_empty() {
348 1.0
349 } else {
350 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351 }
352 }
353 LogicalOperator::Distinct(distinct) => {
354 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355 }
356 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357 LogicalOperator::Limit(limit) => self
358 .estimate_cardinality(&limit.input)
359 .min(limit.count as f64),
360 LogicalOperator::Skip(skip) => {
361 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362 }
363 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364 LogicalOperator::Union(union) => union
365 .inputs
366 .iter()
367 .map(|input| self.estimate_cardinality(input))
368 .sum(),
369 _ => 1000.0, }
371 }
372
373 fn estimate_expand_degree(
375 &self,
376 stats: &grafeo_core::statistics::Statistics,
377 expand: &ExpandOp,
378 ) -> f64 {
379 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380 if let Some(edge_type) = &expand.edge_type {
381 stats.estimate_avg_degree(edge_type, outgoing)
382 } else if stats.total_nodes > 0 {
383 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384 } else {
385 10.0 }
387 }
388
389 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391 match op {
392 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393 LogicalOperator::Expand(expand) => {
394 if self.factorized_execution {
396 let (chain_len, _base) = Self::count_expand_chain(op);
397 if chain_len >= 2 {
398 return self.plan_expand_chain(op);
400 }
401 }
402 self.plan_expand(expand)
403 }
404 LogicalOperator::Return(ret) => self.plan_return(ret),
405 LogicalOperator::Filter(filter) => self.plan_filter(filter),
406 LogicalOperator::Project(project) => self.plan_project(project),
407 LogicalOperator::Limit(limit) => self.plan_limit(limit),
408 LogicalOperator::Skip(skip) => self.plan_skip(skip),
409 LogicalOperator::Sort(sort) => self.plan_sort(sort),
410 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411 LogicalOperator::Join(join) => self.plan_join(join),
412 LogicalOperator::Union(union) => self.plan_union(union),
413 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421 LogicalOperator::Merge(merge) => self.plan_merge(merge),
422 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
427 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
428 LogicalOperator::VectorScan(_) => Err(Error::Internal(
429 "VectorScan requires vector-index feature".to_string(),
430 )),
431 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
432 "VectorJoin requires vector-index feature".to_string(),
433 )),
434 _ => Err(Error::Internal(format!(
435 "Unsupported operator: {:?}",
436 std::mem::discriminant(op)
437 ))),
438 }
439 }
440
441 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
443 let scan_op = if let Some(label) = &scan.label {
444 ScanOperator::with_label(Arc::clone(&self.store), label)
445 } else {
446 ScanOperator::new(Arc::clone(&self.store))
447 };
448
449 let scan_operator: Box<dyn Operator> =
451 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
452
453 if let Some(input) = &scan.input {
455 let (input_op, mut input_columns) = self.plan_operator(input)?;
456
457 let mut output_schema: Vec<LogicalType> =
459 input_columns.iter().map(|_| LogicalType::Any).collect();
460 output_schema.push(LogicalType::Node);
461
462 input_columns.push(scan.variable.clone());
464
465 let join_op = Box::new(NestedLoopJoinOperator::new(
467 input_op,
468 scan_operator,
469 None, PhysicalJoinType::Cross,
471 output_schema,
472 ));
473
474 Ok((join_op, input_columns))
475 } else {
476 let columns = vec![scan.variable.clone()];
477 Ok((scan_operator, columns))
478 }
479 }
480
481 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
483 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
485
486 let source_column = input_columns
488 .iter()
489 .position(|c| c == &expand.from_variable)
490 .ok_or_else(|| {
491 Error::Internal(format!(
492 "Source variable '{}' not found in input columns",
493 expand.from_variable
494 ))
495 })?;
496
497 let direction = match expand.direction {
499 ExpandDirection::Outgoing => Direction::Outgoing,
500 ExpandDirection::Incoming => Direction::Incoming,
501 ExpandDirection::Both => Direction::Both,
502 };
503
504 let is_variable_length =
506 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
507
508 let operator: Box<dyn Operator> = if is_variable_length {
509 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
512 Arc::clone(&self.store),
513 input_op,
514 source_column,
515 direction,
516 expand.edge_type.clone(),
517 expand.min_hops,
518 max_hops,
519 )
520 .with_tx_context(self.viewing_epoch, self.tx_id);
521
522 if expand.path_alias.is_some() {
524 expand_op = expand_op
525 .with_path_length_output()
526 .with_path_detail_output();
527 }
528
529 Box::new(expand_op)
530 } else {
531 let expand_op = ExpandOperator::new(
533 Arc::clone(&self.store),
534 input_op,
535 source_column,
536 direction,
537 expand.edge_type.clone(),
538 )
539 .with_tx_context(self.viewing_epoch, self.tx_id);
540 Box::new(expand_op)
541 };
542
543 let mut columns = input_columns;
546
547 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
549 let count = self.anon_edge_counter.get();
550 self.anon_edge_counter.set(count + 1);
551 format!("_anon_edge_{}", count)
552 });
553 columns.push(edge_col_name);
554
555 columns.push(expand.to_variable.clone());
556
557 if let Some(ref path_alias) = expand.path_alias {
559 columns.push(format!("_path_length_{}", path_alias));
560 columns.push(format!("_path_nodes_{}", path_alias));
561 columns.push(format!("_path_edges_{}", path_alias));
562 }
563
564 Ok((operator, columns))
565 }
566
567 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
575 let expands = Self::collect_expand_chain(op);
576 if expands.is_empty() {
577 return Err(Error::Internal("Empty expand chain".to_string()));
578 }
579
580 let first_expand = expands[0];
582 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
583
584 let mut columns = base_columns.clone();
585 let mut steps = Vec::new();
586
587 let mut is_first = true;
592
593 for expand in &expands {
594 let source_column = if is_first {
596 base_columns
598 .iter()
599 .position(|c| c == &expand.from_variable)
600 .ok_or_else(|| {
601 Error::Internal(format!(
602 "Source variable '{}' not found in base columns",
603 expand.from_variable
604 ))
605 })?
606 } else {
607 1
610 };
611
612 let direction = match expand.direction {
614 ExpandDirection::Outgoing => Direction::Outgoing,
615 ExpandDirection::Incoming => Direction::Incoming,
616 ExpandDirection::Both => Direction::Both,
617 };
618
619 steps.push(ExpandStep {
621 source_column,
622 direction,
623 edge_type: expand.edge_type.clone(),
624 });
625
626 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
628 let count = self.anon_edge_counter.get();
629 self.anon_edge_counter.set(count + 1);
630 format!("_anon_edge_{}", count)
631 });
632 columns.push(edge_col_name);
633 columns.push(expand.to_variable.clone());
634
635 is_first = false;
636 }
637
638 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
640
641 if let Some(tx_id) = self.tx_id {
642 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
643 } else {
644 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
645 }
646
647 Ok((Box::new(lazy_op), columns))
648 }
649
650 fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
652 let (input_op, input_columns) = self.plan_operator(&ret.input)?;
654
655 let variable_columns: HashMap<String, usize> = input_columns
657 .iter()
658 .enumerate()
659 .map(|(i, name)| (name.clone(), i))
660 .collect();
661
662 let columns: Vec<String> = ret
664 .items
665 .iter()
666 .map(|item| {
667 item.alias.clone().unwrap_or_else(|| {
668 expression_to_string(&item.expression)
670 })
671 })
672 .collect();
673
674 let needs_project = ret
676 .items
677 .iter()
678 .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
679
680 if needs_project {
681 let mut projections = Vec::with_capacity(ret.items.len());
683 let mut output_types = Vec::with_capacity(ret.items.len());
684
685 for item in &ret.items {
686 match &item.expression {
687 LogicalExpression::Variable(name) => {
688 let col_idx = *variable_columns.get(name).ok_or_else(|| {
689 Error::Internal(format!("Variable '{}' not found in input", name))
690 })?;
691 projections.push(ProjectExpr::Column(col_idx));
692 if name.starts_with("_path_nodes_")
694 || name.starts_with("_path_edges_")
695 || name.starts_with("_path_length_")
696 {
697 output_types.push(LogicalType::Any);
698 } else {
699 output_types.push(LogicalType::Node);
700 }
701 }
702 LogicalExpression::Property { variable, property } => {
703 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
704 Error::Internal(format!("Variable '{}' not found in input", variable))
705 })?;
706 projections.push(ProjectExpr::PropertyAccess {
707 column: col_idx,
708 property: property.clone(),
709 });
710 output_types.push(LogicalType::Any);
712 }
713 LogicalExpression::Literal(value) => {
714 projections.push(ProjectExpr::Constant(value.clone()));
715 output_types.push(value_to_logical_type(value));
716 }
717 LogicalExpression::FunctionCall { name, args, .. } => {
718 match name.to_lowercase().as_str() {
720 "type" => {
721 if args.len() != 1 {
723 return Err(Error::Internal(
724 "type() requires exactly one argument".to_string(),
725 ));
726 }
727 if let LogicalExpression::Variable(var_name) = &args[0] {
728 let col_idx =
729 *variable_columns.get(var_name).ok_or_else(|| {
730 Error::Internal(format!(
731 "Variable '{}' not found in input",
732 var_name
733 ))
734 })?;
735 projections.push(ProjectExpr::EdgeType { column: col_idx });
736 output_types.push(LogicalType::String);
737 } else {
738 return Err(Error::Internal(
739 "type() argument must be a variable".to_string(),
740 ));
741 }
742 }
743 "length" => {
744 if args.len() != 1 {
747 return Err(Error::Internal(
748 "length() requires exactly one argument".to_string(),
749 ));
750 }
751 if let LogicalExpression::Variable(var_name) = &args[0] {
752 let col_idx =
753 *variable_columns.get(var_name).ok_or_else(|| {
754 Error::Internal(format!(
755 "Variable '{}' not found in input",
756 var_name
757 ))
758 })?;
759 projections.push(ProjectExpr::Column(col_idx));
761 output_types.push(LogicalType::Int64);
762 } else {
763 return Err(Error::Internal(
764 "length() argument must be a variable".to_string(),
765 ));
766 }
767 }
768 "nodes" | "edges" => {
769 if args.len() != 1 {
771 return Err(Error::Internal(format!(
772 "{}() requires exactly one argument",
773 name
774 )));
775 }
776 if let LogicalExpression::Variable(var_name) = &args[0] {
777 let col_idx =
778 *variable_columns.get(var_name).ok_or_else(|| {
779 Error::Internal(format!(
780 "Variable '{var_name}' not found in input",
781 ))
782 })?;
783 projections.push(ProjectExpr::Column(col_idx));
784 output_types.push(LogicalType::Any);
785 } else {
786 return Err(Error::Internal(format!(
787 "{}() argument must be a variable",
788 name
789 )));
790 }
791 }
792 _ => {
794 let filter_expr = self.convert_expression(&item.expression)?;
795 projections.push(ProjectExpr::Expression {
796 expr: filter_expr,
797 variable_columns: variable_columns.clone(),
798 });
799 output_types.push(LogicalType::Any);
800 }
801 }
802 }
803 LogicalExpression::Case { .. } => {
804 let filter_expr = self.convert_expression(&item.expression)?;
806 projections.push(ProjectExpr::Expression {
807 expr: filter_expr,
808 variable_columns: variable_columns.clone(),
809 });
810 output_types.push(LogicalType::Any);
812 }
813 _ => {
814 return Err(Error::Internal(format!(
815 "Unsupported RETURN expression: {:?}",
816 item.expression
817 )));
818 }
819 }
820 }
821
822 let operator = Box::new(ProjectOperator::with_store(
823 input_op,
824 projections,
825 output_types,
826 Arc::clone(&self.store),
827 ));
828
829 Ok((operator, columns))
830 } else {
831 let mut projections = Vec::with_capacity(ret.items.len());
834 let mut output_types = Vec::with_capacity(ret.items.len());
835
836 for item in &ret.items {
837 if let LogicalExpression::Variable(name) = &item.expression {
838 let col_idx = *variable_columns.get(name).ok_or_else(|| {
839 Error::Internal(format!("Variable '{}' not found in input", name))
840 })?;
841 projections.push(ProjectExpr::Column(col_idx));
842 output_types.push(LogicalType::Node);
843 }
844 }
845
846 if projections.len() == input_columns.len()
848 && projections
849 .iter()
850 .enumerate()
851 .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
852 {
853 Ok((input_op, columns))
855 } else {
856 let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
857 Ok((operator, columns))
858 }
859 }
860 }
861
862 fn plan_project(
864 &self,
865 project: &crate::query::plan::ProjectOp,
866 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
867 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
869 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
870 let single_row_op: Box<dyn Operator> = Box::new(
872 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
873 );
874 (single_row_op, Vec::new())
875 } else {
876 self.plan_operator(&project.input)?
877 };
878
879 let variable_columns: HashMap<String, usize> = input_columns
881 .iter()
882 .enumerate()
883 .map(|(i, name)| (name.clone(), i))
884 .collect();
885
886 let mut projections = Vec::with_capacity(project.projections.len());
888 let mut output_types = Vec::with_capacity(project.projections.len());
889 let mut output_columns = Vec::with_capacity(project.projections.len());
890
891 for projection in &project.projections {
892 let col_name = projection
894 .alias
895 .clone()
896 .unwrap_or_else(|| expression_to_string(&projection.expression));
897 output_columns.push(col_name);
898
899 match &projection.expression {
900 LogicalExpression::Variable(name) => {
901 let col_idx = *variable_columns.get(name).ok_or_else(|| {
902 Error::Internal(format!("Variable '{}' not found in input", name))
903 })?;
904 projections.push(ProjectExpr::Column(col_idx));
905 output_types.push(LogicalType::Node);
906 }
907 LogicalExpression::Property { variable, property } => {
908 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
909 Error::Internal(format!("Variable '{}' not found in input", variable))
910 })?;
911 projections.push(ProjectExpr::PropertyAccess {
912 column: col_idx,
913 property: property.clone(),
914 });
915 output_types.push(LogicalType::Any);
916 }
917 LogicalExpression::Literal(value) => {
918 projections.push(ProjectExpr::Constant(value.clone()));
919 output_types.push(value_to_logical_type(value));
920 }
921 _ => {
922 let filter_expr = self.convert_expression(&projection.expression)?;
924 projections.push(ProjectExpr::Expression {
925 expr: filter_expr,
926 variable_columns: variable_columns.clone(),
927 });
928 output_types.push(LogicalType::Any);
929 }
930 }
931 }
932
933 let operator = Box::new(ProjectOperator::with_store(
934 input_op,
935 projections,
936 output_types,
937 Arc::clone(&self.store),
938 ));
939
940 Ok((operator, output_columns))
941 }
942
943 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
949 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
952 let (_, columns) = self.plan_operator(&filter.input)?;
954 let schema = self.derive_schema_from_columns(&columns);
955 let empty_op = Box::new(EmptyOperator::new(schema));
956 return Ok((empty_op, columns));
957 }
958
959 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
961 return Ok(result);
962 }
963
964 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
966 return Ok(result);
967 }
968
969 let (input_op, columns) = self.plan_operator(&filter.input)?;
971
972 let variable_columns: HashMap<String, usize> = columns
974 .iter()
975 .enumerate()
976 .map(|(i, name)| (name.clone(), i))
977 .collect();
978
979 let filter_expr = self.convert_expression(&filter.predicate)?;
981
982 let predicate =
984 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
985
986 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
988
989 Ok((operator, columns))
990 }
991
992 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
999 use grafeo_core::graph::lpg::CompareOp;
1000
1001 match predicate {
1002 LogicalExpression::Binary { left, op, right } => {
1003 match op {
1005 BinaryOp::And => {
1006 let left_result = self.check_zone_map_for_predicate(left);
1007 let right_result = self.check_zone_map_for_predicate(right);
1008
1009 return match (left_result, right_result) {
1010 (Some(false), _) | (_, Some(false)) => Some(false),
1012 (Some(true), Some(true)) => Some(true),
1014 _ => None,
1016 };
1017 }
1018 BinaryOp::Or => {
1019 let left_result = self.check_zone_map_for_predicate(left);
1020 let right_result = self.check_zone_map_for_predicate(right);
1021
1022 return match (left_result, right_result) {
1023 (Some(false), Some(false)) => Some(false),
1025 (Some(true), _) | (_, Some(true)) => Some(true),
1027 _ => None,
1029 };
1030 }
1031 _ => {}
1032 }
1033
1034 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1036 (
1037 LogicalExpression::Property { property, .. },
1038 LogicalExpression::Literal(val),
1039 ) => {
1040 let cmp = match op {
1041 BinaryOp::Eq => CompareOp::Eq,
1042 BinaryOp::Ne => CompareOp::Ne,
1043 BinaryOp::Lt => CompareOp::Lt,
1044 BinaryOp::Le => CompareOp::Le,
1045 BinaryOp::Gt => CompareOp::Gt,
1046 BinaryOp::Ge => CompareOp::Ge,
1047 _ => return None,
1048 };
1049 (property.clone(), cmp, val.clone())
1050 }
1051 (
1052 LogicalExpression::Literal(val),
1053 LogicalExpression::Property { property, .. },
1054 ) => {
1055 let cmp = match op {
1057 BinaryOp::Eq => CompareOp::Eq,
1058 BinaryOp::Ne => CompareOp::Ne,
1059 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1061 BinaryOp::Gt => CompareOp::Lt,
1062 BinaryOp::Ge => CompareOp::Le,
1063 _ => return None,
1064 };
1065 (property.clone(), cmp, val.clone())
1066 }
1067 _ => return None,
1068 };
1069
1070 let might_match =
1072 self.store
1073 .node_property_might_match(&property.into(), compare_op, &value);
1074
1075 Some(might_match)
1076 }
1077
1078 _ => None,
1079 }
1080 }
1081
1082 fn try_plan_filter_with_property_index(
1091 &self,
1092 filter: &FilterOp,
1093 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1094 let (scan_variable, scan_label) = match filter.input.as_ref() {
1096 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1097 (scan.variable.clone(), scan.label.clone())
1098 }
1099 _ => return Ok(None),
1100 };
1101
1102 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1105
1106 if conditions.is_empty() {
1107 return Ok(None);
1108 }
1109
1110 let has_indexed_condition = conditions
1112 .iter()
1113 .any(|(prop, _)| self.store.has_property_index(prop));
1114
1115 if !has_indexed_condition {
1116 return Ok(None);
1117 }
1118
1119 let conditions_ref: Vec<(&str, Value)> = conditions
1121 .iter()
1122 .map(|(p, v)| (p.as_str(), v.clone()))
1123 .collect();
1124 let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1125
1126 if let Some(label) = &scan_label {
1128 let label_nodes: std::collections::HashSet<_> =
1129 self.store.nodes_by_label(label).into_iter().collect();
1130 matching_nodes.retain(|n| label_nodes.contains(n));
1131 }
1132
1133 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1135 let columns = vec![scan_variable];
1136
1137 Ok(Some((node_list_op, columns)))
1138 }
1139
1140 fn extract_equality_conditions(
1146 &self,
1147 predicate: &LogicalExpression,
1148 target_variable: &str,
1149 ) -> Vec<(String, Value)> {
1150 let mut conditions = Vec::new();
1151 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1152 conditions
1153 }
1154
1155 fn collect_equality_conditions(
1157 &self,
1158 expr: &LogicalExpression,
1159 target_variable: &str,
1160 conditions: &mut Vec<(String, Value)>,
1161 ) {
1162 match expr {
1163 LogicalExpression::Binary {
1165 left,
1166 op: BinaryOp::And,
1167 right,
1168 } => {
1169 self.collect_equality_conditions(left, target_variable, conditions);
1170 self.collect_equality_conditions(right, target_variable, conditions);
1171 }
1172
1173 LogicalExpression::Binary {
1175 left,
1176 op: BinaryOp::Eq,
1177 right,
1178 } => {
1179 if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1180 && var == target_variable
1181 {
1182 conditions.push((prop, val));
1183 }
1184 }
1185
1186 _ => {}
1187 }
1188 }
1189
1190 fn extract_property_equality(
1192 &self,
1193 left: &LogicalExpression,
1194 right: &LogicalExpression,
1195 ) -> Option<(String, String, Value)> {
1196 match (left, right) {
1197 (
1198 LogicalExpression::Property { variable, property },
1199 LogicalExpression::Literal(val),
1200 ) => Some((variable.clone(), property.clone(), val.clone())),
1201 (
1202 LogicalExpression::Literal(val),
1203 LogicalExpression::Property { variable, property },
1204 ) => Some((variable.clone(), property.clone(), val.clone())),
1205 _ => None,
1206 }
1207 }
1208
1209 fn try_plan_filter_with_range_index(
1222 &self,
1223 filter: &FilterOp,
1224 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1225 let (scan_variable, scan_label) = match filter.input.as_ref() {
1227 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1228 (scan.variable.clone(), scan.label.clone())
1229 }
1230 _ => return Ok(None),
1231 };
1232
1233 if let Some((variable, property, min, max, min_inc, max_inc)) =
1235 self.extract_between_predicate(&filter.predicate)
1236 && variable == scan_variable
1237 {
1238 return self.plan_range_filter(
1239 &scan_variable,
1240 &scan_label,
1241 &property,
1242 RangeBounds {
1243 min: Some(&min),
1244 max: Some(&max),
1245 min_inclusive: min_inc,
1246 max_inclusive: max_inc,
1247 },
1248 );
1249 }
1250
1251 if let Some((variable, property, op, value)) =
1253 self.extract_range_predicate(&filter.predicate)
1254 && variable == scan_variable
1255 {
1256 let (min, max, min_inc, max_inc) = match op {
1257 BinaryOp::Lt => (None, Some(value), false, false),
1258 BinaryOp::Le => (None, Some(value), false, true),
1259 BinaryOp::Gt => (Some(value), None, false, false),
1260 BinaryOp::Ge => (Some(value), None, true, false),
1261 _ => return Ok(None),
1262 };
1263 return self.plan_range_filter(
1264 &scan_variable,
1265 &scan_label,
1266 &property,
1267 RangeBounds {
1268 min: min.as_ref(),
1269 max: max.as_ref(),
1270 min_inclusive: min_inc,
1271 max_inclusive: max_inc,
1272 },
1273 );
1274 }
1275
1276 Ok(None)
1277 }
1278
1279 fn plan_range_filter(
1281 &self,
1282 scan_variable: &str,
1283 scan_label: &Option<String>,
1284 property: &str,
1285 bounds: RangeBounds<'_>,
1286 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1287 let mut matching_nodes = self.store.find_nodes_in_range(
1289 property,
1290 bounds.min,
1291 bounds.max,
1292 bounds.min_inclusive,
1293 bounds.max_inclusive,
1294 );
1295
1296 if let Some(label) = scan_label {
1298 let label_nodes: std::collections::HashSet<_> =
1299 self.store.nodes_by_label(label).into_iter().collect();
1300 matching_nodes.retain(|n| label_nodes.contains(n));
1301 }
1302
1303 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1305 let columns = vec![scan_variable.to_string()];
1306
1307 Ok(Some((node_list_op, columns)))
1308 }
1309
1310 fn extract_range_predicate(
1314 &self,
1315 predicate: &LogicalExpression,
1316 ) -> Option<(String, String, BinaryOp, Value)> {
1317 match predicate {
1318 LogicalExpression::Binary { left, op, right } => {
1319 match op {
1320 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1321 if let (
1323 LogicalExpression::Property { variable, property },
1324 LogicalExpression::Literal(val),
1325 ) = (left.as_ref(), right.as_ref())
1326 {
1327 return Some((variable.clone(), property.clone(), *op, val.clone()));
1328 }
1329
1330 if let (
1332 LogicalExpression::Literal(val),
1333 LogicalExpression::Property { variable, property },
1334 ) = (left.as_ref(), right.as_ref())
1335 {
1336 let flipped_op = match op {
1337 BinaryOp::Lt => BinaryOp::Gt,
1338 BinaryOp::Le => BinaryOp::Ge,
1339 BinaryOp::Gt => BinaryOp::Lt,
1340 BinaryOp::Ge => BinaryOp::Le,
1341 _ => return None,
1342 };
1343 return Some((
1344 variable.clone(),
1345 property.clone(),
1346 flipped_op,
1347 val.clone(),
1348 ));
1349 }
1350 }
1351 _ => {}
1352 }
1353 }
1354 _ => {}
1355 }
1356 None
1357 }
1358
1359 fn extract_between_predicate(
1367 &self,
1368 predicate: &LogicalExpression,
1369 ) -> Option<(String, String, Value, Value, bool, bool)> {
1370 let (left, right) = match predicate {
1372 LogicalExpression::Binary {
1373 left,
1374 op: BinaryOp::And,
1375 right,
1376 } => (left.as_ref(), right.as_ref()),
1377 _ => return None,
1378 };
1379
1380 let left_range = self.extract_range_predicate(left);
1382 let right_range = self.extract_range_predicate(right);
1383
1384 let (left_var, left_prop, left_op, left_val) = left_range?;
1385 let (right_var, right_prop, right_op, right_val) = right_range?;
1386
1387 if left_var != right_var || left_prop != right_prop {
1389 return None;
1390 }
1391
1392 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1394 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1396 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1398 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1400 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1402 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1404 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1406 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1408 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1410 _ => return None,
1411 };
1412
1413 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1414 }
1415
1416 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1418 let (input_op, columns) = self.plan_operator(&limit.input)?;
1419 let output_schema = self.derive_schema_from_columns(&columns);
1420 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1421 Ok((operator, columns))
1422 }
1423
1424 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1426 let (input_op, columns) = self.plan_operator(&skip.input)?;
1427 let output_schema = self.derive_schema_from_columns(&columns);
1428 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1429 Ok((operator, columns))
1430 }
1431
1432 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1434 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1435
1436 let mut variable_columns: HashMap<String, usize> = input_columns
1438 .iter()
1439 .enumerate()
1440 .map(|(i, name)| (name.clone(), i))
1441 .collect();
1442
1443 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1445 let mut next_col_idx = input_columns.len();
1446
1447 for key in &sort.keys {
1448 if let LogicalExpression::Property { variable, property } = &key.expression {
1449 let col_name = format!("{}_{}", variable, property);
1450 if !variable_columns.contains_key(&col_name) {
1451 property_projections.push((
1452 variable.clone(),
1453 property.clone(),
1454 col_name.clone(),
1455 ));
1456 variable_columns.insert(col_name, next_col_idx);
1457 next_col_idx += 1;
1458 }
1459 }
1460 }
1461
1462 let mut output_columns = input_columns.clone();
1464
1465 if !property_projections.is_empty() {
1467 let mut projections = Vec::new();
1468 let mut output_types = Vec::new();
1469
1470 for (i, _) in input_columns.iter().enumerate() {
1473 projections.push(ProjectExpr::Column(i));
1474 output_types.push(LogicalType::Node);
1475 }
1476
1477 for (variable, property, col_name) in &property_projections {
1479 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1480 Error::Internal(format!(
1481 "Variable '{}' not found for ORDER BY property projection",
1482 variable
1483 ))
1484 })?;
1485 projections.push(ProjectExpr::PropertyAccess {
1486 column: source_col,
1487 property: property.clone(),
1488 });
1489 output_types.push(LogicalType::Any);
1490 output_columns.push(col_name.clone());
1491 }
1492
1493 input_op = Box::new(ProjectOperator::with_store(
1494 input_op,
1495 projections,
1496 output_types,
1497 Arc::clone(&self.store),
1498 ));
1499 }
1500
1501 let physical_keys: Vec<PhysicalSortKey> = sort
1503 .keys
1504 .iter()
1505 .map(|key| {
1506 let col_idx = self
1507 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1508 Ok(PhysicalSortKey {
1509 column: col_idx,
1510 direction: match key.order {
1511 SortOrder::Ascending => SortDirection::Ascending,
1512 SortOrder::Descending => SortDirection::Descending,
1513 },
1514 null_order: NullOrder::NullsLast,
1515 })
1516 })
1517 .collect::<Result<Vec<_>>>()?;
1518
1519 let output_schema = self.derive_schema_from_columns(&output_columns);
1520 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1521 Ok((operator, output_columns))
1522 }
1523
1524 fn resolve_sort_expression_with_properties(
1526 &self,
1527 expr: &LogicalExpression,
1528 variable_columns: &HashMap<String, usize>,
1529 ) -> Result<usize> {
1530 match expr {
1531 LogicalExpression::Variable(name) => {
1532 variable_columns.get(name).copied().ok_or_else(|| {
1533 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1534 })
1535 }
1536 LogicalExpression::Property { variable, property } => {
1537 let col_name = format!("{}_{}", variable, property);
1539 variable_columns.get(&col_name).copied().ok_or_else(|| {
1540 Error::Internal(format!(
1541 "Property column '{}' not found for ORDER BY (from {}.{})",
1542 col_name, variable, property
1543 ))
1544 })
1545 }
1546 _ => Err(Error::Internal(format!(
1547 "Unsupported ORDER BY expression: {:?}",
1548 expr
1549 ))),
1550 }
1551 }
1552
1553 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1555 columns.iter().map(|_| LogicalType::Any).collect()
1556 }
1557
1558 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1560 if self.factorized_execution
1567 && agg.group_by.is_empty()
1568 && Self::count_expand_chain(&agg.input).0 >= 2
1569 && self.is_simple_aggregate(agg)
1570 && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1571 {
1572 return Ok((op, cols));
1573 }
1574 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1577
1578 let mut variable_columns: HashMap<String, usize> = input_columns
1580 .iter()
1581 .enumerate()
1582 .map(|(i, name)| (name.clone(), i))
1583 .collect();
1584
1585 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1588
1589 for expr in &agg.group_by {
1591 if let LogicalExpression::Property { variable, property } = expr {
1592 let col_name = format!("{}_{}", variable, property);
1593 if !variable_columns.contains_key(&col_name) {
1594 property_projections.push((
1595 variable.clone(),
1596 property.clone(),
1597 col_name.clone(),
1598 ));
1599 variable_columns.insert(col_name, next_col_idx);
1600 next_col_idx += 1;
1601 }
1602 }
1603 }
1604
1605 for agg_expr in &agg.aggregates {
1607 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1608 let col_name = format!("{}_{}", variable, property);
1609 if !variable_columns.contains_key(&col_name) {
1610 property_projections.push((
1611 variable.clone(),
1612 property.clone(),
1613 col_name.clone(),
1614 ));
1615 variable_columns.insert(col_name, next_col_idx);
1616 next_col_idx += 1;
1617 }
1618 }
1619 }
1620
1621 if !property_projections.is_empty() {
1623 let mut projections = Vec::new();
1624 let mut output_types = Vec::new();
1625
1626 for (i, _) in input_columns.iter().enumerate() {
1629 projections.push(ProjectExpr::Column(i));
1630 output_types.push(LogicalType::Node);
1631 }
1632
1633 for (variable, property, _col_name) in &property_projections {
1635 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1636 Error::Internal(format!(
1637 "Variable '{}' not found for property projection",
1638 variable
1639 ))
1640 })?;
1641 projections.push(ProjectExpr::PropertyAccess {
1642 column: source_col,
1643 property: property.clone(),
1644 });
1645 output_types.push(LogicalType::Any); }
1647
1648 input_op = Box::new(ProjectOperator::with_store(
1649 input_op,
1650 projections,
1651 output_types,
1652 Arc::clone(&self.store),
1653 ));
1654 }
1655
1656 let group_columns: Vec<usize> = agg
1658 .group_by
1659 .iter()
1660 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1661 .collect::<Result<Vec<_>>>()?;
1662
1663 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1665 .aggregates
1666 .iter()
1667 .map(|agg_expr| {
1668 let column = agg_expr
1669 .expression
1670 .as_ref()
1671 .map(|e| {
1672 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1673 })
1674 .transpose()?;
1675
1676 Ok(PhysicalAggregateExpr {
1677 function: convert_aggregate_function(agg_expr.function),
1678 column,
1679 distinct: agg_expr.distinct,
1680 alias: agg_expr.alias.clone(),
1681 percentile: agg_expr.percentile,
1682 })
1683 })
1684 .collect::<Result<Vec<_>>>()?;
1685
1686 let mut output_schema = Vec::new();
1688 let mut output_columns = Vec::new();
1689
1690 for expr in &agg.group_by {
1692 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1694 }
1695
1696 for agg_expr in &agg.aggregates {
1698 let result_type = match agg_expr.function {
1699 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1700 LogicalType::Int64
1701 }
1702 LogicalAggregateFunction::Sum => LogicalType::Int64,
1703 LogicalAggregateFunction::Avg => LogicalType::Float64,
1704 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1705 LogicalType::Int64
1709 }
1710 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1713 | LogicalAggregateFunction::StdDevPop
1714 | LogicalAggregateFunction::PercentileDisc
1715 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1716 };
1717 output_schema.push(result_type);
1718 output_columns.push(
1719 agg_expr
1720 .alias
1721 .clone()
1722 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1723 );
1724 }
1725
1726 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1728 Box::new(SimpleAggregateOperator::new(
1729 input_op,
1730 physical_aggregates,
1731 output_schema,
1732 ))
1733 } else {
1734 Box::new(HashAggregateOperator::new(
1735 input_op,
1736 group_columns,
1737 physical_aggregates,
1738 output_schema,
1739 ))
1740 };
1741
1742 if let Some(having_expr) = &agg.having {
1744 let having_var_columns: HashMap<String, usize> = output_columns
1746 .iter()
1747 .enumerate()
1748 .map(|(i, name)| (name.clone(), i))
1749 .collect();
1750
1751 let filter_expr = self.convert_expression(having_expr)?;
1752 let predicate =
1753 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1754 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1755 }
1756
1757 Ok((operator, output_columns))
1758 }
1759
1760 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1766 agg.aggregates.iter().all(|agg_expr| {
1767 match agg_expr.function {
1768 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1769 agg_expr.expression.is_none()
1771 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1772 }
1773 LogicalAggregateFunction::Sum
1774 | LogicalAggregateFunction::Avg
1775 | LogicalAggregateFunction::Min
1776 | LogicalAggregateFunction::Max => {
1777 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1780 }
1781 _ => false,
1783 }
1784 })
1785 }
1786
1787 fn plan_factorized_aggregate(
1791 &self,
1792 agg: &AggregateOp,
1793 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1794 let expands = Self::collect_expand_chain(&agg.input);
1796 if expands.is_empty() {
1797 return Err(Error::Internal(
1798 "Expected expand chain for factorized aggregate".to_string(),
1799 ));
1800 }
1801
1802 let first_expand = expands[0];
1804 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1805
1806 let mut columns = base_columns.clone();
1807 let mut steps = Vec::new();
1808 let mut is_first = true;
1809
1810 for expand in &expands {
1811 let source_column = if is_first {
1813 base_columns
1814 .iter()
1815 .position(|c| c == &expand.from_variable)
1816 .ok_or_else(|| {
1817 Error::Internal(format!(
1818 "Source variable '{}' not found in base columns",
1819 expand.from_variable
1820 ))
1821 })?
1822 } else {
1823 1 };
1825
1826 let direction = match expand.direction {
1827 ExpandDirection::Outgoing => Direction::Outgoing,
1828 ExpandDirection::Incoming => Direction::Incoming,
1829 ExpandDirection::Both => Direction::Both,
1830 };
1831
1832 steps.push(ExpandStep {
1833 source_column,
1834 direction,
1835 edge_type: expand.edge_type.clone(),
1836 });
1837
1838 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1839 let count = self.anon_edge_counter.get();
1840 self.anon_edge_counter.set(count + 1);
1841 format!("_anon_edge_{}", count)
1842 });
1843 columns.push(edge_col_name);
1844 columns.push(expand.to_variable.clone());
1845
1846 is_first = false;
1847 }
1848
1849 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1851
1852 if let Some(tx_id) = self.tx_id {
1853 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1854 } else {
1855 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1856 }
1857
1858 let factorized_aggs: Vec<FactorizedAggregate> = agg
1860 .aggregates
1861 .iter()
1862 .map(|agg_expr| {
1863 match agg_expr.function {
1864 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1865 if agg_expr.expression.is_none() {
1867 FactorizedAggregate::count()
1868 } else {
1869 FactorizedAggregate::count_column(1) }
1873 }
1874 LogicalAggregateFunction::Sum => {
1875 FactorizedAggregate::sum(1)
1877 }
1878 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1879 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1880 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1881 _ => {
1882 FactorizedAggregate::count()
1884 }
1885 }
1886 })
1887 .collect();
1888
1889 let output_columns: Vec<String> = agg
1891 .aggregates
1892 .iter()
1893 .map(|agg_expr| {
1894 agg_expr
1895 .alias
1896 .clone()
1897 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1898 })
1899 .collect();
1900
1901 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1903
1904 Ok((Box::new(factorized_agg_op), output_columns))
1905 }
1906
1907 #[allow(dead_code)]
1909 fn resolve_expression_to_column(
1910 &self,
1911 expr: &LogicalExpression,
1912 variable_columns: &HashMap<String, usize>,
1913 ) -> Result<usize> {
1914 match expr {
1915 LogicalExpression::Variable(name) => variable_columns
1916 .get(name)
1917 .copied()
1918 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1919 LogicalExpression::Property { variable, .. } => variable_columns
1920 .get(variable)
1921 .copied()
1922 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1923 _ => Err(Error::Internal(format!(
1924 "Cannot resolve expression to column: {:?}",
1925 expr
1926 ))),
1927 }
1928 }
1929
1930 fn resolve_expression_to_column_with_properties(
1934 &self,
1935 expr: &LogicalExpression,
1936 variable_columns: &HashMap<String, usize>,
1937 ) -> Result<usize> {
1938 match expr {
1939 LogicalExpression::Variable(name) => variable_columns
1940 .get(name)
1941 .copied()
1942 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1943 LogicalExpression::Property { variable, property } => {
1944 let col_name = format!("{}_{}", variable, property);
1946 variable_columns.get(&col_name).copied().ok_or_else(|| {
1947 Error::Internal(format!(
1948 "Property column '{}' not found (from {}.{})",
1949 col_name, variable, property
1950 ))
1951 })
1952 }
1953 _ => Err(Error::Internal(format!(
1954 "Cannot resolve expression to column: {:?}",
1955 expr
1956 ))),
1957 }
1958 }
1959
1960 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1962 match expr {
1963 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1964 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1965 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1966 variable: variable.clone(),
1967 property: property.clone(),
1968 }),
1969 LogicalExpression::Binary { left, op, right } => {
1970 let left_expr = self.convert_expression(left)?;
1971 let right_expr = self.convert_expression(right)?;
1972 let filter_op = convert_binary_op(*op)?;
1973 Ok(FilterExpression::Binary {
1974 left: Box::new(left_expr),
1975 op: filter_op,
1976 right: Box::new(right_expr),
1977 })
1978 }
1979 LogicalExpression::Unary { op, operand } => {
1980 let operand_expr = self.convert_expression(operand)?;
1981 let filter_op = convert_unary_op(*op)?;
1982 Ok(FilterExpression::Unary {
1983 op: filter_op,
1984 operand: Box::new(operand_expr),
1985 })
1986 }
1987 LogicalExpression::FunctionCall { name, args, .. } => {
1988 let filter_args: Vec<FilterExpression> = args
1989 .iter()
1990 .map(|a| self.convert_expression(a))
1991 .collect::<Result<Vec<_>>>()?;
1992 Ok(FilterExpression::FunctionCall {
1993 name: name.clone(),
1994 args: filter_args,
1995 })
1996 }
1997 LogicalExpression::Case {
1998 operand,
1999 when_clauses,
2000 else_clause,
2001 } => {
2002 let filter_operand = operand
2003 .as_ref()
2004 .map(|e| self.convert_expression(e))
2005 .transpose()?
2006 .map(Box::new);
2007 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2008 .iter()
2009 .map(|(cond, result)| {
2010 Ok((
2011 self.convert_expression(cond)?,
2012 self.convert_expression(result)?,
2013 ))
2014 })
2015 .collect::<Result<Vec<_>>>()?;
2016 let filter_else = else_clause
2017 .as_ref()
2018 .map(|e| self.convert_expression(e))
2019 .transpose()?
2020 .map(Box::new);
2021 Ok(FilterExpression::Case {
2022 operand: filter_operand,
2023 when_clauses: filter_when_clauses,
2024 else_clause: filter_else,
2025 })
2026 }
2027 LogicalExpression::List(items) => {
2028 let filter_items: Vec<FilterExpression> = items
2029 .iter()
2030 .map(|item| self.convert_expression(item))
2031 .collect::<Result<Vec<_>>>()?;
2032 Ok(FilterExpression::List(filter_items))
2033 }
2034 LogicalExpression::Map(pairs) => {
2035 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2036 .iter()
2037 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2038 .collect::<Result<Vec<_>>>()?;
2039 Ok(FilterExpression::Map(filter_pairs))
2040 }
2041 LogicalExpression::IndexAccess { base, index } => {
2042 let base_expr = self.convert_expression(base)?;
2043 let index_expr = self.convert_expression(index)?;
2044 Ok(FilterExpression::IndexAccess {
2045 base: Box::new(base_expr),
2046 index: Box::new(index_expr),
2047 })
2048 }
2049 LogicalExpression::SliceAccess { base, start, end } => {
2050 let base_expr = self.convert_expression(base)?;
2051 let start_expr = start
2052 .as_ref()
2053 .map(|s| self.convert_expression(s))
2054 .transpose()?
2055 .map(Box::new);
2056 let end_expr = end
2057 .as_ref()
2058 .map(|e| self.convert_expression(e))
2059 .transpose()?
2060 .map(Box::new);
2061 Ok(FilterExpression::SliceAccess {
2062 base: Box::new(base_expr),
2063 start: start_expr,
2064 end: end_expr,
2065 })
2066 }
2067 LogicalExpression::Parameter(_) => Err(Error::Internal(
2068 "Parameters not yet supported in filters".to_string(),
2069 )),
2070 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2071 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2072 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2073 LogicalExpression::ListComprehension {
2074 variable,
2075 list_expr,
2076 filter_expr,
2077 map_expr,
2078 } => {
2079 let list = self.convert_expression(list_expr)?;
2080 let filter = filter_expr
2081 .as_ref()
2082 .map(|f| self.convert_expression(f))
2083 .transpose()?
2084 .map(Box::new);
2085 let map = self.convert_expression(map_expr)?;
2086 Ok(FilterExpression::ListComprehension {
2087 variable: variable.clone(),
2088 list_expr: Box::new(list),
2089 filter_expr: filter,
2090 map_expr: Box::new(map),
2091 })
2092 }
2093 LogicalExpression::ExistsSubquery(subplan) => {
2094 let (start_var, direction, edge_type, end_labels) =
2097 self.extract_exists_pattern(subplan)?;
2098
2099 Ok(FilterExpression::ExistsSubquery {
2100 start_var,
2101 direction,
2102 edge_type,
2103 end_labels,
2104 min_hops: None,
2105 max_hops: None,
2106 })
2107 }
2108 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2109 "COUNT subqueries not yet supported".to_string(),
2110 )),
2111 }
2112 }
2113
2114 fn extract_exists_pattern(
2117 &self,
2118 subplan: &LogicalOperator,
2119 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2120 match subplan {
2121 LogicalOperator::Expand(expand) => {
2122 let end_labels = self.extract_end_labels_from_expand(expand);
2124 let direction = match expand.direction {
2125 ExpandDirection::Outgoing => Direction::Outgoing,
2126 ExpandDirection::Incoming => Direction::Incoming,
2127 ExpandDirection::Both => Direction::Both,
2128 };
2129 Ok((
2130 expand.from_variable.clone(),
2131 direction,
2132 expand.edge_type.clone(),
2133 end_labels,
2134 ))
2135 }
2136 LogicalOperator::NodeScan(scan) => {
2137 if let Some(input) = &scan.input {
2138 self.extract_exists_pattern(input)
2139 } else {
2140 Err(Error::Internal(
2141 "EXISTS subquery must contain an edge pattern".to_string(),
2142 ))
2143 }
2144 }
2145 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2146 _ => Err(Error::Internal(
2147 "Unsupported EXISTS subquery pattern".to_string(),
2148 )),
2149 }
2150 }
2151
2152 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2154 match expand.input.as_ref() {
2156 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2157 _ => None,
2158 }
2159 }
2160
2161 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2163 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2164 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2165
2166 let mut columns = left_columns.clone();
2168 columns.extend(right_columns.clone());
2169
2170 let physical_join_type = match join.join_type {
2172 JoinType::Inner => PhysicalJoinType::Inner,
2173 JoinType::Left => PhysicalJoinType::Left,
2174 JoinType::Right => PhysicalJoinType::Right,
2175 JoinType::Full => PhysicalJoinType::Full,
2176 JoinType::Cross => PhysicalJoinType::Cross,
2177 JoinType::Semi => PhysicalJoinType::Semi,
2178 JoinType::Anti => PhysicalJoinType::Anti,
2179 };
2180
2181 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2183 (vec![], vec![])
2185 } else {
2186 join.conditions
2187 .iter()
2188 .filter_map(|cond| {
2189 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2191 let right_idx = self
2192 .expression_to_column(&cond.right, &right_columns)
2193 .ok()?;
2194 Some((left_idx, right_idx))
2195 })
2196 .unzip()
2197 };
2198
2199 let output_schema = self.derive_schema_from_columns(&columns);
2200
2201 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2209 left_op,
2210 right_op,
2211 probe_keys,
2212 build_keys,
2213 physical_join_type,
2214 output_schema,
2215 ));
2216
2217 Ok((operator, columns))
2218 }
2219
2220 #[allow(dead_code)]
2229 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2230 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2232 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2233
2234 Self::collect_join_edges(
2236 &LogicalOperator::Join(join.clone()),
2237 &mut edges,
2238 &mut all_vars,
2239 );
2240
2241 if all_vars.len() < 3 {
2243 return false;
2244 }
2245
2246 Self::has_cycle(&edges, &all_vars)
2248 }
2249
2250 fn collect_join_edges(
2252 op: &LogicalOperator,
2253 edges: &mut HashMap<String, Vec<String>>,
2254 vars: &mut std::collections::HashSet<String>,
2255 ) {
2256 match op {
2257 LogicalOperator::Join(join) => {
2258 for cond in &join.conditions {
2260 if let (Some(left_var), Some(right_var)) = (
2261 Self::extract_join_variable(&cond.left),
2262 Self::extract_join_variable(&cond.right),
2263 ) && left_var != right_var
2264 {
2265 vars.insert(left_var.clone());
2266 vars.insert(right_var.clone());
2267
2268 edges
2270 .entry(left_var.clone())
2271 .or_default()
2272 .push(right_var.clone());
2273 edges.entry(right_var).or_default().push(left_var);
2274 }
2275 }
2276
2277 Self::collect_join_edges(&join.left, edges, vars);
2279 Self::collect_join_edges(&join.right, edges, vars);
2280 }
2281 LogicalOperator::Expand(expand) => {
2282 vars.insert(expand.from_variable.clone());
2284 vars.insert(expand.to_variable.clone());
2285
2286 edges
2287 .entry(expand.from_variable.clone())
2288 .or_default()
2289 .push(expand.to_variable.clone());
2290 edges
2291 .entry(expand.to_variable.clone())
2292 .or_default()
2293 .push(expand.from_variable.clone());
2294
2295 Self::collect_join_edges(&expand.input, edges, vars);
2296 }
2297 LogicalOperator::Filter(filter) => {
2298 Self::collect_join_edges(&filter.input, edges, vars);
2299 }
2300 LogicalOperator::NodeScan(scan) => {
2301 vars.insert(scan.variable.clone());
2302 }
2303 _ => {}
2304 }
2305 }
2306
2307 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2309 match expr {
2310 LogicalExpression::Variable(v) => Some(v.clone()),
2311 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2312 LogicalExpression::Id(v) => Some(v.clone()),
2313 _ => None,
2314 }
2315 }
2316
2317 fn has_cycle(
2321 edges: &HashMap<String, Vec<String>>,
2322 vars: &std::collections::HashSet<String>,
2323 ) -> bool {
2324 let mut color: HashMap<&String, u8> = HashMap::new();
2325
2326 for var in vars {
2327 color.insert(var, 0);
2328 }
2329
2330 for start in vars {
2331 if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2332 return true;
2333 }
2334 }
2335
2336 false
2337 }
2338
2339 fn dfs_cycle(
2341 node: &String,
2342 parent: Option<&String>,
2343 edges: &HashMap<String, Vec<String>>,
2344 color: &mut HashMap<&String, u8>,
2345 ) -> bool {
2346 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2349 for neighbor in neighbors {
2350 if parent == Some(neighbor) {
2352 continue;
2353 }
2354
2355 if let Some(&c) = color.get(neighbor) {
2356 if c == 1 {
2357 return true;
2359 }
2360 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2361 return true;
2362 }
2363 }
2364 }
2365 }
2366
2367 *color.get_mut(node).unwrap() = 2; false
2369 }
2370
2371 #[allow(dead_code)]
2373 fn count_relations(op: &LogicalOperator) -> usize {
2374 match op {
2375 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2376 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2377 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2378 LogicalOperator::Join(j) => {
2379 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2380 }
2381 _ => 0,
2382 }
2383 }
2384
2385 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2387 match expr {
2388 LogicalExpression::Variable(name) => columns
2389 .iter()
2390 .position(|c| c == name)
2391 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2392 _ => Err(Error::Internal(
2393 "Only variables supported in join conditions".to_string(),
2394 )),
2395 }
2396 }
2397
2398 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2400 if union.inputs.is_empty() {
2401 return Err(Error::Internal(
2402 "Union requires at least one input".to_string(),
2403 ));
2404 }
2405
2406 let mut inputs = Vec::with_capacity(union.inputs.len());
2407 let mut columns = Vec::new();
2408
2409 for (i, input) in union.inputs.iter().enumerate() {
2410 let (op, cols) = self.plan_operator(input)?;
2411 if i == 0 {
2412 columns = cols;
2413 }
2414 inputs.push(op);
2415 }
2416
2417 let output_schema = self.derive_schema_from_columns(&columns);
2418 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2419
2420 Ok((operator, columns))
2421 }
2422
2423 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2425 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2426 let output_schema = self.derive_schema_from_columns(&columns);
2427 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2428 Ok((operator, columns))
2429 }
2430
2431 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2433 let (input_op, mut columns) = if let Some(ref input) = create.input {
2435 let (op, cols) = self.plan_operator(input)?;
2436 (Some(op), cols)
2437 } else {
2438 (None, vec![])
2439 };
2440
2441 let output_column = columns.len();
2443 columns.push(create.variable.clone());
2444
2445 let properties: Vec<(String, PropertySource)> = create
2447 .properties
2448 .iter()
2449 .map(|(name, expr)| {
2450 let source = match Self::try_fold_expression(expr) {
2451 Some(value) => PropertySource::Constant(value),
2452 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2453 };
2454 (name.clone(), source)
2455 })
2456 .collect();
2457
2458 let output_schema = self.derive_schema_from_columns(&columns);
2459
2460 let operator = Box::new(
2461 CreateNodeOperator::new(
2462 Arc::clone(&self.store),
2463 input_op,
2464 create.labels.clone(),
2465 properties,
2466 output_schema,
2467 output_column,
2468 )
2469 .with_tx_context(self.viewing_epoch, self.tx_id),
2470 );
2471
2472 Ok((operator, columns))
2473 }
2474
2475 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2477 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2478
2479 let from_column = columns
2481 .iter()
2482 .position(|c| c == &create.from_variable)
2483 .ok_or_else(|| {
2484 Error::Internal(format!(
2485 "Source variable '{}' not found",
2486 create.from_variable
2487 ))
2488 })?;
2489
2490 let to_column = columns
2491 .iter()
2492 .position(|c| c == &create.to_variable)
2493 .ok_or_else(|| {
2494 Error::Internal(format!(
2495 "Target variable '{}' not found",
2496 create.to_variable
2497 ))
2498 })?;
2499
2500 let output_column = create.variable.as_ref().map(|v| {
2502 let idx = columns.len();
2503 columns.push(v.clone());
2504 idx
2505 });
2506
2507 let properties: Vec<(String, PropertySource)> = create
2509 .properties
2510 .iter()
2511 .map(|(name, expr)| {
2512 let source = match Self::try_fold_expression(expr) {
2513 Some(value) => PropertySource::Constant(value),
2514 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2515 };
2516 (name.clone(), source)
2517 })
2518 .collect();
2519
2520 let output_schema = self.derive_schema_from_columns(&columns);
2521
2522 let mut operator = CreateEdgeOperator::new(
2523 Arc::clone(&self.store),
2524 input_op,
2525 from_column,
2526 to_column,
2527 create.edge_type.clone(),
2528 output_schema,
2529 )
2530 .with_properties(properties)
2531 .with_tx_context(self.viewing_epoch, self.tx_id);
2532
2533 if let Some(col) = output_column {
2534 operator = operator.with_output_column(col);
2535 }
2536
2537 let operator = Box::new(operator);
2538
2539 Ok((operator, columns))
2540 }
2541
2542 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2544 let (input_op, columns) = self.plan_operator(&delete.input)?;
2545
2546 let node_column = columns
2547 .iter()
2548 .position(|c| c == &delete.variable)
2549 .ok_or_else(|| {
2550 Error::Internal(format!(
2551 "Variable '{}' not found for delete",
2552 delete.variable
2553 ))
2554 })?;
2555
2556 let output_schema = vec![LogicalType::Int64];
2558 let output_columns = vec!["deleted_count".to_string()];
2559
2560 let operator = Box::new(
2561 DeleteNodeOperator::new(
2562 Arc::clone(&self.store),
2563 input_op,
2564 node_column,
2565 output_schema,
2566 delete.detach, )
2568 .with_tx_context(self.viewing_epoch, self.tx_id),
2569 );
2570
2571 Ok((operator, output_columns))
2572 }
2573
2574 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2576 let (input_op, columns) = self.plan_operator(&delete.input)?;
2577
2578 let edge_column = columns
2579 .iter()
2580 .position(|c| c == &delete.variable)
2581 .ok_or_else(|| {
2582 Error::Internal(format!(
2583 "Variable '{}' not found for delete",
2584 delete.variable
2585 ))
2586 })?;
2587
2588 let output_schema = vec![LogicalType::Int64];
2590 let output_columns = vec!["deleted_count".to_string()];
2591
2592 let operator = Box::new(
2593 DeleteEdgeOperator::new(
2594 Arc::clone(&self.store),
2595 input_op,
2596 edge_column,
2597 output_schema,
2598 )
2599 .with_tx_context(self.viewing_epoch, self.tx_id),
2600 );
2601
2602 Ok((operator, output_columns))
2603 }
2604
2605 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2607 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2608 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2609
2610 let mut columns = left_columns.clone();
2612 columns.extend(right_columns.clone());
2613
2614 let mut probe_keys = Vec::new();
2616 let mut build_keys = Vec::new();
2617
2618 for (right_idx, right_col) in right_columns.iter().enumerate() {
2619 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2620 probe_keys.push(left_idx);
2621 build_keys.push(right_idx);
2622 }
2623 }
2624
2625 let output_schema = self.derive_schema_from_columns(&columns);
2626
2627 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2628 left_op,
2629 right_op,
2630 probe_keys,
2631 build_keys,
2632 PhysicalJoinType::Left,
2633 output_schema,
2634 ));
2635
2636 Ok((operator, columns))
2637 }
2638
2639 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2641 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2642 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2643
2644 let columns = left_columns.clone();
2646
2647 let mut probe_keys = Vec::new();
2649 let mut build_keys = Vec::new();
2650
2651 for (right_idx, right_col) in right_columns.iter().enumerate() {
2652 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2653 probe_keys.push(left_idx);
2654 build_keys.push(right_idx);
2655 }
2656 }
2657
2658 let output_schema = self.derive_schema_from_columns(&columns);
2659
2660 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2661 left_op,
2662 right_op,
2663 probe_keys,
2664 build_keys,
2665 PhysicalJoinType::Anti,
2666 output_schema,
2667 ));
2668
2669 Ok((operator, columns))
2670 }
2671
2672 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2674 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2677 if matches!(&*unwind.input, LogicalOperator::Empty) {
2678 let literal_list = self.convert_expression(&unwind.expression)?;
2683
2684 let single_row_op: Box<dyn Operator> = Box::new(
2686 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2687 );
2688 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2689 single_row_op,
2690 vec![ProjectExpr::Expression {
2691 expr: literal_list,
2692 variable_columns: HashMap::new(),
2693 }],
2694 vec![LogicalType::Any],
2695 Arc::clone(&self.store),
2696 ));
2697
2698 (project_op, vec!["__list__".to_string()])
2699 } else {
2700 self.plan_operator(&unwind.input)?
2701 };
2702
2703 let list_col_idx = match &unwind.expression {
2709 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2710 LogicalExpression::Property { variable, .. } => {
2711 input_columns.iter().position(|c| c == variable)
2714 }
2715 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2716 None
2718 }
2719 _ => None,
2720 };
2721
2722 let mut columns = input_columns.clone();
2724 columns.push(unwind.variable.clone());
2725
2726 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2728 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2733
2734 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2735 input_op,
2736 col_idx,
2737 unwind.variable.clone(),
2738 output_schema,
2739 ));
2740
2741 Ok((operator, columns))
2742 }
2743
2744 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2746 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2748 Vec::new()
2749 } else {
2750 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2751 cols
2752 };
2753
2754 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2756 .match_properties
2757 .iter()
2758 .filter_map(|(name, expr)| {
2759 if let LogicalExpression::Literal(v) = expr {
2760 Some((name.clone(), v.clone()))
2761 } else {
2762 None }
2764 })
2765 .collect();
2766
2767 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2769 .on_create
2770 .iter()
2771 .filter_map(|(name, expr)| {
2772 if let LogicalExpression::Literal(v) = expr {
2773 Some((name.clone(), v.clone()))
2774 } else {
2775 None
2776 }
2777 })
2778 .collect();
2779
2780 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2782 .on_match
2783 .iter()
2784 .filter_map(|(name, expr)| {
2785 if let LogicalExpression::Literal(v) = expr {
2786 Some((name.clone(), v.clone()))
2787 } else {
2788 None
2789 }
2790 })
2791 .collect();
2792
2793 columns.push(merge.variable.clone());
2795
2796 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2797 Arc::clone(&self.store),
2798 merge.variable.clone(),
2799 merge.labels.clone(),
2800 match_properties,
2801 on_create_properties,
2802 on_match_properties,
2803 ));
2804
2805 Ok((operator, columns))
2806 }
2807
2808 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2810 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2812
2813 let source_column = columns
2815 .iter()
2816 .position(|c| c == &sp.source_var)
2817 .ok_or_else(|| {
2818 Error::Internal(format!(
2819 "Source variable '{}' not found for shortestPath",
2820 sp.source_var
2821 ))
2822 })?;
2823
2824 let target_column = columns
2825 .iter()
2826 .position(|c| c == &sp.target_var)
2827 .ok_or_else(|| {
2828 Error::Internal(format!(
2829 "Target variable '{}' not found for shortestPath",
2830 sp.target_var
2831 ))
2832 })?;
2833
2834 let direction = match sp.direction {
2836 ExpandDirection::Outgoing => Direction::Outgoing,
2837 ExpandDirection::Incoming => Direction::Incoming,
2838 ExpandDirection::Both => Direction::Both,
2839 };
2840
2841 let operator: Box<dyn Operator> = Box::new(
2843 ShortestPathOperator::new(
2844 Arc::clone(&self.store),
2845 input_op,
2846 source_column,
2847 target_column,
2848 sp.edge_type.clone(),
2849 direction,
2850 )
2851 .with_all_paths(sp.all_paths),
2852 );
2853
2854 columns.push(format!("_path_length_{}", sp.path_alias));
2857
2858 Ok((operator, columns))
2859 }
2860
2861 fn plan_call_procedure(
2863 &self,
2864 call: &CallProcedureOp,
2865 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2866 use crate::procedures::{self, BuiltinProcedures};
2867
2868 static PROCEDURES: std::sync::OnceLock<BuiltinProcedures> = std::sync::OnceLock::new();
2869 let registry = PROCEDURES.get_or_init(BuiltinProcedures::new);
2870
2871 let resolved_name = call.name.join(".");
2873 if resolved_name == "grafeo.procedures" || resolved_name == "procedures" {
2874 let result = procedures::procedures_result(registry);
2875 return self.plan_static_result(result, &call.yield_items);
2876 }
2877
2878 let algorithm = registry.get(&call.name).ok_or_else(|| {
2880 Error::Internal(format!(
2881 "Unknown procedure: '{}'. Use CALL grafeo.procedures() to list available procedures.",
2882 call.name.join(".")
2883 ))
2884 })?;
2885
2886 let params = procedures::evaluate_arguments(&call.arguments, algorithm.parameters());
2888
2889 let canonical_columns = procedures::output_columns_for_name(algorithm.as_ref());
2891
2892 let yield_columns = call.yield_items.as_ref().map(|items| {
2894 items
2895 .iter()
2896 .map(|item| (item.field_name.clone(), item.alias.clone()))
2897 .collect::<Vec<_>>()
2898 });
2899
2900 let output_columns = if let Some(yield_cols) = &yield_columns {
2901 yield_cols
2902 .iter()
2903 .map(|(name, alias)| alias.clone().unwrap_or_else(|| name.clone()))
2904 .collect()
2905 } else {
2906 canonical_columns.clone()
2907 };
2908
2909 let operator = Box::new(
2910 crate::query::executor::procedure_call::ProcedureCallOperator::new(
2911 Arc::clone(&self.store),
2912 algorithm,
2913 params,
2914 yield_columns,
2915 canonical_columns,
2916 ),
2917 );
2918
2919 Ok((operator, output_columns))
2920 }
2921
2922 fn plan_static_result(
2924 &self,
2925 result: grafeo_adapters::plugins::AlgorithmResult,
2926 yield_items: &Option<Vec<crate::query::plan::ProcedureYield>>,
2927 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2928 let (output_columns, column_indices) = if let Some(items) = yield_items {
2930 let mut cols = Vec::new();
2931 let mut indices = Vec::new();
2932 for item in items {
2933 let idx = result
2934 .columns
2935 .iter()
2936 .position(|c| c == &item.field_name)
2937 .ok_or_else(|| {
2938 Error::Internal(format!(
2939 "YIELD column '{}' not found (available: {})",
2940 item.field_name,
2941 result.columns.join(", ")
2942 ))
2943 })?;
2944 indices.push(idx);
2945 cols.push(
2946 item.alias
2947 .clone()
2948 .unwrap_or_else(|| item.field_name.clone()),
2949 );
2950 }
2951 (cols, indices)
2952 } else {
2953 let indices: Vec<usize> = (0..result.columns.len()).collect();
2954 (result.columns.clone(), indices)
2955 };
2956
2957 let operator = Box::new(StaticResultOperator {
2958 rows: result.rows,
2959 column_indices,
2960 row_index: 0,
2961 });
2962
2963 Ok((operator, output_columns))
2964 }
2965
2966 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2968 let (input_op, columns) = self.plan_operator(&add_label.input)?;
2969
2970 let node_column = columns
2972 .iter()
2973 .position(|c| c == &add_label.variable)
2974 .ok_or_else(|| {
2975 Error::Internal(format!(
2976 "Variable '{}' not found for ADD LABEL",
2977 add_label.variable
2978 ))
2979 })?;
2980
2981 let output_schema = vec![LogicalType::Int64];
2983 let output_columns = vec!["labels_added".to_string()];
2984
2985 let operator = Box::new(AddLabelOperator::new(
2986 Arc::clone(&self.store),
2987 input_op,
2988 node_column,
2989 add_label.labels.clone(),
2990 output_schema,
2991 ));
2992
2993 Ok((operator, output_columns))
2994 }
2995
2996 fn plan_remove_label(
2998 &self,
2999 remove_label: &RemoveLabelOp,
3000 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3001 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
3002
3003 let node_column = columns
3005 .iter()
3006 .position(|c| c == &remove_label.variable)
3007 .ok_or_else(|| {
3008 Error::Internal(format!(
3009 "Variable '{}' not found for REMOVE LABEL",
3010 remove_label.variable
3011 ))
3012 })?;
3013
3014 let output_schema = vec![LogicalType::Int64];
3016 let output_columns = vec!["labels_removed".to_string()];
3017
3018 let operator = Box::new(RemoveLabelOperator::new(
3019 Arc::clone(&self.store),
3020 input_op,
3021 node_column,
3022 remove_label.labels.clone(),
3023 output_schema,
3024 ));
3025
3026 Ok((operator, output_columns))
3027 }
3028
3029 fn plan_set_property(
3031 &self,
3032 set_prop: &SetPropertyOp,
3033 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3034 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
3035
3036 let entity_column = columns
3038 .iter()
3039 .position(|c| c == &set_prop.variable)
3040 .ok_or_else(|| {
3041 Error::Internal(format!(
3042 "Variable '{}' not found for SET",
3043 set_prop.variable
3044 ))
3045 })?;
3046
3047 let properties: Vec<(String, PropertySource)> = set_prop
3049 .properties
3050 .iter()
3051 .map(|(name, expr)| {
3052 let source = self.expression_to_property_source(expr, &columns)?;
3053 Ok((name.clone(), source))
3054 })
3055 .collect::<Result<Vec<_>>>()?;
3056
3057 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
3059 let output_columns = columns.clone();
3060
3061 let operator = Box::new(SetPropertyOperator::new_for_node(
3063 Arc::clone(&self.store),
3064 input_op,
3065 entity_column,
3066 properties,
3067 output_schema,
3068 ));
3069
3070 Ok((operator, output_columns))
3071 }
3072
3073 fn expression_to_property_source(
3075 &self,
3076 expr: &LogicalExpression,
3077 columns: &[String],
3078 ) -> Result<PropertySource> {
3079 match expr {
3080 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
3081 LogicalExpression::Variable(name) => {
3082 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
3083 Error::Internal(format!("Variable '{}' not found for property source", name))
3084 })?;
3085 Ok(PropertySource::Column(col_idx))
3086 }
3087 LogicalExpression::Parameter(name) => {
3088 Ok(PropertySource::Constant(
3091 grafeo_common::types::Value::String(format!("${}", name).into()),
3092 ))
3093 }
3094 _ => {
3095 if let Some(value) = Self::try_fold_expression(expr) {
3096 Ok(PropertySource::Constant(value))
3097 } else {
3098 Err(Error::Internal(format!(
3099 "Unsupported expression type for property source: {:?}",
3100 expr
3101 )))
3102 }
3103 }
3104 }
3105 }
3106
3107 fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3113 match expr {
3114 LogicalExpression::Literal(v) => Some(v.clone()),
3115 LogicalExpression::List(items) => {
3116 let values: Option<Vec<Value>> =
3117 items.iter().map(Self::try_fold_expression).collect();
3118 let values = values?;
3119 let all_numeric = !values.is_empty()
3121 && values
3122 .iter()
3123 .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3124 if all_numeric {
3125 let floats: Vec<f32> = values
3126 .iter()
3127 .filter_map(|v| match v {
3128 Value::Float64(f) => Some(*f as f32),
3129 Value::Int64(i) => Some(*i as f32),
3130 _ => None,
3131 })
3132 .collect();
3133 Some(Value::Vector(floats.into()))
3134 } else {
3135 Some(Value::List(values.into()))
3136 }
3137 }
3138 LogicalExpression::FunctionCall { name, args, .. } => {
3139 match name.to_lowercase().as_str() {
3140 "vector" => {
3141 if args.len() != 1 {
3142 return None;
3143 }
3144 let val = Self::try_fold_expression(&args[0])?;
3145 match val {
3146 Value::List(items) => {
3147 let floats: Vec<f32> = items
3148 .iter()
3149 .filter_map(|v| match v {
3150 Value::Float64(f) => Some(*f as f32),
3151 Value::Int64(i) => Some(*i as f32),
3152 _ => None,
3153 })
3154 .collect();
3155 if floats.len() == items.len() {
3156 Some(Value::Vector(floats.into()))
3157 } else {
3158 None
3159 }
3160 }
3161 Value::Vector(v) => Some(Value::Vector(v)),
3163 _ => None,
3164 }
3165 }
3166 _ => None,
3167 }
3168 }
3169 _ => None,
3170 }
3171 }
3172}
3173
3174pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3176 match op {
3177 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3178 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3179 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3180 BinaryOp::Le => Ok(BinaryFilterOp::Le),
3181 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3182 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3183 BinaryOp::And => Ok(BinaryFilterOp::And),
3184 BinaryOp::Or => Ok(BinaryFilterOp::Or),
3185 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3186 BinaryOp::Add => Ok(BinaryFilterOp::Add),
3187 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3188 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3189 BinaryOp::Div => Ok(BinaryFilterOp::Div),
3190 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3191 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3192 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3193 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3194 BinaryOp::In => Ok(BinaryFilterOp::In),
3195 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3196 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3197 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3198 "Binary operator {:?} not yet supported in filters",
3199 op
3200 ))),
3201 }
3202}
3203
3204pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3206 match op {
3207 UnaryOp::Not => Ok(UnaryFilterOp::Not),
3208 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3209 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3210 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3211 }
3212}
3213
3214pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3216 match func {
3217 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3218 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3219 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3220 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3221 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3222 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3223 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3224 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3225 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3226 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3227 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3228 }
3229}
3230
3231pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3235 match expr {
3236 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3237 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3238 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3239 variable: variable.clone(),
3240 property: property.clone(),
3241 }),
3242 LogicalExpression::Binary { left, op, right } => {
3243 let left_expr = convert_filter_expression(left)?;
3244 let right_expr = convert_filter_expression(right)?;
3245 let filter_op = convert_binary_op(*op)?;
3246 Ok(FilterExpression::Binary {
3247 left: Box::new(left_expr),
3248 op: filter_op,
3249 right: Box::new(right_expr),
3250 })
3251 }
3252 LogicalExpression::Unary { op, operand } => {
3253 let operand_expr = convert_filter_expression(operand)?;
3254 let filter_op = convert_unary_op(*op)?;
3255 Ok(FilterExpression::Unary {
3256 op: filter_op,
3257 operand: Box::new(operand_expr),
3258 })
3259 }
3260 LogicalExpression::FunctionCall { name, args, .. } => {
3261 let filter_args: Vec<FilterExpression> = args
3262 .iter()
3263 .map(convert_filter_expression)
3264 .collect::<Result<Vec<_>>>()?;
3265 Ok(FilterExpression::FunctionCall {
3266 name: name.clone(),
3267 args: filter_args,
3268 })
3269 }
3270 LogicalExpression::Case {
3271 operand,
3272 when_clauses,
3273 else_clause,
3274 } => {
3275 let filter_operand = operand
3276 .as_ref()
3277 .map(|e| convert_filter_expression(e))
3278 .transpose()?
3279 .map(Box::new);
3280 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3281 .iter()
3282 .map(|(cond, result)| {
3283 Ok((
3284 convert_filter_expression(cond)?,
3285 convert_filter_expression(result)?,
3286 ))
3287 })
3288 .collect::<Result<Vec<_>>>()?;
3289 let filter_else = else_clause
3290 .as_ref()
3291 .map(|e| convert_filter_expression(e))
3292 .transpose()?
3293 .map(Box::new);
3294 Ok(FilterExpression::Case {
3295 operand: filter_operand,
3296 when_clauses: filter_when_clauses,
3297 else_clause: filter_else,
3298 })
3299 }
3300 LogicalExpression::List(items) => {
3301 let filter_items: Vec<FilterExpression> = items
3302 .iter()
3303 .map(convert_filter_expression)
3304 .collect::<Result<Vec<_>>>()?;
3305 Ok(FilterExpression::List(filter_items))
3306 }
3307 LogicalExpression::Map(pairs) => {
3308 let filter_pairs: Vec<(String, FilterExpression)> = pairs
3309 .iter()
3310 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3311 .collect::<Result<Vec<_>>>()?;
3312 Ok(FilterExpression::Map(filter_pairs))
3313 }
3314 LogicalExpression::IndexAccess { base, index } => {
3315 let base_expr = convert_filter_expression(base)?;
3316 let index_expr = convert_filter_expression(index)?;
3317 Ok(FilterExpression::IndexAccess {
3318 base: Box::new(base_expr),
3319 index: Box::new(index_expr),
3320 })
3321 }
3322 LogicalExpression::SliceAccess { base, start, end } => {
3323 let base_expr = convert_filter_expression(base)?;
3324 let start_expr = start
3325 .as_ref()
3326 .map(|s| convert_filter_expression(s))
3327 .transpose()?
3328 .map(Box::new);
3329 let end_expr = end
3330 .as_ref()
3331 .map(|e| convert_filter_expression(e))
3332 .transpose()?
3333 .map(Box::new);
3334 Ok(FilterExpression::SliceAccess {
3335 base: Box::new(base_expr),
3336 start: start_expr,
3337 end: end_expr,
3338 })
3339 }
3340 LogicalExpression::Parameter(_) => Err(Error::Internal(
3341 "Parameters not yet supported in filters".to_string(),
3342 )),
3343 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3344 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3345 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3346 LogicalExpression::ListComprehension {
3347 variable,
3348 list_expr,
3349 filter_expr,
3350 map_expr,
3351 } => {
3352 let list = convert_filter_expression(list_expr)?;
3353 let filter = filter_expr
3354 .as_ref()
3355 .map(|f| convert_filter_expression(f))
3356 .transpose()?
3357 .map(Box::new);
3358 let map = convert_filter_expression(map_expr)?;
3359 Ok(FilterExpression::ListComprehension {
3360 variable: variable.clone(),
3361 list_expr: Box::new(list),
3362 filter_expr: filter,
3363 map_expr: Box::new(map),
3364 })
3365 }
3366 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3367 Error::Internal("Subqueries not yet supported in filters".to_string()),
3368 ),
3369 }
3370}
3371
3372fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3374 use grafeo_common::types::Value;
3375 match value {
3376 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3378 Value::Int64(_) => LogicalType::Int64,
3379 Value::Float64(_) => LogicalType::Float64,
3380 Value::String(_) => LogicalType::String,
3381 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3383 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3386 }
3387}
3388
3389fn expression_to_string(expr: &LogicalExpression) -> String {
3391 match expr {
3392 LogicalExpression::Variable(name) => name.clone(),
3393 LogicalExpression::Property { variable, property } => {
3394 format!("{variable}.{property}")
3395 }
3396 LogicalExpression::Literal(value) => format!("{value:?}"),
3397 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3398 _ => "expr".to_string(),
3399 }
3400}
3401
3402pub struct PhysicalPlan {
3404 pub operator: Box<dyn Operator>,
3406 pub columns: Vec<String>,
3408 pub adaptive_context: Option<AdaptiveContext>,
3414}
3415
3416impl PhysicalPlan {
3417 #[must_use]
3419 pub fn columns(&self) -> &[String] {
3420 &self.columns
3421 }
3422
3423 pub fn into_operator(self) -> Box<dyn Operator> {
3425 self.operator
3426 }
3427
3428 #[must_use]
3430 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3431 self.adaptive_context.as_ref()
3432 }
3433
3434 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3436 self.adaptive_context.take()
3437 }
3438}
3439
3440#[allow(dead_code)]
3444struct SingleResultOperator {
3445 result: Option<grafeo_core::execution::DataChunk>,
3446}
3447
3448impl SingleResultOperator {
3449 #[allow(dead_code)]
3450 fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3451 Self { result }
3452 }
3453}
3454
3455impl Operator for SingleResultOperator {
3456 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3457 Ok(self.result.take())
3458 }
3459
3460 fn reset(&mut self) {
3461 }
3463
3464 fn name(&self) -> &'static str {
3465 "SingleResult"
3466 }
3467}
3468
3469struct StaticResultOperator {
3471 rows: Vec<Vec<Value>>,
3472 column_indices: Vec<usize>,
3473 row_index: usize,
3474}
3475
3476impl Operator for StaticResultOperator {
3477 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3478 use grafeo_core::execution::DataChunk;
3479
3480 if self.row_index >= self.rows.len() {
3481 return Ok(None);
3482 }
3483
3484 let remaining = self.rows.len() - self.row_index;
3485 let chunk_rows = remaining.min(1024);
3486 let col_count = self.column_indices.len();
3487
3488 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
3489 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
3490
3491 for row_offset in 0..chunk_rows {
3492 let row = &self.rows[self.row_index + row_offset];
3493 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
3494 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
3495 if let Some(col) = chunk.column_mut(col_idx) {
3496 col.push_value(value);
3497 }
3498 }
3499 }
3500 chunk.set_count(chunk_rows);
3501
3502 self.row_index += chunk_rows;
3503 Ok(Some(chunk))
3504 }
3505
3506 fn reset(&mut self) {
3507 self.row_index = 0;
3508 }
3509
3510 fn name(&self) -> &'static str {
3511 "StaticResult"
3512 }
3513}
3514
3515#[cfg(test)]
3516mod tests {
3517 use super::*;
3518 use crate::query::plan::{
3519 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3520 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3521 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3522 SortKey, SortOp,
3523 };
3524 use grafeo_common::types::Value;
3525
3526 fn create_test_store() -> Arc<LpgStore> {
3527 let store = Arc::new(LpgStore::new());
3528 store.create_node(&["Person"]);
3529 store.create_node(&["Person"]);
3530 store.create_node(&["Company"]);
3531 store
3532 }
3533
3534 #[test]
3537 fn test_plan_simple_scan() {
3538 let store = create_test_store();
3539 let planner = Planner::new(store);
3540
3541 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3543 items: vec![ReturnItem {
3544 expression: LogicalExpression::Variable("n".to_string()),
3545 alias: None,
3546 }],
3547 distinct: false,
3548 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3549 variable: "n".to_string(),
3550 label: Some("Person".to_string()),
3551 input: None,
3552 })),
3553 }));
3554
3555 let physical = planner.plan(&logical).unwrap();
3556 assert_eq!(physical.columns(), &["n"]);
3557 }
3558
3559 #[test]
3560 fn test_plan_scan_without_label() {
3561 let store = create_test_store();
3562 let planner = Planner::new(store);
3563
3564 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3566 items: vec![ReturnItem {
3567 expression: LogicalExpression::Variable("n".to_string()),
3568 alias: None,
3569 }],
3570 distinct: false,
3571 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3572 variable: "n".to_string(),
3573 label: None,
3574 input: None,
3575 })),
3576 }));
3577
3578 let physical = planner.plan(&logical).unwrap();
3579 assert_eq!(physical.columns(), &["n"]);
3580 }
3581
3582 #[test]
3583 fn test_plan_return_with_alias() {
3584 let store = create_test_store();
3585 let planner = Planner::new(store);
3586
3587 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3589 items: vec![ReturnItem {
3590 expression: LogicalExpression::Variable("n".to_string()),
3591 alias: Some("person".to_string()),
3592 }],
3593 distinct: false,
3594 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3595 variable: "n".to_string(),
3596 label: Some("Person".to_string()),
3597 input: None,
3598 })),
3599 }));
3600
3601 let physical = planner.plan(&logical).unwrap();
3602 assert_eq!(physical.columns(), &["person"]);
3603 }
3604
/// Returning the property `n.name` without an alias produces a column named `n.name`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Projected expression: property access `n.name`.
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
3630
/// Returning the literal `42` aliased as `answer` produces the single column `answer`.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Projected expression: a constant integer with an explicit alias.
    let answer_item = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer_item],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
3653
/// A filter with the equality predicate `n.age = 30` over a `Person` scan plans to column `n`.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Predicate: n.age = 30
    let predicate = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3688
/// A filter whose predicate is `n.age > 20 AND n.age < 40` plans to the single column `n`.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Left conjunct: n.age > 20
    let above_twenty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    // Right conjunct: n.age < 40
    let below_forty = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };
    let predicate = LogicalExpression::Binary {
        left: Box::new(above_twenty),
        op: BinaryOp::And,
        right: Box::new(below_forty),
    };

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3732
/// A filter using the unary predicate `NOT n.active` plans to the single column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    // Predicate: NOT n.active
    let predicate = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3764
/// A filter using the unary `IsNull` operator on `n.email` plans to the single column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // Predicate: IsNull applied to n.email
    let predicate = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3796
/// A filter comparing a function call (`size(n.friends) > 0`) plans to the single column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left operand: size(n.friends)
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    // Predicate: size(n.friends) > 0
    let predicate = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3833
/// A single-hop outgoing `KNOWS` expand from `a` to `b` exposes both `a` and `b` as columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of `Person` nodes bound to `a`.
    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Exactly one hop (min 1, max 1) along outgoing KNOWS edges; no edge variable is bound.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
3876
/// When the expand binds its edge to `r`, the plan exposes `a`, `r` and `b` as columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `a`.
    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop outgoing KNOWS expand that also binds the traversed edge to `r`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_r = ReturnItem {
        expression: LogicalExpression::Variable(String::from("r")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_r, return_b],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
3921
/// A limit of 10 placed between the scan and the return plans to the single column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3949
/// A skip of 5 placed between the scan and the return plans to the single column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3975
/// Sorting by `n` in ascending order plans to the single column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // One sort key: the variable `n`, ascending.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
4004
/// Sorting by `n` in descending order plans to the single column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // One sort key: the variable `n`, descending.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
4033
/// A distinct operator with no explicit column list plans to the single column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
4059
/// An ungrouped `Count` over `n` aliased `cnt`, then returned, exposes the column `cnt`.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Count over the variable `n`, aliased `cnt`; no grouping keys and no HAVING clause.
    let count_of_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_of_n],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("cnt")));
}
4095
/// A `Count` grouped by `n.city` produces two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Grouping key: n.city
    let city_key = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    // Aggregate: Count over `n`, aliased `cnt`.
    let count_of_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_key],
        aggregates: vec![count_of_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
4125
/// An ungrouped `Sum` over `n.value` aliased `total` exposes the column `total`.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: Sum over n.value, aliased `total`.
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("total")));
}
4155
/// An ungrouped `Avg` over `n.score` aliased `average` exposes the column `average`.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: Avg over n.score, aliased `average`.
    let avg_of_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_of_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("average")));
}
4185
/// `Min` and `Max` over `n.age` in one aggregate expose the columns `youngest` and `oldest`.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: Min over n.age, aliased `youngest`.
    let min_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Min,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(String::from("youngest")),
        percentile: None,
    };
    // Aggregate: Max over n.age, aliased `oldest`.
    let max_age = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Max,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(String::from("oldest")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![min_age, max_age],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
4228
/// An inner join of a `Person` scan and a `Company` scan with one condition exposes `a` and `b`.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Left side: `Person` nodes bound to `a`.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Right side: `Company` nodes bound to `b`.
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    // Single join condition pairing variable `a` with variable `b`.
    let condition = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![condition],
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
4272
/// A cross join of two unlabeled scans with no conditions produces two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Left side: node scan with no label, bound to `a`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Right side: node scan with no label, bound to `b`.
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
4297
/// A left join of two unlabeled scans with no conditions produces two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Left side: node scan with no label, bound to `a`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Right side: node scan with no label, bound to `b`.
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
4321
/// Creating a `Person` node bound to `n` with a `name` property exposes the column `n`.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    // Single property assignment: name = "Alice".
    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    // The create operator has no input operator.
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("n")));
}
4343
/// Creating a `KNOWS` edge bound to `r` between `a` and `b` exposes the column `r`.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Endpoints come from a cross join of two unlabeled scans (`a` and `b`).
    let from_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let to_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(from_scan),
        right: Box::new(to_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    // Edge `r` of type KNOWS from `a` to `b`, with no properties.
    let create = CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: vec![],
        input: Box::new(endpoints),
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(create));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("r")));
}
4375
/// A non-detaching delete of scanned nodes exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("deleted_count")));
}
4395
/// Planning a plan whose root is `LogicalOperator::Empty` returns an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let logical = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&logical).is_err());
}
4407
/// Returning a variable (`missing`) that the input scan never binds makes planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan binds only `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The return references `missing`, which the scan does not bind.
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
4430
/// Each listed comparison, logical and arithmetic binary operator converts successfully.
#[test]
fn test_convert_binary_ops() {
    // Checked in the same order as before: comparisons, then logical, then arithmetic.
    let supported = [
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        BinaryOp::And,
        BinaryOp::Or,
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];
    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
4448
/// `Not`, `IsNull`, `IsNotNull` and `Neg` all convert successfully.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];
    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
4456
/// `Count`, `Sum`, `Avg`, `Min` and `Max` each convert to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
4480
/// A planner built with `Planner::new` reports no transaction id and no transaction manager.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    let tx_id = planner.tx_id();
    assert!(tx_id.is_none());

    let tx_manager = planner.tx_manager();
    assert!(tx_manager.is_none());

    // Only checks that the accessor can be called; the returned value is discarded.
    let _ = planner.viewing_epoch();
}
4490
/// A physical plan for a bare node scan reports column `n` and can be consumed via `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    // The whole plan is a single node scan with no label, bound to `n`.
    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the returned value is discarded.
    let _ = physical.into_operator();
}
4508
/// `plan_adaptive` on a return over a `Person` scan yields column `n` and an adaptive context.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of `Person` nodes bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
4535
/// `plan_adaptive` on a plan containing the filter `n.age > 30` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    // Predicate: n.age > 30
    let predicate = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4568
/// With factorized execution turned off, `plan_adaptive` on an expand plan attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    // Built with `with_factorized_execution(false)`.
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Leaf: node scan with no label, bound to `a`.
    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop outgoing KNOWS expand from `a` to `b`; no edge variable is bound.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4607
/// `plan_adaptive` on a return over a cross join attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Left side: node scan with no label, bound to `a`.
    let left_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Right side: node scan with no label, bound to `b`.
    let right_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: None,
        input: None,
    });
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(left_scan),
        right: Box::new(right_scan),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4644
/// `plan_adaptive` on an ungrouped `Count` aggregate attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: Count over `n`, aliased `cnt`.
    let count_of_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_of_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4670
/// `plan_adaptive` on a plan containing a distinct operator attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4695
/// `plan_adaptive` on a plan containing a limit of 10 attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4720
/// `plan_adaptive` on a plan containing a skip of 5 attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4745
/// `plan_adaptive` on a plan containing an ascending sort by `n` attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    // Leaf: node scan with no label, bound to `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // One sort key: the variable `n`, ascending.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4773
// Adaptive planning of a Return over a Union of two labelled scans
// ("Person" and "Company", both binding `n`) must succeed and the resulting
// physical plan must carry an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches are identical apart from the label they scan.
    let labelled_scan = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from("n"),
            label: Some(label.to_string()),
            input: None,
        })
    };
    let branches = vec![labelled_scan("Person"), labelled_scan("Company")];
    let union = LogicalOperator::Union(UnionOp { inputs: branches });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4804
// Plans an outgoing "KNOWS" expand from `a` to `b` with hop bounds 1..=3 and
// checks that both endpoint variables appear among the physical plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // min_hops != max_hops makes this a multi-hop expansion.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan_a),
        path_alias: None,
    });
    // Return both endpoints, unaliased, in the order a, b.
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(*name)),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
4846
// Same shape as the 1..=3 hop "KNOWS" expand from `a` to `b`, but with
// `path_alias` set to "p". Only the presence of columns `a` and `b` is
// asserted; nothing is checked about the alias itself.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let start = Box::new(LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    }));
    let aliased_expand = Box::new(LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: start,
        path_alias: Some(String::from("p")),
    }));

    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: aliased_expand,
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
4887
// Plans a single-hop (1..=1) "KNOWS" expand with `ExpandDirection::Incoming`
// on a planner configured via `with_factorized_execution(false)`, then checks
// that columns `a` and `b` are present.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let incoming = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(*name)),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(incoming),
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
4927
// Plans a single-hop (1..=1) "KNOWS" expand with `ExpandDirection::Both`
// on a planner configured via `with_factorized_execution(false)`, then checks
// that columns `a` and `b` are present.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let origin = Box::new(LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    }));
    let undirected = Box::new(LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: origin,
        path_alias: None,
    }));

    let return_a = ReturnItem {
        expression: LogicalExpression::Variable(String::from("a")),
        alias: None,
    };
    let return_b = ReturnItem {
        expression: LogicalExpression::Variable(String::from("b")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_a, return_b],
        distinct: false,
        input: undirected,
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
4967
// `Planner::with_context` must hand back, through its accessors, the
// transaction id and viewing epoch it was constructed with, and must report
// that a transaction manager is attached.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    // Begin the transaction first, then sample the epoch (same order as before).
    let active_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        snapshot_epoch,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
4990
// With `with_factorized_execution(false)`, a chain of two single-hop outgoing
// expands (a -> b -> c, no edge type) must still plan, and the returned
// variables `a` and `c` must appear among the physical plan's columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds one untyped, outgoing, exactly-one-hop expand on top of `source`.
    let single_hop = |from: &str, to: &str, source: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(source),
            path_alias: None,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let a_to_b = single_hop("a", "b", scan_a);
    let b_to_c = single_hop("b", "c", a_to_b);

    // Only the chain's endpoints are returned; `b` is intentionally left out.
    let items: Vec<ReturnItem> = ["a", "c"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(*name)),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(b_to_c),
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "c"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
5040
// Plans Return(n) <- Sort(n.name ascending) <- NodeScan(n) and checks that
// `n` is among the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The sort key is a property access rather than a bare variable.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
5075
// Plans a NodeScan ("Company" as `b`) whose `input` is another NodeScan
// ("Person" as `a`) and checks that both variables appear among the physical
// plan's columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Inner scan: feeds the outer scan through its `input` field.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(people)),
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(*name)),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(companies),
    }));

    let physical = planner.plan(&logical).unwrap();
    for column in ["a", "b"] {
        assert!(physical.columns().contains(&column.to_string()));
    }
}
5111}