Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43    VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53    convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57/// Range bounds for property-based range queries.
58struct RangeBounds<'a> {
59    min: Option<&'a Value>,
60    max: Option<&'a Value>,
61    min_inclusive: bool,
62    max_inclusive: bool,
63}
64
65/// Converts a logical plan to a physical operator tree for LPG stores.
66pub struct Planner {
67    /// The graph store (read-only operations).
68    pub(super) store: Arc<dyn GraphStore>,
69    /// Writable graph store (None for read-only databases).
70    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
71    /// Transaction manager for MVCC operations.
72    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
73    /// Current transaction ID (if in a transaction).
74    pub(super) transaction_id: Option<TransactionId>,
75    /// Epoch to use for visibility checks.
76    pub(super) viewing_epoch: EpochId,
77    /// Counter for generating unique anonymous edge column names.
78    pub(super) anon_edge_counter: std::cell::Cell<u32>,
79    /// Whether to use factorized execution for multi-hop queries.
80    pub(super) factorized_execution: bool,
81    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
82    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
83    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Variables that hold edge IDs (from MATCH edge patterns).
85    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
86    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
87    /// Optional constraint validator for schema enforcement during mutations.
88    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
89    /// Catalog for user-defined procedure lookup.
90    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
91    /// Shared parameter state for the currently planning correlated Apply.
92    /// Set by `plan_apply` before planning the inner operator, consumed by
93    /// `plan_operator` when encountering `ParameterScan`.
94    pub(super) correlated_param_state:
95        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
96    /// Variables from variable-length expand patterns (group-list variables).
97    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
98    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
99    /// When true, each physical operator is wrapped in `ProfiledOperator`.
100    profiling: std::cell::Cell<bool>,
101    /// Profile entries collected during planning (post-order).
102    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
103    /// Optional write tracker for recording writes during mutations.
104    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
105    /// Session context for introspection functions (info, schema, current_schema, etc.).
106    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
107    /// When true, expand operators use epoch-only visibility (no MVCC version
108    /// chain walks).  Set when the plan contains no mutations, so PENDING
109    /// writes are impossible to observe.
110    pub(super) read_only: bool,
111}
112
113impl Planner {
114    /// Creates a new planner with the given store.
115    ///
116    /// This creates a planner without transaction context, using the current
117    /// epoch from the store for visibility.
118    #[must_use]
119    pub fn new(store: Arc<dyn GraphStore>) -> Self {
120        let epoch = store.current_epoch();
121        Self {
122            store,
123            write_store: None,
124            transaction_manager: None,
125            transaction_id: None,
126            viewing_epoch: epoch,
127            anon_edge_counter: std::cell::Cell::new(0),
128            factorized_execution: true,
129            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131            validator: None,
132            catalog: None,
133            correlated_param_state: std::cell::RefCell::new(None),
134            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135            profiling: std::cell::Cell::new(false),
136            profile_entries: std::cell::RefCell::new(Vec::new()),
137            write_tracker: None,
138            session_context: grafeo_core::execution::operators::SessionContext::default(),
139            read_only: false,
140        }
141    }
142
143    /// Creates a new planner with transaction context for MVCC-aware planning.
144    #[must_use]
145    pub fn with_context(
146        store: Arc<dyn GraphStore>,
147        write_store: Option<Arc<dyn GraphStoreMut>>,
148        transaction_manager: Arc<TransactionManager>,
149        transaction_id: Option<TransactionId>,
150        viewing_epoch: EpochId,
151    ) -> Self {
152        use crate::transaction::TransactionWriteTracker;
153
154        // Create write tracker when there's an active transaction
155        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156            if transaction_id.is_some() {
157                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158                    &transaction_manager,
159                ))))
160            } else {
161                None
162            };
163
164        Self {
165            store,
166            write_store,
167            transaction_manager: Some(transaction_manager),
168            transaction_id,
169            viewing_epoch,
170            anon_edge_counter: std::cell::Cell::new(0),
171            factorized_execution: true,
172            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174            validator: None,
175            catalog: None,
176            correlated_param_state: std::cell::RefCell::new(None),
177            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178            profiling: std::cell::Cell::new(false),
179            profile_entries: std::cell::RefCell::new(Vec::new()),
180            write_tracker,
181            session_context: grafeo_core::execution::operators::SessionContext::default(),
182            read_only: false,
183        }
184    }
185
186    /// Marks this planner as planning a read-only query (no mutations),
187    /// enabling fast-path visibility checks in expand operators.
188    #[must_use]
189    pub fn with_read_only(mut self, read_only: bool) -> Self {
190        self.read_only = read_only;
191        self
192    }
193
194    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
195    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196        self.write_store
197            .as_ref()
198            .map(Arc::clone)
199            .ok_or(Error::Transaction(
200                grafeo_common::utils::error::TransactionError::ReadOnly,
201            ))
202    }
203
204    /// Returns the viewing epoch for this planner.
205    #[must_use]
206    pub fn viewing_epoch(&self) -> EpochId {
207        self.viewing_epoch
208    }
209
210    /// Returns the transaction ID for this planner, if any.
211    #[must_use]
212    pub fn transaction_id(&self) -> Option<TransactionId> {
213        self.transaction_id
214    }
215
216    /// Returns a reference to the transaction manager, if available.
217    #[must_use]
218    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
219        self.transaction_manager.as_ref()
220    }
221
222    /// Enables or disables factorized execution for multi-hop queries.
223    #[must_use]
224    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225        self.factorized_execution = enabled;
226        self
227    }
228
229    /// Sets the constraint validator for schema enforcement during mutations.
230    #[must_use]
231    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232        self.validator = Some(validator);
233        self
234    }
235
236    /// Sets the catalog for user-defined procedure lookup.
237    #[must_use]
238    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239        self.catalog = Some(catalog);
240        self
241    }
242
243    /// Sets the session context for introspection functions.
244    #[must_use]
245    pub fn with_session_context(
246        mut self,
247        context: grafeo_core::execution::operators::SessionContext,
248    ) -> Self {
249        self.session_context = context;
250        self
251    }
252
253    /// Generates an edge column name from an expand's edge variable (or an
254    /// anonymous fallback) and registers it in `edge_columns` so downstream
255    /// RETURN emits `EdgeResolve` instead of `NodeResolve`.
256    ///
257    /// Returns the column name for the caller to push into its output columns.
258    pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
259        let name = edge_variable.clone().unwrap_or_else(|| {
260            let count = self.anon_edge_counter.get();
261            self.anon_edge_counter.set(count + 1);
262            format!("_anon_edge_{}", count)
263        });
264        self.edge_columns.borrow_mut().insert(name.clone());
265        name
266    }
267
268    /// Counts consecutive single-hop expand operations.
269    ///
270    /// Returns the count and the deepest non-expand operator (the base of the chain).
271    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
272        match op {
273            LogicalOperator::Expand(expand) => {
274                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
275
276                if is_single_hop {
277                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
278                    (inner_count + 1, base)
279                } else {
280                    (0, op)
281                }
282            }
283            _ => (0, op),
284        }
285    }
286
287    /// Collects expand operations from the outermost down to the base.
288    ///
289    /// Returns expands in order from innermost (base) to outermost.
290    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
291        let mut chain = Vec::new();
292        let mut current = op;
293
294        while let LogicalOperator::Expand(expand) = current {
295            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
296            if !is_single_hop {
297                break;
298            }
299            chain.push(expand);
300            current = &expand.input;
301        }
302
303        chain.reverse();
304        chain
305    }
306
307    /// Plans a logical plan into a physical operator.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the logical plan contains unsupported operators
312    /// or invalid expressions.
313    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
314        let _span = grafeo_debug_span!("grafeo::query::plan");
315        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
316        Ok(PhysicalPlan {
317            operator,
318            columns,
319            adaptive_context: None,
320        })
321    }
322
323    /// Plans a logical plan with profiling: each physical operator is wrapped
324    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
325    /// collect row counts and timing. Returns the physical plan together with
326    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
327    /// items in post-order (children before parents).
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the logical plan contains unsupported operators
332    /// or invalid expressions.
333    pub fn plan_profiled(
334        &self,
335        logical_plan: &LogicalPlan,
336    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
337        self.profiling.set(true);
338        self.profile_entries.borrow_mut().clear();
339
340        let result = self.plan_operator(&logical_plan.root);
341
342        self.profiling.set(false);
343        let (operator, columns) = result?;
344        let entries = self.profile_entries.borrow_mut().drain(..).collect();
345
346        Ok((
347            PhysicalPlan {
348                operator,
349                columns,
350                adaptive_context: None,
351            },
352            entries,
353        ))
354    }
355
356    /// Plans a logical plan with adaptive execution support.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the logical plan contains unsupported operators
361    /// or invalid expressions.
362    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
363        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
364
365        let mut adaptive_context = AdaptiveContext::new();
366        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
367
368        Ok(PhysicalPlan {
369            operator,
370            columns,
371            adaptive_context: Some(adaptive_context),
372        })
373    }
374
375    /// Collects cardinality estimates from the logical plan into an adaptive context.
376    fn collect_cardinality_estimates(
377        &self,
378        op: &LogicalOperator,
379        ctx: &mut AdaptiveContext,
380        depth: usize,
381    ) {
382        match op {
383            LogicalOperator::NodeScan(scan) => {
384                let estimate = if let Some(label) = &scan.label {
385                    self.store.nodes_by_label(label).len() as f64
386                } else {
387                    self.store.node_count() as f64
388                };
389                let id = format!("scan_{}", scan.variable);
390                ctx.set_estimate(&id, estimate);
391
392                if let Some(input) = &scan.input {
393                    self.collect_cardinality_estimates(input, ctx, depth + 1);
394                }
395            }
396            LogicalOperator::Filter(filter) => {
397                let input_estimate = self.estimate_cardinality(&filter.input);
398                let estimate = input_estimate * 0.3;
399                let id = format!("filter_{depth}");
400                ctx.set_estimate(&id, estimate);
401
402                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
403            }
404            LogicalOperator::Expand(expand) => {
405                let input_estimate = self.estimate_cardinality(&expand.input);
406                let stats = self.store.statistics();
407                let avg_degree = self.estimate_expand_degree(&stats, expand);
408                let estimate = input_estimate * avg_degree;
409                let id = format!("expand_{}", expand.to_variable);
410                ctx.set_estimate(&id, estimate);
411
412                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
413            }
414            LogicalOperator::Join(join) => {
415                let left_est = self.estimate_cardinality(&join.left);
416                let right_est = self.estimate_cardinality(&join.right);
417                let estimate = (left_est * right_est).sqrt();
418                let id = format!("join_{depth}");
419                ctx.set_estimate(&id, estimate);
420
421                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
422                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
423            }
424            LogicalOperator::Aggregate(agg) => {
425                let input_estimate = self.estimate_cardinality(&agg.input);
426                let estimate = if agg.group_by.is_empty() {
427                    1.0
428                } else {
429                    (input_estimate * 0.1).max(1.0)
430                };
431                let id = format!("aggregate_{depth}");
432                ctx.set_estimate(&id, estimate);
433
434                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
435            }
436            LogicalOperator::Distinct(distinct) => {
437                let input_estimate = self.estimate_cardinality(&distinct.input);
438                let estimate = (input_estimate * 0.5).max(1.0);
439                let id = format!("distinct_{depth}");
440                ctx.set_estimate(&id, estimate);
441
442                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
443            }
444            LogicalOperator::Return(ret) => {
445                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
446            }
447            LogicalOperator::Limit(limit) => {
448                let input_estimate = self.estimate_cardinality(&limit.input);
449                let estimate = (input_estimate).min(limit.count.estimate());
450                let id = format!("limit_{depth}");
451                ctx.set_estimate(&id, estimate);
452
453                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
454            }
455            LogicalOperator::Skip(skip) => {
456                let input_estimate = self.estimate_cardinality(&skip.input);
457                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
458                let id = format!("skip_{depth}");
459                ctx.set_estimate(&id, estimate);
460
461                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
462            }
463            LogicalOperator::Sort(sort) => {
464                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
465            }
466            LogicalOperator::Union(union) => {
467                let estimate: f64 = union
468                    .inputs
469                    .iter()
470                    .map(|input| self.estimate_cardinality(input))
471                    .sum();
472                let id = format!("union_{depth}");
473                ctx.set_estimate(&id, estimate);
474
475                for input in &union.inputs {
476                    self.collect_cardinality_estimates(input, ctx, depth + 1);
477                }
478            }
479            _ => {
480                // For other operators, try to recurse into known input patterns
481            }
482        }
483    }
484
485    /// Estimates cardinality for a logical operator subtree.
486    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
487        match op {
488            LogicalOperator::NodeScan(scan) => {
489                if let Some(label) = &scan.label {
490                    self.store.nodes_by_label(label).len() as f64
491                } else {
492                    self.store.node_count() as f64
493                }
494            }
495            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
496            LogicalOperator::Expand(expand) => {
497                let stats = self.store.statistics();
498                let avg_degree = self.estimate_expand_degree(&stats, expand);
499                self.estimate_cardinality(&expand.input) * avg_degree
500            }
501            LogicalOperator::Join(join) => {
502                let left = self.estimate_cardinality(&join.left);
503                let right = self.estimate_cardinality(&join.right);
504                (left * right).sqrt()
505            }
506            LogicalOperator::Aggregate(agg) => {
507                if agg.group_by.is_empty() {
508                    1.0
509                } else {
510                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
511                }
512            }
513            LogicalOperator::Distinct(distinct) => {
514                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
515            }
516            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
517            LogicalOperator::Limit(limit) => self
518                .estimate_cardinality(&limit.input)
519                .min(limit.count.estimate()),
520            LogicalOperator::Skip(skip) => {
521                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
522            }
523            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
524            LogicalOperator::Union(union) => union
525                .inputs
526                .iter()
527                .map(|input| self.estimate_cardinality(input))
528                .sum(),
529            LogicalOperator::Except(except) => {
530                let left = self.estimate_cardinality(&except.left);
531                let right = self.estimate_cardinality(&except.right);
532                (left - right).max(0.0)
533            }
534            LogicalOperator::Intersect(intersect) => {
535                let left = self.estimate_cardinality(&intersect.left);
536                let right = self.estimate_cardinality(&intersect.right);
537                left.min(right)
538            }
539            LogicalOperator::Otherwise(otherwise) => self
540                .estimate_cardinality(&otherwise.left)
541                .max(self.estimate_cardinality(&otherwise.right)),
542            _ => 1000.0,
543        }
544    }
545
546    /// Estimates the average edge degree for an expand operation using store statistics.
547    fn estimate_expand_degree(
548        &self,
549        stats: &grafeo_core::statistics::Statistics,
550        expand: &ExpandOp,
551    ) -> f64 {
552        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
553        if expand.edge_types.len() == 1 {
554            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
555        } else if stats.total_nodes > 0 {
556            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
557        } else {
558            10.0
559        }
560    }
561
562    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
563    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
564    fn maybe_profile(
565        &self,
566        result: Result<(Box<dyn Operator>, Vec<String>)>,
567        op: &LogicalOperator,
568    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
569        if self.profiling.get() {
570            let (physical, columns) = result?;
571            let (entry, stats) =
572                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
573            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
574            self.profile_entries.borrow_mut().push(entry);
575            Ok((Box::new(profiled), columns))
576        } else {
577            result
578        }
579    }
580
581    /// Plans a single logical operator.
582    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
583        let result = match op {
584            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
585            LogicalOperator::Expand(expand) => {
586                if self.factorized_execution {
587                    let (chain_len, _base) = Self::count_expand_chain(op);
588                    if chain_len >= 2 {
589                        return self.maybe_profile(self.plan_expand_chain(op), op);
590                    }
591                }
592                self.plan_expand(expand)
593            }
594            LogicalOperator::Return(ret) => self.plan_return(ret),
595            LogicalOperator::Filter(filter) => self.plan_filter(filter),
596            LogicalOperator::Project(project) => self.plan_project(project),
597            LogicalOperator::Limit(limit) => self.plan_limit(limit),
598            LogicalOperator::Skip(skip) => self.plan_skip(skip),
599            LogicalOperator::Sort(sort) => self.plan_sort(sort),
600            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
601            LogicalOperator::Join(join) => self.plan_join(join),
602            LogicalOperator::Union(union) => self.plan_union(union),
603            LogicalOperator::Except(except) => self.plan_except(except),
604            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
605            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
606            LogicalOperator::Apply(apply) => self.plan_apply(apply),
607            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
608            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
609            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
610            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
611            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
612            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
613            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
614            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
615            LogicalOperator::Merge(merge) => self.plan_merge(merge),
616            LogicalOperator::MergeRelationship(merge_rel) => {
617                self.plan_merge_relationship(merge_rel)
618            }
619            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
620            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
621            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
622            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
623            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
624            #[cfg(feature = "algos")]
625            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
626            #[cfg(not(feature = "algos"))]
627            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
628                "CALL procedures require the 'algos' feature".to_string(),
629            )),
630            LogicalOperator::ParameterScan(_param_scan) => {
631                let state = self
632                    .correlated_param_state
633                    .borrow()
634                    .clone()
635                    .ok_or_else(|| {
636                        Error::Internal(
637                            "ParameterScan without correlated Apply context".to_string(),
638                        )
639                    })?;
640                // Use the actual column names from the ParameterState (which may
641                // have been expanded from "*" to real variable names in plan_apply)
642                let columns = state.columns.clone();
643                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
644                Ok((operator, columns))
645            }
646            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
647            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
648            LogicalOperator::LoadData(load) => {
649                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
650                    load.path.clone(),
651                    load.format,
652                    load.with_headers,
653                    load.field_terminator,
654                    load.variable.clone(),
655                ));
656                Ok((operator, vec![load.variable.clone()]))
657            }
658            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
659            LogicalOperator::VectorScan(_) => Err(Error::Internal(
660                "VectorScan requires vector-index feature".to_string(),
661            )),
662            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
663                "VectorJoin requires vector-index feature".to_string(),
664            )),
665            _ => Err(Error::Internal(format!(
666                "Unsupported operator: {:?}",
667                std::mem::discriminant(op)
668            ))),
669        };
670        self.maybe_profile(result, op)
671    }
672
673    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
674    fn plan_horizontal_aggregate(
675        &self,
676        ha: &HorizontalAggregateOp,
677    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
678        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
679
680        let list_col_idx = child_columns
681            .iter()
682            .position(|c| c == &ha.list_column)
683            .ok_or_else(|| {
684                Error::Internal(format!(
685                    "HorizontalAggregate list column '{}' not found in {:?}",
686                    ha.list_column, child_columns
687                ))
688            })?;
689
690        let entity_kind = match ha.entity_kind {
691            LogicalEntityKind::Edge => EntityKind::Edge,
692            LogicalEntityKind::Node => EntityKind::Node,
693        };
694
695        let function = convert_aggregate_function(ha.function);
696        let input_column_count = child_columns.len();
697
698        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
699            child_op,
700            list_col_idx,
701            entity_kind,
702            function,
703            ha.property.clone(),
704            Arc::clone(&self.store) as Arc<dyn GraphStore>,
705            input_column_count,
706        ));
707
708        let mut columns = child_columns;
709        columns.push(ha.alias.clone());
710        // Mark the result as a scalar column
711        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
712
713        Ok((operator, columns))
714    }
715
716    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
717    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
718        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
719        let key_idx = child_columns
720            .iter()
721            .position(|c| c == &mc.key_var)
722            .ok_or_else(|| {
723                Error::Internal(format!(
724                    "MapCollect key '{}' not in columns {:?}",
725                    mc.key_var, child_columns
726                ))
727            })?;
728        let value_idx = child_columns
729            .iter()
730            .position(|c| c == &mc.value_var)
731            .ok_or_else(|| {
732                Error::Internal(format!(
733                    "MapCollect value '{}' not in columns {:?}",
734                    mc.value_var, child_columns
735                ))
736            })?;
737        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
738        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
739        Ok((operator, vec![mc.alias.clone()]))
740    }
741}
742
743/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
744#[cfg(feature = "algos")]
745struct StaticResultOperator {
746    rows: Vec<Vec<Value>>,
747    column_indices: Vec<usize>,
748    row_index: usize,
749}
750
751#[cfg(feature = "algos")]
752impl Operator for StaticResultOperator {
753    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
754        use grafeo_core::execution::DataChunk;
755
756        if self.row_index >= self.rows.len() {
757            return Ok(None);
758        }
759
760        let remaining = self.rows.len() - self.row_index;
761        let chunk_rows = remaining.min(1024);
762        let col_count = self.column_indices.len();
763
764        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
765        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
766
767        for row_offset in 0..chunk_rows {
768            let row = &self.rows[self.row_index + row_offset];
769            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
770                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
771                if let Some(col) = chunk.column_mut(col_idx) {
772                    col.push_value(value);
773                }
774            }
775        }
776        chunk.set_count(chunk_rows);
777
778        self.row_index += chunk_rows;
779        Ok(Some(chunk))
780    }
781
782    fn reset(&mut self) {
783        self.row_index = 0;
784    }
785
786    fn name(&self) -> &'static str {
787        "StaticResult"
788    }
789}
790
791#[cfg(test)]
792mod tests {
793    use super::*;
794    use crate::query::plan::{
795        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
796        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
797        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
798        SkipOp as LogicalSkipOp, SortKey, SortOp,
799    };
800    use grafeo_common::types::Value;
801    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
802    use grafeo_core::graph::GraphStoreMut;
803    use grafeo_core::graph::lpg::LpgStore;
804
805    fn create_test_store() -> Arc<LpgStore> {
806        let store = Arc::new(LpgStore::new().unwrap());
807        store.create_node(&["Person"]);
808        store.create_node(&["Person"]);
809        store.create_node(&["Company"]);
810        store
811    }
812
813    // ==================== Simple Scan Tests ====================
814
815    #[test]
816    fn test_plan_simple_scan() {
817        let store = create_test_store();
818        let planner = Planner::new(store);
819
820        // MATCH (n:Person) RETURN n
821        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
822            items: vec![ReturnItem {
823                expression: LogicalExpression::Variable("n".to_string()),
824                alias: None,
825            }],
826            distinct: false,
827            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
828                variable: "n".to_string(),
829                label: Some("Person".to_string()),
830                input: None,
831            })),
832        }));
833
834        let physical = planner.plan(&logical).unwrap();
835        assert_eq!(physical.columns(), &["n"]);
836    }
837
838    #[test]
839    fn test_plan_scan_without_label() {
840        let store = create_test_store();
841        let planner = Planner::new(store);
842
843        // MATCH (n) RETURN n
844        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
845            items: vec![ReturnItem {
846                expression: LogicalExpression::Variable("n".to_string()),
847                alias: None,
848            }],
849            distinct: false,
850            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
851                variable: "n".to_string(),
852                label: None,
853                input: None,
854            })),
855        }));
856
857        let physical = planner.plan(&logical).unwrap();
858        assert_eq!(physical.columns(), &["n"]);
859    }
860
861    #[test]
862    fn test_plan_return_with_alias() {
863        let store = create_test_store();
864        let planner = Planner::new(store);
865
866        // MATCH (n:Person) RETURN n AS person
867        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
868            items: vec![ReturnItem {
869                expression: LogicalExpression::Variable("n".to_string()),
870                alias: Some("person".to_string()),
871            }],
872            distinct: false,
873            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
874                variable: "n".to_string(),
875                label: Some("Person".to_string()),
876                input: None,
877            })),
878        }));
879
880        let physical = planner.plan(&logical).unwrap();
881        assert_eq!(physical.columns(), &["person"]);
882    }
883
884    #[test]
885    fn test_plan_return_property() {
886        let store = create_test_store();
887        let planner = Planner::new(store);
888
889        // MATCH (n:Person) RETURN n.name
890        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
891            items: vec![ReturnItem {
892                expression: LogicalExpression::Property {
893                    variable: "n".to_string(),
894                    property: "name".to_string(),
895                },
896                alias: None,
897            }],
898            distinct: false,
899            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
900                variable: "n".to_string(),
901                label: Some("Person".to_string()),
902                input: None,
903            })),
904        }));
905
906        let physical = planner.plan(&logical).unwrap();
907        assert_eq!(physical.columns(), &["n.name"]);
908    }
909
910    #[test]
911    fn test_plan_return_literal() {
912        let store = create_test_store();
913        let planner = Planner::new(store);
914
915        // MATCH (n) RETURN 42 AS answer
916        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
917            items: vec![ReturnItem {
918                expression: LogicalExpression::Literal(Value::Int64(42)),
919                alias: Some("answer".to_string()),
920            }],
921            distinct: false,
922            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
923                variable: "n".to_string(),
924                label: None,
925                input: None,
926            })),
927        }));
928
929        let physical = planner.plan(&logical).unwrap();
930        assert_eq!(physical.columns(), &["answer"]);
931    }
932
933    // ==================== Filter Tests ====================
934
935    #[test]
936    fn test_plan_filter_equality() {
937        let store = create_test_store();
938        let planner = Planner::new(store);
939
940        // MATCH (n:Person) WHERE n.age = 30 RETURN n
941        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
942            items: vec![ReturnItem {
943                expression: LogicalExpression::Variable("n".to_string()),
944                alias: None,
945            }],
946            distinct: false,
947            input: Box::new(LogicalOperator::Filter(FilterOp {
948                predicate: LogicalExpression::Binary {
949                    left: Box::new(LogicalExpression::Property {
950                        variable: "n".to_string(),
951                        property: "age".to_string(),
952                    }),
953                    op: BinaryOp::Eq,
954                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
955                },
956                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
957                    variable: "n".to_string(),
958                    label: Some("Person".to_string()),
959                    input: None,
960                })),
961                pushdown_hint: None,
962            })),
963        }));
964
965        let physical = planner.plan(&logical).unwrap();
966        assert_eq!(physical.columns(), &["n"]);
967    }
968
969    #[test]
970    fn test_plan_filter_compound_and() {
971        let store = create_test_store();
972        let planner = Planner::new(store);
973
974        // WHERE n.age > 20 AND n.age < 40
975        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
976            items: vec![ReturnItem {
977                expression: LogicalExpression::Variable("n".to_string()),
978                alias: None,
979            }],
980            distinct: false,
981            input: Box::new(LogicalOperator::Filter(FilterOp {
982                predicate: LogicalExpression::Binary {
983                    left: Box::new(LogicalExpression::Binary {
984                        left: Box::new(LogicalExpression::Property {
985                            variable: "n".to_string(),
986                            property: "age".to_string(),
987                        }),
988                        op: BinaryOp::Gt,
989                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
990                    }),
991                    op: BinaryOp::And,
992                    right: Box::new(LogicalExpression::Binary {
993                        left: Box::new(LogicalExpression::Property {
994                            variable: "n".to_string(),
995                            property: "age".to_string(),
996                        }),
997                        op: BinaryOp::Lt,
998                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
999                    }),
1000                },
1001                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1002                    variable: "n".to_string(),
1003                    label: None,
1004                    input: None,
1005                })),
1006                pushdown_hint: None,
1007            })),
1008        }));
1009
1010        let physical = planner.plan(&logical).unwrap();
1011        assert_eq!(physical.columns(), &["n"]);
1012    }
1013
1014    #[test]
1015    fn test_plan_filter_unary_not() {
1016        let store = create_test_store();
1017        let planner = Planner::new(store);
1018
1019        // WHERE NOT n.active
1020        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1021            items: vec![ReturnItem {
1022                expression: LogicalExpression::Variable("n".to_string()),
1023                alias: None,
1024            }],
1025            distinct: false,
1026            input: Box::new(LogicalOperator::Filter(FilterOp {
1027                predicate: LogicalExpression::Unary {
1028                    op: UnaryOp::Not,
1029                    operand: Box::new(LogicalExpression::Property {
1030                        variable: "n".to_string(),
1031                        property: "active".to_string(),
1032                    }),
1033                },
1034                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1035                    variable: "n".to_string(),
1036                    label: None,
1037                    input: None,
1038                })),
1039                pushdown_hint: None,
1040            })),
1041        }));
1042
1043        let physical = planner.plan(&logical).unwrap();
1044        assert_eq!(physical.columns(), &["n"]);
1045    }
1046
1047    #[test]
1048    fn test_plan_filter_is_null() {
1049        let store = create_test_store();
1050        let planner = Planner::new(store);
1051
1052        // WHERE n.email IS NULL
1053        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1054            items: vec![ReturnItem {
1055                expression: LogicalExpression::Variable("n".to_string()),
1056                alias: None,
1057            }],
1058            distinct: false,
1059            input: Box::new(LogicalOperator::Filter(FilterOp {
1060                predicate: LogicalExpression::Unary {
1061                    op: UnaryOp::IsNull,
1062                    operand: Box::new(LogicalExpression::Property {
1063                        variable: "n".to_string(),
1064                        property: "email".to_string(),
1065                    }),
1066                },
1067                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1068                    variable: "n".to_string(),
1069                    label: None,
1070                    input: None,
1071                })),
1072                pushdown_hint: None,
1073            })),
1074        }));
1075
1076        let physical = planner.plan(&logical).unwrap();
1077        assert_eq!(physical.columns(), &["n"]);
1078    }
1079
1080    #[test]
1081    fn test_plan_filter_function_call() {
1082        let store = create_test_store();
1083        let planner = Planner::new(store);
1084
1085        // WHERE size(n.friends) > 0
1086        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1087            items: vec![ReturnItem {
1088                expression: LogicalExpression::Variable("n".to_string()),
1089                alias: None,
1090            }],
1091            distinct: false,
1092            input: Box::new(LogicalOperator::Filter(FilterOp {
1093                predicate: LogicalExpression::Binary {
1094                    left: Box::new(LogicalExpression::FunctionCall {
1095                        name: "size".to_string(),
1096                        args: vec![LogicalExpression::Property {
1097                            variable: "n".to_string(),
1098                            property: "friends".to_string(),
1099                        }],
1100                        distinct: false,
1101                    }),
1102                    op: BinaryOp::Gt,
1103                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1104                },
1105                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1106                    variable: "n".to_string(),
1107                    label: None,
1108                    input: None,
1109                })),
1110                pushdown_hint: None,
1111            })),
1112        }));
1113
1114        let physical = planner.plan(&logical).unwrap();
1115        assert_eq!(physical.columns(), &["n"]);
1116    }
1117
1118    // ==================== Expand Tests ====================
1119
1120    #[test]
1121    fn test_plan_expand_outgoing() {
1122        let store = create_test_store();
1123        let planner = Planner::new(store);
1124
1125        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1126        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1127            items: vec![
1128                ReturnItem {
1129                    expression: LogicalExpression::Variable("a".to_string()),
1130                    alias: None,
1131                },
1132                ReturnItem {
1133                    expression: LogicalExpression::Variable("b".to_string()),
1134                    alias: None,
1135                },
1136            ],
1137            distinct: false,
1138            input: Box::new(LogicalOperator::Expand(ExpandOp {
1139                from_variable: "a".to_string(),
1140                to_variable: "b".to_string(),
1141                edge_variable: None,
1142                direction: ExpandDirection::Outgoing,
1143                edge_types: vec!["KNOWS".to_string()],
1144                min_hops: 1,
1145                max_hops: Some(1),
1146                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1147                    variable: "a".to_string(),
1148                    label: Some("Person".to_string()),
1149                    input: None,
1150                })),
1151                path_alias: None,
1152                path_mode: PathMode::Walk,
1153            })),
1154        }));
1155
1156        let physical = planner.plan(&logical).unwrap();
1157        // The return should have columns [a, b]
1158        assert!(physical.columns().contains(&"a".to_string()));
1159        assert!(physical.columns().contains(&"b".to_string()));
1160    }
1161
1162    #[test]
1163    fn test_plan_expand_with_edge_variable() {
1164        let store = create_test_store();
1165        let planner = Planner::new(store);
1166
1167        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1168        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1169            items: vec![
1170                ReturnItem {
1171                    expression: LogicalExpression::Variable("a".to_string()),
1172                    alias: None,
1173                },
1174                ReturnItem {
1175                    expression: LogicalExpression::Variable("r".to_string()),
1176                    alias: None,
1177                },
1178                ReturnItem {
1179                    expression: LogicalExpression::Variable("b".to_string()),
1180                    alias: None,
1181                },
1182            ],
1183            distinct: false,
1184            input: Box::new(LogicalOperator::Expand(ExpandOp {
1185                from_variable: "a".to_string(),
1186                to_variable: "b".to_string(),
1187                edge_variable: Some("r".to_string()),
1188                direction: ExpandDirection::Outgoing,
1189                edge_types: vec!["KNOWS".to_string()],
1190                min_hops: 1,
1191                max_hops: Some(1),
1192                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1193                    variable: "a".to_string(),
1194                    label: None,
1195                    input: None,
1196                })),
1197                path_alias: None,
1198                path_mode: PathMode::Walk,
1199            })),
1200        }));
1201
1202        let physical = planner.plan(&logical).unwrap();
1203        assert!(physical.columns().contains(&"a".to_string()));
1204        assert!(physical.columns().contains(&"r".to_string()));
1205        assert!(physical.columns().contains(&"b".to_string()));
1206    }
1207
1208    // ==================== Limit/Skip/Sort Tests ====================
1209
1210    #[test]
1211    fn test_plan_limit() {
1212        let store = create_test_store();
1213        let planner = Planner::new(store);
1214
1215        // MATCH (n) RETURN n LIMIT 10
1216        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1217            items: vec![ReturnItem {
1218                expression: LogicalExpression::Variable("n".to_string()),
1219                alias: None,
1220            }],
1221            distinct: false,
1222            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1223                count: 10.into(),
1224                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1225                    variable: "n".to_string(),
1226                    label: None,
1227                    input: None,
1228                })),
1229            })),
1230        }));
1231
1232        let physical = planner.plan(&logical).unwrap();
1233        assert_eq!(physical.columns(), &["n"]);
1234    }
1235
1236    #[test]
1237    fn test_plan_skip() {
1238        let store = create_test_store();
1239        let planner = Planner::new(store);
1240
1241        // MATCH (n) RETURN n SKIP 5
1242        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1243            items: vec![ReturnItem {
1244                expression: LogicalExpression::Variable("n".to_string()),
1245                alias: None,
1246            }],
1247            distinct: false,
1248            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1249                count: 5.into(),
1250                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1251                    variable: "n".to_string(),
1252                    label: None,
1253                    input: None,
1254                })),
1255            })),
1256        }));
1257
1258        let physical = planner.plan(&logical).unwrap();
1259        assert_eq!(physical.columns(), &["n"]);
1260    }
1261
1262    #[test]
1263    fn test_plan_sort() {
1264        let store = create_test_store();
1265        let planner = Planner::new(store);
1266
1267        // MATCH (n) RETURN n ORDER BY n.name ASC
1268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1269            items: vec![ReturnItem {
1270                expression: LogicalExpression::Variable("n".to_string()),
1271                alias: None,
1272            }],
1273            distinct: false,
1274            input: Box::new(LogicalOperator::Sort(SortOp {
1275                keys: vec![SortKey {
1276                    expression: LogicalExpression::Variable("n".to_string()),
1277                    order: SortOrder::Ascending,
1278                    nulls: None,
1279                }],
1280                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1281                    variable: "n".to_string(),
1282                    label: None,
1283                    input: None,
1284                })),
1285            })),
1286        }));
1287
1288        let physical = planner.plan(&logical).unwrap();
1289        assert_eq!(physical.columns(), &["n"]);
1290    }
1291
1292    #[test]
1293    fn test_plan_sort_descending() {
1294        let store = create_test_store();
1295        let planner = Planner::new(store);
1296
1297        // ORDER BY n DESC
1298        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1299            items: vec![ReturnItem {
1300                expression: LogicalExpression::Variable("n".to_string()),
1301                alias: None,
1302            }],
1303            distinct: false,
1304            input: Box::new(LogicalOperator::Sort(SortOp {
1305                keys: vec![SortKey {
1306                    expression: LogicalExpression::Variable("n".to_string()),
1307                    order: SortOrder::Descending,
1308                    nulls: None,
1309                }],
1310                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1311                    variable: "n".to_string(),
1312                    label: None,
1313                    input: None,
1314                })),
1315            })),
1316        }));
1317
1318        let physical = planner.plan(&logical).unwrap();
1319        assert_eq!(physical.columns(), &["n"]);
1320    }
1321
1322    #[test]
1323    fn test_plan_distinct() {
1324        let store = create_test_store();
1325        let planner = Planner::new(store);
1326
1327        // MATCH (n) RETURN DISTINCT n
1328        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1329            items: vec![ReturnItem {
1330                expression: LogicalExpression::Variable("n".to_string()),
1331                alias: None,
1332            }],
1333            distinct: false,
1334            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1335                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1336                    variable: "n".to_string(),
1337                    label: None,
1338                    input: None,
1339                })),
1340                columns: None,
1341            })),
1342        }));
1343
1344        let physical = planner.plan(&logical).unwrap();
1345        assert_eq!(physical.columns(), &["n"]);
1346    }
1347
1348    #[test]
1349    fn test_plan_distinct_with_columns() {
1350        let store = create_test_store();
1351        let planner = Planner::new(store);
1352
1353        // DISTINCT on specific columns (column-specific dedup)
1354        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1355            items: vec![ReturnItem {
1356                expression: LogicalExpression::Variable("n".to_string()),
1357                alias: None,
1358            }],
1359            distinct: false,
1360            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1361                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1362                    variable: "n".to_string(),
1363                    label: None,
1364                    input: None,
1365                })),
1366                columns: Some(vec!["n".to_string()]),
1367            })),
1368        }));
1369
1370        let physical = planner.plan(&logical).unwrap();
1371        assert_eq!(physical.columns(), &["n"]);
1372    }
1373
1374    #[test]
1375    fn test_plan_distinct_with_nonexistent_columns() {
1376        let store = create_test_store();
1377        let planner = Planner::new(store);
1378
1379        // When distinct columns don't match any output columns,
1380        // it falls back to full-row distinct.
1381        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1382            items: vec![ReturnItem {
1383                expression: LogicalExpression::Variable("n".to_string()),
1384                alias: None,
1385            }],
1386            distinct: false,
1387            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1388                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1389                    variable: "n".to_string(),
1390                    label: None,
1391                    input: None,
1392                })),
1393                columns: Some(vec!["nonexistent".to_string()]),
1394            })),
1395        }));
1396
1397        let physical = planner.plan(&logical).unwrap();
1398        assert_eq!(physical.columns(), &["n"]);
1399    }
1400
1401    // ==================== Aggregate Tests ====================
1402
1403    #[test]
1404    fn test_plan_aggregate_count() {
1405        let store = create_test_store();
1406        let planner = Planner::new(store);
1407
1408        // MATCH (n) RETURN count(n)
1409        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1410            items: vec![ReturnItem {
1411                expression: LogicalExpression::Variable("cnt".to_string()),
1412                alias: None,
1413            }],
1414            distinct: false,
1415            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1416                group_by: vec![],
1417                aggregates: vec![LogicalAggregateExpr {
1418                    function: LogicalAggregateFunction::Count,
1419                    expression: Some(LogicalExpression::Variable("n".to_string())),
1420                    expression2: None,
1421                    distinct: false,
1422                    alias: Some("cnt".to_string()),
1423                    percentile: None,
1424                    separator: None,
1425                }],
1426                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1427                    variable: "n".to_string(),
1428                    label: None,
1429                    input: None,
1430                })),
1431                having: None,
1432            })),
1433        }));
1434
1435        let physical = planner.plan(&logical).unwrap();
1436        assert!(physical.columns().contains(&"cnt".to_string()));
1437    }
1438
1439    #[test]
1440    fn test_plan_aggregate_with_group_by() {
1441        let store = create_test_store();
1442        let planner = Planner::new(store);
1443
1444        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1445        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1446            group_by: vec![LogicalExpression::Property {
1447                variable: "n".to_string(),
1448                property: "city".to_string(),
1449            }],
1450            aggregates: vec![LogicalAggregateExpr {
1451                function: LogicalAggregateFunction::Count,
1452                expression: Some(LogicalExpression::Variable("n".to_string())),
1453                expression2: None,
1454                distinct: false,
1455                alias: Some("cnt".to_string()),
1456                percentile: None,
1457                separator: None,
1458            }],
1459            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1460                variable: "n".to_string(),
1461                label: Some("Person".to_string()),
1462                input: None,
1463            })),
1464            having: None,
1465        }));
1466
1467        let physical = planner.plan(&logical).unwrap();
1468        assert_eq!(physical.columns().len(), 2);
1469    }
1470
1471    #[test]
1472    fn test_plan_aggregate_sum() {
1473        let store = create_test_store();
1474        let planner = Planner::new(store);
1475
1476        // SUM(n.value)
1477        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1478            group_by: vec![],
1479            aggregates: vec![LogicalAggregateExpr {
1480                function: LogicalAggregateFunction::Sum,
1481                expression: Some(LogicalExpression::Property {
1482                    variable: "n".to_string(),
1483                    property: "value".to_string(),
1484                }),
1485                expression2: None,
1486                distinct: false,
1487                alias: Some("total".to_string()),
1488                percentile: None,
1489                separator: None,
1490            }],
1491            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1492                variable: "n".to_string(),
1493                label: None,
1494                input: None,
1495            })),
1496            having: None,
1497        }));
1498
1499        let physical = planner.plan(&logical).unwrap();
1500        assert!(physical.columns().contains(&"total".to_string()));
1501    }
1502
1503    #[test]
1504    fn test_plan_aggregate_avg() {
1505        let store = create_test_store();
1506        let planner = Planner::new(store);
1507
1508        // AVG(n.score)
1509        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1510            group_by: vec![],
1511            aggregates: vec![LogicalAggregateExpr {
1512                function: LogicalAggregateFunction::Avg,
1513                expression: Some(LogicalExpression::Property {
1514                    variable: "n".to_string(),
1515                    property: "score".to_string(),
1516                }),
1517                expression2: None,
1518                distinct: false,
1519                alias: Some("average".to_string()),
1520                percentile: None,
1521                separator: None,
1522            }],
1523            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1524                variable: "n".to_string(),
1525                label: None,
1526                input: None,
1527            })),
1528            having: None,
1529        }));
1530
1531        let physical = planner.plan(&logical).unwrap();
1532        assert!(physical.columns().contains(&"average".to_string()));
1533    }
1534
1535    #[test]
1536    fn test_plan_aggregate_min_max() {
1537        let store = create_test_store();
1538        let planner = Planner::new(store);
1539
1540        // MIN(n.age), MAX(n.age)
1541        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1542            group_by: vec![],
1543            aggregates: vec![
1544                LogicalAggregateExpr {
1545                    function: LogicalAggregateFunction::Min,
1546                    expression: Some(LogicalExpression::Property {
1547                        variable: "n".to_string(),
1548                        property: "age".to_string(),
1549                    }),
1550                    expression2: None,
1551                    distinct: false,
1552                    alias: Some("youngest".to_string()),
1553                    percentile: None,
1554                    separator: None,
1555                },
1556                LogicalAggregateExpr {
1557                    function: LogicalAggregateFunction::Max,
1558                    expression: Some(LogicalExpression::Property {
1559                        variable: "n".to_string(),
1560                        property: "age".to_string(),
1561                    }),
1562                    expression2: None,
1563                    distinct: false,
1564                    alias: Some("oldest".to_string()),
1565                    percentile: None,
1566                    separator: None,
1567                },
1568            ],
1569            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1570                variable: "n".to_string(),
1571                label: None,
1572                input: None,
1573            })),
1574            having: None,
1575        }));
1576
1577        let physical = planner.plan(&logical).unwrap();
1578        assert!(physical.columns().contains(&"youngest".to_string()));
1579        assert!(physical.columns().contains(&"oldest".to_string()));
1580    }
1581
1582    // ==================== Join Tests ====================
1583
1584    #[test]
1585    fn test_plan_inner_join() {
1586        let store = create_test_store();
1587        let planner = Planner::new(store);
1588
1589        // Inner join between two scans
1590        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1591            items: vec![
1592                ReturnItem {
1593                    expression: LogicalExpression::Variable("a".to_string()),
1594                    alias: None,
1595                },
1596                ReturnItem {
1597                    expression: LogicalExpression::Variable("b".to_string()),
1598                    alias: None,
1599                },
1600            ],
1601            distinct: false,
1602            input: Box::new(LogicalOperator::Join(JoinOp {
1603                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1604                    variable: "a".to_string(),
1605                    label: Some("Person".to_string()),
1606                    input: None,
1607                })),
1608                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1609                    variable: "b".to_string(),
1610                    label: Some("Company".to_string()),
1611                    input: None,
1612                })),
1613                join_type: JoinType::Inner,
1614                conditions: vec![JoinCondition {
1615                    left: LogicalExpression::Variable("a".to_string()),
1616                    right: LogicalExpression::Variable("b".to_string()),
1617                }],
1618            })),
1619        }));
1620
1621        let physical = planner.plan(&logical).unwrap();
1622        assert!(physical.columns().contains(&"a".to_string()));
1623        assert!(physical.columns().contains(&"b".to_string()));
1624    }
1625
1626    #[test]
1627    fn test_plan_cross_join() {
1628        let store = create_test_store();
1629        let planner = Planner::new(store);
1630
1631        // Cross join (no conditions)
1632        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1633            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1634                variable: "a".to_string(),
1635                label: None,
1636                input: None,
1637            })),
1638            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1639                variable: "b".to_string(),
1640                label: None,
1641                input: None,
1642            })),
1643            join_type: JoinType::Cross,
1644            conditions: vec![],
1645        }));
1646
1647        let physical = planner.plan(&logical).unwrap();
1648        assert_eq!(physical.columns().len(), 2);
1649    }
1650
1651    #[test]
1652    fn test_plan_left_join() {
1653        let store = create_test_store();
1654        let planner = Planner::new(store);
1655
1656        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1657            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1658                variable: "a".to_string(),
1659                label: None,
1660                input: None,
1661            })),
1662            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1663                variable: "b".to_string(),
1664                label: None,
1665                input: None,
1666            })),
1667            join_type: JoinType::Left,
1668            conditions: vec![],
1669        }));
1670
1671        let physical = planner.plan(&logical).unwrap();
1672        assert_eq!(physical.columns().len(), 2);
1673    }
1674
1675    // ==================== Mutation Tests ====================
1676
1677    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1678        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1679        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1680        p
1681    }
1682
1683    #[test]
1684    fn test_plan_create_node() {
1685        let store = create_test_store();
1686        let planner = create_writable_planner(&store);
1687
1688        // CREATE (n:Person {name: 'Alix'})
1689        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1690            variable: "n".to_string(),
1691            labels: vec!["Person".to_string()],
1692            properties: vec![(
1693                "name".to_string(),
1694                LogicalExpression::Literal(Value::String("Alix".into())),
1695            )],
1696            input: None,
1697        }));
1698
1699        let physical = planner.plan(&logical).unwrap();
1700        assert!(physical.columns().contains(&"n".to_string()));
1701    }
1702
1703    #[test]
1704    fn test_plan_create_edge() {
1705        let store = create_test_store();
1706        let planner = create_writable_planner(&store);
1707
1708        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1709        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1710            variable: Some("r".to_string()),
1711            from_variable: "a".to_string(),
1712            to_variable: "b".to_string(),
1713            edge_type: "KNOWS".to_string(),
1714            properties: vec![],
1715            input: Box::new(LogicalOperator::Join(JoinOp {
1716                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1717                    variable: "a".to_string(),
1718                    label: None,
1719                    input: None,
1720                })),
1721                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1722                    variable: "b".to_string(),
1723                    label: None,
1724                    input: None,
1725                })),
1726                join_type: JoinType::Cross,
1727                conditions: vec![],
1728            })),
1729        }));
1730
1731        let physical = planner.plan(&logical).unwrap();
1732        assert!(physical.columns().contains(&"r".to_string()));
1733    }
1734
1735    #[test]
1736    fn test_plan_delete_node() {
1737        let store = create_test_store();
1738        let planner = create_writable_planner(&store);
1739
1740        // MATCH (n) DELETE n
1741        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1742            variable: "n".to_string(),
1743            detach: false,
1744            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1745                variable: "n".to_string(),
1746                label: None,
1747                input: None,
1748            })),
1749        }));
1750
1751        let physical = planner.plan(&logical).unwrap();
1752        assert!(physical.columns().contains(&"n".to_string()));
1753    }
1754
1755    // ==================== Error Cases ====================
1756
1757    #[test]
1758    fn test_plan_empty_errors() {
1759        let store = create_test_store();
1760        let planner = Planner::new(store);
1761
1762        let logical = LogicalPlan::new(LogicalOperator::Empty);
1763        let result = planner.plan(&logical);
1764        assert!(result.is_err());
1765    }
1766
1767    #[test]
1768    fn test_plan_missing_variable_in_return() {
1769        let store = create_test_store();
1770        let planner = Planner::new(store);
1771
1772        // Return variable that doesn't exist in input
1773        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1774            items: vec![ReturnItem {
1775                expression: LogicalExpression::Variable("missing".to_string()),
1776                alias: None,
1777            }],
1778            distinct: false,
1779            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1780                variable: "n".to_string(),
1781                label: None,
1782                input: None,
1783            })),
1784        }));
1785
1786        let result = planner.plan(&logical);
1787        assert!(result.is_err());
1788    }
1789
1790    // ==================== Helper Function Tests ====================
1791
1792    #[test]
1793    fn test_convert_binary_ops() {
1794        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1795        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1796        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1797        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1798        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1799        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1800        assert!(convert_binary_op(BinaryOp::And).is_ok());
1801        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1802        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1803        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1804        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1805        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1806    }
1807
1808    #[test]
1809    fn test_convert_unary_ops() {
1810        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1811        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1812        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1813        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1814    }
1815
1816    #[test]
1817    fn test_convert_aggregate_functions() {
1818        assert!(matches!(
1819            convert_aggregate_function(LogicalAggregateFunction::Count),
1820            PhysicalAggregateFunction::Count
1821        ));
1822        assert!(matches!(
1823            convert_aggregate_function(LogicalAggregateFunction::Sum),
1824            PhysicalAggregateFunction::Sum
1825        ));
1826        assert!(matches!(
1827            convert_aggregate_function(LogicalAggregateFunction::Avg),
1828            PhysicalAggregateFunction::Avg
1829        ));
1830        assert!(matches!(
1831            convert_aggregate_function(LogicalAggregateFunction::Min),
1832            PhysicalAggregateFunction::Min
1833        ));
1834        assert!(matches!(
1835            convert_aggregate_function(LogicalAggregateFunction::Max),
1836            PhysicalAggregateFunction::Max
1837        ));
1838    }
1839
1840    #[test]
1841    fn test_planner_accessors() {
1842        let store = create_test_store();
1843        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);
1844
1845        assert!(planner.transaction_id().is_none());
1846        assert!(planner.transaction_manager().is_none());
1847        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1848    }
1849
1850    #[test]
1851    fn test_physical_plan_accessors() {
1852        let store = create_test_store();
1853        let planner = Planner::new(store);
1854
1855        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1856            variable: "n".to_string(),
1857            label: None,
1858            input: None,
1859        }));
1860
1861        let physical = planner.plan(&logical).unwrap();
1862        assert_eq!(physical.columns(), &["n"]);
1863
1864        // Test into_operator
1865        let _ = physical.into_operator();
1866    }
1867
1868    // ==================== Adaptive Planning Tests ====================
1869
1870    #[test]
1871    fn test_plan_adaptive_with_scan() {
1872        let store = create_test_store();
1873        let planner = Planner::new(store);
1874
1875        // MATCH (n:Person) RETURN n
1876        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1877            items: vec![ReturnItem {
1878                expression: LogicalExpression::Variable("n".to_string()),
1879                alias: None,
1880            }],
1881            distinct: false,
1882            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1883                variable: "n".to_string(),
1884                label: Some("Person".to_string()),
1885                input: None,
1886            })),
1887        }));
1888
1889        let physical = planner.plan_adaptive(&logical).unwrap();
1890        assert_eq!(physical.columns(), &["n"]);
1891        // Should have adaptive context with estimates
1892        assert!(physical.adaptive_context.is_some());
1893    }
1894
1895    #[test]
1896    fn test_plan_adaptive_with_filter() {
1897        let store = create_test_store();
1898        let planner = Planner::new(store);
1899
1900        // MATCH (n) WHERE n.age > 30 RETURN n
1901        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1902            items: vec![ReturnItem {
1903                expression: LogicalExpression::Variable("n".to_string()),
1904                alias: None,
1905            }],
1906            distinct: false,
1907            input: Box::new(LogicalOperator::Filter(FilterOp {
1908                predicate: LogicalExpression::Binary {
1909                    left: Box::new(LogicalExpression::Property {
1910                        variable: "n".to_string(),
1911                        property: "age".to_string(),
1912                    }),
1913                    op: BinaryOp::Gt,
1914                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1915                },
1916                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1917                    variable: "n".to_string(),
1918                    label: None,
1919                    input: None,
1920                })),
1921                pushdown_hint: None,
1922            })),
1923        }));
1924
1925        let physical = planner.plan_adaptive(&logical).unwrap();
1926        assert!(physical.adaptive_context.is_some());
1927    }
1928
1929    #[test]
1930    fn test_plan_adaptive_with_expand() {
1931        let store = create_test_store();
1932        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
1933            .with_factorized_execution(false);
1934
1935        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1936        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1937            items: vec![
1938                ReturnItem {
1939                    expression: LogicalExpression::Variable("a".to_string()),
1940                    alias: None,
1941                },
1942                ReturnItem {
1943                    expression: LogicalExpression::Variable("b".to_string()),
1944                    alias: None,
1945                },
1946            ],
1947            distinct: false,
1948            input: Box::new(LogicalOperator::Expand(ExpandOp {
1949                from_variable: "a".to_string(),
1950                to_variable: "b".to_string(),
1951                edge_variable: None,
1952                direction: ExpandDirection::Outgoing,
1953                edge_types: vec!["KNOWS".to_string()],
1954                min_hops: 1,
1955                max_hops: Some(1),
1956                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1957                    variable: "a".to_string(),
1958                    label: None,
1959                    input: None,
1960                })),
1961                path_alias: None,
1962                path_mode: PathMode::Walk,
1963            })),
1964        }));
1965
1966        let physical = planner.plan_adaptive(&logical).unwrap();
1967        assert!(physical.adaptive_context.is_some());
1968    }
1969
1970    #[test]
1971    fn test_plan_adaptive_with_join() {
1972        let store = create_test_store();
1973        let planner = Planner::new(store);
1974
1975        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1976            items: vec![
1977                ReturnItem {
1978                    expression: LogicalExpression::Variable("a".to_string()),
1979                    alias: None,
1980                },
1981                ReturnItem {
1982                    expression: LogicalExpression::Variable("b".to_string()),
1983                    alias: None,
1984                },
1985            ],
1986            distinct: false,
1987            input: Box::new(LogicalOperator::Join(JoinOp {
1988                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1989                    variable: "a".to_string(),
1990                    label: None,
1991                    input: None,
1992                })),
1993                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1994                    variable: "b".to_string(),
1995                    label: None,
1996                    input: None,
1997                })),
1998                join_type: JoinType::Cross,
1999                conditions: vec![],
2000            })),
2001        }));
2002
2003        let physical = planner.plan_adaptive(&logical).unwrap();
2004        assert!(physical.adaptive_context.is_some());
2005    }
2006
2007    #[test]
2008    fn test_plan_adaptive_with_aggregate() {
2009        let store = create_test_store();
2010        let planner = Planner::new(store);
2011
2012        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2013            group_by: vec![],
2014            aggregates: vec![LogicalAggregateExpr {
2015                function: LogicalAggregateFunction::Count,
2016                expression: Some(LogicalExpression::Variable("n".to_string())),
2017                expression2: None,
2018                distinct: false,
2019                alias: Some("cnt".to_string()),
2020                percentile: None,
2021                separator: None,
2022            }],
2023            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2024                variable: "n".to_string(),
2025                label: None,
2026                input: None,
2027            })),
2028            having: None,
2029        }));
2030
2031        let physical = planner.plan_adaptive(&logical).unwrap();
2032        assert!(physical.adaptive_context.is_some());
2033    }
2034
2035    #[test]
2036    fn test_plan_adaptive_with_distinct() {
2037        let store = create_test_store();
2038        let planner = Planner::new(store);
2039
2040        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2041            items: vec![ReturnItem {
2042                expression: LogicalExpression::Variable("n".to_string()),
2043                alias: None,
2044            }],
2045            distinct: false,
2046            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2047                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2048                    variable: "n".to_string(),
2049                    label: None,
2050                    input: None,
2051                })),
2052                columns: None,
2053            })),
2054        }));
2055
2056        let physical = planner.plan_adaptive(&logical).unwrap();
2057        assert!(physical.adaptive_context.is_some());
2058    }
2059
2060    #[test]
2061    fn test_plan_adaptive_with_limit() {
2062        let store = create_test_store();
2063        let planner = Planner::new(store);
2064
2065        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2066            items: vec![ReturnItem {
2067                expression: LogicalExpression::Variable("n".to_string()),
2068                alias: None,
2069            }],
2070            distinct: false,
2071            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2072                count: 10.into(),
2073                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2074                    variable: "n".to_string(),
2075                    label: None,
2076                    input: None,
2077                })),
2078            })),
2079        }));
2080
2081        let physical = planner.plan_adaptive(&logical).unwrap();
2082        assert!(physical.adaptive_context.is_some());
2083    }
2084
2085    #[test]
2086    fn test_plan_adaptive_with_skip() {
2087        let store = create_test_store();
2088        let planner = Planner::new(store);
2089
2090        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2091            items: vec![ReturnItem {
2092                expression: LogicalExpression::Variable("n".to_string()),
2093                alias: None,
2094            }],
2095            distinct: false,
2096            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2097                count: 5.into(),
2098                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2099                    variable: "n".to_string(),
2100                    label: None,
2101                    input: None,
2102                })),
2103            })),
2104        }));
2105
2106        let physical = planner.plan_adaptive(&logical).unwrap();
2107        assert!(physical.adaptive_context.is_some());
2108    }
2109
2110    #[test]
2111    fn test_plan_adaptive_with_sort() {
2112        let store = create_test_store();
2113        let planner = Planner::new(store);
2114
2115        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2116            items: vec![ReturnItem {
2117                expression: LogicalExpression::Variable("n".to_string()),
2118                alias: None,
2119            }],
2120            distinct: false,
2121            input: Box::new(LogicalOperator::Sort(SortOp {
2122                keys: vec![SortKey {
2123                    expression: LogicalExpression::Variable("n".to_string()),
2124                    order: SortOrder::Ascending,
2125                    nulls: None,
2126                }],
2127                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2128                    variable: "n".to_string(),
2129                    label: None,
2130                    input: None,
2131                })),
2132            })),
2133        }));
2134
2135        let physical = planner.plan_adaptive(&logical).unwrap();
2136        assert!(physical.adaptive_context.is_some());
2137    }
2138
2139    #[test]
2140    fn test_plan_adaptive_with_union() {
2141        let store = create_test_store();
2142        let planner = Planner::new(store);
2143
2144        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2145            items: vec![ReturnItem {
2146                expression: LogicalExpression::Variable("n".to_string()),
2147                alias: None,
2148            }],
2149            distinct: false,
2150            input: Box::new(LogicalOperator::Union(UnionOp {
2151                inputs: vec![
2152                    LogicalOperator::NodeScan(NodeScanOp {
2153                        variable: "n".to_string(),
2154                        label: Some("Person".to_string()),
2155                        input: None,
2156                    }),
2157                    LogicalOperator::NodeScan(NodeScanOp {
2158                        variable: "n".to_string(),
2159                        label: Some("Company".to_string()),
2160                        input: None,
2161                    }),
2162                ],
2163            })),
2164        }));
2165
2166        let physical = planner.plan_adaptive(&logical).unwrap();
2167        assert!(physical.adaptive_context.is_some());
2168    }
2169
2170    // ==================== Variable Length Path Tests ====================
2171
2172    #[test]
2173    fn test_plan_expand_variable_length() {
2174        let store = create_test_store();
2175        let planner = Planner::new(store);
2176
2177        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2178        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2179            items: vec![
2180                ReturnItem {
2181                    expression: LogicalExpression::Variable("a".to_string()),
2182                    alias: None,
2183                },
2184                ReturnItem {
2185                    expression: LogicalExpression::Variable("b".to_string()),
2186                    alias: None,
2187                },
2188            ],
2189            distinct: false,
2190            input: Box::new(LogicalOperator::Expand(ExpandOp {
2191                from_variable: "a".to_string(),
2192                to_variable: "b".to_string(),
2193                edge_variable: None,
2194                direction: ExpandDirection::Outgoing,
2195                edge_types: vec!["KNOWS".to_string()],
2196                min_hops: 1,
2197                max_hops: Some(3),
2198                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2199                    variable: "a".to_string(),
2200                    label: None,
2201                    input: None,
2202                })),
2203                path_alias: None,
2204                path_mode: PathMode::Walk,
2205            })),
2206        }));
2207
2208        let physical = planner.plan(&logical).unwrap();
2209        assert!(physical.columns().contains(&"a".to_string()));
2210        assert!(physical.columns().contains(&"b".to_string()));
2211    }
2212
2213    #[test]
2214    fn test_plan_expand_with_path_alias() {
2215        let store = create_test_store();
2216        let planner = Planner::new(store);
2217
2218        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2219        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2220            items: vec![
2221                ReturnItem {
2222                    expression: LogicalExpression::Variable("a".to_string()),
2223                    alias: None,
2224                },
2225                ReturnItem {
2226                    expression: LogicalExpression::Variable("b".to_string()),
2227                    alias: None,
2228                },
2229            ],
2230            distinct: false,
2231            input: Box::new(LogicalOperator::Expand(ExpandOp {
2232                from_variable: "a".to_string(),
2233                to_variable: "b".to_string(),
2234                edge_variable: None,
2235                direction: ExpandDirection::Outgoing,
2236                edge_types: vec!["KNOWS".to_string()],
2237                min_hops: 1,
2238                max_hops: Some(3),
2239                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2240                    variable: "a".to_string(),
2241                    label: None,
2242                    input: None,
2243                })),
2244                path_alias: Some("p".to_string()),
2245                path_mode: PathMode::Walk,
2246            })),
2247        }));
2248
2249        let physical = planner.plan(&logical).unwrap();
2250        // Verify plan was created successfully with expected output columns
2251        assert!(physical.columns().contains(&"a".to_string()));
2252        assert!(physical.columns().contains(&"b".to_string()));
2253    }
2254
2255    #[test]
2256    fn test_plan_expand_incoming() {
2257        let store = create_test_store();
2258        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2259            .with_factorized_execution(false);
2260
2261        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2262        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2263            items: vec![
2264                ReturnItem {
2265                    expression: LogicalExpression::Variable("a".to_string()),
2266                    alias: None,
2267                },
2268                ReturnItem {
2269                    expression: LogicalExpression::Variable("b".to_string()),
2270                    alias: None,
2271                },
2272            ],
2273            distinct: false,
2274            input: Box::new(LogicalOperator::Expand(ExpandOp {
2275                from_variable: "a".to_string(),
2276                to_variable: "b".to_string(),
2277                edge_variable: None,
2278                direction: ExpandDirection::Incoming,
2279                edge_types: vec!["KNOWS".to_string()],
2280                min_hops: 1,
2281                max_hops: Some(1),
2282                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2283                    variable: "a".to_string(),
2284                    label: None,
2285                    input: None,
2286                })),
2287                path_alias: None,
2288                path_mode: PathMode::Walk,
2289            })),
2290        }));
2291
2292        let physical = planner.plan(&logical).unwrap();
2293        assert!(physical.columns().contains(&"a".to_string()));
2294        assert!(physical.columns().contains(&"b".to_string()));
2295    }
2296
2297    #[test]
2298    fn test_plan_expand_both_directions() {
2299        let store = create_test_store();
2300        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2301            .with_factorized_execution(false);
2302
2303        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2304        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2305            items: vec![
2306                ReturnItem {
2307                    expression: LogicalExpression::Variable("a".to_string()),
2308                    alias: None,
2309                },
2310                ReturnItem {
2311                    expression: LogicalExpression::Variable("b".to_string()),
2312                    alias: None,
2313                },
2314            ],
2315            distinct: false,
2316            input: Box::new(LogicalOperator::Expand(ExpandOp {
2317                from_variable: "a".to_string(),
2318                to_variable: "b".to_string(),
2319                edge_variable: None,
2320                direction: ExpandDirection::Both,
2321                edge_types: vec!["KNOWS".to_string()],
2322                min_hops: 1,
2323                max_hops: Some(1),
2324                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2325                    variable: "a".to_string(),
2326                    label: None,
2327                    input: None,
2328                })),
2329                path_alias: None,
2330                path_mode: PathMode::Walk,
2331            })),
2332        }));
2333
2334        let physical = planner.plan(&logical).unwrap();
2335        assert!(physical.columns().contains(&"a".to_string()));
2336        assert!(physical.columns().contains(&"b".to_string()));
2337    }
2338
2339    // ==================== With Context Tests ====================
2340
2341    #[test]
2342    fn test_planner_with_context() {
2343        use crate::transaction::TransactionManager;
2344
2345        let store = create_test_store();
2346        let transaction_manager = Arc::new(TransactionManager::new());
2347        let transaction_id = transaction_manager.begin();
2348        let epoch = transaction_manager.current_epoch();
2349
2350        let planner = Planner::with_context(
2351            Arc::clone(&store) as Arc<dyn GraphStore>,
2352            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2353            Arc::clone(&transaction_manager),
2354            Some(transaction_id),
2355            epoch,
2356        );
2357
2358        assert_eq!(planner.transaction_id(), Some(transaction_id));
2359        assert!(planner.transaction_manager().is_some());
2360        assert_eq!(planner.viewing_epoch(), epoch);
2361    }
2362
2363    #[test]
2364    fn test_planner_with_factorized_execution_disabled() {
2365        let store = create_test_store();
2366        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2367            .with_factorized_execution(false);
2368
2369        // Two consecutive expands - should NOT use factorized execution
2370        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2371            items: vec![
2372                ReturnItem {
2373                    expression: LogicalExpression::Variable("a".to_string()),
2374                    alias: None,
2375                },
2376                ReturnItem {
2377                    expression: LogicalExpression::Variable("c".to_string()),
2378                    alias: None,
2379                },
2380            ],
2381            distinct: false,
2382            input: Box::new(LogicalOperator::Expand(ExpandOp {
2383                from_variable: "b".to_string(),
2384                to_variable: "c".to_string(),
2385                edge_variable: None,
2386                direction: ExpandDirection::Outgoing,
2387                edge_types: vec![],
2388                min_hops: 1,
2389                max_hops: Some(1),
2390                input: Box::new(LogicalOperator::Expand(ExpandOp {
2391                    from_variable: "a".to_string(),
2392                    to_variable: "b".to_string(),
2393                    edge_variable: None,
2394                    direction: ExpandDirection::Outgoing,
2395                    edge_types: vec![],
2396                    min_hops: 1,
2397                    max_hops: Some(1),
2398                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2399                        variable: "a".to_string(),
2400                        label: None,
2401                        input: None,
2402                    })),
2403                    path_alias: None,
2404                    path_mode: PathMode::Walk,
2405                })),
2406                path_alias: None,
2407                path_mode: PathMode::Walk,
2408            })),
2409        }));
2410
2411        let physical = planner.plan(&logical).unwrap();
2412        assert!(physical.columns().contains(&"a".to_string()));
2413        assert!(physical.columns().contains(&"c".to_string()));
2414    }
2415
2416    // ==================== Sort with Property Tests ====================
2417
2418    #[test]
2419    fn test_plan_sort_by_property() {
2420        let store = create_test_store();
2421        let planner = Planner::new(store);
2422
2423        // MATCH (n) RETURN n ORDER BY n.name ASC
2424        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2425            items: vec![ReturnItem {
2426                expression: LogicalExpression::Variable("n".to_string()),
2427                alias: None,
2428            }],
2429            distinct: false,
2430            input: Box::new(LogicalOperator::Sort(SortOp {
2431                keys: vec![SortKey {
2432                    expression: LogicalExpression::Property {
2433                        variable: "n".to_string(),
2434                        property: "name".to_string(),
2435                    },
2436                    order: SortOrder::Ascending,
2437                    nulls: None,
2438                }],
2439                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2440                    variable: "n".to_string(),
2441                    label: None,
2442                    input: None,
2443                })),
2444            })),
2445        }));
2446
2447        let physical = planner.plan(&logical).unwrap();
2448        // Should have the property column projected
2449        assert!(physical.columns().contains(&"n".to_string()));
2450    }
2451
2452    // ==================== Scan with Input Tests ====================
2453
2454    #[test]
2455    fn test_plan_scan_with_input() {
2456        let store = create_test_store();
2457        let planner = Planner::new(store);
2458
2459        // A scan with another scan as input (for chained patterns)
2460        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2461            items: vec![
2462                ReturnItem {
2463                    expression: LogicalExpression::Variable("a".to_string()),
2464                    alias: None,
2465                },
2466                ReturnItem {
2467                    expression: LogicalExpression::Variable("b".to_string()),
2468                    alias: None,
2469                },
2470            ],
2471            distinct: false,
2472            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2473                variable: "b".to_string(),
2474                label: Some("Company".to_string()),
2475                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2476                    variable: "a".to_string(),
2477                    label: Some("Person".to_string()),
2478                    input: None,
2479                }))),
2480            })),
2481        }));
2482
2483        let physical = planner.plan(&logical).unwrap();
2484        assert!(physical.columns().contains(&"a".to_string()));
2485        assert!(physical.columns().contains(&"b".to_string()));
2486    }
2487}