1mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18 CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19 ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20 LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
21 ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
22};
23use grafeo_common::types::{EpochId, TxId};
24use grafeo_common::types::{LogicalType, Value};
25use grafeo_common::utils::error::{Error, Result};
26use grafeo_core::execution::AdaptiveContext;
27use grafeo_core::execution::operators::{
28 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
29 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
30 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
31 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
32 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
33 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
34 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
35 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
36 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
37 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
38 UnwindOperator, VariableLengthExpandOperator,
39};
40use grafeo_core::graph::{Direction, lpg::LpgStore};
41use std::collections::HashMap;
42use std::sync::Arc;
43
44use crate::transaction::TransactionManager;
45
/// Pair of optional, borrowed limits describing a value range.
///
/// Each side has its own inclusive flag. Nothing in this part of the file
/// constructs or reads the struct.
// NOTE(review): presumably consumed by the scan/filter planning submodules
// for range predicates — confirm against those modules.
struct RangeBounds<'a> {
    /// Lower limit, borrowed for `'a`; `None` presumably means "no lower limit".
    min: Option<&'a Value>,
    /// Upper limit, borrowed for `'a`; `None` presumably means "no upper limit".
    max: Option<&'a Value>,
    /// Flag paired with `min` (presumably: whether `min` itself is in range).
    min_inclusive: bool,
    /// Flag paired with `max` (presumably: whether `max` itself is in range).
    max_inclusive: bool,
}
53
/// Turns a [`LogicalPlan`] into a tree of physical operators.
///
/// Built with [`Planner::new`] (no transaction context) or
/// [`Planner::with_context`] (explicit transaction manager, id and epoch).
pub struct Planner {
    /// Graph store; this file reads its current epoch, node counts,
    /// per-label node lists and statistics.
    pub(super) store: Arc<LpgStore>,
    /// Transaction manager; `None` when built through `new`.
    pub(super) tx_manager: Option<Arc<TransactionManager>>,
    /// Transaction id handed to `with_context`; `None` when built through `new`.
    pub(super) tx_id: Option<TxId>,
    /// Epoch exposed through `viewing_epoch()`; `new` takes the store's
    /// current epoch, `with_context` takes the caller's value.
    pub(super) viewing_epoch: EpochId,
    /// Counter that starts at 0.
    // NOTE(review): presumably used to name anonymous edge columns — confirm
    // in the expand planning code.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true` (the constructor default), chains of two or more
    /// single-hop expands are routed to `plan_expand_chain`.
    pub(super) factorized_execution: bool,
    /// Set of column names, empty at construction.
    // NOTE(review): presumably tracks columns holding scalar values — confirm.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Set of column names, empty at construction.
    // NOTE(review): presumably tracks columns holding edges — confirm.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
}
75
76impl Planner {
77 #[must_use]
82 pub fn new(store: Arc<LpgStore>) -> Self {
83 let epoch = store.current_epoch();
84 Self {
85 store,
86 tx_manager: None,
87 tx_id: None,
88 viewing_epoch: epoch,
89 anon_edge_counter: std::cell::Cell::new(0),
90 factorized_execution: true,
91 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
92 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
93 }
94 }
95
96 #[must_use]
105 pub fn with_context(
106 store: Arc<LpgStore>,
107 tx_manager: Arc<TransactionManager>,
108 tx_id: Option<TxId>,
109 viewing_epoch: EpochId,
110 ) -> Self {
111 Self {
112 store,
113 tx_manager: Some(tx_manager),
114 tx_id,
115 viewing_epoch,
116 anon_edge_counter: std::cell::Cell::new(0),
117 factorized_execution: true,
118 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120 }
121 }
122
123 #[must_use]
125 pub fn viewing_epoch(&self) -> EpochId {
126 self.viewing_epoch
127 }
128
129 #[must_use]
131 pub fn tx_id(&self) -> Option<TxId> {
132 self.tx_id
133 }
134
135 #[must_use]
137 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
138 self.tx_manager.as_ref()
139 }
140
141 #[must_use]
143 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
144 self.factorized_execution = enabled;
145 self
146 }
147
148 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
152 match op {
153 LogicalOperator::Expand(expand) => {
154 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
156
157 if is_single_hop {
158 let (inner_count, base) = Self::count_expand_chain(&expand.input);
159 (inner_count + 1, base)
160 } else {
161 (0, op)
163 }
164 }
165 _ => (0, op),
166 }
167 }
168
169 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
173 let mut chain = Vec::new();
174 let mut current = op;
175
176 while let LogicalOperator::Expand(expand) = current {
177 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
179 if !is_single_hop {
180 break;
181 }
182 chain.push(expand);
183 current = &expand.input;
184 }
185
186 chain.reverse();
188 chain
189 }
190
191 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
197 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
198 Ok(PhysicalPlan {
199 operator,
200 columns,
201 adaptive_context: None,
202 })
203 }
204
205 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
214 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
215
216 let mut adaptive_context = AdaptiveContext::new();
218 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
219
220 Ok(PhysicalPlan {
221 operator,
222 columns,
223 adaptive_context: Some(adaptive_context),
224 })
225 }
226
227 fn collect_cardinality_estimates(
229 &self,
230 op: &LogicalOperator,
231 ctx: &mut AdaptiveContext,
232 depth: usize,
233 ) {
234 match op {
235 LogicalOperator::NodeScan(scan) => {
236 let estimate = if let Some(label) = &scan.label {
238 self.store.nodes_by_label(label).len() as f64
239 } else {
240 self.store.node_count() as f64
241 };
242 let id = format!("scan_{}", scan.variable);
243 ctx.set_estimate(&id, estimate);
244
245 if let Some(input) = &scan.input {
247 self.collect_cardinality_estimates(input, ctx, depth + 1);
248 }
249 }
250 LogicalOperator::Filter(filter) => {
251 let input_estimate = self.estimate_cardinality(&filter.input);
253 let estimate = input_estimate * 0.3;
254 let id = format!("filter_{depth}");
255 ctx.set_estimate(&id, estimate);
256
257 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
258 }
259 LogicalOperator::Expand(expand) => {
260 let input_estimate = self.estimate_cardinality(&expand.input);
262 let stats = self.store.statistics();
263 let avg_degree = self.estimate_expand_degree(&stats, expand);
264 let estimate = input_estimate * avg_degree;
265 let id = format!("expand_{}", expand.to_variable);
266 ctx.set_estimate(&id, estimate);
267
268 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
269 }
270 LogicalOperator::Join(join) => {
271 let left_est = self.estimate_cardinality(&join.left);
273 let right_est = self.estimate_cardinality(&join.right);
274 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
276 ctx.set_estimate(&id, estimate);
277
278 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
279 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
280 }
281 LogicalOperator::Aggregate(agg) => {
282 let input_estimate = self.estimate_cardinality(&agg.input);
284 let estimate = if agg.group_by.is_empty() {
285 1.0 } else {
287 (input_estimate * 0.1).max(1.0) };
289 let id = format!("aggregate_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
293 }
294 LogicalOperator::Distinct(distinct) => {
295 let input_estimate = self.estimate_cardinality(&distinct.input);
296 let estimate = (input_estimate * 0.5).max(1.0);
297 let id = format!("distinct_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
301 }
302 LogicalOperator::Return(ret) => {
303 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
304 }
305 LogicalOperator::Limit(limit) => {
306 let input_estimate = self.estimate_cardinality(&limit.input);
307 let estimate = (input_estimate).min(limit.count as f64);
308 let id = format!("limit_{depth}");
309 ctx.set_estimate(&id, estimate);
310
311 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
312 }
313 LogicalOperator::Skip(skip) => {
314 let input_estimate = self.estimate_cardinality(&skip.input);
315 let estimate = (input_estimate - skip.count as f64).max(0.0);
316 let id = format!("skip_{depth}");
317 ctx.set_estimate(&id, estimate);
318
319 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
320 }
321 LogicalOperator::Sort(sort) => {
322 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
324 }
325 LogicalOperator::Union(union) => {
326 let estimate: f64 = union
327 .inputs
328 .iter()
329 .map(|input| self.estimate_cardinality(input))
330 .sum();
331 let id = format!("union_{depth}");
332 ctx.set_estimate(&id, estimate);
333
334 for input in &union.inputs {
335 self.collect_cardinality_estimates(input, ctx, depth + 1);
336 }
337 }
338 _ => {
339 }
341 }
342 }
343
344 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
346 match op {
347 LogicalOperator::NodeScan(scan) => {
348 if let Some(label) = &scan.label {
349 self.store.nodes_by_label(label).len() as f64
350 } else {
351 self.store.node_count() as f64
352 }
353 }
354 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
355 LogicalOperator::Expand(expand) => {
356 let stats = self.store.statistics();
357 let avg_degree = self.estimate_expand_degree(&stats, expand);
358 self.estimate_cardinality(&expand.input) * avg_degree
359 }
360 LogicalOperator::Join(join) => {
361 let left = self.estimate_cardinality(&join.left);
362 let right = self.estimate_cardinality(&join.right);
363 (left * right).sqrt()
364 }
365 LogicalOperator::Aggregate(agg) => {
366 if agg.group_by.is_empty() {
367 1.0
368 } else {
369 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
370 }
371 }
372 LogicalOperator::Distinct(distinct) => {
373 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
374 }
375 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
376 LogicalOperator::Limit(limit) => self
377 .estimate_cardinality(&limit.input)
378 .min(limit.count as f64),
379 LogicalOperator::Skip(skip) => {
380 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
381 }
382 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
383 LogicalOperator::Union(union) => union
384 .inputs
385 .iter()
386 .map(|input| self.estimate_cardinality(input))
387 .sum(),
388 _ => 1000.0, }
390 }
391
392 fn estimate_expand_degree(
394 &self,
395 stats: &grafeo_core::statistics::Statistics,
396 expand: &ExpandOp,
397 ) -> f64 {
398 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
399 if let Some(edge_type) = &expand.edge_type {
400 stats.estimate_avg_degree(edge_type, outgoing)
401 } else if stats.total_nodes > 0 {
402 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
403 } else {
404 10.0 }
406 }
407
408 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
410 match op {
411 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
412 LogicalOperator::Expand(expand) => {
413 if self.factorized_execution {
415 let (chain_len, _base) = Self::count_expand_chain(op);
416 if chain_len >= 2 {
417 return self.plan_expand_chain(op);
419 }
420 }
421 self.plan_expand(expand)
422 }
423 LogicalOperator::Return(ret) => self.plan_return(ret),
424 LogicalOperator::Filter(filter) => self.plan_filter(filter),
425 LogicalOperator::Project(project) => self.plan_project(project),
426 LogicalOperator::Limit(limit) => self.plan_limit(limit),
427 LogicalOperator::Skip(skip) => self.plan_skip(skip),
428 LogicalOperator::Sort(sort) => self.plan_sort(sort),
429 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
430 LogicalOperator::Join(join) => self.plan_join(join),
431 LogicalOperator::Union(union) => self.plan_union(union),
432 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
433 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
434 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
435 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
436 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
437 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
438 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
439 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
440 LogicalOperator::Merge(merge) => self.plan_merge(merge),
441 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
442 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
443 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
444 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
445 #[cfg(feature = "algos")]
446 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
447 #[cfg(not(feature = "algos"))]
448 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
449 "CALL procedures require the 'algos' feature".to_string(),
450 )),
451 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
452 LogicalOperator::VectorScan(_) => Err(Error::Internal(
453 "VectorScan requires vector-index feature".to_string(),
454 )),
455 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
456 "VectorJoin requires vector-index feature".to_string(),
457 )),
458 _ => Err(Error::Internal(format!(
459 "Unsupported operator: {:?}",
460 std::mem::discriminant(op)
461 ))),
462 }
463 }
464}
465
466pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
468 match op {
469 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
470 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
471 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
472 BinaryOp::Le => Ok(BinaryFilterOp::Le),
473 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
474 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
475 BinaryOp::And => Ok(BinaryFilterOp::And),
476 BinaryOp::Or => Ok(BinaryFilterOp::Or),
477 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
478 BinaryOp::Add => Ok(BinaryFilterOp::Add),
479 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
480 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
481 BinaryOp::Div => Ok(BinaryFilterOp::Div),
482 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
483 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
484 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
485 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
486 BinaryOp::In => Ok(BinaryFilterOp::In),
487 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
488 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
489 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
490 "Binary operator {:?} not yet supported in filters",
491 op
492 ))),
493 }
494}
495
496pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
498 match op {
499 UnaryOp::Not => Ok(UnaryFilterOp::Not),
500 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
501 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
502 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
503 }
504}
505
506pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
508 match func {
509 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
510 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
511 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
512 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
513 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
514 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
515 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
516 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
517 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
518 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
519 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
520 }
521}
522
523pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
527 match expr {
528 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
529 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
530 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
531 variable: variable.clone(),
532 property: property.clone(),
533 }),
534 LogicalExpression::Binary { left, op, right } => {
535 let left_expr = convert_filter_expression(left)?;
536 let right_expr = convert_filter_expression(right)?;
537 let filter_op = convert_binary_op(*op)?;
538 Ok(FilterExpression::Binary {
539 left: Box::new(left_expr),
540 op: filter_op,
541 right: Box::new(right_expr),
542 })
543 }
544 LogicalExpression::Unary { op, operand } => {
545 let operand_expr = convert_filter_expression(operand)?;
546 let filter_op = convert_unary_op(*op)?;
547 Ok(FilterExpression::Unary {
548 op: filter_op,
549 operand: Box::new(operand_expr),
550 })
551 }
552 LogicalExpression::FunctionCall { name, args, .. } => {
553 let filter_args: Vec<FilterExpression> = args
554 .iter()
555 .map(convert_filter_expression)
556 .collect::<Result<Vec<_>>>()?;
557 Ok(FilterExpression::FunctionCall {
558 name: name.clone(),
559 args: filter_args,
560 })
561 }
562 LogicalExpression::Case {
563 operand,
564 when_clauses,
565 else_clause,
566 } => {
567 let filter_operand = operand
568 .as_ref()
569 .map(|e| convert_filter_expression(e))
570 .transpose()?
571 .map(Box::new);
572 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
573 .iter()
574 .map(|(cond, result)| {
575 Ok((
576 convert_filter_expression(cond)?,
577 convert_filter_expression(result)?,
578 ))
579 })
580 .collect::<Result<Vec<_>>>()?;
581 let filter_else = else_clause
582 .as_ref()
583 .map(|e| convert_filter_expression(e))
584 .transpose()?
585 .map(Box::new);
586 Ok(FilterExpression::Case {
587 operand: filter_operand,
588 when_clauses: filter_when_clauses,
589 else_clause: filter_else,
590 })
591 }
592 LogicalExpression::List(items) => {
593 let filter_items: Vec<FilterExpression> = items
594 .iter()
595 .map(convert_filter_expression)
596 .collect::<Result<Vec<_>>>()?;
597 Ok(FilterExpression::List(filter_items))
598 }
599 LogicalExpression::Map(pairs) => {
600 let filter_pairs: Vec<(String, FilterExpression)> = pairs
601 .iter()
602 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
603 .collect::<Result<Vec<_>>>()?;
604 Ok(FilterExpression::Map(filter_pairs))
605 }
606 LogicalExpression::IndexAccess { base, index } => {
607 let base_expr = convert_filter_expression(base)?;
608 let index_expr = convert_filter_expression(index)?;
609 Ok(FilterExpression::IndexAccess {
610 base: Box::new(base_expr),
611 index: Box::new(index_expr),
612 })
613 }
614 LogicalExpression::SliceAccess { base, start, end } => {
615 let base_expr = convert_filter_expression(base)?;
616 let start_expr = start
617 .as_ref()
618 .map(|s| convert_filter_expression(s))
619 .transpose()?
620 .map(Box::new);
621 let end_expr = end
622 .as_ref()
623 .map(|e| convert_filter_expression(e))
624 .transpose()?
625 .map(Box::new);
626 Ok(FilterExpression::SliceAccess {
627 base: Box::new(base_expr),
628 start: start_expr,
629 end: end_expr,
630 })
631 }
632 LogicalExpression::Parameter(_) => Err(Error::Internal(
633 "Parameters not yet supported in filters".to_string(),
634 )),
635 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
636 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
637 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
638 LogicalExpression::ListComprehension {
639 variable,
640 list_expr,
641 filter_expr,
642 map_expr,
643 } => {
644 let list = convert_filter_expression(list_expr)?;
645 let filter = filter_expr
646 .as_ref()
647 .map(|f| convert_filter_expression(f))
648 .transpose()?
649 .map(Box::new);
650 let map = convert_filter_expression(map_expr)?;
651 Ok(FilterExpression::ListComprehension {
652 variable: variable.clone(),
653 list_expr: Box::new(list),
654 filter_expr: filter,
655 map_expr: Box::new(map),
656 })
657 }
658 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
659 Error::Internal("Subqueries not yet supported in filters".to_string()),
660 ),
661 }
662}
663
664fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
666 use grafeo_common::types::Value;
667 match value {
668 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
670 Value::Int64(_) => LogicalType::Int64,
671 Value::Float64(_) => LogicalType::Float64,
672 Value::String(_) => LogicalType::String,
673 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
675 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
678 }
679}
680
681fn expression_to_string(expr: &LogicalExpression) -> String {
683 match expr {
684 LogicalExpression::Variable(name) => name.clone(),
685 LogicalExpression::Property { variable, property } => {
686 format!("{variable}.{property}")
687 }
688 LogicalExpression::Literal(value) => format!("{value:?}"),
689 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
690 _ => "expr".to_string(),
691 }
692}
693
/// Result of planning: the root physical operator, its output column names,
/// and optional cardinality estimates.
pub struct PhysicalPlan {
    /// Root of the physical operator tree.
    pub operator: Box<dyn Operator>,
    /// Column names returned by the planner alongside `operator`.
    pub columns: Vec<String>,
    /// Cardinality estimates; `Planner::plan` leaves this `None`, while
    /// `Planner::plan_adaptive` fills it in.
    pub adaptive_context: Option<AdaptiveContext>,
}
707
708impl PhysicalPlan {
709 #[must_use]
711 pub fn columns(&self) -> &[String] {
712 &self.columns
713 }
714
715 pub fn into_operator(self) -> Box<dyn Operator> {
717 self.operator
718 }
719
720 #[must_use]
722 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
723 self.adaptive_context.as_ref()
724 }
725
726 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
728 self.adaptive_context.take()
729 }
730}
731
/// Operator that replays a fixed, in-memory set of rows in chunks.
struct StaticResultOperator {
    /// All rows to emit; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position inside a row to read from.
    /// Positions beyond the end of a row produce `Value::Null`.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; `reset` sets it back to 0.
    row_index: usize,
}
738
739impl Operator for StaticResultOperator {
740 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
741 use grafeo_core::execution::DataChunk;
742
743 if self.row_index >= self.rows.len() {
744 return Ok(None);
745 }
746
747 let remaining = self.rows.len() - self.row_index;
748 let chunk_rows = remaining.min(1024);
749 let col_count = self.column_indices.len();
750
751 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
752 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
753
754 for row_offset in 0..chunk_rows {
755 let row = &self.rows[self.row_index + row_offset];
756 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
757 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
758 if let Some(col) = chunk.column_mut(col_idx) {
759 col.push_value(value);
760 }
761 }
762 }
763 chunk.set_count(chunk_rows);
764
765 self.row_index += chunk_rows;
766 Ok(Some(chunk))
767 }
768
769 fn reset(&mut self) {
770 self.row_index = 0;
771 }
772
773 fn name(&self) -> &'static str {
774 "StaticResult"
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use super::*;
781 use crate::query::plan::{
782 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
783 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
784 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
785 SortKey, SortOp,
786 };
787 use grafeo_common::types::Value;
788
789 fn create_test_store() -> Arc<LpgStore> {
790 let store = Arc::new(LpgStore::new());
791 store.create_node(&["Person"]);
792 store.create_node(&["Person"]);
793 store.create_node(&["Company"]);
794 store
795 }
796
797 #[test]
800 fn test_plan_simple_scan() {
801 let store = create_test_store();
802 let planner = Planner::new(store);
803
804 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
806 items: vec![ReturnItem {
807 expression: LogicalExpression::Variable("n".to_string()),
808 alias: None,
809 }],
810 distinct: false,
811 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
812 variable: "n".to_string(),
813 label: Some("Person".to_string()),
814 input: None,
815 })),
816 }));
817
818 let physical = planner.plan(&logical).unwrap();
819 assert_eq!(physical.columns(), &["n"]);
820 }
821
822 #[test]
823 fn test_plan_scan_without_label() {
824 let store = create_test_store();
825 let planner = Planner::new(store);
826
827 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
829 items: vec![ReturnItem {
830 expression: LogicalExpression::Variable("n".to_string()),
831 alias: None,
832 }],
833 distinct: false,
834 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
835 variable: "n".to_string(),
836 label: None,
837 input: None,
838 })),
839 }));
840
841 let physical = planner.plan(&logical).unwrap();
842 assert_eq!(physical.columns(), &["n"]);
843 }
844
845 #[test]
846 fn test_plan_return_with_alias() {
847 let store = create_test_store();
848 let planner = Planner::new(store);
849
850 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
852 items: vec![ReturnItem {
853 expression: LogicalExpression::Variable("n".to_string()),
854 alias: Some("person".to_string()),
855 }],
856 distinct: false,
857 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
858 variable: "n".to_string(),
859 label: Some("Person".to_string()),
860 input: None,
861 })),
862 }));
863
864 let physical = planner.plan(&logical).unwrap();
865 assert_eq!(physical.columns(), &["person"]);
866 }
867
868 #[test]
869 fn test_plan_return_property() {
870 let store = create_test_store();
871 let planner = Planner::new(store);
872
873 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
875 items: vec![ReturnItem {
876 expression: LogicalExpression::Property {
877 variable: "n".to_string(),
878 property: "name".to_string(),
879 },
880 alias: None,
881 }],
882 distinct: false,
883 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
884 variable: "n".to_string(),
885 label: Some("Person".to_string()),
886 input: None,
887 })),
888 }));
889
890 let physical = planner.plan(&logical).unwrap();
891 assert_eq!(physical.columns(), &["n.name"]);
892 }
893
894 #[test]
895 fn test_plan_return_literal() {
896 let store = create_test_store();
897 let planner = Planner::new(store);
898
899 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
901 items: vec![ReturnItem {
902 expression: LogicalExpression::Literal(Value::Int64(42)),
903 alias: Some("answer".to_string()),
904 }],
905 distinct: false,
906 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
907 variable: "n".to_string(),
908 label: None,
909 input: None,
910 })),
911 }));
912
913 let physical = planner.plan(&logical).unwrap();
914 assert_eq!(physical.columns(), &["answer"]);
915 }
916
917 #[test]
920 fn test_plan_filter_equality() {
921 let store = create_test_store();
922 let planner = Planner::new(store);
923
924 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
926 items: vec![ReturnItem {
927 expression: LogicalExpression::Variable("n".to_string()),
928 alias: None,
929 }],
930 distinct: false,
931 input: Box::new(LogicalOperator::Filter(FilterOp {
932 predicate: LogicalExpression::Binary {
933 left: Box::new(LogicalExpression::Property {
934 variable: "n".to_string(),
935 property: "age".to_string(),
936 }),
937 op: BinaryOp::Eq,
938 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
939 },
940 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
941 variable: "n".to_string(),
942 label: Some("Person".to_string()),
943 input: None,
944 })),
945 })),
946 }));
947
948 let physical = planner.plan(&logical).unwrap();
949 assert_eq!(physical.columns(), &["n"]);
950 }
951
952 #[test]
953 fn test_plan_filter_compound_and() {
954 let store = create_test_store();
955 let planner = Planner::new(store);
956
957 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
959 items: vec![ReturnItem {
960 expression: LogicalExpression::Variable("n".to_string()),
961 alias: None,
962 }],
963 distinct: false,
964 input: Box::new(LogicalOperator::Filter(FilterOp {
965 predicate: LogicalExpression::Binary {
966 left: Box::new(LogicalExpression::Binary {
967 left: Box::new(LogicalExpression::Property {
968 variable: "n".to_string(),
969 property: "age".to_string(),
970 }),
971 op: BinaryOp::Gt,
972 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
973 }),
974 op: BinaryOp::And,
975 right: Box::new(LogicalExpression::Binary {
976 left: Box::new(LogicalExpression::Property {
977 variable: "n".to_string(),
978 property: "age".to_string(),
979 }),
980 op: BinaryOp::Lt,
981 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
982 }),
983 },
984 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
985 variable: "n".to_string(),
986 label: None,
987 input: None,
988 })),
989 })),
990 }));
991
992 let physical = planner.plan(&logical).unwrap();
993 assert_eq!(physical.columns(), &["n"]);
994 }
995
996 #[test]
997 fn test_plan_filter_unary_not() {
998 let store = create_test_store();
999 let planner = Planner::new(store);
1000
1001 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1003 items: vec![ReturnItem {
1004 expression: LogicalExpression::Variable("n".to_string()),
1005 alias: None,
1006 }],
1007 distinct: false,
1008 input: Box::new(LogicalOperator::Filter(FilterOp {
1009 predicate: LogicalExpression::Unary {
1010 op: UnaryOp::Not,
1011 operand: Box::new(LogicalExpression::Property {
1012 variable: "n".to_string(),
1013 property: "active".to_string(),
1014 }),
1015 },
1016 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1017 variable: "n".to_string(),
1018 label: None,
1019 input: None,
1020 })),
1021 })),
1022 }));
1023
1024 let physical = planner.plan(&logical).unwrap();
1025 assert_eq!(physical.columns(), &["n"]);
1026 }
1027
1028 #[test]
1029 fn test_plan_filter_is_null() {
1030 let store = create_test_store();
1031 let planner = Planner::new(store);
1032
1033 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1035 items: vec![ReturnItem {
1036 expression: LogicalExpression::Variable("n".to_string()),
1037 alias: None,
1038 }],
1039 distinct: false,
1040 input: Box::new(LogicalOperator::Filter(FilterOp {
1041 predicate: LogicalExpression::Unary {
1042 op: UnaryOp::IsNull,
1043 operand: Box::new(LogicalExpression::Property {
1044 variable: "n".to_string(),
1045 property: "email".to_string(),
1046 }),
1047 },
1048 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1049 variable: "n".to_string(),
1050 label: None,
1051 input: None,
1052 })),
1053 })),
1054 }));
1055
1056 let physical = planner.plan(&logical).unwrap();
1057 assert_eq!(physical.columns(), &["n"]);
1058 }
1059
/// Plans `Return(n)` over a `Filter` whose predicate compares the function
/// call `size(n.friends)` with the literal `0` using `BinaryOp::Gt`.
/// Planning must succeed and the physical plan must expose exactly column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left-hand side of the comparison: size(n.friends).
    let friend_count = LogicalExpression::FunctionCall {
        name: "size".to_string(),
        args: vec![LogicalExpression::Property {
            variable: "n".to_string(),
            property: "friends".to_string(),
        }],
        distinct: false,
    };

    // Full predicate: size(n.friends) > 0.
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1096
/// Plans a single-hop outgoing `KNOWS` expansion from `a` (scanned with label
/// `Person`) to `b`, returning both variables. The physical plan must list
/// both `a` and `b` among its columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    // Exactly one hop: min_hops == max_hops == 1.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1139
/// Plans a single-hop outgoing `KNOWS` expansion that also binds the edge to
/// `r`, returning `a`, `r` and `b`. All three names must appear in the
/// physical plan's columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    // The edge itself is bound to `r` via `edge_variable`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: Some("r".to_string()),
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("r"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"r".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1184
/// Plans `Return(n)` over a `Limit` of 10 rows on top of an unlabeled node
/// scan. The physical plan must expose exactly column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1212
/// Plans `Return(n)` over a `Skip` of 5 rows on top of an unlabeled node
/// scan. The physical plan must expose exactly column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1238
/// Plans `Return(n)` over a `Sort` keyed on the variable `n` in ascending
/// order. The physical plan must expose exactly column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n`, ascending.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1267
/// Plans `Return(n)` over a `Sort` keyed on the variable `n` in descending
/// order. The physical plan must expose exactly column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n`, descending.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Descending,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1296
/// Plans `Return(n)` over a `Distinct` operator (no explicit column list) on
/// top of an unlabeled node scan. The physical plan must expose exactly
/// column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // `columns: None` leaves the distinct column set unspecified.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1322
/// Plans `Return(cnt)` over an ungrouped `Aggregate` that counts `n` under
/// the alias `cnt`. The physical plan's columns must contain `cnt`.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
    };

    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("cnt".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"cnt".to_string()));
}
1358
/// Plans a top-level `Aggregate` grouped by the property `n.city` with one
/// `Count(n)` aggregate aliased `cnt`, over a `Person` scan. The physical
/// plan must have exactly two columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    // Grouping key: n.city
    let city = LogicalExpression::Property {
        variable: "n".to_string(),
        property: "city".to_string(),
    };

    // Count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1388
