1use crate::query::plan::{
8 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9 CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
10 ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
11 LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
12 ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29 UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37struct RangeBounds<'a> {
39 min: Option<&'a Value>,
40 max: Option<&'a Value>,
41 min_inclusive: bool,
42 max_inclusive: bool,
43}
44
45pub struct Planner {
47 store: Arc<LpgStore>,
49 tx_manager: Option<Arc<TransactionManager>>,
51 tx_id: Option<TxId>,
53 viewing_epoch: EpochId,
55 anon_edge_counter: std::cell::Cell<u32>,
57 factorized_execution: bool,
59}
60
61impl Planner {
62 #[must_use]
67 pub fn new(store: Arc<LpgStore>) -> Self {
68 let epoch = store.current_epoch();
69 Self {
70 store,
71 tx_manager: None,
72 tx_id: None,
73 viewing_epoch: epoch,
74 anon_edge_counter: std::cell::Cell::new(0),
75 factorized_execution: true,
76 }
77 }
78
79 #[must_use]
88 pub fn with_context(
89 store: Arc<LpgStore>,
90 tx_manager: Arc<TransactionManager>,
91 tx_id: Option<TxId>,
92 viewing_epoch: EpochId,
93 ) -> Self {
94 Self {
95 store,
96 tx_manager: Some(tx_manager),
97 tx_id,
98 viewing_epoch,
99 anon_edge_counter: std::cell::Cell::new(0),
100 factorized_execution: true,
101 }
102 }
103
104 #[must_use]
106 pub fn viewing_epoch(&self) -> EpochId {
107 self.viewing_epoch
108 }
109
110 #[must_use]
112 pub fn tx_id(&self) -> Option<TxId> {
113 self.tx_id
114 }
115
116 #[must_use]
118 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119 self.tx_manager.as_ref()
120 }
121
122 #[must_use]
124 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125 self.factorized_execution = enabled;
126 self
127 }
128
129 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133 match op {
134 LogicalOperator::Expand(expand) => {
135 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138 if is_single_hop {
139 let (inner_count, base) = Self::count_expand_chain(&expand.input);
140 (inner_count + 1, base)
141 } else {
142 (0, op)
144 }
145 }
146 _ => (0, op),
147 }
148 }
149
150 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154 let mut chain = Vec::new();
155 let mut current = op;
156
157 while let LogicalOperator::Expand(expand) = current {
158 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160 if !is_single_hop {
161 break;
162 }
163 chain.push(expand);
164 current = &expand.input;
165 }
166
167 chain.reverse();
169 chain
170 }
171
172 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179 Ok(PhysicalPlan {
180 operator,
181 columns,
182 adaptive_context: None,
183 })
184 }
185
186 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197 let mut adaptive_context = AdaptiveContext::new();
199 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201 Ok(PhysicalPlan {
202 operator,
203 columns,
204 adaptive_context: Some(adaptive_context),
205 })
206 }
207
208 fn collect_cardinality_estimates(
210 &self,
211 op: &LogicalOperator,
212 ctx: &mut AdaptiveContext,
213 depth: usize,
214 ) {
215 match op {
216 LogicalOperator::NodeScan(scan) => {
217 let estimate = if let Some(label) = &scan.label {
219 self.store.nodes_by_label(label).len() as f64
220 } else {
221 self.store.node_count() as f64
222 };
223 let id = format!("scan_{}", scan.variable);
224 ctx.set_estimate(&id, estimate);
225
226 if let Some(input) = &scan.input {
228 self.collect_cardinality_estimates(input, ctx, depth + 1);
229 }
230 }
231 LogicalOperator::Filter(filter) => {
232 let input_estimate = self.estimate_cardinality(&filter.input);
234 let estimate = input_estimate * 0.3;
235 let id = format!("filter_{depth}");
236 ctx.set_estimate(&id, estimate);
237
238 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239 }
240 LogicalOperator::Expand(expand) => {
241 let input_estimate = self.estimate_cardinality(&expand.input);
243 let stats = self.store.statistics();
244 let avg_degree = self.estimate_expand_degree(&stats, expand);
245 let estimate = input_estimate * avg_degree;
246 let id = format!("expand_{}", expand.to_variable);
247 ctx.set_estimate(&id, estimate);
248
249 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250 }
251 LogicalOperator::Join(join) => {
252 let left_est = self.estimate_cardinality(&join.left);
254 let right_est = self.estimate_cardinality(&join.right);
255 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261 }
262 LogicalOperator::Aggregate(agg) => {
263 let input_estimate = self.estimate_cardinality(&agg.input);
265 let estimate = if agg.group_by.is_empty() {
266 1.0 } else {
268 (input_estimate * 0.1).max(1.0) };
270 let id = format!("aggregate_{depth}");
271 ctx.set_estimate(&id, estimate);
272
273 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274 }
275 LogicalOperator::Distinct(distinct) => {
276 let input_estimate = self.estimate_cardinality(&distinct.input);
277 let estimate = (input_estimate * 0.5).max(1.0);
278 let id = format!("distinct_{depth}");
279 ctx.set_estimate(&id, estimate);
280
281 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282 }
283 LogicalOperator::Return(ret) => {
284 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285 }
286 LogicalOperator::Limit(limit) => {
287 let input_estimate = self.estimate_cardinality(&limit.input);
288 let estimate = (input_estimate).min(limit.count as f64);
289 let id = format!("limit_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293 }
294 LogicalOperator::Skip(skip) => {
295 let input_estimate = self.estimate_cardinality(&skip.input);
296 let estimate = (input_estimate - skip.count as f64).max(0.0);
297 let id = format!("skip_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301 }
302 LogicalOperator::Sort(sort) => {
303 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305 }
306 LogicalOperator::Union(union) => {
307 let estimate: f64 = union
308 .inputs
309 .iter()
310 .map(|input| self.estimate_cardinality(input))
311 .sum();
312 let id = format!("union_{depth}");
313 ctx.set_estimate(&id, estimate);
314
315 for input in &union.inputs {
316 self.collect_cardinality_estimates(input, ctx, depth + 1);
317 }
318 }
319 _ => {
320 }
322 }
323 }
324
325 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327 match op {
328 LogicalOperator::NodeScan(scan) => {
329 if let Some(label) = &scan.label {
330 self.store.nodes_by_label(label).len() as f64
331 } else {
332 self.store.node_count() as f64
333 }
334 }
335 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336 LogicalOperator::Expand(expand) => {
337 let stats = self.store.statistics();
338 let avg_degree = self.estimate_expand_degree(&stats, expand);
339 self.estimate_cardinality(&expand.input) * avg_degree
340 }
341 LogicalOperator::Join(join) => {
342 let left = self.estimate_cardinality(&join.left);
343 let right = self.estimate_cardinality(&join.right);
344 (left * right).sqrt()
345 }
346 LogicalOperator::Aggregate(agg) => {
347 if agg.group_by.is_empty() {
348 1.0
349 } else {
350 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351 }
352 }
353 LogicalOperator::Distinct(distinct) => {
354 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355 }
356 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357 LogicalOperator::Limit(limit) => self
358 .estimate_cardinality(&limit.input)
359 .min(limit.count as f64),
360 LogicalOperator::Skip(skip) => {
361 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362 }
363 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364 LogicalOperator::Union(union) => union
365 .inputs
366 .iter()
367 .map(|input| self.estimate_cardinality(input))
368 .sum(),
369 _ => 1000.0, }
371 }
372
373 fn estimate_expand_degree(
375 &self,
376 stats: &grafeo_core::statistics::Statistics,
377 expand: &ExpandOp,
378 ) -> f64 {
379 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380 if let Some(edge_type) = &expand.edge_type {
381 stats.estimate_avg_degree(edge_type, outgoing)
382 } else if stats.total_nodes > 0 {
383 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384 } else {
385 10.0 }
387 }
388
389 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391 match op {
392 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393 LogicalOperator::Expand(expand) => {
394 if self.factorized_execution {
396 let (chain_len, _base) = Self::count_expand_chain(op);
397 if chain_len >= 2 {
398 return self.plan_expand_chain(op);
400 }
401 }
402 self.plan_expand(expand)
403 }
404 LogicalOperator::Return(ret) => self.plan_return(ret),
405 LogicalOperator::Filter(filter) => self.plan_filter(filter),
406 LogicalOperator::Project(project) => self.plan_project(project),
407 LogicalOperator::Limit(limit) => self.plan_limit(limit),
408 LogicalOperator::Skip(skip) => self.plan_skip(skip),
409 LogicalOperator::Sort(sort) => self.plan_sort(sort),
410 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411 LogicalOperator::Join(join) => self.plan_join(join),
412 LogicalOperator::Union(union) => self.plan_union(union),
413 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421 LogicalOperator::Merge(merge) => self.plan_merge(merge),
422 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
427 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
428 LogicalOperator::VectorScan(_) => Err(Error::Internal(
429 "VectorScan requires vector-index feature".to_string(),
430 )),
431 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
432 "VectorJoin requires vector-index feature".to_string(),
433 )),
434 _ => Err(Error::Internal(format!(
435 "Unsupported operator: {:?}",
436 std::mem::discriminant(op)
437 ))),
438 }
439 }
440
441 fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
443 let scan_op = if let Some(label) = &scan.label {
444 ScanOperator::with_label(Arc::clone(&self.store), label)
445 } else {
446 ScanOperator::new(Arc::clone(&self.store))
447 };
448
449 let scan_operator: Box<dyn Operator> =
451 Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
452
453 if let Some(input) = &scan.input {
455 let (input_op, mut input_columns) = self.plan_operator(input)?;
456
457 let mut output_schema: Vec<LogicalType> =
459 input_columns.iter().map(|_| LogicalType::Any).collect();
460 output_schema.push(LogicalType::Node);
461
462 input_columns.push(scan.variable.clone());
464
465 let join_op = Box::new(NestedLoopJoinOperator::new(
467 input_op,
468 scan_operator,
469 None, PhysicalJoinType::Cross,
471 output_schema,
472 ));
473
474 Ok((join_op, input_columns))
475 } else {
476 let columns = vec![scan.variable.clone()];
477 Ok((scan_operator, columns))
478 }
479 }
480
481 fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
483 let (input_op, input_columns) = self.plan_operator(&expand.input)?;
485
486 let source_column = input_columns
488 .iter()
489 .position(|c| c == &expand.from_variable)
490 .ok_or_else(|| {
491 Error::Internal(format!(
492 "Source variable '{}' not found in input columns",
493 expand.from_variable
494 ))
495 })?;
496
497 let direction = match expand.direction {
499 ExpandDirection::Outgoing => Direction::Outgoing,
500 ExpandDirection::Incoming => Direction::Incoming,
501 ExpandDirection::Both => Direction::Both,
502 };
503
504 let is_variable_length =
506 expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
507
508 let operator: Box<dyn Operator> = if is_variable_length {
509 let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); let mut expand_op = VariableLengthExpandOperator::new(
512 Arc::clone(&self.store),
513 input_op,
514 source_column,
515 direction,
516 expand.edge_type.clone(),
517 expand.min_hops,
518 max_hops,
519 )
520 .with_tx_context(self.viewing_epoch, self.tx_id);
521
522 if expand.path_alias.is_some() {
524 expand_op = expand_op
525 .with_path_length_output()
526 .with_path_detail_output();
527 }
528
529 Box::new(expand_op)
530 } else {
531 let expand_op = ExpandOperator::new(
533 Arc::clone(&self.store),
534 input_op,
535 source_column,
536 direction,
537 expand.edge_type.clone(),
538 )
539 .with_tx_context(self.viewing_epoch, self.tx_id);
540 Box::new(expand_op)
541 };
542
543 let mut columns = input_columns;
546
547 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
549 let count = self.anon_edge_counter.get();
550 self.anon_edge_counter.set(count + 1);
551 format!("_anon_edge_{}", count)
552 });
553 columns.push(edge_col_name);
554
555 columns.push(expand.to_variable.clone());
556
557 if let Some(ref path_alias) = expand.path_alias {
559 columns.push(format!("_path_length_{}", path_alias));
560 columns.push(format!("_path_nodes_{}", path_alias));
561 columns.push(format!("_path_edges_{}", path_alias));
562 }
563
564 Ok((operator, columns))
565 }
566
567 fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
575 let expands = Self::collect_expand_chain(op);
576 if expands.is_empty() {
577 return Err(Error::Internal("Empty expand chain".to_string()));
578 }
579
580 let first_expand = expands[0];
582 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
583
584 let mut columns = base_columns.clone();
585 let mut steps = Vec::new();
586
587 let mut is_first = true;
592
593 for expand in &expands {
594 let source_column = if is_first {
596 base_columns
598 .iter()
599 .position(|c| c == &expand.from_variable)
600 .ok_or_else(|| {
601 Error::Internal(format!(
602 "Source variable '{}' not found in base columns",
603 expand.from_variable
604 ))
605 })?
606 } else {
607 1
610 };
611
612 let direction = match expand.direction {
614 ExpandDirection::Outgoing => Direction::Outgoing,
615 ExpandDirection::Incoming => Direction::Incoming,
616 ExpandDirection::Both => Direction::Both,
617 };
618
619 steps.push(ExpandStep {
621 source_column,
622 direction,
623 edge_type: expand.edge_type.clone(),
624 });
625
626 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
628 let count = self.anon_edge_counter.get();
629 self.anon_edge_counter.set(count + 1);
630 format!("_anon_edge_{}", count)
631 });
632 columns.push(edge_col_name);
633 columns.push(expand.to_variable.clone());
634
635 is_first = false;
636 }
637
638 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
640
641 if let Some(tx_id) = self.tx_id {
642 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
643 } else {
644 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
645 }
646
647 Ok((Box::new(lazy_op), columns))
648 }
649
    /// Plans a `RETURN` clause on top of its planned input.
    ///
    /// Output column names are the item aliases, or the textual form of the
    /// expression (via `expression_to_string`) when an item has no alias.
    ///
    /// If any item is not a bare variable, a `ProjectOperator` (with store
    /// access) computes each item. Variables, property accesses, literals,
    /// `type()`, `length()`, `nodes()`/`edges()`, other function calls and
    /// `CASE` are supported. If every item is a bare variable, the input is
    /// returned unchanged when the items select exactly the input columns in
    /// order; otherwise a plain column-selecting `ProjectOperator` is built.
    ///
    /// # Errors
    /// `Error::Internal` when a referenced variable is not an input column,
    /// when `type()`/`length()`/`nodes()`/`edges()` get anything other than a
    /// single variable argument, or for unsupported expression kinds. Also
    /// propagates errors from planning the input and converting expressions.
    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
        let (input_op, input_columns) = self.plan_operator(&ret.input)?;

        // Column name -> position in the input rows.
        let variable_columns: HashMap<String, usize> = input_columns
            .iter()
            .enumerate()
            .map(|(i, name)| (name.clone(), i))
            .collect();

        // Output names: alias if given, else the expression's textual form.
        let columns: Vec<String> = ret
            .items
            .iter()
            .map(|item| {
                item.alias.clone().unwrap_or_else(|| {
                    expression_to_string(&item.expression)
                })
            })
            .collect();

        // Anything other than a bare variable needs per-row computation.
        let needs_project = ret
            .items
            .iter()
            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));

        if needs_project {
            let mut projections = Vec::with_capacity(ret.items.len());
            let mut output_types = Vec::with_capacity(ret.items.len());

            for item in &ret.items {
                match &item.expression {
                    LogicalExpression::Variable(name) => {
                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
                            Error::Internal(format!("Variable '{}' not found in input", name))
                        })?;
                        projections.push(ProjectExpr::Column(col_idx));
                        // Path columns generated by `plan_expand` are not nodes;
                        // every other variable is typed as a node here
                        // (including edge variables — NOTE(review): confirm
                        // downstream consumers tolerate that).
                        if name.starts_with("_path_nodes_")
                            || name.starts_with("_path_edges_")
                            || name.starts_with("_path_length_")
                        {
                            output_types.push(LogicalType::Any);
                        } else {
                            output_types.push(LogicalType::Node);
                        }
                    }
                    LogicalExpression::Property { variable, property } => {
                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
                            Error::Internal(format!("Variable '{}' not found in input", variable))
                        })?;
                        projections.push(ProjectExpr::PropertyAccess {
                            column: col_idx,
                            property: property.clone(),
                        });
                        output_types.push(LogicalType::Any);
                    }
                    LogicalExpression::Literal(value) => {
                        projections.push(ProjectExpr::Constant(value.clone()));
                        output_types.push(value_to_logical_type(value));
                    }
                    LogicalExpression::FunctionCall { name, args, .. } => {
                        // Function names are matched case-insensitively.
                        match name.to_lowercase().as_str() {
                            "type" => {
                                // type(r): the edge type of the edge in column `r`.
                                if args.len() != 1 {
                                    return Err(Error::Internal(
                                        "type() requires exactly one argument".to_string(),
                                    ));
                                }
                                if let LogicalExpression::Variable(var_name) = &args[0] {
                                    let col_idx =
                                        *variable_columns.get(var_name).ok_or_else(|| {
                                            Error::Internal(format!(
                                                "Variable '{}' not found in input",
                                                var_name
                                            ))
                                        })?;
                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
                                    output_types.push(LogicalType::String);
                                } else {
                                    return Err(Error::Internal(
                                        "type() argument must be a variable".to_string(),
                                    ));
                                }
                            }
                            "length" => {
                                if args.len() != 1 {
                                    return Err(Error::Internal(
                                        "length() requires exactly one argument".to_string(),
                                    ));
                                }
                                if let LogicalExpression::Variable(var_name) = &args[0] {
                                    let col_idx =
                                        *variable_columns.get(var_name).ok_or_else(|| {
                                            Error::Internal(format!(
                                                "Variable '{}' not found in input",
                                                var_name
                                            ))
                                        })?;
                                    // The column is passed through unchanged and typed
                                    // Int64 — presumably the argument already names a
                                    // `_path_length_*` column; confirm.
                                    projections.push(ProjectExpr::Column(col_idx));
                                    output_types.push(LogicalType::Int64);
                                } else {
                                    return Err(Error::Internal(
                                        "length() argument must be a variable".to_string(),
                                    ));
                                }
                            }
                            "nodes" | "edges" => {
                                if args.len() != 1 {
                                    return Err(Error::Internal(format!(
                                        "{}() requires exactly one argument",
                                        name
                                    )));
                                }
                                if let LogicalExpression::Variable(var_name) = &args[0] {
                                    let col_idx =
                                        *variable_columns.get(var_name).ok_or_else(|| {
                                            Error::Internal(format!(
                                                "Variable '{var_name}' not found in input",
                                            ))
                                        })?;
                                    // Passed through unchanged, like `length()`.
                                    projections.push(ProjectExpr::Column(col_idx));
                                    output_types.push(LogicalType::Any);
                                } else {
                                    return Err(Error::Internal(format!(
                                        "{}() argument must be a variable",
                                        name
                                    )));
                                }
                            }
                            _ => {
                                // Any other function is evaluated as a general
                                // expression against the input row.
                                let filter_expr = self.convert_expression(&item.expression)?;
                                projections.push(ProjectExpr::Expression {
                                    expr: filter_expr,
                                    variable_columns: variable_columns.clone(),
                                });
                                output_types.push(LogicalType::Any);
                            }
                        }
                    }
                    LogicalExpression::Case { .. } => {
                        // CASE is evaluated as a general expression.
                        let filter_expr = self.convert_expression(&item.expression)?;
                        projections.push(ProjectExpr::Expression {
                            expr: filter_expr,
                            variable_columns: variable_columns.clone(),
                        });
                        output_types.push(LogicalType::Any);
                    }
                    _ => {
                        return Err(Error::Internal(format!(
                            "Unsupported RETURN expression: {:?}",
                            item.expression
                        )));
                    }
                }
            }

            let operator = Box::new(ProjectOperator::with_store(
                input_op,
                projections,
                output_types,
                Arc::clone(&self.store),
            ));

            Ok((operator, columns))
        } else {
            // Every item is a bare variable: only column selection is needed.
            let mut projections = Vec::with_capacity(ret.items.len());
            let mut output_types = Vec::with_capacity(ret.items.len());

            for item in &ret.items {
                if let LogicalExpression::Variable(name) = &item.expression {
                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
                        Error::Internal(format!("Variable '{}' not found in input", name))
                    })?;
                    projections.push(ProjectExpr::Column(col_idx));
                    output_types.push(LogicalType::Node);
                }
            }

            // Identity selection (all input columns, in order): skip the project
            // and only rename the columns.
            if projections.len() == input_columns.len()
                && projections
                    .iter()
                    .enumerate()
                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
            {
                Ok((input_op, columns))
            } else {
                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
                Ok((operator, columns))
            }
        }
    }
