1mod aggregate;
95mod expand;
96mod expression;
97mod filter;
98mod filter_hybrid;
99mod join;
100mod mutation;
101mod project;
102mod scan;
103
104#[cfg(feature = "algos")]
105use crate::query::plan::CallProcedureOp;
106#[cfg(feature = "text-index")]
107use crate::query::plan::TextScanOp;
108use crate::query::plan::{
109 AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
110 BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
111 EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
112 HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
113 LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
114 NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
115 SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
116};
117#[cfg(feature = "vector-index")]
118use crate::query::plan::{VectorMetric, VectorScanOp};
119use grafeo_common::grafeo_debug_span;
120use grafeo_common::types::{EpochId, TransactionId};
121use grafeo_common::types::{LogicalType, Value};
122use grafeo_common::utils::error::{Error, Result};
123use grafeo_core::execution::AdaptiveContext;
124use grafeo_core::execution::operators::{
125 AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
126 CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
127 DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
128 ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
129 FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
130 JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
131 LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
132 MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
133 ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
134 ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
135 SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
136 VariableLengthExpandOperator,
137};
138use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
139use std::collections::HashMap;
140use std::sync::Arc;
141
142use crate::query::planner::common;
143use crate::query::planner::common::expression_to_string;
144use crate::query::planner::{
145 PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
146 convert_unary_op, value_to_logical_type,
147};
148use crate::transaction::TransactionManager;
149
/// Optional lower and upper limits of a value range, borrowed for `'a`.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; presumably a planner submodule uses it for range predicates — confirm.
struct RangeBounds<'a> {
    /// Lower limit, or `None` when the range has no lower limit.
    min: Option<&'a Value>,
    /// Upper limit, or `None` when the range has no upper limit.
    max: Option<&'a Value>,
    /// Flag paired with `min`; presumably `true` means `min` itself is part
    /// of the range — confirm at the use site.
    min_inclusive: bool,
    /// Flag paired with `max`; presumably `true` means `max` itself is part
    /// of the range — confirm at the use site.
    max_inclusive: bool,
}
157
/// Turns a [`LogicalPlan`] into a [`PhysicalPlan`] of executable operators.
///
/// Several fields use `Cell`/`RefCell` because the planning methods take
/// `&self` yet record state (column sets, counters, profiling entries) as
/// they go.
pub struct Planner {
    /// Store handed to read operators and queried for cardinality estimates.
    pub(super) store: Arc<dyn GraphStoreSearch>,
    /// Mutable store; when `None`, `write_store()` yields a read-only error.
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager; only set by `with_context`.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Id of the transaction this planner plans for, if any.
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch recorded at construction (`store.current_epoch()` in `new`,
    /// caller-supplied in `with_context`).
    pub(super) viewing_epoch: EpochId,
    /// Next numeric suffix for generated `_anon_edge_N` column names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// When `true`, chains of two or more single-hop expands are planned via
    /// `plan_expand_chain` instead of one `plan_expand` per hop.
    pub(super) factorized_execution: bool,
    /// Column names recorded as scalar; the aggregate aliases produced by
    /// `plan_horizontal_aggregate` and `plan_map_collect` are inserted here.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Column names registered through `register_edge_column`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator (set via `with_validator`).
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Optional catalog (set via `with_catalog`).
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Optional concrete LPG store (set via `with_lpg_store`).
    #[cfg(feature = "lpg")]
    pub(super) lpg_store: Option<Arc<grafeo_core::graph::lpg::LpgStore>>,
    /// Parameter state read when planning a `ParameterScan`; planning one
    /// while this is `None` is reported as an internal error.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// NOTE(review): not touched in this part of the file; presumably names of
    /// variables bound to grouped lists — confirm in the submodules.
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// `true` only while `plan_profiled` is running; makes `maybe_profile`
    /// wrap each planned operator.
    profiling: std::cell::Cell<bool>,
    /// Profile entries accumulated during a `plan_profiled` run.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Write tracker; `with_context` creates one only when a transaction id
    /// is supplied.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context (set via `with_session_context`).
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// Read-only flag set via `with_read_only`; it is not consulted in this
    /// part of the file.
    pub(super) read_only: bool,
}
209
210impl Planner {
211 #[must_use]
216 pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
217 let epoch = store.current_epoch();
218 Self {
219 store,
220 write_store: None,
221 transaction_manager: None,
222 transaction_id: None,
223 viewing_epoch: epoch,
224 anon_edge_counter: std::cell::Cell::new(0),
225 factorized_execution: true,
226 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
227 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
228 validator: None,
229 catalog: None,
230 #[cfg(feature = "lpg")]
231 lpg_store: None,
232 correlated_param_state: std::cell::RefCell::new(None),
233 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
234 profiling: std::cell::Cell::new(false),
235 profile_entries: std::cell::RefCell::new(Vec::new()),
236 write_tracker: None,
237 session_context: grafeo_core::execution::operators::SessionContext::default(),
238 read_only: false,
239 }
240 }
241
242 #[must_use]
244 pub fn with_context(
245 store: Arc<dyn GraphStoreSearch>,
246 write_store: Option<Arc<dyn GraphStoreMut>>,
247 transaction_manager: Arc<TransactionManager>,
248 transaction_id: Option<TransactionId>,
249 viewing_epoch: EpochId,
250 ) -> Self {
251 use crate::transaction::TransactionWriteTracker;
252
253 let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
255 if transaction_id.is_some() {
256 Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
257 &transaction_manager,
258 ))))
259 } else {
260 None
261 };
262
263 Self {
264 store,
265 write_store,
266 transaction_manager: Some(transaction_manager),
267 transaction_id,
268 viewing_epoch,
269 anon_edge_counter: std::cell::Cell::new(0),
270 factorized_execution: true,
271 scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
272 edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
273 validator: None,
274 catalog: None,
275 #[cfg(feature = "lpg")]
276 lpg_store: None,
277 correlated_param_state: std::cell::RefCell::new(None),
278 group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
279 profiling: std::cell::Cell::new(false),
280 profile_entries: std::cell::RefCell::new(Vec::new()),
281 write_tracker,
282 session_context: grafeo_core::execution::operators::SessionContext::default(),
283 read_only: false,
284 }
285 }
286
287 #[must_use]
290 pub fn with_read_only(mut self, read_only: bool) -> Self {
291 self.read_only = read_only;
292 self
293 }
294
295 fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
297 self.write_store
298 .as_ref()
299 .map(Arc::clone)
300 .ok_or(Error::Transaction(
301 grafeo_common::utils::error::TransactionError::ReadOnly,
302 ))
303 }
304
    /// Returns the `viewing_epoch` this planner was constructed with.
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
310
    /// Returns the transaction id this planner was constructed with, if any.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
