1mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16#[cfg(feature = "algos")]
17use crate::query::plan::CallProcedureOp;
18use crate::query::plan::{
19 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
20 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExceptOp,
21 ExpandDirection, ExpandOp, FilterOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp,
22 LogicalExpression, LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp,
23 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TxId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
32 AggregateFunction as PhysicalAggregateFunction, ApplyOperator, BinaryFilterOp,
33 ConstraintValidator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
34 DeleteNodeOperator, DistinctOperator, EmptyOperator, ExceptOperator, ExecutionPathMode,
35 ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
36 FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
37 HashJoinOperator, IntersectOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
38 LeapfrogJoinOperator, LimitOperator, MapCollectOperator, MergeOperator,
39 MergeRelationshipConfig, MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator,
40 NullOrder, Operator, OtherwiseOperator, ProjectExpr, ProjectOperator, PropertySource,
41 RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
42 SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
43 UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::transaction::TransactionManager;
50
/// Borrowed lower/upper bound pair describing a range over [`Value`]s.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type; presumably the scan/filter planning submodules use it for range
/// predicates — confirm against those modules.
struct RangeBounds<'a> {
    /// Lower bound, if any (presumably `None` means "unbounded below").
    min: Option<&'a Value>,
    /// Upper bound, if any (presumably `None` means "unbounded above").
    max: Option<&'a Value>,
    /// Whether the lower bound itself is part of the range.
    min_inclusive: bool,
    /// Whether the upper bound itself is part of the range.
    max_inclusive: bool,
}
58
/// Converts a [`LogicalPlan`] into an executable [`PhysicalPlan`].
///
/// The interior-mutable fields (`Cell` / `RefCell`) let planning methods that
/// take `&self` record bookkeeping state while they walk the plan.
pub struct Planner {
    /// Graph store; in this file it is read for the current epoch, node counts,
    /// label lookups and statistics.
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager; `None` when built with [`Planner::new`].
    pub(super) tx_manager: Option<Arc<TransactionManager>>,
    /// Transaction id supplied through [`Planner::with_context`], if any.
    pub(super) tx_id: Option<TxId>,
    /// Epoch this planner views the store at.
    pub(super) viewing_epoch: EpochId,
    /// Counter starting at zero; presumably used to name anonymous edges —
    /// confirm in the expand planning submodule.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true`, chains of two or more single-hop expands are planned via
    /// `plan_expand_chain` instead of one `plan_expand` per hop.
    pub(super) factorized_execution: bool,
    /// Column names recorded as scalar-valued (e.g. the alias produced by
    /// `plan_map_collect`).
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names tracked separately from `scalar_columns`; presumably the
    /// columns holding edges — populated outside this file, confirm there.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator installed via [`Planner::with_validator`].
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog installed via [`Planner::with_catalog`].
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
}
84
85impl Planner {
86 #[must_use]
91 pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
92 let epoch = store.current_epoch();
93 Self {
94 store,
95 tx_manager: None,
96 tx_id: None,
97 viewing_epoch: epoch,
98 anon_edge_counter: std::cell::Cell::new(0),
99 factorized_execution: true,
100 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
101 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
102 validator: None,
103 catalog: None,
104 }
105 }
106
107 #[must_use]
116 pub fn with_context(
117 store: Arc<dyn GraphStoreMut>,
118 tx_manager: Arc<TransactionManager>,
119 tx_id: Option<TxId>,
120 viewing_epoch: EpochId,
121 ) -> Self {
122 Self {
123 store,
124 tx_manager: Some(tx_manager),
125 tx_id,
126 viewing_epoch,
127 anon_edge_counter: std::cell::Cell::new(0),
128 factorized_execution: true,
129 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131 validator: None,
132 catalog: None,
133 }
134 }
135
    /// Returns the epoch this planner views the store at.
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
141
    /// Returns the transaction id this planner was created with, if any.
    #[must_use]
    pub fn tx_id(&self) -> Option<TxId> {
        self.tx_id
    }
147
    /// Returns a reference to the transaction manager, or `None` when the
    /// planner was built without one.
    #[must_use]
    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.tx_manager.as_ref()
    }
