Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    MapCollectOperator, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator,
38    NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator, ParameterScanOperator,
39    ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator,
40    SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SortDirection,
41    SortKey as PhysicalSortKey, SortOperator, UnwindOperator, VariableLengthExpandOperator,
42};
43use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
44use std::collections::HashMap;
45use std::sync::Arc;
46
47use crate::query::planner::common;
48use crate::query::planner::common::expression_to_string;
49use crate::query::planner::{
50    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
51    convert_unary_op, value_to_logical_type,
52};
53use crate::transaction::TransactionManager;
54
55/// Range bounds for property-based range queries.
56struct RangeBounds<'a> {
57    min: Option<&'a Value>,
58    max: Option<&'a Value>,
59    min_inclusive: bool,
60    max_inclusive: bool,
61}
62
63/// Converts a logical plan to a physical operator tree for LPG stores.
64pub struct Planner {
65    /// The graph store (supports both read and write operations).
66    pub(super) store: Arc<dyn GraphStoreMut>,
67    /// Transaction manager for MVCC operations.
68    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
69    /// Current transaction ID (if in a transaction).
70    pub(super) transaction_id: Option<TransactionId>,
71    /// Epoch to use for visibility checks.
72    pub(super) viewing_epoch: EpochId,
73    /// Counter for generating unique anonymous edge column names.
74    pub(super) anon_edge_counter: std::cell::Cell<u32>,
75    /// Whether to use factorized execution for multi-hop queries.
76    pub(super) factorized_execution: bool,
77    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
78    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
79    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
80    /// Variables that hold edge IDs (from MATCH edge patterns).
81    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
82    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
83    /// Optional constraint validator for schema enforcement during mutations.
84    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
85    /// Catalog for user-defined procedure lookup.
86    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
87    /// Shared parameter state for the currently planning correlated Apply.
88    /// Set by `plan_apply` before planning the inner operator, consumed by
89    /// `plan_operator` when encountering `ParameterScan`.
90    pub(super) correlated_param_state:
91        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
92    /// Variables from variable-length expand patterns (group-list variables).
93    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
94    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
95}
96
97impl Planner {
98    /// Creates a new planner with the given store.
99    ///
100    /// This creates a planner without transaction context, using the current
101    /// epoch from the store for visibility.
102    #[must_use]
103    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
104        let epoch = store.current_epoch();
105        Self {
106            store,
107            transaction_manager: None,
108            transaction_id: None,
109            viewing_epoch: epoch,
110            anon_edge_counter: std::cell::Cell::new(0),
111            factorized_execution: true,
112            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
113            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
114            validator: None,
115            catalog: None,
116            correlated_param_state: std::cell::RefCell::new(None),
117            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
118        }
119    }
120
121    /// Creates a new planner with transaction context for MVCC-aware planning.
122    #[must_use]
123    pub fn with_context(
124        store: Arc<dyn GraphStoreMut>,
125        transaction_manager: Arc<TransactionManager>,
126        transaction_id: Option<TransactionId>,
127        viewing_epoch: EpochId,
128    ) -> Self {
129        Self {
130            store,
131            transaction_manager: Some(transaction_manager),
132            transaction_id,
133            viewing_epoch,
134            anon_edge_counter: std::cell::Cell::new(0),
135            factorized_execution: true,
136            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
137            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
138            validator: None,
139            catalog: None,
140            correlated_param_state: std::cell::RefCell::new(None),
141            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
142        }
143    }
144
145    /// Returns the viewing epoch for this planner.
146    #[must_use]
147    pub fn viewing_epoch(&self) -> EpochId {
148        self.viewing_epoch
149    }
150
151    /// Returns the transaction ID for this planner, if any.
152    #[must_use]
153    pub fn transaction_id(&self) -> Option<TransactionId> {
154        self.transaction_id
155    }
156
157    /// Returns a reference to the transaction manager, if available.
158    #[must_use]
159    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
160        self.transaction_manager.as_ref()
161    }
162
163    /// Enables or disables factorized execution for multi-hop queries.
164    #[must_use]
165    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
166        self.factorized_execution = enabled;
167        self
168    }
169
170    /// Sets the constraint validator for schema enforcement during mutations.
171    #[must_use]
172    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
173        self.validator = Some(validator);
174        self
175    }
176
177    /// Sets the catalog for user-defined procedure lookup.
178    #[must_use]
179    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
180        self.catalog = Some(catalog);
181        self
182    }
183
184    /// Counts consecutive single-hop expand operations.
185    ///
186    /// Returns the count and the deepest non-expand operator (the base of the chain).
187    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
188        match op {
189            LogicalOperator::Expand(expand) => {
190                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
191
192                if is_single_hop {
193                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
194                    (inner_count + 1, base)
195                } else {
196                    (0, op)
197                }
198            }
199            _ => (0, op),
200        }
201    }
202
203    /// Collects expand operations from the outermost down to the base.
204    ///
205    /// Returns expands in order from innermost (base) to outermost.
206    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
207        let mut chain = Vec::new();
208        let mut current = op;
209
210        while let LogicalOperator::Expand(expand) = current {
211            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
212            if !is_single_hop {
213                break;
214            }
215            chain.push(expand);
216            current = &expand.input;
217        }
218
219        chain.reverse();
220        chain
221    }
222
223    /// Plans a logical plan into a physical operator.
224    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
225        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
226        Ok(PhysicalPlan {
227            operator,
228            columns,
229            adaptive_context: None,
230        })
231    }
232
233    /// Plans a logical plan with adaptive execution support.
234    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
235        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
236
237        let mut adaptive_context = AdaptiveContext::new();
238        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
239
240        Ok(PhysicalPlan {
241            operator,
242            columns,
243            adaptive_context: Some(adaptive_context),
244        })
245    }
246
247    /// Collects cardinality estimates from the logical plan into an adaptive context.
248    fn collect_cardinality_estimates(
249        &self,
250        op: &LogicalOperator,
251        ctx: &mut AdaptiveContext,
252        depth: usize,
253    ) {
254        match op {
255            LogicalOperator::NodeScan(scan) => {
256                let estimate = if let Some(label) = &scan.label {
257                    self.store.nodes_by_label(label).len() as f64
258                } else {
259                    self.store.node_count() as f64
260                };
261                let id = format!("scan_{}", scan.variable);
262                ctx.set_estimate(&id, estimate);
263
264                if let Some(input) = &scan.input {
265                    self.collect_cardinality_estimates(input, ctx, depth + 1);
266                }
267            }
268            LogicalOperator::Filter(filter) => {
269                let input_estimate = self.estimate_cardinality(&filter.input);
270                let estimate = input_estimate * 0.3;
271                let id = format!("filter_{depth}");
272                ctx.set_estimate(&id, estimate);
273
274                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
275            }
276            LogicalOperator::Expand(expand) => {
277                let input_estimate = self.estimate_cardinality(&expand.input);
278                let stats = self.store.statistics();
279                let avg_degree = self.estimate_expand_degree(&stats, expand);
280                let estimate = input_estimate * avg_degree;
281                let id = format!("expand_{}", expand.to_variable);
282                ctx.set_estimate(&id, estimate);
283
284                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
285            }
286            LogicalOperator::Join(join) => {
287                let left_est = self.estimate_cardinality(&join.left);
288                let right_est = self.estimate_cardinality(&join.right);
289                let estimate = (left_est * right_est).sqrt();
290                let id = format!("join_{depth}");
291                ctx.set_estimate(&id, estimate);
292
293                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
294                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
295            }
296            LogicalOperator::Aggregate(agg) => {
297                let input_estimate = self.estimate_cardinality(&agg.input);
298                let estimate = if agg.group_by.is_empty() {
299                    1.0
300                } else {
301                    (input_estimate * 0.1).max(1.0)
302                };
303                let id = format!("aggregate_{depth}");
304                ctx.set_estimate(&id, estimate);
305
306                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
307            }
308            LogicalOperator::Distinct(distinct) => {
309                let input_estimate = self.estimate_cardinality(&distinct.input);
310                let estimate = (input_estimate * 0.5).max(1.0);
311                let id = format!("distinct_{depth}");
312                ctx.set_estimate(&id, estimate);
313
314                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
315            }
316            LogicalOperator::Return(ret) => {
317                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
318            }
319            LogicalOperator::Limit(limit) => {
320                let input_estimate = self.estimate_cardinality(&limit.input);
321                let estimate = (input_estimate).min(limit.count.estimate());
322                let id = format!("limit_{depth}");
323                ctx.set_estimate(&id, estimate);
324
325                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
326            }
327            LogicalOperator::Skip(skip) => {
328                let input_estimate = self.estimate_cardinality(&skip.input);
329                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
330                let id = format!("skip_{depth}");
331                ctx.set_estimate(&id, estimate);
332
333                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
334            }
335            LogicalOperator::Sort(sort) => {
336                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
337            }
338            LogicalOperator::Union(union) => {
339                let estimate: f64 = union
340                    .inputs
341                    .iter()
342                    .map(|input| self.estimate_cardinality(input))
343                    .sum();
344                let id = format!("union_{depth}");
345                ctx.set_estimate(&id, estimate);
346
347                for input in &union.inputs {
348                    self.collect_cardinality_estimates(input, ctx, depth + 1);
349                }
350            }
351            _ => {
352                // For other operators, try to recurse into known input patterns
353            }
354        }
355    }
356
357    /// Estimates cardinality for a logical operator subtree.
358    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
359        match op {
360            LogicalOperator::NodeScan(scan) => {
361                if let Some(label) = &scan.label {
362                    self.store.nodes_by_label(label).len() as f64
363                } else {
364                    self.store.node_count() as f64
365                }
366            }
367            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
368            LogicalOperator::Expand(expand) => {
369                let stats = self.store.statistics();
370                let avg_degree = self.estimate_expand_degree(&stats, expand);
371                self.estimate_cardinality(&expand.input) * avg_degree
372            }
373            LogicalOperator::Join(join) => {
374                let left = self.estimate_cardinality(&join.left);
375                let right = self.estimate_cardinality(&join.right);
376                (left * right).sqrt()
377            }
378            LogicalOperator::Aggregate(agg) => {
379                if agg.group_by.is_empty() {
380                    1.0
381                } else {
382                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
383                }
384            }
385            LogicalOperator::Distinct(distinct) => {
386                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
387            }
388            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
389            LogicalOperator::Limit(limit) => self
390                .estimate_cardinality(&limit.input)
391                .min(limit.count.estimate()),
392            LogicalOperator::Skip(skip) => {
393                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
394            }
395            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
396            LogicalOperator::Union(union) => union
397                .inputs
398                .iter()
399                .map(|input| self.estimate_cardinality(input))
400                .sum(),
401            LogicalOperator::Except(except) => {
402                let left = self.estimate_cardinality(&except.left);
403                let right = self.estimate_cardinality(&except.right);
404                (left - right).max(0.0)
405            }
406            LogicalOperator::Intersect(intersect) => {
407                let left = self.estimate_cardinality(&intersect.left);
408                let right = self.estimate_cardinality(&intersect.right);
409                left.min(right)
410            }
411            LogicalOperator::Otherwise(otherwise) => self
412                .estimate_cardinality(&otherwise.left)
413                .max(self.estimate_cardinality(&otherwise.right)),
414            _ => 1000.0,
415        }
416    }
417
418    /// Estimates the average edge degree for an expand operation using store statistics.
419    fn estimate_expand_degree(
420        &self,
421        stats: &grafeo_core::statistics::Statistics,
422        expand: &ExpandOp,
423    ) -> f64 {
424        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
425        if expand.edge_types.len() == 1 {
426            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
427        } else if stats.total_nodes > 0 {
428            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
429        } else {
430            10.0
431        }
432    }
433
434    /// Plans a single logical operator.
435    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
436        match op {
437            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
438            LogicalOperator::Expand(expand) => {
439                if self.factorized_execution {
440                    let (chain_len, _base) = Self::count_expand_chain(op);
441                    if chain_len >= 2 {
442                        return self.plan_expand_chain(op);
443                    }
444                }
445                self.plan_expand(expand)
446            }
447            LogicalOperator::Return(ret) => self.plan_return(ret),
448            LogicalOperator::Filter(filter) => self.plan_filter(filter),
449            LogicalOperator::Project(project) => self.plan_project(project),
450            LogicalOperator::Limit(limit) => self.plan_limit(limit),
451            LogicalOperator::Skip(skip) => self.plan_skip(skip),
452            LogicalOperator::Sort(sort) => self.plan_sort(sort),
453            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
454            LogicalOperator::Join(join) => self.plan_join(join),
455            LogicalOperator::Union(union) => self.plan_union(union),
456            LogicalOperator::Except(except) => self.plan_except(except),
457            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
458            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
459            LogicalOperator::Apply(apply) => self.plan_apply(apply),
460            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
461            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
462            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
463            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
464            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
465            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
466            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
467            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
468            LogicalOperator::Merge(merge) => self.plan_merge(merge),
469            LogicalOperator::MergeRelationship(merge_rel) => {
470                self.plan_merge_relationship(merge_rel)
471            }
472            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
473            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
474            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
475            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
476            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
477            #[cfg(feature = "algos")]
478            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
479            #[cfg(not(feature = "algos"))]
480            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
481                "CALL procedures require the 'algos' feature".to_string(),
482            )),
483            LogicalOperator::ParameterScan(param_scan) => {
484                let state = self
485                    .correlated_param_state
486                    .borrow()
487                    .clone()
488                    .ok_or_else(|| {
489                        Error::Internal(
490                            "ParameterScan without correlated Apply context".to_string(),
491                        )
492                    })?;
493                let columns = param_scan.columns.clone();
494                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
495                Ok((operator, columns))
496            }
497            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
498            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
499            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
500            LogicalOperator::VectorScan(_) => Err(Error::Internal(
501                "VectorScan requires vector-index feature".to_string(),
502            )),
503            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
504                "VectorJoin requires vector-index feature".to_string(),
505            )),
506            _ => Err(Error::Internal(format!(
507                "Unsupported operator: {:?}",
508                std::mem::discriminant(op)
509            ))),
510        }
511    }
512
513    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
514    fn plan_horizontal_aggregate(
515        &self,
516        ha: &HorizontalAggregateOp,
517    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
518        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
519
520        let list_col_idx = child_columns
521            .iter()
522            .position(|c| c == &ha.list_column)
523            .ok_or_else(|| {
524                Error::Internal(format!(
525                    "HorizontalAggregate list column '{}' not found in {:?}",
526                    ha.list_column, child_columns
527                ))
528            })?;
529
530        let entity_kind = match ha.entity_kind {
531            LogicalEntityKind::Edge => EntityKind::Edge,
532            LogicalEntityKind::Node => EntityKind::Node,
533        };
534
535        let function = convert_aggregate_function(ha.function);
536        let input_column_count = child_columns.len();
537
538        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
539            child_op,
540            list_col_idx,
541            entity_kind,
542            function,
543            ha.property.clone(),
544            Arc::clone(&self.store) as Arc<dyn GraphStore>,
545            input_column_count,
546        ));
547
548        let mut columns = child_columns;
549        columns.push(ha.alias.clone());
550        // Mark the result as a scalar column
551        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
552
553        Ok((operator, columns))
554    }
555
556    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
557    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
558        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
559        let key_idx = child_columns
560            .iter()
561            .position(|c| c == &mc.key_var)
562            .ok_or_else(|| {
563                Error::Internal(format!(
564                    "MapCollect key '{}' not in columns {:?}",
565                    mc.key_var, child_columns
566                ))
567            })?;
568        let value_idx = child_columns
569            .iter()
570            .position(|c| c == &mc.value_var)
571            .ok_or_else(|| {
572                Error::Internal(format!(
573                    "MapCollect value '{}' not in columns {:?}",
574                    mc.value_var, child_columns
575                ))
576            })?;
577        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
578        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
579        Ok((operator, vec![mc.alias.clone()]))
580    }
581}
582
583/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
584#[cfg(feature = "algos")]
585struct StaticResultOperator {
586    rows: Vec<Vec<Value>>,
587    column_indices: Vec<usize>,
588    row_index: usize,
589}
590
591#[cfg(feature = "algos")]
592impl Operator for StaticResultOperator {
593    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
594        use grafeo_core::execution::DataChunk;
595
596        if self.row_index >= self.rows.len() {
597            return Ok(None);
598        }
599
600        let remaining = self.rows.len() - self.row_index;
601        let chunk_rows = remaining.min(1024);
602        let col_count = self.column_indices.len();
603
604        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
605        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
606
607        for row_offset in 0..chunk_rows {
608            let row = &self.rows[self.row_index + row_offset];
609            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
610                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
611                if let Some(col) = chunk.column_mut(col_idx) {
612                    col.push_value(value);
613                }
614            }
615        }
616        chunk.set_count(chunk_rows);
617
618        self.row_index += chunk_rows;
619        Ok(Some(chunk))
620    }
621
622    fn reset(&mut self) {
623        self.row_index = 0;
624    }
625
626    fn name(&self) -> &'static str {
627        "StaticResult"
628    }
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::query::plan::{
635        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
636        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
637        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
638        SkipOp as LogicalSkipOp, SortKey, SortOp,
639    };
640    use grafeo_common::types::Value;
641    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
642    use grafeo_core::graph::GraphStoreMut;
643    use grafeo_core::graph::lpg::LpgStore;
644
645    fn create_test_store() -> Arc<dyn GraphStoreMut> {
646        let store = Arc::new(LpgStore::new().unwrap());
647        store.create_node(&["Person"]);
648        store.create_node(&["Person"]);
649        store.create_node(&["Company"]);
650        store
651    }
652
653    // ==================== Simple Scan Tests ====================
654
655    #[test]
656    fn test_plan_simple_scan() {
657        let store = create_test_store();
658        let planner = Planner::new(store);
659
660        // MATCH (n:Person) RETURN n
661        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
662            items: vec![ReturnItem {
663                expression: LogicalExpression::Variable("n".to_string()),
664                alias: None,
665            }],
666            distinct: false,
667            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
668                variable: "n".to_string(),
669                label: Some("Person".to_string()),
670                input: None,
671            })),
672        }));
673
674        let physical = planner.plan(&logical).unwrap();
675        assert_eq!(physical.columns(), &["n"]);
676    }
677
678    #[test]
679    fn test_plan_scan_without_label() {
680        let store = create_test_store();
681        let planner = Planner::new(store);
682
683        // MATCH (n) RETURN n
684        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
685            items: vec![ReturnItem {
686                expression: LogicalExpression::Variable("n".to_string()),
687                alias: None,
688            }],
689            distinct: false,
690            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
691                variable: "n".to_string(),
692                label: None,
693                input: None,
694            })),
695        }));
696
697        let physical = planner.plan(&logical).unwrap();
698        assert_eq!(physical.columns(), &["n"]);
699    }
700
701    #[test]
702    fn test_plan_return_with_alias() {
703        let store = create_test_store();
704        let planner = Planner::new(store);
705
706        // MATCH (n:Person) RETURN n AS person
707        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
708            items: vec![ReturnItem {
709                expression: LogicalExpression::Variable("n".to_string()),
710                alias: Some("person".to_string()),
711            }],
712            distinct: false,
713            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
714                variable: "n".to_string(),
715                label: Some("Person".to_string()),
716                input: None,
717            })),
718        }));
719
720        let physical = planner.plan(&logical).unwrap();
721        assert_eq!(physical.columns(), &["person"]);
722    }
723
724    #[test]
725    fn test_plan_return_property() {
726        let store = create_test_store();
727        let planner = Planner::new(store);
728
729        // MATCH (n:Person) RETURN n.name
730        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
731            items: vec![ReturnItem {
732                expression: LogicalExpression::Property {
733                    variable: "n".to_string(),
734                    property: "name".to_string(),
735                },
736                alias: None,
737            }],
738            distinct: false,
739            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
740                variable: "n".to_string(),
741                label: Some("Person".to_string()),
742                input: None,
743            })),
744        }));
745
746        let physical = planner.plan(&logical).unwrap();
747        assert_eq!(physical.columns(), &["n.name"]);
748    }
749
750    #[test]
751    fn test_plan_return_literal() {
752        let store = create_test_store();
753        let planner = Planner::new(store);
754
755        // MATCH (n) RETURN 42 AS answer
756        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
757            items: vec![ReturnItem {
758                expression: LogicalExpression::Literal(Value::Int64(42)),
759                alias: Some("answer".to_string()),
760            }],
761            distinct: false,
762            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
763                variable: "n".to_string(),
764                label: None,
765                input: None,
766            })),
767        }));
768
769        let physical = planner.plan(&logical).unwrap();
770        assert_eq!(physical.columns(), &["answer"]);
771    }
772
773    // ==================== Filter Tests ====================
774
775    #[test]
776    fn test_plan_filter_equality() {
777        let store = create_test_store();
778        let planner = Planner::new(store);
779
780        // MATCH (n:Person) WHERE n.age = 30 RETURN n
781        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
782            items: vec![ReturnItem {
783                expression: LogicalExpression::Variable("n".to_string()),
784                alias: None,
785            }],
786            distinct: false,
787            input: Box::new(LogicalOperator::Filter(FilterOp {
788                predicate: LogicalExpression::Binary {
789                    left: Box::new(LogicalExpression::Property {
790                        variable: "n".to_string(),
791                        property: "age".to_string(),
792                    }),
793                    op: BinaryOp::Eq,
794                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
795                },
796                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
797                    variable: "n".to_string(),
798                    label: Some("Person".to_string()),
799                    input: None,
800                })),
801                pushdown_hint: None,
802            })),
803        }));
804
805        let physical = planner.plan(&logical).unwrap();
806        assert_eq!(physical.columns(), &["n"]);
807    }
808
809    #[test]
810    fn test_plan_filter_compound_and() {
811        let store = create_test_store();
812        let planner = Planner::new(store);
813
814        // WHERE n.age > 20 AND n.age < 40
815        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
816            items: vec![ReturnItem {
817                expression: LogicalExpression::Variable("n".to_string()),
818                alias: None,
819            }],
820            distinct: false,
821            input: Box::new(LogicalOperator::Filter(FilterOp {
822                predicate: LogicalExpression::Binary {
823                    left: Box::new(LogicalExpression::Binary {
824                        left: Box::new(LogicalExpression::Property {
825                            variable: "n".to_string(),
826                            property: "age".to_string(),
827                        }),
828                        op: BinaryOp::Gt,
829                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
830                    }),
831                    op: BinaryOp::And,
832                    right: Box::new(LogicalExpression::Binary {
833                        left: Box::new(LogicalExpression::Property {
834                            variable: "n".to_string(),
835                            property: "age".to_string(),
836                        }),
837                        op: BinaryOp::Lt,
838                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
839                    }),
840                },
841                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
842                    variable: "n".to_string(),
843                    label: None,
844                    input: None,
845                })),
846                pushdown_hint: None,
847            })),
848        }));
849
850        let physical = planner.plan(&logical).unwrap();
851        assert_eq!(physical.columns(), &["n"]);
852    }
853
854    #[test]
855    fn test_plan_filter_unary_not() {
856        let store = create_test_store();
857        let planner = Planner::new(store);
858
859        // WHERE NOT n.active
860        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
861            items: vec![ReturnItem {
862                expression: LogicalExpression::Variable("n".to_string()),
863                alias: None,
864            }],
865            distinct: false,
866            input: Box::new(LogicalOperator::Filter(FilterOp {
867                predicate: LogicalExpression::Unary {
868                    op: UnaryOp::Not,
869                    operand: Box::new(LogicalExpression::Property {
870                        variable: "n".to_string(),
871                        property: "active".to_string(),
872                    }),
873                },
874                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
875                    variable: "n".to_string(),
876                    label: None,
877                    input: None,
878                })),
879                pushdown_hint: None,
880            })),
881        }));
882
883        let physical = planner.plan(&logical).unwrap();
884        assert_eq!(physical.columns(), &["n"]);
885    }
886
887    #[test]
888    fn test_plan_filter_is_null() {
889        let store = create_test_store();
890        let planner = Planner::new(store);
891
892        // WHERE n.email IS NULL
893        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
894            items: vec![ReturnItem {
895                expression: LogicalExpression::Variable("n".to_string()),
896                alias: None,
897            }],
898            distinct: false,
899            input: Box::new(LogicalOperator::Filter(FilterOp {
900                predicate: LogicalExpression::Unary {
901                    op: UnaryOp::IsNull,
902                    operand: Box::new(LogicalExpression::Property {
903                        variable: "n".to_string(),
904                        property: "email".to_string(),
905                    }),
906                },
907                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
908                    variable: "n".to_string(),
909                    label: None,
910                    input: None,
911                })),
912                pushdown_hint: None,
913            })),
914        }));
915
916        let physical = planner.plan(&logical).unwrap();
917        assert_eq!(physical.columns(), &["n"]);
918    }
919
920    #[test]
921    fn test_plan_filter_function_call() {
922        let store = create_test_store();
923        let planner = Planner::new(store);
924
925        // WHERE size(n.friends) > 0
926        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
927            items: vec![ReturnItem {
928                expression: LogicalExpression::Variable("n".to_string()),
929                alias: None,
930            }],
931            distinct: false,
932            input: Box::new(LogicalOperator::Filter(FilterOp {
933                predicate: LogicalExpression::Binary {
934                    left: Box::new(LogicalExpression::FunctionCall {
935                        name: "size".to_string(),
936                        args: vec![LogicalExpression::Property {
937                            variable: "n".to_string(),
938                            property: "friends".to_string(),
939                        }],
940                        distinct: false,
941                    }),
942                    op: BinaryOp::Gt,
943                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
944                },
945                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
946                    variable: "n".to_string(),
947                    label: None,
948                    input: None,
949                })),
950                pushdown_hint: None,
951            })),
952        }));
953
954        let physical = planner.plan(&logical).unwrap();
955        assert_eq!(physical.columns(), &["n"]);
956    }
957
958    // ==================== Expand Tests ====================
959
960    #[test]
961    fn test_plan_expand_outgoing() {
962        let store = create_test_store();
963        let planner = Planner::new(store);
964
965        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
966        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
967            items: vec![
968                ReturnItem {
969                    expression: LogicalExpression::Variable("a".to_string()),
970                    alias: None,
971                },
972                ReturnItem {
973                    expression: LogicalExpression::Variable("b".to_string()),
974                    alias: None,
975                },
976            ],
977            distinct: false,
978            input: Box::new(LogicalOperator::Expand(ExpandOp {
979                from_variable: "a".to_string(),
980                to_variable: "b".to_string(),
981                edge_variable: None,
982                direction: ExpandDirection::Outgoing,
983                edge_types: vec!["KNOWS".to_string()],
984                min_hops: 1,
985                max_hops: Some(1),
986                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
987                    variable: "a".to_string(),
988                    label: Some("Person".to_string()),
989                    input: None,
990                })),
991                path_alias: None,
992                path_mode: PathMode::Walk,
993            })),
994        }));
995
996        let physical = planner.plan(&logical).unwrap();
997        // The return should have columns [a, b]
998        assert!(physical.columns().contains(&"a".to_string()));
999        assert!(physical.columns().contains(&"b".to_string()));
1000    }
1001
1002    #[test]
1003    fn test_plan_expand_with_edge_variable() {
1004        let store = create_test_store();
1005        let planner = Planner::new(store);
1006
1007        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1008        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1009            items: vec![
1010                ReturnItem {
1011                    expression: LogicalExpression::Variable("a".to_string()),
1012                    alias: None,
1013                },
1014                ReturnItem {
1015                    expression: LogicalExpression::Variable("r".to_string()),
1016                    alias: None,
1017                },
1018                ReturnItem {
1019                    expression: LogicalExpression::Variable("b".to_string()),
1020                    alias: None,
1021                },
1022            ],
1023            distinct: false,
1024            input: Box::new(LogicalOperator::Expand(ExpandOp {
1025                from_variable: "a".to_string(),
1026                to_variable: "b".to_string(),
1027                edge_variable: Some("r".to_string()),
1028                direction: ExpandDirection::Outgoing,
1029                edge_types: vec!["KNOWS".to_string()],
1030                min_hops: 1,
1031                max_hops: Some(1),
1032                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1033                    variable: "a".to_string(),
1034                    label: None,
1035                    input: None,
1036                })),
1037                path_alias: None,
1038                path_mode: PathMode::Walk,
1039            })),
1040        }));
1041
1042        let physical = planner.plan(&logical).unwrap();
1043        assert!(physical.columns().contains(&"a".to_string()));
1044        assert!(physical.columns().contains(&"r".to_string()));
1045        assert!(physical.columns().contains(&"b".to_string()));
1046    }
1047
1048    // ==================== Limit/Skip/Sort Tests ====================
1049
1050    #[test]
1051    fn test_plan_limit() {
1052        let store = create_test_store();
1053        let planner = Planner::new(store);
1054
1055        // MATCH (n) RETURN n LIMIT 10
1056        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1057            items: vec![ReturnItem {
1058                expression: LogicalExpression::Variable("n".to_string()),
1059                alias: None,
1060            }],
1061            distinct: false,
1062            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1063                count: 10.into(),
1064                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1065                    variable: "n".to_string(),
1066                    label: None,
1067                    input: None,
1068                })),
1069            })),
1070        }));
1071
1072        let physical = planner.plan(&logical).unwrap();
1073        assert_eq!(physical.columns(), &["n"]);
1074    }
1075
1076    #[test]
1077    fn test_plan_skip() {
1078        let store = create_test_store();
1079        let planner = Planner::new(store);
1080
1081        // MATCH (n) RETURN n SKIP 5
1082        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1083            items: vec![ReturnItem {
1084                expression: LogicalExpression::Variable("n".to_string()),
1085                alias: None,
1086            }],
1087            distinct: false,
1088            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1089                count: 5.into(),
1090                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1091                    variable: "n".to_string(),
1092                    label: None,
1093                    input: None,
1094                })),
1095            })),
1096        }));
1097
1098        let physical = planner.plan(&logical).unwrap();
1099        assert_eq!(physical.columns(), &["n"]);
1100    }
1101
1102    #[test]
1103    fn test_plan_sort() {
1104        let store = create_test_store();
1105        let planner = Planner::new(store);
1106
1107        // MATCH (n) RETURN n ORDER BY n.name ASC
1108        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1109            items: vec![ReturnItem {
1110                expression: LogicalExpression::Variable("n".to_string()),
1111                alias: None,
1112            }],
1113            distinct: false,
1114            input: Box::new(LogicalOperator::Sort(SortOp {
1115                keys: vec![SortKey {
1116                    expression: LogicalExpression::Variable("n".to_string()),
1117                    order: SortOrder::Ascending,
1118                    nulls: None,
1119                }],
1120                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1121                    variable: "n".to_string(),
1122                    label: None,
1123                    input: None,
1124                })),
1125            })),
1126        }));
1127
1128        let physical = planner.plan(&logical).unwrap();
1129        assert_eq!(physical.columns(), &["n"]);
1130    }
1131
1132    #[test]
1133    fn test_plan_sort_descending() {
1134        let store = create_test_store();
1135        let planner = Planner::new(store);
1136
1137        // ORDER BY n DESC
1138        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1139            items: vec![ReturnItem {
1140                expression: LogicalExpression::Variable("n".to_string()),
1141                alias: None,
1142            }],
1143            distinct: false,
1144            input: Box::new(LogicalOperator::Sort(SortOp {
1145                keys: vec![SortKey {
1146                    expression: LogicalExpression::Variable("n".to_string()),
1147                    order: SortOrder::Descending,
1148                    nulls: None,
1149                }],
1150                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1151                    variable: "n".to_string(),
1152                    label: None,
1153                    input: None,
1154                })),
1155            })),
1156        }));
1157
1158        let physical = planner.plan(&logical).unwrap();
1159        assert_eq!(physical.columns(), &["n"]);
1160    }
1161
1162    #[test]
1163    fn test_plan_distinct() {
1164        let store = create_test_store();
1165        let planner = Planner::new(store);
1166
1167        // MATCH (n) RETURN DISTINCT n
1168        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1169            items: vec![ReturnItem {
1170                expression: LogicalExpression::Variable("n".to_string()),
1171                alias: None,
1172            }],
1173            distinct: false,
1174            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1175                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1176                    variable: "n".to_string(),
1177                    label: None,
1178                    input: None,
1179                })),
1180                columns: None,
1181            })),
1182        }));
1183
1184        let physical = planner.plan(&logical).unwrap();
1185        assert_eq!(physical.columns(), &["n"]);
1186    }
1187
1188    #[test]
1189    fn test_plan_distinct_with_columns() {
1190        let store = create_test_store();
1191        let planner = Planner::new(store);
1192
1193        // DISTINCT on specific columns (column-specific dedup)
1194        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1195            items: vec![ReturnItem {
1196                expression: LogicalExpression::Variable("n".to_string()),
1197                alias: None,
1198            }],
1199            distinct: false,
1200            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1201                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1202                    variable: "n".to_string(),
1203                    label: None,
1204                    input: None,
1205                })),
1206                columns: Some(vec!["n".to_string()]),
1207            })),
1208        }));
1209
1210        let physical = planner.plan(&logical).unwrap();
1211        assert_eq!(physical.columns(), &["n"]);
1212    }
1213
1214    #[test]
1215    fn test_plan_distinct_with_nonexistent_columns() {
1216        let store = create_test_store();
1217        let planner = Planner::new(store);
1218
1219        // When distinct columns don't match any output columns,
1220        // it falls back to full-row distinct.
1221        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1222            items: vec![ReturnItem {
1223                expression: LogicalExpression::Variable("n".to_string()),
1224                alias: None,
1225            }],
1226            distinct: false,
1227            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1228                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1229                    variable: "n".to_string(),
1230                    label: None,
1231                    input: None,
1232                })),
1233                columns: Some(vec!["nonexistent".to_string()]),
1234            })),
1235        }));
1236
1237        let physical = planner.plan(&logical).unwrap();
1238        assert_eq!(physical.columns(), &["n"]);
1239    }
1240
1241    // ==================== Aggregate Tests ====================
1242
1243    #[test]
1244    fn test_plan_aggregate_count() {
1245        let store = create_test_store();
1246        let planner = Planner::new(store);
1247
1248        // MATCH (n) RETURN count(n)
1249        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1250            items: vec![ReturnItem {
1251                expression: LogicalExpression::Variable("cnt".to_string()),
1252                alias: None,
1253            }],
1254            distinct: false,
1255            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1256                group_by: vec![],
1257                aggregates: vec![LogicalAggregateExpr {
1258                    function: LogicalAggregateFunction::Count,
1259                    expression: Some(LogicalExpression::Variable("n".to_string())),
1260                    expression2: None,
1261                    distinct: false,
1262                    alias: Some("cnt".to_string()),
1263                    percentile: None,
1264                    separator: None,
1265                }],
1266                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1267                    variable: "n".to_string(),
1268                    label: None,
1269                    input: None,
1270                })),
1271                having: None,
1272            })),
1273        }));
1274
1275        let physical = planner.plan(&logical).unwrap();
1276        assert!(physical.columns().contains(&"cnt".to_string()));
1277    }
1278
1279    #[test]
1280    fn test_plan_aggregate_with_group_by() {
1281        let store = create_test_store();
1282        let planner = Planner::new(store);
1283
1284        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1285        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1286            group_by: vec![LogicalExpression::Property {
1287                variable: "n".to_string(),
1288                property: "city".to_string(),
1289            }],
1290            aggregates: vec![LogicalAggregateExpr {
1291                function: LogicalAggregateFunction::Count,
1292                expression: Some(LogicalExpression::Variable("n".to_string())),
1293                expression2: None,
1294                distinct: false,
1295                alias: Some("cnt".to_string()),
1296                percentile: None,
1297                separator: None,
1298            }],
1299            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1300                variable: "n".to_string(),
1301                label: Some("Person".to_string()),
1302                input: None,
1303            })),
1304            having: None,
1305        }));
1306
1307        let physical = planner.plan(&logical).unwrap();
1308        assert_eq!(physical.columns().len(), 2);
1309    }
1310
1311    #[test]
1312    fn test_plan_aggregate_sum() {
1313        let store = create_test_store();
1314        let planner = Planner::new(store);
1315
1316        // SUM(n.value)
1317        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1318            group_by: vec![],
1319            aggregates: vec![LogicalAggregateExpr {
1320                function: LogicalAggregateFunction::Sum,
1321                expression: Some(LogicalExpression::Property {
1322                    variable: "n".to_string(),
1323                    property: "value".to_string(),
1324                }),
1325                expression2: None,
1326                distinct: false,
1327                alias: Some("total".to_string()),
1328                percentile: None,
1329                separator: None,
1330            }],
1331            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1332                variable: "n".to_string(),
1333                label: None,
1334                input: None,
1335            })),
1336            having: None,
1337        }));
1338
1339        let physical = planner.plan(&logical).unwrap();
1340        assert!(physical.columns().contains(&"total".to_string()));
1341    }
1342
1343    #[test]
1344    fn test_plan_aggregate_avg() {
1345        let store = create_test_store();
1346        let planner = Planner::new(store);
1347
1348        // AVG(n.score)
1349        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1350            group_by: vec![],
1351            aggregates: vec![LogicalAggregateExpr {
1352                function: LogicalAggregateFunction::Avg,
1353                expression: Some(LogicalExpression::Property {
1354                    variable: "n".to_string(),
1355                    property: "score".to_string(),
1356                }),
1357                expression2: None,
1358                distinct: false,
1359                alias: Some("average".to_string()),
1360                percentile: None,
1361                separator: None,
1362            }],
1363            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1364                variable: "n".to_string(),
1365                label: None,
1366                input: None,
1367            })),
1368            having: None,
1369        }));
1370
1371        let physical = planner.plan(&logical).unwrap();
1372        assert!(physical.columns().contains(&"average".to_string()));
1373    }
1374
1375    #[test]
1376    fn test_plan_aggregate_min_max() {
1377        let store = create_test_store();
1378        let planner = Planner::new(store);
1379
1380        // MIN(n.age), MAX(n.age)
1381        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1382            group_by: vec![],
1383            aggregates: vec![
1384                LogicalAggregateExpr {
1385                    function: LogicalAggregateFunction::Min,
1386                    expression: Some(LogicalExpression::Property {
1387                        variable: "n".to_string(),
1388                        property: "age".to_string(),
1389                    }),
1390                    expression2: None,
1391                    distinct: false,
1392                    alias: Some("youngest".to_string()),
1393                    percentile: None,
1394                    separator: None,
1395                },
1396                LogicalAggregateExpr {
1397                    function: LogicalAggregateFunction::Max,
1398                    expression: Some(LogicalExpression::Property {
1399                        variable: "n".to_string(),
1400                        property: "age".to_string(),
1401                    }),
1402                    expression2: None,
1403                    distinct: false,
1404                    alias: Some("oldest".to_string()),
1405                    percentile: None,
1406                    separator: None,
1407                },
1408            ],
1409            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1410                variable: "n".to_string(),
1411                label: None,
1412                input: None,
1413            })),
1414            having: None,
1415        }));
1416
1417        let physical = planner.plan(&logical).unwrap();
1418        assert!(physical.columns().contains(&"youngest".to_string()));
1419        assert!(physical.columns().contains(&"oldest".to_string()));
1420    }
1421
1422    // ==================== Join Tests ====================
1423
1424    #[test]
1425    fn test_plan_inner_join() {
1426        let store = create_test_store();
1427        let planner = Planner::new(store);
1428
1429        // Inner join between two scans
1430        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1431            items: vec![
1432                ReturnItem {
1433                    expression: LogicalExpression::Variable("a".to_string()),
1434                    alias: None,
1435                },
1436                ReturnItem {
1437                    expression: LogicalExpression::Variable("b".to_string()),
1438                    alias: None,
1439                },
1440            ],
1441            distinct: false,
1442            input: Box::new(LogicalOperator::Join(JoinOp {
1443                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1444                    variable: "a".to_string(),
1445                    label: Some("Person".to_string()),
1446                    input: None,
1447                })),
1448                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1449                    variable: "b".to_string(),
1450                    label: Some("Company".to_string()),
1451                    input: None,
1452                })),
1453                join_type: JoinType::Inner,
1454                conditions: vec![JoinCondition {
1455                    left: LogicalExpression::Variable("a".to_string()),
1456                    right: LogicalExpression::Variable("b".to_string()),
1457                }],
1458            })),
1459        }));
1460
1461        let physical = planner.plan(&logical).unwrap();
1462        assert!(physical.columns().contains(&"a".to_string()));
1463        assert!(physical.columns().contains(&"b".to_string()));
1464    }
1465
1466    #[test]
1467    fn test_plan_cross_join() {
1468        let store = create_test_store();
1469        let planner = Planner::new(store);
1470
1471        // Cross join (no conditions)
1472        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1473            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1474                variable: "a".to_string(),
1475                label: None,
1476                input: None,
1477            })),
1478            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1479                variable: "b".to_string(),
1480                label: None,
1481                input: None,
1482            })),
1483            join_type: JoinType::Cross,
1484            conditions: vec![],
1485        }));
1486
1487        let physical = planner.plan(&logical).unwrap();
1488        assert_eq!(physical.columns().len(), 2);
1489    }
1490
1491    #[test]
1492    fn test_plan_left_join() {
1493        let store = create_test_store();
1494        let planner = Planner::new(store);
1495
1496        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1497            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1498                variable: "a".to_string(),
1499                label: None,
1500                input: None,
1501            })),
1502            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1503                variable: "b".to_string(),
1504                label: None,
1505                input: None,
1506            })),
1507            join_type: JoinType::Left,
1508            conditions: vec![],
1509        }));
1510
1511        let physical = planner.plan(&logical).unwrap();
1512        assert_eq!(physical.columns().len(), 2);
1513    }
1514
1515    // ==================== Mutation Tests ====================
1516
1517    #[test]
1518    fn test_plan_create_node() {
1519        let store = create_test_store();
1520        let planner = Planner::new(store);
1521
1522        // CREATE (n:Person {name: 'Alix'})
1523        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1524            variable: "n".to_string(),
1525            labels: vec!["Person".to_string()],
1526            properties: vec![(
1527                "name".to_string(),
1528                LogicalExpression::Literal(Value::String("Alix".into())),
1529            )],
1530            input: None,
1531        }));
1532
1533        let physical = planner.plan(&logical).unwrap();
1534        assert!(physical.columns().contains(&"n".to_string()));
1535    }
1536
1537    #[test]
1538    fn test_plan_create_edge() {
1539        let store = create_test_store();
1540        let planner = Planner::new(store);
1541
1542        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1543        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1544            variable: Some("r".to_string()),
1545            from_variable: "a".to_string(),
1546            to_variable: "b".to_string(),
1547            edge_type: "KNOWS".to_string(),
1548            properties: vec![],
1549            input: Box::new(LogicalOperator::Join(JoinOp {
1550                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1551                    variable: "a".to_string(),
1552                    label: None,
1553                    input: None,
1554                })),
1555                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1556                    variable: "b".to_string(),
1557                    label: None,
1558                    input: None,
1559                })),
1560                join_type: JoinType::Cross,
1561                conditions: vec![],
1562            })),
1563        }));
1564
1565        let physical = planner.plan(&logical).unwrap();
1566        assert!(physical.columns().contains(&"r".to_string()));
1567    }
1568
1569    #[test]
1570    fn test_plan_delete_node() {
1571        let store = create_test_store();
1572        let planner = Planner::new(store);
1573
1574        // MATCH (n) DELETE n
1575        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1576            variable: "n".to_string(),
1577            detach: false,
1578            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1579                variable: "n".to_string(),
1580                label: None,
1581                input: None,
1582            })),
1583        }));
1584
1585        let physical = planner.plan(&logical).unwrap();
1586        assert!(physical.columns().contains(&"deleted_count".to_string()));
1587    }
1588
1589    // ==================== Error Cases ====================
1590
1591    #[test]
1592    fn test_plan_empty_errors() {
1593        let store = create_test_store();
1594        let planner = Planner::new(store);
1595
1596        let logical = LogicalPlan::new(LogicalOperator::Empty);
1597        let result = planner.plan(&logical);
1598        assert!(result.is_err());
1599    }
1600
1601    #[test]
1602    fn test_plan_missing_variable_in_return() {
1603        let store = create_test_store();
1604        let planner = Planner::new(store);
1605
1606        // Return variable that doesn't exist in input
1607        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1608            items: vec![ReturnItem {
1609                expression: LogicalExpression::Variable("missing".to_string()),
1610                alias: None,
1611            }],
1612            distinct: false,
1613            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1614                variable: "n".to_string(),
1615                label: None,
1616                input: None,
1617            })),
1618        }));
1619
1620        let result = planner.plan(&logical);
1621        assert!(result.is_err());
1622    }
1623
1624    // ==================== Helper Function Tests ====================
1625
1626    #[test]
1627    fn test_convert_binary_ops() {
1628        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1629        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1630        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1631        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1632        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1633        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1634        assert!(convert_binary_op(BinaryOp::And).is_ok());
1635        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1636        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1637        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1638        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1639        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1640    }
1641
1642    #[test]
1643    fn test_convert_unary_ops() {
1644        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1645        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1646        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1647        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1648    }
1649
1650    #[test]
1651    fn test_convert_aggregate_functions() {
1652        assert!(matches!(
1653            convert_aggregate_function(LogicalAggregateFunction::Count),
1654            PhysicalAggregateFunction::Count
1655        ));
1656        assert!(matches!(
1657            convert_aggregate_function(LogicalAggregateFunction::Sum),
1658            PhysicalAggregateFunction::Sum
1659        ));
1660        assert!(matches!(
1661            convert_aggregate_function(LogicalAggregateFunction::Avg),
1662            PhysicalAggregateFunction::Avg
1663        ));
1664        assert!(matches!(
1665            convert_aggregate_function(LogicalAggregateFunction::Min),
1666            PhysicalAggregateFunction::Min
1667        ));
1668        assert!(matches!(
1669            convert_aggregate_function(LogicalAggregateFunction::Max),
1670            PhysicalAggregateFunction::Max
1671        ));
1672    }
1673
1674    #[test]
1675    fn test_planner_accessors() {
1676        let store = create_test_store();
1677        let planner = Planner::new(Arc::clone(&store));
1678
1679        assert!(planner.transaction_id().is_none());
1680        assert!(planner.transaction_manager().is_none());
1681        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1682    }
1683
1684    #[test]
1685    fn test_physical_plan_accessors() {
1686        let store = create_test_store();
1687        let planner = Planner::new(store);
1688
1689        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1690            variable: "n".to_string(),
1691            label: None,
1692            input: None,
1693        }));
1694
1695        let physical = planner.plan(&logical).unwrap();
1696        assert_eq!(physical.columns(), &["n"]);
1697
1698        // Test into_operator
1699        let _ = physical.into_operator();
1700    }
1701
1702    // ==================== Adaptive Planning Tests ====================
1703
1704    #[test]
1705    fn test_plan_adaptive_with_scan() {
1706        let store = create_test_store();
1707        let planner = Planner::new(store);
1708
1709        // MATCH (n:Person) RETURN n
1710        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1711            items: vec![ReturnItem {
1712                expression: LogicalExpression::Variable("n".to_string()),
1713                alias: None,
1714            }],
1715            distinct: false,
1716            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1717                variable: "n".to_string(),
1718                label: Some("Person".to_string()),
1719                input: None,
1720            })),
1721        }));
1722
1723        let physical = planner.plan_adaptive(&logical).unwrap();
1724        assert_eq!(physical.columns(), &["n"]);
1725        // Should have adaptive context with estimates
1726        assert!(physical.adaptive_context.is_some());
1727    }
1728
1729    #[test]
1730    fn test_plan_adaptive_with_filter() {
1731        let store = create_test_store();
1732        let planner = Planner::new(store);
1733
1734        // MATCH (n) WHERE n.age > 30 RETURN n
1735        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1736            items: vec![ReturnItem {
1737                expression: LogicalExpression::Variable("n".to_string()),
1738                alias: None,
1739            }],
1740            distinct: false,
1741            input: Box::new(LogicalOperator::Filter(FilterOp {
1742                predicate: LogicalExpression::Binary {
1743                    left: Box::new(LogicalExpression::Property {
1744                        variable: "n".to_string(),
1745                        property: "age".to_string(),
1746                    }),
1747                    op: BinaryOp::Gt,
1748                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1749                },
1750                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1751                    variable: "n".to_string(),
1752                    label: None,
1753                    input: None,
1754                })),
1755                pushdown_hint: None,
1756            })),
1757        }));
1758
1759        let physical = planner.plan_adaptive(&logical).unwrap();
1760        assert!(physical.adaptive_context.is_some());
1761    }
1762
1763    #[test]
1764    fn test_plan_adaptive_with_expand() {
1765        let store = create_test_store();
1766        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1767
1768        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1769        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1770            items: vec![
1771                ReturnItem {
1772                    expression: LogicalExpression::Variable("a".to_string()),
1773                    alias: None,
1774                },
1775                ReturnItem {
1776                    expression: LogicalExpression::Variable("b".to_string()),
1777                    alias: None,
1778                },
1779            ],
1780            distinct: false,
1781            input: Box::new(LogicalOperator::Expand(ExpandOp {
1782                from_variable: "a".to_string(),
1783                to_variable: "b".to_string(),
1784                edge_variable: None,
1785                direction: ExpandDirection::Outgoing,
1786                edge_types: vec!["KNOWS".to_string()],
1787                min_hops: 1,
1788                max_hops: Some(1),
1789                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1790                    variable: "a".to_string(),
1791                    label: None,
1792                    input: None,
1793                })),
1794                path_alias: None,
1795                path_mode: PathMode::Walk,
1796            })),
1797        }));
1798
1799        let physical = planner.plan_adaptive(&logical).unwrap();
1800        assert!(physical.adaptive_context.is_some());
1801    }
1802
1803    #[test]
1804    fn test_plan_adaptive_with_join() {
1805        let store = create_test_store();
1806        let planner = Planner::new(store);
1807
1808        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1809            items: vec![
1810                ReturnItem {
1811                    expression: LogicalExpression::Variable("a".to_string()),
1812                    alias: None,
1813                },
1814                ReturnItem {
1815                    expression: LogicalExpression::Variable("b".to_string()),
1816                    alias: None,
1817                },
1818            ],
1819            distinct: false,
1820            input: Box::new(LogicalOperator::Join(JoinOp {
1821                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1822                    variable: "a".to_string(),
1823                    label: None,
1824                    input: None,
1825                })),
1826                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1827                    variable: "b".to_string(),
1828                    label: None,
1829                    input: None,
1830                })),
1831                join_type: JoinType::Cross,
1832                conditions: vec![],
1833            })),
1834        }));
1835
1836        let physical = planner.plan_adaptive(&logical).unwrap();
1837        assert!(physical.adaptive_context.is_some());
1838    }
1839
1840    #[test]
1841    fn test_plan_adaptive_with_aggregate() {
1842        let store = create_test_store();
1843        let planner = Planner::new(store);
1844
1845        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1846            group_by: vec![],
1847            aggregates: vec![LogicalAggregateExpr {
1848                function: LogicalAggregateFunction::Count,
1849                expression: Some(LogicalExpression::Variable("n".to_string())),
1850                expression2: None,
1851                distinct: false,
1852                alias: Some("cnt".to_string()),
1853                percentile: None,
1854                separator: None,
1855            }],
1856            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1857                variable: "n".to_string(),
1858                label: None,
1859                input: None,
1860            })),
1861            having: None,
1862        }));
1863
1864        let physical = planner.plan_adaptive(&logical).unwrap();
1865        assert!(physical.adaptive_context.is_some());
1866    }
1867
1868    #[test]
1869    fn test_plan_adaptive_with_distinct() {
1870        let store = create_test_store();
1871        let planner = Planner::new(store);
1872
1873        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1874            items: vec![ReturnItem {
1875                expression: LogicalExpression::Variable("n".to_string()),
1876                alias: None,
1877            }],
1878            distinct: false,
1879            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1880                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1881                    variable: "n".to_string(),
1882                    label: None,
1883                    input: None,
1884                })),
1885                columns: None,
1886            })),
1887        }));
1888
1889        let physical = planner.plan_adaptive(&logical).unwrap();
1890        assert!(physical.adaptive_context.is_some());
1891    }
1892
1893    #[test]
1894    fn test_plan_adaptive_with_limit() {
1895        let store = create_test_store();
1896        let planner = Planner::new(store);
1897
1898        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1899            items: vec![ReturnItem {
1900                expression: LogicalExpression::Variable("n".to_string()),
1901                alias: None,
1902            }],
1903            distinct: false,
1904            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1905                count: 10.into(),
1906                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1907                    variable: "n".to_string(),
1908                    label: None,
1909                    input: None,
1910                })),
1911            })),
1912        }));
1913
1914        let physical = planner.plan_adaptive(&logical).unwrap();
1915        assert!(physical.adaptive_context.is_some());
1916    }
1917
1918    #[test]
1919    fn test_plan_adaptive_with_skip() {
1920        let store = create_test_store();
1921        let planner = Planner::new(store);
1922
1923        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1924            items: vec![ReturnItem {
1925                expression: LogicalExpression::Variable("n".to_string()),
1926                alias: None,
1927            }],
1928            distinct: false,
1929            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1930                count: 5.into(),
1931                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1932                    variable: "n".to_string(),
1933                    label: None,
1934                    input: None,
1935                })),
1936            })),
1937        }));
1938
1939        let physical = planner.plan_adaptive(&logical).unwrap();
1940        assert!(physical.adaptive_context.is_some());
1941    }
1942
1943    #[test]
1944    fn test_plan_adaptive_with_sort() {
1945        let store = create_test_store();
1946        let planner = Planner::new(store);
1947
1948        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1949            items: vec![ReturnItem {
1950                expression: LogicalExpression::Variable("n".to_string()),
1951                alias: None,
1952            }],
1953            distinct: false,
1954            input: Box::new(LogicalOperator::Sort(SortOp {
1955                keys: vec![SortKey {
1956                    expression: LogicalExpression::Variable("n".to_string()),
1957                    order: SortOrder::Ascending,
1958                    nulls: None,
1959                }],
1960                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1961                    variable: "n".to_string(),
1962                    label: None,
1963                    input: None,
1964                })),
1965            })),
1966        }));
1967
1968        let physical = planner.plan_adaptive(&logical).unwrap();
1969        assert!(physical.adaptive_context.is_some());
1970    }
1971
1972    #[test]
1973    fn test_plan_adaptive_with_union() {
1974        let store = create_test_store();
1975        let planner = Planner::new(store);
1976
1977        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1978            items: vec![ReturnItem {
1979                expression: LogicalExpression::Variable("n".to_string()),
1980                alias: None,
1981            }],
1982            distinct: false,
1983            input: Box::new(LogicalOperator::Union(UnionOp {
1984                inputs: vec![
1985                    LogicalOperator::NodeScan(NodeScanOp {
1986                        variable: "n".to_string(),
1987                        label: Some("Person".to_string()),
1988                        input: None,
1989                    }),
1990                    LogicalOperator::NodeScan(NodeScanOp {
1991                        variable: "n".to_string(),
1992                        label: Some("Company".to_string()),
1993                        input: None,
1994                    }),
1995                ],
1996            })),
1997        }));
1998
1999        let physical = planner.plan_adaptive(&logical).unwrap();
2000        assert!(physical.adaptive_context.is_some());
2001    }
2002
2003    // ==================== Variable Length Path Tests ====================
2004
2005    #[test]
2006    fn test_plan_expand_variable_length() {
2007        let store = create_test_store();
2008        let planner = Planner::new(store);
2009
2010        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2011        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2012            items: vec![
2013                ReturnItem {
2014                    expression: LogicalExpression::Variable("a".to_string()),
2015                    alias: None,
2016                },
2017                ReturnItem {
2018                    expression: LogicalExpression::Variable("b".to_string()),
2019                    alias: None,
2020                },
2021            ],
2022            distinct: false,
2023            input: Box::new(LogicalOperator::Expand(ExpandOp {
2024                from_variable: "a".to_string(),
2025                to_variable: "b".to_string(),
2026                edge_variable: None,
2027                direction: ExpandDirection::Outgoing,
2028                edge_types: vec!["KNOWS".to_string()],
2029                min_hops: 1,
2030                max_hops: Some(3),
2031                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2032                    variable: "a".to_string(),
2033                    label: None,
2034                    input: None,
2035                })),
2036                path_alias: None,
2037                path_mode: PathMode::Walk,
2038            })),
2039        }));
2040
2041        let physical = planner.plan(&logical).unwrap();
2042        assert!(physical.columns().contains(&"a".to_string()));
2043        assert!(physical.columns().contains(&"b".to_string()));
2044    }
2045
2046    #[test]
2047    fn test_plan_expand_with_path_alias() {
2048        let store = create_test_store();
2049        let planner = Planner::new(store);
2050
2051        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2052        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2053            items: vec![
2054                ReturnItem {
2055                    expression: LogicalExpression::Variable("a".to_string()),
2056                    alias: None,
2057                },
2058                ReturnItem {
2059                    expression: LogicalExpression::Variable("b".to_string()),
2060                    alias: None,
2061                },
2062            ],
2063            distinct: false,
2064            input: Box::new(LogicalOperator::Expand(ExpandOp {
2065                from_variable: "a".to_string(),
2066                to_variable: "b".to_string(),
2067                edge_variable: None,
2068                direction: ExpandDirection::Outgoing,
2069                edge_types: vec!["KNOWS".to_string()],
2070                min_hops: 1,
2071                max_hops: Some(3),
2072                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2073                    variable: "a".to_string(),
2074                    label: None,
2075                    input: None,
2076                })),
2077                path_alias: Some("p".to_string()),
2078                path_mode: PathMode::Walk,
2079            })),
2080        }));
2081
2082        let physical = planner.plan(&logical).unwrap();
2083        // Verify plan was created successfully with expected output columns
2084        assert!(physical.columns().contains(&"a".to_string()));
2085        assert!(physical.columns().contains(&"b".to_string()));
2086    }
2087
2088    #[test]
2089    fn test_plan_expand_incoming() {
2090        let store = create_test_store();
2091        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2092
2093        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2094        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2095            items: vec![
2096                ReturnItem {
2097                    expression: LogicalExpression::Variable("a".to_string()),
2098                    alias: None,
2099                },
2100                ReturnItem {
2101                    expression: LogicalExpression::Variable("b".to_string()),
2102                    alias: None,
2103                },
2104            ],
2105            distinct: false,
2106            input: Box::new(LogicalOperator::Expand(ExpandOp {
2107                from_variable: "a".to_string(),
2108                to_variable: "b".to_string(),
2109                edge_variable: None,
2110                direction: ExpandDirection::Incoming,
2111                edge_types: vec!["KNOWS".to_string()],
2112                min_hops: 1,
2113                max_hops: Some(1),
2114                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2115                    variable: "a".to_string(),
2116                    label: None,
2117                    input: None,
2118                })),
2119                path_alias: None,
2120                path_mode: PathMode::Walk,
2121            })),
2122        }));
2123
2124        let physical = planner.plan(&logical).unwrap();
2125        assert!(physical.columns().contains(&"a".to_string()));
2126        assert!(physical.columns().contains(&"b".to_string()));
2127    }
2128
2129    #[test]
2130    fn test_plan_expand_both_directions() {
2131        let store = create_test_store();
2132        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2133
2134        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2135        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2136            items: vec![
2137                ReturnItem {
2138                    expression: LogicalExpression::Variable("a".to_string()),
2139                    alias: None,
2140                },
2141                ReturnItem {
2142                    expression: LogicalExpression::Variable("b".to_string()),
2143                    alias: None,
2144                },
2145            ],
2146            distinct: false,
2147            input: Box::new(LogicalOperator::Expand(ExpandOp {
2148                from_variable: "a".to_string(),
2149                to_variable: "b".to_string(),
2150                edge_variable: None,
2151                direction: ExpandDirection::Both,
2152                edge_types: vec!["KNOWS".to_string()],
2153                min_hops: 1,
2154                max_hops: Some(1),
2155                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2156                    variable: "a".to_string(),
2157                    label: None,
2158                    input: None,
2159                })),
2160                path_alias: None,
2161                path_mode: PathMode::Walk,
2162            })),
2163        }));
2164
2165        let physical = planner.plan(&logical).unwrap();
2166        assert!(physical.columns().contains(&"a".to_string()));
2167        assert!(physical.columns().contains(&"b".to_string()));
2168    }
2169
2170    // ==================== With Context Tests ====================
2171
2172    #[test]
2173    fn test_planner_with_context() {
2174        use crate::transaction::TransactionManager;
2175
2176        let store = create_test_store();
2177        let transaction_manager = Arc::new(TransactionManager::new());
2178        let transaction_id = transaction_manager.begin();
2179        let epoch = transaction_manager.current_epoch();
2180
2181        let planner = Planner::with_context(
2182            Arc::clone(&store),
2183            Arc::clone(&transaction_manager),
2184            Some(transaction_id),
2185            epoch,
2186        );
2187
2188        assert_eq!(planner.transaction_id(), Some(transaction_id));
2189        assert!(planner.transaction_manager().is_some());
2190        assert_eq!(planner.viewing_epoch(), epoch);
2191    }
2192
2193    #[test]
2194    fn test_planner_with_factorized_execution_disabled() {
2195        let store = create_test_store();
2196        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2197
2198        // Two consecutive expands - should NOT use factorized execution
2199        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2200            items: vec![
2201                ReturnItem {
2202                    expression: LogicalExpression::Variable("a".to_string()),
2203                    alias: None,
2204                },
2205                ReturnItem {
2206                    expression: LogicalExpression::Variable("c".to_string()),
2207                    alias: None,
2208                },
2209            ],
2210            distinct: false,
2211            input: Box::new(LogicalOperator::Expand(ExpandOp {
2212                from_variable: "b".to_string(),
2213                to_variable: "c".to_string(),
2214                edge_variable: None,
2215                direction: ExpandDirection::Outgoing,
2216                edge_types: vec![],
2217                min_hops: 1,
2218                max_hops: Some(1),
2219                input: Box::new(LogicalOperator::Expand(ExpandOp {
2220                    from_variable: "a".to_string(),
2221                    to_variable: "b".to_string(),
2222                    edge_variable: None,
2223                    direction: ExpandDirection::Outgoing,
2224                    edge_types: vec![],
2225                    min_hops: 1,
2226                    max_hops: Some(1),
2227                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2228                        variable: "a".to_string(),
2229                        label: None,
2230                        input: None,
2231                    })),
2232                    path_alias: None,
2233                    path_mode: PathMode::Walk,
2234                })),
2235                path_alias: None,
2236                path_mode: PathMode::Walk,
2237            })),
2238        }));
2239
2240        let physical = planner.plan(&logical).unwrap();
2241        assert!(physical.columns().contains(&"a".to_string()));
2242        assert!(physical.columns().contains(&"c".to_string()));
2243    }
2244
2245    // ==================== Sort with Property Tests ====================
2246
2247    #[test]
2248    fn test_plan_sort_by_property() {
2249        let store = create_test_store();
2250        let planner = Planner::new(store);
2251
2252        // MATCH (n) RETURN n ORDER BY n.name ASC
2253        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2254            items: vec![ReturnItem {
2255                expression: LogicalExpression::Variable("n".to_string()),
2256                alias: None,
2257            }],
2258            distinct: false,
2259            input: Box::new(LogicalOperator::Sort(SortOp {
2260                keys: vec![SortKey {
2261                    expression: LogicalExpression::Property {
2262                        variable: "n".to_string(),
2263                        property: "name".to_string(),
2264                    },
2265                    order: SortOrder::Ascending,
2266                    nulls: None,
2267                }],
2268                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2269                    variable: "n".to_string(),
2270                    label: None,
2271                    input: None,
2272                })),
2273            })),
2274        }));
2275
2276        let physical = planner.plan(&logical).unwrap();
2277        // Should have the property column projected
2278        assert!(physical.columns().contains(&"n".to_string()));
2279    }
2280
2281    // ==================== Scan with Input Tests ====================
2282
2283    #[test]
2284    fn test_plan_scan_with_input() {
2285        let store = create_test_store();
2286        let planner = Planner::new(store);
2287
2288        // A scan with another scan as input (for chained patterns)
2289        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2290            items: vec![
2291                ReturnItem {
2292                    expression: LogicalExpression::Variable("a".to_string()),
2293                    alias: None,
2294                },
2295                ReturnItem {
2296                    expression: LogicalExpression::Variable("b".to_string()),
2297                    alias: None,
2298                },
2299            ],
2300            distinct: false,
2301            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2302                variable: "b".to_string(),
2303                label: Some("Company".to_string()),
2304                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2305                    variable: "a".to_string(),
2306                    label: Some("Person".to_string()),
2307                    input: None,
2308                }))),
2309            })),
2310        }));
2311
2312        let physical = planner.plan(&logical).unwrap();
2313        assert!(physical.columns().contains(&"a".to_string()));
2314        assert!(physical.columns().contains(&"b".to_string()));
2315    }
2316}