1mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18 CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19 ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, NodeScanOp,
21 RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp,
22 UnionOp, UnwindOp,
23};
24use grafeo_common::types::{EpochId, TxId};
25use grafeo_common::types::{LogicalType, Value};
26use grafeo_common::utils::error::{Error, Result};
27use grafeo_core::execution::AdaptiveContext;
28use grafeo_core::execution::operators::{
29 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
30 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
31 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
32 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
33 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
34 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
35 LeapfrogJoinOperator, LimitOperator, MapCollectOperator, MergeOperator,
36 MergeRelationshipConfig, MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator,
37 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
38 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
39 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
40 UnwindOperator, VariableLengthExpandOperator,
41};
42use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
43use std::collections::HashMap;
44use std::sync::Arc;
45
46use crate::transaction::TransactionManager;
47
/// Borrowed description of a value range with optional lower and upper limits.
///
/// Going by the field names, each limit is either absent (`None`) or a
/// borrowed [`Value`], and the two flags say whether that limit is itself part
/// of the range. NOTE(review): the code that builds and consumes this type is
/// not visible in this part of the file — confirm against its users.
struct RangeBounds<'a> {
    /// Lower limit of the range, if any.
    min: Option<&'a Value>,
    /// Upper limit of the range, if any.
    max: Option<&'a Value>,
    /// Whether `min` is inclusive.
    min_inclusive: bool,
    /// Whether `max` is inclusive.
    max_inclusive: bool,
}
55
/// Translates a [`LogicalPlan`] into a tree of physical operators.
///
/// The planner carries the graph store it plans against, an optional
/// transaction context, and some interior-mutable bookkeeping that is updated
/// while a plan is being built through `&self`.
pub struct Planner {
    /// Graph store the produced operators run against; also consulted for
    /// node counts and statistics when estimating cardinalities.
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager, present only when built via `with_context`.
    pub(super) tx_manager: Option<Arc<TransactionManager>>,
    /// Identifier of the transaction this planner works for, if any.
    pub(super) tx_id: Option<TxId>,
    /// Epoch the planner reads at; `new` takes it from the store's current epoch.
    pub(super) viewing_epoch: EpochId,
    /// Counter starting at zero; presumably used to name anonymous edges —
    /// TODO confirm against the sibling planner modules.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true`, chains of two or more single-hop expands are planned as a
    /// single expand chain instead of one operator per hop.
    pub(super) factorized_execution: bool,
    /// Names of columns known to hold scalar values (for example the alias
    /// produced by a map-collect step).
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Names of columns presumably holding edges — populated outside this
    /// part of the file; TODO confirm.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
}
77
78impl Planner {
79 #[must_use]
84 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
85 let epoch = store.current_epoch();
86 Self {
87 store,
88 tx_manager: None,
89 tx_id: None,
90 viewing_epoch: epoch,
91 anon_edge_counter: std::cell::Cell::new(0),
92 factorized_execution: true,
93 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
94 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
95 }
96 }
97
98 #[must_use]
107 pub fn with_context(
108 store: Arc<dyn GraphStoreMut>,
109 tx_manager: Arc<TransactionManager>,
110 tx_id: Option<TxId>,
111 viewing_epoch: EpochId,
112 ) -> Self {
113 Self {
114 store,
115 tx_manager: Some(tx_manager),
116 tx_id,
117 viewing_epoch,
118 anon_edge_counter: std::cell::Cell::new(0),
119 factorized_execution: true,
120 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
121 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
122 }
123 }
124
    /// Returns the epoch this planner reads at.
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
130
    /// Returns the transaction id this planner was created with, if any.
    #[must_use]
    pub fn tx_id(&self) -> Option<TxId> {
        self.tx_id
    }
136
    /// Returns a reference to the transaction manager, or `None` when the
    /// planner was built without a transaction context.
    #[must_use]
    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.tx_manager.as_ref()
    }
142
    /// Builder-style setter: enables or disables factorized execution and
    /// returns the planner.
    ///
    /// When enabled, `plan_operator` plans chains of two or more single-hop
    /// expands as one expand chain.
    #[must_use]
    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
        self.factorized_execution = enabled;
        self
    }