153
154 #[must_use]
156 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
157 self.factorized_execution = enabled;
158 self
159 }
160
161 #[must_use]
163 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
164 self.validator = Some(validator);
165 self
166 }
167
168 #[must_use]
170 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
171 self.catalog = Some(catalog);
172 self
173 }
174
175 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
179 match op {
180 LogicalOperator::Expand(expand) => {
181 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
183
184 if is_single_hop {
185 let (inner_count, base) = Self::count_expand_chain(&expand.input);
186 (inner_count + 1, base)
187 } else {
188 (0, op)
190 }
191 }
192 _ => (0, op),
193 }
194 }
195
196 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
200 let mut chain = Vec::new();
201 let mut current = op;
202
203 while let LogicalOperator::Expand(expand) = current {
204 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
206 if !is_single_hop {
207 break;
208 }
209 chain.push(expand);
210 current = &expand.input;
211 }
212
213 chain.reverse();
215 chain
216 }
217
218 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
224 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
225 Ok(PhysicalPlan {
226 operator,
227 columns,
228 adaptive_context: None,
229 })
230 }
231
232 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
241 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
242
243 let mut adaptive_context = AdaptiveContext::new();
245 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
246
247 Ok(PhysicalPlan {
248 operator,
249 columns,
250 adaptive_context: Some(adaptive_context),
251 })
252 }
253
254 fn collect_cardinality_estimates(
256 &self,
257 op: &LogicalOperator,
258 ctx: &mut AdaptiveContext,
259 depth: usize,
260 ) {
261 match op {
262 LogicalOperator::NodeScan(scan) => {
263 let estimate = if let Some(label) = &scan.label {
265 self.store.nodes_by_label(label).len() as f64
266 } else {
267 self.store.node_count() as f64
268 };
269 let id = format!("scan_{}", scan.variable);
270 ctx.set_estimate(&id, estimate);
271
272 if let Some(input) = &scan.input {
274 self.collect_cardinality_estimates(input, ctx, depth + 1);
275 }
276 }
277 LogicalOperator::Filter(filter) => {
278 let input_estimate = self.estimate_cardinality(&filter.input);
280 let estimate = input_estimate * 0.3;
281 let id = format!("filter_{depth}");
282 ctx.set_estimate(&id, estimate);
283
284 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
285 }
286 LogicalOperator::Expand(expand) => {
287 let input_estimate = self.estimate_cardinality(&expand.input);
289 let stats = self.store.statistics();
290 let avg_degree = self.estimate_expand_degree(&stats, expand);
291 let estimate = input_estimate * avg_degree;
292 let id = format!("expand_{}", expand.to_variable);
293 ctx.set_estimate(&id, estimate);
294
295 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
296 }
297 LogicalOperator::Join(join) => {
298 let left_est = self.estimate_cardinality(&join.left);
300 let right_est = self.estimate_cardinality(&join.right);
301 let estimate = (left_est * right_est).sqrt(); let id = format!("join_{depth}");
303 ctx.set_estimate(&id, estimate);
304
305 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
306 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
307 }
308 LogicalOperator::Aggregate(agg) => {
309 let input_estimate = self.estimate_cardinality(&agg.input);
311 let estimate = if agg.group_by.is_empty() {
312 1.0 } else {
314 (input_estimate * 0.1).max(1.0) };
316 let id = format!("aggregate_{depth}");
317 ctx.set_estimate(&id, estimate);
318
319 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
320 }
321 LogicalOperator::Distinct(distinct) => {
322 let input_estimate = self.estimate_cardinality(&distinct.input);
323 let estimate = (input_estimate * 0.5).max(1.0);
324 let id = format!("distinct_{depth}");
325 ctx.set_estimate(&id, estimate);
326
327 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
328 }
329 LogicalOperator::Return(ret) => {
330 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
331 }
332 LogicalOperator::Limit(limit) => {
333 let input_estimate = self.estimate_cardinality(&limit.input);
334 let estimate = (input_estimate).min(limit.count as f64);
335 let id = format!("limit_{depth}");
336 ctx.set_estimate(&id, estimate);
337
338 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
339 }
340 LogicalOperator::Skip(skip) => {
341 let input_estimate = self.estimate_cardinality(&skip.input);
342 let estimate = (input_estimate - skip.count as f64).max(0.0);
343 let id = format!("skip_{depth}");
344 ctx.set_estimate(&id, estimate);
345
346 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
347 }
348 LogicalOperator::Sort(sort) => {
349 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
351 }
352 LogicalOperator::Union(union) => {
353 let estimate: f64 = union
354 .inputs
355 .iter()
356 .map(|input| self.estimate_cardinality(input))
357 .sum();
358 let id = format!("union_{depth}");
359 ctx.set_estimate(&id, estimate);
360
361 for input in &union.inputs {
362 self.collect_cardinality_estimates(input, ctx, depth + 1);
363 }
364 }
365 _ => {
366 }
368 }
369 }
370
371 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
373 match op {
374 LogicalOperator::NodeScan(scan) => {
375 if let Some(label) = &scan.label {
376 self.store.nodes_by_label(label).len() as f64
377 } else {
378 self.store.node_count() as f64
379 }
380 }
381 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
382 LogicalOperator::Expand(expand) => {
383 let stats = self.store.statistics();
384 let avg_degree = self.estimate_expand_degree(&stats, expand);
385 self.estimate_cardinality(&expand.input) * avg_degree
386 }
387 LogicalOperator::Join(join) => {
388 let left = self.estimate_cardinality(&join.left);
389 let right = self.estimate_cardinality(&join.right);
390 (left * right).sqrt()
391 }
392 LogicalOperator::Aggregate(agg) => {
393 if agg.group_by.is_empty() {
394 1.0
395 } else {
396 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
397 }
398 }
399 LogicalOperator::Distinct(distinct) => {
400 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
401 }
402 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
403 LogicalOperator::Limit(limit) => self
404 .estimate_cardinality(&limit.input)
405 .min(limit.count as f64),
406 LogicalOperator::Skip(skip) => {
407 (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
408 }
409 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
410 LogicalOperator::Union(union) => union
411 .inputs
412 .iter()
413 .map(|input| self.estimate_cardinality(input))
414 .sum(),
415 LogicalOperator::Except(except) => {
416 let left = self.estimate_cardinality(&except.left);
417 let right = self.estimate_cardinality(&except.right);
418 (left - right).max(0.0)
419 }
420 LogicalOperator::Intersect(intersect) => {
421 let left = self.estimate_cardinality(&intersect.left);
422 let right = self.estimate_cardinality(&intersect.right);
423 left.min(right)
424 }
425 LogicalOperator::Otherwise(otherwise) => self
426 .estimate_cardinality(&otherwise.left)
427 .max(self.estimate_cardinality(&otherwise.right)),
428 _ => 1000.0, }
430 }
431
432 fn estimate_expand_degree(
434 &self,
435 stats: &grafeo_core::statistics::Statistics,
436 expand: &ExpandOp,
437 ) -> f64 {
438 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
439 if expand.edge_types.len() == 1 {
440 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
441 } else if stats.total_nodes > 0 {
442 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
443 } else {
444 10.0 }
446 }
447
448 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
450 match op {
451 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
452 LogicalOperator::Expand(expand) => {
453 if self.factorized_execution {
455 let (chain_len, _base) = Self::count_expand_chain(op);
456 if chain_len >= 2 {
457 return self.plan_expand_chain(op);
459 }
460 }
461 self.plan_expand(expand)
462 }
463 LogicalOperator::Return(ret) => self.plan_return(ret),
464 LogicalOperator::Filter(filter) => self.plan_filter(filter),
465 LogicalOperator::Project(project) => self.plan_project(project),
466 LogicalOperator::Limit(limit) => self.plan_limit(limit),
467 LogicalOperator::Skip(skip) => self.plan_skip(skip),
468 LogicalOperator::Sort(sort) => self.plan_sort(sort),
469 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
470 LogicalOperator::Join(join) => self.plan_join(join),
471 LogicalOperator::Union(union) => self.plan_union(union),
472 LogicalOperator::Except(except) => self.plan_except(except),
473 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
474 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
475 LogicalOperator::Apply(apply) => self.plan_apply(apply),
476 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
477 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
478 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
479 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
480 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
481 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
482 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
483 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
484 LogicalOperator::Merge(merge) => self.plan_merge(merge),
485 LogicalOperator::MergeRelationship(merge_rel) => {
486 self.plan_merge_relationship(merge_rel)
487 }
488 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
489 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
490 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
491 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
492 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
493 #[cfg(feature = "algos")]
494 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
495 #[cfg(not(feature = "algos"))]
496 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
497 "CALL procedures require the 'algos' feature".to_string(),
498 )),
499 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
500 LogicalOperator::VectorScan(_) => Err(Error::Internal(
501 "VectorScan requires vector-index feature".to_string(),
502 )),
503 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
504 "VectorJoin requires vector-index feature".to_string(),
505 )),
506 _ => Err(Error::Internal(format!(
507 "Unsupported operator: {:?}",
508 std::mem::discriminant(op)
509 ))),
510 }
511 }
512
513 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
515 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
516 let key_idx = child_columns
517 .iter()
518 .position(|c| c == &mc.key_var)
519 .ok_or_else(|| {
520 Error::Internal(format!(
521 "MapCollect key '{}' not in columns {:?}",
522 mc.key_var, child_columns
523 ))
524 })?;
525 let value_idx = child_columns
526 .iter()
527 .position(|c| c == &mc.value_var)
528 .ok_or_else(|| {
529 Error::Internal(format!(
530 "MapCollect value '{}' not in columns {:?}",
531 mc.value_var, child_columns
532 ))
533 })?;
534 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
535 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
538 Ok((operator, vec![mc.alias.clone()]))
539 }
540}
541
542pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
544 match op {
545 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
546 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
547 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
548 BinaryOp::Le => Ok(BinaryFilterOp::Le),
549 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
550 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
551 BinaryOp::And => Ok(BinaryFilterOp::And),
552 BinaryOp::Or => Ok(BinaryFilterOp::Or),
553 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
554 BinaryOp::Add => Ok(BinaryFilterOp::Add),
555 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
556 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
557 BinaryOp::Div => Ok(BinaryFilterOp::Div),
558 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
559 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
560 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
561 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
562 BinaryOp::In => Ok(BinaryFilterOp::In),
563 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
564 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
565 BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
566 BinaryOp::Like => Ok(BinaryFilterOp::Like),
567 }
568}
569
570pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
572 match op {
573 UnaryOp::Not => Ok(UnaryFilterOp::Not),
574 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
575 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
576 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
577 }
578}
579
580pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
582 match func {
583 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
584 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
585 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
586 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
587 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
588 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
589 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
590 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
591 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
592 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
593 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
594 }
595}
596
597pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
601 match expr {
602 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
603 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
604 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
605 variable: variable.clone(),
606 property: property.clone(),
607 }),
608 LogicalExpression::Binary { left, op, right } => {
609 let left_expr = convert_filter_expression(left)?;
610 let right_expr = convert_filter_expression(right)?;
611 let filter_op = convert_binary_op(*op)?;
612 Ok(FilterExpression::Binary {
613 left: Box::new(left_expr),
614 op: filter_op,
615 right: Box::new(right_expr),
616 })
617 }
618 LogicalExpression::Unary { op, operand } => {
619 let operand_expr = convert_filter_expression(operand)?;
620 let filter_op = convert_unary_op(*op)?;
621 Ok(FilterExpression::Unary {
622 op: filter_op,
623 operand: Box::new(operand_expr),
624 })
625 }
626 LogicalExpression::FunctionCall { name, args, .. } => {
627 let filter_args: Vec<FilterExpression> = args
628 .iter()
629 .map(convert_filter_expression)
630 .collect::<Result<Vec<_>>>()?;
631 Ok(FilterExpression::FunctionCall {
632 name: name.clone(),
633 args: filter_args,
634 })
635 }
636 LogicalExpression::Case {
637 operand,
638 when_clauses,
639 else_clause,
640 } => {
641 let filter_operand = operand
642 .as_ref()
643 .map(|e| convert_filter_expression(e))
644 .transpose()?
645 .map(Box::new);
646 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
647 .iter()
648 .map(|(cond, result)| {
649 Ok((
650 convert_filter_expression(cond)?,
651 convert_filter_expression(result)?,
652 ))
653 })
654 .collect::<Result<Vec<_>>>()?;
655 let filter_else = else_clause
656 .as_ref()
657 .map(|e| convert_filter_expression(e))
658 .transpose()?
659 .map(Box::new);
660 Ok(FilterExpression::Case {
661 operand: filter_operand,
662 when_clauses: filter_when_clauses,
663 else_clause: filter_else,
664 })
665 }
666 LogicalExpression::List(items) => {
667 let filter_items: Vec<FilterExpression> = items
668 .iter()
669 .map(convert_filter_expression)
670 .collect::<Result<Vec<_>>>()?;
671 Ok(FilterExpression::List(filter_items))
672 }
673 LogicalExpression::Map(pairs) => {
674 let filter_pairs: Vec<(String, FilterExpression)> = pairs
675 .iter()
676 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
677 .collect::<Result<Vec<_>>>()?;
678 Ok(FilterExpression::Map(filter_pairs))
679 }
680 LogicalExpression::IndexAccess { base, index } => {
681 let base_expr = convert_filter_expression(base)?;
682 let index_expr = convert_filter_expression(index)?;
683 Ok(FilterExpression::IndexAccess {
684 base: Box::new(base_expr),
685 index: Box::new(index_expr),
686 })
687 }
688 LogicalExpression::SliceAccess { base, start, end } => {
689 let base_expr = convert_filter_expression(base)?;
690 let start_expr = start
691 .as_ref()
692 .map(|s| convert_filter_expression(s))
693 .transpose()?
694 .map(Box::new);
695 let end_expr = end
696 .as_ref()
697 .map(|e| convert_filter_expression(e))
698 .transpose()?
699 .map(Box::new);
700 Ok(FilterExpression::SliceAccess {
701 base: Box::new(base_expr),
702 start: start_expr,
703 end: end_expr,
704 })
705 }
706 LogicalExpression::Parameter(_) => Err(Error::Internal(
707 "Parameters not yet supported in filters".to_string(),
708 )),
709 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
710 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
711 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
712 LogicalExpression::ListComprehension {
713 variable,
714 list_expr,
715 filter_expr,
716 map_expr,
717 } => {
718 let list = convert_filter_expression(list_expr)?;
719 let filter = filter_expr
720 .as_ref()
721 .map(|f| convert_filter_expression(f))
722 .transpose()?
723 .map(Box::new);
724 let map = convert_filter_expression(map_expr)?;
725 Ok(FilterExpression::ListComprehension {
726 variable: variable.clone(),
727 list_expr: Box::new(list),
728 filter_expr: filter,
729 map_expr: Box::new(map),
730 })
731 }
732 LogicalExpression::ListPredicate {
733 kind,
734 variable,
735 list_expr,
736 predicate,
737 } => {
738 use crate::query::plan::ListPredicateKind as LPK;
739 let filter_kind = match kind {
740 LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
741 LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
742 LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
743 LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
744 };
745 let list = convert_filter_expression(list_expr)?;
746 let pred = convert_filter_expression(predicate)?;
747 Ok(FilterExpression::ListPredicate {
748 kind: filter_kind,
749 variable: variable.clone(),
750 list_expr: Box::new(list),
751 predicate: Box::new(pred),
752 })
753 }
754 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
755 Err(Error::Internal(
760 "Subquery expressions in this position require the semi-join or Apply rewrite; \
761 move the EXISTS/COUNT subquery to a top-level WHERE predicate"
762 .to_string(),
763 ))
764 }
765 LogicalExpression::MapProjection { base, entries } => {
766 let physical_entries: Vec<(String, FilterExpression)> = entries
767 .iter()
768 .map(|entry| match entry {
769 crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
770 name.clone(),
771 FilterExpression::Property {
772 variable: base.clone(),
773 property: name.clone(),
774 },
775 )),
776 crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
777 Ok((key.clone(), convert_filter_expression(expr)?))
778 }
779 crate::query::plan::MapProjectionEntry::AllProperties => Ok((
780 "*".to_string(),
781 FilterExpression::FunctionCall {
782 name: "properties".to_string(),
783 args: vec![FilterExpression::Variable(base.clone())],
784 },
785 )),
786 })
787 .collect::<Result<Vec<_>>>()?;
788 Ok(FilterExpression::Map(physical_entries))
789 }
790 LogicalExpression::Reduce {
791 accumulator,
792 initial,
793 variable,
794 list,
795 expression,
796 } => Ok(FilterExpression::Reduce {
797 accumulator: accumulator.clone(),
798 initial: Box::new(convert_filter_expression(initial)?),
799 variable: variable.clone(),
800 list: Box::new(convert_filter_expression(list)?),
801 expression: Box::new(convert_filter_expression(expression)?),
802 }),
803 LogicalExpression::PatternComprehension { projection, .. } => {
804 let proj = convert_filter_expression(projection)?;
805 Ok(FilterExpression::FunctionCall {
806 name: "collect".to_string(),
807 args: vec![proj],
808 })
809 }
810 }
811}
812
813fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
815 use grafeo_common::types::Value;
816 match value {
817 Value::Null => LogicalType::String, Value::Bool(_) => LogicalType::Bool,
819 Value::Int64(_) => LogicalType::Int64,
820 Value::Float64(_) => LogicalType::Float64,
821 Value::String(_) => LogicalType::String,
822 Value::Bytes(_) => LogicalType::String, Value::Timestamp(_) => LogicalType::Timestamp,
824 Value::Date(_) => LogicalType::Date,
825 Value::Time(_) => LogicalType::Time,
826 Value::Duration(_) => LogicalType::Duration,
827 Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
828 Value::List(_) => LogicalType::String, Value::Map(_) => LogicalType::String, Value::Vector(v) => LogicalType::Vector(v.len()),
831 Value::Path { .. } => LogicalType::Any,
832 }
833}
834
835fn expression_to_string(expr: &LogicalExpression) -> String {
837 match expr {
838 LogicalExpression::Variable(name) => name.clone(),
839 LogicalExpression::Property { variable, property } => {
840 format!("{variable}.{property}")
841 }
842 LogicalExpression::Literal(value) => format!("{value:?}"),
843 LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
844 _ => "expr".to_string(),
845 }
846}
847
/// Result of planning: an executable operator tree plus its output schema.
pub struct PhysicalPlan {
    /// Root of the physical operator tree.
    pub operator: Box<dyn Operator>,
    /// Names of the columns produced by `operator`.
    pub columns: Vec<String>,
    /// Cardinality estimates collected during planning; `Some` only for plans
    /// produced by `Planner::plan_adaptive`, `None` for `Planner::plan`.
    pub adaptive_context: Option<AdaptiveContext>,
}
861
862impl PhysicalPlan {
863 #[must_use]
865 pub fn columns(&self) -> &[String] {
866 &self.columns
867 }
868
869 pub fn into_operator(self) -> Box<dyn Operator> {
871 self.operator
872 }
873
874 #[must_use]
876 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
877 self.adaptive_context.as_ref()
878 }
879
880 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
882 self.adaptive_context.take()
883 }
884}
885
/// Operator that replays a pre-computed, in-memory set of rows.
///
/// Only compiled with the `algos` feature; presumably used to surface
/// procedure-call results — confirm in the procedure planning code.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// Materialised rows; each inner vector is one row of values.
    rows: Vec<Vec<Value>>,
    /// For every output column, the index of the source value within a row.
    column_indices: Vec<usize>,
    /// Index of the next row to emit.
    row_index: usize,
}
893
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once all rows
    /// have been produced.
    ///
    /// Every output column is typed `LogicalType::Any`. A source index that
    /// lies outside a row yields `Value::Null` for that cell.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        let batch_len = (total - start).min(1024);
        let column_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&column_types, batch_len);

        for row in &self.rows[start..start + batch_len] {
            for (target, &source) in self.column_indices.iter().enumerate() {
                let cell = match row.get(source) {
                    Some(value) => value.clone(),
                    None => Value::Null,
                };
                if let Some(column) = chunk.column_mut(target) {
                    column.push_value(cell);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds the operator so the rows are replayed from the start.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Operator name used for display and diagnostics.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
933
934pub(crate) fn eval_constant_expression(
939 expr: &crate::query::plan::LogicalExpression,
940) -> grafeo_common::utils::error::Result<grafeo_common::types::Value> {
941 use crate::query::plan::LogicalExpression;
942 use grafeo_common::types::Value;
943 use grafeo_common::utils::error::Error;
944
945 match expr {
946 LogicalExpression::Literal(val) => Ok(val.clone()),
947 LogicalExpression::Unary {
948 op: crate::query::plan::UnaryOp::Neg,
949 operand,
950 } => {
951 let val = eval_constant_expression(operand)?;
952 match val {
953 Value::Int64(n) => Ok(Value::Int64(-n)),
954 Value::Float64(f) => Ok(Value::Float64(-f)),
955 _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
956 }
957 }
958 _ => Err(Error::Internal(
959 "Procedure argument must be a constant value".into(),
960 )),
961 }
962}
963
964#[cfg(test)]
965mod tests {
966 use super::*;
967 use crate::query::plan::{
968 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
969 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
970 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
971 SkipOp as LogicalSkipOp, SortKey, SortOp,
972 };
973 use grafeo_common::types::Value;
974 use grafeo_core::graph::GraphStoreMut;
975 use grafeo_core::graph::lpg::LpgStore;
976
    /// Builds an in-memory `LpgStore` seeded with two `Person` nodes and one
    /// `Company` node.
    fn create_test_store() -> Arc<dyn GraphStoreMut> {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Company"]);
        store
    }
984
    /// Returning the variable of a labelled node scan yields a single column
    /// named after that variable.
    #[test]
    fn test_plan_simple_scan() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Logical shape: Return(n) over NodeScan(n:Person).
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }
1009
    /// A node scan without a label plans successfully and exposes the scan
    /// variable as the only column.
    #[test]
    fn test_plan_scan_without_label() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Logical shape: Return(n) over NodeScan(n) with no label restriction.
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }
1032
    /// An alias on a return item replaces the variable name as the output
    /// column name.
    #[test]
    fn test_plan_return_with_alias() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Logical shape: Return(n AS person) over NodeScan(n:Person).
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: Some("person".to_string()),
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["person"]);
    }