316
    /// Borrows the transaction manager, or `None` when the planner was built
    /// without one (see [`Planner::new`]).
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
322
323 #[must_use]
325 pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
326 self.factorized_execution = enabled;
327 self
328 }
329
330 #[must_use]
332 pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
333 self.validator = Some(validator);
334 self
335 }
336
337 #[must_use]
339 pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
340 self.catalog = Some(catalog);
341 self
342 }
343
344 #[cfg(feature = "lpg")]
347 #[must_use]
348 pub fn with_lpg_store(mut self, lpg_store: Arc<grafeo_core::graph::lpg::LpgStore>) -> Self {
349 self.lpg_store = Some(lpg_store);
350 self
351 }
352
353 #[must_use]
355 pub fn with_session_context(
356 mut self,
357 context: grafeo_core::execution::operators::SessionContext,
358 ) -> Self {
359 self.session_context = context;
360 self
361 }
362
363 pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
369 let name = edge_variable.clone().unwrap_or_else(|| {
370 let count = self.anon_edge_counter.get();
371 self.anon_edge_counter.set(count + 1);
372 format!("_anon_edge_{}", count)
373 });
374 self.edge_columns.borrow_mut().insert(name.clone());
375 name
376 }
377
378 fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
382 match op {
383 LogicalOperator::Expand(expand) => {
384 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
385
386 if is_single_hop {
387 let (inner_count, base) = Self::count_expand_chain(&expand.input);
388 (inner_count + 1, base)
389 } else {
390 (0, op)
391 }
392 }
393 _ => (0, op),
394 }
395 }
396
397 fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
401 let mut chain = Vec::new();
402 let mut current = op;
403
404 while let LogicalOperator::Expand(expand) = current {
405 let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
406 if !is_single_hop {
407 break;
408 }
409 chain.push(expand);
410 current = &expand.input;
411 }
412
413 chain.reverse();
414 chain
415 }
416
417 pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
424 let _span = grafeo_debug_span!("grafeo::query::plan");
425 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
426 Ok(PhysicalPlan {
427 operator,
428 columns,
429 adaptive_context: None,
430 })
431 }
432
433 pub fn plan_profiled(
444 &self,
445 logical_plan: &LogicalPlan,
446 ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
447 self.profiling.set(true);
448 self.profile_entries.borrow_mut().clear();
449
450 let result = self.plan_operator(&logical_plan.root);
451
452 self.profiling.set(false);
453 let (operator, columns) = result?;
454 let entries = self.profile_entries.borrow_mut().drain(..).collect();
455
456 Ok((
457 PhysicalPlan {
458 operator,
459 columns,
460 adaptive_context: None,
461 },
462 entries,
463 ))
464 }
465
466 pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
473 let (operator, columns) = self.plan_operator(&logical_plan.root)?;
474
475 let mut adaptive_context = AdaptiveContext::new();
476 self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
477
478 Ok(PhysicalPlan {
479 operator,
480 columns,
481 adaptive_context: Some(adaptive_context),
482 })
483 }
484
485 fn collect_cardinality_estimates(
487 &self,
488 op: &LogicalOperator,
489 ctx: &mut AdaptiveContext,
490 depth: usize,
491 ) {
492 match op {
493 LogicalOperator::NodeScan(scan) => {
494 let estimate = if let Some(label) = &scan.label {
495 self.store.nodes_by_label(label).len() as f64
496 } else {
497 self.store.node_count() as f64
498 };
499 let id = format!("scan_{}", scan.variable);
500 ctx.set_estimate(&id, estimate);
501
502 if let Some(input) = &scan.input {
503 self.collect_cardinality_estimates(input, ctx, depth + 1);
504 }
505 }
506 LogicalOperator::Filter(filter) => {
507 let input_estimate = self.estimate_cardinality(&filter.input);
508 let estimate = input_estimate * 0.3;
509 let id = format!("filter_{depth}");
510 ctx.set_estimate(&id, estimate);
511
512 self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
513 }
514 LogicalOperator::Expand(expand) => {
515 let input_estimate = self.estimate_cardinality(&expand.input);
516 let stats = self.store.statistics();
517 let avg_degree = self.estimate_expand_degree(&stats, expand);
518 let estimate = input_estimate * avg_degree;
519 let id = format!("expand_{}", expand.to_variable);
520 ctx.set_estimate(&id, estimate);
521
522 self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
523 }
524 LogicalOperator::Join(join) => {
525 let left_est = self.estimate_cardinality(&join.left);
526 let right_est = self.estimate_cardinality(&join.right);
527 let estimate = (left_est * right_est).sqrt();
528 let id = format!("join_{depth}");
529 ctx.set_estimate(&id, estimate);
530
531 self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
532 self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
533 }
534 LogicalOperator::Aggregate(agg) => {
535 let input_estimate = self.estimate_cardinality(&agg.input);
536 let estimate = if agg.group_by.is_empty() {
537 1.0
538 } else {
539 (input_estimate * 0.1).max(1.0)
540 };
541 let id = format!("aggregate_{depth}");
542 ctx.set_estimate(&id, estimate);
543
544 self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
545 }
546 LogicalOperator::Distinct(distinct) => {
547 let input_estimate = self.estimate_cardinality(&distinct.input);
548 let estimate = (input_estimate * 0.5).max(1.0);
549 let id = format!("distinct_{depth}");
550 ctx.set_estimate(&id, estimate);
551
552 self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
553 }
554 LogicalOperator::Return(ret) => {
555 self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
556 }
557 LogicalOperator::Limit(limit) => {
558 let input_estimate = self.estimate_cardinality(&limit.input);
559 let estimate = (input_estimate).min(limit.count.estimate());
560 let id = format!("limit_{depth}");
561 ctx.set_estimate(&id, estimate);
562
563 self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
564 }
565 LogicalOperator::Skip(skip) => {
566 let input_estimate = self.estimate_cardinality(&skip.input);
567 let estimate = (input_estimate - skip.count.estimate()).max(0.0);
568 let id = format!("skip_{depth}");
569 ctx.set_estimate(&id, estimate);
570
571 self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
572 }
573 LogicalOperator::Sort(sort) => {
574 self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
575 }
576 LogicalOperator::Union(union) => {
577 let estimate: f64 = union
578 .inputs
579 .iter()
580 .map(|input| self.estimate_cardinality(input))
581 .sum();
582 let id = format!("union_{depth}");
583 ctx.set_estimate(&id, estimate);
584
585 for input in &union.inputs {
586 self.collect_cardinality_estimates(input, ctx, depth + 1);
587 }
588 }
589 _ => {
590 }
592 }
593 }
594
595 fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
597 match op {
598 LogicalOperator::NodeScan(scan) => {
599 if let Some(label) = &scan.label {
600 self.store.nodes_by_label(label).len() as f64
601 } else {
602 self.store.node_count() as f64
603 }
604 }
605 LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
606 LogicalOperator::Expand(expand) => {
607 let stats = self.store.statistics();
608 let avg_degree = self.estimate_expand_degree(&stats, expand);
609 self.estimate_cardinality(&expand.input) * avg_degree
610 }
611 LogicalOperator::Join(join) => {
612 let left = self.estimate_cardinality(&join.left);
613 let right = self.estimate_cardinality(&join.right);
614 (left * right).sqrt()
615 }
616 LogicalOperator::Aggregate(agg) => {
617 if agg.group_by.is_empty() {
618 1.0
619 } else {
620 (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
621 }
622 }
623 LogicalOperator::Distinct(distinct) => {
624 (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
625 }
626 LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
627 LogicalOperator::Limit(limit) => self
628 .estimate_cardinality(&limit.input)
629 .min(limit.count.estimate()),
630 LogicalOperator::Skip(skip) => {
631 (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
632 }
633 LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
634 LogicalOperator::Union(union) => union
635 .inputs
636 .iter()
637 .map(|input| self.estimate_cardinality(input))
638 .sum(),
639 LogicalOperator::Except(except) => {
640 let left = self.estimate_cardinality(&except.left);
641 let right = self.estimate_cardinality(&except.right);
642 (left - right).max(0.0)
643 }
644 LogicalOperator::Intersect(intersect) => {
645 let left = self.estimate_cardinality(&intersect.left);
646 let right = self.estimate_cardinality(&intersect.right);
647 left.min(right)
648 }
649 LogicalOperator::Otherwise(otherwise) => self
650 .estimate_cardinality(&otherwise.left)
651 .max(self.estimate_cardinality(&otherwise.right)),
652 _ => 1000.0,
653 }
654 }
655
656 fn estimate_expand_degree(
658 &self,
659 stats: &grafeo_core::statistics::Statistics,
660 expand: &ExpandOp,
661 ) -> f64 {
662 let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
663 if expand.edge_types.len() == 1 {
664 stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
665 } else if stats.total_nodes > 0 {
666 (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
667 } else {
668 10.0
669 }
670 }
671
672 fn maybe_profile(
675 &self,
676 result: Result<(Box<dyn Operator>, Vec<String>)>,
677 op: &LogicalOperator,
678 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
679 if self.profiling.get() {
680 let (physical, columns) = result?;
681 let (entry, stats) =
682 crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
683 let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
684 self.profile_entries.borrow_mut().push(entry);
685 Ok((Box::new(profiled), columns))
686 } else {
687 result
688 }
689 }
690
    /// Dispatches a logical operator to its dedicated planning routine and
    /// returns the physical operator together with its output column names.
    ///
    /// Every result is passed through `maybe_profile`, so operators are
    /// wrapped for profiling when that is active.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` for `Empty`, for `ParameterScan` without a
    /// correlated parameter state, for operators whose cargo feature is
    /// disabled, and for operators with no arm here. Errors from the
    /// individual planning routines are propagated.
    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
        let result = match op {
            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
            LogicalOperator::Expand(expand) => {
                if self.factorized_execution {
                    // A run of two or more single-hop expands is planned as
                    // one chain rather than hop by hop.
                    let (chain_len, _base) = Self::count_expand_chain(op);
                    if chain_len >= 2 {
                        // Early return: the chain result is profiled here, so
                        // it must not reach the `maybe_profile` call below.
                        return self.maybe_profile(self.plan_expand_chain(op), op);
                    }
                }
                self.plan_expand(expand)
            }
            LogicalOperator::Return(ret) => self.plan_return(ret),
            LogicalOperator::Filter(filter) => self.plan_filter(filter),
            LogicalOperator::Project(project) => self.plan_project(project),
            LogicalOperator::Limit(limit) => self.plan_limit(limit),
            LogicalOperator::Skip(skip) => self.plan_skip(skip),
            LogicalOperator::Sort(sort) => self.plan_sort(sort),
            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
            LogicalOperator::Join(join) => self.plan_join(join),
            LogicalOperator::Union(union) => self.plan_union(union),
            LogicalOperator::Except(except) => self.plan_except(except),
            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
            LogicalOperator::Apply(apply) => self.plan_apply(apply),
            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
            LogicalOperator::Merge(merge) => self.plan_merge(merge),
            LogicalOperator::MergeRelationship(merge_rel) => {
                self.plan_merge_relationship(merge_rel)
            }
            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
            #[cfg(feature = "algos")]
            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
            #[cfg(not(feature = "algos"))]
            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
                "CALL procedures require the 'algos' feature".to_string(),
            )),
            LogicalOperator::ParameterScan(_param_scan) => {
                // The parameter state is read from the planner; without it
                // this operator cannot be planned.
                let state = self
                    .correlated_param_state
                    .borrow()
                    .clone()
                    .ok_or_else(|| {
                        Error::Internal(
                            "ParameterScan without correlated Apply context".to_string(),
                        )
                    })?;
                // Output columns are those carried by the parameter state.
                let columns = state.columns.clone();
                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
                Ok((operator, columns))
            }
            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
            LogicalOperator::LoadData(load) => {
                // Built inline; the only output column is the load variable.
                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
                    load.path.clone(),
                    load.format,
                    load.with_headers,
                    load.field_terminator,
                    load.variable.clone(),
                ));
                Ok((operator, vec![load.variable.clone()]))
            }
            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
            #[cfg(feature = "vector-index")]
            LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
            #[cfg(not(feature = "vector-index"))]
            LogicalOperator::VectorScan(_) => Err(Error::Internal(
                "VectorScan requires vector-index feature".to_string(),
            )),
            // NOTE(review): unlike VectorScan, this arm is not cfg-gated, so
            // the error (and its "requires vector-index feature" wording) is
            // produced even when the feature is enabled — confirm intent.
            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
                "VectorJoin requires vector-index feature".to_string(),
            )),
            #[cfg(feature = "text-index")]
            LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
            #[cfg(not(feature = "text-index"))]
            LogicalOperator::TextScan(_) => Err(Error::Internal(
                "TextScan requires text-index feature".to_string(),
            )),
            // Anything without a dedicated arm is reported by discriminant.
            _ => Err(Error::Internal(format!(
                "Unsupported operator: {:?}",
                std::mem::discriminant(op)
            ))),
        };
        self.maybe_profile(result, op)
    }
