1mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18 CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19 ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20 LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
21 ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
22};
23use grafeo_common::types::{EpochId, TxId};
24use grafeo_common::types::{LogicalType, Value};
25use grafeo_common::utils::error::{Error, Result};
26use grafeo_core::execution::AdaptiveContext;
27use grafeo_core::execution::operators::{
28 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
29 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
30 CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
31 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
32 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
33 HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
34 LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
35 NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
36 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
37 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
38 UnwindOperator, VariableLengthExpandOperator,
39};
40use grafeo_core::graph::{Direction, lpg::LpgStore};
41use std::collections::HashMap;
42use std::sync::Arc;
43
44use crate::transaction::TransactionManager;
45
46struct RangeBounds<'a> {
48 min: Option<&'a Value>,
49 max: Option<&'a Value>,
50 min_inclusive: bool,
51 max_inclusive: bool,
52}
53
54pub struct Planner {
56 pub(super) store: Arc<LpgStore>,
58 pub(super) tx_manager: Option<Arc<TransactionManager>>,
60 pub(super) tx_id: Option<TxId>,
62 pub(super) viewing_epoch: EpochId,
64 pub(super) anon_edge_counter: std::cell::Cell<u32>,
66 pub(super) factorized_execution: bool,
68 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
71 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
74}
75
76impl Planner {
77 #[must_use]
82 pub fn new(store: Arc<LpgStore>) -> Self {
83 let epoch = store.current_epoch();
84 Self {
85 store,
86 tx_manager: None,
87 tx_id: None,
88 viewing_epoch: epoch,
89 anon_edge_counter: std::cell::Cell::new(0),
90 factorized_execution: true,
91 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
92 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
93 }
94 }
95
96 #[must_use]
105 pub fn with_context(
106 store: Arc<LpgStore>,
107 tx_manager: Arc<TransactionManager>,
108 tx_id: Option<TxId>,
109 viewing_epoch: EpochId,
110 ) -> Self {
111 Self {
112 store,
113 tx_manager: Some(tx_manager),
114 tx_id,
115 viewing_epoch,
116 anon_edge_counter: std::cell::Cell::new(0),
117 factorized_execution: true,
118 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120 }
121 }
122
123 #[must_use]
125 pub fn viewing_epoch(&self) -> EpochId {
126 self.viewing_epoch
127 }
128
129 #[must_use]
131 pub fn tx_id(&self) -> Option<TxId> {
132 self.tx_id
133 }
134
135 #[must_use]
137 pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
138 self.tx_manager.as_ref()
139 }
140
141 #[must_use]
143 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
144 self.factorized_execution = enabled;
145 self
146 }
147
148 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
152 match op {
153 LogicalOperator::Expand(expand) => {
154 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
156
157 if is_single_hop {
158 let (inner_count, base) = Self::count_expand_chain(&expand.input);
159 (inner_count + 1, base)
160 } else {
161 (0, op)
163 }
164 }
165 _ => (0, op),
166 }
167 }
168
169 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
173 let mut chain = Vec::new();
174 let mut current = op;
175
176 while let LogicalOperator::Expand(expand) = current {
177 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
179 if !is_single_hop {
180 break;
181 }
182 chain.push(expand);
183 current = &expand.input;
184 }
185
186 chain.reverse();
188 chain
189 }
190
191 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
197 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
198 Ok(PhysicalPlan {
199 operator,
200 columns,
201 adaptive_context: None,
202 })
203 }
204
205 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
214 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
215
216 let mut adaptive_context = AdaptiveContext::new();
218 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
219
220 Ok(PhysicalPlan {
221 operator,
222 columns,
223 adaptive_context: Some(adaptive_context),
224 })
225 }
226
227 fn collect_cardinality_estimates(
229 &self,
230 op: &LogicalOperator,
231 ctx: &mut AdaptiveContext,
232 depth: usize,
233 ) {
234 match op {
235 LogicalOperator::NodeScan(scan) => {
236 let estimate = if let Some(label) = &scan.label {
238 self.store.nodes_by_label(label).len() as f64
239 } else {
240 self.store.node_count() as f64
241 };
242 let id = format!("scan_{}", scan.variable);
243 ctx.set_estimate(&id, estimate);
244
245 if let Some(input) = &scan.input {
247 self.collect_cardinality_estimates(input, ctx, depth + 1);
248 }
249 }
250 LogicalOperator::Filter(filter) => {
251 let input_estimate = self.estimate_cardinality(&filter.input);
253 let estimate = input_estimate * 0.3;
254 let id = format!("filter_{depth}");
255 ctx.set_estimate(&id, estimate);
256
257 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
258 }
259 LogicalOperator::Expand(expand) => {
260 let input_estimate = self.estimate_cardinality(&expand.input);
262 let stats = self.store.statistics();
263 let avg_degree = self.estimate_expand_degree(&stats, expand);
264 let estimate = input_estimate * avg_degree;
265 let id = format!("expand_{}", expand.to_variable);
266 ctx.set_estimate(&id, estimate);
267
268 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
269 }
270 LogicalOperator::Join(join) => {
271 let left_est = self.estimate_cardinality(&join.left);
273 let right_est = self.estimate_cardinality(&join.right);
274 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
276 ctx.set_estimate(&id, estimate);
277
278 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
279 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
280 }
281 LogicalOperator::Aggregate(agg) => {
282 let input_estimate = self.estimate_cardinality(&agg.input);
284 let estimate = if agg.group_by.is_empty() {
285 1.0 } else {
287 (input_estimate * 0.1).max(1.0) };
289 let id = format!("aggregate_{depth}");
290 ctx.set_estimate(&id, estimate);
291
292 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
293 }
294 LogicalOperator::Distinct(distinct) => {
295 let input_estimate = self.estimate_cardinality(&distinct.input);
296 let estimate = (input_estimate * 0.5).max(1.0);
297 let id = format!("distinct_{depth}");
298 ctx.set_estimate(&id, estimate);
299
300 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
301 }
302 LogicalOperator::Return(ret) => {
303 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
304 }
305 LogicalOperator::Limit(limit) => {
306 let input_estimate = self.estimate_cardinality(&limit.input);
307 let estimate = (input_estimate).min(limit.count as f64);
308 let id = format!("limit_{depth}");
309 ctx.set_estimate(&id, estimate);
310
311 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
312 }
313 LogicalOperator::Skip(skip) => {
314 let input_estimate = self.estimate_cardinality(&skip.input);
315 let estimate = (input_estimate - skip.count as f64).max(0.0);
316 let id = format!("skip_{depth}");
317 ctx.set_estimate(&id, estimate);
318
319 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
320 }
321 LogicalOperator::Sort(sort) => {
322 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
324 }
325 LogicalOperator::Union(union) => {
326 let estimate: f64 = union
327 .inputs
328 .iter()
329 .map(|input| self.estimate_cardinality(input))
330 .sum();
331 let id = format!("union_{depth}");
332 ctx.set_estimate(&id, estimate);
333
334 for input in &union.inputs {
335 self.collect_cardinality_estimates(input, ctx, depth + 1);
336 }
337 }
338 _ => {
339 }
341 }
342 }
343
344 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
346 match op {
347 LogicalOperator::NodeScan(scan) => {
348 if let Some(label) = &scan.label {
349 self.store.nodes_by_label(label).len() as f64
350 } else {
351 self.store.node_count() as f64
352 }
353 }
354 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
355 LogicalOperator::Expand(expand) => {
356 let stats = self.store.statistics();
357 let avg_degree = self.estimate_expand_degree(&stats, expand);
358 self.estimate_cardinality(&expand.input) * avg_degree
359 }
360 LogicalOperator::Join(join) => {
361 let left = self.estimate_cardinality(&join.left);
362 let right = self.estimate_cardinality(&join.right);
363 (left * right).sqrt()
364 }
365 LogicalOperator::Aggregate(agg) => {
366 if agg.group_by.is_empty() {
367 1.0
368 } else {
369 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
370 }
371 }
372 LogicalOperator::Distinct(distinct) => {
373 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
374 }
375 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
376 LogicalOperator::Limit(limit) => self
377 .estimate_cardinality(&limit.input)
378 .min(limit.count as f64),
379 LogicalOperator::Skip(skip) => {
380 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
381 }
382 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
383 LogicalOperator::Union(union) => union
384 .inputs
385 .iter()
386 .map(|input| self.estimate_cardinality(input))
387 .sum(),
388 _ => 1000.0, }
390 }
391
392 fn estimate_expand_degree(
394 &self,
395 stats: &grafeo_core::statistics::Statistics,
396 expand: &ExpandOp,
397 ) -> f64 {
398 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
399 if let Some(edge_type) = &expand.edge_type {
400 stats.estimate_avg_degree(edge_type, outgoing)
401 } else if stats.total_nodes > 0 {
402 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
403 } else {
404 10.0 }
406 }
407
408 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
410 match op {
411 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
412 LogicalOperator::Expand(expand) => {
413 if self.factorized_execution {
415 let (chain_len, _base) = Self::count_expand_chain(op);
416 if chain_len >= 2 {
417 return self.plan_expand_chain(op);
419 }
420 }
421 self.plan_expand(expand)
422 }
423 LogicalOperator::Return(ret) => self.plan_return(ret),
424 LogicalOperator::Filter(filter) => self.plan_filter(filter),
425 LogicalOperator::Project(project) => self.plan_project(project),
426 LogicalOperator::Limit(limit) => self.plan_limit(limit),
427 LogicalOperator::Skip(skip) => self.plan_skip(skip),
428 LogicalOperator::Sort(sort) => self.plan_sort(sort),
429 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
430 LogicalOperator::Join(join) => self.plan_join(join),
431 LogicalOperator::Union(union) => self.plan_union(union),
432 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
433 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
434 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
435 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
436 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
437 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
438 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
439 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
440 LogicalOperator::Merge(merge) => self.plan_merge(merge),
441 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
442 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
443 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
444 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
445 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
446 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
447 LogicalOperator::VectorScan(_) => Err(Error::Internal(
448 "VectorScan requires vector-index feature".to_string(),
449 )),
450 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
451 "VectorJoin requires vector-index feature".to_string(),
452 )),
453 _ => Err(Error::Internal(format!(
454 "Unsupported operator: {:?}",
455 std::mem::discriminant(op)
456 ))),
457 }
458 }
459}
460
461pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
463 match op {
464 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
465 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
466 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
467 BinaryOp::Le => Ok(BinaryFilterOp::Le),
468 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
469 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
470 BinaryOp::And => Ok(BinaryFilterOp::And),
471 BinaryOp::Or => Ok(BinaryFilterOp::Or),
472 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
473 BinaryOp::Add => Ok(BinaryFilterOp::Add),
474 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
475 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
476 BinaryOp::Div => Ok(BinaryFilterOp::Div),
477 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
478 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
479 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
480 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
481 BinaryOp::In => Ok(BinaryFilterOp::In),
482 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
483 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
484 BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
485 "Binary operator {:?} not yet supported in filters",
486 op
487 ))),
488 }
489}
490
491pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
493 match op {
494 UnaryOp::Not => Ok(UnaryFilterOp::Not),
495 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
496 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
497 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
498 }
499}
500
501pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
503 match func {
504 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
505 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
506 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
507 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
508 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
509 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
510 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
511 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
512 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
513 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
514 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
515 }
516}
517
518pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
522 match expr {
523 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
524 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
525 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
526 variable: variable.clone(),
527 property: property.clone(),
528 }),
529 LogicalExpression::Binary { left, op, right } => {
530 let left_expr = convert_filter_expression(left)?;
531 let right_expr = convert_filter_expression(right)?;
532 let filter_op = convert_binary_op(*op)?;
533 Ok(FilterExpression::Binary {
534 left: Box::new(left_expr),
535 op: filter_op,
536 right: Box::new(right_expr),
537 })
538 }
539 LogicalExpression::Unary { op, operand } => {
540 let operand_expr = convert_filter_expression(operand)?;
541 let filter_op = convert_unary_op(*op)?;
542 Ok(FilterExpression::Unary {
543 op: filter_op,
544 operand: Box::new(operand_expr),
545 })
546 }
547 LogicalExpression::FunctionCall { name, args, .. } => {
548 let filter_args: Vec<FilterExpression> = args
549 .iter()
550 .map(convert_filter_expression)
551 .collect::<Result<Vec<_>>>()?;
552 Ok(FilterExpression::FunctionCall {
553 name: name.clone(),
554 args: filter_args,
555 })
556 }
557 LogicalExpression::Case {
558 operand,
559 when_clauses,
560 else_clause,
561 } => {
562 let filter_operand = operand
563 .as_ref()
564 .map(|e| convert_filter_expression(e))
565 .transpose()?
566 .map(Box::new);
567 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
568 .iter()
569 .map(|(cond, result)| {
570 Ok((
571 convert_filter_expression(cond)?,
572 convert_filter_expression(result)?,
573 ))
574 })
575 .collect::<Result<Vec<_>>>()?;
576 let filter_else = else_clause
577 .as_ref()
578 .map(|e| convert_filter_expression(e))
579 .transpose()?
580 .map(Box::new);
581 Ok(FilterExpression::Case {
582 operand: filter_operand,
583 when_clauses: filter_when_clauses,
584 else_clause: filter_else,
585 })
586 }
587 LogicalExpression::List(items) => {
588 let filter_items: Vec<FilterExpression> = items
589 .iter()
590 .map(convert_filter_expression)
591 .collect::<Result<Vec<_>>>()?;
592 Ok(FilterExpression::List(filter_items))
593 }
594 LogicalExpression::Map(pairs) => {
595 let filter_pairs: Vec<(String, FilterExpression)> = pairs
596 .iter()
597 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
598 .collect::<Result<Vec<_>>>()?;
599 Ok(FilterExpression::Map(filter_pairs))
600 }
601 LogicalExpression::IndexAccess { base, index } => {
602 let base_expr = convert_filter_expression(base)?;
603 let index_expr = convert_filter_expression(index)?;
604 Ok(FilterExpression::IndexAccess {
605 base: Box::new(base_expr),
606 index: Box::new(index_expr),
607 })
608 }
609 LogicalExpression::SliceAccess { base, start, end } => {
610 let base_expr = convert_filter_expression(base)?;
611 let start_expr = start
612 .as_ref()
613 .map(|s| convert_filter_expression(s))
614 .transpose()?
615 .map(Box::new);
616 let end_expr = end
617 .as_ref()
618 .map(|e| convert_filter_expression(e))
619 .transpose()?
620 .map(Box::new);
621 Ok(FilterExpression::SliceAccess {
622 base: Box::new(base_expr),
623 start: start_expr,
624 end: end_expr,
625 })
626 }
627 LogicalExpression::Parameter(_) => Err(Error::Internal(
628 "Parameters not yet supported in filters".to_string(),
629 )),
630 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
631 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
632 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
633 LogicalExpression::ListComprehension {
634 variable,
635 list_expr,
636 filter_expr,
637 map_expr,
638 } => {
639 let list = convert_filter_expression(list_expr)?;
640 let filter = filter_expr
641 .as_ref()
642 .map(|f| convert_filter_expression(f))
643 .transpose()?
644 .map(Box::new);
645 let map = convert_filter_expression(map_expr)?;
646 Ok(FilterExpression::ListComprehension {
647 variable: variable.clone(),
648 list_expr: Box::new(list),
649 filter_expr: filter,
650 map_expr: Box::new(map),
651 })
652 }
653 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
654 Error::Internal("Subqueries not yet supported in filters".to_string()),
655 ),
656 }
657}
658
659fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
661 use grafeo_common::types::Value;
662 match value {
663 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
665 Value::Int64(_) => LogicalType::Int64,
666 Value::Float64(_) => LogicalType::Float64,
667 Value::String(_) => LogicalType::String,
668 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
670 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
673 }
674}
675
676fn expression_to_string(expr: &LogicalExpression) -> String {
678 match expr {
679 LogicalExpression::Variable(name) => name.clone(),
680 LogicalExpression::Property { variable, property } => {
681 format!("{variable}.{property}")
682 }
683 LogicalExpression::Literal(value) => format!("{value:?}"),
684 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
685 _ => "expr".to_string(),
686 }
687}
688
689pub struct PhysicalPlan {
691 pub operator: Box<dyn Operator>,
693 pub columns: Vec<String>,
695 pub adaptive_context: Option<AdaptiveContext>,
701}
702
703impl PhysicalPlan {
704 #[must_use]
706 pub fn columns(&self) -> &[String] {
707 &self.columns
708 }
709
710 pub fn into_operator(self) -> Box<dyn Operator> {
712 self.operator
713 }
714
715 #[must_use]
717 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
718 self.adaptive_context.as_ref()
719 }
720
721 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
723 self.adaptive_context.take()
724 }
725}
726
727struct StaticResultOperator {
729 rows: Vec<Vec<Value>>,
730 column_indices: Vec<usize>,
731 row_index: usize,
732}
733
734impl Operator for StaticResultOperator {
735 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
736 use grafeo_core::execution::DataChunk;
737
738 if self.row_index >= self.rows.len() {
739 return Ok(None);
740 }
741
742 let remaining = self.rows.len() - self.row_index;
743 let chunk_rows = remaining.min(1024);
744 let col_count = self.column_indices.len();
745
746 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
747 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
748
749 for row_offset in 0..chunk_rows {
750 let row = &self.rows[self.row_index + row_offset];
751 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
752 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
753 if let Some(col) = chunk.column_mut(col_idx) {
754 col.push_value(value);
755 }
756 }
757 }
758 chunk.set_count(chunk_rows);
759
760 self.row_index += chunk_rows;
761 Ok(Some(chunk))
762 }
763
764 fn reset(&mut self) {
765 self.row_index = 0;
766 }
767
768 fn name(&self) -> &'static str {
769 "StaticResult"
770 }
771}
772
773#[cfg(test)]
774mod tests {
775 use super::*;
776 use crate::query::plan::{
777 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
778 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
779 LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
780 SortKey, SortOp,
781 };
782 use grafeo_common::types::Value;
783
784 fn create_test_store() -> Arc<LpgStore> {
785 let store = Arc::new(LpgStore::new());
786 store.create_node(&["Person"]);
787 store.create_node(&["Person"]);
788 store.create_node(&["Company"]);
789 store
790 }
791
/// A labelled scan of `n` returned as-is yields the single column `n`.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
816
/// An unlabelled scan of `n` returned as-is yields the single column `n`.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
839
/// An alias on a return item becomes the output column name.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: Some(String::from("person")),
        }],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
