Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TxId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    MapCollectOperator, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator,
38    NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator, ParameterScanOperator,
39    ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator,
40    SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SortDirection,
41    SortKey as PhysicalSortKey, SortOperator, UnwindOperator, VariableLengthExpandOperator,
42};
43use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use crate::query::planner::common;
48use crate::query::planner::common::expression_to_string;
49use crate::query::planner::{
50    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
51    convert_unary_op, value_to_logical_type,
52};
53use crate::transaction::TransactionManager;
54
/// Range bounds for property-based range queries.
///
/// Both ends borrow their `Value` for the lifetime `'a`, so building a
/// `RangeBounds` never clones the bound values.
struct RangeBounds<'a> {
    /// Lower bound of the range.
    /// NOTE(review): `None` presumably means "unbounded below" — confirm at the use sites.
    min: Option<&'a Value>,
    /// Upper bound of the range.
    /// NOTE(review): `None` presumably means "unbounded above" — confirm at the use sites.
    max: Option<&'a Value>,
    /// Whether a value equal to `min` is considered inside the range.
    min_inclusive: bool,
    /// Whether a value equal to `max` is considered inside the range.
    max_inclusive: bool,
}
62
/// Converts a logical plan to a physical operator tree for LPG stores.
///
/// Planning methods take `&self`, so the bookkeeping that changes while a plan
/// is being built (`anon_edge_counter`, `scalar_columns`, `edge_columns`,
/// `correlated_param_state`, `group_list_variables`) lives behind `Cell` /
/// `RefCell`. As a consequence a `Planner` is not `Sync`.
pub struct Planner {
    /// The graph store (supports both read and write operations).
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    /// `None` when the planner was created with [`Planner::new`].
    pub(super) tx_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    pub(super) tx_id: Option<TxId>,
    /// Epoch to use for visibility checks.
    pub(super) viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    /// Both constructors initialise this to `true`.
    pub(super) factorized_execution: bool,
    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Variables that hold edge IDs (from MATCH edge patterns).
    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator for schema enforcement during mutations.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Catalog for user-defined procedure lookup.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state for the currently planning correlated Apply.
    /// Set by `plan_apply` before planning the inner operator, consumed by
    /// `plan_operator` when encountering `ParameterScan`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variables from variable-length expand patterns (group-list variables).
    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
}
96
97impl Planner {
98    /// Creates a new planner with the given store.
99    ///
100    /// This creates a planner without transaction context, using the current
101    /// epoch from the store for visibility.
102    #[must_use]
103    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
104        let epoch = store.current_epoch();
105        Self {
106            store,
107            tx_manager: None,
108            tx_id: None,
109            viewing_epoch: epoch,
110            anon_edge_counter: std::cell::Cell::new(0),
111            factorized_execution: true,
112            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
113            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
114            validator: None,
115            catalog: None,
116            correlated_param_state: std::cell::RefCell::new(None),
117            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
118        }
119    }
120
121    /// Creates a new planner with transaction context for MVCC-aware planning.
122    #[must_use]
123    pub fn with_context(
124        store: Arc<dyn GraphStoreMut>,
125        tx_manager: Arc<TransactionManager>,
126        tx_id: Option<TxId>,
127        viewing_epoch: EpochId,
128    ) -> Self {
129        Self {
130            store,
131            tx_manager: Some(tx_manager),
132            tx_id,
133            viewing_epoch,
134            anon_edge_counter: std::cell::Cell::new(0),
135            factorized_execution: true,
136            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
137            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
138            validator: None,
139            catalog: None,
140            correlated_param_state: std::cell::RefCell::new(None),
141            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
142        }
143    }
144
145    /// Returns the viewing epoch for this planner.
146    #[must_use]
147    pub fn viewing_epoch(&self) -> EpochId {
148        self.viewing_epoch
149    }
150
151    /// Returns the transaction ID for this planner, if any.
152    #[must_use]
153    pub fn tx_id(&self) -> Option<TxId> {
154        self.tx_id
155    }
156
157    /// Returns a reference to the transaction manager, if available.
158    #[must_use]
159    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
160        self.tx_manager.as_ref()
161    }
162
163    /// Enables or disables factorized execution for multi-hop queries.
164    #[must_use]
165    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
166        self.factorized_execution = enabled;
167        self
168    }
169
170    /// Sets the constraint validator for schema enforcement during mutations.
171    #[must_use]
172    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
173        self.validator = Some(validator);
174        self
175    }
176
177    /// Sets the catalog for user-defined procedure lookup.
178    #[must_use]
179    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
180        self.catalog = Some(catalog);
181        self
182    }
183
184    /// Counts consecutive single-hop expand operations.
185    ///
186    /// Returns the count and the deepest non-expand operator (the base of the chain).
187    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
188        match op {
189            LogicalOperator::Expand(expand) => {
190                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
191
192                if is_single_hop {
193                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
194                    (inner_count + 1, base)
195                } else {
196                    (0, op)
197                }
198            }
199            _ => (0, op),
200        }
201    }
202
203    /// Collects expand operations from the outermost down to the base.
204    ///
205    /// Returns expands in order from innermost (base) to outermost.
206    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
207        let mut chain = Vec::new();
208        let mut current = op;
209
210        while let LogicalOperator::Expand(expand) = current {
211            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
212            if !is_single_hop {
213                break;
214            }
215            chain.push(expand);
216            current = &expand.input;
217        }
218
219        chain.reverse();
220        chain
221    }
222
223    /// Plans a logical plan into a physical operator.
224    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
225        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
226        Ok(PhysicalPlan {
227            operator,
228            columns,
229            adaptive_context: None,
230        })
231    }
232
233    /// Plans a logical plan with adaptive execution support.
234    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
235        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
236
237        let mut adaptive_context = AdaptiveContext::new();
238        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
239
240        Ok(PhysicalPlan {
241            operator,
242            columns,
243            adaptive_context: Some(adaptive_context),
244        })
245    }
246
247    /// Collects cardinality estimates from the logical plan into an adaptive context.
248    fn collect_cardinality_estimates(
249        &self,
250        op: &LogicalOperator,
251        ctx: &mut AdaptiveContext,
252        depth: usize,
253    ) {
254        match op {
255            LogicalOperator::NodeScan(scan) => {
256                let estimate = if let Some(label) = &scan.label {
257                    self.store.nodes_by_label(label).len() as f64
258                } else {
259                    self.store.node_count() as f64
260                };
261                let id = format!("scan_{}", scan.variable);
262                ctx.set_estimate(&id, estimate);
263
264                if let Some(input) = &scan.input {
265                    self.collect_cardinality_estimates(input, ctx, depth + 1);
266                }
267            }
268            LogicalOperator::Filter(filter) => {
269                let input_estimate = self.estimate_cardinality(&filter.input);
270                let estimate = input_estimate * 0.3;
271                let id = format!("filter_{depth}");
272                ctx.set_estimate(&id, estimate);
273
274                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
275            }
276            LogicalOperator::Expand(expand) => {
277                let input_estimate = self.estimate_cardinality(&expand.input);
278                let stats = self.store.statistics();
279                let avg_degree = self.estimate_expand_degree(&stats, expand);
280                let estimate = input_estimate * avg_degree;
281                let id = format!("expand_{}", expand.to_variable);
282                ctx.set_estimate(&id, estimate);
283
284                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
285            }
286            LogicalOperator::Join(join) => {
287                let left_est = self.estimate_cardinality(&join.left);
288                let right_est = self.estimate_cardinality(&join.right);
289                let estimate = (left_est * right_est).sqrt();
290                let id = format!("join_{depth}");
291                ctx.set_estimate(&id, estimate);
292
293                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
294                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
295            }
296            LogicalOperator::Aggregate(agg) => {
297                let input_estimate = self.estimate_cardinality(&agg.input);
298                let estimate = if agg.group_by.is_empty() {
299                    1.0
300                } else {
301                    (input_estimate * 0.1).max(1.0)
302                };
303                let id = format!("aggregate_{depth}");
304                ctx.set_estimate(&id, estimate);
305
306                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
307            }
308            LogicalOperator::Distinct(distinct) => {
309                let input_estimate = self.estimate_cardinality(&distinct.input);
310                let estimate = (input_estimate * 0.5).max(1.0);
311                let id = format!("distinct_{depth}");
312                ctx.set_estimate(&id, estimate);
313
314                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
315            }
316            LogicalOperator::Return(ret) => {
317                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
318            }
319            LogicalOperator::Limit(limit) => {
320                let input_estimate = self.estimate_cardinality(&limit.input);
321                let estimate = (input_estimate).min(limit.count as f64);
322                let id = format!("limit_{depth}");
323                ctx.set_estimate(&id, estimate);
324
325                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
326            }
327            LogicalOperator::Skip(skip) => {
328                let input_estimate = self.estimate_cardinality(&skip.input);
329                let estimate = (input_estimate - skip.count as f64).max(0.0);
330                let id = format!("skip_{depth}");
331                ctx.set_estimate(&id, estimate);
332
333                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
334            }
335            LogicalOperator::Sort(sort) => {
336                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
337            }
338            LogicalOperator::Union(union) => {
339                let estimate: f64 = union
340                    .inputs
341                    .iter()
342                    .map(|input| self.estimate_cardinality(input))
343                    .sum();
344                let id = format!("union_{depth}");
345                ctx.set_estimate(&id, estimate);
346
347                for input in &union.inputs {
348                    self.collect_cardinality_estimates(input, ctx, depth + 1);
349                }
350            }
351            _ => {
352                // For other operators, try to recurse into known input patterns
353            }
354        }
355    }
356
357    /// Estimates cardinality for a logical operator subtree.
358    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
359        match op {
360            LogicalOperator::NodeScan(scan) => {
361                if let Some(label) = &scan.label {
362                    self.store.nodes_by_label(label).len() as f64
363                } else {
364                    self.store.node_count() as f64
365                }
366            }
367            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
368            LogicalOperator::Expand(expand) => {
369                let stats = self.store.statistics();
370                let avg_degree = self.estimate_expand_degree(&stats, expand);
371                self.estimate_cardinality(&expand.input) * avg_degree
372            }
373            LogicalOperator::Join(join) => {
374                let left = self.estimate_cardinality(&join.left);
375                let right = self.estimate_cardinality(&join.right);
376                (left * right).sqrt()
377            }
378            LogicalOperator::Aggregate(agg) => {
379                if agg.group_by.is_empty() {
380                    1.0
381                } else {
382                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
383                }
384            }
385            LogicalOperator::Distinct(distinct) => {
386                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
387            }
388            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
389            LogicalOperator::Limit(limit) => self
390                .estimate_cardinality(&limit.input)
391                .min(limit.count as f64),
392            LogicalOperator::Skip(skip) => {
393                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
394            }
395            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
396            LogicalOperator::Union(union) => union
397                .inputs
398                .iter()
399                .map(|input| self.estimate_cardinality(input))
400                .sum(),
401            LogicalOperator::Except(except) => {
402                let left = self.estimate_cardinality(&except.left);
403                let right = self.estimate_cardinality(&except.right);
404                (left - right).max(0.0)
405            }
406            LogicalOperator::Intersect(intersect) => {
407                let left = self.estimate_cardinality(&intersect.left);
408                let right = self.estimate_cardinality(&intersect.right);
409                left.min(right)
410            }
411            LogicalOperator::Otherwise(otherwise) => self
412                .estimate_cardinality(&otherwise.left)
413                .max(self.estimate_cardinality(&otherwise.right)),
414            _ => 1000.0,
415        }
416    }
417
418    /// Estimates the average edge degree for an expand operation using store statistics.
419    fn estimate_expand_degree(
420        &self,
421        stats: &grafeo_core::statistics::Statistics,
422        expand: &ExpandOp,
423    ) -> f64 {
424        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
425        if expand.edge_types.len() == 1 {
426            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
427        } else if stats.total_nodes > 0 {
428            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
429        } else {
430            10.0
431        }
432    }
433
434    /// Plans a single logical operator.
435    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
436        match op {
437            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
438            LogicalOperator::Expand(expand) => {
439                if self.factorized_execution {
440                    let (chain_len, _base) = Self::count_expand_chain(op);
441                    if chain_len >= 2 {
442                        return self.plan_expand_chain(op);
443                    }
444                }
445                self.plan_expand(expand)
446            }
447            LogicalOperator::Return(ret) => self.plan_return(ret),
448            LogicalOperator::Filter(filter) => self.plan_filter(filter),
449            LogicalOperator::Project(project) => self.plan_project(project),
450            LogicalOperator::Limit(limit) => self.plan_limit(limit),
451            LogicalOperator::Skip(skip) => self.plan_skip(skip),
452            LogicalOperator::Sort(sort) => self.plan_sort(sort),
453            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
454            LogicalOperator::Join(join) => self.plan_join(join),
455            LogicalOperator::Union(union) => self.plan_union(union),
456            LogicalOperator::Except(except) => self.plan_except(except),
457            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
458            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
459            LogicalOperator::Apply(apply) => self.plan_apply(apply),
460            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
461            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
462            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
463            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
464            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
465            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
466            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
467            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
468            LogicalOperator::Merge(merge) => self.plan_merge(merge),
469            LogicalOperator::MergeRelationship(merge_rel) => {
470                self.plan_merge_relationship(merge_rel)
471            }
472            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
473            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
474            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
475            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
476            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
477            #[cfg(feature = "algos")]
478            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
479            #[cfg(not(feature = "algos"))]
480            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
481                "CALL procedures require the 'algos' feature".to_string(),
482            )),
483            LogicalOperator::ParameterScan(param_scan) => {
484                let state = self
485                    .correlated_param_state
486                    .borrow()
487                    .clone()
488                    .ok_or_else(|| {
489                        Error::Internal(
490                            "ParameterScan without correlated Apply context".to_string(),
491                        )
492                    })?;
493                let columns = param_scan.columns.clone();
494                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
495                Ok((operator, columns))
496            }
497            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
498            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
499            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
500            LogicalOperator::VectorScan(_) => Err(Error::Internal(
501                "VectorScan requires vector-index feature".to_string(),
502            )),
503            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
504                "VectorJoin requires vector-index feature".to_string(),
505            )),
506            _ => Err(Error::Internal(format!(
507                "Unsupported operator: {:?}",
508                std::mem::discriminant(op)
509            ))),
510        }
511    }
512
513    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
514    fn plan_horizontal_aggregate(
515        &self,
516        ha: &HorizontalAggregateOp,
517    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
518        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
519
520        let list_col_idx = child_columns
521            .iter()
522            .position(|c| c == &ha.list_column)
523            .ok_or_else(|| {
524                Error::Internal(format!(
525                    "HorizontalAggregate list column '{}' not found in {:?}",
526                    ha.list_column, child_columns
527                ))
528            })?;
529
530        let entity_kind = match ha.entity_kind {
531            LogicalEntityKind::Edge => EntityKind::Edge,
532            LogicalEntityKind::Node => EntityKind::Node,
533        };
534
535        let function = convert_aggregate_function(ha.function);
536        let input_column_count = child_columns.len();
537
538        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
539            child_op,
540            list_col_idx,
541            entity_kind,
542            function,
543            ha.property.clone(),
544            Arc::clone(&self.store) as Arc<dyn GraphStore>,
545            input_column_count,
546        ));
547
548        let mut columns = child_columns;
549        columns.push(ha.alias.clone());
550        // Mark the result as a scalar column
551        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
552
553        Ok((operator, columns))
554    }
555
556    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
557    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
558        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
559        let key_idx = child_columns
560            .iter()
561            .position(|c| c == &mc.key_var)
562            .ok_or_else(|| {
563                Error::Internal(format!(
564                    "MapCollect key '{}' not in columns {:?}",
565                    mc.key_var, child_columns
566                ))
567            })?;
568        let value_idx = child_columns
569            .iter()
570            .position(|c| c == &mc.value_var)
571            .ok_or_else(|| {
572                Error::Internal(format!(
573                    "MapCollect value '{}' not in columns {:?}",
574                    mc.value_var, child_columns
575                ))
576            })?;
577        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
578        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
579        Ok((operator, vec![mc.alias.clone()]))
580    }
581}
582
/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
///
/// Rows are emitted in their stored order, at most 1024 per call to `next`.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The complete result set; each inner vector is one source row.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position to read from within a source row.
    /// A position past the end of a row produces `Value::Null`.
    column_indices: Vec<usize>,
    /// Index into `rows` of the next row to emit; `reset` sets it back to 0.
    row_index: usize,
}
590
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of stored rows, or `Ok(None)` once all rows
    /// have been produced.
    ///
    /// A batch holds at most 1024 rows. Every output column is typed
    /// `LogicalType::Any`; a source row that is too short for a requested
    /// column position contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        // Everything not yet emitted; bail out when nothing is left.
        let pending = match self.rows.get(self.row_index..) {
            Some(rest) if !rest.is_empty() => rest,
            _ => return Ok(None),
        };

        let batch = &pending[..pending.len().min(1024)];
        let emitted = batch.len();

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, emitted);

        for row in batch {
            for (dst, &src) in self.column_indices.iter().enumerate() {
                let value = row.get(src).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(dst) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(emitted);

        self.row_index += emitted;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the result set can be produced again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Static display name of this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::query::plan::{
635        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
636        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
637        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
638        SkipOp as LogicalSkipOp, SortKey, SortOp,
639    };
640    use grafeo_common::types::Value;
641    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
642    use grafeo_core::graph::GraphStoreMut;
643    use grafeo_core::graph::lpg::LpgStore;
644
645    fn create_test_store() -> Arc<dyn GraphStoreMut> {
646        let store = Arc::new(LpgStore::new().unwrap());
647        store.create_node(&["Person"]);
648        store.create_node(&["Person"]);
649        store.create_node(&["Company"]);
650        store
651    }
652
653    // ==================== Simple Scan Tests ====================
654
655    #[test]
656    fn test_plan_simple_scan() {
657        let store = create_test_store();
658        let planner = Planner::new(store);
659
660        // MATCH (n:Person) RETURN n
661        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
662            items: vec![ReturnItem {
663                expression: LogicalExpression::Variable("n".to_string()),
664                alias: None,
665            }],
666            distinct: false,
667            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
668                variable: "n".to_string(),
669                label: Some("Person".to_string()),
670                input: None,
671            })),
672        }));
673
674        let physical = planner.plan(&logical).unwrap();
675        assert_eq!(physical.columns(), &["n"]);
676    }
677
678    #[test]
679    fn test_plan_scan_without_label() {
680        let store = create_test_store();
681        let planner = Planner::new(store);
682
683        // MATCH (n) RETURN n
684        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
685            items: vec![ReturnItem {
686                expression: LogicalExpression::Variable("n".to_string()),
687                alias: None,
688            }],
689            distinct: false,
690            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
691                variable: "n".to_string(),
692                label: None,
693                input: None,
694            })),
695        }));
696
697        let physical = planner.plan(&logical).unwrap();
698        assert_eq!(physical.columns(), &["n"]);
699    }
700
/// Planning `MATCH (n:Person) RETURN n AS person` names the output column after the alias.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    // Leaf: label-restricted scan binding `n`.
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Projection: `n AS person`.
    let aliased = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: Some(String::from("person")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased],
        distinct: false,
        input: Box::new(people),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