796
797 fn plan_horizontal_aggregate(
799 &self,
800 ha: &HorizontalAggregateOp,
801 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
802 let (child_op, child_columns) = self.plan_operator(&ha.input)?;
803
804 let list_col_idx = child_columns
805 .iter()
806 .position(|c| c == &ha.list_column)
807 .ok_or_else(|| {
808 Error::Internal(format!(
809 "HorizontalAggregate list column '{}' not found in {:?}",
810 ha.list_column, child_columns
811 ))
812 })?;
813
814 let entity_kind = match ha.entity_kind {
815 LogicalEntityKind::Edge => EntityKind::Edge,
816 LogicalEntityKind::Node => EntityKind::Node,
817 };
818
819 let function = convert_aggregate_function(ha.function);
820 let input_column_count = child_columns.len();
821
822 let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
823 child_op,
824 list_col_idx,
825 entity_kind,
826 function,
827 ha.property.clone(),
828 Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
829 input_column_count,
830 ));
831
832 let mut columns = child_columns;
833 columns.push(ha.alias.clone());
834 self.scalar_columns.borrow_mut().insert(ha.alias.clone());
836
837 Ok((operator, columns))
838 }
839
840 fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
842 let (child_op, child_columns) = self.plan_operator(&mc.input)?;
843 let key_idx = child_columns
844 .iter()
845 .position(|c| c == &mc.key_var)
846 .ok_or_else(|| {
847 Error::Internal(format!(
848 "MapCollect key '{}' not in columns {:?}",
849 mc.key_var, child_columns
850 ))
851 })?;
852 let value_idx = child_columns
853 .iter()
854 .position(|c| c == &mc.value_var)
855 .ok_or_else(|| {
856 Error::Internal(format!(
857 "MapCollect value '{}' not in columns {:?}",
858 mc.value_var, child_columns
859 ))
860 })?;
861 let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
862 self.scalar_columns.borrow_mut().insert(mc.alias.clone());
863 Ok((operator, vec![mc.alias.clone()]))
864 }
865
866 #[cfg(feature = "text-index")]
868 fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
869 use grafeo_core::execution::operators::TextScanOperator;
870
871 let query_string = match &scan.query {
872 LogicalExpression::Literal(Value::String(s)) => s.to_string(),
873 LogicalExpression::Parameter(name) => {
874 return Err(Error::Internal(format!(
875 "TextScan query parameter ${} not resolved",
876 name
877 )));
878 }
879 _ => {
880 return Err(Error::Internal(
881 "TextScan query must be a string literal or parameter".to_string(),
882 ));
883 }
884 };
885
886 let operator: Box<dyn Operator> = if let Some(k) = scan.k {
887 Box::new(TextScanOperator::top_k(
888 Arc::clone(&self.store),
889 &scan.label,
890 &scan.property,
891 &query_string,
892 k,
893 ))
894 } else if let Some(threshold) = scan.threshold {
895 Box::new(TextScanOperator::with_threshold(
896 Arc::clone(&self.store),
897 &scan.label,
898 &scan.property,
899 &query_string,
900 threshold,
901 ))
902 } else {
903 Box::new(TextScanOperator::top_k(
904 Arc::clone(&self.store),
905 &scan.label,
906 &scan.property,
907 &query_string,
908 100,
909 ))
910 };
911
912 let mut columns = vec![scan.variable.clone()];
913 if let Some(ref score_col) = scan.score_column {
914 columns.push(score_col.clone());
915 }
916
917 Ok((operator, columns))
918 }
919
920 #[cfg(feature = "vector-index")]
922 pub(super) fn plan_vector_scan(
923 &self,
924 scan: &VectorScanOp,
925 ) -> Result<(Box<dyn Operator>, Vec<String>)> {
926 use grafeo_core::execution::operators::VectorScanOperator;
927 use grafeo_core::index::vector::DistanceMetric;
928
929 if scan.input.is_some() {
934 return Err(Error::Internal(
935 "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
936 ));
937 }
938
939 let query_vec = self.resolve_vector_literal(&scan.query_vector)?;
940
941 let requested_metric = scan.metric.map(|m| match m {
942 VectorMetric::Cosine => DistanceMetric::Cosine,
943 VectorMetric::Euclidean => DistanceMetric::Euclidean,
944 VectorMetric::DotProduct => DistanceMetric::DotProduct,
945 VectorMetric::Manhattan => DistanceMetric::Manhattan,
946 });
947
948 let k = scan.k.unwrap_or_else(|| {
953 scan.label.as_ref().map_or_else(
954 || self.store.node_count(),
955 |l| self.store.nodes_by_label_count(l),
956 )
957 });
958
959 let index_metric = scan
964 .label
965 .as_ref()
966 .and_then(|label| self.store.vector_index_metric(label, &scan.property));
967 let metric = requested_metric
968 .or(index_metric)
969 .unwrap_or(DistanceMetric::Cosine);
970
971 let mut operator = VectorScanOperator::new(
975 Arc::clone(&self.store),
976 scan.label.clone(),
977 scan.property.clone(),
978 query_vec,
979 k,
980 metric,
981 );
982
983 if let Some(sim) = scan.min_similarity {
984 operator = operator.with_min_similarity(sim);
985 }
986 if let Some(dist) = scan.max_distance {
987 operator = operator.with_max_distance(dist);
988 }
989
990 let mut columns = vec![scan.variable.clone()];
991 let metric_tag = match metric {
995 DistanceMetric::Cosine => "cos",
996 DistanceMetric::Euclidean => "euc",
997 DistanceMetric::DotProduct => "dot",
998 DistanceMetric::Manhattan => "man",
999 _ => "other",
1002 };
1003 columns.push(project::vector_score_column_name(
1004 metric_tag,
1005 &scan.property,
1006 &scan.variable,
1007 &scan.query_vector,
1008 ));
1009
1010 Ok((Box::new(operator), columns))
1011 }
1012
1013 #[cfg(feature = "vector-index")]
1015 pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
1016 #[allow(clippy::cast_possible_truncation)]
1018 match expr {
1019 LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
1020 LogicalExpression::Literal(Value::List(list)) => {
1021 let mut vec = Vec::with_capacity(list.len());
1022 for item in list.iter() {
1023 match item {
1024 Value::Float64(f) => vec.push(*f as f32),
1025 Value::Int64(i) => vec.push(*i as f32),
1026 _ => {
1027 return Err(Error::Internal(
1028 "Vector elements must be numeric".to_string(),
1029 ));
1030 }
1031 }
1032 }
1033 Ok(vec)
1034 }
1035 LogicalExpression::List(items) => {
1038 let mut vec = Vec::with_capacity(items.len());
1039 for item in items {
1040 match item {
1041 LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
1042 LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
1043 _ => {
1044 return Err(Error::Internal(
1045 "Vector elements must be numeric literals".to_string(),
1046 ));
1047 }
1048 }
1049 }
1050 Ok(vec)
1051 }
1052 _ => Err(Error::Internal("Expected vector literal".to_string())),
1053 }
1054 }
1055}
1056
/// Operator that replays a fixed, in-memory set of rows.
///
/// Only compiled when the `algos` feature is enabled.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// Source rows; each row is a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a source row to read from.
    /// An index outside a row yields `Value::Null` for that cell.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; `reset` puts it back to zero.
    row_index: usize,
}
1064
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next chunk of at most 1024 rows, or `None` once every row
    /// has been emitted.
    ///
    /// Each output column reads the source-row position given by
    /// `column_indices`; a position missing from a row becomes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        const MAX_CHUNK_ROWS: usize = 1024;

        // Rows not yet emitted; nothing left means the operator is exhausted.
        let pending = match self.rows.get(self.row_index..) {
            Some(rest) if !rest.is_empty() => rest,
            _ => return Ok(None),
        };
        let batch = &pending[..pending.len().min(MAX_CHUNK_ROWS)];
        let produced = batch.len();

        let col_types = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, produced);

        for row in batch {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let Some(col) = chunk.column_mut(col_idx) else {
                    continue;
                };
                col.push_value(row.get(src_idx).cloned().unwrap_or(Value::Null));
            }
        }
        chunk.set_count(produced);

        self.row_index += produced;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be replayed.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Operator name used for display and profiling.
    fn name(&self) -> &'static str {
        "StaticResult"
    }

    /// Converts the boxed operator into a type-erased `Any` box.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
