Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43    VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53    convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57/// Range bounds for property-based range queries.
58struct RangeBounds<'a> {
59    min: Option<&'a Value>,
60    max: Option<&'a Value>,
61    min_inclusive: bool,
62    max_inclusive: bool,
63}
64
65/// Converts a logical plan to a physical operator tree for LPG stores.
66pub struct Planner {
67    /// The graph store (read-only operations).
68    pub(super) store: Arc<dyn GraphStore>,
69    /// Writable graph store (None for read-only databases).
70    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
71    /// Transaction manager for MVCC operations.
72    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
73    /// Current transaction ID (if in a transaction).
74    pub(super) transaction_id: Option<TransactionId>,
75    /// Epoch to use for visibility checks.
76    pub(super) viewing_epoch: EpochId,
77    /// Counter for generating unique anonymous edge column names.
78    pub(super) anon_edge_counter: std::cell::Cell<u32>,
79    /// Whether to use factorized execution for multi-hop queries.
80    pub(super) factorized_execution: bool,
81    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
82    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
83    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Variables that hold edge IDs (from MATCH edge patterns).
85    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
86    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
87    /// Optional constraint validator for schema enforcement during mutations.
88    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
89    /// Catalog for user-defined procedure lookup.
90    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
91    /// Shared parameter state for the currently planning correlated Apply.
92    /// Set by `plan_apply` before planning the inner operator, consumed by
93    /// `plan_operator` when encountering `ParameterScan`.
94    pub(super) correlated_param_state:
95        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
96    /// Variables from variable-length expand patterns (group-list variables).
97    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
98    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
99    /// When true, each physical operator is wrapped in `ProfiledOperator`.
100    profiling: std::cell::Cell<bool>,
101    /// Profile entries collected during planning (post-order).
102    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
103    /// Optional write tracker for recording writes during mutations.
104    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
105    /// Session context for introspection functions (info, schema, current_schema, etc.).
106    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
107    /// When true, expand operators use epoch-only visibility (no MVCC version
108    /// chain walks).  Set when the plan contains no mutations, so PENDING
109    /// writes are impossible to observe.
110    pub(super) read_only: bool,
111}
112
113impl Planner {
114    /// Creates a new planner with the given store.
115    ///
116    /// This creates a planner without transaction context, using the current
117    /// epoch from the store for visibility.
118    #[must_use]
119    pub fn new(store: Arc<dyn GraphStore>) -> Self {
120        let epoch = store.current_epoch();
121        Self {
122            store,
123            write_store: None,
124            transaction_manager: None,
125            transaction_id: None,
126            viewing_epoch: epoch,
127            anon_edge_counter: std::cell::Cell::new(0),
128            factorized_execution: true,
129            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131            validator: None,
132            catalog: None,
133            correlated_param_state: std::cell::RefCell::new(None),
134            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135            profiling: std::cell::Cell::new(false),
136            profile_entries: std::cell::RefCell::new(Vec::new()),
137            write_tracker: None,
138            session_context: grafeo_core::execution::operators::SessionContext::default(),
139            read_only: false,
140        }
141    }
142
143    /// Creates a new planner with transaction context for MVCC-aware planning.
144    #[must_use]
145    pub fn with_context(
146        store: Arc<dyn GraphStore>,
147        write_store: Option<Arc<dyn GraphStoreMut>>,
148        transaction_manager: Arc<TransactionManager>,
149        transaction_id: Option<TransactionId>,
150        viewing_epoch: EpochId,
151    ) -> Self {
152        use crate::transaction::TransactionWriteTracker;
153
154        // Create write tracker when there's an active transaction
155        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156            if transaction_id.is_some() {
157                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158                    &transaction_manager,
159                ))))
160            } else {
161                None
162            };
163
164        Self {
165            store,
166            write_store,
167            transaction_manager: Some(transaction_manager),
168            transaction_id,
169            viewing_epoch,
170            anon_edge_counter: std::cell::Cell::new(0),
171            factorized_execution: true,
172            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174            validator: None,
175            catalog: None,
176            correlated_param_state: std::cell::RefCell::new(None),
177            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178            profiling: std::cell::Cell::new(false),
179            profile_entries: std::cell::RefCell::new(Vec::new()),
180            write_tracker,
181            session_context: grafeo_core::execution::operators::SessionContext::default(),
182            read_only: false,
183        }
184    }
185
186    /// Marks this planner as planning a read-only query (no mutations),
187    /// enabling fast-path visibility checks in expand operators.
188    #[must_use]
189    pub fn with_read_only(mut self, read_only: bool) -> Self {
190        self.read_only = read_only;
191        self
192    }
193
194    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
195    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196        self.write_store
197            .as_ref()
198            .map(Arc::clone)
199            .ok_or(Error::Transaction(
200                grafeo_common::utils::error::TransactionError::ReadOnly,
201            ))
202    }
203
204    /// Returns the viewing epoch for this planner.
205    #[must_use]
206    pub fn viewing_epoch(&self) -> EpochId {
207        self.viewing_epoch
208    }
209
210    /// Returns the transaction ID for this planner, if any.
211    #[must_use]
212    pub fn transaction_id(&self) -> Option<TransactionId> {
213        self.transaction_id
214    }
215
216    /// Returns a reference to the transaction manager, if available.
217    #[must_use]
218    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
219        self.transaction_manager.as_ref()
220    }
221
222    /// Enables or disables factorized execution for multi-hop queries.
223    #[must_use]
224    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225        self.factorized_execution = enabled;
226        self
227    }
228
229    /// Sets the constraint validator for schema enforcement during mutations.
230    #[must_use]
231    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232        self.validator = Some(validator);
233        self
234    }
235
236    /// Sets the catalog for user-defined procedure lookup.
237    #[must_use]
238    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239        self.catalog = Some(catalog);
240        self
241    }
242
243    /// Sets the session context for introspection functions.
244    #[must_use]
245    pub fn with_session_context(
246        mut self,
247        context: grafeo_core::execution::operators::SessionContext,
248    ) -> Self {
249        self.session_context = context;
250        self
251    }
252
253    /// Counts consecutive single-hop expand operations.
254    ///
255    /// Returns the count and the deepest non-expand operator (the base of the chain).
256    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
257        match op {
258            LogicalOperator::Expand(expand) => {
259                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
260
261                if is_single_hop {
262                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
263                    (inner_count + 1, base)
264                } else {
265                    (0, op)
266                }
267            }
268            _ => (0, op),
269        }
270    }
271
272    /// Collects expand operations from the outermost down to the base.
273    ///
274    /// Returns expands in order from innermost (base) to outermost.
275    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
276        let mut chain = Vec::new();
277        let mut current = op;
278
279        while let LogicalOperator::Expand(expand) = current {
280            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
281            if !is_single_hop {
282                break;
283            }
284            chain.push(expand);
285            current = &expand.input;
286        }
287
288        chain.reverse();
289        chain
290    }
291
292    /// Plans a logical plan into a physical operator.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the logical plan contains unsupported operators
297    /// or invalid expressions.
298    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
299        let _span = grafeo_debug_span!("grafeo::query::plan");
300        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
301        Ok(PhysicalPlan {
302            operator,
303            columns,
304            adaptive_context: None,
305        })
306    }
307
308    /// Plans a logical plan with profiling: each physical operator is wrapped
309    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
310    /// collect row counts and timing. Returns the physical plan together with
311    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
312    /// items in post-order (children before parents).
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if the logical plan contains unsupported operators
317    /// or invalid expressions.
318    pub fn plan_profiled(
319        &self,
320        logical_plan: &LogicalPlan,
321    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
322        self.profiling.set(true);
323        self.profile_entries.borrow_mut().clear();
324
325        let result = self.plan_operator(&logical_plan.root);
326
327        self.profiling.set(false);
328        let (operator, columns) = result?;
329        let entries = self.profile_entries.borrow_mut().drain(..).collect();
330
331        Ok((
332            PhysicalPlan {
333                operator,
334                columns,
335                adaptive_context: None,
336            },
337            entries,
338        ))
339    }
340
341    /// Plans a logical plan with adaptive execution support.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the logical plan contains unsupported operators
346    /// or invalid expressions.
347    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
348        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
349
350        let mut adaptive_context = AdaptiveContext::new();
351        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
352
353        Ok(PhysicalPlan {
354            operator,
355            columns,
356            adaptive_context: Some(adaptive_context),
357        })
358    }
359
360    /// Collects cardinality estimates from the logical plan into an adaptive context.
361    fn collect_cardinality_estimates(
362        &self,
363        op: &LogicalOperator,
364        ctx: &mut AdaptiveContext,
365        depth: usize,
366    ) {
367        match op {
368            LogicalOperator::NodeScan(scan) => {
369                let estimate = if let Some(label) = &scan.label {
370                    self.store.nodes_by_label(label).len() as f64
371                } else {
372                    self.store.node_count() as f64
373                };
374                let id = format!("scan_{}", scan.variable);
375                ctx.set_estimate(&id, estimate);
376
377                if let Some(input) = &scan.input {
378                    self.collect_cardinality_estimates(input, ctx, depth + 1);
379                }
380            }
381            LogicalOperator::Filter(filter) => {
382                let input_estimate = self.estimate_cardinality(&filter.input);
383                let estimate = input_estimate * 0.3;
384                let id = format!("filter_{depth}");
385                ctx.set_estimate(&id, estimate);
386
387                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
388            }
389            LogicalOperator::Expand(expand) => {
390                let input_estimate = self.estimate_cardinality(&expand.input);
391                let stats = self.store.statistics();
392                let avg_degree = self.estimate_expand_degree(&stats, expand);
393                let estimate = input_estimate * avg_degree;
394                let id = format!("expand_{}", expand.to_variable);
395                ctx.set_estimate(&id, estimate);
396
397                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
398            }
399            LogicalOperator::Join(join) => {
400                let left_est = self.estimate_cardinality(&join.left);
401                let right_est = self.estimate_cardinality(&join.right);
402                let estimate = (left_est * right_est).sqrt();
403                let id = format!("join_{depth}");
404                ctx.set_estimate(&id, estimate);
405
406                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
407                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
408            }
409            LogicalOperator::Aggregate(agg) => {
410                let input_estimate = self.estimate_cardinality(&agg.input);
411                let estimate = if agg.group_by.is_empty() {
412                    1.0
413                } else {
414                    (input_estimate * 0.1).max(1.0)
415                };
416                let id = format!("aggregate_{depth}");
417                ctx.set_estimate(&id, estimate);
418
419                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
420            }
421            LogicalOperator::Distinct(distinct) => {
422                let input_estimate = self.estimate_cardinality(&distinct.input);
423                let estimate = (input_estimate * 0.5).max(1.0);
424                let id = format!("distinct_{depth}");
425                ctx.set_estimate(&id, estimate);
426
427                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
428            }
429            LogicalOperator::Return(ret) => {
430                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
431            }
432            LogicalOperator::Limit(limit) => {
433                let input_estimate = self.estimate_cardinality(&limit.input);
434                let estimate = (input_estimate).min(limit.count.estimate());
435                let id = format!("limit_{depth}");
436                ctx.set_estimate(&id, estimate);
437
438                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
439            }
440            LogicalOperator::Skip(skip) => {
441                let input_estimate = self.estimate_cardinality(&skip.input);
442                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
443                let id = format!("skip_{depth}");
444                ctx.set_estimate(&id, estimate);
445
446                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
447            }
448            LogicalOperator::Sort(sort) => {
449                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
450            }
451            LogicalOperator::Union(union) => {
452                let estimate: f64 = union
453                    .inputs
454                    .iter()
455                    .map(|input| self.estimate_cardinality(input))
456                    .sum();
457                let id = format!("union_{depth}");
458                ctx.set_estimate(&id, estimate);
459
460                for input in &union.inputs {
461                    self.collect_cardinality_estimates(input, ctx, depth + 1);
462                }
463            }
464            _ => {
465                // For other operators, try to recurse into known input patterns
466            }
467        }
468    }
469
470    /// Estimates cardinality for a logical operator subtree.
471    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
472        match op {
473            LogicalOperator::NodeScan(scan) => {
474                if let Some(label) = &scan.label {
475                    self.store.nodes_by_label(label).len() as f64
476                } else {
477                    self.store.node_count() as f64
478                }
479            }
480            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
481            LogicalOperator::Expand(expand) => {
482                let stats = self.store.statistics();
483                let avg_degree = self.estimate_expand_degree(&stats, expand);
484                self.estimate_cardinality(&expand.input) * avg_degree
485            }
486            LogicalOperator::Join(join) => {
487                let left = self.estimate_cardinality(&join.left);
488                let right = self.estimate_cardinality(&join.right);
489                (left * right).sqrt()
490            }
491            LogicalOperator::Aggregate(agg) => {
492                if agg.group_by.is_empty() {
493                    1.0
494                } else {
495                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
496                }
497            }
498            LogicalOperator::Distinct(distinct) => {
499                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
500            }
501            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
502            LogicalOperator::Limit(limit) => self
503                .estimate_cardinality(&limit.input)
504                .min(limit.count.estimate()),
505            LogicalOperator::Skip(skip) => {
506                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
507            }
508            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
509            LogicalOperator::Union(union) => union
510                .inputs
511                .iter()
512                .map(|input| self.estimate_cardinality(input))
513                .sum(),
514            LogicalOperator::Except(except) => {
515                let left = self.estimate_cardinality(&except.left);
516                let right = self.estimate_cardinality(&except.right);
517                (left - right).max(0.0)
518            }
519            LogicalOperator::Intersect(intersect) => {
520                let left = self.estimate_cardinality(&intersect.left);
521                let right = self.estimate_cardinality(&intersect.right);
522                left.min(right)
523            }
524            LogicalOperator::Otherwise(otherwise) => self
525                .estimate_cardinality(&otherwise.left)
526                .max(self.estimate_cardinality(&otherwise.right)),
527            _ => 1000.0,
528        }
529    }
530
531    /// Estimates the average edge degree for an expand operation using store statistics.
532    fn estimate_expand_degree(
533        &self,
534        stats: &grafeo_core::statistics::Statistics,
535        expand: &ExpandOp,
536    ) -> f64 {
537        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
538        if expand.edge_types.len() == 1 {
539            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
540        } else if stats.total_nodes > 0 {
541            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
542        } else {
543            10.0
544        }
545    }
546
547    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
548    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
549    fn maybe_profile(
550        &self,
551        result: Result<(Box<dyn Operator>, Vec<String>)>,
552        op: &LogicalOperator,
553    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
554        if self.profiling.get() {
555            let (physical, columns) = result?;
556            let (entry, stats) =
557                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
558            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
559            self.profile_entries.borrow_mut().push(entry);
560            Ok((Box::new(profiled), columns))
561        } else {
562            result
563        }
564    }
565
566    /// Plans a single logical operator.
567    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
568        let result = match op {
569            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
570            LogicalOperator::Expand(expand) => {
571                if self.factorized_execution {
572                    let (chain_len, _base) = Self::count_expand_chain(op);
573                    if chain_len >= 2 {
574                        return self.maybe_profile(self.plan_expand_chain(op), op);
575                    }
576                }
577                self.plan_expand(expand)
578            }
579            LogicalOperator::Return(ret) => self.plan_return(ret),
580            LogicalOperator::Filter(filter) => self.plan_filter(filter),
581            LogicalOperator::Project(project) => self.plan_project(project),
582            LogicalOperator::Limit(limit) => self.plan_limit(limit),
583            LogicalOperator::Skip(skip) => self.plan_skip(skip),
584            LogicalOperator::Sort(sort) => self.plan_sort(sort),
585            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
586            LogicalOperator::Join(join) => self.plan_join(join),
587            LogicalOperator::Union(union) => self.plan_union(union),
588            LogicalOperator::Except(except) => self.plan_except(except),
589            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
590            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
591            LogicalOperator::Apply(apply) => self.plan_apply(apply),
592            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
593            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
594            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
595            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
596            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
597            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
598            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
599            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
600            LogicalOperator::Merge(merge) => self.plan_merge(merge),
601            LogicalOperator::MergeRelationship(merge_rel) => {
602                self.plan_merge_relationship(merge_rel)
603            }
604            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
605            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
606            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
607            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
608            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
609            #[cfg(feature = "algos")]
610            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
611            #[cfg(not(feature = "algos"))]
612            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
613                "CALL procedures require the 'algos' feature".to_string(),
614            )),
615            LogicalOperator::ParameterScan(_param_scan) => {
616                let state = self
617                    .correlated_param_state
618                    .borrow()
619                    .clone()
620                    .ok_or_else(|| {
621                        Error::Internal(
622                            "ParameterScan without correlated Apply context".to_string(),
623                        )
624                    })?;
625                // Use the actual column names from the ParameterState (which may
626                // have been expanded from "*" to real variable names in plan_apply)
627                let columns = state.columns.clone();
628                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
629                Ok((operator, columns))
630            }
631            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
632            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
633            LogicalOperator::LoadData(load) => {
634                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
635                    load.path.clone(),
636                    load.format,
637                    load.with_headers,
638                    load.field_terminator,
639                    load.variable.clone(),
640                ));
641                Ok((operator, vec![load.variable.clone()]))
642            }
643            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
644            LogicalOperator::VectorScan(_) => Err(Error::Internal(
645                "VectorScan requires vector-index feature".to_string(),
646            )),
647            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
648                "VectorJoin requires vector-index feature".to_string(),
649            )),
650            _ => Err(Error::Internal(format!(
651                "Unsupported operator: {:?}",
652                std::mem::discriminant(op)
653            ))),
654        };
655        self.maybe_profile(result, op)
656    }
657
658    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
659    fn plan_horizontal_aggregate(
660        &self,
661        ha: &HorizontalAggregateOp,
662    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
663        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
664
665        let list_col_idx = child_columns
666            .iter()
667            .position(|c| c == &ha.list_column)
668            .ok_or_else(|| {
669                Error::Internal(format!(
670                    "HorizontalAggregate list column '{}' not found in {:?}",
671                    ha.list_column, child_columns
672                ))
673            })?;
674
675        let entity_kind = match ha.entity_kind {
676            LogicalEntityKind::Edge => EntityKind::Edge,
677            LogicalEntityKind::Node => EntityKind::Node,
678        };
679
680        let function = convert_aggregate_function(ha.function);
681        let input_column_count = child_columns.len();
682
683        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
684            child_op,
685            list_col_idx,
686            entity_kind,
687            function,
688            ha.property.clone(),
689            Arc::clone(&self.store) as Arc<dyn GraphStore>,
690            input_column_count,
691        ));
692
693        let mut columns = child_columns;
694        columns.push(ha.alias.clone());
695        // Mark the result as a scalar column
696        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
697
698        Ok((operator, columns))
699    }
700
701    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
702    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
703        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
704        let key_idx = child_columns
705            .iter()
706            .position(|c| c == &mc.key_var)
707            .ok_or_else(|| {
708                Error::Internal(format!(
709                    "MapCollect key '{}' not in columns {:?}",
710                    mc.key_var, child_columns
711                ))
712            })?;
713        let value_idx = child_columns
714            .iter()
715            .position(|c| c == &mc.value_var)
716            .ok_or_else(|| {
717                Error::Internal(format!(
718                    "MapCollect value '{}' not in columns {:?}",
719                    mc.value_var, child_columns
720                ))
721            })?;
722        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
723        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
724        Ok((operator, vec![mc.alias.clone()]))
725    }
726}
727
728/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
729#[cfg(feature = "algos")]
730struct StaticResultOperator {
731    rows: Vec<Vec<Value>>,
732    column_indices: Vec<usize>,
733    row_index: usize,
734}
735
736#[cfg(feature = "algos")]
737impl Operator for StaticResultOperator {
738    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
739        use grafeo_core::execution::DataChunk;
740
741        if self.row_index >= self.rows.len() {
742            return Ok(None);
743        }
744
745        let remaining = self.rows.len() - self.row_index;
746        let chunk_rows = remaining.min(1024);
747        let col_count = self.column_indices.len();
748
749        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
750        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
751
752        for row_offset in 0..chunk_rows {
753            let row = &self.rows[self.row_index + row_offset];
754            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
755                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
756                if let Some(col) = chunk.column_mut(col_idx) {
757                    col.push_value(value);
758                }
759            }
760        }
761        chunk.set_count(chunk_rows);
762
763        self.row_index += chunk_rows;
764        Ok(Some(chunk))
765    }
766
767    fn reset(&mut self) {
768        self.row_index = 0;
769    }
770
771    fn name(&self) -> &'static str {
772        "StaticResult"
773    }
774}
775
776#[cfg(test)]
777mod tests {
778    use super::*;
779    use crate::query::plan::{
780        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
781        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
782        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
783        SkipOp as LogicalSkipOp, SortKey, SortOp,
784    };
785    use grafeo_common::types::Value;
786    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
787    use grafeo_core::graph::GraphStoreMut;
788    use grafeo_core::graph::lpg::LpgStore;
789
790    fn create_test_store() -> Arc<LpgStore> {
791        let store = Arc::new(LpgStore::new().unwrap());
792        store.create_node(&["Person"]);
793        store.create_node(&["Person"]);
794        store.create_node(&["Company"]);
795        store
796    }
797
798    // ==================== Simple Scan Tests ====================
799
800    #[test]
801    fn test_plan_simple_scan() {
802        let store = create_test_store();
803        let planner = Planner::new(store);
804
805        // MATCH (n:Person) RETURN n
806        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
807            items: vec![ReturnItem {
808                expression: LogicalExpression::Variable("n".to_string()),
809                alias: None,
810            }],
811            distinct: false,
812            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
813                variable: "n".to_string(),
814                label: Some("Person".to_string()),
815                input: None,
816            })),
817        }));
818
819        let physical = planner.plan(&logical).unwrap();
820        assert_eq!(physical.columns(), &["n"]);
821    }
822
823    #[test]
824    fn test_plan_scan_without_label() {
825        let store = create_test_store();
826        let planner = Planner::new(store);
827
828        // MATCH (n) RETURN n
829        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
830            items: vec![ReturnItem {
831                expression: LogicalExpression::Variable("n".to_string()),
832                alias: None,
833            }],
834            distinct: false,
835            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
836                variable: "n".to_string(),
837                label: None,
838                input: None,
839            })),
840        }));
841
842        let physical = planner.plan(&logical).unwrap();
843        assert_eq!(physical.columns(), &["n"]);
844    }
845
846    #[test]
847    fn test_plan_return_with_alias() {
848        let store = create_test_store();
849        let planner = Planner::new(store);
850
851        // MATCH (n:Person) RETURN n AS person
852        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
853            items: vec![ReturnItem {
854                expression: LogicalExpression::Variable("n".to_string()),
855                alias: Some("person".to_string()),
856            }],
857            distinct: false,
858            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
859                variable: "n".to_string(),
860                label: Some("Person".to_string()),
861                input: None,
862            })),
863        }));
864
865        let physical = planner.plan(&logical).unwrap();
866        assert_eq!(physical.columns(), &["person"]);
867    }
868
869    #[test]
870    fn test_plan_return_property() {
871        let store = create_test_store();
872        let planner = Planner::new(store);
873
874        // MATCH (n:Person) RETURN n.name
875        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
876            items: vec![ReturnItem {
877                expression: LogicalExpression::Property {
878                    variable: "n".to_string(),
879                    property: "name".to_string(),
880                },
881                alias: None,
882            }],
883            distinct: false,
884            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
885                variable: "n".to_string(),
886                label: Some("Person".to_string()),
887                input: None,
888            })),
889        }));
890
891        let physical = planner.plan(&logical).unwrap();
892        assert_eq!(physical.columns(), &["n.name"]);
893    }
894
895    #[test]
896    fn test_plan_return_literal() {
897        let store = create_test_store();
898        let planner = Planner::new(store);
899
900        // MATCH (n) RETURN 42 AS answer
901        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
902            items: vec![ReturnItem {
903                expression: LogicalExpression::Literal(Value::Int64(42)),
904                alias: Some("answer".to_string()),
905            }],
906            distinct: false,
907            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
908                variable: "n".to_string(),
909                label: None,
910                input: None,
911            })),
912        }));
913
914        let physical = planner.plan(&logical).unwrap();
915        assert_eq!(physical.columns(), &["answer"]);
916    }
917
918    // ==================== Filter Tests ====================
919
920    #[test]
921    fn test_plan_filter_equality() {
922        let store = create_test_store();
923        let planner = Planner::new(store);
924
925        // MATCH (n:Person) WHERE n.age = 30 RETURN n
926        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
927            items: vec![ReturnItem {
928                expression: LogicalExpression::Variable("n".to_string()),
929                alias: None,
930            }],
931            distinct: false,
932            input: Box::new(LogicalOperator::Filter(FilterOp {
933                predicate: LogicalExpression::Binary {
934                    left: Box::new(LogicalExpression::Property {
935                        variable: "n".to_string(),
936                        property: "age".to_string(),
937                    }),
938                    op: BinaryOp::Eq,
939                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
940                },
941                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
942                    variable: "n".to_string(),
943                    label: Some("Person".to_string()),
944                    input: None,
945                })),
946                pushdown_hint: None,
947            })),
948        }));
949
950        let physical = planner.plan(&logical).unwrap();
951        assert_eq!(physical.columns(), &["n"]);
952    }
953
954    #[test]
955    fn test_plan_filter_compound_and() {
956        let store = create_test_store();
957        let planner = Planner::new(store);
958
959        // WHERE n.age > 20 AND n.age < 40
960        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
961            items: vec![ReturnItem {
962                expression: LogicalExpression::Variable("n".to_string()),
963                alias: None,
964            }],
965            distinct: false,
966            input: Box::new(LogicalOperator::Filter(FilterOp {
967                predicate: LogicalExpression::Binary {
968                    left: Box::new(LogicalExpression::Binary {
969                        left: Box::new(LogicalExpression::Property {
970                            variable: "n".to_string(),
971                            property: "age".to_string(),
972                        }),
973                        op: BinaryOp::Gt,
974                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
975                    }),
976                    op: BinaryOp::And,
977                    right: Box::new(LogicalExpression::Binary {
978                        left: Box::new(LogicalExpression::Property {
979                            variable: "n".to_string(),
980                            property: "age".to_string(),
981                        }),
982                        op: BinaryOp::Lt,
983                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
984                    }),
985                },
986                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
987                    variable: "n".to_string(),
988                    label: None,
989                    input: None,
990                })),
991                pushdown_hint: None,
992            })),
993        }));
994
995        let physical = planner.plan(&logical).unwrap();
996        assert_eq!(physical.columns(), &["n"]);
997    }
998
999    #[test]
1000    fn test_plan_filter_unary_not() {
1001        let store = create_test_store();
1002        let planner = Planner::new(store);
1003
1004        // WHERE NOT n.active
1005        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1006            items: vec![ReturnItem {
1007                expression: LogicalExpression::Variable("n".to_string()),
1008                alias: None,
1009            }],
1010            distinct: false,
1011            input: Box::new(LogicalOperator::Filter(FilterOp {
1012                predicate: LogicalExpression::Unary {
1013                    op: UnaryOp::Not,
1014                    operand: Box::new(LogicalExpression::Property {
1015                        variable: "n".to_string(),
1016                        property: "active".to_string(),
1017                    }),
1018                },
1019                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1020                    variable: "n".to_string(),
1021                    label: None,
1022                    input: None,
1023                })),
1024                pushdown_hint: None,
1025            })),
1026        }));
1027
1028        let physical = planner.plan(&logical).unwrap();
1029        assert_eq!(physical.columns(), &["n"]);
1030    }
1031
1032    #[test]
1033    fn test_plan_filter_is_null() {
1034        let store = create_test_store();
1035        let planner = Planner::new(store);
1036
1037        // WHERE n.email IS NULL
1038        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1039            items: vec![ReturnItem {
1040                expression: LogicalExpression::Variable("n".to_string()),
1041                alias: None,
1042            }],
1043            distinct: false,
1044            input: Box::new(LogicalOperator::Filter(FilterOp {
1045                predicate: LogicalExpression::Unary {
1046                    op: UnaryOp::IsNull,
1047                    operand: Box::new(LogicalExpression::Property {
1048                        variable: "n".to_string(),
1049                        property: "email".to_string(),
1050                    }),
1051                },
1052                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1053                    variable: "n".to_string(),
1054                    label: None,
1055                    input: None,
1056                })),
1057                pushdown_hint: None,
1058            })),
1059        }));
1060
1061        let physical = planner.plan(&logical).unwrap();
1062        assert_eq!(physical.columns(), &["n"]);
1063    }
1064
1065    #[test]
1066    fn test_plan_filter_function_call() {
1067        let store = create_test_store();
1068        let planner = Planner::new(store);
1069
1070        // WHERE size(n.friends) > 0
1071        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1072            items: vec![ReturnItem {
1073                expression: LogicalExpression::Variable("n".to_string()),
1074                alias: None,
1075            }],
1076            distinct: false,
1077            input: Box::new(LogicalOperator::Filter(FilterOp {
1078                predicate: LogicalExpression::Binary {
1079                    left: Box::new(LogicalExpression::FunctionCall {
1080                        name: "size".to_string(),
1081                        args: vec![LogicalExpression::Property {
1082                            variable: "n".to_string(),
1083                            property: "friends".to_string(),
1084                        }],
1085                        distinct: false,
1086                    }),
1087                    op: BinaryOp::Gt,
1088                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1089                },
1090                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1091                    variable: "n".to_string(),
1092                    label: None,
1093                    input: None,
1094                })),
1095                pushdown_hint: None,
1096            })),
1097        }));
1098
1099        let physical = planner.plan(&logical).unwrap();
1100        assert_eq!(physical.columns(), &["n"]);
1101    }
1102
1103    // ==================== Expand Tests ====================
1104
1105    #[test]
1106    fn test_plan_expand_outgoing() {
1107        let store = create_test_store();
1108        let planner = Planner::new(store);
1109
1110        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1111        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1112            items: vec![
1113                ReturnItem {
1114                    expression: LogicalExpression::Variable("a".to_string()),
1115                    alias: None,
1116                },
1117                ReturnItem {
1118                    expression: LogicalExpression::Variable("b".to_string()),
1119                    alias: None,
1120                },
1121            ],
1122            distinct: false,
1123            input: Box::new(LogicalOperator::Expand(ExpandOp {
1124                from_variable: "a".to_string(),
1125                to_variable: "b".to_string(),
1126                edge_variable: None,
1127                direction: ExpandDirection::Outgoing,
1128                edge_types: vec!["KNOWS".to_string()],
1129                min_hops: 1,
1130                max_hops: Some(1),
1131                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1132                    variable: "a".to_string(),
1133                    label: Some("Person".to_string()),
1134                    input: None,
1135                })),
1136                path_alias: None,
1137                path_mode: PathMode::Walk,
1138            })),
1139        }));
1140
1141        let physical = planner.plan(&logical).unwrap();
1142        // The return should have columns [a, b]
1143        assert!(physical.columns().contains(&"a".to_string()));
1144        assert!(physical.columns().contains(&"b".to_string()));
1145    }
1146
1147    #[test]
1148    fn test_plan_expand_with_edge_variable() {
1149        let store = create_test_store();
1150        let planner = Planner::new(store);
1151
1152        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1153        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1154            items: vec![
1155                ReturnItem {
1156                    expression: LogicalExpression::Variable("a".to_string()),
1157                    alias: None,
1158                },
1159                ReturnItem {
1160                    expression: LogicalExpression::Variable("r".to_string()),
1161                    alias: None,
1162                },
1163                ReturnItem {
1164                    expression: LogicalExpression::Variable("b".to_string()),
1165                    alias: None,
1166                },
1167            ],
1168            distinct: false,
1169            input: Box::new(LogicalOperator::Expand(ExpandOp {
1170                from_variable: "a".to_string(),
1171                to_variable: "b".to_string(),
1172                edge_variable: Some("r".to_string()),
1173                direction: ExpandDirection::Outgoing,
1174                edge_types: vec!["KNOWS".to_string()],
1175                min_hops: 1,
1176                max_hops: Some(1),
1177                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1178                    variable: "a".to_string(),
1179                    label: None,
1180                    input: None,
1181                })),
1182                path_alias: None,
1183                path_mode: PathMode::Walk,
1184            })),
1185        }));
1186
1187        let physical = planner.plan(&logical).unwrap();
1188        assert!(physical.columns().contains(&"a".to_string()));
1189        assert!(physical.columns().contains(&"r".to_string()));
1190        assert!(physical.columns().contains(&"b".to_string()));
1191    }
1192
1193    // ==================== Limit/Skip/Sort Tests ====================
1194
1195    #[test]
1196    fn test_plan_limit() {
1197        let store = create_test_store();
1198        let planner = Planner::new(store);
1199
1200        // MATCH (n) RETURN n LIMIT 10
1201        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1202            items: vec![ReturnItem {
1203                expression: LogicalExpression::Variable("n".to_string()),
1204                alias: None,
1205            }],
1206            distinct: false,
1207            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1208                count: 10.into(),
1209                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1210                    variable: "n".to_string(),
1211                    label: None,
1212                    input: None,
1213                })),
1214            })),
1215        }));
1216
1217        let physical = planner.plan(&logical).unwrap();
1218        assert_eq!(physical.columns(), &["n"]);
1219    }
1220
1221    #[test]
1222    fn test_plan_skip() {
1223        let store = create_test_store();
1224        let planner = Planner::new(store);
1225
1226        // MATCH (n) RETURN n SKIP 5
1227        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1228            items: vec![ReturnItem {
1229                expression: LogicalExpression::Variable("n".to_string()),
1230                alias: None,
1231            }],
1232            distinct: false,
1233            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1234                count: 5.into(),
1235                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1236                    variable: "n".to_string(),
1237                    label: None,
1238                    input: None,
1239                })),
1240            })),
1241        }));
1242
1243        let physical = planner.plan(&logical).unwrap();
1244        assert_eq!(physical.columns(), &["n"]);
1245    }
1246
1247    #[test]
1248    fn test_plan_sort() {
1249        let store = create_test_store();
1250        let planner = Planner::new(store);
1251
1252        // MATCH (n) RETURN n ORDER BY n.name ASC
1253        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1254            items: vec![ReturnItem {
1255                expression: LogicalExpression::Variable("n".to_string()),
1256                alias: None,
1257            }],
1258            distinct: false,
1259            input: Box::new(LogicalOperator::Sort(SortOp {
1260                keys: vec![SortKey {
1261                    expression: LogicalExpression::Variable("n".to_string()),
1262                    order: SortOrder::Ascending,
1263                    nulls: None,
1264                }],
1265                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1266                    variable: "n".to_string(),
1267                    label: None,
1268                    input: None,
1269                })),
1270            })),
1271        }));
1272
1273        let physical = planner.plan(&logical).unwrap();
1274        assert_eq!(physical.columns(), &["n"]);
1275    }
1276
1277    #[test]
1278    fn test_plan_sort_descending() {
1279        let store = create_test_store();
1280        let planner = Planner::new(store);
1281
1282        // ORDER BY n DESC
1283        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1284            items: vec![ReturnItem {
1285                expression: LogicalExpression::Variable("n".to_string()),
1286                alias: None,
1287            }],
1288            distinct: false,
1289            input: Box::new(LogicalOperator::Sort(SortOp {
1290                keys: vec![SortKey {
1291                    expression: LogicalExpression::Variable("n".to_string()),
1292                    order: SortOrder::Descending,
1293                    nulls: None,
1294                }],
1295                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1296                    variable: "n".to_string(),
1297                    label: None,
1298                    input: None,
1299                })),
1300            })),
1301        }));
1302
1303        let physical = planner.plan(&logical).unwrap();
1304        assert_eq!(physical.columns(), &["n"]);
1305    }
1306
1307    #[test]
1308    fn test_plan_distinct() {
1309        let store = create_test_store();
1310        let planner = Planner::new(store);
1311
1312        // MATCH (n) RETURN DISTINCT n
1313        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1314            items: vec![ReturnItem {
1315                expression: LogicalExpression::Variable("n".to_string()),
1316                alias: None,
1317            }],
1318            distinct: false,
1319            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1320                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1321                    variable: "n".to_string(),
1322                    label: None,
1323                    input: None,
1324                })),
1325                columns: None,
1326            })),
1327        }));
1328
1329        let physical = planner.plan(&logical).unwrap();
1330        assert_eq!(physical.columns(), &["n"]);
1331    }
1332
1333    #[test]
1334    fn test_plan_distinct_with_columns() {
1335        let store = create_test_store();
1336        let planner = Planner::new(store);
1337
1338        // DISTINCT on specific columns (column-specific dedup)
1339        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1340            items: vec![ReturnItem {
1341                expression: LogicalExpression::Variable("n".to_string()),
1342                alias: None,
1343            }],
1344            distinct: false,
1345            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1346                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1347                    variable: "n".to_string(),
1348                    label: None,
1349                    input: None,
1350                })),
1351                columns: Some(vec!["n".to_string()]),
1352            })),
1353        }));
1354
1355        let physical = planner.plan(&logical).unwrap();
1356        assert_eq!(physical.columns(), &["n"]);
1357    }
1358
1359    #[test]
1360    fn test_plan_distinct_with_nonexistent_columns() {
1361        let store = create_test_store();
1362        let planner = Planner::new(store);
1363
1364        // When distinct columns don't match any output columns,
1365        // it falls back to full-row distinct.
1366        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1367            items: vec![ReturnItem {
1368                expression: LogicalExpression::Variable("n".to_string()),
1369                alias: None,
1370            }],
1371            distinct: false,
1372            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1373                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1374                    variable: "n".to_string(),
1375                    label: None,
1376                    input: None,
1377                })),
1378                columns: Some(vec!["nonexistent".to_string()]),
1379            })),
1380        }));
1381
1382        let physical = planner.plan(&logical).unwrap();
1383        assert_eq!(physical.columns(), &["n"]);
1384    }
1385
1386    // ==================== Aggregate Tests ====================
1387
1388    #[test]
1389    fn test_plan_aggregate_count() {
1390        let store = create_test_store();
1391        let planner = Planner::new(store);
1392
1393        // MATCH (n) RETURN count(n)
1394        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1395            items: vec![ReturnItem {
1396                expression: LogicalExpression::Variable("cnt".to_string()),
1397                alias: None,
1398            }],
1399            distinct: false,
1400            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1401                group_by: vec![],
1402                aggregates: vec![LogicalAggregateExpr {
1403                    function: LogicalAggregateFunction::Count,
1404                    expression: Some(LogicalExpression::Variable("n".to_string())),
1405                    expression2: None,
1406                    distinct: false,
1407                    alias: Some("cnt".to_string()),
1408                    percentile: None,
1409                    separator: None,
1410                }],
1411                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1412                    variable: "n".to_string(),
1413                    label: None,
1414                    input: None,
1415                })),
1416                having: None,
1417            })),
1418        }));
1419
1420        let physical = planner.plan(&logical).unwrap();
1421        assert!(physical.columns().contains(&"cnt".to_string()));
1422    }
1423
1424    #[test]
1425    fn test_plan_aggregate_with_group_by() {
1426        let store = create_test_store();
1427        let planner = Planner::new(store);
1428
1429        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1430        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1431            group_by: vec![LogicalExpression::Property {
1432                variable: "n".to_string(),
1433                property: "city".to_string(),
1434            }],
1435            aggregates: vec![LogicalAggregateExpr {
1436                function: LogicalAggregateFunction::Count,
1437                expression: Some(LogicalExpression::Variable("n".to_string())),
1438                expression2: None,
1439                distinct: false,
1440                alias: Some("cnt".to_string()),
1441                percentile: None,
1442                separator: None,
1443            }],
1444            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1445                variable: "n".to_string(),
1446                label: Some("Person".to_string()),
1447                input: None,
1448            })),
1449            having: None,
1450        }));
1451
1452        let physical = planner.plan(&logical).unwrap();
1453        assert_eq!(physical.columns().len(), 2);
1454    }
1455
1456    #[test]
1457    fn test_plan_aggregate_sum() {
1458        let store = create_test_store();
1459        let planner = Planner::new(store);
1460
1461        // SUM(n.value)
1462        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1463            group_by: vec![],
1464            aggregates: vec![LogicalAggregateExpr {
1465                function: LogicalAggregateFunction::Sum,
1466                expression: Some(LogicalExpression::Property {
1467                    variable: "n".to_string(),
1468                    property: "value".to_string(),
1469                }),
1470                expression2: None,
1471                distinct: false,
1472                alias: Some("total".to_string()),
1473                percentile: None,
1474                separator: None,
1475            }],
1476            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1477                variable: "n".to_string(),
1478                label: None,
1479                input: None,
1480            })),
1481            having: None,
1482        }));
1483
1484        let physical = planner.plan(&logical).unwrap();
1485        assert!(physical.columns().contains(&"total".to_string()));
1486    }
1487
1488    #[test]
1489    fn test_plan_aggregate_avg() {
1490        let store = create_test_store();
1491        let planner = Planner::new(store);
1492
1493        // AVG(n.score)
1494        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1495            group_by: vec![],
1496            aggregates: vec![LogicalAggregateExpr {
1497                function: LogicalAggregateFunction::Avg,
1498                expression: Some(LogicalExpression::Property {
1499                    variable: "n".to_string(),
1500                    property: "score".to_string(),
1501                }),
1502                expression2: None,
1503                distinct: false,
1504                alias: Some("average".to_string()),
1505                percentile: None,
1506                separator: None,
1507            }],
1508            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509                variable: "n".to_string(),
1510                label: None,
1511                input: None,
1512            })),
1513            having: None,
1514        }));
1515
1516        let physical = planner.plan(&logical).unwrap();
1517        assert!(physical.columns().contains(&"average".to_string()));
1518    }
1519
1520    #[test]
1521    fn test_plan_aggregate_min_max() {
1522        let store = create_test_store();
1523        let planner = Planner::new(store);
1524
1525        // MIN(n.age), MAX(n.age)
1526        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1527            group_by: vec![],
1528            aggregates: vec![
1529                LogicalAggregateExpr {
1530                    function: LogicalAggregateFunction::Min,
1531                    expression: Some(LogicalExpression::Property {
1532                        variable: "n".to_string(),
1533                        property: "age".to_string(),
1534                    }),
1535                    expression2: None,
1536                    distinct: false,
1537                    alias: Some("youngest".to_string()),
1538                    percentile: None,
1539                    separator: None,
1540                },
1541                LogicalAggregateExpr {
1542                    function: LogicalAggregateFunction::Max,
1543                    expression: Some(LogicalExpression::Property {
1544                        variable: "n".to_string(),
1545                        property: "age".to_string(),
1546                    }),
1547                    expression2: None,
1548                    distinct: false,
1549                    alias: Some("oldest".to_string()),
1550                    percentile: None,
1551                    separator: None,
1552                },
1553            ],
1554            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1555                variable: "n".to_string(),
1556                label: None,
1557                input: None,
1558            })),
1559            having: None,
1560        }));
1561
1562        let physical = planner.plan(&logical).unwrap();
1563        assert!(physical.columns().contains(&"youngest".to_string()));
1564        assert!(physical.columns().contains(&"oldest".to_string()));
1565    }
1566
1567    // ==================== Join Tests ====================
1568
1569    #[test]
1570    fn test_plan_inner_join() {
1571        let store = create_test_store();
1572        let planner = Planner::new(store);
1573
1574        // Inner join between two scans
1575        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1576            items: vec![
1577                ReturnItem {
1578                    expression: LogicalExpression::Variable("a".to_string()),
1579                    alias: None,
1580                },
1581                ReturnItem {
1582                    expression: LogicalExpression::Variable("b".to_string()),
1583                    alias: None,
1584                },
1585            ],
1586            distinct: false,
1587            input: Box::new(LogicalOperator::Join(JoinOp {
1588                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1589                    variable: "a".to_string(),
1590                    label: Some("Person".to_string()),
1591                    input: None,
1592                })),
1593                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1594                    variable: "b".to_string(),
1595                    label: Some("Company".to_string()),
1596                    input: None,
1597                })),
1598                join_type: JoinType::Inner,
1599                conditions: vec![JoinCondition {
1600                    left: LogicalExpression::Variable("a".to_string()),
1601                    right: LogicalExpression::Variable("b".to_string()),
1602                }],
1603            })),
1604        }));
1605
1606        let physical = planner.plan(&logical).unwrap();
1607        assert!(physical.columns().contains(&"a".to_string()));
1608        assert!(physical.columns().contains(&"b".to_string()));
1609    }
1610
1611    #[test]
1612    fn test_plan_cross_join() {
1613        let store = create_test_store();
1614        let planner = Planner::new(store);
1615
1616        // Cross join (no conditions)
1617        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1618            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1619                variable: "a".to_string(),
1620                label: None,
1621                input: None,
1622            })),
1623            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1624                variable: "b".to_string(),
1625                label: None,
1626                input: None,
1627            })),
1628            join_type: JoinType::Cross,
1629            conditions: vec![],
1630        }));
1631
1632        let physical = planner.plan(&logical).unwrap();
1633        assert_eq!(physical.columns().len(), 2);
1634    }
1635
1636    #[test]
1637    fn test_plan_left_join() {
1638        let store = create_test_store();
1639        let planner = Planner::new(store);
1640
1641        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1642            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1643                variable: "a".to_string(),
1644                label: None,
1645                input: None,
1646            })),
1647            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1648                variable: "b".to_string(),
1649                label: None,
1650                input: None,
1651            })),
1652            join_type: JoinType::Left,
1653            conditions: vec![],
1654        }));
1655
1656        let physical = planner.plan(&logical).unwrap();
1657        assert_eq!(physical.columns().len(), 2);
1658    }
1659
1660    // ==================== Mutation Tests ====================
1661
1662    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1663        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1664        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1665        p
1666    }
1667
1668    #[test]
1669    fn test_plan_create_node() {
1670        let store = create_test_store();
1671        let planner = create_writable_planner(&store);
1672
1673        // CREATE (n:Person {name: 'Alix'})
1674        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1675            variable: "n".to_string(),
1676            labels: vec!["Person".to_string()],
1677            properties: vec![(
1678                "name".to_string(),
1679                LogicalExpression::Literal(Value::String("Alix".into())),
1680            )],
1681            input: None,
1682        }));
1683
1684        let physical = planner.plan(&logical).unwrap();
1685        assert!(physical.columns().contains(&"n".to_string()));
1686    }
1687
1688    #[test]
1689    fn test_plan_create_edge() {
1690        let store = create_test_store();
1691        let planner = create_writable_planner(&store);
1692
1693        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1694        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1695            variable: Some("r".to_string()),
1696            from_variable: "a".to_string(),
1697            to_variable: "b".to_string(),
1698            edge_type: "KNOWS".to_string(),
1699            properties: vec![],
1700            input: Box::new(LogicalOperator::Join(JoinOp {
1701                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1702                    variable: "a".to_string(),
1703                    label: None,
1704                    input: None,
1705                })),
1706                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1707                    variable: "b".to_string(),
1708                    label: None,
1709                    input: None,
1710                })),
1711                join_type: JoinType::Cross,
1712                conditions: vec![],
1713            })),
1714        }));
1715
1716        let physical = planner.plan(&logical).unwrap();
1717        assert!(physical.columns().contains(&"r".to_string()));
1718    }
1719
1720    #[test]
1721    fn test_plan_delete_node() {
1722        let store = create_test_store();
1723        let planner = create_writable_planner(&store);
1724
1725        // MATCH (n) DELETE n
1726        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1727            variable: "n".to_string(),
1728            detach: false,
1729            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1730                variable: "n".to_string(),
1731                label: None,
1732                input: None,
1733            })),
1734        }));
1735
1736        let physical = planner.plan(&logical).unwrap();
1737        assert!(physical.columns().contains(&"n".to_string()));
1738    }
1739
1740    // ==================== Error Cases ====================
1741
1742    #[test]
1743    fn test_plan_empty_errors() {
1744        let store = create_test_store();
1745        let planner = Planner::new(store);
1746
1747        let logical = LogicalPlan::new(LogicalOperator::Empty);
1748        let result = planner.plan(&logical);
1749        assert!(result.is_err());
1750    }
1751
1752    #[test]
1753    fn test_plan_missing_variable_in_return() {
1754        let store = create_test_store();
1755        let planner = Planner::new(store);
1756
1757        // Return variable that doesn't exist in input
1758        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1759            items: vec![ReturnItem {
1760                expression: LogicalExpression::Variable("missing".to_string()),
1761                alias: None,
1762            }],
1763            distinct: false,
1764            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1765                variable: "n".to_string(),
1766                label: None,
1767                input: None,
1768            })),
1769        }));
1770
1771        let result = planner.plan(&logical);
1772        assert!(result.is_err());
1773    }
1774
1775    // ==================== Helper Function Tests ====================
1776
1777    #[test]
1778    fn test_convert_binary_ops() {
1779        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1780        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1781        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1782        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1783        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1784        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1785        assert!(convert_binary_op(BinaryOp::And).is_ok());
1786        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1787        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1788        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1789        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1790        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1791    }
1792
1793    #[test]
1794    fn test_convert_unary_ops() {
1795        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1796        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1797        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1798        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1799    }
1800
1801    #[test]
1802    fn test_convert_aggregate_functions() {
1803        assert!(matches!(
1804            convert_aggregate_function(LogicalAggregateFunction::Count),
1805            PhysicalAggregateFunction::Count
1806        ));
1807        assert!(matches!(
1808            convert_aggregate_function(LogicalAggregateFunction::Sum),
1809            PhysicalAggregateFunction::Sum
1810        ));
1811        assert!(matches!(
1812            convert_aggregate_function(LogicalAggregateFunction::Avg),
1813            PhysicalAggregateFunction::Avg
1814        ));
1815        assert!(matches!(
1816            convert_aggregate_function(LogicalAggregateFunction::Min),
1817            PhysicalAggregateFunction::Min
1818        ));
1819        assert!(matches!(
1820            convert_aggregate_function(LogicalAggregateFunction::Max),
1821            PhysicalAggregateFunction::Max
1822        ));
1823    }
1824
1825    #[test]
1826    fn test_planner_accessors() {
1827        let store = create_test_store();
1828        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);
1829
1830        assert!(planner.transaction_id().is_none());
1831        assert!(planner.transaction_manager().is_none());
1832        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1833    }
1834
1835    #[test]
1836    fn test_physical_plan_accessors() {
1837        let store = create_test_store();
1838        let planner = Planner::new(store);
1839
1840        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1841            variable: "n".to_string(),
1842            label: None,
1843            input: None,
1844        }));
1845
1846        let physical = planner.plan(&logical).unwrap();
1847        assert_eq!(physical.columns(), &["n"]);
1848
1849        // Test into_operator
1850        let _ = physical.into_operator();
1851    }
1852
1853    // ==================== Adaptive Planning Tests ====================
1854
1855    #[test]
1856    fn test_plan_adaptive_with_scan() {
1857        let store = create_test_store();
1858        let planner = Planner::new(store);
1859
1860        // MATCH (n:Person) RETURN n
1861        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1862            items: vec![ReturnItem {
1863                expression: LogicalExpression::Variable("n".to_string()),
1864                alias: None,
1865            }],
1866            distinct: false,
1867            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1868                variable: "n".to_string(),
1869                label: Some("Person".to_string()),
1870                input: None,
1871            })),
1872        }));
1873
1874        let physical = planner.plan_adaptive(&logical).unwrap();
1875        assert_eq!(physical.columns(), &["n"]);
1876        // Should have adaptive context with estimates
1877        assert!(physical.adaptive_context.is_some());
1878    }
1879
1880    #[test]
1881    fn test_plan_adaptive_with_filter() {
1882        let store = create_test_store();
1883        let planner = Planner::new(store);
1884
1885        // MATCH (n) WHERE n.age > 30 RETURN n
1886        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1887            items: vec![ReturnItem {
1888                expression: LogicalExpression::Variable("n".to_string()),
1889                alias: None,
1890            }],
1891            distinct: false,
1892            input: Box::new(LogicalOperator::Filter(FilterOp {
1893                predicate: LogicalExpression::Binary {
1894                    left: Box::new(LogicalExpression::Property {
1895                        variable: "n".to_string(),
1896                        property: "age".to_string(),
1897                    }),
1898                    op: BinaryOp::Gt,
1899                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1900                },
1901                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1902                    variable: "n".to_string(),
1903                    label: None,
1904                    input: None,
1905                })),
1906                pushdown_hint: None,
1907            })),
1908        }));
1909
1910        let physical = planner.plan_adaptive(&logical).unwrap();
1911        assert!(physical.adaptive_context.is_some());
1912    }
1913
1914    #[test]
1915    fn test_plan_adaptive_with_expand() {
1916        let store = create_test_store();
1917        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
1918            .with_factorized_execution(false);
1919
1920        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1921        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1922            items: vec![
1923                ReturnItem {
1924                    expression: LogicalExpression::Variable("a".to_string()),
1925                    alias: None,
1926                },
1927                ReturnItem {
1928                    expression: LogicalExpression::Variable("b".to_string()),
1929                    alias: None,
1930                },
1931            ],
1932            distinct: false,
1933            input: Box::new(LogicalOperator::Expand(ExpandOp {
1934                from_variable: "a".to_string(),
1935                to_variable: "b".to_string(),
1936                edge_variable: None,
1937                direction: ExpandDirection::Outgoing,
1938                edge_types: vec!["KNOWS".to_string()],
1939                min_hops: 1,
1940                max_hops: Some(1),
1941                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1942                    variable: "a".to_string(),
1943                    label: None,
1944                    input: None,
1945                })),
1946                path_alias: None,
1947                path_mode: PathMode::Walk,
1948            })),
1949        }));
1950
1951        let physical = planner.plan_adaptive(&logical).unwrap();
1952        assert!(physical.adaptive_context.is_some());
1953    }
1954
1955    #[test]
1956    fn test_plan_adaptive_with_join() {
1957        let store = create_test_store();
1958        let planner = Planner::new(store);
1959
1960        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1961            items: vec![
1962                ReturnItem {
1963                    expression: LogicalExpression::Variable("a".to_string()),
1964                    alias: None,
1965                },
1966                ReturnItem {
1967                    expression: LogicalExpression::Variable("b".to_string()),
1968                    alias: None,
1969                },
1970            ],
1971            distinct: false,
1972            input: Box::new(LogicalOperator::Join(JoinOp {
1973                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1974                    variable: "a".to_string(),
1975                    label: None,
1976                    input: None,
1977                })),
1978                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1979                    variable: "b".to_string(),
1980                    label: None,
1981                    input: None,
1982                })),
1983                join_type: JoinType::Cross,
1984                conditions: vec![],
1985            })),
1986        }));
1987
1988        let physical = planner.plan_adaptive(&logical).unwrap();
1989        assert!(physical.adaptive_context.is_some());
1990    }
1991
1992    #[test]
1993    fn test_plan_adaptive_with_aggregate() {
1994        let store = create_test_store();
1995        let planner = Planner::new(store);
1996
1997        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1998            group_by: vec![],
1999            aggregates: vec![LogicalAggregateExpr {
2000                function: LogicalAggregateFunction::Count,
2001                expression: Some(LogicalExpression::Variable("n".to_string())),
2002                expression2: None,
2003                distinct: false,
2004                alias: Some("cnt".to_string()),
2005                percentile: None,
2006                separator: None,
2007            }],
2008            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2009                variable: "n".to_string(),
2010                label: None,
2011                input: None,
2012            })),
2013            having: None,
2014        }));
2015
2016        let physical = planner.plan_adaptive(&logical).unwrap();
2017        assert!(physical.adaptive_context.is_some());
2018    }
2019
2020    #[test]
2021    fn test_plan_adaptive_with_distinct() {
2022        let store = create_test_store();
2023        let planner = Planner::new(store);
2024
2025        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2026            items: vec![ReturnItem {
2027                expression: LogicalExpression::Variable("n".to_string()),
2028                alias: None,
2029            }],
2030            distinct: false,
2031            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2032                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2033                    variable: "n".to_string(),
2034                    label: None,
2035                    input: None,
2036                })),
2037                columns: None,
2038            })),
2039        }));
2040
2041        let physical = planner.plan_adaptive(&logical).unwrap();
2042        assert!(physical.adaptive_context.is_some());
2043    }
2044
2045    #[test]
2046    fn test_plan_adaptive_with_limit() {
2047        let store = create_test_store();
2048        let planner = Planner::new(store);
2049
2050        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2051            items: vec![ReturnItem {
2052                expression: LogicalExpression::Variable("n".to_string()),
2053                alias: None,
2054            }],
2055            distinct: false,
2056            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2057                count: 10.into(),
2058                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2059                    variable: "n".to_string(),
2060                    label: None,
2061                    input: None,
2062                })),
2063            })),
2064        }));
2065
2066        let physical = planner.plan_adaptive(&logical).unwrap();
2067        assert!(physical.adaptive_context.is_some());
2068    }
2069
2070    #[test]
2071    fn test_plan_adaptive_with_skip() {
2072        let store = create_test_store();
2073        let planner = Planner::new(store);
2074
2075        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2076            items: vec![ReturnItem {
2077                expression: LogicalExpression::Variable("n".to_string()),
2078                alias: None,
2079            }],
2080            distinct: false,
2081            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2082                count: 5.into(),
2083                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2084                    variable: "n".to_string(),
2085                    label: None,
2086                    input: None,
2087                })),
2088            })),
2089        }));
2090
2091        let physical = planner.plan_adaptive(&logical).unwrap();
2092        assert!(physical.adaptive_context.is_some());
2093    }
2094
2095    #[test]
2096    fn test_plan_adaptive_with_sort() {
2097        let store = create_test_store();
2098        let planner = Planner::new(store);
2099
2100        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2101            items: vec![ReturnItem {
2102                expression: LogicalExpression::Variable("n".to_string()),
2103                alias: None,
2104            }],
2105            distinct: false,
2106            input: Box::new(LogicalOperator::Sort(SortOp {
2107                keys: vec![SortKey {
2108                    expression: LogicalExpression::Variable("n".to_string()),
2109                    order: SortOrder::Ascending,
2110                    nulls: None,
2111                }],
2112                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2113                    variable: "n".to_string(),
2114                    label: None,
2115                    input: None,
2116                })),
2117            })),
2118        }));
2119
2120        let physical = planner.plan_adaptive(&logical).unwrap();
2121        assert!(physical.adaptive_context.is_some());
2122    }
2123
2124    #[test]
2125    fn test_plan_adaptive_with_union() {
2126        let store = create_test_store();
2127        let planner = Planner::new(store);
2128
2129        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2130            items: vec![ReturnItem {
2131                expression: LogicalExpression::Variable("n".to_string()),
2132                alias: None,
2133            }],
2134            distinct: false,
2135            input: Box::new(LogicalOperator::Union(UnionOp {
2136                inputs: vec![
2137                    LogicalOperator::NodeScan(NodeScanOp {
2138                        variable: "n".to_string(),
2139                        label: Some("Person".to_string()),
2140                        input: None,
2141                    }),
2142                    LogicalOperator::NodeScan(NodeScanOp {
2143                        variable: "n".to_string(),
2144                        label: Some("Company".to_string()),
2145                        input: None,
2146                    }),
2147                ],
2148            })),
2149        }));
2150
2151        let physical = planner.plan_adaptive(&logical).unwrap();
2152        assert!(physical.adaptive_context.is_some());
2153    }
2154
2155    // ==================== Variable Length Path Tests ====================
2156
2157    #[test]
2158    fn test_plan_expand_variable_length() {
2159        let store = create_test_store();
2160        let planner = Planner::new(store);
2161
2162        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2163        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2164            items: vec![
2165                ReturnItem {
2166                    expression: LogicalExpression::Variable("a".to_string()),
2167                    alias: None,
2168                },
2169                ReturnItem {
2170                    expression: LogicalExpression::Variable("b".to_string()),
2171                    alias: None,
2172                },
2173            ],
2174            distinct: false,
2175            input: Box::new(LogicalOperator::Expand(ExpandOp {
2176                from_variable: "a".to_string(),
2177                to_variable: "b".to_string(),
2178                edge_variable: None,
2179                direction: ExpandDirection::Outgoing,
2180                edge_types: vec!["KNOWS".to_string()],
2181                min_hops: 1,
2182                max_hops: Some(3),
2183                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2184                    variable: "a".to_string(),
2185                    label: None,
2186                    input: None,
2187                })),
2188                path_alias: None,
2189                path_mode: PathMode::Walk,
2190            })),
2191        }));
2192
2193        let physical = planner.plan(&logical).unwrap();
2194        assert!(physical.columns().contains(&"a".to_string()));
2195        assert!(physical.columns().contains(&"b".to_string()));
2196    }
2197
2198    #[test]
2199    fn test_plan_expand_with_path_alias() {
2200        let store = create_test_store();
2201        let planner = Planner::new(store);
2202
2203        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2204        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2205            items: vec![
2206                ReturnItem {
2207                    expression: LogicalExpression::Variable("a".to_string()),
2208                    alias: None,
2209                },
2210                ReturnItem {
2211                    expression: LogicalExpression::Variable("b".to_string()),
2212                    alias: None,
2213                },
2214            ],
2215            distinct: false,
2216            input: Box::new(LogicalOperator::Expand(ExpandOp {
2217                from_variable: "a".to_string(),
2218                to_variable: "b".to_string(),
2219                edge_variable: None,
2220                direction: ExpandDirection::Outgoing,
2221                edge_types: vec!["KNOWS".to_string()],
2222                min_hops: 1,
2223                max_hops: Some(3),
2224                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2225                    variable: "a".to_string(),
2226                    label: None,
2227                    input: None,
2228                })),
2229                path_alias: Some("p".to_string()),
2230                path_mode: PathMode::Walk,
2231            })),
2232        }));
2233
2234        let physical = planner.plan(&logical).unwrap();
2235        // Verify plan was created successfully with expected output columns
2236        assert!(physical.columns().contains(&"a".to_string()));
2237        assert!(physical.columns().contains(&"b".to_string()));
2238    }
2239
2240    #[test]
2241    fn test_plan_expand_incoming() {
2242        let store = create_test_store();
2243        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2244            .with_factorized_execution(false);
2245
2246        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2247        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2248            items: vec![
2249                ReturnItem {
2250                    expression: LogicalExpression::Variable("a".to_string()),
2251                    alias: None,
2252                },
2253                ReturnItem {
2254                    expression: LogicalExpression::Variable("b".to_string()),
2255                    alias: None,
2256                },
2257            ],
2258            distinct: false,
2259            input: Box::new(LogicalOperator::Expand(ExpandOp {
2260                from_variable: "a".to_string(),
2261                to_variable: "b".to_string(),
2262                edge_variable: None,
2263                direction: ExpandDirection::Incoming,
2264                edge_types: vec!["KNOWS".to_string()],
2265                min_hops: 1,
2266                max_hops: Some(1),
2267                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2268                    variable: "a".to_string(),
2269                    label: None,
2270                    input: None,
2271                })),
2272                path_alias: None,
2273                path_mode: PathMode::Walk,
2274            })),
2275        }));
2276
2277        let physical = planner.plan(&logical).unwrap();
2278        assert!(physical.columns().contains(&"a".to_string()));
2279        assert!(physical.columns().contains(&"b".to_string()));
2280    }
2281
2282    #[test]
2283    fn test_plan_expand_both_directions() {
2284        let store = create_test_store();
2285        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2286            .with_factorized_execution(false);
2287
2288        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2289        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2290            items: vec![
2291                ReturnItem {
2292                    expression: LogicalExpression::Variable("a".to_string()),
2293                    alias: None,
2294                },
2295                ReturnItem {
2296                    expression: LogicalExpression::Variable("b".to_string()),
2297                    alias: None,
2298                },
2299            ],
2300            distinct: false,
2301            input: Box::new(LogicalOperator::Expand(ExpandOp {
2302                from_variable: "a".to_string(),
2303                to_variable: "b".to_string(),
2304                edge_variable: None,
2305                direction: ExpandDirection::Both,
2306                edge_types: vec!["KNOWS".to_string()],
2307                min_hops: 1,
2308                max_hops: Some(1),
2309                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2310                    variable: "a".to_string(),
2311                    label: None,
2312                    input: None,
2313                })),
2314                path_alias: None,
2315                path_mode: PathMode::Walk,
2316            })),
2317        }));
2318
2319        let physical = planner.plan(&logical).unwrap();
2320        assert!(physical.columns().contains(&"a".to_string()));
2321        assert!(physical.columns().contains(&"b".to_string()));
2322    }
2323
2324    // ==================== With Context Tests ====================
2325
2326    #[test]
2327    fn test_planner_with_context() {
2328        use crate::transaction::TransactionManager;
2329
2330        let store = create_test_store();
2331        let transaction_manager = Arc::new(TransactionManager::new());
2332        let transaction_id = transaction_manager.begin();
2333        let epoch = transaction_manager.current_epoch();
2334
2335        let planner = Planner::with_context(
2336            Arc::clone(&store) as Arc<dyn GraphStore>,
2337            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2338            Arc::clone(&transaction_manager),
2339            Some(transaction_id),
2340            epoch,
2341        );
2342
2343        assert_eq!(planner.transaction_id(), Some(transaction_id));
2344        assert!(planner.transaction_manager().is_some());
2345        assert_eq!(planner.viewing_epoch(), epoch);
2346    }
2347
2348    #[test]
2349    fn test_planner_with_factorized_execution_disabled() {
2350        let store = create_test_store();
2351        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2352            .with_factorized_execution(false);
2353
2354        // Two consecutive expands - should NOT use factorized execution
2355        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2356            items: vec![
2357                ReturnItem {
2358                    expression: LogicalExpression::Variable("a".to_string()),
2359                    alias: None,
2360                },
2361                ReturnItem {
2362                    expression: LogicalExpression::Variable("c".to_string()),
2363                    alias: None,
2364                },
2365            ],
2366            distinct: false,
2367            input: Box::new(LogicalOperator::Expand(ExpandOp {
2368                from_variable: "b".to_string(),
2369                to_variable: "c".to_string(),
2370                edge_variable: None,
2371                direction: ExpandDirection::Outgoing,
2372                edge_types: vec![],
2373                min_hops: 1,
2374                max_hops: Some(1),
2375                input: Box::new(LogicalOperator::Expand(ExpandOp {
2376                    from_variable: "a".to_string(),
2377                    to_variable: "b".to_string(),
2378                    edge_variable: None,
2379                    direction: ExpandDirection::Outgoing,
2380                    edge_types: vec![],
2381                    min_hops: 1,
2382                    max_hops: Some(1),
2383                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2384                        variable: "a".to_string(),
2385                        label: None,
2386                        input: None,
2387                    })),
2388                    path_alias: None,
2389                    path_mode: PathMode::Walk,
2390                })),
2391                path_alias: None,
2392                path_mode: PathMode::Walk,
2393            })),
2394        }));
2395
2396        let physical = planner.plan(&logical).unwrap();
2397        assert!(physical.columns().contains(&"a".to_string()));
2398        assert!(physical.columns().contains(&"c".to_string()));
2399    }
2400
2401    // ==================== Sort with Property Tests ====================
2402
2403    #[test]
2404    fn test_plan_sort_by_property() {
2405        let store = create_test_store();
2406        let planner = Planner::new(store);
2407
2408        // MATCH (n) RETURN n ORDER BY n.name ASC
2409        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2410            items: vec![ReturnItem {
2411                expression: LogicalExpression::Variable("n".to_string()),
2412                alias: None,
2413            }],
2414            distinct: false,
2415            input: Box::new(LogicalOperator::Sort(SortOp {
2416                keys: vec![SortKey {
2417                    expression: LogicalExpression::Property {
2418                        variable: "n".to_string(),
2419                        property: "name".to_string(),
2420                    },
2421                    order: SortOrder::Ascending,
2422                    nulls: None,
2423                }],
2424                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2425                    variable: "n".to_string(),
2426                    label: None,
2427                    input: None,
2428                })),
2429            })),
2430        }));
2431
2432        let physical = planner.plan(&logical).unwrap();
2433        // Should have the property column projected
2434        assert!(physical.columns().contains(&"n".to_string()));
2435    }
2436
2437    // ==================== Scan with Input Tests ====================
2438
2439    #[test]
2440    fn test_plan_scan_with_input() {
2441        let store = create_test_store();
2442        let planner = Planner::new(store);
2443
2444        // A scan with another scan as input (for chained patterns)
2445        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2446            items: vec![
2447                ReturnItem {
2448                    expression: LogicalExpression::Variable("a".to_string()),
2449                    alias: None,
2450                },
2451                ReturnItem {
2452                    expression: LogicalExpression::Variable("b".to_string()),
2453                    alias: None,
2454                },
2455            ],
2456            distinct: false,
2457            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2458                variable: "b".to_string(),
2459                label: Some("Company".to_string()),
2460                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2461                    variable: "a".to_string(),
2462                    label: Some("Person".to_string()),
2463                    input: None,
2464                }))),
2465            })),
2466        }));
2467
2468        let physical = planner.plan(&logical).unwrap();
2469        assert!(physical.columns().contains(&"a".to_string()));
2470        assert!(physical.columns().contains(&"b".to_string()));
2471    }
2472}