723
/// Planning `MATCH (n:Person) RETURN n.name` produces a column named `n.name`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Un-aliased property access; the expected column name is derived from it.
    let name_of_n = ReturnItem {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![name_of_n],
        distinct: false,
        input: Box::new(people),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
749
/// Planning `MATCH (n) RETURN 42 AS answer` yields the single aliased column `answer`.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Constant projection that does not reference the scanned variable.
    let answer = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer],
        distinct: false,
        input: Box::new(scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
772
773    // ==================== Filter Tests ====================
774
/// Planning `MATCH (n:Person) WHERE n.age = 30 RETURN n` keeps `n` as the only output column.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    // Predicate: n.age = 30
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(people),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
808
/// Planning a filter with the conjunction `n.age > 20 AND n.age < 40` keeps `n` as the output.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Builds one side of the conjunction: `n.age <op> <bound>`.
    let age_cmp = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };
    let lower = age_cmp(BinaryOp::Gt, 20);
    let upper = age_cmp(BinaryOp::Lt, 40);
    let in_range = LogicalExpression::Binary {
        left: Box::new(lower),
        op: BinaryOp::And,
        right: Box::new(upper),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
853
/// Planning a filter with the unary predicate `NOT n.active` keeps `n` as the output column.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    // Predicate: NOT n.active
    let inactive = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: inactive,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
886
/// Planning a filter with `n.email IS NULL` keeps `n` as the output column.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // Predicate: n.email IS NULL
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
919
/// Planning a filter whose predicate contains a function call, `size(n.friends) > 0`,
/// keeps `n` as the output column.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left-hand side: size(n.friends)
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    // Predicate: size(n.friends) > 0
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
957
958    // ==================== Expand Tests ====================
959
/// Planning `MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b` exposes both endpoint columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Un-aliased projection of a bound variable.
    let var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Exactly one outgoing KNOWS hop from `a` to `b`, with no edge binding.
    let knows = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(people),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var("a"), var("b")],
        distinct: false,
        input: Box::new(knows),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Only membership is checked; column order is not asserted.
    let columns = physical.columns();
    for expected in ["a", "b"].iter() {
        assert!(columns.contains(&expected.to_string()));
    }
}
1001
/// Planning `MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b` exposes the edge binding `r`
/// alongside both endpoint columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Un-aliased projection of a bound variable.
    let var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Exactly one outgoing KNOWS hop, binding the traversed edge to `r`.
    let knows = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var("a"), var("r"), var("b")],
        distinct: false,
        input: Box::new(knows),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Only membership is checked; column order is not asserted.
    let columns = physical.columns();
    for expected in ["a", "r", "b"].iter() {
        assert!(columns.contains(&expected.to_string()));
    }
}
1047
1048    // ==================== Limit/Skip/Sort Tests ====================
1049
/// Planning `MATCH (n) RETURN n LIMIT 10` keeps `n` as the only output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The limit sits between the scan and the final projection.
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1075
/// Planning `MATCH (n) RETURN n SKIP 5` keeps `n` as the only output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The skip sits between the scan and the final projection.
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1101
/// Planning an ascending sort keyed on the variable `n` keeps `n` as the only output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN n ORDER BY n ASC
    // (the sort key is the variable itself, not a property of it)
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1131
/// Planning a descending sort keyed on the variable `n` keeps `n` as the only output column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN n ORDER BY n DESC
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1161
/// Planning `MATCH (n) RETURN DISTINCT n`, with DISTINCT modelled as its own operator
/// and no column list, keeps `n` as the only output column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // `columns: None` — no explicit dedup column list is supplied.
    let deduped = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1187