862
/// Returning a property without an alias names the column `n.name`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
888
/// An aliased literal return item produces a column named by the alias.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Literal(Value::Int64(42)),
            alias: Some(String::from("answer")),
        }],
        distinct: false,
        input: Box::new(scan),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
911
/// A filter `n.age = 30` over a labelled scan plans and keeps column `n`.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
946
/// A filter `n.age > 20 AND n.age < 40` plans and keeps column `n`.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Both comparisons read the same property, so build it on demand.
    let age = || LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let above_20 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
    };
    let below_40 = LogicalExpression::Binary {
        left: Box::new(age()),
        op: BinaryOp::Lt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: LogicalExpression::Binary {
            left: Box::new(above_20),
            op: BinaryOp::And,
            right: Box::new(below_40),
        },
        input: Box::new(scan),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
990
/// A filter `NOT n.active` plans and keeps column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1022
/// A filter `n.email IS NULL` plans and keeps column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(scan),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1054
/// A filter `size(n.friends) > 0` plans and keeps column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1091
/// A single outgoing `KNOWS` hop from `a` to `b` exposes both variables
/// as output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
    });
    let return_variable = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![return_variable("a"), return_variable("b")],
        distinct: false,
        input: Box::new(expand),
    });

    let physical = planner.plan(&LogicalPlan::new(root)).unwrap();
    let columns = physical.columns();
    assert!(columns.iter().any(|column| column == "a"));
    assert!(columns.iter().any(|column| column == "b"));
}
1134
1135 #[test]
1136 fn test_plan_expand_with_edge_variable() {
1137 let store = create_test_store();
1138 let planner = Planner::new(store);
1139
1140 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1142 items: vec![
1143 ReturnItem {
1144 expression: LogicalExpression::Variable("a".to_string()),
1145 alias: None,
1146 },
1147 ReturnItem {
1148 expression: LogicalExpression::Variable("r".to_string()),
1149 alias: None,
1150 },
1151 ReturnItem {
1152 expression: LogicalExpression::Variable("b".to_string()),
1153 alias: None,
1154 },
1155 ],
1156 distinct: false,
1157 input: Box::new(LogicalOperator::Expand(ExpandOp {
1158 from_variable: "a".to_string(),
1159 to_variable: "b".to_string(),
1160 edge_variable: Some("r".to_string()),
1161 direction: ExpandDirection::Outgoing,
1162 edge_type: Some("KNOWS".to_string()),
1163 min_hops: 1,
1164 max_hops: Some(1),
1165 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1166 variable: "a".to_string(),
1167 label: None,
1168 input: None,
1169 })),
1170 path_alias: None,
1171 })),
1172 }));
1173
1174 let physical = planner.plan(&logical).unwrap();
1175 assert!(physical.columns().contains(&"a".to_string()));
1176 assert!(physical.columns().contains(&"r".to_string()));
1177 assert!(physical.columns().contains(&"b".to_string()));
1178 }
1179
1180 #[test]
1183 fn test_plan_limit() {
1184 let store = create_test_store();
1185 let planner = Planner::new(store);
1186
1187 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1189 items: vec![ReturnItem {
1190 expression: LogicalExpression::Variable("n".to_string()),
1191 alias: None,
1192 }],
1193 distinct: false,
1194 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1195 count: 10,
1196 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1197 variable: "n".to_string(),
1198 label: None,
1199 input: None,
1200 })),
1201 })),
1202 }));
1203
1204 let physical = planner.plan(&logical).unwrap();
1205 assert_eq!(physical.columns(), &["n"]);
1206 }
1207
1208 #[test]
1209 fn test_plan_skip() {
1210 let store = create_test_store();
1211 let planner = Planner::new(store);
1212
1213 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1215 items: vec![ReturnItem {
1216 expression: LogicalExpression::Variable("n".to_string()),
1217 alias: None,
1218 }],
1219 distinct: false,
1220 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1221 count: 5,
1222 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1223 variable: "n".to_string(),
1224 label: None,
1225 input: None,
1226 })),
1227 })),
1228 }));
1229
1230 let physical = planner.plan(&logical).unwrap();
1231 assert_eq!(physical.columns(), &["n"]);
1232 }
1233
1234 #[test]
1235 fn test_plan_sort() {
1236 let store = create_test_store();
1237 let planner = Planner::new(store);
1238
1239 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1241 items: vec![ReturnItem {
1242 expression: LogicalExpression::Variable("n".to_string()),
1243 alias: None,
1244 }],
1245 distinct: false,
1246 input: Box::new(LogicalOperator::Sort(SortOp {
1247 keys: vec![SortKey {
1248 expression: LogicalExpression::Variable("n".to_string()),
1249 order: SortOrder::Ascending,
1250 }],
1251 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1252 variable: "n".to_string(),
1253 label: None,
1254 input: None,
1255 })),
1256 })),
1257 }));
1258
1259 let physical = planner.plan(&logical).unwrap();
1260 assert_eq!(physical.columns(), &["n"]);
1261 }
1262
1263 #[test]
1264 fn test_plan_sort_descending() {
1265 let store = create_test_store();
1266 let planner = Planner::new(store);
1267
1268 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1270 items: vec![ReturnItem {
1271 expression: LogicalExpression::Variable("n".to_string()),
1272 alias: None,
1273 }],
1274 distinct: false,
1275 input: Box::new(LogicalOperator::Sort(SortOp {
1276 keys: vec![SortKey {
1277 expression: LogicalExpression::Variable("n".to_string()),
1278 order: SortOrder::Descending,
1279 }],
1280 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1281 variable: "n".to_string(),
1282 label: None,
1283 input: None,
1284 })),
1285 })),
1286 }));
1287
1288 let physical = planner.plan(&logical).unwrap();
1289 assert_eq!(physical.columns(), &["n"]);
1290 }
1291
1292 #[test]
1293 fn test_plan_distinct() {
1294 let store = create_test_store();
1295 let planner = Planner::new(store);
1296
1297 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1299 items: vec![ReturnItem {
1300 expression: LogicalExpression::Variable("n".to_string()),
1301 alias: None,
1302 }],
1303 distinct: false,
1304 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1305 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1306 variable: "n".to_string(),
1307 label: None,
1308 input: None,
1309 })),
1310 columns: None,
1311 })),
1312 }));
1313
1314 let physical = planner.plan(&logical).unwrap();
1315 assert_eq!(physical.columns(), &["n"]);
1316 }
1317
1318 #[test]
1321 fn test_plan_aggregate_count() {
1322 let store = create_test_store();
1323 let planner = Planner::new(store);
1324
1325 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1327 items: vec![ReturnItem {
1328 expression: LogicalExpression::Variable("cnt".to_string()),
1329 alias: None,
1330 }],
1331 distinct: false,
1332 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1333 group_by: vec![],
1334 aggregates: vec![LogicalAggregateExpr {
1335 function: LogicalAggregateFunction::Count,
1336 expression: Some(LogicalExpression::Variable("n".to_string())),
1337 distinct: false,
1338 alias: Some("cnt".to_string()),
1339 percentile: None,
1340 }],
1341 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1342 variable: "n".to_string(),
1343 label: None,
1344 input: None,
1345 })),
1346 having: None,
1347 })),
1348 }));
1349
1350 let physical = planner.plan(&logical).unwrap();
1351 assert!(physical.columns().contains(&"cnt".to_string()));
1352 }
1353
1354 #[test]
1355 fn test_plan_aggregate_with_group_by() {
1356 let store = create_test_store();
1357 let planner = Planner::new(store);
1358
1359 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1361 group_by: vec![LogicalExpression::Property {
1362 variable: "n".to_string(),
1363 property: "city".to_string(),
1364 }],
1365 aggregates: vec![LogicalAggregateExpr {
1366 function: LogicalAggregateFunction::Count,
1367 expression: Some(LogicalExpression::Variable("n".to_string())),
1368 distinct: false,
1369 alias: Some("cnt".to_string()),
1370 percentile: None,
1371 }],
1372 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1373 variable: "n".to_string(),
1374 label: Some("Person".to_string()),
1375 input: None,
1376 })),
1377 having: None,
1378 }));
1379
1380 let physical = planner.plan(&logical).unwrap();
1381 assert_eq!(physical.columns().len(), 2);
1382 }
1383
1384 #[test]
1385 fn test_plan_aggregate_sum() {
1386 let store = create_test_store();
1387 let planner = Planner::new(store);
1388
1389 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1391 group_by: vec![],
1392 aggregates: vec![LogicalAggregateExpr {
1393 function: LogicalAggregateFunction::Sum,
1394 expression: Some(LogicalExpression::Property {
1395 variable: "n".to_string(),
1396 property: "value".to_string(),
1397 }),
1398 distinct: false,
1399 alias: Some("total".to_string()),
1400 percentile: None,
1401 }],
1402 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1403 variable: "n".to_string(),
1404 label: None,
1405 input: None,
1406 })),
1407 having: None,
1408 }));
1409
1410 let physical = planner.plan(&logical).unwrap();
1411 assert!(physical.columns().contains(&"total".to_string()));
1412 }
1413
1414 #[test]
1415 fn test_plan_aggregate_avg() {
1416 let store = create_test_store();
1417 let planner = Planner::new(store);
1418
1419 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1421 group_by: vec![],
1422 aggregates: vec![LogicalAggregateExpr {
1423 function: LogicalAggregateFunction::Avg,
1424 expression: Some(LogicalExpression::Property {
1425 variable: "n".to_string(),
1426 property: "score".to_string(),
1427 }),
1428 distinct: false,
1429 alias: Some("average".to_string()),
1430 percentile: None,
1431 }],
1432 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1433 variable: "n".to_string(),
1434 label: None,
1435 input: None,
1436 })),
1437 having: None,
1438 }));
1439
1440 let physical = planner.plan(&logical).unwrap();
1441 assert!(physical.columns().contains(&"average".to_string()));
1442 }
1443
1444 #[test]
1445 fn test_plan_aggregate_min_max() {
1446 let store = create_test_store();
1447 let planner = Planner::new(store);
1448
1449 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1451 group_by: vec![],
1452 aggregates: vec![
1453 LogicalAggregateExpr {
1454 function: LogicalAggregateFunction::Min,
1455 expression: Some(LogicalExpression::Property {
1456 variable: "n".to_string(),
1457 property: "age".to_string(),
1458 }),
1459 distinct: false,
1460 alias: Some("youngest".to_string()),
1461 percentile: None,
1462 },
1463 LogicalAggregateExpr {
1464 function: LogicalAggregateFunction::Max,
1465 expression: Some(LogicalExpression::Property {
1466 variable: "n".to_string(),
1467 property: "age".to_string(),
1468 }),
1469 distinct: false,
1470 alias: Some("oldest".to_string()),
1471 percentile: None,
1472 },
1473 ],
1474 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1475 variable: "n".to_string(),
1476 label: None,
1477 input: None,
1478 })),
1479 having: None,
1480 }));
1481
1482 let physical = planner.plan(&logical).unwrap();
1483 assert!(physical.columns().contains(&"youngest".to_string()));
1484 assert!(physical.columns().contains(&"oldest".to_string()));
1485 }
1486
1487 #[test]
1490 fn test_plan_inner_join() {
1491 let store = create_test_store();
1492 let planner = Planner::new(store);
1493
1494 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1496 items: vec![
1497 ReturnItem {
1498 expression: LogicalExpression::Variable("a".to_string()),
1499 alias: None,
1500 },
1501 ReturnItem {
1502 expression: LogicalExpression::Variable("b".to_string()),
1503 alias: None,
1504 },
1505 ],
1506 distinct: false,
1507 input: Box::new(LogicalOperator::Join(JoinOp {
1508 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509 variable: "a".to_string(),
1510 label: Some("Person".to_string()),
1511 input: None,
1512 })),
1513 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1514 variable: "b".to_string(),
1515 label: Some("Company".to_string()),
1516 input: None,
1517 })),
1518 join_type: JoinType::Inner,
1519 conditions: vec![JoinCondition {
1520 left: LogicalExpression::Variable("a".to_string()),
1521 right: LogicalExpression::Variable("b".to_string()),
1522 }],
1523 })),
1524 }));
1525
1526 let physical = planner.plan(&logical).unwrap();
1527 assert!(physical.columns().contains(&"a".to_string()));
1528 assert!(physical.columns().contains(&"b".to_string()));
1529 }
1530
1531 #[test]
1532 fn test_plan_cross_join() {
1533 let store = create_test_store();
1534 let planner = Planner::new(store);
1535
1536 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1538 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1539 variable: "a".to_string(),
1540 label: None,
1541 input: None,
1542 })),
1543 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1544 variable: "b".to_string(),
1545 label: None,
1546 input: None,
1547 })),
1548 join_type: JoinType::Cross,
1549 conditions: vec![],
1550 }));
1551
1552 let physical = planner.plan(&logical).unwrap();
1553 assert_eq!(physical.columns().len(), 2);
1554 }
1555
1556 #[test]
1557 fn test_plan_left_join() {
1558 let store = create_test_store();
1559 let planner = Planner::new(store);
1560
1561 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1562 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1563 variable: "a".to_string(),
1564 label: None,
1565 input: None,
1566 })),
1567 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1568 variable: "b".to_string(),
1569 label: None,
1570 input: None,
1571 })),
1572 join_type: JoinType::Left,
1573 conditions: vec![],
1574 }));
1575
1576 let physical = planner.plan(&logical).unwrap();
1577 assert_eq!(physical.columns().len(), 2);
1578 }
1579
1580 #[test]
1583 fn test_plan_create_node() {
1584 let store = create_test_store();
1585 let planner = Planner::new(store);
1586
1587 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1589 variable: "n".to_string(),
1590 labels: vec!["Person".to_string()],
1591 properties: vec![(
1592 "name".to_string(),
1593 LogicalExpression::Literal(Value::String("Alice".into())),
1594 )],
1595 input: None,
1596 }));
1597
1598 let physical = planner.plan(&logical).unwrap();
1599 assert!(physical.columns().contains(&"n".to_string()));
1600 }
1601
1602 #[test]
1603 fn test_plan_create_edge() {
1604 let store = create_test_store();
1605 let planner = Planner::new(store);
1606
1607 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1609 variable: Some("r".to_string()),
1610 from_variable: "a".to_string(),
1611 to_variable: "b".to_string(),
1612 edge_type: "KNOWS".to_string(),
1613 properties: vec![],
1614 input: Box::new(LogicalOperator::Join(JoinOp {
1615 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1616 variable: "a".to_string(),
1617 label: None,
1618 input: None,
1619 })),
1620 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1621 variable: "b".to_string(),
1622 label: None,
1623 input: None,
1624 })),
1625 join_type: JoinType::Cross,
1626 conditions: vec![],
1627 })),
1628 }));
1629
1630 let physical = planner.plan(&logical).unwrap();
1631 assert!(physical.columns().contains(&"r".to_string()));
1632 }
1633
1634 #[test]
1635 fn test_plan_delete_node() {
1636 let store = create_test_store();
1637 let planner = Planner::new(store);
1638
1639 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1641 variable: "n".to_string(),
1642 detach: false,
1643 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1644 variable: "n".to_string(),
1645 label: None,
1646 input: None,
1647 })),
1648 }));
1649
1650 let physical = planner.plan(&logical).unwrap();
1651 assert!(physical.columns().contains(&"deleted_count".to_string()));
1652 }
1653
1654 #[test]
1657 fn test_plan_empty_errors() {
1658 let store = create_test_store();
1659 let planner = Planner::new(store);
1660
1661 let logical = LogicalPlan::new(LogicalOperator::Empty);
1662 let result = planner.plan(&logical);
1663 assert!(result.is_err());
1664 }
1665
1666 #[test]
1667 fn test_plan_missing_variable_in_return() {
1668 let store = create_test_store();
1669 let planner = Planner::new(store);
1670
1671 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1673 items: vec![ReturnItem {
1674 expression: LogicalExpression::Variable("missing".to_string()),
1675 alias: None,
1676 }],
1677 distinct: false,
1678 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1679 variable: "n".to_string(),
1680 label: None,
1681 input: None,
1682 })),
1683 }));
1684
1685 let result = planner.plan(&logical);
1686 assert!(result.is_err());
1687 }
1688
1689 #[test]
1692 fn test_convert_binary_ops() {
1693 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1694 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1695 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1696 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1697 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1698 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1699 assert!(convert_binary_op(BinaryOp::And).is_ok());
1700 assert!(convert_binary_op(BinaryOp::Or).is_ok());
1701 assert!(convert_binary_op(BinaryOp::Add).is_ok());
1702 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1703 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1704 assert!(convert_binary_op(BinaryOp::Div).is_ok());
1705 }
1706
1707 #[test]
1708 fn test_convert_unary_ops() {
1709 assert!(convert_unary_op(UnaryOp::Not).is_ok());
1710 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1711 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1712 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1713 }
1714
1715 #[test]
1716 fn test_convert_aggregate_functions() {
1717 assert!(matches!(
1718 convert_aggregate_function(LogicalAggregateFunction::Count),
1719 PhysicalAggregateFunction::Count
1720 ));
1721 assert!(matches!(
1722 convert_aggregate_function(LogicalAggregateFunction::Sum),
1723 PhysicalAggregateFunction::Sum
1724 ));
1725 assert!(matches!(
1726 convert_aggregate_function(LogicalAggregateFunction::Avg),
1727 PhysicalAggregateFunction::Avg
1728 ));
1729 assert!(matches!(
1730 convert_aggregate_function(LogicalAggregateFunction::Min),
1731 PhysicalAggregateFunction::Min
1732 ));
1733 assert!(matches!(
1734 convert_aggregate_function(LogicalAggregateFunction::Max),
1735 PhysicalAggregateFunction::Max
1736 ));
1737 }
1738
1739 #[test]
1740 fn test_planner_accessors() {
1741 let store = create_test_store();
1742 let planner = Planner::new(Arc::clone(&store));
1743
1744 assert!(planner.tx_id().is_none());
1745 assert!(planner.tx_manager().is_none());
1746 let _ = planner.viewing_epoch(); }
1748
1749 #[test]
1750 fn test_physical_plan_accessors() {
1751 let store = create_test_store();
1752 let planner = Planner::new(store);
1753
1754 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1755 variable: "n".to_string(),
1756 label: None,
1757 input: None,
1758 }));
1759
1760 let physical = planner.plan(&logical).unwrap();
1761 assert_eq!(physical.columns(), &["n"]);
1762
1763 let _ = physical.into_operator();
1765 }
1766
1767 #[test]
1770 fn test_plan_adaptive_with_scan() {
1771 let store = create_test_store();
1772 let planner = Planner::new(store);
1773
1774 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1776 items: vec![ReturnItem {
1777 expression: LogicalExpression::Variable("n".to_string()),
1778 alias: None,
1779 }],
1780 distinct: false,
1781 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1782 variable: "n".to_string(),
1783 label: Some("Person".to_string()),
1784 input: None,
1785 })),
1786 }));
1787
1788 let physical = planner.plan_adaptive(&logical).unwrap();
1789 assert_eq!(physical.columns(), &["n"]);
1790 assert!(physical.adaptive_context.is_some());
1792 }
1793
1794 #[test]
1795 fn test_plan_adaptive_with_filter() {
1796 let store = create_test_store();
1797 let planner = Planner::new(store);
1798
1799 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1801 items: vec![ReturnItem {
1802 expression: LogicalExpression::Variable("n".to_string()),
1803 alias: None,
1804 }],
1805 distinct: false,
1806 input: Box::new(LogicalOperator::Filter(FilterOp {
1807 predicate: LogicalExpression::Binary {
1808 left: Box::new(LogicalExpression::Property {
1809 variable: "n".to_string(),
1810 property: "age".to_string(),
1811 }),
1812 op: BinaryOp::Gt,
1813 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1814 },
1815 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816 variable: "n".to_string(),
1817 label: None,
1818 input: None,
1819 })),
1820 })),
1821 }));
1822
1823 let physical = planner.plan_adaptive(&logical).unwrap();
1824 assert!(physical.adaptive_context.is_some());
1825 }
1826
1827 #[test]
1828 fn test_plan_adaptive_with_expand() {
1829 let store = create_test_store();
1830 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1831
1832 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1834 items: vec![
1835 ReturnItem {
1836 expression: LogicalExpression::Variable("a".to_string()),
1837 alias: None,
1838 },
1839 ReturnItem {
1840 expression: LogicalExpression::Variable("b".to_string()),
1841 alias: None,
1842 },
1843 ],
1844 distinct: false,
1845 input: Box::new(LogicalOperator::Expand(ExpandOp {
1846 from_variable: "a".to_string(),
1847 to_variable: "b".to_string(),
1848 edge_variable: None,
1849 direction: ExpandDirection::Outgoing,
1850 edge_type: Some("KNOWS".to_string()),
1851 min_hops: 1,
1852 max_hops: Some(1),
1853 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1854 variable: "a".to_string(),
1855 label: None,
1856 input: None,
1857 })),
1858 path_alias: None,
1859 })),
1860 }));
1861
1862 let physical = planner.plan_adaptive(&logical).unwrap();
1863 assert!(physical.adaptive_context.is_some());
1864 }
1865
1866 #[test]
1867 fn test_plan_adaptive_with_join() {
1868 let store = create_test_store();
1869 let planner = Planner::new(store);
1870
1871 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1872 items: vec![
1873 ReturnItem {
1874 expression: LogicalExpression::Variable("a".to_string()),
1875 alias: None,
1876 },
1877 ReturnItem {
1878 expression: LogicalExpression::Variable("b".to_string()),
1879 alias: None,
1880 },
1881 ],
1882 distinct: false,
1883 input: Box::new(LogicalOperator::Join(JoinOp {
1884 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1885 variable: "a".to_string(),
1886 label: None,
1887 input: None,
1888 })),
1889 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1890 variable: "b".to_string(),
1891 label: None,
1892 input: None,
1893 })),
1894 join_type: JoinType::Cross,
1895 conditions: vec![],
1896 })),
1897 }));
1898
1899 let physical = planner.plan_adaptive(&logical).unwrap();
1900 assert!(physical.adaptive_context.is_some());
1901 }
1902
1903 #[test]
1904 fn test_plan_adaptive_with_aggregate() {
1905 let store = create_test_store();
1906 let planner = Planner::new(store);
1907
1908 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1909 group_by: vec![],
1910 aggregates: vec![LogicalAggregateExpr {
1911 function: LogicalAggregateFunction::Count,
1912 expression: Some(LogicalExpression::Variable("n".to_string())),
1913 distinct: false,
1914 alias: Some("cnt".to_string()),
1915 percentile: None,
1916 }],
1917 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1918 variable: "n".to_string(),
1919 label: None,
1920 input: None,
1921 })),
1922 having: None,
1923 }));
1924
1925 let physical = planner.plan_adaptive(&logical).unwrap();
1926 assert!(physical.adaptive_context.is_some());
1927 }
1928
1929 #[test]
1930 fn test_plan_adaptive_with_distinct() {
1931 let store = create_test_store();
1932 let planner = Planner::new(store);
1933
1934 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1935 items: vec![ReturnItem {
1936 expression: LogicalExpression::Variable("n".to_string()),
1937 alias: None,
1938 }],
1939 distinct: false,
1940 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1941 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1942 variable: "n".to_string(),
1943 label: None,
1944 input: None,
1945 })),
1946 columns: None,
1947 })),
1948 }));
1949
1950 let physical = planner.plan_adaptive(&logical).unwrap();
1951 assert!(physical.adaptive_context.is_some());
1952 }
1953
1954 #[test]
1955 fn test_plan_adaptive_with_limit() {
1956 let store = create_test_store();
1957 let planner = Planner::new(store);
1958
1959 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1960 items: vec![ReturnItem {
1961 expression: LogicalExpression::Variable("n".to_string()),
1962 alias: None,
1963 }],
1964 distinct: false,
1965 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1966 count: 10,
1967 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1968 variable: "n".to_string(),
1969 label: None,
1970 input: None,
1971 })),
1972 })),
1973 }));
1974
1975 let physical = planner.plan_adaptive(&logical).unwrap();
1976 assert!(physical.adaptive_context.is_some());
1977 }
1978
1979 #[test]
1980 fn test_plan_adaptive_with_skip() {
1981 let store = create_test_store();
1982 let planner = Planner::new(store);
1983
1984 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1985 items: vec![ReturnItem {
1986 expression: LogicalExpression::Variable("n".to_string()),
1987 alias: None,
1988 }],
1989 distinct: false,
1990 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1991 count: 5,
1992 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1993 variable: "n".to_string(),
1994 label: None,
1995 input: None,
1996 })),
1997 })),
1998 }));
1999
2000 let physical = planner.plan_adaptive(&logical).unwrap();
2001 assert!(physical.adaptive_context.is_some());
2002 }
2003
2004 #[test]
2005 fn test_plan_adaptive_with_sort() {
2006 let store = create_test_store();
2007 let planner = Planner::new(store);
2008
2009 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2010 items: vec![ReturnItem {
2011 expression: LogicalExpression::Variable("n".to_string()),
2012 alias: None,
2013 }],
2014 distinct: false,
2015 input: Box::new(LogicalOperator::Sort(SortOp {
2016 keys: vec![SortKey {
2017 expression: LogicalExpression::Variable("n".to_string()),
2018 order: SortOrder::Ascending,
2019 }],
2020 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2021 variable: "n".to_string(),
2022 label: None,
2023 input: None,
2024 })),
2025 })),
2026 }));
2027
2028 let physical = planner.plan_adaptive(&logical).unwrap();
2029 assert!(physical.adaptive_context.is_some());
2030 }
2031
2032 #[test]
2033 fn test_plan_adaptive_with_union() {
2034 let store = create_test_store();
2035 let planner = Planner::new(store);
2036
2037 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2038 items: vec![ReturnItem {
2039 expression: LogicalExpression::Variable("n".to_string()),
2040 alias: None,
2041 }],
2042 distinct: false,
2043 input: Box::new(LogicalOperator::Union(UnionOp {
2044 inputs: vec![
2045 LogicalOperator::NodeScan(NodeScanOp {
2046 variable: "n".to_string(),
2047 label: Some("Person".to_string()),
2048 input: None,
2049 }),
2050 LogicalOperator::NodeScan(NodeScanOp {
2051 variable: "n".to_string(),
2052 label: Some("Company".to_string()),
2053 input: None,
2054 }),
2055 ],
2056 })),
2057 }));
2058
2059 let physical = planner.plan_adaptive(&logical).unwrap();
2060 assert!(physical.adaptive_context.is_some());
2061 }
2062
2063 #[test]
2066 fn test_plan_expand_variable_length() {
2067 let store = create_test_store();
2068 let planner = Planner::new(store);
2069
2070 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2072 items: vec![
2073 ReturnItem {
2074 expression: LogicalExpression::Variable("a".to_string()),
2075 alias: None,
2076 },
2077 ReturnItem {
2078 expression: LogicalExpression::Variable("b".to_string()),
2079 alias: None,
2080 },
2081 ],
2082 distinct: false,
2083 input: Box::new(LogicalOperator::Expand(ExpandOp {
2084 from_variable: "a".to_string(),
2085 to_variable: "b".to_string(),
2086 edge_variable: None,
2087 direction: ExpandDirection::Outgoing,
2088 edge_type: Some("KNOWS".to_string()),
2089 min_hops: 1,
2090 max_hops: Some(3),
2091 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2092 variable: "a".to_string(),
2093 label: None,
2094 input: None,
2095 })),
2096 path_alias: None,
2097 })),
2098 }));
2099
2100 let physical = planner.plan(&logical).unwrap();
2101 assert!(physical.columns().contains(&"a".to_string()));
2102 assert!(physical.columns().contains(&"b".to_string()));
2103 }
2104
2105 #[test]
2106 fn test_plan_expand_with_path_alias() {
2107 let store = create_test_store();
2108 let planner = Planner::new(store);
2109
2110 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2112 items: vec![
2113 ReturnItem {
2114 expression: LogicalExpression::Variable("a".to_string()),
2115 alias: None,
2116 },
2117 ReturnItem {
2118 expression: LogicalExpression::Variable("b".to_string()),
2119 alias: None,
2120 },
2121 ],
2122 distinct: false,
2123 input: Box::new(LogicalOperator::Expand(ExpandOp {
2124 from_variable: "a".to_string(),
2125 to_variable: "b".to_string(),
2126 edge_variable: None,
2127 direction: ExpandDirection::Outgoing,
2128 edge_type: Some("KNOWS".to_string()),
2129 min_hops: 1,
2130 max_hops: Some(3),
2131 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2132 variable: "a".to_string(),
2133 label: None,
2134 input: None,
2135 })),
2136 path_alias: Some("p".to_string()),
2137 })),
2138 }));
2139
2140 let physical = planner.plan(&logical).unwrap();
2141 assert!(physical.columns().contains(&"a".to_string()));
2143 assert!(physical.columns().contains(&"b".to_string()));
2144 }
2145
2146 #[test]
2147 fn test_plan_expand_incoming() {
2148 let store = create_test_store();
2149 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2150
2151 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2153 items: vec![
2154 ReturnItem {
2155 expression: LogicalExpression::Variable("a".to_string()),
2156 alias: None,
2157 },
2158 ReturnItem {
2159 expression: LogicalExpression::Variable("b".to_string()),
2160 alias: None,
2161 },
2162 ],
2163 distinct: false,
2164 input: Box::new(LogicalOperator::Expand(ExpandOp {
2165 from_variable: "a".to_string(),
2166 to_variable: "b".to_string(),
2167 edge_variable: None,
2168 direction: ExpandDirection::Incoming,
2169 edge_type: Some("KNOWS".to_string()),
2170 min_hops: 1,
2171 max_hops: Some(1),
2172 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2173 variable: "a".to_string(),
2174 label: None,
2175 input: None,
2176 })),
2177 path_alias: None,
2178 })),
2179 }));
2180
2181 let physical = planner.plan(&logical).unwrap();
2182 assert!(physical.columns().contains(&"a".to_string()));
2183 assert!(physical.columns().contains(&"b".to_string()));
2184 }
2185
2186 #[test]
2187 fn test_plan_expand_both_directions() {
2188 let store = create_test_store();
2189 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2190
2191 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2193 items: vec![
2194 ReturnItem {
2195 expression: LogicalExpression::Variable("a".to_string()),
2196 alias: None,
2197 },
2198 ReturnItem {
2199 expression: LogicalExpression::Variable("b".to_string()),
2200 alias: None,
2201 },
2202 ],
2203 distinct: false,
2204 input: Box::new(LogicalOperator::Expand(ExpandOp {
2205 from_variable: "a".to_string(),
2206 to_variable: "b".to_string(),
2207 edge_variable: None,
2208 direction: ExpandDirection::Both,
2209 edge_type: Some("KNOWS".to_string()),
2210 min_hops: 1,
2211 max_hops: Some(1),
2212 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2213 variable: "a".to_string(),
2214 label: None,
2215 input: None,
2216 })),
2217 path_alias: None,
2218 })),
2219 }));
2220
2221 let physical = planner.plan(&logical).unwrap();
2222 assert!(physical.columns().contains(&"a".to_string()));
2223 assert!(physical.columns().contains(&"b".to_string()));
2224 }
2225
/// Builds a planner through `Planner::with_context` and checks that the
/// transaction id, transaction manager and viewing epoch passed in are the
/// ones reported back by the planner's accessors.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());
    // Start the transaction first, then read the epoch, as the planner context
    // is built from both values.
    let active_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        snapshot_epoch,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
2248
/// Plans a two-hop outgoing expansion chain (`a -> b -> c`) with factorized
/// execution disabled and checks that the returned variables `a` and `c`
/// appear in the physical plan's columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Innermost operator: unlabelled scan binding `a`.
    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // First hop: a -> b over any edge type.
    let first_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
    });
    // Second hop: b -> c over any edge type.
    let second_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("b"),
        to_variable: String::from("c"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: None,
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(first_hop),
        path_alias: None,
    });
    let return_items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("c")),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: return_items,
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("c")));
}
2298
/// Plans a node scan sorted ascending by the `name` property of `n` and
/// checks that `n` is still present in the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Sort key: n.name, ascending.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("n")));
}
2333
/// Plans a `Company` node scan binding `b` whose input is a `Person` node
/// scan binding `a`, and checks that both variables appear in the physical
/// plan's columns.
#[test]
fn test_plan_scan_with_input() {
    let store = create_test_store();
    let planner = Planner::new(store);

    // Inner scan feeds the outer scan through its `input` field.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });
    let return_items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("a")),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable(String::from("b")),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: return_items,
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2369}