149
150 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
154 match op {
155 LogicalOperator::Expand(expand) => {
156 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
158
159 if is_single_hop {
160 let (inner_count, base) = Self::count_expand_chain(&expand.input);
161 (inner_count + 1, base)
162 } else {
163 (0, op)
165 }
166 }
167 _ => (0, op),
168 }
169 }
170
171 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
175 let mut chain = Vec::new();
176 let mut current = op;
177
178 while let LogicalOperator::Expand(expand) = current {
179 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
181 if !is_single_hop {
182 break;
183 }
184 chain.push(expand);
185 current = &expand.input;
186 }
187
188 chain.reverse();
190 chain
191 }
192
193 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
199 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
200 Ok(PhysicalPlan {
201 operator,
202 columns,
203 adaptive_context: None,
204 })
205 }
206
207 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
216 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
217
218 let mut adaptive_context = AdaptiveContext::new();
220 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
221
222 Ok(PhysicalPlan {
223 operator,
224 columns,
225 adaptive_context: Some(adaptive_context),
226 })
227 }
228
229 fn collect_cardinality_estimates(
231 &self,
232 op: &LogicalOperator,
233 ctx: &mut AdaptiveContext,
234 depth: usize,
235 ) {
236 match op {
237 LogicalOperator::NodeScan(scan) => {
238 let estimate = if let Some(label) = &scan.label {
240 self.store.nodes_by_label(label).len() as f64
241 } else {
242 self.store.node_count() as f64
243 };
244 let id = format!("scan_{}", scan.variable);
245 ctx.set_estimate(&id, estimate);
246
247 if let Some(input) = &scan.input {
249 self.collect_cardinality_estimates(input, ctx, depth + 1);
250 }
251 }
252 LogicalOperator::Filter(filter) => {
253 let input_estimate = self.estimate_cardinality(&filter.input);
255 let estimate = input_estimate * 0.3;
256 let id = format!("filter_{depth}");
257 ctx.set_estimate(&id, estimate);
258
259 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
260 }
261 LogicalOperator::Expand(expand) => {
262 let input_estimate = self.estimate_cardinality(&expand.input);
264 let stats = self.store.statistics();
265 let avg_degree = self.estimate_expand_degree(&stats, expand);
266 let estimate = input_estimate * avg_degree;
267 let id = format!("expand_{}", expand.to_variable);
268 ctx.set_estimate(&id, estimate);
269
270 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
271 }
272 LogicalOperator::Join(join) => {
273 let left_est = self.estimate_cardinality(&join.left);
275 let right_est = self.estimate_cardinality(&join.right);
276 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
278 ctx.set_estimate(&id, estimate);
279
280 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
281 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
282 }
283 LogicalOperator::Aggregate(agg) => {
284 let input_estimate = self.estimate_cardinality(&agg.input);
286 let estimate = if agg.group_by.is_empty() {
287 1.0 } else {
289 (input_estimate * 0.1).max(1.0) };
291 let id = format!("aggregate_{depth}");
292 ctx.set_estimate(&id, estimate);
293
294 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
295 }
296 LogicalOperator::Distinct(distinct) => {
297 let input_estimate = self.estimate_cardinality(&distinct.input);
298 let estimate = (input_estimate * 0.5).max(1.0);
299 let id = format!("distinct_{depth}");
300 ctx.set_estimate(&id, estimate);
301
302 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
303 }
304 LogicalOperator::Return(ret) => {
305 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
306 }
307 LogicalOperator::Limit(limit) => {
308 let input_estimate = self.estimate_cardinality(&limit.input);
309 let estimate = (input_estimate).min(limit.count as f64);
310 let id = format!("limit_{depth}");
311 ctx.set_estimate(&id, estimate);
312
313 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
314 }
315 LogicalOperator::Skip(skip) => {
316 let input_estimate = self.estimate_cardinality(&skip.input);
317 let estimate = (input_estimate - skip.count as f64).max(0.0);
318 let id = format!("skip_{depth}");
319 ctx.set_estimate(&id, estimate);
320
321 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
322 }
323 LogicalOperator::Sort(sort) => {
324 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
326 }
327 LogicalOperator::Union(union) => {
328 let estimate: f64 = union
329 .inputs
330 .iter()
331 .map(|input| self.estimate_cardinality(input))
332 .sum();
333 let id = format!("union_{depth}");
334 ctx.set_estimate(&id, estimate);
335
336 for input in &union.inputs {
337 self.collect_cardinality_estimates(input, ctx, depth + 1);
338 }
339 }
340 _ => {
341 }
343 }
344 }
345
346 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
348 match op {
349 LogicalOperator::NodeScan(scan) => {
350 if let Some(label) = &scan.label {
351 self.store.nodes_by_label(label).len() as f64
352 } else {
353 self.store.node_count() as f64
354 }
355 }
356 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
357 LogicalOperator::Expand(expand) => {
358 let stats = self.store.statistics();
359 let avg_degree = self.estimate_expand_degree(&stats, expand);
360 self.estimate_cardinality(&expand.input) * avg_degree
361 }
362 LogicalOperator::Join(join) => {
363 let left = self.estimate_cardinality(&join.left);
364 let right = self.estimate_cardinality(&join.right);
365 (left * right).sqrt()
366 }
367 LogicalOperator::Aggregate(agg) => {
368 if agg.group_by.is_empty() {
369 1.0
370 } else {
371 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
372 }
373 }
374 LogicalOperator::Distinct(distinct) => {
375 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
376 }
377 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
378 LogicalOperator::Limit(limit) => self
379 .estimate_cardinality(&limit.input)
380 .min(limit.count as f64),
381 LogicalOperator::Skip(skip) => {
382 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
383 }
384 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
385 LogicalOperator::Union(union) => union
386 .inputs
387 .iter()
388 .map(|input| self.estimate_cardinality(input))
389 .sum(),
390 _ => 1000.0, }
392 }
393
394 fn estimate_expand_degree(
396 &self,
397 stats: &grafeo_core::statistics::Statistics,
398 expand: &ExpandOp,
399 ) -> f64 {
400 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
401 if let Some(edge_type) = &expand.edge_type {
402 stats.estimate_avg_degree(edge_type, outgoing)
403 } else if stats.total_nodes > 0 {
404 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
405 } else {
406 10.0 }
408 }
409
410 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
412 match op {
413 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
414 LogicalOperator::Expand(expand) => {
415 if self.factorized_execution {
417 let (chain_len, _base) = Self::count_expand_chain(op);
418 if chain_len >= 2 {
419 return self.plan_expand_chain(op);
421 }
422 }
423 self.plan_expand(expand)
424 }
425 LogicalOperator::Return(ret) => self.plan_return(ret),
426 LogicalOperator::Filter(filter) => self.plan_filter(filter),
427 LogicalOperator::Project(project) => self.plan_project(project),
428 LogicalOperator::Limit(limit) => self.plan_limit(limit),
429 LogicalOperator::Skip(skip) => self.plan_skip(skip),
430 LogicalOperator::Sort(sort) => self.plan_sort(sort),
431 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
432 LogicalOperator::Join(join) => self.plan_join(join),
433 LogicalOperator::Union(union) => self.plan_union(union),
434 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
435 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
436 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
437 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
438 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
439 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
440 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
441 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
442 LogicalOperator::Merge(merge) => self.plan_merge(merge),
443 LogicalOperator::MergeRelationship(merge_rel) => {
444 self.plan_merge_relationship(merge_rel)
445 }
446 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
447 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
448 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
449 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
450 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
451 #[cfg(feature = "algos")]
452 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
453 #[cfg(not(feature = "algos"))]
454 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
455 "CALL procedures require the 'algos' feature".to_string(),
456 )),
457 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
458 LogicalOperator::VectorScan(_) => Err(Error::Internal(
459 "VectorScan requires vector-index feature".to_string(),
460 )),
461 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
462 "VectorJoin requires vector-index feature".to_string(),
463 )),
464 _ => Err(Error::Internal(format!(
465 "Unsupported operator: {:?}",
466 std::mem::discriminant(op)
467 ))),
468 }
469 }
470
471 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
473 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
474 let key_idx = child_columns
475 .iter()
476 .position(|c| c == &mc.key_var)
477 .ok_or_else(|| {
478 Error::Internal(format!(
479 "MapCollect key '{}' not in columns {:?}",
480 mc.key_var, child_columns
481 ))
482 })?;
483 let value_idx = child_columns
484 .iter()
485 .position(|c| c == &mc.value_var)
486 .ok_or_else(|| {
487 Error::Internal(format!(
488 "MapCollect value '{}' not in columns {:?}",
489 mc.value_var, child_columns
490 ))
491 })?;
492 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
493 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
496 Ok((operator, vec![mc.alias.clone()]))
497 }
498}
499
500pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
502 match op {
503 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
504 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
505 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
506 BinaryOp::Le => Ok(BinaryFilterOp::Le),
507 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
508 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
509 BinaryOp::And => Ok(BinaryFilterOp::And),
510 BinaryOp::Or => Ok(BinaryFilterOp::Or),
511 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
512 BinaryOp::Add => Ok(BinaryFilterOp::Add),
513 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
514 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
515 BinaryOp::Div => Ok(BinaryFilterOp::Div),
516 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
517 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
518 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
519 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
520 BinaryOp::In => Ok(BinaryFilterOp::In),
521 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
522 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
523 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
524 "Binary operator {:?} not yet supported in filters",
525 op
526 ))),
527 }
528}
529
530pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
532 match op {
533 UnaryOp::Not => Ok(UnaryFilterOp::Not),
534 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
535 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
536 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
537 }
538}
539
540pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
542 match func {
543 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
544 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
545 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
546 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
547 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
548 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
549 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
550 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
551 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
552 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
553 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
554 }
555}
556
557pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
561 match expr {
562 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
563 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
564 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
565 variable: variable.clone(),
566 property: property.clone(),
567 }),
568 LogicalExpression::Binary { left, op, right } => {
569 let left_expr = convert_filter_expression(left)?;
570 let right_expr = convert_filter_expression(right)?;
571 let filter_op = convert_binary_op(*op)?;
572 Ok(FilterExpression::Binary {
573 left: Box::new(left_expr),
574 op: filter_op,
575 right: Box::new(right_expr),
576 })
577 }
578 LogicalExpression::Unary { op, operand } => {
579 let operand_expr = convert_filter_expression(operand)?;
580 let filter_op = convert_unary_op(*op)?;
581 Ok(FilterExpression::Unary {
582 op: filter_op,
583 operand: Box::new(operand_expr),
584 })
585 }
586 LogicalExpression::FunctionCall { name, args, .. } => {
587 let filter_args: Vec<FilterExpression> = args
588 .iter()
589 .map(convert_filter_expression)
590 .collect::<Result<Vec<_>>>()?;
591 Ok(FilterExpression::FunctionCall {
592 name: name.clone(),
593 args: filter_args,
594 })
595 }
596 LogicalExpression::Case {
597 operand,
598 when_clauses,
599 else_clause,
600 } => {
601 let filter_operand = operand
602 .as_ref()
603 .map(|e| convert_filter_expression(e))
604 .transpose()?
605 .map(Box::new);
606 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
607 .iter()
608 .map(|(cond, result)| {
609 Ok((
610 convert_filter_expression(cond)?,
611 convert_filter_expression(result)?,
612 ))
613 })
614 .collect::<Result<Vec<_>>>()?;
615 let filter_else = else_clause
616 .as_ref()
617 .map(|e| convert_filter_expression(e))
618 .transpose()?
619 .map(Box::new);
620 Ok(FilterExpression::Case {
621 operand: filter_operand,
622 when_clauses: filter_when_clauses,
623 else_clause: filter_else,
624 })
625 }
626 LogicalExpression::List(items) => {
627 let filter_items: Vec<FilterExpression> = items
628 .iter()
629 .map(convert_filter_expression)
630 .collect::<Result<Vec<_>>>()?;
631 Ok(FilterExpression::List(filter_items))
632 }
633 LogicalExpression::Map(pairs) => {
634 let filter_pairs: Vec<(String, FilterExpression)> = pairs
635 .iter()
636 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
637 .collect::<Result<Vec<_>>>()?;
638 Ok(FilterExpression::Map(filter_pairs))
639 }
640 LogicalExpression::IndexAccess { base, index } => {
641 let base_expr = convert_filter_expression(base)?;
642 let index_expr = convert_filter_expression(index)?;
643 Ok(FilterExpression::IndexAccess {
644 base: Box::new(base_expr),
645 index: Box::new(index_expr),
646 })
647 }
648 LogicalExpression::SliceAccess { base, start, end } => {
649 let base_expr = convert_filter_expression(base)?;
650 let start_expr = start
651 .as_ref()
652 .map(|s| convert_filter_expression(s))
653 .transpose()?
654 .map(Box::new);
655 let end_expr = end
656 .as_ref()
657 .map(|e| convert_filter_expression(e))
658 .transpose()?
659 .map(Box::new);
660 Ok(FilterExpression::SliceAccess {
661 base: Box::new(base_expr),
662 start: start_expr,
663 end: end_expr,
664 })
665 }
666 LogicalExpression::Parameter(_) => Err(Error::Internal(
667 "Parameters not yet supported in filters".to_string(),
668 )),
669 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
670 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
671 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
672 LogicalExpression::ListComprehension {
673 variable,
674 list_expr,
675 filter_expr,
676 map_expr,
677 } => {
678 let list = convert_filter_expression(list_expr)?;
679 let filter = filter_expr
680 .as_ref()
681 .map(|f| convert_filter_expression(f))
682 .transpose()?
683 .map(Box::new);
684 let map = convert_filter_expression(map_expr)?;
685 Ok(FilterExpression::ListComprehension {
686 variable: variable.clone(),
687 list_expr: Box::new(list),
688 filter_expr: filter,
689 map_expr: Box::new(map),
690 })
691 }
692 LogicalExpression::ListPredicate {
693 kind,
694 variable,
695 list_expr,
696 predicate,
697 } => {
698 use crate::query::plan::ListPredicateKind as LPK;
699 let filter_kind = match kind {
700 LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
701 LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
702 LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
703 LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
704 };
705 let list = convert_filter_expression(list_expr)?;
706 let pred = convert_filter_expression(predicate)?;
707 Ok(FilterExpression::ListPredicate {
708 kind: filter_kind,
709 variable: variable.clone(),
710 list_expr: Box::new(list),
711 predicate: Box::new(pred),
712 })
713 }
714 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
715 Error::Internal("Subqueries not yet supported in filters".to_string()),
716 ),
717 }
718}
719
720fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
722 use grafeo_common::types::Value;
723 match value {
724 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
726 Value::Int64(_) => LogicalType::Int64,
727 Value::Float64(_) => LogicalType::Float64,
728 Value::String(_) => LogicalType::String,
729 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
731 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
734 }
735}
736
737fn expression_to_string(expr: &LogicalExpression) -> String {
739 match expr {
740 LogicalExpression::Variable(name) => name.clone(),
741 LogicalExpression::Property { variable, property } => {
742 format!("{variable}.{property}")
743 }
744 LogicalExpression::Literal(value) => format!("{value:?}"),
745 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
746 _ => "expr".to_string(),
747 }
748}
749
/// Result of planning: an executable operator tree plus its metadata.
pub struct PhysicalPlan {
    /// Root of the physical operator tree.
    pub operator: Box<dyn Operator>,
    /// Names of the columns produced by `operator`.
    pub columns: Vec<String>,
    /// Cardinality estimates for adaptive execution; `None` when the plan was
    /// produced by `Planner::plan`, `Some` when produced by
    /// `Planner::plan_adaptive`.
    pub adaptive_context: Option<AdaptiveContext>,
}
763
764impl PhysicalPlan {
765 #[must_use]
767 pub fn columns(&self) -> &[String] {
768 &self.columns
769 }
770
771 pub fn into_operator(self) -> Box<dyn Operator> {
773 self.operator
774 }
775
776 #[must_use]
778 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
779 self.adaptive_context.as_ref()
780 }
781
782 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
784 self.adaptive_context.take()
785 }
786}
787
/// Operator that replays a fixed, in-memory set of rows.
struct StaticResultOperator {
    /// All rows to emit; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a row to read the value from.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; reset to zero by `reset`.
    row_index: usize,
}
794
795impl Operator for StaticResultOperator {
796 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
797 use grafeo_core::execution::DataChunk;
798
799 if self.row_index >= self.rows.len() {
800 return Ok(None);
801 }
802
803 let remaining = self.rows.len() - self.row_index;
804 let chunk_rows = remaining.min(1024);
805 let col_count = self.column_indices.len();
806
807 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
808 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
809
810 for row_offset in 0..chunk_rows {
811 let row = &self.rows[self.row_index + row_offset];
812 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
813 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
814 if let Some(col) = chunk.column_mut(col_idx) {
815 col.push_value(value);
816 }
817 }
818 }
819 chunk.set_count(chunk_rows);
820
821 self.row_index += chunk_rows;
822 Ok(Some(chunk))
823 }
824
825 fn reset(&mut self) {
826 self.row_index = 0;
827 }
828
829 fn name(&self) -> &'static str {
830 "StaticResult"
831 }
832}
833
834#[cfg(test)]
835mod tests {
836 use super::*;
837 use crate::query::plan::{
838 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
839 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
840 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
841 SortKey, SortOp,
842 };
843 use grafeo_common::types::Value;
844 use grafeo_core::graph::GraphStoreMut;
845 use grafeo_core::graph::lpg::LpgStore;
846
847 fn create_test_store() -> Arc<dyn GraphStoreMut> {
848 let store = Arc::new(LpgStore::new());
849 store.create_node(&["Person"]);
850 store.create_node(&["Person"]);
851 store.create_node(&["Company"]);
852 store
853 }
854
855 #[test]
858 fn test_plan_simple_scan() {
859 let store = create_test_store();
860 let planner = Planner::new(store);
861
862 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
864 items: vec![ReturnItem {
865 expression: LogicalExpression::Variable("n".to_string()),
866 alias: None,
867 }],
868 distinct: false,
869 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
870 variable: "n".to_string(),
871 label: Some("Person".to_string()),
872 input: None,
873 })),
874 }));
875
876 let physical = planner.plan(&logical).unwrap();
877 assert_eq!(physical.columns(), &["n"]);
878 }
879
880 #[test]
881 fn test_plan_scan_without_label() {
882 let store = create_test_store();
883 let planner = Planner::new(store);
884
885 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
887 items: vec![ReturnItem {
888 expression: LogicalExpression::Variable("n".to_string()),
889 alias: None,
890 }],
891 distinct: false,
892 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
893 variable: "n".to_string(),
894 label: None,
895 input: None,
896 })),
897 }));
898
899 let physical = planner.plan(&logical).unwrap();
900 assert_eq!(physical.columns(), &["n"]);
901 }
902
903 #[test]
904 fn test_plan_return_with_alias() {
905 let store = create_test_store();
906 let planner = Planner::new(store);
907
908 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
910 items: vec![ReturnItem {
911 expression: LogicalExpression::Variable("n".to_string()),
912 alias: Some("person".to_string()),
913 }],
914 distinct: false,
915 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
916 variable: "n".to_string(),
917 label: Some("Person".to_string()),
918 input: None,
919 })),
920 }));
921
922 let physical = planner.plan(&logical).unwrap();
923 assert_eq!(physical.columns(), &["person"]);
924 }
925
926 #[test]
927 fn test_plan_return_property() {
928 let store = create_test_store();
929 let planner = Planner::new(store);
930
931 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
933 items: vec![ReturnItem {
934 expression: LogicalExpression::Property {
935 variable: "n".to_string(),
936 property: "name".to_string(),
937 },
938 alias: None,
939 }],
940 distinct: false,
941 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
942 variable: "n".to_string(),
943 label: Some("Person".to_string()),
944 input: None,
945 })),
946 }));
947
948 let physical = planner.plan(&logical).unwrap();
949 assert_eq!(physical.columns(), &["n.name"]);
950 }
951
952 #[test]
953 fn test_plan_return_literal() {
954 let store = create_test_store();
955 let planner = Planner::new(store);
956
957 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
959 items: vec![ReturnItem {
960 expression: LogicalExpression::Literal(Value::Int64(42)),
961 alias: Some("answer".to_string()),
962 }],
963 distinct: false,
964 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
965 variable: "n".to_string(),
966 label: None,
967 input: None,
968 })),
969 }));
970
971 let physical = planner.plan(&logical).unwrap();
972 assert_eq!(physical.columns(), &["answer"]);
973 }
974
975 #[test]
978 fn test_plan_filter_equality() {
979 let store = create_test_store();
980 let planner = Planner::new(store);
981
982 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
984 items: vec![ReturnItem {
985 expression: LogicalExpression::Variable("n".to_string()),
986 alias: None,
987 }],
988 distinct: false,
989 input: Box::new(LogicalOperator::Filter(FilterOp {
990 predicate: LogicalExpression::Binary {
991 left: Box::new(LogicalExpression::Property {
992 variable: "n".to_string(),
993 property: "age".to_string(),
994 }),
995 op: BinaryOp::Eq,
996 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
997 },
998 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
999 variable: "n".to_string(),
1000 label: Some("Person".to_string()),
1001 input: None,
1002 })),
1003 })),
1004 }));
1005
1006 let physical = planner.plan(&logical).unwrap();
1007 assert_eq!(physical.columns(), &["n"]);
1008 }
1009
1010 #[test]
1011 fn test_plan_filter_compound_and() {
1012 let store = create_test_store();
1013 let planner = Planner::new(store);
1014
1015 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1017 items: vec![ReturnItem {
1018 expression: LogicalExpression::Variable("n".to_string()),
1019 alias: None,
1020 }],
1021 distinct: false,
1022 input: Box::new(LogicalOperator::Filter(FilterOp {
1023 predicate: LogicalExpression::Binary {
1024 left: Box::new(LogicalExpression::Binary {
1025 left: Box::new(LogicalExpression::Property {
1026 variable: "n".to_string(),
1027 property: "age".to_string(),
1028 }),
1029 op: BinaryOp::Gt,
1030 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1031 }),
1032 op: BinaryOp::And,
1033 right: Box::new(LogicalExpression::Binary {
1034 left: Box::new(LogicalExpression::Property {
1035 variable: "n".to_string(),
1036 property: "age".to_string(),
1037 }),
1038 op: BinaryOp::Lt,
1039 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1040 }),
1041 },
1042 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1043 variable: "n".to_string(),
1044 label: None,
1045 input: None,
1046 })),
1047 })),
1048 }));
1049
1050 let physical = planner.plan(&logical).unwrap();
1051 assert_eq!(physical.columns(), &["n"]);
1052 }
1053
/// Plans `RETURN n` over a filter with the predicate `NOT n.active`
/// and checks the plan exposes the column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let predicate = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "active".to_owned(),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1085
/// Plans `RETURN n` over a filter with the predicate `n.email IS NULL`
/// and checks the plan exposes the column `n`.
#[test]
fn test_plan_filter_is_null() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let predicate = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "email".to_owned(),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1117
/// Plans `RETURN n` over a filter with the predicate `size(n.friends) > 0`
/// and checks the plan exposes the column `n`.
#[test]
fn test_plan_filter_function_call() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let size_of_friends = LogicalExpression::FunctionCall {
        name: "size".to_owned(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "friends".to_owned(),
        }],
        distinct: false,
    };
    let predicate = LogicalExpression::Binary {
        left: Box::new(size_of_friends),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1154
/// Plans a single-hop outgoing `KNOWS` expand from a `Person` scan and
/// checks that both endpoint columns `a` and `b` are present.
#[test]
fn test_plan_expand_outgoing() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
1197
/// Plans a single-hop outgoing `KNOWS` expand that binds the edge to `r`
/// and checks that columns `a`, `r` and `b` are all present.
#[test]
fn test_plan_expand_with_edge_variable() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: Some("r".to_owned()),
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("r"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"r".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
1242
/// Plans `RETURN n` over a limit of 10 on an unlabelled scan and checks
/// the plan exposes the column `n`.
#[test]
fn test_plan_limit() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let limit = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limit),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1270
/// Plans `RETURN n` over a skip of 5 on an unlabelled scan and checks
/// the plan exposes the column `n`.
#[test]
fn test_plan_skip() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let skip = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skip),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1296
/// Plans `RETURN n` over an ascending sort keyed on `n` and checks the
/// plan exposes the column `n`.
#[test]
fn test_plan_sort() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: LogicalExpression::Variable("n".to_owned()),
            order: SortOrder::Ascending,
        }],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1325