1055
1056 #[test]
1057 fn test_plan_return_property() {
1058 let store = create_test_store();
1059 let planner = Planner::new(store);
1060
1061 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1063 items: vec![ReturnItem {
1064 expression: LogicalExpression::Property {
1065 variable: "n".to_string(),
1066 property: "name".to_string(),
1067 },
1068 alias: None,
1069 }],
1070 distinct: false,
1071 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1072 variable: "n".to_string(),
1073 label: Some("Person".to_string()),
1074 input: None,
1075 })),
1076 }));
1077
1078 let physical = planner.plan(&logical).unwrap();
1079 assert_eq!(physical.columns(), &["n.name"]);
1080 }
1081
1082 #[test]
1083 fn test_plan_return_literal() {
1084 let store = create_test_store();
1085 let planner = Planner::new(store);
1086
1087 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1089 items: vec![ReturnItem {
1090 expression: LogicalExpression::Literal(Value::Int64(42)),
1091 alias: Some("answer".to_string()),
1092 }],
1093 distinct: false,
1094 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1095 variable: "n".to_string(),
1096 label: None,
1097 input: None,
1098 })),
1099 }));
1100
1101 let physical = planner.plan(&logical).unwrap();
1102 assert_eq!(physical.columns(), &["answer"]);
1103 }
1104
1105 #[test]
1108 fn test_plan_filter_equality() {
1109 let store = create_test_store();
1110 let planner = Planner::new(store);
1111
1112 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1114 items: vec![ReturnItem {
1115 expression: LogicalExpression::Variable("n".to_string()),
1116 alias: None,
1117 }],
1118 distinct: false,
1119 input: Box::new(LogicalOperator::Filter(FilterOp {
1120 predicate: LogicalExpression::Binary {
1121 left: Box::new(LogicalExpression::Property {
1122 variable: "n".to_string(),
1123 property: "age".to_string(),
1124 }),
1125 op: BinaryOp::Eq,
1126 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1127 },
1128 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1129 variable: "n".to_string(),
1130 label: Some("Person".to_string()),
1131 input: None,
1132 })),
1133 })),
1134 }));
1135
1136 let physical = planner.plan(&logical).unwrap();
1137 assert_eq!(physical.columns(), &["n"]);
1138 }
1139
1140 #[test]
1141 fn test_plan_filter_compound_and() {
1142 let store = create_test_store();
1143 let planner = Planner::new(store);
1144
1145 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1147 items: vec![ReturnItem {
1148 expression: LogicalExpression::Variable("n".to_string()),
1149 alias: None,
1150 }],
1151 distinct: false,
1152 input: Box::new(LogicalOperator::Filter(FilterOp {
1153 predicate: LogicalExpression::Binary {
1154 left: Box::new(LogicalExpression::Binary {
1155 left: Box::new(LogicalExpression::Property {
1156 variable: "n".to_string(),
1157 property: "age".to_string(),
1158 }),
1159 op: BinaryOp::Gt,
1160 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1161 }),
1162 op: BinaryOp::And,
1163 right: Box::new(LogicalExpression::Binary {
1164 left: Box::new(LogicalExpression::Property {
1165 variable: "n".to_string(),
1166 property: "age".to_string(),
1167 }),
1168 op: BinaryOp::Lt,
1169 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1170 }),
1171 },
1172 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1173 variable: "n".to_string(),
1174 label: None,
1175 input: None,
1176 })),
1177 })),
1178 }));
1179
1180 let physical = planner.plan(&logical).unwrap();
1181 assert_eq!(physical.columns(), &["n"]);
1182 }
1183
1184 #[test]
1185 fn test_plan_filter_unary_not() {
1186 let store = create_test_store();
1187 let planner = Planner::new(store);
1188
1189 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1191 items: vec![ReturnItem {
1192 expression: LogicalExpression::Variable("n".to_string()),
1193 alias: None,
1194 }],
1195 distinct: false,
1196 input: Box::new(LogicalOperator::Filter(FilterOp {
1197 predicate: LogicalExpression::Unary {
1198 op: UnaryOp::Not,
1199 operand: Box::new(LogicalExpression::Property {
1200 variable: "n".to_string(),
1201 property: "active".to_string(),
1202 }),
1203 },
1204 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1205 variable: "n".to_string(),
1206 label: None,
1207 input: None,
1208 })),
1209 })),
1210 }));
1211
1212 let physical = planner.plan(&logical).unwrap();
1213 assert_eq!(physical.columns(), &["n"]);
1214 }
1215
1216 #[test]
1217 fn test_plan_filter_is_null() {
1218 let store = create_test_store();
1219 let planner = Planner::new(store);
1220
1221 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1223 items: vec![ReturnItem {
1224 expression: LogicalExpression::Variable("n".to_string()),
1225 alias: None,
1226 }],
1227 distinct: false,
1228 input: Box::new(LogicalOperator::Filter(FilterOp {
1229 predicate: LogicalExpression::Unary {
1230 op: UnaryOp::IsNull,
1231 operand: Box::new(LogicalExpression::Property {
1232 variable: "n".to_string(),
1233 property: "email".to_string(),
1234 }),
1235 },
1236 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1237 variable: "n".to_string(),
1238 label: None,
1239 input: None,
1240 })),
1241 })),
1242 }));
1243
1244 let physical = planner.plan(&logical).unwrap();
1245 assert_eq!(physical.columns(), &["n"]);
1246 }
1247
1248 #[test]
1249 fn test_plan_filter_function_call() {
1250 let store = create_test_store();
1251 let planner = Planner::new(store);
1252
1253 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1255 items: vec![ReturnItem {
1256 expression: LogicalExpression::Variable("n".to_string()),
1257 alias: None,
1258 }],
1259 distinct: false,
1260 input: Box::new(LogicalOperator::Filter(FilterOp {
1261 predicate: LogicalExpression::Binary {
1262 left: Box::new(LogicalExpression::FunctionCall {
1263 name: "size".to_string(),
1264 args: vec![LogicalExpression::Property {
1265 variable: "n".to_string(),
1266 property: "friends".to_string(),
1267 }],
1268 distinct: false,
1269 }),
1270 op: BinaryOp::Gt,
1271 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1272 },
1273 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1274 variable: "n".to_string(),
1275 label: None,
1276 input: None,
1277 })),
1278 })),
1279 }));
1280
1281 let physical = planner.plan(&logical).unwrap();
1282 assert_eq!(physical.columns(), &["n"]);
1283 }
1284
1285 #[test]
1288 fn test_plan_expand_outgoing() {
1289 let store = create_test_store();
1290 let planner = Planner::new(store);
1291
1292 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1294 items: vec![
1295 ReturnItem {
1296 expression: LogicalExpression::Variable("a".to_string()),
1297 alias: None,
1298 },
1299 ReturnItem {
1300 expression: LogicalExpression::Variable("b".to_string()),
1301 alias: None,
1302 },
1303 ],
1304 distinct: false,
1305 input: Box::new(LogicalOperator::Expand(ExpandOp {
1306 from_variable: "a".to_string(),
1307 to_variable: "b".to_string(),
1308 edge_variable: None,
1309 direction: ExpandDirection::Outgoing,
1310 edge_types: vec!["KNOWS".to_string()],
1311 min_hops: 1,
1312 max_hops: Some(1),
1313 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1314 variable: "a".to_string(),
1315 label: Some("Person".to_string()),
1316 input: None,
1317 })),
1318 path_alias: None,
1319 path_mode: PathMode::Walk,
1320 })),
1321 }));
1322
1323 let physical = planner.plan(&logical).unwrap();
1324 assert!(physical.columns().contains(&"a".to_string()));
1326 assert!(physical.columns().contains(&"b".to_string()));
1327 }
1328
1329 #[test]
1330 fn test_plan_expand_with_edge_variable() {
1331 let store = create_test_store();
1332 let planner = Planner::new(store);
1333
1334 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1336 items: vec![
1337 ReturnItem {
1338 expression: LogicalExpression::Variable("a".to_string()),
1339 alias: None,
1340 },
1341 ReturnItem {
1342 expression: LogicalExpression::Variable("r".to_string()),
1343 alias: None,
1344 },
1345 ReturnItem {
1346 expression: LogicalExpression::Variable("b".to_string()),
1347 alias: None,
1348 },
1349 ],
1350 distinct: false,
1351 input: Box::new(LogicalOperator::Expand(ExpandOp {
1352 from_variable: "a".to_string(),
1353 to_variable: "b".to_string(),
1354 edge_variable: Some("r".to_string()),
1355 direction: ExpandDirection::Outgoing,
1356 edge_types: vec!["KNOWS".to_string()],
1357 min_hops: 1,
1358 max_hops: Some(1),
1359 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1360 variable: "a".to_string(),
1361 label: None,
1362 input: None,
1363 })),
1364 path_alias: None,
1365 path_mode: PathMode::Walk,
1366 })),
1367 }));
1368
1369 let physical = planner.plan(&logical).unwrap();
1370 assert!(physical.columns().contains(&"a".to_string()));
1371 assert!(physical.columns().contains(&"r".to_string()));
1372 assert!(physical.columns().contains(&"b".to_string()));
1373 }
1374
1375 #[test]
1378 fn test_plan_limit() {
1379 let store = create_test_store();
1380 let planner = Planner::new(store);
1381
1382 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1384 items: vec![ReturnItem {
1385 expression: LogicalExpression::Variable("n".to_string()),
1386 alias: None,
1387 }],
1388 distinct: false,
1389 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1390 count: 10,
1391 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1392 variable: "n".to_string(),
1393 label: None,
1394 input: None,
1395 })),
1396 })),
1397 }));
1398
1399 let physical = planner.plan(&logical).unwrap();
1400 assert_eq!(physical.columns(), &["n"]);
1401 }
1402
1403 #[test]
1404 fn test_plan_skip() {
1405 let store = create_test_store();
1406 let planner = Planner::new(store);
1407
1408 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1410 items: vec![ReturnItem {
1411 expression: LogicalExpression::Variable("n".to_string()),
1412 alias: None,
1413 }],
1414 distinct: false,
1415 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1416 count: 5,
1417 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1418 variable: "n".to_string(),
1419 label: None,
1420 input: None,
1421 })),
1422 })),
1423 }));
1424
1425 let physical = planner.plan(&logical).unwrap();
1426 assert_eq!(physical.columns(), &["n"]);
1427 }
1428
1429 #[test]
1430 fn test_plan_sort() {
1431 let store = create_test_store();
1432 let planner = Planner::new(store);
1433
1434 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1436 items: vec![ReturnItem {
1437 expression: LogicalExpression::Variable("n".to_string()),
1438 alias: None,
1439 }],
1440 distinct: false,
1441 input: Box::new(LogicalOperator::Sort(SortOp {
1442 keys: vec![SortKey {
1443 expression: LogicalExpression::Variable("n".to_string()),
1444 order: SortOrder::Ascending,
1445 }],
1446 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1447 variable: "n".to_string(),
1448 label: None,
1449 input: None,
1450 })),
1451 })),
1452 }));
1453
1454 let physical = planner.plan(&logical).unwrap();
1455 assert_eq!(physical.columns(), &["n"]);
1456 }
1457
1458 #[test]
1459 fn test_plan_sort_descending() {
1460 let store = create_test_store();
1461 let planner = Planner::new(store);
1462
1463 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1465 items: vec![ReturnItem {
1466 expression: LogicalExpression::Variable("n".to_string()),
1467 alias: None,
1468 }],
1469 distinct: false,
1470 input: Box::new(LogicalOperator::Sort(SortOp {
1471 keys: vec![SortKey {
1472 expression: LogicalExpression::Variable("n".to_string()),
1473 order: SortOrder::Descending,
1474 }],
1475 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1476 variable: "n".to_string(),
1477 label: None,
1478 input: None,
1479 })),
1480 })),
1481 }));
1482
1483 let physical = planner.plan(&logical).unwrap();
1484 assert_eq!(physical.columns(), &["n"]);
1485 }
1486
1487 #[test]
1488 fn test_plan_distinct() {
1489 let store = create_test_store();
1490 let planner = Planner::new(store);
1491
1492 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1494 items: vec![ReturnItem {
1495 expression: LogicalExpression::Variable("n".to_string()),
1496 alias: None,
1497 }],
1498 distinct: false,
1499 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1500 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1501 variable: "n".to_string(),
1502 label: None,
1503 input: None,
1504 })),
1505 columns: None,
1506 })),
1507 }));
1508
1509 let physical = planner.plan(&logical).unwrap();
1510 assert_eq!(physical.columns(), &["n"]);
1511 }
1512
1513 #[test]
1514 fn test_plan_distinct_with_columns() {
1515 let store = create_test_store();
1516 let planner = Planner::new(store);
1517
1518 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1520 items: vec![ReturnItem {
1521 expression: LogicalExpression::Variable("n".to_string()),
1522 alias: None,
1523 }],
1524 distinct: false,
1525 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1526 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1527 variable: "n".to_string(),
1528 label: None,
1529 input: None,
1530 })),
1531 columns: Some(vec!["n".to_string()]),
1532 })),
1533 }));
1534
1535 let physical = planner.plan(&logical).unwrap();
1536 assert_eq!(physical.columns(), &["n"]);
1537 }
1538
1539 #[test]
1540 fn test_plan_distinct_with_nonexistent_columns() {
1541 let store = create_test_store();
1542 let planner = Planner::new(store);
1543
1544 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1547 items: vec![ReturnItem {
1548 expression: LogicalExpression::Variable("n".to_string()),
1549 alias: None,
1550 }],
1551 distinct: false,
1552 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1553 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1554 variable: "n".to_string(),
1555 label: None,
1556 input: None,
1557 })),
1558 columns: Some(vec!["nonexistent".to_string()]),
1559 })),
1560 }));
1561
1562 let physical = planner.plan(&logical).unwrap();
1563 assert_eq!(physical.columns(), &["n"]);
1564 }
1565
1566 #[test]
1569 fn test_plan_aggregate_count() {
1570 let store = create_test_store();
1571 let planner = Planner::new(store);
1572
1573 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1575 items: vec![ReturnItem {
1576 expression: LogicalExpression::Variable("cnt".to_string()),
1577 alias: None,
1578 }],
1579 distinct: false,
1580 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1581 group_by: vec![],
1582 aggregates: vec![LogicalAggregateExpr {
1583 function: LogicalAggregateFunction::Count,
1584 expression: Some(LogicalExpression::Variable("n".to_string())),
1585 distinct: false,
1586 alias: Some("cnt".to_string()),
1587 percentile: None,
1588 }],
1589 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1590 variable: "n".to_string(),
1591 label: None,
1592 input: None,
1593 })),
1594 having: None,
1595 })),
1596 }));
1597
1598 let physical = planner.plan(&logical).unwrap();
1599 assert!(physical.columns().contains(&"cnt".to_string()));
1600 }
1601
1602 #[test]
1603 fn test_plan_aggregate_with_group_by() {
1604 let store = create_test_store();
1605 let planner = Planner::new(store);
1606
1607 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1609 group_by: vec![LogicalExpression::Property {
1610 variable: "n".to_string(),
1611 property: "city".to_string(),
1612 }],
1613 aggregates: vec![LogicalAggregateExpr {
1614 function: LogicalAggregateFunction::Count,
1615 expression: Some(LogicalExpression::Variable("n".to_string())),
1616 distinct: false,
1617 alias: Some("cnt".to_string()),
1618 percentile: None,
1619 }],
1620 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1621 variable: "n".to_string(),
1622 label: Some("Person".to_string()),
1623 input: None,
1624 })),
1625 having: None,
1626 }));
1627
1628 let physical = planner.plan(&logical).unwrap();
1629 assert_eq!(physical.columns().len(), 2);
1630 }
1631
1632 #[test]
1633 fn test_plan_aggregate_sum() {
1634 let store = create_test_store();
1635 let planner = Planner::new(store);
1636
1637 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1639 group_by: vec![],
1640 aggregates: vec![LogicalAggregateExpr {
1641 function: LogicalAggregateFunction::Sum,
1642 expression: Some(LogicalExpression::Property {
1643 variable: "n".to_string(),
1644 property: "value".to_string(),
1645 }),
1646 distinct: false,
1647 alias: Some("total".to_string()),
1648 percentile: None,
1649 }],
1650 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1651 variable: "n".to_string(),
1652 label: None,
1653 input: None,
1654 })),
1655 having: None,
1656 }));
1657
1658 let physical = planner.plan(&logical).unwrap();
1659 assert!(physical.columns().contains(&"total".to_string()));
1660 }
1661
1662 #[test]
1663 fn test_plan_aggregate_avg() {
1664 let store = create_test_store();
1665 let planner = Planner::new(store);
1666
1667 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1669 group_by: vec![],
1670 aggregates: vec![LogicalAggregateExpr {
1671 function: LogicalAggregateFunction::Avg,
1672 expression: Some(LogicalExpression::Property {
1673 variable: "n".to_string(),
1674 property: "score".to_string(),
1675 }),
1676 distinct: false,
1677 alias: Some("average".to_string()),
1678 percentile: None,
1679 }],
1680 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1681 variable: "n".to_string(),
1682 label: None,
1683 input: None,
1684 })),
1685 having: None,
1686 }));
1687
1688 let physical = planner.plan(&logical).unwrap();
1689 assert!(physical.columns().contains(&"average".to_string()));
1690 }
1691
1692 #[test]
1693 fn test_plan_aggregate_min_max() {
1694 let store = create_test_store();
1695 let planner = Planner::new(store);
1696
1697 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1699 group_by: vec![],
1700 aggregates: vec![
1701 LogicalAggregateExpr {
1702 function: LogicalAggregateFunction::Min,
1703 expression: Some(LogicalExpression::Property {
1704 variable: "n".to_string(),
1705 property: "age".to_string(),
1706 }),
1707 distinct: false,
1708 alias: Some("youngest".to_string()),
1709 percentile: None,
1710 },
1711 LogicalAggregateExpr {
1712 function: LogicalAggregateFunction::Max,
1713 expression: Some(LogicalExpression::Property {
1714 variable: "n".to_string(),
1715 property: "age".to_string(),
1716 }),
1717 distinct: false,
1718 alias: Some("oldest".to_string()),
1719 percentile: None,
1720 },
1721 ],
1722 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1723 variable: "n".to_string(),
1724 label: None,
1725 input: None,
1726 })),
1727 having: None,
1728 }));
1729
1730 let physical = planner.plan(&logical).unwrap();
1731 assert!(physical.columns().contains(&"youngest".to_string()));
1732 assert!(physical.columns().contains(&"oldest".to_string()));
1733 }
1734
1735 #[test]
1738 fn test_plan_inner_join() {
1739 let store = create_test_store();
1740 let planner = Planner::new(store);
1741
1742 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1744 items: vec![
1745 ReturnItem {
1746 expression: LogicalExpression::Variable("a".to_string()),
1747 alias: None,
1748 },
1749 ReturnItem {
1750 expression: LogicalExpression::Variable("b".to_string()),
1751 alias: None,
1752 },
1753 ],
1754 distinct: false,
1755 input: Box::new(LogicalOperator::Join(JoinOp {
1756 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1757 variable: "a".to_string(),
1758 label: Some("Person".to_string()),
1759 input: None,
1760 })),
1761 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1762 variable: "b".to_string(),
1763 label: Some("Company".to_string()),
1764 input: None,
1765 })),
1766 join_type: JoinType::Inner,
1767 conditions: vec![JoinCondition {
1768 left: LogicalExpression::Variable("a".to_string()),
1769 right: LogicalExpression::Variable("b".to_string()),
1770 }],
1771 })),
1772 }));
1773
1774 let physical = planner.plan(&logical).unwrap();
1775 assert!(physical.columns().contains(&"a".to_string()));
1776 assert!(physical.columns().contains(&"b".to_string()));
1777 }
1778
1779 #[test]
1780 fn test_plan_cross_join() {
1781 let store = create_test_store();
1782 let planner = Planner::new(store);
1783
1784 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1786 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1787 variable: "a".to_string(),
1788 label: None,
1789 input: None,
1790 })),
1791 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1792 variable: "b".to_string(),
1793 label: None,
1794 input: None,
1795 })),
1796 join_type: JoinType::Cross,
1797 conditions: vec![],
1798 }));
1799
1800 let physical = planner.plan(&logical).unwrap();
1801 assert_eq!(physical.columns().len(), 2);
1802 }
1803
1804 #[test]
1805 fn test_plan_left_join() {
1806 let store = create_test_store();
1807 let planner = Planner::new(store);
1808
1809 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1810 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1811 variable: "a".to_string(),
1812 label: None,
1813 input: None,
1814 })),
1815 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816 variable: "b".to_string(),
1817 label: None,
1818 input: None,
1819 })),
1820 join_type: JoinType::Left,
1821 conditions: vec![],
1822 }));
1823
1824 let physical = planner.plan(&logical).unwrap();
1825 assert_eq!(physical.columns().len(), 2);
1826 }
1827
1828 #[test]
1831 fn test_plan_create_node() {
1832 let store = create_test_store();
1833 let planner = Planner::new(store);
1834
1835 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1837 variable: "n".to_string(),
1838 labels: vec!["Person".to_string()],
1839 properties: vec![(
1840 "name".to_string(),
1841 LogicalExpression::Literal(Value::String("Alice".into())),
1842 )],
1843 input: None,
1844 }));
1845
1846 let physical = planner.plan(&logical).unwrap();
1847 assert!(physical.columns().contains(&"n".to_string()));
1848 }
1849
1850 #[test]
1851 fn test_plan_create_edge() {
1852 let store = create_test_store();
1853 let planner = Planner::new(store);
1854
1855 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1857 variable: Some("r".to_string()),
1858 from_variable: "a".to_string(),
1859 to_variable: "b".to_string(),
1860 edge_type: "KNOWS".to_string(),
1861 properties: vec![],
1862 input: Box::new(LogicalOperator::Join(JoinOp {
1863 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1864 variable: "a".to_string(),
1865 label: None,
1866 input: None,
1867 })),
1868 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1869 variable: "b".to_string(),
1870 label: None,
1871 input: None,
1872 })),
1873 join_type: JoinType::Cross,
1874 conditions: vec![],
1875 })),
1876 }));
1877
1878 let physical = planner.plan(&logical).unwrap();
1879 assert!(physical.columns().contains(&"r".to_string()));
1880 }
1881
1882 #[test]
1883 fn test_plan_delete_node() {
1884 let store = create_test_store();
1885 let planner = Planner::new(store);
1886
1887 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1889 variable: "n".to_string(),
1890 detach: false,
1891 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1892 variable: "n".to_string(),
1893 label: None,
1894 input: None,
1895 })),
1896 }));
1897
1898 let physical = planner.plan(&logical).unwrap();
1899 assert!(physical.columns().contains(&"deleted_count".to_string()));
1900 }
1901
1902 #[test]
1905 fn test_plan_empty_errors() {
1906 let store = create_test_store();
1907 let planner = Planner::new(store);
1908
1909 let logical = LogicalPlan::new(LogicalOperator::Empty);
1910 let result = planner.plan(&logical);
1911 assert!(result.is_err());
1912 }
1913
1914 #[test]
1915 fn test_plan_missing_variable_in_return() {
1916 let store = create_test_store();
1917 let planner = Planner::new(store);
1918
1919 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1921 items: vec![ReturnItem {
1922 expression: LogicalExpression::Variable("missing".to_string()),
1923 alias: None,
1924 }],
1925 distinct: false,
1926 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1927 variable: "n".to_string(),
1928 label: None,
1929 input: None,
1930 })),
1931 }));
1932
1933 let result = planner.plan(&logical);
1934 assert!(result.is_err());
1935 }
1936
1937 #[test]
1940 fn test_convert_binary_ops() {
1941 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1942 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1943 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1944 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1945 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1946 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1947 assert!(convert_binary_op(BinaryOp::And).is_ok());
1948 assert!(convert_binary_op(BinaryOp::Or).is_ok());
1949 assert!(convert_binary_op(BinaryOp::Add).is_ok());
1950 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1951 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1952 assert!(convert_binary_op(BinaryOp::Div).is_ok());
1953 }
1954
1955 #[test]
1956 fn test_convert_unary_ops() {
1957 assert!(convert_unary_op(UnaryOp::Not).is_ok());
1958 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1959 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1960 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1961 }
1962
1963 #[test]
1964 fn test_convert_aggregate_functions() {
1965 assert!(matches!(
1966 convert_aggregate_function(LogicalAggregateFunction::Count),
1967 PhysicalAggregateFunction::Count
1968 ));
1969 assert!(matches!(
1970 convert_aggregate_function(LogicalAggregateFunction::Sum),
1971 PhysicalAggregateFunction::Sum
1972 ));
1973 assert!(matches!(
1974 convert_aggregate_function(LogicalAggregateFunction::Avg),
1975 PhysicalAggregateFunction::Avg
1976 ));
1977 assert!(matches!(
1978 convert_aggregate_function(LogicalAggregateFunction::Min),
1979 PhysicalAggregateFunction::Min
1980 ));
1981 assert!(matches!(
1982 convert_aggregate_function(LogicalAggregateFunction::Max),
1983 PhysicalAggregateFunction::Max
1984 ));
1985 }
1986
1987 #[test]
1988 fn test_planner_accessors() {
1989 let store = create_test_store();
1990 let planner = Planner::new(Arc::clone(&store));
1991
1992 assert!(planner.tx_id().is_none());
1993 assert!(planner.tx_manager().is_none());
1994 let _ = planner.viewing_epoch(); }
1996
1997 #[test]
1998 fn test_physical_plan_accessors() {
1999 let store = create_test_store();
2000 let planner = Planner::new(store);
2001
2002 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2003 variable: "n".to_string(),
2004 label: None,
2005 input: None,
2006 }));
2007
2008 let physical = planner.plan(&logical).unwrap();
2009 assert_eq!(physical.columns(), &["n"]);
2010
2011 let _ = physical.into_operator();
2013 }
2014
2015 #[test]
2018 fn test_plan_adaptive_with_scan() {
2019 let store = create_test_store();
2020 let planner = Planner::new(store);
2021
2022 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2024 items: vec![ReturnItem {
2025 expression: LogicalExpression::Variable("n".to_string()),
2026 alias: None,
2027 }],
2028 distinct: false,
2029 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2030 variable: "n".to_string(),
2031 label: Some("Person".to_string()),
2032 input: None,
2033 })),
2034 }));
2035
2036 let physical = planner.plan_adaptive(&logical).unwrap();
2037 assert_eq!(physical.columns(), &["n"]);
2038 assert!(physical.adaptive_context.is_some());
2040 }
2041
2042 #[test]
2043 fn test_plan_adaptive_with_filter() {
2044 let store = create_test_store();
2045 let planner = Planner::new(store);
2046
2047 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2049 items: vec![ReturnItem {
2050 expression: LogicalExpression::Variable("n".to_string()),
2051 alias: None,
2052 }],
2053 distinct: false,
2054 input: Box::new(LogicalOperator::Filter(FilterOp {
2055 predicate: LogicalExpression::Binary {
2056 left: Box::new(LogicalExpression::Property {
2057 variable: "n".to_string(),
2058 property: "age".to_string(),
2059 }),
2060 op: BinaryOp::Gt,
2061 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2062 },
2063 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2064 variable: "n".to_string(),
2065 label: None,
2066 input: None,
2067 })),
2068 })),
2069 }));
2070
2071 let physical = planner.plan_adaptive(&logical).unwrap();
2072 assert!(physical.adaptive_context.is_some());
2073 }
2074
2075 #[test]
2076 fn test_plan_adaptive_with_expand() {
2077 let store = create_test_store();
2078 let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2079
2080 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2082 items: vec![
2083 ReturnItem {
2084 expression: LogicalExpression::Variable("a".to_string()),
2085 alias: None,
2086 },
2087 ReturnItem {
2088 expression: LogicalExpression::Variable("b".to_string()),
2089 alias: None,
2090 },
2091 ],
2092 distinct: false,
2093 input: Box::new(LogicalOperator::Expand(ExpandOp {
2094 from_variable: "a".to_string(),
2095 to_variable: "b".to_string(),
2096 edge_variable: None,
2097 direction: ExpandDirection::Outgoing,
2098 edge_types: vec!["KNOWS".to_string()],
2099 min_hops: 1,
2100 max_hops: Some(1),
2101 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2102 variable: "a".to_string(),
2103 label: None,
2104 input: None,
2105 })),
2106 path_alias: None,
2107 path_mode: PathMode::Walk,
2108 })),
2109 }));
2110
2111 let physical = planner.plan_adaptive(&logical).unwrap();
2112 assert!(physical.adaptive_context.is_some());
2113 }
2114
2115 #[test]
2116 fn test_plan_adaptive_with_join() {
2117 let store = create_test_store();
2118 let planner = Planner::new(store);
2119
2120 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2121 items: vec![
2122 ReturnItem {
2123 expression: LogicalExpression::Variable("a".to_string()),
2124 alias: None,
2125 },
2126 ReturnItem {
2127 expression: LogicalExpression::Variable("b".to_string()),
2128 alias: None,
2129 },
2130 ],
2131 distinct: false,
2132 input: Box::new(LogicalOperator::Join(JoinOp {
2133 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2134 variable: "a".to_string(),
2135 label: None,
2136 input: None,
2137 })),
2138 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2139 variable: "b".to_string(),
2140 label: None,
2141 input: None,
2142 })),
2143 join_type: JoinType::Cross,
2144 conditions: vec![],
2145 })),
2146 }));
2147
2148 let physical = planner.plan_adaptive(&logical).unwrap();
2149 assert!(physical.adaptive_context.is_some());
2150 }
2151
2152 #[test]
2153 fn test_plan_adaptive_with_aggregate() {
2154 let store = create_test_store();
2155 let planner = Planner::new(store);
2156
2157 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2158 group_by: vec![],
2159 aggregates: vec![LogicalAggregateExpr {
2160 function: LogicalAggregateFunction::Count,
2161 expression: Some(LogicalExpression::Variable("n".to_string())),
2162 distinct: false,
2163 alias: Some("cnt".to_string()),
2164 percentile: None,
2165 }],
2166 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2167 variable: "n".to_string(),
2168 label: None,
2169 input: None,
2170 })),
2171 having: None,
2172 }));
2173
2174 let physical = planner.plan_adaptive(&logical).unwrap();
2175 assert!(physical.adaptive_context.is_some());
2176 }
2177
#[test]
/// Adaptive planning of `Return(n)` over `Distinct(NodeScan n)` must produce a
/// physical plan with an adaptive context.
///
/// De-duplication comes from the explicit `Distinct` operator; the `Return`
/// itself is built with `distinct: false`.
fn test_plan_adaptive_with_distinct() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan_n),
        columns: None,
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable("n".to_owned()),
        alias: None,
    };
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(deduplicated),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2202
#[test]
/// Adaptive planning of `Return(n)` over `Limit(10, NodeScan n)` must produce
/// a physical plan with an adaptive context.
fn test_plan_adaptive_with_limit() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Keep at most ten rows from the scan.
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan_n),
    });
    let return_n = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_n],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2227
