grafeo_engine/query/planner/lpg/
mod.rs

//! LPG (Labeled Property Graph) planner.
//!
//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
//! physical operators that execute against an LPG store.
//!
//! # Structure
//!
//! Logical operators are consumed by `plan_operator`, dispatched to
//! per-operator submodules that share the crate-private `Planner`
//! struct:
//!
//! | Submodule    | Owns                                                         |
//! |--------------|--------------------------------------------------------------|
//! | `scan`       | `plan_node_scan`: label, property, and index-seek scans      |
//! | `expand`     | single-hop and multi-hop expansion (factorized chain)        |
//! | `filter`     | predicate rewriting, zone maps, index pushdown, EXISTS/COUNT |
//! | `expression` | `LogicalExpression` -> `FilterExpression` conversion         |
//! | `join`       | hash, nested-loop, leapfrog, multi-way, semi/anti            |
//! | `aggregate`, `project`, `mutation` | the rest of the pipeline               |
//!
//! # Rewrites applied during planning
//!
//! The planner is not a pure tree walker: it performs several shape
//! transforms up-front because they decide which *physical* operator is
//! legal, not just which is faster. Each rewrite lives in the submodule
//! that owns the corresponding logical operator.
//!
//! 1. **Factorized expand chains** (`expand::plan_expand_chain`):
//!    two or more consecutive `Expand`s are collapsed into a single
//!    `LazyFactorizedChainOperator`. A hand-rolled tree of `Expand`
//!    operators would materialise the Cartesian product between hops
//!    (O(d^k) rows for a k-hop query with average degree d); factorisation
//!    represents each hop as a set per row and evaluates upward filters
//!    lazily, which is asymptotically better *and* preserves filter
//!    semantics (filters apply to the flattened view). Disabled when
//!    `factorized_execution` is false on the planner, used as an escape
//!    hatch for operators that need materialised intermediate state.
//!
//! 2. **EXISTS / NOT EXISTS as semi/anti joins** (`filter::extract_complex_exists`):
//!    anything more than a trivial single-hop EXISTS is rewritten to a
//!    semi-join (EXISTS) or anti-join (NOT EXISTS) before the filter is
//!    planned. A literal nested re-execution would re-scan the subquery
//!    per outer row; the join form piggy-backs on the regular hash-join
//!    infrastructure. The fast path in `expression::convert_expression`
//!    keeps trivial single-hop EXISTS as inline predicates so small
//!    queries stay scan-local.
//!
//! 3. **EXISTS inside OR** (`filter::extract_exists_from_or`):
//!    semi-joins filter rows and therefore compose incorrectly with the
//!    disjunctive scalar side of an OR. The rewrite splits the OR into a
//!    semi-join branch and a regular filter branch, UNIONs them, and
//!    deduplicates. Without the split, rows satisfied only by the scalar
//!    side would be dropped.
//!
//! 4. **COUNT subquery comparisons** (`filter::extract_count_comparison`):
//!    `COUNT { ... } op N` rewrites into Apply + Aggregate + Filter.
//!    Evaluating the COUNT inline per row would be a correlated
//!    subquery; Apply + Aggregate reuses the hash-aggregate physical
//!    operator.
//!
//! # Filter pushdown and index selection
//!
//! `filter::plan_filter` is the central point. It tries a sequence of
//! optimisations before falling back to a generic `FilterOperator`; the
//! ordering is load-bearing:
//!
//! 1. **Complex EXISTS / OR / COUNT rewrites** (above). These must run
//!    first because later steps assume a pure-scalar predicate.
//! 2. **Zone-map short-circuit**: if the column's zone summary proves the
//!    predicate cannot match any row group, plan an `EmptyOperator`.
//! 3. **Property index seek**: equality on an indexed property becomes an
//!    index scan (O(1) per lookup) instead of a filtered full scan.
//! 4. **Range index lookup**: `>`, `<`, `>=`, `<=` on an indexed
//!    property becomes a range scan.
//! 5. **Hybrid vector+text pushdown** (feature-gated): compound
//!    AND/OR predicates over `vector_match` / `text_match` are folded
//!    into a single fused scan.
//! 6. **Generic `FilterOperator`**: the residual predicate wraps the
//!    input scan.
//!
//! Whatever *remains* after a pushdown (e.g. the non-index-backed half
//! of an AND) is planned as a residual filter on top of the chosen scan,
//! so the returned tree is always semantically equivalent to the
//! input plan.
//!
//! # Transaction context
//!
//! Every operator inherits the planner's `viewing_epoch` and
//! `transaction_id`. Factorised chains and mutations carry them
//! through `with_transaction_context()` so MVCC visibility is applied
//! at every hop: see [`crate::transaction`] for the epoch/visibility
//! rules.

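The EXISTS / NOT EXISTS rewrite described in the module docs can be illustrated with a toy hash-based semi/anti join. This is only a sketch under simplifying assumptions: rows are `(key, payload)` tuples, the subquery has already been reduced to a list of join keys, and `semi_or_anti_join` is a hypothetical helper, not a function in this crate. It shows why the join form avoids re-running the subquery per outer row: the key set is built once and probed once per row.

```rust
use std::collections::HashSet;

/// Toy semi/anti join (hypothetical helper, not part of the planner).
///
/// `inner_keys` stands in for the keys produced by the EXISTS subquery.
/// With `anti == false` the result keeps outer rows that have a match
/// (EXISTS); with `anti == true` it keeps rows without one (NOT EXISTS).
fn semi_or_anti_join<'a>(
    outer: &[(u32, &'a str)],
    inner_keys: &[u32],
    anti: bool,
) -> Vec<(u32, &'a str)> {
    // Build side: evaluated once, duplicates collapse in the set.
    let seen: HashSet<u32> = inner_keys.iter().copied().collect();

    // Probe side: one hash lookup per outer row; each outer row is
    // emitted at most once, regardless of how many inner rows match.
    outer
        .iter()
        .copied()
        .filter(|(key, _)| seen.contains(key) != anti)
        .collect()
}
```

Calling it with `anti = false` keeps only the outer rows whose key occurs in `inner_keys`; `anti = true` keeps the complement, so the two results always partition the outer input.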
mod aggregate;
mod expand;
mod expression;
mod filter;
mod filter_hybrid;
mod join;
mod mutation;
mod project;
mod scan;

#[cfg(feature = "algos")]
use crate::query::plan::CallProcedureOp;
#[cfg(feature = "text-index")]
use crate::query::plan::TextScanOp;
use crate::query::plan::{
    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
};
#[cfg(feature = "vector-index")]
use crate::query::plan::{VectorMetric, VectorScanOp};
use grafeo_common::grafeo_debug_span;
use grafeo_common::types::{EpochId, TransactionId};
use grafeo_common::types::{LogicalType, Value};
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::execution::AdaptiveContext;
use grafeo_core::execution::operators::{
    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
    VariableLengthExpandOperator,
};
use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
use std::collections::HashMap;
use std::sync::Arc;

use crate::query::planner::common;
use crate::query::planner::common::expression_to_string;
use crate::query::planner::{
    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
    convert_unary_op, value_to_logical_type,
};
use crate::transaction::TransactionManager;

/// Range bounds for property-based range queries.
struct RangeBounds<'a> {
    min: Option<&'a Value>,
    max: Option<&'a Value>,
    min_inclusive: bool,
    max_inclusive: bool,
}

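The source does not show how `RangeBounds` is consumed, so the following is only a sketch of one plausible reading of its four fields. It uses a hypothetical `IntRange` stand-in over `i64` (instead of borrowed `Value`s) and an assumed `contains` helper to show how optional, independently inclusive bounds combine for a predicate such as `x >= 10 AND x < 20`.

```rust
/// Hypothetical stand-in for `RangeBounds`, specialised to `i64`.
/// `None` means the range is unbounded on that side.
struct IntRange {
    min: Option<i64>,
    max: Option<i64>,
    min_inclusive: bool,
    max_inclusive: bool,
}

impl IntRange {
    /// Returns true if `v` satisfies both bounds.
    fn contains(&self, v: i64) -> bool {
        let above_min = match self.min {
            None => true,
            Some(min) if self.min_inclusive => v >= min,
            Some(min) => v > min,
        };
        let below_max = match self.max {
            None => true,
            Some(max) if self.max_inclusive => v <= max,
            Some(max) => v < max,
        };
        above_min && below_max
    }
}
```

Under this reading, `x >= 10 AND x < 20` would be `IntRange { min: Some(10), max: Some(20), min_inclusive: true, max_inclusive: false }`, and a one-sided comparison such as `x <= 5` leaves `min` as `None`.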
/// Converts a logical plan to a physical operator tree for LPG stores.
pub struct Planner {
    /// The graph store (read-only operations).
    pub(super) store: Arc<dyn GraphStoreSearch>,
    /// Writable graph store (None for read-only databases).
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch to use for visibility checks.
    pub(super) viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    pub(super) factorized_execution: bool,
    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Variables that hold edge IDs (from MATCH edge patterns).
    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator for schema enforcement during mutations.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Catalog for user-defined procedure lookup.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// LPG store handle for procedures that need direct index access (vector
    /// and text search reach HNSW / BM25 indexes owned by the LPG store).
    #[cfg(feature = "lpg")]
    pub(super) lpg_store: Option<Arc<grafeo_core::graph::lpg::LpgStore>>,
    /// Shared parameter state for the currently planning correlated Apply.
    /// Set by `plan_apply` before planning the inner operator, consumed by
    /// `plan_operator` when encountering `ParameterScan`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variables from variable-length expand patterns (group-list variables).
    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// When true, each physical operator is wrapped in `ProfiledOperator`.
    profiling: std::cell::Cell<bool>,
    /// Profile entries collected during planning (post-order).
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Optional write tracker for recording writes during mutations.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context for introspection functions (info, schema, current_schema, etc.).
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// When true, expand operators use epoch-only visibility (no MVCC version
    /// chain walks). Set when the plan contains no mutations, so PENDING
    /// writes are impossible to observe.
    pub(super) read_only: bool,
}

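Several `Planner` fields use `Cell` and `RefCell` so that planning methods taking `&self` can still update bookkeeping such as the anonymous edge counter and the set of edge columns. The sketch below isolates that pattern in a hypothetical `NameRegistry` type (the type and its method names are assumptions for illustration; only the `_anon_edge_N` naming scheme comes from this file).

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashSet;

/// Hypothetical miniature of the planner's bookkeeping fields.
struct NameRegistry {
    /// Next suffix for an anonymous edge column.
    counter: Cell<u32>,
    /// Every column name registered so far.
    edge_columns: RefCell<HashSet<String>>,
}

impl NameRegistry {
    fn new() -> Self {
        Self {
            counter: Cell::new(0),
            edge_columns: RefCell::new(HashSet::new()),
        }
    }

    /// Registers `name`, or a generated `_anon_edge_N` name when `None`.
    /// Takes `&self`: `Cell` and `RefCell` provide the mutation.
    fn register(&self, name: Option<&str>) -> String {
        let name = match name {
            Some(n) => n.to_string(),
            None => {
                let count = self.counter.get();
                self.counter.set(count + 1);
                format!("_anon_edge_{count}")
            }
        };
        self.edge_columns.borrow_mut().insert(name.clone());
        name
    }

    /// Number of distinct names registered.
    fn registered(&self) -> usize {
        self.edge_columns.borrow().len()
    }
}
```

The trade-off of this design is that `Cell` and `RefCell` are not `Sync`, so a value built this way cannot be shared across threads by reference; the borrow in `register` is also released before returning, which avoids `RefCell` double-borrow panics on re-entrant calls.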
impl Planner {
    /// Creates a new planner with the given store.
    ///
    /// This creates a planner without transaction context, using the current
    /// epoch from the store for visibility.
    #[must_use]
    pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
        let epoch = store.current_epoch();
        Self {
            store,
            write_store: None,
            transaction_manager: None,
            transaction_id: None,
            viewing_epoch: epoch,
            anon_edge_counter: std::cell::Cell::new(0),
            factorized_execution: true,
            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
            validator: None,
            catalog: None,
            #[cfg(feature = "lpg")]
            lpg_store: None,
            correlated_param_state: std::cell::RefCell::new(None),
            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
            profiling: std::cell::Cell::new(false),
            profile_entries: std::cell::RefCell::new(Vec::new()),
            write_tracker: None,
            session_context: grafeo_core::execution::operators::SessionContext::default(),
            read_only: false,
        }
    }

    /// Creates a new planner with transaction context for MVCC-aware planning.
    #[must_use]
    pub fn with_context(
        store: Arc<dyn GraphStoreSearch>,
        write_store: Option<Arc<dyn GraphStoreMut>>,
        transaction_manager: Arc<TransactionManager>,
        transaction_id: Option<TransactionId>,
        viewing_epoch: EpochId,
    ) -> Self {
        use crate::transaction::TransactionWriteTracker;

        // Create write tracker when there's an active transaction
        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
            if transaction_id.is_some() {
                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
                    &transaction_manager,
                ))))
            } else {
                None
            };

        Self {
            store,
            write_store,
            transaction_manager: Some(transaction_manager),
            transaction_id,
            viewing_epoch,
            anon_edge_counter: std::cell::Cell::new(0),
            factorized_execution: true,
            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
            validator: None,
            catalog: None,
            #[cfg(feature = "lpg")]
            lpg_store: None,
            correlated_param_state: std::cell::RefCell::new(None),
            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
            profiling: std::cell::Cell::new(false),
            profile_entries: std::cell::RefCell::new(Vec::new()),
            write_tracker,
            session_context: grafeo_core::execution::operators::SessionContext::default(),
            read_only: false,
        }
    }

    /// Marks this planner as planning a read-only query (no mutations),
    /// enabling fast-path visibility checks in expand operators.
    #[must_use]
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
        self.write_store
            .as_ref()
            .map(Arc::clone)
            .ok_or(Error::Transaction(
                grafeo_common::utils::error::TransactionError::ReadOnly,
            ))
    }

    /// Returns the viewing epoch for this planner.
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }

    /// Returns the transaction ID for this planner, if any.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }

    /// Returns a reference to the transaction manager, if available.
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }

    /// Enables or disables factorized execution for multi-hop queries.
    #[must_use]
    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
        self.factorized_execution = enabled;
        self
    }

    /// Sets the constraint validator for schema enforcement during mutations.
    #[must_use]
    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
        self.validator = Some(validator);
        self
    }

    /// Sets the catalog for user-defined procedure lookup.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
        self.catalog = Some(catalog);
        self
    }

    /// Attaches an LPG store handle so `CALL grafeo.search.*` procedures can
    /// reach the vector and text indexes.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn with_lpg_store(mut self, lpg_store: Arc<grafeo_core::graph::lpg::LpgStore>) -> Self {
        self.lpg_store = Some(lpg_store);
        self
    }

    /// Sets the session context for introspection functions.
    #[must_use]
    pub fn with_session_context(
        mut self,
        context: grafeo_core::execution::operators::SessionContext,
    ) -> Self {
        self.session_context = context;
        self
    }

    /// Generates an edge column name from an expand's edge variable (or an
    /// anonymous fallback) and registers it in `edge_columns` so downstream
    /// RETURN emits `EdgeResolve` instead of `NodeResolve`.
    ///
    /// Returns the column name for the caller to push into its output columns.
    pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
        let name = edge_variable.clone().unwrap_or_else(|| {
            let count = self.anon_edge_counter.get();
            self.anon_edge_counter.set(count + 1);
            format!("_anon_edge_{}", count)
        });
        self.edge_columns.borrow_mut().insert(name.clone());
        name
    }

    /// Counts consecutive single-hop expand operations.
    ///
    /// Returns the count and the base of the chain: the first operator
    /// below it that is not a single-hop expand (this may itself be a
    /// variable-length `Expand`).
    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
        match op {
            LogicalOperator::Expand(expand) => {
                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);

                if is_single_hop {
                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
                    (inner_count + 1, base)
                } else {
                    (0, op)
                }
            }
            _ => (0, op),
        }
    }

    /// Collects consecutive single-hop expand operations, walking from the
    /// outermost expand down towards the base.
    ///
    /// The walk order is reversed before returning, so the result is ordered
    /// from innermost (closest to the base) to outermost.
    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
        let mut chain = Vec::new();
        let mut current = op;

        while let LogicalOperator::Expand(expand) = current {
            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
            if !is_single_hop {
                break;
            }
            chain.push(expand);
            current = &expand.input;
        }

        chain.reverse();
        chain
    }

    /// Plans a logical plan into a physical operator.
    ///
    /// # Errors
    ///
    /// Returns an error if the logical plan contains unsupported operators
    /// or invalid expressions.
    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
        let _span = grafeo_debug_span!("grafeo::query::plan");
        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
        Ok(PhysicalPlan {
            operator,
            columns,
            adaptive_context: None,
        })
    }

    /// Plans a logical plan with profiling: each physical operator is wrapped
    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
    /// collect row counts and timing. Returns the physical plan together with
    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
    /// items in post-order (children before parents).
    ///
    /// # Errors
    ///
    /// Returns an error if the logical plan contains unsupported operators
    /// or invalid expressions.
    pub fn plan_profiled(
        &self,
        logical_plan: &LogicalPlan,
    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
        self.profiling.set(true);
        self.profile_entries.borrow_mut().clear();

        let result = self.plan_operator(&logical_plan.root);

        self.profiling.set(false);
        let (operator, columns) = result?;
        let entries = self.profile_entries.borrow_mut().drain(..).collect();

        Ok((
            PhysicalPlan {
                operator,
                columns,
                adaptive_context: None,
            },
            entries,
        ))
    }

    /// Plans a logical plan with adaptive execution support.
    ///
    /// # Errors
    ///
    /// Returns an error if the logical plan contains unsupported operators
    /// or invalid expressions.
    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
        let (operator, columns) = self.plan_operator(&logical_plan.root)?;

        let mut adaptive_context = AdaptiveContext::new();
        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);

        Ok(PhysicalPlan {
            operator,
            columns,
            adaptive_context: Some(adaptive_context),
        })
    }

    /// Collects cardinality estimates from the logical plan into an adaptive context.
    fn collect_cardinality_estimates(
        &self,
        op: &LogicalOperator,
        ctx: &mut AdaptiveContext,
        depth: usize,
    ) {
        match op {
            LogicalOperator::NodeScan(scan) => {
                let estimate = if let Some(label) = &scan.label {
                    self.store.nodes_by_label(label).len() as f64
                } else {
                    self.store.node_count() as f64
                };
                let id = format!("scan_{}", scan.variable);
                ctx.set_estimate(&id, estimate);

                if let Some(input) = &scan.input {
                    self.collect_cardinality_estimates(input, ctx, depth + 1);
                }
            }
            LogicalOperator::Filter(filter) => {
                let input_estimate = self.estimate_cardinality(&filter.input);
                let estimate = input_estimate * 0.3;
                let id = format!("filter_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
            }
            LogicalOperator::Expand(expand) => {
                let input_estimate = self.estimate_cardinality(&expand.input);
                let stats = self.store.statistics();
                let avg_degree = self.estimate_expand_degree(&stats, expand);
                let estimate = input_estimate * avg_degree;
                let id = format!("expand_{}", expand.to_variable);
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
            }
            LogicalOperator::Join(join) => {
                let left_est = self.estimate_cardinality(&join.left);
                let right_est = self.estimate_cardinality(&join.right);
                let estimate = (left_est * right_est).sqrt();
                let id = format!("join_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
            }
            LogicalOperator::Aggregate(agg) => {
                let input_estimate = self.estimate_cardinality(&agg.input);
                let estimate = if agg.group_by.is_empty() {
                    1.0
                } else {
                    (input_estimate * 0.1).max(1.0)
                };
                let id = format!("aggregate_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
            }
            LogicalOperator::Distinct(distinct) => {
                let input_estimate = self.estimate_cardinality(&distinct.input);
                let estimate = (input_estimate * 0.5).max(1.0);
                let id = format!("distinct_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
            }
            LogicalOperator::Return(ret) => {
                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
            }
            LogicalOperator::Limit(limit) => {
                let input_estimate = self.estimate_cardinality(&limit.input);
                let estimate = input_estimate.min(limit.count.estimate());
                let id = format!("limit_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
            }
            LogicalOperator::Skip(skip) => {
                let input_estimate = self.estimate_cardinality(&skip.input);
                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
                let id = format!("skip_{depth}");
                ctx.set_estimate(&id, estimate);

                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
            }
            LogicalOperator::Sort(sort) => {
                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
            }
            LogicalOperator::Union(union) => {
                let estimate: f64 = union
                    .inputs
                    .iter()
                    .map(|input| self.estimate_cardinality(input))
                    .sum();
                let id = format!("union_{depth}");
                ctx.set_estimate(&id, estimate);

                for input in &union.inputs {
                    self.collect_cardinality_estimates(input, ctx, depth + 1);
                }
            }
            _ => {
                // Other operators record no estimate here, and their inputs
                // are not visited.
            }
        }
    }

    /// Estimates cardinality for a logical operator subtree.
    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
        match op {
            LogicalOperator::NodeScan(scan) => {
                if let Some(label) = &scan.label {
                    self.store.nodes_by_label(label).len() as f64
                } else {
                    self.store.node_count() as f64
                }
            }
            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
            LogicalOperator::Expand(expand) => {
                let stats = self.store.statistics();
                let avg_degree = self.estimate_expand_degree(&stats, expand);
                self.estimate_cardinality(&expand.input) * avg_degree
            }
            LogicalOperator::Join(join) => {
                let left = self.estimate_cardinality(&join.left);
                let right = self.estimate_cardinality(&join.right);
                (left * right).sqrt()
            }
            LogicalOperator::Aggregate(agg) => {
                if agg.group_by.is_empty() {
                    1.0
                } else {
                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
                }
            }
            LogicalOperator::Distinct(distinct) => {
                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
            }
            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
            LogicalOperator::Limit(limit) => self
                .estimate_cardinality(&limit.input)
                .min(limit.count.estimate()),
            LogicalOperator::Skip(skip) => {
                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
            }
            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
            LogicalOperator::Union(union) => union
                .inputs
                .iter()
                .map(|input| self.estimate_cardinality(input))
                .sum(),
            LogicalOperator::Except(except) => {
                let left = self.estimate_cardinality(&except.left);
                let right = self.estimate_cardinality(&except.right);
                (left - right).max(0.0)
            }
            LogicalOperator::Intersect(intersect) => {
                let left = self.estimate_cardinality(&intersect.left);
                let right = self.estimate_cardinality(&intersect.right);
                left.min(right)
            }
            LogicalOperator::Otherwise(otherwise) => self
                .estimate_cardinality(&otherwise.left)
                .max(self.estimate_cardinality(&otherwise.right)),
            // Fixed fallback guess for operators without a dedicated estimate.
            _ => 1000.0,
        }
    }

    /// Estimates the average edge degree for an expand operation using store statistics.
    fn estimate_expand_degree(
        &self,
        stats: &grafeo_core::statistics::Statistics,
        expand: &ExpandOp,
    ) -> f64 {
        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
        if expand.edge_types.len() == 1 {
            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
        } else if stats.total_nodes > 0 {
            // Zero or several edge types: use the global edges-per-node ratio.
            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
        } else {
            // No node statistics: fall back to a fixed default degree.
            10.0
        }
    }

    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
    fn maybe_profile(
        &self,
        result: Result<(Box<dyn Operator>, Vec<String>)>,
        op: &LogicalOperator,
    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
        if self.profiling.get() {
            let (physical, columns) = result?;
            let (entry, stats) =
                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
            self.profile_entries.borrow_mut().push(entry);
            Ok((Box::new(profiled), columns))
        } else {
            result
        }
    }

    /// Plans a single logical operator.
    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
        let result = match op {
            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
            LogicalOperator::Expand(expand) => {
                // Factorized chain kicks in only when it actually helps:
                // chain_len >= 2 means at least two consecutive Expands, where
                // separate plans would Cartesian-product per hop. For a single
                // hop, the plain `plan_expand` path is cheaper and integrates
                // more naturally with adjacent operators.
                if self.factorized_execution {
                    let (chain_len, _base) = Self::count_expand_chain(op);
                    if chain_len >= 2 {
                        return self.maybe_profile(self.plan_expand_chain(op), op);
                    }
                }
                self.plan_expand(expand)
            }
            LogicalOperator::Return(ret) => self.plan_return(ret),
            LogicalOperator::Filter(filter) => self.plan_filter(filter),
            LogicalOperator::Project(project) => self.plan_project(project),
            LogicalOperator::Limit(limit) => self.plan_limit(limit),
            LogicalOperator::Skip(skip) => self.plan_skip(skip),
            LogicalOperator::Sort(sort) => self.plan_sort(sort),
            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
            LogicalOperator::Join(join) => self.plan_join(join),
            LogicalOperator::Union(union) => self.plan_union(union),
            LogicalOperator::Except(except) => self.plan_except(except),
            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
            LogicalOperator::Apply(apply) => self.plan_apply(apply),
            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
723            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
724            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
725            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
726            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
727            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
728            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
729            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
730            LogicalOperator::Merge(merge) => self.plan_merge(merge),
731            LogicalOperator::MergeRelationship(merge_rel) => {
732                self.plan_merge_relationship(merge_rel)
733            }
734            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
735            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
736            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
737            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
738            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
739            #[cfg(feature = "algos")]
740            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
741            #[cfg(not(feature = "algos"))]
742            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
743                "CALL procedures require the 'algos' feature".to_string(),
744            )),
745            LogicalOperator::ParameterScan(_param_scan) => {
746                let state = self
747                    .correlated_param_state
748                    .borrow()
749                    .clone()
750                    .ok_or_else(|| {
751                        Error::Internal(
752                            "ParameterScan without correlated Apply context".to_string(),
753                        )
754                    })?;
755                // Use the actual column names from the ParameterState (which may
756                // have been expanded from "*" to real variable names in plan_apply)
757                let columns = state.columns.clone();
758                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
759                Ok((operator, columns))
760            }
761            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
762            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
763            LogicalOperator::LoadData(load) => {
764                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
765                    load.path.clone(),
766                    load.format,
767                    load.with_headers,
768                    load.field_terminator,
769                    load.variable.clone(),
770                ));
771                Ok((operator, vec![load.variable.clone()]))
772            }
773            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
774            #[cfg(feature = "vector-index")]
775            LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
776            #[cfg(not(feature = "vector-index"))]
777            LogicalOperator::VectorScan(_) => Err(Error::Internal(
778                "VectorScan requires vector-index feature".to_string(),
779            )),
780            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
781                "VectorJoin requires vector-index feature".to_string(),
782            )),
783            #[cfg(feature = "text-index")]
784            LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
785            #[cfg(not(feature = "text-index"))]
786            LogicalOperator::TextScan(_) => Err(Error::Internal(
787                "TextScan requires text-index feature".to_string(),
788            )),
789            _ => Err(Error::Internal(format!(
790                "Unsupported operator: {:?}",
791                std::mem::discriminant(op)
792            ))),
793        };
794        self.maybe_profile(result, op)
795    }
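
/*
Illustrative sketch only, not part of the planner: the decision made in the
`Expand` arm of `plan_operator` above, on a toy plan type. `ToyOp`,
`toy_chain_len`, and `use_factorized_chain` are hypothetical names; the real
`count_expand_chain` is defined elsewhere and may differ in signature and in
what it accepts as part of a chain.

```rust
// Toy logical plan with just enough structure to show chain counting.
enum ToyOp {
    Scan,
    Filter(Box<ToyOp>),
    Expand(Box<ToyOp>),
}

// Counts consecutive `Expand` nodes starting at `op` and returns the count
// together with the first non-`Expand` input (the base of the chain).
fn toy_chain_len(op: &ToyOp) -> (usize, &ToyOp) {
    let mut len = 0;
    let mut current = op;
    while let ToyOp::Expand(input) = current {
        len += 1;
        current = &**input;
    }
    (len, current)
}

// Factorize only when the feature is enabled and the chain has two or more hops.
fn use_factorized_chain(enabled: bool, op: &ToyOp) -> bool {
    enabled && toy_chain_len(op).0 >= 2
}
```

In this sketch a `Filter` between two `Expand`s splits the chain, so each
side counts as a single hop and takes the plain expand path.
*/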

    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
    fn plan_horizontal_aggregate(
        &self,
        ha: &HorizontalAggregateOp,
    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
        let (child_op, child_columns) = self.plan_operator(&ha.input)?;

        let list_col_idx = child_columns
            .iter()
            .position(|c| c == &ha.list_column)
            .ok_or_else(|| {
                Error::Internal(format!(
                    "HorizontalAggregate list column '{}' not found in {:?}",
                    ha.list_column, child_columns
                ))
            })?;

        let entity_kind = match ha.entity_kind {
            LogicalEntityKind::Edge => EntityKind::Edge,
            LogicalEntityKind::Node => EntityKind::Node,
        };

        let function = convert_aggregate_function(ha.function);
        let input_column_count = child_columns.len();

        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
            child_op,
            list_col_idx,
            entity_kind,
            function,
            ha.property.clone(),
            Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
            input_column_count,
        ));

        let mut columns = child_columns;
        columns.push(ha.alias.clone());
        // Mark the result as a scalar column
        self.scalar_columns.borrow_mut().insert(ha.alias.clone());

        Ok((operator, columns))
    }

    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
        let key_idx = child_columns
            .iter()
            .position(|c| c == &mc.key_var)
            .ok_or_else(|| {
                Error::Internal(format!(
                    "MapCollect key '{}' not in columns {:?}",
                    mc.key_var, child_columns
                ))
            })?;
        let value_idx = child_columns
            .iter()
            .position(|c| c == &mc.value_var)
            .ok_or_else(|| {
                Error::Internal(format!(
                    "MapCollect value '{}' not in columns {:?}",
                    mc.value_var, child_columns
                ))
            })?;
        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
        Ok((operator, vec![mc.alias.clone()]))
    }
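
/*
Illustrative sketch only, not part of the planner: `plan_horizontal_aggregate`
and `plan_map_collect` above both resolve a column name to its position in the
child's output and fail with a descriptive error when it is missing.
`column_index` is a hypothetical helper showing that pattern with a plain
`String` error in place of `Error::Internal`.

```rust
// Looks up `name` in `columns`, returning its position or an error that
// names the operator (`context`) and lists the columns that were available.
fn column_index(columns: &[String], name: &str, context: &str) -> Result<usize, String> {
    columns
        .iter()
        .position(|c| c == name)
        .ok_or_else(|| format!("{} column '{}' not found in {:?}", context, name, columns))
}
```

Listing the available columns in the message makes a planner bug (a variable
that was dropped by an earlier projection, for instance) visible from the
error alone.
*/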

    /// Plans a text search scan operator using the BM25 inverted index.
    #[cfg(feature = "text-index")]
    fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
        use grafeo_core::execution::operators::TextScanOperator;

        let query_string = match &scan.query {
            LogicalExpression::Literal(Value::String(s)) => s.to_string(),
            LogicalExpression::Parameter(name) => {
                return Err(Error::Internal(format!(
                    "TextScan query parameter ${} not resolved",
                    name
                )));
            }
            _ => {
                return Err(Error::Internal(
                    "TextScan query must be a string literal or parameter".to_string(),
                ));
            }
        };

        let operator: Box<dyn Operator> = if let Some(k) = scan.k {
            Box::new(TextScanOperator::top_k(
                Arc::clone(&self.store),
                &scan.label,
                &scan.property,
                &query_string,
                k,
            ))
        } else if let Some(threshold) = scan.threshold {
            Box::new(TextScanOperator::with_threshold(
                Arc::clone(&self.store),
                &scan.label,
                &scan.property,
                &query_string,
                threshold,
            ))
        } else {
            // Neither `k` nor `threshold` was given: default to the top 100 hits.
            Box::new(TextScanOperator::top_k(
                Arc::clone(&self.store),
                &scan.label,
                &scan.property,
                &query_string,
                100,
            ))
        };

        let mut columns = vec![scan.variable.clone()];
        if let Some(ref score_col) = scan.score_column {
            columns.push(score_col.clone());
        }

        Ok((operator, columns))
    }
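
/*
Illustrative sketch only, not part of the planner: the precedence that
`plan_text_scan` above applies when choosing how to bound a text scan.
`TextScanMode` and `choose_mode` are hypothetical, and the `usize` / `f64`
types for `k` and `threshold` are assumptions; the real `TextScanOp` fields
may use other types.

```rust
// How a text scan is bounded in this sketch.
#[derive(Debug, PartialEq)]
enum TextScanMode {
    TopK(usize),
    Threshold(f64),
}

// Result cap applied when the query supplies neither bound.
const DEFAULT_TOP_K: usize = 100;

// An explicit `k` wins over `threshold`; with neither, use the default cap.
fn choose_mode(k: Option<usize>, threshold: Option<f64>) -> TextScanMode {
    match (k, threshold) {
        (Some(k), _) => TextScanMode::TopK(k),
        (None, Some(t)) => TextScanMode::Threshold(t),
        (None, None) => TextScanMode::TopK(DEFAULT_TOP_K),
    }
}
```

Note that when both bounds are present the threshold is ignored, which
matches the `if let Some(k) ... else if let Some(threshold)` order above.
*/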

    /// Plans a VectorScan logical operator into a physical VectorScanOperator.
    #[cfg(feature = "vector-index")]
    pub(super) fn plan_vector_scan(
        &self,
        scan: &VectorScanOp,
    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
        use grafeo_core::execution::operators::VectorScanOperator;
        use grafeo_core::index::vector::DistanceMetric;

        // Hybrid shape `VectorScan(input=graph_pattern)` is not supported by
        // the physical VectorScanOperator: it has no input slot and would
        // silently drop upstream bindings. Reject rather than plan it
        // incorrectly; callers should build a VectorJoin for this case.
        if scan.input.is_some() {
            return Err(Error::Internal(
                "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
            ));
        }

        let query_vec = self.resolve_vector_literal(&scan.query_vector)?;

        let requested_metric = scan.metric.map(|m| match m {
            VectorMetric::Cosine => DistanceMetric::Cosine,
            VectorMetric::Euclidean => DistanceMetric::Euclidean,
            VectorMetric::DotProduct => DistanceMetric::DotProduct,
            VectorMetric::Manhattan => DistanceMetric::Manhattan,
        });

        // Top-k mode uses HNSW when available; threshold/unbounded mode
        // bounds k to the label's node count so we don't feed usize::MAX
        // into HNSW (which degrades to full traversal and risks overflow
        // in quantized rescore paths even with saturating_mul).
        let k = scan.k.unwrap_or_else(|| {
            scan.label.as_ref().map_or_else(
                || self.store.node_count(),
                |l| self.store.nodes_by_label_count(l),
            )
        });

        // Pick the metric we'll execute under. When the user asked for a
        // specific one, honor it. Otherwise inherit the index's metric (so a
        // cosine-built index drives cosine scoring) or default to Cosine for
        // the unindexed brute-force path.
        let index_metric = scan
            .label
            .as_ref()
            .and_then(|label| self.store.vector_index_metric(label, &scan.property));
        let metric = requested_metric
            .or(index_metric)
            .unwrap_or(DistanceMetric::Cosine);

        // The store's vector_search routes HNSW when an index exists whose
        // metric matches `metric`, and brute-force scan otherwise. No handle
        // downcast; no Arc<dyn Any>.
        let mut operator = VectorScanOperator::new(
            Arc::clone(&self.store),
            scan.label.clone(),
            scan.property.clone(),
            query_vec,
            k,
            metric,
        );

        if let Some(sim) = scan.min_similarity {
            operator = operator.with_min_similarity(sim);
        }
        if let Some(dist) = scan.max_distance {
            operator = operator.with_max_distance(dist);
        }

        let mut columns = vec![scan.variable.clone()];
        // VectorScan always projects a score column keyed by the resolved
        // metric (after index-driven fallback) so downstream score reuse
        // matches the actual distance values written out.
        let metric_tag = match metric {
            DistanceMetric::Cosine => "cos",
            DistanceMetric::Euclidean => "euc",
            DistanceMetric::DotProduct => "dot",
            DistanceMetric::Manhattan => "man",
            // Future metrics (DistanceMetric is #[non_exhaustive]) fall back
            // to a generic tag so they still produce a stable column name.
            _ => "other",
        };
        columns.push(project::vector_score_column_name(
            metric_tag,
            &scan.property,
            &scan.variable,
            &scan.query_vector,
        ));

        Ok((Box::new(operator), columns))
    }
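
/*
Illustrative sketch only, not part of the planner: how `plan_vector_scan`
above resolves the metric and the result bound. `Metric`, `resolve_metric`,
`resolve_k`, and `metric_tag` are hypothetical stand-ins; `candidate_count`
plays the role of the store's node count for the label.

```rust
// Toy copy of the four metrics named in the planner.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Metric {
    Cosine,
    Euclidean,
    DotProduct,
    Manhattan,
}

// Explicit request first, then the metric the index was built with, then Cosine.
fn resolve_metric(requested: Option<Metric>, index: Option<Metric>) -> Metric {
    requested.or(index).unwrap_or(Metric::Cosine)
}

// Without an explicit `k`, bound the search by the number of candidates
// instead of passing an unbounded value to the index.
fn resolve_k(k: Option<usize>, candidate_count: usize) -> usize {
    k.unwrap_or(candidate_count)
}

// Short tag used to build a score column name for the resolved metric.
fn metric_tag(metric: Metric) -> &'static str {
    match metric {
        Metric::Cosine => "cos",
        Metric::Euclidean => "euc",
        Metric::DotProduct => "dot",
        Metric::Manhattan => "man",
    }
}
```

Because the tag is derived from the resolved metric rather than the requested
one, a query that names no metric against a dot-product index is tagged `dot`
in this sketch.
*/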

    /// Resolves a `LogicalExpression` to a `Vec<f32>` for vector operations.
    #[cfg(feature = "vector-index")]
    pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
        // f64→f32 precision loss throughout is intentional: vectors are stored and searched as f32.
        #[allow(clippy::cast_possible_truncation)]
        match expr {
            LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
            LogicalExpression::Literal(Value::List(list)) => {
                let mut vec = Vec::with_capacity(list.len());
                for item in list.iter() {
                    match item {
                        Value::Float64(f) => vec.push(*f as f32),
                        Value::Int64(i) => vec.push(*i as f32),
                        _ => {
                            return Err(Error::Internal(
                                "Vector elements must be numeric".to_string(),
                            ));
                        }
                    }
                }
                Ok(vec)
            }
            // The GQL/Cypher parser produces List([Literal(Float64), ...]) for inline vectors like
            // [0.9, 0.1, 0.0]; handle this form by converting each literal element in turn.
            LogicalExpression::List(items) => {
                let mut vec = Vec::with_capacity(items.len());
                for item in items {
                    match item {
                        LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
                        LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
                        _ => {
                            return Err(Error::Internal(
                                "Vector elements must be numeric literals".to_string(),
                            ));
                        }
                    }
                }
                Ok(vec)
            }
            _ => Err(Error::Internal("Expected vector literal".to_string())),
        }
    }
}
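
/*
Illustrative sketch only, not part of the planner: the element-wise
conversion performed by `resolve_vector_literal` above, written with iterator
adapters. `ToyValue` and `to_f32_vector` are hypothetical; the real code
matches on the engine's `Value` and `LogicalExpression` types and returns
`Error::Internal`.

```rust
// Toy value type standing in for the engine's `Value`.
#[derive(Debug, Clone, PartialEq)]
enum ToyValue {
    Float64(f64),
    Int64(i64),
    Text(String),
}

// Converts a list of numeric values to `f32`, stopping at the first
// non-numeric element and reporting it.
fn to_f32_vector(items: &[ToyValue]) -> Result<Vec<f32>, String> {
    items
        .iter()
        .map(|item| match item {
            // Narrowing to f32 is deliberate: vectors are stored as f32.
            ToyValue::Float64(f) => Ok(*f as f32),
            ToyValue::Int64(i) => Ok(*i as f32),
            other => Err(format!("vector elements must be numeric, got {:?}", other)),
        })
        .collect()
}
```

Collecting an iterator of `Result` values into `Result<Vec<_>, _>`
short-circuits on the first error, which gives the same early-return
behaviour as the explicit loops above.
*/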

/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
#[cfg(feature = "algos")]
struct StaticResultOperator {
    rows: Vec<Vec<Value>>,
    column_indices: Vec<usize>,
    row_index: usize,
}

#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        if self.row_index >= self.rows.len() {
            return Ok(None);
        }

        let remaining = self.rows.len() - self.row_index;
        let chunk_rows = remaining.min(1024);
        let col_count = self.column_indices.len();

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);

        for row_offset in 0..chunk_rows {
            let row = &self.rows[self.row_index + row_offset];
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(chunk_rows);

        self.row_index += chunk_rows;
        Ok(Some(chunk))
    }

    fn reset(&mut self) {
        self.row_index = 0;
    }

    fn name(&self) -> &'static str {
        "StaticResult"
    }

    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
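
/*
Illustrative sketch only, not part of the planner: the chunked pull loop of
`StaticResultOperator::next` above, without the engine's `DataChunk`.
`RowCursor` and `next_chunk` are hypothetical; rows are plain `Vec<i64>` and
a chunk is returned as a borrowed slice rather than a column-oriented buffer.

```rust
// Maximum rows handed out per call, matching the 1024 used above.
const CHUNK_ROWS: usize = 1024;

// Toy cursor over pre-computed rows.
struct RowCursor {
    rows: Vec<Vec<i64>>,
    row_index: usize,
}

impl RowCursor {
    fn new(rows: Vec<Vec<i64>>) -> Self {
        RowCursor { rows, row_index: 0 }
    }

    // Returns the next slice of at most `CHUNK_ROWS` rows, or `None` once
    // every row has been handed out.
    fn next_chunk(&mut self) -> Option<&[Vec<i64>]> {
        if self.row_index >= self.rows.len() {
            return None;
        }
        let remaining = self.rows.len() - self.row_index;
        let take = remaining.min(CHUNK_ROWS);
        let start = self.row_index;
        self.row_index += take;
        Some(&self.rows[start..start + take])
    }

    // Rewinds the cursor so the same rows can be produced again.
    fn reset(&mut self) {
        self.row_index = 0;
    }
}
```

For 2500 rows this yields chunks of 1024, 1024, and 452 rows, then `None`
until `reset` is called.
*/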

#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::plan::{
        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
        SkipOp as LogicalSkipOp, SortKey, SortOp,
    };
    use grafeo_common::types::Value;
    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
    use grafeo_core::graph::GraphStoreMut;
    use grafeo_core::graph::lpg::LpgStore;

    fn create_test_store() -> Arc<LpgStore> {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Company"]);
        store
    }

    // ==================== Simple Scan Tests ====================

    #[test]
    fn test_plan_simple_scan() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n:Person) RETURN n
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_scan_without_label() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_return_with_alias() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n:Person) RETURN n AS person
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: Some("person".to_string()),
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["person"]);
    }

    #[test]
    fn test_plan_return_property() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n:Person) RETURN n.name
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Property {
                    variable: "n".to_string(),
                    property: "name".to_string(),
                },
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n.name"]);
    }

    #[test]
    fn test_plan_return_literal() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN 42 AS answer
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Literal(Value::Int64(42)),
                alias: Some("answer".to_string()),
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["answer"]);
    }

    // ==================== Filter Tests ====================

    #[test]
    fn test_plan_filter_equality() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n:Person) WHERE n.age = 30 RETURN n
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Binary {
                    left: Box::new(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "age".to_string(),
                    }),
                    op: BinaryOp::Eq,
                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: Some("Person".to_string()),
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_filter_compound_and() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // WHERE n.age > 20 AND n.age < 40
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Binary {
                    left: Box::new(LogicalExpression::Binary {
                        left: Box::new(LogicalExpression::Property {
                            variable: "n".to_string(),
                            property: "age".to_string(),
                        }),
                        op: BinaryOp::Gt,
                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
                    }),
                    op: BinaryOp::And,
                    right: Box::new(LogicalExpression::Binary {
                        left: Box::new(LogicalExpression::Property {
                            variable: "n".to_string(),
                            property: "age".to_string(),
                        }),
                        op: BinaryOp::Lt,
                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
                    }),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_filter_unary_not() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // WHERE NOT n.active
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Unary {
                    op: UnaryOp::Not,
                    operand: Box::new(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "active".to_string(),
                    }),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_filter_is_null() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // WHERE n.email IS NULL
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Unary {
                    op: UnaryOp::IsNull,
                    operand: Box::new(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "email".to_string(),
                    }),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_filter_function_call() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // WHERE size(n.friends) > 0
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Binary {
                    left: Box::new(LogicalExpression::FunctionCall {
                        name: "size".to_string(),
                        args: vec![LogicalExpression::Property {
                            variable: "n".to_string(),
                            property: "friends".to_string(),
                        }],
                        distinct: false,
                    }),
                    op: BinaryOp::Gt,
                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }
1435
    // ==================== Expand Tests ====================

    #[test]
    fn test_plan_expand_outgoing() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: Some("Person".to_string()),
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        // The return should have columns [a, b]
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_expand_with_edge_variable() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("r".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: Some("r".to_string()),
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"r".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    // ==================== Limit/Skip/Sort Tests ====================

    #[test]
    fn test_plan_limit() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n LIMIT 10
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
                count: 10.into(),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_skip() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n SKIP 5
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
                count: 5.into(),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_sort() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n ORDER BY n ASC
        // (the sort key is the variable `n` itself, not a property)
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Sort(SortOp {
                keys: vec![SortKey {
                    expression: LogicalExpression::Variable("n".to_string()),
                    order: SortOrder::Ascending,
                    nulls: None,
                }],
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_sort_descending() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n ORDER BY n DESC
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Sort(SortOp {
                keys: vec![SortKey {
                    expression: LogicalExpression::Variable("n".to_string()),
                    order: SortOrder::Descending,
                    nulls: None,
                }],
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_distinct() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN DISTINCT n
        // (expressed as an explicit Distinct operator under the Return)
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                columns: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_distinct_with_columns() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // DISTINCT on specific columns (column-specific dedup)
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                columns: Some(vec!["n".to_string()]),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_distinct_with_nonexistent_columns() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // When distinct columns don't match any output columns,
        // it falls back to full-row distinct.
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                columns: Some(vec!["nonexistent".to_string()]),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    // ==================== Aggregate Tests ====================

    #[test]
    fn test_plan_aggregate_count() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN count(n)
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("cnt".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
                group_by: vec![],
                aggregates: vec![LogicalAggregateExpr {
                    function: LogicalAggregateFunction::Count,
                    expression: Some(LogicalExpression::Variable("n".to_string())),
                    expression2: None,
                    distinct: false,
                    alias: Some("cnt".to_string()),
                    percentile: None,
                    separator: None,
                }],
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                having: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"cnt".to_string()));
    }

    #[test]
    fn test_plan_aggregate_with_group_by() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
            group_by: vec![LogicalExpression::Property {
                variable: "n".to_string(),
                property: "city".to_string(),
            }],
            aggregates: vec![LogicalAggregateExpr {
                function: LogicalAggregateFunction::Count,
                expression: Some(LogicalExpression::Variable("n".to_string())),
                expression2: None,
                distinct: false,
                alias: Some("cnt".to_string()),
                percentile: None,
                separator: None,
            }],
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: Some("Person".to_string()),
                input: None,
            })),
            having: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns().len(), 2);
    }

    #[test]
    fn test_plan_aggregate_sum() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // SUM(n.value)
        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
            group_by: vec![],
            aggregates: vec![LogicalAggregateExpr {
                function: LogicalAggregateFunction::Sum,
                expression: Some(LogicalExpression::Property {
                    variable: "n".to_string(),
                    property: "value".to_string(),
                }),
                expression2: None,
                distinct: false,
                alias: Some("total".to_string()),
                percentile: None,
                separator: None,
            }],
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
            having: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"total".to_string()));
    }

    #[test]
    fn test_plan_aggregate_avg() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // AVG(n.score)
        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
            group_by: vec![],
            aggregates: vec![LogicalAggregateExpr {
                function: LogicalAggregateFunction::Avg,
                expression: Some(LogicalExpression::Property {
                    variable: "n".to_string(),
                    property: "score".to_string(),
                }),
                expression2: None,
                distinct: false,
                alias: Some("average".to_string()),
                percentile: None,
                separator: None,
            }],
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
            having: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"average".to_string()));
    }

    #[test]
    fn test_plan_aggregate_min_max() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MIN(n.age), MAX(n.age)
        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
            group_by: vec![],
            aggregates: vec![
                LogicalAggregateExpr {
                    function: LogicalAggregateFunction::Min,
                    expression: Some(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "age".to_string(),
                    }),
                    expression2: None,
                    distinct: false,
                    alias: Some("youngest".to_string()),
                    percentile: None,
                    separator: None,
                },
                LogicalAggregateExpr {
                    function: LogicalAggregateFunction::Max,
                    expression: Some(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "age".to_string(),
                    }),
                    expression2: None,
                    distinct: false,
                    alias: Some("oldest".to_string()),
                    percentile: None,
                    separator: None,
                },
            ],
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
            having: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"youngest".to_string()));
        assert!(physical.columns().contains(&"oldest".to_string()));
    }

    // ==================== Join Tests ====================

    #[test]
    fn test_plan_inner_join() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Inner join between two scans
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Join(JoinOp {
                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: Some("Person".to_string()),
                    input: None,
                })),
                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "b".to_string(),
                    label: Some("Company".to_string()),
                    input: None,
                })),
                join_type: JoinType::Inner,
                conditions: vec![JoinCondition {
                    left: LogicalExpression::Variable("a".to_string()),
                    right: LogicalExpression::Variable("b".to_string()),
                }],
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_cross_join() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Cross join (no conditions)
        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "a".to_string(),
                label: None,
                input: None,
            })),
            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "b".to_string(),
                label: None,
                input: None,
            })),
            join_type: JoinType::Cross,
            conditions: vec![],
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns().len(), 2);
    }

    #[test]
    fn test_plan_left_join() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Left join between two unlabeled scans (no conditions)
        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "a".to_string(),
                label: None,
                input: None,
            })),
            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "b".to_string(),
                label: None,
                input: None,
            })),
            join_type: JoinType::Left,
            conditions: vec![],
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns().len(), 2);
    }

    // ==================== Mutation Tests ====================

    // Builds a planner whose read and write stores are the same LPG store.
    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
        p
    }

    #[test]
    fn test_plan_create_node() {
        let store = create_test_store();
        let planner = create_writable_planner(&store);

        // CREATE (n:Person {name: 'Alix'})
        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            properties: vec![(
                "name".to_string(),
                LogicalExpression::Literal(Value::String("Alix".into())),
            )],
            input: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"n".to_string()));
    }

    #[test]
    fn test_plan_create_edge() {
        let store = create_test_store();
        let planner = create_writable_planner(&store);

        // MATCH (a), (b) CREATE (a)-[r:KNOWS]->(b)
        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
            variable: Some("r".to_string()),
            from_variable: "a".to_string(),
            to_variable: "b".to_string(),
            edge_type: "KNOWS".to_string(),
            properties: vec![],
            input: Box::new(LogicalOperator::Join(JoinOp {
                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "b".to_string(),
                    label: None,
                    input: None,
                })),
                join_type: JoinType::Cross,
                conditions: vec![],
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"r".to_string()));
    }

    #[test]
    fn test_plan_delete_node() {
        let store = create_test_store();
        let planner = create_writable_planner(&store);

        // MATCH (n) DELETE n
        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
            variable: "n".to_string(),
            detach: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"n".to_string()));
    }

    // ==================== Error Cases ====================

    #[test]
    fn test_plan_empty_errors() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Planning a bare Empty operator is expected to fail
        let logical = LogicalPlan::new(LogicalOperator::Empty);
        let result = planner.plan(&logical);
        assert!(result.is_err());
    }

    #[test]
    fn test_plan_missing_variable_in_return() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // Return variable that doesn't exist in input
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("missing".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
        }));

        let result = planner.plan(&logical);
        assert!(result.is_err());
    }

    // ==================== Helper Function Tests ====================

    #[test]
    fn test_convert_binary_ops() {
        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
        assert!(convert_binary_op(BinaryOp::Le).is_ok());
        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
        assert!(convert_binary_op(BinaryOp::And).is_ok());
        assert!(convert_binary_op(BinaryOp::Or).is_ok());
        assert!(convert_binary_op(BinaryOp::Add).is_ok());
        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
        assert!(convert_binary_op(BinaryOp::Div).is_ok());
    }

    #[test]
    fn test_convert_unary_ops() {
        assert!(convert_unary_op(UnaryOp::Not).is_ok());
        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
    }

    #[test]
    fn test_convert_aggregate_functions() {
        assert!(matches!(
            convert_aggregate_function(LogicalAggregateFunction::Count),
            PhysicalAggregateFunction::Count
        ));
        assert!(matches!(
            convert_aggregate_function(LogicalAggregateFunction::Sum),
            PhysicalAggregateFunction::Sum
        ));
        assert!(matches!(
            convert_aggregate_function(LogicalAggregateFunction::Avg),
            PhysicalAggregateFunction::Avg
        ));
        assert!(matches!(
            convert_aggregate_function(LogicalAggregateFunction::Min),
            PhysicalAggregateFunction::Min
        ));
        assert!(matches!(
            convert_aggregate_function(LogicalAggregateFunction::Max),
            PhysicalAggregateFunction::Max
        ));
    }

    #[test]
    fn test_planner_accessors() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        assert!(planner.transaction_id().is_none());
        assert!(planner.transaction_manager().is_none());
        let _ = planner.viewing_epoch(); // Just ensure it's accessible
    }

    #[test]
    fn test_physical_plan_accessors() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: "n".to_string(),
            label: None,
            input: None,
        }));

        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);

        // Test into_operator
        let _ = physical.into_operator();
    }

2186    // ==================== Adaptive Planning Tests ====================
2187
2188    #[test]
2189    fn test_plan_adaptive_with_scan() {
2190        let store = create_test_store();
2191        let planner = Planner::new(store);
2192
2193        // MATCH (n:Person) RETURN n
2194        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2195            items: vec![ReturnItem {
2196                expression: LogicalExpression::Variable("n".to_string()),
2197                alias: None,
2198            }],
2199            distinct: false,
2200            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2201                variable: "n".to_string(),
2202                label: Some("Person".to_string()),
2203                input: None,
2204            })),
2205        }));
2206
2207        let physical = planner.plan_adaptive(&logical).unwrap();
2208        assert_eq!(physical.columns(), &["n"]);
2209        // Should have adaptive context with estimates
2210        assert!(physical.adaptive_context.is_some());
2211    }
2212
    #[test]
    fn test_plan_adaptive_with_filter() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) WHERE n.age > 30 RETURN n
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Filter(FilterOp {
                predicate: LogicalExpression::Binary {
                    left: Box::new(LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "age".to_string(),
                    }),
                    op: BinaryOp::Gt,
                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
                },
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                pushdown_hint: None,
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_expand() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_factorized_execution(false);

        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_join() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Join(JoinOp {
                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "b".to_string(),
                    label: None,
                    input: None,
                })),
                join_type: JoinType::Cross,
                conditions: vec![],
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_aggregate() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
            group_by: vec![],
            aggregates: vec![LogicalAggregateExpr {
                function: LogicalAggregateFunction::Count,
                expression: Some(LogicalExpression::Variable("n".to_string())),
                expression2: None,
                distinct: false,
                alias: Some("cnt".to_string()),
                percentile: None,
                separator: None,
            }],
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "n".to_string(),
                label: None,
                input: None,
            })),
            having: None,
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_distinct() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
                columns: None,
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_limit() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
                count: 10.into(),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_skip() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
                count: 5.into(),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_sort() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Sort(SortOp {
                keys: vec![SortKey {
                    expression: LogicalExpression::Variable("n".to_string()),
                    order: SortOrder::Ascending,
                    nulls: None,
                }],
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_union() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Union(UnionOp {
                inputs: vec![
                    LogicalOperator::NodeScan(NodeScanOp {
                        variable: "n".to_string(),
                        label: Some("Person".to_string()),
                        input: None,
                    }),
                    LogicalOperator::NodeScan(NodeScanOp {
                        variable: "n".to_string(),
                        label: Some("Company".to_string()),
                        input: None,
                    }),
                ],
            })),
        }));

        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    // ==================== Variable Length Path Tests ====================

    #[test]
    fn test_plan_expand_variable_length() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(3),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_expand_with_path_alias() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(3),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: Some("p".to_string()),
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        // Verify plan was created successfully with expected output columns
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_expand_incoming() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_factorized_execution(false);

        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Incoming,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_expand_both_directions() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_factorized_execution(false);

        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "a".to_string(),
                to_variable: "b".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Both,
                edge_types: vec!["KNOWS".to_string()],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: None,
                    input: None,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    // ==================== With Context Tests ====================

    #[test]
    fn test_planner_with_context() {
        use crate::transaction::TransactionManager;

        let store = create_test_store();
        let transaction_manager = Arc::new(TransactionManager::new());
        let transaction_id = transaction_manager.begin();
        let epoch = transaction_manager.current_epoch();

        let planner = Planner::with_context(
            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
            Arc::clone(&transaction_manager),
            Some(transaction_id),
            epoch,
        );

        assert_eq!(planner.transaction_id(), Some(transaction_id));
        assert!(planner.transaction_manager().is_some());
        assert_eq!(planner.viewing_epoch(), epoch);
    }

    #[test]
    fn test_planner_with_factorized_execution_disabled() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_factorized_execution(false);

        // Two consecutive expands - should NOT use factorized execution
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("c".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::Expand(ExpandOp {
                from_variable: "b".to_string(),
                to_variable: "c".to_string(),
                edge_variable: None,
                direction: ExpandDirection::Outgoing,
                edge_types: vec![],
                min_hops: 1,
                max_hops: Some(1),
                input: Box::new(LogicalOperator::Expand(ExpandOp {
                    from_variable: "a".to_string(),
                    to_variable: "b".to_string(),
                    edge_variable: None,
                    direction: ExpandDirection::Outgoing,
                    edge_types: vec![],
                    min_hops: 1,
                    max_hops: Some(1),
                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                        variable: "a".to_string(),
                        label: None,
                        input: None,
                    })),
                    path_alias: None,
                    path_mode: PathMode::Walk,
                })),
                path_alias: None,
                path_mode: PathMode::Walk,
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"c".to_string()));
    }

    // ==================== Sort with Property Tests ====================

    #[test]
    fn test_plan_sort_by_property() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // MATCH (n) RETURN n ORDER BY n.name ASC
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(LogicalOperator::Sort(SortOp {
                keys: vec![SortKey {
                    expression: LogicalExpression::Property {
                        variable: "n".to_string(),
                        property: "name".to_string(),
                    },
                    order: SortOrder::Ascending,
                    nulls: None,
                }],
                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "n".to_string(),
                    label: None,
                    input: None,
                })),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        // Sorting by a property must still leave the returned variable `n` in the output columns
        assert!(physical.columns().contains(&"n".to_string()));
    }

    // ==================== Scan with Input Tests ====================

    #[test]
    fn test_plan_scan_with_input() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // A scan with another scan as input (for chained patterns)
        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![
                ReturnItem {
                    expression: LogicalExpression::Variable("a".to_string()),
                    alias: None,
                },
                ReturnItem {
                    expression: LogicalExpression::Variable("b".to_string()),
                    alias: None,
                },
            ],
            distinct: false,
            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
                variable: "b".to_string(),
                label: Some("Company".to_string()),
                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
                    variable: "a".to_string(),
                    label: Some("Person".to_string()),
                    input: None,
                }))),
            })),
        }));

        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    // ==================== Additional Coverage Tests ====================
    //
    // These tests target branches that were not exercised by the original
    // planner tests: builder methods, read-only flag, profiled planning,
    // unsupported operator error paths, and the dispatch branches for every
    // plan_* function reachable through plan_operator.

    use crate::catalog::Catalog;
    use crate::query::plan::{
        AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
        HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
        MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
        SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
    };
    use grafeo_core::execution::operators::{Operator, SessionContext};

    fn full_store() -> Arc<LpgStore> {
        // Richer store so expand and shortest path tests have real data.
        let store = Arc::new(LpgStore::new().unwrap());
        let vincent = store.create_node(&["Person"]);
        let jules = store.create_node(&["Person"]);
        let mia = store.create_node(&["Person"]);
        let _company = store.create_node(&["Company"]);
        store.create_edge(vincent, jules, "KNOWS");
        store.create_edge(jules, mia, "KNOWS");
        store
    }

    fn scan_person(var: &str) -> LogicalOperator {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: var.to_string(),
            label: Some("Person".to_string()),
            input: None,
        })
    }

    fn scan_any(var: &str) -> LogicalOperator {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: var.to_string(),
            label: None,
            input: None,
        })
    }

    // ==================== Builder Methods ====================

    #[test]
    fn test_with_read_only_flag() {
        let store = create_test_store();
        let planner =
            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
        assert!(planner.read_only);

        let planner_off =
            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
        assert!(!planner_off.read_only);
    }

    #[test]
    fn test_with_catalog() {
        let store = create_test_store();
        let catalog = Arc::new(Catalog::new());
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_catalog(Arc::clone(&catalog));
        assert!(planner.catalog.is_some());
    }

    #[test]
    fn test_with_session_context() {
        let store = create_test_store();
        let context = SessionContext {
            current_schema: Some("public".to_string()),
            current_graph: Some("main".to_string()),
            ..SessionContext::default()
        };
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
            .with_session_context(context);
        assert_eq!(
            planner.session_context.current_schema.as_deref(),
            Some("public")
        );
        assert_eq!(
            planner.session_context.current_graph.as_deref(),
            Some("main")
        );
    }

    // ==================== register_edge_column ====================

    #[test]
    fn test_register_edge_column_named() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        let name = planner.register_edge_column(&Some("r".to_string()));
        assert_eq!(name, "r");
        assert!(planner.edge_columns.borrow().contains("r"));
    }

    #[test]
    fn test_register_edge_column_anonymous_counter_advances() {
        let store = create_test_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        let a = planner.register_edge_column(&None);
        let b = planner.register_edge_column(&None);
        assert_eq!(a, "_anon_edge_0");
        assert_eq!(b, "_anon_edge_1");
        assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
        assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
    }

    // ==================== write_store() error path ====================

    #[test]
    fn test_create_node_without_write_store_errors() {
        // Read-only planner: CREATE should fail with ReadOnly transaction error.
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            properties: vec![],
            input: None,
        }));

        let result = planner.plan(&logical);
        assert!(result.is_err());
    }

    // ==================== plan_profiled ====================

    #[test]
    fn test_plan_profiled_collects_entries() {
        let store = create_test_store();
        let planner = Planner::new(store);

        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
            items: vec![ReturnItem {
                expression: LogicalExpression::Variable("n".to_string()),
                alias: None,
            }],
            distinct: false,
            input: Box::new(scan_person("n")),
        }));

        let (physical, entries) = planner.plan_profiled(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
        // Post-order: scan, then return (at least two entries).
        assert!(
            entries.len() >= 2,
            "expected at least 2 profile entries, got {}",
            entries.len()
        );
        // After profiling, the internal flag is cleared.
        assert!(!planner.profiling.get());
    }

    #[test]
    fn test_plan_profiled_propagates_plan_errors() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Empty);
        let result = planner.plan_profiled(&logical);
        assert!(result.is_err());
        // Profiling must still be reset to false even on error.
        assert!(!planner.profiling.get());
    }

    // ==================== Unsupported operator error paths ====================

    #[test]
    fn test_plan_edge_scan_is_unsupported() {
        // LPG planner does not handle bare EdgeScan; this hits the catch-all branch.
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
            variable: "e".to_string(),
            edge_types: vec![],
            input: None,
        }));
        let err = planner.plan(&logical).err().expect("plan should fail");
        assert!(format!("{err}").contains("Unsupported operator"));
    }

    #[test]
    fn test_plan_triple_scan_is_unsupported() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
            subject: TripleComponent::Variable("s".to_string()),
            predicate: TripleComponent::Variable("p".to_string()),
            object: TripleComponent::Variable("o".to_string()),
            graph: None,
            input: None,
            dataset: None,
        }));
        assert!(planner.plan(&logical).is_err());
    }

    #[test]
    fn test_plan_bind_is_unsupported() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
            expression: LogicalExpression::Literal(Value::Int64(1)),
            variable: "x".to_string(),
            input: Box::new(scan_any("n")),
        }));
        assert!(planner.plan(&logical).is_err());
    }

3017    #[test]
3018    fn test_plan_parameter_scan_without_apply_errors() {
3019        let store = create_test_store();
3020        let planner = Planner::new(store);
3021        let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
3022            columns: vec!["n".to_string()],
3023        }));
3024        let err = planner.plan(&logical).err().expect("plan should fail");
3025        assert!(format!("{err}").contains("ParameterScan"));
3026    }

    // ==================== plan_operator dispatch branches ====================

    #[test]
    fn test_plan_union_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
            inputs: vec![scan_person("n"), scan_person("n")],
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_except_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_person("n")),
            all: false,
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_intersect_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_person("n")),
            all: false,
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_otherwise_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_any("n")),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["n"]);
    }

    #[test]
    fn test_plan_left_join_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
            left: Box::new(scan_any("a")),
            right: Box::new(scan_any("b")),
            condition: None,
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_anti_join_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
            left: Box::new(scan_any("a")),
            right: Box::new(scan_any("b")),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
    }

    #[test]
    fn test_plan_apply_uncorrelated_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
            input: Box::new(scan_any("a")),
            subplan: Box::new(scan_any("b")),
            shared_variables: vec![],
            optional: false,
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"a".to_string()));
        assert!(physical.columns().contains(&"b".to_string()));
    }

    #[test]
    fn test_plan_unwind_literal_list() {
        let store = create_test_store();
        let planner = Planner::new(store);

        // UNWIND [1,2,3] AS x
        let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
            expression: LogicalExpression::List(vec![
                LogicalExpression::Literal(Value::Int64(1)),
                LogicalExpression::Literal(Value::Int64(2)),
                LogicalExpression::Literal(Value::Int64(3)),
            ]),
            variable: "x".to_string(),
            ordinality_var: None,
            offset_var: None,
            input: Box::new(LogicalOperator::Empty),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"x".to_string()));
    }

    #[test]
    fn test_plan_merge_dispatch() {
        let store = create_test_store();
        let planner = create_writable_planner(&store);

        // MERGE (n:Person)
        let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties: vec![],
            on_create: vec![],
            on_match: vec![],
            input: Box::new(LogicalOperator::Empty),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"n".to_string()));
    }

    #[test]
    fn test_plan_merge_relationship_dispatch() {
        let store = full_store();
        let planner = create_writable_planner(&store);

        // MATCH (a:Person),(b:Person) MERGE (a)-[r:KNOWS]->(b)
        let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
            variable: "r".to_string(),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            edge_type: "KNOWS".to_string(),
            match_properties: vec![],
            on_create: vec![],
            on_match: vec![],
            input: Box::new(LogicalOperator::Join(JoinOp {
                left: Box::new(scan_person("a")),
                right: Box::new(scan_person("b")),
                join_type: JoinType::Cross,
                conditions: vec![],
            })),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"r".to_string()));
    }

    #[test]
    fn test_plan_add_label_dispatch() {
        let store = full_store();
        let planner = create_writable_planner(&store);
        let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
            variable: "n".to_string(),
            labels: vec!["VIP".to_string()],
            input: Box::new(scan_person("n")),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"labels_added".to_string()));
    }

    #[test]
    fn test_plan_remove_label_dispatch() {
        let store = full_store();
        let planner = create_writable_planner(&store);
        let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            input: Box::new(scan_person("n")),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"labels_removed".to_string()));
    }

    #[test]
    fn test_plan_set_property_dispatch() {
        let store = full_store();
        let planner = create_writable_planner(&store);
        let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
            variable: "n".to_string(),
            properties: vec![(
                "city".to_string(),
                LogicalExpression::Literal(Value::String("Amsterdam".into())),
            )],
            replace: false,
            is_edge: false,
            input: Box::new(scan_person("n")),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"n".to_string()));
    }

    #[test]
    fn test_plan_delete_edge_dispatch() {
        let store = full_store();
        let planner = create_writable_planner(&store);

        // Register the edge column first via an outgoing expand, then DELETE r.
        let expand_op = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".to_string(),
            to_variable: "b".to_string(),
            edge_variable: Some("r".to_string()),
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(scan_person("a")),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
            variable: "r".to_string(),
            input: Box::new(expand_op),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"r".to_string()));
    }

    #[test]
    fn test_plan_shortest_path_dispatch() {
        let store = full_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        // SHORTEST PATH (a)-(b)
        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
            input: Box::new(LogicalOperator::Join(JoinOp {
                left: Box::new(scan_person("a")),
                right: Box::new(scan_person("b")),
                join_type: JoinType::Cross,
                conditions: vec![],
            })),
            source_var: "a".to_string(),
            target_var: "b".to_string(),
            edge_types: vec!["KNOWS".to_string()],
            direction: ExpandDirection::Outgoing,
            path_alias: "p".to_string(),
            all_paths: false,
        }));
        let physical = planner.plan(&logical).unwrap();
        assert!(
            physical
                .columns()
                .iter()
                .any(|c| c.contains("_path_length_p"))
        );
    }

    #[test]
    fn test_plan_shortest_path_missing_source_errors() {
        let store = full_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
            input: Box::new(scan_person("a")),
            source_var: "missing".to_string(),
            target_var: "a".to_string(),
            edge_types: vec![],
            direction: ExpandDirection::Both,
            path_alias: "p".to_string(),
            all_paths: false,
        }));
        let err = planner.plan(&logical).err().expect("plan should fail");
        assert!(format!("{err}").contains("Source variable"));
    }

    #[test]
    fn test_plan_map_collect_dispatch() {
        // Build rows with two columns named 'k' and 'v', then collect k->v into a map.
        let store = create_test_store();
        let planner = Planner::new(store);
        let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
            projections: vec![
                crate::query::plan::Projection {
                    expression: LogicalExpression::Literal(Value::String("key".into())),
                    alias: Some("k".to_string()),
                },
                crate::query::plan::Projection {
                    expression: LogicalExpression::Literal(Value::Int64(1)),
                    alias: Some("v".to_string()),
                },
            ],
            input: Box::new(scan_person("n")),
            pass_through_input: false,
        });
        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
            key_var: "k".to_string(),
            value_var: "v".to_string(),
            alias: "m".to_string(),
            input: Box::new(input_with_kv),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["m"]);
    }

    #[test]
    fn test_plan_map_collect_missing_key_errors() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
            key_var: "not_there".to_string(),
            value_var: "also_missing".to_string(),
            alias: "m".to_string(),
            input: Box::new(scan_any("n")),
        }));
        let err = planner.plan(&logical).err().expect("plan should fail");
        let msg = format!("{err}");
        assert!(msg.contains("MapCollect key"), "got: {msg}");
    }

    #[test]
    fn test_plan_map_collect_missing_value_errors() {
        let store = create_test_store();
        let planner = Planner::new(store);
        // Input has column "n" so key resolves but value does not.
        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
            key_var: "n".to_string(),
            value_var: "missing_value".to_string(),
            alias: "m".to_string(),
            input: Box::new(scan_any("n")),
        }));
        let err = planner.plan(&logical).err().expect("plan should fail");
        let msg = format!("{err}");
        assert!(msg.contains("MapCollect value"), "got: {msg}");
    }

    #[test]
    fn test_plan_horizontal_aggregate_missing_column_errors() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
            HorizontalAggregateOp {
                list_column: "not_a_column".to_string(),
                entity_kind: crate::query::plan::EntityKind::Edge,
                function: LogicalAggregateFunction::Count,
                property: "age".to_string(),
                alias: "total".to_string(),
                input: Box::new(scan_any("n")),
            },
        ));
        let err = planner.plan(&logical).err().expect("plan should fail");
        assert!(format!("{err}").contains("HorizontalAggregate"));
    }

    #[test]
    fn test_plan_load_data_dispatch() {
        let store = create_test_store();
        let planner = Planner::new(store);
        // Path does not need to exist: planning just builds the operator.
        let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
            format: LoadDataFormat::Csv,
            with_headers: true,
            path: "/nonexistent/data.csv".to_string(),
            variable: "row".to_string(),
            field_terminator: Some(','),
        }));
        let physical = planner.plan(&logical).unwrap();
        assert_eq!(physical.columns(), &["row"]);
    }

    #[test]
    fn test_plan_multi_way_join_dispatch() {
        // Three-way join over three expand inputs. Some configurations may
        // error during planning; we just require no panic.
        let store = full_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
        let ab = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".to_string(),
            to_variable: "b".to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(scan_person("a")),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        let bc = LogicalOperator::Expand(ExpandOp {
            from_variable: "b".to_string(),
            to_variable: "c".to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(scan_person("b")),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        let ca = LogicalOperator::Expand(ExpandOp {
            from_variable: "c".to_string(),
            to_variable: "a".to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(scan_person("c")),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
            inputs: vec![ab, bc, ca],
            conditions: vec![],
            shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        }));
        let _ = planner.plan(&logical);
    }

    #[test]
    fn test_plan_horizontal_aggregate_dispatch() {
        // Variable-length expand produces a list column that the aggregate targets.
        let store = full_store();
        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);

        let path = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".to_string(),
            to_variable: "b".to_string(),
            edge_variable: Some("r".to_string()),
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(3),
            input: Box::new(scan_person("a")),
            path_alias: Some("p".to_string()),
            path_mode: PathMode::Walk,
        });
        // Variable-length expand emits a column named _path_edges_p.
        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
            HorizontalAggregateOp {
                list_column: "_path_edges_p".to_string(),
                entity_kind: crate::query::plan::EntityKind::Edge,
                function: LogicalAggregateFunction::Count,
                property: "weight".to_string(),
                alias: "edge_count".to_string(),
                input: Box::new(path),
            },
        ));
        let physical = planner.plan(&logical).unwrap();
        assert!(physical.columns().contains(&"edge_count".to_string()));
    }

    // ==================== Cardinality estimation branches ====================

    #[test]
    fn test_plan_adaptive_with_except() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_person("n")),
            all: false,
        }));
        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_intersect() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_any("n")),
            all: false,
        }));
        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    #[test]
    fn test_plan_adaptive_with_otherwise() {
        let store = create_test_store();
        let planner = Planner::new(store);
        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
            left: Box::new(scan_person("n")),
            right: Box::new(scan_any("n")),
        }));
        let physical = planner.plan_adaptive(&logical).unwrap();
        assert!(physical.adaptive_context.is_some());
    }

    // ==================== count_expand_chain edge case ====================

    #[test]
    fn test_count_expand_chain_variable_length_breaks_chain() {
        // A variable-length expand (not single-hop) should NOT count in the chain.
        let var_expand = LogicalOperator::Expand(ExpandOp {
            from_variable: "a".to_string(),
            to_variable: "b".to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_types: vec!["KNOWS".to_string()],
            min_hops: 1,
            max_hops: Some(3),
            input: Box::new(scan_person("a")),
            path_alias: None,
            path_mode: PathMode::Walk,
        });
        let (count, _) = Planner::count_expand_chain(&var_expand);
        assert_eq!(count, 0);
    }
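
    // Illustrative sketch under stated assumptions: `ToyOp`, `toy_expand`, and
    // `toy_count_single_hop_chain` are hypothetical and only mirror the
    // behavior asserted above (the real `Planner::count_expand_chain` returns
    // a tuple and may differ). The walk counts consecutive single-hop expands
    // from the top and stops at the first operator that is anything else.
    enum ToyOp {
        Scan,
        Expand {
            min_hops: u32,
            max_hops: Option<u32>,
            input: Box<ToyOp>,
        },
    }

    // Builds an expand with `min_hops = 1` and the given upper bound.
    fn toy_expand(max_hops: u32, input: ToyOp) -> ToyOp {
        ToyOp::Expand {
            min_hops: 1,
            max_hops: Some(max_hops),
            input: Box::new(input),
        }
    }

    fn toy_count_single_hop_chain(op: &ToyOp) -> usize {
        let mut count = 0;
        let mut current = op;
        // Only exactly-one-hop expands (min 1, max Some(1)) extend the chain.
        while let ToyOp::Expand { min_hops: 1, max_hops: Some(1), input } = current {
            count += 1;
            current = &**input;
        }
        count
    }

    #[test]
    fn test_toy_chain_counts_only_single_hop_expands() {
        let two_hops = toy_expand(1, toy_expand(1, ToyOp::Scan));
        assert_eq!(toy_count_single_hop_chain(&two_hops), 2);
        // A variable-length expand at the top contributes nothing.
        assert_eq!(toy_count_single_hop_chain(&toy_expand(3, ToyOp::Scan)), 0);
    }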

    // ==================== StaticResultOperator ====================

    #[cfg(feature = "algos")]
    #[test]
    fn test_static_result_operator_emits_rows_and_resets() {
        use grafeo_common::types::Value;
        let rows = vec![
            vec![Value::Int64(1), Value::String("Vincent".into())],
            vec![Value::Int64(2), Value::String("Jules".into())],
        ];
        let mut op = StaticResultOperator {
            rows,
            column_indices: vec![0, 1],
            row_index: 0,
        };
        assert_eq!(op.name(), "StaticResult");
        let chunk = op.next().unwrap().expect("first chunk");
        assert_eq!(chunk.row_count(), 2);
        // Exhausted.
        assert!(op.next().unwrap().is_none());
        // Reset allows re-emitting.
        op.reset();
        assert!(op.next().unwrap().is_some());
        // into_any round trip.
        let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
            rows: vec![vec![Value::Null]],
            column_indices: vec![0],
            row_index: 0,
        });
        let _any = boxed.into_any();
    }

    // ==================== VectorScan k-bounding regression ====================

    /// `VectorScanOp.k = None` means "return every match," but feeding
    /// `usize::MAX` into the store's vector_search degrades HNSW to a full
    /// traversal and (pre-saturating-mul) could overflow quantized rescore
    /// paths. The planner must bound k to the label's node count (or the
    /// global count when no label is set, or zero when the label is unknown).
    ///
    /// We can't inspect VectorScanOperator.k directly (it's private), so the
    /// assertion here is that planning succeeds across all three branches
    /// without panicking or surfacing an internal-state error. Paired with
    /// `LpgStore::nodes_by_label_count` tests, this guards the full path.
    #[cfg(feature = "vector-index")]
    #[test]
    fn test_plan_vector_scan_k_none_bounds_across_label_states() {
        use crate::query::plan::VectorScanOp;

        let store = create_test_store();
        // Sanity: the test store has 2 Person nodes, 1 Company, 0 Unknown.
        assert_eq!(store.nodes_by_label_count("Person"), 2);
        assert_eq!(store.nodes_by_label_count("Unknown"), 0);
        assert_eq!(store.node_count(), 3);

        let planner = Planner::new(store);
        let make_scan = |label: Option<&str>| VectorScanOp {
            variable: "n".to_string(),
            index_name: None,
            property: "embedding".to_string(),
            label: label.map(str::to_string),
            query_vector: LogicalExpression::Literal(Value::List(
                vec![
                    Value::Float64(1.0),
                    Value::Float64(0.0),
                    Value::Float64(0.0),
                ]
                .into(),
            )),
            k: None,
            metric: Some(VectorMetric::Cosine),
            min_similarity: Some(0.5),
            max_distance: None,
            input: None,
        };

        for label in [Some("Person"), Some("Unknown"), None] {
            let (_op, cols) = planner
                .plan_vector_scan(&make_scan(label))
                .unwrap_or_else(|e| panic!("plan_vector_scan failed for {label:?}: {e:?}"));
            assert_eq!(cols[0], "n", "variable column must be first for {label:?}");
        }
    }
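
    // Illustrative sketch of the bounding rule described in the doc comment
    // above; `toy_bound_k` is a hypothetical helper, not the planner's code.
    // Assumptions: an explicit `k` is passed through unchanged, a label that
    // is set but unknown reports a count of 0, and no label falls back to the
    // global node count.
    fn toy_bound_k(k: Option<usize>, label_count: Option<usize>, node_count: usize) -> usize {
        match k {
            Some(k) => k,
            // `None` means "every match", so cap at what can actually match.
            None => label_count.unwrap_or(node_count),
        }
    }

    #[test]
    fn test_toy_bound_k_across_label_states() {
        // Counts mirror the sanity checks above: 2 Person, 0 Unknown, 3 total.
        assert_eq!(toy_bound_k(None, Some(2), 3), 2);
        assert_eq!(toy_bound_k(None, Some(0), 3), 0);
        assert_eq!(toy_bound_k(None, None, 3), 3);
    }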
}