/// Plans `RETURN n` over a descending sort keyed on `n` and checks the
/// plan exposes the column `n`.
#[test]
fn test_plan_sort_descending() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: LogicalExpression::Variable("n".to_owned()),
            order: SortOrder::Descending,
        }],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1354
/// Plans `RETURN n` over a distinct operator with no explicit column list
/// and checks the plan exposes the column `n`.
#[test]
fn test_plan_distinct() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1380
/// Plans `RETURN n` over a distinct operator restricted to the column `n`
/// and checks the plan exposes the column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["n".to_owned()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1406
/// Plans `RETURN n` over a distinct operator whose column list names a
/// column the scan does not produce; planning is expected to succeed and
/// still expose the column `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec!["nonexistent".to_owned()]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1433
/// Plans `RETURN cnt` over an ungrouped `count(n)` aggregate aliased as
/// `cnt` and checks that the column `cnt` is present.
#[test]
fn test_plan_aggregate_count() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_owned())),
        distinct: false,
        alias: Some("cnt".to_owned()),
        percentile: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"cnt".to_owned()));
}
1469
/// Plans a `count(n)` aggregate grouped by `n.city` over a `Person` scan
/// and checks that the plan exposes exactly two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let group_key = LogicalExpression::Property {
        variable: "n".to_owned(),
        property: "city".to_owned(),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_owned())),
        distinct: false,
        alias: Some("cnt".to_owned()),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![group_key],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1499