/// Plans a top-level ungrouped `Aggregate` computing `Sum(n.value)` under the
/// alias `total`. The physical plan's columns must contain `total`.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Sum(n.value) AS total
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "value".to_string(),
        }),
        distinct: false,
        alias: Some("total".to_string()),
        percentile: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"total".to_string()));
}
1418
/// Plans a top-level ungrouped `Aggregate` computing `Avg(n.score)` under the
/// alias `average`. The physical plan's columns must contain `average`.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Avg(n.score) AS average
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "score".to_string(),
        }),
        distinct: false,
        alias: Some("average".to_string()),
        percentile: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"average".to_string()));
}
1448
/// Plans a top-level ungrouped `Aggregate` with two aggregates over `n.age`:
/// `Min` aliased `youngest` and `Max` aliased `oldest`. Both aliases must
/// appear in the physical plan's columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Builds `<function>(n.age) AS <alias>`.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            over_age(LogicalAggregateFunction::Min, "youngest"),
            over_age(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"youngest".to_string()));
    assert!(columns.contains(&"oldest".to_string()));
}
1491
/// Plans `Return(a, b)` over an inner `Join` between a `Person` scan bound to
/// `a` and a `Company` scan bound to `b`, with one join condition pairing the
/// variables `a` and `b`. Both names must appear in the physical columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Builds a labeled node scan binding `variable`.
    let labeled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(labeled_scan("a", "Person")),
        right: Box::new(labeled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: LogicalExpression::Variable("a".to_string()),
            right: LogicalExpression::Variable("b".to_string()),
        }],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(join),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