/// Planning a Distinct operator restricted to the explicit column list `["n"]`
/// keeps `n` as the only output column.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Column-specific dedup: the list names a column the scan actually produces.
    let deduped = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("n")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1213
/// Planning a Distinct operator whose column list names no output column still succeeds
/// and keeps `n` as the only output column.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // "nonexistent" matches none of the scan's columns. The planner is expected to
    // fall back to full-row distinct in this case; this test only observes that
    // planning does not fail and the output schema is unchanged.
    let deduped = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("nonexistent")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1240
1241    // ==================== Aggregate Tests ====================
1242
/// Planning `MATCH (n) RETURN count(n)` — an ungrouped COUNT aliased `cnt` and then
/// projected — exposes a `cnt` column.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    // count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });
    // The Return refers to the aggregate through its alias.
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"cnt".to_string()));
}
1277
/// Planning `count(n)` grouped by `n.city` over `Person` nodes yields two output columns
/// (one group key, one aggregate).
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // The aggregate is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(people),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1308
/// Planning an ungrouped `SUM(n.value)` aliased `total` exposes a `total` column.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    // SUM(n.value) AS total
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The aggregate is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"total".to_string()));
}
1339
/// Planning an ungrouped `AVG(n.score)` aliased `average` exposes an `average` column.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    // AVG(n.score) AS average
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The aggregate is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"average".to_string()));
}
1370
/// Planning two ungrouped aggregates over the same property — `MIN(n.age)` as `youngest`
/// and `MAX(n.age)` as `oldest` — exposes both aliased columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Builds `<function>(n.age) AS <alias>`.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
    };
    let youngest = over_age(LogicalAggregateFunction::Min, "youngest");
    let oldest = over_age(LogicalAggregateFunction::Max, "oldest");

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The aggregate is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![youngest, oldest],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    for expected in ["youngest", "oldest"].iter() {
        assert!(columns.contains(&expected.to_string()));
    }
}
1415
1416    // ==================== Join Tests ====================
1417
/// Planning an inner join of a `Person` scan (`a`) and a `Company` scan (`b`) with the
/// condition `a = b`, followed by `RETURN a, b`, exposes both columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Label-restricted scan binding `variable`.
    let labelled_scan = |variable: &str, label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: Some(label.to_string()),
            input: None,
        })
    };
    // Un-aliased projection of a bound variable.
    let var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let on_a_equals_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(labelled_scan("a", "Person")),
        right: Box::new(labelled_scan("b", "Company")),
        join_type: JoinType::Inner,
        conditions: vec![on_a_equals_b],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![var("a"), var("b")],
        distinct: false,
        input: Box::new(joined),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Only membership is checked; column order is not asserted.
    let columns = physical.columns();
    for expected in ["a", "b"].iter() {
        assert!(columns.contains(&expected.to_string()));
    }
}
1459
/// Planning a condition-less cross join of two unlabelled scans yields two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding `variable`.
    let scan_all = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    // The join is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_all("a")),
        right: Box::new(scan_all("b")),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1484