1108
1109#[cfg(test)]
1110mod tests {
1111 use super::*;
1112 use crate::query::plan::{
1113 AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1114 DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1115 LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
1116 SkipOp as LogicalSkipOp, SortKey, SortOp,
1117 };
1118 use grafeo_common::types::Value;
1119 use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
1120 use grafeo_core::graph::GraphStoreMut;
1121 use grafeo_core::graph::lpg::LpgStore;
1122
1123 fn create_test_store() -> Arc<LpgStore> {
1124 let store = Arc::new(LpgStore::new().unwrap());
1125 store.create_node(&["Person"]);
1126 store.create_node(&["Person"]);
1127 store.create_node(&["Company"]);
1128 store
1129 }
1130
1131 #[test]
1134 fn test_plan_simple_scan() {
1135 let store = create_test_store();
1136 let planner = Planner::new(store);
1137
1138 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1140 items: vec![ReturnItem {
1141 expression: LogicalExpression::Variable("n".to_string()),
1142 alias: None,
1143 }],
1144 distinct: false,
1145 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1146 variable: "n".to_string(),
1147 label: Some("Person".to_string()),
1148 input: None,
1149 })),
1150 }));
1151
1152 let physical = planner.plan(&logical).unwrap();
1153 assert_eq!(physical.columns(), &["n"]);
1154 }
1155
/// Planning `Return(n)` over an unlabelled node scan yields the single column `n`.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(any_node_scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1178
/// An aliased return item names the output column after the alias (`person`), not the variable.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let aliased_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: Some(String::from("person")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased_item],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
1201
/// Returning an unaliased property access produces a column named `n.name`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
1227
/// Returning an aliased integer literal produces a column named after the alias (`answer`).
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let answer_item = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer_item],
        distinct: false,
        input: Box::new(any_node_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
1250
/// A filter `n.age = 30` over a `Person` scan plans successfully and keeps the `n` column.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(person_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1286
/// A conjunction `n.age > 20 AND n.age < 40` plans successfully and keeps the `n` column.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Builds `n.age <op> <bound>`; both halves of the conjunction share this shape.
    let age_compare = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };
    let lower_bound = age_compare(BinaryOp::Gt, 20);
    let upper_bound = age_compare(BinaryOp::Lt, 40);
    let in_range = LogicalExpression::Binary {
        left: Box::new(lower_bound),
        op: BinaryOp::And,
        right: Box::new(upper_bound),
    };

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(any_node_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1331
/// A filter `NOT n.active` plans successfully and keeps the `n` column.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(any_node_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1364
/// A filter `n.email IS NULL` plans successfully and keeps the `n` column.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    let email_is_null = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_is_null,
        input: Box::new(any_node_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1397
/// A filter `size(n.friends) > 0` plans successfully and keeps the `n` column.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(any_node_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1435
/// A single-hop outgoing `KNOWS` expand from `a` to `b` exposes both endpoint columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let knows_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(knows_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
1479
/// An expand that binds its edge to `r` exposes the edge column alongside both endpoints.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let knows_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "r", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(knows_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
1525
/// A `Limit` of 10 between the scan and the return plans successfully with column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(any_node_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1553
/// A `Skip` of 5 between the scan and the return plans successfully with column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(any_node_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1579
/// An ascending sort on `n` plans successfully and keeps the `n` column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(any_node_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1609
/// A descending sort on `n` plans successfully and keeps the `n` column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(any_node_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1639
/// A `Distinct` with no explicit column list plans successfully and keeps the `n` column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(any_node_scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1665
/// A `Distinct` restricted to the existing column `n` plans successfully and keeps `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(any_node_scan),
        columns: Some(vec![String::from("n")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1691
/// Planning still succeeds (with column `n`) when `Distinct` names a column the scan
/// does not produce.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(any_node_scan),
        columns: Some(vec![String::from("nonexistent")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1718
/// An ungrouped `count(n)` aliased `cnt` and returned by name exposes a `cnt` column.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(any_node_scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("cnt")));
}
1756
/// Grouping by `n.city` with one `count(n)` aggregate produces exactly two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let city_key = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_key],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1788
/// An ungrouped `sum(n.value)` aliased `total` exposes a `total` column.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_of_value],
        input: Box::new(any_node_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("total")));
}
1820
/// An ungrouped `avg(n.score)` aliased `average` exposes an `average` column.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let avg_of_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_of_score],
        input: Box::new(any_node_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("average")));
}
1852
/// `min(n.age)` and `max(n.age)` in one aggregate expose both aliased columns
/// (`youngest` and `oldest`).
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates read `n.age`; only the function and the alias differ.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_owned()),
        percentile: None,
        separator: None,
    };
    let youngest = age_aggregate(LogicalAggregateFunction::Min, "youngest");
    let oldest = age_aggregate(LogicalAggregateFunction::Max, "oldest");

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![youngest, oldest],
        input: Box::new(any_node_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
1899
/// An inner join of a `Person` scan and a `Company` scan on `a = b` exposes both columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    let a_equals_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![a_equals_b],
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(joined),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
1943
/// A condition-free cross join of two unlabelled scans produces exactly two columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1968
/// A condition-free left join of two unlabelled scans produces exactly two columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1992
1993 fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1996 let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
1997 p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1998 p
1999 }
2000
/// Creating a `Person` node bound to `n` with a `name` property exposes an `n` column.
#[test]
fn test_plan_create_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let create_person = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create_person));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("n")));
}
2020
/// Creating a `KNOWS` edge `r` between every cross-joined pair `(a, b)` exposes an `r` column.
#[test]
fn test_plan_create_edge() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let unlabelled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let endpoint_pairs = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: Vec::new(),
        input: Box::new(endpoint_pairs),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("r")));
}
2052
/// A non-detaching delete of every scanned node `n` plans successfully and exposes `n`.
#[test]
fn test_plan_delete_node() {
    let store = create_test_store();
    let planner = create_writable_planner(&store);

    let victims = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete_n = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(victims),
    };
    let logical = LogicalPlan::new(LogicalOperator::DeleteNode(delete_n));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("n")));
}
2072
/// Planning a bare `Empty` operator is reported as an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
2084
/// Returning a variable (`missing`) that the input scan never binds is reported as an error.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(any_node_scan),
    }));

    assert!(planner.plan(&logical).is_err());
}
2107
/// Every comparison, boolean and arithmetic operator listed here converts without error.
#[test]
fn test_convert_binary_ops() {
    // Each case carries its own label so a failure still names the offending operator.
    let cases = [
        (BinaryOp::Eq, "Eq"),
        (BinaryOp::Ne, "Ne"),
        (BinaryOp::Lt, "Lt"),
        (BinaryOp::Le, "Le"),
        (BinaryOp::Gt, "Gt"),
        (BinaryOp::Ge, "Ge"),
        (BinaryOp::And, "And"),
        (BinaryOp::Or, "Or"),
        (BinaryOp::Add, "Add"),
        (BinaryOp::Sub, "Sub"),
        (BinaryOp::Mul, "Mul"),
        (BinaryOp::Div, "Div"),
    ];

    for (op, label) in cases {
        assert!(
            convert_binary_op(op).is_ok(),
            "BinaryOp::{} should convert",
            label
        );
    }
}
2125
/// `Not`, `IsNull`, `IsNotNull` and `Neg` all convert without error.
#[test]
fn test_convert_unary_ops() {
    // Each case carries its own label so a failure still names the offending operator.
    let cases = [
        (UnaryOp::Not, "Not"),
        (UnaryOp::IsNull, "IsNull"),
        (UnaryOp::IsNotNull, "IsNotNull"),
        (UnaryOp::Neg, "Neg"),
    ];

    for (op, label) in cases {
        assert!(
            convert_unary_op(op).is_ok(),
            "UnaryOp::{} should convert",
            label
        );
    }
}
2133
/// Each of the five basic logical aggregate functions maps to the physical variant of the
/// same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
2157
/// A planner built with `Planner::new` has no transaction id and no transaction manager;
/// `viewing_epoch` is only checked to be callable.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    let search_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
    let planner = Planner::new(search_store);

    assert!(planner.transaction_id().is_none());
    assert!(planner.transaction_manager().is_none());

    // The value itself is not asserted on.
    let _ = planner.viewing_epoch();
}
2167
/// A physical plan for a bare node scan reports column `n` and can be consumed through
/// `into_operator`.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let bare_scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::NodeScan(bare_scan));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);

    // Consumes the plan; the returned operator is not inspected.
    let _ = physical.into_operator();
}
2185
/// Adaptive planning of `Return(n)` over a `Person` scan yields column `n` and attaches an
/// adaptive context.
#[test]
fn test_plan_adaptive_with_scan() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(projection));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
    assert!(physical.adaptive_context.is_some());
}
2212
/// Adaptive planning of a plan containing a filter (`n.age > 30`) attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_filter() {
    let planner = Planner::new(create_test_store());

    let older_than_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let any_node_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: older_than_30,
        input: Box::new(any_node_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2246
/// With factorized execution switched off, adaptive planning of a single-hop expand still
/// attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_expand() {
    let store = create_test_store();
    let search_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
    let planner = Planner::new(search_store).with_factorized_execution(false);

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let knows_hop = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(knows_hop),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2287
/// Adaptive planning of `Return(a, b)` over a cross join attaches an adaptive context.
#[test]
fn test_plan_adaptive_with_join() {
    let planner = Planner::new(create_test_store());

    let unlabelled_scan = |name: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: name.to_owned(),
            label: None,
            input: None,
        })
    };
    let all_pairs = LogicalOperator::Join(JoinOp {
        left: Box::new(unlabelled_scan("a")),
        right: Box::new(unlabelled_scan("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable((*name).to_owned()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(all_pairs),
    }));

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
2324
2325 #[test]
2326 fn test_plan_adaptive_with_aggregate() {
2327 let store = create_test_store();
2328 let planner = Planner::new(store);
2329
2330 let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2331 group_by: vec![],
2332 aggregates: vec![LogicalAggregateExpr {
2333 function: LogicalAggregateFunction::Count,
2334 expression: Some(LogicalExpression::Variable("n".to_string())),
2335 expression2: None,
2336 distinct: false,
2337 alias: Some("cnt".to_string()),
2338 percentile: None,
2339 separator: None,
2340 }],
2341 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2342 variable: "n".to_string(),
2343 label: None,
2344 input: None,
2345 })),
2346 having: None,
2347 }));
2348
2349 let physical = planner.plan_adaptive(&logical).unwrap();
2350 assert!(physical.adaptive_context.is_some());
2351 }
2352
2353 #[test]
2354 fn test_plan_adaptive_with_distinct() {
2355 let store = create_test_store();
2356 let planner = Planner::new(store);
2357
2358 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2359 items: vec![ReturnItem {
2360 expression: LogicalExpression::Variable("n".to_string()),
2361 alias: None,
2362 }],
2363 distinct: false,
2364 input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2365 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2366 variable: "n".to_string(),
2367 label: None,
2368 input: None,
2369 })),
2370 columns: None,
2371 })),
2372 }));
2373
2374 let physical = planner.plan_adaptive(&logical).unwrap();
2375 assert!(physical.adaptive_context.is_some());
2376 }
2377
2378 #[test]
2379 fn test_plan_adaptive_with_limit() {
2380 let store = create_test_store();
2381 let planner = Planner::new(store);
2382
2383 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2384 items: vec![ReturnItem {
2385 expression: LogicalExpression::Variable("n".to_string()),
2386 alias: None,
2387 }],
2388 distinct: false,
2389 input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2390 count: 10.into(),
2391 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2392 variable: "n".to_string(),
2393 label: None,
2394 input: None,
2395 })),
2396 })),
2397 }));
2398
2399 let physical = planner.plan_adaptive(&logical).unwrap();
2400 assert!(physical.adaptive_context.is_some());
2401 }
2402
2403 #[test]
2404 fn test_plan_adaptive_with_skip() {
2405 let store = create_test_store();
2406 let planner = Planner::new(store);
2407
2408 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2409 items: vec![ReturnItem {
2410 expression: LogicalExpression::Variable("n".to_string()),
2411 alias: None,
2412 }],
2413 distinct: false,
2414 input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2415 count: 5.into(),
2416 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2417 variable: "n".to_string(),
2418 label: None,
2419 input: None,
2420 })),
2421 })),
2422 }));
2423
2424 let physical = planner.plan_adaptive(&logical).unwrap();
2425 assert!(physical.adaptive_context.is_some());
2426 }
2427
2428 #[test]
2429 fn test_plan_adaptive_with_sort() {
2430 let store = create_test_store();
2431 let planner = Planner::new(store);
2432
2433 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2434 items: vec![ReturnItem {
2435 expression: LogicalExpression::Variable("n".to_string()),
2436 alias: None,
2437 }],
2438 distinct: false,
2439 input: Box::new(LogicalOperator::Sort(SortOp {
2440 keys: vec![SortKey {
2441 expression: LogicalExpression::Variable("n".to_string()),
2442 order: SortOrder::Ascending,
2443 nulls: None,
2444 }],
2445 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2446 variable: "n".to_string(),
2447 label: None,
2448 input: None,
2449 })),
2450 })),
2451 }));
2452
2453 let physical = planner.plan_adaptive(&logical).unwrap();
2454 assert!(physical.adaptive_context.is_some());
2455 }
2456
2457 #[test]
2458 fn test_plan_adaptive_with_union() {
2459 let store = create_test_store();
2460 let planner = Planner::new(store);
2461
2462 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2463 items: vec![ReturnItem {
2464 expression: LogicalExpression::Variable("n".to_string()),
2465 alias: None,
2466 }],
2467 distinct: false,
2468 input: Box::new(LogicalOperator::Union(UnionOp {
2469 inputs: vec![
2470 LogicalOperator::NodeScan(NodeScanOp {
2471 variable: "n".to_string(),
2472 label: Some("Person".to_string()),
2473 input: None,
2474 }),
2475 LogicalOperator::NodeScan(NodeScanOp {
2476 variable: "n".to_string(),
2477 label: Some("Company".to_string()),
2478 input: None,
2479 }),
2480 ],
2481 })),
2482 }));
2483
2484 let physical = planner.plan_adaptive(&logical).unwrap();
2485 assert!(physical.adaptive_context.is_some());
2486 }
2487
2488 #[test]
2491 fn test_plan_expand_variable_length() {
2492 let store = create_test_store();
2493 let planner = Planner::new(store);
2494
2495 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2497 items: vec![
2498 ReturnItem {
2499 expression: LogicalExpression::Variable("a".to_string()),
2500 alias: None,
2501 },
2502 ReturnItem {
2503 expression: LogicalExpression::Variable("b".to_string()),
2504 alias: None,
2505 },
2506 ],
2507 distinct: false,
2508 input: Box::new(LogicalOperator::Expand(ExpandOp {
2509 from_variable: "a".to_string(),
2510 to_variable: "b".to_string(),
2511 edge_variable: None,
2512 direction: ExpandDirection::Outgoing,
2513 edge_types: vec!["KNOWS".to_string()],
2514 min_hops: 1,
2515 max_hops: Some(3),
2516 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2517 variable: "a".to_string(),
2518 label: None,
2519 input: None,
2520 })),
2521 path_alias: None,
2522 path_mode: PathMode::Walk,
2523 })),
2524 }));
2525
2526 let physical = planner.plan(&logical).unwrap();
2527 assert!(physical.columns().contains(&"a".to_string()));
2528 assert!(physical.columns().contains(&"b".to_string()));
2529 }
2530
2531 #[test]
2532 fn test_plan_expand_with_path_alias() {
2533 let store = create_test_store();
2534 let planner = Planner::new(store);
2535
2536 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2538 items: vec![
2539 ReturnItem {
2540 expression: LogicalExpression::Variable("a".to_string()),
2541 alias: None,
2542 },
2543 ReturnItem {
2544 expression: LogicalExpression::Variable("b".to_string()),
2545 alias: None,
2546 },
2547 ],
2548 distinct: false,
2549 input: Box::new(LogicalOperator::Expand(ExpandOp {
2550 from_variable: "a".to_string(),
2551 to_variable: "b".to_string(),
2552 edge_variable: None,
2553 direction: ExpandDirection::Outgoing,
2554 edge_types: vec!["KNOWS".to_string()],
2555 min_hops: 1,
2556 max_hops: Some(3),
2557 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2558 variable: "a".to_string(),
2559 label: None,
2560 input: None,
2561 })),
2562 path_alias: Some("p".to_string()),
2563 path_mode: PathMode::Walk,
2564 })),
2565 }));
2566
2567 let physical = planner.plan(&logical).unwrap();
2568 assert!(physical.columns().contains(&"a".to_string()));
2570 assert!(physical.columns().contains(&"b".to_string()));
2571 }
2572
2573 #[test]
2574 fn test_plan_expand_incoming() {
2575 let store = create_test_store();
2576 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2577 .with_factorized_execution(false);
2578
2579 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2581 items: vec![
2582 ReturnItem {
2583 expression: LogicalExpression::Variable("a".to_string()),
2584 alias: None,
2585 },
2586 ReturnItem {
2587 expression: LogicalExpression::Variable("b".to_string()),
2588 alias: None,
2589 },
2590 ],
2591 distinct: false,
2592 input: Box::new(LogicalOperator::Expand(ExpandOp {
2593 from_variable: "a".to_string(),
2594 to_variable: "b".to_string(),
2595 edge_variable: None,
2596 direction: ExpandDirection::Incoming,
2597 edge_types: vec!["KNOWS".to_string()],
2598 min_hops: 1,
2599 max_hops: Some(1),
2600 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2601 variable: "a".to_string(),
2602 label: None,
2603 input: None,
2604 })),
2605 path_alias: None,
2606 path_mode: PathMode::Walk,
2607 })),
2608 }));
2609
2610 let physical = planner.plan(&logical).unwrap();
2611 assert!(physical.columns().contains(&"a".to_string()));
2612 assert!(physical.columns().contains(&"b".to_string()));
2613 }
2614
2615 #[test]
2616 fn test_plan_expand_both_directions() {
2617 let store = create_test_store();
2618 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2619 .with_factorized_execution(false);
2620
2621 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2623 items: vec![
2624 ReturnItem {
2625 expression: LogicalExpression::Variable("a".to_string()),
2626 alias: None,
2627 },
2628 ReturnItem {
2629 expression: LogicalExpression::Variable("b".to_string()),
2630 alias: None,
2631 },
2632 ],
2633 distinct: false,
2634 input: Box::new(LogicalOperator::Expand(ExpandOp {
2635 from_variable: "a".to_string(),
2636 to_variable: "b".to_string(),
2637 edge_variable: None,
2638 direction: ExpandDirection::Both,
2639 edge_types: vec!["KNOWS".to_string()],
2640 min_hops: 1,
2641 max_hops: Some(1),
2642 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2643 variable: "a".to_string(),
2644 label: None,
2645 input: None,
2646 })),
2647 path_alias: None,
2648 path_mode: PathMode::Walk,
2649 })),
2650 }));
2651
2652 let physical = planner.plan(&logical).unwrap();
2653 assert!(physical.columns().contains(&"a".to_string()));
2654 assert!(physical.columns().contains(&"b".to_string()));
2655 }
2656
2657 #[test]
2660 fn test_planner_with_context() {
2661 use crate::transaction::TransactionManager;
2662
2663 let store = create_test_store();
2664 let transaction_manager = Arc::new(TransactionManager::new());
2665 let transaction_id = transaction_manager.begin();
2666 let epoch = transaction_manager.current_epoch();
2667
2668 let planner = Planner::with_context(
2669 Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
2670 Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2671 Arc::clone(&transaction_manager),
2672 Some(transaction_id),
2673 epoch,
2674 );
2675
2676 assert_eq!(planner.transaction_id(), Some(transaction_id));
2677 assert!(planner.transaction_manager().is_some());
2678 assert_eq!(planner.viewing_epoch(), epoch);
2679 }
2680
2681 #[test]
2682 fn test_planner_with_factorized_execution_disabled() {
2683 let store = create_test_store();
2684 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2685 .with_factorized_execution(false);
2686
2687 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2689 items: vec![
2690 ReturnItem {
2691 expression: LogicalExpression::Variable("a".to_string()),
2692 alias: None,
2693 },
2694 ReturnItem {
2695 expression: LogicalExpression::Variable("c".to_string()),
2696 alias: None,
2697 },
2698 ],
2699 distinct: false,
2700 input: Box::new(LogicalOperator::Expand(ExpandOp {
2701 from_variable: "b".to_string(),
2702 to_variable: "c".to_string(),
2703 edge_variable: None,
2704 direction: ExpandDirection::Outgoing,
2705 edge_types: vec![],
2706 min_hops: 1,
2707 max_hops: Some(1),
2708 input: Box::new(LogicalOperator::Expand(ExpandOp {
2709 from_variable: "a".to_string(),
2710 to_variable: "b".to_string(),
2711 edge_variable: None,
2712 direction: ExpandDirection::Outgoing,
2713 edge_types: vec![],
2714 min_hops: 1,
2715 max_hops: Some(1),
2716 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2717 variable: "a".to_string(),
2718 label: None,
2719 input: None,
2720 })),
2721 path_alias: None,
2722 path_mode: PathMode::Walk,
2723 })),
2724 path_alias: None,
2725 path_mode: PathMode::Walk,
2726 })),
2727 }));
2728
2729 let physical = planner.plan(&logical).unwrap();
2730 assert!(physical.columns().contains(&"a".to_string()));
2731 assert!(physical.columns().contains(&"c".to_string()));
2732 }
2733
2734 #[test]
2737 fn test_plan_sort_by_property() {
2738 let store = create_test_store();
2739 let planner = Planner::new(store);
2740
2741 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2743 items: vec![ReturnItem {
2744 expression: LogicalExpression::Variable("n".to_string()),
2745 alias: None,
2746 }],
2747 distinct: false,
2748 input: Box::new(LogicalOperator::Sort(SortOp {
2749 keys: vec![SortKey {
2750 expression: LogicalExpression::Property {
2751 variable: "n".to_string(),
2752 property: "name".to_string(),
2753 },
2754 order: SortOrder::Ascending,
2755 nulls: None,
2756 }],
2757 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2758 variable: "n".to_string(),
2759 label: None,
2760 input: None,
2761 })),
2762 })),
2763 }));
2764
2765 let physical = planner.plan(&logical).unwrap();
2766 assert!(physical.columns().contains(&"n".to_string()));
2768 }
2769
2770 #[test]
2773 fn test_plan_scan_with_input() {
2774 let store = create_test_store();
2775 let planner = Planner::new(store);
2776
2777 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2779 items: vec![
2780 ReturnItem {
2781 expression: LogicalExpression::Variable("a".to_string()),
2782 alias: None,
2783 },
2784 ReturnItem {
2785 expression: LogicalExpression::Variable("b".to_string()),
2786 alias: None,
2787 },
2788 ],
2789 distinct: false,
2790 input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2791 variable: "b".to_string(),
2792 label: Some("Company".to_string()),
2793 input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2794 variable: "a".to_string(),
2795 label: Some("Person".to_string()),
2796 input: None,
2797 }))),
2798 })),
2799 }));
2800
2801 let physical = planner.plan(&logical).unwrap();
2802 assert!(physical.columns().contains(&"a".to_string()));
2803 assert!(physical.columns().contains(&"b".to_string()));
2804 }
2805
2806 use crate::catalog::Catalog;
2814 use crate::query::plan::{
2815 AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
2816 HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
2817 MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
2818 SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
2819 };
2820 use grafeo_core::execution::operators::{Operator, SessionContext};
2821
2822 fn full_store() -> Arc<LpgStore> {
2823 let store = Arc::new(LpgStore::new().unwrap());
2825 let vincent = store.create_node(&["Person"]);
2826 let jules = store.create_node(&["Person"]);
2827 let mia = store.create_node(&["Person"]);
2828 let _company = store.create_node(&["Company"]);
2829 store.create_edge(vincent, jules, "KNOWS");
2830 store.create_edge(jules, mia, "KNOWS");
2831 store
2832 }
2833
2834 fn scan_person(var: &str) -> LogicalOperator {
2835 LogicalOperator::NodeScan(NodeScanOp {
2836 variable: var.to_string(),
2837 label: Some("Person".to_string()),
2838 input: None,
2839 })
2840 }
2841
2842 fn scan_any(var: &str) -> LogicalOperator {
2843 LogicalOperator::NodeScan(NodeScanOp {
2844 variable: var.to_string(),
2845 label: None,
2846 input: None,
2847 })
2848 }
2849
2850 #[test]
2853 fn test_with_read_only_flag() {
2854 let store = create_test_store();
2855 let planner =
2856 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
2857 assert!(planner.read_only);
2858
2859 let planner_off =
2860 Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
2861 assert!(!planner_off.read_only);
2862 }
2863
2864 #[test]
2865 fn test_with_catalog() {
2866 let store = create_test_store();
2867 let catalog = Arc::new(Catalog::new());
2868 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2869 .with_catalog(Arc::clone(&catalog));
2870 assert!(planner.catalog.is_some());
2871 }
2872
2873 #[test]
2874 fn test_with_session_context() {
2875 let store = create_test_store();
2876 let context = SessionContext {
2877 current_schema: Some("public".to_string()),
2878 current_graph: Some("main".to_string()),
2879 ..SessionContext::default()
2880 };
2881 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2882 .with_session_context(context);
2883 assert_eq!(
2884 planner.session_context.current_schema.as_deref(),
2885 Some("public")
2886 );
2887 assert_eq!(
2888 planner.session_context.current_graph.as_deref(),
2889 Some("main")
2890 );
2891 }
2892
2893 #[test]
2896 fn test_register_edge_column_named() {
2897 let store = create_test_store();
2898 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2899 let name = planner.register_edge_column(&Some("r".to_string()));
2900 assert_eq!(name, "r");
2901 assert!(planner.edge_columns.borrow().contains("r"));
2902 }
2903
2904 #[test]
2905 fn test_register_edge_column_anonymous_counter_advances() {
2906 let store = create_test_store();
2907 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2908 let a = planner.register_edge_column(&None);
2909 let b = planner.register_edge_column(&None);
2910 assert_eq!(a, "_anon_edge_0");
2911 assert_eq!(b, "_anon_edge_1");
2912 assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
2913 assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
2914 }
2915
2916 #[test]
2919 fn test_create_node_without_write_store_errors() {
2920 let store = create_test_store();
2922 let planner = Planner::new(store);
2923
2924 let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2925 variable: "n".to_string(),
2926 labels: vec!["Person".to_string()],
2927 properties: vec![],
2928 input: None,
2929 }));
2930
2931 let result = planner.plan(&logical);
2932 assert!(result.is_err());
2933 }
2934
2935 #[test]
2938 fn test_plan_profiled_collects_entries() {
2939 let store = create_test_store();
2940 let planner = Planner::new(store);
2941
2942 let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2943 items: vec![ReturnItem {
2944 expression: LogicalExpression::Variable("n".to_string()),
2945 alias: None,
2946 }],
2947 distinct: false,
2948 input: Box::new(scan_person("n")),
2949 }));
2950
2951 let (physical, entries) = planner.plan_profiled(&logical).unwrap();
2952 assert_eq!(physical.columns(), &["n"]);
2953 assert!(
2955 entries.len() >= 2,
2956 "expected entries, got {}",
2957 entries.len()
2958 );
2959 assert!(!planner.profiling.get());
2961 }
2962
2963 #[test]
2964 fn test_plan_profiled_propagates_plan_errors() {
2965 let store = create_test_store();
2966 let planner = Planner::new(store);
2967 let logical = LogicalPlan::new(LogicalOperator::Empty);
2968 let result = planner.plan_profiled(&logical);
2969 assert!(result.is_err());
2970 assert!(!planner.profiling.get());
2972 }
2973
2974 #[test]
2977 fn test_plan_edge_scan_is_unsupported() {
2978 let store = create_test_store();
2980 let planner = Planner::new(store);
2981 let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
2982 variable: "e".to_string(),
2983 edge_types: vec![],
2984 input: None,
2985 }));
2986 let err = planner.plan(&logical).err().expect("plan should fail");
2987 assert!(format!("{err}").contains("Unsupported operator"));
2988 }
2989
2990 #[test]
2991 fn test_plan_triple_scan_is_unsupported() {
2992 let store = create_test_store();
2993 let planner = Planner::new(store);
2994 let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
2995 subject: TripleComponent::Variable("s".to_string()),
2996 predicate: TripleComponent::Variable("p".to_string()),
2997 object: TripleComponent::Variable("o".to_string()),
2998 graph: None,
2999 input: None,
3000 dataset: None,
3001 }));
3002 assert!(planner.plan(&logical).is_err());
3003 }
3004
3005 #[test]
3006 fn test_plan_bind_is_unsupported() {
3007 let store = create_test_store();
3008 let planner = Planner::new(store);
3009 let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
3010 expression: LogicalExpression::Literal(Value::Int64(1)),
3011 variable: "x".to_string(),
3012 input: Box::new(scan_any("n")),
3013 }));
3014 assert!(planner.plan(&logical).is_err());
3015 }
3016
3017 #[test]
3018 fn test_plan_parameter_scan_without_apply_errors() {
3019 let store = create_test_store();
3020 let planner = Planner::new(store);
3021 let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
3022 columns: vec!["n".to_string()],
3023 }));
3024 let err = planner.plan(&logical).err().expect("plan should fail");
3025 assert!(format!("{err}").contains("ParameterScan"));
3026 }
3027
3028 #[test]
3031 fn test_plan_union_dispatch() {
3032 let store = create_test_store();
3033 let planner = Planner::new(store);
3034 let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
3035 inputs: vec![scan_person("n"), scan_person("n")],
3036 }));
3037 let physical = planner.plan(&logical).unwrap();
3038 assert_eq!(physical.columns(), &["n"]);
3039 }
3040
3041 #[test]
3042 fn test_plan_except_dispatch() {
3043 let store = create_test_store();
3044 let planner = Planner::new(store);
3045 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3046 left: Box::new(scan_person("n")),
3047 right: Box::new(scan_person("n")),
3048 all: false,
3049 }));
3050 let physical = planner.plan(&logical).unwrap();
3051 assert_eq!(physical.columns(), &["n"]);
3052 }
3053
3054 #[test]
3055 fn test_plan_intersect_dispatch() {
3056 let store = create_test_store();
3057 let planner = Planner::new(store);
3058 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3059 left: Box::new(scan_person("n")),
3060 right: Box::new(scan_person("n")),
3061 all: false,
3062 }));
3063 let physical = planner.plan(&logical).unwrap();
3064 assert_eq!(physical.columns(), &["n"]);
3065 }
3066
3067 #[test]
3068 fn test_plan_otherwise_dispatch() {
3069 let store = create_test_store();
3070 let planner = Planner::new(store);
3071 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3072 left: Box::new(scan_person("n")),
3073 right: Box::new(scan_any("n")),
3074 }));
3075 let physical = planner.plan(&logical).unwrap();
3076 assert_eq!(physical.columns(), &["n"]);
3077 }
3078
3079 #[test]
3080 fn test_plan_left_join_dispatch() {
3081 let store = create_test_store();
3082 let planner = Planner::new(store);
3083 let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
3084 left: Box::new(scan_any("a")),
3085 right: Box::new(scan_any("b")),
3086 condition: None,
3087 }));
3088 let physical = planner.plan(&logical).unwrap();
3089 assert!(physical.columns().contains(&"a".to_string()));
3090 assert!(physical.columns().contains(&"b".to_string()));
3091 }
3092
3093 #[test]
3094 fn test_plan_anti_join_dispatch() {
3095 let store = create_test_store();
3096 let planner = Planner::new(store);
3097 let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
3098 left: Box::new(scan_any("a")),
3099 right: Box::new(scan_any("b")),
3100 }));
3101 let physical = planner.plan(&logical).unwrap();
3102 assert!(physical.columns().contains(&"a".to_string()));
3103 }
3104
3105 #[test]
3106 fn test_plan_apply_uncorrelated_dispatch() {
3107 let store = create_test_store();
3108 let planner = Planner::new(store);
3109 let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
3110 input: Box::new(scan_any("a")),
3111 subplan: Box::new(scan_any("b")),
3112 shared_variables: vec![],
3113 optional: false,
3114 }));
3115 let physical = planner.plan(&logical).unwrap();
3116 assert!(physical.columns().contains(&"a".to_string()));
3117 assert!(physical.columns().contains(&"b".to_string()));
3118 }
3119
3120 #[test]
3121 fn test_plan_unwind_literal_list() {
3122 let store = create_test_store();
3123 let planner = Planner::new(store);
3124
3125 let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
3127 expression: LogicalExpression::List(vec![
3128 LogicalExpression::Literal(Value::Int64(1)),
3129 LogicalExpression::Literal(Value::Int64(2)),
3130 LogicalExpression::Literal(Value::Int64(3)),
3131 ]),
3132 variable: "x".to_string(),
3133 ordinality_var: None,
3134 offset_var: None,
3135 input: Box::new(LogicalOperator::Empty),
3136 }));
3137 let physical = planner.plan(&logical).unwrap();
3138 assert!(physical.columns().contains(&"x".to_string()));
3139 }
3140
3141 #[test]
3142 fn test_plan_merge_dispatch() {
3143 let store = create_test_store();
3144 let planner = create_writable_planner(&store);
3145
3146 let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
3148 variable: "n".to_string(),
3149 labels: vec!["Person".to_string()],
3150 match_properties: vec![],
3151 on_create: vec![],
3152 on_match: vec![],
3153 input: Box::new(LogicalOperator::Empty),
3154 }));
3155 let physical = planner.plan(&logical).unwrap();
3156 assert!(physical.columns().contains(&"n".to_string()));
3157 }
3158
3159 #[test]
3160 fn test_plan_merge_relationship_dispatch() {
3161 let store = full_store();
3162 let planner = create_writable_planner(&store);
3163
3164 let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
3166 variable: "r".to_string(),
3167 source_variable: "a".to_string(),
3168 target_variable: "b".to_string(),
3169 edge_type: "KNOWS".to_string(),
3170 match_properties: vec![],
3171 on_create: vec![],
3172 on_match: vec![],
3173 input: Box::new(LogicalOperator::Join(JoinOp {
3174 left: Box::new(scan_person("a")),
3175 right: Box::new(scan_person("b")),
3176 join_type: JoinType::Cross,
3177 conditions: vec![],
3178 })),
3179 }));
3180 let physical = planner.plan(&logical).unwrap();
3181 assert!(physical.columns().contains(&"r".to_string()));
3182 }
3183
3184 #[test]
3185 fn test_plan_add_label_dispatch() {
3186 let store = full_store();
3187 let planner = create_writable_planner(&store);
3188 let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
3189 variable: "n".to_string(),
3190 labels: vec!["VIP".to_string()],
3191 input: Box::new(scan_person("n")),
3192 }));
3193 let physical = planner.plan(&logical).unwrap();
3194 assert!(physical.columns().contains(&"labels_added".to_string()));
3195 }
3196
3197 #[test]
3198 fn test_plan_remove_label_dispatch() {
3199 let store = full_store();
3200 let planner = create_writable_planner(&store);
3201 let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
3202 variable: "n".to_string(),
3203 labels: vec!["Person".to_string()],
3204 input: Box::new(scan_person("n")),
3205 }));
3206 let physical = planner.plan(&logical).unwrap();
3207 assert!(physical.columns().contains(&"labels_removed".to_string()));
3208 }
3209
3210 #[test]
3211 fn test_plan_set_property_dispatch() {
3212 let store = full_store();
3213 let planner = create_writable_planner(&store);
3214 let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
3215 variable: "n".to_string(),
3216 properties: vec![(
3217 "city".to_string(),
3218 LogicalExpression::Literal(Value::String("Amsterdam".into())),
3219 )],
3220 replace: false,
3221 is_edge: false,
3222 input: Box::new(scan_person("n")),
3223 }));
3224 let physical = planner.plan(&logical).unwrap();
3225 assert!(physical.columns().contains(&"n".to_string()));
3226 }
3227
3228 #[test]
3229 fn test_plan_delete_edge_dispatch() {
3230 let store = full_store();
3231 let planner = create_writable_planner(&store);
3232
3233 let expand_op = LogicalOperator::Expand(ExpandOp {
3235 from_variable: "a".to_string(),
3236 to_variable: "b".to_string(),
3237 edge_variable: Some("r".to_string()),
3238 direction: ExpandDirection::Outgoing,
3239 edge_types: vec!["KNOWS".to_string()],
3240 min_hops: 1,
3241 max_hops: Some(1),
3242 input: Box::new(scan_person("a")),
3243 path_alias: None,
3244 path_mode: PathMode::Walk,
3245 });
3246 let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
3247 variable: "r".to_string(),
3248 input: Box::new(expand_op),
3249 }));
3250 let physical = planner.plan(&logical).unwrap();
3251 assert!(physical.columns().contains(&"r".to_string()));
3252 }
3253
3254 #[test]
3255 fn test_plan_shortest_path_dispatch() {
3256 let store = full_store();
3257 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3258
3259 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3261 input: Box::new(LogicalOperator::Join(JoinOp {
3262 left: Box::new(scan_person("a")),
3263 right: Box::new(scan_person("b")),
3264 join_type: JoinType::Cross,
3265 conditions: vec![],
3266 })),
3267 source_var: "a".to_string(),
3268 target_var: "b".to_string(),
3269 edge_types: vec!["KNOWS".to_string()],
3270 direction: ExpandDirection::Outgoing,
3271 path_alias: "p".to_string(),
3272 all_paths: false,
3273 }));
3274 let physical = planner.plan(&logical).unwrap();
3275 assert!(
3276 physical
3277 .columns()
3278 .iter()
3279 .any(|c| c.contains("_path_length_p"))
3280 );
3281 }
3282
3283 #[test]
3284 fn test_plan_shortest_path_missing_source_errors() {
3285 let store = full_store();
3286 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3287 let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3288 input: Box::new(scan_person("a")),
3289 source_var: "missing".to_string(),
3290 target_var: "a".to_string(),
3291 edge_types: vec![],
3292 direction: ExpandDirection::Both,
3293 path_alias: "p".to_string(),
3294 all_paths: false,
3295 }));
3296 let err = planner.plan(&logical).err().expect("plan should fail");
3297 assert!(format!("{err}").contains("Source variable"));
3298 }
3299
3300 #[test]
3301 fn test_plan_map_collect_dispatch() {
3302 let store = create_test_store();
3304 let planner = Planner::new(store);
3305 let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
3306 projections: vec![
3307 crate::query::plan::Projection {
3308 expression: LogicalExpression::Literal(Value::String("key".into())),
3309 alias: Some("k".to_string()),
3310 },
3311 crate::query::plan::Projection {
3312 expression: LogicalExpression::Literal(Value::Int64(1)),
3313 alias: Some("v".to_string()),
3314 },
3315 ],
3316 input: Box::new(scan_person("n")),
3317 pass_through_input: false,
3318 });
3319 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3320 key_var: "k".to_string(),
3321 value_var: "v".to_string(),
3322 alias: "m".to_string(),
3323 input: Box::new(input_with_kv),
3324 }));
3325 let physical = planner.plan(&logical).unwrap();
3326 assert_eq!(physical.columns(), &["m"]);
3327 }
3328
3329 #[test]
3330 fn test_plan_map_collect_missing_key_errors() {
3331 let store = create_test_store();
3332 let planner = Planner::new(store);
3333 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3334 key_var: "not_there".to_string(),
3335 value_var: "also_missing".to_string(),
3336 alias: "m".to_string(),
3337 input: Box::new(scan_any("n")),
3338 }));
3339 let err = planner.plan(&logical).err().expect("plan should fail");
3340 let msg = format!("{err}");
3341 assert!(msg.contains("MapCollect key"), "got: {msg}");
3342 }
3343
3344 #[test]
3345 fn test_plan_map_collect_missing_value_errors() {
3346 let store = create_test_store();
3347 let planner = Planner::new(store);
3348 let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3350 key_var: "n".to_string(),
3351 value_var: "missing_value".to_string(),
3352 alias: "m".to_string(),
3353 input: Box::new(scan_any("n")),
3354 }));
3355 let err = planner.plan(&logical).err().expect("plan should fail");
3356 let msg = format!("{err}");
3357 assert!(msg.contains("MapCollect value"), "got: {msg}");
3358 }
3359
3360 #[test]
3361 fn test_plan_horizontal_aggregate_missing_column_errors() {
3362 let store = create_test_store();
3363 let planner = Planner::new(store);
3364 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3365 HorizontalAggregateOp {
3366 list_column: "not_a_column".to_string(),
3367 entity_kind: crate::query::plan::EntityKind::Edge,
3368 function: LogicalAggregateFunction::Count,
3369 property: "age".to_string(),
3370 alias: "total".to_string(),
3371 input: Box::new(scan_any("n")),
3372 },
3373 ));
3374 let err = planner.plan(&logical).err().expect("plan should fail");
3375 assert!(format!("{err}").contains("HorizontalAggregate"));
3376 }
3377
3378 #[test]
3379 fn test_plan_load_data_dispatch() {
3380 let store = create_test_store();
3381 let planner = Planner::new(store);
3382 let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
3384 format: LoadDataFormat::Csv,
3385 with_headers: true,
3386 path: "/nonexistent/data.csv".to_string(),
3387 variable: "row".to_string(),
3388 field_terminator: Some(','),
3389 }));
3390 let physical = planner.plan(&logical).unwrap();
3391 assert_eq!(physical.columns(), &["row"]);
3392 }
3393
3394 #[test]
3395 fn test_plan_multi_way_join_dispatch() {
3396 let store = full_store();
3399 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3400 let ab = LogicalOperator::Expand(ExpandOp {
3401 from_variable: "a".to_string(),
3402 to_variable: "b".to_string(),
3403 edge_variable: None,
3404 direction: ExpandDirection::Outgoing,
3405 edge_types: vec!["KNOWS".to_string()],
3406 min_hops: 1,
3407 max_hops: Some(1),
3408 input: Box::new(scan_person("a")),
3409 path_alias: None,
3410 path_mode: PathMode::Walk,
3411 });
3412 let bc = LogicalOperator::Expand(ExpandOp {
3413 from_variable: "b".to_string(),
3414 to_variable: "c".to_string(),
3415 edge_variable: None,
3416 direction: ExpandDirection::Outgoing,
3417 edge_types: vec!["KNOWS".to_string()],
3418 min_hops: 1,
3419 max_hops: Some(1),
3420 input: Box::new(scan_person("b")),
3421 path_alias: None,
3422 path_mode: PathMode::Walk,
3423 });
3424 let ca = LogicalOperator::Expand(ExpandOp {
3425 from_variable: "c".to_string(),
3426 to_variable: "a".to_string(),
3427 edge_variable: None,
3428 direction: ExpandDirection::Outgoing,
3429 edge_types: vec!["KNOWS".to_string()],
3430 min_hops: 1,
3431 max_hops: Some(1),
3432 input: Box::new(scan_person("c")),
3433 path_alias: None,
3434 path_mode: PathMode::Walk,
3435 });
3436 let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3437 inputs: vec![ab, bc, ca],
3438 conditions: vec![],
3439 shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
3440 }));
3441 let _ = planner.plan(&logical);
3442 }
3443
3444 #[test]
3445 fn test_plan_horizontal_aggregate_dispatch() {
3446 let store = full_store();
3448 let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3449
3450 let path = LogicalOperator::Expand(ExpandOp {
3451 from_variable: "a".to_string(),
3452 to_variable: "b".to_string(),
3453 edge_variable: Some("r".to_string()),
3454 direction: ExpandDirection::Outgoing,
3455 edge_types: vec!["KNOWS".to_string()],
3456 min_hops: 1,
3457 max_hops: Some(3),
3458 input: Box::new(scan_person("a")),
3459 path_alias: Some("p".to_string()),
3460 path_mode: PathMode::Walk,
3461 });
3462 let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3464 HorizontalAggregateOp {
3465 list_column: "_path_edges_p".to_string(),
3466 entity_kind: crate::query::plan::EntityKind::Edge,
3467 function: LogicalAggregateFunction::Count,
3468 property: "weight".to_string(),
3469 alias: "edge_count".to_string(),
3470 input: Box::new(path),
3471 },
3472 ));
3473 let physical = planner.plan(&logical).unwrap();
3474 assert!(physical.columns().contains(&"edge_count".to_string()));
3475 }
3476
3477 #[test]
3480 fn test_plan_adaptive_with_except() {
3481 let store = create_test_store();
3482 let planner = Planner::new(store);
3483 let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3484 left: Box::new(scan_person("n")),
3485 right: Box::new(scan_person("n")),
3486 all: false,
3487 }));
3488 let physical = planner.plan_adaptive(&logical).unwrap();
3489 assert!(physical.adaptive_context.is_some());
3490 }
3491
3492 #[test]
3493 fn test_plan_adaptive_with_intersect() {
3494 let store = create_test_store();
3495 let planner = Planner::new(store);
3496 let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3497 left: Box::new(scan_person("n")),
3498 right: Box::new(scan_any("n")),
3499 all: false,
3500 }));
3501 let physical = planner.plan_adaptive(&logical).unwrap();
3502 assert!(physical.adaptive_context.is_some());
3503 }
3504
3505 #[test]
3506 fn test_plan_adaptive_with_otherwise() {
3507 let store = create_test_store();
3508 let planner = Planner::new(store);
3509 let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3510 left: Box::new(scan_person("n")),
3511 right: Box::new(scan_any("n")),
3512 }));
3513 let physical = planner.plan_adaptive(&logical).unwrap();
3514 assert!(physical.adaptive_context.is_some());
3515 }
3516
3517 #[test]
3520 fn test_count_expand_chain_variable_length_breaks_chain() {
3521 let var_expand = LogicalOperator::Expand(ExpandOp {
3523 from_variable: "a".to_string(),
3524 to_variable: "b".to_string(),
3525 edge_variable: None,
3526 direction: ExpandDirection::Outgoing,
3527 edge_types: vec!["KNOWS".to_string()],
3528 min_hops: 1,
3529 max_hops: Some(3),
3530 input: Box::new(scan_person("a")),
3531 path_alias: None,
3532 path_mode: PathMode::Walk,
3533 });
3534 let (count, _) = Planner::count_expand_chain(&var_expand);
3535 assert_eq!(count, 0);
3536 }
3537
3538 #[cfg(feature = "algos")]
3541 #[test]
3542 fn test_static_result_operator_emits_rows_and_resets() {
3543 use grafeo_common::types::Value;
3544 let rows = vec![
3545 vec![Value::Int64(1), Value::String("Vincent".into())],
3546 vec![Value::Int64(2), Value::String("Jules".into())],
3547 ];
3548 let mut op = StaticResultOperator {
3549 rows,
3550 column_indices: vec![0, 1],
3551 row_index: 0,
3552 };
3553 assert_eq!(op.name(), "StaticResult");
3554 let chunk = op.next().unwrap().expect("first chunk");
3555 assert_eq!(chunk.row_count(), 2);
3556 assert!(op.next().unwrap().is_none());
3558 op.reset();
3560 assert!(op.next().unwrap().is_some());
3561 let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
3563 rows: vec![vec![Value::Null]],
3564 column_indices: vec![0],
3565 row_index: 0,
3566 });
3567 let _any = boxed.into_any();
3568 }
3569
3570 #[cfg(feature = "vector-index")]
3583 #[test]
3584 fn test_plan_vector_scan_k_none_bounds_across_label_states() {
3585 use crate::query::plan::VectorScanOp;
3586
3587 let store = create_test_store();
3588 assert_eq!(store.nodes_by_label_count("Person"), 2);
3590 assert_eq!(store.nodes_by_label_count("Unknown"), 0);
3591 assert_eq!(store.node_count(), 3);
3592
3593 let planner = Planner::new(store);
3594 let make_scan = |label: Option<&str>| VectorScanOp {
3595 variable: "n".to_string(),
3596 index_name: None,
3597 property: "embedding".to_string(),
3598 label: label.map(str::to_string),
3599 query_vector: LogicalExpression::Literal(Value::List(
3600 vec![
3601 Value::Float64(1.0),
3602 Value::Float64(0.0),
3603 Value::Float64(0.0),
3604 ]
3605 .into(),
3606 )),
3607 k: None,
3608 metric: Some(VectorMetric::Cosine),
3609 min_similarity: Some(0.5),
3610 max_distance: None,
3611 input: None,
3612 };
3613
3614 for label in [Some("Person"), Some("Unknown"), None] {
3615 let (_op, cols) = planner
3616 .plan_vector_scan(&make_scan(label))
3617 .unwrap_or_else(|e| panic!("plan_vector_scan failed for {label:?}: {e:?}"));
3618 assert_eq!(cols[0], "n", "variable column must be first for {label:?}");
3619 }
3620 }
3621}