Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod filter_hybrid;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16#[cfg(feature = "algos")]
17use crate::query::plan::CallProcedureOp;
18#[cfg(feature = "text-index")]
19use crate::query::plan::TextScanOp;
20use crate::query::plan::{
21    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
22    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
23    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
24    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
25    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
26    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
27    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
28};
29#[cfg(feature = "vector-index")]
30use crate::query::plan::{VectorMetric, VectorScanOp};
31use grafeo_common::grafeo_debug_span;
32use grafeo_common::types::{EpochId, TransactionId};
33use grafeo_common::types::{LogicalType, Value};
34use grafeo_common::utils::error::{Error, Result};
35use grafeo_core::execution::AdaptiveContext;
36use grafeo_core::execution::operators::{
37    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
38    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
39    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
40    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
41    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
42    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
43    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
44    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
45    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
46    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
47    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
48    VariableLengthExpandOperator,
49};
50use grafeo_core::graph::{Direction, GraphStoreMut, GraphStoreSearch};
51use std::collections::HashMap;
52use std::sync::Arc;
53
54use crate::query::planner::common;
55use crate::query::planner::common::expression_to_string;
56use crate::query::planner::{
57    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
58    convert_unary_op, value_to_logical_type,
59};
60use crate::transaction::TransactionManager;
61
/// Range bounds for property-based range queries.
///
/// Both bounds are optional and borrow their values for `'a`, so building a
/// `RangeBounds` does not copy any `Value`.
struct RangeBounds<'a> {
    /// Lower bound of the range; `None` when no lower bound applies.
    min: Option<&'a Value>,
    /// Upper bound of the range; `None` when no upper bound applies.
    max: Option<&'a Value>,
    /// Presumably whether `min` itself belongs to the range (consumers are
    /// not visible in this module section; confirm against them).
    min_inclusive: bool,
    /// Presumably whether `max` itself belongs to the range (same caveat as
    /// `min_inclusive`).
    max_inclusive: bool,
}
69
/// Converts a logical plan to a physical operator tree for LPG stores.
///
/// Construct with [`Planner::new`] or [`Planner::with_context`] and refine
/// with the `with_*` builder methods. State that changes while planning is
/// held in `Cell`/`RefCell` fields so the planning methods can take `&self`.
pub struct Planner {
    /// The graph store (read-only operations).
    pub(super) store: Arc<dyn GraphStoreSearch>,
    /// Writable graph store (None for read-only databases).
    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch to use for visibility checks.
    pub(super) viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names
    /// (`_anon_edge_<n>`); starts at 0.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    /// Both constructors enable it by default.
    pub(super) factorized_execution: bool,
    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Variables that hold edge IDs (from MATCH edge patterns).
    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator for schema enforcement during mutations.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Catalog for user-defined procedure lookup.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state for the currently planning correlated Apply.
    /// Set by `plan_apply` before planning the inner operator, consumed by
    /// `plan_operator` when encountering `ParameterScan`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variables from variable-length expand patterns (group-list variables).
    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// When true, each physical operator is wrapped in `ProfiledOperator`.
    /// Only switched on for the duration of `plan_profiled`.
    profiling: std::cell::Cell<bool>,
    /// Profile entries collected during planning (post-order).
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Optional write tracker for recording writes during mutations.
    /// `with_context` creates one only when a transaction ID is supplied.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context for introspection functions (info, schema, current_schema, etc.).
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// When true, expand operators use epoch-only visibility (no MVCC version
    /// chain walks).  Set when the plan contains no mutations, so PENDING
    /// writes are impossible to observe.
    pub(super) read_only: bool,
}
117
118impl Planner {
119    /// Creates a new planner with the given store.
120    ///
121    /// This creates a planner without transaction context, using the current
122    /// epoch from the store for visibility.
123    #[must_use]
124    pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
125        let epoch = store.current_epoch();
126        Self {
127            store,
128            write_store: None,
129            transaction_manager: None,
130            transaction_id: None,
131            viewing_epoch: epoch,
132            anon_edge_counter: std::cell::Cell::new(0),
133            factorized_execution: true,
134            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
135            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
136            validator: None,
137            catalog: None,
138            correlated_param_state: std::cell::RefCell::new(None),
139            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
140            profiling: std::cell::Cell::new(false),
141            profile_entries: std::cell::RefCell::new(Vec::new()),
142            write_tracker: None,
143            session_context: grafeo_core::execution::operators::SessionContext::default(),
144            read_only: false,
145        }
146    }
147
148    /// Creates a new planner with transaction context for MVCC-aware planning.
149    #[must_use]
150    pub fn with_context(
151        store: Arc<dyn GraphStoreSearch>,
152        write_store: Option<Arc<dyn GraphStoreMut>>,
153        transaction_manager: Arc<TransactionManager>,
154        transaction_id: Option<TransactionId>,
155        viewing_epoch: EpochId,
156    ) -> Self {
157        use crate::transaction::TransactionWriteTracker;
158
159        // Create write tracker when there's an active transaction
160        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
161            if transaction_id.is_some() {
162                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
163                    &transaction_manager,
164                ))))
165            } else {
166                None
167            };
168
169        Self {
170            store,
171            write_store,
172            transaction_manager: Some(transaction_manager),
173            transaction_id,
174            viewing_epoch,
175            anon_edge_counter: std::cell::Cell::new(0),
176            factorized_execution: true,
177            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
178            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
179            validator: None,
180            catalog: None,
181            correlated_param_state: std::cell::RefCell::new(None),
182            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
183            profiling: std::cell::Cell::new(false),
184            profile_entries: std::cell::RefCell::new(Vec::new()),
185            write_tracker,
186            session_context: grafeo_core::execution::operators::SessionContext::default(),
187            read_only: false,
188        }
189    }
190
191    /// Marks this planner as planning a read-only query (no mutations),
192    /// enabling fast-path visibility checks in expand operators.
193    #[must_use]
194    pub fn with_read_only(mut self, read_only: bool) -> Self {
195        self.read_only = read_only;
196        self
197    }
198
199    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
200    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
201        self.write_store
202            .as_ref()
203            .map(Arc::clone)
204            .ok_or(Error::Transaction(
205                grafeo_common::utils::error::TransactionError::ReadOnly,
206            ))
207    }
208
    /// Returns the epoch this planner uses for visibility checks.
    ///
    /// This is the value fixed at construction: the store's current epoch for
    /// [`Planner::new`], or the caller-supplied epoch for
    /// [`Planner::with_context`].
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
214
    /// Returns the transaction ID for this planner, or `None` when it was
    /// built without an active transaction.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