#[test]
/// Adaptive planning of `Return(n)` over `Skip(5, NodeScan n)` must produce a
/// physical plan with an adaptive context.
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_owned(),
        label: None,
        input: None,
    });
    // Drop the first five rows of the scan.
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan_n),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2252
#[test]
/// Adaptive planning of `Return(n)` over `Sort(n ASC, NodeScan n)` must
/// produce a physical plan with an adaptive context.
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Single ascending sort key on the scanned variable itself.
    let by_n_ascending = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_n_ascending],
        input: Box::new(scan_n),
    });
    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2280
#[test]
/// Adaptive planning of `Return(n)` over a union of two labelled scans
/// (`Person` and `Company`, both bound to `n`) must produce a physical plan
/// with an adaptive context.
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches are the same scan shape and differ only in label.
    let scan_labelled = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_owned(),
            label: Some(label.to_owned()),
            input: None,
        })
    };
    let branches = vec![scan_labelled("Person"), scan_labelled("Company")];

    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable("n".to_owned()),
            alias: None,
        }],
        distinct: false,
        input: Box::new(LogicalOperator::Union(UnionOp { inputs: branches })),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2311
#[test]
/// Plans `Return(a, b)` over an outgoing `KNOWS` expansion of 1 to 3 hops from
/// an unlabeled scan of `a`, and checks that both `a` and `b` are output
/// columns of the physical plan.
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // (a)-[:KNOWS*1..3]->(b), no edge variable, no path alias.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2354
#[test]
/// Plans `Return(a, b)` over an outgoing `KNOWS` expansion of 1 to 3 hops that
/// also names the path `p`, and checks that `a` and `b` are output columns.
///
/// Only successful planning and the two returned variables are asserted; the
/// path alias itself is not inspected.
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });

    // Same shape as the plain variable-length case, plus `path_alias = p`.
    let expand_with_path = ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec!["KNOWS".to_owned()],
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan_a),
        path_alias: Some("p".to_owned()),
        path_mode: PathMode::Walk,
    };

    let items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable("a".to_owned()),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable("b".to_owned()),
            alias: None,
        },
    ];
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(LogicalOperator::Expand(expand_with_path)),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2396
#[test]
/// With factorized execution disabled, plans `Return(a, b)` over a single-hop
/// incoming `KNOWS` expansion and checks that `a` and `b` are output columns.
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // (a)<-[:KNOWS]-(b): exactly one hop, traversed against edge direction.
    let expand_in = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    });

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(expand_in),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2437
#[test]
/// With factorized execution disabled, plans `Return(a, b)` over a single-hop
/// `KNOWS` expansion in both directions and checks that `a` and `b` are
/// output columns.
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: None,
        input: None,
    });

    // (a)-[:KNOWS]-(b): exactly one hop, direction-agnostic.
    let expand_any = ExpandOp {
        from_variable: "a".to_owned(),
        to_variable: "b".to_owned(),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_types: vec!["KNOWS".to_owned()],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
        path_mode: PathMode::Walk,
    };

    let items = vec![
        ReturnItem {
            expression: LogicalExpression::Variable("a".to_owned()),
            alias: None,
        },
        ReturnItem {
            expression: LogicalExpression::Variable("b".to_owned()),
            alias: None,
        },
    ];
    let root = LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(LogicalOperator::Expand(expand_any)),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2478
