1mod aggregate;
95mod expand;
96mod expression;
97mod filter;
98mod filter_hybrid;
99mod join;
100mod mutation;
101mod project;
102mod scan;
103
104#[cfg(feature = "algos")]
105use crate::query::plan::CallProcedureOp;
106#[cfg(feature = "text-index")]
107use crate::query::plan::TextScanOp;
108use crate::query::plan::{
109 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
110 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
111 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
112 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
113 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
114 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
115 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
116};
117#[cfg(feature = "vector-index")]
118use crate::query::plan::{VectorMetric, VectorScanOp};
119use grafeo_common::grafeo_debug_span;
120use grafeo_common::types::{EpochId, TransactionId};
121use grafeo_common::types::{LogicalType, Value};
122use grafeo_common::utils::error::{Error, Result};
123use grafeo_core::execution::AdaptiveContext;
124use grafeo_core::execution::operators::{
125 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
126 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
127 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
128 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
129 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
130 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
131 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
132 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
133 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RangeScanOperator,
134 RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
135 SimpleAggregateOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
136 UnionOperator, UnwindOperator, VariableLengthExpandOperator,
137};
138use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
139use std::collections::HashMap;
140use std::sync::Arc;
141
142use crate::query::planner::common;
143use crate::query::planner::common::expression_to_string;
144use crate::query::planner::{
145 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
146 convert_unary_op, value_to_logical_type,
147};
148use crate::transaction::TransactionManager;
149
150struct RangeBounds<'a> {
152 min: Option<&'a Value>,
153 max: Option<&'a Value>,
154 min_inclusive: bool,
155 max_inclusive: bool,
156}
157
158pub struct Planner {
160 pub(super) store: Arc<dyn GraphStoreSearch>,
162 pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
164 pub(super) transaction_manager: Option<Arc<TransactionManager>>,
166 pub(super) transaction_id: Option<TransactionId>,
168 pub(super) viewing_epoch: EpochId,
170 pub(super) anon_edge_counter: std::cell::Cell<u32>,
172 pub(super) factorized_execution: bool,
174 pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
177 pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
180 pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
182 pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
184 #[cfg(feature = "lpg")]
187 pub(super) lpg_store: Option<Arc<grafeo_core::graph::lpg::LpgStore>>,
188 pub(super) correlated_param_state:
192 std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
193 pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
196 profiling: std::cell::Cell<bool>,
198 profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
200 write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
202 pub(super) session_context: grafeo_core::execution::operators::SessionContext,
204 pub(super) read_only: bool,
208 pub(super) limit_hint: std::cell::Cell<Option<usize>>,
219}
220
221impl Planner {
222 #[must_use]
227 pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
228 let epoch = store.current_epoch();
229 Self {
230 store,
231 write_store: None,
232 transaction_manager: None,
233 transaction_id: None,
234 viewing_epoch: epoch,
235 anon_edge_counter: std::cell::Cell::new(0),
236 factorized_execution: true,
237 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
238 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
239 validator: None,
240 catalog: None,
241 #[cfg(feature = "lpg")]
242 lpg_store: None,
243 correlated_param_state: std::cell::RefCell::new(None),
244 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
245 profiling: std::cell::Cell::new(false),
246 profile_entries: std::cell::RefCell::new(Vec::new()),
247 write_tracker: None,
248 session_context: grafeo_core::execution::operators::SessionContext::default(),
249 read_only: false,
250 limit_hint: std::cell::Cell::new(None),
251 }
252 }
253
254 #[must_use]
256 pub fn with_context(
257 store: Arc<dyn GraphStoreSearch>,
258 write_store: Option<Arc<dyn GraphStoreMut>>,
259 transaction_manager: Arc<TransactionManager>,
260 transaction_id: Option<TransactionId>,
261 viewing_epoch: EpochId,
262 ) -> Self {
263 use crate::transaction::TransactionWriteTracker;
264
265 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
267 if transaction_id.is_some() {
268 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
269 &transaction_manager,
270 ))))
271 } else {
272 None
273 };
274
275 Self {
276 store,
277 write_store,
278 transaction_manager: Some(transaction_manager),
279 transaction_id,
280 viewing_epoch,
281 anon_edge_counter: std::cell::Cell::new(0),
282 factorized_execution: true,
283 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
284 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
285 validator: None,
286 catalog: None,
287 #[cfg(feature = "lpg")]
288 lpg_store: None,
289 correlated_param_state: std::cell::RefCell::new(None),
290 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
291 profiling: std::cell::Cell::new(false),
292 profile_entries: std::cell::RefCell::new(Vec::new()),
293 write_tracker,
294 session_context: grafeo_core::execution::operators::SessionContext::default(),
295 read_only: false,
296 limit_hint: std::cell::Cell::new(None),
297 }
298 }
299
300 #[must_use]
303 pub fn with_read_only(mut self, read_only: bool) -> Self {
304 self.read_only = read_only;
305 self
306 }
307
308 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
310 self.write_store
311 .as_ref()
312 .map(Arc::clone)
313 .ok_or(Error::Transaction(
314 grafeo_common::utils::error::TransactionError::ReadOnly,
315 ))
316 }
317
318 #[must_use]
320 pub fn viewing_epoch(&self) -> EpochId {
321 self.viewing_epoch
322 }
323
324 #[must_use]
326 pub fn transaction_id(&self) -> Option<TransactionId> {
327 self.transaction_id
328 }
329
330 #[must_use]
332 pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
333 self.transaction_manager.as_ref()
334 }
335
336 #[must_use]
338 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
339 self.factorized_execution = enabled;
340 self
341 }
342
343 #[must_use]
345 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
346 self.validator = Some(validator);
347 self
348 }
349
350 #[must_use]
352 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
353 self.catalog = Some(catalog);
354 self
355 }
356
357 #[cfg(feature = "lpg")]
360 #[must_use]
361 pub fn with_lpg_store(mut self, lpg_store: Arc<grafeo_core::graph::lpg::LpgStore>) -> Self {
362 self.lpg_store = Some(lpg_store);
363 self
364 }
365
366 #[must_use]
368 pub fn with_session_context(
369 mut self,
370 context: grafeo_core::execution::operators::SessionContext,
371 ) -> Self {
372 self.session_context = context;
373 self
374 }
375
376 pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
382 let name = edge_variable.clone().unwrap_or_else(|| {
383 let count = self.anon_edge_counter.get();
384 self.anon_edge_counter.set(count + 1);
385 format!("_anon_edge_{}", count)
386 });
387 self.edge_columns.borrow_mut().insert(name.clone());
388 name
389 }
390
391 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
395 match op {
396 LogicalOperator::Expand(expand) => {
397 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
398
399 if is_single_hop {
400 let (inner_count, base) = Self::count_expand_chain(&expand.input);
401 (inner_count + 1, base)
402 } else {
403 (0, op)
404 }
405 }
406 _ => (0, op),
407 }
408 }
409
410 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
414 let mut chain = Vec::new();
415 let mut current = op;
416
417 while let LogicalOperator::Expand(expand) = current {
418 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
419 if !is_single_hop {
420 break;
421 }
422 chain.push(expand);
423 current = &expand.input;
424 }
425
426 chain.reverse();
427 chain
428 }
429
430 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
437 let _span = grafeo_debug_span!("grafeo::query::plan");
438 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
439 Ok(PhysicalPlan {
440 operator,
441 columns,
442 adaptive_context: None,
443 })
444 }
445
446 pub fn plan_profiled(
457 &self,
458 logical_plan: &LogicalPlan,
459 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
460 self.profiling.set(true);
461 self.profile_entries.borrow_mut().clear();
462
463 let result = self.plan_operator(&logical_plan.root);
464
465 self.profiling.set(false);
466 let (operator, columns) = result?;
467 let entries = self.profile_entries.borrow_mut().drain(..).collect();
468
469 Ok((
470 PhysicalPlan {
471 operator,
472 columns,
473 adaptive_context: None,
474 },
475 entries,
476 ))
477 }
478
479 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
486 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
487
488 let mut adaptive_context = AdaptiveContext::new();
489 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
490
491 Ok(PhysicalPlan {
492 operator,
493 columns,
494 adaptive_context: Some(adaptive_context),
495 })
496 }
497
498 fn collect_cardinality_estimates(
500 &self,
501 op: &LogicalOperator,
502 ctx: &mut AdaptiveContext,
503 depth: usize,
504 ) {
505 match op {
506 LogicalOperator::NodeScan(scan) => {
507 let estimate = if let Some(label) = &scan.label {
508 self.store.nodes_by_label(label).len() as f64
509 } else {
510 self.store.node_count() as f64
511 };
512 let id = format!("scan_{}", scan.variable);
513 ctx.set_estimate(&id, estimate);
514
515 if let Some(input) = &scan.input {
516 self.collect_cardinality_estimates(input, ctx, depth + 1);
517 }
518 }
519 LogicalOperator::Filter(filter) => {
520 let input_estimate = self.estimate_cardinality(&filter.input);
521 let estimate = input_estimate * 0.3;
522 let id = format!("filter_{depth}");
523 ctx.set_estimate(&id, estimate);
524
525 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
526 }
527 LogicalOperator::Expand(expand) => {
528 let input_estimate = self.estimate_cardinality(&expand.input);
529 let stats = self.store.statistics();
530 let avg_degree = self.estimate_expand_degree(&stats, expand);
531 let estimate = input_estimate * avg_degree;
532 let id = format!("expand_{}", expand.to_variable);
533 ctx.set_estimate(&id, estimate);
534
535 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
536 }
537 LogicalOperator::Join(join) => {
538 let left_est = self.estimate_cardinality(&join.left);
539 let right_est = self.estimate_cardinality(&join.right);
540 let estimate = (left_est * right_est).sqrt();
541 let id = format!("join_{depth}");
542 ctx.set_estimate(&id, estimate);
543
544 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
545 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
546 }
547 LogicalOperator::Aggregate(agg) => {
548 let input_estimate = self.estimate_cardinality(&agg.input);
549 let estimate = if agg.group_by.is_empty() {
550 1.0
551 } else {
552 (input_estimate * 0.1).max(1.0)
553 };
554 let id = format!("aggregate_{depth}");
555 ctx.set_estimate(&id, estimate);
556
557 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
558 }
559 LogicalOperator::Distinct(distinct) => {
560 let input_estimate = self.estimate_cardinality(&distinct.input);
561 let estimate = (input_estimate * 0.5).max(1.0);
562 let id = format!("distinct_{depth}");
563 ctx.set_estimate(&id, estimate);
564
565 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
566 }
567 LogicalOperator::Return(ret) => {
568 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
569 }
570 LogicalOperator::Limit(limit) => {
571 let input_estimate = self.estimate_cardinality(&limit.input);
572 let estimate = (input_estimate).min(limit.count.estimate());
573 let id = format!("limit_{depth}");
574 ctx.set_estimate(&id, estimate);
575
576 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
577 }
578 LogicalOperator::Skip(skip) => {
579 let input_estimate = self.estimate_cardinality(&skip.input);
580 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
581 let id = format!("skip_{depth}");
582 ctx.set_estimate(&id, estimate);
583
584 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
585 }
586 LogicalOperator::Sort(sort) => {
587 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
588 }
589 LogicalOperator::Union(union) => {
590 let estimate: f64 = union
591 .inputs
592 .iter()
593 .map(|input| self.estimate_cardinality(input))
594 .sum();
595 let id = format!("union_{depth}");
596 ctx.set_estimate(&id, estimate);
597
598 for input in &union.inputs {
599 self.collect_cardinality_estimates(input, ctx, depth + 1);
600 }
601 }
602 _ => {
603 }
605 }
606 }
607
608 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
610 match op {
611 LogicalOperator::NodeScan(scan) => {
612 if let Some(label) = &scan.label {
613 self.store.nodes_by_label(label).len() as f64
614 } else {
615 self.store.node_count() as f64
616 }
617 }
618 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
619 LogicalOperator::Expand(expand) => {
620 let stats = self.store.statistics();
621 let avg_degree = self.estimate_expand_degree(&stats, expand);
622 self.estimate_cardinality(&expand.input) * avg_degree
623 }
624 LogicalOperator::Join(join) => {
625 let left = self.estimate_cardinality(&join.left);
626 let right = self.estimate_cardinality(&join.right);
627 (left * right).sqrt()
628 }
629 LogicalOperator::Aggregate(agg) => {
630 if agg.group_by.is_empty() {
631 1.0
632 } else {
633 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
634 }
635 }
636 LogicalOperator::Distinct(distinct) => {
637 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
638 }
639 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
640 LogicalOperator::Limit(limit) => self
641 .estimate_cardinality(&limit.input)
642 .min(limit.count.estimate()),
643 LogicalOperator::Skip(skip) => {
644 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
645 }
646 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
647 LogicalOperator::Union(union) => union
648 .inputs
649 .iter()
650 .map(|input| self.estimate_cardinality(input))
651 .sum(),
652 LogicalOperator::Except(except) => {
653 let left = self.estimate_cardinality(&except.left);
654 let right = self.estimate_cardinality(&except.right);
655 (left - right).max(0.0)
656 }
657 LogicalOperator::Intersect(intersect) => {
658 let left = self.estimate_cardinality(&intersect.left);
659 let right = self.estimate_cardinality(&intersect.right);
660 left.min(right)
661 }
662 LogicalOperator::Otherwise(otherwise) => self
663 .estimate_cardinality(&otherwise.left)
664 .max(self.estimate_cardinality(&otherwise.right)),
665 _ => 1000.0,
666 }
667 }
668
669 fn estimate_expand_degree(
671 &self,
672 stats: &grafeo_core::statistics::Statistics,
673 expand: &ExpandOp,
674 ) -> f64 {
675 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
676 if expand.edge_types.len() == 1 {
677 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
678 } else if stats.total_nodes > 0 {
679 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
680 } else {
681 10.0
682 }
683 }
684
685 fn maybe_profile(
688 &self,
689 result: Result<(Box<dyn Operator>, Vec<String>)>,
690 op: &LogicalOperator,
691 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
692 if self.profiling.get() {
693 let (physical, columns) = result?;
694 let (entry, stats) =
695 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
696 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
697 self.profile_entries.borrow_mut().push(entry);
698 Ok((Box::new(profiled), columns))
699 } else {
700 result
701 }
702 }
703
704 fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
706 let result = match op {
707 LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
708 LogicalOperator::Expand(expand) => {
709 if self.factorized_execution && !self.profiling.get() {
727 let (chain_len, _base) = Self::count_expand_chain(op);
728 if chain_len >= 2 {
729 return self.maybe_profile(self.plan_expand_chain(op), op);
730 }
731 }
732 self.plan_expand(expand)
733 }
734 LogicalOperator::Return(ret) => self.plan_return(ret),
735 LogicalOperator::Filter(filter) => self.plan_filter(filter),
736 LogicalOperator::Project(project) => self.plan_project(project),
737 LogicalOperator::Limit(limit) => self.plan_limit(limit),
738 LogicalOperator::Skip(skip) => self.plan_skip(skip),
739 LogicalOperator::Sort(sort) => self.plan_sort(sort),
740 LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
741 LogicalOperator::Join(join) => self.plan_join(join),
742 LogicalOperator::Union(union) => self.plan_union(union),
743 LogicalOperator::Except(except) => self.plan_except(except),
744 LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
745 LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
746 LogicalOperator::Apply(apply) => self.plan_apply(apply),
747 LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
748 LogicalOperator::CreateNode(create) => self.plan_create_node(create),
749 LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
750 LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
751 LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
752 LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
753 LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
754 LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
755 LogicalOperator::Merge(merge) => self.plan_merge(merge),
756 LogicalOperator::MergeRelationship(merge_rel) => {
757 self.plan_merge_relationship(merge_rel)
758 }
759 LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
760 LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
761 LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
762 LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
763 LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
764 #[cfg(feature = "algos")]
765 LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
766 #[cfg(not(feature = "algos"))]
767 LogicalOperator::CallProcedure(_) => Err(Error::Internal(
768 "CALL procedures require the 'algos' feature".to_string(),
769 )),
770 LogicalOperator::ParameterScan(_param_scan) => {
771 let state = self
772 .correlated_param_state
773 .borrow()
774 .clone()
775 .ok_or_else(|| {
776 Error::Internal(
777 "ParameterScan without correlated Apply context".to_string(),
778 )
779 })?;
780 let columns = state.columns.clone();
783 let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
784 Ok((operator, columns))
785 }
786 LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
787 LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
788 LogicalOperator::LoadData(load) => {
789 let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
790 load.path.clone(),
791 load.format,
792 load.with_headers,
793 load.field_terminator,
794 load.variable.clone(),
795 ));
796 Ok((operator, vec![load.variable.clone()]))
797 }
798 LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
799 #[cfg(feature = "vector-index")]
800 LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
801 #[cfg(not(feature = "vector-index"))]
802 LogicalOperator::VectorScan(_) => Err(Error::Internal(
803 "VectorScan requires vector-index feature".to_string(),
804 )),
805 LogicalOperator::VectorJoin(_) => Err(Error::Internal(
806 "VectorJoin requires vector-index feature".to_string(),
807 )),
808 #[cfg(feature = "text-index")]
809 LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
810 #[cfg(not(feature = "text-index"))]
811 LogicalOperator::TextScan(_) => Err(Error::Internal(
812 "TextScan requires text-index feature".to_string(),
813 )),
814 _ => Err(Error::Internal(format!(
815 "Unsupported operator: {:?}",
816 std::mem::discriminant(op)
817 ))),
818 };
819 self.maybe_profile(result, op)
820 }
821
822 fn plan_horizontal_aggregate(
824 &self,
825 ha: &HorizontalAggregateOp,
826 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
827 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
828
829 let list_col_idx = child_columns
830 .iter()
831 .position(|c| c == &ha.list_column)
832 .ok_or_else(|| {
833 Error::Internal(format!(
834 "HorizontalAggregate list column '{}' not found in {:?}",
835 ha.list_column, child_columns
836 ))
837 })?;
838
839 let entity_kind = match ha.entity_kind {
840 LogicalEntityKind::Edge => EntityKind::Edge,
841 LogicalEntityKind::Node => EntityKind::Node,
842 };
843
844 let function = convert_aggregate_function(ha.function);
845 let input_column_count = child_columns.len();
846
847 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
848 child_op,
849 list_col_idx,
850 entity_kind,
851 function,
852 ha.property.clone(),
853 Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
854 input_column_count,
855 ));
856
857 let mut columns = child_columns;
858 columns.push(ha.alias.clone());
859 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
861
862 Ok((operator, columns))
863 }
864
865 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
867 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
868 let key_idx = child_columns
869 .iter()
870 .position(|c| c == &mc.key_var)
871 .ok_or_else(|| {
872 Error::Internal(format!(
873 "MapCollect key '{}' not in columns {:?}",
874 mc.key_var, child_columns
875 ))
876 })?;
877 let value_idx = child_columns
878 .iter()
879 .position(|c| c == &mc.value_var)
880 .ok_or_else(|| {
881 Error::Internal(format!(
882 "MapCollect value '{}' not in columns {:?}",
883 mc.value_var, child_columns
884 ))
885 })?;
886 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
887 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
888 Ok((operator, vec![mc.alias.clone()]))
889 }
890
891 #[cfg(feature = "text-index")]
893 fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
894 use grafeo_core::execution::operators::TextScanOperator;
895
896 let query_string = match &scan.query {
897 LogicalExpression::Literal(Value::String(s)) => s.to_string(),
898 LogicalExpression::Parameter(name) => {
899 return Err(Error::Internal(format!(
900 "TextScan query parameter ${} not resolved",
901 name
902 )));
903 }
904 _ => {
905 return Err(Error::Internal(
906 "TextScan query must be a string literal or parameter".to_string(),
907 ));
908 }
909 };
910
911 let operator: Box<dyn Operator> = if let Some(k) = scan.k {
912 Box::new(TextScanOperator::top_k(
913 Arc::clone(&self.store),
914 &scan.label,
915 &scan.property,
916 &query_string,
917 k,
918 ))
919 } else if let Some(threshold) = scan.threshold {
920 Box::new(TextScanOperator::with_threshold(
921 Arc::clone(&self.store),
922 &scan.label,
923 &scan.property,
924 &query_string,
925 threshold,
926 ))
927 } else {
928 Box::new(TextScanOperator::top_k(
929 Arc::clone(&self.store),
930 &scan.label,
931 &scan.property,
932 &query_string,
933 100,
934 ))
935 };
936
937 let mut columns = vec![scan.variable.clone()];
938 if let Some(ref score_col) = scan.score_column {
939 columns.push(score_col.clone());
940 }
941
942 Ok((operator, columns))
943 }
944
945 #[cfg(feature = "vector-index")]
947 pub(super) fn plan_vector_scan(
948 &self,
949 scan: &VectorScanOp,
950 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
951 use grafeo_core::execution::operators::VectorScanOperator;
952 use grafeo_core::index::vector::DistanceMetric;
953
954 if scan.input.is_some() {
959 return Err(Error::Internal(
960 "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
961 ));
962 }
963
964 let query_vec = self.resolve_vector_literal(&scan.query_vector)?;
965
966 let requested_metric = scan.metric.map(|m| match m {
967 VectorMetric::Cosine => DistanceMetric::Cosine,
968 VectorMetric::Euclidean => DistanceMetric::Euclidean,
969 VectorMetric::DotProduct => DistanceMetric::DotProduct,
970 VectorMetric::Manhattan => DistanceMetric::Manhattan,
971 });
972
973 let k = scan.k.unwrap_or_else(|| {
978 scan.label.as_ref().map_or_else(
979 || self.store.node_count(),
980 |l| self.store.nodes_by_label_count(l),
981 )
982 });
983
984 let index_metric = scan
989 .label
990 .as_ref()
991 .and_then(|label| self.store.vector_index_metric(label, &scan.property));
992 let metric = requested_metric
993 .or(index_metric)
994 .unwrap_or(DistanceMetric::Cosine);
995
996 let mut operator = VectorScanOperator::new(
1000 Arc::clone(&self.store),
1001 scan.label.clone(),
1002 scan.property.clone(),
1003 query_vec,
1004 k,
1005 metric,
1006 );
1007
1008 if let Some(sim) = scan.min_similarity {
1009 operator = operator.with_min_similarity(sim);
1010 }
1011 if let Some(dist) = scan.max_distance {
1012 operator = operator.with_max_distance(dist);
1013 }
1014
1015 let mut columns = vec![scan.variable.clone()];
1016 let metric_tag = match metric {
1020 DistanceMetric::Cosine => "cos",
1021 DistanceMetric::Euclidean => "euc",
1022 DistanceMetric::DotProduct => "dot",
1023 DistanceMetric::Manhattan => "man",
1024 _ => "other",
1027 };
1028 columns.push(project::vector_score_column_name(
1029 metric_tag,
1030 &scan.property,
1031 &scan.variable,
1032 &scan.query_vector,
1033 ));
1034
1035 Ok((Box::new(operator), columns))
1036 }
1037
1038 #[cfg(feature = "vector-index")]
1040 pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
1041 #[allow(clippy::cast_possible_truncation)]
1043 match expr {
1044 LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
1045 LogicalExpression::Literal(Value::List(list)) => {
1046 let mut vec = Vec::with_capacity(list.len());
1047 for item in list.iter() {
1048 match item {
1049 Value::Float64(f) => vec.push(*f as f32),
1050 Value::Int64(i) => vec.push(*i as f32),
1051 _ => {
1052 return Err(Error::Internal(
1053 "Vector elements must be numeric".to_string(),
1054 ));
1055 }
1056 }
1057 }
1058 Ok(vec)
1059 }
1060 LogicalExpression::List(items) => {
1063 let mut vec = Vec::with_capacity(items.len());
1064 for item in items {
1065 match item {
1066 LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
1067 LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
1068 _ => {
1069 return Err(Error::Internal(
1070 "Vector elements must be numeric literals".to_string(),
1071 ));
1072 }
1073 }
1074 }
1075 Ok(vec)
1076 }
1077 _ => Err(Error::Internal("Expected vector literal".to_string())),
1078 }
1079 }
1080}
1081
1082#[cfg(feature = "algos")]
1084struct StaticResultOperator {
1085 rows: Vec<Vec<Value>>,
1086 column_indices: Vec<usize>,
1087 row_index: usize,
1088}
1089
1090#[cfg(feature = "algos")]
1091impl Operator for StaticResultOperator {
1092 fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
1093 use grafeo_core::execution::DataChunk;
1094
1095 if self.row_index >= self.rows.len() {
1096 return Ok(None);
1097 }
1098
1099 let remaining = self.rows.len() - self.row_index;
1100 let chunk_rows = remaining.min(1024);
1101 let col_count = self.column_indices.len();
1102
1103 let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
1104 let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
1105
1106 for row_offset in 0..chunk_rows {
1107 let row = &self.rows[self.row_index + row_offset];
1108 for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
1109 let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
1110 if let Some(col) = chunk.column_mut(col_idx) {
1111 col.push_value(value);
1112 }
1113 }
1114 }
1115 chunk.set_count(chunk_rows);
1116
1117 self.row_index += chunk_rows;
1118 Ok(Some(chunk))
1119 }
1120
1121 fn reset(&mut self) {
1122 self.row_index = 0;
1123 }
1124
1125 fn name(&self) -> &'static str {
1126 "StaticResult"
1127 }
1128
1129 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1130 self
1131 }
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136 use super::*;
1137 use crate::query::plan::{
1138 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1139 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1140 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
1141 SkipOp as LogicalSkipOp, SortKey, SortOp,
1142 };
1143 use grafeo_common::types::Value;
1144 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
1145 use grafeo_core::graph::GraphStoreMut;
1146 use grafeo_core::graph::lpg::LpgStore;
1147
1148 fn create_test_store() -> Arc<LpgStore> {
1149 let store = Arc::new(LpgStore::new().unwrap());
1150 store.create_node(&["Person"]);
1151 store.create_node(&["Person"]);
1152 store.create_node(&["Company"]);
1153 store
1154 }
1155
1156 #[test]
1159 fn test_plan_simple_scan() {
1160 let store = create_test_store();
1161 let planner = Planner::new(store);
1162
1163 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1165 items: vec![ReturnItem {
1166 expression: LogicalExpression::Variable("n".to_string()),
1167 alias: None,
1168 }],
1169 distinct: false,
1170 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1171 variable: "n".to_string(),
1172 label: Some("Person".to_string()),
1173 input: None,
1174 })),
1175 }));
1176
1177 let physical = planner.plan(&logical).unwrap();
1178 assert_eq!(physical.columns(), &["n"]);
1179 }
1180
1181 #[test]
1182 fn test_plan_scan_without_label() {
1183 let store = create_test_store();
1184 let planner = Planner::new(store);
1185
1186 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1188 items: vec![ReturnItem {
1189 expression: LogicalExpression::Variable("n".to_string()),
1190 alias: None,
1191 }],
1192 distinct: false,
1193 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1194 variable: "n".to_string(),
1195 label: None,
1196 input: None,
1197 })),
1198 }));
1199
1200 let physical = planner.plan(&logical).unwrap();
1201 assert_eq!(physical.columns(), &["n"]);
1202 }
1203
1204 #[test]
1205 fn test_plan_return_with_alias() {
1206 let store = create_test_store();
1207 let planner = Planner::new(store);
1208
1209 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1211 items: vec![ReturnItem {
1212 expression: LogicalExpression::Variable("n".to_string()),
1213 alias: Some("person".to_string()),
1214 }],
1215 distinct: false,
1216 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1217 variable: "n".to_string(),
1218 label: Some("Person".to_string()),
1219 input: None,
1220 })),
1221 }));
1222
1223 let physical = planner.plan(&logical).unwrap();
1224 assert_eq!(physical.columns(), &["person"]);
1225 }
1226
1227 #[test]
1228 fn test_plan_return_property() {
1229 let store = create_test_store();
1230 let planner = Planner::new(store);
1231
1232 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1234 items: vec![ReturnItem {
1235 expression: LogicalExpression::Property {
1236 variable: "n".to_string(),
1237 property: "name".to_string(),
1238 },
1239 alias: None,
1240 }],
1241 distinct: false,
1242 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1243 variable: "n".to_string(),
1244 label: Some("Person".to_string()),
1245 input: None,
1246 })),
1247 }));
1248
1249 let physical = planner.plan(&logical).unwrap();
1250 assert_eq!(physical.columns(), &["n.name"]);
1251 }
1252
1253 #[test]
1254 fn test_plan_return_literal() {
1255 let store = create_test_store();
1256 let planner = Planner::new(store);
1257
1258 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1260 items: vec![ReturnItem {
1261 expression: LogicalExpression::Literal(Value::Int64(42)),
1262 alias: Some("answer".to_string()),
1263 }],
1264 distinct: false,
1265 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1266 variable: "n".to_string(),
1267 label: None,
1268 input: None,
1269 })),
1270 }));
1271
1272 let physical = planner.plan(&logical).unwrap();
1273 assert_eq!(physical.columns(), &["answer"]);
1274 }
1275
1276 #[test]
1279 fn test_plan_filter_equality() {
1280 let store = create_test_store();
1281 let planner = Planner::new(store);
1282
1283 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1285 items: vec![ReturnItem {
1286 expression: LogicalExpression::Variable("n".to_string()),
1287 alias: None,
1288 }],
1289 distinct: false,
1290 input: Box::new(LogicalOperator::Filter(FilterOp {
1291 predicate: LogicalExpression::Binary {
1292 left: Box::new(LogicalExpression::Property {
1293 variable: "n".to_string(),
1294 property: "age".to_string(),
1295 }),
1296 op: BinaryOp::Eq,
1297 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1298 },
1299 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1300 variable: "n".to_string(),
1301 label: Some("Person".to_string()),
1302 input: None,
1303 })),
1304 pushdown_hint: None,
1305 })),
1306 }));
1307
1308 let physical = planner.plan(&logical).unwrap();
1309 assert_eq!(physical.columns(), &["n"]);
1310 }
1311
1312 #[test]
1313 fn test_plan_filter_compound_and() {
1314 let store = create_test_store();
1315 let planner = Planner::new(store);
1316
1317 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1319 items: vec![ReturnItem {
1320 expression: LogicalExpression::Variable("n".to_string()),
1321 alias: None,
1322 }],
1323 distinct: false,
1324 input: Box::new(LogicalOperator::Filter(FilterOp {
1325 predicate: LogicalExpression::Binary {
1326 left: Box::new(LogicalExpression::Binary {
1327 left: Box::new(LogicalExpression::Property {
1328 variable: "n".to_string(),
1329 property: "age".to_string(),
1330 }),
1331 op: BinaryOp::Gt,
1332 right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1333 }),
1334 op: BinaryOp::And,
1335 right: Box::new(LogicalExpression::Binary {
1336 left: Box::new(LogicalExpression::Property {
1337 variable: "n".to_string(),
1338 property: "age".to_string(),
1339 }),
1340 op: BinaryOp::Lt,
1341 right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1342 }),
1343 },
1344 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1345 variable: "n".to_string(),
1346 label: None,
1347 input: None,
1348 })),
1349 pushdown_hint: None,
1350 })),
1351 }));
1352
1353 let physical = planner.plan(&logical).unwrap();
1354 assert_eq!(physical.columns(), &["n"]);
1355 }
1356
1357 #[test]
1358 fn test_plan_filter_unary_not() {
1359 let store = create_test_store();
1360 let planner = Planner::new(store);
1361
1362 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1364 items: vec![ReturnItem {
1365 expression: LogicalExpression::Variable("n".to_string()),
1366 alias: None,
1367 }],
1368 distinct: false,
1369 input: Box::new(LogicalOperator::Filter(FilterOp {
1370 predicate: LogicalExpression::Unary {
1371 op: UnaryOp::Not,
1372 operand: Box::new(LogicalExpression::Property {
1373 variable: "n".to_string(),
1374 property: "active".to_string(),
1375 }),
1376 },
1377 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1378 variable: "n".to_string(),
1379 label: None,
1380 input: None,
1381 })),
1382 pushdown_hint: None,
1383 })),
1384 }));
1385
1386 let physical = planner.plan(&logical).unwrap();
1387 assert_eq!(physical.columns(), &["n"]);
1388 }
1389
1390 #[test]
1391 fn test_plan_filter_is_null() {
1392 let store = create_test_store();
1393 let planner = Planner::new(store);
1394
1395 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1397 items: vec![ReturnItem {
1398 expression: LogicalExpression::Variable("n".to_string()),
1399 alias: None,
1400 }],
1401 distinct: false,
1402 input: Box::new(LogicalOperator::Filter(FilterOp {
1403 predicate: LogicalExpression::Unary {
1404 op: UnaryOp::IsNull,
1405 operand: Box::new(LogicalExpression::Property {
1406 variable: "n".to_string(),
1407 property: "email".to_string(),
1408 }),
1409 },
1410 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1411 variable: "n".to_string(),
1412 label: None,
1413 input: None,
1414 })),
1415 pushdown_hint: None,
1416 })),
1417 }));
1418
1419 let physical = planner.plan(&logical).unwrap();
1420 assert_eq!(physical.columns(), &["n"]);
1421 }
1422
1423 #[test]
1424 fn test_plan_filter_function_call() {
1425 let store = create_test_store();
1426 let planner = Planner::new(store);
1427
1428 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1430 items: vec![ReturnItem {
1431 expression: LogicalExpression::Variable("n".to_string()),
1432 alias: None,
1433 }],
1434 distinct: false,
1435 input: Box::new(LogicalOperator::Filter(FilterOp {
1436 predicate: LogicalExpression::Binary {
1437 left: Box::new(LogicalExpression::FunctionCall {
1438 name: "size".to_string(),
1439 args: vec![LogicalExpression::Property {
1440 variable: "n".to_string(),
1441 property: "friends".to_string(),
1442 }],
1443 distinct: false,
1444 }),
1445 op: BinaryOp::Gt,
1446 right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1447 },
1448 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1449 variable: "n".to_string(),
1450 label: None,
1451 input: None,
1452 })),
1453 pushdown_hint: None,
1454 })),
1455 }));
1456
1457 let physical = planner.plan(&logical).unwrap();
1458 assert_eq!(physical.columns(), &["n"]);
1459 }
1460
1461 #[test]
1464 fn test_plan_expand_outgoing() {
1465 let store = create_test_store();
1466 let planner = Planner::new(store);
1467
1468 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1470 items: vec![
1471 ReturnItem {
1472 expression: LogicalExpression::Variable("a".to_string()),
1473 alias: None,
1474 },
1475 ReturnItem {
1476 expression: LogicalExpression::Variable("b".to_string()),
1477 alias: None,
1478 },
1479 ],
1480 distinct: false,
1481 input: Box::new(LogicalOperator::Expand(ExpandOp {
1482 from_variable: "a".to_string(),
1483 to_variable: "b".to_string(),
1484 edge_variable: None,
1485 direction: ExpandDirection::Outgoing,
1486 edge_types: vec!["KNOWS".to_string()],
1487 min_hops: 1,
1488 max_hops: Some(1),
1489 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1490 variable: "a".to_string(),
1491 label: Some("Person".to_string()),
1492 input: None,
1493 })),
1494 path_alias: None,
1495 path_mode: PathMode::Walk,
1496 })),
1497 }));
1498
1499 let physical = planner.plan(&logical).unwrap();
1500 assert!(physical.columns().contains(&"a".to_string()));
1502 assert!(physical.columns().contains(&"b".to_string()));
1503 }
1504
1505 #[test]
1506 fn test_plan_expand_with_edge_variable() {
1507 let store = create_test_store();
1508 let planner = Planner::new(store);
1509
1510 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1512 items: vec![
1513 ReturnItem {
1514 expression: LogicalExpression::Variable("a".to_string()),
1515 alias: None,
1516 },
1517 ReturnItem {
1518 expression: LogicalExpression::Variable("r".to_string()),
1519 alias: None,
1520 },
1521 ReturnItem {
1522 expression: LogicalExpression::Variable("b".to_string()),
1523 alias: None,
1524 },
1525 ],
1526 distinct: false,
1527 input: Box::new(LogicalOperator::Expand(ExpandOp {
1528 from_variable: "a".to_string(),
1529 to_variable: "b".to_string(),
1530 edge_variable: Some("r".to_string()),
1531 direction: ExpandDirection::Outgoing,
1532 edge_types: vec!["KNOWS".to_string()],
1533 min_hops: 1,
1534 max_hops: Some(1),
1535 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1536 variable: "a".to_string(),
1537 label: None,
1538 input: None,
1539 })),
1540 path_alias: None,
1541 path_mode: PathMode::Walk,
1542 })),
1543 }));
1544
1545 let physical = planner.plan(&logical).unwrap();
1546 assert!(physical.columns().contains(&"a".to_string()));
1547 assert!(physical.columns().contains(&"r".to_string()));
1548 assert!(physical.columns().contains(&"b".to_string()));
1549 }
1550
1551 #[test]
1554 fn test_plan_limit() {
1555 let store = create_test_store();
1556 let planner = Planner::new(store);
1557
1558 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1560 items: vec![ReturnItem {
1561 expression: LogicalExpression::Variable("n".to_string()),
1562 alias: None,
1563 }],
1564 distinct: false,
1565 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1566 count: 10.into(),
1567 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1568 variable: "n".to_string(),
1569 label: None,
1570 input: None,
1571 })),
1572 })),
1573 }));
1574
1575 let physical = planner.plan(&logical).unwrap();
1576 assert_eq!(physical.columns(), &["n"]);
1577 }
1578
1579 #[test]
1580 fn test_plan_skip() {
1581 let store = create_test_store();
1582 let planner = Planner::new(store);
1583
1584 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1586 items: vec![ReturnItem {
1587 expression: LogicalExpression::Variable("n".to_string()),
1588 alias: None,
1589 }],
1590 distinct: false,
1591 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1592 count: 5.into(),
1593 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1594 variable: "n".to_string(),
1595 label: None,
1596 input: None,
1597 })),
1598 })),
1599 }));
1600
1601 let physical = planner.plan(&logical).unwrap();
1602 assert_eq!(physical.columns(), &["n"]);
1603 }
1604
1605 #[test]
1608 fn test_limit_pushdown_into_range_scan_sets_inner_limit() {
1609 use grafeo_core::execution::operators::{LimitOperator, RangeScanOperator};
1610 let store = create_test_store();
1611 let planner = Planner::new(store);
1612
1613 let logical = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1615 count: 5.into(),
1616 input: Box::new(LogicalOperator::Filter(FilterOp {
1617 predicate: LogicalExpression::Binary {
1618 left: Box::new(LogicalExpression::Property {
1619 variable: "n".to_string(),
1620 property: "age".to_string(),
1621 }),
1622 op: BinaryOp::Gt,
1623 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1624 },
1625 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1626 variable: "n".to_string(),
1627 label: Some("Person".to_string()),
1628 input: None,
1629 })),
1630 pushdown_hint: None,
1631 })),
1632 }));
1633
1634 let physical = planner.plan(&logical).unwrap();
1635 let root = physical.into_operator();
1636
1637 let limit_op = root
1639 .into_any()
1640 .downcast::<LimitOperator>()
1641 .expect("top operator is LimitOperator");
1642 let (inner, limit_count) = limit_op.into_parts();
1643 assert_eq!(limit_count, 5, "outer LimitOperator preserves the cap");
1644
1645 let range_op = inner
1647 .into_any()
1648 .downcast::<RangeScanOperator>()
1649 .expect("inner operator is RangeScanOperator");
1650 assert_eq!(
1651 range_op.limit(),
1652 Some(5),
1653 "LIMIT 5 must be pushed into the inner range scan"
1654 );
1655 }
1656
1657 #[test]
1658 fn test_limit_pushdown_blocked_by_sort_in_between() {
1659 use grafeo_core::execution::operators::{
1660 LimitOperator, ProjectOperator, RangeScanOperator, SortOperator,
1661 };
1662 let store = create_test_store();
1663 let planner = Planner::new(store);
1664
1665 let logical = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1669 count: 5.into(),
1670 input: Box::new(LogicalOperator::Sort(SortOp {
1671 keys: vec![SortKey {
1672 expression: LogicalExpression::Property {
1673 variable: "n".to_string(),
1674 property: "name".to_string(),
1675 },
1676 order: SortOrder::Ascending,
1677 nulls: None,
1678 }],
1679 input: Box::new(LogicalOperator::Filter(FilterOp {
1680 predicate: LogicalExpression::Binary {
1681 left: Box::new(LogicalExpression::Property {
1682 variable: "n".to_string(),
1683 property: "age".to_string(),
1684 }),
1685 op: BinaryOp::Gt,
1686 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1687 },
1688 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1689 variable: "n".to_string(),
1690 label: Some("Person".to_string()),
1691 input: None,
1692 })),
1693 pushdown_hint: None,
1694 })),
1695 })),
1696 }));
1697
1698 let physical = planner.plan(&logical).unwrap();
1699 let root = physical.into_operator();
1700
1701 let limit_op = root
1703 .into_any()
1704 .downcast::<LimitOperator>()
1705 .expect("top operator is LimitOperator");
1706 let (after_limit, _cap) = limit_op.into_parts();
1707
1708 let sort_op = after_limit
1709 .into_any()
1710 .downcast::<SortOperator>()
1711 .expect("operator under Limit must be Sort (Sort blocks pushdown)");
1712 let (after_sort, _keys) = sort_op.into_parts();
1713
1714 let project_op = after_sort
1718 .into_any()
1719 .downcast::<ProjectOperator>()
1720 .expect("operator under Sort must be Project (Sort materializes sort keys)");
1721 let (after_project, _projs, _types) = project_op.into_parts();
1722
1723 let range_op = after_project
1724 .into_any()
1725 .downcast::<RangeScanOperator>()
1726 .expect("operator under Project must be RangeScan (Filter folded into scan)");
1727
1728 assert_eq!(
1733 range_op.limit(),
1734 None,
1735 "Sort between Limit and RangeScan must block LIMIT pushdown"
1736 );
1737
1738 assert_eq!(
1741 planner.limit_hint.get(),
1742 None,
1743 "limit_hint must be cleared after plan_limit returns"
1744 );
1745
1746 let direct = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1749 count: 5.into(),
1750 input: Box::new(LogicalOperator::Filter(FilterOp {
1751 predicate: LogicalExpression::Binary {
1752 left: Box::new(LogicalExpression::Property {
1753 variable: "n".to_string(),
1754 property: "age".to_string(),
1755 }),
1756 op: BinaryOp::Gt,
1757 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1758 },
1759 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1760 variable: "n".to_string(),
1761 label: Some("Person".to_string()),
1762 input: None,
1763 })),
1764 pushdown_hint: None,
1765 })),
1766 }));
1767 let direct_phys = planner.plan(&direct).unwrap();
1768 let direct_root = direct_phys.into_operator();
1769 let direct_limit = direct_root
1770 .into_any()
1771 .downcast::<LimitOperator>()
1772 .expect("direct top is LimitOperator");
1773 let (direct_inner, _) = direct_limit.into_parts();
1774 let direct_range = direct_inner
1775 .into_any()
1776 .downcast::<RangeScanOperator>()
1777 .expect("direct inner is RangeScanOperator");
1778 assert_eq!(
1779 direct_range.limit(),
1780 Some(5),
1781 "fixture sanity: pushdown DOES fire when input is Filter"
1782 );
1783 }
1784
1785 #[test]
1786 fn test_plan_sort() {
1787 let store = create_test_store();
1788 let planner = Planner::new(store);
1789
1790 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1792 items: vec![ReturnItem {
1793 expression: LogicalExpression::Variable("n".to_string()),
1794 alias: None,
1795 }],
1796 distinct: false,
1797 input: Box::new(LogicalOperator::Sort(SortOp {
1798 keys: vec![SortKey {
1799 expression: LogicalExpression::Variable("n".to_string()),
1800 order: SortOrder::Ascending,
1801 nulls: None,
1802 }],
1803 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1804 variable: "n".to_string(),
1805 label: None,
1806 input: None,
1807 })),
1808 })),
1809 }));
1810
1811 let physical = planner.plan(&logical).unwrap();
1812 assert_eq!(physical.columns(), &["n"]);
1813 }
1814
1815 #[test]
1816 fn test_plan_sort_descending() {
1817 let store = create_test_store();
1818 let planner = Planner::new(store);
1819
1820 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1822 items: vec![ReturnItem {
1823 expression: LogicalExpression::Variable("n".to_string()),
1824 alias: None,
1825 }],
1826 distinct: false,
1827 input: Box::new(LogicalOperator::Sort(SortOp {
1828 keys: vec![SortKey {
1829 expression: LogicalExpression::Variable("n".to_string()),
1830 order: SortOrder::Descending,
1831 nulls: None,
1832 }],
1833 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1834 variable: "n".to_string(),
1835 label: None,
1836 input: None,
1837 })),
1838 })),
1839 }));
1840
1841 let physical = planner.plan(&logical).unwrap();
1842 assert_eq!(physical.columns(), &["n"]);
1843 }
1844
1845 #[test]
1846 fn test_plan_distinct() {
1847 let store = create_test_store();
1848 let planner = Planner::new(store);
1849
1850 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1852 items: vec![ReturnItem {
1853 expression: LogicalExpression::Variable("n".to_string()),
1854 alias: None,
1855 }],
1856 distinct: false,
1857 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1858 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1859 variable: "n".to_string(),
1860 label: None,
1861 input: None,
1862 })),
1863 columns: None,
1864 })),
1865 }));
1866
1867 let physical = planner.plan(&logical).unwrap();
1868 assert_eq!(physical.columns(), &["n"]);
1869 }
1870
1871 #[test]
1872 fn test_plan_distinct_with_columns() {
1873 let store = create_test_store();
1874 let planner = Planner::new(store);
1875
1876 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1878 items: vec![ReturnItem {
1879 expression: LogicalExpression::Variable("n".to_string()),
1880 alias: None,
1881 }],
1882 distinct: false,
1883 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1884 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1885 variable: "n".to_string(),
1886 label: None,
1887 input: None,
1888 })),
1889 columns: Some(vec!["n".to_string()]),
1890 })),
1891 }));
1892
1893 let physical = planner.plan(&logical).unwrap();
1894 assert_eq!(physical.columns(), &["n"]);
1895 }
1896
1897 #[test]
1898 fn test_plan_distinct_with_nonexistent_columns() {
1899 let store = create_test_store();
1900 let planner = Planner::new(store);
1901
1902 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1905 items: vec![ReturnItem {
1906 expression: LogicalExpression::Variable("n".to_string()),
1907 alias: None,
1908 }],
1909 distinct: false,
1910 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1911 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1912 variable: "n".to_string(),
1913 label: None,
1914 input: None,
1915 })),
1916 columns: Some(vec!["nonexistent".to_string()]),
1917 })),
1918 }));
1919
1920 let physical = planner.plan(&logical).unwrap();
1921 assert_eq!(physical.columns(), &["n"]);
1922 }
1923
1924 #[test]
1927 fn test_plan_aggregate_count() {
1928 let store = create_test_store();
1929 let planner = Planner::new(store);
1930
1931 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1933 items: vec![ReturnItem {
1934 expression: LogicalExpression::Variable("cnt".to_string()),
1935 alias: None,
1936 }],
1937 distinct: false,
1938 input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1939 group_by: vec![],
1940 aggregates: vec![LogicalAggregateExpr {
1941 function: LogicalAggregateFunction::Count,
1942 expression: Some(LogicalExpression::Variable("n".to_string())),
1943 expression2: None,
1944 distinct: false,
1945 alias: Some("cnt".to_string()),
1946 percentile: None,
1947 separator: None,
1948 }],
1949 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1950 variable: "n".to_string(),
1951 label: None,
1952 input: None,
1953 })),
1954 having: None,
1955 })),
1956 }));
1957
1958 let physical = planner.plan(&logical).unwrap();
1959 assert!(physical.columns().contains(&"cnt".to_string()));
1960 }
1961
1962 #[test]
1963 fn test_plan_aggregate_with_group_by() {
1964 let store = create_test_store();
1965 let planner = Planner::new(store);
1966
1967 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1969 group_by: vec![LogicalExpression::Property {
1970 variable: "n".to_string(),
1971 property: "city".to_string(),
1972 }],
1973 aggregates: vec![LogicalAggregateExpr {
1974 function: LogicalAggregateFunction::Count,
1975 expression: Some(LogicalExpression::Variable("n".to_string())),
1976 expression2: None,
1977 distinct: false,
1978 alias: Some("cnt".to_string()),
1979 percentile: None,
1980 separator: None,
1981 }],
1982 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1983 variable: "n".to_string(),
1984 label: Some("Person".to_string()),
1985 input: None,
1986 })),
1987 having: None,
1988 }));
1989
1990 let physical = planner.plan(&logical).unwrap();
1991 assert_eq!(physical.columns().len(), 2);
1992 }
1993
1994 #[test]
1995 fn test_plan_aggregate_sum() {
1996 let store = create_test_store();
1997 let planner = Planner::new(store);
1998
1999 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2001 group_by: vec![],
2002 aggregates: vec![LogicalAggregateExpr {
2003 function: LogicalAggregateFunction::Sum,
2004 expression: Some(LogicalExpression::Property {
2005 variable: "n".to_string(),
2006 property: "value".to_string(),
2007 }),
2008 expression2: None,
2009 distinct: false,
2010 alias: Some("total".to_string()),
2011 percentile: None,
2012 separator: None,
2013 }],
2014 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2015 variable: "n".to_string(),
2016 label: None,
2017 input: None,
2018 })),
2019 having: None,
2020 }));
2021
2022 let physical = planner.plan(&logical).unwrap();
2023 assert!(physical.columns().contains(&"total".to_string()));
2024 }
2025
2026 #[test]
2027 fn test_plan_aggregate_avg() {
2028 let store = create_test_store();
2029 let planner = Planner::new(store);
2030
2031 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2033 group_by: vec![],
2034 aggregates: vec![LogicalAggregateExpr {
2035 function: LogicalAggregateFunction::Avg,
2036 expression: Some(LogicalExpression::Property {
2037 variable: "n".to_string(),
2038 property: "score".to_string(),
2039 }),
2040 expression2: None,
2041 distinct: false,
2042 alias: Some("average".to_string()),
2043 percentile: None,
2044 separator: None,
2045 }],
2046 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2047 variable: "n".to_string(),
2048 label: None,
2049 input: None,
2050 })),
2051 having: None,
2052 }));
2053
2054 let physical = planner.plan(&logical).unwrap();
2055 assert!(physical.columns().contains(&"average".to_string()));
2056 }
2057
2058 #[test]
2059 fn test_plan_aggregate_min_max() {
2060 let store = create_test_store();
2061 let planner = Planner::new(store);
2062
2063 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2065 group_by: vec![],
2066 aggregates: vec![
2067 LogicalAggregateExpr {
2068 function: LogicalAggregateFunction::Min,
2069 expression: Some(LogicalExpression::Property {
2070 variable: "n".to_string(),
2071 property: "age".to_string(),
2072 }),
2073 expression2: None,
2074 distinct: false,
2075 alias: Some("youngest".to_string()),
2076 percentile: None,
2077 separator: None,
2078 },
2079 LogicalAggregateExpr {
2080 function: LogicalAggregateFunction::Max,
2081 expression: Some(LogicalExpression::Property {
2082 variable: "n".to_string(),
2083 property: "age".to_string(),
2084 }),
2085 expression2: None,
2086 distinct: false,
2087 alias: Some("oldest".to_string()),
2088 percentile: None,
2089 separator: None,
2090 },
2091 ],
2092 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2093 variable: "n".to_string(),
2094 label: None,
2095 input: None,
2096 })),
2097 having: None,
2098 }));
2099
2100 let physical = planner.plan(&logical).unwrap();
2101 assert!(physical.columns().contains(&"youngest".to_string()));
2102 assert!(physical.columns().contains(&"oldest".to_string()));
2103 }
2104
2105 #[test]
2108 fn test_plan_inner_join() {
2109 let store = create_test_store();
2110 let planner = Planner::new(store);
2111
2112 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2114 items: vec![
2115 ReturnItem {
2116 expression: LogicalExpression::Variable("a".to_string()),
2117 alias: None,
2118 },
2119 ReturnItem {
2120 expression: LogicalExpression::Variable("b".to_string()),
2121 alias: None,
2122 },
2123 ],
2124 distinct: false,
2125 input: Box::new(LogicalOperator::Join(JoinOp {
2126 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2127 variable: "a".to_string(),
2128 label: Some("Person".to_string()),
2129 input: None,
2130 })),
2131 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2132 variable: "b".to_string(),
2133 label: Some("Company".to_string()),
2134 input: None,
2135 })),
2136 join_type: JoinType::Inner,
2137 conditions: vec![JoinCondition {
2138 left: LogicalExpression::Variable("a".to_string()),
2139 right: LogicalExpression::Variable("b".to_string()),
2140 }],
2141 })),
2142 }));
2143
2144 let physical = planner.plan(&logical).unwrap();
2145 assert!(physical.columns().contains(&"a".to_string()));
2146 assert!(physical.columns().contains(&"b".to_string()));
2147 }
2148
2149 #[test]
2150 fn test_plan_cross_join() {
2151 let store = create_test_store();
2152 let planner = Planner::new(store);
2153
2154 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2156 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2157 variable: "a".to_string(),
2158 label: None,
2159 input: None,
2160 })),
2161 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2162 variable: "b".to_string(),
2163 label: None,
2164 input: None,
2165 })),
2166 join_type: JoinType::Cross,
2167 conditions: vec![],
2168 }));
2169
2170 let physical = planner.plan(&logical).unwrap();
2171 assert_eq!(physical.columns().len(), 2);
2172 }
2173
2174 #[test]
2175 fn test_plan_left_join() {
2176 let store = create_test_store();
2177 let planner = Planner::new(store);
2178
2179 let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2180 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2181 variable: "a".to_string(),
2182 label: None,
2183 input: None,
2184 })),
2185 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2186 variable: "b".to_string(),
2187 label: None,
2188 input: None,
2189 })),
2190 join_type: JoinType::Left,
2191 conditions: vec![],
2192 }));
2193
2194 let physical = planner.plan(&logical).unwrap();
2195 assert_eq!(physical.columns().len(), 2);
2196 }
2197
2198 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
2201 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
2202 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
2203 p
2204 }
2205
2206 #[test]
2207 fn test_plan_create_node() {
2208 let store = create_test_store();
2209 let planner = create_writable_planner(&store);
2210
2211 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2213 variable: "n".to_string(),
2214 labels: vec!["Person".to_string()],
2215 properties: vec![(
2216 "name".to_string(),
2217 LogicalExpression::Literal(Value::String("Alix".into())),
2218 )],
2219 input: None,
2220 }));
2221
2222 let physical = planner.plan(&logical).unwrap();
2223 assert!(physical.columns().contains(&"n".to_string()));
2224 }
2225
2226 #[test]
2227 fn test_plan_create_edge() {
2228 let store = create_test_store();
2229 let planner = create_writable_planner(&store);
2230
2231 let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
2233 variable: Some("r".to_string()),
2234 from_variable: "a".to_string(),
2235 to_variable: "b".to_string(),
2236 edge_type: "KNOWS".to_string(),
2237 properties: vec![],
2238 input: Box::new(LogicalOperator::Join(JoinOp {
2239 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2240 variable: "a".to_string(),
2241 label: None,
2242 input: None,
2243 })),
2244 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2245 variable: "b".to_string(),
2246 label: None,
2247 input: None,
2248 })),
2249 join_type: JoinType::Cross,
2250 conditions: vec![],
2251 })),
2252 }));
2253
2254 let physical = planner.plan(&logical).unwrap();
2255 assert!(physical.columns().contains(&"r".to_string()));
2256 }
2257
2258 #[test]
2259 fn test_plan_delete_node() {
2260 let store = create_test_store();
2261 let planner = create_writable_planner(&store);
2262
2263 let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
2265 variable: "n".to_string(),
2266 detach: false,
2267 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2268 variable: "n".to_string(),
2269 label: None,
2270 input: None,
2271 })),
2272 }));
2273
2274 let physical = planner.plan(&logical).unwrap();
2275 assert!(physical.columns().contains(&"n".to_string()));
2276 }
2277
2278 #[test]
2281 fn test_plan_empty_errors() {
2282 let store = create_test_store();
2283 let planner = Planner::new(store);
2284
2285 let logical = LogicalPlan::new(LogicalOperator::Empty);
2286 let result = planner.plan(&logical);
2287 assert!(result.is_err());
2288 }
2289
2290 #[test]
2291 fn test_plan_missing_variable_in_return() {
2292 let store = create_test_store();
2293 let planner = Planner::new(store);
2294
2295 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2297 items: vec![ReturnItem {
2298 expression: LogicalExpression::Variable("missing".to_string()),
2299 alias: None,
2300 }],
2301 distinct: false,
2302 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2303 variable: "n".to_string(),
2304 label: None,
2305 input: None,
2306 })),
2307 }));
2308
2309 let result = planner.plan(&logical);
2310 assert!(result.is_err());
2311 }
2312
2313 #[test]
2316 fn test_convert_binary_ops() {
2317 assert!(convert_binary_op(BinaryOp::Eq).is_ok());
2318 assert!(convert_binary_op(BinaryOp::Ne).is_ok());
2319 assert!(convert_binary_op(BinaryOp::Lt).is_ok());
2320 assert!(convert_binary_op(BinaryOp::Le).is_ok());
2321 assert!(convert_binary_op(BinaryOp::Gt).is_ok());
2322 assert!(convert_binary_op(BinaryOp::Ge).is_ok());
2323 assert!(convert_binary_op(BinaryOp::And).is_ok());
2324 assert!(convert_binary_op(BinaryOp::Or).is_ok());
2325 assert!(convert_binary_op(BinaryOp::Add).is_ok());
2326 assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2327 assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2328 assert!(convert_binary_op(BinaryOp::Div).is_ok());
2329 }
2330
2331 #[test]
2332 fn test_convert_unary_ops() {
2333 assert!(convert_unary_op(UnaryOp::Not).is_ok());
2334 assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2335 assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2336 assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2337 }
2338
2339 #[test]
2340 fn test_convert_aggregate_functions() {
2341 assert!(matches!(
2342 convert_aggregate_function(LogicalAggregateFunction::Count),
2343 PhysicalAggregateFunction::Count
2344 ));
2345 assert!(matches!(
2346 convert_aggregate_function(LogicalAggregateFunction::Sum),
2347 PhysicalAggregateFunction::Sum
2348 ));
2349 assert!(matches!(
2350 convert_aggregate_function(LogicalAggregateFunction::Avg),
2351 PhysicalAggregateFunction::Avg
2352 ));
2353 assert!(matches!(
2354 convert_aggregate_function(LogicalAggregateFunction::Min),
2355 PhysicalAggregateFunction::Min
2356 ));
2357 assert!(matches!(
2358 convert_aggregate_function(LogicalAggregateFunction::Max),
2359 PhysicalAggregateFunction::Max
2360 ));
2361 }
2362
2363 #[test]
2364 fn test_planner_accessors() {
2365 let store = create_test_store();
2366 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2367
2368 assert!(planner.transaction_id().is_none());
2369 assert!(planner.transaction_manager().is_none());
2370 let _ = planner.viewing_epoch(); }
2372
2373 #[test]
2374 fn test_physical_plan_accessors() {
2375 let store = create_test_store();
2376 let planner = Planner::new(store);
2377
2378 let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2379 variable: "n".to_string(),
2380 label: None,
2381 input: None,
2382 }));
2383
2384 let physical = planner.plan(&logical).unwrap();
2385 assert_eq!(physical.columns(), &["n"]);
2386
2387 let _ = physical.into_operator();
2389 }
2390
2391 #[test]
2394 fn test_plan_adaptive_with_scan() {
2395 let store = create_test_store();
2396 let planner = Planner::new(store);
2397
2398 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2400 items: vec![ReturnItem {
2401 expression: LogicalExpression::Variable("n".to_string()),
2402 alias: None,
2403 }],
2404 distinct: false,
2405 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2406 variable: "n".to_string(),
2407 label: Some("Person".to_string()),
2408 input: None,
2409 })),
2410 }));
2411
2412 let physical = planner.plan_adaptive(&logical).unwrap();
2413 assert_eq!(physical.columns(), &["n"]);
2414 assert!(physical.adaptive_context.is_some());
2416 }
2417
2418 #[test]
2419 fn test_plan_adaptive_with_filter() {
2420 let store = create_test_store();
2421 let planner = Planner::new(store);
2422
2423 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2425 items: vec![ReturnItem {
2426 expression: LogicalExpression::Variable("n".to_string()),
2427 alias: None,
2428 }],
2429 distinct: false,
2430 input: Box::new(LogicalOperator::Filter(FilterOp {
2431 predicate: LogicalExpression::Binary {
2432 left: Box::new(LogicalExpression::Property {
2433 variable: "n".to_string(),
2434 property: "age".to_string(),
2435 }),
2436 op: BinaryOp::Gt,
2437 right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2438 },
2439 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2440 variable: "n".to_string(),
2441 label: None,
2442 input: None,
2443 })),
2444 pushdown_hint: None,
2445 })),
2446 }));
2447
2448 let physical = planner.plan_adaptive(&logical).unwrap();
2449 assert!(physical.adaptive_context.is_some());
2450 }
2451
2452 #[test]
2453 fn test_plan_adaptive_with_expand() {
2454 let store = create_test_store();
2455 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2456 .with_factorized_execution(false);
2457
2458 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2460 items: vec![
2461 ReturnItem {
2462 expression: LogicalExpression::Variable("a".to_string()),
2463 alias: None,
2464 },
2465 ReturnItem {
2466 expression: LogicalExpression::Variable("b".to_string()),
2467 alias: None,
2468 },
2469 ],
2470 distinct: false,
2471 input: Box::new(LogicalOperator::Expand(ExpandOp {
2472 from_variable: "a".to_string(),
2473 to_variable: "b".to_string(),
2474 edge_variable: None,
2475 direction: ExpandDirection::Outgoing,
2476 edge_types: vec!["KNOWS".to_string()],
2477 min_hops: 1,
2478 max_hops: Some(1),
2479 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2480 variable: "a".to_string(),
2481 label: None,
2482 input: None,
2483 })),
2484 path_alias: None,
2485 path_mode: PathMode::Walk,
2486 })),
2487 }));
2488
2489 let physical = planner.plan_adaptive(&logical).unwrap();
2490 assert!(physical.adaptive_context.is_some());
2491 }
2492
2493 #[test]
2494 fn test_plan_adaptive_with_join() {
2495 let store = create_test_store();
2496 let planner = Planner::new(store);
2497
2498 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2499 items: vec![
2500 ReturnItem {
2501 expression: LogicalExpression::Variable("a".to_string()),
2502 alias: None,
2503 },
2504 ReturnItem {
2505 expression: LogicalExpression::Variable("b".to_string()),
2506 alias: None,
2507 },
2508 ],
2509 distinct: false,
2510 input: Box::new(LogicalOperator::Join(JoinOp {
2511 left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2512 variable: "a".to_string(),
2513 label: None,
2514 input: None,
2515 })),
2516 right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2517 variable: "b".to_string(),
2518 label: None,
2519 input: None,
2520 })),
2521 join_type: JoinType::Cross,
2522 conditions: vec![],
2523 })),
2524 }));
2525
2526 let physical = planner.plan_adaptive(&logical).unwrap();
2527 assert!(physical.adaptive_context.is_some());
2528 }
2529
2530 #[test]
2531 fn test_plan_adaptive_with_aggregate() {
2532 let store = create_test_store();
2533 let planner = Planner::new(store);
2534
2535 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2536 group_by: vec![],
2537 aggregates: vec![LogicalAggregateExpr {
2538 function: LogicalAggregateFunction::Count,
2539 expression: Some(LogicalExpression::Variable("n".to_string())),
2540 expression2: None,
2541 distinct: false,
2542 alias: Some("cnt".to_string()),
2543 percentile: None,
2544 separator: None,
2545 }],
2546 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2547 variable: "n".to_string(),
2548 label: None,
2549 input: None,
2550 })),
2551 having: None,
2552 }));
2553
2554 let physical = planner.plan_adaptive(&logical).unwrap();
2555 assert!(physical.adaptive_context.is_some());
2556 }
2557
2558 #[test]
2559 fn test_plan_adaptive_with_distinct() {
2560 let store = create_test_store();
2561 let planner = Planner::new(store);
2562
2563 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2564 items: vec![ReturnItem {
2565 expression: LogicalExpression::Variable("n".to_string()),
2566 alias: None,
2567 }],
2568 distinct: false,
2569 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2570 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2571 variable: "n".to_string(),
2572 label: None,
2573 input: None,
2574 })),
2575 columns: None,
2576 })),
2577 }));
2578
2579 let physical = planner.plan_adaptive(&logical).unwrap();
2580 assert!(physical.adaptive_context.is_some());
2581 }
2582
2583 #[test]
2584 fn test_plan_adaptive_with_limit() {
2585 let store = create_test_store();
2586 let planner = Planner::new(store);
2587
2588 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2589 items: vec![ReturnItem {
2590 expression: LogicalExpression::Variable("n".to_string()),
2591 alias: None,
2592 }],
2593 distinct: false,
2594 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2595 count: 10.into(),
2596 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2597 variable: "n".to_string(),
2598 label: None,
2599 input: None,
2600 })),
2601 })),
2602 }));
2603
2604 let physical = planner.plan_adaptive(&logical).unwrap();
2605 assert!(physical.adaptive_context.is_some());
2606 }
2607
2608 #[test]
2609 fn test_plan_adaptive_with_skip() {
2610 let store = create_test_store();
2611 let planner = Planner::new(store);
2612
2613 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2614 items: vec![ReturnItem {
2615 expression: LogicalExpression::Variable("n".to_string()),
2616 alias: None,
2617 }],
2618 distinct: false,
2619 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2620 count: 5.into(),
2621 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2622 variable: "n".to_string(),
2623 label: None,
2624 input: None,
2625 })),
2626 })),
2627 }));
2628
2629 let physical = planner.plan_adaptive(&logical).unwrap();
2630 assert!(physical.adaptive_context.is_some());
2631 }
2632
2633 #[test]
2634 fn test_plan_adaptive_with_sort() {
2635 let store = create_test_store();
2636 let planner = Planner::new(store);
2637
2638 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2639 items: vec![ReturnItem {
2640 expression: LogicalExpression::Variable("n".to_string()),
2641 alias: None,
2642 }],
2643 distinct: false,
2644 input: Box::new(LogicalOperator::Sort(SortOp {
2645 keys: vec![SortKey {
2646 expression: LogicalExpression::Variable("n".to_string()),
2647 order: SortOrder::Ascending,
2648 nulls: None,
2649 }],
2650 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2651 variable: "n".to_string(),
2652 label: None,
2653 input: None,
2654 })),
2655 })),
2656 }));
2657
2658 let physical = planner.plan_adaptive(&logical).unwrap();
2659 assert!(physical.adaptive_context.is_some());
2660 }
2661
2662 #[test]
2663 fn test_plan_adaptive_with_union() {
2664 let store = create_test_store();
2665 let planner = Planner::new(store);
2666
2667 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2668 items: vec![ReturnItem {
2669 expression: LogicalExpression::Variable("n".to_string()),
2670 alias: None,
2671 }],
2672 distinct: false,
2673 input: Box::new(LogicalOperator::Union(UnionOp {
2674 inputs: vec![
2675 LogicalOperator::NodeScan(NodeScanOp {
2676 variable: "n".to_string(),
2677 label: Some("Person".to_string()),
2678 input: None,
2679 }),
2680 LogicalOperator::NodeScan(NodeScanOp {
2681 variable: "n".to_string(),
2682 label: Some("Company".to_string()),
2683 input: None,
2684 }),
2685 ],
2686 })),
2687 }));
2688
2689 let physical = planner.plan_adaptive(&logical).unwrap();
2690 assert!(physical.adaptive_context.is_some());
2691 }
2692
2693 #[test]
2696 fn test_plan_expand_variable_length() {
2697 let store = create_test_store();
2698 let planner = Planner::new(store);
2699
2700 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2702 items: vec![
2703 ReturnItem {
2704 expression: LogicalExpression::Variable("a".to_string()),
2705 alias: None,
2706 },
2707 ReturnItem {
2708 expression: LogicalExpression::Variable("b".to_string()),
2709 alias: None,
2710 },
2711 ],
2712 distinct: false,
2713 input: Box::new(LogicalOperator::Expand(ExpandOp {
2714 from_variable: "a".to_string(),
2715 to_variable: "b".to_string(),
2716 edge_variable: None,
2717 direction: ExpandDirection::Outgoing,
2718 edge_types: vec!["KNOWS".to_string()],
2719 min_hops: 1,
2720 max_hops: Some(3),
2721 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2722 variable: "a".to_string(),
2723 label: None,
2724 input: None,
2725 })),
2726 path_alias: None,
2727 path_mode: PathMode::Walk,
2728 })),
2729 }));
2730
2731 let physical = planner.plan(&logical).unwrap();
2732 assert!(physical.columns().contains(&"a".to_string()));
2733 assert!(physical.columns().contains(&"b".to_string()));
2734 }
2735
2736 #[test]
2737 fn test_plan_expand_with_path_alias() {
2738 let store = create_test_store();
2739 let planner = Planner::new(store);
2740
2741 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2743 items: vec![
2744 ReturnItem {
2745 expression: LogicalExpression::Variable("a".to_string()),
2746 alias: None,
2747 },
2748 ReturnItem {
2749 expression: LogicalExpression::Variable("b".to_string()),
2750 alias: None,
2751 },
2752 ],
2753 distinct: false,
2754 input: Box::new(LogicalOperator::Expand(ExpandOp {
2755 from_variable: "a".to_string(),
2756 to_variable: "b".to_string(),
2757 edge_variable: None,
2758 direction: ExpandDirection::Outgoing,
2759 edge_types: vec!["KNOWS".to_string()],
2760 min_hops: 1,
2761 max_hops: Some(3),
2762 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2763 variable: "a".to_string(),
2764 label: None,
2765 input: None,
2766 })),
2767 path_alias: Some("p".to_string()),
2768 path_mode: PathMode::Walk,
2769 })),
2770 }));
2771
2772 let physical = planner.plan(&logical).unwrap();
2773 assert!(physical.columns().contains(&"a".to_string()));
2775 assert!(physical.columns().contains(&"b".to_string()));
2776 }
2777
2778 #[test]
2779 fn test_plan_expand_incoming() {
2780 let store = create_test_store();
2781 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2782 .with_factorized_execution(false);
2783
2784 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2786 items: vec![
2787 ReturnItem {
2788 expression: LogicalExpression::Variable("a".to_string()),
2789 alias: None,
2790 },
2791 ReturnItem {
2792 expression: LogicalExpression::Variable("b".to_string()),
2793 alias: None,
2794 },
2795 ],
2796 distinct: false,
2797 input: Box::new(LogicalOperator::Expand(ExpandOp {
2798 from_variable: "a".to_string(),
2799 to_variable: "b".to_string(),
2800 edge_variable: None,
2801 direction: ExpandDirection::Incoming,
2802 edge_types: vec!["KNOWS".to_string()],
2803 min_hops: 1,
2804 max_hops: Some(1),
2805 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2806 variable: "a".to_string(),
2807 label: None,
2808 input: None,
2809 })),
2810 path_alias: None,
2811 path_mode: PathMode::Walk,
2812 })),
2813 }));
2814
2815 let physical = planner.plan(&logical).unwrap();
2816 assert!(physical.columns().contains(&"a".to_string()));
2817 assert!(physical.columns().contains(&"b".to_string()));
2818 }
2819
2820 #[test]
2821 fn test_plan_expand_both_directions() {
2822 let store = create_test_store();
2823 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2824 .with_factorized_execution(false);
2825
2826 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2828 items: vec![
2829 ReturnItem {
2830 expression: LogicalExpression::Variable("a".to_string()),
2831 alias: None,
2832 },
2833 ReturnItem {
2834 expression: LogicalExpression::Variable("b".to_string()),
2835 alias: None,
2836 },
2837 ],
2838 distinct: false,
2839 input: Box::new(LogicalOperator::Expand(ExpandOp {
2840 from_variable: "a".to_string(),
2841 to_variable: "b".to_string(),
2842 edge_variable: None,
2843 direction: ExpandDirection::Both,
2844 edge_types: vec!["KNOWS".to_string()],
2845 min_hops: 1,
2846 max_hops: Some(1),
2847 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2848 variable: "a".to_string(),
2849 label: None,
2850 input: None,
2851 })),
2852 path_alias: None,
2853 path_mode: PathMode::Walk,
2854 })),
2855 }));
2856
2857 let physical = planner.plan(&logical).unwrap();
2858 assert!(physical.columns().contains(&"a".to_string()));
2859 assert!(physical.columns().contains(&"b".to_string()));
2860 }
2861
2862 #[test]
2865 fn test_planner_with_context() {
2866 use crate::transaction::TransactionManager;
2867
2868 let store = create_test_store();
2869 let transaction_manager = Arc::new(TransactionManager::new());
2870 let transaction_id = transaction_manager.begin();
2871 let epoch = transaction_manager.current_epoch();
2872
2873 let planner = Planner::with_context(
2874 Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
2875 Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2876 Arc::clone(&transaction_manager),
2877 Some(transaction_id),
2878 epoch,
2879 );
2880
2881 assert_eq!(planner.transaction_id(), Some(transaction_id));
2882 assert!(planner.transaction_manager().is_some());
2883 assert_eq!(planner.viewing_epoch(), epoch);
2884 }
2885
2886 #[test]
2887 fn test_planner_with_factorized_execution_disabled() {
2888 let store = create_test_store();
2889 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2890 .with_factorized_execution(false);
2891
2892 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2894 items: vec![
2895 ReturnItem {
2896 expression: LogicalExpression::Variable("a".to_string()),
2897 alias: None,
2898 },
2899 ReturnItem {
2900 expression: LogicalExpression::Variable("c".to_string()),
2901 alias: None,
2902 },
2903 ],
2904 distinct: false,
2905 input: Box::new(LogicalOperator::Expand(ExpandOp {
2906 from_variable: "b".to_string(),
2907 to_variable: "c".to_string(),
2908 edge_variable: None,
2909 direction: ExpandDirection::Outgoing,
2910 edge_types: vec![],
2911 min_hops: 1,
2912 max_hops: Some(1),
2913 input: Box::new(LogicalOperator::Expand(ExpandOp {
2914 from_variable: "a".to_string(),
2915 to_variable: "b".to_string(),
2916 edge_variable: None,
2917 direction: ExpandDirection::Outgoing,
2918 edge_types: vec![],
2919 min_hops: 1,
2920 max_hops: Some(1),
2921 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2922 variable: "a".to_string(),
2923 label: None,
2924 input: None,
2925 })),
2926 path_alias: None,
2927 path_mode: PathMode::Walk,
2928 })),
2929 path_alias: None,
2930 path_mode: PathMode::Walk,
2931 })),
2932 }));
2933
2934 let physical = planner.plan(&logical).unwrap();
2935 assert!(physical.columns().contains(&"a".to_string()));
2936 assert!(physical.columns().contains(&"c".to_string()));
2937 }
2938
2939 #[test]
2942 fn test_plan_sort_by_property() {
2943 let store = create_test_store();
2944 let planner = Planner::new(store);
2945
2946 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2948 items: vec![ReturnItem {
2949 expression: LogicalExpression::Variable("n".to_string()),
2950 alias: None,
2951 }],
2952 distinct: false,
2953 input: Box::new(LogicalOperator::Sort(SortOp {
2954 keys: vec![SortKey {
2955 expression: LogicalExpression::Property {
2956 variable: "n".to_string(),
2957 property: "name".to_string(),
2958 },
2959 order: SortOrder::Ascending,
2960 nulls: None,
2961 }],
2962 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2963 variable: "n".to_string(),
2964 label: None,
2965 input: None,
2966 })),
2967 })),
2968 }));
2969
2970 let physical = planner.plan(&logical).unwrap();
2971 assert!(physical.columns().contains(&"n".to_string()));
2973 }
2974
2975 #[test]
2978 fn test_plan_scan_with_input() {
2979 let store = create_test_store();
2980 let planner = Planner::new(store);
2981
2982 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2984 items: vec![
2985 ReturnItem {
2986 expression: LogicalExpression::Variable("a".to_string()),
2987 alias: None,
2988 },
2989 ReturnItem {
2990 expression: LogicalExpression::Variable("b".to_string()),
2991 alias: None,
2992 },
2993 ],
2994 distinct: false,
2995 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2996 variable: "b".to_string(),
2997 label: Some("Company".to_string()),
2998 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2999 variable: "a".to_string(),
3000 label: Some("Person".to_string()),
3001 input: None,
3002 }))),
3003 })),
3004 }));
3005
3006 let physical = planner.plan(&logical).unwrap();
3007 assert!(physical.columns().contains(&"a".to_string()));
3008 assert!(physical.columns().contains(&"b".to_string()));
3009 }
3010
3011 use crate::catalog::Catalog;
3019 use crate::query::plan::{
3020 AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
3021 HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
3022 MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
3023 SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
3024 };
3025 use grafeo_core::execution::operators::{Operator, SessionContext};
3026
3027 fn full_store() -> Arc<LpgStore> {
3028 let store = Arc::new(LpgStore::new().unwrap());
3030 let vincent = store.create_node(&["Person"]);
3031 let jules = store.create_node(&["Person"]);
3032 let mia = store.create_node(&["Person"]);
3033 let _company = store.create_node(&["Company"]);
3034 store.create_edge(vincent, jules, "KNOWS");
3035 store.create_edge(jules, mia, "KNOWS");
3036 store
3037 }
3038
3039 fn scan_person(var: &str) -> LogicalOperator {
3040 LogicalOperator::NodeScan(NodeScanOp {
3041 variable: var.to_string(),
3042 label: Some("Person".to_string()),
3043 input: None,
3044 })
3045 }
3046
3047 fn scan_any(var: &str) -> LogicalOperator {
3048 LogicalOperator::NodeScan(NodeScanOp {
3049 variable: var.to_string(),
3050 label: None,
3051 input: None,
3052 })
3053 }
3054
3055 #[test]
3058 fn test_with_read_only_flag() {
3059 let store = create_test_store();
3060 let planner =
3061 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
3062 assert!(planner.read_only);
3063
3064 let planner_off =
3065 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
3066 assert!(!planner_off.read_only);
3067 }
3068
3069 #[test]
3070 fn test_with_catalog() {
3071 let store = create_test_store();
3072 let catalog = Arc::new(Catalog::new());
3073 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
3074 .with_catalog(Arc::clone(&catalog));
3075 assert!(planner.catalog.is_some());
3076 }
3077
3078 #[test]
3079 fn test_with_session_context() {
3080 let store = create_test_store();
3081 let context = SessionContext {
3082 current_schema: Some("public".to_string()),
3083 current_graph: Some("main".to_string()),
3084 ..SessionContext::default()
3085 };
3086 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
3087 .with_session_context(context);
3088 assert_eq!(
3089 planner.session_context.current_schema.as_deref(),
3090 Some("public")
3091 );
3092 assert_eq!(
3093 planner.session_context.current_graph.as_deref(),
3094 Some("main")
3095 );
3096 }
3097
3098 #[test]
3101 fn test_register_edge_column_named() {
3102 let store = create_test_store();
3103 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3104 let name = planner.register_edge_column(&Some("r".to_string()));
3105 assert_eq!(name, "r");
3106 assert!(planner.edge_columns.borrow().contains("r"));
3107 }
3108
3109 #[test]
3110 fn test_register_edge_column_anonymous_counter_advances() {
3111 let store = create_test_store();
3112 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3113 let a = planner.register_edge_column(&None);
3114 let b = planner.register_edge_column(&None);
3115 assert_eq!(a, "_anon_edge_0");
3116 assert_eq!(b, "_anon_edge_1");
3117 assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
3118 assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
3119 }
3120
3121 #[test]
3124 fn test_create_node_without_write_store_errors() {
3125 let store = create_test_store();
3127 let planner = Planner::new(store);
3128
3129 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3130 variable: "n".to_string(),
3131 labels: vec!["Person".to_string()],
3132 properties: vec![],
3133 input: None,
3134 }));
3135
3136 let result = planner.plan(&logical);
3137 assert!(result.is_err());
3138 }
3139
3140 #[test]
3143 fn test_plan_profiled_collects_entries() {
3144 let store = create_test_store();
3145 let planner = Planner::new(store);
3146
3147 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3148 items: vec![ReturnItem {
3149 expression: LogicalExpression::Variable("n".to_string()),
3150 alias: None,
3151 }],
3152 distinct: false,
3153 input: Box::new(scan_person("n")),
3154 }));
3155
3156 let (physical, entries) = planner.plan_profiled(&logical).unwrap();
3157 assert_eq!(physical.columns(), &["n"]);
3158 assert!(
3160 entries.len() >= 2,
3161 "expected entries, got {}",
3162 entries.len()
3163 );
3164 assert!(!planner.profiling.get());
3166 }
3167
3168 #[test]
3169 fn test_plan_profiled_propagates_plan_errors() {
3170 let store = create_test_store();
3171 let planner = Planner::new(store);
3172 let logical = LogicalPlan::new(LogicalOperator::Empty);
3173 let result = planner.plan_profiled(&logical);
3174 assert!(result.is_err());
3175 assert!(!planner.profiling.get());
3177 }
3178
3179 #[test]
3182 fn test_plan_edge_scan_is_unsupported() {
3183 let store = create_test_store();
3185 let planner = Planner::new(store);
3186 let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
3187 variable: "e".to_string(),
3188 edge_types: vec![],
3189 input: None,
3190 }));
3191 let err = planner.plan(&logical).err().expect("plan should fail");
3192 assert!(format!("{err}").contains("Unsupported operator"));
3193 }
3194
3195 #[test]
3196 fn test_plan_triple_scan_is_unsupported() {
3197 let store = create_test_store();
3198 let planner = Planner::new(store);
3199 let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
3200 subject: TripleComponent::Variable("s".to_string()),
3201 predicate: TripleComponent::Variable("p".to_string()),
3202 object: TripleComponent::Variable("o".to_string()),
3203 graph: None,
3204 input: None,
3205 dataset: None,
3206 }));
3207 assert!(planner.plan(&logical).is_err());
3208 }
3209
3210 #[test]
3211 fn test_plan_bind_is_unsupported() {
3212 let store = create_test_store();
3213 let planner = Planner::new(store);
3214 let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
3215 expression: LogicalExpression::Literal(Value::Int64(1)),
3216 variable: "x".to_string(),
3217 input: Box::new(scan_any("n")),
3218 }));
3219 assert!(planner.plan(&logical).is_err());
3220 }
3221
3222 #[test]
3223 fn test_plan_parameter_scan_without_apply_errors() {
3224 let store = create_test_store();
3225 let planner = Planner::new(store);
3226 let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
3227 columns: vec!["n".to_string()],
3228 }));
3229 let err = planner.plan(&logical).err().expect("plan should fail");
3230 assert!(format!("{err}").contains("ParameterScan"));
3231 }
3232
3233 #[test]
3236 fn test_plan_union_dispatch() {
3237 let store = create_test_store();
3238 let planner = Planner::new(store);
3239 let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
3240 inputs: vec![scan_person("n"), scan_person("n")],
3241 }));
3242 let physical = planner.plan(&logical).unwrap();
3243 assert_eq!(physical.columns(), &["n"]);
3244 }
3245
3246 #[test]
3247 fn test_plan_except_dispatch() {
3248 let store = create_test_store();
3249 let planner = Planner::new(store);
3250 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3251 left: Box::new(scan_person("n")),
3252 right: Box::new(scan_person("n")),
3253 all: false,
3254 }));
3255 let physical = planner.plan(&logical).unwrap();
3256 assert_eq!(physical.columns(), &["n"]);
3257 }
3258
3259 #[test]
3260 fn test_plan_intersect_dispatch() {
3261 let store = create_test_store();
3262 let planner = Planner::new(store);
3263 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3264 left: Box::new(scan_person("n")),
3265 right: Box::new(scan_person("n")),
3266 all: false,
3267 }));
3268 let physical = planner.plan(&logical).unwrap();
3269 assert_eq!(physical.columns(), &["n"]);
3270 }
3271
3272 #[test]
3273 fn test_plan_otherwise_dispatch() {
3274 let store = create_test_store();
3275 let planner = Planner::new(store);
3276 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3277 left: Box::new(scan_person("n")),
3278 right: Box::new(scan_any("n")),
3279 }));
3280 let physical = planner.plan(&logical).unwrap();
3281 assert_eq!(physical.columns(), &["n"]);
3282 }
3283
3284 #[test]
3285 fn test_plan_left_join_dispatch() {
3286 let store = create_test_store();
3287 let planner = Planner::new(store);
3288 let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
3289 left: Box::new(scan_any("a")),
3290 right: Box::new(scan_any("b")),
3291 condition: None,
3292 }));
3293 let physical = planner.plan(&logical).unwrap();
3294 assert!(physical.columns().contains(&"a".to_string()));
3295 assert!(physical.columns().contains(&"b".to_string()));
3296 }
3297
3298 #[test]
3299 fn test_plan_anti_join_dispatch() {
3300 let store = create_test_store();
3301 let planner = Planner::new(store);
3302 let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
3303 left: Box::new(scan_any("a")),
3304 right: Box::new(scan_any("b")),
3305 }));
3306 let physical = planner.plan(&logical).unwrap();
3307 assert!(physical.columns().contains(&"a".to_string()));
3308 }
3309
3310 #[test]
3311 fn test_plan_apply_uncorrelated_dispatch() {
3312 let store = create_test_store();
3313 let planner = Planner::new(store);
3314 let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
3315 input: Box::new(scan_any("a")),
3316 subplan: Box::new(scan_any("b")),
3317 shared_variables: vec![],
3318 optional: false,
3319 }));
3320 let physical = planner.plan(&logical).unwrap();
3321 assert!(physical.columns().contains(&"a".to_string()));
3322 assert!(physical.columns().contains(&"b".to_string()));
3323 }
3324
3325 #[test]
3326 fn test_plan_unwind_literal_list() {
3327 let store = create_test_store();
3328 let planner = Planner::new(store);
3329
3330 let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
3332 expression: LogicalExpression::List(vec![
3333 LogicalExpression::Literal(Value::Int64(1)),
3334 LogicalExpression::Literal(Value::Int64(2)),
3335 LogicalExpression::Literal(Value::Int64(3)),
3336 ]),
3337 variable: "x".to_string(),
3338 ordinality_var: None,
3339 offset_var: None,
3340 input: Box::new(LogicalOperator::Empty),
3341 }));
3342 let physical = planner.plan(&logical).unwrap();
3343 assert!(physical.columns().contains(&"x".to_string()));
3344 }
3345
3346 #[test]
3347 fn test_plan_merge_dispatch() {
3348 let store = create_test_store();
3349 let planner = create_writable_planner(&store);
3350
3351 let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
3353 variable: "n".to_string(),
3354 labels: vec!["Person".to_string()],
3355 match_properties: vec![],
3356 on_create: vec![],
3357 on_match: vec![],
3358 input: Box::new(LogicalOperator::Empty),
3359 }));
3360 let physical = planner.plan(&logical).unwrap();
3361 assert!(physical.columns().contains(&"n".to_string()));
3362 }
3363
3364 #[test]
3365 fn test_plan_merge_relationship_dispatch() {
3366 let store = full_store();
3367 let planner = create_writable_planner(&store);
3368
3369 let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
3371 variable: "r".to_string(),
3372 source_variable: "a".to_string(),
3373 target_variable: "b".to_string(),
3374 edge_type: "KNOWS".to_string(),
3375 match_properties: vec![],
3376 on_create: vec![],
3377 on_match: vec![],
3378 input: Box::new(LogicalOperator::Join(JoinOp {
3379 left: Box::new(scan_person("a")),
3380 right: Box::new(scan_person("b")),
3381 join_type: JoinType::Cross,
3382 conditions: vec![],
3383 })),
3384 }));
3385 let physical = planner.plan(&logical).unwrap();
3386 assert!(physical.columns().contains(&"r".to_string()));
3387 }
3388
3389 #[test]
3390 fn test_plan_add_label_dispatch() {
3391 let store = full_store();
3392 let planner = create_writable_planner(&store);
3393 let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
3394 variable: "n".to_string(),
3395 labels: vec!["VIP".to_string()],
3396 input: Box::new(scan_person("n")),
3397 }));
3398 let physical = planner.plan(&logical).unwrap();
3399 assert!(physical.columns().contains(&"labels_added".to_string()));
3400 }
3401
3402 #[test]
3403 fn test_plan_remove_label_dispatch() {
3404 let store = full_store();
3405 let planner = create_writable_planner(&store);
3406 let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
3407 variable: "n".to_string(),
3408 labels: vec!["Person".to_string()],
3409 input: Box::new(scan_person("n")),
3410 }));
3411 let physical = planner.plan(&logical).unwrap();
3412 assert!(physical.columns().contains(&"labels_removed".to_string()));
3413 }
3414
3415 #[test]
3416 fn test_plan_set_property_dispatch() {
3417 let store = full_store();
3418 let planner = create_writable_planner(&store);
3419 let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
3420 variable: "n".to_string(),
3421 properties: vec![(
3422 "city".to_string(),
3423 LogicalExpression::Literal(Value::String("Amsterdam".into())),
3424 )],
3425 replace: false,
3426 is_edge: false,
3427 input: Box::new(scan_person("n")),
3428 }));
3429 let physical = planner.plan(&logical).unwrap();
3430 assert!(physical.columns().contains(&"n".to_string()));
3431 }
3432
3433 #[test]
3434 fn test_plan_delete_edge_dispatch() {
3435 let store = full_store();
3436 let planner = create_writable_planner(&store);
3437
3438 let expand_op = LogicalOperator::Expand(ExpandOp {
3440 from_variable: "a".to_string(),
3441 to_variable: "b".to_string(),
3442 edge_variable: Some("r".to_string()),
3443 direction: ExpandDirection::Outgoing,
3444 edge_types: vec!["KNOWS".to_string()],
3445 min_hops: 1,
3446 max_hops: Some(1),
3447 input: Box::new(scan_person("a")),
3448 path_alias: None,
3449 path_mode: PathMode::Walk,
3450 });
3451 let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
3452 variable: "r".to_string(),
3453 input: Box::new(expand_op),
3454 }));
3455 let physical = planner.plan(&logical).unwrap();
3456 assert!(physical.columns().contains(&"r".to_string()));
3457 }
3458
3459 #[test]
3460 fn test_plan_shortest_path_dispatch() {
3461 let store = full_store();
3462 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3463
3464 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3466 input: Box::new(LogicalOperator::Join(JoinOp {
3467 left: Box::new(scan_person("a")),
3468 right: Box::new(scan_person("b")),
3469 join_type: JoinType::Cross,
3470 conditions: vec![],
3471 })),
3472 source_var: "a".to_string(),
3473 target_var: "b".to_string(),
3474 edge_types: vec!["KNOWS".to_string()],
3475 direction: ExpandDirection::Outgoing,
3476 path_alias: "p".to_string(),
3477 all_paths: false,
3478 }));
3479 let physical = planner.plan(&logical).unwrap();
3480 assert!(
3481 physical
3482 .columns()
3483 .iter()
3484 .any(|c| c.contains("_path_length_p"))
3485 );
3486 }
3487
3488 #[test]
3489 fn test_plan_shortest_path_missing_source_errors() {
3490 let store = full_store();
3491 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3492 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3493 input: Box::new(scan_person("a")),
3494 source_var: "missing".to_string(),
3495 target_var: "a".to_string(),
3496 edge_types: vec![],
3497 direction: ExpandDirection::Both,
3498 path_alias: "p".to_string(),
3499 all_paths: false,
3500 }));
3501 let err = planner.plan(&logical).err().expect("plan should fail");
3502 assert!(format!("{err}").contains("Source variable"));
3503 }
3504
3505 #[test]
3506 fn test_plan_map_collect_dispatch() {
3507 let store = create_test_store();
3509 let planner = Planner::new(store);
3510 let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
3511 projections: vec![
3512 crate::query::plan::Projection {
3513 expression: LogicalExpression::Literal(Value::String("key".into())),
3514 alias: Some("k".to_string()),
3515 },
3516 crate::query::plan::Projection {
3517 expression: LogicalExpression::Literal(Value::Int64(1)),
3518 alias: Some("v".to_string()),
3519 },
3520 ],
3521 input: Box::new(scan_person("n")),
3522 pass_through_input: false,
3523 });
3524 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3525 key_var: "k".to_string(),
3526 value_var: "v".to_string(),
3527 alias: "m".to_string(),
3528 input: Box::new(input_with_kv),
3529 }));
3530 let physical = planner.plan(&logical).unwrap();
3531 assert_eq!(physical.columns(), &["m"]);
3532 }
3533
3534 #[test]
3535 fn test_plan_map_collect_missing_key_errors() {
3536 let store = create_test_store();
3537 let planner = Planner::new(store);
3538 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3539 key_var: "not_there".to_string(),
3540 value_var: "also_missing".to_string(),
3541 alias: "m".to_string(),
3542 input: Box::new(scan_any("n")),
3543 }));
3544 let err = planner.plan(&logical).err().expect("plan should fail");
3545 let msg = format!("{err}");
3546 assert!(msg.contains("MapCollect key"), "got: {msg}");
3547 }
3548
3549 #[test]
3550 fn test_plan_map_collect_missing_value_errors() {
3551 let store = create_test_store();
3552 let planner = Planner::new(store);
3553 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3555 key_var: "n".to_string(),
3556 value_var: "missing_value".to_string(),
3557 alias: "m".to_string(),
3558 input: Box::new(scan_any("n")),
3559 }));
3560 let err = planner.plan(&logical).err().expect("plan should fail");
3561 let msg = format!("{err}");
3562 assert!(msg.contains("MapCollect value"), "got: {msg}");
3563 }
3564
3565 #[test]
3566 fn test_plan_horizontal_aggregate_missing_column_errors() {
3567 let store = create_test_store();
3568 let planner = Planner::new(store);
3569 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3570 HorizontalAggregateOp {
3571 list_column: "not_a_column".to_string(),
3572 entity_kind: crate::query::plan::EntityKind::Edge,
3573 function: LogicalAggregateFunction::Count,
3574 property: "age".to_string(),
3575 alias: "total".to_string(),
3576 input: Box::new(scan_any("n")),
3577 },
3578 ));
3579 let err = planner.plan(&logical).err().expect("plan should fail");
3580 assert!(format!("{err}").contains("HorizontalAggregate"));
3581 }
3582
3583 #[test]
3584 fn test_plan_load_data_dispatch() {
3585 let store = create_test_store();
3586 let planner = Planner::new(store);
3587 let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
3589 format: LoadDataFormat::Csv,
3590 with_headers: true,
3591 path: "/nonexistent/data.csv".to_string(),
3592 variable: "row".to_string(),
3593 field_terminator: Some(','),
3594 }));
3595 let physical = planner.plan(&logical).unwrap();
3596 assert_eq!(physical.columns(), &["row"]);
3597 }
3598
3599 #[test]
3600 fn test_plan_multi_way_join_dispatch() {
3601 let store = full_store();
3604 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3605 let ab = LogicalOperator::Expand(ExpandOp {
3606 from_variable: "a".to_string(),
3607 to_variable: "b".to_string(),
3608 edge_variable: None,
3609 direction: ExpandDirection::Outgoing,
3610 edge_types: vec!["KNOWS".to_string()],
3611 min_hops: 1,
3612 max_hops: Some(1),
3613 input: Box::new(scan_person("a")),
3614 path_alias: None,
3615 path_mode: PathMode::Walk,
3616 });
3617 let bc = LogicalOperator::Expand(ExpandOp {
3618 from_variable: "b".to_string(),
3619 to_variable: "c".to_string(),
3620 edge_variable: None,
3621 direction: ExpandDirection::Outgoing,
3622 edge_types: vec!["KNOWS".to_string()],
3623 min_hops: 1,
3624 max_hops: Some(1),
3625 input: Box::new(scan_person("b")),
3626 path_alias: None,
3627 path_mode: PathMode::Walk,
3628 });
3629 let ca = LogicalOperator::Expand(ExpandOp {
3630 from_variable: "c".to_string(),
3631 to_variable: "a".to_string(),
3632 edge_variable: None,
3633 direction: ExpandDirection::Outgoing,
3634 edge_types: vec!["KNOWS".to_string()],
3635 min_hops: 1,
3636 max_hops: Some(1),
3637 input: Box::new(scan_person("c")),
3638 path_alias: None,
3639 path_mode: PathMode::Walk,
3640 });
3641 let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3642 inputs: vec![ab, bc, ca],
3643 conditions: vec![],
3644 shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
3645 }));
3646 let _ = planner.plan(&logical);
3647 }
3648
3649 #[test]
3650 fn test_plan_horizontal_aggregate_dispatch() {
3651 let store = full_store();
3653 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3654
3655 let path = LogicalOperator::Expand(ExpandOp {
3656 from_variable: "a".to_string(),
3657 to_variable: "b".to_string(),
3658 edge_variable: Some("r".to_string()),
3659 direction: ExpandDirection::Outgoing,
3660 edge_types: vec!["KNOWS".to_string()],
3661 min_hops: 1,
3662 max_hops: Some(3),
3663 input: Box::new(scan_person("a")),
3664 path_alias: Some("p".to_string()),
3665 path_mode: PathMode::Walk,
3666 });
3667 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3669 HorizontalAggregateOp {
3670 list_column: "_path_edges_p".to_string(),
3671 entity_kind: crate::query::plan::EntityKind::Edge,
3672 function: LogicalAggregateFunction::Count,
3673 property: "weight".to_string(),
3674 alias: "edge_count".to_string(),
3675 input: Box::new(path),
3676 },
3677 ));
3678 let physical = planner.plan(&logical).unwrap();
3679 assert!(physical.columns().contains(&"edge_count".to_string()));
3680 }
3681
3682 #[test]
3685 fn test_plan_adaptive_with_except() {
3686 let store = create_test_store();
3687 let planner = Planner::new(store);
3688 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3689 left: Box::new(scan_person("n")),
3690 right: Box::new(scan_person("n")),
3691 all: false,
3692 }));
3693 let physical = planner.plan_adaptive(&logical).unwrap();
3694 assert!(physical.adaptive_context.is_some());
3695 }
3696
3697 #[test]
3698 fn test_plan_adaptive_with_intersect() {
3699 let store = create_test_store();
3700 let planner = Planner::new(store);
3701 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3702 left: Box::new(scan_person("n")),
3703 right: Box::new(scan_any("n")),
3704 all: false,
3705 }));
3706 let physical = planner.plan_adaptive(&logical).unwrap();
3707 assert!(physical.adaptive_context.is_some());
3708 }
3709
3710 #[test]
3711 fn test_plan_adaptive_with_otherwise() {
3712 let store = create_test_store();
3713 let planner = Planner::new(store);
3714 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3715 left: Box::new(scan_person("n")),
3716 right: Box::new(scan_any("n")),
3717 }));
3718 let physical = planner.plan_adaptive(&logical).unwrap();
3719 assert!(physical.adaptive_context.is_some());
3720 }
3721
3722 #[test]
3725 fn test_count_expand_chain_variable_length_breaks_chain() {
3726 let var_expand = LogicalOperator::Expand(ExpandOp {
3728 from_variable: "a".to_string(),
3729 to_variable: "b".to_string(),
3730 edge_variable: None,
3731 direction: ExpandDirection::Outgoing,
3732 edge_types: vec!["KNOWS".to_string()],
3733 min_hops: 1,
3734 max_hops: Some(3),
3735 input: Box::new(scan_person("a")),
3736 path_alias: None,
3737 path_mode: PathMode::Walk,
3738 });
3739 let (count, _) = Planner::count_expand_chain(&var_expand);
3740 assert_eq!(count, 0);
3741 }
3742
3743 #[cfg(feature = "algos")]
3746 #[test]
3747 fn test_static_result_operator_emits_rows_and_resets() {
3748 use grafeo_common::types::Value;
3749 let rows = vec![
3750 vec![Value::Int64(1), Value::String("Vincent".into())],
3751 vec![Value::Int64(2), Value::String("Jules".into())],
3752 ];
3753 let mut op = StaticResultOperator {
3754 rows,
3755 column_indices: vec![0, 1],
3756 row_index: 0,
3757 };
3758 assert_eq!(op.name(), "StaticResult");
3759 let chunk = op.next().unwrap().expect("first chunk");
3760 assert_eq!(chunk.row_count(), 2);
3761 assert!(op.next().unwrap().is_none());
3763 op.reset();
3765 assert!(op.next().unwrap().is_some());
3766 let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
3768 rows: vec![vec![Value::Null]],
3769 column_indices: vec![0],
3770 row_index: 0,
3771 });
3772 let _any = boxed.into_any();
3773 }
3774
3775 #[cfg(feature = "vector-index")]
3788 #[test]
3789 fn test_plan_vector_scan_k_none_bounds_across_label_states() {
3790 use crate::query::plan::VectorScanOp;
3791
3792 let store = create_test_store();
3793 assert_eq!(store.nodes_by_label_count("Person"), 2);
3795 assert_eq!(store.nodes_by_label_count("Unknown"), 0);
3796 assert_eq!(store.node_count(), 3);
3797
3798 let planner = Planner::new(store);
3799 let make_scan = |label: Option<&str>| VectorScanOp {
3800 variable: "n".to_string(),
3801 index_name: None,
3802 property: "embedding".to_string(),
3803 label: label.map(str::to_string),
3804 query_vector: LogicalExpression::Literal(Value::List(
3805 vec![
3806 Value::Float64(1.0),
3807 Value::Float64(0.0),
3808 Value::Float64(0.0),
3809 ]
3810 .into(),
3811 )),
3812 k: None,
3813 metric: Some(VectorMetric::Cosine),
3814 min_similarity: Some(0.5),
3815 max_distance: None,
3816 input: None,
3817 };
3818
3819 for label in [Some("Person"), Some("Unknown"), None] {
3820 let (_op, cols) = planner
3821 .plan_vector_scan(&make_scan(label))
3822 .unwrap_or_else(|e| panic!("plan_vector_scan failed for {label:?}: {e:?}"));
3823 assert_eq!(cols[0], "n", "variable column must be first for {label:?}");
3824 }
3825 }
3826}