220
    /// Returns a reference to the transaction manager, if available.
    ///
    /// Planners created through [`Planner::new`] have none.
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
226
227    /// Enables or disables factorized execution for multi-hop queries.
228    #[must_use]
229    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
230        self.factorized_execution = enabled;
231        self
232    }
233
234    /// Sets the constraint validator for schema enforcement during mutations.
235    #[must_use]
236    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
237        self.validator = Some(validator);
238        self
239    }
240
241    /// Sets the catalog for user-defined procedure lookup.
242    #[must_use]
243    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
244        self.catalog = Some(catalog);
245        self
246    }
247
248    /// Sets the session context for introspection functions.
249    #[must_use]
250    pub fn with_session_context(
251        mut self,
252        context: grafeo_core::execution::operators::SessionContext,
253    ) -> Self {
254        self.session_context = context;
255        self
256    }
257
258    /// Generates an edge column name from an expand's edge variable (or an
259    /// anonymous fallback) and registers it in `edge_columns` so downstream
260    /// RETURN emits `EdgeResolve` instead of `NodeResolve`.
261    ///
262    /// Returns the column name for the caller to push into its output columns.
263    pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
264        let name = edge_variable.clone().unwrap_or_else(|| {
265            let count = self.anon_edge_counter.get();
266            self.anon_edge_counter.set(count + 1);
267            format!("_anon_edge_{}", count)
268        });
269        self.edge_columns.borrow_mut().insert(name.clone());
270        name
271    }
272
273    /// Counts consecutive single-hop expand operations.
274    ///
275    /// Returns the count and the deepest non-expand operator (the base of the chain).
276    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
277        match op {
278            LogicalOperator::Expand(expand) => {
279                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
280
281                if is_single_hop {
282                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
283                    (inner_count + 1, base)
284                } else {
285                    (0, op)
286                }
287            }
288            _ => (0, op),
289        }
290    }
291
292    /// Collects expand operations from the outermost down to the base.
293    ///
294    /// Returns expands in order from innermost (base) to outermost.
295    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
296        let mut chain = Vec::new();
297        let mut current = op;
298
299        while let LogicalOperator::Expand(expand) = current {
300            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
301            if !is_single_hop {
302                break;
303            }
304            chain.push(expand);
305            current = &expand.input;
306        }
307
308        chain.reverse();
309        chain
310    }
311
312    /// Plans a logical plan into a physical operator.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if the logical plan contains unsupported operators
317    /// or invalid expressions.
318    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
319        let _span = grafeo_debug_span!("grafeo::query::plan");
320        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
321        Ok(PhysicalPlan {
322            operator,
323            columns,
324            adaptive_context: None,
325        })
326    }
327
328    /// Plans a logical plan with profiling: each physical operator is wrapped
329    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
330    /// collect row counts and timing. Returns the physical plan together with
331    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
332    /// items in post-order (children before parents).
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the logical plan contains unsupported operators
337    /// or invalid expressions.
338    pub fn plan_profiled(
339        &self,
340        logical_plan: &LogicalPlan,
341    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
342        self.profiling.set(true);
343        self.profile_entries.borrow_mut().clear();
344
345        let result = self.plan_operator(&logical_plan.root);
346
347        self.profiling.set(false);
348        let (operator, columns) = result?;
349        let entries = self.profile_entries.borrow_mut().drain(..).collect();
350
351        Ok((
352            PhysicalPlan {
353                operator,
354                columns,
355                adaptive_context: None,
356            },
357            entries,
358        ))
359    }
360
361    /// Plans a logical plan with adaptive execution support.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if the logical plan contains unsupported operators
366    /// or invalid expressions.
367    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
368        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
369
370        let mut adaptive_context = AdaptiveContext::new();
371        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
372
373        Ok(PhysicalPlan {
374            operator,
375            columns,
376            adaptive_context: Some(adaptive_context),
377        })
378    }
379
380    /// Collects cardinality estimates from the logical plan into an adaptive context.
381    fn collect_cardinality_estimates(
382        &self,
383        op: &LogicalOperator,
384        ctx: &mut AdaptiveContext,
385        depth: usize,
386    ) {
387        match op {
388            LogicalOperator::NodeScan(scan) => {
389                let estimate = if let Some(label) = &scan.label {
390                    self.store.nodes_by_label(label).len() as f64
391                } else {
392                    self.store.node_count() as f64
393                };
394                let id = format!("scan_{}", scan.variable);
395                ctx.set_estimate(&id, estimate);
396
397                if let Some(input) = &scan.input {
398                    self.collect_cardinality_estimates(input, ctx, depth + 1);
399                }
400            }
401            LogicalOperator::Filter(filter) => {
402                let input_estimate = self.estimate_cardinality(&filter.input);
403                let estimate = input_estimate * 0.3;
404                let id = format!("filter_{depth}");
405                ctx.set_estimate(&id, estimate);
406
407                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
408            }
409            LogicalOperator::Expand(expand) => {
410                let input_estimate = self.estimate_cardinality(&expand.input);
411                let stats = self.store.statistics();
412                let avg_degree = self.estimate_expand_degree(&stats, expand);
413                let estimate = input_estimate * avg_degree;
414                let id = format!("expand_{}", expand.to_variable);
415                ctx.set_estimate(&id, estimate);
416
417                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
418            }
419            LogicalOperator::Join(join) => {
420                let left_est = self.estimate_cardinality(&join.left);
421                let right_est = self.estimate_cardinality(&join.right);
422                let estimate = (left_est * right_est).sqrt();
423                let id = format!("join_{depth}");
424                ctx.set_estimate(&id, estimate);
425
426                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
427                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
428            }
429            LogicalOperator::Aggregate(agg) => {
430                let input_estimate = self.estimate_cardinality(&agg.input);
431                let estimate = if agg.group_by.is_empty() {
432                    1.0
433                } else {
434                    (input_estimate * 0.1).max(1.0)
435                };
436                let id = format!("aggregate_{depth}");
437                ctx.set_estimate(&id, estimate);
438
439                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
440            }
441            LogicalOperator::Distinct(distinct) => {
442                let input_estimate = self.estimate_cardinality(&distinct.input);
443                let estimate = (input_estimate * 0.5).max(1.0);
444                let id = format!("distinct_{depth}");
445                ctx.set_estimate(&id, estimate);
446
447                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
448            }
449            LogicalOperator::Return(ret) => {
450                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
451            }
452            LogicalOperator::Limit(limit) => {
453                let input_estimate = self.estimate_cardinality(&limit.input);
454                let estimate = (input_estimate).min(limit.count.estimate());
455                let id = format!("limit_{depth}");
456                ctx.set_estimate(&id, estimate);
457
458                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
459            }
460            LogicalOperator::Skip(skip) => {
461                let input_estimate = self.estimate_cardinality(&skip.input);
462                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
463                let id = format!("skip_{depth}");
464                ctx.set_estimate(&id, estimate);
465
466                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
467            }
468            LogicalOperator::Sort(sort) => {
469                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
470            }
471            LogicalOperator::Union(union) => {
472                let estimate: f64 = union
473                    .inputs
474                    .iter()
475                    .map(|input| self.estimate_cardinality(input))
476                    .sum();
477                let id = format!("union_{depth}");
478                ctx.set_estimate(&id, estimate);
479
480                for input in &union.inputs {
481                    self.collect_cardinality_estimates(input, ctx, depth + 1);
482                }
483            }
484            _ => {
485                // For other operators, try to recurse into known input patterns
486            }
487        }
488    }
489
490    /// Estimates cardinality for a logical operator subtree.
491    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
492        match op {
493            LogicalOperator::NodeScan(scan) => {
494                if let Some(label) = &scan.label {
495                    self.store.nodes_by_label(label).len() as f64
496                } else {
497                    self.store.node_count() as f64
498                }
499            }
500            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
501            LogicalOperator::Expand(expand) => {
502                let stats = self.store.statistics();
503                let avg_degree = self.estimate_expand_degree(&stats, expand);
504                self.estimate_cardinality(&expand.input) * avg_degree
505            }
506            LogicalOperator::Join(join) => {
507                let left = self.estimate_cardinality(&join.left);
508                let right = self.estimate_cardinality(&join.right);
509                (left * right).sqrt()
510            }
511            LogicalOperator::Aggregate(agg) => {
512                if agg.group_by.is_empty() {
513                    1.0
514                } else {
515                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
516                }
517            }
518            LogicalOperator::Distinct(distinct) => {
519                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
520            }
521            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
522            LogicalOperator::Limit(limit) => self
523                .estimate_cardinality(&limit.input)
524                .min(limit.count.estimate()),
525            LogicalOperator::Skip(skip) => {
526                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
527            }
528            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
529            LogicalOperator::Union(union) => union
530                .inputs
531                .iter()
532                .map(|input| self.estimate_cardinality(input))
533                .sum(),
534            LogicalOperator::Except(except) => {
535                let left = self.estimate_cardinality(&except.left);
536                let right = self.estimate_cardinality(&except.right);
537                (left - right).max(0.0)
538            }
539            LogicalOperator::Intersect(intersect) => {
540                let left = self.estimate_cardinality(&intersect.left);
541                let right = self.estimate_cardinality(&intersect.right);
542                left.min(right)
543            }
544            LogicalOperator::Otherwise(otherwise) => self
545                .estimate_cardinality(&otherwise.left)
546                .max(self.estimate_cardinality(&otherwise.right)),
547            _ => 1000.0,
548        }
549    }
550
551    /// Estimates the average edge degree for an expand operation using store statistics.
552    fn estimate_expand_degree(
553        &self,
554        stats: &grafeo_core::statistics::Statistics,
555        expand: &ExpandOp,
556    ) -> f64 {
557        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
558        if expand.edge_types.len() == 1 {
559            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
560        } else if stats.total_nodes > 0 {
561            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
562        } else {
563            10.0
564        }
565    }
566
567    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
568    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
569    fn maybe_profile(
570        &self,
571        result: Result<(Box<dyn Operator>, Vec<String>)>,
572        op: &LogicalOperator,
573    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
574        if self.profiling.get() {
575            let (physical, columns) = result?;
576            let (entry, stats) =
577                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
578            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
579            self.profile_entries.borrow_mut().push(entry);
580            Ok((Box::new(profiled), columns))
581        } else {
582            result
583        }
584    }
585
586    /// Plans a single logical operator.
587    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
588        let result = match op {
589            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
590            LogicalOperator::Expand(expand) => {
591                if self.factorized_execution {
592                    let (chain_len, _base) = Self::count_expand_chain(op);
593                    if chain_len >= 2 {
594                        return self.maybe_profile(self.plan_expand_chain(op), op);
595                    }
596                }
597                self.plan_expand(expand)
598            }
599            LogicalOperator::Return(ret) => self.plan_return(ret),
600            LogicalOperator::Filter(filter) => self.plan_filter(filter),
601            LogicalOperator::Project(project) => self.plan_project(project),
602            LogicalOperator::Limit(limit) => self.plan_limit(limit),
603            LogicalOperator::Skip(skip) => self.plan_skip(skip),
604            LogicalOperator::Sort(sort) => self.plan_sort(sort),
605            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
606            LogicalOperator::Join(join) => self.plan_join(join),
607            LogicalOperator::Union(union) => self.plan_union(union),
608            LogicalOperator::Except(except) => self.plan_except(except),
609            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
610            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
611            LogicalOperator::Apply(apply) => self.plan_apply(apply),
612            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
613            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
614            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
615            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
616            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
617            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
618            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
619            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
620            LogicalOperator::Merge(merge) => self.plan_merge(merge),
621            LogicalOperator::MergeRelationship(merge_rel) => {
622                self.plan_merge_relationship(merge_rel)
623            }
624            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
625            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
626            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
627            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
628            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
629            #[cfg(feature = "algos")]
630            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
631            #[cfg(not(feature = "algos"))]
632            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
633                "CALL procedures require the 'algos' feature".to_string(),
634            )),
635            LogicalOperator::ParameterScan(_param_scan) => {
636                let state = self
637                    .correlated_param_state
638                    .borrow()
639                    .clone()
640                    .ok_or_else(|| {
641                        Error::Internal(
642                            "ParameterScan without correlated Apply context".to_string(),
643                        )
644                    })?;
645                // Use the actual column names from the ParameterState (which may
646                // have been expanded from "*" to real variable names in plan_apply)
647                let columns = state.columns.clone();
648                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
649                Ok((operator, columns))
650            }
651            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
652            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
653            LogicalOperator::LoadData(load) => {
654                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
655                    load.path.clone(),
656                    load.format,
657                    load.with_headers,
658                    load.field_terminator,
659                    load.variable.clone(),
660                ));
661                Ok((operator, vec![load.variable.clone()]))
662            }
663            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
664            #[cfg(feature = "vector-index")]
665            LogicalOperator::VectorScan(scan) => self.plan_vector_scan(scan),
666            #[cfg(not(feature = "vector-index"))]
667            LogicalOperator::VectorScan(_) => Err(Error::Internal(
668                "VectorScan requires vector-index feature".to_string(),
669            )),
670            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
671                "VectorJoin requires vector-index feature".to_string(),
672            )),
673            #[cfg(feature = "text-index")]
674            LogicalOperator::TextScan(scan) => self.plan_text_scan(scan),
675            #[cfg(not(feature = "text-index"))]
676            LogicalOperator::TextScan(_) => Err(Error::Internal(
677                "TextScan requires text-index feature".to_string(),
678            )),
679            _ => Err(Error::Internal(format!(
680                "Unsupported operator: {:?}",
681                std::mem::discriminant(op)
682            ))),
683        };
684        self.maybe_profile(result, op)
685    }
686
687    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
688    fn plan_horizontal_aggregate(
689        &self,
690        ha: &HorizontalAggregateOp,
691    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
692        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
693
694        let list_col_idx = child_columns
695            .iter()
696            .position(|c| c == &ha.list_column)
697            .ok_or_else(|| {
698                Error::Internal(format!(
699                    "HorizontalAggregate list column '{}' not found in {:?}",
700                    ha.list_column, child_columns
701                ))
702            })?;
703
704        let entity_kind = match ha.entity_kind {
705            LogicalEntityKind::Edge => EntityKind::Edge,
706            LogicalEntityKind::Node => EntityKind::Node,
707        };
708
709        let function = convert_aggregate_function(ha.function);
710        let input_column_count = child_columns.len();
711
712        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
713            child_op,
714            list_col_idx,
715            entity_kind,
716            function,
717            ha.property.clone(),
718            Arc::clone(&self.store) as Arc<dyn GraphStoreSearch>,
719            input_column_count,
720        ));
721
722        let mut columns = child_columns;
723        columns.push(ha.alias.clone());
724        // Mark the result as a scalar column
725        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
726
727        Ok((operator, columns))
728    }
729
730    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
731    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
732        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
733        let key_idx = child_columns
734            .iter()
735            .position(|c| c == &mc.key_var)
736            .ok_or_else(|| {
737                Error::Internal(format!(
738                    "MapCollect key '{}' not in columns {:?}",
739                    mc.key_var, child_columns
740                ))
741            })?;
742        let value_idx = child_columns
743            .iter()
744            .position(|c| c == &mc.value_var)
745            .ok_or_else(|| {
746                Error::Internal(format!(
747                    "MapCollect value '{}' not in columns {:?}",
748                    mc.value_var, child_columns
749                ))
750            })?;
751        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
752        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
753        Ok((operator, vec![mc.alias.clone()]))
754    }
755
756    /// Plans a text search scan operator using BM25 inverted index.
757    #[cfg(feature = "text-index")]
758    fn plan_text_scan(&self, scan: &TextScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
759        use grafeo_core::execution::operators::TextScanOperator;
760
761        let query_string = match &scan.query {
762            LogicalExpression::Literal(Value::String(s)) => s.to_string(),
763            LogicalExpression::Parameter(name) => {
764                return Err(Error::Internal(format!(
765                    "TextScan query parameter ${} not resolved",
766                    name
767                )));
768            }
769            _ => {
770                return Err(Error::Internal(
771                    "TextScan query must be a string literal or parameter".to_string(),
772                ));
773            }
774        };
775
776        let operator: Box<dyn Operator> = if let Some(k) = scan.k {
777            Box::new(TextScanOperator::top_k(
778                Arc::clone(&self.store),
779                &scan.label,
780                &scan.property,
781                &query_string,
782                k,
783            ))
784        } else if let Some(threshold) = scan.threshold {
785            Box::new(TextScanOperator::with_threshold(
786                Arc::clone(&self.store),
787                &scan.label,
788                &scan.property,
789                &query_string,
790                threshold,
791            ))
792        } else {
793            Box::new(TextScanOperator::top_k(
794                Arc::clone(&self.store),
795                &scan.label,
796                &scan.property,
797                &query_string,
798                100,
799            ))
800        };
801
802        let mut columns = vec![scan.variable.clone()];
803        if let Some(ref score_col) = scan.score_column {
804            columns.push(score_col.clone());
805        }
806
807        Ok((operator, columns))
808    }
809
810    /// Plans a VectorScan logical operator into a physical VectorScanOperator.
811    #[cfg(feature = "vector-index")]
812    pub(super) fn plan_vector_scan(
813        &self,
814        scan: &VectorScanOp,
815    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
816        use grafeo_core::execution::operators::VectorScanOperator;
817        use grafeo_core::index::vector::DistanceMetric;
818
819        // Hybrid shape `VectorScan(input=graph_pattern)` is not supported by
820        // the physical VectorScanOperator: it has no input slot and would
821        // silently drop upstream bindings. Reject rather than plan it
822        // incorrectly; callers should build a VectorJoin for this case.
823        if scan.input.is_some() {
824            return Err(Error::Internal(
825                "VectorScan with an input subtree is not supported, use VectorJoin for hybrid graph+vector queries".to_string(),
826            ));
827        }
828
829        let query_vec = self.resolve_vector_literal(&scan.query_vector)?;
830
831        let requested_metric = scan.metric.map(|m| match m {
832            VectorMetric::Cosine => DistanceMetric::Cosine,
833            VectorMetric::Euclidean => DistanceMetric::Euclidean,
834            VectorMetric::DotProduct => DistanceMetric::DotProduct,
835            VectorMetric::Manhattan => DistanceMetric::Manhattan,
836        });
837
838        let k = scan.k.unwrap_or(usize::MAX);
839
840        // Pick the metric we'll execute under. When the user asked for a
841        // specific one, honor it. Otherwise inherit the index's metric (so a
842        // cosine-built index drives cosine scoring) or default to Cosine for
843        // the unindexed brute-force path.
844        let index_metric = scan
845            .label
846            .as_ref()
847            .and_then(|label| self.store.vector_index_metric(label, &scan.property));
848        let metric = requested_metric
849            .or(index_metric)
850            .unwrap_or(DistanceMetric::Cosine);
851
852        // The store's vector_search routes HNSW when an index exists whose
853        // metric matches `metric`, and brute-force scan otherwise. No handle
854        // downcast; no Arc<dyn Any>.
855        let mut operator = VectorScanOperator::new(
856            Arc::clone(&self.store),
857            scan.label.clone(),
858            scan.property.clone(),
859            query_vec,
860            k,
861            metric,
862        );
863
864        if let Some(sim) = scan.min_similarity {
865            operator = operator.with_min_similarity(sim);
866        }
867        if let Some(dist) = scan.max_distance {
868            operator = operator.with_max_distance(dist);
869        }
870
871        let mut columns = vec![scan.variable.clone()];
872        // VectorScan always projects a score column keyed by the resolved
873        // metric (after index-driven fallback) so downstream score reuse
874        // matches the actual distance values written out.
875        let metric_tag = match metric {
876            DistanceMetric::Cosine => "cos",
877            DistanceMetric::Euclidean => "euc",
878            DistanceMetric::DotProduct => "dot",
879            DistanceMetric::Manhattan => "man",
880            // Future metrics (DistanceMetric is #[non_exhaustive]) fall back
881            // to a generic tag so they still produce a stable column name.
882            _ => "other",
883        };
884        columns.push(project::vector_score_column_name(
885            metric_tag,
886            &scan.property,
887            &scan.variable,
888            &scan.query_vector,
889        ));
890
891        Ok((Box::new(operator), columns))
892    }
893
894    /// Resolves a LogicalExpression to a Vec<f32> for vector operations.
895    #[cfg(feature = "vector-index")]
896    pub(super) fn resolve_vector_literal(&self, expr: &LogicalExpression) -> Result<Vec<f32>> {
897        // f64→f32 precision loss throughout is intentional: vectors are stored and searched as f32.
898        #[allow(clippy::cast_possible_truncation)]
899        match expr {
900            LogicalExpression::Literal(Value::Vector(v)) => Ok(v.to_vec()),
901            LogicalExpression::Literal(Value::List(list)) => {
902                let mut vec = Vec::with_capacity(list.len());
903                for item in list.iter() {
904                    match item {
905                        Value::Float64(f) => vec.push(*f as f32),
906                        Value::Int64(i) => vec.push(*i as f32),
907                        _ => {
908                            return Err(Error::Internal(
909                                "Vector elements must be numeric".to_string(),
910                            ));
911                        }
912                    }
913                }
914                Ok(vec)
915            }
916            // GQL/Cypher parser produces List([Literal(Float64), ...]) for inline vectors like
917            // [0.9, 0.1, 0.0] — handle this form by recursively resolving each element.
918            LogicalExpression::List(items) => {
919                let mut vec = Vec::with_capacity(items.len());
920                for item in items {
921                    match item {
922                        LogicalExpression::Literal(Value::Float64(f)) => vec.push(*f as f32),
923                        LogicalExpression::Literal(Value::Int64(i)) => vec.push(*i as f32),
924                        _ => {
925                            return Err(Error::Internal(
926                                "Vector elements must be numeric literals".to_string(),
927                            ));
928                        }
929                    }
930                }
931                Ok(vec)
932            }
933            _ => Err(Error::Internal("Expected vector literal".to_string())),
934        }
935    }
936}
937
/// Operator backed by a fixed, in-memory set of rows (used for results such as
/// `grafeo.procedures()`).
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, one `Vec<Value>` per row.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position within a source row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row that has not been emitted yet.
    row_index: usize,
}
945
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once every row
    /// has been produced.
    ///
    /// Each output column `i` is filled from position `column_indices[i]` of
    /// the source rows; a source row that is too short contributes `Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        /// Upper bound on the number of rows placed in one chunk.
        const MAX_CHUNK_ROWS: usize = 1024;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }

        let batch_len = (self.rows.len() - start).min(MAX_CHUNK_ROWS);
        let batch = &self.rows[start..start + batch_len];

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        // Fill column by column; a column the chunk does not expose is skipped.
        for (dst_idx, &src_idx) in self.column_indices.iter().enumerate() {
            if let Some(column) = chunk.column_mut(dst_idx) {
                for row in batch {
                    column.push_value(row.get(src_idx).cloned().unwrap_or(Value::Null));
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Name reported for this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }

    /// Converts the boxed operator into a `Box<dyn Any + Send>`.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
989
990#[cfg(test)]
991mod tests {
992    use super::*;
993    use crate::query::plan::{
994        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
995        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
996        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
997        SkipOp as LogicalSkipOp, SortKey, SortOp,
998    };
999    use grafeo_common::types::Value;
1000    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
1001    use grafeo_core::graph::GraphStoreMut;
1002    use grafeo_core::graph::lpg::LpgStore;
1003
1004    fn create_test_store() -> Arc<LpgStore> {
1005        let store = Arc::new(LpgStore::new().unwrap());
1006        store.create_node(&["Person"]);
1007        store.create_node(&["Person"]);
1008        store.create_node(&["Company"]);
1009        store
1010    }
1011
1012    // ==================== Simple Scan Tests ====================
1013
1014    #[test]
1015    fn test_plan_simple_scan() {
1016        let store = create_test_store();
1017        let planner = Planner::new(store);
1018
1019        // MATCH (n:Person) RETURN n
1020        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1021            items: vec![ReturnItem {
1022                expression: LogicalExpression::Variable("n".to_string()),
1023                alias: None,
1024            }],
1025            distinct: false,
1026            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1027                variable: "n".to_string(),
1028                label: Some("Person".to_string()),
1029                input: None,
1030            })),
1031        }));
1032
1033        let physical = planner.plan(&logical).unwrap();
1034        assert_eq!(physical.columns(), &["n"]);
1035    }
1036
1037    #[test]
1038    fn test_plan_scan_without_label() {
1039        let store = create_test_store();
1040        let planner = Planner::new(store);
1041
1042        // MATCH (n) RETURN n
1043        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1044            items: vec![ReturnItem {
1045                expression: LogicalExpression::Variable("n".to_string()),
1046                alias: None,
1047            }],
1048            distinct: false,
1049            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1050                variable: "n".to_string(),
1051                label: None,
1052                input: None,
1053            })),
1054        }));
1055
1056        let physical = planner.plan(&logical).unwrap();
1057        assert_eq!(physical.columns(), &["n"]);
1058    }
1059
1060    #[test]
1061    fn test_plan_return_with_alias() {
1062        let store = create_test_store();
1063        let planner = Planner::new(store);
1064
1065        // MATCH (n:Person) RETURN n AS person
1066        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1067            items: vec![ReturnItem {
1068                expression: LogicalExpression::Variable("n".to_string()),
1069                alias: Some("person".to_string()),
1070            }],
1071            distinct: false,
1072            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1073                variable: "n".to_string(),
1074                label: Some("Person".to_string()),
1075                input: None,
1076            })),
1077        }));
1078
1079        let physical = planner.plan(&logical).unwrap();
1080        assert_eq!(physical.columns(), &["person"]);
1081    }
1082
1083    #[test]
1084    fn test_plan_return_property() {
1085        let store = create_test_store();
1086        let planner = Planner::new(store);
1087
1088        // MATCH (n:Person) RETURN n.name
1089        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1090            items: vec![ReturnItem {
1091                expression: LogicalExpression::Property {
1092                    variable: "n".to_string(),
1093                    property: "name".to_string(),
1094                },
1095                alias: None,
1096            }],
1097            distinct: false,
1098            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1099                variable: "n".to_string(),
1100                label: Some("Person".to_string()),
1101                input: None,
1102            })),
1103        }));
1104
1105        let physical = planner.plan(&logical).unwrap();
1106        assert_eq!(physical.columns(), &["n.name"]);
1107    }
1108
1109    #[test]
1110    fn test_plan_return_literal() {
1111        let store = create_test_store();
1112        let planner = Planner::new(store);
1113
1114        // MATCH (n) RETURN 42 AS answer
1115        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1116            items: vec![ReturnItem {
1117                expression: LogicalExpression::Literal(Value::Int64(42)),
1118                alias: Some("answer".to_string()),
1119            }],
1120            distinct: false,
1121            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1122                variable: "n".to_string(),
1123                label: None,
1124                input: None,
1125            })),
1126        }));
1127
1128        let physical = planner.plan(&logical).unwrap();
1129        assert_eq!(physical.columns(), &["answer"]);
1130    }
1131
1132    // ==================== Filter Tests ====================
1133
1134    #[test]
1135    fn test_plan_filter_equality() {
1136        let store = create_test_store();
1137        let planner = Planner::new(store);
1138
1139        // MATCH (n:Person) WHERE n.age = 30 RETURN n
1140        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1141            items: vec![ReturnItem {
1142                expression: LogicalExpression::Variable("n".to_string()),
1143                alias: None,
1144            }],
1145            distinct: false,
1146            input: Box::new(LogicalOperator::Filter(FilterOp {
1147                predicate: LogicalExpression::Binary {
1148                    left: Box::new(LogicalExpression::Property {
1149                        variable: "n".to_string(),
1150                        property: "age".to_string(),
1151                    }),
1152                    op: BinaryOp::Eq,
1153                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1154                },
1155                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1156                    variable: "n".to_string(),
1157                    label: Some("Person".to_string()),
1158                    input: None,
1159                })),
1160                pushdown_hint: None,
1161            })),
1162        }));
1163
1164        let physical = planner.plan(&logical).unwrap();
1165        assert_eq!(physical.columns(), &["n"]);
1166    }
1167
1168    #[test]
1169    fn test_plan_filter_compound_and() {
1170        let store = create_test_store();
1171        let planner = Planner::new(store);
1172
1173        // WHERE n.age > 20 AND n.age < 40
1174        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1175            items: vec![ReturnItem {
1176                expression: LogicalExpression::Variable("n".to_string()),
1177                alias: None,
1178            }],
1179            distinct: false,
1180            input: Box::new(LogicalOperator::Filter(FilterOp {
1181                predicate: LogicalExpression::Binary {
1182                    left: Box::new(LogicalExpression::Binary {
1183                        left: Box::new(LogicalExpression::Property {
1184                            variable: "n".to_string(),
1185                            property: "age".to_string(),
1186                        }),
1187                        op: BinaryOp::Gt,
1188                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1189                    }),
1190                    op: BinaryOp::And,
1191                    right: Box::new(LogicalExpression::Binary {
1192                        left: Box::new(LogicalExpression::Property {
1193                            variable: "n".to_string(),
1194                            property: "age".to_string(),
1195                        }),
1196                        op: BinaryOp::Lt,
1197                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1198                    }),
1199                },
1200                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1201                    variable: "n".to_string(),
1202                    label: None,
1203                    input: None,
1204                })),
1205                pushdown_hint: None,
1206            })),
1207        }));
1208
1209        let physical = planner.plan(&logical).unwrap();
1210        assert_eq!(physical.columns(), &["n"]);
1211    }
1212
1213    #[test]
1214    fn test_plan_filter_unary_not() {
1215        let store = create_test_store();
1216        let planner = Planner::new(store);
1217
1218        // WHERE NOT n.active
1219        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1220            items: vec![ReturnItem {
1221                expression: LogicalExpression::Variable("n".to_string()),
1222                alias: None,
1223            }],
1224            distinct: false,
1225            input: Box::new(LogicalOperator::Filter(FilterOp {
1226                predicate: LogicalExpression::Unary {
1227                    op: UnaryOp::Not,
1228                    operand: Box::new(LogicalExpression::Property {
1229                        variable: "n".to_string(),
1230                        property: "active".to_string(),
1231                    }),
1232                },
1233                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1234                    variable: "n".to_string(),
1235                    label: None,
1236                    input: None,
1237                })),
1238                pushdown_hint: None,
1239            })),
1240        }));
1241
1242        let physical = planner.plan(&logical).unwrap();
1243        assert_eq!(physical.columns(), &["n"]);
1244    }
1245
1246    #[test]
1247    fn test_plan_filter_is_null() {
1248        let store = create_test_store();
1249        let planner = Planner::new(store);
1250
1251        // WHERE n.email IS NULL
1252        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1253            items: vec![ReturnItem {
1254                expression: LogicalExpression::Variable("n".to_string()),
1255                alias: None,
1256            }],
1257            distinct: false,
1258            input: Box::new(LogicalOperator::Filter(FilterOp {
1259                predicate: LogicalExpression::Unary {
1260                    op: UnaryOp::IsNull,
1261                    operand: Box::new(LogicalExpression::Property {
1262                        variable: "n".to_string(),
1263                        property: "email".to_string(),
1264                    }),
1265                },
1266                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1267                    variable: "n".to_string(),
1268                    label: None,
1269                    input: None,
1270                })),
1271                pushdown_hint: None,
1272            })),
1273        }));
1274
1275        let physical = planner.plan(&logical).unwrap();
1276        assert_eq!(physical.columns(), &["n"]);
1277    }
1278
1279    #[test]
1280    fn test_plan_filter_function_call() {
1281        let store = create_test_store();
1282        let planner = Planner::new(store);
1283
1284        // WHERE size(n.friends) > 0
1285        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1286            items: vec![ReturnItem {
1287                expression: LogicalExpression::Variable("n".to_string()),
1288                alias: None,
1289            }],
1290            distinct: false,
1291            input: Box::new(LogicalOperator::Filter(FilterOp {
1292                predicate: LogicalExpression::Binary {
1293                    left: Box::new(LogicalExpression::FunctionCall {
1294                        name: "size".to_string(),
1295                        args: vec![LogicalExpression::Property {
1296                            variable: "n".to_string(),
1297                            property: "friends".to_string(),
1298                        }],
1299                        distinct: false,
1300                    }),
1301                    op: BinaryOp::Gt,
1302                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1303                },
1304                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1305                    variable: "n".to_string(),
1306                    label: None,
1307                    input: None,
1308                })),
1309                pushdown_hint: None,
1310            })),
1311        }));
1312
1313        let physical = planner.plan(&logical).unwrap();
1314        assert_eq!(physical.columns(), &["n"]);
1315    }
1316
1317    // ==================== Expand Tests ====================
1318
1319    #[test]
1320    fn test_plan_expand_outgoing() {
1321        let store = create_test_store();
1322        let planner = Planner::new(store);
1323
1324        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1325        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1326            items: vec![
1327                ReturnItem {
1328                    expression: LogicalExpression::Variable("a".to_string()),
1329                    alias: None,
1330                },
1331                ReturnItem {
1332                    expression: LogicalExpression::Variable("b".to_string()),
1333                    alias: None,
1334                },
1335            ],
1336            distinct: false,
1337            input: Box::new(LogicalOperator::Expand(ExpandOp {
1338                from_variable: "a".to_string(),
1339                to_variable: "b".to_string(),
1340                edge_variable: None,
1341                direction: ExpandDirection::Outgoing,
1342                edge_types: vec!["KNOWS".to_string()],
1343                min_hops: 1,
1344                max_hops: Some(1),
1345                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1346                    variable: "a".to_string(),
1347                    label: Some("Person".to_string()),
1348                    input: None,
1349                })),
1350                path_alias: None,
1351                path_mode: PathMode::Walk,
1352            })),
1353        }));
1354
1355        let physical = planner.plan(&logical).unwrap();
1356        // The return should have columns [a, b]
1357        assert!(physical.columns().contains(&"a".to_string()));
1358        assert!(physical.columns().contains(&"b".to_string()));
1359    }
1360
1361    #[test]
1362    fn test_plan_expand_with_edge_variable() {
1363        let store = create_test_store();
1364        let planner = Planner::new(store);
1365
1366        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1367        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1368            items: vec![
1369                ReturnItem {
1370                    expression: LogicalExpression::Variable("a".to_string()),
1371                    alias: None,
1372                },
1373                ReturnItem {
1374                    expression: LogicalExpression::Variable("r".to_string()),
1375                    alias: None,
1376                },
1377                ReturnItem {
1378                    expression: LogicalExpression::Variable("b".to_string()),
1379                    alias: None,
1380                },
1381            ],
1382            distinct: false,
1383            input: Box::new(LogicalOperator::Expand(ExpandOp {
1384                from_variable: "a".to_string(),
1385                to_variable: "b".to_string(),
1386                edge_variable: Some("r".to_string()),
1387                direction: ExpandDirection::Outgoing,
1388                edge_types: vec!["KNOWS".to_string()],
1389                min_hops: 1,
1390                max_hops: Some(1),
1391                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1392                    variable: "a".to_string(),
1393                    label: None,
1394                    input: None,
1395                })),
1396                path_alias: None,
1397                path_mode: PathMode::Walk,
1398            })),
1399        }));
1400
1401        let physical = planner.plan(&logical).unwrap();
1402        assert!(physical.columns().contains(&"a".to_string()));
1403        assert!(physical.columns().contains(&"r".to_string()));
1404        assert!(physical.columns().contains(&"b".to_string()));
1405    }
1406
1407    // ==================== Limit/Skip/Sort Tests ====================
1408
1409    #[test]
1410    fn test_plan_limit() {
1411        let store = create_test_store();
1412        let planner = Planner::new(store);
1413
1414        // MATCH (n) RETURN n LIMIT 10
1415        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1416            items: vec![ReturnItem {
1417                expression: LogicalExpression::Variable("n".to_string()),
1418                alias: None,
1419            }],
1420            distinct: false,
1421            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1422                count: 10.into(),
1423                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1424                    variable: "n".to_string(),
1425                    label: None,
1426                    input: None,
1427                })),
1428            })),
1429        }));
1430
1431        let physical = planner.plan(&logical).unwrap();
1432        assert_eq!(physical.columns(), &["n"]);
1433    }
1434
1435    #[test]
1436    fn test_plan_skip() {
1437        let store = create_test_store();
1438        let planner = Planner::new(store);
1439
1440        // MATCH (n) RETURN n SKIP 5
1441        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1442            items: vec![ReturnItem {
1443                expression: LogicalExpression::Variable("n".to_string()),
1444                alias: None,
1445            }],
1446            distinct: false,
1447            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1448                count: 5.into(),
1449                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1450                    variable: "n".to_string(),
1451                    label: None,
1452                    input: None,
1453                })),
1454            })),
1455        }));
1456
1457        let physical = planner.plan(&logical).unwrap();
1458        assert_eq!(physical.columns(), &["n"]);
1459    }
1460
1461    #[test]
1462    fn test_plan_sort() {
1463        let store = create_test_store();
1464        let planner = Planner::new(store);
1465
1466        // MATCH (n) RETURN n ORDER BY n.name ASC
1467        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1468            items: vec![ReturnItem {
1469                expression: LogicalExpression::Variable("n".to_string()),
1470                alias: None,
1471            }],
1472            distinct: false,
1473            input: Box::new(LogicalOperator::Sort(SortOp {
1474                keys: vec![SortKey {
1475                    expression: LogicalExpression::Variable("n".to_string()),
1476                    order: SortOrder::Ascending,
1477                    nulls: None,
1478                }],
1479                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1480                    variable: "n".to_string(),
1481                    label: None,
1482                    input: None,
1483                })),
1484            })),
1485        }));
1486
1487        let physical = planner.plan(&logical).unwrap();
1488        assert_eq!(physical.columns(), &["n"]);
1489    }
1490
1491    #[test]
1492    fn test_plan_sort_descending() {
1493        let store = create_test_store();
1494        let planner = Planner::new(store);
1495
1496        // ORDER BY n DESC
1497        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1498            items: vec![ReturnItem {
1499                expression: LogicalExpression::Variable("n".to_string()),
1500                alias: None,
1501            }],
1502            distinct: false,
1503            input: Box::new(LogicalOperator::Sort(SortOp {
1504                keys: vec![SortKey {
1505                    expression: LogicalExpression::Variable("n".to_string()),
1506                    order: SortOrder::Descending,
1507                    nulls: None,
1508                }],
1509                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1510                    variable: "n".to_string(),
1511                    label: None,
1512                    input: None,
1513                })),
1514            })),
1515        }));
1516
1517        let physical = planner.plan(&logical).unwrap();
1518        assert_eq!(physical.columns(), &["n"]);
1519    }
1520
1521    #[test]
1522    fn test_plan_distinct() {
1523        let store = create_test_store();
1524        let planner = Planner::new(store);
1525
1526        // MATCH (n) RETURN DISTINCT n
1527        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1528            items: vec![ReturnItem {
1529                expression: LogicalExpression::Variable("n".to_string()),
1530                alias: None,
1531            }],
1532            distinct: false,
1533            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1534                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1535                    variable: "n".to_string(),
1536                    label: None,
1537                    input: None,
1538                })),
1539                columns: None,
1540            })),
1541        }));
1542
1543        let physical = planner.plan(&logical).unwrap();
1544        assert_eq!(physical.columns(), &["n"]);
1545    }
1546
1547    #[test]
1548    fn test_plan_distinct_with_columns() {
1549        let store = create_test_store();
1550        let planner = Planner::new(store);
1551
1552        // DISTINCT on specific columns (column-specific dedup)
1553        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1554            items: vec![ReturnItem {
1555                expression: LogicalExpression::Variable("n".to_string()),
1556                alias: None,
1557            }],
1558            distinct: false,
1559            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1560                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1561                    variable: "n".to_string(),
1562                    label: None,
1563                    input: None,
1564                })),
1565                columns: Some(vec!["n".to_string()]),
1566            })),
1567        }));
1568
1569        let physical = planner.plan(&logical).unwrap();
1570        assert_eq!(physical.columns(), &["n"]);
1571    }
1572
1573    #[test]
1574    fn test_plan_distinct_with_nonexistent_columns() {
1575        let store = create_test_store();
1576        let planner = Planner::new(store);
1577
1578        // When distinct columns don't match any output columns,
1579        // it falls back to full-row distinct.
1580        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1581            items: vec![ReturnItem {
1582                expression: LogicalExpression::Variable("n".to_string()),
1583                alias: None,
1584            }],
1585            distinct: false,
1586            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1587                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1588                    variable: "n".to_string(),
1589                    label: None,
1590                    input: None,
1591                })),
1592                columns: Some(vec!["nonexistent".to_string()]),
1593            })),
1594        }));
1595
1596        let physical = planner.plan(&logical).unwrap();
1597        assert_eq!(physical.columns(), &["n"]);
1598    }
1599
1600    // ==================== Aggregate Tests ====================
1601
1602    #[test]
1603    fn test_plan_aggregate_count() {
1604        let store = create_test_store();
1605        let planner = Planner::new(store);
1606
1607        // MATCH (n) RETURN count(n)
1608        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1609            items: vec![ReturnItem {
1610                expression: LogicalExpression::Variable("cnt".to_string()),
1611                alias: None,
1612            }],
1613            distinct: false,
1614            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1615                group_by: vec![],
1616                aggregates: vec![LogicalAggregateExpr {
1617                    function: LogicalAggregateFunction::Count,
1618                    expression: Some(LogicalExpression::Variable("n".to_string())),
1619                    expression2: None,
1620                    distinct: false,
1621                    alias: Some("cnt".to_string()),
1622                    percentile: None,
1623                    separator: None,
1624                }],
1625                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1626                    variable: "n".to_string(),
1627                    label: None,
1628                    input: None,
1629                })),
1630                having: None,
1631            })),
1632        }));
1633
1634        let physical = planner.plan(&logical).unwrap();
1635        assert!(physical.columns().contains(&"cnt".to_string()));
1636    }
1637
1638    #[test]
1639    fn test_plan_aggregate_with_group_by() {
1640        let store = create_test_store();
1641        let planner = Planner::new(store);
1642
1643        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1644        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1645            group_by: vec![LogicalExpression::Property {
1646                variable: "n".to_string(),
1647                property: "city".to_string(),
1648            }],
1649            aggregates: vec![LogicalAggregateExpr {
1650                function: LogicalAggregateFunction::Count,
1651                expression: Some(LogicalExpression::Variable("n".to_string())),
1652                expression2: None,
1653                distinct: false,
1654                alias: Some("cnt".to_string()),
1655                percentile: None,
1656                separator: None,
1657            }],
1658            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1659                variable: "n".to_string(),
1660                label: Some("Person".to_string()),
1661                input: None,
1662            })),
1663            having: None,
1664        }));
1665
1666        let physical = planner.plan(&logical).unwrap();
1667        assert_eq!(physical.columns().len(), 2);
1668    }
1669
1670    #[test]
1671    fn test_plan_aggregate_sum() {
1672        let store = create_test_store();
1673        let planner = Planner::new(store);
1674
1675        // SUM(n.value)
1676        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1677            group_by: vec![],
1678            aggregates: vec![LogicalAggregateExpr {
1679                function: LogicalAggregateFunction::Sum,
1680                expression: Some(LogicalExpression::Property {
1681                    variable: "n".to_string(),
1682                    property: "value".to_string(),
1683                }),
1684                expression2: None,
1685                distinct: false,
1686                alias: Some("total".to_string()),
1687                percentile: None,
1688                separator: None,
1689            }],
1690            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1691                variable: "n".to_string(),
1692                label: None,
1693                input: None,
1694            })),
1695            having: None,
1696        }));
1697
1698        let physical = planner.plan(&logical).unwrap();
1699        assert!(physical.columns().contains(&"total".to_string()));
1700    }
1701
1702    #[test]
1703    fn test_plan_aggregate_avg() {
1704        let store = create_test_store();
1705        let planner = Planner::new(store);
1706
1707        // AVG(n.score)
1708        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1709            group_by: vec![],
1710            aggregates: vec![LogicalAggregateExpr {
1711                function: LogicalAggregateFunction::Avg,
1712                expression: Some(LogicalExpression::Property {
1713                    variable: "n".to_string(),
1714                    property: "score".to_string(),
1715                }),
1716                expression2: None,
1717                distinct: false,
1718                alias: Some("average".to_string()),
1719                percentile: None,
1720                separator: None,
1721            }],
1722            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1723                variable: "n".to_string(),
1724                label: None,
1725                input: None,
1726            })),
1727            having: None,
1728        }));
1729
1730        let physical = planner.plan(&logical).unwrap();
1731        assert!(physical.columns().contains(&"average".to_string()));
1732    }
1733
1734    #[test]
1735    fn test_plan_aggregate_min_max() {
1736        let store = create_test_store();
1737        let planner = Planner::new(store);
1738
1739        // MIN(n.age), MAX(n.age)
1740        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1741            group_by: vec![],
1742            aggregates: vec![
1743                LogicalAggregateExpr {
1744                    function: LogicalAggregateFunction::Min,
1745                    expression: Some(LogicalExpression::Property {
1746                        variable: "n".to_string(),
1747                        property: "age".to_string(),
1748                    }),
1749                    expression2: None,
1750                    distinct: false,
1751                    alias: Some("youngest".to_string()),
1752                    percentile: None,
1753                    separator: None,
1754                },
1755                LogicalAggregateExpr {
1756                    function: LogicalAggregateFunction::Max,
1757                    expression: Some(LogicalExpression::Property {
1758                        variable: "n".to_string(),
1759                        property: "age".to_string(),
1760                    }),
1761                    expression2: None,
1762                    distinct: false,
1763                    alias: Some("oldest".to_string()),
1764                    percentile: None,
1765                    separator: None,
1766                },
1767            ],
1768            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1769                variable: "n".to_string(),
1770                label: None,
1771                input: None,
1772            })),
1773            having: None,
1774        }));
1775
1776        let physical = planner.plan(&logical).unwrap();
1777        assert!(physical.columns().contains(&"youngest".to_string()));
1778        assert!(physical.columns().contains(&"oldest".to_string()));
1779    }
1780
1781    // ==================== Join Tests ====================
1782
1783    #[test]
1784    fn test_plan_inner_join() {
1785        let store = create_test_store();
1786        let planner = Planner::new(store);
1787
1788        // Inner join between two scans
1789        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1790            items: vec![
1791                ReturnItem {
1792                    expression: LogicalExpression::Variable("a".to_string()),
1793                    alias: None,
1794                },
1795                ReturnItem {
1796                    expression: LogicalExpression::Variable("b".to_string()),
1797                    alias: None,
1798                },
1799            ],
1800            distinct: false,
1801            input: Box::new(LogicalOperator::Join(JoinOp {
1802                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1803                    variable: "a".to_string(),
1804                    label: Some("Person".to_string()),
1805                    input: None,
1806                })),
1807                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1808                    variable: "b".to_string(),
1809                    label: Some("Company".to_string()),
1810                    input: None,
1811                })),
1812                join_type: JoinType::Inner,
1813                conditions: vec![JoinCondition {
1814                    left: LogicalExpression::Variable("a".to_string()),
1815                    right: LogicalExpression::Variable("b".to_string()),
1816                }],
1817            })),
1818        }));
1819
1820        let physical = planner.plan(&logical).unwrap();
1821        assert!(physical.columns().contains(&"a".to_string()));
1822        assert!(physical.columns().contains(&"b".to_string()));
1823    }
1824
1825    #[test]
1826    fn test_plan_cross_join() {
1827        let store = create_test_store();
1828        let planner = Planner::new(store);
1829
1830        // Cross join (no conditions)
1831        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1832            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1833                variable: "a".to_string(),
1834                label: None,
1835                input: None,
1836            })),
1837            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1838                variable: "b".to_string(),
1839                label: None,
1840                input: None,
1841            })),
1842            join_type: JoinType::Cross,
1843            conditions: vec![],
1844        }));
1845
1846        let physical = planner.plan(&logical).unwrap();
1847        assert_eq!(physical.columns().len(), 2);
1848    }
1849
1850    #[test]
1851    fn test_plan_left_join() {
1852        let store = create_test_store();
1853        let planner = Planner::new(store);
1854
1855        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1856            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1857                variable: "a".to_string(),
1858                label: None,
1859                input: None,
1860            })),
1861            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1862                variable: "b".to_string(),
1863                label: None,
1864                input: None,
1865            })),
1866            join_type: JoinType::Left,
1867            conditions: vec![],
1868        }));
1869
1870        let physical = planner.plan(&logical).unwrap();
1871        assert_eq!(physical.columns().len(), 2);
1872    }
1873
1874    // ==================== Mutation Tests ====================
1875
1876    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1877        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStoreSearch>);
1878        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1879        p
1880    }
1881
1882    #[test]
1883    fn test_plan_create_node() {
1884        let store = create_test_store();
1885        let planner = create_writable_planner(&store);
1886
1887        // CREATE (n:Person {name: 'Alix'})
1888        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1889            variable: "n".to_string(),
1890            labels: vec!["Person".to_string()],
1891            properties: vec![(
1892                "name".to_string(),
1893                LogicalExpression::Literal(Value::String("Alix".into())),
1894            )],
1895            input: None,
1896        }));
1897
1898        let physical = planner.plan(&logical).unwrap();
1899        assert!(physical.columns().contains(&"n".to_string()));
1900    }
1901
1902    #[test]
1903    fn test_plan_create_edge() {
1904        let store = create_test_store();
1905        let planner = create_writable_planner(&store);
1906
1907        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1908        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1909            variable: Some("r".to_string()),
1910            from_variable: "a".to_string(),
1911            to_variable: "b".to_string(),
1912            edge_type: "KNOWS".to_string(),
1913            properties: vec![],
1914            input: Box::new(LogicalOperator::Join(JoinOp {
1915                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1916                    variable: "a".to_string(),
1917                    label: None,
1918                    input: None,
1919                })),
1920                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1921                    variable: "b".to_string(),
1922                    label: None,
1923                    input: None,
1924                })),
1925                join_type: JoinType::Cross,
1926                conditions: vec![],
1927            })),
1928        }));
1929
1930        let physical = planner.plan(&logical).unwrap();
1931        assert!(physical.columns().contains(&"r".to_string()));
1932    }
1933
1934    #[test]
1935    fn test_plan_delete_node() {
1936        let store = create_test_store();
1937        let planner = create_writable_planner(&store);
1938
1939        // MATCH (n) DELETE n
1940        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1941            variable: "n".to_string(),
1942            detach: false,
1943            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1944                variable: "n".to_string(),
1945                label: None,
1946                input: None,
1947            })),
1948        }));
1949
1950        let physical = planner.plan(&logical).unwrap();
1951        assert!(physical.columns().contains(&"n".to_string()));
1952    }
1953
1954    // ==================== Error Cases ====================
1955
1956    #[test]
1957    fn test_plan_empty_errors() {
1958        let store = create_test_store();
1959        let planner = Planner::new(store);
1960
1961        let logical = LogicalPlan::new(LogicalOperator::Empty);
1962        let result = planner.plan(&logical);
1963        assert!(result.is_err());
1964    }
1965
1966    #[test]
1967    fn test_plan_missing_variable_in_return() {
1968        let store = create_test_store();
1969        let planner = Planner::new(store);
1970
1971        // Return variable that doesn't exist in input
1972        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1973            items: vec![ReturnItem {
1974                expression: LogicalExpression::Variable("missing".to_string()),
1975                alias: None,
1976            }],
1977            distinct: false,
1978            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1979                variable: "n".to_string(),
1980                label: None,
1981                input: None,
1982            })),
1983        }));
1984
1985        let result = planner.plan(&logical);
1986        assert!(result.is_err());
1987    }
1988
1989    // ==================== Helper Function Tests ====================
1990
1991    #[test]
1992    fn test_convert_binary_ops() {
1993        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1994        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1995        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1996        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1997        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1998        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1999        assert!(convert_binary_op(BinaryOp::And).is_ok());
2000        assert!(convert_binary_op(BinaryOp::Or).is_ok());
2001        assert!(convert_binary_op(BinaryOp::Add).is_ok());
2002        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2003        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2004        assert!(convert_binary_op(BinaryOp::Div).is_ok());
2005    }
2006
2007    #[test]
2008    fn test_convert_unary_ops() {
2009        assert!(convert_unary_op(UnaryOp::Not).is_ok());
2010        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2011        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2012        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2013    }
2014
2015    #[test]
2016    fn test_convert_aggregate_functions() {
2017        assert!(matches!(
2018            convert_aggregate_function(LogicalAggregateFunction::Count),
2019            PhysicalAggregateFunction::Count
2020        ));
2021        assert!(matches!(
2022            convert_aggregate_function(LogicalAggregateFunction::Sum),
2023            PhysicalAggregateFunction::Sum
2024        ));
2025        assert!(matches!(
2026            convert_aggregate_function(LogicalAggregateFunction::Avg),
2027            PhysicalAggregateFunction::Avg
2028        ));
2029        assert!(matches!(
2030            convert_aggregate_function(LogicalAggregateFunction::Min),
2031            PhysicalAggregateFunction::Min
2032        ));
2033        assert!(matches!(
2034            convert_aggregate_function(LogicalAggregateFunction::Max),
2035            PhysicalAggregateFunction::Max
2036        ));
2037    }
2038
2039    #[test]
2040    fn test_planner_accessors() {
2041        let store = create_test_store();
2042        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2043
2044        assert!(planner.transaction_id().is_none());
2045        assert!(planner.transaction_manager().is_none());
2046        let _ = planner.viewing_epoch(); // Just ensure it's accessible
2047    }
2048
2049    #[test]
2050    fn test_physical_plan_accessors() {
2051        let store = create_test_store();
2052        let planner = Planner::new(store);
2053
2054        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2055            variable: "n".to_string(),
2056            label: None,
2057            input: None,
2058        }));
2059
2060        let physical = planner.plan(&logical).unwrap();
2061        assert_eq!(physical.columns(), &["n"]);
2062
2063        // Test into_operator
2064        let _ = physical.into_operator();
2065    }
2066
2067    // ==================== Adaptive Planning Tests ====================
2068
2069    #[test]
2070    fn test_plan_adaptive_with_scan() {
2071        let store = create_test_store();
2072        let planner = Planner::new(store);
2073
2074        // MATCH (n:Person) RETURN n
2075        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2076            items: vec![ReturnItem {
2077                expression: LogicalExpression::Variable("n".to_string()),
2078                alias: None,
2079            }],
2080            distinct: false,
2081            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2082                variable: "n".to_string(),
2083                label: Some("Person".to_string()),
2084                input: None,
2085            })),
2086        }));
2087
2088        let physical = planner.plan_adaptive(&logical).unwrap();
2089        assert_eq!(physical.columns(), &["n"]);
2090        // Should have adaptive context with estimates
2091        assert!(physical.adaptive_context.is_some());
2092    }
2093
2094    #[test]
2095    fn test_plan_adaptive_with_filter() {
2096        let store = create_test_store();
2097        let planner = Planner::new(store);
2098
2099        // MATCH (n) WHERE n.age > 30 RETURN n
2100        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2101            items: vec![ReturnItem {
2102                expression: LogicalExpression::Variable("n".to_string()),
2103                alias: None,
2104            }],
2105            distinct: false,
2106            input: Box::new(LogicalOperator::Filter(FilterOp {
2107                predicate: LogicalExpression::Binary {
2108                    left: Box::new(LogicalExpression::Property {
2109                        variable: "n".to_string(),
2110                        property: "age".to_string(),
2111                    }),
2112                    op: BinaryOp::Gt,
2113                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2114                },
2115                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2116                    variable: "n".to_string(),
2117                    label: None,
2118                    input: None,
2119                })),
2120                pushdown_hint: None,
2121            })),
2122        }));
2123
2124        let physical = planner.plan_adaptive(&logical).unwrap();
2125        assert!(physical.adaptive_context.is_some());
2126    }
2127
2128    #[test]
2129    fn test_plan_adaptive_with_expand() {
2130        let store = create_test_store();
2131        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2132            .with_factorized_execution(false);
2133
2134        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
2135        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2136            items: vec![
2137                ReturnItem {
2138                    expression: LogicalExpression::Variable("a".to_string()),
2139                    alias: None,
2140                },
2141                ReturnItem {
2142                    expression: LogicalExpression::Variable("b".to_string()),
2143                    alias: None,
2144                },
2145            ],
2146            distinct: false,
2147            input: Box::new(LogicalOperator::Expand(ExpandOp {
2148                from_variable: "a".to_string(),
2149                to_variable: "b".to_string(),
2150                edge_variable: None,
2151                direction: ExpandDirection::Outgoing,
2152                edge_types: vec!["KNOWS".to_string()],
2153                min_hops: 1,
2154                max_hops: Some(1),
2155                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2156                    variable: "a".to_string(),
2157                    label: None,
2158                    input: None,
2159                })),
2160                path_alias: None,
2161                path_mode: PathMode::Walk,
2162            })),
2163        }));
2164
2165        let physical = planner.plan_adaptive(&logical).unwrap();
2166        assert!(physical.adaptive_context.is_some());
2167    }
2168
2169    #[test]
2170    fn test_plan_adaptive_with_join() {
2171        let store = create_test_store();
2172        let planner = Planner::new(store);
2173
2174        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2175            items: vec![
2176                ReturnItem {
2177                    expression: LogicalExpression::Variable("a".to_string()),
2178                    alias: None,
2179                },
2180                ReturnItem {
2181                    expression: LogicalExpression::Variable("b".to_string()),
2182                    alias: None,
2183                },
2184            ],
2185            distinct: false,
2186            input: Box::new(LogicalOperator::Join(JoinOp {
2187                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2188                    variable: "a".to_string(),
2189                    label: None,
2190                    input: None,
2191                })),
2192                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2193                    variable: "b".to_string(),
2194                    label: None,
2195                    input: None,
2196                })),
2197                join_type: JoinType::Cross,
2198                conditions: vec![],
2199            })),
2200        }));
2201
2202        let physical = planner.plan_adaptive(&logical).unwrap();
2203        assert!(physical.adaptive_context.is_some());
2204    }
2205
2206    #[test]
2207    fn test_plan_adaptive_with_aggregate() {
2208        let store = create_test_store();
2209        let planner = Planner::new(store);
2210
2211        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2212            group_by: vec![],
2213            aggregates: vec![LogicalAggregateExpr {
2214                function: LogicalAggregateFunction::Count,
2215                expression: Some(LogicalExpression::Variable("n".to_string())),
2216                expression2: None,
2217                distinct: false,
2218                alias: Some("cnt".to_string()),
2219                percentile: None,
2220                separator: None,
2221            }],
2222            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2223                variable: "n".to_string(),
2224                label: None,
2225                input: None,
2226            })),
2227            having: None,
2228        }));
2229
2230        let physical = planner.plan_adaptive(&logical).unwrap();
2231        assert!(physical.adaptive_context.is_some());
2232    }
2233
2234    #[test]
2235    fn test_plan_adaptive_with_distinct() {
2236        let store = create_test_store();
2237        let planner = Planner::new(store);
2238
2239        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2240            items: vec![ReturnItem {
2241                expression: LogicalExpression::Variable("n".to_string()),
2242                alias: None,
2243            }],
2244            distinct: false,
2245            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2246                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2247                    variable: "n".to_string(),
2248                    label: None,
2249                    input: None,
2250                })),
2251                columns: None,
2252            })),
2253        }));
2254
2255        let physical = planner.plan_adaptive(&logical).unwrap();
2256        assert!(physical.adaptive_context.is_some());
2257    }
2258
2259    #[test]
2260    fn test_plan_adaptive_with_limit() {
2261        let store = create_test_store();
2262        let planner = Planner::new(store);
2263
2264        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2265            items: vec![ReturnItem {
2266                expression: LogicalExpression::Variable("n".to_string()),
2267                alias: None,
2268            }],
2269            distinct: false,
2270            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2271                count: 10.into(),
2272                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2273                    variable: "n".to_string(),
2274                    label: None,
2275                    input: None,
2276                })),
2277            })),
2278        }));
2279
2280        let physical = planner.plan_adaptive(&logical).unwrap();
2281        assert!(physical.adaptive_context.is_some());
2282    }
2283
2284    #[test]
2285    fn test_plan_adaptive_with_skip() {
2286        let store = create_test_store();
2287        let planner = Planner::new(store);
2288
2289        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2290            items: vec![ReturnItem {
2291                expression: LogicalExpression::Variable("n".to_string()),
2292                alias: None,
2293            }],
2294            distinct: false,
2295            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2296                count: 5.into(),
2297                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2298                    variable: "n".to_string(),
2299                    label: None,
2300                    input: None,
2301                })),
2302            })),
2303        }));
2304
2305        let physical = planner.plan_adaptive(&logical).unwrap();
2306        assert!(physical.adaptive_context.is_some());
2307    }
2308
2309    #[test]
2310    fn test_plan_adaptive_with_sort() {
2311        let store = create_test_store();
2312        let planner = Planner::new(store);
2313
2314        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2315            items: vec![ReturnItem {
2316                expression: LogicalExpression::Variable("n".to_string()),
2317                alias: None,
2318            }],
2319            distinct: false,
2320            input: Box::new(LogicalOperator::Sort(SortOp {
2321                keys: vec![SortKey {
2322                    expression: LogicalExpression::Variable("n".to_string()),
2323                    order: SortOrder::Ascending,
2324                    nulls: None,
2325                }],
2326                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2327                    variable: "n".to_string(),
2328                    label: None,
2329                    input: None,
2330                })),
2331            })),
2332        }));
2333
2334        let physical = planner.plan_adaptive(&logical).unwrap();
2335        assert!(physical.adaptive_context.is_some());
2336    }
2337
2338    #[test]
2339    fn test_plan_adaptive_with_union() {
2340        let store = create_test_store();
2341        let planner = Planner::new(store);
2342
2343        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2344            items: vec![ReturnItem {
2345                expression: LogicalExpression::Variable("n".to_string()),
2346                alias: None,
2347            }],
2348            distinct: false,
2349            input: Box::new(LogicalOperator::Union(UnionOp {
2350                inputs: vec![
2351                    LogicalOperator::NodeScan(NodeScanOp {
2352                        variable: "n".to_string(),
2353                        label: Some("Person".to_string()),
2354                        input: None,
2355                    }),
2356                    LogicalOperator::NodeScan(NodeScanOp {
2357                        variable: "n".to_string(),
2358                        label: Some("Company".to_string()),
2359                        input: None,
2360                    }),
2361                ],
2362            })),
2363        }));
2364
2365        let physical = planner.plan_adaptive(&logical).unwrap();
2366        assert!(physical.adaptive_context.is_some());
2367    }
2368
2369    // ==================== Variable Length Path Tests ====================
2370
2371    #[test]
2372    fn test_plan_expand_variable_length() {
2373        let store = create_test_store();
2374        let planner = Planner::new(store);
2375
2376        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2377        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2378            items: vec![
2379                ReturnItem {
2380                    expression: LogicalExpression::Variable("a".to_string()),
2381                    alias: None,
2382                },
2383                ReturnItem {
2384                    expression: LogicalExpression::Variable("b".to_string()),
2385                    alias: None,
2386                },
2387            ],
2388            distinct: false,
2389            input: Box::new(LogicalOperator::Expand(ExpandOp {
2390                from_variable: "a".to_string(),
2391                to_variable: "b".to_string(),
2392                edge_variable: None,
2393                direction: ExpandDirection::Outgoing,
2394                edge_types: vec!["KNOWS".to_string()],
2395                min_hops: 1,
2396                max_hops: Some(3),
2397                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2398                    variable: "a".to_string(),
2399                    label: None,
2400                    input: None,
2401                })),
2402                path_alias: None,
2403                path_mode: PathMode::Walk,
2404            })),
2405        }));
2406
2407        let physical = planner.plan(&logical).unwrap();
2408        assert!(physical.columns().contains(&"a".to_string()));
2409        assert!(physical.columns().contains(&"b".to_string()));
2410    }
2411
2412    #[test]
2413    fn test_plan_expand_with_path_alias() {
2414        let store = create_test_store();
2415        let planner = Planner::new(store);
2416
2417        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2418        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2419            items: vec![
2420                ReturnItem {
2421                    expression: LogicalExpression::Variable("a".to_string()),
2422                    alias: None,
2423                },
2424                ReturnItem {
2425                    expression: LogicalExpression::Variable("b".to_string()),
2426                    alias: None,
2427                },
2428            ],
2429            distinct: false,
2430            input: Box::new(LogicalOperator::Expand(ExpandOp {
2431                from_variable: "a".to_string(),
2432                to_variable: "b".to_string(),
2433                edge_variable: None,
2434                direction: ExpandDirection::Outgoing,
2435                edge_types: vec!["KNOWS".to_string()],
2436                min_hops: 1,
2437                max_hops: Some(3),
2438                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2439                    variable: "a".to_string(),
2440                    label: None,
2441                    input: None,
2442                })),
2443                path_alias: Some("p".to_string()),
2444                path_mode: PathMode::Walk,
2445            })),
2446        }));
2447
2448        let physical = planner.plan(&logical).unwrap();
2449        // Verify plan was created successfully with expected output columns
2450        assert!(physical.columns().contains(&"a".to_string()));
2451        assert!(physical.columns().contains(&"b".to_string()));
2452    }
2453
2454    #[test]
2455    fn test_plan_expand_incoming() {
2456        let store = create_test_store();
2457        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2458            .with_factorized_execution(false);
2459
2460        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2461        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2462            items: vec![
2463                ReturnItem {
2464                    expression: LogicalExpression::Variable("a".to_string()),
2465                    alias: None,
2466                },
2467                ReturnItem {
2468                    expression: LogicalExpression::Variable("b".to_string()),
2469                    alias: None,
2470                },
2471            ],
2472            distinct: false,
2473            input: Box::new(LogicalOperator::Expand(ExpandOp {
2474                from_variable: "a".to_string(),
2475                to_variable: "b".to_string(),
2476                edge_variable: None,
2477                direction: ExpandDirection::Incoming,
2478                edge_types: vec!["KNOWS".to_string()],
2479                min_hops: 1,
2480                max_hops: Some(1),
2481                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2482                    variable: "a".to_string(),
2483                    label: None,
2484                    input: None,
2485                })),
2486                path_alias: None,
2487                path_mode: PathMode::Walk,
2488            })),
2489        }));
2490
2491        let physical = planner.plan(&logical).unwrap();
2492        assert!(physical.columns().contains(&"a".to_string()));
2493        assert!(physical.columns().contains(&"b".to_string()));
2494    }
2495
2496    #[test]
2497    fn test_plan_expand_both_directions() {
2498        let store = create_test_store();
2499        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2500            .with_factorized_execution(false);
2501
2502        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2503        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2504            items: vec![
2505                ReturnItem {
2506                    expression: LogicalExpression::Variable("a".to_string()),
2507                    alias: None,
2508                },
2509                ReturnItem {
2510                    expression: LogicalExpression::Variable("b".to_string()),
2511                    alias: None,
2512                },
2513            ],
2514            distinct: false,
2515            input: Box::new(LogicalOperator::Expand(ExpandOp {
2516                from_variable: "a".to_string(),
2517                to_variable: "b".to_string(),
2518                edge_variable: None,
2519                direction: ExpandDirection::Both,
2520                edge_types: vec!["KNOWS".to_string()],
2521                min_hops: 1,
2522                max_hops: Some(1),
2523                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2524                    variable: "a".to_string(),
2525                    label: None,
2526                    input: None,
2527                })),
2528                path_alias: None,
2529                path_mode: PathMode::Walk,
2530            })),
2531        }));
2532
2533        let physical = planner.plan(&logical).unwrap();
2534        assert!(physical.columns().contains(&"a".to_string()));
2535        assert!(physical.columns().contains(&"b".to_string()));
2536    }
2537
2538    // ==================== With Context Tests ====================
2539
2540    #[test]
2541    fn test_planner_with_context() {
2542        use crate::transaction::TransactionManager;
2543
2544        let store = create_test_store();
2545        let transaction_manager = Arc::new(TransactionManager::new());
2546        let transaction_id = transaction_manager.begin();
2547        let epoch = transaction_manager.current_epoch();
2548
2549        let planner = Planner::with_context(
2550            Arc::clone(&store) as Arc<dyn GraphStoreSearch>,
2551            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2552            Arc::clone(&transaction_manager),
2553            Some(transaction_id),
2554            epoch,
2555        );
2556
2557        assert_eq!(planner.transaction_id(), Some(transaction_id));
2558        assert!(planner.transaction_manager().is_some());
2559        assert_eq!(planner.viewing_epoch(), epoch);
2560    }
2561
2562    #[test]
2563    fn test_planner_with_factorized_execution_disabled() {
2564        let store = create_test_store();
2565        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2566            .with_factorized_execution(false);
2567
2568        // Two consecutive expands - should NOT use factorized execution
2569        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2570            items: vec![
2571                ReturnItem {
2572                    expression: LogicalExpression::Variable("a".to_string()),
2573                    alias: None,
2574                },
2575                ReturnItem {
2576                    expression: LogicalExpression::Variable("c".to_string()),
2577                    alias: None,
2578                },
2579            ],
2580            distinct: false,
2581            input: Box::new(LogicalOperator::Expand(ExpandOp {
2582                from_variable: "b".to_string(),
2583                to_variable: "c".to_string(),
2584                edge_variable: None,
2585                direction: ExpandDirection::Outgoing,
2586                edge_types: vec![],
2587                min_hops: 1,
2588                max_hops: Some(1),
2589                input: Box::new(LogicalOperator::Expand(ExpandOp {
2590                    from_variable: "a".to_string(),
2591                    to_variable: "b".to_string(),
2592                    edge_variable: None,
2593                    direction: ExpandDirection::Outgoing,
2594                    edge_types: vec![],
2595                    min_hops: 1,
2596                    max_hops: Some(1),
2597                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2598                        variable: "a".to_string(),
2599                        label: None,
2600                        input: None,
2601                    })),
2602                    path_alias: None,
2603                    path_mode: PathMode::Walk,
2604                })),
2605                path_alias: None,
2606                path_mode: PathMode::Walk,
2607            })),
2608        }));
2609
2610        let physical = planner.plan(&logical).unwrap();
2611        assert!(physical.columns().contains(&"a".to_string()));
2612        assert!(physical.columns().contains(&"c".to_string()));
2613    }
2614
2615    // ==================== Sort with Property Tests ====================
2616
2617    #[test]
2618    fn test_plan_sort_by_property() {
2619        let store = create_test_store();
2620        let planner = Planner::new(store);
2621
2622        // MATCH (n) RETURN n ORDER BY n.name ASC
2623        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2624            items: vec![ReturnItem {
2625                expression: LogicalExpression::Variable("n".to_string()),
2626                alias: None,
2627            }],
2628            distinct: false,
2629            input: Box::new(LogicalOperator::Sort(SortOp {
2630                keys: vec![SortKey {
2631                    expression: LogicalExpression::Property {
2632                        variable: "n".to_string(),
2633                        property: "name".to_string(),
2634                    },
2635                    order: SortOrder::Ascending,
2636                    nulls: None,
2637                }],
2638                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2639                    variable: "n".to_string(),
2640                    label: None,
2641                    input: None,
2642                })),
2643            })),
2644        }));
2645
2646        let physical = planner.plan(&logical).unwrap();
2647        // Should have the property column projected
2648        assert!(physical.columns().contains(&"n".to_string()));
2649    }
2650
2651    // ==================== Scan with Input Tests ====================
2652
2653    #[test]
2654    fn test_plan_scan_with_input() {
2655        let store = create_test_store();
2656        let planner = Planner::new(store);
2657
2658        // A scan with another scan as input (for chained patterns)
2659        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2660            items: vec![
2661                ReturnItem {
2662                    expression: LogicalExpression::Variable("a".to_string()),
2663                    alias: None,
2664                },
2665                ReturnItem {
2666                    expression: LogicalExpression::Variable("b".to_string()),
2667                    alias: None,
2668                },
2669            ],
2670            distinct: false,
2671            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2672                variable: "b".to_string(),
2673                label: Some("Company".to_string()),
2674                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2675                    variable: "a".to_string(),
2676                    label: Some("Person".to_string()),
2677                    input: None,
2678                }))),
2679            })),
2680        }));
2681
2682        let physical = planner.plan(&logical).unwrap();
2683        assert!(physical.columns().contains(&"a".to_string()));
2684        assert!(physical.columns().contains(&"b".to_string()));
2685    }
2686
2687    // ==================== Additional Coverage Tests ====================
2688    //
2689    // These tests target branches that were not exercised by the original
2690    // planner tests: builder methods, read-only flag, profiled planning,
2691    // unsupported operator error paths, and the dispatch branches for every
2692    // plan_* function reachable through plan_operator.
2693
2694    use crate::catalog::Catalog;
2695    use crate::query::plan::{
2696        AddLabelOp, AntiJoinOp, ApplyOp, BindOp, DeleteEdgeOp, EdgeScanOp, ExceptOp,
2697        HorizontalAggregateOp, IntersectOp, LeftJoinOp, LoadDataFormat, LoadDataOp, MapCollectOp,
2698        MergeOp, MergeRelationshipOp, MultiWayJoinOp, OtherwiseOp, ParameterScanOp, RemoveLabelOp,
2699        SetPropertyOp, ShortestPathOp, TripleComponent, TripleScanOp, UnionOp, UnwindOp,
2700    };
2701    use grafeo_core::execution::operators::{Operator, SessionContext};
2702
2703    fn full_store() -> Arc<LpgStore> {
2704        // Richer store so expand and shortest path tests have real data.
2705        let store = Arc::new(LpgStore::new().unwrap());
2706        let vincent = store.create_node(&["Person"]);
2707        let jules = store.create_node(&["Person"]);
2708        let mia = store.create_node(&["Person"]);
2709        let _company = store.create_node(&["Company"]);
2710        store.create_edge(vincent, jules, "KNOWS");
2711        store.create_edge(jules, mia, "KNOWS");
2712        store
2713    }
2714
2715    fn scan_person(var: &str) -> LogicalOperator {
2716        LogicalOperator::NodeScan(NodeScanOp {
2717            variable: var.to_string(),
2718            label: Some("Person".to_string()),
2719            input: None,
2720        })
2721    }
2722
2723    fn scan_any(var: &str) -> LogicalOperator {
2724        LogicalOperator::NodeScan(NodeScanOp {
2725            variable: var.to_string(),
2726            label: None,
2727            input: None,
2728        })
2729    }
2730
2731    // ==================== Builder Methods ====================
2732
2733    #[test]
2734    fn test_with_read_only_flag() {
2735        let store = create_test_store();
2736        let planner =
2737            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(true);
2738        assert!(planner.read_only);
2739
2740        let planner_off =
2741            Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>).with_read_only(false);
2742        assert!(!planner_off.read_only);
2743    }
2744
2745    #[test]
2746    fn test_with_catalog() {
2747        let store = create_test_store();
2748        let catalog = Arc::new(Catalog::new());
2749        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2750            .with_catalog(Arc::clone(&catalog));
2751        assert!(planner.catalog.is_some());
2752    }
2753
2754    #[test]
2755    fn test_with_session_context() {
2756        let store = create_test_store();
2757        let context = SessionContext {
2758            current_schema: Some("public".to_string()),
2759            current_graph: Some("main".to_string()),
2760            ..SessionContext::default()
2761        };
2762        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>)
2763            .with_session_context(context);
2764        assert_eq!(
2765            planner.session_context.current_schema.as_deref(),
2766            Some("public")
2767        );
2768        assert_eq!(
2769            planner.session_context.current_graph.as_deref(),
2770            Some("main")
2771        );
2772    }
2773
2774    // ==================== register_edge_column ====================
2775
2776    #[test]
2777    fn test_register_edge_column_named() {
2778        let store = create_test_store();
2779        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2780        let name = planner.register_edge_column(&Some("r".to_string()));
2781        assert_eq!(name, "r");
2782        assert!(planner.edge_columns.borrow().contains("r"));
2783    }
2784
2785    #[test]
2786    fn test_register_edge_column_anonymous_counter_advances() {
2787        let store = create_test_store();
2788        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
2789        let a = planner.register_edge_column(&None);
2790        let b = planner.register_edge_column(&None);
2791        assert_eq!(a, "_anon_edge_0");
2792        assert_eq!(b, "_anon_edge_1");
2793        assert!(planner.edge_columns.borrow().contains("_anon_edge_0"));
2794        assert!(planner.edge_columns.borrow().contains("_anon_edge_1"));
2795    }
2796
2797    // ==================== write_store() error path ====================
2798
2799    #[test]
2800    fn test_create_node_without_write_store_errors() {
2801        // Read-only planner: CREATE should fail with ReadOnly transaction error.
2802        let store = create_test_store();
2803        let planner = Planner::new(store);
2804
2805        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2806            variable: "n".to_string(),
2807            labels: vec!["Person".to_string()],
2808            properties: vec![],
2809            input: None,
2810        }));
2811
2812        let result = planner.plan(&logical);
2813        assert!(result.is_err());
2814    }
2815
2816    // ==================== plan_profiled ====================
2817
2818    #[test]
2819    fn test_plan_profiled_collects_entries() {
2820        let store = create_test_store();
2821        let planner = Planner::new(store);
2822
2823        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2824            items: vec![ReturnItem {
2825                expression: LogicalExpression::Variable("n".to_string()),
2826                alias: None,
2827            }],
2828            distinct: false,
2829            input: Box::new(scan_person("n")),
2830        }));
2831
2832        let (physical, entries) = planner.plan_profiled(&logical).unwrap();
2833        assert_eq!(physical.columns(), &["n"]);
2834        // Post-order: scan, then return (at least two entries).
2835        assert!(
2836            entries.len() >= 2,
2837            "expected entries, got {}",
2838            entries.len()
2839        );
2840        // After profiling, the internal flag is cleared.
2841        assert!(!planner.profiling.get());
2842    }
2843
2844    #[test]
2845    fn test_plan_profiled_propagates_plan_errors() {
2846        let store = create_test_store();
2847        let planner = Planner::new(store);
2848        let logical = LogicalPlan::new(LogicalOperator::Empty);
2849        let result = planner.plan_profiled(&logical);
2850        assert!(result.is_err());
2851        // Profiling must still be reset to false even on error.
2852        assert!(!planner.profiling.get());
2853    }
2854
2855    // ==================== Unsupported operator error paths ====================
2856
2857    #[test]
2858    fn test_plan_edge_scan_is_unsupported() {
2859        // LPG planner does not handle bare EdgeScan; this hits the catch-all branch.
2860        let store = create_test_store();
2861        let planner = Planner::new(store);
2862        let logical = LogicalPlan::new(LogicalOperator::EdgeScan(EdgeScanOp {
2863            variable: "e".to_string(),
2864            edge_types: vec![],
2865            input: None,
2866        }));
2867        let err = planner.plan(&logical).err().expect("plan should fail");
2868        assert!(format!("{err}").contains("Unsupported operator"));
2869    }
2870
2871    #[test]
2872    fn test_plan_triple_scan_is_unsupported() {
2873        let store = create_test_store();
2874        let planner = Planner::new(store);
2875        let logical = LogicalPlan::new(LogicalOperator::TripleScan(TripleScanOp {
2876            subject: TripleComponent::Variable("s".to_string()),
2877            predicate: TripleComponent::Variable("p".to_string()),
2878            object: TripleComponent::Variable("o".to_string()),
2879            graph: None,
2880            input: None,
2881            dataset: None,
2882        }));
2883        assert!(planner.plan(&logical).is_err());
2884    }
2885
2886    #[test]
2887    fn test_plan_bind_is_unsupported() {
2888        let store = create_test_store();
2889        let planner = Planner::new(store);
2890        let logical = LogicalPlan::new(LogicalOperator::Bind(BindOp {
2891            expression: LogicalExpression::Literal(Value::Int64(1)),
2892            variable: "x".to_string(),
2893            input: Box::new(scan_any("n")),
2894        }));
2895        assert!(planner.plan(&logical).is_err());
2896    }
2897
2898    #[test]
2899    fn test_plan_parameter_scan_without_apply_errors() {
2900        let store = create_test_store();
2901        let planner = Planner::new(store);
2902        let logical = LogicalPlan::new(LogicalOperator::ParameterScan(ParameterScanOp {
2903            columns: vec!["n".to_string()],
2904        }));
2905        let err = planner.plan(&logical).err().expect("plan should fail");
2906        assert!(format!("{err}").contains("ParameterScan"));
2907    }
2908
2909    // ==================== plan_operator dispatch branches ====================
2910
2911    #[test]
2912    fn test_plan_union_dispatch() {
2913        let store = create_test_store();
2914        let planner = Planner::new(store);
2915        let logical = LogicalPlan::new(LogicalOperator::Union(UnionOp {
2916            inputs: vec![scan_person("n"), scan_person("n")],
2917        }));
2918        let physical = planner.plan(&logical).unwrap();
2919        assert_eq!(physical.columns(), &["n"]);
2920    }
2921
2922    #[test]
2923    fn test_plan_except_dispatch() {
2924        let store = create_test_store();
2925        let planner = Planner::new(store);
2926        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
2927            left: Box::new(scan_person("n")),
2928            right: Box::new(scan_person("n")),
2929            all: false,
2930        }));
2931        let physical = planner.plan(&logical).unwrap();
2932        assert_eq!(physical.columns(), &["n"]);
2933    }
2934
2935    #[test]
2936    fn test_plan_intersect_dispatch() {
2937        let store = create_test_store();
2938        let planner = Planner::new(store);
2939        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
2940            left: Box::new(scan_person("n")),
2941            right: Box::new(scan_person("n")),
2942            all: false,
2943        }));
2944        let physical = planner.plan(&logical).unwrap();
2945        assert_eq!(physical.columns(), &["n"]);
2946    }
2947
2948    #[test]
2949    fn test_plan_otherwise_dispatch() {
2950        let store = create_test_store();
2951        let planner = Planner::new(store);
2952        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
2953            left: Box::new(scan_person("n")),
2954            right: Box::new(scan_any("n")),
2955        }));
2956        let physical = planner.plan(&logical).unwrap();
2957        assert_eq!(physical.columns(), &["n"]);
2958    }
2959
2960    #[test]
2961    fn test_plan_left_join_dispatch() {
2962        let store = create_test_store();
2963        let planner = Planner::new(store);
2964        let logical = LogicalPlan::new(LogicalOperator::LeftJoin(LeftJoinOp {
2965            left: Box::new(scan_any("a")),
2966            right: Box::new(scan_any("b")),
2967            condition: None,
2968        }));
2969        let physical = planner.plan(&logical).unwrap();
2970        assert!(physical.columns().contains(&"a".to_string()));
2971        assert!(physical.columns().contains(&"b".to_string()));
2972    }
2973
2974    #[test]
2975    fn test_plan_anti_join_dispatch() {
2976        let store = create_test_store();
2977        let planner = Planner::new(store);
2978        let logical = LogicalPlan::new(LogicalOperator::AntiJoin(AntiJoinOp {
2979            left: Box::new(scan_any("a")),
2980            right: Box::new(scan_any("b")),
2981        }));
2982        let physical = planner.plan(&logical).unwrap();
2983        assert!(physical.columns().contains(&"a".to_string()));
2984    }
2985
2986    #[test]
2987    fn test_plan_apply_uncorrelated_dispatch() {
2988        let store = create_test_store();
2989        let planner = Planner::new(store);
2990        let logical = LogicalPlan::new(LogicalOperator::Apply(ApplyOp {
2991            input: Box::new(scan_any("a")),
2992            subplan: Box::new(scan_any("b")),
2993            shared_variables: vec![],
2994            optional: false,
2995        }));
2996        let physical = planner.plan(&logical).unwrap();
2997        assert!(physical.columns().contains(&"a".to_string()));
2998        assert!(physical.columns().contains(&"b".to_string()));
2999    }
3000
3001    #[test]
3002    fn test_plan_unwind_literal_list() {
3003        let store = create_test_store();
3004        let planner = Planner::new(store);
3005
3006        // UNWIND [1,2,3] AS x
3007        let logical = LogicalPlan::new(LogicalOperator::Unwind(UnwindOp {
3008            expression: LogicalExpression::List(vec![
3009                LogicalExpression::Literal(Value::Int64(1)),
3010                LogicalExpression::Literal(Value::Int64(2)),
3011                LogicalExpression::Literal(Value::Int64(3)),
3012            ]),
3013            variable: "x".to_string(),
3014            ordinality_var: None,
3015            offset_var: None,
3016            input: Box::new(LogicalOperator::Empty),
3017        }));
3018        let physical = planner.plan(&logical).unwrap();
3019        assert!(physical.columns().contains(&"x".to_string()));
3020    }
3021
3022    #[test]
3023    fn test_plan_merge_dispatch() {
3024        let store = create_test_store();
3025        let planner = create_writable_planner(&store);
3026
3027        // MERGE (n:Person)
3028        let logical = LogicalPlan::new(LogicalOperator::Merge(MergeOp {
3029            variable: "n".to_string(),
3030            labels: vec!["Person".to_string()],
3031            match_properties: vec![],
3032            on_create: vec![],
3033            on_match: vec![],
3034            input: Box::new(LogicalOperator::Empty),
3035        }));
3036        let physical = planner.plan(&logical).unwrap();
3037        assert!(physical.columns().contains(&"n".to_string()));
3038    }
3039
3040    #[test]
3041    fn test_plan_merge_relationship_dispatch() {
3042        let store = full_store();
3043        let planner = create_writable_planner(&store);
3044
3045        // MATCH (a:Person),(b:Person) MERGE (a)-[r:KNOWS]->(b)
3046        let logical = LogicalPlan::new(LogicalOperator::MergeRelationship(MergeRelationshipOp {
3047            variable: "r".to_string(),
3048            source_variable: "a".to_string(),
3049            target_variable: "b".to_string(),
3050            edge_type: "KNOWS".to_string(),
3051            match_properties: vec![],
3052            on_create: vec![],
3053            on_match: vec![],
3054            input: Box::new(LogicalOperator::Join(JoinOp {
3055                left: Box::new(scan_person("a")),
3056                right: Box::new(scan_person("b")),
3057                join_type: JoinType::Cross,
3058                conditions: vec![],
3059            })),
3060        }));
3061        let physical = planner.plan(&logical).unwrap();
3062        assert!(physical.columns().contains(&"r".to_string()));
3063    }
3064
3065    #[test]
3066    fn test_plan_add_label_dispatch() {
3067        let store = full_store();
3068        let planner = create_writable_planner(&store);
3069        let logical = LogicalPlan::new(LogicalOperator::AddLabel(AddLabelOp {
3070            variable: "n".to_string(),
3071            labels: vec!["VIP".to_string()],
3072            input: Box::new(scan_person("n")),
3073        }));
3074        let physical = planner.plan(&logical).unwrap();
3075        assert!(physical.columns().contains(&"labels_added".to_string()));
3076    }
3077
3078    #[test]
3079    fn test_plan_remove_label_dispatch() {
3080        let store = full_store();
3081        let planner = create_writable_planner(&store);
3082        let logical = LogicalPlan::new(LogicalOperator::RemoveLabel(RemoveLabelOp {
3083            variable: "n".to_string(),
3084            labels: vec!["Person".to_string()],
3085            input: Box::new(scan_person("n")),
3086        }));
3087        let physical = planner.plan(&logical).unwrap();
3088        assert!(physical.columns().contains(&"labels_removed".to_string()));
3089    }
3090
3091    #[test]
3092    fn test_plan_set_property_dispatch() {
3093        let store = full_store();
3094        let planner = create_writable_planner(&store);
3095        let logical = LogicalPlan::new(LogicalOperator::SetProperty(SetPropertyOp {
3096            variable: "n".to_string(),
3097            properties: vec![(
3098                "city".to_string(),
3099                LogicalExpression::Literal(Value::String("Amsterdam".into())),
3100            )],
3101            replace: false,
3102            is_edge: false,
3103            input: Box::new(scan_person("n")),
3104        }));
3105        let physical = planner.plan(&logical).unwrap();
3106        assert!(physical.columns().contains(&"n".to_string()));
3107    }
3108
3109    #[test]
3110    fn test_plan_delete_edge_dispatch() {
3111        let store = full_store();
3112        let planner = create_writable_planner(&store);
3113
3114        // Register the edge column first via an outgoing expand, then DELETE r.
3115        let expand_op = LogicalOperator::Expand(ExpandOp {
3116            from_variable: "a".to_string(),
3117            to_variable: "b".to_string(),
3118            edge_variable: Some("r".to_string()),
3119            direction: ExpandDirection::Outgoing,
3120            edge_types: vec!["KNOWS".to_string()],
3121            min_hops: 1,
3122            max_hops: Some(1),
3123            input: Box::new(scan_person("a")),
3124            path_alias: None,
3125            path_mode: PathMode::Walk,
3126        });
3127        let logical = LogicalPlan::new(LogicalOperator::DeleteEdge(DeleteEdgeOp {
3128            variable: "r".to_string(),
3129            input: Box::new(expand_op),
3130        }));
3131        let physical = planner.plan(&logical).unwrap();
3132        assert!(physical.columns().contains(&"r".to_string()));
3133    }
3134
3135    #[test]
3136    fn test_plan_shortest_path_dispatch() {
3137        let store = full_store();
3138        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3139
3140        // SHORTEST PATH (a)-(b)
3141        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3142            input: Box::new(LogicalOperator::Join(JoinOp {
3143                left: Box::new(scan_person("a")),
3144                right: Box::new(scan_person("b")),
3145                join_type: JoinType::Cross,
3146                conditions: vec![],
3147            })),
3148            source_var: "a".to_string(),
3149            target_var: "b".to_string(),
3150            edge_types: vec!["KNOWS".to_string()],
3151            direction: ExpandDirection::Outgoing,
3152            path_alias: "p".to_string(),
3153            all_paths: false,
3154        }));
3155        let physical = planner.plan(&logical).unwrap();
3156        assert!(
3157            physical
3158                .columns()
3159                .iter()
3160                .any(|c| c.contains("_path_length_p"))
3161        );
3162    }
3163
3164    #[test]
3165    fn test_plan_shortest_path_missing_source_errors() {
3166        let store = full_store();
3167        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3168        let logical = LogicalPlan::new(LogicalOperator::ShortestPath(ShortestPathOp {
3169            input: Box::new(scan_person("a")),
3170            source_var: "missing".to_string(),
3171            target_var: "a".to_string(),
3172            edge_types: vec![],
3173            direction: ExpandDirection::Both,
3174            path_alias: "p".to_string(),
3175            all_paths: false,
3176        }));
3177        let err = planner.plan(&logical).err().expect("plan should fail");
3178        assert!(format!("{err}").contains("Source variable"));
3179    }
3180
3181    #[test]
3182    fn test_plan_map_collect_dispatch() {
3183        // Build rows with two columns named 'k' and 'v', then collect k->v into a map.
3184        let store = create_test_store();
3185        let planner = Planner::new(store);
3186        let input_with_kv = LogicalOperator::Project(crate::query::plan::ProjectOp {
3187            projections: vec![
3188                crate::query::plan::Projection {
3189                    expression: LogicalExpression::Literal(Value::String("key".into())),
3190                    alias: Some("k".to_string()),
3191                },
3192                crate::query::plan::Projection {
3193                    expression: LogicalExpression::Literal(Value::Int64(1)),
3194                    alias: Some("v".to_string()),
3195                },
3196            ],
3197            input: Box::new(scan_person("n")),
3198            pass_through_input: false,
3199        });
3200        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3201            key_var: "k".to_string(),
3202            value_var: "v".to_string(),
3203            alias: "m".to_string(),
3204            input: Box::new(input_with_kv),
3205        }));
3206        let physical = planner.plan(&logical).unwrap();
3207        assert_eq!(physical.columns(), &["m"]);
3208    }
3209
3210    #[test]
3211    fn test_plan_map_collect_missing_key_errors() {
3212        let store = create_test_store();
3213        let planner = Planner::new(store);
3214        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3215            key_var: "not_there".to_string(),
3216            value_var: "also_missing".to_string(),
3217            alias: "m".to_string(),
3218            input: Box::new(scan_any("n")),
3219        }));
3220        let err = planner.plan(&logical).err().expect("plan should fail");
3221        let msg = format!("{err}");
3222        assert!(msg.contains("MapCollect key"), "got: {msg}");
3223    }
3224
3225    #[test]
3226    fn test_plan_map_collect_missing_value_errors() {
3227        let store = create_test_store();
3228        let planner = Planner::new(store);
3229        // Input has column "n" so key resolves but value does not.
3230        let logical = LogicalPlan::new(LogicalOperator::MapCollect(MapCollectOp {
3231            key_var: "n".to_string(),
3232            value_var: "missing_value".to_string(),
3233            alias: "m".to_string(),
3234            input: Box::new(scan_any("n")),
3235        }));
3236        let err = planner.plan(&logical).err().expect("plan should fail");
3237        let msg = format!("{err}");
3238        assert!(msg.contains("MapCollect value"), "got: {msg}");
3239    }
3240
3241    #[test]
3242    fn test_plan_horizontal_aggregate_missing_column_errors() {
3243        let store = create_test_store();
3244        let planner = Planner::new(store);
3245        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3246            HorizontalAggregateOp {
3247                list_column: "not_a_column".to_string(),
3248                entity_kind: crate::query::plan::EntityKind::Edge,
3249                function: LogicalAggregateFunction::Count,
3250                property: "age".to_string(),
3251                alias: "total".to_string(),
3252                input: Box::new(scan_any("n")),
3253            },
3254        ));
3255        let err = planner.plan(&logical).err().expect("plan should fail");
3256        assert!(format!("{err}").contains("HorizontalAggregate"));
3257    }
3258
3259    #[test]
3260    fn test_plan_load_data_dispatch() {
3261        let store = create_test_store();
3262        let planner = Planner::new(store);
3263        // Path does not need to exist: planning just builds the operator.
3264        let logical = LogicalPlan::new(LogicalOperator::LoadData(LoadDataOp {
3265            format: LoadDataFormat::Csv,
3266            with_headers: true,
3267            path: "/nonexistent/data.csv".to_string(),
3268            variable: "row".to_string(),
3269            field_terminator: Some(','),
3270        }));
3271        let physical = planner.plan(&logical).unwrap();
3272        assert_eq!(physical.columns(), &["row"]);
3273    }
3274
3275    #[test]
3276    fn test_plan_multi_way_join_dispatch() {
3277        // Three-way join over three expand inputs. Some configurations may
3278        // error during planning; we just require no panic.
3279        let store = full_store();
3280        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3281        let ab = LogicalOperator::Expand(ExpandOp {
3282            from_variable: "a".to_string(),
3283            to_variable: "b".to_string(),
3284            edge_variable: None,
3285            direction: ExpandDirection::Outgoing,
3286            edge_types: vec!["KNOWS".to_string()],
3287            min_hops: 1,
3288            max_hops: Some(1),
3289            input: Box::new(scan_person("a")),
3290            path_alias: None,
3291            path_mode: PathMode::Walk,
3292        });
3293        let bc = LogicalOperator::Expand(ExpandOp {
3294            from_variable: "b".to_string(),
3295            to_variable: "c".to_string(),
3296            edge_variable: None,
3297            direction: ExpandDirection::Outgoing,
3298            edge_types: vec!["KNOWS".to_string()],
3299            min_hops: 1,
3300            max_hops: Some(1),
3301            input: Box::new(scan_person("b")),
3302            path_alias: None,
3303            path_mode: PathMode::Walk,
3304        });
3305        let ca = LogicalOperator::Expand(ExpandOp {
3306            from_variable: "c".to_string(),
3307            to_variable: "a".to_string(),
3308            edge_variable: None,
3309            direction: ExpandDirection::Outgoing,
3310            edge_types: vec!["KNOWS".to_string()],
3311            min_hops: 1,
3312            max_hops: Some(1),
3313            input: Box::new(scan_person("c")),
3314            path_alias: None,
3315            path_mode: PathMode::Walk,
3316        });
3317        let logical = LogicalPlan::new(LogicalOperator::MultiWayJoin(MultiWayJoinOp {
3318            inputs: vec![ab, bc, ca],
3319            conditions: vec![],
3320            shared_variables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
3321        }));
3322        let _ = planner.plan(&logical);
3323    }
3324
3325    #[test]
3326    fn test_plan_horizontal_aggregate_dispatch() {
3327        // Variable-length expand produces a list column that the aggregate targets.
3328        let store = full_store();
3329        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStoreSearch>);
3330
3331        let path = LogicalOperator::Expand(ExpandOp {
3332            from_variable: "a".to_string(),
3333            to_variable: "b".to_string(),
3334            edge_variable: Some("r".to_string()),
3335            direction: ExpandDirection::Outgoing,
3336            edge_types: vec!["KNOWS".to_string()],
3337            min_hops: 1,
3338            max_hops: Some(3),
3339            input: Box::new(scan_person("a")),
3340            path_alias: Some("p".to_string()),
3341            path_mode: PathMode::Walk,
3342        });
3343        // Variable-length expand emits a column named _path_edges_p.
3344        let logical = LogicalPlan::new(LogicalOperator::HorizontalAggregate(
3345            HorizontalAggregateOp {
3346                list_column: "_path_edges_p".to_string(),
3347                entity_kind: crate::query::plan::EntityKind::Edge,
3348                function: LogicalAggregateFunction::Count,
3349                property: "weight".to_string(),
3350                alias: "edge_count".to_string(),
3351                input: Box::new(path),
3352            },
3353        ));
3354        let physical = planner.plan(&logical).unwrap();
3355        assert!(physical.columns().contains(&"edge_count".to_string()));
3356    }
3357
3358    // ==================== Cardinality estimation branches ====================
3359
3360    #[test]
3361    fn test_plan_adaptive_with_except() {
3362        let store = create_test_store();
3363        let planner = Planner::new(store);
3364        let logical = LogicalPlan::new(LogicalOperator::Except(ExceptOp {
3365            left: Box::new(scan_person("n")),
3366            right: Box::new(scan_person("n")),
3367            all: false,
3368        }));
3369        let physical = planner.plan_adaptive(&logical).unwrap();
3370        assert!(physical.adaptive_context.is_some());
3371    }
3372
3373    #[test]
3374    fn test_plan_adaptive_with_intersect() {
3375        let store = create_test_store();
3376        let planner = Planner::new(store);
3377        let logical = LogicalPlan::new(LogicalOperator::Intersect(IntersectOp {
3378            left: Box::new(scan_person("n")),
3379            right: Box::new(scan_any("n")),
3380            all: false,
3381        }));
3382        let physical = planner.plan_adaptive(&logical).unwrap();
3383        assert!(physical.adaptive_context.is_some());
3384    }
3385
3386    #[test]
3387    fn test_plan_adaptive_with_otherwise() {
3388        let store = create_test_store();
3389        let planner = Planner::new(store);
3390        let logical = LogicalPlan::new(LogicalOperator::Otherwise(OtherwiseOp {
3391            left: Box::new(scan_person("n")),
3392            right: Box::new(scan_any("n")),
3393        }));
3394        let physical = planner.plan_adaptive(&logical).unwrap();
3395        assert!(physical.adaptive_context.is_some());
3396    }
3397
3398    // ==================== count_expand_chain edge case ====================
3399
3400    #[test]
3401    fn test_count_expand_chain_variable_length_breaks_chain() {
3402        // A variable-length expand (not single-hop) should NOT count in the chain.
3403        let var_expand = LogicalOperator::Expand(ExpandOp {
3404            from_variable: "a".to_string(),
3405            to_variable: "b".to_string(),
3406            edge_variable: None,
3407            direction: ExpandDirection::Outgoing,
3408            edge_types: vec!["KNOWS".to_string()],
3409            min_hops: 1,
3410            max_hops: Some(3),
3411            input: Box::new(scan_person("a")),
3412            path_alias: None,
3413            path_mode: PathMode::Walk,
3414        });
3415        let (count, _) = Planner::count_expand_chain(&var_expand);
3416        assert_eq!(count, 0);
3417    }
3418
3419    // ==================== StaticResultOperator ====================
3420
3421    #[cfg(feature = "algos")]
3422    #[test]
3423    fn test_static_result_operator_emits_rows_and_resets() {
3424        use grafeo_common::types::Value;
3425        let rows = vec![
3426            vec![Value::Int64(1), Value::String("Vincent".into())],
3427            vec![Value::Int64(2), Value::String("Jules".into())],
3428        ];
3429        let mut op = StaticResultOperator {
3430            rows,
3431            column_indices: vec![0, 1],
3432            row_index: 0,
3433        };
3434        assert_eq!(op.name(), "StaticResult");
3435        let chunk = op.next().unwrap().expect("first chunk");
3436        assert_eq!(chunk.row_count(), 2);
3437        // Exhausted.
3438        assert!(op.next().unwrap().is_none());
3439        // Reset allows re-emitting.
3440        op.reset();
3441        assert!(op.next().unwrap().is_some());
3442        // into_any round trip.
3443        let boxed: Box<dyn Operator> = Box::new(StaticResultOperator {
3444            rows: vec![vec![Value::Null]],
3445            column_indices: vec![0],
3446            row_index: 0,
3447        });
3448        let _any = boxed.into_any();
3449    }
3450}