861
862 fn plan_project(
864 &self,
865 project: &crate::query::plan::ProjectOp,
866 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
867 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
869 if matches!(project.input.as_ref(), LogicalOperator::Empty) {
870 let single_row_op: Box<dyn Operator> = Box::new(
872 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
873 );
874 (single_row_op, Vec::new())
875 } else {
876 self.plan_operator(&project.input)?
877 };
878
879 let variable_columns: HashMap<String, usize> = input_columns
881 .iter()
882 .enumerate()
883 .map(|(i, name)| (name.clone(), i))
884 .collect();
885
886 let mut projections = Vec::with_capacity(project.projections.len());
888 let mut output_types = Vec::with_capacity(project.projections.len());
889 let mut output_columns = Vec::with_capacity(project.projections.len());
890
891 for projection in &project.projections {
892 let col_name = projection
894 .alias
895 .clone()
896 .unwrap_or_else(|| expression_to_string(&projection.expression));
897 output_columns.push(col_name);
898
899 match &projection.expression {
900 LogicalExpression::Variable(name) => {
901 let col_idx = *variable_columns.get(name).ok_or_else(|| {
902 Error::Internal(format!("Variable '{}' not found in input", name))
903 })?;
904 projections.push(ProjectExpr::Column(col_idx));
905 output_types.push(LogicalType::Node);
906 }
907 LogicalExpression::Property { variable, property } => {
908 let col_idx = *variable_columns.get(variable).ok_or_else(|| {
909 Error::Internal(format!("Variable '{}' not found in input", variable))
910 })?;
911 projections.push(ProjectExpr::PropertyAccess {
912 column: col_idx,
913 property: property.clone(),
914 });
915 output_types.push(LogicalType::Any);
916 }
917 LogicalExpression::Literal(value) => {
918 projections.push(ProjectExpr::Constant(value.clone()));
919 output_types.push(value_to_logical_type(value));
920 }
921 _ => {
922 let filter_expr = self.convert_expression(&projection.expression)?;
924 projections.push(ProjectExpr::Expression {
925 expr: filter_expr,
926 variable_columns: variable_columns.clone(),
927 });
928 output_types.push(LogicalType::Any);
929 }
930 }
931 }
932
933 let operator = Box::new(ProjectOperator::with_store(
934 input_op,
935 projections,
936 output_types,
937 Arc::clone(&self.store),
938 ));
939
940 Ok((operator, output_columns))
941 }
942
943 fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
949 if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
952 let (_, columns) = self.plan_operator(&filter.input)?;
954 let schema = self.derive_schema_from_columns(&columns);
955 let empty_op = Box::new(EmptyOperator::new(schema));
956 return Ok((empty_op, columns));
957 }
958
959 if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
961 return Ok(result);
962 }
963
964 if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
966 return Ok(result);
967 }
968
969 let (input_op, columns) = self.plan_operator(&filter.input)?;
971
972 let variable_columns: HashMap<String, usize> = columns
974 .iter()
975 .enumerate()
976 .map(|(i, name)| (name.clone(), i))
977 .collect();
978
979 let filter_expr = self.convert_expression(&filter.predicate)?;
981
982 let predicate =
984 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
985
986 let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
988
989 Ok((operator, columns))
990 }
991
992 fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
999 use grafeo_core::graph::lpg::CompareOp;
1000
1001 match predicate {
1002 LogicalExpression::Binary { left, op, right } => {
1003 match op {
1005 BinaryOp::And => {
1006 let left_result = self.check_zone_map_for_predicate(left);
1007 let right_result = self.check_zone_map_for_predicate(right);
1008
1009 return match (left_result, right_result) {
1010 (Some(false), _) | (_, Some(false)) => Some(false),
1012 (Some(true), Some(true)) => Some(true),
1014 _ => None,
1016 };
1017 }
1018 BinaryOp::Or => {
1019 let left_result = self.check_zone_map_for_predicate(left);
1020 let right_result = self.check_zone_map_for_predicate(right);
1021
1022 return match (left_result, right_result) {
1023 (Some(false), Some(false)) => Some(false),
1025 (Some(true), _) | (_, Some(true)) => Some(true),
1027 _ => None,
1029 };
1030 }
1031 _ => {}
1032 }
1033
1034 let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1036 (
1037 LogicalExpression::Property { property, .. },
1038 LogicalExpression::Literal(val),
1039 ) => {
1040 let cmp = match op {
1041 BinaryOp::Eq => CompareOp::Eq,
1042 BinaryOp::Ne => CompareOp::Ne,
1043 BinaryOp::Lt => CompareOp::Lt,
1044 BinaryOp::Le => CompareOp::Le,
1045 BinaryOp::Gt => CompareOp::Gt,
1046 BinaryOp::Ge => CompareOp::Ge,
1047 _ => return None,
1048 };
1049 (property.clone(), cmp, val.clone())
1050 }
1051 (
1052 LogicalExpression::Literal(val),
1053 LogicalExpression::Property { property, .. },
1054 ) => {
1055 let cmp = match op {
1057 BinaryOp::Eq => CompareOp::Eq,
1058 BinaryOp::Ne => CompareOp::Ne,
1059 BinaryOp::Lt => CompareOp::Gt, BinaryOp::Le => CompareOp::Ge,
1061 BinaryOp::Gt => CompareOp::Lt,
1062 BinaryOp::Ge => CompareOp::Le,
1063 _ => return None,
1064 };
1065 (property.clone(), cmp, val.clone())
1066 }
1067 _ => return None,
1068 };
1069
1070 let might_match =
1072 self.store
1073 .node_property_might_match(&property.into(), compare_op, &value);
1074
1075 Some(might_match)
1076 }
1077
1078 _ => None,
1079 }
1080 }
1081
1082 fn try_plan_filter_with_property_index(
1091 &self,
1092 filter: &FilterOp,
1093 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1094 let (scan_variable, scan_label) = match filter.input.as_ref() {
1096 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1097 (scan.variable.clone(), scan.label.clone())
1098 }
1099 _ => return Ok(None),
1100 };
1101
1102 let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1105
1106 if conditions.is_empty() {
1107 return Ok(None);
1108 }
1109
1110 let has_indexed_condition = conditions
1112 .iter()
1113 .any(|(prop, _)| self.store.has_property_index(prop));
1114
1115 if !has_indexed_condition && scan_label.is_none() {
1118 return Ok(None);
1119 }
1120
1121 let mut matching_nodes = if has_indexed_condition {
1122 let conditions_ref: Vec<(&str, Value)> = conditions
1124 .iter()
1125 .map(|(p, v)| (p.as_str(), v.clone()))
1126 .collect();
1127 let mut nodes = self.store.find_nodes_by_properties(&conditions_ref);
1128
1129 if let Some(label) = &scan_label {
1131 let label_nodes: std::collections::HashSet<_> =
1132 self.store.nodes_by_label(label).into_iter().collect();
1133 nodes.retain(|n| label_nodes.contains(n));
1134 }
1135 nodes
1136 } else {
1137 let label = scan_label.as_ref().expect("label checked above");
1141 let label_nodes = self.store.nodes_by_label(label);
1142 label_nodes
1143 .into_iter()
1144 .filter(|&node_id| {
1145 conditions.iter().all(|(prop, val)| {
1146 let key = grafeo_common::types::PropertyKey::new(prop);
1147 self.store
1148 .get_node_property(node_id, &key)
1149 .is_some_and(|v| v == *val)
1150 })
1151 })
1152 .collect()
1153 };
1154
1155 let epoch = self.viewing_epoch;
1158 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
1159 matching_nodes.retain(|id| self.store.get_node_versioned(*id, epoch, tx).is_some());
1160
1161 let columns = vec![scan_variable.clone()];
1162 let node_list_op: Box<dyn Operator> = Box::new(NodeListOperator::new(matching_nodes, 2048));
1163
1164 if let Some(remaining) =
1167 self.extract_remaining_predicate(&filter.predicate, &scan_variable, &conditions)
1168 {
1169 let variable_columns: HashMap<String, usize> = columns
1170 .iter()
1171 .enumerate()
1172 .map(|(i, name)| (name.clone(), i))
1173 .collect();
1174 let filter_expr = self.convert_expression(&remaining)?;
1175 let predicate =
1176 ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
1177 let filtered = Box::new(FilterOperator::new(node_list_op, Box::new(predicate)));
1178 Ok(Some((filtered, columns)))
1179 } else {
1180 Ok(Some((node_list_op, columns)))
1181 }
1182 }
1183
1184 fn extract_remaining_predicate(
1189 &self,
1190 predicate: &LogicalExpression,
1191 target_variable: &str,
1192 pushed_conditions: &[(String, Value)],
1193 ) -> Option<LogicalExpression> {
1194 match predicate {
1195 LogicalExpression::Binary {
1196 left,
1197 op: BinaryOp::And,
1198 right,
1199 } => {
1200 let left_remaining =
1201 self.extract_remaining_predicate(left, target_variable, pushed_conditions);
1202 let right_remaining =
1203 self.extract_remaining_predicate(right, target_variable, pushed_conditions);
1204
1205 match (left_remaining, right_remaining) {
1206 (Some(l), Some(r)) => Some(LogicalExpression::Binary {
1207 left: Box::new(l),
1208 op: BinaryOp::And,
1209 right: Box::new(r),
1210 }),
1211 (Some(l), None) => Some(l),
1212 (None, Some(r)) => Some(r),
1213 (None, None) => None,
1214 }
1215 }
1216 LogicalExpression::Binary {
1217 left,
1218 op: BinaryOp::Eq,
1219 right,
1220 } => {
1221 if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1223 && var == target_variable
1224 && pushed_conditions
1225 .iter()
1226 .any(|(p, v)| *p == prop && *v == val)
1227 {
1228 None } else {
1230 Some(predicate.clone())
1231 }
1232 }
1233 _ => Some(predicate.clone()),
1234 }
1235 }
1236
1237 fn extract_equality_conditions(
1243 &self,
1244 predicate: &LogicalExpression,
1245 target_variable: &str,
1246 ) -> Vec<(String, Value)> {
1247 let mut conditions = Vec::new();
1248 self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1249 conditions
1250 }
1251
1252 fn collect_equality_conditions(
1254 &self,
1255 expr: &LogicalExpression,
1256 target_variable: &str,
1257 conditions: &mut Vec<(String, Value)>,
1258 ) {
1259 match expr {
1260 LogicalExpression::Binary {
1262 left,
1263 op: BinaryOp::And,
1264 right,
1265 } => {
1266 self.collect_equality_conditions(left, target_variable, conditions);
1267 self.collect_equality_conditions(right, target_variable, conditions);
1268 }
1269
1270 LogicalExpression::Binary {
1272 left,
1273 op: BinaryOp::Eq,
1274 right,
1275 } => {
1276 if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1277 && var == target_variable
1278 {
1279 conditions.push((prop, val));
1280 }
1281 }
1282
1283 _ => {}
1284 }
1285 }
1286
1287 fn extract_property_equality(
1289 &self,
1290 left: &LogicalExpression,
1291 right: &LogicalExpression,
1292 ) -> Option<(String, String, Value)> {
1293 match (left, right) {
1294 (
1295 LogicalExpression::Property { variable, property },
1296 LogicalExpression::Literal(val),
1297 ) => Some((variable.clone(), property.clone(), val.clone())),
1298 (
1299 LogicalExpression::Literal(val),
1300 LogicalExpression::Property { variable, property },
1301 ) => Some((variable.clone(), property.clone(), val.clone())),
1302 _ => None,
1303 }
1304 }
1305
1306 fn try_plan_filter_with_range_index(
1319 &self,
1320 filter: &FilterOp,
1321 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1322 let (scan_variable, scan_label) = match filter.input.as_ref() {
1324 LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1325 (scan.variable.clone(), scan.label.clone())
1326 }
1327 _ => return Ok(None),
1328 };
1329
1330 if let Some((variable, property, min, max, min_inc, max_inc)) =
1332 self.extract_between_predicate(&filter.predicate)
1333 && variable == scan_variable
1334 {
1335 return self.plan_range_filter(
1336 &scan_variable,
1337 &scan_label,
1338 &property,
1339 RangeBounds {
1340 min: Some(&min),
1341 max: Some(&max),
1342 min_inclusive: min_inc,
1343 max_inclusive: max_inc,
1344 },
1345 );
1346 }
1347
1348 if let Some((variable, property, op, value)) =
1350 self.extract_range_predicate(&filter.predicate)
1351 && variable == scan_variable
1352 {
1353 let (min, max, min_inc, max_inc) = match op {
1354 BinaryOp::Lt => (None, Some(value), false, false),
1355 BinaryOp::Le => (None, Some(value), false, true),
1356 BinaryOp::Gt => (Some(value), None, false, false),
1357 BinaryOp::Ge => (Some(value), None, true, false),
1358 _ => return Ok(None),
1359 };
1360 return self.plan_range_filter(
1361 &scan_variable,
1362 &scan_label,
1363 &property,
1364 RangeBounds {
1365 min: min.as_ref(),
1366 max: max.as_ref(),
1367 min_inclusive: min_inc,
1368 max_inclusive: max_inc,
1369 },
1370 );
1371 }
1372
1373 Ok(None)
1374 }
1375
1376 fn plan_range_filter(
1378 &self,
1379 scan_variable: &str,
1380 scan_label: &Option<String>,
1381 property: &str,
1382 bounds: RangeBounds<'_>,
1383 ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1384 let mut matching_nodes = self.store.find_nodes_in_range(
1386 property,
1387 bounds.min,
1388 bounds.max,
1389 bounds.min_inclusive,
1390 bounds.max_inclusive,
1391 );
1392
1393 if let Some(label) = scan_label {
1395 let label_nodes: std::collections::HashSet<_> =
1396 self.store.nodes_by_label(label).into_iter().collect();
1397 matching_nodes.retain(|n| label_nodes.contains(n));
1398 }
1399
1400 let epoch = self.viewing_epoch;
1402 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
1403 matching_nodes.retain(|id| self.store.get_node_versioned(*id, epoch, tx).is_some());
1404
1405 let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1407 let columns = vec![scan_variable.to_string()];
1408
1409 Ok(Some((node_list_op, columns)))
1410 }
1411
1412 fn extract_range_predicate(
1416 &self,
1417 predicate: &LogicalExpression,
1418 ) -> Option<(String, String, BinaryOp, Value)> {
1419 match predicate {
1420 LogicalExpression::Binary { left, op, right } => {
1421 match op {
1422 BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1423 if let (
1425 LogicalExpression::Property { variable, property },
1426 LogicalExpression::Literal(val),
1427 ) = (left.as_ref(), right.as_ref())
1428 {
1429 return Some((variable.clone(), property.clone(), *op, val.clone()));
1430 }
1431
1432 if let (
1434 LogicalExpression::Literal(val),
1435 LogicalExpression::Property { variable, property },
1436 ) = (left.as_ref(), right.as_ref())
1437 {
1438 let flipped_op = match op {
1439 BinaryOp::Lt => BinaryOp::Gt,
1440 BinaryOp::Le => BinaryOp::Ge,
1441 BinaryOp::Gt => BinaryOp::Lt,
1442 BinaryOp::Ge => BinaryOp::Le,
1443 _ => return None,
1444 };
1445 return Some((
1446 variable.clone(),
1447 property.clone(),
1448 flipped_op,
1449 val.clone(),
1450 ));
1451 }
1452 }
1453 _ => {}
1454 }
1455 }
1456 _ => {}
1457 }
1458 None
1459 }
1460
1461 fn extract_between_predicate(
1469 &self,
1470 predicate: &LogicalExpression,
1471 ) -> Option<(String, String, Value, Value, bool, bool)> {
1472 let (left, right) = match predicate {
1474 LogicalExpression::Binary {
1475 left,
1476 op: BinaryOp::And,
1477 right,
1478 } => (left.as_ref(), right.as_ref()),
1479 _ => return None,
1480 };
1481
1482 let left_range = self.extract_range_predicate(left);
1484 let right_range = self.extract_range_predicate(right);
1485
1486 let (left_var, left_prop, left_op, left_val) = left_range?;
1487 let (right_var, right_prop, right_op, right_val) = right_range?;
1488
1489 if left_var != right_var || left_prop != right_prop {
1491 return None;
1492 }
1493
1494 let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1496 (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1498 (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1500 (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1502 (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1504 (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1506 (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1508 (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1510 (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1512 _ => return None,
1513 };
1514
1515 Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1516 }
1517
1518 fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1520 let (input_op, columns) = self.plan_operator(&limit.input)?;
1521 let output_schema = self.derive_schema_from_columns(&columns);
1522 let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1523 Ok((operator, columns))
1524 }
1525
1526 fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1528 let (input_op, columns) = self.plan_operator(&skip.input)?;
1529 let output_schema = self.derive_schema_from_columns(&columns);
1530 let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1531 Ok((operator, columns))
1532 }
1533
1534 fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1536 let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1537
1538 let mut variable_columns: HashMap<String, usize> = input_columns
1540 .iter()
1541 .enumerate()
1542 .map(|(i, name)| (name.clone(), i))
1543 .collect();
1544
1545 let mut property_projections: Vec<(String, String, String)> = Vec::new();
1547 let mut next_col_idx = input_columns.len();
1548
1549 for key in &sort.keys {
1550 if let LogicalExpression::Property { variable, property } = &key.expression {
1551 let col_name = format!("{}_{}", variable, property);
1552 if !variable_columns.contains_key(&col_name) {
1553 property_projections.push((
1554 variable.clone(),
1555 property.clone(),
1556 col_name.clone(),
1557 ));
1558 variable_columns.insert(col_name, next_col_idx);
1559 next_col_idx += 1;
1560 }
1561 }
1562 }
1563
1564 let mut output_columns = input_columns.clone();
1566
1567 if !property_projections.is_empty() {
1569 let mut projections = Vec::new();
1570 let mut output_types = Vec::new();
1571
1572 for (i, _) in input_columns.iter().enumerate() {
1575 projections.push(ProjectExpr::Column(i));
1576 output_types.push(LogicalType::Node);
1577 }
1578
1579 for (variable, property, col_name) in &property_projections {
1581 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1582 Error::Internal(format!(
1583 "Variable '{}' not found for ORDER BY property projection",
1584 variable
1585 ))
1586 })?;
1587 projections.push(ProjectExpr::PropertyAccess {
1588 column: source_col,
1589 property: property.clone(),
1590 });
1591 output_types.push(LogicalType::Any);
1592 output_columns.push(col_name.clone());
1593 }
1594
1595 input_op = Box::new(ProjectOperator::with_store(
1596 input_op,
1597 projections,
1598 output_types,
1599 Arc::clone(&self.store),
1600 ));
1601 }
1602
1603 let physical_keys: Vec<PhysicalSortKey> = sort
1605 .keys
1606 .iter()
1607 .map(|key| {
1608 let col_idx = self
1609 .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1610 Ok(PhysicalSortKey {
1611 column: col_idx,
1612 direction: match key.order {
1613 SortOrder::Ascending => SortDirection::Ascending,
1614 SortOrder::Descending => SortDirection::Descending,
1615 },
1616 null_order: NullOrder::NullsLast,
1617 })
1618 })
1619 .collect::<Result<Vec<_>>>()?;
1620
1621 let output_schema = self.derive_schema_from_columns(&output_columns);
1622 let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1623 Ok((operator, output_columns))
1624 }
1625
1626 fn resolve_sort_expression_with_properties(
1628 &self,
1629 expr: &LogicalExpression,
1630 variable_columns: &HashMap<String, usize>,
1631 ) -> Result<usize> {
1632 match expr {
1633 LogicalExpression::Variable(name) => {
1634 variable_columns.get(name).copied().ok_or_else(|| {
1635 Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1636 })
1637 }
1638 LogicalExpression::Property { variable, property } => {
1639 let col_name = format!("{}_{}", variable, property);
1641 variable_columns.get(&col_name).copied().ok_or_else(|| {
1642 Error::Internal(format!(
1643 "Property column '{}' not found for ORDER BY (from {}.{})",
1644 col_name, variable, property
1645 ))
1646 })
1647 }
1648 _ => Err(Error::Internal(format!(
1649 "Unsupported ORDER BY expression: {:?}",
1650 expr
1651 ))),
1652 }
1653 }
1654
1655 fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1657 columns.iter().map(|_| LogicalType::Any).collect()
1658 }
1659
1660 fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1662 if self.factorized_execution
1669 && agg.group_by.is_empty()
1670 && Self::count_expand_chain(&agg.input).0 >= 2
1671 && self.is_simple_aggregate(agg)
1672 && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1673 {
1674 return Ok((op, cols));
1675 }
1676 let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1679
1680 let mut variable_columns: HashMap<String, usize> = input_columns
1682 .iter()
1683 .enumerate()
1684 .map(|(i, name)| (name.clone(), i))
1685 .collect();
1686
1687 let mut property_projections: Vec<(String, String, String)> = Vec::new(); let mut next_col_idx = input_columns.len();
1690
1691 for expr in &agg.group_by {
1693 if let LogicalExpression::Property { variable, property } = expr {
1694 let col_name = format!("{}_{}", variable, property);
1695 if !variable_columns.contains_key(&col_name) {
1696 property_projections.push((
1697 variable.clone(),
1698 property.clone(),
1699 col_name.clone(),
1700 ));
1701 variable_columns.insert(col_name, next_col_idx);
1702 next_col_idx += 1;
1703 }
1704 }
1705 }
1706
1707 for agg_expr in &agg.aggregates {
1709 if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1710 let col_name = format!("{}_{}", variable, property);
1711 if !variable_columns.contains_key(&col_name) {
1712 property_projections.push((
1713 variable.clone(),
1714 property.clone(),
1715 col_name.clone(),
1716 ));
1717 variable_columns.insert(col_name, next_col_idx);
1718 next_col_idx += 1;
1719 }
1720 }
1721 }
1722
1723 if !property_projections.is_empty() {
1725 let mut projections = Vec::new();
1726 let mut output_types = Vec::new();
1727
1728 for (i, _) in input_columns.iter().enumerate() {
1731 projections.push(ProjectExpr::Column(i));
1732 output_types.push(LogicalType::Node);
1733 }
1734
1735 for (variable, property, _col_name) in &property_projections {
1737 let source_col = *variable_columns.get(variable).ok_or_else(|| {
1738 Error::Internal(format!(
1739 "Variable '{}' not found for property projection",
1740 variable
1741 ))
1742 })?;
1743 projections.push(ProjectExpr::PropertyAccess {
1744 column: source_col,
1745 property: property.clone(),
1746 });
1747 output_types.push(LogicalType::Any); }
1749
1750 input_op = Box::new(ProjectOperator::with_store(
1751 input_op,
1752 projections,
1753 output_types,
1754 Arc::clone(&self.store),
1755 ));
1756 }
1757
1758 let group_columns: Vec<usize> = agg
1760 .group_by
1761 .iter()
1762 .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1763 .collect::<Result<Vec<_>>>()?;
1764
1765 let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1767 .aggregates
1768 .iter()
1769 .map(|agg_expr| {
1770 let column = agg_expr
1771 .expression
1772 .as_ref()
1773 .map(|e| {
1774 self.resolve_expression_to_column_with_properties(e, &variable_columns)
1775 })
1776 .transpose()?;
1777
1778 Ok(PhysicalAggregateExpr {
1779 function: convert_aggregate_function(agg_expr.function),
1780 column,
1781 distinct: agg_expr.distinct,
1782 alias: agg_expr.alias.clone(),
1783 percentile: agg_expr.percentile,
1784 })
1785 })
1786 .collect::<Result<Vec<_>>>()?;
1787
1788 let mut output_schema = Vec::new();
1790 let mut output_columns = Vec::new();
1791
1792 for expr in &agg.group_by {
1794 output_schema.push(LogicalType::Any); output_columns.push(expression_to_string(expr));
1796 }
1797
1798 for agg_expr in &agg.aggregates {
1800 let result_type = match agg_expr.function {
1801 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1802 LogicalType::Int64
1803 }
1804 LogicalAggregateFunction::Sum => LogicalType::Int64,
1805 LogicalAggregateFunction::Avg => LogicalType::Float64,
1806 LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1807 LogicalType::Int64
1811 }
1812 LogicalAggregateFunction::Collect => LogicalType::Any, LogicalAggregateFunction::StdDev
1815 | LogicalAggregateFunction::StdDevPop
1816 | LogicalAggregateFunction::PercentileDisc
1817 | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1818 };
1819 output_schema.push(result_type);
1820 output_columns.push(
1821 agg_expr
1822 .alias
1823 .clone()
1824 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1825 );
1826 }
1827
1828 let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1830 Box::new(SimpleAggregateOperator::new(
1831 input_op,
1832 physical_aggregates,
1833 output_schema,
1834 ))
1835 } else {
1836 Box::new(HashAggregateOperator::new(
1837 input_op,
1838 group_columns,
1839 physical_aggregates,
1840 output_schema,
1841 ))
1842 };
1843
1844 if let Some(having_expr) = &agg.having {
1846 let having_var_columns: HashMap<String, usize> = output_columns
1848 .iter()
1849 .enumerate()
1850 .map(|(i, name)| (name.clone(), i))
1851 .collect();
1852
1853 let filter_expr = self.convert_expression(having_expr)?;
1854 let predicate =
1855 ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1856 operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1857 }
1858
1859 Ok((operator, output_columns))
1860 }
1861
1862 fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1868 agg.aggregates.iter().all(|agg_expr| {
1869 match agg_expr.function {
1870 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1871 agg_expr.expression.is_none()
1873 || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1874 }
1875 LogicalAggregateFunction::Sum
1876 | LogicalAggregateFunction::Avg
1877 | LogicalAggregateFunction::Min
1878 | LogicalAggregateFunction::Max => {
1879 matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1882 }
1883 _ => false,
1885 }
1886 })
1887 }
1888
1889 fn plan_factorized_aggregate(
1893 &self,
1894 agg: &AggregateOp,
1895 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1896 let expands = Self::collect_expand_chain(&agg.input);
1898 if expands.is_empty() {
1899 return Err(Error::Internal(
1900 "Expected expand chain for factorized aggregate".to_string(),
1901 ));
1902 }
1903
1904 let first_expand = expands[0];
1906 let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1907
1908 let mut columns = base_columns.clone();
1909 let mut steps = Vec::new();
1910 let mut is_first = true;
1911
1912 for expand in &expands {
1913 let source_column = if is_first {
1915 base_columns
1916 .iter()
1917 .position(|c| c == &expand.from_variable)
1918 .ok_or_else(|| {
1919 Error::Internal(format!(
1920 "Source variable '{}' not found in base columns",
1921 expand.from_variable
1922 ))
1923 })?
1924 } else {
1925 1 };
1927
1928 let direction = match expand.direction {
1929 ExpandDirection::Outgoing => Direction::Outgoing,
1930 ExpandDirection::Incoming => Direction::Incoming,
1931 ExpandDirection::Both => Direction::Both,
1932 };
1933
1934 steps.push(ExpandStep {
1935 source_column,
1936 direction,
1937 edge_type: expand.edge_type.clone(),
1938 });
1939
1940 let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1941 let count = self.anon_edge_counter.get();
1942 self.anon_edge_counter.set(count + 1);
1943 format!("_anon_edge_{}", count)
1944 });
1945 columns.push(edge_col_name);
1946 columns.push(expand.to_variable.clone());
1947
1948 is_first = false;
1949 }
1950
1951 let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1953
1954 if let Some(tx_id) = self.tx_id {
1955 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1956 } else {
1957 lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1958 }
1959
1960 let factorized_aggs: Vec<FactorizedAggregate> = agg
1962 .aggregates
1963 .iter()
1964 .map(|agg_expr| {
1965 match agg_expr.function {
1966 LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1967 if agg_expr.expression.is_none() {
1969 FactorizedAggregate::count()
1970 } else {
1971 FactorizedAggregate::count_column(1) }
1975 }
1976 LogicalAggregateFunction::Sum => {
1977 FactorizedAggregate::sum(1)
1979 }
1980 LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1981 LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1982 LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1983 _ => {
1984 FactorizedAggregate::count()
1986 }
1987 }
1988 })
1989 .collect();
1990
1991 let output_columns: Vec<String> = agg
1993 .aggregates
1994 .iter()
1995 .map(|agg_expr| {
1996 agg_expr
1997 .alias
1998 .clone()
1999 .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
2000 })
2001 .collect();
2002
2003 let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
2005
2006 Ok((Box::new(factorized_agg_op), output_columns))
2007 }
2008
2009 #[allow(dead_code)]
2011 fn resolve_expression_to_column(
2012 &self,
2013 expr: &LogicalExpression,
2014 variable_columns: &HashMap<String, usize>,
2015 ) -> Result<usize> {
2016 match expr {
2017 LogicalExpression::Variable(name) => variable_columns
2018 .get(name)
2019 .copied()
2020 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2021 LogicalExpression::Property { variable, .. } => variable_columns
2022 .get(variable)
2023 .copied()
2024 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
2025 _ => Err(Error::Internal(format!(
2026 "Cannot resolve expression to column: {:?}",
2027 expr
2028 ))),
2029 }
2030 }
2031
2032 fn resolve_expression_to_column_with_properties(
2036 &self,
2037 expr: &LogicalExpression,
2038 variable_columns: &HashMap<String, usize>,
2039 ) -> Result<usize> {
2040 match expr {
2041 LogicalExpression::Variable(name) => variable_columns
2042 .get(name)
2043 .copied()
2044 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2045 LogicalExpression::Property { variable, property } => {
2046 let col_name = format!("{}_{}", variable, property);
2048 variable_columns.get(&col_name).copied().ok_or_else(|| {
2049 Error::Internal(format!(
2050 "Property column '{}' not found (from {}.{})",
2051 col_name, variable, property
2052 ))
2053 })
2054 }
2055 _ => Err(Error::Internal(format!(
2056 "Cannot resolve expression to column: {:?}",
2057 expr
2058 ))),
2059 }
2060 }
2061
2062 fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
2064 match expr {
2065 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2066 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2067 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2068 variable: variable.clone(),
2069 property: property.clone(),
2070 }),
2071 LogicalExpression::Binary { left, op, right } => {
2072 let left_expr = self.convert_expression(left)?;
2073 let right_expr = self.convert_expression(right)?;
2074 let filter_op = convert_binary_op(*op)?;
2075 Ok(FilterExpression::Binary {
2076 left: Box::new(left_expr),
2077 op: filter_op,
2078 right: Box::new(right_expr),
2079 })
2080 }
2081 LogicalExpression::Unary { op, operand } => {
2082 let operand_expr = self.convert_expression(operand)?;
2083 let filter_op = convert_unary_op(*op)?;
2084 Ok(FilterExpression::Unary {
2085 op: filter_op,
2086 operand: Box::new(operand_expr),
2087 })
2088 }
2089 LogicalExpression::FunctionCall { name, args, .. } => {
2090 let filter_args: Vec<FilterExpression> = args
2091 .iter()
2092 .map(|a| self.convert_expression(a))
2093 .collect::<Result<Vec<_>>>()?;
2094 Ok(FilterExpression::FunctionCall {
2095 name: name.clone(),
2096 args: filter_args,
2097 })
2098 }
2099 LogicalExpression::Case {
2100 operand,
2101 when_clauses,
2102 else_clause,
2103 } => {
2104 let filter_operand = operand
2105 .as_ref()
2106 .map(|e| self.convert_expression(e))
2107 .transpose()?
2108 .map(Box::new);
2109 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2110 .iter()
2111 .map(|(cond, result)| {
2112 Ok((
2113 self.convert_expression(cond)?,
2114 self.convert_expression(result)?,
2115 ))
2116 })
2117 .collect::<Result<Vec<_>>>()?;
2118 let filter_else = else_clause
2119 .as_ref()
2120 .map(|e| self.convert_expression(e))
2121 .transpose()?
2122 .map(Box::new);
2123 Ok(FilterExpression::Case {
2124 operand: filter_operand,
2125 when_clauses: filter_when_clauses,
2126 else_clause: filter_else,
2127 })
2128 }
2129 LogicalExpression::List(items) => {
2130 let filter_items: Vec<FilterExpression> = items
2131 .iter()
2132 .map(|item| self.convert_expression(item))
2133 .collect::<Result<Vec<_>>>()?;
2134 Ok(FilterExpression::List(filter_items))
2135 }
2136 LogicalExpression::Map(pairs) => {
2137 let filter_pairs: Vec<(String, FilterExpression)> = pairs
2138 .iter()
2139 .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2140 .collect::<Result<Vec<_>>>()?;
2141 Ok(FilterExpression::Map(filter_pairs))
2142 }
2143 LogicalExpression::IndexAccess { base, index } => {
2144 let base_expr = self.convert_expression(base)?;
2145 let index_expr = self.convert_expression(index)?;
2146 Ok(FilterExpression::IndexAccess {
2147 base: Box::new(base_expr),
2148 index: Box::new(index_expr),
2149 })
2150 }
2151 LogicalExpression::SliceAccess { base, start, end } => {
2152 let base_expr = self.convert_expression(base)?;
2153 let start_expr = start
2154 .as_ref()
2155 .map(|s| self.convert_expression(s))
2156 .transpose()?
2157 .map(Box::new);
2158 let end_expr = end
2159 .as_ref()
2160 .map(|e| self.convert_expression(e))
2161 .transpose()?
2162 .map(Box::new);
2163 Ok(FilterExpression::SliceAccess {
2164 base: Box::new(base_expr),
2165 start: start_expr,
2166 end: end_expr,
2167 })
2168 }
2169 LogicalExpression::Parameter(_) => Err(Error::Internal(
2170 "Parameters not yet supported in filters".to_string(),
2171 )),
2172 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2173 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2174 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2175 LogicalExpression::ListComprehension {
2176 variable,
2177 list_expr,
2178 filter_expr,
2179 map_expr,
2180 } => {
2181 let list = self.convert_expression(list_expr)?;
2182 let filter = filter_expr
2183 .as_ref()
2184 .map(|f| self.convert_expression(f))
2185 .transpose()?
2186 .map(Box::new);
2187 let map = self.convert_expression(map_expr)?;
2188 Ok(FilterExpression::ListComprehension {
2189 variable: variable.clone(),
2190 list_expr: Box::new(list),
2191 filter_expr: filter,
2192 map_expr: Box::new(map),
2193 })
2194 }
2195 LogicalExpression::ExistsSubquery(subplan) => {
2196 let (start_var, direction, edge_type, end_labels) =
2199 self.extract_exists_pattern(subplan)?;
2200
2201 Ok(FilterExpression::ExistsSubquery {
2202 start_var,
2203 direction,
2204 edge_type,
2205 end_labels,
2206 min_hops: None,
2207 max_hops: None,
2208 })
2209 }
2210 LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2211 "COUNT subqueries not yet supported".to_string(),
2212 )),
2213 }
2214 }
2215
2216 fn extract_exists_pattern(
2219 &self,
2220 subplan: &LogicalOperator,
2221 ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2222 match subplan {
2223 LogicalOperator::Expand(expand) => {
2224 let end_labels = self.extract_end_labels_from_expand(expand);
2226 let direction = match expand.direction {
2227 ExpandDirection::Outgoing => Direction::Outgoing,
2228 ExpandDirection::Incoming => Direction::Incoming,
2229 ExpandDirection::Both => Direction::Both,
2230 };
2231 Ok((
2232 expand.from_variable.clone(),
2233 direction,
2234 expand.edge_type.clone(),
2235 end_labels,
2236 ))
2237 }
2238 LogicalOperator::NodeScan(scan) => {
2239 if let Some(input) = &scan.input {
2240 self.extract_exists_pattern(input)
2241 } else {
2242 Err(Error::Internal(
2243 "EXISTS subquery must contain an edge pattern".to_string(),
2244 ))
2245 }
2246 }
2247 LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2248 _ => Err(Error::Internal(
2249 "Unsupported EXISTS subquery pattern".to_string(),
2250 )),
2251 }
2252 }
2253
2254 fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2256 match expand.input.as_ref() {
2258 LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2259 _ => None,
2260 }
2261 }
2262
2263 fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2265 let (left_op, left_columns) = self.plan_operator(&join.left)?;
2266 let (right_op, right_columns) = self.plan_operator(&join.right)?;
2267
2268 let mut columns = left_columns.clone();
2270 columns.extend(right_columns.clone());
2271
2272 let physical_join_type = match join.join_type {
2274 JoinType::Inner => PhysicalJoinType::Inner,
2275 JoinType::Left => PhysicalJoinType::Left,
2276 JoinType::Right => PhysicalJoinType::Right,
2277 JoinType::Full => PhysicalJoinType::Full,
2278 JoinType::Cross => PhysicalJoinType::Cross,
2279 JoinType::Semi => PhysicalJoinType::Semi,
2280 JoinType::Anti => PhysicalJoinType::Anti,
2281 };
2282
2283 let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2285 (vec![], vec![])
2287 } else {
2288 join.conditions
2289 .iter()
2290 .filter_map(|cond| {
2291 let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2293 let right_idx = self
2294 .expression_to_column(&cond.right, &right_columns)
2295 .ok()?;
2296 Some((left_idx, right_idx))
2297 })
2298 .unzip()
2299 };
2300
2301 let output_schema = self.derive_schema_from_columns(&columns);
2302
2303 let _ = LeapfrogJoinOperator::new; let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2311 left_op,
2312 right_op,
2313 probe_keys,
2314 build_keys,
2315 physical_join_type,
2316 output_schema,
2317 ));
2318
2319 Ok((operator, columns))
2320 }
2321
2322 #[allow(dead_code)]
2331 fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2332 let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2334 let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2335
2336 Self::collect_join_edges(
2338 &LogicalOperator::Join(join.clone()),
2339 &mut edges,
2340 &mut all_vars,
2341 );
2342
2343 if all_vars.len() < 3 {
2345 return false;
2346 }
2347
2348 Self::has_cycle(&edges, &all_vars)
2350 }
2351
2352 fn collect_join_edges(
2354 op: &LogicalOperator,
2355 edges: &mut HashMap<String, Vec<String>>,
2356 vars: &mut std::collections::HashSet<String>,
2357 ) {
2358 match op {
2359 LogicalOperator::Join(join) => {
2360 for cond in &join.conditions {
2362 if let (Some(left_var), Some(right_var)) = (
2363 Self::extract_join_variable(&cond.left),
2364 Self::extract_join_variable(&cond.right),
2365 ) && left_var != right_var
2366 {
2367 vars.insert(left_var.clone());
2368 vars.insert(right_var.clone());
2369
2370 edges
2372 .entry(left_var.clone())
2373 .or_default()
2374 .push(right_var.clone());
2375 edges.entry(right_var).or_default().push(left_var);
2376 }
2377 }
2378
2379 Self::collect_join_edges(&join.left, edges, vars);
2381 Self::collect_join_edges(&join.right, edges, vars);
2382 }
2383 LogicalOperator::Expand(expand) => {
2384 vars.insert(expand.from_variable.clone());
2386 vars.insert(expand.to_variable.clone());
2387
2388 edges
2389 .entry(expand.from_variable.clone())
2390 .or_default()
2391 .push(expand.to_variable.clone());
2392 edges
2393 .entry(expand.to_variable.clone())
2394 .or_default()
2395 .push(expand.from_variable.clone());
2396
2397 Self::collect_join_edges(&expand.input, edges, vars);
2398 }
2399 LogicalOperator::Filter(filter) => {
2400 Self::collect_join_edges(&filter.input, edges, vars);
2401 }
2402 LogicalOperator::NodeScan(scan) => {
2403 vars.insert(scan.variable.clone());
2404 }
2405 _ => {}
2406 }
2407 }
2408
2409 fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2411 match expr {
2412 LogicalExpression::Variable(v) => Some(v.clone()),
2413 LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2414 LogicalExpression::Id(v) => Some(v.clone()),
2415 _ => None,
2416 }
2417 }
2418
2419 fn has_cycle(
2423 edges: &HashMap<String, Vec<String>>,
2424 vars: &std::collections::HashSet<String>,
2425 ) -> bool {
2426 let mut color: HashMap<&String, u8> = HashMap::new();
2427
2428 for var in vars {
2429 color.insert(var, 0);
2430 }
2431
2432 for start in vars {
2433 if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2434 return true;
2435 }
2436 }
2437
2438 false
2439 }
2440
2441 fn dfs_cycle(
2443 node: &String,
2444 parent: Option<&String>,
2445 edges: &HashMap<String, Vec<String>>,
2446 color: &mut HashMap<&String, u8>,
2447 ) -> bool {
2448 *color.get_mut(node).unwrap() = 1; if let Some(neighbors) = edges.get(node) {
2451 for neighbor in neighbors {
2452 if parent == Some(neighbor) {
2454 continue;
2455 }
2456
2457 if let Some(&c) = color.get(neighbor) {
2458 if c == 1 {
2459 return true;
2461 }
2462 if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2463 return true;
2464 }
2465 }
2466 }
2467 }
2468
2469 *color.get_mut(node).unwrap() = 2; false
2471 }
2472
2473 #[allow(dead_code)]
2475 fn count_relations(op: &LogicalOperator) -> usize {
2476 match op {
2477 LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2478 LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2479 LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2480 LogicalOperator::Join(j) => {
2481 Self::count_relations(&j.left) + Self::count_relations(&j.right)
2482 }
2483 _ => 0,
2484 }
2485 }
2486
2487 fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2489 match expr {
2490 LogicalExpression::Variable(name) => columns
2491 .iter()
2492 .position(|c| c == name)
2493 .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2494 _ => Err(Error::Internal(
2495 "Only variables supported in join conditions".to_string(),
2496 )),
2497 }
2498 }
2499
2500 fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2502 if union.inputs.is_empty() {
2503 return Err(Error::Internal(
2504 "Union requires at least one input".to_string(),
2505 ));
2506 }
2507
2508 let mut inputs = Vec::with_capacity(union.inputs.len());
2509 let mut columns = Vec::new();
2510
2511 for (i, input) in union.inputs.iter().enumerate() {
2512 let (op, cols) = self.plan_operator(input)?;
2513 if i == 0 {
2514 columns = cols;
2515 }
2516 inputs.push(op);
2517 }
2518
2519 let output_schema = self.derive_schema_from_columns(&columns);
2520 let operator = Box::new(UnionOperator::new(inputs, output_schema));
2521
2522 Ok((operator, columns))
2523 }
2524
2525 fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2527 let (input_op, columns) = self.plan_operator(&distinct.input)?;
2528 let output_schema = self.derive_schema_from_columns(&columns);
2529 let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2530 Ok((operator, columns))
2531 }
2532
2533 fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2535 let (input_op, mut columns) = if let Some(ref input) = create.input {
2537 let (op, cols) = self.plan_operator(input)?;
2538 (Some(op), cols)
2539 } else {
2540 (None, vec![])
2541 };
2542
2543 let output_column = columns.len();
2545 columns.push(create.variable.clone());
2546
2547 let properties: Vec<(String, PropertySource)> = create
2549 .properties
2550 .iter()
2551 .map(|(name, expr)| {
2552 let source = match Self::try_fold_expression(expr) {
2553 Some(value) => PropertySource::Constant(value),
2554 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2555 };
2556 (name.clone(), source)
2557 })
2558 .collect();
2559
2560 let output_schema = self.derive_schema_from_columns(&columns);
2561
2562 let operator = Box::new(
2563 CreateNodeOperator::new(
2564 Arc::clone(&self.store),
2565 input_op,
2566 create.labels.clone(),
2567 properties,
2568 output_schema,
2569 output_column,
2570 )
2571 .with_tx_context(self.viewing_epoch, self.tx_id),
2572 );
2573
2574 Ok((operator, columns))
2575 }
2576
2577 fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2579 let (input_op, mut columns) = self.plan_operator(&create.input)?;
2580
2581 let from_column = columns
2583 .iter()
2584 .position(|c| c == &create.from_variable)
2585 .ok_or_else(|| {
2586 Error::Internal(format!(
2587 "Source variable '{}' not found",
2588 create.from_variable
2589 ))
2590 })?;
2591
2592 let to_column = columns
2593 .iter()
2594 .position(|c| c == &create.to_variable)
2595 .ok_or_else(|| {
2596 Error::Internal(format!(
2597 "Target variable '{}' not found",
2598 create.to_variable
2599 ))
2600 })?;
2601
2602 let output_column = create.variable.as_ref().map(|v| {
2604 let idx = columns.len();
2605 columns.push(v.clone());
2606 idx
2607 });
2608
2609 let properties: Vec<(String, PropertySource)> = create
2611 .properties
2612 .iter()
2613 .map(|(name, expr)| {
2614 let source = match Self::try_fold_expression(expr) {
2615 Some(value) => PropertySource::Constant(value),
2616 None => PropertySource::Constant(grafeo_common::types::Value::Null),
2617 };
2618 (name.clone(), source)
2619 })
2620 .collect();
2621
2622 let output_schema = self.derive_schema_from_columns(&columns);
2623
2624 let mut operator = CreateEdgeOperator::new(
2625 Arc::clone(&self.store),
2626 input_op,
2627 from_column,
2628 to_column,
2629 create.edge_type.clone(),
2630 output_schema,
2631 )
2632 .with_properties(properties)
2633 .with_tx_context(self.viewing_epoch, self.tx_id);
2634
2635 if let Some(col) = output_column {
2636 operator = operator.with_output_column(col);
2637 }
2638
2639 let operator = Box::new(operator);
2640
2641 Ok((operator, columns))
2642 }
2643
2644 fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2646 let (input_op, columns) = self.plan_operator(&delete.input)?;
2647
2648 let node_column = columns
2649 .iter()
2650 .position(|c| c == &delete.variable)
2651 .ok_or_else(|| {
2652 Error::Internal(format!(
2653 "Variable '{}' not found for delete",
2654 delete.variable
2655 ))
2656 })?;
2657
2658 let output_schema = vec![LogicalType::Int64];
2660 let output_columns = vec!["deleted_count".to_string()];
2661
2662 let operator = Box::new(
2663 DeleteNodeOperator::new(
2664 Arc::clone(&self.store),
2665 input_op,
2666 node_column,
2667 output_schema,
2668 delete.detach, )
2670 .with_tx_context(self.viewing_epoch, self.tx_id),
2671 );
2672
2673 Ok((operator, output_columns))
2674 }
2675
2676 fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2678 let (input_op, columns) = self.plan_operator(&delete.input)?;
2679
2680 let edge_column = columns
2681 .iter()
2682 .position(|c| c == &delete.variable)
2683 .ok_or_else(|| {
2684 Error::Internal(format!(
2685 "Variable '{}' not found for delete",
2686 delete.variable
2687 ))
2688 })?;
2689
2690 let output_schema = vec![LogicalType::Int64];
2692 let output_columns = vec!["deleted_count".to_string()];
2693
2694 let operator = Box::new(
2695 DeleteEdgeOperator::new(
2696 Arc::clone(&self.store),
2697 input_op,
2698 edge_column,
2699 output_schema,
2700 )
2701 .with_tx_context(self.viewing_epoch, self.tx_id),
2702 );
2703
2704 Ok((operator, output_columns))
2705 }
2706
2707 fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2709 let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2710 let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2711
2712 let mut columns = left_columns.clone();
2714 columns.extend(right_columns.clone());
2715
2716 let mut probe_keys = Vec::new();
2718 let mut build_keys = Vec::new();
2719
2720 for (right_idx, right_col) in right_columns.iter().enumerate() {
2721 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2722 probe_keys.push(left_idx);
2723 build_keys.push(right_idx);
2724 }
2725 }
2726
2727 let output_schema = self.derive_schema_from_columns(&columns);
2728
2729 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2730 left_op,
2731 right_op,
2732 probe_keys,
2733 build_keys,
2734 PhysicalJoinType::Left,
2735 output_schema,
2736 ));
2737
2738 Ok((operator, columns))
2739 }
2740
2741 fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2743 let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2744 let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2745
2746 let columns = left_columns.clone();
2748
2749 let mut probe_keys = Vec::new();
2751 let mut build_keys = Vec::new();
2752
2753 for (right_idx, right_col) in right_columns.iter().enumerate() {
2754 if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2755 probe_keys.push(left_idx);
2756 build_keys.push(right_idx);
2757 }
2758 }
2759
2760 let output_schema = self.derive_schema_from_columns(&columns);
2761
2762 let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2763 left_op,
2764 right_op,
2765 probe_keys,
2766 build_keys,
2767 PhysicalJoinType::Anti,
2768 output_schema,
2769 ));
2770
2771 Ok((operator, columns))
2772 }
2773
2774 fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2776 let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2779 if matches!(&*unwind.input, LogicalOperator::Empty) {
2780 let literal_list = self.convert_expression(&unwind.expression)?;
2785
2786 let single_row_op: Box<dyn Operator> = Box::new(
2788 grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2789 );
2790 let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2791 single_row_op,
2792 vec![ProjectExpr::Expression {
2793 expr: literal_list,
2794 variable_columns: HashMap::new(),
2795 }],
2796 vec![LogicalType::Any],
2797 Arc::clone(&self.store),
2798 ));
2799
2800 (project_op, vec!["__list__".to_string()])
2801 } else {
2802 self.plan_operator(&unwind.input)?
2803 };
2804
2805 let list_col_idx = match &unwind.expression {
2811 LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2812 LogicalExpression::Property { variable, .. } => {
2813 input_columns.iter().position(|c| c == variable)
2816 }
2817 LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2818 None
2820 }
2821 _ => None,
2822 };
2823
2824 let mut columns = input_columns.clone();
2826 columns.push(unwind.variable.clone());
2827
2828 let mut output_schema = self.derive_schema_from_columns(&input_columns);
2830 output_schema.push(LogicalType::Any); let col_idx = list_col_idx.unwrap_or(0);
2835
2836 let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2837 input_op,
2838 col_idx,
2839 unwind.variable.clone(),
2840 output_schema,
2841 ));
2842
2843 Ok((operator, columns))
2844 }
2845
2846 fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2848 let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2850 Vec::new()
2851 } else {
2852 let (_input_op, cols) = self.plan_operator(&merge.input)?;
2853 cols
2854 };
2855
2856 let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2858 .match_properties
2859 .iter()
2860 .filter_map(|(name, expr)| {
2861 if let LogicalExpression::Literal(v) = expr {
2862 Some((name.clone(), v.clone()))
2863 } else {
2864 None }
2866 })
2867 .collect();
2868
2869 let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2871 .on_create
2872 .iter()
2873 .filter_map(|(name, expr)| {
2874 if let LogicalExpression::Literal(v) = expr {
2875 Some((name.clone(), v.clone()))
2876 } else {
2877 None
2878 }
2879 })
2880 .collect();
2881
2882 let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2884 .on_match
2885 .iter()
2886 .filter_map(|(name, expr)| {
2887 if let LogicalExpression::Literal(v) = expr {
2888 Some((name.clone(), v.clone()))
2889 } else {
2890 None
2891 }
2892 })
2893 .collect();
2894
2895 columns.push(merge.variable.clone());
2897
2898 let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2899 Arc::clone(&self.store),
2900 merge.variable.clone(),
2901 merge.labels.clone(),
2902 match_properties,
2903 on_create_properties,
2904 on_match_properties,
2905 ));
2906
2907 Ok((operator, columns))
2908 }
2909
2910 fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2912 let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2914
2915 let source_column = columns
2917 .iter()
2918 .position(|c| c == &sp.source_var)
2919 .ok_or_else(|| {
2920 Error::Internal(format!(
2921 "Source variable '{}' not found for shortestPath",
2922 sp.source_var
2923 ))
2924 })?;
2925
2926 let target_column = columns
2927 .iter()
2928 .position(|c| c == &sp.target_var)
2929 .ok_or_else(|| {
2930 Error::Internal(format!(
2931 "Target variable '{}' not found for shortestPath",
2932 sp.target_var
2933 ))
2934 })?;
2935
2936 let direction = match sp.direction {
2938 ExpandDirection::Outgoing => Direction::Outgoing,
2939 ExpandDirection::Incoming => Direction::Incoming,
2940 ExpandDirection::Both => Direction::Both,
2941 };
2942
2943 let operator: Box<dyn Operator> = Box::new(
2945 ShortestPathOperator::new(
2946 Arc::clone(&self.store),
2947 input_op,
2948 source_column,
2949 target_column,
2950 sp.edge_type.clone(),
2951 direction,
2952 )
2953 .with_all_paths(sp.all_paths),
2954 );
2955
2956 columns.push(format!("_path_length_{}", sp.path_alias));
2959
2960 Ok((operator, columns))
2961 }
2962
2963 fn plan_call_procedure(
2965 &self,
2966 call: &CallProcedureOp,
2967 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2968 use crate::procedures::{self, BuiltinProcedures};
2969
2970 static PROCEDURES: std::sync::OnceLock<BuiltinProcedures> = std::sync::OnceLock::new();
2971 let registry = PROCEDURES.get_or_init(BuiltinProcedures::new);
2972
2973 let resolved_name = call.name.join(".");
2975 if resolved_name == "grafeo.procedures" || resolved_name == "procedures" {
2976 let result = procedures::procedures_result(registry);
2977 return self.plan_static_result(result, &call.yield_items);
2978 }
2979
2980 let algorithm = registry.get(&call.name).ok_or_else(|| {
2982 Error::Internal(format!(
2983 "Unknown procedure: '{}'. Use CALL grafeo.procedures() to list available procedures.",
2984 call.name.join(".")
2985 ))
2986 })?;
2987
2988 let params = procedures::evaluate_arguments(&call.arguments, algorithm.parameters());
2990
2991 let canonical_columns = procedures::output_columns_for_name(algorithm.as_ref());
2993
2994 let yield_columns = call.yield_items.as_ref().map(|items| {
2996 items
2997 .iter()
2998 .map(|item| (item.field_name.clone(), item.alias.clone()))
2999 .collect::<Vec<_>>()
3000 });
3001
3002 let output_columns = if let Some(yield_cols) = &yield_columns {
3003 yield_cols
3004 .iter()
3005 .map(|(name, alias)| alias.clone().unwrap_or_else(|| name.clone()))
3006 .collect()
3007 } else {
3008 canonical_columns.clone()
3009 };
3010
3011 let operator = Box::new(
3012 crate::query::executor::procedure_call::ProcedureCallOperator::new(
3013 Arc::clone(&self.store),
3014 algorithm,
3015 params,
3016 yield_columns,
3017 canonical_columns,
3018 ),
3019 );
3020
3021 Ok((operator, output_columns))
3022 }
3023
3024 fn plan_static_result(
3026 &self,
3027 result: grafeo_adapters::plugins::AlgorithmResult,
3028 yield_items: &Option<Vec<crate::query::plan::ProcedureYield>>,
3029 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3030 let (output_columns, column_indices) = if let Some(items) = yield_items {
3032 let mut cols = Vec::new();
3033 let mut indices = Vec::new();
3034 for item in items {
3035 let idx = result
3036 .columns
3037 .iter()
3038 .position(|c| c == &item.field_name)
3039 .ok_or_else(|| {
3040 Error::Internal(format!(
3041 "YIELD column '{}' not found (available: {})",
3042 item.field_name,
3043 result.columns.join(", ")
3044 ))
3045 })?;
3046 indices.push(idx);
3047 cols.push(
3048 item.alias
3049 .clone()
3050 .unwrap_or_else(|| item.field_name.clone()),
3051 );
3052 }
3053 (cols, indices)
3054 } else {
3055 let indices: Vec<usize> = (0..result.columns.len()).collect();
3056 (result.columns.clone(), indices)
3057 };
3058
3059 let operator = Box::new(StaticResultOperator {
3060 rows: result.rows,
3061 column_indices,
3062 row_index: 0,
3063 });
3064
3065 Ok((operator, output_columns))
3066 }
3067
3068 fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
3070 let (input_op, columns) = self.plan_operator(&add_label.input)?;
3071
3072 let node_column = columns
3074 .iter()
3075 .position(|c| c == &add_label.variable)
3076 .ok_or_else(|| {
3077 Error::Internal(format!(
3078 "Variable '{}' not found for ADD LABEL",
3079 add_label.variable
3080 ))
3081 })?;
3082
3083 let output_schema = vec![LogicalType::Int64];
3085 let output_columns = vec!["labels_added".to_string()];
3086
3087 let operator = Box::new(AddLabelOperator::new(
3088 Arc::clone(&self.store),
3089 input_op,
3090 node_column,
3091 add_label.labels.clone(),
3092 output_schema,
3093 ));
3094
3095 Ok((operator, output_columns))
3096 }
3097
3098 fn plan_remove_label(
3100 &self,
3101 remove_label: &RemoveLabelOp,
3102 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3103 let (input_op, columns) = self.plan_operator(&remove_label.input)?;
3104
3105 let node_column = columns
3107 .iter()
3108 .position(|c| c == &remove_label.variable)
3109 .ok_or_else(|| {
3110 Error::Internal(format!(
3111 "Variable '{}' not found for REMOVE LABEL",
3112 remove_label.variable
3113 ))
3114 })?;
3115
3116 let output_schema = vec![LogicalType::Int64];
3118 let output_columns = vec!["labels_removed".to_string()];
3119
3120 let operator = Box::new(RemoveLabelOperator::new(
3121 Arc::clone(&self.store),
3122 input_op,
3123 node_column,
3124 remove_label.labels.clone(),
3125 output_schema,
3126 ));
3127
3128 Ok((operator, output_columns))
3129 }
3130
3131 fn plan_set_property(
3133 &self,
3134 set_prop: &SetPropertyOp,
3135 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3136 let (input_op, columns) = self.plan_operator(&set_prop.input)?;
3137
3138 let entity_column = columns
3140 .iter()
3141 .position(|c| c == &set_prop.variable)
3142 .ok_or_else(|| {
3143 Error::Internal(format!(
3144 "Variable '{}' not found for SET",
3145 set_prop.variable
3146 ))
3147 })?;
3148
3149 let properties: Vec<(String, PropertySource)> = set_prop
3151 .properties
3152 .iter()
3153 .map(|(name, expr)| {
3154 let source = self.expression_to_property_source(expr, &columns)?;
3155 Ok((name.clone(), source))
3156 })
3157 .collect::<Result<Vec<_>>>()?;
3158
3159 let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
3161 let output_columns = columns.clone();
3162
3163 let operator = Box::new(SetPropertyOperator::new_for_node(
3165 Arc::clone(&self.store),
3166 input_op,
3167 entity_column,
3168 properties,
3169 output_schema,
3170 ));
3171
3172 Ok((operator, output_columns))
3173 }
3174
3175 fn expression_to_property_source(
3177 &self,
3178 expr: &LogicalExpression,
3179 columns: &[String],
3180 ) -> Result<PropertySource> {
3181 match expr {
3182 LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
3183 LogicalExpression::Variable(name) => {
3184 let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
3185 Error::Internal(format!("Variable '{}' not found for property source", name))
3186 })?;
3187 Ok(PropertySource::Column(col_idx))
3188 }
3189 LogicalExpression::Parameter(name) => {
3190 Ok(PropertySource::Constant(
3193 grafeo_common::types::Value::String(format!("${}", name).into()),
3194 ))
3195 }
3196 _ => {
3197 if let Some(value) = Self::try_fold_expression(expr) {
3198 Ok(PropertySource::Constant(value))
3199 } else {
3200 Err(Error::Internal(format!(
3201 "Unsupported expression type for property source: {:?}",
3202 expr
3203 )))
3204 }
3205 }
3206 }
3207 }
3208
3209 fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3215 match expr {
3216 LogicalExpression::Literal(v) => Some(v.clone()),
3217 LogicalExpression::List(items) => {
3218 let values: Option<Vec<Value>> =
3219 items.iter().map(Self::try_fold_expression).collect();
3220 let values = values?;
3221 let all_numeric = !values.is_empty()
3223 && values
3224 .iter()
3225 .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3226 if all_numeric {
3227 let floats: Vec<f32> = values
3228 .iter()
3229 .filter_map(|v| match v {
3230 Value::Float64(f) => Some(*f as f32),
3231 Value::Int64(i) => Some(*i as f32),
3232 _ => None,
3233 })
3234 .collect();
3235 Some(Value::Vector(floats.into()))
3236 } else {
3237 Some(Value::List(values.into()))
3238 }
3239 }
3240 LogicalExpression::FunctionCall { name, args, .. } => {
3241 match name.to_lowercase().as_str() {
3242 "vector" => {
3243 if args.len() != 1 {
3244 return None;
3245 }
3246 let val = Self::try_fold_expression(&args[0])?;
3247 match val {
3248 Value::List(items) => {
3249 let floats: Vec<f32> = items
3250 .iter()
3251 .filter_map(|v| match v {
3252 Value::Float64(f) => Some(*f as f32),
3253 Value::Int64(i) => Some(*i as f32),
3254 _ => None,
3255 })
3256 .collect();
3257 if floats.len() == items.len() {
3258 Some(Value::Vector(floats.into()))
3259 } else {
3260 None
3261 }
3262 }
3263 Value::Vector(v) => Some(Value::Vector(v)),
3265 _ => None,
3266 }
3267 }
3268 _ => None,
3269 }
3270 }
3271 _ => None,
3272 }
3273 }
3274}
3275
3276pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3278 match op {
3279 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3280 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3281 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3282 BinaryOp::Le => Ok(BinaryFilterOp::Le),
3283 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3284 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3285 BinaryOp::And => Ok(BinaryFilterOp::And),
3286 BinaryOp::Or => Ok(BinaryFilterOp::Or),
3287 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3288 BinaryOp::Add => Ok(BinaryFilterOp::Add),
3289 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3290 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3291 BinaryOp::Div => Ok(BinaryFilterOp::Div),
3292 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3293 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3294 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3295 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3296 BinaryOp::In => Ok(BinaryFilterOp::In),
3297 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3298 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3299 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3300 "Binary operator {:?} not yet supported in filters",
3301 op
3302 ))),
3303 }
3304}
3305
3306pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3308 match op {
3309 UnaryOp::Not => Ok(UnaryFilterOp::Not),
3310 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3311 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3312 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3313 }
3314}
3315
3316pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3318 match func {
3319 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3320 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3321 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3322 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3323 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3324 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3325 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3326 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3327 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3328 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3329 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3330 }
3331}
3332
/// Recursively converts a logical expression into the `FilterExpression` tree used by
/// filter operators.
///
/// Each logical variant is mapped to the matching `FilterExpression` variant, and
/// sub-expressions are converted depth-first.
///
/// # Errors
/// `Error::Internal` for parameters, `EXISTS` / `COUNT` subqueries, and binary operators
/// that `convert_binary_op` rejects. Errors from nested sub-expressions propagate.
pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
    match expr {
        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
            variable: variable.clone(),
            property: property.clone(),
        }),
        LogicalExpression::Binary { left, op, right } => {
            let left_expr = convert_filter_expression(left)?;
            let right_expr = convert_filter_expression(right)?;
            let filter_op = convert_binary_op(*op)?;
            Ok(FilterExpression::Binary {
                left: Box::new(left_expr),
                op: filter_op,
                right: Box::new(right_expr),
            })
        }
        LogicalExpression::Unary { op, operand } => {
            let operand_expr = convert_filter_expression(operand)?;
            let filter_op = convert_unary_op(*op)?;
            Ok(FilterExpression::Unary {
                op: filter_op,
                operand: Box::new(operand_expr),
            })
        }
        // The function name is passed through unchanged; it is resolved at evaluation time.
        LogicalExpression::FunctionCall { name, args, .. } => {
            let filter_args: Vec<FilterExpression> = args
                .iter()
                .map(convert_filter_expression)
                .collect::<Result<Vec<_>>>()?;
            Ok(FilterExpression::FunctionCall {
                name: name.clone(),
                args: filter_args,
            })
        }
        LogicalExpression::Case {
            operand,
            when_clauses,
            else_clause,
        } => {
            // Optional operand: `transpose` turns Option<Result<_>> into Result<Option<_>>
            // so that a conversion error is propagated with `?`.
            let filter_operand = operand
                .as_ref()
                .map(|e| convert_filter_expression(e))
                .transpose()?
                .map(Box::new);
            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
                .iter()
                .map(|(cond, result)| {
                    Ok((
                        convert_filter_expression(cond)?,
                        convert_filter_expression(result)?,
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            let filter_else = else_clause
                .as_ref()
                .map(|e| convert_filter_expression(e))
                .transpose()?
                .map(Box::new);
            Ok(FilterExpression::Case {
                operand: filter_operand,
                when_clauses: filter_when_clauses,
                else_clause: filter_else,
            })
        }
        LogicalExpression::List(items) => {
            let filter_items: Vec<FilterExpression> = items
                .iter()
                .map(convert_filter_expression)
                .collect::<Result<Vec<_>>>()?;
            Ok(FilterExpression::List(filter_items))
        }
        LogicalExpression::Map(pairs) => {
            let filter_pairs: Vec<(String, FilterExpression)> = pairs
                .iter()
                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
                .collect::<Result<Vec<_>>>()?;
            Ok(FilterExpression::Map(filter_pairs))
        }
        LogicalExpression::IndexAccess { base, index } => {
            let base_expr = convert_filter_expression(base)?;
            let index_expr = convert_filter_expression(index)?;
            Ok(FilterExpression::IndexAccess {
                base: Box::new(base_expr),
                index: Box::new(index_expr),
            })
        }
        // Both slice bounds are optional (e.g. `list[..3]`, `list[1..]`).
        LogicalExpression::SliceAccess { base, start, end } => {
            let base_expr = convert_filter_expression(base)?;
            let start_expr = start
                .as_ref()
                .map(|s| convert_filter_expression(s))
                .transpose()?
                .map(Box::new);
            let end_expr = end
                .as_ref()
                .map(|e| convert_filter_expression(e))
                .transpose()?
                .map(Box::new);
            Ok(FilterExpression::SliceAccess {
                base: Box::new(base_expr),
                start: start_expr,
                end: end_expr,
            })
        }
        LogicalExpression::Parameter(_) => Err(Error::Internal(
            "Parameters not yet supported in filters".to_string(),
        )),
        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
        // `[variable IN list WHERE filter | map]`; the WHERE part is optional.
        LogicalExpression::ListComprehension {
            variable,
            list_expr,
            filter_expr,
            map_expr,
        } => {
            let list = convert_filter_expression(list_expr)?;
            let filter = filter_expr
                .as_ref()
                .map(|f| convert_filter_expression(f))
                .transpose()?
                .map(Box::new);
            let map = convert_filter_expression(map_expr)?;
            Ok(FilterExpression::ListComprehension {
                variable: variable.clone(),
                list_expr: Box::new(list),
                filter_expr: filter,
                map_expr: Box::new(map),
            })
        }
        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
            Error::Internal("Subqueries not yet supported in filters".to_string()),
        ),
    }
}
3473
3474fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3476 use grafeo_common::types::Value;
3477 match value {
3478 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
3480 Value::Int64(_) => LogicalType::Int64,
3481 Value::Float64(_) => LogicalType::Float64,
3482 Value::String(_) => LogicalType::String,
3483 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
3485 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
3488 }
3489}
3490
3491fn expression_to_string(expr: &LogicalExpression) -> String {
3493 match expr {
3494 LogicalExpression::Variable(name) => name.clone(),
3495 LogicalExpression::Property { variable, property } => {
3496 format!("{variable}.{property}")
3497 }
3498 LogicalExpression::Literal(value) => format!("{value:?}"),
3499 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3500 _ => "expr".to_string(),
3501 }
3502}
3503
/// An executable plan: the root physical operator together with the names of the columns
/// it produces.
pub struct PhysicalPlan {
    /// Root of the physical operator tree.
    pub operator: Box<dyn Operator>,
    /// Names of the columns produced by `operator`, in output order.
    pub columns: Vec<String>,
    /// Adaptive execution context, if the planner produced one.
    pub adaptive_context: Option<AdaptiveContext>,
}
3517
3518impl PhysicalPlan {
3519 #[must_use]
3521 pub fn columns(&self) -> &[String] {
3522 &self.columns
3523 }
3524
3525 pub fn into_operator(self) -> Box<dyn Operator> {
3527 self.operator
3528 }
3529
3530 #[must_use]
3532 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3533 self.adaptive_context.as_ref()
3534 }
3535
3536 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3538 self.adaptive_context.take()
3539 }
3540}
3541
/// Operator that yields at most one pre-built `DataChunk`, then reports exhaustion.
// No constructor call is visible here, hence `dead_code`. NOTE(review): confirm it is
// still needed.
#[allow(dead_code)]
struct SingleResultOperator {
    /// The chunk still to be emitted; `None` once it has been returned, or if none was given.
    result: Option<grafeo_core::execution::DataChunk>,
}