/// Plans an ungrouped `sum(n.value)` aggregate aliased as `total` and
/// checks that the column `total` is present.
#[test]
fn test_plan_aggregate_sum() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "value".to_owned(),
        }),
        distinct: false,
        alias: Some("total".to_owned()),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"total".to_owned()));
}
1529
/// Plans an ungrouped `avg(n.score)` aggregate aliased as `average` and
/// checks that the column `average` is present.
#[test]
fn test_plan_aggregate_avg() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "score".to_owned(),
        }),
        distinct: false,
        alias: Some("average".to_owned()),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"average".to_owned()));
}
1559
/// Plans an ungrouped aggregate computing both `min(n.age)` (alias
/// `youngest`) and `max(n.age)` (alias `oldest`) and checks that both
/// aliased columns are present.
#[test]
fn test_plan_aggregate_min_max() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // The two aggregates differ only in function and alias.
    let age_aggregate = |function, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "age".to_owned(),
        }),
        distinct: false,
        alias: Some(alias.to_owned()),
        percentile: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"youngest".to_owned()));
    assert!(columns.contains(&"oldest".to_owned()));
}
1602
/// Plans `RETURN a, b` over an inner join of a `Person` scan and a
/// `Company` scan with the single condition `a = b`, and checks that both
/// columns are present.
#[test]
fn test_plan_inner_join() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: Some(label.to_owned()),
            input: None,
        })
    };
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable("a".to_owned()),
            right: LogicalExpression::Variable("b".to_owned()),
        }],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