/// Planning a `JoinType::Left` join of two unlabelled scans with no conditions yields
/// two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Unlabelled scan binding `variable`.
    let scan_all = |variable: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: variable.to_string(),
            label: None,
            input: None,
        })
    };

    // The join is the plan root here; there is no Return on top.
    let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: Box::new(scan_all("a")),
        right: Box::new(scan_all("b")),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let physical = planner.plan(&logical).unwrap();
    let column_count = physical.columns().len();
    assert_eq!(column_count, 2);
}
1508
1509    // ==================== Mutation Tests ====================
1510
/// Planning `CREATE (n:Person {name: 'Alix'})` with no input operator exposes the
/// created node's variable `n` as a column.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    // Single property assignment: name = 'Alix'
    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alix".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let physical = planner.plan(&logical).unwrap();
    let columns = physical.columns();
    assert!(columns.contains(&"n".to_string()));
}
1530
1531    #[test]
1532    fn test_plan_create_edge() {
1533        let store = create_test_store();
1534        let planner = Planner::new(store);
1535
1536        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1537        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1538            variable: Some("r".to_string()),
1539            from_variable: "a".to_string(),
1540            to_variable: "b".to_string(),
1541            edge_type: "KNOWS".to_string(),
1542            properties: vec![],
1543            input: Box::new(LogicalOperator::Join(JoinOp {
1544                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1545                    variable: "a".to_string(),
1546                    label: None,
1547                    input: None,
1548                })),
1549                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1550                    variable: "b".to_string(),
1551                    label: None,
1552                    input: None,
1553                })),
1554                join_type: JoinType::Cross,
1555                conditions: vec![],
1556            })),
1557        }));
1558
1559        let physical = planner.plan(&logical).unwrap();
1560        assert!(physical.columns().contains(&"r".to_string()));
1561    }
1562
1563    #[test]
1564    fn test_plan_delete_node() {
1565        let store = create_test_store();
1566        let planner = Planner::new(store);
1567
1568        // MATCH (n) DELETE n
1569        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1570            variable: "n".to_string(),
1571            detach: false,
1572            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573                variable: "n".to_string(),
1574                label: None,
1575                input: None,
1576            })),
1577        }));
1578
1579        let physical = planner.plan(&logical).unwrap();
1580        assert!(physical.columns().contains(&"deleted_count".to_string()));
1581    }
1582
1583    // ==================== Error Cases ====================
1584
1585    #[test]
1586    fn test_plan_empty_errors() {
1587        let store = create_test_store();
1588        let planner = Planner::new(store);
1589
1590        let logical = LogicalPlan::new(LogicalOperator::Empty);
1591        let result = planner.plan(&logical);
1592        assert!(result.is_err());
1593    }
1594
1595    #[test]
1596    fn test_plan_missing_variable_in_return() {
1597        let store = create_test_store();
1598        let planner = Planner::new(store);
1599
1600        // Return variable that doesn't exist in input
1601        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1602            items: vec![ReturnItem {
1603                expression: LogicalExpression::Variable("missing".to_string()),
1604                alias: None,
1605            }],
1606            distinct: false,
1607            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1608                variable: "n".to_string(),
1609                label: None,
1610                input: None,
1611            })),
1612        }));
1613
1614        let result = planner.plan(&logical);
1615        assert!(result.is_err());
1616    }
1617
1618    // ==================== Helper Function Tests ====================
1619
1620    #[test]
1621    fn test_convert_binary_ops() {
1622        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1623        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1624        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1625        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1626        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1627        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1628        assert!(convert_binary_op(BinaryOp::And).is_ok());
1629        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1630        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1631        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1632        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1633        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1634    }
1635
1636    #[test]
1637    fn test_convert_unary_ops() {
1638        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1639        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1640        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1641        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1642    }
1643
1644    #[test]
1645    fn test_convert_aggregate_functions() {
1646        assert!(matches!(
1647            convert_aggregate_function(LogicalAggregateFunction::Count),
1648            PhysicalAggregateFunction::Count
1649        ));
1650        assert!(matches!(
1651            convert_aggregate_function(LogicalAggregateFunction::Sum),
1652            PhysicalAggregateFunction::Sum
1653        ));
1654        assert!(matches!(
1655            convert_aggregate_function(LogicalAggregateFunction::Avg),
1656            PhysicalAggregateFunction::Avg
1657        ));
1658        assert!(matches!(
1659            convert_aggregate_function(LogicalAggregateFunction::Min),
1660            PhysicalAggregateFunction::Min
1661        ));
1662        assert!(matches!(
1663            convert_aggregate_function(LogicalAggregateFunction::Max),
1664            PhysicalAggregateFunction::Max
1665        ));
1666    }
1667
1668    #[test]
1669    fn test_planner_accessors() {
1670        let store = create_test_store();
1671        let planner = Planner::new(Arc::clone(&store));
1672
1673        assert!(planner.tx_id().is_none());
1674        assert!(planner.tx_manager().is_none());
1675        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1676    }
1677
1678    #[test]
1679    fn test_physical_plan_accessors() {
1680        let store = create_test_store();
1681        let planner = Planner::new(store);
1682
1683        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1684            variable: "n".to_string(),
1685            label: None,
1686            input: None,
1687        }));
1688
1689        let physical = planner.plan(&logical).unwrap();
1690        assert_eq!(physical.columns(), &["n"]);
1691
1692        // Test into_operator
1693        let _ = physical.into_operator();
1694    }
1695
1696    // ==================== Adaptive Planning Tests ====================
1697
1698    #[test]
1699    fn test_plan_adaptive_with_scan() {
1700        let store = create_test_store();
1701        let planner = Planner::new(store);
1702
1703        // MATCH (n:Person) RETURN n
1704        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1705            items: vec![ReturnItem {
1706                expression: LogicalExpression::Variable("n".to_string()),
1707                alias: None,
1708            }],
1709            distinct: false,
1710            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1711                variable: "n".to_string(),
1712                label: Some("Person".to_string()),
1713                input: None,
1714            })),
1715        }));
1716
1717        let physical = planner.plan_adaptive(&logical).unwrap();
1718        assert_eq!(physical.columns(), &["n"]);
1719        // Should have adaptive context with estimates
1720        assert!(physical.adaptive_context.is_some());
1721    }
1722
1723    #[test]
1724    fn test_plan_adaptive_with_filter() {
1725        let store = create_test_store();
1726        let planner = Planner::new(store);
1727
1728        // MATCH (n) WHERE n.age > 30 RETURN n
1729        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1730            items: vec![ReturnItem {
1731                expression: LogicalExpression::Variable("n".to_string()),
1732                alias: None,
1733            }],
1734            distinct: false,
1735            input: Box::new(LogicalOperator::Filter(FilterOp {
1736                predicate: LogicalExpression::Binary {
1737                    left: Box::new(LogicalExpression::Property {
1738                        variable: "n".to_string(),
1739                        property: "age".to_string(),
1740                    }),
1741                    op: BinaryOp::Gt,
1742                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1743                },
1744                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1745                    variable: "n".to_string(),
1746                    label: None,
1747                    input: None,
1748                })),
1749                pushdown_hint: None,
1750            })),
1751        }));
1752
1753        let physical = planner.plan_adaptive(&logical).unwrap();
1754        assert!(physical.adaptive_context.is_some());
1755    }
1756
1757    #[test]
1758    fn test_plan_adaptive_with_expand() {
1759        let store = create_test_store();
1760        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1761
1762        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1763        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1764            items: vec![
1765                ReturnItem {
1766                    expression: LogicalExpression::Variable("a".to_string()),
1767                    alias: None,
1768                },
1769                ReturnItem {
1770                    expression: LogicalExpression::Variable("b".to_string()),
1771                    alias: None,
1772                },
1773            ],
1774            distinct: false,
1775            input: Box::new(LogicalOperator::Expand(ExpandOp {
1776                from_variable: "a".to_string(),
1777                to_variable: "b".to_string(),
1778                edge_variable: None,
1779                direction: ExpandDirection::Outgoing,
1780                edge_types: vec!["KNOWS".to_string()],
1781                min_hops: 1,
1782                max_hops: Some(1),
1783                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1784                    variable: "a".to_string(),
1785                    label: None,
1786                    input: None,
1787                })),
1788                path_alias: None,
1789                path_mode: PathMode::Walk,
1790            })),
1791        }));
1792
1793        let physical = planner.plan_adaptive(&logical).unwrap();
1794        assert!(physical.adaptive_context.is_some());
1795    }
1796
1797    #[test]
1798    fn test_plan_adaptive_with_join() {
1799        let store = create_test_store();
1800        let planner = Planner::new(store);
1801
1802        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1803            items: vec![
1804                ReturnItem {
1805                    expression: LogicalExpression::Variable("a".to_string()),
1806                    alias: None,
1807                },
1808                ReturnItem {
1809                    expression: LogicalExpression::Variable("b".to_string()),
1810                    alias: None,
1811                },
1812            ],
1813            distinct: false,
1814            input: Box::new(LogicalOperator::Join(JoinOp {
1815                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816                    variable: "a".to_string(),
1817                    label: None,
1818                    input: None,
1819                })),
1820                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1821                    variable: "b".to_string(),
1822                    label: None,
1823                    input: None,
1824                })),
1825                join_type: JoinType::Cross,
1826                conditions: vec![],
1827            })),
1828        }));
1829
1830        let physical = planner.plan_adaptive(&logical).unwrap();
1831        assert!(physical.adaptive_context.is_some());
1832    }
1833
1834    #[test]
1835    fn test_plan_adaptive_with_aggregate() {
1836        let store = create_test_store();
1837        let planner = Planner::new(store);
1838
1839        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1840            group_by: vec![],
1841            aggregates: vec![LogicalAggregateExpr {
1842                function: LogicalAggregateFunction::Count,
1843                expression: Some(LogicalExpression::Variable("n".to_string())),
1844                expression2: None,
1845                distinct: false,
1846                alias: Some("cnt".to_string()),
1847                percentile: None,
1848            }],
1849            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1850                variable: "n".to_string(),
1851                label: None,
1852                input: None,
1853            })),
1854            having: None,
1855        }));
1856
1857        let physical = planner.plan_adaptive(&logical).unwrap();
1858        assert!(physical.adaptive_context.is_some());
1859    }
1860
1861    #[test]
1862    fn test_plan_adaptive_with_distinct() {
1863        let store = create_test_store();
1864        let planner = Planner::new(store);
1865
1866        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1867            items: vec![ReturnItem {
1868                expression: LogicalExpression::Variable("n".to_string()),
1869                alias: None,
1870            }],
1871            distinct: false,
1872            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1873                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1874                    variable: "n".to_string(),
1875                    label: None,
1876                    input: None,
1877                })),
1878                columns: None,
1879            })),
1880        }));
1881
1882        let physical = planner.plan_adaptive(&logical).unwrap();
1883        assert!(physical.adaptive_context.is_some());
1884    }
1885
1886    #[test]
1887    fn test_plan_adaptive_with_limit() {
1888        let store = create_test_store();
1889        let planner = Planner::new(store);
1890
1891        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1892            items: vec![ReturnItem {
1893                expression: LogicalExpression::Variable("n".to_string()),
1894                alias: None,
1895            }],
1896            distinct: false,
1897            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1898                count: 10,
1899                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1900                    variable: "n".to_string(),
1901                    label: None,
1902                    input: None,
1903                })),
1904            })),
1905        }));
1906
1907        let physical = planner.plan_adaptive(&logical).unwrap();
1908        assert!(physical.adaptive_context.is_some());
1909    }
1910
1911    #[test]
1912    fn test_plan_adaptive_with_skip() {
1913        let store = create_test_store();
1914        let planner = Planner::new(store);
1915
1916        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1917            items: vec![ReturnItem {
1918                expression: LogicalExpression::Variable("n".to_string()),
1919                alias: None,
1920            }],
1921            distinct: false,
1922            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1923                count: 5,
1924                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1925                    variable: "n".to_string(),
1926                    label: None,
1927                    input: None,
1928                })),
1929            })),
1930        }));
1931
1932        let physical = planner.plan_adaptive(&logical).unwrap();
1933        assert!(physical.adaptive_context.is_some());
1934    }
1935
1936    #[test]
1937    fn test_plan_adaptive_with_sort() {
1938        let store = create_test_store();
1939        let planner = Planner::new(store);
1940
1941        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1942            items: vec![ReturnItem {
1943                expression: LogicalExpression::Variable("n".to_string()),
1944                alias: None,
1945            }],
1946            distinct: false,
1947            input: Box::new(LogicalOperator::Sort(SortOp {
1948                keys: vec![SortKey {
1949                    expression: LogicalExpression::Variable("n".to_string()),
1950                    order: SortOrder::Ascending,
1951                    nulls: None,
1952                }],
1953                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1954                    variable: "n".to_string(),
1955                    label: None,
1956                    input: None,
1957                })),
1958            })),
1959        }));
1960
1961        let physical = planner.plan_adaptive(&logical).unwrap();
1962        assert!(physical.adaptive_context.is_some());
1963    }
1964
1965    #[test]
1966    fn test_plan_adaptive_with_union() {
1967        let store = create_test_store();
1968        let planner = Planner::new(store);
1969
1970        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1971            items: vec![ReturnItem {
1972                expression: LogicalExpression::Variable("n".to_string()),
1973                alias: None,
1974            }],
1975            distinct: false,
1976            input: Box::new(LogicalOperator::Union(UnionOp {
1977                inputs: vec![
1978                    LogicalOperator::NodeScan(NodeScanOp {
1979                        variable: "n".to_string(),
1980                        label: Some("Person".to_string()),
1981                        input: None,
1982                    }),
1983                    LogicalOperator::NodeScan(NodeScanOp {
1984                        variable: "n".to_string(),
1985                        label: Some("Company".to_string()),
1986                        input: None,
1987                    }),
1988                ],
1989            })),
1990        }));
1991
1992        let physical = planner.plan_adaptive(&logical).unwrap();
1993        assert!(physical.adaptive_context.is_some());
1994    }
1995
1996    // ==================== Variable Length Path Tests ====================
1997
1998    #[test]
1999    fn test_plan_expand_variable_length() {
2000        let store = create_test_store();
2001        let planner = Planner::new(store);
2002
2003        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2004        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2005            items: vec![
2006                ReturnItem {
2007                    expression: LogicalExpression::Variable("a".to_string()),
2008                    alias: None,
2009                },
2010                ReturnItem {
2011                    expression: LogicalExpression::Variable("b".to_string()),
2012                    alias: None,
2013                },
2014            ],
2015            distinct: false,
2016            input: Box::new(LogicalOperator::Expand(ExpandOp {
2017                from_variable: "a".to_string(),
2018                to_variable: "b".to_string(),
2019                edge_variable: None,
2020                direction: ExpandDirection::Outgoing,
2021                edge_types: vec!["KNOWS".to_string()],
2022                min_hops: 1,
2023                max_hops: Some(3),
2024                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2025                    variable: "a".to_string(),
2026                    label: None,
2027                    input: None,
2028                })),
2029                path_alias: None,
2030                path_mode: PathMode::Walk,
2031            })),
2032        }));
2033
2034        let physical = planner.plan(&logical).unwrap();
2035        assert!(physical.columns().contains(&"a".to_string()));
2036        assert!(physical.columns().contains(&"b".to_string()));
2037    }
2038
2039    #[test]
2040    fn test_plan_expand_with_path_alias() {
2041        let store = create_test_store();
2042        let planner = Planner::new(store);
2043
2044        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2045        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2046            items: vec![
2047                ReturnItem {
2048                    expression: LogicalExpression::Variable("a".to_string()),
2049                    alias: None,
2050                },
2051                ReturnItem {
2052                    expression: LogicalExpression::Variable("b".to_string()),
2053                    alias: None,
2054                },
2055            ],
2056            distinct: false,
2057            input: Box::new(LogicalOperator::Expand(ExpandOp {
2058                from_variable: "a".to_string(),
2059                to_variable: "b".to_string(),
2060                edge_variable: None,
2061                direction: ExpandDirection::Outgoing,
2062                edge_types: vec!["KNOWS".to_string()],
2063                min_hops: 1,
2064                max_hops: Some(3),
2065                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2066                    variable: "a".to_string(),
2067                    label: None,
2068                    input: None,
2069                })),
2070                path_alias: Some("p".to_string()),
2071                path_mode: PathMode::Walk,
2072            })),
2073        }));
2074
2075        let physical = planner.plan(&logical).unwrap();
2076        // Verify plan was created successfully with expected output columns
2077        assert!(physical.columns().contains(&"a".to_string()));
2078        assert!(physical.columns().contains(&"b".to_string()));
2079    }
2080
2081    #[test]
2082    fn test_plan_expand_incoming() {
2083        let store = create_test_store();
2084        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2085
2086        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2087        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2088            items: vec![
2089                ReturnItem {
2090                    expression: LogicalExpression::Variable("a".to_string()),
2091                    alias: None,
2092                },
2093                ReturnItem {
2094                    expression: LogicalExpression::Variable("b".to_string()),
2095                    alias: None,
2096                },
2097            ],
2098            distinct: false,
2099            input: Box::new(LogicalOperator::Expand(ExpandOp {
2100                from_variable: "a".to_string(),
2101                to_variable: "b".to_string(),
2102                edge_variable: None,
2103                direction: ExpandDirection::Incoming,
2104                edge_types: vec!["KNOWS".to_string()],
2105                min_hops: 1,
2106                max_hops: Some(1),
2107                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2108                    variable: "a".to_string(),
2109                    label: None,
2110                    input: None,
2111                })),
2112                path_alias: None,
2113                path_mode: PathMode::Walk,
2114            })),
2115        }));
2116
2117        let physical = planner.plan(&logical).unwrap();
2118        assert!(physical.columns().contains(&"a".to_string()));
2119        assert!(physical.columns().contains(&"b".to_string()));
2120    }
2121
2122    #[test]
2123    fn test_plan_expand_both_directions() {
2124        let store = create_test_store();
2125        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2126
2127        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2128        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2129            items: vec![
2130                ReturnItem {
2131                    expression: LogicalExpression::Variable("a".to_string()),
2132                    alias: None,
2133                },
2134                ReturnItem {
2135                    expression: LogicalExpression::Variable("b".to_string()),
2136                    alias: None,
2137                },
2138            ],
2139            distinct: false,
2140            input: Box::new(LogicalOperator::Expand(ExpandOp {
2141                from_variable: "a".to_string(),
2142                to_variable: "b".to_string(),
2143                edge_variable: None,
2144                direction: ExpandDirection::Both,
2145                edge_types: vec!["KNOWS".to_string()],
2146                min_hops: 1,
2147                max_hops: Some(1),
2148                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2149                    variable: "a".to_string(),
2150                    label: None,
2151                    input: None,
2152                })),
2153                path_alias: None,
2154                path_mode: PathMode::Walk,
2155            })),
2156        }));
2157
2158        let physical = planner.plan(&logical).unwrap();
2159        assert!(physical.columns().contains(&"a".to_string()));
2160        assert!(physical.columns().contains(&"b".to_string()));
2161    }
2162
2163    // ==================== With Context Tests ====================
2164
2165    #[test]
2166    fn test_planner_with_context() {
2167        use crate::transaction::TransactionManager;
2168
2169        let store = create_test_store();
2170        let tx_manager = Arc::new(TransactionManager::new());
2171        let tx_id = tx_manager.begin();
2172        let epoch = tx_manager.current_epoch();
2173
2174        let planner = Planner::with_context(
2175            Arc::clone(&store),
2176            Arc::clone(&tx_manager),
2177            Some(tx_id),
2178            epoch,
2179        );
2180
2181        assert_eq!(planner.tx_id(), Some(tx_id));
2182        assert!(planner.tx_manager().is_some());
2183        assert_eq!(planner.viewing_epoch(), epoch);
2184    }
2185
2186    #[test]
2187    fn test_planner_with_factorized_execution_disabled() {
2188        let store = create_test_store();
2189        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2190
2191        // Two consecutive expands - should NOT use factorized execution
2192        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2193            items: vec![
2194                ReturnItem {
2195                    expression: LogicalExpression::Variable("a".to_string()),
2196                    alias: None,
2197                },
2198                ReturnItem {
2199                    expression: LogicalExpression::Variable("c".to_string()),
2200                    alias: None,
2201                },
2202            ],
2203            distinct: false,
2204            input: Box::new(LogicalOperator::Expand(ExpandOp {
2205                from_variable: "b".to_string(),
2206                to_variable: "c".to_string(),
2207                edge_variable: None,
2208                direction: ExpandDirection::Outgoing,
2209                edge_types: vec![],
2210                min_hops: 1,
2211                max_hops: Some(1),
2212                input: Box::new(LogicalOperator::Expand(ExpandOp {
2213                    from_variable: "a".to_string(),
2214                    to_variable: "b".to_string(),
2215                    edge_variable: None,
2216                    direction: ExpandDirection::Outgoing,
2217                    edge_types: vec![],
2218                    min_hops: 1,
2219                    max_hops: Some(1),
2220                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2221                        variable: "a".to_string(),
2222                        label: None,
2223                        input: None,
2224                    })),
2225                    path_alias: None,
2226                    path_mode: PathMode::Walk,
2227                })),
2228                path_alias: None,
2229                path_mode: PathMode::Walk,
2230            })),
2231        }));
2232
2233        let physical = planner.plan(&logical).unwrap();
2234        assert!(physical.columns().contains(&"a".to_string()));
2235        assert!(physical.columns().contains(&"c".to_string()));
2236    }
2237
2238    // ==================== Sort with Property Tests ====================
2239
2240    #[test]
2241    fn test_plan_sort_by_property() {
2242        let store = create_test_store();
2243        let planner = Planner::new(store);
2244
2245        // MATCH (n) RETURN n ORDER BY n.name ASC
2246        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2247            items: vec![ReturnItem {
2248                expression: LogicalExpression::Variable("n".to_string()),
2249                alias: None,
2250            }],
2251            distinct: false,
2252            input: Box::new(LogicalOperator::Sort(SortOp {
2253                keys: vec![SortKey {
2254                    expression: LogicalExpression::Property {
2255                        variable: "n".to_string(),
2256                        property: "name".to_string(),
2257                    },
2258                    order: SortOrder::Ascending,
2259                    nulls: None,
2260                }],
2261                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2262                    variable: "n".to_string(),
2263                    label: None,
2264                    input: None,
2265                })),
2266            })),
2267        }));
2268
2269        let physical = planner.plan(&logical).unwrap();
2270        // Should have the property column projected
2271        assert!(physical.columns().contains(&"n".to_string()));
2272    }
2273
2274    // ==================== Scan with Input Tests ====================
2275
2276    #[test]
2277    fn test_plan_scan_with_input() {
2278        let store = create_test_store();
2279        let planner = Planner::new(store);
2280
2281        // A scan with another scan as input (for chained patterns)
2282        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2283            items: vec![
2284                ReturnItem {
2285                    expression: LogicalExpression::Variable("a".to_string()),
2286                    alias: None,
2287                },
2288                ReturnItem {
2289                    expression: LogicalExpression::Variable("b".to_string()),
2290                    alias: None,
2291                },
2292            ],
2293            distinct: false,
2294            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2295                variable: "b".to_string(),
2296                label: Some("Company".to_string()),
2297                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2298                    variable: "a".to_string(),
2299                    label: Some("Person".to_string()),
2300                    input: None,
2301                }))),
2302            })),
2303        }));
2304
2305        let physical = planner.plan(&logical).unwrap();
2306        assert!(physical.columns().contains(&"a".to_string()));
2307        assert!(physical.columns().contains(&"b".to_string()));
2308    }
2309}