#[test]
/// A planner built through `Planner::with_context` must report the
/// transaction id, transaction manager, and viewing epoch it was given.
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    // Start a transaction and capture the manager's current epoch up front so
    // the same values can be compared against the planner's accessors.
    let manager = Arc::new(TransactionManager::new());
    let started_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let store = create_test_store();
    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(started_tx),
        snapshot_epoch,
    );

    assert_eq!(planner.tx_id(), Some(started_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
2501
#[test]
/// With factorized execution switched off, a two-hop chain `a -> b -> c`
/// (untyped, outgoing, one hop each) must still plan, and the returned
/// variables `a` and `c` must appear among the output columns.
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Both hops share every setting except their endpoints and input.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: Vec::new(),
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
            path_mode: PathMode::Walk,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    // Only the chain's endpoints are returned; `b` is intermediate.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "c"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2553
#[test]
/// Plans `Return(n)` over a sort keyed on the property `n.name` (ascending)
/// and checks that `n` is an output column of the physical plan.
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan_n = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The sort key is a property access, not the bare variable.
    let by_name = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name],
        input: Box::new(scan_n),
    });

    let root = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    });
    let logical = LogicalPlan::new(root);

    let physical = planner.plan(&logical).unwrap();
    let expected = String::from("n");
    assert!(physical.columns().contains(&expected));
}
2588
#[test]
/// Plans `Return(a, b)` over a `Company` scan of `b` whose input is a `Person`
/// scan of `a`, and checks that both variables are output columns.
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Inner scan feeds the outer one through its `input` field.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: "a".to_owned(),
        label: Some("Person".to_owned()),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: "b".to_owned(),
        label: Some("Company".to_owned()),
        input: Some(Box::new(people)),
    });

    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(companies),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
2624}