1535
/// Plans a top-level cross `Join` (no conditions) between two unlabeled scans
/// bound to `a` and `b`. The physical plan must have exactly two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled node scan binding `variable`.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1560
/// Plans a top-level left `Join` (no conditions) between two unlabeled scans
/// bound to `a` and `b`. The physical plan must have exactly two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled node scan binding `variable`.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Left,
        conditions: vec![],
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1584
/// Plans a top-level `CreateNode` with no input that binds `n`, carries the
/// label `Person` and sets the property `name` to the string `Alice`.
/// The physical plan's columns must contain `n`.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    // Property assignment: name = "Alice".
    let name_property = (
        "name".to_string(),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );

    let create = CreateNodeOp {
        variable: "n".to_string(),
        labels: vec!["Person".to_string()],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_string()));
}
1606
/// Plans a top-level `CreateEdge` of type `KNOWS` from `a` to `b`, bound to
/// `r`, whose input is a cross join of two unlabeled scans providing `a` and
/// `b`. The physical plan's columns must contain `r`.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled node scan binding `variable`.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    // Input rows supplying both endpoints.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some("r".to_string()),
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_type: "KNOWS".to_string(),
        properties: vec![],
        input: Box::new(endpoints),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"r".to_string()));
}
1638
/// Plans a top-level non-detaching `DeleteNode` of `n` over an unlabeled
/// scan. The physical plan's columns must contain `deleted_count`.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let delete = DeleteNodeOp {
        variable: "n".to_string(),
        detach: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"deleted_count".to_string()));
}
1658
/// A logical plan consisting solely of `LogicalOperator::Empty` must be
/// rejected by the planner with an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    let outcome = planner.plan(&empty_plan);

    assert!(outcome.is_err());
}
1670
/// Returning the variable `missing` when the only input is a scan binding
/// `n` must make planning fail with an error.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    // The scan only introduces `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // The return item refers to a name nothing in the plan binds.
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable("missing".to_string()),
        alias: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    let outcome = planner.plan(&logical);
    assert!(outcome.is_err());
}
1693
/// Each comparison, logical and arithmetic `BinaryOp` listed below must be
/// accepted by `convert_binary_op` (i.e. the conversion returns `Ok`).
#[test]
fn test_convert_binary_ops() {
    // One assertion per operator keeps the failing variant visible in the
    // assertion message.
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparisons.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Logical connectives.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
1711
/// `convert_unary_op` must return `Ok` for `Not`, `IsNull`, `IsNotNull` and
/// `Neg`.
#[test]
fn test_convert_unary_ops() {
    // One assertion per operator keeps the failing variant visible in the
    // assertion message.
    let converts = |op: UnaryOp| convert_unary_op(op).is_ok();

    assert!(converts(UnaryOp::Not));
    assert!(converts(UnaryOp::IsNull));
    assert!(converts(UnaryOp::IsNotNull));
    assert!(converts(UnaryOp::Neg));
}
1719
/// `convert_aggregate_function` must map each of the logical `Count`, `Sum`,
/// `Avg`, `Min` and `Max` functions to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
1743
/// A planner built with `Planner::new` must report no transaction id and no
/// transaction manager; `viewing_epoch` is only checked for being callable.
#[test]
fn test_planner_accessors() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store));

    // No transactional context was supplied to the constructor.
    assert!(planner.tx_id().is_none());
    assert!(planner.tx_manager().is_none());

    // The returned epoch value is intentionally discarded.
    let _ = planner.viewing_epoch();
}
1753
/// For a bare unlabeled node scan of `n`, the physical plan must report the
/// single column `n`, and it must be possible to consume the plan through
/// `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the resulting operator is intentionally discarded.
    let _ = physical.into_operator();
}
1771
/// `plan_adaptive` on `Return(n)` over a `Person` scan must yield a plan with
/// the single column `n` and a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
1798
/// `plan_adaptive` on `Return(n)` over a `Filter` with the predicate
/// `n.age > 30` must yield a plan with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Predicate: n.age > 30
    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: "n".to_string(),
            property: "age".to_string(),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };

    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1831
