Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5//!
6//! # Structure
7//!
8//! Logical operators are consumed by `plan_operator`, dispatched to
9//! per-operator submodules that share the crate-private `Planner`
10//! struct:
11//!
12//! | Submodule    | Owns                                                         |
13//! |--------------|--------------------------------------------------------------|
14//! | `scan`       | `plan_node_scan`: label, property, and index-seek scans      |
15//! | `expand`     | single-hop and multi-hop expansion (factorized chain)        |
16//! | `filter`     | predicate rewriting, zone maps, index pushdown, EXISTS/COUNT |
17//! | `expression` | `LogicalExpression` -> `FilterExpression` conversion         |
18//! | `join`       | hash, nested-loop, leapfrog, multi-way, semi/anti            |
19//! | `aggregate`, `project`, `mutation` | the rest of the pipeline               |
20//!
21//! # Rewrites applied during planning
22//!
23//! The planner is not a pure tree walker: it performs several shape
24//! transforms up-front because they decide which *physical* operator is
25//! legal, not just which is faster. Each rewrite lives in the submodule
26//! that owns the corresponding logical operator.
27//!
28//! 1. **Factorized expand chains** (`expand::plan_expand_chain`):
29//!    two or more consecutive `Expand`s are collapsed into a single
30//!    `LazyFactorizedChainOperator`. A hand-rolled tree of `Expand`
31//!    operators would materialise the Cartesian product between hops
32//!    (O(d^k) rows for a k-hop query with average degree d); factorisation
33//!    represents each hop as a set per row and evaluates upward filters
34//!    lazily, which is asymptotically better *and* preserves filter
35//!    semantics (filters apply to the flattened view). Disabled when
36//!    `factorized_execution` is false on the planner, used as an escape
37//!    hatch for operators that need materialised intermediate state.
38//!
39//! 2. **EXISTS / NOT EXISTS as semi/anti joins** (`filter::extract_complex_exists`):
40//!    anything more than a trivial single-hop EXISTS is rewritten to a
41//!    semi-join (EXISTS) or anti-join (NOT EXISTS) before the filter is
42//!    planned. A literal nested re-execution would re-scan the subquery
43//!    per outer row; the join form piggy-backs on the regular hash-join
44//!    infrastructure. The fast path in `expression::convert_expression`
45//!    keeps trivial single-hop EXISTS as inline predicates so small
46//!    queries stay scan-local.
47//!
48//! 3. **EXISTS inside OR** (`filter::extract_exists_from_or`):
49//!    semi-joins filter rows and therefore compose incorrectly with the
50//!    disjunctive scalar side of an OR. The rewrite splits the OR into a
51//!    semi-join branch and a regular filter branch, UNIONNs them, and
52//!    deduplicates. Without the split, rows satisfied only by the scalar
53//!    side would be dropped.
54//!
55//! 4. **COUNT subquery comparisons** (`filter::extract_count_comparison`):
56//!    `COUNT { ... } op N` rewrites into Apply + Aggregate + Filter.
57//!    Evaluating the COUNT inline per row would be a correlated
58//!    subquery; Apply + Aggregate reuses the hash-aggregate physical
59//!    operator.
60//!
61//! # Filter pushdown and index selection
62//!
63//! `filter::plan_filter` is the central point. It tries a sequence of
64//! optimisations before falling back to a generic `FilterOperator`; the
65//! ordering is load-bearing:
66//!
67//! 1. **Complex EXISTS / OR / COUNT rewrites** (above). These must run
68//!    first because later steps assume a pure-scalar predicate.
69//! 2. **Zone-map short-circuit**: if the column's zone summary proves the
70//!    predicate cannot match any row group, plan an `EmptyOperator`.
71//! 3. **Property index seek**: equality on an indexed property becomes an
72//!    index scan (O(1) per lookup) instead of a filtered full scan.
73//! 4. **Range index lookup**: `>`, `<`, `>=`, `<=` on an indexed
74//!    property becomes a range scan.
75//! 5. **Hybrid vector+text pushdown** (feature-gated): compound
76//!    AND/OR predicates over `vector_match` / `text_match` are folded
77//!    into a single fused scan.
78//! 6. **Generic `FilterOperator`**: the residual predicate wraps the
79//!    input scan.
80//!
81//! Whatever *remains* after a pushdown (e.g. the non-index-backed half
82//! of an AND) is planned as a residual filter on top of the chosen scan,
83//! so the returned tree is always semantically equivalent to the
84//! input plan.
85//!
86//! # Transaction context
87//!
88//! Every operator inherits the planner's `viewing_epoch` and
89//! `transaction_id`. Factorised chains and mutations carry them
90//! through `with_transaction_context()` so MVCC visibility is applied
91//! at every hop: see [`crate::transaction`] for the epoch/visibility
92//! rules.
93
94mod aggregate;
95mod expand;
96mod expression;
97mod filter;
98mod filter_hybrid;
99mod join;
100mod mutation;
101mod project;
102mod scan;
103
104#[cfg(feature = "algos")]
105use crate::query::plan::CallProcedureOp;
106#[cfg(feature = "text-index")]
107use crate::query::plan::TextScanOp;
108use crate::query::plan::{
109    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
110    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
111    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
112    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
113    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
114    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
115    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
116};
117#[cfg(feature = "vector-index")]
118use crate::query::plan::{VectorMetric, VectorScanOp};
119use grafeo_common::grafeo_debug_span;
120use grafeo_common::types::{EpochId, TransactionId};
121use grafeo_common::types::{LogicalType, Value};
122use grafeo_common::utils::error::{Error, Result};
123use grafeo_core::execution::AdaptiveContext;
124use grafeo_core::execution::operators::{
125    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
126    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
127    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
128    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
129    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
130    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
131    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
132    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
133    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RangeScanOperator,
134    RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
135    SimpleAggregateOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
136    UnionOperator, UnwindOperator, VariableLengthExpandOperator,
137};
138use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
139use std::collections::HashMap;
140use std::sync::Arc;
141
142use crate::query::planner::common;
143use crate::query::planner::common::expression_to_string;
144use crate::query::planner::{
145    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
146    convert_unary_op, value_to_logical_type,
147};
148use crate::transaction::TransactionManager;
149
150/// Range bounds for property-based range queries.
151struct RangeBounds<'a> {
152    min: Option<&'a Value>,
153    max: Option<&'a Value>,
154    min_inclusive: bool,
155    max_inclusive: bool,
156}
157
158/// Converts a logical plan to a physical operator tree for LPG stores.
159pub struct Planner {
160    /// The graph store (read-only operations).
161    pub(super) store: Arc<dyn GraphStoreSearch>,
162    /// Writable graph store (None for read-only databases).
163    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
164    /// Transaction manager for MVCC operations.
165    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
166    /// Current transaction ID (if in a transaction).
167    pub(super) transaction_id: Option<TransactionId>,
168    /// Epoch to use for visibility checks.
169    pub(super) viewing_epoch: EpochId,
170    /// Counter for generating unique anonymous edge column names.
171    pub(super) anon_edge_counter: std::cell::Cell<u32>,
172    /// Whether to use factorized execution for multi-hop queries.
173    pub(super) factorized_execution: bool,
174    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
175    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
176    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
177    /// Variables that hold edge IDs (from MATCH edge patterns).
178    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
179    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
180    /// Optional constraint validator for schema enforcement during mutations.
181    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
182    /// Catalog for user-defined procedure lookup.
183    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
184    /// LPG store handle for procedures that need direct index access (vector
185    /// and text search reach HNSW / BM25 indexes owned by the LPG store).
186    #[cfg(feature = "lpg")]
187    pub(super) lpg_store: Option<Arc<grafeo_core::graph::lpg::LpgStore>>,
188    /// Shared parameter state for the currently planning correlated Apply.
189    /// Set by `plan_apply` before planning the inner operator, consumed by
190    /// `plan_operator` when encountering `ParameterScan`.
191    pub(super) correlated_param_state:
192        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
193    /// Variables from variable-length expand patterns (group-list variables).
194    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
195    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
196    /// When true, each physical operator is wrapped in `ProfiledOperator`.
197    profiling: std::cell::Cell<bool>,
198    /// Profile entries collected during planning (post-order).
199    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
200    /// Optional write tracker for recording writes during mutations.
201    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
202    /// Session context for introspection functions (info, schema, current_schema, etc.).
203    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
204    /// When true, expand operators use epoch-only visibility (no MVCC version
205    /// chain walks).  Set when the plan contains no mutations, so PENDING
206    /// writes are impossible to observe.
207    pub(super) read_only: bool,
208    /// LIMIT hint pushed down from `plan_limit` to leaf scan operators.
209    ///
210    /// Phase 4e: when `plan_limit`'s input is a Filter that plans to a
211    /// `RangeScanOperator`, the inner operator gets `with_limit(n)` so
212    /// its materialization step terminates after `n` matches. The outer
213    /// `LimitOperator` still wraps the result for correctness.
214    ///
215    /// Save-and-restore pattern: callers set the hint via
216    /// `Cell::replace`, recurse, then restore. This handles nested
217    /// LIMITs (e.g. subqueries) without state leaking across scopes.
218    pub(super) limit_hint: std::cell::Cell<Option<usize>>,
219}
220
221impl Planner {
222    /// Creates a new planner with the given store.
223    ///
224    /// This creates a planner without transaction context, using the current
225    /// epoch from the store for visibility.
226    #[must_use]
227    pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
228        let epoch = store.current_epoch();
229        Self {
230            store,
231            write_store: None,
232            transaction_manager: None,
233            transaction_id: None,
234            viewing_epoch: epoch,
235            anon_edge_counter: std::cell::Cell::new(0),
236            factorized_execution: true,
237            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
238            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
239            validator: None,
240            catalog: None,
241            #[cfg(feature = "lpg")]
242            lpg_store: None,
243            correlated_param_state: std::cell::RefCell::new(None),
244            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
245            profiling: std::cell::Cell::new(false),
246            profile_entries: std::cell::RefCell::new(Vec::new()),
247            write_tracker: None,
248            session_context: grafeo_core::execution::operators::SessionContext::default(),
249            read_only: false,
250            limit_hint: std::cell::Cell::new(None),
251        }
252    }
253
254    /// Creates a new planner with transaction context for MVCC-aware planning.
255    #[must_use]
256    pub fn with_context(
257        store: Arc<dyn GraphStoreSearch>,
258        write_store: Option<Arc<dyn GraphStoreMut>>,
259        transaction_manager: Arc<TransactionManager>,
260        transaction_id: Option<TransactionId>,
261        viewing_epoch: EpochId,
262    ) -> Self {
263        use crate::transaction::TransactionWriteTracker;
264
265        // Create write tracker when there's an active transaction
266        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
267            if transaction_id.is_some() {
268                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
269                    &transaction_manager,
270                ))))
271            } else {
272                None
273            };
274
275        Self {
276            store,
277            write_store,
278            transaction_manager: Some(transaction_manager),
279            transaction_id,
280            viewing_epoch,
281            anon_edge_counter: std::cell::Cell::new(0),
282            factorized_execution: true,
283            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
284            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
285            validator: None,
286            catalog: None,
287            #[cfg(feature = "lpg")]
288            lpg_store: None,
289            correlated_param_state: std::cell::RefCell::new(None),
290            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
291            profiling: std::cell::Cell::new(false),
292            profile_entries: std::cell::RefCell::new(Vec::new()),
293            write_tracker,
294            session_context: grafeo_core::execution::operators::SessionContext::default(),
295            read_only: false,
296            limit_hint: std::cell::Cell::new(None),
297        }
298    }
299
300    /// Marks this planner as planning a read-only query (no mutations),
301    /// enabling fast-path visibility checks in expand operators.
302    #[must_use]
303    pub fn with_read_only(mut self, read_only: bool) -> Self {
304        self.read_only = read_only;
305        self
306    }
307
308    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
309    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
310        self.write_store
311            .as_ref()
312            .map(Arc::clone)
313            .ok_or(Error::Transaction(
314                grafeo_common::utils::error::TransactionError::ReadOnly,
315            ))
316    }
317
318    /// Returns the viewing epoch for this planner.
319    #[must_use]
320    pub fn viewing_epoch(&self) -> EpochId {
321        self.viewing_epoch
322    }
323
324    /// Returns the transaction ID for this planner, if any.
325    #[must_use]
326    pub fn transaction_id(&self) -> Option<TransactionId> {
327        self.transaction_id
328    }
329
330    /// Returns a reference to the transaction manager, if available.
331    #[must_use]
332    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
333        self.transaction_manager.as_ref()
334    }
335
336    /// Enables or disables factorized execution for multi-hop queries.
337    #[must_use]
338    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
339        self.factorized_execution = enabled;
340        self
341    }
342
343    /// Sets the constraint validator for schema enforcement during mutations.
344    #[must_use]
345    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
346        self.validator = Some(validator);
347        self
348    }
349
350    /// Sets the catalog for user-defined procedure lookup.
351    #[must_use]
352    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
353        self.catalog = Some(catalog);
354        self
355    }
356
357    /// Attaches an LPG store handle so `CALL grafeo.search.*` procedures can
358    /// reach the vector and text indexes.
359    #[cfg(feature = "lpg")]
360    #[must_use]
361    pub fn with_lpg_store(mut self, lpg_store: Arc<grafeo_core::graph::lpg::LpgStore>) -> Self {
362        self.lpg_store = Some(lpg_store);
363        self
364    }
365
366    /// Sets the session context for introspection functions.
367    #[must_use]
368    pub fn with_session_context(
369        mut self,
370        context: grafeo_core::execution::operators::SessionContext,
371    ) -> Self {
372        self.session_context = context;
373        self
374    }
375
376    /// Generates an edge column name from an expand's edge variable (or an
377    /// anonymous fallback) and registers it in `edge_columns` so downstream
378    /// RETURN emits `EdgeResolve` instead of `NodeResolve`.
379    ///
380    /// Returns the column name for the caller to push into its output columns.
381    pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
382        let name = edge_variable.clone().unwrap_or_else(|| {
383            let count = self.anon_edge_counter.get();
384            self.anon_edge_counter.set(count + 1);
385            format!("_anon_edge_{}", count)
386        });
387        self.edge_columns.borrow_mut().insert(name.clone());
388        name
389    }
390
391    /// Counts consecutive single-hop expand operations.
392    ///
393    /// Returns the count and the deepest non-expand operator (the base of the chain).
394    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
395        match op {
396            LogicalOperator::Expand(expand) => {
397                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
398
399                if is_single_hop {
400                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
401                    (inner_count + 1, base)
402                } else {
403                    (0, op)
404                }
405            }
406            _ => (0, op),
407        }
408    }
409
410    /// Collects expand operations from the outermost down to the base.
411    ///
412    /// Returns expands in order from innermost (base) to outermost.
413    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
414        let mut chain = Vec::new();
415        let mut current = op;
416
417        while let LogicalOperator::Expand(expand) = current {
418            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
419            if !is_single_hop {
420                break;
421            }
422            chain.push(expand);
423            current = &expand.input;
424        }
425
426        chain.reverse();
427        chain
428    }
429
430    /// Plans a logical plan into a physical operator.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if the logical plan contains unsupported operators
435    /// or invalid expressions.
436    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
437        let _span = grafeo_debug_span!("grafeo::query::plan");
438        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
439        Ok(PhysicalPlan {
440            operator,
441            columns,
442            adaptive_context: None,
443        })
444    }
445
446    /// Plans a logical plan with profiling: each physical operator is wrapped
447    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
448    /// collect row counts and timing. Returns the physical plan together with
449    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
450    /// items in post-order (children before parents).
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if the logical plan contains unsupported operators
455    /// or invalid expressions.
456    pub fn plan_profiled(
457        &self,
458        logical_plan: &LogicalPlan,
459    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
460        self.profiling.set(true);
461        self.profile_entries.borrow_mut().clear();
462
463        let result = self.plan_operator(&logical_plan.root);
464
465        self.profiling.set(false);
466        let (operator, columns) = result?;
467        let entries = self.profile_entries.borrow_mut().drain(..).collect();
468
469        Ok((
470            PhysicalPlan {
471                operator,
472                columns,
473                adaptive_context: None,
474            },
475            entries,
476        ))
477    }
478
479    /// Plans a logical plan with adaptive execution support.
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the logical plan contains unsupported operators
484    /// or invalid expressions.
485    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
486        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
487
488        let mut adaptive_context = AdaptiveContext::new();
489        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
490
491        Ok(PhysicalPlan {
492            operator,
493            columns,
494            adaptive_context: Some(adaptive_context),
495        })
496    }
497
498    /// Collects cardinality estimates from the logical plan into an adaptive context.
499    fn collect_cardinality_estimates(
500        &self,
501        op: &LogicalOperator,
502        ctx: &mut AdaptiveContext,
503        depth: usize,
504    ) {
505        match op {
506            LogicalOperator::NodeScan(scan) => {
507                let estimate = if let Some(label) = &scan.label {
508                    self.store.nodes_by_label(label).len() as f64
509                } else {
510                    self.store.node_count() as f64
511                };
512                let id = format!("scan_{}", scan.variable);
513                ctx.set_estimate(&id, estimate);
514
515                if let Some(input) = &scan.input {
516                    self.collect_cardinality_estimates(input, ctx, depth + 1);
517                }
518            }
519            LogicalOperator::Filter(filter) => {
520                let input_estimate = self.estimate_cardinality(&filter.input);
521                let estimate = input_estimate * 0.3;
522                let id = format!("filter_{depth}");
523                ctx.set_estimate(&id, estimate);
524
525                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
526            }
527            LogicalOperator::Expand(expand) => {
528                let input_estimate = self.estimate_cardinality(&expand.input);
529                let stats = self.store.statistics();
530                let avg_degree = self.estimate_expand_degree(&stats, expand);
531                let estimate = input_estimate * avg_degree;
532                let id = format!("expand_{}", expand.to_variable);
533                ctx.set_estimate(&id, estimate);
534
535                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
536            }
537            LogicalOperator::Join(join) => {
538                let left_est = self.estimate_cardinality(&join.left);
539                let right_est = self.estimate_cardinality(&join.right);
540                let estimate = (left_est * right_est).sqrt();
541                let id = format!("join_{depth}");
542                ctx.set_estimate(&id, estimate);
543
544                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
545                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
546            }
547            LogicalOperator::Aggregate(agg) => {
548                let input_estimate = self.estimate_cardinality(&agg.input);
549                let estimate = if agg.group_by.is_empty() {
550                    1.0
551                } else {
552                    (input_estimate * 0.1).max(1.0)
553                };
554                let id = format!("aggregate_{depth}");
555                ctx.set_estimate(&id, estimate);
556
557                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
558            }
559            LogicalOperator::Distinct(distinct) => {
560                let input_estimate = self.estimate_cardinality(&distinct.input);
561                let estimate = (input_estimate * 0.5).max(1.0);
562                let id = format!("distinct_{depth}");
563                ctx.set_estimate(&id, estimate);
564
565                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
566            }
567            LogicalOperator::Return(ret) => {
568                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
569            }
570            LogicalOperator::Limit(limit) => {
571                let input_estimate = self.estimate_cardinality(&limit.input);
572                let estimate = (input_estimate).min(limit.count.estimate());
573                let id = format!("limit_{depth}");
574                ctx.set_estimate(&id, estimate);
575
576                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
577            }
578            LogicalOperator::Skip(skip) => {
579                let input_estimate = self.estimate_cardinality(&skip.input);
580                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
581                let id = format!("skip_{depth}");
582                ctx.set_estimate(&id, estimate);
583
584                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
585            }
586            LogicalOperator::Sort(sort) => {
587                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
588            }
589            LogicalOperator::Union(union) => {
590                let estimate: f64 = union
591                    .inputs
592                    .iter()
593                    .map(|input| self.estimate_cardinality(input))
594                    .sum();
595                let id = format!("union_{depth}");
596                ctx.set_estimate(&id, estimate);
597
598                for input in &union.inputs {
599                    self.collect_cardinality_estimates(input, ctx, depth + 1);
600                }
601            }
602            _ => {
603                // For other operators, try to recurse into known input patterns
604            }
605        }
606    }
607
608    /// Estimates cardinality for a logical operator subtree.
609    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
610        match op {
611            LogicalOperator::NodeScan(scan) => {
612                if let Some(label) = &scan.label {
613                    self.store.nodes_by_label(label).len() as f64
614                } else {
615                    self.store.node_count() as f64
616                }
617            }
618            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
619            LogicalOperator::Expand(expand) => {
620                let stats = self.store.statistics();
621                let avg_degree = self.estimate_expand_degree(&stats, expand);
622                self.estimate_cardinality(&expand.input) * avg_degree
623            }
624            LogicalOperator::Join(join) => {
625                let left = self.estimate_cardinality(&join.left);
626                let right = self.estimate_cardinality(&join.right);
627                (left * right).sqrt()
628            }
629            LogicalOperator::Aggregate(agg) => {
630                if agg.group_by.is_empty() {
631                    1.0
632                } else {
633                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
634                }
635            }
636            LogicalOperator::Distinct(distinct) => {
637                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
638            }
639            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
640            LogicalOperator::Limit(limit) => self
641                .estimate_cardinality(&limit.input)
642                .min(limit.count.estimate()),
643            LogicalOperator::Skip(skip) => {
644                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
645            }
646            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
647            LogicalOperator::Union(union) => union
648                .inputs
649                .iter()
650                .map(|input| self.estimate_cardinality(input))
651                .sum(),
652            LogicalOperator::Except(except) => {
653                let left = self.estimate_cardinality(&except.left);
654                let right = self.estimate_cardinality(&except.right);
655                (left - right).max(0.0)
656            }
657            LogicalOperator::Intersect(intersect) => {
658                let left = self.estimate_cardinality(&intersect.left);
659                let right = self.estimate_cardinality(&intersect.right);
660                left.min(right)
661            }
662            LogicalOperator::Otherwise(otherwise) => self
663                .estimate_cardinality(&otherwise.left)
664                .max(self.estimate_cardinality(&otherwise.right)),
665            _ => 1000.0,
666        }
667    }
668
669    /// Estimates the average edge degree for an expand operation using store statistics.
670    fn estimate_expand_degree(
671        &self,
672        stats: &grafeo_core::statistics::Statistics,
673        expand: &ExpandOp,
674    ) -> f64 {
675        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
676        if expand.edge_types.len() == 1 {
677            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
678        } else if stats.total_nodes > 0 {
679            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
680        } else {
681            10.0
682        }
683    }
684
685    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
686    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
687    fn maybe_profile(
688        &self,
689        result: Result<(Box<dyn Operator>, Vec<String>)>,
690        op: &LogicalOperator,
691    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
692        if self.profiling.get() {
693            let (physical, columns) = result?;
694            let (entry, stats) =
695                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
696            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
697            self.profile_entries.borrow_mut().push(entry);
698            Ok((Box::new(profiled), columns))
699        } else {
700            result
701        }
702    }
703
704    /// Plans a single logical operator.
705    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
706        let result = match op {
707            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
708            LogicalOperator::Expand(expand) => {
709                // Factorized chain fuses N Expand logical operators into a
710                // single LazyFactorizedChainOperator. It kicks in only when
711                // it actually helps: chain_len >= 2 means at least two
712                // consecutive Expands, where separate plans would
713                // Cartesian-product per hop. For a single hop, the plain
714                // `plan_expand` path is cheaper and integrates more naturally
715                // with adjacent operators.
716                //
717                // Disable under PROFILE for the same reason as
718                // `try_topk_rewrite` in plan_limit: PROFILE's
719                // `build_profile_tree` walks the logical plan expecting one
720                // ProfileEntry per logical operator, and a fused chain only
721                // emits one entry for the whole chain. The result is a count
722                // mismatch and a panic at
723                // `crates/grafeo-engine/src/query/profile.rs:75`. Run the
724                // unfused per-Expand path under PROFILE so each Expand gets
725                // its own entry.
726                if self.factorized_execution && !self.profiling.get() {
727                    let (chain_len, _base) = Self::count_expand_chain(op);
728                    if chain_len >= 2 {
729                        return self.maybe_profile(self.plan_expand_chain(op), op);
730                    }
731                }
732                self.plan_expand(expand)
733            }
734            LogicalOperator::Return(ret) => self.plan_return(ret),
735            LogicalOperator::Filter(filter) => self.plan_filter(filter),
736            LogicalOperator::Project(project) => self.plan_project(project),
737            LogicalOperator::Limit(limit) => self.plan_limit(limit),
738            LogicalOperator::Skip(skip) => self.plan_skip(skip),
739            LogicalOperator::Sort(sort) => self.plan_sort(sort),
740            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
741            LogicalOperator::Join(join) => self.plan_join(join),
742            LogicalOperator::Union(union) => self.plan_union(union),
743            LogicalOperator::Except(except) => self.plan_except(except),
744            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
745            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
746            LogicalOperator::Apply(apply) => self.plan_apply(apply),
747            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
748            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
749            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
750            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
751            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
752            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
753            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
754            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
755            LogicalOperator::Merge(merge) => self.plan_merge(merge),
756            LogicalOperator::MergeRelationship(merge_rel) => {
757                self.plan_merge_relationship(merge_rel)
758            }
759            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
760            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
761            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
762            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
763            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
764            #[cfg(feature = "algos")]
765            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
766            #[cfg(not(feature = "algos"))]
767            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
768                "CALL procedures require the 'algos' feature".to_string(),
769            )),
770            LogicalOperator::ParameterScan(_param_scan) => {
771                let state = self
772                    .correlated_param_state
773                    .borrow()
774                    .clone()
775                    .ok_or_else(|| {
776                        Error::Internal(
777                            "ParameterScan without correlated Apply context".to_string(),
778                        )
779                    })?;
780                // Use the actual column names from the ParameterState (which may
781                // have been expanded from "*" to real variable names in plan_apply)
782                let columns = state.columns.clone();
783                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
784                Ok((operator, columns))
785            }
786            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
787            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
788            LogicalOperator::LoadData(load) => {
789                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
790                    load.path.clone(),
791                    load.format,
792                    load.with_headers,
793                    load.field_terminator,
794                    load.variable.clone(),
795                ));
796                Ok((operator, vec![load.variable.clone()]))
797            }
798            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
799            #[cfg(feature = "vector-index")]
800            LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
801            #[cfg(not(feature = "vector-index"))]
802            LogicalOperator::VectorScan(_) => Err(Error::Internal(
803                "VectorScan requires vector-index feature".to_string(),
804            )),
805            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
806                "VectorJoin requires vector-index feature".to_string(),
807            )),
808            #[cfg(feature = "text-index")]
809            LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
810            #[cfg(not(feature = "text-index"))]
811            LogicalOperator::TextScan(_) => Err(Error::Internal(
812                "TextScan requires text-index feature".to_string(),
813            )),
814            _ => Err(Error::Internal(format!(
815                "Unsupported operator: {:?}",
816                std::mem::discriminant(op)
817            ))),
818        };
819        self.maybe_profile(result, op)
820    }
821
822    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
823    fn plan_horizontal_aggregate(
824        &self,
825        ha: &HorizontalAggregateOp,
826    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
827        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
828
829        let list_col_idx = child_columns
830            .iter()
831            .position(|c| c == &ha.list_column)
832            .ok_or_else(|| {
833                Error::Internal(format!(
834                    "HorizontalAggregate list column '{}' not found in {:?}",
835                    ha.list_column, child_columns
836                ))
837            })?;
838
839        let entity_kind = match ha.entity_kind {
840            LogicalEntityKind::Edge => EntityKind::Edge,
841            LogicalEntityKind::Node => EntityKind::Node,
842        };
843
844        let function = convert_aggregate_function(ha.function);
845        let input_column_count = child_columns.len();
846
847        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
848            child_op,
849            list_col_idx,
850            entity_kind,
851            function,
852            ha.property.clone(),
853            Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
854            input_column_count,
855        ));
856
857        let mut columns = child_columns;
858        columns.push(ha.alias.clone());
859        // Mark the result as a scalar column
860        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
861
862        Ok((operator, columns))
863    }
864
865    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
866    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
867        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
868        let key_idx = child_columns
869            .iter()
870            .position(|c| c == &mc.key_var)
871            .ok_or_else(|| {
872                Error::Internal(format!(
873                    "MapCollect key '{}' not in columns {:?}",
874                    mc.key_var, child_columns
875                ))
876            })?;
877        let value_idx = child_columns
878            .iter()
879            .position(|c| c == &mc.value_var)
880            .ok_or_else(|| {
881                Error::Internal(format!(
882                    "MapCollect value '{}' not in columns {:?}",
883                    mc.value_var, child_columns
884                ))
885            })?;
886        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
887        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
888        Ok((operator, vec![mc.alias.clone()]))
889    }
890
891    /// Plans a text search scan operator using BM25 inverted index.
892    #[cfg(feature = "text-index")]
893    fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
894        use grafeo_core::execution::operators::TextScanOperator;
895
896        let query_string = match &scan.query {
897            LogicalExpression::Literal(Value::String(s)) => s.to_string(),
898            LogicalExpression::Parameter(name) => {
899                return Err(Error::Internal(format!(
900                    "TextScan query parameter ${} not resolved",
901                    name
902                )));
903            }
904            _ => {
905                return Err(Error::Internal(
906                    "TextScan query must be a string literal or parameter".to_string(),
907                ));
908            }
909        };
910
911        let operator: Box<dyn Operator> = if let Some(k) = scan.k {
912            Box::new(TextScanOperator::top_k(
913                Arc::clone(&self.store),
914                &scan.label,
915                &scan.property,
916                &query_string,
917                k,
918            ))
919        } else if let Some(threshold) = scan.threshold {
920            Box::new(TextScanOperator::with_threshold(
921                Arc::clone(&self.store),
922                &scan.label,
923                &scan.property,
924                &query_string,
925                threshold,
926            ))
927        } else {
928            Box::new(TextScanOperator::top_k(
929                Arc::clone(&self.store),
930                &scan.label,
931                &scan.property,
932                &query_string,
933                100,
934            ))
935        };
936
937        let mut columns = vec![scan.variable.clone()];
938        if let Some(ref score_col) = scan.score_column {
939            columns.push(score_col.clone());
940        }
941
942        Ok((operator, columns))
943    }
944
945    /// Plans a VectorScan logical operator into a physical VectorScanOperator.
946    #[cfg(feature = "vector-index")]
947    pub(super) fn plan_vector_scan(
948        &self,
949        scan: &VectorScanOp,
950    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
951        use grafeo_core::execution::operators::VectorScanOperator;
952        use grafeo_core::index::vector::DistanceMetric;
953
954        // Hybrid shape `VectorScan(input=graph_pattern)` is not supported by
955        // the physical VectorScanOperator: it has no input slot and would
956        // silently drop upstream bindings. Reject rather than plan it
957        // incorrectly; callers should build a VectorJoin for this case.
958        if scan.input.is_some() {
959            return Err(Error::Internal(
960                "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
961            ));
962        }
963
964        let query_vec = self.resolve_vector_literal(&scan.query_vector)?;
965
966        let requested_metric = scan.metric.map(|m| match m {
967            VectorMetric::Cosine => DistanceMetric::Cosine,
968            VectorMetric::Euclidean => DistanceMetric::Euclidean,
969            VectorMetric::DotProduct => DistanceMetric::DotProduct,
970            VectorMetric::Manhattan => DistanceMetric::Manhattan,
971        });
972
973        // Top-k mode uses HNSW when available; threshold/unbounded mode
974        // bounds k to the label's node count so we don't feed usize::MAX
975        // into HNSW (which degrades to full traversal and risks overflow
976        // in quantized rescore paths even with saturating_mul).
977        let k = scan.k.unwrap_or_else(|| {
978            scan.label.as_ref().map_or_else(
979                || self.store.node_count(),
980                |l| self.store.nodes_by_label_count(l),
981            )
982        });
983
984        // Pick the metric we'll execute under. When the user asked for a
985        // specific one, honor it. Otherwise inherit the index's metric (so a
986        // cosine-built index drives cosine scoring) or default to Cosine for
987        // the unindexed brute-force path.
988        let index_metric = scan
989            .label
990            .as_ref()
991            .and_then(|label| self.store.vector_index_metric(label, &scan.property));
992        let metric = requested_metric
993            .or(index_metric)
994            .unwrap_or(DistanceMetric::Cosine);
995
996        // The store's vector_search routes HNSW when an index exists whose
997        // metric matches `metric`, and brute-force scan otherwise. No handle
998        // downcast; no Arc<dyn Any>.
999        let mut operator = VectorScanOperator::new(
1000            Arc::clone(&self.store),
1001            scan.label.clone(),
1002            scan.property.clone(),
1003            query_vec,
1004            k,
1005            metric,
1006        );
1007
1008        if let Some(sim) = scan.min_similarity {
1009            operator = operator.with_min_similarity(sim);
1010        }
1011        if let Some(dist) = scan.max_distance {
1012            operator = operator.with_max_distance(dist);
1013        }
1014
1015        let mut columns = vec![scan.variable.clone()];
1016        // VectorScan always projects a score column keyed by the resolved
1017        // metric (after index-driven fallback) so downstream score reuse
1018        // matches the actual distance values written out.
1019        let metric_tag = match metric {
1020            DistanceMetric::Cosine => "cos",
1021            DistanceMetric::Euclidean => "euc",
1022            DistanceMetric::DotProduct => "dot",
1023            DistanceMetric::Manhattan => "man",
1024            // Future metrics (DistanceMetric is #[non_exhaustive]) fall back
1025            // to a generic tag so they still produce a stable column name.
1026            _ => "other",
1027        };
1028        columns.push(project::vector_score_column_name(
1029            metric_tag,
1030            &scan.property,
1031            &scan.variable,
1032            &scan.query_vector,
1033        ));
1034
1035        Ok((Box::new(operator), columns))
1036    }
1037
1038    /// Resolves a LogicalExpression to a Vec<f32> for vector operations.
1039    #[cfg(feature = "vector-index")]
1040    pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
1041        // f64→f32 precision loss throughout is intentional: vectors are stored and searched as f32.
1042        #[allow(clippy::cast_possible_truncation)]
1043        match expr {
1044            LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
1045            LogicalExpression::Literal(Value::List(list)) => {
1046                let mut vec = Vec::with_capacity(list.len());
1047                for item in list.iter() {
1048                    match item {
1049                        Value::Float64(f) => vec.push(*f as f32),
1050                        Value::Int64(i) => vec.push(*i as f32),
1051                        _ => {
1052                            return Err(Error::Internal(
1053                                "Vector elements must be numeric".to_string(),
1054                            ));
1055                        }
1056                    }
1057                }
1058                Ok(vec)
1059            }
1060            // GQL/Cypher parser produces List([Literal(Float64), ...]) for inline vectors like
1061            // [0.9, 0.1, 0.0] — handle this form by recursively resolving each element.
1062            LogicalExpression::List(items) => {
1063                let mut vec = Vec::with_capacity(items.len());
1064                for item in items {
1065                    match item {
1066                        LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
1067                        LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
1068                        _ => {
1069                            return Err(Error::Internal(
1070                                "Vector elements must be numeric literals".to_string(),
1071                            ));
1072                        }
1073                    }
1074                }
1075                Ok(vec)
1076            }
1077            _ => Err(Error::Internal("Expected vector literal".to_string())),
1078        }
1079    }
1080}
1081
1082/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
1083#[cfg(feature = "algos")]
1084struct StaticResultOperator {
1085    rows: Vec<Vec<Value>>,
1086    column_indices: Vec<usize>,
1087    row_index: usize,
1088}
1089
1090#[cfg(feature = "algos")]
1091impl Operator for StaticResultOperator {
1092    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
1093        use grafeo_core::execution::DataChunk;
1094
1095        if self.row_index >= self.rows.len() {
1096            return Ok(None);
1097        }
1098
1099        let remaining = self.rows.len() - self.row_index;
1100        let chunk_rows = remaining.min(1024);
1101        let col_count = self.column_indices.len();
1102
1103        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
1104        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
1105
1106        for row_offset in 0..chunk_rows {
1107            let row = &self.rows[self.row_index + row_offset];
1108            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
1109                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
1110                if let Some(col) = chunk.column_mut(col_idx) {
1111                    col.push_value(value);
1112                }
1113            }
1114        }
1115        chunk.set_count(chunk_rows);
1116
1117        self.row_index += chunk_rows;
1118        Ok(Some(chunk))
1119    }
1120
1121    fn reset(&mut self) {
1122        self.row_index = 0;
1123    }
1124
1125    fn name(&self) -> &'static str {
1126        "StaticResult"
1127    }
1128
1129    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
1130        self
1131    }
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136    use super::*;
1137    use crate::query::plan::{
1138        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1139        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1140        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
1141        SkipOp as LogicalSkipOp, SortKey, SortOp,
1142    };
1143    use grafeo_common::types::Value;
1144    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
1145    use grafeo_core::graph::GraphStoreMut;
1146    use grafeo_core::graph::lpg::LpgStore;
1147
1148    fn create_test_store() -> Arc<LpgStore> {
1149        let store = Arc::new(LpgStore::new().unwrap());
1150        store.create_node(&["Person"]);
1151        store.create_node(&["Person"]);
1152        store.create_node(&["Company"]);
1153        store
1154    }
1155
1156    // ==================== Simple Scan Tests ====================
1157
1158    #[test]
1159    fn test_plan_simple_scan() {
1160        let store = create_test_store();
1161        let planner = Planner::new(store);
1162
1163        // MATCH (n:Person) RETURN n
1164        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1165            items: vec![ReturnItem {
1166                expression: LogicalExpression::Variable("n".to_string()),
1167                alias: None,
1168            }],
1169            distinct: false,
1170            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1171                variable: "n".to_string(),
1172                label: Some("Person".to_string()),
1173                input: None,
1174            })),
1175        }));
1176
1177        let physical = planner.plan(&logical).unwrap();
1178        assert_eq!(physical.columns(), &["n"]);
1179    }
1180
1181    #[test]
1182    fn test_plan_scan_without_label() {
1183        let store = create_test_store();
1184        let planner = Planner::new(store);
1185
1186        // MATCH (n) RETURN n
1187        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1188            items: vec![ReturnItem {
1189                expression: LogicalExpression::Variable("n".to_string()),
1190                alias: None,
1191            }],
1192            distinct: false,
1193            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1194                variable: "n".to_string(),
1195                label: None,
1196                input: None,
1197            })),
1198        }));
1199
1200        let physical = planner.plan(&logical).unwrap();
1201        assert_eq!(physical.columns(), &["n"]);
1202    }
1203
1204    #[test]
1205    fn test_plan_return_with_alias() {
1206        let store = create_test_store();
1207        let planner = Planner::new(store);
1208
1209        // MATCH (n:Person) RETURN n AS person
1210        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1211            items: vec![ReturnItem {
1212                expression: LogicalExpression::Variable("n".to_string()),
1213                alias: Some("person".to_string()),
1214            }],
1215            distinct: false,
1216            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1217                variable: "n".to_string(),
1218                label: Some("Person".to_string()),
1219                input: None,
1220            })),
1221        }));
1222
1223        let physical = planner.plan(&logical).unwrap();
1224        assert_eq!(physical.columns(), &["person"]);
1225    }
1226
1227    #[test]
1228    fn test_plan_return_property() {
1229        let store = create_test_store();
1230        let planner = Planner::new(store);
1231
1232        // MATCH (n:Person) RETURN n.name
1233        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1234            items: vec![ReturnItem {
1235                expression: LogicalExpression::Property {
1236                    variable: "n".to_string(),
1237                    property: "name".to_string(),
1238                },
1239                alias: None,
1240            }],
1241            distinct: false,
1242            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1243                variable: "n".to_string(),
1244                label: Some("Person".to_string()),
1245                input: None,
1246            })),
1247        }));
1248
1249        let physical = planner.plan(&logical).unwrap();
1250        assert_eq!(physical.columns(), &["n.name"]);
1251    }
1252
1253    #[test]
1254    fn test_plan_return_literal() {
1255        let store = create_test_store();
1256        let planner = Planner::new(store);
1257
1258        // MATCH (n) RETURN 42 AS answer
1259        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1260            items: vec![ReturnItem {
1261                expression: LogicalExpression::Literal(Value::Int64(42)),
1262                alias: Some("answer".to_string()),
1263            }],
1264            distinct: false,
1265            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1266                variable: "n".to_string(),
1267                label: None,
1268                input: None,
1269            })),
1270        }));
1271
1272        let physical = planner.plan(&logical).unwrap();
1273        assert_eq!(physical.columns(), &["answer"]);
1274    }
1275
1276    // ==================== Filter Tests ====================
1277
1278    #[test]
1279    fn test_plan_filter_equality() {
1280        let store = create_test_store();
1281        let planner = Planner::new(store);
1282
1283        // MATCH (n:Person) WHERE n.age = 30 RETURN n
1284        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1285            items: vec![ReturnItem {
1286                expression: LogicalExpression::Variable("n".to_string()),
1287                alias: None,
1288            }],
1289            distinct: false,
1290            input: Box::new(LogicalOperator::Filter(FilterOp {
1291                predicate: LogicalExpression::Binary {
1292                    left: Box::new(LogicalExpression::Property {
1293                        variable: "n".to_string(),
1294                        property: "age".to_string(),
1295                    }),
1296                    op: BinaryOp::Eq,
1297                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1298                },
1299                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1300                    variable: "n".to_string(),
1301                    label: Some("Person".to_string()),
1302                    input: None,
1303                })),
1304                pushdown_hint: None,
1305            })),
1306        }));
1307
1308        let physical = planner.plan(&logical).unwrap();
1309        assert_eq!(physical.columns(), &["n"]);
1310    }
1311
1312    #[test]
1313    fn test_plan_filter_compound_and() {
1314        let store = create_test_store();
1315        let planner = Planner::new(store);
1316
1317        // WHERE n.age > 20 AND n.age < 40
1318        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1319            items: vec![ReturnItem {
1320                expression: LogicalExpression::Variable("n".to_string()),
1321                alias: None,
1322            }],
1323            distinct: false,
1324            input: Box::new(LogicalOperator::Filter(FilterOp {
1325                predicate: LogicalExpression::Binary {
1326                    left: Box::new(LogicalExpression::Binary {
1327                        left: Box::new(LogicalExpression::Property {
1328                            variable: "n".to_string(),
1329                            property: "age".to_string(),
1330                        }),
1331                        op: BinaryOp::Gt,
1332                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1333                    }),
1334                    op: BinaryOp::And,
1335                    right: Box::new(LogicalExpression::Binary {
1336                        left: Box::new(LogicalExpression::Property {
1337                            variable: "n".to_string(),
1338                            property: "age".to_string(),
1339                        }),
1340                        op: BinaryOp::Lt,
1341                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1342                    }),
1343                },
1344                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1345                    variable: "n".to_string(),
1346                    label: None,
1347                    input: None,
1348                })),
1349                pushdown_hint: None,
1350            })),
1351        }));
1352
1353        let physical = planner.plan(&logical).unwrap();
1354        assert_eq!(physical.columns(), &["n"]);
1355    }
1356
1357    #[test]
1358    fn test_plan_filter_unary_not() {
1359        let store = create_test_store();
1360        let planner = Planner::new(store);
1361
1362        // WHERE NOT n.active
1363        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1364            items: vec![ReturnItem {
1365                expression: LogicalExpression::Variable("n".to_string()),
1366                alias: None,
1367            }],
1368            distinct: false,
1369            input: Box::new(LogicalOperator::Filter(FilterOp {
1370                predicate: LogicalExpression::Unary {
1371                    op: UnaryOp::Not,
1372                    operand: Box::new(LogicalExpression::Property {
1373                        variable: "n".to_string(),
1374                        property: "active".to_string(),
1375                    }),
1376                },
1377                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1378                    variable: "n".to_string(),
1379                    label: None,
1380                    input: None,
1381                })),
1382                pushdown_hint: None,
1383            })),
1384        }));
1385
1386        let physical = planner.plan(&logical).unwrap();
1387        assert_eq!(physical.columns(), &["n"]);
1388    }
1389
1390    #[test]
1391    fn test_plan_filter_is_null() {
1392        let store = create_test_store();
1393        let planner = Planner::new(store);
1394
1395        // WHERE n.email IS NULL
1396        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1397            items: vec![ReturnItem {
1398                expression: LogicalExpression::Variable("n".to_string()),
1399                alias: None,
1400            }],
1401            distinct: false,
1402            input: Box::new(LogicalOperator::Filter(FilterOp {
1403                predicate: LogicalExpression::Unary {
1404                    op: UnaryOp::IsNull,
1405                    operand: Box::new(LogicalExpression::Property {
1406                        variable: "n".to_string(),
1407                        property: "email".to_string(),
1408                    }),
1409                },
1410                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1411                    variable: "n".to_string(),
1412                    label: None,
1413                    input: None,
1414                })),
1415                pushdown_hint: None,
1416            })),
1417        }));
1418
1419        let physical = planner.plan(&logical).unwrap();
1420        assert_eq!(physical.columns(), &["n"]);
1421    }
1422
1423    #[test]
1424    fn test_plan_filter_function_call() {
1425        let store = create_test_store();
1426        let planner = Planner::new(store);
1427
1428        // WHERE size(n.friends) > 0
1429        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1430            items: vec![ReturnItem {
1431                expression: LogicalExpression::Variable("n".to_string()),
1432                alias: None,
1433            }],
1434            distinct: false,
1435            input: Box::new(LogicalOperator::Filter(FilterOp {
1436                predicate: LogicalExpression::Binary {
1437                    left: Box::new(LogicalExpression::FunctionCall {
1438                        name: "size".to_string(),
1439                        args: vec![LogicalExpression::Property {
1440                            variable: "n".to_string(),
1441                            property: "friends".to_string(),
1442                        }],
1443                        distinct: false,
1444                    }),
1445                    op: BinaryOp::Gt,
1446                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1447                },
1448                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1449                    variable: "n".to_string(),
1450                    label: None,
1451                    input: None,
1452                })),
1453                pushdown_hint: None,
1454            })),
1455        }));
1456
1457        let physical = planner.plan(&logical).unwrap();
1458        assert_eq!(physical.columns(), &["n"]);
1459    }
1460
1461    // ==================== Expand Tests ====================
1462
1463    #[test]
1464    fn test_plan_expand_outgoing() {
1465        let store = create_test_store();
1466        let planner = Planner::new(store);
1467
1468        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1469        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1470            items: vec![
1471                ReturnItem {
1472                    expression: LogicalExpression::Variable("a".to_string()),
1473                    alias: None,
1474                },
1475                ReturnItem {
1476                    expression: LogicalExpression::Variable("b".to_string()),
1477                    alias: None,
1478                },
1479            ],
1480            distinct: false,
1481            input: Box::new(LogicalOperator::Expand(ExpandOp {
1482                from_variable: "a".to_string(),
1483                to_variable: "b".to_string(),
1484                edge_variable: None,
1485                direction: ExpandDirection::Outgoing,
1486                edge_types: vec!["KNOWS".to_string()],
1487                min_hops: 1,
1488                max_hops: Some(1),
1489                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1490                    variable: "a".to_string(),
1491                    label: Some("Person".to_string()),
1492                    input: None,
1493                })),
1494                path_alias: None,
1495                path_mode: PathMode::Walk,
1496            })),
1497        }));
1498
1499        let physical = planner.plan(&logical).unwrap();
1500        // The return should have columns [a, b]
1501        assert!(physical.columns().contains(&"a".to_string()));
1502        assert!(physical.columns().contains(&"b".to_string()));
1503    }
1504
1505    #[test]
1506    fn test_plan_expand_with_edge_variable() {
1507        let store = create_test_store();
1508        let planner = Planner::new(store);
1509
1510        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1511        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1512            items: vec![
1513                ReturnItem {
1514                    expression: LogicalExpression::Variable("a".to_string()),
1515                    alias: None,
1516                },
1517                ReturnItem {
1518                    expression: LogicalExpression::Variable("r".to_string()),
1519                    alias: None,
1520                },
1521                ReturnItem {
1522                    expression: LogicalExpression::Variable("b".to_string()),
1523                    alias: None,
1524                },
1525            ],
1526            distinct: false,
1527            input: Box::new(LogicalOperator::Expand(ExpandOp {
1528                from_variable: "a".to_string(),
1529                to_variable: "b".to_string(),
1530                edge_variable: Some("r".to_string()),
1531                direction: ExpandDirection::Outgoing,
1532                edge_types: vec!["KNOWS".to_string()],
1533                min_hops: 1,
1534                max_hops: Some(1),
1535                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1536                    variable: "a".to_string(),
1537                    label: None,
1538                    input: None,
1539                })),
1540                path_alias: None,
1541                path_mode: PathMode::Walk,
1542            })),
1543        }));
1544
1545        let physical = planner.plan(&logical).unwrap();
1546        assert!(physical.columns().contains(&"a".to_string()));
1547        assert!(physical.columns().contains(&"r".to_string()));
1548        assert!(physical.columns().contains(&"b".to_string()));
1549    }
1550
1551    // ==================== Limit/Skip/Sort Tests ====================
1552
1553    #[test]
1554    fn test_plan_limit() {
1555        let store = create_test_store();
1556        let planner = Planner::new(store);
1557
1558        // MATCH (n) RETURN n LIMIT 10
1559        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1560            items: vec![ReturnItem {
1561                expression: LogicalExpression::Variable("n".to_string()),
1562                alias: None,
1563            }],
1564            distinct: false,
1565            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1566                count: 10.into(),
1567                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1568                    variable: "n".to_string(),
1569                    label: None,
1570                    input: None,
1571                })),
1572            })),
1573        }));
1574
1575        let physical = planner.plan(&logical).unwrap();
1576        assert_eq!(physical.columns(), &["n"]);
1577    }
1578
1579    #[test]
1580    fn test_plan_skip() {
1581        let store = create_test_store();
1582        let planner = Planner::new(store);
1583
1584        // MATCH (n) RETURN n SKIP 5
1585        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1586            items: vec![ReturnItem {
1587                expression: LogicalExpression::Variable("n".to_string()),
1588                alias: None,
1589            }],
1590            distinct: false,
1591            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1592                count: 5.into(),
1593                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1594                    variable: "n".to_string(),
1595                    label: None,
1596                    input: None,
1597                })),
1598            })),
1599        }));
1600
1601        let physical = planner.plan(&logical).unwrap();
1602        assert_eq!(physical.columns(), &["n"]);
1603    }
1604
1605    // ==================== Phase 4e: LIMIT pushdown ====================
1606
1607    #[test]
1608    fn test_limit_pushdown_into_range_scan_sets_inner_limit() {
1609        use grafeo_core::execution::operators::{LimitOperator, RangeScanOperator};
1610        let store = create_test_store();
1611        let planner = Planner::new(store);
1612
1613        // LIMIT 5 over WHERE n.age > 30 over MATCH (n:Person)
1614        let logical = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1615            count: 5.into(),
1616            input: Box::new(LogicalOperator::Filter(FilterOp {
1617                predicate: LogicalExpression::Binary {
1618                    left: Box::new(LogicalExpression::Property {
1619                        variable: "n".to_string(),
1620                        property: "age".to_string(),
1621                    }),
1622                    op: BinaryOp::Gt,
1623                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1624                },
1625                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1626                    variable: "n".to_string(),
1627                    label: Some("Person".to_string()),
1628                    input: None,
1629                })),
1630                pushdown_hint: None,
1631            })),
1632        }));
1633
1634        let physical = planner.plan(&logical).unwrap();
1635        let root = physical.into_operator();
1636
1637        // Top: LimitOperator
1638        let limit_op = root
1639            .into_any()
1640            .downcast::<LimitOperator>()
1641            .expect("top operator is LimitOperator");
1642        let (inner, limit_count) = limit_op.into_parts();
1643        assert_eq!(limit_count, 5, "outer LimitOperator preserves the cap");
1644
1645        // Inner: RangeScanOperator with limit pushed down
1646        let range_op = inner
1647            .into_any()
1648            .downcast::<RangeScanOperator>()
1649            .expect("inner operator is RangeScanOperator");
1650        assert_eq!(
1651            range_op.limit(),
1652            Some(5),
1653            "LIMIT 5 must be pushed into the inner range scan"
1654        );
1655    }
1656
1657    #[test]
1658    fn test_limit_pushdown_blocked_by_sort_in_between() {
1659        use grafeo_core::execution::operators::{
1660            LimitOperator, ProjectOperator, RangeScanOperator, SortOperator,
1661        };
1662        let store = create_test_store();
1663        let planner = Planner::new(store);
1664
1665        // LIMIT 5 over SORT BY n.name over WHERE n.age > 30 over Scan(n:Person)
1666        // Sort needs full materialization; the LIMIT must NOT be pushed
1667        // through it into the range scan.
1668        let logical = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1669            count: 5.into(),
1670            input: Box::new(LogicalOperator::Sort(SortOp {
1671                keys: vec![SortKey {
1672                    expression: LogicalExpression::Property {
1673                        variable: "n".to_string(),
1674                        property: "name".to_string(),
1675                    },
1676                    order: SortOrder::Ascending,
1677                    nulls: None,
1678                }],
1679                input: Box::new(LogicalOperator::Filter(FilterOp {
1680                    predicate: LogicalExpression::Binary {
1681                        left: Box::new(LogicalExpression::Property {
1682                            variable: "n".to_string(),
1683                            property: "age".to_string(),
1684                        }),
1685                        op: BinaryOp::Gt,
1686                        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1687                    },
1688                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1689                        variable: "n".to_string(),
1690                        label: Some("Person".to_string()),
1691                        input: None,
1692                    })),
1693                    pushdown_hint: None,
1694                })),
1695            })),
1696        }));
1697
1698        let physical = planner.plan(&logical).unwrap();
1699        let root = physical.into_operator();
1700
1701        // Walk down: Limit → Sort → RangeScan, asserting at each step.
1702        let limit_op = root
1703            .into_any()
1704            .downcast::<LimitOperator>()
1705            .expect("top operator is LimitOperator");
1706        let (after_limit, _cap) = limit_op.into_parts();
1707
1708        let sort_op = after_limit
1709            .into_any()
1710            .downcast::<SortOperator>()
1711            .expect("operator under Limit must be Sort (Sort blocks pushdown)");
1712        let (after_sort, _keys) = sort_op.into_parts();
1713
1714        // Sort wraps its input in a Project that materializes the sort
1715        // keys. The fold from Filter+NodeScan to RangeScan happens
1716        // inside that Project.
1717        let project_op = after_sort
1718            .into_any()
1719            .downcast::<ProjectOperator>()
1720            .expect("operator under Sort must be Project (Sort materializes sort keys)");
1721        let (after_project, _projs, _types) = project_op.into_parts();
1722
1723        let range_op = after_project
1724            .into_any()
1725            .downcast::<RangeScanOperator>()
1726            .expect("operator under Project must be RangeScan (Filter folded into scan)");
1727
1728        // Core assertion of the test: with a Sort between the Limit and
1729        // the range scan, LIMIT must NOT be pushed into the scan —
1730        // doing so would let Sort observe a truncated input and lose
1731        // the globally-smallest rows.
1732        assert_eq!(
1733            range_op.limit(),
1734            None,
1735            "Sort between Limit and RangeScan must block LIMIT pushdown"
1736        );
1737
1738        // Sanity: the planner cleared its scratch limit-hint after
1739        // planning, so subsequent plans aren't contaminated.
1740        assert_eq!(
1741            planner.limit_hint.get(),
1742            None,
1743            "limit_hint must be cleared after plan_limit returns"
1744        );
1745
1746        // Fixture sanity: a Limit-over-Filter (no Sort) DOES push down,
1747        // proving the negative result above isn't a misconfigured fixture.
1748        let direct = LogicalPlan::new(LogicalOperator::Limit(LogicalLimitOp {
1749            count: 5.into(),
1750            input: Box::new(LogicalOperator::Filter(FilterOp {
1751                predicate: LogicalExpression::Binary {
1752                    left: Box::new(LogicalExpression::Property {
1753                        variable: "n".to_string(),
1754                        property: "age".to_string(),
1755                    }),
1756                    op: BinaryOp::Gt,
1757                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1758                },
1759                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1760                    variable: "n".to_string(),
1761                    label: Some("Person".to_string()),
1762                    input: None,
1763                })),
1764                pushdown_hint: None,
1765            })),
1766        }));
1767        let direct_phys = planner.plan(&direct).unwrap();
1768        let direct_root = direct_phys.into_operator();
1769        let direct_limit = direct_root
1770            .into_any()
1771            .downcast::<LimitOperator>()
1772            .expect("direct top is LimitOperator");
1773        let (direct_inner, _) = direct_limit.into_parts();
1774        let direct_range = direct_inner
1775            .into_any()
1776            .downcast::<RangeScanOperator>()
1777            .expect("direct inner is RangeScanOperator");
1778        assert_eq!(
1779            direct_range.limit(),
1780            Some(5),
1781            "fixture sanity: pushdown DOES fire when input is Filter"
1782        );
1783    }
1784
1785    #[test]
1786    fn test_plan_sort() {
1787        let store = create_test_store();
1788        let planner = Planner::new(store);
1789
1790        // MATCH (n) RETURN n ORDER BY n.name ASC
1791        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1792            items: vec![ReturnItem {
1793                expression: LogicalExpression::Variable("n".to_string()),
1794                alias: None,
1795            }],
1796            distinct: false,
1797            input: Box::new(LogicalOperator::Sort(SortOp {
1798                keys: vec![SortKey {
1799                    expression: LogicalExpression::Variable("n".to_string()),
1800                    order: SortOrder::Ascending,
1801                    nulls: None,
1802                }],
1803                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1804                    variable: "n".to_string(),
1805                    label: None,
1806                    input: None,
1807                })),
1808            })),
1809        }));
1810
1811        let physical = planner.plan(&logical).unwrap();
1812        assert_eq!(physical.columns(), &["n"]);
1813    }
1814
1815    #[test]
1816    fn test_plan_sort_descending() {
1817        let store = create_test_store();
1818        let planner = Planner::new(store);
1819
1820        // ORDER BY n DESC
1821        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1822            items: vec![ReturnItem {
1823                expression: LogicalExpression::Variable("n".to_string()),
1824                alias: None,
1825            }],
1826            distinct: false,
1827            input: Box::new(LogicalOperator::Sort(SortOp {
1828                keys: vec![SortKey {
1829                    expression: LogicalExpression::Variable("n".to_string()),
1830                    order: SortOrder::Descending,
1831                    nulls: None,
1832                }],
1833                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1834                    variable: "n".to_string(),
1835                    label: None,
1836                    input: None,
1837                })),
1838            })),
1839        }));
1840
1841        let physical = planner.plan(&logical).unwrap();
1842        assert_eq!(physical.columns(), &["n"]);
1843    }
1844
1845    #[test]
1846    fn test_plan_distinct() {
1847        let store = create_test_store();
1848        let planner = Planner::new(store);
1849
1850        // MATCH (n) RETURN DISTINCT n
1851        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1852            items: vec![ReturnItem {
1853                expression: LogicalExpression::Variable("n".to_string()),
1854                alias: None,
1855            }],
1856            distinct: false,
1857            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1858                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1859                    variable: "n".to_string(),
1860                    label: None,
1861                    input: None,
1862                })),
1863                columns: None,
1864            })),
1865        }));
1866
1867        let physical = planner.plan(&logical).unwrap();
1868        assert_eq!(physical.columns(), &["n"]);
1869    }
1870
1871    #[test]
1872    fn test_plan_distinct_with_columns() {
1873        let store = create_test_store();
1874        let planner = Planner::new(store);
1875
1876        // DISTINCT on specific columns (column-specific dedup)
1877        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1878            items: vec![ReturnItem {
1879                expression: LogicalExpression::Variable("n".to_string()),
1880                alias: None,
1881            }],
1882            distinct: false,
1883            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1884                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1885                    variable: "n".to_string(),
1886                    label: None,
1887                    input: None,
1888                })),
1889                columns: Some(vec!["n".to_string()]),
1890            })),
1891        }));
1892
1893        let physical = planner.plan(&logical).unwrap();
1894        assert_eq!(physical.columns(), &["n"]);
1895    }
1896
1897    #[test]
1898    fn test_plan_distinct_with_nonexistent_columns() {
1899        let store = create_test_store();
1900        let planner = Planner::new(store);
1901
1902        // When distinct columns don't match any output columns,
1903        // it falls back to full-row distinct.
1904        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1905            items: vec![ReturnItem {
1906                expression: LogicalExpression::Variable("n".to_string()),
1907                alias: None,
1908            }],
1909            distinct: false,
1910            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1911                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1912                    variable: "n".to_string(),
1913                    label: None,
1914                    input: None,
1915                })),
1916                columns: Some(vec!["nonexistent".to_string()]),
1917            })),
1918        }));
1919
1920        let physical = planner.plan(&logical).unwrap();
1921        assert_eq!(physical.columns(), &["n"]);
1922    }
1923
1924    // ==================== Aggregate Tests ====================
1925
1926    #[test]
1927    fn test_plan_aggregate_count() {
1928        let store = create_test_store();
1929        let planner = Planner::new(store);
1930
1931        // MATCH (n) RETURN count(n)
1932        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1933            items: vec![ReturnItem {
1934                expression: LogicalExpression::Variable("cnt".to_string()),
1935                alias: None,
1936            }],
1937            distinct: false,
1938            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1939                group_by: vec![],
1940                aggregates: vec![LogicalAggregateExpr {
1941                    function: LogicalAggregateFunction::Count,
1942                    expression: Some(LogicalExpression::Variable("n".to_string())),
1943                    expression2: None,
1944                    distinct: false,
1945                    alias: Some("cnt".to_string()),
1946                    percentile: None,
1947                    separator: None,
1948                }],
1949                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1950                    variable: "n".to_string(),
1951                    label: None,
1952                    input: None,
1953                })),
1954                having: None,
1955            })),
1956        }));
1957
1958        let physical = planner.plan(&logical).unwrap();
1959        assert!(physical.columns().contains(&"cnt".to_string()));
1960    }
1961
1962    #[test]
1963    fn test_plan_aggregate_with_group_by() {
1964        let store = create_test_store();
1965        let planner = Planner::new(store);
1966
1967        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1968        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1969            group_by: vec![LogicalExpression::Property {
1970                variable: "n".to_string(),
1971                property: "city".to_string(),
1972            }],
1973            aggregates: vec![LogicalAggregateExpr {
1974                function: LogicalAggregateFunction::Count,
1975                expression: Some(LogicalExpression::Variable("n".to_string())),
1976                expression2: None,
1977                distinct: false,
1978                alias: Some("cnt".to_string()),
1979                percentile: None,
1980                separator: None,
1981            }],
1982            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1983                variable: "n".to_string(),
1984                label: Some("Person".to_string()),
1985                input: None,
1986            })),
1987            having: None,
1988        }));
1989
1990        let physical = planner.plan(&logical).unwrap();
1991        assert_eq!(physical.columns().len(), 2);
1992    }
1993
1994    #[test]
1995    fn test_plan_aggregate_sum() {
1996        let store = create_test_store();
1997        let planner = Planner::new(store);
1998
1999        // SUM(n.value)
2000        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2001            group_by: vec![],
2002            aggregates: vec![LogicalAggregateExpr {
2003                function: LogicalAggregateFunction::Sum,
2004                expression: Some(LogicalExpression::Property {
2005                    variable: "n".to_string(),
2006                    property: "value".to_string(),
2007                }),
2008                expression2: None,
2009                distinct: false,
2010                alias: Some("total".to_string()),
2011                percentile: None,
2012                separator: None,
2013            }],
2014            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2015                variable: "n".to_string(),
2016                label: None,
2017                input: None,
2018            })),
2019            having: None,
2020        }));
2021
2022        let physical = planner.plan(&logical).unwrap();
2023        assert!(physical.columns().contains(&"total".to_string()));
2024    }
2025
2026    #[test]
2027    fn test_plan_aggregate_avg() {
2028        let store = create_test_store();
2029        let planner = Planner::new(store);
2030
2031        // AVG(n.score)
2032        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2033            group_by: vec![],
2034            aggregates: vec![LogicalAggregateExpr {
2035                function: LogicalAggregateFunction::Avg,
2036                expression: Some(LogicalExpression::Property {
2037                    variable: "n".to_string(),
2038                    property: "score".to_string(),
2039                }),
2040                expression2: None,
2041                distinct: false,
2042                alias: Some("average".to_string()),
2043                percentile: None,
2044                separator: None,
2045            }],
2046            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2047                variable: "n".to_string(),
2048                label: None,
2049                input: None,
2050            })),
2051            having: None,
2052        }));
2053
2054        let physical = planner.plan(&logical).unwrap();
2055        assert!(physical.columns().contains(&"average".to_string()));
2056    }
2057
2058    #[test]
2059    fn test_plan_aggregate_min_max() {
2060        let store = create_test_store();
2061        let planner = Planner::new(store);
2062
2063        // MIN(n.age), MAX(n.age)
2064        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2065            group_by: vec![],
2066            aggregates: vec![
2067                LogicalAggregateExpr {
2068                    function: LogicalAggregateFunction::Min,
2069                    expression: Some(LogicalExpression::Property {
2070                        variable: "n".to_string(),
2071                        property: "age".to_string(),
2072                    }),
2073                    expression2: None,
2074                    distinct: false,
2075                    alias: Some("youngest".to_string()),
2076                    percentile: None,
2077                    separator: None,
2078                },
2079                LogicalAggregateExpr {
2080                    function: LogicalAggregateFunction::Max,
2081                    expression: Some(LogicalExpression::Property {
2082                        variable: "n".to_string(),
2083                        property: "age".to_string(),
2084                    }),
2085                    expression2: None,
2086                    distinct: false,
2087                    alias: Some("oldest".to_string()),
2088                    percentile: None,
2089                    separator: None,
2090                },
2091            ],
2092            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2093                variable: "n".to_string(),
2094                label: None,
2095                input: None,
2096            })),
2097            having: None,
2098        }));
2099
2100        let physical = planner.plan(&logical).unwrap();
2101        assert!(physical.columns().contains(&"youngest".to_string()));
2102        assert!(physical.columns().contains(&"oldest".to_string()));
2103    }
2104
2105    // ==================== Join Tests ====================
2106
2107    #[test]
2108    fn test_plan_inner_join() {
2109        let store = create_test_store();
2110        let planner = Planner::new(store);
2111
2112        // Inner join between two scans
2113        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2114            items: vec![
2115                ReturnItem {
2116                    expression: LogicalExpression::Variable("a".to_string()),
2117                    alias: None,
2118                },
2119                ReturnItem {
2120                    expression: LogicalExpression::Variable("b".to_string()),
2121                    alias: None,
2122                },
2123            ],
2124            distinct: false,
2125            input: Box::new(LogicalOperator::Join(JoinOp {
2126                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2127                    variable: "a".to_string(),
2128                    label: Some("Person".to_string()),
2129                    input: None,
2130                })),
2131                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2132                    variable: "b".to_string(),
2133                    label: Some("Company".to_string()),
2134                    input: None,
2135                })),
2136                join_type: JoinType::Inner,
2137                conditions: vec![JoinCondition {
2138                    left: LogicalExpression::Variable("a".to_string()),
2139                    right: LogicalExpression::Variable("b".to_string()),
2140                }],
2141            })),
2142        }));
2143
2144        let physical = planner.plan(&logical).unwrap();
2145        assert!(physical.columns().contains(&"a".to_string()));
2146        assert!(physical.columns().contains(&"b".to_string()));
2147    }
2148
2149    #[test]
2150    fn test_plan_cross_join() {
2151        let store = create_test_store();
2152        let planner = Planner::new(store);
2153
2154        // Cross join (no conditions)
2155        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2156            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2157                variable: "a".to_string(),
2158                label: None,
2159                input: None,
2160            })),
2161            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2162                variable: "b".to_string(),
2163                label: None,
2164                input: None,
2165            })),
2166            join_type: JoinType::Cross,
2167            conditions: vec![],
2168        }));
2169
2170        let physical = planner.plan(&logical).unwrap();
2171        assert_eq!(physical.columns().len(), 2);
2172    }
2173
2174    #[test]
2175    fn test_plan_left_join() {
2176        let store = create_test_store();
2177        let planner = Planner::new(store);
2178
2179        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2180            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2181                variable: "a".to_string(),
2182                label: None,
2183                input: None,
2184            })),
2185            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2186                variable: "b".to_string(),
2187                label: None,
2188                input: None,
2189            })),
2190            join_type: JoinType::Left,
2191            conditions: vec![],
2192        }));
2193
2194        let physical = planner.plan(&logical).unwrap();
2195        assert_eq!(physical.columns().len(), 2);
2196    }
2197
2198    // ==================== Mutation Tests ====================
2199
2200    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
2201        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
2202        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
2203        p
2204    }
2205
2206    #[test]
2207    fn test_plan_create_node() {
2208        let store = create_test_store();
2209        let planner = create_writable_planner(&store);
2210
2211        // CREATE (n:Person {name: 'Alix'})
2212        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2213            variable: "n".to_string(),
2214            labels: vec!["Person".to_string()],
2215            properties: vec![(
2216                "name".to_string(),
2217                LogicalExpression::Literal(Value::String("Alix".into())),
2218            )],
2219            input: None,
2220        }));
2221
2222        let physical = planner.plan(&logical).unwrap();
2223        assert!(physical.columns().contains(&"n".to_string()));
2224    }
2225
2226    #[test]
2227    fn test_plan_create_edge() {
2228        let store = create_test_store();
2229        let planner = create_writable_planner(&store);
2230
2231        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
2232        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
2233            variable: Some("r".to_string()),
2234            from_variable: "a".to_string(),
2235            to_variable: "b".to_string(),
2236            edge_type: "KNOWS".to_string(),
2237            properties: vec![],
2238            input: Box::new(LogicalOperator::Join(JoinOp {
2239                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2240                    variable: "a".to_string(),
2241                    label: None,
2242                    input: None,
2243                })),
2244                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2245                    variable: "b".to_string(),
2246                    label: None,
2247                    input: None,
2248                })),
2249                join_type: JoinType::Cross,
2250                conditions: vec![],
2251            })),
2252        }));
2253
2254        let physical = planner.plan(&logical).unwrap();
2255        assert!(physical.columns().contains(&"r".to_string()));
2256    }
2257
2258    #[test]
2259    fn test_plan_delete_node() {
2260        let store = create_test_store();
2261        let planner = create_writable_planner(&store);
2262
2263        // MATCH (n) DELETE n
2264        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
2265            variable: "n".to_string(),
2266            detach: false,
2267            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2268                variable: "n".to_string(),
2269                label: None,
2270                input: None,
2271            })),
2272        }));
2273
2274        let physical = planner.plan(&logical).unwrap();
2275        assert!(physical.columns().contains(&"n".to_string()));
2276    }
2277
2278    // ==================== Error Cases ====================
2279
2280    #[test]
2281    fn test_plan_empty_errors() {
2282        let store = create_test_store();
2283        let planner = Planner::new(store);
2284
2285        let logical = LogicalPlan::new(LogicalOperator::Empty);
2286        let result = planner.plan(&logical);
2287        assert!(result.is_err());
2288    }
2289
2290    #[test]
2291    fn test_plan_missing_variable_in_return() {
2292        let store = create_test_store();
2293        let planner = Planner::new(store);
2294
2295        // Return variable that doesn't exist in input
2296        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2297            items: vec![ReturnItem {
2298                expression: LogicalExpression::Variable("missing".to_string()),
2299                alias: None,
2300            }],
2301            distinct: false,
2302            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2303                variable: "n".to_string(),
2304                label: None,
2305                input: None,
2306            })),
2307        }));
2308
2309        let result = planner.plan(&logical);
2310        assert!(result.is_err());
2311    }
2312
2313    // ==================== Helper Function Tests ====================
2314
2315    #[test]
2316    fn test_convert_binary_ops() {
2317        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
2318        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
2319        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
2320        assert!(convert_binary_op(BinaryOp::Le).is_ok());
2321        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
2322        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
2323        assert!(convert_binary_op(BinaryOp::And).is_ok());
2324        assert!(convert_binary_op(BinaryOp::Or).is_ok());
2325        assert!(convert_binary_op(BinaryOp::Add).is_ok());
2326        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2327        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2328        assert!(convert_binary_op(BinaryOp::Div).is_ok());
2329    }
2330
2331    #[test]
2332    fn test_convert_unary_ops() {
2333        assert!(convert_unary_op(UnaryOp::Not).is_ok());
2334        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2335        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2336        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2337    }
2338
2339    #[test]
2340    fn test_convert_aggregate_functions() {
2341        assert!(matches!(
2342            convert_aggregate_function(LogicalAggregateFunction::Count),
2343            PhysicalAggregateFunction::Count
2344        ));
2345        assert!(matches!(
2346            convert_aggregate_function(LogicalAggregateFunction::Sum),
2347            PhysicalAggregateFunction::Sum
2348        ));
2349        assert!(matches!(
2350            convert_aggregate_function(LogicalAggregateFunction::Avg),
2351            PhysicalAggregateFunction::Avg
2352        ));
2353        assert!(matches!(
2354            convert_aggregate_function(LogicalAggregateFunction::Min),
2355            PhysicalAggregateFunction::Min
2356        ));
2357        assert!(matches!(
2358            convert_aggregate_function(LogicalAggregateFunction::Max),
2359            PhysicalAggregateFunction::Max
2360        ));
2361    }
2362
2363    #[test]
2364    fn test_planner_accessors() {
2365        let store = create_test_store();
2366        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2367
2368        assert!(planner.transaction_id().is_none());
2369        assert!(planner.transaction_manager().is_none());
2370        let _ = planner.viewing_epoch(); // Just ensure it's accessible
2371    }
2372
2373    #[test]
2374    fn test_physical_plan_accessors() {
2375        let store = create_test_store();
2376        let planner = Planner::new(store);
2377
2378        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2379            variable: "n".to_string(),
2380            label: None,
2381            input: None,
2382        }));
2383
2384        let physical = planner.plan(&logical).unwrap();
2385        assert_eq!(physical.columns(), &["n"]);
2386
2387        // Test into_operator
2388        let _ = physical.into_operator();
2389    }
2390
2391    // ==================== Adaptive Planning Tests ====================
2392
2393    #[test]
2394    fn test_plan_adaptive_with_scan() {
2395        let store = create_test_store();
2396        let planner = Planner::new(store);
2397
2398        // MATCH (n:Person) RETURN n
2399        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2400            items: vec![ReturnItem {
2401                expression: LogicalExpression::Variable("n".to_string()),
2402                alias: None,
2403            }],
2404            distinct: false,
2405            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2406                variable: "n".to_string(),
2407                label: Some("Person".to_string()),
2408                input: None,
2409            })),
2410        }));
2411
2412        let physical = planner.plan_adaptive(&logical).unwrap();
2413        assert_eq!(physical.columns(), &["n"]);
2414        // Should have adaptive context with estimates
2415        assert!(physical.adaptive_context.is_some());
2416    }
2417
2418    #[test]
2419    fn test_plan_adaptive_with_filter() {
2420        let store = create_test_store();
2421        let planner = Planner::new(store);
2422
2423        // MATCH (n) WHERE n.age > 30 RETURN n
2424        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2425            items: vec![ReturnItem {
2426                expression: LogicalExpression::Variable("n".to_string()),
2427                alias: None,
2428            }],
2429            distinct: false,
2430            input: Box::new(LogicalOperator::Filter(FilterOp {
2431                predicate: LogicalExpression::Binary {
2432                    left: Box::new(LogicalExpression::Property {
2433                        variable: "n".to_string(),
2434                        property: "age".to_string(),
2435                    }),
2436                    op: BinaryOp::Gt,
2437                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2438                },
2439                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2440                    variable: "n".to_string(),
2441                    label: None,
2442                    input: None,
2443                })),
2444                pushdown_hint: None,
2445            })),
2446        }));
2447
2448        let physical = planner.plan_adaptive(&logical).unwrap();
2449        assert!(physical.adaptive_context.is_some());
2450    }
2451
2452    #[test]
2453    fn test_plan_adaptive_with_expand() {
2454        let store = create_test_store();
2455        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2456            .with_factorized_execution(false);
2457
2458        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
2459        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2460            items: vec![
2461                ReturnItem {
2462                    expression: LogicalExpression::Variable("a".to_string()),
2463                    alias: None,
2464                },
2465                ReturnItem {
2466                    expression: LogicalExpression::Variable("b".to_string()),
2467                    alias: None,
2468                },
2469            ],
2470            distinct: false,
2471            input: Box::new(LogicalOperator::Expand(ExpandOp {
2472                from_variable: "a".to_string(),
2473                to_variable: "b".to_string(),
2474                edge_variable: None,
2475                direction: ExpandDirection::Outgoing,
2476                edge_types: vec!["KNOWS".to_string()],
2477                min_hops: 1,
2478                max_hops: Some(1),
2479                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2480                    variable: "a".to_string(),
2481                    label: None,
2482                    input: None,
2483                })),
2484                path_alias: None,
2485                path_mode: PathMode::Walk,
2486            })),
2487        }));
2488
2489        let physical = planner.plan_adaptive(&logical).unwrap();
2490        assert!(physical.adaptive_context.is_some());
2491    }
2492
2493    #[test]
2494    fn test_plan_adaptive_with_join() {
2495        let store = create_test_store();
2496        let planner = Planner::new(store);
2497
2498        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2499            items: vec![
2500                ReturnItem {
2501                    expression: LogicalExpression::Variable("a".to_string()),
2502                    alias: None,
2503                },
2504                ReturnItem {
2505                    expression: LogicalExpression::Variable("b".to_string()),
2506                    alias: None,
2507                },
2508            ],
2509            distinct: false,
2510            input: Box::new(LogicalOperator::Join(JoinOp {
2511                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2512                    variable: "a".to_string(),
2513                    label: None,
2514                    input: None,
2515                })),
2516                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2517                    variable: "b".to_string(),
2518                    label: None,
2519                    input: None,
2520                })),
2521                join_type: JoinType::Cross,
2522                conditions: vec![],
2523            })),
2524        }));
2525
2526        let physical = planner.plan_adaptive(&logical).unwrap();
2527        assert!(physical.adaptive_context.is_some());
2528    }
2529
2530    #[test]
2531    fn test_plan_adaptive_with_aggregate() {
2532        let store = create_test_store();
2533        let planner = Planner::new(store);
2534
2535        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2536            group_by: vec![],
2537            aggregates: vec![LogicalAggregateExpr {
2538                function: LogicalAggregateFunction::Count,
2539                expression: Some(LogicalExpression::Variable("n".to_string())),
2540                expression2: None,
2541                distinct: false,
2542                alias: Some("cnt".to_string()),
2543                percentile: None,
2544                separator: None,
2545            }],
2546            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2547                variable: "n".to_string(),
2548                label: None,
2549                input: None,
2550            })),
2551            having: None,
2552        }));
2553
2554        let physical = planner.plan_adaptive(&logical).unwrap();
2555        assert!(physical.adaptive_context.is_some());
2556    }
2557
2558    #[test]
2559    fn test_plan_adaptive_with_distinct() {
2560        let store = create_test_store();
2561        let planner = Planner::new(store);
2562
2563        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2564            items: vec![ReturnItem {
2565                expression: LogicalExpression::Variable("n".to_string()),
2566                alias: None,
2567            }],
2568            distinct: false,
2569            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2570                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2571                    variable: "n".to_string(),
2572                    label: None,
2573                    input: None,
2574                })),
2575                columns: None,
2576            })),
2577        }));
2578
2579        let physical = planner.plan_adaptive(&logical).unwrap();
2580        assert!(physical.adaptive_context.is_some());
2581    }
2582
2583    #[test]
2584    fn test_plan_adaptive_with_limit() {
2585        let store = create_test_store();
2586        let planner = Planner::new(store);
2587
2588        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2589            items: vec![ReturnItem {
2590                expression: LogicalExpression::Variable("n".to_string()),
2591                alias: None,
2592            }],
2593            distinct: false,
2594            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2595                count: 10.into(),
2596                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2597                    variable: "n".to_string(),
2598                    label: None,
2599                    input: None,
2600                })),
2601            })),
2602        }));
2603
2604        let physical = planner.plan_adaptive(&logical).unwrap();
2605        assert!(physical.adaptive_context.is_some());
2606    }
2607
2608    #[test]
2609    fn test_plan_adaptive_with_skip() {
2610        let store = create_test_store();
2611        let planner = Planner::new(store);
2612
2613        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2614            items: vec![ReturnItem {
2615                expression: LogicalExpression::Variable("n".to_string()),
2616                alias: None,
2617            }],
2618            distinct: false,
2619            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2620                count: 5.into(),
2621                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2622                    variable: "n".to_string(),
2623                    label: None,
2624                    input: None,
2625                })),
2626            })),
2627        }));
2628
2629        let physical = planner.plan_adaptive(&logical).unwrap();
2630        assert!(physical.adaptive_context.is_some());
2631    }
2632
2633    #[test]
2634    fn test_plan_adaptive_with_sort() {
2635        let store = create_test_store();
2636        let planner = Planner::new(store);
2637
2638        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2639            items: vec![ReturnItem {
2640                expression: LogicalExpression::Variable("n".to_string()),
2641                alias: None,
2642            }],
2643            distinct: false,
2644            input: Box::new(LogicalOperator::Sort(SortOp {
2645                keys: vec![SortKey {
2646                    expression: LogicalExpression::Variable("n".to_string()),
2647                    order: SortOrder::Ascending,
2648                    nulls: None,
2649                }],
2650                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2651                    variable: "n".to_string(),
2652                    label: None,
2653                    input: None,
2654                })),
2655            })),
2656        }));
2657
2658        let physical = planner.plan_adaptive(&logical).unwrap();
2659        assert!(physical.adaptive_context.is_some());
2660    }
2661
2662    #[test]
2663    fn test_plan_adaptive_with_union() {
2664        let store = create_test_store();
2665        let planner = Planner::new(store);
2666
2667        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2668            items: vec![ReturnItem {
2669                expression: LogicalExpression::Variable("n".to_string()),
2670                alias: None,
2671            }],
2672            distinct: false,
2673            input: Box::new(LogicalOperator::Union(UnionOp {
2674                inputs: vec![
2675                    LogicalOperator::NodeScan(NodeScanOp {
2676                        variable: "n".to_string(),
2677                        label: Some("Person".to_string()),
2678                        input: None,
2679                    }),
2680                    LogicalOperator::NodeScan(NodeScanOp {
2681                        variable: "n".to_string(),
2682                        label: Some("Company".to_string()),
2683                        input: None,
2684                    }),
2685                ],
2686            })),
2687        }));
2688
2689        let physical = planner.plan_adaptive(&logical).unwrap();
2690        assert!(physical.adaptive_context.is_some());
2691    }
2692
2693    // ==================== Variable Length Path Tests ====================
2694
2695    #[test]
2696    fn test_plan_expand_variable_length() {
2697        let store = create_test_store();
2698        let planner = Planner::new(store);
2699
2700        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2701        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2702            items: vec![
2703                ReturnItem {
2704                    expression: LogicalExpression::Variable("a".to_string()),
2705                    alias: None,
2706                },
2707                ReturnItem {
2708                    expression: LogicalExpression::Variable("b".to_string()),
2709                    alias: None,
2710                },
2711            ],
2712            distinct: false,
2713            input: Box::new(LogicalOperator::Expand(ExpandOp {
2714                from_variable: "a".to_string(),
2715                to_variable: "b".to_string(),
2716                edge_variable: None,
2717                direction: ExpandDirection::Outgoing,
2718                edge_types: vec!["KNOWS".to_string()],
2719                min_hops: 1,
2720                max_hops: Some(3),
2721                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2722                    variable: "a".to_string(),
2723                    label: None,
2724                    input: None,
2725                })),
2726                path_alias: None,
2727                path_mode: PathMode::Walk,
2728            })),
2729        }));
2730
2731        let physical = planner.plan(&logical).unwrap();
2732        assert!(physical.columns().contains(&"a".to_string()));
2733        assert!(physical.columns().contains(&"b".to_string()));
2734    }
2735
2736    #[test]
2737    fn test_plan_expand_with_path_alias() {
2738        let store = create_test_store();
2739        let planner = Planner::new(store);
2740
2741        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2742        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2743            items: vec![
2744                ReturnItem {
2745                    expression: LogicalExpression::Variable("a".to_string()),
2746                    alias: None,
2747                },
2748                ReturnItem {
2749                    expression: LogicalExpression::Variable("b".to_string()),
2750                    alias: None,
2751                },
2752            ],
2753            distinct: false,
2754            input: Box::new(LogicalOperator::Expand(ExpandOp {
2755                from_variable: "a".to_string(),
2756                to_variable: "b".to_string(),
2757                edge_variable: None,
2758                direction: ExpandDirection::Outgoing,
2759                edge_types: vec!["KNOWS".to_string()],
2760                min_hops: 1,
2761                max_hops: Some(3),
2762                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2763                    variable: "a".to_string(),
2764                    label: None,
2765                    input: None,
2766                })),
2767                path_alias: Some("p".to_string()),
2768                path_mode: PathMode::Walk,
2769            })),
2770        }));
2771
2772        let physical = planner.plan(&logical).unwrap();
2773        // Verify plan was created successfully with expected output columns
2774        assert!(physical.columns().contains(&"a".to_string()));
2775        assert!(physical.columns().contains(&"b".to_string()));
2776    }
2777
2778    #[test]
2779    fn test_plan_expand_incoming() {
2780        let store = create_test_store();
2781        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2782            .with_factorized_execution(false);
2783
2784        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2785        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2786            items: vec![
2787                ReturnItem {
2788                    expression: LogicalExpression::Variable("a".to_string()),
2789                    alias: None,
2790                },
2791                ReturnItem {
2792                    expression: LogicalExpression::Variable("b".to_string()),
2793                    alias: None,
2794                },
2795            ],
2796            distinct: false,
2797            input: Box::new(LogicalOperator::Expand(ExpandOp {
2798                from_variable: "a".to_string(),
2799                to_variable: "b".to_string(),
2800                edge_variable: None,
2801                direction: ExpandDirection::Incoming,
2802                edge_types: vec!["KNOWS".to_string()],
2803                min_hops: 1,
2804                max_hops: Some(1),
2805                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2806                    variable: "a".to_string(),
2807                    label: None,
2808                    input: None,
2809                })),
2810                path_alias: None,
2811                path_mode: PathMode::Walk,
2812            })),
2813        }));
2814
2815        let physical = planner.plan(&logical).unwrap();
2816        assert!(physical.columns().contains(&"a".to_string()));
2817        assert!(physical.columns().contains(&"b".to_string()));
2818    }
2819
2820    #[test]
2821    fn test_plan_expand_both_directions() {
2822        let store = create_test_store();
2823        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2824            .with_factorized_execution(false);
2825
2826        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2827        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2828            items: vec![
2829                ReturnItem {
2830                    expression: LogicalExpression::Variable("a".to_string()),
2831                    alias: None,
2832                },
2833                ReturnItem {
2834                    expression: LogicalExpression::Variable("b".to_string()),
2835                    alias: None,
2836                },
2837            ],
2838            distinct: false,
2839            input: Box::new(LogicalOperator::Expand(ExpandOp {
2840                from_variable: "a".to_string(),
2841                to_variable: "b".to_string(),
2842                edge_variable: None,
2843                direction: ExpandDirection::Both,
2844                edge_types: vec!["KNOWS".to_string()],
2845                min_hops: 1,
2846                max_hops: Some(1),
2847                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2848                    variable: "a".to_string(),
2849                    label: None,
2850                    input: None,
2851                })),
2852                path_alias: None,
2853                path_mode: PathMode::Walk,
2854            })),
2855        }));
2856
2857        let physical = planner.plan(&logical).unwrap();
2858        assert!(physical.columns().contains(&"a".to_string()));
2859        assert!(physical.columns().contains(&"b".to_string()));
2860    }
2861
2862    // ==================== With Context Tests ====================
2863
2864    #[test]
2865    fn test_planner_with_context() {
2866        use crate::transaction::TransactionManager;
2867
2868        let store = create_test_store();
2869        let transaction_manager = Arc::new(TransactionManager::new());
2870        let transaction_id = transaction_manager.begin();
2871        let epoch = transaction_manager.current_epoch();
2872
2873        let planner = Planner::with_context(
2874            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
2875            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2876            Arc::clone(&transaction_manager),
2877            Some(transaction_id),
2878            epoch,
2879        );
2880
2881        assert_eq!(planner.transaction_id(), Some(transaction_id));
2882        assert!(planner.transaction_manager().is_some());
2883        assert_eq!(planner.viewing_epoch(), epoch);
2884    }
2885
2886    #[test]
2887    fn test_planner_with_factorized_execution_disabled() {
2888        let store = create_test_store();
2889        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2890            .with_factorized_execution(false);
2891
2892        // Two consecutive expands - should NOT use factorized execution
2893        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2894            items: vec![
2895                ReturnItem {
2896                    expression: LogicalExpression::Variable("a".to_string()),
2897                    alias: None,
2898                },
2899                ReturnItem {
2900                    expression: LogicalExpression::Variable("c".to_string()),
2901                    alias: None,
2902                },
2903            ],
2904            distinct: false,
2905            input: Box::new(LogicalOperator::Expand(ExpandOp {
2906                from_variable: "b".to_string(),
2907                to_variable: "c".to_string(),
2908                edge_variable: None,
2909                direction: ExpandDirection::Outgoing,
2910                edge_types: vec![],
2911                min_hops: 1,
2912                max_hops: Some(1),
2913                input: Box::new(LogicalOperator::Expand(ExpandOp {
2914                    from_variable: "a".to_string(),
2915                    to_variable: "b".to_string(),
2916                    edge_variable: None,
2917                    direction: ExpandDirection::Outgoing,
2918                    edge_types: vec![],
2919                    min_hops: 1,
2920                    max_hops: Some(1),
2921                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2922                        variable: "a".to_string(),
2923                        label: None,
2924                        input: None,
2925                    })),
2926                    path_alias: None,
2927                    path_mode: PathMode::Walk,
2928                })),
2929                path_alias: None,
2930                path_mode: PathMode::Walk,
2931            })),
2932        }));
2933
2934        let physical = planner.plan(&logical).unwrap();
2935        assert!(physical.columns().contains(&"a".to_string()));
2936        assert!(physical.columns().contains(&"c".to_string()));
2937    }
2938
2939    // ==================== Sort with Property Tests ====================
2940
2941    #[test]
2942    fn test_plan_sort_by_property() {
2943        let store = create_test_store();
2944        let planner = Planner::new(store);
2945
2946        // MATCH (n) RETURN n ORDER BY n.name ASC
2947        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2948            items: vec![ReturnItem {
2949                expression: LogicalExpression::Variable("n".to_string()),
2950                alias: None,
2951            }],
2952            distinct: false,
2953            input: Box::new(LogicalOperator::Sort(SortOp {
2954                keys: vec![SortKey {
2955                    expression: LogicalExpression::Property {
2956                        variable: "n".to_string(),
2957                        property: "name".to_string(),
2958                    },
2959                    order: SortOrder::Ascending,
2960                    nulls: None,
2961                }],
2962                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2963                    variable: "n".to_string(),
2964                    label: None,
2965                    input: None,
2966                })),
2967            })),
2968        }));
2969
2970        let physical = planner.plan(&logical).unwrap();
2971        // Should have the property column projected
2972        assert!(physical.columns().contains(&"n".to_string()));
2973    }
2974
2975    // ==================== Scan with Input Tests ====================
2976
2977    #[test]
2978    fn test_plan_scan_with_input() {
2979        let store = create_test_store();
2980        let planner = Planner::new(store);
2981
2982        // A scan with another scan as input (for chained patterns)
2983        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2984            items: vec![
2985                ReturnItem {
2986                    expression: LogicalExpression::Variable("a".to_string()),
2987                    alias: None,
2988                },
2989                ReturnItem {
2990                    expression: LogicalExpression::Variable("b".to_string()),
2991                    alias: None,
2992                },
2993            ],
2994            distinct: false,
2995            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2996                variable: "b".to_string(),
2997                label: Some("Company".to_string()),
2998                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2999                    variable: "a".to_string(),
3000                    label: Some("Person".to_string()),
3001                    input: None,
3002                }))),
3003            })),
3004        }));
3005
3006        let physical = planner.plan(&logical).unwrap();
3007        assert!(physical.columns().contains(&"a".to_string()));
3008        assert!(physical.columns().contains(&"b".to_string()));
3009    }
3010
3011    // ==================== Additional Coverage Tests ====================
3012    //
3013    // These tests target branches that were not exercised by the original
3014    // planner tests: builder methods, read-only flag, profiled planning,
3015    // unsupported operator error paths, and the dispatch branches for every
3016    // plan_* function reachable through plan_operator.
3017
3018    use crate::catalog::Catalog;
3019    use crate::query::plan::{
3020        AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
3021        HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
3022        MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
3023        SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
3024    };
3025    use grafeo_core::execution::operators::{Operator, SessionContext};
3026
3027    fn full_store() -> Arc<LpgStore> {
3028        // Richer store so expand and shortest path tests have real data.
3029        let store = Arc::new(LpgStore::new().unwrap());
3030        let vincent = store.create_node(&["Person"]);
3031        let jules = store.create_node(&["Person"]);
3032        let mia = store.create_node(&["Person"]);
3033        let _company = store.create_node(&["Company"]);
3034        store.create_edge(vincent, jules, "KNOWS");
3035        store.create_edge(jules, mia, "KNOWS");
3036        store
3037    }
3038
3039    fn scan_person(var: &str) -> LogicalOperator {
3040        LogicalOperator::NodeScan(NodeScanOp {
3041            variable: var.to_string(),
3042            label: Some("Person".to_string()),
3043            input: None,
3044        })
3045    }
3046
3047    fn scan_any(var: &str) -> LogicalOperator {
3048        LogicalOperator::NodeScan(NodeScanOp {
3049            variable: var.to_string(),
3050            label: None,
3051            input: None,
3052        })
3053    }
3054
3055    // ==================== Builder Methods ====================
3056
3057    #[test]
3058    fn test_with_read_only_flag() {
3059        let store = create_test_store();
3060        let planner =
3061            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
3062        assert!(planner.read_only);
3063
3064        let planner_off =
3065            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
3066        assert!(!planner_off.read_only);
3067    }
3068
3069    #[test]
3070    fn test_with_catalog() {
3071        let store = create_test_store();
3072        let catalog = Arc::new(Catalog::new());
3073        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
3074            .with_catalog(Arc::clone(&catalog));
3075        assert!(planner.catalog.is_some());
3076    }
3077
3078    #[test]
3079    fn test_with_session_context() {
3080        let store = create_test_store();
3081        let context = SessionContext {
3082            current_schema: Some("public".to_string()),
3083            current_graph: Some("main".to_string()),
3084            ..SessionContext::default()
3085        };
3086        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
3087            .with_session_context(context);
3088        assert_eq!(
3089            planner.session_context.current_schema.as_deref(),
3090            Some("public")
3091        );
3092        assert_eq!(
3093            planner.session_context.current_graph.as_deref(),
3094            Some("main")
3095        );
3096    }
3097
3098    // ==================== register_edge_column ====================
3099
3100    #[test]
3101    fn test_register_edge_column_named() {
3102        let store = create_test_store();
3103        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3104        let name = planner.register_edge_column(&Some("r".to_string()));
3105        assert_eq!(name, "r");
3106        assert!(planner.edge_columns.borrow().contains("r"));
3107    }
3108
3109    #[test]
3110    fn test_register_edge_column_anonymous_counter_advances() {
3111        let store = create_test_store();
3112        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3113        let a = planner.register_edge_column(&None);
3114        let b = planner.register_edge_column(&None);
3115        assert_eq!(a, "_anon_edge_0");
3116        assert_eq!(b, "_anon_edge_1");
3117        assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
3118        assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
3119    }
3120
3121    // ==================== write_store() error path ====================
3122
3123    #[test]
3124    fn test_create_node_without_write_store_errors() {
3125        // Read-only planner: CREATE should fail with ReadOnly transaction error.
3126        let store = create_test_store();
3127        let planner = Planner::new(store);
3128
3129        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3130            variable: "n".to_string(),
3131            labels: vec!["Person".to_string()],
3132            properties: vec![],
3133            input: None,
3134        }));
3135
3136        let result = planner.plan(&logical);
3137        assert!(result.is_err());
3138    }
3139
3140    // ==================== plan_profiled ====================
3141
3142    #[test]
3143    fn test_plan_profiled_collects_entries() {
3144        let store = create_test_store();
3145        let planner = Planner::new(store);
3146
3147        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3148            items: vec![ReturnItem {
3149                expression: LogicalExpression::Variable("n".to_string()),
3150                alias: None,
3151            }],
3152            distinct: false,
3153            input: Box::new(scan_person("n")),
3154        }));
3155
3156        let (physical, entries) = planner.plan_profiled(&logical).unwrap();
3157        assert_eq!(physical.columns(), &["n"]);
3158        // Post-order: scan, then return (at least two entries).
3159        assert!(
3160            entries.len() >= 2,
3161            "expected entries, got {}",
3162            entries.len()
3163        );
3164        // After profiling, the internal flag is cleared.
3165        assert!(!planner.profiling.get());
3166    }
3167
3168    #[test]
3169    fn test_plan_profiled_propagates_plan_errors() {
3170        let store = create_test_store();
3171        let planner = Planner::new(store);
3172        let logical = LogicalPlan::new(LogicalOperator::Empty);
3173        let result = planner.plan_profiled(&logical);
3174        assert!(result.is_err());
3175        // Profiling must still be reset to false even on error.
3176        assert!(!planner.profiling.get());
3177    }
3178
3179    // ==================== Unsupported operator error paths ====================
3180
3181    #[test]
3182    fn test_plan_edge_scan_is_unsupported() {
3183        // LPG planner does not handle bare EdgeScan; this hits the catch-all branch.
3184        let store = create_test_store();
3185        let planner = Planner::new(store);
3186        let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
3187            variable: "e".to_string(),
3188            edge_types: vec![],
3189            input: None,
3190        }));
3191        let err = planner.plan(&logical).err().expect("plan should fail");
3192        assert!(format!("{err}").contains("Unsupported operator"));
3193    }
3194
3195    #[test]
3196    fn test_plan_triple_scan_is_unsupported() {
3197        let store = create_test_store();
3198        let planner = Planner::new(store);
3199        let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
3200            subject: TripleComponent::Variable("s".to_string()),
3201            predicate: TripleComponent::Variable("p".to_string()),
3202            object: TripleComponent::Variable("o".to_string()),
3203            graph: None,
3204            input: None,
3205            dataset: None,
3206        }));
3207        assert!(planner.plan(&logical).is_err());
3208    }
3209
3210    #[test]
3211    fn test_plan_bind_is_unsupported() {
3212        let store = create_test_store();
3213        let planner = Planner::new(store);
3214        let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
3215            expression: LogicalExpression::Literal(Value::Int64(1)),
3216            variable: "x".to_string(),
3217            input: Box::new(scan_any("n")),
3218        }));
3219        assert!(planner.plan(&logical).is_err());
3220    }
3221
3222    #[test]
3223    fn test_plan_parameter_scan_without_apply_errors() {
3224        let store = create_test_store();
3225        let planner = Planner::new(store);
3226        let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
3227            columns: vec!["n".to_string()],
3228        }));
3229        let err = planner.plan(&logical).err().expect("plan should fail");
3230        assert!(format!("{err}").contains("ParameterScan"));
3231    }
3232
3233    // ==================== plan_operator dispatch branches ====================
3234
3235    #[test]
3236    fn test_plan_union_dispatch() {
3237        let store = create_test_store();
3238        let planner = Planner::new(store);
3239        let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
3240            inputs: vec![scan_person("n"), scan_person("n")],
3241        }));
3242        let physical = planner.plan(&logical).unwrap();
3243        assert_eq!(physical.columns(), &["n"]);
3244    }
3245
3246    #[test]
3247    fn test_plan_except_dispatch() {
3248        let store = create_test_store();
3249        let planner = Planner::new(store);
3250        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3251            left: Box::new(scan_person("n")),
3252            right: Box::new(scan_person("n")),
3253            all: false,
3254        }));
3255        let physical = planner.plan(&logical).unwrap();
3256        assert_eq!(physical.columns(), &["n"]);
3257    }
3258
3259    #[test]
3260    fn test_plan_intersect_dispatch() {
3261        let store = create_test_store();
3262        let planner = Planner::new(store);
3263        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3264            left: Box::new(scan_person("n")),
3265            right: Box::new(scan_person("n")),
3266            all: false,
3267        }));
3268        let physical = planner.plan(&logical).unwrap();
3269        assert_eq!(physical.columns(), &["n"]);
3270    }
3271
3272    #[test]
3273    fn test_plan_otherwise_dispatch() {
3274        let store = create_test_store();
3275        let planner = Planner::new(store);
3276        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3277            left: Box::new(scan_person("n")),
3278            right: Box::new(scan_any("n")),
3279        }));
3280        let physical = planner.plan(&logical).unwrap();
3281        assert_eq!(physical.columns(), &["n"]);
3282    }
3283
3284    #[test]
3285    fn test_plan_left_join_dispatch() {
3286        let store = create_test_store();
3287        let planner = Planner::new(store);
3288        let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
3289            left: Box::new(scan_any("a")),
3290            right: Box::new(scan_any("b")),
3291            condition: None,
3292        }));
3293        let physical = planner.plan(&logical).unwrap();
3294        assert!(physical.columns().contains(&"a".to_string()));
3295        assert!(physical.columns().contains(&"b".to_string()));
3296    }
3297
3298    #[test]
3299    fn test_plan_anti_join_dispatch() {
3300        let store = create_test_store();
3301        let planner = Planner::new(store);
3302        let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
3303            left: Box::new(scan_any("a")),
3304            right: Box::new(scan_any("b")),
3305        }));
3306        let physical = planner.plan(&logical).unwrap();
3307        assert!(physical.columns().contains(&"a".to_string()));
3308    }
3309
3310    #[test]
3311    fn test_plan_apply_uncorrelated_dispatch() {
3312        let store = create_test_store();
3313        let planner = Planner::new(store);
3314        let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
3315            input: Box::new(scan_any("a")),
3316            subplan: Box::new(scan_any("b")),
3317            shared_variables: vec![],
3318            optional: false,
3319        }));
3320        let physical = planner.plan(&logical).unwrap();
3321        assert!(physical.columns().contains(&"a".to_string()));
3322        assert!(physical.columns().contains(&"b".to_string()));
3323    }
3324
3325    #[test]
3326    fn test_plan_unwind_literal_list() {
3327        let store = create_test_store();
3328        let planner = Planner::new(store);
3329
3330        // UNWIND [1,2,3] AS x
3331        let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
3332            expression: LogicalExpression::List(vec![
3333                LogicalExpression::Literal(Value::Int64(1)),
3334                LogicalExpression::Literal(Value::Int64(2)),
3335                LogicalExpression::Literal(Value::Int64(3)),
3336            ]),
3337            variable: "x".to_string(),
3338            ordinality_var: None,
3339            offset_var: None,
3340            input: Box::new(LogicalOperator::Empty),
3341        }));
3342        let physical = planner.plan(&logical).unwrap();
3343        assert!(physical.columns().contains(&"x".to_string()));
3344    }
3345
3346    #[test]
3347    fn test_plan_merge_dispatch() {
3348        let store = create_test_store();
3349        let planner = create_writable_planner(&store);
3350
3351        // MERGE (n:Person)
3352        let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
3353            variable: "n".to_string(),
3354            labels: vec!["Person".to_string()],
3355            match_properties: vec![],
3356            on_create: vec![],
3357            on_match: vec![],
3358            input: Box::new(LogicalOperator::Empty),
3359        }));
3360        let physical = planner.plan(&logical).unwrap();
3361        assert!(physical.columns().contains(&"n".to_string()));
3362    }
3363
3364    #[test]
3365    fn test_plan_merge_relationship_dispatch() {
3366        let store = full_store();
3367        let planner = create_writable_planner(&store);
3368
3369        // MATCH (a:Person),(b:Person) MERGE (a)-[r:KNOWS]->(b)
3370        let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
3371            variable: "r".to_string(),
3372            source_variable: "a".to_string(),
3373            target_variable: "b".to_string(),
3374            edge_type: "KNOWS".to_string(),
3375            match_properties: vec![],
3376            on_create: vec![],
3377            on_match: vec![],
3378            input: Box::new(LogicalOperator::Join(JoinOp {
3379                left: Box::new(scan_person("a")),
3380                right: Box::new(scan_person("b")),
3381                join_type: JoinType::Cross,
3382                conditions: vec![],
3383            })),
3384        }));
3385        let physical = planner.plan(&logical).unwrap();
3386        assert!(physical.columns().contains(&"r".to_string()));
3387    }
3388
3389    #[test]
3390    fn test_plan_add_label_dispatch() {
3391        let store = full_store();
3392        let planner = create_writable_planner(&store);
3393        let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
3394            variable: "n".to_string(),
3395            labels: vec!["VIP".to_string()],
3396            input: Box::new(scan_person("n")),
3397        }));
3398        let physical = planner.plan(&logical).unwrap();
3399        assert!(physical.columns().contains(&"labels_added".to_string()));
3400    }
3401
3402    #[test]
3403    fn test_plan_remove_label_dispatch() {
3404        let store = full_store();
3405        let planner = create_writable_planner(&store);
3406        let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
3407            variable: "n".to_string(),
3408            labels: vec!["Person".to_string()],
3409            input: Box::new(scan_person("n")),
3410        }));
3411        let physical = planner.plan(&logical).unwrap();
3412        assert!(physical.columns().contains(&"labels_removed".to_string()));
3413    }
3414
3415    #[test]
3416    fn test_plan_set_property_dispatch() {
3417        let store = full_store();
3418        let planner = create_writable_planner(&store);
3419        let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
3420            variable: "n".to_string(),
3421            properties: vec![(
3422                "city".to_string(),
3423                LogicalExpression::Literal(Value::String("Amsterdam".into())),
3424            )],
3425            replace: false,
3426            is_edge: false,
3427            input: Box::new(scan_person("n")),
3428        }));
3429        let physical = planner.plan(&logical).unwrap();
3430        assert!(physical.columns().contains(&"n".to_string()));
3431    }
3432
3433    #[test]
3434    fn test_plan_delete_edge_dispatch() {
3435        let store = full_store();
3436        let planner = create_writable_planner(&store);
3437
3438        // Register the edge column first via an outgoing expand, then DELETE r.
3439        let expand_op = LogicalOperator::Expand(ExpandOp {
3440            from_variable: "a".to_string(),
3441            to_variable: "b".to_string(),
3442            edge_variable: Some("r".to_string()),
3443            direction: ExpandDirection::Outgoing,
3444            edge_types: vec!["KNOWS".to_string()],
3445            min_hops: 1,
3446            max_hops: Some(1),
3447            input: Box::new(scan_person("a")),
3448            path_alias: None,
3449            path_mode: PathMode::Walk,
3450        });
3451        let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
3452            variable: "r".to_string(),
3453            input: Box::new(expand_op),
3454        }));
3455        let physical = planner.plan(&logical).unwrap();
3456        assert!(physical.columns().contains(&"r".to_string()));
3457    }
3458
3459    #[test]
3460    fn test_plan_shortest_path_dispatch() {
3461        let store = full_store();
3462        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3463
3464        // SHORTEST PATH (a)-(b)
3465        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3466            input: Box::new(LogicalOperator::Join(JoinOp {
3467                left: Box::new(scan_person("a")),
3468                right: Box::new(scan_person("b")),
3469                join_type: JoinType::Cross,
3470                conditions: vec![],
3471            })),
3472            source_var: "a".to_string(),
3473            target_var: "b".to_string(),
3474            edge_types: vec!["KNOWS".to_string()],
3475            direction: ExpandDirection::Outgoing,
3476            path_alias: "p".to_string(),
3477            all_paths: false,
3478        }));
3479        let physical = planner.plan(&logical).unwrap();
3480        assert!(
3481            physical
3482                .columns()
3483                .iter()
3484                .any(|c| c.contains("_path_length_p"))
3485        );
3486    }
3487
3488    #[test]
3489    fn test_plan_shortest_path_missing_source_errors() {
3490        let store = full_store();
3491        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3492        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3493            input: Box::new(scan_person("a")),
3494            source_var: "missing".to_string(),
3495            target_var: "a".to_string(),
3496            edge_types: vec![],
3497            direction: ExpandDirection::Both,
3498            path_alias: "p".to_string(),
3499            all_paths: false,
3500        }));
3501        let err = planner.plan(&logical).err().expect("plan should fail");
3502        assert!(format!("{err}").contains("Source variable"));
3503    }
3504
3505    #[test]
3506    fn test_plan_map_collect_dispatch() {
3507        // Build rows with two columns named 'k' and 'v', then collect k->v into a map.
3508        let store = create_test_store();
3509        let planner = Planner::new(store);
3510        let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
3511            projections: vec![
3512                crate::query::plan::Projection {
3513                    expression: LogicalExpression::Literal(Value::String("key".into())),
3514                    alias: Some("k".to_string()),
3515                },
3516                crate::query::plan::Projection {
3517                    expression: LogicalExpression::Literal(Value::Int64(1)),
3518                    alias: Some("v".to_string()),
3519                },
3520            ],
3521            input: Box::new(scan_person("n")),
3522            pass_through_input: false,
3523        });
3524        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3525            key_var: "k".to_string(),
3526            value_var: "v".to_string(),
3527            alias: "m".to_string(),
3528            input: Box::new(input_with_kv),
3529        }));
3530        let physical = planner.plan(&logical).unwrap();
3531        assert_eq!(physical.columns(), &["m"]);
3532    }
3533
3534    #[test]
3535    fn test_plan_map_collect_missing_key_errors() {
3536        let store = create_test_store();
3537        let planner = Planner::new(store);
3538        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3539            key_var: "not_there".to_string(),
3540            value_var: "also_missing".to_string(),
3541            alias: "m".to_string(),
3542            input: Box::new(scan_any("n")),
3543        }));
3544        let err = planner.plan(&logical).err().expect("plan should fail");
3545        let msg = format!("{err}");
3546        assert!(msg.contains("MapCollect key"), "got: {msg}");
3547    }
3548
3549    #[test]
3550    fn test_plan_map_collect_missing_value_errors() {
3551        let store = create_test_store();
3552        let planner = Planner::new(store);
3553        // Input has column "n" so key resolves but value does not.
3554        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3555            key_var: "n".to_string(),
3556            value_var: "missing_value".to_string(),
3557            alias: "m".to_string(),
3558            input: Box::new(scan_any("n")),
3559        }));
3560        let err = planner.plan(&logical).err().expect("plan should fail");
3561        let msg = format!("{err}");
3562        assert!(msg.contains("MapCollect value"), "got: {msg}");
3563    }
3564
3565    #[test]
3566    fn test_plan_horizontal_aggregate_missing_column_errors() {
3567        let store = create_test_store();
3568        let planner = Planner::new(store);
3569        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3570            HorizontalAggregateOp {
3571                list_column: "not_a_column".to_string(),
3572                entity_kind: crate::query::plan::EntityKind::Edge,
3573                function: LogicalAggregateFunction::Count,
3574                property: "age".to_string(),
3575                alias: "total".to_string(),
3576                input: Box::new(scan_any("n")),
3577            },
3578        ));
3579        let err = planner.plan(&logical).err().expect("plan should fail");
3580        assert!(format!("{err}").contains("HorizontalAggregate"));
3581    }
3582
3583    #[test]
3584    fn test_plan_load_data_dispatch() {
3585        let store = create_test_store();
3586        let planner = Planner::new(store);
3587        // Path does not need to exist: planning just builds the operator.
3588        let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
3589            format: LoadDataFormat::Csv,
3590            with_headers: true,
3591            path: "/nonexistent/data.csv".to_string(),
3592            variable: "row".to_string(),
3593            field_terminator: Some(','),
3594        }));
3595        let physical = planner.plan(&logical).unwrap();
3596        assert_eq!(physical.columns(), &["row"]);
3597    }
3598
3599    #[test]
3600    fn test_plan_multi_way_join_dispatch() {
3601        // Three-way join over three expand inputs. Some configurations may
3602        // error during planning; we just require no panic.
3603        let store = full_store();
3604        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3605        let ab = LogicalOperator::Expand(ExpandOp {
3606            from_variable: "a".to_string(),
3607            to_variable: "b".to_string(),
3608            edge_variable: None,
3609            direction: ExpandDirection::Outgoing,
3610            edge_types: vec!["KNOWS".to_string()],
3611            min_hops: 1,
3612            max_hops: Some(1),
3613            input: Box::new(scan_person("a")),
3614            path_alias: None,
3615            path_mode: PathMode::Walk,
3616        });
3617        let bc = LogicalOperator::Expand(ExpandOp {
3618            from_variable: "b".to_string(),
3619            to_variable: "c".to_string(),
3620            edge_variable: None,
3621            direction: ExpandDirection::Outgoing,
3622            edge_types: vec!["KNOWS".to_string()],
3623            min_hops: 1,
3624            max_hops: Some(1),
3625            input: Box::new(scan_person("b")),
3626            path_alias: None,
3627            path_mode: PathMode::Walk,
3628        });
3629        let ca = LogicalOperator::Expand(ExpandOp {
3630            from_variable: "c".to_string(),
3631            to_variable: "a".to_string(),
3632            edge_variable: None,
3633            direction: ExpandDirection::Outgoing,
3634            edge_types: vec!["KNOWS".to_string()],
3635            min_hops: 1,
3636            max_hops: Some(1),
3637            input: Box::new(scan_person("c")),
3638            path_alias: None,
3639            path_mode: PathMode::Walk,
3640        });
3641        let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3642            inputs: vec![ab, bc, ca],
3643            conditions: vec![],
3644            shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
3645        }));
3646        let _ = planner.plan(&logical);
3647    }
3648
3649    #[test]
3650    fn test_plan_horizontal_aggregate_dispatch() {
3651        // Variable-length expand produces a list column that the aggregate targets.
3652        let store = full_store();
3653        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3654
3655        let path = LogicalOperator::Expand(ExpandOp {
3656            from_variable: "a".to_string(),
3657            to_variable: "b".to_string(),
3658            edge_variable: Some("r".to_string()),
3659            direction: ExpandDirection::Outgoing,
3660            edge_types: vec!["KNOWS".to_string()],
3661            min_hops: 1,
3662            max_hops: Some(3),
3663            input: Box::new(scan_person("a")),
3664            path_alias: Some("p".to_string()),
3665            path_mode: PathMode::Walk,
3666        });
3667        // Variable-length expand emits a column named _path_edges_p.
3668        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3669            HorizontalAggregateOp {
3670                list_column: "_path_edges_p".to_string(),
3671                entity_kind: crate::query::plan::EntityKind::Edge,
3672                function: LogicalAggregateFunction::Count,
3673                property: "weight".to_string(),
3674                alias: "edge_count".to_string(),
3675                input: Box::new(path),
3676            },
3677        ));
3678        let physical = planner.plan(&logical).unwrap();
3679        assert!(physical.columns().contains(&"edge_count".to_string()));
3680    }
3681
3682    // ==================== Cardinality estimation branches ====================
3683
3684    #[test]
3685    fn test_plan_adaptive_with_except() {
3686        let store = create_test_store();
3687        let planner = Planner::new(store);
3688        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3689            left: Box::new(scan_person("n")),
3690            right: Box::new(scan_person("n")),
3691            all: false,
3692        }));
3693        let physical = planner.plan_adaptive(&logical).unwrap();
3694        assert!(physical.adaptive_context.is_some());
3695    }
3696
3697    #[test]
3698    fn test_plan_adaptive_with_intersect() {
3699        let store = create_test_store();
3700        let planner = Planner::new(store);
3701        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3702            left: Box::new(scan_person("n")),
3703            right: Box::new(scan_any("n")),
3704            all: false,
3705        }));
3706        let physical = planner.plan_adaptive(&logical).unwrap();
3707        assert!(physical.adaptive_context.is_some());
3708    }
3709
3710    #[test]
3711    fn test_plan_adaptive_with_otherwise() {
3712        let store = create_test_store();
3713        let planner = Planner::new(store);
3714        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3715            left: Box::new(scan_person("n")),
3716            right: Box::new(scan_any("n")),
3717        }));
3718        let physical = planner.plan_adaptive(&logical).unwrap();
3719        assert!(physical.adaptive_context.is_some());
3720    }
3721
3722    // ==================== count_expand_chain edge case ====================
3723
3724    #[test]
3725    fn test_count_expand_chain_variable_length_breaks_chain() {
3726        // A variable-length expand (not single-hop) should NOT count in the chain.
3727        let var_expand = LogicalOperator::Expand(ExpandOp {
3728            from_variable: "a".to_string(),
3729            to_variable: "b".to_string(),
3730            edge_variable: None,
3731            direction: ExpandDirection::Outgoing,
3732            edge_types: vec!["KNOWS".to_string()],
3733            min_hops: 1,
3734            max_hops: Some(3),
3735            input: Box::new(scan_person("a")),
3736            path_alias: None,
3737            path_mode: PathMode::Walk,
3738        });
3739        let (count, _) = Planner::count_expand_chain(&var_expand);
3740        assert_eq!(count, 0);
3741    }
3742
3743    // ==================== StaticResultOperator ====================
3744
3745    #[cfg(feature = "algos")]
3746    #[test]
3747    fn test_static_result_operator_emits_rows_and_resets() {
3748        use grafeo_common::types::Value;
3749        let rows = vec![
3750            vec![Value::Int64(1), Value::String("Vincent".into())],
3751            vec![Value::Int64(2), Value::String("Jules".into())],
3752        ];
3753        let mut op = StaticResultOperator {
3754            rows,
3755            column_indices: vec![0, 1],
3756            row_index: 0,
3757        };
3758        assert_eq!(op.name(), "StaticResult");
3759        let chunk = op.next().unwrap().expect("first chunk");
3760        assert_eq!(chunk.row_count(), 2);
3761        // Exhausted.
3762        assert!(op.next().unwrap().is_none());
3763        // Reset allows re-emitting.
3764        op.reset();
3765        assert!(op.next().unwrap().is_some());
3766        // into_any round trip.
3767        let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
3768            rows: vec![vec![Value::Null]],
3769            column_indices: vec![0],
3770            row_index: 0,
3771        });
3772        let _any = boxed.into_any();
3773    }
3774
3775    // ==================== VectorScan k-bounding regression ====================
3776
3777    /// `VectorScanOp.k = None` means "return every match," but feeding
3778    /// `usize::MAX` into the store's vector_search degrades HNSW to a full
3779    /// traversal and (pre-saturating-mul) could overflow quantized rescore
3780    /// paths. The planner must bound k to the label's node count (or the
3781    /// global count when no label is set, or zero when the label is unknown).
3782    ///
3783    /// We can't inspect VectorScanOperator.k directly — it's private — so the
3784    /// assertion here is that planning succeeds across all three branches
3785    /// without panicking or surfacing an internal-state error. Paired with
3786    /// `LpgStore::nodes_by_label_count` tests, this guards the full path.
3787    #[cfg(feature = "vector-index")]
3788    #[test]
3789    fn test_plan_vector_scan_k_none_bounds_across_label_states() {
3790        use crate::query::plan::VectorScanOp;
3791
3792        let store = create_test_store();
3793        // Sanity: the test store has 2 Person nodes, 1 Company, 0 Unknown.
3794        assert_eq!(store.nodes_by_label_count("Person"), 2);
3795        assert_eq!(store.nodes_by_label_count("Unknown"), 0);
3796        assert_eq!(store.node_count(), 3);
3797
3798        let planner = Planner::new(store);
3799        let make_scan = |label: Option<&str>| VectorScanOp {
3800            variable: "n".to_string(),
3801            index_name: None,
3802            property: "embedding".to_string(),
3803            label: label.map(str::to_string),
3804            query_vector: LogicalExpression::Literal(Value::List(
3805                vec![
3806                    Value::Float64(1.0),
3807                    Value::Float64(0.0),
3808                    Value::Float64(0.0),
3809                ]
3810                .into(),
3811            )),
3812            k: None,
3813            metric: Some(VectorMetric::Cosine),
3814            min_similarity: Some(0.5),
3815            max_distance: None,
3816            input: None,
3817        };
3818
3819        for label in [Some("Person"), Some("Unknown"), None] {
3820            let (_op, cols) = planner
3821                .plan_vector_scan(&make_scan(label))
3822                .unwrap_or_else(|e| panic!("plan_vector_scan failed for {label:?}: {e:?}"));
3823            assert_eq!(cols[0], "n", "variable column must be first for {label:?}");
3824        }
3825    }
3826}