1646
/// Plans a condition-free cross join of two unlabelled scans and checks
/// that the plan exposes exactly two columns.
#[test]
fn test_plan_cross_join() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1671
/// Plans a condition-free left join of two unlabelled scans and checks
/// that the plan exposes exactly two columns.
#[test]
fn test_plan_left_join() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1695
/// Plans the creation of a `Person` node bound to `n` with a literal
/// `name` property and checks that the column `n` is present.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let name_property = (
        "name".to_owned(),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: "n".to_owned(),
        labels: vec!["Person".to_owned()],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_owned()));
}
1717
/// Plans the creation of a `KNOWS` edge bound to `r` between `a` and `b`,
/// fed by a cross join of two unlabelled scans, and checks that the column
/// `r` is present.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };

    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_owned()),
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_type: "KNOWS".to_owned(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"r".to_owned()));
}
1749
/// Plans a non-detaching delete of `n` over an unlabelled scan and checks
/// that the plan exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: "n".to_owned(),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"deleted_count".to_owned()));
}
1769
/// Planning a logical plan whose root is `LogicalOperator::Empty` must
/// return an error.
#[test]
fn test_plan_empty_errors() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let logical = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&logical).is_err());
}
1781
/// Returning a variable (`missing`) that the input scan does not bind
/// must make planning fail.
#[test]
fn test_plan_missing_variable_in_return() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // The scan binds only `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("missing".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
1804
/// Every comparison, logical and arithmetic binary operator listed here
/// must convert successfully via `convert_binary_op`.
#[test]
fn test_convert_binary_ops() {
    let supported = [
        BinaryOp::Eq,
        BinaryOp::Ne,
        BinaryOp::Lt,
        BinaryOp::Le,
        BinaryOp::Gt,
        BinaryOp::Ge,
        BinaryOp::And,
        BinaryOp::Or,
        BinaryOp::Add,
        BinaryOp::Sub,
        BinaryOp::Mul,
        BinaryOp::Div,
    ];

    for op in supported {
        assert!(convert_binary_op(op).is_ok());
    }
}
1822
/// Every unary operator listed here must convert successfully via
/// `convert_unary_op`.
#[test]
fn test_convert_unary_ops() {
    let supported = [
        UnaryOp::Not,
        UnaryOp::IsNull,
        UnaryOp::IsNotNull,
        UnaryOp::Neg,
    ];

    for op in supported {
        assert!(convert_unary_op(op).is_ok());
    }
}
1830
/// Each logical aggregate function must map to the physical aggregate
/// function of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1854
/// A planner built with `Planner::new` reports no transaction id and no
/// transaction manager; `viewing_epoch` is only called, its value is not
/// checked.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store));

    let tx_id = planner.tx_id();
    assert!(tx_id.is_none());

    let tx_manager = planner.tx_manager();
    assert!(tx_manager.is_none());

    // Only exercised for coverage; the returned epoch is discarded.
    let _ = planner.viewing_epoch();
}
1864
/// Plans a bare unlabelled scan, checks the plan's `columns()` accessor,
/// and then consumes the plan with `into_operator()`.
#[test]
fn test_physical_plan_accessors() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consuming the plan is the point; the operator itself is discarded.
    let _ = physical.into_operator();
}
1882
/// Adaptive planning of `RETURN n` over a `Person` scan must expose the
/// column `n` and attach an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_scan() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
1909
/// Adaptive planning of `RETURN n` over a filter `n.age > 30` must attach
/// an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_filter() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let predicate = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_owned(),
            property: "age".to_owned(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
1942
/// Adaptive planning of a single-hop outgoing `KNOWS` expand, with
/// factorized execution switched off on the planner, must attach an
/// adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
1981
/// Adaptive planning of `RETURN a, b` over a condition-free cross join of
/// two unlabelled scans must attach an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_join() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_owned(),
            label: None,
            input: None,
        })
    };
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(scan("a")),
        right: Box::new(scan("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2018
/// Adaptive planning of an ungrouped `count(n)` aggregate aliased as `cnt`
/// must attach an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_owned())),
        distinct: false,
        alias: Some("cnt".to_owned()),
        percentile: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2044
/// Adaptive planning of `RETURN n` over a distinct operator with no
/// column list must attach an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_distinct() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2069
/// Adaptive planning of `RETURN n` over a limit of 10 must attach an
/// adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_limit() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let limit = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limit),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2094
/// Adaptive planning of `RETURN n` over a skip of 5 must attach an
/// adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let skip = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skip),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2119
/// Adaptive planning of `RETURN n` over an ascending sort keyed on `n`
/// must attach an adaptive context to the plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![SortKey {
            expression: LogicalExpression::Variable("n".to_owned()),
            order: SortOrder::Ascending,
        }],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2147