/// With factorized execution switched off, `plan_adaptive` on `Return(a, b)`
/// over a single-hop outgoing `KNOWS` expansion must yield a plan with a
/// populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_expand() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store)).with_factorized_execution(false);

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1870
/// `plan_adaptive` on `Return(a, b)` over a cross join of two unlabeled scans
/// must yield a plan with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    // Builds an unlabeled node scan binding `variable`.
    let scan_of = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let cross_join = LogicalOperator::Join(JoinOp {
        left: Box::new(scan_of("a")),
        right: Box::new(scan_of("b")),
        join_type: JoinType::Cross,
        conditions: vec![],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(cross_join),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1907
/// `plan_adaptive` on a top-level ungrouped `Count(n) AS cnt` aggregate must
/// yield a plan with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_aggregate() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable("n".to_string())),
        distinct: false,
        alias: Some("cnt".to_string()),
        percentile: None,
    };

    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1933
/// `plan_adaptive` on `Return(n)` over a `Distinct` operator must yield a
/// plan with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1958
/// `plan_adaptive` on `Return(n)` over a `Limit` of 10 rows must yield a plan
/// with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
1983
/// `plan_adaptive` on `Return(n)` over a `Skip` of 5 rows must yield a plan
/// with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2008
/// `plan_adaptive` on `Return(n)` over an ascending `Sort` keyed on `n` must
/// yield a plan with a populated `adaptive_context`.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });

    // Single sort key: `n`, ascending.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable("n".to_string()),
        order: SortOrder::Ascending,
    };

    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2036
