Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43    VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53    convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57/// Range bounds for property-based range queries.
58struct RangeBounds<'a> {
59    min: Option<&'a Value>,
60    max: Option<&'a Value>,
61    min_inclusive: bool,
62    max_inclusive: bool,
63}
64
65/// Converts a logical plan to a physical operator tree for LPG stores.
66pub struct Planner {
67    /// The graph store (read-only operations).
68    pub(super) store: Arc<dyn GraphStore>,
69    /// Writable graph store (None for read-only databases).
70    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
71    /// Transaction manager for MVCC operations.
72    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
73    /// Current transaction ID (if in a transaction).
74    pub(super) transaction_id: Option<TransactionId>,
75    /// Epoch to use for visibility checks.
76    pub(super) viewing_epoch: EpochId,
77    /// Counter for generating unique anonymous edge column names.
78    pub(super) anon_edge_counter: std::cell::Cell<u32>,
79    /// Whether to use factorized execution for multi-hop queries.
80    pub(super) factorized_execution: bool,
81    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
82    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
83    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Variables that hold edge IDs (from MATCH edge patterns).
85    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
86    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
87    /// Optional constraint validator for schema enforcement during mutations.
88    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
89    /// Catalog for user-defined procedure lookup.
90    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
91    /// Shared parameter state for the currently planning correlated Apply.
92    /// Set by `plan_apply` before planning the inner operator, consumed by
93    /// `plan_operator` when encountering `ParameterScan`.
94    pub(super) correlated_param_state:
95        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
96    /// Variables from variable-length expand patterns (group-list variables).
97    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
98    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
99    /// When true, each physical operator is wrapped in `ProfiledOperator`.
100    profiling: std::cell::Cell<bool>,
101    /// Profile entries collected during planning (post-order).
102    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
103    /// Optional write tracker for recording writes during mutations.
104    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
105    /// Session context for introspection functions (info, schema, current_schema, etc.).
106    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
107    /// When true, expand operators use epoch-only visibility (no MVCC version
108    /// chain walks).  Set when the plan contains no mutations, so PENDING
109    /// writes are impossible to observe.
110    pub(super) read_only: bool,
111}
112
113impl Planner {
114    /// Creates a new planner with the given store.
115    ///
116    /// This creates a planner without transaction context, using the current
117    /// epoch from the store for visibility.
118    #[must_use]
119    pub fn new(store: Arc<dyn GraphStore>) -> Self {
120        let epoch = store.current_epoch();
121        Self {
122            store,
123            write_store: None,
124            transaction_manager: None,
125            transaction_id: None,
126            viewing_epoch: epoch,
127            anon_edge_counter: std::cell::Cell::new(0),
128            factorized_execution: true,
129            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131            validator: None,
132            catalog: None,
133            correlated_param_state: std::cell::RefCell::new(None),
134            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135            profiling: std::cell::Cell::new(false),
136            profile_entries: std::cell::RefCell::new(Vec::new()),
137            write_tracker: None,
138            session_context: grafeo_core::execution::operators::SessionContext::default(),
139            read_only: false,
140        }
141    }
142
143    /// Creates a new planner with transaction context for MVCC-aware planning.
144    #[must_use]
145    pub fn with_context(
146        store: Arc<dyn GraphStore>,
147        write_store: Option<Arc<dyn GraphStoreMut>>,
148        transaction_manager: Arc<TransactionManager>,
149        transaction_id: Option<TransactionId>,
150        viewing_epoch: EpochId,
151    ) -> Self {
152        use crate::transaction::TransactionWriteTracker;
153
154        // Create write tracker when there's an active transaction
155        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156            if transaction_id.is_some() {
157                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158                    &transaction_manager,
159                ))))
160            } else {
161                None
162            };
163
164        Self {
165            store,
166            write_store,
167            transaction_manager: Some(transaction_manager),
168            transaction_id,
169            viewing_epoch,
170            anon_edge_counter: std::cell::Cell::new(0),
171            factorized_execution: true,
172            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174            validator: None,
175            catalog: None,
176            correlated_param_state: std::cell::RefCell::new(None),
177            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178            profiling: std::cell::Cell::new(false),
179            profile_entries: std::cell::RefCell::new(Vec::new()),
180            write_tracker,
181            session_context: grafeo_core::execution::operators::SessionContext::default(),
182            read_only: false,
183        }
184    }
185
186    /// Marks this planner as planning a read-only query (no mutations),
187    /// enabling fast-path visibility checks in expand operators.
188    #[must_use]
189    pub fn with_read_only(mut self, read_only: bool) -> Self {
190        self.read_only = read_only;
191        self
192    }
193
194    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
195    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196        self.write_store
197            .as_ref()
198            .map(Arc::clone)
199            .ok_or(Error::Transaction(
200                grafeo_common::utils::error::TransactionError::ReadOnly,
201            ))
202    }
203
204    /// Returns the viewing epoch for this planner.
205    #[must_use]
206    pub fn viewing_epoch(&self) -> EpochId {
207        self.viewing_epoch
208    }
209
210    /// Returns the transaction ID for this planner, if any.
211    #[must_use]
212    pub fn transaction_id(&self) -> Option<TransactionId> {
213        self.transaction_id
214    }
215
216    /// Returns a reference to the transaction manager, if available.
217    #[must_use]
218    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
219        self.transaction_manager.as_ref()
220    }
221
222    /// Enables or disables factorized execution for multi-hop queries.
223    #[must_use]
224    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225        self.factorized_execution = enabled;
226        self
227    }
228
229    /// Sets the constraint validator for schema enforcement during mutations.
230    #[must_use]
231    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232        self.validator = Some(validator);
233        self
234    }
235
236    /// Sets the catalog for user-defined procedure lookup.
237    #[must_use]
238    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239        self.catalog = Some(catalog);
240        self
241    }
242
243    /// Sets the session context for introspection functions.
244    #[must_use]
245    pub fn with_session_context(
246        mut self,
247        context: grafeo_core::execution::operators::SessionContext,
248    ) -> Self {
249        self.session_context = context;
250        self
251    }
252
253    /// Generates an edge column name from an expand's edge variable (or an
254    /// anonymous fallback) and registers it in `edge_columns` so downstream
255    /// RETURN emits `EdgeResolve` instead of `NodeResolve`.
256    ///
257    /// Returns the column name for the caller to push into its output columns.
258    pub(super) fn register_edge_column(&self, edge_variable: &Option<String>) -> String {
259        let name = edge_variable.clone().unwrap_or_else(|| {
260            let count = self.anon_edge_counter.get();
261            self.anon_edge_counter.set(count + 1);
262            format!("_anon_edge_{}", count)
263        });
264        self.edge_columns.borrow_mut().insert(name.clone());
265        name
266    }
267
268    /// Counts consecutive single-hop expand operations.
269    ///
270    /// Returns the count and the deepest non-expand operator (the base of the chain).
271    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
272        match op {
273            LogicalOperator::Expand(expand) => {
274                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
275
276                if is_single_hop {
277                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
278                    (inner_count + 1, base)
279                } else {
280                    (0, op)
281                }
282            }
283            _ => (0, op),
284        }
285    }
286
287    /// Collects expand operations from the outermost down to the base.
288    ///
289    /// Returns expands in order from innermost (base) to outermost.
290    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
291        let mut chain = Vec::new();
292        let mut current = op;
293
294        while let LogicalOperator::Expand(expand) = current {
295            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
296            if !is_single_hop {
297                break;
298            }
299            chain.push(expand);
300            current = &expand.input;
301        }
302
303        chain.reverse();
304        chain
305    }
306
307    /// Plans a logical plan into a physical operator.
308    ///
309    /// # Errors
310    ///
311    /// Returns an error if the logical plan contains unsupported operators
312    /// or invalid expressions.
313    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
314        let _span = grafeo_debug_span!("grafeo::query::plan");
315        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
316        Ok(PhysicalPlan {
317            operator,
318            columns,
319            adaptive_context: None,
320        })
321    }
322
323    /// Plans a logical plan with profiling: each physical operator is wrapped
324    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
325    /// collect row counts and timing. Returns the physical plan together with
326    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
327    /// items in post-order (children before parents).
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the logical plan contains unsupported operators
332    /// or invalid expressions.
333    pub fn plan_profiled(
334        &self,
335        logical_plan: &LogicalPlan,
336    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
337        self.profiling.set(true);
338        self.profile_entries.borrow_mut().clear();
339
340        let result = self.plan_operator(&logical_plan.root);
341
342        self.profiling.set(false);
343        let (operator, columns) = result?;
344        let entries = self.profile_entries.borrow_mut().drain(..).collect();
345
346        Ok((
347            PhysicalPlan {
348                operator,
349                columns,
350                adaptive_context: None,
351            },
352            entries,
353        ))
354    }
355
356    /// Plans a logical plan with adaptive execution support.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the logical plan contains unsupported operators
361    /// or invalid expressions.
362    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
363        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
364
365        let mut adaptive_context = AdaptiveContext::new();
366        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
367
368        Ok(PhysicalPlan {
369            operator,
370            columns,
371            adaptive_context: Some(adaptive_context),
372        })
373    }
374
375    /// Collects cardinality estimates from the logical plan into an adaptive context.
376    fn collect_cardinality_estimates(
377        &self,
378        op: &LogicalOperator,
379        ctx: &mut AdaptiveContext,
380        depth: usize,
381    ) {
382        match op {
383            LogicalOperator::NodeScan(scan) => {
384                let estimate = if let Some(label) = &scan.label {
385                    self.store.nodes_by_label(label).len() as f64
386                } else {
387                    self.store.node_count() as f64
388                };
389                let id = format!("scan_{}", scan.variable);
390                ctx.set_estimate(&id, estimate);
391
392                if let Some(input) = &scan.input {
393                    self.collect_cardinality_estimates(input, ctx, depth + 1);
394                }
395            }
396            LogicalOperator::Filter(filter) => {
397                let input_estimate = self.estimate_cardinality(&filter.input);
398                let estimate = input_estimate * 0.3;
399                let id = format!("filter_{depth}");
400                ctx.set_estimate(&id, estimate);
401
402                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
403            }
404            LogicalOperator::Expand(expand) => {
405                let input_estimate = self.estimate_cardinality(&expand.input);
406                let stats = self.store.statistics();
407                let avg_degree = self.estimate_expand_degree(&stats, expand);
408                let estimate = input_estimate * avg_degree;
409                let id = format!("expand_{}", expand.to_variable);
410                ctx.set_estimate(&id, estimate);
411
412                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
413            }
414            LogicalOperator::Join(join) => {
415                let left_est = self.estimate_cardinality(&join.left);
416                let right_est = self.estimate_cardinality(&join.right);
417                let estimate = (left_est * right_est).sqrt();
418                let id = format!("join_{depth}");
419                ctx.set_estimate(&id, estimate);
420
421                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
422                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
423            }
424            LogicalOperator::Aggregate(agg) => {
425                let input_estimate = self.estimate_cardinality(&agg.input);
426                let estimate = if agg.group_by.is_empty() {
427                    1.0
428                } else {
429                    (input_estimate * 0.1).max(1.0)
430                };
431                let id = format!("aggregate_{depth}");
432                ctx.set_estimate(&id, estimate);
433
434                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
435            }
436            LogicalOperator::Distinct(distinct) => {
437                let input_estimate = self.estimate_cardinality(&distinct.input);
438                let estimate = (input_estimate * 0.5).max(1.0);
439                let id = format!("distinct_{depth}");
440                ctx.set_estimate(&id, estimate);
441
442                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
443            }
444            LogicalOperator::Return(ret) => {
445                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
446            }
447            LogicalOperator::Limit(limit) => {
448                let input_estimate = self.estimate_cardinality(&limit.input);
449                let estimate = (input_estimate).min(limit.count.estimate());
450                let id = format!("limit_{depth}");
451                ctx.set_estimate(&id, estimate);
452
453                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
454            }
455            LogicalOperator::Skip(skip) => {
456                let input_estimate = self.estimate_cardinality(&skip.input);
457                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
458                let id = format!("skip_{depth}");
459                ctx.set_estimate(&id, estimate);
460
461                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
462            }
463            LogicalOperator::Sort(sort) => {
464                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
465            }
466            LogicalOperator::Union(union) => {
467                let estimate: f64 = union
468                    .inputs
469                    .iter()
470                    .map(|input| self.estimate_cardinality(input))
471                    .sum();
472                let id = format!("union_{depth}");
473                ctx.set_estimate(&id, estimate);
474
475                for input in &union.inputs {
476                    self.collect_cardinality_estimates(input, ctx, depth + 1);
477                }
478            }
479            _ => {
480                // For other operators, try to recurse into known input patterns
481            }
482        }
483    }
484
485    /// Estimates cardinality for a logical operator subtree.
486    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
487        match op {
488            LogicalOperator::NodeScan(scan) => {
489                if let Some(label) = &scan.label {
490                    self.store.nodes_by_label(label).len() as f64
491                } else {
492                    self.store.node_count() as f64
493                }
494            }
495            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
496            LogicalOperator::Expand(expand) => {
497                let stats = self.store.statistics();
498                let avg_degree = self.estimate_expand_degree(&stats, expand);
499                self.estimate_cardinality(&expand.input) * avg_degree
500            }
501            LogicalOperator::Join(join) => {
502                let left = self.estimate_cardinality(&join.left);
503                let right = self.estimate_cardinality(&join.right);
504                (left * right).sqrt()
505            }
506            LogicalOperator::Aggregate(agg) => {
507                if agg.group_by.is_empty() {
508                    1.0
509                } else {
510                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
511                }
512            }
513            LogicalOperator::Distinct(distinct) => {
514                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
515            }
516            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
517            LogicalOperator::Limit(limit) => self
518                .estimate_cardinality(&limit.input)
519                .min(limit.count.estimate()),
520            LogicalOperator::Skip(skip) => {
521                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
522            }
523            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
524            LogicalOperator::Union(union) => union
525                .inputs
526                .iter()
527                .map(|input| self.estimate_cardinality(input))
528                .sum(),
529            LogicalOperator::Except(except) => {
530                let left = self.estimate_cardinality(&except.left);
531                let right = self.estimate_cardinality(&except.right);
532                (left - right).max(0.0)
533            }
534            LogicalOperator::Intersect(intersect) => {
535                let left = self.estimate_cardinality(&intersect.left);
536                let right = self.estimate_cardinality(&intersect.right);
537                left.min(right)
538            }
539            LogicalOperator::Otherwise(otherwise) => self
540                .estimate_cardinality(&otherwise.left)
541                .max(self.estimate_cardinality(&otherwise.right)),
542            _ => 1000.0,
543        }
544    }
545
546    /// Estimates the average edge degree for an expand operation using store statistics.
547    fn estimate_expand_degree(
548        &self,
549        stats: &grafeo_core::statistics::Statistics,
550        expand: &ExpandOp,
551    ) -> f64 {
552        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
553        if expand.edge_types.len() == 1 {
554            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
555        } else if stats.total_nodes > 0 {
556            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
557        } else {
558            10.0
559        }
560    }
561
562    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
563    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
564    fn maybe_profile(
565        &self,
566        result: Result<(Box<dyn Operator>, Vec<String>)>,
567        op: &LogicalOperator,
568    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
569        if self.profiling.get() {
570            let (physical, columns) = result?;
571            let (entry, stats) =
572                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
573            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
574            self.profile_entries.borrow_mut().push(entry);
575            Ok((Box::new(profiled), columns))
576        } else {
577            result
578        }
579    }
580
581    /// Plans a single logical operator.
582    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
583        let result = match op {
584            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
585            LogicalOperator::Expand(expand) => {
586                if self.factorized_execution {
587                    let (chain_len, _base) = Self::count_expand_chain(op);
588                    if chain_len >= 2 {
589                        return self.maybe_profile(self.plan_expand_chain(op), op);
590                    }
591                }
592                self.plan_expand(expand)
593            }
594            LogicalOperator::Return(ret) => self.plan_return(ret),
595            LogicalOperator::Filter(filter) => self.plan_filter(filter),
596            LogicalOperator::Project(project) => self.plan_project(project),
597            LogicalOperator::Limit(limit) => self.plan_limit(limit),
598            LogicalOperator::Skip(skip) => self.plan_skip(skip),
599            LogicalOperator::Sort(sort) => self.plan_sort(sort),
600            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
601            LogicalOperator::Join(join) => self.plan_join(join),
602            LogicalOperator::Union(union) => self.plan_union(union),
603            LogicalOperator::Except(except) => self.plan_except(except),
604            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
605            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
606            LogicalOperator::Apply(apply) => self.plan_apply(apply),
607            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
608            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
609            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
610            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
611            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
612            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
613            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
614            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
615            LogicalOperator::Merge(merge) => self.plan_merge(merge),
616            LogicalOperator::MergeRelationship(merge_rel) => {
617                self.plan_merge_relationship(merge_rel)
618            }
619            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
620            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
621            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
622            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
623            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
624            #[cfg(feature = "algos")]
625            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
626            #[cfg(not(feature = "algos"))]
627            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
628                "CALL procedures require the 'algos' feature".to_string(),
629            )),
630            LogicalOperator::ParameterScan(_param_scan) => {
631                let state = self
632                    .correlated_param_state
633                    .borrow()
634                    .clone()
635                    .ok_or_else(|| {
636                        Error::Internal(
637                            "ParameterScan without correlated Apply context".to_string(),
638                        )
639                    })?;
640                // Use the actual column names from the ParameterState (which may
641                // have been expanded from "*" to real variable names in plan_apply)
642                let columns = state.columns.clone();
643                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
644                Ok((operator, columns))
645            }
646            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
647            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
648            LogicalOperator::LoadData(load) => {
649                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
650                    load.path.clone(),
651                    load.format,
652                    load.with_headers,
653                    load.field_terminator,
654                    load.variable.clone(),
655                ));
656                Ok((operator, vec![load.variable.clone()]))
657            }
658            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
659            LogicalOperator::VectorScan(_) => Err(Error::Internal(
660                "VectorScan requires vector-index feature".to_string(),
661            )),
662            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
663                "VectorJoin requires vector-index feature".to_string(),
664            )),
665            _ => Err(Error::Internal(format!(
666                "Unsupported operator: {:?}",
667                std::mem::discriminant(op)
668            ))),
669        };
670        self.maybe_profile(result, op)
671    }
672
673    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
674    fn plan_horizontal_aggregate(
675        &self,
676        ha: &HorizontalAggregateOp,
677    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
678        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
679
680        let list_col_idx = child_columns
681            .iter()
682            .position(|c| c == &ha.list_column)
683            .ok_or_else(|| {
684                Error::Internal(format!(
685                    "HorizontalAggregate list column '{}' not found in {:?}",
686                    ha.list_column, child_columns
687                ))
688            })?;
689
690        let entity_kind = match ha.entity_kind {
691            LogicalEntityKind::Edge => EntityKind::Edge,
692            LogicalEntityKind::Node => EntityKind::Node,
693        };
694
695        let function = convert_aggregate_function(ha.function);
696        let input_column_count = child_columns.len();
697
698        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
699            child_op,
700            list_col_idx,
701            entity_kind,
702            function,
703            ha.property.clone(),
704            Arc::clone(&self.store) as Arc<dyn GraphStore>,
705            input_column_count,
706        ));
707
708        let mut columns = child_columns;
709        columns.push(ha.alias.clone());
710        // Mark the result as a scalar column
711        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
712
713        Ok((operator, columns))
714    }
715
716    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
717    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
718        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
719        let key_idx = child_columns
720            .iter()
721            .position(|c| c == &mc.key_var)
722            .ok_or_else(|| {
723                Error::Internal(format!(
724                    "MapCollect key '{}' not in columns {:?}",
725                    mc.key_var, child_columns
726                ))
727            })?;
728        let value_idx = child_columns
729            .iter()
730            .position(|c| c == &mc.value_var)
731            .ok_or_else(|| {
732                Error::Internal(format!(
733                    "MapCollect value '{}' not in columns {:?}",
734                    mc.value_var, child_columns
735                ))
736            })?;
737        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
738        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
739        Ok((operator, vec![mc.alias.clone()]))
740    }
741}
742
743/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
744#[cfg(feature = "algos")]
745struct StaticResultOperator {
746    rows: Vec<Vec<Value>>,
747    column_indices: Vec<usize>,
748    row_index: usize,
749}
750
751#[cfg(feature = "algos")]
752impl Operator for StaticResultOperator {
753    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
754        use grafeo_core::execution::DataChunk;
755
756        if self.row_index >= self.rows.len() {
757            return Ok(None);
758        }
759
760        let remaining = self.rows.len() - self.row_index;
761        let chunk_rows = remaining.min(1024);
762        let col_count = self.column_indices.len();
763
764        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
765        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
766
767        for row_offset in 0..chunk_rows {
768            let row = &self.rows[self.row_index + row_offset];
769            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
770                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
771                if let Some(col) = chunk.column_mut(col_idx) {
772                    col.push_value(value);
773                }
774            }
775        }
776        chunk.set_count(chunk_rows);
777
778        self.row_index += chunk_rows;
779        Ok(Some(chunk))
780    }
781
782    fn reset(&mut self) {
783        self.row_index = 0;
784    }
785
786    fn name(&self) -> &'static str {
787        "StaticResult"
788    }
789
790    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
791        self
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798    use crate::query::plan::{
799        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
800        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
801        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
802        SkipOp as LogicalSkipOp, SortKey, SortOp,
803    };
804    use grafeo_common::types::Value;
805    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
806    use grafeo_core::graph::GraphStoreMut;
807    use grafeo_core::graph::lpg::LpgStore;
808
809    fn create_test_store() -> Arc<LpgStore> {
810        let store = Arc::new(LpgStore::new().unwrap());
811        store.create_node(&["Person"]);
812        store.create_node(&["Person"]);
813        store.create_node(&["Company"]);
814        store
815    }
816
817    // ==================== Simple Scan Tests ====================
818
819    #[test]
820    fn test_plan_simple_scan() {
821        let store = create_test_store();
822        let planner = Planner::new(store);
823
824        // MATCH (n:Person) RETURN n
825        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
826            items: vec![ReturnItem {
827                expression: LogicalExpression::Variable("n".to_string()),
828                alias: None,
829            }],
830            distinct: false,
831            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
832                variable: "n".to_string(),
833                label: Some("Person".to_string()),
834                input: None,
835            })),
836        }));
837
838        let physical = planner.plan(&logical).unwrap();
839        assert_eq!(physical.columns(), &["n"]);
840    }
841
842    #[test]
843    fn test_plan_scan_without_label() {
844        let store = create_test_store();
845        let planner = Planner::new(store);
846
847        // MATCH (n) RETURN n
848        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
849            items: vec![ReturnItem {
850                expression: LogicalExpression::Variable("n".to_string()),
851                alias: None,
852            }],
853            distinct: false,
854            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
855                variable: "n".to_string(),
856                label: None,
857                input: None,
858            })),
859        }));
860
861        let physical = planner.plan(&logical).unwrap();
862        assert_eq!(physical.columns(), &["n"]);
863    }
864
865    #[test]
866    fn test_plan_return_with_alias() {
867        let store = create_test_store();
868        let planner = Planner::new(store);
869
870        // MATCH (n:Person) RETURN n AS person
871        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
872            items: vec![ReturnItem {
873                expression: LogicalExpression::Variable("n".to_string()),
874                alias: Some("person".to_string()),
875            }],
876            distinct: false,
877            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
878                variable: "n".to_string(),
879                label: Some("Person".to_string()),
880                input: None,
881            })),
882        }));
883
884        let physical = planner.plan(&logical).unwrap();
885        assert_eq!(physical.columns(), &["person"]);
886    }
887
888    #[test]
889    fn test_plan_return_property() {
890        let store = create_test_store();
891        let planner = Planner::new(store);
892
893        // MATCH (n:Person) RETURN n.name
894        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
895            items: vec![ReturnItem {
896                expression: LogicalExpression::Property {
897                    variable: "n".to_string(),
898                    property: "name".to_string(),
899                },
900                alias: None,
901            }],
902            distinct: false,
903            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
904                variable: "n".to_string(),
905                label: Some("Person".to_string()),
906                input: None,
907            })),
908        }));
909
910        let physical = planner.plan(&logical).unwrap();
911        assert_eq!(physical.columns(), &["n.name"]);
912    }
913
914    #[test]
915    fn test_plan_return_literal() {
916        let store = create_test_store();
917        let planner = Planner::new(store);
918
919        // MATCH (n) RETURN 42 AS answer
920        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
921            items: vec![ReturnItem {
922                expression: LogicalExpression::Literal(Value::Int64(42)),
923                alias: Some("answer".to_string()),
924            }],
925            distinct: false,
926            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
927                variable: "n".to_string(),
928                label: None,
929                input: None,
930            })),
931        }));
932
933        let physical = planner.plan(&logical).unwrap();
934        assert_eq!(physical.columns(), &["answer"]);
935    }
936
937    // ==================== Filter Tests ====================
938
939    #[test]
940    fn test_plan_filter_equality() {
941        let store = create_test_store();
942        let planner = Planner::new(store);
943
944        // MATCH (n:Person) WHERE n.age = 30 RETURN n
945        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
946            items: vec![ReturnItem {
947                expression: LogicalExpression::Variable("n".to_string()),
948                alias: None,
949            }],
950            distinct: false,
951            input: Box::new(LogicalOperator::Filter(FilterOp {
952                predicate: LogicalExpression::Binary {
953                    left: Box::new(LogicalExpression::Property {
954                        variable: "n".to_string(),
955                        property: "age".to_string(),
956                    }),
957                    op: BinaryOp::Eq,
958                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
959                },
960                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
961                    variable: "n".to_string(),
962                    label: Some("Person".to_string()),
963                    input: None,
964                })),
965                pushdown_hint: None,
966            })),
967        }));
968
969        let physical = planner.plan(&logical).unwrap();
970        assert_eq!(physical.columns(), &["n"]);
971    }
972
973    #[test]
974    fn test_plan_filter_compound_and() {
975        let store = create_test_store();
976        let planner = Planner::new(store);
977
978        // WHERE n.age > 20 AND n.age < 40
979        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
980            items: vec![ReturnItem {
981                expression: LogicalExpression::Variable("n".to_string()),
982                alias: None,
983            }],
984            distinct: false,
985            input: Box::new(LogicalOperator::Filter(FilterOp {
986                predicate: LogicalExpression::Binary {
987                    left: Box::new(LogicalExpression::Binary {
988                        left: Box::new(LogicalExpression::Property {
989                            variable: "n".to_string(),
990                            property: "age".to_string(),
991                        }),
992                        op: BinaryOp::Gt,
993                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
994                    }),
995                    op: BinaryOp::And,
996                    right: Box::new(LogicalExpression::Binary {
997                        left: Box::new(LogicalExpression::Property {
998                            variable: "n".to_string(),
999                            property: "age".to_string(),
1000                        }),
1001                        op: BinaryOp::Lt,
1002                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1003                    }),
1004                },
1005                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1006                    variable: "n".to_string(),
1007                    label: None,
1008                    input: None,
1009                })),
1010                pushdown_hint: None,
1011            })),
1012        }));
1013
1014        let physical = planner.plan(&logical).unwrap();
1015        assert_eq!(physical.columns(), &["n"]);
1016    }
1017
1018    #[test]
1019    fn test_plan_filter_unary_not() {
1020        let store = create_test_store();
1021        let planner = Planner::new(store);
1022
1023        // WHERE NOT n.active
1024        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1025            items: vec![ReturnItem {
1026                expression: LogicalExpression::Variable("n".to_string()),
1027                alias: None,
1028            }],
1029            distinct: false,
1030            input: Box::new(LogicalOperator::Filter(FilterOp {
1031                predicate: LogicalExpression::Unary {
1032                    op: UnaryOp::Not,
1033                    operand: Box::new(LogicalExpression::Property {
1034                        variable: "n".to_string(),
1035                        property: "active".to_string(),
1036                    }),
1037                },
1038                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1039                    variable: "n".to_string(),
1040                    label: None,
1041                    input: None,
1042                })),
1043                pushdown_hint: None,
1044            })),
1045        }));
1046
1047        let physical = planner.plan(&logical).unwrap();
1048        assert_eq!(physical.columns(), &["n"]);
1049    }
1050
1051    #[test]
1052    fn test_plan_filter_is_null() {
1053        let store = create_test_store();
1054        let planner = Planner::new(store);
1055
1056        // WHERE n.email IS NULL
1057        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1058            items: vec![ReturnItem {
1059                expression: LogicalExpression::Variable("n".to_string()),
1060                alias: None,
1061            }],
1062            distinct: false,
1063            input: Box::new(LogicalOperator::Filter(FilterOp {
1064                predicate: LogicalExpression::Unary {
1065                    op: UnaryOp::IsNull,
1066                    operand: Box::new(LogicalExpression::Property {
1067                        variable: "n".to_string(),
1068                        property: "email".to_string(),
1069                    }),
1070                },
1071                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1072                    variable: "n".to_string(),
1073                    label: None,
1074                    input: None,
1075                })),
1076                pushdown_hint: None,
1077            })),
1078        }));
1079
1080        let physical = planner.plan(&logical).unwrap();
1081        assert_eq!(physical.columns(), &["n"]);
1082    }
1083
1084    #[test]
1085    fn test_plan_filter_function_call() {
1086        let store = create_test_store();
1087        let planner = Planner::new(store);
1088
1089        // WHERE size(n.friends) > 0
1090        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1091            items: vec![ReturnItem {
1092                expression: LogicalExpression::Variable("n".to_string()),
1093                alias: None,
1094            }],
1095            distinct: false,
1096            input: Box::new(LogicalOperator::Filter(FilterOp {
1097                predicate: LogicalExpression::Binary {
1098                    left: Box::new(LogicalExpression::FunctionCall {
1099                        name: "size".to_string(),
1100                        args: vec![LogicalExpression::Property {
1101                            variable: "n".to_string(),
1102                            property: "friends".to_string(),
1103                        }],
1104                        distinct: false,
1105                    }),
1106                    op: BinaryOp::Gt,
1107                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1108                },
1109                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1110                    variable: "n".to_string(),
1111                    label: None,
1112                    input: None,
1113                })),
1114                pushdown_hint: None,
1115            })),
1116        }));
1117
1118        let physical = planner.plan(&logical).unwrap();
1119        assert_eq!(physical.columns(), &["n"]);
1120    }
1121
1122    // ==================== Expand Tests ====================
1123
1124    #[test]
1125    fn test_plan_expand_outgoing() {
1126        let store = create_test_store();
1127        let planner = Planner::new(store);
1128
1129        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1130        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1131            items: vec![
1132                ReturnItem {
1133                    expression: LogicalExpression::Variable("a".to_string()),
1134                    alias: None,
1135                },
1136                ReturnItem {
1137                    expression: LogicalExpression::Variable("b".to_string()),
1138                    alias: None,
1139                },
1140            ],
1141            distinct: false,
1142            input: Box::new(LogicalOperator::Expand(ExpandOp {
1143                from_variable: "a".to_string(),
1144                to_variable: "b".to_string(),
1145                edge_variable: None,
1146                direction: ExpandDirection::Outgoing,
1147                edge_types: vec!["KNOWS".to_string()],
1148                min_hops: 1,
1149                max_hops: Some(1),
1150                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1151                    variable: "a".to_string(),
1152                    label: Some("Person".to_string()),
1153                    input: None,
1154                })),
1155                path_alias: None,
1156                path_mode: PathMode::Walk,
1157            })),
1158        }));
1159
1160        let physical = planner.plan(&logical).unwrap();
1161        // The return should have columns [a, b]
1162        assert!(physical.columns().contains(&"a".to_string()));
1163        assert!(physical.columns().contains(&"b".to_string()));
1164    }
1165
1166    #[test]
1167    fn test_plan_expand_with_edge_variable() {
1168        let store = create_test_store();
1169        let planner = Planner::new(store);
1170
1171        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1172        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1173            items: vec![
1174                ReturnItem {
1175                    expression: LogicalExpression::Variable("a".to_string()),
1176                    alias: None,
1177                },
1178                ReturnItem {
1179                    expression: LogicalExpression::Variable("r".to_string()),
1180                    alias: None,
1181                },
1182                ReturnItem {
1183                    expression: LogicalExpression::Variable("b".to_string()),
1184                    alias: None,
1185                },
1186            ],
1187            distinct: false,
1188            input: Box::new(LogicalOperator::Expand(ExpandOp {
1189                from_variable: "a".to_string(),
1190                to_variable: "b".to_string(),
1191                edge_variable: Some("r".to_string()),
1192                direction: ExpandDirection::Outgoing,
1193                edge_types: vec!["KNOWS".to_string()],
1194                min_hops: 1,
1195                max_hops: Some(1),
1196                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1197                    variable: "a".to_string(),
1198                    label: None,
1199                    input: None,
1200                })),
1201                path_alias: None,
1202                path_mode: PathMode::Walk,
1203            })),
1204        }));
1205
1206        let physical = planner.plan(&logical).unwrap();
1207        assert!(physical.columns().contains(&"a".to_string()));
1208        assert!(physical.columns().contains(&"r".to_string()));
1209        assert!(physical.columns().contains(&"b".to_string()));
1210    }
1211
1212    // ==================== Limit/Skip/Sort Tests ====================
1213
1214    #[test]
1215    fn test_plan_limit() {
1216        let store = create_test_store();
1217        let planner = Planner::new(store);
1218
1219        // MATCH (n) RETURN n LIMIT 10
1220        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1221            items: vec![ReturnItem {
1222                expression: LogicalExpression::Variable("n".to_string()),
1223                alias: None,
1224            }],
1225            distinct: false,
1226            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1227                count: 10.into(),
1228                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1229                    variable: "n".to_string(),
1230                    label: None,
1231                    input: None,
1232                })),
1233            })),
1234        }));
1235
1236        let physical = planner.plan(&logical).unwrap();
1237        assert_eq!(physical.columns(), &["n"]);
1238    }
1239
1240    #[test]
1241    fn test_plan_skip() {
1242        let store = create_test_store();
1243        let planner = Planner::new(store);
1244
1245        // MATCH (n) RETURN n SKIP 5
1246        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1247            items: vec![ReturnItem {
1248                expression: LogicalExpression::Variable("n".to_string()),
1249                alias: None,
1250            }],
1251            distinct: false,
1252            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1253                count: 5.into(),
1254                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1255                    variable: "n".to_string(),
1256                    label: None,
1257                    input: None,
1258                })),
1259            })),
1260        }));
1261
1262        let physical = planner.plan(&logical).unwrap();
1263        assert_eq!(physical.columns(), &["n"]);
1264    }
1265
1266    #[test]
1267    fn test_plan_sort() {
1268        let store = create_test_store();
1269        let planner = Planner::new(store);
1270
1271        // MATCH (n) RETURN n ORDER BY n.name ASC
1272        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1273            items: vec![ReturnItem {
1274                expression: LogicalExpression::Variable("n".to_string()),
1275                alias: None,
1276            }],
1277            distinct: false,
1278            input: Box::new(LogicalOperator::Sort(SortOp {
1279                keys: vec![SortKey {
1280                    expression: LogicalExpression::Variable("n".to_string()),
1281                    order: SortOrder::Ascending,
1282                    nulls: None,
1283                }],
1284                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1285                    variable: "n".to_string(),
1286                    label: None,
1287                    input: None,
1288                })),
1289            })),
1290        }));
1291
1292        let physical = planner.plan(&logical).unwrap();
1293        assert_eq!(physical.columns(), &["n"]);
1294    }
1295
1296    #[test]
1297    fn test_plan_sort_descending() {
1298        let store = create_test_store();
1299        let planner = Planner::new(store);
1300
1301        // ORDER BY n DESC
1302        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1303            items: vec![ReturnItem {
1304                expression: LogicalExpression::Variable("n".to_string()),
1305                alias: None,
1306            }],
1307            distinct: false,
1308            input: Box::new(LogicalOperator::Sort(SortOp {
1309                keys: vec![SortKey {
1310                    expression: LogicalExpression::Variable("n".to_string()),
1311                    order: SortOrder::Descending,
1312                    nulls: None,
1313                }],
1314                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1315                    variable: "n".to_string(),
1316                    label: None,
1317                    input: None,
1318                })),
1319            })),
1320        }));
1321
1322        let physical = planner.plan(&logical).unwrap();
1323        assert_eq!(physical.columns(), &["n"]);
1324    }
1325
1326    #[test]
1327    fn test_plan_distinct() {
1328        let store = create_test_store();
1329        let planner = Planner::new(store);
1330
1331        // MATCH (n) RETURN DISTINCT n
1332        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1333            items: vec![ReturnItem {
1334                expression: LogicalExpression::Variable("n".to_string()),
1335                alias: None,
1336            }],
1337            distinct: false,
1338            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1339                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1340                    variable: "n".to_string(),
1341                    label: None,
1342                    input: None,
1343                })),
1344                columns: None,
1345            })),
1346        }));
1347
1348        let physical = planner.plan(&logical).unwrap();
1349        assert_eq!(physical.columns(), &["n"]);
1350    }
1351
1352    #[test]
1353    fn test_plan_distinct_with_columns() {
1354        let store = create_test_store();
1355        let planner = Planner::new(store);
1356
1357        // DISTINCT on specific columns (column-specific dedup)
1358        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1359            items: vec![ReturnItem {
1360                expression: LogicalExpression::Variable("n".to_string()),
1361                alias: None,
1362            }],
1363            distinct: false,
1364            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1365                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1366                    variable: "n".to_string(),
1367                    label: None,
1368                    input: None,
1369                })),
1370                columns: Some(vec!["n".to_string()]),
1371            })),
1372        }));
1373
1374        let physical = planner.plan(&logical).unwrap();
1375        assert_eq!(physical.columns(), &["n"]);
1376    }
1377
1378    #[test]
1379    fn test_plan_distinct_with_nonexistent_columns() {
1380        let store = create_test_store();
1381        let planner = Planner::new(store);
1382
1383        // When distinct columns don't match any output columns,
1384        // it falls back to full-row distinct.
1385        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1386            items: vec![ReturnItem {
1387                expression: LogicalExpression::Variable("n".to_string()),
1388                alias: None,
1389            }],
1390            distinct: false,
1391            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1392                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1393                    variable: "n".to_string(),
1394                    label: None,
1395                    input: None,
1396                })),
1397                columns: Some(vec!["nonexistent".to_string()]),
1398            })),
1399        }));
1400
1401        let physical = planner.plan(&logical).unwrap();
1402        assert_eq!(physical.columns(), &["n"]);
1403    }
1404
1405    // ==================== Aggregate Tests ====================
1406
1407    #[test]
1408    fn test_plan_aggregate_count() {
1409        let store = create_test_store();
1410        let planner = Planner::new(store);
1411
1412        // MATCH (n) RETURN count(n)
1413        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1414            items: vec![ReturnItem {
1415                expression: LogicalExpression::Variable("cnt".to_string()),
1416                alias: None,
1417            }],
1418            distinct: false,
1419            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1420                group_by: vec![],
1421                aggregates: vec![LogicalAggregateExpr {
1422                    function: LogicalAggregateFunction::Count,
1423                    expression: Some(LogicalExpression::Variable("n".to_string())),
1424                    expression2: None,
1425                    distinct: false,
1426                    alias: Some("cnt".to_string()),
1427                    percentile: None,
1428                    separator: None,
1429                }],
1430                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1431                    variable: "n".to_string(),
1432                    label: None,
1433                    input: None,
1434                })),
1435                having: None,
1436            })),
1437        }));
1438
1439        let physical = planner.plan(&logical).unwrap();
1440        assert!(physical.columns().contains(&"cnt".to_string()));
1441    }
1442
1443    #[test]
1444    fn test_plan_aggregate_with_group_by() {
1445        let store = create_test_store();
1446        let planner = Planner::new(store);
1447
1448        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1449        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1450            group_by: vec![LogicalExpression::Property {
1451                variable: "n".to_string(),
1452                property: "city".to_string(),
1453            }],
1454            aggregates: vec![LogicalAggregateExpr {
1455                function: LogicalAggregateFunction::Count,
1456                expression: Some(LogicalExpression::Variable("n".to_string())),
1457                expression2: None,
1458                distinct: false,
1459                alias: Some("cnt".to_string()),
1460                percentile: None,
1461                separator: None,
1462            }],
1463            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1464                variable: "n".to_string(),
1465                label: Some("Person".to_string()),
1466                input: None,
1467            })),
1468            having: None,
1469        }));
1470
1471        let physical = planner.plan(&logical).unwrap();
1472        assert_eq!(physical.columns().len(), 2);
1473    }
1474
1475    #[test]
1476    fn test_plan_aggregate_sum() {
1477        let store = create_test_store();
1478        let planner = Planner::new(store);
1479
1480        // SUM(n.value)
1481        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1482            group_by: vec![],
1483            aggregates: vec![LogicalAggregateExpr {
1484                function: LogicalAggregateFunction::Sum,
1485                expression: Some(LogicalExpression::Property {
1486                    variable: "n".to_string(),
1487                    property: "value".to_string(),
1488                }),
1489                expression2: None,
1490                distinct: false,
1491                alias: Some("total".to_string()),
1492                percentile: None,
1493                separator: None,
1494            }],
1495            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1496                variable: "n".to_string(),
1497                label: None,
1498                input: None,
1499            })),
1500            having: None,
1501        }));
1502
1503        let physical = planner.plan(&logical).unwrap();
1504        assert!(physical.columns().contains(&"total".to_string()));
1505    }
1506
1507    #[test]
1508    fn test_plan_aggregate_avg() {
1509        let store = create_test_store();
1510        let planner = Planner::new(store);
1511
1512        // AVG(n.score)
1513        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1514            group_by: vec![],
1515            aggregates: vec![LogicalAggregateExpr {
1516                function: LogicalAggregateFunction::Avg,
1517                expression: Some(LogicalExpression::Property {
1518                    variable: "n".to_string(),
1519                    property: "score".to_string(),
1520                }),
1521                expression2: None,
1522                distinct: false,
1523                alias: Some("average".to_string()),
1524                percentile: None,
1525                separator: None,
1526            }],
1527            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1528                variable: "n".to_string(),
1529                label: None,
1530                input: None,
1531            })),
1532            having: None,
1533        }));
1534
1535        let physical = planner.plan(&logical).unwrap();
1536        assert!(physical.columns().contains(&"average".to_string()));
1537    }
1538
1539    #[test]
1540    fn test_plan_aggregate_min_max() {
1541        let store = create_test_store();
1542        let planner = Planner::new(store);
1543
1544        // MIN(n.age), MAX(n.age)
1545        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1546            group_by: vec![],
1547            aggregates: vec![
1548                LogicalAggregateExpr {
1549                    function: LogicalAggregateFunction::Min,
1550                    expression: Some(LogicalExpression::Property {
1551                        variable: "n".to_string(),
1552                        property: "age".to_string(),
1553                    }),
1554                    expression2: None,
1555                    distinct: false,
1556                    alias: Some("youngest".to_string()),
1557                    percentile: None,
1558                    separator: None,
1559                },
1560                LogicalAggregateExpr {
1561                    function: LogicalAggregateFunction::Max,
1562                    expression: Some(LogicalExpression::Property {
1563                        variable: "n".to_string(),
1564                        property: "age".to_string(),
1565                    }),
1566                    expression2: None,
1567                    distinct: false,
1568                    alias: Some("oldest".to_string()),
1569                    percentile: None,
1570                    separator: None,
1571                },
1572            ],
1573            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1574                variable: "n".to_string(),
1575                label: None,
1576                input: None,
1577            })),
1578            having: None,
1579        }));
1580
1581        let physical = planner.plan(&logical).unwrap();
1582        assert!(physical.columns().contains(&"youngest".to_string()));
1583        assert!(physical.columns().contains(&"oldest".to_string()));
1584    }
1585
1586    // ==================== Join Tests ====================
1587
1588    #[test]
1589    fn test_plan_inner_join() {
1590        let store = create_test_store();
1591        let planner = Planner::new(store);
1592
1593        // Inner join between two scans
1594        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1595            items: vec![
1596                ReturnItem {
1597                    expression: LogicalExpression::Variable("a".to_string()),
1598                    alias: None,
1599                },
1600                ReturnItem {
1601                    expression: LogicalExpression::Variable("b".to_string()),
1602                    alias: None,
1603                },
1604            ],
1605            distinct: false,
1606            input: Box::new(LogicalOperator::Join(JoinOp {
1607                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1608                    variable: "a".to_string(),
1609                    label: Some("Person".to_string()),
1610                    input: None,
1611                })),
1612                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1613                    variable: "b".to_string(),
1614                    label: Some("Company".to_string()),
1615                    input: None,
1616                })),
1617                join_type: JoinType::Inner,
1618                conditions: vec![JoinCondition {
1619                    left: LogicalExpression::Variable("a".to_string()),
1620                    right: LogicalExpression::Variable("b".to_string()),
1621                }],
1622            })),
1623        }));
1624
1625        let physical = planner.plan(&logical).unwrap();
1626        assert!(physical.columns().contains(&"a".to_string()));
1627        assert!(physical.columns().contains(&"b".to_string()));
1628    }
1629
1630    #[test]
1631    fn test_plan_cross_join() {
1632        let store = create_test_store();
1633        let planner = Planner::new(store);
1634
1635        // Cross join (no conditions)
1636        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1637            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1638                variable: "a".to_string(),
1639                label: None,
1640                input: None,
1641            })),
1642            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1643                variable: "b".to_string(),
1644                label: None,
1645                input: None,
1646            })),
1647            join_type: JoinType::Cross,
1648            conditions: vec![],
1649        }));
1650
1651        let physical = planner.plan(&logical).unwrap();
1652        assert_eq!(physical.columns().len(), 2);
1653    }
1654
1655    #[test]
1656    fn test_plan_left_join() {
1657        let store = create_test_store();
1658        let planner = Planner::new(store);
1659
1660        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1661            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1662                variable: "a".to_string(),
1663                label: None,
1664                input: None,
1665            })),
1666            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1667                variable: "b".to_string(),
1668                label: None,
1669                input: None,
1670            })),
1671            join_type: JoinType::Left,
1672            conditions: vec![],
1673        }));
1674
1675        let physical = planner.plan(&logical).unwrap();
1676        assert_eq!(physical.columns().len(), 2);
1677    }
1678
1679    // ==================== Mutation Tests ====================
1680
1681    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1682        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1683        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1684        p
1685    }
1686
1687    #[test]
1688    fn test_plan_create_node() {
1689        let store = create_test_store();
1690        let planner = create_writable_planner(&store);
1691
1692        // CREATE (n:Person {name: 'Alix'})
1693        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1694            variable: "n".to_string(),
1695            labels: vec!["Person".to_string()],
1696            properties: vec![(
1697                "name".to_string(),
1698                LogicalExpression::Literal(Value::String("Alix".into())),
1699            )],
1700            input: None,
1701        }));
1702
1703        let physical = planner.plan(&logical).unwrap();
1704        assert!(physical.columns().contains(&"n".to_string()));
1705    }
1706
1707    #[test]
1708    fn test_plan_create_edge() {
1709        let store = create_test_store();
1710        let planner = create_writable_planner(&store);
1711
1712        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1713        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1714            variable: Some("r".to_string()),
1715            from_variable: "a".to_string(),
1716            to_variable: "b".to_string(),
1717            edge_type: "KNOWS".to_string(),
1718            properties: vec![],
1719            input: Box::new(LogicalOperator::Join(JoinOp {
1720                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1721                    variable: "a".to_string(),
1722                    label: None,
1723                    input: None,
1724                })),
1725                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1726                    variable: "b".to_string(),
1727                    label: None,
1728                    input: None,
1729                })),
1730                join_type: JoinType::Cross,
1731                conditions: vec![],
1732            })),
1733        }));
1734
1735        let physical = planner.plan(&logical).unwrap();
1736        assert!(physical.columns().contains(&"r".to_string()));
1737    }
1738
1739    #[test]
1740    fn test_plan_delete_node() {
1741        let store = create_test_store();
1742        let planner = create_writable_planner(&store);
1743
1744        // MATCH (n) DELETE n
1745        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1746            variable: "n".to_string(),
1747            detach: false,
1748            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1749                variable: "n".to_string(),
1750                label: None,
1751                input: None,
1752            })),
1753        }));
1754
1755        let physical = planner.plan(&logical).unwrap();
1756        assert!(physical.columns().contains(&"n".to_string()));
1757    }
1758
1759    // ==================== Error Cases ====================
1760
1761    #[test]
1762    fn test_plan_empty_errors() {
1763        let store = create_test_store();
1764        let planner = Planner::new(store);
1765
1766        let logical = LogicalPlan::new(LogicalOperator::Empty);
1767        let result = planner.plan(&logical);
1768        assert!(result.is_err());
1769    }
1770
1771    #[test]
1772    fn test_plan_missing_variable_in_return() {
1773        let store = create_test_store();
1774        let planner = Planner::new(store);
1775
1776        // Return variable that doesn't exist in input
1777        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1778            items: vec![ReturnItem {
1779                expression: LogicalExpression::Variable("missing".to_string()),
1780                alias: None,
1781            }],
1782            distinct: false,
1783            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1784                variable: "n".to_string(),
1785                label: None,
1786                input: None,
1787            })),
1788        }));
1789
1790        let result = planner.plan(&logical);
1791        assert!(result.is_err());
1792    }
1793
1794    // ==================== Helper Function Tests ====================
1795
1796    #[test]
1797    fn test_convert_binary_ops() {
1798        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1799        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1800        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1801        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1802        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1803        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1804        assert!(convert_binary_op(BinaryOp::And).is_ok());
1805        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1806        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1807        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1808        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1809        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1810    }
1811
1812    #[test]
1813    fn test_convert_unary_ops() {
1814        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1815        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1816        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1817        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1818    }
1819
1820    #[test]
1821    fn test_convert_aggregate_functions() {
1822        assert!(matches!(
1823            convert_aggregate_function(LogicalAggregateFunction::Count),
1824            PhysicalAggregateFunction::Count
1825        ));
1826        assert!(matches!(
1827            convert_aggregate_function(LogicalAggregateFunction::Sum),
1828            PhysicalAggregateFunction::Sum
1829        ));
1830        assert!(matches!(
1831            convert_aggregate_function(LogicalAggregateFunction::Avg),
1832            PhysicalAggregateFunction::Avg
1833        ));
1834        assert!(matches!(
1835            convert_aggregate_function(LogicalAggregateFunction::Min),
1836            PhysicalAggregateFunction::Min
1837        ));
1838        assert!(matches!(
1839            convert_aggregate_function(LogicalAggregateFunction::Max),
1840            PhysicalAggregateFunction::Max
1841        ));
1842    }
1843
1844    #[test]
1845    fn test_planner_accessors() {
1846        let store = create_test_store();
1847        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);
1848
1849        assert!(planner.transaction_id().is_none());
1850        assert!(planner.transaction_manager().is_none());
1851        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1852    }
1853
1854    #[test]
1855    fn test_physical_plan_accessors() {
1856        let store = create_test_store();
1857        let planner = Planner::new(store);
1858
1859        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1860            variable: "n".to_string(),
1861            label: None,
1862            input: None,
1863        }));
1864
1865        let physical = planner.plan(&logical).unwrap();
1866        assert_eq!(physical.columns(), &["n"]);
1867
1868        // Test into_operator
1869        let _ = physical.into_operator();
1870    }
1871
1872    // ==================== Adaptive Planning Tests ====================
1873
1874    #[test]
1875    fn test_plan_adaptive_with_scan() {
1876        let store = create_test_store();
1877        let planner = Planner::new(store);
1878
1879        // MATCH (n:Person) RETURN n
1880        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1881            items: vec![ReturnItem {
1882                expression: LogicalExpression::Variable("n".to_string()),
1883                alias: None,
1884            }],
1885            distinct: false,
1886            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1887                variable: "n".to_string(),
1888                label: Some("Person".to_string()),
1889                input: None,
1890            })),
1891        }));
1892
1893        let physical = planner.plan_adaptive(&logical).unwrap();
1894        assert_eq!(physical.columns(), &["n"]);
1895        // Should have adaptive context with estimates
1896        assert!(physical.adaptive_context.is_some());
1897    }
1898
1899    #[test]
1900    fn test_plan_adaptive_with_filter() {
1901        let store = create_test_store();
1902        let planner = Planner::new(store);
1903
1904        // MATCH (n) WHERE n.age > 30 RETURN n
1905        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1906            items: vec![ReturnItem {
1907                expression: LogicalExpression::Variable("n".to_string()),
1908                alias: None,
1909            }],
1910            distinct: false,
1911            input: Box::new(LogicalOperator::Filter(FilterOp {
1912                predicate: LogicalExpression::Binary {
1913                    left: Box::new(LogicalExpression::Property {
1914                        variable: "n".to_string(),
1915                        property: "age".to_string(),
1916                    }),
1917                    op: BinaryOp::Gt,
1918                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1919                },
1920                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1921                    variable: "n".to_string(),
1922                    label: None,
1923                    input: None,
1924                })),
1925                pushdown_hint: None,
1926            })),
1927        }));
1928
1929        let physical = planner.plan_adaptive(&logical).unwrap();
1930        assert!(physical.adaptive_context.is_some());
1931    }
1932
1933    #[test]
1934    fn test_plan_adaptive_with_expand() {
1935        let store = create_test_store();
1936        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
1937            .with_factorized_execution(false);
1938
1939        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1940        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1941            items: vec![
1942                ReturnItem {
1943                    expression: LogicalExpression::Variable("a".to_string()),
1944                    alias: None,
1945                },
1946                ReturnItem {
1947                    expression: LogicalExpression::Variable("b".to_string()),
1948                    alias: None,
1949                },
1950            ],
1951            distinct: false,
1952            input: Box::new(LogicalOperator::Expand(ExpandOp {
1953                from_variable: "a".to_string(),
1954                to_variable: "b".to_string(),
1955                edge_variable: None,
1956                direction: ExpandDirection::Outgoing,
1957                edge_types: vec!["KNOWS".to_string()],
1958                min_hops: 1,
1959                max_hops: Some(1),
1960                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1961                    variable: "a".to_string(),
1962                    label: None,
1963                    input: None,
1964                })),
1965                path_alias: None,
1966                path_mode: PathMode::Walk,
1967            })),
1968        }));
1969
1970        let physical = planner.plan_adaptive(&logical).unwrap();
1971        assert!(physical.adaptive_context.is_some());
1972    }
1973
1974    #[test]
1975    fn test_plan_adaptive_with_join() {
1976        let store = create_test_store();
1977        let planner = Planner::new(store);
1978
1979        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1980            items: vec![
1981                ReturnItem {
1982                    expression: LogicalExpression::Variable("a".to_string()),
1983                    alias: None,
1984                },
1985                ReturnItem {
1986                    expression: LogicalExpression::Variable("b".to_string()),
1987                    alias: None,
1988                },
1989            ],
1990            distinct: false,
1991            input: Box::new(LogicalOperator::Join(JoinOp {
1992                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1993                    variable: "a".to_string(),
1994                    label: None,
1995                    input: None,
1996                })),
1997                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1998                    variable: "b".to_string(),
1999                    label: None,
2000                    input: None,
2001                })),
2002                join_type: JoinType::Cross,
2003                conditions: vec![],
2004            })),
2005        }));
2006
2007        let physical = planner.plan_adaptive(&logical).unwrap();
2008        assert!(physical.adaptive_context.is_some());
2009    }
2010
2011    #[test]
2012    fn test_plan_adaptive_with_aggregate() {
2013        let store = create_test_store();
2014        let planner = Planner::new(store);
2015
2016        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2017            group_by: vec![],
2018            aggregates: vec![LogicalAggregateExpr {
2019                function: LogicalAggregateFunction::Count,
2020                expression: Some(LogicalExpression::Variable("n".to_string())),
2021                expression2: None,
2022                distinct: false,
2023                alias: Some("cnt".to_string()),
2024                percentile: None,
2025                separator: None,
2026            }],
2027            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2028                variable: "n".to_string(),
2029                label: None,
2030                input: None,
2031            })),
2032            having: None,
2033        }));
2034
2035        let physical = planner.plan_adaptive(&logical).unwrap();
2036        assert!(physical.adaptive_context.is_some());
2037    }
2038
2039    #[test]
2040    fn test_plan_adaptive_with_distinct() {
2041        let store = create_test_store();
2042        let planner = Planner::new(store);
2043
2044        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2045            items: vec![ReturnItem {
2046                expression: LogicalExpression::Variable("n".to_string()),
2047                alias: None,
2048            }],
2049            distinct: false,
2050            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2051                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2052                    variable: "n".to_string(),
2053                    label: None,
2054                    input: None,
2055                })),
2056                columns: None,
2057            })),
2058        }));
2059
2060        let physical = planner.plan_adaptive(&logical).unwrap();
2061        assert!(physical.adaptive_context.is_some());
2062    }
2063
2064    #[test]
2065    fn test_plan_adaptive_with_limit() {
2066        let store = create_test_store();
2067        let planner = Planner::new(store);
2068
2069        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2070            items: vec![ReturnItem {
2071                expression: LogicalExpression::Variable("n".to_string()),
2072                alias: None,
2073            }],
2074            distinct: false,
2075            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2076                count: 10.into(),
2077                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2078                    variable: "n".to_string(),
2079                    label: None,
2080                    input: None,
2081                })),
2082            })),
2083        }));
2084
2085        let physical = planner.plan_adaptive(&logical).unwrap();
2086        assert!(physical.adaptive_context.is_some());
2087    }
2088
2089    #[test]
2090    fn test_plan_adaptive_with_skip() {
2091        let store = create_test_store();
2092        let planner = Planner::new(store);
2093
2094        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2095            items: vec![ReturnItem {
2096                expression: LogicalExpression::Variable("n".to_string()),
2097                alias: None,
2098            }],
2099            distinct: false,
2100            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2101                count: 5.into(),
2102                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2103                    variable: "n".to_string(),
2104                    label: None,
2105                    input: None,
2106                })),
2107            })),
2108        }));
2109
2110        let physical = planner.plan_adaptive(&logical).unwrap();
2111        assert!(physical.adaptive_context.is_some());
2112    }
2113
2114    #[test]
2115    fn test_plan_adaptive_with_sort() {
2116        let store = create_test_store();
2117        let planner = Planner::new(store);
2118
2119        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2120            items: vec![ReturnItem {
2121                expression: LogicalExpression::Variable("n".to_string()),
2122                alias: None,
2123            }],
2124            distinct: false,
2125            input: Box::new(LogicalOperator::Sort(SortOp {
2126                keys: vec![SortKey {
2127                    expression: LogicalExpression::Variable("n".to_string()),
2128                    order: SortOrder::Ascending,
2129                    nulls: None,
2130                }],
2131                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2132                    variable: "n".to_string(),
2133                    label: None,
2134                    input: None,
2135                })),
2136            })),
2137        }));
2138
2139        let physical = planner.plan_adaptive(&logical).unwrap();
2140        assert!(physical.adaptive_context.is_some());
2141    }
2142
2143    #[test]
2144    fn test_plan_adaptive_with_union() {
2145        let store = create_test_store();
2146        let planner = Planner::new(store);
2147
2148        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2149            items: vec![ReturnItem {
2150                expression: LogicalExpression::Variable("n".to_string()),
2151                alias: None,
2152            }],
2153            distinct: false,
2154            input: Box::new(LogicalOperator::Union(UnionOp {
2155                inputs: vec![
2156                    LogicalOperator::NodeScan(NodeScanOp {
2157                        variable: "n".to_string(),
2158                        label: Some("Person".to_string()),
2159                        input: None,
2160                    }),
2161                    LogicalOperator::NodeScan(NodeScanOp {
2162                        variable: "n".to_string(),
2163                        label: Some("Company".to_string()),
2164                        input: None,
2165                    }),
2166                ],
2167            })),
2168        }));
2169
2170        let physical = planner.plan_adaptive(&logical).unwrap();
2171        assert!(physical.adaptive_context.is_some());
2172    }
2173
2174    // ==================== Variable Length Path Tests ====================
2175
2176    #[test]
2177    fn test_plan_expand_variable_length() {
2178        let store = create_test_store();
2179        let planner = Planner::new(store);
2180
2181        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2182        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2183            items: vec![
2184                ReturnItem {
2185                    expression: LogicalExpression::Variable("a".to_string()),
2186                    alias: None,
2187                },
2188                ReturnItem {
2189                    expression: LogicalExpression::Variable("b".to_string()),
2190                    alias: None,
2191                },
2192            ],
2193            distinct: false,
2194            input: Box::new(LogicalOperator::Expand(ExpandOp {
2195                from_variable: "a".to_string(),
2196                to_variable: "b".to_string(),
2197                edge_variable: None,
2198                direction: ExpandDirection::Outgoing,
2199                edge_types: vec!["KNOWS".to_string()],
2200                min_hops: 1,
2201                max_hops: Some(3),
2202                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2203                    variable: "a".to_string(),
2204                    label: None,
2205                    input: None,
2206                })),
2207                path_alias: None,
2208                path_mode: PathMode::Walk,
2209            })),
2210        }));
2211
2212        let physical = planner.plan(&logical).unwrap();
2213        assert!(physical.columns().contains(&"a".to_string()));
2214        assert!(physical.columns().contains(&"b".to_string()));
2215    }
2216
2217    #[test]
2218    fn test_plan_expand_with_path_alias() {
2219        let store = create_test_store();
2220        let planner = Planner::new(store);
2221
2222        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2223        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2224            items: vec![
2225                ReturnItem {
2226                    expression: LogicalExpression::Variable("a".to_string()),
2227                    alias: None,
2228                },
2229                ReturnItem {
2230                    expression: LogicalExpression::Variable("b".to_string()),
2231                    alias: None,
2232                },
2233            ],
2234            distinct: false,
2235            input: Box::new(LogicalOperator::Expand(ExpandOp {
2236                from_variable: "a".to_string(),
2237                to_variable: "b".to_string(),
2238                edge_variable: None,
2239                direction: ExpandDirection::Outgoing,
2240                edge_types: vec!["KNOWS".to_string()],
2241                min_hops: 1,
2242                max_hops: Some(3),
2243                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2244                    variable: "a".to_string(),
2245                    label: None,
2246                    input: None,
2247                })),
2248                path_alias: Some("p".to_string()),
2249                path_mode: PathMode::Walk,
2250            })),
2251        }));
2252
2253        let physical = planner.plan(&logical).unwrap();
2254        // Verify plan was created successfully with expected output columns
2255        assert!(physical.columns().contains(&"a".to_string()));
2256        assert!(physical.columns().contains(&"b".to_string()));
2257    }
2258
2259    #[test]
2260    fn test_plan_expand_incoming() {
2261        let store = create_test_store();
2262        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2263            .with_factorized_execution(false);
2264
2265        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2266        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2267            items: vec![
2268                ReturnItem {
2269                    expression: LogicalExpression::Variable("a".to_string()),
2270                    alias: None,
2271                },
2272                ReturnItem {
2273                    expression: LogicalExpression::Variable("b".to_string()),
2274                    alias: None,
2275                },
2276            ],
2277            distinct: false,
2278            input: Box::new(LogicalOperator::Expand(ExpandOp {
2279                from_variable: "a".to_string(),
2280                to_variable: "b".to_string(),
2281                edge_variable: None,
2282                direction: ExpandDirection::Incoming,
2283                edge_types: vec!["KNOWS".to_string()],
2284                min_hops: 1,
2285                max_hops: Some(1),
2286                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2287                    variable: "a".to_string(),
2288                    label: None,
2289                    input: None,
2290                })),
2291                path_alias: None,
2292                path_mode: PathMode::Walk,
2293            })),
2294        }));
2295
2296        let physical = planner.plan(&logical).unwrap();
2297        assert!(physical.columns().contains(&"a".to_string()));
2298        assert!(physical.columns().contains(&"b".to_string()));
2299    }
2300
2301    #[test]
2302    fn test_plan_expand_both_directions() {
2303        let store = create_test_store();
2304        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2305            .with_factorized_execution(false);
2306
2307        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2308        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2309            items: vec![
2310                ReturnItem {
2311                    expression: LogicalExpression::Variable("a".to_string()),
2312                    alias: None,
2313                },
2314                ReturnItem {
2315                    expression: LogicalExpression::Variable("b".to_string()),
2316                    alias: None,
2317                },
2318            ],
2319            distinct: false,
2320            input: Box::new(LogicalOperator::Expand(ExpandOp {
2321                from_variable: "a".to_string(),
2322                to_variable: "b".to_string(),
2323                edge_variable: None,
2324                direction: ExpandDirection::Both,
2325                edge_types: vec!["KNOWS".to_string()],
2326                min_hops: 1,
2327                max_hops: Some(1),
2328                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2329                    variable: "a".to_string(),
2330                    label: None,
2331                    input: None,
2332                })),
2333                path_alias: None,
2334                path_mode: PathMode::Walk,
2335            })),
2336        }));
2337
2338        let physical = planner.plan(&logical).unwrap();
2339        assert!(physical.columns().contains(&"a".to_string()));
2340        assert!(physical.columns().contains(&"b".to_string()));
2341    }
2342
2343    // ==================== With Context Tests ====================
2344
2345    #[test]
2346    fn test_planner_with_context() {
2347        use crate::transaction::TransactionManager;
2348
2349        let store = create_test_store();
2350        let transaction_manager = Arc::new(TransactionManager::new());
2351        let transaction_id = transaction_manager.begin();
2352        let epoch = transaction_manager.current_epoch();
2353
2354        let planner = Planner::with_context(
2355            Arc::clone(&store) as Arc<dyn GraphStore>,
2356            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2357            Arc::clone(&transaction_manager),
2358            Some(transaction_id),
2359            epoch,
2360        );
2361
2362        assert_eq!(planner.transaction_id(), Some(transaction_id));
2363        assert!(planner.transaction_manager().is_some());
2364        assert_eq!(planner.viewing_epoch(), epoch);
2365    }
2366
2367    #[test]
2368    fn test_planner_with_factorized_execution_disabled() {
2369        let store = create_test_store();
2370        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2371            .with_factorized_execution(false);
2372
2373        // Two consecutive expands - should NOT use factorized execution
2374        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2375            items: vec![
2376                ReturnItem {
2377                    expression: LogicalExpression::Variable("a".to_string()),
2378                    alias: None,
2379                },
2380                ReturnItem {
2381                    expression: LogicalExpression::Variable("c".to_string()),
2382                    alias: None,
2383                },
2384            ],
2385            distinct: false,
2386            input: Box::new(LogicalOperator::Expand(ExpandOp {
2387                from_variable: "b".to_string(),
2388                to_variable: "c".to_string(),
2389                edge_variable: None,
2390                direction: ExpandDirection::Outgoing,
2391                edge_types: vec![],
2392                min_hops: 1,
2393                max_hops: Some(1),
2394                input: Box::new(LogicalOperator::Expand(ExpandOp {
2395                    from_variable: "a".to_string(),
2396                    to_variable: "b".to_string(),
2397                    edge_variable: None,
2398                    direction: ExpandDirection::Outgoing,
2399                    edge_types: vec![],
2400                    min_hops: 1,
2401                    max_hops: Some(1),
2402                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2403                        variable: "a".to_string(),
2404                        label: None,
2405                        input: None,
2406                    })),
2407                    path_alias: None,
2408                    path_mode: PathMode::Walk,
2409                })),
2410                path_alias: None,
2411                path_mode: PathMode::Walk,
2412            })),
2413        }));
2414
2415        let physical = planner.plan(&logical).unwrap();
2416        assert!(physical.columns().contains(&"a".to_string()));
2417        assert!(physical.columns().contains(&"c".to_string()));
2418    }
2419
2420    // ==================== Sort with Property Tests ====================
2421
2422    #[test]
2423    fn test_plan_sort_by_property() {
2424        let store = create_test_store();
2425        let planner = Planner::new(store);
2426
2427        // MATCH (n) RETURN n ORDER BY n.name ASC
2428        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2429            items: vec![ReturnItem {
2430                expression: LogicalExpression::Variable("n".to_string()),
2431                alias: None,
2432            }],
2433            distinct: false,
2434            input: Box::new(LogicalOperator::Sort(SortOp {
2435                keys: vec![SortKey {
2436                    expression: LogicalExpression::Property {
2437                        variable: "n".to_string(),
2438                        property: "name".to_string(),
2439                    },
2440                    order: SortOrder::Ascending,
2441                    nulls: None,
2442                }],
2443                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2444                    variable: "n".to_string(),
2445                    label: None,
2446                    input: None,
2447                })),
2448            })),
2449        }));
2450
2451        let physical = planner.plan(&logical).unwrap();
2452        // Should have the property column projected
2453        assert!(physical.columns().contains(&"n".to_string()));
2454    }
2455
2456    // ==================== Scan with Input Tests ====================
2457
2458    #[test]
2459    fn test_plan_scan_with_input() {
2460        let store = create_test_store();
2461        let planner = Planner::new(store);
2462
2463        // A scan with another scan as input (for chained patterns)
2464        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2465            items: vec![
2466                ReturnItem {
2467                    expression: LogicalExpression::Variable("a".to_string()),
2468                    alias: None,
2469                },
2470                ReturnItem {
2471                    expression: LogicalExpression::Variable("b".to_string()),
2472                    alias: None,
2473                },
2474            ],
2475            distinct: false,
2476            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2477                variable: "b".to_string(),
2478                label: Some("Company".to_string()),
2479                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2480                    variable: "a".to_string(),
2481                    label: Some("Person".to_string()),
2482                    input: None,
2483                }))),
2484            })),
2485        }));
2486
2487        let physical = planner.plan(&logical).unwrap();
2488        assert!(physical.columns().contains(&"a".to_string()));
2489        assert!(physical.columns().contains(&"b".to_string()));
2490    }
2491}