/// Adaptive planning of `RETURN n` over a union of a `Person` scan and a
/// `Company` scan (both binding `n`) must attach an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Both union branches bind the same variable and differ only in label.
    let scan_with_label = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_owned(),
            label: Some(label.to_owned()),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![scan_with_label("Person"), scan_with_label("Company")],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    let has_context = physical.adaptive_context.is_some();
    assert!(has_context);
}
2178
/// Plans an outgoing `KNOWS` expand spanning 1 to 3 hops and checks that
/// both endpoint columns `a` and `b` are present.
#[test]
fn test_plan_expand_variable_length() {
    let store = create_test_store();
    let planner = Planner::new(store);

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_owned()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_owned()),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_owned()));
    assert!(columns.contains(&"b".to_owned()));
}
2220
/// Plans an outgoing `KNOWS` expand (hops 1 to 3) that carries the path alias `p`
/// and checks that planning succeeds with `a` and `b` among the output columns.
///
/// NOTE(review): only `a` and `b` are asserted; nothing here checks a column
/// derived from the path alias itself.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        // The only difference from the plain variable-length case: a named path.
        path_alias: Some(String::from("p")),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2261
/// With factorized execution disabled, plans a single-hop `KNOWS` expand in the
/// `Incoming` direction and checks that both endpoint variables are output columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Exactly one hop: min_hops = 1, max_hops = Some(1).
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2301
/// With factorized execution disabled, plans a single-hop `KNOWS` expand using
/// `ExpandDirection::Both` and checks that both endpoint variables are output columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Exactly one hop: min_hops = 1, max_hops = Some(1).
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2341
/// A planner built through `Planner::with_context` must expose the transaction
/// id, transaction manager and viewing epoch it was constructed with.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    // Begin the transaction first, then read the epoch; both values are
    // compared against the planner's accessors below.
    let active_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        snapshot_epoch,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
2364
/// With factorized execution disabled, a chain of two single-hop, untyped
/// outgoing expands (`a -> b -> c`) must still plan, exposing `a` and `c` as
/// output columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Builds an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };
    // One outgoing hop of any edge type from `from` to `to`, stacked on `input`.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: String::from(from),
            to_variable: String::from(to),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
        })
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", source_scan);
    let second_hop = single_hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
2414
/// Plans a `Sort` keyed on the property expression `n.name` (ascending) over an
/// unlabelled node scan and checks that `n` is still an output column.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The sort key is a property access rather than a plain variable.
    let name_ascending = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![name_ascending],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2449
/// Plans a `Company` node scan for `b` whose `input` is a `Person` node scan
/// for `a`, and checks that both variables appear among the output columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Builds an un-aliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // The outer scan takes the inner scan as its input operator.
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2485}