impl SingleResultOperator {
    /// Wraps `result` so that the first `next()` call returns it.
    #[allow(dead_code)]
    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
        Self { result }
    }
}

impl Operator for SingleResultOperator {
    /// Returns the stored chunk on the first call and `Ok(None)` on every later call.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        Ok(self.result.take())
    }

    /// Intentionally a no-op: `next()` moves the chunk out, so it cannot be replayed.
    fn reset(&mut self) {
    }

    fn name(&self) -> &'static str {
        "SingleResult"
    }
}
3570
3571struct StaticResultOperator {
3573 rows: Vec<Vec<Value>>,
3574 column_indices: Vec<usize>,
3575 row_index: usize,
3576}
3577
3578impl Operator for StaticResultOperator {
3579 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3580 use grafeo_core::execution::DataChunk;
3581
3582 if self.row_index >= self.rows.len() {
3583 return Ok(None);
3584 }
3585
3586 let remaining = self.rows.len() - self.row_index;
3587 let chunk_rows = remaining.min(1024);
3588 let col_count = self.column_indices.len();
3589
3590 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
3591 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
3592
3593 for row_offset in 0..chunk_rows {
3594 let row = &self.rows[self.row_index + row_offset];
3595 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
3596 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
3597 if let Some(col) = chunk.column_mut(col_idx) {
3598 col.push_value(value);
3599 }
3600 }
3601 }
3602 chunk.set_count(chunk_rows);
3603
3604 self.row_index += chunk_rows;
3605 Ok(Some(chunk))
3606 }
3607
3608 fn reset(&mut self) {
3609 self.row_index = 0;
3610 }
3611
3612 fn name(&self) -> &'static str {
3613 "StaticResult"
3614 }
3615}
3616
3617#[cfg(test)]
3618mod tests {
3619 use super::*;
3620 use crate::query::plan::{
3621 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3622 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3623 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3624 SortKey, SortOp,
3625 };
3626 use grafeo_common::types::Value;
3627
3628 fn create_test_store() -> Arc<LpgStore> {
3629 let store = Arc::new(LpgStore::new());
3630 store.create_node(&["Person"]);
3631 store.create_node(&["Person"]);
3632 store.create_node(&["Company"]);
3633 store
3634 }
3635
3636 #[test]
3639 fn test_plan_simple_scan() {
3640 let store = create_test_store();
3641 let planner = Planner::new(store);
3642
3643 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3645 items: vec![ReturnItem {
3646 expression: LogicalExpression::Variable("n".to_string()),
3647 alias: None,
3648 }],
3649 distinct: false,
3650 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3651 variable: "n".to_string(),
3652 label: Some("Person".to_string()),
3653 input: None,
3654 })),
3655 }));
3656
3657 let physical = planner.plan(&logical).unwrap();
3658 assert_eq!(physical.columns(), &["n"]);
3659 }
3660
3661 #[test]
3662 fn test_plan_scan_without_label() {
3663 let store = create_test_store();
3664 let planner = Planner::new(store);
3665
3666 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3668 items: vec![ReturnItem {
3669 expression: LogicalExpression::Variable("n".to_string()),
3670 alias: None,
3671 }],
3672 distinct: false,
3673 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3674 variable: "n".to_string(),
3675 label: None,
3676 input: None,
3677 })),
3678 }));
3679
3680 let physical = planner.plan(&logical).unwrap();
3681 assert_eq!(physical.columns(), &["n"]);
3682 }
3683
3684 #[test]
3685 fn test_plan_return_with_alias() {
3686 let store = create_test_store();
3687 let planner = Planner::new(store);
3688
3689 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3691 items: vec![ReturnItem {
3692 expression: LogicalExpression::Variable("n".to_string()),
3693 alias: Some("person".to_string()),
3694 }],
3695 distinct: false,
3696 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3697 variable: "n".to_string(),
3698 label: Some("Person".to_string()),
3699 input: None,
3700 })),
3701 }));
3702
3703 let physical = planner.plan(&logical).unwrap();
3704 assert_eq!(physical.columns(), &["person"]);
3705 }
3706
3707 #[test]
3708 fn test_plan_return_property() {
3709 let store = create_test_store();
3710 let planner = Planner::new(store);
3711
3712 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3714 items: vec![ReturnItem {
3715 expression: LogicalExpression::Property {
3716 variable: "n".to_string(),
3717 property: "name".to_string(),
3718 },
3719 alias: None,
3720 }],
3721 distinct: false,
3722 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3723 variable: "n".to_string(),
3724 label: Some("Person".to_string()),
3725 input: None,
3726 })),
3727 }));
3728
3729 let physical = planner.plan(&logical).unwrap();
3730 assert_eq!(physical.columns(), &["n.name"]);
3731 }
3732
3733 #[test]
3734 fn test_plan_return_literal() {
3735 let store = create_test_store();
3736 let planner = Planner::new(store);
3737
3738 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3740 items: vec![ReturnItem {
3741 expression: LogicalExpression::Literal(Value::Int64(42)),
3742 alias: Some("answer".to_string()),
3743 }],
3744 distinct: false,
3745 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3746 variable: "n".to_string(),
3747 label: None,
3748 input: None,
3749 })),
3750 }));
3751
3752 let physical = planner.plan(&logical).unwrap();
3753 assert_eq!(physical.columns(), &["answer"]);
3754 }
3755
3756 #[test]
3759 fn test_plan_filter_equality() {
3760 let store = create_test_store();
3761 let planner = Planner::new(store);
3762
3763 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3765 items: vec![ReturnItem {
3766 expression: LogicalExpression::Variable("n".to_string()),
3767 alias: None,
3768 }],
3769 distinct: false,
3770 input: Box::new(LogicalOperator::Filter(FilterOp {
3771 predicate: LogicalExpression::Binary {
3772 left: Box::new(LogicalExpression::Property {
3773 variable: "n".to_string(),
3774 property: "age".to_string(),
3775 }),
3776 op: BinaryOp::Eq,
3777 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3778 },
3779 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3780 variable: "n".to_string(),
3781 label: Some("Person".to_string()),
3782 input: None,
3783 })),
3784 })),
3785 }));
3786
3787 let physical = planner.plan(&logical).unwrap();
3788 assert_eq!(physical.columns(), &["n"]);
3789 }
3790
3791 #[test]
3792 fn test_plan_filter_compound_and() {
3793 let store = create_test_store();
3794 let planner = Planner::new(store);
3795
3796 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3798 items: vec![ReturnItem {
3799 expression: LogicalExpression::Variable("n".to_string()),
3800 alias: None,
3801 }],
3802 distinct: false,
3803 input: Box::new(LogicalOperator::Filter(FilterOp {
3804 predicate: LogicalExpression::Binary {
3805 left: Box::new(LogicalExpression::Binary {
3806 left: Box::new(LogicalExpression::Property {
3807 variable: "n".to_string(),
3808 property: "age".to_string(),
3809 }),
3810 op: BinaryOp::Gt,
3811 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3812 }),
3813 op: BinaryOp::And,
3814 right: Box::new(LogicalExpression::Binary {
3815 left: Box::new(LogicalExpression::Property {
3816 variable: "n".to_string(),
3817 property: "age".to_string(),
3818 }),
3819 op: BinaryOp::Lt,
3820 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3821 }),
3822 },
3823 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3824 variable: "n".to_string(),
3825 label: None,
3826 input: None,
3827 })),
3828 })),
3829 }));
3830
3831 let physical = planner.plan(&logical).unwrap();
3832 assert_eq!(physical.columns(), &["n"]);
3833 }
3834
3835 #[test]
3836 fn test_plan_filter_unary_not() {
3837 let store = create_test_store();
3838 let planner = Planner::new(store);
3839
3840 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3842 items: vec![ReturnItem {
3843 expression: LogicalExpression::Variable("n".to_string()),
3844 alias: None,
3845 }],
3846 distinct: false,
3847 input: Box::new(LogicalOperator::Filter(FilterOp {
3848 predicate: LogicalExpression::Unary {
3849 op: UnaryOp::Not,
3850 operand: Box::new(LogicalExpression::Property {
3851 variable: "n".to_string(),
3852 property: "active".to_string(),
3853 }),
3854 },
3855 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3856 variable: "n".to_string(),
3857 label: None,
3858 input: None,
3859 })),
3860 })),
3861 }));
3862
3863 let physical = planner.plan(&logical).unwrap();
3864 assert_eq!(physical.columns(), &["n"]);
3865 }
3866
3867 #[test]
3868 fn test_plan_filter_is_null() {
3869 let store = create_test_store();
3870 let planner = Planner::new(store);
3871
3872 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3874 items: vec![ReturnItem {
3875 expression: LogicalExpression::Variable("n".to_string()),
3876 alias: None,
3877 }],
3878 distinct: false,
3879 input: Box::new(LogicalOperator::Filter(FilterOp {
3880 predicate: LogicalExpression::Unary {
3881 op: UnaryOp::IsNull,
3882 operand: Box::new(LogicalExpression::Property {
3883 variable: "n".to_string(),
3884 property: "email".to_string(),
3885 }),
3886 },
3887 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3888 variable: "n".to_string(),
3889 label: None,
3890 input: None,
3891 })),
3892 })),
3893 }));
3894
3895 let physical = planner.plan(&logical).unwrap();
3896 assert_eq!(physical.columns(), &["n"]);
3897 }
3898
3899 #[test]
3900 fn test_plan_filter_function_call() {
3901 let store = create_test_store();
3902 let planner = Planner::new(store);
3903
3904 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3906 items: vec![ReturnItem {
3907 expression: LogicalExpression::Variable("n".to_string()),
3908 alias: None,
3909 }],
3910 distinct: false,
3911 input: Box::new(LogicalOperator::Filter(FilterOp {
3912 predicate: LogicalExpression::Binary {
3913 left: Box::new(LogicalExpression::FunctionCall {
3914 name: "size".to_string(),
3915 args: vec![LogicalExpression::Property {
3916 variable: "n".to_string(),
3917 property: "friends".to_string(),
3918 }],
3919 distinct: false,
3920 }),
3921 op: BinaryOp::Gt,
3922 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3923 },
3924 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3925 variable: "n".to_string(),
3926 label: None,
3927 input: None,
3928 })),
3929 })),
3930 }));
3931
3932 let physical = planner.plan(&logical).unwrap();
3933 assert_eq!(physical.columns(), &["n"]);
3934 }
3935
3936 #[test]
3939 fn test_plan_expand_outgoing() {
3940 let store = create_test_store();
3941 let planner = Planner::new(store);
3942
3943 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3945 items: vec![
3946 ReturnItem {
3947 expression: LogicalExpression::Variable("a".to_string()),
3948 alias: None,
3949 },
3950 ReturnItem {
3951 expression: LogicalExpression::Variable("b".to_string()),
3952 alias: None,
3953 },
3954 ],
3955 distinct: false,
3956 input: Box::new(LogicalOperator::Expand(ExpandOp {
3957 from_variable: "a".to_string(),
3958 to_variable: "b".to_string(),
3959 edge_variable: None,
3960 direction: ExpandDirection::Outgoing,
3961 edge_type: Some("KNOWS".to_string()),
3962 min_hops: 1,
3963 max_hops: Some(1),
3964 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3965 variable: "a".to_string(),
3966 label: Some("Person".to_string()),
3967 input: None,
3968 })),
3969 path_alias: None,
3970 })),
3971 }));
3972
3973 let physical = planner.plan(&logical).unwrap();
3974 assert!(physical.columns().contains(&"a".to_string()));
3976 assert!(physical.columns().contains(&"b".to_string()));
3977 }
3978
3979 #[test]
3980 fn test_plan_expand_with_edge_variable() {
3981 let store = create_test_store();
3982 let planner = Planner::new(store);
3983
3984 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3986 items: vec![
3987 ReturnItem {
3988 expression: LogicalExpression::Variable("a".to_string()),
3989 alias: None,
3990 },
3991 ReturnItem {
3992 expression: LogicalExpression::Variable("r".to_string()),
3993 alias: None,
3994 },
3995 ReturnItem {
3996 expression: LogicalExpression::Variable("b".to_string()),
3997 alias: None,
3998 },
3999 ],
4000 distinct: false,
4001 input: Box::new(LogicalOperator::Expand(ExpandOp {
4002 from_variable: "a".to_string(),
4003 to_variable: "b".to_string(),
4004 edge_variable: Some("r".to_string()),
4005 direction: ExpandDirection::Outgoing,
4006 edge_type: Some("KNOWS".to_string()),
4007 min_hops: 1,
4008 max_hops: Some(1),
4009 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4010 variable: "a".to_string(),
4011 label: None,
4012 input: None,
4013 })),
4014 path_alias: None,
4015 })),
4016 }));
4017
4018 let physical = planner.plan(&logical).unwrap();
4019 assert!(physical.columns().contains(&"a".to_string()));
4020 assert!(physical.columns().contains(&"r".to_string()));
4021 assert!(physical.columns().contains(&"b".to_string()));
4022 }
4023
4024 #[test]
4027 fn test_plan_limit() {
4028 let store = create_test_store();
4029 let planner = Planner::new(store);
4030
4031 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4033 items: vec![ReturnItem {
4034 expression: LogicalExpression::Variable("n".to_string()),
4035 alias: None,
4036 }],
4037 distinct: false,
4038 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4039 count: 10,
4040 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4041 variable: "n".to_string(),
4042 label: None,
4043 input: None,
4044 })),
4045 })),
4046 }));
4047
4048 let physical = planner.plan(&logical).unwrap();
4049 assert_eq!(physical.columns(), &["n"]);
4050 }
4051
4052 #[test]
4053 fn test_plan_skip() {
4054 let store = create_test_store();
4055 let planner = Planner::new(store);
4056
4057 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4059 items: vec![ReturnItem {
4060 expression: LogicalExpression::Variable("n".to_string()),
4061 alias: None,
4062 }],
4063 distinct: false,
4064 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4065 count: 5,
4066 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4067 variable: "n".to_string(),
4068 label: None,
4069 input: None,
4070 })),
4071 })),
4072 }));
4073
4074 let physical = planner.plan(&logical).unwrap();
4075 assert_eq!(physical.columns(), &["n"]);
4076 }
4077
4078 #[test]
4079 fn test_plan_sort() {
4080 let store = create_test_store();
4081 let planner = Planner::new(store);
4082
4083 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4085 items: vec![ReturnItem {
4086 expression: LogicalExpression::Variable("n".to_string()),
4087 alias: None,
4088 }],
4089 distinct: false,
4090 input: Box::new(LogicalOperator::Sort(SortOp {
4091 keys: vec![SortKey {
4092 expression: LogicalExpression::Variable("n".to_string()),
4093 order: SortOrder::Ascending,
4094 }],
4095 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4096 variable: "n".to_string(),
4097 label: None,
4098 input: None,
4099 })),
4100 })),
4101 }));
4102
4103 let physical = planner.plan(&logical).unwrap();
4104 assert_eq!(physical.columns(), &["n"]);
4105 }
4106
4107 #[test]
4108 fn test_plan_sort_descending() {
4109 let store = create_test_store();
4110 let planner = Planner::new(store);
4111
4112 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4114 items: vec![ReturnItem {
4115 expression: LogicalExpression::Variable("n".to_string()),
4116 alias: None,
4117 }],
4118 distinct: false,
4119 input: Box::new(LogicalOperator::Sort(SortOp {
4120 keys: vec![SortKey {
4121 expression: LogicalExpression::Variable("n".to_string()),
4122 order: SortOrder::Descending,
4123 }],
4124 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4125 variable: "n".to_string(),
4126 label: None,
4127 input: None,
4128 })),
4129 })),
4130 }));
4131
4132 let physical = planner.plan(&logical).unwrap();
4133 assert_eq!(physical.columns(), &["n"]);
4134 }
4135
4136 #[test]
4137 fn test_plan_distinct() {
4138 let store = create_test_store();
4139 let planner = Planner::new(store);
4140
4141 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4143 items: vec![ReturnItem {
4144 expression: LogicalExpression::Variable("n".to_string()),
4145 alias: None,
4146 }],
4147 distinct: false,
4148 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4149 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4150 variable: "n".to_string(),
4151 label: None,
4152 input: None,
4153 })),
4154 columns: None,
4155 })),
4156 }));
4157
4158 let physical = planner.plan(&logical).unwrap();
4159 assert_eq!(physical.columns(), &["n"]);
4160 }
4161
4162 #[test]
4165 fn test_plan_aggregate_count() {
4166 let store = create_test_store();
4167 let planner = Planner::new(store);
4168
4169 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4171 items: vec![ReturnItem {
4172 expression: LogicalExpression::Variable("cnt".to_string()),
4173 alias: None,
4174 }],
4175 distinct: false,
4176 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
4177 group_by: vec![],
4178 aggregates: vec![LogicalAggregateExpr {
4179 function: LogicalAggregateFunction::Count,
4180 expression: Some(LogicalExpression::Variable("n".to_string())),
4181 distinct: false,
4182 alias: Some("cnt".to_string()),
4183 percentile: None,
4184 }],
4185 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4186 variable: "n".to_string(),
4187 label: None,
4188 input: None,
4189 })),
4190 having: None,
4191 })),
4192 }));
4193
4194 let physical = planner.plan(&logical).unwrap();
4195 assert!(physical.columns().contains(&"cnt".to_string()));
4196 }
4197
4198 #[test]
4199 fn test_plan_aggregate_with_group_by() {
4200 let store = create_test_store();
4201 let planner = Planner::new(store);
4202
4203 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4205 group_by: vec![LogicalExpression::Property {
4206 variable: "n".to_string(),
4207 property: "city".to_string(),
4208 }],
4209 aggregates: vec![LogicalAggregateExpr {
4210 function: LogicalAggregateFunction::Count,
4211 expression: Some(LogicalExpression::Variable("n".to_string())),
4212 distinct: false,
4213 alias: Some("cnt".to_string()),
4214 percentile: None,
4215 }],
4216 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4217 variable: "n".to_string(),
4218 label: Some("Person".to_string()),
4219 input: None,
4220 })),
4221 having: None,
4222 }));
4223
4224 let physical = planner.plan(&logical).unwrap();
4225 assert_eq!(physical.columns().len(), 2);
4226 }
4227
4228 #[test]
4229 fn test_plan_aggregate_sum() {
4230 let store = create_test_store();
4231 let planner = Planner::new(store);
4232
4233 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4235 group_by: vec![],
4236 aggregates: vec![LogicalAggregateExpr {
4237 function: LogicalAggregateFunction::Sum,
4238 expression: Some(LogicalExpression::Property {
4239 variable: "n".to_string(),
4240 property: "value".to_string(),
4241 }),
4242 distinct: false,
4243 alias: Some("total".to_string()),
4244 percentile: None,
4245 }],
4246 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4247 variable: "n".to_string(),
4248 label: None,
4249 input: None,
4250 })),
4251 having: None,
4252 }));
4253
4254 let physical = planner.plan(&logical).unwrap();
4255 assert!(physical.columns().contains(&"total".to_string()));
4256 }
4257
4258 #[test]
4259 fn test_plan_aggregate_avg() {
4260 let store = create_test_store();
4261 let planner = Planner::new(store);
4262
4263 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4265 group_by: vec![],
4266 aggregates: vec![LogicalAggregateExpr {
4267 function: LogicalAggregateFunction::Avg,
4268 expression: Some(LogicalExpression::Property {
4269 variable: "n".to_string(),
4270 property: "score".to_string(),
4271 }),
4272 distinct: false,
4273 alias: Some("average".to_string()),
4274 percentile: None,
4275 }],
4276 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4277 variable: "n".to_string(),
4278 label: None,
4279 input: None,
4280 })),
4281 having: None,
4282 }));
4283
4284 let physical = planner.plan(&logical).unwrap();
4285 assert!(physical.columns().contains(&"average".to_string()));
4286 }
4287
4288 #[test]
4289 fn test_plan_aggregate_min_max() {
4290 let store = create_test_store();
4291 let planner = Planner::new(store);
4292
4293 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4295 group_by: vec![],
4296 aggregates: vec![
4297 LogicalAggregateExpr {
4298 function: LogicalAggregateFunction::Min,
4299 expression: Some(LogicalExpression::Property {
4300 variable: "n".to_string(),
4301 property: "age".to_string(),
4302 }),
4303 distinct: false,
4304 alias: Some("youngest".to_string()),
4305 percentile: None,
4306 },
4307 LogicalAggregateExpr {
4308 function: LogicalAggregateFunction::Max,
4309 expression: Some(LogicalExpression::Property {
4310 variable: "n".to_string(),
4311 property: "age".to_string(),
4312 }),
4313 distinct: false,
4314 alias: Some("oldest".to_string()),
4315 percentile: None,
4316 },
4317 ],
4318 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4319 variable: "n".to_string(),
4320 label: None,
4321 input: None,
4322 })),
4323 having: None,
4324 }));
4325
4326 let physical = planner.plan(&logical).unwrap();
4327 assert!(physical.columns().contains(&"youngest".to_string()));
4328 assert!(physical.columns().contains(&"oldest".to_string()));
4329 }
4330
4331 #[test]
4334 fn test_plan_inner_join() {
4335 let store = create_test_store();
4336 let planner = Planner::new(store);
4337
4338 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4340 items: vec![
4341 ReturnItem {
4342 expression: LogicalExpression::Variable("a".to_string()),
4343 alias: None,
4344 },
4345 ReturnItem {
4346 expression: LogicalExpression::Variable("b".to_string()),
4347 alias: None,
4348 },
4349 ],
4350 distinct: false,
4351 input: Box::new(LogicalOperator::Join(JoinOp {
4352 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4353 variable: "a".to_string(),
4354 label: Some("Person".to_string()),
4355 input: None,
4356 })),
4357 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4358 variable: "b".to_string(),
4359 label: Some("Company".to_string()),
4360 input: None,
4361 })),
4362 join_type: JoinType::Inner,
4363 conditions: vec![JoinCondition {
4364 left: LogicalExpression::Variable("a".to_string()),
4365 right: LogicalExpression::Variable("b".to_string()),
4366 }],
4367 })),
4368 }));
4369
4370 let physical = planner.plan(&logical).unwrap();
4371 assert!(physical.columns().contains(&"a".to_string()));
4372 assert!(physical.columns().contains(&"b".to_string()));
4373 }
4374
4375 #[test]
4376 fn test_plan_cross_join() {
4377 let store = create_test_store();
4378 let planner = Planner::new(store);
4379
4380 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4382 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4383 variable: "a".to_string(),
4384 label: None,
4385 input: None,
4386 })),
4387 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4388 variable: "b".to_string(),
4389 label: None,
4390 input: None,
4391 })),
4392 join_type: JoinType::Cross,
4393 conditions: vec![],
4394 }));
4395
4396 let physical = planner.plan(&logical).unwrap();
4397 assert_eq!(physical.columns().len(), 2);
4398 }
4399
4400 #[test]
4401 fn test_plan_left_join() {
4402 let store = create_test_store();
4403 let planner = Planner::new(store);
4404
4405 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4406 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4407 variable: "a".to_string(),
4408 label: None,
4409 input: None,
4410 })),
4411 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4412 variable: "b".to_string(),
4413 label: None,
4414 input: None,
4415 })),
4416 join_type: JoinType::Left,
4417 conditions: vec![],
4418 }));
4419
4420 let physical = planner.plan(&logical).unwrap();
4421 assert_eq!(physical.columns().len(), 2);
4422 }
4423
4424 #[test]
4427 fn test_plan_create_node() {
4428 let store = create_test_store();
4429 let planner = Planner::new(store);
4430
4431 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4433 variable: "n".to_string(),
4434 labels: vec!["Person".to_string()],
4435 properties: vec![(
4436 "name".to_string(),
4437 LogicalExpression::Literal(Value::String("Alice".into())),
4438 )],
4439 input: None,
4440 }));
4441
4442 let physical = planner.plan(&logical).unwrap();
4443 assert!(physical.columns().contains(&"n".to_string()));
4444 }
4445
4446 #[test]
4447 fn test_plan_create_edge() {
4448 let store = create_test_store();
4449 let planner = Planner::new(store);
4450
4451 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4453 variable: Some("r".to_string()),
4454 from_variable: "a".to_string(),
4455 to_variable: "b".to_string(),
4456 edge_type: "KNOWS".to_string(),
4457 properties: vec![],
4458 input: Box::new(LogicalOperator::Join(JoinOp {
4459 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4460 variable: "a".to_string(),
4461 label: None,
4462 input: None,
4463 })),
4464 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4465 variable: "b".to_string(),
4466 label: None,
4467 input: None,
4468 })),
4469 join_type: JoinType::Cross,
4470 conditions: vec![],
4471 })),
4472 }));
4473
4474 let physical = planner.plan(&logical).unwrap();
4475 assert!(physical.columns().contains(&"r".to_string()));
4476 }
4477
4478 #[test]
4479 fn test_plan_delete_node() {
4480 let store = create_test_store();
4481 let planner = Planner::new(store);
4482
4483 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4485 variable: "n".to_string(),
4486 detach: false,
4487 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4488 variable: "n".to_string(),
4489 label: None,
4490 input: None,
4491 })),
4492 }));
4493
4494 let physical = planner.plan(&logical).unwrap();
4495 assert!(physical.columns().contains(&"deleted_count".to_string()));
4496 }
4497
4498 #[test]
4501 fn test_plan_empty_errors() {
4502 let store = create_test_store();
4503 let planner = Planner::new(store);
4504
4505 let logical = LogicalPlan::new(LogicalOperator::Empty);
4506 let result = planner.plan(&logical);
4507 assert!(result.is_err());
4508 }
4509
4510 #[test]
4511 fn test_plan_missing_variable_in_return() {
4512 let store = create_test_store();
4513 let planner = Planner::new(store);
4514
4515 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4517 items: vec![ReturnItem {
4518 expression: LogicalExpression::Variable("missing".to_string()),
4519 alias: None,
4520 }],
4521 distinct: false,
4522 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4523 variable: "n".to_string(),
4524 label: None,
4525 input: None,
4526 })),
4527 }));
4528
4529 let result = planner.plan(&logical);
4530 assert!(result.is_err());
4531 }
4532
4533 #[test]
4536 fn test_convert_binary_ops() {
4537 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4538 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4539 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4540 assert!(convert_binary_op(BinaryOp::Le).is_ok());
4541 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4542 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4543 assert!(convert_binary_op(BinaryOp::And).is_ok());
4544 assert!(convert_binary_op(BinaryOp::Or).is_ok());
4545 assert!(convert_binary_op(BinaryOp::Add).is_ok());
4546 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4547 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4548 assert!(convert_binary_op(BinaryOp::Div).is_ok());
4549 }
4550
4551 #[test]
4552 fn test_convert_unary_ops() {
4553 assert!(convert_unary_op(UnaryOp::Not).is_ok());
4554 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4555 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4556 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4557 }
4558
4559 #[test]
4560 fn test_convert_aggregate_functions() {
4561 assert!(matches!(
4562 convert_aggregate_function(LogicalAggregateFunction::Count),
4563 PhysicalAggregateFunction::Count
4564 ));
4565 assert!(matches!(
4566 convert_aggregate_function(LogicalAggregateFunction::Sum),
4567 PhysicalAggregateFunction::Sum
4568 ));
4569 assert!(matches!(
4570 convert_aggregate_function(LogicalAggregateFunction::Avg),
4571 PhysicalAggregateFunction::Avg
4572 ));
4573 assert!(matches!(
4574 convert_aggregate_function(LogicalAggregateFunction::Min),
4575 PhysicalAggregateFunction::Min
4576 ));
4577 assert!(matches!(
4578 convert_aggregate_function(LogicalAggregateFunction::Max),
4579 PhysicalAggregateFunction::Max
4580 ));
4581 }
4582
4583 #[test]
4584 fn test_planner_accessors() {
4585 let store = create_test_store();
4586 let planner = Planner::new(Arc::clone(&store));
4587
4588 assert!(planner.tx_id().is_none());
4589 assert!(planner.tx_manager().is_none());
4590 let _ = planner.viewing_epoch(); }
4592
4593 #[test]
4594 fn test_physical_plan_accessors() {
4595 let store = create_test_store();
4596 let planner = Planner::new(store);
4597
4598 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4599 variable: "n".to_string(),
4600 label: None,
4601 input: None,
4602 }));
4603
4604 let physical = planner.plan(&logical).unwrap();
4605 assert_eq!(physical.columns(), &["n"]);
4606
4607 let _ = physical.into_operator();
4609 }
4610
4611 #[test]
4614 fn test_plan_adaptive_with_scan() {
4615 let store = create_test_store();
4616 let planner = Planner::new(store);
4617
4618 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4620 items: vec![ReturnItem {
4621 expression: LogicalExpression::Variable("n".to_string()),
4622 alias: None,
4623 }],
4624 distinct: false,
4625 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4626 variable: "n".to_string(),
4627 label: Some("Person".to_string()),
4628 input: None,
4629 })),
4630 }));
4631
4632 let physical = planner.plan_adaptive(&logical).unwrap();
4633 assert_eq!(physical.columns(), &["n"]);
4634 assert!(physical.adaptive_context.is_some());
4636 }
4637
4638 #[test]
4639 fn test_plan_adaptive_with_filter() {
4640 let store = create_test_store();
4641 let planner = Planner::new(store);
4642
4643 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4645 items: vec![ReturnItem {
4646 expression: LogicalExpression::Variable("n".to_string()),
4647 alias: None,
4648 }],
4649 distinct: false,
4650 input: Box::new(LogicalOperator::Filter(FilterOp {
4651 predicate: LogicalExpression::Binary {
4652 left: Box::new(LogicalExpression::Property {
4653 variable: "n".to_string(),
4654 property: "age".to_string(),
4655 }),
4656 op: BinaryOp::Gt,
4657 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4658 },
4659 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4660 variable: "n".to_string(),
4661 label: None,
4662 input: None,
4663 })),
4664 })),
4665 }));
4666
4667 let physical = planner.plan_adaptive(&logical).unwrap();
4668 assert!(physical.adaptive_context.is_some());
4669 }
4670
4671 #[test]
4672 fn test_plan_adaptive_with_expand() {
4673 let store = create_test_store();
4674 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4675
4676 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4678 items: vec![
4679 ReturnItem {
4680 expression: LogicalExpression::Variable("a".to_string()),
4681 alias: None,
4682 },
4683 ReturnItem {
4684 expression: LogicalExpression::Variable("b".to_string()),
4685 alias: None,
4686 },
4687 ],
4688 distinct: false,
4689 input: Box::new(LogicalOperator::Expand(ExpandOp {
4690 from_variable: "a".to_string(),
4691 to_variable: "b".to_string(),
4692 edge_variable: None,
4693 direction: ExpandDirection::Outgoing,
4694 edge_type: Some("KNOWS".to_string()),
4695 min_hops: 1,
4696 max_hops: Some(1),
4697 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4698 variable: "a".to_string(),
4699 label: None,
4700 input: None,
4701 })),
4702 path_alias: None,
4703 })),
4704 }));
4705
4706 let physical = planner.plan_adaptive(&logical).unwrap();
4707 assert!(physical.adaptive_context.is_some());
4708 }
4709
4710 #[test]
4711 fn test_plan_adaptive_with_join() {
4712 let store = create_test_store();
4713 let planner = Planner::new(store);
4714
4715 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4716 items: vec![
4717 ReturnItem {
4718 expression: LogicalExpression::Variable("a".to_string()),
4719 alias: None,
4720 },
4721 ReturnItem {
4722 expression: LogicalExpression::Variable("b".to_string()),
4723 alias: None,
4724 },
4725 ],
4726 distinct: false,
4727 input: Box::new(LogicalOperator::Join(JoinOp {
4728 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4729 variable: "a".to_string(),
4730 label: None,
4731 input: None,
4732 })),
4733 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4734 variable: "b".to_string(),
4735 label: None,
4736 input: None,
4737 })),
4738 join_type: JoinType::Cross,
4739 conditions: vec![],
4740 })),
4741 }));
4742
4743 let physical = planner.plan_adaptive(&logical).unwrap();
4744 assert!(physical.adaptive_context.is_some());
4745 }
4746
4747 #[test]
4748 fn test_plan_adaptive_with_aggregate() {
4749 let store = create_test_store();
4750 let planner = Planner::new(store);
4751
4752 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4753 group_by: vec![],
4754 aggregates: vec![LogicalAggregateExpr {
4755 function: LogicalAggregateFunction::Count,
4756 expression: Some(LogicalExpression::Variable("n".to_string())),
4757 distinct: false,
4758 alias: Some("cnt".to_string()),
4759 percentile: None,
4760 }],
4761 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4762 variable: "n".to_string(),
4763 label: None,
4764 input: None,
4765 })),
4766 having: None,
4767 }));
4768
4769 let physical = planner.plan_adaptive(&logical).unwrap();
4770 assert!(physical.adaptive_context.is_some());
4771 }
4772
4773 #[test]
4774 fn test_plan_adaptive_with_distinct() {
4775 let store = create_test_store();
4776 let planner = Planner::new(store);
4777
4778 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4779 items: vec![ReturnItem {
4780 expression: LogicalExpression::Variable("n".to_string()),
4781 alias: None,
4782 }],
4783 distinct: false,
4784 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4785 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4786 variable: "n".to_string(),
4787 label: None,
4788 input: None,
4789 })),
4790 columns: None,
4791 })),
4792 }));
4793
4794 let physical = planner.plan_adaptive(&logical).unwrap();
4795 assert!(physical.adaptive_context.is_some());
4796 }
4797
4798 #[test]
4799 fn test_plan_adaptive_with_limit() {
4800 let store = create_test_store();
4801 let planner = Planner::new(store);
4802
4803 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4804 items: vec![ReturnItem {
4805 expression: LogicalExpression::Variable("n".to_string()),
4806 alias: None,
4807 }],
4808 distinct: false,
4809 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4810 count: 10,
4811 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4812 variable: "n".to_string(),
4813 label: None,
4814 input: None,
4815 })),
4816 })),
4817 }));
4818
4819 let physical = planner.plan_adaptive(&logical).unwrap();
4820 assert!(physical.adaptive_context.is_some());
4821 }
4822
4823 #[test]
4824 fn test_plan_adaptive_with_skip() {
4825 let store = create_test_store();
4826 let planner = Planner::new(store);
4827
4828 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4829 items: vec![ReturnItem {
4830 expression: LogicalExpression::Variable("n".to_string()),
4831 alias: None,
4832 }],
4833 distinct: false,
4834 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4835 count: 5,
4836 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4837 variable: "n".to_string(),
4838 label: None,
4839 input: None,
4840 })),
4841 })),
4842 }));
4843
4844 let physical = planner.plan_adaptive(&logical).unwrap();
4845 assert!(physical.adaptive_context.is_some());
4846 }
4847
4848 #[test]
4849 fn test_plan_adaptive_with_sort() {
4850 let store = create_test_store();
4851 let planner = Planner::new(store);
4852
4853 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4854 items: vec![ReturnItem {
4855 expression: LogicalExpression::Variable("n".to_string()),
4856 alias: None,
4857 }],
4858 distinct: false,
4859 input: Box::new(LogicalOperator::Sort(SortOp {
4860 keys: vec![SortKey {
4861 expression: LogicalExpression::Variable("n".to_string()),
4862 order: SortOrder::Ascending,
4863 }],
4864 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4865 variable: "n".to_string(),
4866 label: None,
4867 input: None,
4868 })),
4869 })),
4870 }));
4871
4872 let physical = planner.plan_adaptive(&logical).unwrap();
4873 assert!(physical.adaptive_context.is_some());
4874 }
4875
4876 #[test]
4877 fn test_plan_adaptive_with_union() {
4878 let store = create_test_store();
4879 let planner = Planner::new(store);
4880
4881 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4882 items: vec![ReturnItem {
4883 expression: LogicalExpression::Variable("n".to_string()),
4884 alias: None,
4885 }],
4886 distinct: false,
4887 input: Box::new(LogicalOperator::Union(UnionOp {
4888 inputs: vec![
4889 LogicalOperator::NodeScan(NodeScanOp {
4890 variable: "n".to_string(),
4891 label: Some("Person".to_string()),
4892 input: None,
4893 }),
4894 LogicalOperator::NodeScan(NodeScanOp {
4895 variable: "n".to_string(),
4896 label: Some("Company".to_string()),
4897 input: None,
4898 }),
4899 ],
4900 })),
4901 }));
4902
4903 let physical = planner.plan_adaptive(&logical).unwrap();
4904 assert!(physical.adaptive_context.is_some());
4905 }
4906
4907 #[test]
4910 fn test_plan_expand_variable_length() {
4911 let store = create_test_store();
4912 let planner = Planner::new(store);
4913
4914 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4916 items: vec![
4917 ReturnItem {
4918 expression: LogicalExpression::Variable("a".to_string()),
4919 alias: None,
4920 },
4921 ReturnItem {
4922 expression: LogicalExpression::Variable("b".to_string()),
4923 alias: None,
4924 },
4925 ],
4926 distinct: false,
4927 input: Box::new(LogicalOperator::Expand(ExpandOp {
4928 from_variable: "a".to_string(),
4929 to_variable: "b".to_string(),
4930 edge_variable: None,
4931 direction: ExpandDirection::Outgoing,
4932 edge_type: Some("KNOWS".to_string()),
4933 min_hops: 1,
4934 max_hops: Some(3),
4935 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4936 variable: "a".to_string(),
4937 label: None,
4938 input: None,
4939 })),
4940 path_alias: None,
4941 })),
4942 }));
4943
4944 let physical = planner.plan(&logical).unwrap();
4945 assert!(physical.columns().contains(&"a".to_string()));
4946 assert!(physical.columns().contains(&"b".to_string()));
4947 }
4948
4949 #[test]
4950 fn test_plan_expand_with_path_alias() {
4951 let store = create_test_store();
4952 let planner = Planner::new(store);
4953
4954 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4956 items: vec![
4957 ReturnItem {
4958 expression: LogicalExpression::Variable("a".to_string()),
4959 alias: None,
4960 },
4961 ReturnItem {
4962 expression: LogicalExpression::Variable("b".to_string()),
4963 alias: None,
4964 },
4965 ],
4966 distinct: false,
4967 input: Box::new(LogicalOperator::Expand(ExpandOp {
4968 from_variable: "a".to_string(),
4969 to_variable: "b".to_string(),
4970 edge_variable: None,
4971 direction: ExpandDirection::Outgoing,
4972 edge_type: Some("KNOWS".to_string()),
4973 min_hops: 1,
4974 max_hops: Some(3),
4975 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4976 variable: "a".to_string(),
4977 label: None,
4978 input: None,
4979 })),
4980 path_alias: Some("p".to_string()),
4981 })),
4982 }));
4983
4984 let physical = planner.plan(&logical).unwrap();
4985 assert!(physical.columns().contains(&"a".to_string()));
4987 assert!(physical.columns().contains(&"b".to_string()));
4988 }
4989
4990 #[test]
4991 fn test_plan_expand_incoming() {
4992 let store = create_test_store();
4993 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4994
4995 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4997 items: vec![
4998 ReturnItem {
4999 expression: LogicalExpression::Variable("a".to_string()),
5000 alias: None,
5001 },
5002 ReturnItem {
5003 expression: LogicalExpression::Variable("b".to_string()),
5004 alias: None,
5005 },
5006 ],
5007 distinct: false,
5008 input: Box::new(LogicalOperator::Expand(ExpandOp {
5009 from_variable: "a".to_string(),
5010 to_variable: "b".to_string(),
5011 edge_variable: None,
5012 direction: ExpandDirection::Incoming,
5013 edge_type: Some("KNOWS".to_string()),
5014 min_hops: 1,
5015 max_hops: Some(1),
5016 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5017 variable: "a".to_string(),
5018 label: None,
5019 input: None,
5020 })),
5021 path_alias: None,
5022 })),
5023 }));
5024
5025 let physical = planner.plan(&logical).unwrap();
5026 assert!(physical.columns().contains(&"a".to_string()));
5027 assert!(physical.columns().contains(&"b".to_string()));
5028 }
5029
5030 #[test]
5031 fn test_plan_expand_both_directions() {
5032 let store = create_test_store();
5033 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
5034
5035 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5037 items: vec![
5038 ReturnItem {
5039 expression: LogicalExpression::Variable("a".to_string()),
5040 alias: None,
5041 },
5042 ReturnItem {
5043 expression: LogicalExpression::Variable("b".to_string()),
5044 alias: None,
5045 },
5046 ],
5047 distinct: false,
5048 input: Box::new(LogicalOperator::Expand(ExpandOp {
5049 from_variable: "a".to_string(),
5050 to_variable: "b".to_string(),
5051 edge_variable: None,
5052 direction: ExpandDirection::Both,
5053 edge_type: Some("KNOWS".to_string()),
5054 min_hops: 1,
5055 max_hops: Some(1),
5056 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5057 variable: "a".to_string(),
5058 label: None,
5059 input: None,
5060 })),
5061 path_alias: None,
5062 })),
5063 }));
5064
5065 let physical = planner.plan(&logical).unwrap();
5066 assert!(physical.columns().contains(&"a".to_string()));
5067 assert!(physical.columns().contains(&"b".to_string()));
5068 }
5069
5070 #[test]
5073 fn test_planner_with_context() {
5074 use crate::transaction::TransactionManager;
5075
5076 let store = create_test_store();
5077 let tx_manager = Arc::new(TransactionManager::new());
5078 let tx_id = tx_manager.begin();
5079 let epoch = tx_manager.current_epoch();
5080
5081 let planner = Planner::with_context(
5082 Arc::clone(&store),
5083 Arc::clone(&tx_manager),
5084 Some(tx_id),
5085 epoch,
5086 );
5087
5088 assert_eq!(planner.tx_id(), Some(tx_id));
5089 assert!(planner.tx_manager().is_some());
5090 assert_eq!(planner.viewing_epoch(), epoch);
5091 }
5092
5093 #[test]
5094 fn test_planner_with_factorized_execution_disabled() {
5095 let store = create_test_store();
5096 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
5097
5098 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5100 items: vec![
5101 ReturnItem {
5102 expression: LogicalExpression::Variable("a".to_string()),
5103 alias: None,
5104 },
5105 ReturnItem {
5106 expression: LogicalExpression::Variable("c".to_string()),
5107 alias: None,
5108 },
5109 ],
5110 distinct: false,
5111 input: Box::new(LogicalOperator::Expand(ExpandOp {
5112 from_variable: "b".to_string(),
5113 to_variable: "c".to_string(),
5114 edge_variable: None,
5115 direction: ExpandDirection::Outgoing,
5116 edge_type: None,
5117 min_hops: 1,
5118 max_hops: Some(1),
5119 input: Box::new(LogicalOperator::Expand(ExpandOp {
5120 from_variable: "a".to_string(),
5121 to_variable: "b".to_string(),
5122 edge_variable: None,
5123 direction: ExpandDirection::Outgoing,
5124 edge_type: None,
5125 min_hops: 1,
5126 max_hops: Some(1),
5127 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5128 variable: "a".to_string(),
5129 label: None,
5130 input: None,
5131 })),
5132 path_alias: None,
5133 })),
5134 path_alias: None,
5135 })),
5136 }));
5137
5138 let physical = planner.plan(&logical).unwrap();
5139 assert!(physical.columns().contains(&"a".to_string()));
5140 assert!(physical.columns().contains(&"c".to_string()));
5141 }
5142
5143 #[test]
5146 fn test_plan_sort_by_property() {
5147 let store = create_test_store();
5148 let planner = Planner::new(store);
5149
5150 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5152 items: vec![ReturnItem {
5153 expression: LogicalExpression::Variable("n".to_string()),
5154 alias: None,
5155 }],
5156 distinct: false,
5157 input: Box::new(LogicalOperator::Sort(SortOp {
5158 keys: vec![SortKey {
5159 expression: LogicalExpression::Property {
5160 variable: "n".to_string(),
5161 property: "name".to_string(),
5162 },
5163 order: SortOrder::Ascending,
5164 }],
5165 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5166 variable: "n".to_string(),
5167 label: None,
5168 input: None,
5169 })),
5170 })),
5171 }));
5172
5173 let physical = planner.plan(&logical).unwrap();
5174 assert!(physical.columns().contains(&"n".to_string()));
5176 }
5177
5178 #[test]
5181 fn test_plan_scan_with_input() {
5182 let store = create_test_store();
5183 let planner = Planner::new(store);
5184
5185 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5187 items: vec![
5188 ReturnItem {
5189 expression: LogicalExpression::Variable("a".to_string()),
5190 alias: None,
5191 },
5192 ReturnItem {
5193 expression: LogicalExpression::Variable("b".to_string()),
5194 alias: None,
5195 },
5196 ],
5197 distinct: false,
5198 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5199 variable: "b".to_string(),
5200 label: Some("Company".to_string()),
5201 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
5202 variable: "a".to_string(),
5203 label: Some("Person".to_string()),
5204 input: None,
5205 }))),
5206 })),
5207 }));
5208
5209 let physical = planner.plan(&logical).unwrap();
5210 assert!(physical.columns().contains(&"a".to_string()));
5211 assert!(physical.columns().contains(&"b".to_string()));
5212 }
5213}