1mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod filter_hybrid;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16#[cfg(feature = "algos")]
17use crate::query::plan::CallProcedureOp;
18#[cfg(feature = "text-index")]
19use crate::query::plan::TextScanOp;
20use crate::query::plan::{
21 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
22 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
23 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
24 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
25 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
26 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
27 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
28};
29#[cfg(feature = "vector-index")]
30use crate::query::plan::{VectorMetric, VectorScanOp};
31use grafeo_common::grafeo_debug_span;
32use grafeo_common::types::{EpochId, TransactionId};
33use grafeo_common::types::{LogicalType, Value};
34use grafeo_common::utils::error::{Error, Result};
35use grafeo_core::execution::AdaptiveContext;
36use grafeo_core::execution::operators::{
37 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
38 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
39 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
40 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
41 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
42 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
43 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
44 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
45 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
46 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
47 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
48 VariableLengthExpandOperator,
49};
50use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
51use std::collections::HashMap;
52use std::sync::Arc;
53
54use crate::query::planner::common;
55use crate::query::planner::common::expression_to_string;
56use crate::query::planner::{
57 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
58 convert_unary_op, value_to_logical_type,
59};
60use crate::transaction::TransactionManager;
61
62struct RangeBounds<'a> {
64 min: Option<&'a Value>,
65 max: Option<&'a Value>,
66 min_inclusive: bool,
67 max_inclusive: bool,
68}
69
70pub struct Planner {
72 pub(super) store: Arc<dyn GraphStoreSearch>,
74 pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
76 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
78 pub(super) transaction_id: Option<TransactionId>,
80 pub(super) viewing_epoch: EpochId,
82 pub(super) anon_edge_counter: std::cell::Cell<u32>,
84 pub(super) factorized_execution: bool,
86 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
89 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
92 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
94 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
96 pub(super) correlated_param_state:
100 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
101 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
104 profiling: std::cell::Cell<bool>,
106 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
108 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
110 pub(super) session_context: grafeo_core::execution::operators::SessionContext,
112 pub(super) read_only: bool,
116}
117
118impl Planner {
119 #[must_use]
124 pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
125 let epoch = store.current_epoch();
126 Self {
127 store,
128 write_store: None,
129 transaction_manager: None,
130 transaction_id: None,
131 viewing_epoch: epoch,
132 anon_edge_counter: std::cell::Cell::new(0),
133 factorized_execution: true,
134 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
135 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
136 validator: None,
137 catalog: None,
138 correlated_param_state: std::cell::RefCell::new(None),
139 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
140 profiling: std::cell::Cell::new(false),
141 profile_entries: std::cell::RefCell::new(Vec::new()),
142 write_tracker: None,
143 session_context: grafeo_core::execution::operators::SessionContext::default(),
144 read_only: false,
145 }
146 }
147
148 #[must_use]
150 pub fn with_context(
151 store: Arc<dyn GraphStoreSearch>,
152 write_store: Option<Arc<dyn GraphStoreMut>>,
153 transaction_manager: Arc<TransactionManager>,
154 transaction_id: Option<TransactionId>,
155 viewing_epoch: EpochId,
156 ) -> Self {
157 use crate::transaction::TransactionWriteTracker;
158
159 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
161 if transaction_id.is_some() {
162 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
163 &transaction_manager,
164 ))))
165 } else {
166 None
167 };
168
169 Self {
170 store,
171 write_store,
172 transaction_manager: Some(transaction_manager),
173 transaction_id,
174 viewing_epoch,
175 anon_edge_counter: std::cell::Cell::new(0),
176 factorized_execution: true,
177 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
178 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
179 validator: None,
180 catalog: None,
181 correlated_param_state: std::cell::RefCell::new(None),
182 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
183 profiling: std::cell::Cell::new(false),
184 profile_entries: std::cell::RefCell::new(Vec::new()),
185 write_tracker,
186 session_context: grafeo_core::execution::operators::SessionContext::default(),
187 read_only: false,
188 }
189 }
190
191 #[must_use]
194 pub fn with_read_only(mut self, read_only: bool) -> Self {
195 self.read_only = read_only;
196 self
197 }
198
199 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
201 self.write_store
202 .as_ref()
203 .map(Arc::clone)
204 .ok_or(Error::Transaction(
205 grafeo_common::utils::error::TransactionError::ReadOnly,
206 ))
207 }
208
209 #[must_use]
211 pub fn viewing_epoch(&self) -> EpochId {
212 self.viewing_epoch
213 }
214
215 #[must_use]
217 pub fn transaction_id(&self) -> Option<TransactionId> {
218 self.transaction_id
219 }
220
221 #[must_use]
223 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
224 self.transaction_manager.as_ref()
225 }
226
227 #[must_use]
229 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
230 self.factorized_execution = enabled;
231 self
232 }
233
234 #[must_use]
236 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
237 self.validator = Some(validator);
238 self
239 }
240
241 #[must_use]
243 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
244 self.catalog = Some(catalog);
245 self
246 }
247
248 #[must_use]
250 pub fn with_session_context(
251 mut self,
252 context: grafeo_core::execution::operators::SessionContext,
253 ) -> Self {
254 self.session_context = context;
255 self
256 }
257
258 pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
264 let name = edge_variable.clone().unwrap_or_else(|| {
265 let count = self.anon_edge_counter.get();
266 self.anon_edge_counter.set(count + 1);
267 format!("_anon_edge_{}", count)
268 });
269 self.edge_columns.borrow_mut().insert(name.clone());
270 name
271 }
272
273 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
277 match op {
278 LogicalOperator::Expand(expand) => {
279 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
280
281 if is_single_hop {
282 let (inner_count, base) = Self::count_expand_chain(&expand.input);
283 (inner_count + 1, base)
284 } else {
285 (0, op)
286 }
287 }
288 _ => (0, op),
289 }
290 }
291
292 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
296 let mut chain = Vec::new();
297 let mut current = op;
298
299 while let LogicalOperator::Expand(expand) = current {
300 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
301 if !is_single_hop {
302 break;
303 }
304 chain.push(expand);
305 current = &expand.input;
306 }
307
308 chain.reverse();
309 chain
310 }
311
312 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
319 let _span = grafeo_debug_span!("grafeo::query::plan");
320 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
321 Ok(PhysicalPlan {
322 operator,
323 columns,
324 adaptive_context: None,
325 })
326 }
327
328 pub fn plan_profiled(
339 &self,
340 logical_plan: &LogicalPlan,
341 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
342 self.profiling.set(true);
343 self.profile_entries.borrow_mut().clear();
344
345 let result = self.plan_operator(&logical_plan.root);
346
347 self.profiling.set(false);
348 let (operator, columns) = result?;
349 let entries = self.profile_entries.borrow_mut().drain(..).collect();
350
351 Ok((
352 PhysicalPlan {
353 operator,
354 columns,
355 adaptive_context: None,
356 },
357 entries,
358 ))
359 }
360
361 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
368 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
369
370 let mut adaptive_context = AdaptiveContext::new();
371 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
372
373 Ok(PhysicalPlan {
374 operator,
375 columns,
376 adaptive_context: Some(adaptive_context),
377 })
378 }
379
380 fn collect_cardinality_estimates(
382 &self,
383 op: &LogicalOperator,
384 ctx: &mut AdaptiveContext,
385 depth: usize,
386 ) {
387 match op {
388 LogicalOperator::NodeScan(scan) => {
389 let estimate = if let Some(label) = &scan.label {
390 self.store.nodes_by_label(label).len() as f64
391 } else {
392 self.store.node_count() as f64
393 };
394 let id = format!("scan_{}", scan.variable);
395 ctx.set_estimate(&id, estimate);
396
397 if let Some(input) = &scan.input {
398 self.collect_cardinality_estimates(input, ctx, depth + 1);
399 }
400 }
401 LogicalOperator::Filter(filter) => {
402 let input_estimate = self.estimate_cardinality(&filter.input);
403 let estimate = input_estimate * 0.3;
404 let id = format!("filter_{depth}");
405 ctx.set_estimate(&id, estimate);
406
407 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
408 }
409 LogicalOperator::Expand(expand) => {
410 let input_estimate = self.estimate_cardinality(&expand.input);
411 let stats = self.store.statistics();
412 let avg_degree = self.estimate_expand_degree(&stats, expand);
413 let estimate = input_estimate * avg_degree;
414 let id = format!("expand_{}", expand.to_variable);
415 ctx.set_estimate(&id, estimate);
416
417 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
418 }
419 LogicalOperator::Join(join) => {
420 let left_est = self.estimate_cardinality(&join.left);
421 let right_est = self.estimate_cardinality(&join.right);
422 let estimate = (left_est * right_est).sqrt();
423 let id = format!("join_{depth}");
424 ctx.set_estimate(&id, estimate);
425
426 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
427 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
428 }
429 LogicalOperator::Aggregate(agg) => {
430 let input_estimate = self.estimate_cardinality(&agg.input);
431 let estimate = if agg.group_by.is_empty() {
432 1.0
433 } else {
434 (input_estimate * 0.1).max(1.0)
435 };
436 let id = format!("aggregate_{depth}");
437 ctx.set_estimate(&id, estimate);
438
439 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
440 }
441 LogicalOperator::Distinct(distinct) => {
442 let input_estimate = self.estimate_cardinality(&distinct.input);
443 let estimate = (input_estimate * 0.5).max(1.0);
444 let id = format!("distinct_{depth}");
445 ctx.set_estimate(&id, estimate);
446
447 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
448 }
449 LogicalOperator::Return(ret) => {
450 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
451 }
452 LogicalOperator::Limit(limit) => {
453 let input_estimate = self.estimate_cardinality(&limit.input);
454 let estimate = (input_estimate).min(limit.count.estimate());
455 let id = format!("limit_{depth}");
456 ctx.set_estimate(&id, estimate);
457
458 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
459 }
460 LogicalOperator::Skip(skip) => {
461 let input_estimate = self.estimate_cardinality(&skip.input);
462 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
463 let id = format!("skip_{depth}");
464 ctx.set_estimate(&id, estimate);
465
466 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
467 }
468 LogicalOperator::Sort(sort) => {
469 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
470 }
471 LogicalOperator::Union(union) => {
472 let estimate: f64 = union
473 .inputs
474 .iter()
475 .map(|input| self.estimate_cardinality(input))
476 .sum();
477 let id = format!("union_{depth}");
478 ctx.set_estimate(&id, estimate);
479
480 for input in &union.inputs {
481 self.collect_cardinality_estimates(input, ctx, depth + 1);
482 }
483 }
484 _ => {
485 }
487 }
488 }
489
490 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
492 match op {
493 LogicalOperator::NodeScan(scan) => {
494 if let Some(label) = &scan.label {
495 self.store.nodes_by_label(label).len() as f64
496 } else {
497 self.store.node_count() as f64
498 }
499 }
500 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
501 LogicalOperator::Expand(expand) => {
502 let stats = self.store.statistics();
503 let avg_degree = self.estimate_expand_degree(&stats, expand);
504 self.estimate_cardinality(&expand.input) * avg_degree
505 }
506 LogicalOperator::Join(join) => {
507 let left = self.estimate_cardinality(&join.left);
508 let right = self.estimate_cardinality(&join.right);
509 (left * right).sqrt()
510 }
511 LogicalOperator::Aggregate(agg) => {
512 if agg.group_by.is_empty() {
513 1.0
514 } else {
515 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
516 }
517 }
518 LogicalOperator::Distinct(distinct) => {
519 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
520 }
521 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
522 LogicalOperator::Limit(limit) => self
523 .estimate_cardinality(&limit.input)
524 .min(limit.count.estimate()),
525 LogicalOperator::Skip(skip) => {
526 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
527 }
528 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
529 LogicalOperator::Union(union) => union
530 .inputs
531 .iter()
532 .map(|input| self.estimate_cardinality(input))
533 .sum(),
534 LogicalOperator::Except(except) => {
535 let left = self.estimate_cardinality(&except.left);
536 let right = self.estimate_cardinality(&except.right);
537 (left - right).max(0.0)
538 }
539 LogicalOperator::Intersect(intersect) => {
540 let left = self.estimate_cardinality(&intersect.left);
541 let right = self.estimate_cardinality(&intersect.right);
542 left.min(right)
543 }
544 LogicalOperator::Otherwise(otherwise) => self
545 .estimate_cardinality(&otherwise.left)
546 .max(self.estimate_cardinality(&otherwise.right)),
547 _ => 1000.0,
548 }
549 }
550
551 fn estimate_expand_degree(
553 &self,
554 stats: &grafeo_core::statistics::Statistics,
555 expand: &ExpandOp,
556 ) -> f64 {
557 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
558 if expand.edge_types.len() == 1 {
559 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
560 } else if stats.total_nodes > 0 {
561 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
562 } else {
563 10.0
564 }
565 }
566
567 fn maybe_profile(
570 &self,
571 result: Result<(Box<dyn Operator>, Vec<String>)>,
572 op: &LogicalOperator,
573 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
574 if self.profiling.get() {
575 let (physical, columns) = result?;
576 let (entry, stats) =
577 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
578 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
579 self.profile_entries.borrow_mut().push(entry);
580 Ok((Box::new(profiled), columns))
581 } else {
582 result
583 }
584 }
585
586 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
588 let result = match op {
589 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
590 LogicalOperator::Expand(expand) => {
591 if self.factorized_execution {
592 let (chain_len, _base) = Self::count_expand_chain(op);
593 if chain_len >= 2 {
594 return self.maybe_profile(self.plan_expand_chain(op), op);
595 }
596 }
597 self.plan_expand(expand)
598 }
599 LogicalOperator::Return(ret) => self.plan_return(ret),
600 LogicalOperator::Filter(filter) => self.plan_filter(filter),
601 LogicalOperator::Project(project) => self.plan_project(project),
602 LogicalOperator::Limit(limit) => self.plan_limit(limit),
603 LogicalOperator::Skip(skip) => self.plan_skip(skip),
604 LogicalOperator::Sort(sort) => self.plan_sort(sort),
605 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
606 LogicalOperator::Join(join) => self.plan_join(join),
607 LogicalOperator::Union(union) => self.plan_union(union),
608 LogicalOperator::Except(except) => self.plan_except(except),
609 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
610 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
611 LogicalOperator::Apply(apply) => self.plan_apply(apply),
612 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
613 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
614 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
615 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
616 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
617 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
618 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
619 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
620 LogicalOperator::Merge(merge) => self.plan_merge(merge),
621 LogicalOperator::MergeRelationship(merge_rel) => {
622 self.plan_merge_relationship(merge_rel)
623 }
624 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
625 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
626 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
627 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
628 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
629 #[cfg(feature = "algos")]
630 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
631 #[cfg(not(feature = "algos"))]
632 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
633 "CALL procedures require the 'algos' feature".to_string(),
634 )),
635 LogicalOperator::ParameterScan(_param_scan) => {
636 let state = self
637 .correlated_param_state
638 .borrow()
639 .clone()
640 .ok_or_else(|| {
641 Error::Internal(
642 "ParameterScan without correlated Apply context".to_string(),
643 )
644 })?;
645 let columns = state.columns.clone();
648 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
649 Ok((operator, columns))
650 }
651 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
652 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
653 LogicalOperator::LoadData(load) => {
654 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
655 load.path.clone(),
656 load.format,
657 load.with_headers,
658 load.field_terminator,
659 load.variable.clone(),
660 ));
661 Ok((operator, vec![load.variable.clone()]))
662 }
663 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
664 #[cfg(feature = "vector-index")]
665 LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
666 #[cfg(not(feature = "vector-index"))]
667 LogicalOperator::VectorScan(_) => Err(Error::Internal(
668 "VectorScan requires vector-index feature".to_string(),
669 )),
670 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
671 "VectorJoin requires vector-index feature".to_string(),
672 )),
673 #[cfg(feature = "text-index")]
674 LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
675 #[cfg(not(feature = "text-index"))]
676 LogicalOperator::TextScan(_) => Err(Error::Internal(
677 "TextScan requires text-index feature".to_string(),
678 )),
679 _ => Err(Error::Internal(format!(
680 "Unsupported operator: {:?}",
681 std::mem::discriminant(op)
682 ))),
683 };
684 self.maybe_profile(result, op)
685 }
686
687 fn plan_horizontal_aggregate(
689 &self,
690 ha: &HorizontalAggregateOp,
691 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
692 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
693
694 let list_col_idx = child_columns
695 .iter()
696 .position(|c| c == &ha.list_column)
697 .ok_or_else(|| {
698 Error::Internal(format!(
699 "HorizontalAggregate list column '{}' not found in {:?}",
700 ha.list_column, child_columns
701 ))
702 })?;
703
704 let entity_kind = match ha.entity_kind {
705 LogicalEntityKind::Edge => EntityKind::Edge,
706 LogicalEntityKind::Node => EntityKind::Node,
707 };
708
709 let function = convert_aggregate_function(ha.function);
710 let input_column_count = child_columns.len();
711
712 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
713 child_op,
714 list_col_idx,
715 entity_kind,
716 function,
717 ha.property.clone(),
718 Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
719 input_column_count,
720 ));
721
722 let mut columns = child_columns;
723 columns.push(ha.alias.clone());
724 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
726
727 Ok((operator, columns))
728 }
729
730 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
732 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
733 let key_idx = child_columns
734 .iter()
735 .position(|c| c == &mc.key_var)
736 .ok_or_else(|| {
737 Error::Internal(format!(
738 "MapCollect key '{}' not in columns {:?}",
739 mc.key_var, child_columns
740 ))
741 })?;
742 let value_idx = child_columns
743 .iter()
744 .position(|c| c == &mc.value_var)
745 .ok_or_else(|| {
746 Error::Internal(format!(
747 "MapCollect value '{}' not in columns {:?}",
748 mc.value_var, child_columns
749 ))
750 })?;
751 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
752 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
753 Ok((operator, vec![mc.alias.clone()]))
754 }
755
756 #[cfg(feature = "text-index")]
758 fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
759 use grafeo_core::execution::operators::TextScanOperator;
760
761 let query_string = match &scan.query {
762 LogicalExpression::Literal(Value::String(s)) => s.to_string(),
763 LogicalExpression::Parameter(name) => {
764 return Err(Error::Internal(format!(
765 "TextScan query parameter ${} not resolved",
766 name
767 )));
768 }
769 _ => {
770 return Err(Error::Internal(
771 "TextScan query must be a string literal or parameter".to_string(),
772 ));
773 }
774 };
775
776 let operator: Box<dyn Operator> = if let Some(k) = scan.k {
777 Box::new(TextScanOperator::top_k(
778 Arc::clone(&self.store),
779 &scan.label,
780 &scan.property,
781 &query_string,
782 k,
783 ))
784 } else if let Some(threshold) = scan.threshold {
785 Box::new(TextScanOperator::with_threshold(
786 Arc::clone(&self.store),
787 &scan.label,
788 &scan.property,
789 &query_string,
790 threshold,
791 ))
792 } else {
793 Box::new(TextScanOperator::top_k(
794 Arc::clone(&self.store),
795 &scan.label,
796 &scan.property,
797 &query_string,
798 100,
799 ))
800 };
801
802 let mut columns = vec![scan.variable.clone()];
803 if let Some(ref score_col) = scan.score_column {
804 columns.push(score_col.clone());
805 }
806
807 Ok((operator, columns))
808 }
809
810 #[cfg(feature = "vector-index")]
812 pub(super) fn plan_vector_scan(
813 &self,
814 scan: &VectorScanOp,
815 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
816 use grafeo_core::execution::operators::VectorScanOperator;
817 use grafeo_core::index::vector::DistanceMetric;
818
819 if scan.input.is_some() {
824 return Err(Error::Internal(
825 "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
826 ));
827 }
828
829 let query_vec = self.resolve_vector_literal(&scan.query_vector)?;
830
831 let requested_metric = scan.metric.map(|m| match m {
832 VectorMetric::Cosine => DistanceMetric::Cosine,
833 VectorMetric::Euclidean => DistanceMetric::Euclidean,
834 VectorMetric::DotProduct => DistanceMetric::DotProduct,
835 VectorMetric::Manhattan => DistanceMetric::Manhattan,
836 });
837
838 let k = scan.k.unwrap_or(usize::MAX);
839
840 let index_metric = scan
845 .label
846 .as_ref()
847 .and_then(|label| self.store.vector_index_metric(label, &scan.property));
848 let metric = requested_metric
849 .or(index_metric)
850 .unwrap_or(DistanceMetric::Cosine);
851
852 let mut operator = VectorScanOperator::new(
856 Arc::clone(&self.store),
857 scan.label.clone(),
858 scan.property.clone(),
859 query_vec,
860 k,
861 metric,
862 );
863
864 if let Some(sim) = scan.min_similarity {
865 operator = operator.with_min_similarity(sim);
866 }
867 if let Some(dist) = scan.max_distance {
868 operator = operator.with_max_distance(dist);
869 }
870
871 let mut columns = vec![scan.variable.clone()];
872 let metric_tag = match metric {
876 DistanceMetric::Cosine => "cos",
877 DistanceMetric::Euclidean => "euc",
878 DistanceMetric::DotProduct => "dot",
879 DistanceMetric::Manhattan => "man",
880 _ => "other",
883 };
884 columns.push(project::vector_score_column_name(
885 metric_tag,
886 &scan.property,
887 &scan.variable,
888 &scan.query_vector,
889 ));
890
891 Ok((Box::new(operator), columns))
892 }
893
894 #[cfg(feature = "vector-index")]
896 pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
897 #[allow(clippy::cast_possible_truncation)]
899 match expr {
900 LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
901 LogicalExpression::Literal(Value::List(list)) => {
902 let mut vec = Vec::with_capacity(list.len());
903 for item in list.iter() {
904 match item {
905 Value::Float64(f) => vec.push(*f as f32),
906 Value::Int64(i) => vec.push(*i as f32),
907 _ => {
908 return Err(Error::Internal(
909 "Vector elements must be numeric".to_string(),
910 ));
911 }
912 }
913 }
914 Ok(vec)
915 }
916 LogicalExpression::List(items) => {
919 let mut vec = Vec::with_capacity(items.len());
920 for item in items {
921 match item {
922 LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
923 LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
924 _ => {
925 return Err(Error::Internal(
926 "Vector elements must be numeric literals".to_string(),
927 ));
928 }
929 }
930 }
931 Ok(vec)
932 }
933 _ => Err(Error::Internal("Expected vector literal".to_string())),
934 }
935 }
936}
937
938#[cfg(feature = "algos")]
940struct StaticResultOperator {
941 rows: Vec<Vec<Value>>,
942 column_indices: Vec<usize>,
943 row_index: usize,
944}
945
946#[cfg(feature = "algos")]
947impl Operator for StaticResultOperator {
948 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
949 use grafeo_core::execution::DataChunk;
950
951 if self.row_index >= self.rows.len() {
952 return Ok(None);
953 }
954
955 let remaining = self.rows.len() - self.row_index;
956 let chunk_rows = remaining.min(1024);
957 let col_count = self.column_indices.len();
958
959 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
960 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
961
962 for row_offset in 0..chunk_rows {
963 let row = &self.rows[self.row_index + row_offset];
964 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
965 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
966 if let Some(col) = chunk.column_mut(col_idx) {
967 col.push_value(value);
968 }
969 }
970 }
971 chunk.set_count(chunk_rows);
972
973 self.row_index += chunk_rows;
974 Ok(Some(chunk))
975 }
976
977 fn reset(&mut self) {
978 self.row_index = 0;
979 }
980
981 fn name(&self) -> &'static str {
982 "StaticResult"
983 }
984
985 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
986 self
987 }
988}
989
990#[cfg(test)]
991mod tests {
992 use super::*;
993 use crate::query::plan::{
994 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
995 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
996 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
997 SkipOp as LogicalSkipOp, SortKey, SortOp,
998 };
999 use grafeo_common::types::Value;
1000 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
1001 use grafeo_core::graph::GraphStoreMut;
1002 use grafeo_core::graph::lpg::LpgStore;
1003
1004 fn create_test_store() -> Arc<LpgStore> {
1005 let store = Arc::new(LpgStore::new().unwrap());
1006 store.create_node(&["Person"]);
1007 store.create_node(&["Person"]);
1008 store.create_node(&["Company"]);
1009 store
1010 }
1011
1012 #[test]
1015 fn test_plan_simple_scan() {
1016 let store = create_test_store();
1017 let planner = Planner::new(store);
1018
1019 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1021 items: vec![ReturnItem {
1022 expression: LogicalExpression::Variable("n".to_string()),
1023 alias: None,
1024 }],
1025 distinct: false,
1026 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1027 variable: "n".to_string(),
1028 label: Some("Person".to_string()),
1029 input: None,
1030 })),
1031 }));
1032
1033 let physical = planner.plan(&logical).unwrap();
1034 assert_eq!(physical.columns(), &["n"]);
1035 }
1036
1037 #[test]
1038 fn test_plan_scan_without_label() {
1039 let store = create_test_store();
1040 let planner = Planner::new(store);
1041
1042 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1044 items: vec![ReturnItem {
1045 expression: LogicalExpression::Variable("n".to_string()),
1046 alias: None,
1047 }],
1048 distinct: false,
1049 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1050 variable: "n".to_string(),
1051 label: None,
1052 input: None,
1053 })),
1054 }));
1055
1056 let physical = planner.plan(&logical).unwrap();
1057 assert_eq!(physical.columns(), &["n"]);
1058 }
1059
1060 #[test]
1061 fn test_plan_return_with_alias() {
1062 let store = create_test_store();
1063 let planner = Planner::new(store);
1064
1065 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1067 items: vec![ReturnItem {
1068 expression: LogicalExpression::Variable("n".to_string()),
1069 alias: Some("person".to_string()),
1070 }],
1071 distinct: false,
1072 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1073 variable: "n".to_string(),
1074 label: Some("Person".to_string()),
1075 input: None,
1076 })),
1077 }));
1078
1079 let physical = planner.plan(&logical).unwrap();
1080 assert_eq!(physical.columns(), &["person"]);
1081 }
1082
1083 #[test]
1084 fn test_plan_return_property() {
1085 let store = create_test_store();
1086 let planner = Planner::new(store);
1087
1088 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1090 items: vec![ReturnItem {
1091 expression: LogicalExpression::Property {
1092 variable: "n".to_string(),
1093 property: "name".to_string(),
1094 },
1095 alias: None,
1096 }],
1097 distinct: false,
1098 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1099 variable: "n".to_string(),
1100 label: Some("Person".to_string()),
1101 input: None,
1102 })),
1103 }));
1104
1105 let physical = planner.plan(&logical).unwrap();
1106 assert_eq!(physical.columns(), &["n.name"]);
1107 }
1108
1109 #[test]
1110 fn test_plan_return_literal() {
1111 let store = create_test_store();
1112 let planner = Planner::new(store);
1113
1114 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1116 items: vec![ReturnItem {
1117 expression: LogicalExpression::Literal(Value::Int64(42)),
1118 alias: Some("answer".to_string()),
1119 }],
1120 distinct: false,
1121 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1122 variable: "n".to_string(),
1123 label: None,
1124 input: None,
1125 })),
1126 }));
1127
1128 let physical = planner.plan(&logical).unwrap();
1129 assert_eq!(physical.columns(), &["answer"]);
1130 }
1131
1132 #[test]
1135 fn test_plan_filter_equality() {
1136 let store = create_test_store();
1137 let planner = Planner::new(store);
1138
1139 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1141 items: vec![ReturnItem {
1142 expression: LogicalExpression::Variable("n".to_string()),
1143 alias: None,
1144 }],
1145 distinct: false,
1146 input: Box::new(LogicalOperator::Filter(FilterOp {
1147 predicate: LogicalExpression::Binary {
1148 left: Box::new(LogicalExpression::Property {
1149 variable: "n".to_string(),
1150 property: "age".to_string(),
1151 }),
1152 op: BinaryOp::Eq,
1153 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1154 },
1155 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1156 variable: "n".to_string(),
1157 label: Some("Person".to_string()),
1158 input: None,
1159 })),
1160 pushdown_hint: None,
1161 })),
1162 }));
1163
1164 let physical = planner.plan(&logical).unwrap();
1165 assert_eq!(physical.columns(), &["n"]);
1166 }
1167
1168 #[test]
1169 fn test_plan_filter_compound_and() {
1170 let store = create_test_store();
1171 let planner = Planner::new(store);
1172
1173 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1175 items: vec![ReturnItem {
1176 expression: LogicalExpression::Variable("n".to_string()),
1177 alias: None,
1178 }],
1179 distinct: false,
1180 input: Box::new(LogicalOperator::Filter(FilterOp {
1181 predicate: LogicalExpression::Binary {
1182 left: Box::new(LogicalExpression::Binary {
1183 left: Box::new(LogicalExpression::Property {
1184 variable: "n".to_string(),
1185 property: "age".to_string(),
1186 }),
1187 op: BinaryOp::Gt,
1188 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1189 }),
1190 op: BinaryOp::And,
1191 right: Box::new(LogicalExpression::Binary {
1192 left: Box::new(LogicalExpression::Property {
1193 variable: "n".to_string(),
1194 property: "age".to_string(),
1195 }),
1196 op: BinaryOp::Lt,
1197 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1198 }),
1199 },
1200 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1201 variable: "n".to_string(),
1202 label: None,
1203 input: None,
1204 })),
1205 pushdown_hint: None,
1206 })),
1207 }));
1208
1209 let physical = planner.plan(&logical).unwrap();
1210 assert_eq!(physical.columns(), &["n"]);
1211 }
1212
1213 #[test]
1214 fn test_plan_filter_unary_not() {
1215 let store = create_test_store();
1216 let planner = Planner::new(store);
1217
1218 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1220 items: vec![ReturnItem {
1221 expression: LogicalExpression::Variable("n".to_string()),
1222 alias: None,
1223 }],
1224 distinct: false,
1225 input: Box::new(LogicalOperator::Filter(FilterOp {
1226 predicate: LogicalExpression::Unary {
1227 op: UnaryOp::Not,
1228 operand: Box::new(LogicalExpression::Property {
1229 variable: "n".to_string(),
1230 property: "active".to_string(),
1231 }),
1232 },
1233 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1234 variable: "n".to_string(),
1235 label: None,
1236 input: None,
1237 })),
1238 pushdown_hint: None,
1239 })),
1240 }));
1241
1242 let physical = planner.plan(&logical).unwrap();
1243 assert_eq!(physical.columns(), &["n"]);
1244 }
1245
1246 #[test]
1247 fn test_plan_filter_is_null() {
1248 let store = create_test_store();
1249 let planner = Planner::new(store);
1250
1251 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1253 items: vec![ReturnItem {
1254 expression: LogicalExpression::Variable("n".to_string()),
1255 alias: None,
1256 }],
1257 distinct: false,
1258 input: Box::new(LogicalOperator::Filter(FilterOp {
1259 predicate: LogicalExpression::Unary {
1260 op: UnaryOp::IsNull,
1261 operand: Box::new(LogicalExpression::Property {
1262 variable: "n".to_string(),
1263 property: "email".to_string(),
1264 }),
1265 },
1266 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1267 variable: "n".to_string(),
1268 label: None,
1269 input: None,
1270 })),
1271 pushdown_hint: None,
1272 })),
1273 }));
1274
1275 let physical = planner.plan(&logical).unwrap();
1276 assert_eq!(physical.columns(), &["n"]);
1277 }
1278
1279 #[test]
1280 fn test_plan_filter_function_call() {
1281 let store = create_test_store();
1282 let planner = Planner::new(store);
1283
1284 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1286 items: vec![ReturnItem {
1287 expression: LogicalExpression::Variable("n".to_string()),
1288 alias: None,
1289 }],
1290 distinct: false,
1291 input: Box::new(LogicalOperator::Filter(FilterOp {
1292 predicate: LogicalExpression::Binary {
1293 left: Box::new(LogicalExpression::FunctionCall {
1294 name: "size".to_string(),
1295 args: vec![LogicalExpression::Property {
1296 variable: "n".to_string(),
1297 property: "friends".to_string(),
1298 }],
1299 distinct: false,
1300 }),
1301 op: BinaryOp::Gt,
1302 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1303 },
1304 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1305 variable: "n".to_string(),
1306 label: None,
1307 input: None,
1308 })),
1309 pushdown_hint: None,
1310 })),
1311 }));
1312
1313 let physical = planner.plan(&logical).unwrap();
1314 assert_eq!(physical.columns(), &["n"]);
1315 }
1316
1317 #[test]
1320 fn test_plan_expand_outgoing() {
1321 let store = create_test_store();
1322 let planner = Planner::new(store);
1323
1324 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1326 items: vec![
1327 ReturnItem {
1328 expression: LogicalExpression::Variable("a".to_string()),
1329 alias: None,
1330 },
1331 ReturnItem {
1332 expression: LogicalExpression::Variable("b".to_string()),
1333 alias: None,
1334 },
1335 ],
1336 distinct: false,
1337 input: Box::new(LogicalOperator::Expand(ExpandOp {
1338 from_variable: "a".to_string(),
1339 to_variable: "b".to_string(),
1340 edge_variable: None,
1341 direction: ExpandDirection::Outgoing,
1342 edge_types: vec!["KNOWS".to_string()],
1343 min_hops: 1,
1344 max_hops: Some(1),
1345 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1346 variable: "a".to_string(),
1347 label: Some("Person".to_string()),
1348 input: None,
1349 })),
1350 path_alias: None,
1351 path_mode: PathMode::Walk,
1352 })),
1353 }));
1354
1355 let physical = planner.plan(&logical).unwrap();
1356 assert!(physical.columns().contains(&"a".to_string()));
1358 assert!(physical.columns().contains(&"b".to_string()));
1359 }
1360
1361 #[test]
1362 fn test_plan_expand_with_edge_variable() {
1363 let store = create_test_store();
1364 let planner = Planner::new(store);
1365
1366 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1368 items: vec![
1369 ReturnItem {
1370 expression: LogicalExpression::Variable("a".to_string()),
1371 alias: None,
1372 },
1373 ReturnItem {
1374 expression: LogicalExpression::Variable("r".to_string()),
1375 alias: None,
1376 },
1377 ReturnItem {
1378 expression: LogicalExpression::Variable("b".to_string()),
1379 alias: None,
1380 },
1381 ],
1382 distinct: false,
1383 input: Box::new(LogicalOperator::Expand(ExpandOp {
1384 from_variable: "a".to_string(),
1385 to_variable: "b".to_string(),
1386 edge_variable: Some("r".to_string()),
1387 direction: ExpandDirection::Outgoing,
1388 edge_types: vec!["KNOWS".to_string()],
1389 min_hops: 1,
1390 max_hops: Some(1),
1391 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1392 variable: "a".to_string(),
1393 label: None,
1394 input: None,
1395 })),
1396 path_alias: None,
1397 path_mode: PathMode::Walk,
1398 })),
1399 }));
1400
1401 let physical = planner.plan(&logical).unwrap();
1402 assert!(physical.columns().contains(&"a".to_string()));
1403 assert!(physical.columns().contains(&"r".to_string()));
1404 assert!(physical.columns().contains(&"b".to_string()));
1405 }
1406
1407 #[test]
1410 fn test_plan_limit() {
1411 let store = create_test_store();
1412 let planner = Planner::new(store);
1413
1414 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1416 items: vec![ReturnItem {
1417 expression: LogicalExpression::Variable("n".to_string()),
1418 alias: None,
1419 }],
1420 distinct: false,
1421 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1422 count: 10.into(),
1423 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1424 variable: "n".to_string(),
1425 label: None,
1426 input: None,
1427 })),
1428 })),
1429 }));
1430
1431 let physical = planner.plan(&logical).unwrap();
1432 assert_eq!(physical.columns(), &["n"]);
1433 }
1434
1435 #[test]
1436 fn test_plan_skip() {
1437 let store = create_test_store();
1438 let planner = Planner::new(store);
1439
1440 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1442 items: vec![ReturnItem {
1443 expression: LogicalExpression::Variable("n".to_string()),
1444 alias: None,
1445 }],
1446 distinct: false,
1447 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1448 count: 5.into(),
1449 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1450 variable: "n".to_string(),
1451 label: None,
1452 input: None,
1453 })),
1454 })),
1455 }));
1456
1457 let physical = planner.plan(&logical).unwrap();
1458 assert_eq!(physical.columns(), &["n"]);
1459 }
1460
1461 #[test]
1462 fn test_plan_sort() {
1463 let store = create_test_store();
1464 let planner = Planner::new(store);
1465
1466 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1468 items: vec![ReturnItem {
1469 expression: LogicalExpression::Variable("n".to_string()),
1470 alias: None,
1471 }],
1472 distinct: false,
1473 input: Box::new(LogicalOperator::Sort(SortOp {
1474 keys: vec![SortKey {
1475 expression: LogicalExpression::Variable("n".to_string()),
1476 order: SortOrder::Ascending,
1477 nulls: None,
1478 }],
1479 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1480 variable: "n".to_string(),
1481 label: None,
1482 input: None,
1483 })),
1484 })),
1485 }));
1486
1487 let physical = planner.plan(&logical).unwrap();
1488 assert_eq!(physical.columns(), &["n"]);
1489 }
1490
1491 #[test]
1492 fn test_plan_sort_descending() {
1493 let store = create_test_store();
1494 let planner = Planner::new(store);
1495
1496 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1498 items: vec![ReturnItem {
1499 expression: LogicalExpression::Variable("n".to_string()),
1500 alias: None,
1501 }],
1502 distinct: false,
1503 input: Box::new(LogicalOperator::Sort(SortOp {
1504 keys: vec![SortKey {
1505 expression: LogicalExpression::Variable("n".to_string()),
1506 order: SortOrder::Descending,
1507 nulls: None,
1508 }],
1509 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1510 variable: "n".to_string(),
1511 label: None,
1512 input: None,
1513 })),
1514 })),
1515 }));
1516
1517 let physical = planner.plan(&logical).unwrap();
1518 assert_eq!(physical.columns(), &["n"]);
1519 }
1520
1521 #[test]
1522 fn test_plan_distinct() {
1523 let store = create_test_store();
1524 let planner = Planner::new(store);
1525
1526 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1528 items: vec![ReturnItem {
1529 expression: LogicalExpression::Variable("n".to_string()),
1530 alias: None,
1531 }],
1532 distinct: false,
1533 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1534 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1535 variable: "n".to_string(),
1536 label: None,
1537 input: None,
1538 })),
1539 columns: None,
1540 })),
1541 }));
1542
1543 let physical = planner.plan(&logical).unwrap();
1544 assert_eq!(physical.columns(), &["n"]);
1545 }
1546
1547 #[test]
1548 fn test_plan_distinct_with_columns() {
1549 let store = create_test_store();
1550 let planner = Planner::new(store);
1551
1552 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1554 items: vec![ReturnItem {
1555 expression: LogicalExpression::Variable("n".to_string()),
1556 alias: None,
1557 }],
1558 distinct: false,
1559 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1560 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1561 variable: "n".to_string(),
1562 label: None,
1563 input: None,
1564 })),
1565 columns: Some(vec!["n".to_string()]),
1566 })),
1567 }));
1568
1569 let physical = planner.plan(&logical).unwrap();
1570 assert_eq!(physical.columns(), &["n"]);
1571 }
1572
1573 #[test]
1574 fn test_plan_distinct_with_nonexistent_columns() {
1575 let store = create_test_store();
1576 let planner = Planner::new(store);
1577
1578 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1581 items: vec![ReturnItem {
1582 expression: LogicalExpression::Variable("n".to_string()),
1583 alias: None,
1584 }],
1585 distinct: false,
1586 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1587 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1588 variable: "n".to_string(),
1589 label: None,
1590 input: None,
1591 })),
1592 columns: Some(vec!["nonexistent".to_string()]),
1593 })),
1594 }));
1595
1596 let physical = planner.plan(&logical).unwrap();
1597 assert_eq!(physical.columns(), &["n"]);
1598 }
1599
1600 #[test]
1603 fn test_plan_aggregate_count() {
1604 let store = create_test_store();
1605 let planner = Planner::new(store);
1606
1607 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1609 items: vec![ReturnItem {
1610 expression: LogicalExpression::Variable("cnt".to_string()),
1611 alias: None,
1612 }],
1613 distinct: false,
1614 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1615 group_by: vec![],
1616 aggregates: vec![LogicalAggregateExpr {
1617 function: LogicalAggregateFunction::Count,
1618 expression: Some(LogicalExpression::Variable("n".to_string())),
1619 expression2: None,
1620 distinct: false,
1621 alias: Some("cnt".to_string()),
1622 percentile: None,
1623 separator: None,
1624 }],
1625 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1626 variable: "n".to_string(),
1627 label: None,
1628 input: None,
1629 })),
1630 having: None,
1631 })),
1632 }));
1633
1634 let physical = planner.plan(&logical).unwrap();
1635 assert!(physical.columns().contains(&"cnt".to_string()));
1636 }
1637
1638 #[test]
1639 fn test_plan_aggregate_with_group_by() {
1640 let store = create_test_store();
1641 let planner = Planner::new(store);
1642
1643 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1645 group_by: vec![LogicalExpression::Property {
1646 variable: "n".to_string(),
1647 property: "city".to_string(),
1648 }],
1649 aggregates: vec![LogicalAggregateExpr {
1650 function: LogicalAggregateFunction::Count,
1651 expression: Some(LogicalExpression::Variable("n".to_string())),
1652 expression2: None,
1653 distinct: false,
1654 alias: Some("cnt".to_string()),
1655 percentile: None,
1656 separator: None,
1657 }],
1658 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1659 variable: "n".to_string(),
1660 label: Some("Person".to_string()),
1661 input: None,
1662 })),
1663 having: None,
1664 }));
1665
1666 let physical = planner.plan(&logical).unwrap();
1667 assert_eq!(physical.columns().len(), 2);
1668 }
1669
1670 #[test]
1671 fn test_plan_aggregate_sum() {
1672 let store = create_test_store();
1673 let planner = Planner::new(store);
1674
1675 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1677 group_by: vec![],
1678 aggregates: vec![LogicalAggregateExpr {
1679 function: LogicalAggregateFunction::Sum,
1680 expression: Some(LogicalExpression::Property {
1681 variable: "n".to_string(),
1682 property: "value".to_string(),
1683 }),
1684 expression2: None,
1685 distinct: false,
1686 alias: Some("total".to_string()),
1687 percentile: None,
1688 separator: None,
1689 }],
1690 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1691 variable: "n".to_string(),
1692 label: None,
1693 input: None,
1694 })),
1695 having: None,
1696 }));
1697
1698 let physical = planner.plan(&logical).unwrap();
1699 assert!(physical.columns().contains(&"total".to_string()));
1700 }
1701
1702 #[test]
1703 fn test_plan_aggregate_avg() {
1704 let store = create_test_store();
1705 let planner = Planner::new(store);
1706
1707 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1709 group_by: vec![],
1710 aggregates: vec![LogicalAggregateExpr {
1711 function: LogicalAggregateFunction::Avg,
1712 expression: Some(LogicalExpression::Property {
1713 variable: "n".to_string(),
1714 property: "score".to_string(),
1715 }),
1716 expression2: None,
1717 distinct: false,
1718 alias: Some("average".to_string()),
1719 percentile: None,
1720 separator: None,
1721 }],
1722 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1723 variable: "n".to_string(),
1724 label: None,
1725 input: None,
1726 })),
1727 having: None,
1728 }));
1729
1730 let physical = planner.plan(&logical).unwrap();
1731 assert!(physical.columns().contains(&"average".to_string()));
1732 }
1733
1734 #[test]
1735 fn test_plan_aggregate_min_max() {
1736 let store = create_test_store();
1737 let planner = Planner::new(store);
1738
1739 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1741 group_by: vec![],
1742 aggregates: vec![
1743 LogicalAggregateExpr {
1744 function: LogicalAggregateFunction::Min,
1745 expression: Some(LogicalExpression::Property {
1746 variable: "n".to_string(),
1747 property: "age".to_string(),
1748 }),
1749 expression2: None,
1750 distinct: false,
1751 alias: Some("youngest".to_string()),
1752 percentile: None,
1753 separator: None,
1754 },
1755 LogicalAggregateExpr {
1756 function: LogicalAggregateFunction::Max,
1757 expression: Some(LogicalExpression::Property {
1758 variable: "n".to_string(),
1759 property: "age".to_string(),
1760 }),
1761 expression2: None,
1762 distinct: false,
1763 alias: Some("oldest".to_string()),
1764 percentile: None,
1765 separator: None,
1766 },
1767 ],
1768 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1769 variable: "n".to_string(),
1770 label: None,
1771 input: None,
1772 })),
1773 having: None,
1774 }));
1775
1776 let physical = planner.plan(&logical).unwrap();
1777 assert!(physical.columns().contains(&"youngest".to_string()));
1778 assert!(physical.columns().contains(&"oldest".to_string()));
1779 }
1780
1781 #[test]
1784 fn test_plan_inner_join() {
1785 let store = create_test_store();
1786 let planner = Planner::new(store);
1787
1788 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1790 items: vec![
1791 ReturnItem {
1792 expression: LogicalExpression::Variable("a".to_string()),
1793 alias: None,
1794 },
1795 ReturnItem {
1796 expression: LogicalExpression::Variable("b".to_string()),
1797 alias: None,
1798 },
1799 ],
1800 distinct: false,
1801 input: Box::new(LogicalOperator::Join(JoinOp {
1802 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1803 variable: "a".to_string(),
1804 label: Some("Person".to_string()),
1805 input: None,
1806 })),
1807 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1808 variable: "b".to_string(),
1809 label: Some("Company".to_string()),
1810 input: None,
1811 })),
1812 join_type: JoinType::Inner,
1813 conditions: vec![JoinCondition {
1814 left: LogicalExpression::Variable("a".to_string()),
1815 right: LogicalExpression::Variable("b".to_string()),
1816 }],
1817 })),
1818 }));
1819
1820 let physical = planner.plan(&logical).unwrap();
1821 assert!(physical.columns().contains(&"a".to_string()));
1822 assert!(physical.columns().contains(&"b".to_string()));
1823 }
1824
1825 #[test]
1826 fn test_plan_cross_join() {
1827 let store = create_test_store();
1828 let planner = Planner::new(store);
1829
1830 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1832 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1833 variable: "a".to_string(),
1834 label: None,
1835 input: None,
1836 })),
1837 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1838 variable: "b".to_string(),
1839 label: None,
1840 input: None,
1841 })),
1842 join_type: JoinType::Cross,
1843 conditions: vec![],
1844 }));
1845
1846 let physical = planner.plan(&logical).unwrap();
1847 assert_eq!(physical.columns().len(), 2);
1848 }
1849
1850 #[test]
1851 fn test_plan_left_join() {
1852 let store = create_test_store();
1853 let planner = Planner::new(store);
1854
1855 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1856 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1857 variable: "a".to_string(),
1858 label: None,
1859 input: None,
1860 })),
1861 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1862 variable: "b".to_string(),
1863 label: None,
1864 input: None,
1865 })),
1866 join_type: JoinType::Left,
1867 conditions: vec![],
1868 }));
1869
1870 let physical = planner.plan(&logical).unwrap();
1871 assert_eq!(physical.columns().len(), 2);
1872 }
1873
1874 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1877 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
1878 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1879 p
1880 }
1881
1882 #[test]
1883 fn test_plan_create_node() {
1884 let store = create_test_store();
1885 let planner = create_writable_planner(&store);
1886
1887 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1889 variable: "n".to_string(),
1890 labels: vec!["Person".to_string()],
1891 properties: vec![(
1892 "name".to_string(),
1893 LogicalExpression::Literal(Value::String("Alix".into())),
1894 )],
1895 input: None,
1896 }));
1897
1898 let physical = planner.plan(&logical).unwrap();
1899 assert!(physical.columns().contains(&"n".to_string()));
1900 }
1901
1902 #[test]
1903 fn test_plan_create_edge() {
1904 let store = create_test_store();
1905 let planner = create_writable_planner(&store);
1906
1907 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1909 variable: Some("r".to_string()),
1910 from_variable: "a".to_string(),
1911 to_variable: "b".to_string(),
1912 edge_type: "KNOWS".to_string(),
1913 properties: vec![],
1914 input: Box::new(LogicalOperator::Join(JoinOp {
1915 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1916 variable: "a".to_string(),
1917 label: None,
1918 input: None,
1919 })),
1920 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1921 variable: "b".to_string(),
1922 label: None,
1923 input: None,
1924 })),
1925 join_type: JoinType::Cross,
1926 conditions: vec![],
1927 })),
1928 }));
1929
1930 let physical = planner.plan(&logical).unwrap();
1931 assert!(physical.columns().contains(&"r".to_string()));
1932 }
1933
1934 #[test]
1935 fn test_plan_delete_node() {
1936 let store = create_test_store();
1937 let planner = create_writable_planner(&store);
1938
1939 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1941 variable: "n".to_string(),
1942 detach: false,
1943 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1944 variable: "n".to_string(),
1945 label: None,
1946 input: None,
1947 })),
1948 }));
1949
1950 let physical = planner.plan(&logical).unwrap();
1951 assert!(physical.columns().contains(&"n".to_string()));
1952 }
1953
1954 #[test]
1957 fn test_plan_empty_errors() {
1958 let store = create_test_store();
1959 let planner = Planner::new(store);
1960
1961 let logical = LogicalPlan::new(LogicalOperator::Empty);
1962 let result = planner.plan(&logical);
1963 assert!(result.is_err());
1964 }
1965
1966 #[test]
1967 fn test_plan_missing_variable_in_return() {
1968 let store = create_test_store();
1969 let planner = Planner::new(store);
1970
1971 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1973 items: vec![ReturnItem {
1974 expression: LogicalExpression::Variable("missing".to_string()),
1975 alias: None,
1976 }],
1977 distinct: false,
1978 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1979 variable: "n".to_string(),
1980 label: None,
1981 input: None,
1982 })),
1983 }));
1984
1985 let result = planner.plan(&logical);
1986 assert!(result.is_err());
1987 }
1988
1989 #[test]
1992 fn test_convert_binary_ops() {
1993 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1994 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1995 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1996 assert!(convert_binary_op(BinaryOp::Le).is_ok());
1997 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1998 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1999 assert!(convert_binary_op(BinaryOp::And).is_ok());
2000 assert!(convert_binary_op(BinaryOp::Or).is_ok());
2001 assert!(convert_binary_op(BinaryOp::Add).is_ok());
2002 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2003 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2004 assert!(convert_binary_op(BinaryOp::Div).is_ok());
2005 }
2006
2007 #[test]
2008 fn test_convert_unary_ops() {
2009 assert!(convert_unary_op(UnaryOp::Not).is_ok());
2010 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2011 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2012 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2013 }
2014
2015 #[test]
2016 fn test_convert_aggregate_functions() {
2017 assert!(matches!(
2018 convert_aggregate_function(LogicalAggregateFunction::Count),
2019 PhysicalAggregateFunction::Count
2020 ));
2021 assert!(matches!(
2022 convert_aggregate_function(LogicalAggregateFunction::Sum),
2023 PhysicalAggregateFunction::Sum
2024 ));
2025 assert!(matches!(
2026 convert_aggregate_function(LogicalAggregateFunction::Avg),
2027 PhysicalAggregateFunction::Avg
2028 ));
2029 assert!(matches!(
2030 convert_aggregate_function(LogicalAggregateFunction::Min),
2031 PhysicalAggregateFunction::Min
2032 ));
2033 assert!(matches!(
2034 convert_aggregate_function(LogicalAggregateFunction::Max),
2035 PhysicalAggregateFunction::Max
2036 ));
2037 }
2038
2039 #[test]
2040 fn test_planner_accessors() {
2041 let store = create_test_store();
2042 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2043
2044 assert!(planner.transaction_id().is_none());
2045 assert!(planner.transaction_manager().is_none());
2046 let _ = planner.viewing_epoch(); }
2048
2049 #[test]
2050 fn test_physical_plan_accessors() {
2051 let store = create_test_store();
2052 let planner = Planner::new(store);
2053
2054 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2055 variable: "n".to_string(),
2056 label: None,
2057 input: None,
2058 }));
2059
2060 let physical = planner.plan(&logical).unwrap();
2061 assert_eq!(physical.columns(), &["n"]);
2062
2063 let _ = physical.into_operator();
2065 }
2066
2067 #[test]
2070 fn test_plan_adaptive_with_scan() {
2071 let store = create_test_store();
2072 let planner = Planner::new(store);
2073
2074 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2076 items: vec![ReturnItem {
2077 expression: LogicalExpression::Variable("n".to_string()),
2078 alias: None,
2079 }],
2080 distinct: false,
2081 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2082 variable: "n".to_string(),
2083 label: Some("Person".to_string()),
2084 input: None,
2085 })),
2086 }));
2087
2088 let physical = planner.plan_adaptive(&logical).unwrap();
2089 assert_eq!(physical.columns(), &["n"]);
2090 assert!(physical.adaptive_context.is_some());
2092 }
2093
2094 #[test]
2095 fn test_plan_adaptive_with_filter() {
2096 let store = create_test_store();
2097 let planner = Planner::new(store);
2098
2099 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2101 items: vec![ReturnItem {
2102 expression: LogicalExpression::Variable("n".to_string()),
2103 alias: None,
2104 }],
2105 distinct: false,
2106 input: Box::new(LogicalOperator::Filter(FilterOp {
2107 predicate: LogicalExpression::Binary {
2108 left: Box::new(LogicalExpression::Property {
2109 variable: "n".to_string(),
2110 property: "age".to_string(),
2111 }),
2112 op: BinaryOp::Gt,
2113 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2114 },
2115 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2116 variable: "n".to_string(),
2117 label: None,
2118 input: None,
2119 })),
2120 pushdown_hint: None,
2121 })),
2122 }));
2123
2124 let physical = planner.plan_adaptive(&logical).unwrap();
2125 assert!(physical.adaptive_context.is_some());
2126 }
2127
2128 #[test]
2129 fn test_plan_adaptive_with_expand() {
2130 let store = create_test_store();
2131 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2132 .with_factorized_execution(false);
2133
2134 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2136 items: vec![
2137 ReturnItem {
2138 expression: LogicalExpression::Variable("a".to_string()),
2139 alias: None,
2140 },
2141 ReturnItem {
2142 expression: LogicalExpression::Variable("b".to_string()),
2143 alias: None,
2144 },
2145 ],
2146 distinct: false,
2147 input: Box::new(LogicalOperator::Expand(ExpandOp {
2148 from_variable: "a".to_string(),
2149 to_variable: "b".to_string(),
2150 edge_variable: None,
2151 direction: ExpandDirection::Outgoing,
2152 edge_types: vec!["KNOWS".to_string()],
2153 min_hops: 1,
2154 max_hops: Some(1),
2155 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2156 variable: "a".to_string(),
2157 label: None,
2158 input: None,
2159 })),
2160 path_alias: None,
2161 path_mode: PathMode::Walk,
2162 })),
2163 }));
2164
2165 let physical = planner.plan_adaptive(&logical).unwrap();
2166 assert!(physical.adaptive_context.is_some());
2167 }
2168
2169 #[test]
2170 fn test_plan_adaptive_with_join() {
2171 let store = create_test_store();
2172 let planner = Planner::new(store);
2173
2174 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2175 items: vec![
2176 ReturnItem {
2177 expression: LogicalExpression::Variable("a".to_string()),
2178 alias: None,
2179 },
2180 ReturnItem {
2181 expression: LogicalExpression::Variable("b".to_string()),
2182 alias: None,
2183 },
2184 ],
2185 distinct: false,
2186 input: Box::new(LogicalOperator::Join(JoinOp {
2187 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2188 variable: "a".to_string(),
2189 label: None,
2190 input: None,
2191 })),
2192 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2193 variable: "b".to_string(),
2194 label: None,
2195 input: None,
2196 })),
2197 join_type: JoinType::Cross,
2198 conditions: vec![],
2199 })),
2200 }));
2201
2202 let physical = planner.plan_adaptive(&logical).unwrap();
2203 assert!(physical.adaptive_context.is_some());
2204 }
2205
2206 #[test]
2207 fn test_plan_adaptive_with_aggregate() {
2208 let store = create_test_store();
2209 let planner = Planner::new(store);
2210
2211 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2212 group_by: vec![],
2213 aggregates: vec![LogicalAggregateExpr {
2214 function: LogicalAggregateFunction::Count,
2215 expression: Some(LogicalExpression::Variable("n".to_string())),
2216 expression2: None,
2217 distinct: false,
2218 alias: Some("cnt".to_string()),
2219 percentile: None,
2220 separator: None,
2221 }],
2222 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2223 variable: "n".to_string(),
2224 label: None,
2225 input: None,
2226 })),
2227 having: None,
2228 }));
2229
2230 let physical = planner.plan_adaptive(&logical).unwrap();
2231 assert!(physical.adaptive_context.is_some());
2232 }
2233
2234 #[test]
2235 fn test_plan_adaptive_with_distinct() {
2236 let store = create_test_store();
2237 let planner = Planner::new(store);
2238
2239 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2240 items: vec![ReturnItem {
2241 expression: LogicalExpression::Variable("n".to_string()),
2242 alias: None,
2243 }],
2244 distinct: false,
2245 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2246 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2247 variable: "n".to_string(),
2248 label: None,
2249 input: None,
2250 })),
2251 columns: None,
2252 })),
2253 }));
2254
2255 let physical = planner.plan_adaptive(&logical).unwrap();
2256 assert!(physical.adaptive_context.is_some());
2257 }
2258
2259 #[test]
2260 fn test_plan_adaptive_with_limit() {
2261 let store = create_test_store();
2262 let planner = Planner::new(store);
2263
2264 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2265 items: vec![ReturnItem {
2266 expression: LogicalExpression::Variable("n".to_string()),
2267 alias: None,
2268 }],
2269 distinct: false,
2270 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2271 count: 10.into(),
2272 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2273 variable: "n".to_string(),
2274 label: None,
2275 input: None,
2276 })),
2277 })),
2278 }));
2279
2280 let physical = planner.plan_adaptive(&logical).unwrap();
2281 assert!(physical.adaptive_context.is_some());
2282 }
2283
2284 #[test]
2285 fn test_plan_adaptive_with_skip() {
2286 let store = create_test_store();
2287 let planner = Planner::new(store);
2288
2289 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2290 items: vec![ReturnItem {
2291 expression: LogicalExpression::Variable("n".to_string()),
2292 alias: None,
2293 }],
2294 distinct: false,
2295 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2296 count: 5.into(),
2297 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2298 variable: "n".to_string(),
2299 label: None,
2300 input: None,
2301 })),
2302 })),
2303 }));
2304
2305 let physical = planner.plan_adaptive(&logical).unwrap();
2306 assert!(physical.adaptive_context.is_some());
2307 }
2308
2309 #[test]
2310 fn test_plan_adaptive_with_sort() {
2311 let store = create_test_store();
2312 let planner = Planner::new(store);
2313
2314 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2315 items: vec![ReturnItem {
2316 expression: LogicalExpression::Variable("n".to_string()),
2317 alias: None,
2318 }],
2319 distinct: false,
2320 input: Box::new(LogicalOperator::Sort(SortOp {
2321 keys: vec![SortKey {
2322 expression: LogicalExpression::Variable("n".to_string()),
2323 order: SortOrder::Ascending,
2324 nulls: None,
2325 }],
2326 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2327 variable: "n".to_string(),
2328 label: None,
2329 input: None,
2330 })),
2331 })),
2332 }));
2333
2334 let physical = planner.plan_adaptive(&logical).unwrap();
2335 assert!(physical.adaptive_context.is_some());
2336 }
2337
2338 #[test]
2339 fn test_plan_adaptive_with_union() {
2340 let store = create_test_store();
2341 let planner = Planner::new(store);
2342
2343 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2344 items: vec![ReturnItem {
2345 expression: LogicalExpression::Variable("n".to_string()),
2346 alias: None,
2347 }],
2348 distinct: false,
2349 input: Box::new(LogicalOperator::Union(UnionOp {
2350 inputs: vec![
2351 LogicalOperator::NodeScan(NodeScanOp {
2352 variable: "n".to_string(),
2353 label: Some("Person".to_string()),
2354 input: None,
2355 }),
2356 LogicalOperator::NodeScan(NodeScanOp {
2357 variable: "n".to_string(),
2358 label: Some("Company".to_string()),
2359 input: None,
2360 }),
2361 ],
2362 })),
2363 }));
2364
2365 let physical = planner.plan_adaptive(&logical).unwrap();
2366 assert!(physical.adaptive_context.is_some());
2367 }
2368
2369 #[test]
2372 fn test_plan_expand_variable_length() {
2373 let store = create_test_store();
2374 let planner = Planner::new(store);
2375
2376 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2378 items: vec![
2379 ReturnItem {
2380 expression: LogicalExpression::Variable("a".to_string()),
2381 alias: None,
2382 },
2383 ReturnItem {
2384 expression: LogicalExpression::Variable("b".to_string()),
2385 alias: None,
2386 },
2387 ],
2388 distinct: false,
2389 input: Box::new(LogicalOperator::Expand(ExpandOp {
2390 from_variable: "a".to_string(),
2391 to_variable: "b".to_string(),
2392 edge_variable: None,
2393 direction: ExpandDirection::Outgoing,
2394 edge_types: vec!["KNOWS".to_string()],
2395 min_hops: 1,
2396 max_hops: Some(3),
2397 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2398 variable: "a".to_string(),
2399 label: None,
2400 input: None,
2401 })),
2402 path_alias: None,
2403 path_mode: PathMode::Walk,
2404 })),
2405 }));
2406
2407 let physical = planner.plan(&logical).unwrap();
2408 assert!(physical.columns().contains(&"a".to_string()));
2409 assert!(physical.columns().contains(&"b".to_string()));
2410 }
2411
2412 #[test]
2413 fn test_plan_expand_with_path_alias() {
2414 let store = create_test_store();
2415 let planner = Planner::new(store);
2416
2417 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2419 items: vec![
2420 ReturnItem {
2421 expression: LogicalExpression::Variable("a".to_string()),
2422 alias: None,
2423 },
2424 ReturnItem {
2425 expression: LogicalExpression::Variable("b".to_string()),
2426 alias: None,
2427 },
2428 ],
2429 distinct: false,
2430 input: Box::new(LogicalOperator::Expand(ExpandOp {
2431 from_variable: "a".to_string(),
2432 to_variable: "b".to_string(),
2433 edge_variable: None,
2434 direction: ExpandDirection::Outgoing,
2435 edge_types: vec!["KNOWS".to_string()],
2436 min_hops: 1,
2437 max_hops: Some(3),
2438 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2439 variable: "a".to_string(),
2440 label: None,
2441 input: None,
2442 })),
2443 path_alias: Some("p".to_string()),
2444 path_mode: PathMode::Walk,
2445 })),
2446 }));
2447
2448 let physical = planner.plan(&logical).unwrap();
2449 assert!(physical.columns().contains(&"a".to_string()));
2451 assert!(physical.columns().contains(&"b".to_string()));
2452 }
2453
2454 #[test]
2455 fn test_plan_expand_incoming() {
2456 let store = create_test_store();
2457 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2458 .with_factorized_execution(false);
2459
2460 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2462 items: vec![
2463 ReturnItem {
2464 expression: LogicalExpression::Variable("a".to_string()),
2465 alias: None,
2466 },
2467 ReturnItem {
2468 expression: LogicalExpression::Variable("b".to_string()),
2469 alias: None,
2470 },
2471 ],
2472 distinct: false,
2473 input: Box::new(LogicalOperator::Expand(ExpandOp {
2474 from_variable: "a".to_string(),
2475 to_variable: "b".to_string(),
2476 edge_variable: None,
2477 direction: ExpandDirection::Incoming,
2478 edge_types: vec!["KNOWS".to_string()],
2479 min_hops: 1,
2480 max_hops: Some(1),
2481 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2482 variable: "a".to_string(),
2483 label: None,
2484 input: None,
2485 })),
2486 path_alias: None,
2487 path_mode: PathMode::Walk,
2488 })),
2489 }));
2490
2491 let physical = planner.plan(&logical).unwrap();
2492 assert!(physical.columns().contains(&"a".to_string()));
2493 assert!(physical.columns().contains(&"b".to_string()));
2494 }
2495
2496 #[test]
2497 fn test_plan_expand_both_directions() {
2498 let store = create_test_store();
2499 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2500 .with_factorized_execution(false);
2501
2502 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2504 items: vec![
2505 ReturnItem {
2506 expression: LogicalExpression::Variable("a".to_string()),
2507 alias: None,
2508 },
2509 ReturnItem {
2510 expression: LogicalExpression::Variable("b".to_string()),
2511 alias: None,
2512 },
2513 ],
2514 distinct: false,
2515 input: Box::new(LogicalOperator::Expand(ExpandOp {
2516 from_variable: "a".to_string(),
2517 to_variable: "b".to_string(),
2518 edge_variable: None,
2519 direction: ExpandDirection::Both,
2520 edge_types: vec!["KNOWS".to_string()],
2521 min_hops: 1,
2522 max_hops: Some(1),
2523 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2524 variable: "a".to_string(),
2525 label: None,
2526 input: None,
2527 })),
2528 path_alias: None,
2529 path_mode: PathMode::Walk,
2530 })),
2531 }));
2532
2533 let physical = planner.plan(&logical).unwrap();
2534 assert!(physical.columns().contains(&"a".to_string()));
2535 assert!(physical.columns().contains(&"b".to_string()));
2536 }
2537
2538 #[test]
2541 fn test_planner_with_context() {
2542 use crate::transaction::TransactionManager;
2543
2544 let store = create_test_store();
2545 let transaction_manager = Arc::new(TransactionManager::new());
2546 let transaction_id = transaction_manager.begin();
2547 let epoch = transaction_manager.current_epoch();
2548
2549 let planner = Planner::with_context(
2550 Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
2551 Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2552 Arc::clone(&transaction_manager),
2553 Some(transaction_id),
2554 epoch,
2555 );
2556
2557 assert_eq!(planner.transaction_id(), Some(transaction_id));
2558 assert!(planner.transaction_manager().is_some());
2559 assert_eq!(planner.viewing_epoch(), epoch);
2560 }
2561
2562 #[test]
2563 fn test_planner_with_factorized_execution_disabled() {
2564 let store = create_test_store();
2565 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2566 .with_factorized_execution(false);
2567
2568 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2570 items: vec![
2571 ReturnItem {
2572 expression: LogicalExpression::Variable("a".to_string()),
2573 alias: None,
2574 },
2575 ReturnItem {
2576 expression: LogicalExpression::Variable("c".to_string()),
2577 alias: None,
2578 },
2579 ],
2580 distinct: false,
2581 input: Box::new(LogicalOperator::Expand(ExpandOp {
2582 from_variable: "b".to_string(),
2583 to_variable: "c".to_string(),
2584 edge_variable: None,
2585 direction: ExpandDirection::Outgoing,
2586 edge_types: vec![],
2587 min_hops: 1,
2588 max_hops: Some(1),
2589 input: Box::new(LogicalOperator::Expand(ExpandOp {
2590 from_variable: "a".to_string(),
2591 to_variable: "b".to_string(),
2592 edge_variable: None,
2593 direction: ExpandDirection::Outgoing,
2594 edge_types: vec![],
2595 min_hops: 1,
2596 max_hops: Some(1),
2597 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2598 variable: "a".to_string(),
2599 label: None,
2600 input: None,
2601 })),
2602 path_alias: None,
2603 path_mode: PathMode::Walk,
2604 })),
2605 path_alias: None,
2606 path_mode: PathMode::Walk,
2607 })),
2608 }));
2609
2610 let physical = planner.plan(&logical).unwrap();
2611 assert!(physical.columns().contains(&"a".to_string()));
2612 assert!(physical.columns().contains(&"c".to_string()));
2613 }
2614
2615 #[test]
2618 fn test_plan_sort_by_property() {
2619 let store = create_test_store();
2620 let planner = Planner::new(store);
2621
2622 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2624 items: vec![ReturnItem {
2625 expression: LogicalExpression::Variable("n".to_string()),
2626 alias: None,
2627 }],
2628 distinct: false,
2629 input: Box::new(LogicalOperator::Sort(SortOp {
2630 keys: vec![SortKey {
2631 expression: LogicalExpression::Property {
2632 variable: "n".to_string(),
2633 property: "name".to_string(),
2634 },
2635 order: SortOrder::Ascending,
2636 nulls: None,
2637 }],
2638 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2639 variable: "n".to_string(),
2640 label: None,
2641 input: None,
2642 })),
2643 })),
2644 }));
2645
2646 let physical = planner.plan(&logical).unwrap();
2647 assert!(physical.columns().contains(&"n".to_string()));
2649 }
2650
2651 #[test]
2654 fn test_plan_scan_with_input() {
2655 let store = create_test_store();
2656 let planner = Planner::new(store);
2657
2658 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2660 items: vec![
2661 ReturnItem {
2662 expression: LogicalExpression::Variable("a".to_string()),
2663 alias: None,
2664 },
2665 ReturnItem {
2666 expression: LogicalExpression::Variable("b".to_string()),
2667 alias: None,
2668 },
2669 ],
2670 distinct: false,
2671 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2672 variable: "b".to_string(),
2673 label: Some("Company".to_string()),
2674 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2675 variable: "a".to_string(),
2676 label: Some("Person".to_string()),
2677 input: None,
2678 }))),
2679 })),
2680 }));
2681
2682 let physical = planner.plan(&logical).unwrap();
2683 assert!(physical.columns().contains(&"a".to_string()));
2684 assert!(physical.columns().contains(&"b".to_string()));
2685 }
2686
2687 use crate::catalog::Catalog;
2695 use crate::query::plan::{
2696 AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
2697 HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
2698 MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
2699 SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
2700 };
2701 use grafeo_core::execution::operators::{Operator, SessionContext};
2702
2703 fn full_store() -> Arc<LpgStore> {
2704 let store = Arc::new(LpgStore::new().unwrap());
2706 let vincent = store.create_node(&["Person"]);
2707 let jules = store.create_node(&["Person"]);
2708 let mia = store.create_node(&["Person"]);
2709 let _company = store.create_node(&["Company"]);
2710 store.create_edge(vincent, jules, "KNOWS");
2711 store.create_edge(jules, mia, "KNOWS");
2712 store
2713 }
2714
2715 fn scan_person(var: &str) -> LogicalOperator {
2716 LogicalOperator::NodeScan(NodeScanOp {
2717 variable: var.to_string(),
2718 label: Some("Person".to_string()),
2719 input: None,
2720 })
2721 }
2722
2723 fn scan_any(var: &str) -> LogicalOperator {
2724 LogicalOperator::NodeScan(NodeScanOp {
2725 variable: var.to_string(),
2726 label: None,
2727 input: None,
2728 })
2729 }
2730
2731 #[test]
2734 fn test_with_read_only_flag() {
2735 let store = create_test_store();
2736 let planner =
2737 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
2738 assert!(planner.read_only);
2739
2740 let planner_off =
2741 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
2742 assert!(!planner_off.read_only);
2743 }
2744
2745 #[test]
2746 fn test_with_catalog() {
2747 let store = create_test_store();
2748 let catalog = Arc::new(Catalog::new());
2749 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2750 .with_catalog(Arc::clone(&catalog));
2751 assert!(planner.catalog.is_some());
2752 }
2753
2754 #[test]
2755 fn test_with_session_context() {
2756 let store = create_test_store();
2757 let context = SessionContext {
2758 current_schema: Some("public".to_string()),
2759 current_graph: Some("main".to_string()),
2760 ..SessionContext::default()
2761 };
2762 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2763 .with_session_context(context);
2764 assert_eq!(
2765 planner.session_context.current_schema.as_deref(),
2766 Some("public")
2767 );
2768 assert_eq!(
2769 planner.session_context.current_graph.as_deref(),
2770 Some("main")
2771 );
2772 }
2773
2774 #[test]
2777 fn test_register_edge_column_named() {
2778 let store = create_test_store();
2779 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2780 let name = planner.register_edge_column(&Some("r".to_string()));
2781 assert_eq!(name, "r");
2782 assert!(planner.edge_columns.borrow().contains("r"));
2783 }
2784
2785 #[test]
2786 fn test_register_edge_column_anonymous_counter_advances() {
2787 let store = create_test_store();
2788 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2789 let a = planner.register_edge_column(&None);
2790 let b = planner.register_edge_column(&None);
2791 assert_eq!(a, "_anon_edge_0");
2792 assert_eq!(b, "_anon_edge_1");
2793 assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
2794 assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
2795 }
2796
2797 #[test]
2800 fn test_create_node_without_write_store_errors() {
2801 let store = create_test_store();
2803 let planner = Planner::new(store);
2804
2805 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2806 variable: "n".to_string(),
2807 labels: vec!["Person".to_string()],
2808 properties: vec![],
2809 input: None,
2810 }));
2811
2812 let result = planner.plan(&logical);
2813 assert!(result.is_err());
2814 }
2815
2816 #[test]
2819 fn test_plan_profiled_collects_entries() {
2820 let store = create_test_store();
2821 let planner = Planner::new(store);
2822
2823 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2824 items: vec![ReturnItem {
2825 expression: LogicalExpression::Variable("n".to_string()),
2826 alias: None,
2827 }],
2828 distinct: false,
2829 input: Box::new(scan_person("n")),
2830 }));
2831
2832 let (physical, entries) = planner.plan_profiled(&logical).unwrap();
2833 assert_eq!(physical.columns(), &["n"]);
2834 assert!(
2836 entries.len() >= 2,
2837 "expected entries, got {}",
2838 entries.len()
2839 );
2840 assert!(!planner.profiling.get());
2842 }
2843
2844 #[test]
2845 fn test_plan_profiled_propagates_plan_errors() {
2846 let store = create_test_store();
2847 let planner = Planner::new(store);
2848 let logical = LogicalPlan::new(LogicalOperator::Empty);
2849 let result = planner.plan_profiled(&logical);
2850 assert!(result.is_err());
2851 assert!(!planner.profiling.get());
2853 }
2854
2855 #[test]
2858 fn test_plan_edge_scan_is_unsupported() {
2859 let store = create_test_store();
2861 let planner = Planner::new(store);
2862 let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
2863 variable: "e".to_string(),
2864 edge_types: vec![],
2865 input: None,
2866 }));
2867 let err = planner.plan(&logical).err().expect("plan should fail");
2868 assert!(format!("{err}").contains("Unsupported operator"));
2869 }
2870
2871 #[test]
2872 fn test_plan_triple_scan_is_unsupported() {
2873 let store = create_test_store();
2874 let planner = Planner::new(store);
2875 let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
2876 subject: TripleComponent::Variable("s".to_string()),
2877 predicate: TripleComponent::Variable("p".to_string()),
2878 object: TripleComponent::Variable("o".to_string()),
2879 graph: None,
2880 input: None,
2881 dataset: None,
2882 }));
2883 assert!(planner.plan(&logical).is_err());
2884 }
2885
2886 #[test]
2887 fn test_plan_bind_is_unsupported() {
2888 let store = create_test_store();
2889 let planner = Planner::new(store);
2890 let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
2891 expression: LogicalExpression::Literal(Value::Int64(1)),
2892 variable: "x".to_string(),
2893 input: Box::new(scan_any("n")),
2894 }));
2895 assert!(planner.plan(&logical).is_err());
2896 }
2897
2898 #[test]
2899 fn test_plan_parameter_scan_without_apply_errors() {
2900 let store = create_test_store();
2901 let planner = Planner::new(store);
2902 let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
2903 columns: vec!["n".to_string()],
2904 }));
2905 let err = planner.plan(&logical).err().expect("plan should fail");
2906 assert!(format!("{err}").contains("ParameterScan"));
2907 }
2908
2909 #[test]
2912 fn test_plan_union_dispatch() {
2913 let store = create_test_store();
2914 let planner = Planner::new(store);
2915 let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
2916 inputs: vec![scan_person("n"), scan_person("n")],
2917 }));
2918 let physical = planner.plan(&logical).unwrap();
2919 assert_eq!(physical.columns(), &["n"]);
2920 }
2921
2922 #[test]
2923 fn test_plan_except_dispatch() {
2924 let store = create_test_store();
2925 let planner = Planner::new(store);
2926 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
2927 left: Box::new(scan_person("n")),
2928 right: Box::new(scan_person("n")),
2929 all: false,
2930 }));
2931 let physical = planner.plan(&logical).unwrap();
2932 assert_eq!(physical.columns(), &["n"]);
2933 }
2934
2935 #[test]
2936 fn test_plan_intersect_dispatch() {
2937 let store = create_test_store();
2938 let planner = Planner::new(store);
2939 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
2940 left: Box::new(scan_person("n")),
2941 right: Box::new(scan_person("n")),
2942 all: false,
2943 }));
2944 let physical = planner.plan(&logical).unwrap();
2945 assert_eq!(physical.columns(), &["n"]);
2946 }
2947
2948 #[test]
2949 fn test_plan_otherwise_dispatch() {
2950 let store = create_test_store();
2951 let planner = Planner::new(store);
2952 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
2953 left: Box::new(scan_person("n")),
2954 right: Box::new(scan_any("n")),
2955 }));
2956 let physical = planner.plan(&logical).unwrap();
2957 assert_eq!(physical.columns(), &["n"]);
2958 }
2959
2960 #[test]
2961 fn test_plan_left_join_dispatch() {
2962 let store = create_test_store();
2963 let planner = Planner::new(store);
2964 let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
2965 left: Box::new(scan_any("a")),
2966 right: Box::new(scan_any("b")),
2967 condition: None,
2968 }));
2969 let physical = planner.plan(&logical).unwrap();
2970 assert!(physical.columns().contains(&"a".to_string()));
2971 assert!(physical.columns().contains(&"b".to_string()));
2972 }
2973
2974 #[test]
2975 fn test_plan_anti_join_dispatch() {
2976 let store = create_test_store();
2977 let planner = Planner::new(store);
2978 let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
2979 left: Box::new(scan_any("a")),
2980 right: Box::new(scan_any("b")),
2981 }));
2982 let physical = planner.plan(&logical).unwrap();
2983 assert!(physical.columns().contains(&"a".to_string()));
2984 }
2985
2986 #[test]
2987 fn test_plan_apply_uncorrelated_dispatch() {
2988 let store = create_test_store();
2989 let planner = Planner::new(store);
2990 let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
2991 input: Box::new(scan_any("a")),
2992 subplan: Box::new(scan_any("b")),
2993 shared_variables: vec![],
2994 optional: false,
2995 }));
2996 let physical = planner.plan(&logical).unwrap();
2997 assert!(physical.columns().contains(&"a".to_string()));
2998 assert!(physical.columns().contains(&"b".to_string()));
2999 }
3000
3001 #[test]
3002 fn test_plan_unwind_literal_list() {
3003 let store = create_test_store();
3004 let planner = Planner::new(store);
3005
3006 let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
3008 expression: LogicalExpression::List(vec![
3009 LogicalExpression::Literal(Value::Int64(1)),
3010 LogicalExpression::Literal(Value::Int64(2)),
3011 LogicalExpression::Literal(Value::Int64(3)),
3012 ]),
3013 variable: "x".to_string(),
3014 ordinality_var: None,
3015 offset_var: None,
3016 input: Box::new(LogicalOperator::Empty),
3017 }));
3018 let physical = planner.plan(&logical).unwrap();
3019 assert!(physical.columns().contains(&"x".to_string()));
3020 }
3021
3022 #[test]
3023 fn test_plan_merge_dispatch() {
3024 let store = create_test_store();
3025 let planner = create_writable_planner(&store);
3026
3027 let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
3029 variable: "n".to_string(),
3030 labels: vec!["Person".to_string()],
3031 match_properties: vec![],
3032 on_create: vec![],
3033 on_match: vec![],
3034 input: Box::new(LogicalOperator::Empty),
3035 }));
3036 let physical = planner.plan(&logical).unwrap();
3037 assert!(physical.columns().contains(&"n".to_string()));
3038 }
3039
3040 #[test]
3041 fn test_plan_merge_relationship_dispatch() {
3042 let store = full_store();
3043 let planner = create_writable_planner(&store);
3044
3045 let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
3047 variable: "r".to_string(),
3048 source_variable: "a".to_string(),
3049 target_variable: "b".to_string(),
3050 edge_type: "KNOWS".to_string(),
3051 match_properties: vec![],
3052 on_create: vec![],
3053 on_match: vec![],
3054 input: Box::new(LogicalOperator::Join(JoinOp {
3055 left: Box::new(scan_person("a")),
3056 right: Box::new(scan_person("b")),
3057 join_type: JoinType::Cross,
3058 conditions: vec![],
3059 })),
3060 }));
3061 let physical = planner.plan(&logical).unwrap();
3062 assert!(physical.columns().contains(&"r".to_string()));
3063 }
3064
3065 #[test]
3066 fn test_plan_add_label_dispatch() {
3067 let store = full_store();
3068 let planner = create_writable_planner(&store);
3069 let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
3070 variable: "n".to_string(),
3071 labels: vec!["VIP".to_string()],
3072 input: Box::new(scan_person("n")),
3073 }));
3074 let physical = planner.plan(&logical).unwrap();
3075 assert!(physical.columns().contains(&"labels_added".to_string()));
3076 }
3077
3078 #[test]
3079 fn test_plan_remove_label_dispatch() {
3080 let store = full_store();
3081 let planner = create_writable_planner(&store);
3082 let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
3083 variable: "n".to_string(),
3084 labels: vec!["Person".to_string()],
3085 input: Box::new(scan_person("n")),
3086 }));
3087 let physical = planner.plan(&logical).unwrap();
3088 assert!(physical.columns().contains(&"labels_removed".to_string()));
3089 }
3090
3091 #[test]
3092 fn test_plan_set_property_dispatch() {
3093 let store = full_store();
3094 let planner = create_writable_planner(&store);
3095 let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
3096 variable: "n".to_string(),
3097 properties: vec![(
3098 "city".to_string(),
3099 LogicalExpression::Literal(Value::String("Amsterdam".into())),
3100 )],
3101 replace: false,
3102 is_edge: false,
3103 input: Box::new(scan_person("n")),
3104 }));
3105 let physical = planner.plan(&logical).unwrap();
3106 assert!(physical.columns().contains(&"n".to_string()));
3107 }
3108
3109 #[test]
3110 fn test_plan_delete_edge_dispatch() {
3111 let store = full_store();
3112 let planner = create_writable_planner(&store);
3113
3114 let expand_op = LogicalOperator::Expand(ExpandOp {
3116 from_variable: "a".to_string(),
3117 to_variable: "b".to_string(),
3118 edge_variable: Some("r".to_string()),
3119 direction: ExpandDirection::Outgoing,
3120 edge_types: vec!["KNOWS".to_string()],
3121 min_hops: 1,
3122 max_hops: Some(1),
3123 input: Box::new(scan_person("a")),
3124 path_alias: None,
3125 path_mode: PathMode::Walk,
3126 });
3127 let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
3128 variable: "r".to_string(),
3129 input: Box::new(expand_op),
3130 }));
3131 let physical = planner.plan(&logical).unwrap();
3132 assert!(physical.columns().contains(&"r".to_string()));
3133 }
3134
3135 #[test]
3136 fn test_plan_shortest_path_dispatch() {
3137 let store = full_store();
3138 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3139
3140 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3142 input: Box::new(LogicalOperator::Join(JoinOp {
3143 left: Box::new(scan_person("a")),
3144 right: Box::new(scan_person("b")),
3145 join_type: JoinType::Cross,
3146 conditions: vec![],
3147 })),
3148 source_var: "a".to_string(),
3149 target_var: "b".to_string(),
3150 edge_types: vec!["KNOWS".to_string()],
3151 direction: ExpandDirection::Outgoing,
3152 path_alias: "p".to_string(),
3153 all_paths: false,
3154 }));
3155 let physical = planner.plan(&logical).unwrap();
3156 assert!(
3157 physical
3158 .columns()
3159 .iter()
3160 .any(|c| c.contains("_path_length_p"))
3161 );
3162 }
3163
3164 #[test]
3165 fn test_plan_shortest_path_missing_source_errors() {
3166 let store = full_store();
3167 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3168 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3169 input: Box::new(scan_person("a")),
3170 source_var: "missing".to_string(),
3171 target_var: "a".to_string(),
3172 edge_types: vec![],
3173 direction: ExpandDirection::Both,
3174 path_alias: "p".to_string(),
3175 all_paths: false,
3176 }));
3177 let err = planner.plan(&logical).err().expect("plan should fail");
3178 assert!(format!("{err}").contains("Source variable"));
3179 }
3180
3181 #[test]
3182 fn test_plan_map_collect_dispatch() {
3183 let store = create_test_store();
3185 let planner = Planner::new(store);
3186 let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
3187 projections: vec![
3188 crate::query::plan::Projection {
3189 expression: LogicalExpression::Literal(Value::String("key".into())),
3190 alias: Some("k".to_string()),
3191 },
3192 crate::query::plan::Projection {
3193 expression: LogicalExpression::Literal(Value::Int64(1)),
3194 alias: Some("v".to_string()),
3195 },
3196 ],
3197 input: Box::new(scan_person("n")),
3198 pass_through_input: false,
3199 });
3200 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3201 key_var: "k".to_string(),
3202 value_var: "v".to_string(),
3203 alias: "m".to_string(),
3204 input: Box::new(input_with_kv),
3205 }));
3206 let physical = planner.plan(&logical).unwrap();
3207 assert_eq!(physical.columns(), &["m"]);
3208 }
3209
3210 #[test]
3211 fn test_plan_map_collect_missing_key_errors() {
3212 let store = create_test_store();
3213 let planner = Planner::new(store);
3214 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3215 key_var: "not_there".to_string(),
3216 value_var: "also_missing".to_string(),
3217 alias: "m".to_string(),
3218 input: Box::new(scan_any("n")),
3219 }));
3220 let err = planner.plan(&logical).err().expect("plan should fail");
3221 let msg = format!("{err}");
3222 assert!(msg.contains("MapCollect key"), "got: {msg}");
3223 }
3224
3225 #[test]
3226 fn test_plan_map_collect_missing_value_errors() {
3227 let store = create_test_store();
3228 let planner = Planner::new(store);
3229 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3231 key_var: "n".to_string(),
3232 value_var: "missing_value".to_string(),
3233 alias: "m".to_string(),
3234 input: Box::new(scan_any("n")),
3235 }));
3236 let err = planner.plan(&logical).err().expect("plan should fail");
3237 let msg = format!("{err}");
3238 assert!(msg.contains("MapCollect value"), "got: {msg}");
3239 }
3240
3241 #[test]
3242 fn test_plan_horizontal_aggregate_missing_column_errors() {
3243 let store = create_test_store();
3244 let planner = Planner::new(store);
3245 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3246 HorizontalAggregateOp {
3247 list_column: "not_a_column".to_string(),
3248 entity_kind: crate::query::plan::EntityKind::Edge,
3249 function: LogicalAggregateFunction::Count,
3250 property: "age".to_string(),
3251 alias: "total".to_string(),
3252 input: Box::new(scan_any("n")),
3253 },
3254 ));
3255 let err = planner.plan(&logical).err().expect("plan should fail");
3256 assert!(format!("{err}").contains("HorizontalAggregate"));
3257 }
3258
3259 #[test]
3260 fn test_plan_load_data_dispatch() {
3261 let store = create_test_store();
3262 let planner = Planner::new(store);
3263 let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
3265 format: LoadDataFormat::Csv,
3266 with_headers: true,
3267 path: "/nonexistent/data.csv".to_string(),
3268 variable: "row".to_string(),
3269 field_terminator: Some(','),
3270 }));
3271 let physical = planner.plan(&logical).unwrap();
3272 assert_eq!(physical.columns(), &["row"]);
3273 }
3274
3275 #[test]
3276 fn test_plan_multi_way_join_dispatch() {
3277 let store = full_store();
3280 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3281 let ab = LogicalOperator::Expand(ExpandOp {
3282 from_variable: "a".to_string(),
3283 to_variable: "b".to_string(),
3284 edge_variable: None,
3285 direction: ExpandDirection::Outgoing,
3286 edge_types: vec!["KNOWS".to_string()],
3287 min_hops: 1,
3288 max_hops: Some(1),
3289 input: Box::new(scan_person("a")),
3290 path_alias: None,
3291 path_mode: PathMode::Walk,
3292 });
3293 let bc = LogicalOperator::Expand(ExpandOp {
3294 from_variable: "b".to_string(),
3295 to_variable: "c".to_string(),
3296 edge_variable: None,
3297 direction: ExpandDirection::Outgoing,
3298 edge_types: vec!["KNOWS".to_string()],
3299 min_hops: 1,
3300 max_hops: Some(1),
3301 input: Box::new(scan_person("b")),
3302 path_alias: None,
3303 path_mode: PathMode::Walk,
3304 });
3305 let ca = LogicalOperator::Expand(ExpandOp {
3306 from_variable: "c".to_string(),
3307 to_variable: "a".to_string(),
3308 edge_variable: None,
3309 direction: ExpandDirection::Outgoing,
3310 edge_types: vec!["KNOWS".to_string()],
3311 min_hops: 1,
3312 max_hops: Some(1),
3313 input: Box::new(scan_person("c")),
3314 path_alias: None,
3315 path_mode: PathMode::Walk,
3316 });
3317 let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3318 inputs: vec![ab, bc, ca],
3319 conditions: vec![],
3320 shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
3321 }));
3322 let _ = planner.plan(&logical);
3323 }
3324
3325 #[test]
3326 fn test_plan_horizontal_aggregate_dispatch() {
3327 let store = full_store();
3329 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3330
3331 let path = LogicalOperator::Expand(ExpandOp {
3332 from_variable: "a".to_string(),
3333 to_variable: "b".to_string(),
3334 edge_variable: Some("r".to_string()),
3335 direction: ExpandDirection::Outgoing,
3336 edge_types: vec!["KNOWS".to_string()],
3337 min_hops: 1,
3338 max_hops: Some(3),
3339 input: Box::new(scan_person("a")),
3340 path_alias: Some("p".to_string()),
3341 path_mode: PathMode::Walk,
3342 });
3343 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3345 HorizontalAggregateOp {
3346 list_column: "_path_edges_p".to_string(),
3347 entity_kind: crate::query::plan::EntityKind::Edge,
3348 function: LogicalAggregateFunction::Count,
3349 property: "weight".to_string(),
3350 alias: "edge_count".to_string(),
3351 input: Box::new(path),
3352 },
3353 ));
3354 let physical = planner.plan(&logical).unwrap();
3355 assert!(physical.columns().contains(&"edge_count".to_string()));
3356 }
3357
3358 #[test]
3361 fn test_plan_adaptive_with_except() {
3362 let store = create_test_store();
3363 let planner = Planner::new(store);
3364 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3365 left: Box::new(scan_person("n")),
3366 right: Box::new(scan_person("n")),
3367 all: false,
3368 }));
3369 let physical = planner.plan_adaptive(&logical).unwrap();
3370 assert!(physical.adaptive_context.is_some());
3371 }
3372
3373 #[test]
3374 fn test_plan_adaptive_with_intersect() {
3375 let store = create_test_store();
3376 let planner = Planner::new(store);
3377 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3378 left: Box::new(scan_person("n")),
3379 right: Box::new(scan_any("n")),
3380 all: false,
3381 }));
3382 let physical = planner.plan_adaptive(&logical).unwrap();
3383 assert!(physical.adaptive_context.is_some());
3384 }
3385
3386 #[test]
3387 fn test_plan_adaptive_with_otherwise() {
3388 let store = create_test_store();
3389 let planner = Planner::new(store);
3390 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3391 left: Box::new(scan_person("n")),
3392 right: Box::new(scan_any("n")),
3393 }));
3394 let physical = planner.plan_adaptive(&logical).unwrap();
3395 assert!(physical.adaptive_context.is_some());
3396 }
3397
3398 #[test]
3401 fn test_count_expand_chain_variable_length_breaks_chain() {
3402 let var_expand = LogicalOperator::Expand(ExpandOp {
3404 from_variable: "a".to_string(),
3405 to_variable: "b".to_string(),
3406 edge_variable: None,
3407 direction: ExpandDirection::Outgoing,
3408 edge_types: vec!["KNOWS".to_string()],
3409 min_hops: 1,
3410 max_hops: Some(3),
3411 input: Box::new(scan_person("a")),
3412 path_alias: None,
3413 path_mode: PathMode::Walk,
3414 });
3415 let (count, _) = Planner::count_expand_chain(&var_expand);
3416 assert_eq!(count, 0);
3417 }
3418
3419 #[cfg(feature = "algos")]
3422 #[test]
3423 fn test_static_result_operator_emits_rows_and_resets() {
3424 use grafeo_common::types::Value;
3425 let rows = vec![
3426 vec![Value::Int64(1), Value::String("Vincent".into())],
3427 vec![Value::Int64(2), Value::String("Jules".into())],
3428 ];
3429 let mut op = StaticResultOperator {
3430 rows,
3431 column_indices: vec![0, 1],
3432 row_index: 0,
3433 };
3434 assert_eq!(op.name(), "StaticResult");
3435 let chunk = op.next().unwrap().expect("first chunk");
3436 assert_eq!(chunk.row_count(), 2);
3437 assert!(op.next().unwrap().is_none());
3439 op.reset();
3441 assert!(op.next().unwrap().is_some());
3442 let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
3444 rows: vec![vec![Value::Null]],
3445 column_indices: vec![0],
3446 row_index: 0,
3447 });
3448 let _any = boxed.into_any();
3449 }
3450}