/// `plan_adaptive` on `Return(n)` over a `Union` of a `Person` scan and a
/// `Company` scan (both binding `n`) must yield a plan with a populated
/// `adaptive_context`.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Builds a scan binding `n` restricted to `label`.
    let scan_labeled = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![scan_labeled("Person"), scan_labeled("Company")],
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_string()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2067
/// Plans `Return(a, b)` over an outgoing `KNOWS` expansion spanning between
/// 1 and 3 hops. Both `a` and `b` must appear in the physical plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    // Hop range 1..=3 (min_hops differs from max_hops).
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2109
/// Plans `Return(a, b)` over an outgoing `KNOWS` expansion of 1 to 3 hops
/// that carries the path alias `p`. Planning must succeed and both `a` and
/// `b` must appear in the columns; the alias `p` itself is not asserted on.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan),
        path_alias: Some("p".to_string()),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2150
/// With factorized execution switched off, plans `Return(a, b)` over a
/// single-hop `KNOWS` expansion in the `Incoming` direction. Both `a` and `b`
/// must appear in the physical plan's columns.
#[test]
fn test_plan_expand_incoming() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store)).with_factorized_execution(false);

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2190
/// With factorized execution switched off, plans `Return(a, b)` over a
/// single-hop `KNOWS` expansion in the `Both` direction. Both `a` and `b`
/// must appear in the physical plan's columns.
#[test]
fn test_plan_expand_both_directions() {
    let shared_store = create_test_store();
    let planner = Planner::new(Arc::clone(&shared_store)).with_factorized_execution(false);

    // Builds a plain `RETURN <name>` item without an alias.
    let returns = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some("KNOWS".to_string()),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returns("a"), returns("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"a".to_string()));
    assert!(columns.contains(&"b".to_string()));
}
2230
/// Constructs a planner via `Planner::with_context` and checks that the
/// transaction id, the transaction manager and the viewing epoch passed in
/// can be read back through the planner's accessors.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());

    // Begin the transaction first, then read the epoch the planner will view.
    let active_tx = manager.begin();
    let current = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        current,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), current);
}
2253
/// Plans a two-hop outgoing expansion (`a` -> `b` -> `c`) with factorized
/// execution disabled and checks that the returned variables `a` and `c`
/// appear in the physical plan's columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for a return item that projects a variable without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Leaf of the plan: scan every node as `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: None,
        input: None,
    });

    // First hop: a -> b over any edge type.
    let first_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: "a".to_string(),
        to_variable: "b".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
    });

    // Second hop: b -> c over any edge type, fed by the first hop.
    let second_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: "b".to_string(),
        to_variable: "c".to_string(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(first_hop),
        path_alias: None,
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "c"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2303
/// Plans a node scan sorted ascending by the `name` property of `n` and
/// checks that `n` is still present in the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Single sort key: `n.name`, ascending.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        order: SortOrder::Ascending,
    };

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let returned_n = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned_n],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let wanted = "n".to_string();
    assert!(physical.columns().contains(&wanted));
}
2338
/// Plans a `Company` node scan (`b`) whose input is a `Person` node scan
/// (`a`) and checks that both variables appear in the physical plan's columns.
#[test]
fn test_plan_scan_with_input() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Shorthand for a return item that projects a variable without an alias.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Inner scan feeds the outer scan through its `input` field.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_string(),
        label: Some("Company".to_string()),
        input: Some(Box::new(people)),
    });

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(companies),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2374}