Skip to main content

grafeo_engine/query/planner/
mod.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16#[cfg(feature = "algos")]
17use crate::query::plan::CallProcedureOp;
18use crate::query::plan::{
19    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
20    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExceptOp,
21    ExpandDirection, ExpandOp, FilterOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp,
22    LogicalExpression, LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TxId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
32    AggregateFunction as PhysicalAggregateFunction, ApplyOperator, BinaryFilterOp,
33    ConstraintValidator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
34    DeleteNodeOperator, DistinctOperator, EmptyOperator, ExceptOperator, ExecutionPathMode,
35    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
36    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
37    HashJoinOperator, IntersectOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
38    LeapfrogJoinOperator, LimitOperator, MapCollectOperator, MergeOperator,
39    MergeRelationshipConfig, MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator,
40    NullOrder, Operator, OtherwiseOperator, ProjectExpr, ProjectOperator, PropertySource,
41    RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
42    SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
43    UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::transaction::TransactionManager;
50
51/// Range bounds for property-based range queries.
52struct RangeBounds<'a> {
53    min: Option<&'a Value>,
54    max: Option<&'a Value>,
55    min_inclusive: bool,
56    max_inclusive: bool,
57}
58
59/// Converts a logical plan to a physical operator tree.
60pub struct Planner {
61    /// The graph store (supports both read and write operations).
62    pub(super) store: Arc<dyn GraphStoreMut>,
63    /// Transaction manager for MVCC operations.
64    pub(super) tx_manager: Option<Arc<TransactionManager>>,
65    /// Current transaction ID (if in a transaction).
66    pub(super) tx_id: Option<TxId>,
67    /// Epoch to use for visibility checks.
68    pub(super) viewing_epoch: EpochId,
69    /// Counter for generating unique anonymous edge column names.
70    pub(super) anon_edge_counter: std::cell::Cell<u32>,
71    /// Whether to use factorized execution for multi-hop queries.
72    pub(super) factorized_execution: bool,
73    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
74    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
75    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
76    /// Variables that hold edge IDs (from MATCH edge patterns).
77    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
78    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
79    /// Optional constraint validator for schema enforcement during mutations.
80    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
81    /// Catalog for user-defined procedure lookup.
82    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
83}
84
85impl Planner {
86    /// Creates a new planner with the given store.
87    ///
88    /// This creates a planner without transaction context, using the current
89    /// epoch from the store for visibility.
90    #[must_use]
91    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
92        let epoch = store.current_epoch();
93        Self {
94            store,
95            tx_manager: None,
96            tx_id: None,
97            viewing_epoch: epoch,
98            anon_edge_counter: std::cell::Cell::new(0),
99            factorized_execution: true,
100            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
101            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
102            validator: None,
103            catalog: None,
104        }
105    }
106
107    /// Creates a new planner with transaction context for MVCC-aware planning.
108    ///
109    /// # Arguments
110    ///
111    /// * `store` - The graph store
112    /// * `tx_manager` - Transaction manager for recording reads/writes
113    /// * `tx_id` - Current transaction ID (None for auto-commit)
114    /// * `viewing_epoch` - Epoch to use for version visibility
115    #[must_use]
116    pub fn with_context(
117        store: Arc<dyn GraphStoreMut>,
118        tx_manager: Arc<TransactionManager>,
119        tx_id: Option<TxId>,
120        viewing_epoch: EpochId,
121    ) -> Self {
122        Self {
123            store,
124            tx_manager: Some(tx_manager),
125            tx_id,
126            viewing_epoch,
127            anon_edge_counter: std::cell::Cell::new(0),
128            factorized_execution: true,
129            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131            validator: None,
132            catalog: None,
133        }
134    }
135
136    /// Returns the viewing epoch for this planner.
137    #[must_use]
138    pub fn viewing_epoch(&self) -> EpochId {
139        self.viewing_epoch
140    }
141
142    /// Returns the transaction ID for this planner, if any.
143    #[must_use]
144    pub fn tx_id(&self) -> Option<TxId> {
145        self.tx_id
146    }
147
148    /// Returns a reference to the transaction manager, if available.
149    #[must_use]
150    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
151        self.tx_manager.as_ref()
152    }
153
154    /// Enables or disables factorized execution for multi-hop queries.
155    #[must_use]
156    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
157        self.factorized_execution = enabled;
158        self
159    }
160
161    /// Sets the constraint validator for schema enforcement during mutations.
162    #[must_use]
163    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
164        self.validator = Some(validator);
165        self
166    }
167
168    /// Sets the catalog for user-defined procedure lookup.
169    #[must_use]
170    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
171        self.catalog = Some(catalog);
172        self
173    }
174
175    /// Counts consecutive single-hop expand operations.
176    ///
177    /// Returns the count and the deepest non-expand operator (the base of the chain).
178    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
179        match op {
180            LogicalOperator::Expand(expand) => {
181                // Only count single-hop expands (factorization doesn't apply to variable-length)
182                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
183
184                if is_single_hop {
185                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
186                    (inner_count + 1, base)
187                } else {
188                    // Variable-length path breaks the chain
189                    (0, op)
190                }
191            }
192            _ => (0, op),
193        }
194    }
195
196    /// Collects expand operations from the outermost down to the base.
197    ///
198    /// Returns expands in order from innermost (base) to outermost.
199    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
200        let mut chain = Vec::new();
201        let mut current = op;
202
203        while let LogicalOperator::Expand(expand) = current {
204            // Only include single-hop expands
205            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
206            if !is_single_hop {
207                break;
208            }
209            chain.push(expand);
210            current = &expand.input;
211        }
212
213        // Reverse so we go from base to outer
214        chain.reverse();
215        chain
216    }
217
218    /// Plans a logical plan into a physical operator.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if planning fails.
223    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
224        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
225        Ok(PhysicalPlan {
226            operator,
227            columns,
228            adaptive_context: None,
229        })
230    }
231
232    /// Plans a logical plan with adaptive execution support.
233    ///
234    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
235    /// joins) that can be monitored during execution to detect estimate errors.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if planning fails.
240    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
241        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
242
243        // Build adaptive context with cardinality estimates
244        let mut adaptive_context = AdaptiveContext::new();
245        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
246
247        Ok(PhysicalPlan {
248            operator,
249            columns,
250            adaptive_context: Some(adaptive_context),
251        })
252    }
253
254    /// Collects cardinality estimates from the logical plan into an adaptive context.
255    fn collect_cardinality_estimates(
256        &self,
257        op: &LogicalOperator,
258        ctx: &mut AdaptiveContext,
259        depth: usize,
260    ) {
261        match op {
262            LogicalOperator::NodeScan(scan) => {
263                // Estimate based on label statistics
264                let estimate = if let Some(label) = &scan.label {
265                    self.store.nodes_by_label(label).len() as f64
266                } else {
267                    self.store.node_count() as f64
268                };
269                let id = format!("scan_{}", scan.variable);
270                ctx.set_estimate(&id, estimate);
271
272                // Recurse into input if present
273                if let Some(input) = &scan.input {
274                    self.collect_cardinality_estimates(input, ctx, depth + 1);
275                }
276            }
277            LogicalOperator::Filter(filter) => {
278                // Default selectivity estimate for filters (30%)
279                let input_estimate = self.estimate_cardinality(&filter.input);
280                let estimate = input_estimate * 0.3;
281                let id = format!("filter_{depth}");
282                ctx.set_estimate(&id, estimate);
283
284                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
285            }
286            LogicalOperator::Expand(expand) => {
287                // Estimate based on average degree from store statistics
288                let input_estimate = self.estimate_cardinality(&expand.input);
289                let stats = self.store.statistics();
290                let avg_degree = self.estimate_expand_degree(&stats, expand);
291                let estimate = input_estimate * avg_degree;
292                let id = format!("expand_{}", expand.to_variable);
293                ctx.set_estimate(&id, estimate);
294
295                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
296            }
297            LogicalOperator::Join(join) => {
298                // Estimate join output (product with selectivity)
299                let left_est = self.estimate_cardinality(&join.left);
300                let right_est = self.estimate_cardinality(&join.right);
301                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
302                let id = format!("join_{depth}");
303                ctx.set_estimate(&id, estimate);
304
305                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
306                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
307            }
308            LogicalOperator::Aggregate(agg) => {
309                // Aggregates typically reduce cardinality
310                let input_estimate = self.estimate_cardinality(&agg.input);
311                let estimate = if agg.group_by.is_empty() {
312                    1.0 // Scalar aggregate
313                } else {
314                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
315                };
316                let id = format!("aggregate_{depth}");
317                ctx.set_estimate(&id, estimate);
318
319                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
320            }
321            LogicalOperator::Distinct(distinct) => {
322                let input_estimate = self.estimate_cardinality(&distinct.input);
323                let estimate = (input_estimate * 0.5).max(1.0);
324                let id = format!("distinct_{depth}");
325                ctx.set_estimate(&id, estimate);
326
327                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
328            }
329            LogicalOperator::Return(ret) => {
330                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
331            }
332            LogicalOperator::Limit(limit) => {
333                let input_estimate = self.estimate_cardinality(&limit.input);
334                let estimate = (input_estimate).min(limit.count as f64);
335                let id = format!("limit_{depth}");
336                ctx.set_estimate(&id, estimate);
337
338                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
339            }
340            LogicalOperator::Skip(skip) => {
341                let input_estimate = self.estimate_cardinality(&skip.input);
342                let estimate = (input_estimate - skip.count as f64).max(0.0);
343                let id = format!("skip_{depth}");
344                ctx.set_estimate(&id, estimate);
345
346                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
347            }
348            LogicalOperator::Sort(sort) => {
349                // Sort doesn't change cardinality
350                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
351            }
352            LogicalOperator::Union(union) => {
353                let estimate: f64 = union
354                    .inputs
355                    .iter()
356                    .map(|input| self.estimate_cardinality(input))
357                    .sum();
358                let id = format!("union_{depth}");
359                ctx.set_estimate(&id, estimate);
360
361                for input in &union.inputs {
362                    self.collect_cardinality_estimates(input, ctx, depth + 1);
363                }
364            }
365            _ => {
366                // For other operators, try to recurse into known input patterns
367            }
368        }
369    }
370
371    /// Estimates cardinality for a logical operator subtree.
372    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
373        match op {
374            LogicalOperator::NodeScan(scan) => {
375                if let Some(label) = &scan.label {
376                    self.store.nodes_by_label(label).len() as f64
377                } else {
378                    self.store.node_count() as f64
379                }
380            }
381            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
382            LogicalOperator::Expand(expand) => {
383                let stats = self.store.statistics();
384                let avg_degree = self.estimate_expand_degree(&stats, expand);
385                self.estimate_cardinality(&expand.input) * avg_degree
386            }
387            LogicalOperator::Join(join) => {
388                let left = self.estimate_cardinality(&join.left);
389                let right = self.estimate_cardinality(&join.right);
390                (left * right).sqrt()
391            }
392            LogicalOperator::Aggregate(agg) => {
393                if agg.group_by.is_empty() {
394                    1.0
395                } else {
396                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
397                }
398            }
399            LogicalOperator::Distinct(distinct) => {
400                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
401            }
402            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
403            LogicalOperator::Limit(limit) => self
404                .estimate_cardinality(&limit.input)
405                .min(limit.count as f64),
406            LogicalOperator::Skip(skip) => {
407                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
408            }
409            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
410            LogicalOperator::Union(union) => union
411                .inputs
412                .iter()
413                .map(|input| self.estimate_cardinality(input))
414                .sum(),
415            LogicalOperator::Except(except) => {
416                let left = self.estimate_cardinality(&except.left);
417                let right = self.estimate_cardinality(&except.right);
418                (left - right).max(0.0)
419            }
420            LogicalOperator::Intersect(intersect) => {
421                let left = self.estimate_cardinality(&intersect.left);
422                let right = self.estimate_cardinality(&intersect.right);
423                left.min(right)
424            }
425            LogicalOperator::Otherwise(otherwise) => self
426                .estimate_cardinality(&otherwise.left)
427                .max(self.estimate_cardinality(&otherwise.right)),
428            _ => 1000.0, // Default estimate for unknown operators
429        }
430    }
431
432    /// Estimates the average edge degree for an expand operation using store statistics.
433    fn estimate_expand_degree(
434        &self,
435        stats: &grafeo_core::statistics::Statistics,
436        expand: &ExpandOp,
437    ) -> f64 {
438        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
439        if expand.edge_types.len() == 1 {
440            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
441        } else if stats.total_nodes > 0 {
442            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
443        } else {
444            10.0 // fallback for empty graph
445        }
446    }
447
448    /// Plans a single logical operator.
449    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
450        match op {
451            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
452            LogicalOperator::Expand(expand) => {
453                // Check for expand chains when factorized execution is enabled
454                if self.factorized_execution {
455                    let (chain_len, _base) = Self::count_expand_chain(op);
456                    if chain_len >= 2 {
457                        // Use factorized chain for 2+ consecutive single-hop expands
458                        return self.plan_expand_chain(op);
459                    }
460                }
461                self.plan_expand(expand)
462            }
463            LogicalOperator::Return(ret) => self.plan_return(ret),
464            LogicalOperator::Filter(filter) => self.plan_filter(filter),
465            LogicalOperator::Project(project) => self.plan_project(project),
466            LogicalOperator::Limit(limit) => self.plan_limit(limit),
467            LogicalOperator::Skip(skip) => self.plan_skip(skip),
468            LogicalOperator::Sort(sort) => self.plan_sort(sort),
469            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
470            LogicalOperator::Join(join) => self.plan_join(join),
471            LogicalOperator::Union(union) => self.plan_union(union),
472            LogicalOperator::Except(except) => self.plan_except(except),
473            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
474            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
475            LogicalOperator::Apply(apply) => self.plan_apply(apply),
476            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
477            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
478            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
479            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
480            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
481            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
482            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
483            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
484            LogicalOperator::Merge(merge) => self.plan_merge(merge),
485            LogicalOperator::MergeRelationship(merge_rel) => {
486                self.plan_merge_relationship(merge_rel)
487            }
488            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
489            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
490            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
491            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
492            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
493            #[cfg(feature = "algos")]
494            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
495            #[cfg(not(feature = "algos"))]
496            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
497                "CALL procedures require the 'algos' feature".to_string(),
498            )),
499            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
500            LogicalOperator::VectorScan(_) => Err(Error::Internal(
501                "VectorScan requires vector-index feature".to_string(),
502            )),
503            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
504                "VectorJoin requires vector-index feature".to_string(),
505            )),
506            _ => Err(Error::Internal(format!(
507                "Unsupported operator: {:?}",
508                std::mem::discriminant(op)
509            ))),
510        }
511    }
512
513    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
514    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
515        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
516        let key_idx = child_columns
517            .iter()
518            .position(|c| c == &mc.key_var)
519            .ok_or_else(|| {
520                Error::Internal(format!(
521                    "MapCollect key '{}' not in columns {:?}",
522                    mc.key_var, child_columns
523                ))
524            })?;
525        let value_idx = child_columns
526            .iter()
527            .position(|c| c == &mc.value_var)
528            .ok_or_else(|| {
529                Error::Internal(format!(
530                    "MapCollect value '{}' not in columns {:?}",
531                    mc.value_var, child_columns
532                ))
533            })?;
534        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
535        // Register the output alias as a scalar column so that the Return
536        // planner emits a plain Column pass-through instead of NodeResolve.
537        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
538        Ok((operator, vec![mc.alias.clone()]))
539    }
540}
541
542/// Converts a logical binary operator to a filter binary operator.
543pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
544    match op {
545        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
546        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
547        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
548        BinaryOp::Le => Ok(BinaryFilterOp::Le),
549        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
550        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
551        BinaryOp::And => Ok(BinaryFilterOp::And),
552        BinaryOp::Or => Ok(BinaryFilterOp::Or),
553        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
554        BinaryOp::Add => Ok(BinaryFilterOp::Add),
555        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
556        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
557        BinaryOp::Div => Ok(BinaryFilterOp::Div),
558        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
559        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
560        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
561        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
562        BinaryOp::In => Ok(BinaryFilterOp::In),
563        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
564        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
565        BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
566        BinaryOp::Like => Ok(BinaryFilterOp::Like),
567    }
568}
569
570/// Converts a logical unary operator to a filter unary operator.
571pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
572    match op {
573        UnaryOp::Not => Ok(UnaryFilterOp::Not),
574        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
575        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
576        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
577    }
578}
579
580/// Converts a logical aggregate function to a physical aggregate function.
581pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
582    match func {
583        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
584        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
585        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
586        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
587        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
588        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
589        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
590        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
591        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
592        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
593        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
594    }
595}
596
597/// Converts a logical expression to a filter expression.
598///
599/// This is a standalone function that can be used by both LPG and RDF planners.
600pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
601    match expr {
602        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
603        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
604        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
605            variable: variable.clone(),
606            property: property.clone(),
607        }),
608        LogicalExpression::Binary { left, op, right } => {
609            let left_expr = convert_filter_expression(left)?;
610            let right_expr = convert_filter_expression(right)?;
611            let filter_op = convert_binary_op(*op)?;
612            Ok(FilterExpression::Binary {
613                left: Box::new(left_expr),
614                op: filter_op,
615                right: Box::new(right_expr),
616            })
617        }
618        LogicalExpression::Unary { op, operand } => {
619            let operand_expr = convert_filter_expression(operand)?;
620            let filter_op = convert_unary_op(*op)?;
621            Ok(FilterExpression::Unary {
622                op: filter_op,
623                operand: Box::new(operand_expr),
624            })
625        }
626        LogicalExpression::FunctionCall { name, args, .. } => {
627            let filter_args: Vec<FilterExpression> = args
628                .iter()
629                .map(convert_filter_expression)
630                .collect::<Result<Vec<_>>>()?;
631            Ok(FilterExpression::FunctionCall {
632                name: name.clone(),
633                args: filter_args,
634            })
635        }
636        LogicalExpression::Case {
637            operand,
638            when_clauses,
639            else_clause,
640        } => {
641            let filter_operand = operand
642                .as_ref()
643                .map(|e| convert_filter_expression(e))
644                .transpose()?
645                .map(Box::new);
646            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
647                .iter()
648                .map(|(cond, result)| {
649                    Ok((
650                        convert_filter_expression(cond)?,
651                        convert_filter_expression(result)?,
652                    ))
653                })
654                .collect::<Result<Vec<_>>>()?;
655            let filter_else = else_clause
656                .as_ref()
657                .map(|e| convert_filter_expression(e))
658                .transpose()?
659                .map(Box::new);
660            Ok(FilterExpression::Case {
661                operand: filter_operand,
662                when_clauses: filter_when_clauses,
663                else_clause: filter_else,
664            })
665        }
666        LogicalExpression::List(items) => {
667            let filter_items: Vec<FilterExpression> = items
668                .iter()
669                .map(convert_filter_expression)
670                .collect::<Result<Vec<_>>>()?;
671            Ok(FilterExpression::List(filter_items))
672        }
673        LogicalExpression::Map(pairs) => {
674            let filter_pairs: Vec<(String, FilterExpression)> = pairs
675                .iter()
676                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
677                .collect::<Result<Vec<_>>>()?;
678            Ok(FilterExpression::Map(filter_pairs))
679        }
680        LogicalExpression::IndexAccess { base, index } => {
681            let base_expr = convert_filter_expression(base)?;
682            let index_expr = convert_filter_expression(index)?;
683            Ok(FilterExpression::IndexAccess {
684                base: Box::new(base_expr),
685                index: Box::new(index_expr),
686            })
687        }
688        LogicalExpression::SliceAccess { base, start, end } => {
689            let base_expr = convert_filter_expression(base)?;
690            let start_expr = start
691                .as_ref()
692                .map(|s| convert_filter_expression(s))
693                .transpose()?
694                .map(Box::new);
695            let end_expr = end
696                .as_ref()
697                .map(|e| convert_filter_expression(e))
698                .transpose()?
699                .map(Box::new);
700            Ok(FilterExpression::SliceAccess {
701                base: Box::new(base_expr),
702                start: start_expr,
703                end: end_expr,
704            })
705        }
706        LogicalExpression::Parameter(_) => Err(Error::Internal(
707            "Parameters not yet supported in filters".to_string(),
708        )),
709        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
710        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
711        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
712        LogicalExpression::ListComprehension {
713            variable,
714            list_expr,
715            filter_expr,
716            map_expr,
717        } => {
718            let list = convert_filter_expression(list_expr)?;
719            let filter = filter_expr
720                .as_ref()
721                .map(|f| convert_filter_expression(f))
722                .transpose()?
723                .map(Box::new);
724            let map = convert_filter_expression(map_expr)?;
725            Ok(FilterExpression::ListComprehension {
726                variable: variable.clone(),
727                list_expr: Box::new(list),
728                filter_expr: filter,
729                map_expr: Box::new(map),
730            })
731        }
732        LogicalExpression::ListPredicate {
733            kind,
734            variable,
735            list_expr,
736            predicate,
737        } => {
738            use crate::query::plan::ListPredicateKind as LPK;
739            let filter_kind = match kind {
740                LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
741                LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
742                LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
743                LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
744            };
745            let list = convert_filter_expression(list_expr)?;
746            let pred = convert_filter_expression(predicate)?;
747            Ok(FilterExpression::ListPredicate {
748                kind: filter_kind,
749                variable: variable.clone(),
750                list_expr: Box::new(list),
751                predicate: Box::new(pred),
752            })
753        }
754        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
755            // Complex subqueries are handled at the plan_filter level via semi-join
756            // or Apply rewrites. If we reach here, the subquery is in a position that
757            // cannot be rewritten (e.g., nested inside a CASE expression). Return a
758            // literal false/zero as a safe fallback.
759            Err(Error::Internal(
760                "Subquery expressions in this position require the semi-join or Apply rewrite; \
761                 move the EXISTS/COUNT subquery to a top-level WHERE predicate"
762                    .to_string(),
763            ))
764        }
765        LogicalExpression::MapProjection { base, entries } => {
766            let physical_entries: Vec<(String, FilterExpression)> = entries
767                .iter()
768                .map(|entry| match entry {
769                    crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
770                        name.clone(),
771                        FilterExpression::Property {
772                            variable: base.clone(),
773                            property: name.clone(),
774                        },
775                    )),
776                    crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
777                        Ok((key.clone(), convert_filter_expression(expr)?))
778                    }
779                    crate::query::plan::MapProjectionEntry::AllProperties => Ok((
780                        "*".to_string(),
781                        FilterExpression::FunctionCall {
782                            name: "properties".to_string(),
783                            args: vec![FilterExpression::Variable(base.clone())],
784                        },
785                    )),
786                })
787                .collect::<Result<Vec<_>>>()?;
788            Ok(FilterExpression::Map(physical_entries))
789        }
790        LogicalExpression::Reduce {
791            accumulator,
792            initial,
793            variable,
794            list,
795            expression,
796        } => Ok(FilterExpression::Reduce {
797            accumulator: accumulator.clone(),
798            initial: Box::new(convert_filter_expression(initial)?),
799            variable: variable.clone(),
800            list: Box::new(convert_filter_expression(list)?),
801            expression: Box::new(convert_filter_expression(expression)?),
802        }),
803        LogicalExpression::PatternComprehension { projection, .. } => {
804            let proj = convert_filter_expression(projection)?;
805            Ok(FilterExpression::FunctionCall {
806                name: "collect".to_string(),
807                args: vec![proj],
808            })
809        }
810    }
811}
812
813/// Infers the logical type from a value.
814fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
815    use grafeo_common::types::Value;
816    match value {
817        Value::Null => LogicalType::String, // Default type for null
818        Value::Bool(_) => LogicalType::Bool,
819        Value::Int64(_) => LogicalType::Int64,
820        Value::Float64(_) => LogicalType::Float64,
821        Value::String(_) => LogicalType::String,
822        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
823        Value::Timestamp(_) => LogicalType::Timestamp,
824        Value::Date(_) => LogicalType::Date,
825        Value::Time(_) => LogicalType::Time,
826        Value::Duration(_) => LogicalType::Duration,
827        Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
828        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
829        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
830        Value::Vector(v) => LogicalType::Vector(v.len()),
831        Value::Path { .. } => LogicalType::Any,
832    }
833}
834
835/// Converts an expression to a string for column naming.
836fn expression_to_string(expr: &LogicalExpression) -> String {
837    match expr {
838        LogicalExpression::Variable(name) => name.clone(),
839        LogicalExpression::Property { variable, property } => {
840            format!("{variable}.{property}")
841        }
842        LogicalExpression::Literal(value) => format!("{value:?}"),
843        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
844        _ => "expr".to_string(),
845    }
846}
847
848/// A physical plan ready for execution.
849pub struct PhysicalPlan {
850    /// The root physical operator.
851    pub operator: Box<dyn Operator>,
852    /// Column names for the result.
853    pub columns: Vec<String>,
854    /// Adaptive execution context with cardinality estimates.
855    ///
856    /// When adaptive execution is enabled, this context contains estimated
857    /// cardinalities at various checkpoints in the plan. During execution,
858    /// actual row counts are recorded and compared against estimates.
859    pub adaptive_context: Option<AdaptiveContext>,
860}
861
862impl PhysicalPlan {
863    /// Returns the column names.
864    #[must_use]
865    pub fn columns(&self) -> &[String] {
866        &self.columns
867    }
868
869    /// Consumes the plan and returns the operator.
870    pub fn into_operator(self) -> Box<dyn Operator> {
871        self.operator
872    }
873
874    /// Returns the adaptive context, if adaptive execution is enabled.
875    #[must_use]
876    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
877        self.adaptive_context.as_ref()
878    }
879
880    /// Takes ownership of the adaptive context.
881    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
882        self.adaptive_context.take()
883    }
884}
885
886/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
887#[cfg(feature = "algos")]
888struct StaticResultOperator {
889    rows: Vec<Vec<Value>>,
890    column_indices: Vec<usize>,
891    row_index: usize,
892}
893
894#[cfg(feature = "algos")]
895impl Operator for StaticResultOperator {
896    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
897        use grafeo_core::execution::DataChunk;
898
899        if self.row_index >= self.rows.len() {
900            return Ok(None);
901        }
902
903        let remaining = self.rows.len() - self.row_index;
904        let chunk_rows = remaining.min(1024);
905        let col_count = self.column_indices.len();
906
907        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
908        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
909
910        for row_offset in 0..chunk_rows {
911            let row = &self.rows[self.row_index + row_offset];
912            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
913                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
914                if let Some(col) = chunk.column_mut(col_idx) {
915                    col.push_value(value);
916                }
917            }
918        }
919        chunk.set_count(chunk_rows);
920
921        self.row_index += chunk_rows;
922        Ok(Some(chunk))
923    }
924
925    fn reset(&mut self) {
926        self.row_index = 0;
927    }
928
929    fn name(&self) -> &'static str {
930        "StaticResult"
931    }
932}
933
934/// Evaluates a constant logical expression to a Value.
935///
936/// Only handles literals, unary minus on numeric literals, and simple expressions.
937/// Returns an error for runtime-dependent expressions (variables, property accesses, etc.).
938pub(crate) fn eval_constant_expression(
939    expr: &crate::query::plan::LogicalExpression,
940) -> grafeo_common::utils::error::Result<grafeo_common::types::Value> {
941    use crate::query::plan::LogicalExpression;
942    use grafeo_common::types::Value;
943    use grafeo_common::utils::error::Error;
944
945    match expr {
946        LogicalExpression::Literal(val) => Ok(val.clone()),
947        LogicalExpression::Unary {
948            op: crate::query::plan::UnaryOp::Neg,
949            operand,
950        } => {
951            let val = eval_constant_expression(operand)?;
952            match val {
953                Value::Int64(n) => Ok(Value::Int64(-n)),
954                Value::Float64(f) => Ok(Value::Float64(-f)),
955                _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
956            }
957        }
958        _ => Err(Error::Internal(
959            "Procedure argument must be a constant value".into(),
960        )),
961    }
962}
963
964#[cfg(test)]
965mod tests {
966    use super::*;
967    use crate::query::plan::{
968        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
969        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
970        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
971        SkipOp as LogicalSkipOp, SortKey, SortOp,
972    };
973    use grafeo_common::types::Value;
974    use grafeo_core::graph::GraphStoreMut;
975    use grafeo_core::graph::lpg::LpgStore;
976
977    fn create_test_store() -> Arc<dyn GraphStoreMut> {
978        let store = Arc::new(LpgStore::new());
979        store.create_node(&["Person"]);
980        store.create_node(&["Person"]);
981        store.create_node(&["Company"]);
982        store
983    }
984
985    // ==================== Simple Scan Tests ====================
986
987    #[test]
988    fn test_plan_simple_scan() {
989        let store = create_test_store();
990        let planner = Planner::new(store);
991
992        // MATCH (n:Person) RETURN n
993        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
994            items: vec![ReturnItem {
995                expression: LogicalExpression::Variable("n".to_string()),
996                alias: None,
997            }],
998            distinct: false,
999            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1000                variable: "n".to_string(),
1001                label: Some("Person".to_string()),
1002                input: None,
1003            })),
1004        }));
1005
1006        let physical = planner.plan(&logical).unwrap();
1007        assert_eq!(physical.columns(), &["n"]);
1008    }
1009
1010    #[test]
1011    fn test_plan_scan_without_label() {
1012        let store = create_test_store();
1013        let planner = Planner::new(store);
1014
1015        // MATCH (n) RETURN n
1016        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1017            items: vec![ReturnItem {
1018                expression: LogicalExpression::Variable("n".to_string()),
1019                alias: None,
1020            }],
1021            distinct: false,
1022            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1023                variable: "n".to_string(),
1024                label: None,
1025                input: None,
1026            })),
1027        }));
1028
1029        let physical = planner.plan(&logical).unwrap();
1030        assert_eq!(physical.columns(), &["n"]);
1031    }
1032
1033    #[test]
1034    fn test_plan_return_with_alias() {
1035        let store = create_test_store();
1036        let planner = Planner::new(store);
1037
1038        // MATCH (n:Person) RETURN n AS person
1039        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1040            items: vec![ReturnItem {
1041                expression: LogicalExpression::Variable("n".to_string()),
1042                alias: Some("person".to_string()),
1043            }],
1044            distinct: false,
1045            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1046                variable: "n".to_string(),
1047                label: Some("Person".to_string()),
1048                input: None,
1049            })),
1050        }));
1051
1052        let physical = planner.plan(&logical).unwrap();
1053        assert_eq!(physical.columns(), &["person"]);
1054    }
1055
1056    #[test]
1057    fn test_plan_return_property() {
1058        let store = create_test_store();
1059        let planner = Planner::new(store);
1060
1061        // MATCH (n:Person) RETURN n.name
1062        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1063            items: vec![ReturnItem {
1064                expression: LogicalExpression::Property {
1065                    variable: "n".to_string(),
1066                    property: "name".to_string(),
1067                },
1068                alias: None,
1069            }],
1070            distinct: false,
1071            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1072                variable: "n".to_string(),
1073                label: Some("Person".to_string()),
1074                input: None,
1075            })),
1076        }));
1077
1078        let physical = planner.plan(&logical).unwrap();
1079        assert_eq!(physical.columns(), &["n.name"]);
1080    }
1081
1082    #[test]
1083    fn test_plan_return_literal() {
1084        let store = create_test_store();
1085        let planner = Planner::new(store);
1086
1087        // MATCH (n) RETURN 42 AS answer
1088        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1089            items: vec![ReturnItem {
1090                expression: LogicalExpression::Literal(Value::Int64(42)),
1091                alias: Some("answer".to_string()),
1092            }],
1093            distinct: false,
1094            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1095                variable: "n".to_string(),
1096                label: None,
1097                input: None,
1098            })),
1099        }));
1100
1101        let physical = planner.plan(&logical).unwrap();
1102        assert_eq!(physical.columns(), &["answer"]);
1103    }
1104
1105    // ==================== Filter Tests ====================
1106
1107    #[test]
1108    fn test_plan_filter_equality() {
1109        let store = create_test_store();
1110        let planner = Planner::new(store);
1111
1112        // MATCH (n:Person) WHERE n.age = 30 RETURN n
1113        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1114            items: vec![ReturnItem {
1115                expression: LogicalExpression::Variable("n".to_string()),
1116                alias: None,
1117            }],
1118            distinct: false,
1119            input: Box::new(LogicalOperator::Filter(FilterOp {
1120                predicate: LogicalExpression::Binary {
1121                    left: Box::new(LogicalExpression::Property {
1122                        variable: "n".to_string(),
1123                        property: "age".to_string(),
1124                    }),
1125                    op: BinaryOp::Eq,
1126                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1127                },
1128                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1129                    variable: "n".to_string(),
1130                    label: Some("Person".to_string()),
1131                    input: None,
1132                })),
1133            })),
1134        }));
1135
1136        let physical = planner.plan(&logical).unwrap();
1137        assert_eq!(physical.columns(), &["n"]);
1138    }
1139
1140    #[test]
1141    fn test_plan_filter_compound_and() {
1142        let store = create_test_store();
1143        let planner = Planner::new(store);
1144
1145        // WHERE n.age > 20 AND n.age < 40
1146        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1147            items: vec![ReturnItem {
1148                expression: LogicalExpression::Variable("n".to_string()),
1149                alias: None,
1150            }],
1151            distinct: false,
1152            input: Box::new(LogicalOperator::Filter(FilterOp {
1153                predicate: LogicalExpression::Binary {
1154                    left: Box::new(LogicalExpression::Binary {
1155                        left: Box::new(LogicalExpression::Property {
1156                            variable: "n".to_string(),
1157                            property: "age".to_string(),
1158                        }),
1159                        op: BinaryOp::Gt,
1160                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1161                    }),
1162                    op: BinaryOp::And,
1163                    right: Box::new(LogicalExpression::Binary {
1164                        left: Box::new(LogicalExpression::Property {
1165                            variable: "n".to_string(),
1166                            property: "age".to_string(),
1167                        }),
1168                        op: BinaryOp::Lt,
1169                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1170                    }),
1171                },
1172                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1173                    variable: "n".to_string(),
1174                    label: None,
1175                    input: None,
1176                })),
1177            })),
1178        }));
1179
1180        let physical = planner.plan(&logical).unwrap();
1181        assert_eq!(physical.columns(), &["n"]);
1182    }
1183
1184    #[test]
1185    fn test_plan_filter_unary_not() {
1186        let store = create_test_store();
1187        let planner = Planner::new(store);
1188
1189        // WHERE NOT n.active
1190        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1191            items: vec![ReturnItem {
1192                expression: LogicalExpression::Variable("n".to_string()),
1193                alias: None,
1194            }],
1195            distinct: false,
1196            input: Box::new(LogicalOperator::Filter(FilterOp {
1197                predicate: LogicalExpression::Unary {
1198                    op: UnaryOp::Not,
1199                    operand: Box::new(LogicalExpression::Property {
1200                        variable: "n".to_string(),
1201                        property: "active".to_string(),
1202                    }),
1203                },
1204                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1205                    variable: "n".to_string(),
1206                    label: None,
1207                    input: None,
1208                })),
1209            })),
1210        }));
1211
1212        let physical = planner.plan(&logical).unwrap();
1213        assert_eq!(physical.columns(), &["n"]);
1214    }
1215
1216    #[test]
1217    fn test_plan_filter_is_null() {
1218        let store = create_test_store();
1219        let planner = Planner::new(store);
1220
1221        // WHERE n.email IS NULL
1222        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1223            items: vec![ReturnItem {
1224                expression: LogicalExpression::Variable("n".to_string()),
1225                alias: None,
1226            }],
1227            distinct: false,
1228            input: Box::new(LogicalOperator::Filter(FilterOp {
1229                predicate: LogicalExpression::Unary {
1230                    op: UnaryOp::IsNull,
1231                    operand: Box::new(LogicalExpression::Property {
1232                        variable: "n".to_string(),
1233                        property: "email".to_string(),
1234                    }),
1235                },
1236                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1237                    variable: "n".to_string(),
1238                    label: None,
1239                    input: None,
1240                })),
1241            })),
1242        }));
1243
1244        let physical = planner.plan(&logical).unwrap();
1245        assert_eq!(physical.columns(), &["n"]);
1246    }
1247
1248    #[test]
1249    fn test_plan_filter_function_call() {
1250        let store = create_test_store();
1251        let planner = Planner::new(store);
1252
1253        // WHERE size(n.friends) > 0
1254        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1255            items: vec![ReturnItem {
1256                expression: LogicalExpression::Variable("n".to_string()),
1257                alias: None,
1258            }],
1259            distinct: false,
1260            input: Box::new(LogicalOperator::Filter(FilterOp {
1261                predicate: LogicalExpression::Binary {
1262                    left: Box::new(LogicalExpression::FunctionCall {
1263                        name: "size".to_string(),
1264                        args: vec![LogicalExpression::Property {
1265                            variable: "n".to_string(),
1266                            property: "friends".to_string(),
1267                        }],
1268                        distinct: false,
1269                    }),
1270                    op: BinaryOp::Gt,
1271                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1272                },
1273                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1274                    variable: "n".to_string(),
1275                    label: None,
1276                    input: None,
1277                })),
1278            })),
1279        }));
1280
1281        let physical = planner.plan(&logical).unwrap();
1282        assert_eq!(physical.columns(), &["n"]);
1283    }
1284
1285    // ==================== Expand Tests ====================
1286
1287    #[test]
1288    fn test_plan_expand_outgoing() {
1289        let store = create_test_store();
1290        let planner = Planner::new(store);
1291
1292        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1293        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1294            items: vec![
1295                ReturnItem {
1296                    expression: LogicalExpression::Variable("a".to_string()),
1297                    alias: None,
1298                },
1299                ReturnItem {
1300                    expression: LogicalExpression::Variable("b".to_string()),
1301                    alias: None,
1302                },
1303            ],
1304            distinct: false,
1305            input: Box::new(LogicalOperator::Expand(ExpandOp {
1306                from_variable: "a".to_string(),
1307                to_variable: "b".to_string(),
1308                edge_variable: None,
1309                direction: ExpandDirection::Outgoing,
1310                edge_types: vec!["KNOWS".to_string()],
1311                min_hops: 1,
1312                max_hops: Some(1),
1313                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1314                    variable: "a".to_string(),
1315                    label: Some("Person".to_string()),
1316                    input: None,
1317                })),
1318                path_alias: None,
1319                path_mode: PathMode::Walk,
1320            })),
1321        }));
1322
1323        let physical = planner.plan(&logical).unwrap();
1324        // The return should have columns [a, b]
1325        assert!(physical.columns().contains(&"a".to_string()));
1326        assert!(physical.columns().contains(&"b".to_string()));
1327    }
1328
1329    #[test]
1330    fn test_plan_expand_with_edge_variable() {
1331        let store = create_test_store();
1332        let planner = Planner::new(store);
1333
1334        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1335        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1336            items: vec![
1337                ReturnItem {
1338                    expression: LogicalExpression::Variable("a".to_string()),
1339                    alias: None,
1340                },
1341                ReturnItem {
1342                    expression: LogicalExpression::Variable("r".to_string()),
1343                    alias: None,
1344                },
1345                ReturnItem {
1346                    expression: LogicalExpression::Variable("b".to_string()),
1347                    alias: None,
1348                },
1349            ],
1350            distinct: false,
1351            input: Box::new(LogicalOperator::Expand(ExpandOp {
1352                from_variable: "a".to_string(),
1353                to_variable: "b".to_string(),
1354                edge_variable: Some("r".to_string()),
1355                direction: ExpandDirection::Outgoing,
1356                edge_types: vec!["KNOWS".to_string()],
1357                min_hops: 1,
1358                max_hops: Some(1),
1359                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1360                    variable: "a".to_string(),
1361                    label: None,
1362                    input: None,
1363                })),
1364                path_alias: None,
1365                path_mode: PathMode::Walk,
1366            })),
1367        }));
1368
1369        let physical = planner.plan(&logical).unwrap();
1370        assert!(physical.columns().contains(&"a".to_string()));
1371        assert!(physical.columns().contains(&"r".to_string()));
1372        assert!(physical.columns().contains(&"b".to_string()));
1373    }
1374
1375    // ==================== Limit/Skip/Sort Tests ====================
1376
1377    #[test]
1378    fn test_plan_limit() {
1379        let store = create_test_store();
1380        let planner = Planner::new(store);
1381
1382        // MATCH (n) RETURN n LIMIT 10
1383        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1384            items: vec![ReturnItem {
1385                expression: LogicalExpression::Variable("n".to_string()),
1386                alias: None,
1387            }],
1388            distinct: false,
1389            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1390                count: 10,
1391                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1392                    variable: "n".to_string(),
1393                    label: None,
1394                    input: None,
1395                })),
1396            })),
1397        }));
1398
1399        let physical = planner.plan(&logical).unwrap();
1400        assert_eq!(physical.columns(), &["n"]);
1401    }
1402
1403    #[test]
1404    fn test_plan_skip() {
1405        let store = create_test_store();
1406        let planner = Planner::new(store);
1407
1408        // MATCH (n) RETURN n SKIP 5
1409        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1410            items: vec![ReturnItem {
1411                expression: LogicalExpression::Variable("n".to_string()),
1412                alias: None,
1413            }],
1414            distinct: false,
1415            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1416                count: 5,
1417                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1418                    variable: "n".to_string(),
1419                    label: None,
1420                    input: None,
1421                })),
1422            })),
1423        }));
1424
1425        let physical = planner.plan(&logical).unwrap();
1426        assert_eq!(physical.columns(), &["n"]);
1427    }
1428
1429    #[test]
1430    fn test_plan_sort() {
1431        let store = create_test_store();
1432        let planner = Planner::new(store);
1433
1434        // MATCH (n) RETURN n ORDER BY n.name ASC
1435        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1436            items: vec![ReturnItem {
1437                expression: LogicalExpression::Variable("n".to_string()),
1438                alias: None,
1439            }],
1440            distinct: false,
1441            input: Box::new(LogicalOperator::Sort(SortOp {
1442                keys: vec![SortKey {
1443                    expression: LogicalExpression::Variable("n".to_string()),
1444                    order: SortOrder::Ascending,
1445                }],
1446                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1447                    variable: "n".to_string(),
1448                    label: None,
1449                    input: None,
1450                })),
1451            })),
1452        }));
1453
1454        let physical = planner.plan(&logical).unwrap();
1455        assert_eq!(physical.columns(), &["n"]);
1456    }
1457
1458    #[test]
1459    fn test_plan_sort_descending() {
1460        let store = create_test_store();
1461        let planner = Planner::new(store);
1462
1463        // ORDER BY n DESC
1464        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1465            items: vec![ReturnItem {
1466                expression: LogicalExpression::Variable("n".to_string()),
1467                alias: None,
1468            }],
1469            distinct: false,
1470            input: Box::new(LogicalOperator::Sort(SortOp {
1471                keys: vec![SortKey {
1472                    expression: LogicalExpression::Variable("n".to_string()),
1473                    order: SortOrder::Descending,
1474                }],
1475                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1476                    variable: "n".to_string(),
1477                    label: None,
1478                    input: None,
1479                })),
1480            })),
1481        }));
1482
1483        let physical = planner.plan(&logical).unwrap();
1484        assert_eq!(physical.columns(), &["n"]);
1485    }
1486
1487    #[test]
1488    fn test_plan_distinct() {
1489        let store = create_test_store();
1490        let planner = Planner::new(store);
1491
1492        // MATCH (n) RETURN DISTINCT n
1493        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1494            items: vec![ReturnItem {
1495                expression: LogicalExpression::Variable("n".to_string()),
1496                alias: None,
1497            }],
1498            distinct: false,
1499            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1500                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1501                    variable: "n".to_string(),
1502                    label: None,
1503                    input: None,
1504                })),
1505                columns: None,
1506            })),
1507        }));
1508
1509        let physical = planner.plan(&logical).unwrap();
1510        assert_eq!(physical.columns(), &["n"]);
1511    }
1512
1513    #[test]
1514    fn test_plan_distinct_with_columns() {
1515        let store = create_test_store();
1516        let planner = Planner::new(store);
1517
1518        // DISTINCT on specific columns (column-specific dedup)
1519        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1520            items: vec![ReturnItem {
1521                expression: LogicalExpression::Variable("n".to_string()),
1522                alias: None,
1523            }],
1524            distinct: false,
1525            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1526                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1527                    variable: "n".to_string(),
1528                    label: None,
1529                    input: None,
1530                })),
1531                columns: Some(vec!["n".to_string()]),
1532            })),
1533        }));
1534
1535        let physical = planner.plan(&logical).unwrap();
1536        assert_eq!(physical.columns(), &["n"]);
1537    }
1538
1539    #[test]
1540    fn test_plan_distinct_with_nonexistent_columns() {
1541        let store = create_test_store();
1542        let planner = Planner::new(store);
1543
1544        // When distinct columns don't match any output columns,
1545        // it falls back to full-row distinct.
1546        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1547            items: vec![ReturnItem {
1548                expression: LogicalExpression::Variable("n".to_string()),
1549                alias: None,
1550            }],
1551            distinct: false,
1552            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1553                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1554                    variable: "n".to_string(),
1555                    label: None,
1556                    input: None,
1557                })),
1558                columns: Some(vec!["nonexistent".to_string()]),
1559            })),
1560        }));
1561
1562        let physical = planner.plan(&logical).unwrap();
1563        assert_eq!(physical.columns(), &["n"]);
1564    }
1565
1566    // ==================== Aggregate Tests ====================
1567
1568    #[test]
1569    fn test_plan_aggregate_count() {
1570        let store = create_test_store();
1571        let planner = Planner::new(store);
1572
1573        // MATCH (n) RETURN count(n)
1574        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1575            items: vec![ReturnItem {
1576                expression: LogicalExpression::Variable("cnt".to_string()),
1577                alias: None,
1578            }],
1579            distinct: false,
1580            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1581                group_by: vec![],
1582                aggregates: vec![LogicalAggregateExpr {
1583                    function: LogicalAggregateFunction::Count,
1584                    expression: Some(LogicalExpression::Variable("n".to_string())),
1585                    distinct: false,
1586                    alias: Some("cnt".to_string()),
1587                    percentile: None,
1588                }],
1589                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1590                    variable: "n".to_string(),
1591                    label: None,
1592                    input: None,
1593                })),
1594                having: None,
1595            })),
1596        }));
1597
1598        let physical = planner.plan(&logical).unwrap();
1599        assert!(physical.columns().contains(&"cnt".to_string()));
1600    }
1601
1602    #[test]
1603    fn test_plan_aggregate_with_group_by() {
1604        let store = create_test_store();
1605        let planner = Planner::new(store);
1606
1607        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1608        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1609            group_by: vec![LogicalExpression::Property {
1610                variable: "n".to_string(),
1611                property: "city".to_string(),
1612            }],
1613            aggregates: vec![LogicalAggregateExpr {
1614                function: LogicalAggregateFunction::Count,
1615                expression: Some(LogicalExpression::Variable("n".to_string())),
1616                distinct: false,
1617                alias: Some("cnt".to_string()),
1618                percentile: None,
1619            }],
1620            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1621                variable: "n".to_string(),
1622                label: Some("Person".to_string()),
1623                input: None,
1624            })),
1625            having: None,
1626        }));
1627
1628        let physical = planner.plan(&logical).unwrap();
1629        assert_eq!(physical.columns().len(), 2);
1630    }
1631
1632    #[test]
1633    fn test_plan_aggregate_sum() {
1634        let store = create_test_store();
1635        let planner = Planner::new(store);
1636
1637        // SUM(n.value)
1638        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1639            group_by: vec![],
1640            aggregates: vec![LogicalAggregateExpr {
1641                function: LogicalAggregateFunction::Sum,
1642                expression: Some(LogicalExpression::Property {
1643                    variable: "n".to_string(),
1644                    property: "value".to_string(),
1645                }),
1646                distinct: false,
1647                alias: Some("total".to_string()),
1648                percentile: None,
1649            }],
1650            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1651                variable: "n".to_string(),
1652                label: None,
1653                input: None,
1654            })),
1655            having: None,
1656        }));
1657
1658        let physical = planner.plan(&logical).unwrap();
1659        assert!(physical.columns().contains(&"total".to_string()));
1660    }
1661
1662    #[test]
1663    fn test_plan_aggregate_avg() {
1664        let store = create_test_store();
1665        let planner = Planner::new(store);
1666
1667        // AVG(n.score)
1668        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1669            group_by: vec![],
1670            aggregates: vec![LogicalAggregateExpr {
1671                function: LogicalAggregateFunction::Avg,
1672                expression: Some(LogicalExpression::Property {
1673                    variable: "n".to_string(),
1674                    property: "score".to_string(),
1675                }),
1676                distinct: false,
1677                alias: Some("average".to_string()),
1678                percentile: None,
1679            }],
1680            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1681                variable: "n".to_string(),
1682                label: None,
1683                input: None,
1684            })),
1685            having: None,
1686        }));
1687
1688        let physical = planner.plan(&logical).unwrap();
1689        assert!(physical.columns().contains(&"average".to_string()));
1690    }
1691
1692    #[test]
1693    fn test_plan_aggregate_min_max() {
1694        let store = create_test_store();
1695        let planner = Planner::new(store);
1696
1697        // MIN(n.age), MAX(n.age)
1698        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1699            group_by: vec![],
1700            aggregates: vec![
1701                LogicalAggregateExpr {
1702                    function: LogicalAggregateFunction::Min,
1703                    expression: Some(LogicalExpression::Property {
1704                        variable: "n".to_string(),
1705                        property: "age".to_string(),
1706                    }),
1707                    distinct: false,
1708                    alias: Some("youngest".to_string()),
1709                    percentile: None,
1710                },
1711                LogicalAggregateExpr {
1712                    function: LogicalAggregateFunction::Max,
1713                    expression: Some(LogicalExpression::Property {
1714                        variable: "n".to_string(),
1715                        property: "age".to_string(),
1716                    }),
1717                    distinct: false,
1718                    alias: Some("oldest".to_string()),
1719                    percentile: None,
1720                },
1721            ],
1722            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1723                variable: "n".to_string(),
1724                label: None,
1725                input: None,
1726            })),
1727            having: None,
1728        }));
1729
1730        let physical = planner.plan(&logical).unwrap();
1731        assert!(physical.columns().contains(&"youngest".to_string()));
1732        assert!(physical.columns().contains(&"oldest".to_string()));
1733    }
1734
1735    // ==================== Join Tests ====================
1736
1737    #[test]
1738    fn test_plan_inner_join() {
1739        let store = create_test_store();
1740        let planner = Planner::new(store);
1741
1742        // Inner join between two scans
1743        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1744            items: vec![
1745                ReturnItem {
1746                    expression: LogicalExpression::Variable("a".to_string()),
1747                    alias: None,
1748                },
1749                ReturnItem {
1750                    expression: LogicalExpression::Variable("b".to_string()),
1751                    alias: None,
1752                },
1753            ],
1754            distinct: false,
1755            input: Box::new(LogicalOperator::Join(JoinOp {
1756                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1757                    variable: "a".to_string(),
1758                    label: Some("Person".to_string()),
1759                    input: None,
1760                })),
1761                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1762                    variable: "b".to_string(),
1763                    label: Some("Company".to_string()),
1764                    input: None,
1765                })),
1766                join_type: JoinType::Inner,
1767                conditions: vec![JoinCondition {
1768                    left: LogicalExpression::Variable("a".to_string()),
1769                    right: LogicalExpression::Variable("b".to_string()),
1770                }],
1771            })),
1772        }));
1773
1774        let physical = planner.plan(&logical).unwrap();
1775        assert!(physical.columns().contains(&"a".to_string()));
1776        assert!(physical.columns().contains(&"b".to_string()));
1777    }
1778
1779    #[test]
1780    fn test_plan_cross_join() {
1781        let store = create_test_store();
1782        let planner = Planner::new(store);
1783
1784        // Cross join (no conditions)
1785        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1786            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1787                variable: "a".to_string(),
1788                label: None,
1789                input: None,
1790            })),
1791            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1792                variable: "b".to_string(),
1793                label: None,
1794                input: None,
1795            })),
1796            join_type: JoinType::Cross,
1797            conditions: vec![],
1798        }));
1799
1800        let physical = planner.plan(&logical).unwrap();
1801        assert_eq!(physical.columns().len(), 2);
1802    }
1803
1804    #[test]
1805    fn test_plan_left_join() {
1806        let store = create_test_store();
1807        let planner = Planner::new(store);
1808
1809        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1810            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1811                variable: "a".to_string(),
1812                label: None,
1813                input: None,
1814            })),
1815            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816                variable: "b".to_string(),
1817                label: None,
1818                input: None,
1819            })),
1820            join_type: JoinType::Left,
1821            conditions: vec![],
1822        }));
1823
1824        let physical = planner.plan(&logical).unwrap();
1825        assert_eq!(physical.columns().len(), 2);
1826    }
1827
1828    // ==================== Mutation Tests ====================
1829
1830    #[test]
1831    fn test_plan_create_node() {
1832        let store = create_test_store();
1833        let planner = Planner::new(store);
1834
1835        // CREATE (n:Person {name: 'Alice'})
1836        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1837            variable: "n".to_string(),
1838            labels: vec!["Person".to_string()],
1839            properties: vec![(
1840                "name".to_string(),
1841                LogicalExpression::Literal(Value::String("Alice".into())),
1842            )],
1843            input: None,
1844        }));
1845
1846        let physical = planner.plan(&logical).unwrap();
1847        assert!(physical.columns().contains(&"n".to_string()));
1848    }
1849
1850    #[test]
1851    fn test_plan_create_edge() {
1852        let store = create_test_store();
1853        let planner = Planner::new(store);
1854
1855        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1856        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1857            variable: Some("r".to_string()),
1858            from_variable: "a".to_string(),
1859            to_variable: "b".to_string(),
1860            edge_type: "KNOWS".to_string(),
1861            properties: vec![],
1862            input: Box::new(LogicalOperator::Join(JoinOp {
1863                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1864                    variable: "a".to_string(),
1865                    label: None,
1866                    input: None,
1867                })),
1868                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1869                    variable: "b".to_string(),
1870                    label: None,
1871                    input: None,
1872                })),
1873                join_type: JoinType::Cross,
1874                conditions: vec![],
1875            })),
1876        }));
1877
1878        let physical = planner.plan(&logical).unwrap();
1879        assert!(physical.columns().contains(&"r".to_string()));
1880    }
1881
1882    #[test]
1883    fn test_plan_delete_node() {
1884        let store = create_test_store();
1885        let planner = Planner::new(store);
1886
1887        // MATCH (n) DELETE n
1888        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1889            variable: "n".to_string(),
1890            detach: false,
1891            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1892                variable: "n".to_string(),
1893                label: None,
1894                input: None,
1895            })),
1896        }));
1897
1898        let physical = planner.plan(&logical).unwrap();
1899        assert!(physical.columns().contains(&"deleted_count".to_string()));
1900    }
1901
1902    // ==================== Error Cases ====================
1903
1904    #[test]
1905    fn test_plan_empty_errors() {
1906        let store = create_test_store();
1907        let planner = Planner::new(store);
1908
1909        let logical = LogicalPlan::new(LogicalOperator::Empty);
1910        let result = planner.plan(&logical);
1911        assert!(result.is_err());
1912    }
1913
1914    #[test]
1915    fn test_plan_missing_variable_in_return() {
1916        let store = create_test_store();
1917        let planner = Planner::new(store);
1918
1919        // Return variable that doesn't exist in input
1920        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1921            items: vec![ReturnItem {
1922                expression: LogicalExpression::Variable("missing".to_string()),
1923                alias: None,
1924            }],
1925            distinct: false,
1926            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1927                variable: "n".to_string(),
1928                label: None,
1929                input: None,
1930            })),
1931        }));
1932
1933        let result = planner.plan(&logical);
1934        assert!(result.is_err());
1935    }
1936
1937    // ==================== Helper Function Tests ====================
1938
1939    #[test]
1940    fn test_convert_binary_ops() {
1941        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1942        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1943        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1944        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1945        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1946        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1947        assert!(convert_binary_op(BinaryOp::And).is_ok());
1948        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1949        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1950        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1951        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1952        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1953    }
1954
1955    #[test]
1956    fn test_convert_unary_ops() {
1957        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1958        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1959        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1960        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1961    }
1962
1963    #[test]
1964    fn test_convert_aggregate_functions() {
1965        assert!(matches!(
1966            convert_aggregate_function(LogicalAggregateFunction::Count),
1967            PhysicalAggregateFunction::Count
1968        ));
1969        assert!(matches!(
1970            convert_aggregate_function(LogicalAggregateFunction::Sum),
1971            PhysicalAggregateFunction::Sum
1972        ));
1973        assert!(matches!(
1974            convert_aggregate_function(LogicalAggregateFunction::Avg),
1975            PhysicalAggregateFunction::Avg
1976        ));
1977        assert!(matches!(
1978            convert_aggregate_function(LogicalAggregateFunction::Min),
1979            PhysicalAggregateFunction::Min
1980        ));
1981        assert!(matches!(
1982            convert_aggregate_function(LogicalAggregateFunction::Max),
1983            PhysicalAggregateFunction::Max
1984        ));
1985    }
1986
1987    #[test]
1988    fn test_planner_accessors() {
1989        let store = create_test_store();
1990        let planner = Planner::new(Arc::clone(&store));
1991
1992        assert!(planner.tx_id().is_none());
1993        assert!(planner.tx_manager().is_none());
1994        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1995    }
1996
1997    #[test]
1998    fn test_physical_plan_accessors() {
1999        let store = create_test_store();
2000        let planner = Planner::new(store);
2001
2002        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2003            variable: "n".to_string(),
2004            label: None,
2005            input: None,
2006        }));
2007
2008        let physical = planner.plan(&logical).unwrap();
2009        assert_eq!(physical.columns(), &["n"]);
2010
2011        // Test into_operator
2012        let _ = physical.into_operator();
2013    }
2014
2015    // ==================== Adaptive Planning Tests ====================
2016
2017    #[test]
2018    fn test_plan_adaptive_with_scan() {
2019        let store = create_test_store();
2020        let planner = Planner::new(store);
2021
2022        // MATCH (n:Person) RETURN n
2023        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2024            items: vec![ReturnItem {
2025                expression: LogicalExpression::Variable("n".to_string()),
2026                alias: None,
2027            }],
2028            distinct: false,
2029            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2030                variable: "n".to_string(),
2031                label: Some("Person".to_string()),
2032                input: None,
2033            })),
2034        }));
2035
2036        let physical = planner.plan_adaptive(&logical).unwrap();
2037        assert_eq!(physical.columns(), &["n"]);
2038        // Should have adaptive context with estimates
2039        assert!(physical.adaptive_context.is_some());
2040    }
2041
2042    #[test]
2043    fn test_plan_adaptive_with_filter() {
2044        let store = create_test_store();
2045        let planner = Planner::new(store);
2046
2047        // MATCH (n) WHERE n.age > 30 RETURN n
2048        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2049            items: vec![ReturnItem {
2050                expression: LogicalExpression::Variable("n".to_string()),
2051                alias: None,
2052            }],
2053            distinct: false,
2054            input: Box::new(LogicalOperator::Filter(FilterOp {
2055                predicate: LogicalExpression::Binary {
2056                    left: Box::new(LogicalExpression::Property {
2057                        variable: "n".to_string(),
2058                        property: "age".to_string(),
2059                    }),
2060                    op: BinaryOp::Gt,
2061                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2062                },
2063                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2064                    variable: "n".to_string(),
2065                    label: None,
2066                    input: None,
2067                })),
2068            })),
2069        }));
2070
2071        let physical = planner.plan_adaptive(&logical).unwrap();
2072        assert!(physical.adaptive_context.is_some());
2073    }
2074
2075    #[test]
2076    fn test_plan_adaptive_with_expand() {
2077        let store = create_test_store();
2078        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2079
2080        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
2081        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2082            items: vec![
2083                ReturnItem {
2084                    expression: LogicalExpression::Variable("a".to_string()),
2085                    alias: None,
2086                },
2087                ReturnItem {
2088                    expression: LogicalExpression::Variable("b".to_string()),
2089                    alias: None,
2090                },
2091            ],
2092            distinct: false,
2093            input: Box::new(LogicalOperator::Expand(ExpandOp {
2094                from_variable: "a".to_string(),
2095                to_variable: "b".to_string(),
2096                edge_variable: None,
2097                direction: ExpandDirection::Outgoing,
2098                edge_types: vec!["KNOWS".to_string()],
2099                min_hops: 1,
2100                max_hops: Some(1),
2101                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2102                    variable: "a".to_string(),
2103                    label: None,
2104                    input: None,
2105                })),
2106                path_alias: None,
2107                path_mode: PathMode::Walk,
2108            })),
2109        }));
2110
2111        let physical = planner.plan_adaptive(&logical).unwrap();
2112        assert!(physical.adaptive_context.is_some());
2113    }
2114
2115    #[test]
2116    fn test_plan_adaptive_with_join() {
2117        let store = create_test_store();
2118        let planner = Planner::new(store);
2119
2120        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2121            items: vec![
2122                ReturnItem {
2123                    expression: LogicalExpression::Variable("a".to_string()),
2124                    alias: None,
2125                },
2126                ReturnItem {
2127                    expression: LogicalExpression::Variable("b".to_string()),
2128                    alias: None,
2129                },
2130            ],
2131            distinct: false,
2132            input: Box::new(LogicalOperator::Join(JoinOp {
2133                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2134                    variable: "a".to_string(),
2135                    label: None,
2136                    input: None,
2137                })),
2138                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2139                    variable: "b".to_string(),
2140                    label: None,
2141                    input: None,
2142                })),
2143                join_type: JoinType::Cross,
2144                conditions: vec![],
2145            })),
2146        }));
2147
2148        let physical = planner.plan_adaptive(&logical).unwrap();
2149        assert!(physical.adaptive_context.is_some());
2150    }
2151
2152    #[test]
2153    fn test_plan_adaptive_with_aggregate() {
2154        let store = create_test_store();
2155        let planner = Planner::new(store);
2156
2157        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2158            group_by: vec![],
2159            aggregates: vec![LogicalAggregateExpr {
2160                function: LogicalAggregateFunction::Count,
2161                expression: Some(LogicalExpression::Variable("n".to_string())),
2162                distinct: false,
2163                alias: Some("cnt".to_string()),
2164                percentile: None,
2165            }],
2166            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2167                variable: "n".to_string(),
2168                label: None,
2169                input: None,
2170            })),
2171            having: None,
2172        }));
2173
2174        let physical = planner.plan_adaptive(&logical).unwrap();
2175        assert!(physical.adaptive_context.is_some());
2176    }
2177
2178    #[test]
2179    fn test_plan_adaptive_with_distinct() {
2180        let store = create_test_store();
2181        let planner = Planner::new(store);
2182
2183        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2184            items: vec![ReturnItem {
2185                expression: LogicalExpression::Variable("n".to_string()),
2186                alias: None,
2187            }],
2188            distinct: false,
2189            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2190                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2191                    variable: "n".to_string(),
2192                    label: None,
2193                    input: None,
2194                })),
2195                columns: None,
2196            })),
2197        }));
2198
2199        let physical = planner.plan_adaptive(&logical).unwrap();
2200        assert!(physical.adaptive_context.is_some());
2201    }
2202
2203    #[test]
2204    fn test_plan_adaptive_with_limit() {
2205        let store = create_test_store();
2206        let planner = Planner::new(store);
2207
2208        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2209            items: vec![ReturnItem {
2210                expression: LogicalExpression::Variable("n".to_string()),
2211                alias: None,
2212            }],
2213            distinct: false,
2214            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2215                count: 10,
2216                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2217                    variable: "n".to_string(),
2218                    label: None,
2219                    input: None,
2220                })),
2221            })),
2222        }));
2223
2224        let physical = planner.plan_adaptive(&logical).unwrap();
2225        assert!(physical.adaptive_context.is_some());
2226    }
2227
2228    #[test]
2229    fn test_plan_adaptive_with_skip() {
2230        let store = create_test_store();
2231        let planner = Planner::new(store);
2232
2233        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2234            items: vec![ReturnItem {
2235                expression: LogicalExpression::Variable("n".to_string()),
2236                alias: None,
2237            }],
2238            distinct: false,
2239            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2240                count: 5,
2241                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2242                    variable: "n".to_string(),
2243                    label: None,
2244                    input: None,
2245                })),
2246            })),
2247        }));
2248
2249        let physical = planner.plan_adaptive(&logical).unwrap();
2250        assert!(physical.adaptive_context.is_some());
2251    }
2252
2253    #[test]
2254    fn test_plan_adaptive_with_sort() {
2255        let store = create_test_store();
2256        let planner = Planner::new(store);
2257
2258        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2259            items: vec![ReturnItem {
2260                expression: LogicalExpression::Variable("n".to_string()),
2261                alias: None,
2262            }],
2263            distinct: false,
2264            input: Box::new(LogicalOperator::Sort(SortOp {
2265                keys: vec![SortKey {
2266                    expression: LogicalExpression::Variable("n".to_string()),
2267                    order: SortOrder::Ascending,
2268                }],
2269                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2270                    variable: "n".to_string(),
2271                    label: None,
2272                    input: None,
2273                })),
2274            })),
2275        }));
2276
2277        let physical = planner.plan_adaptive(&logical).unwrap();
2278        assert!(physical.adaptive_context.is_some());
2279    }
2280
2281    #[test]
2282    fn test_plan_adaptive_with_union() {
2283        let store = create_test_store();
2284        let planner = Planner::new(store);
2285
2286        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2287            items: vec![ReturnItem {
2288                expression: LogicalExpression::Variable("n".to_string()),
2289                alias: None,
2290            }],
2291            distinct: false,
2292            input: Box::new(LogicalOperator::Union(UnionOp {
2293                inputs: vec![
2294                    LogicalOperator::NodeScan(NodeScanOp {
2295                        variable: "n".to_string(),
2296                        label: Some("Person".to_string()),
2297                        input: None,
2298                    }),
2299                    LogicalOperator::NodeScan(NodeScanOp {
2300                        variable: "n".to_string(),
2301                        label: Some("Company".to_string()),
2302                        input: None,
2303                    }),
2304                ],
2305            })),
2306        }));
2307
2308        let physical = planner.plan_adaptive(&logical).unwrap();
2309        assert!(physical.adaptive_context.is_some());
2310    }
2311
2312    // ==================== Variable Length Path Tests ====================
2313
2314    #[test]
2315    fn test_plan_expand_variable_length() {
2316        let store = create_test_store();
2317        let planner = Planner::new(store);
2318
2319        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2320        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2321            items: vec![
2322                ReturnItem {
2323                    expression: LogicalExpression::Variable("a".to_string()),
2324                    alias: None,
2325                },
2326                ReturnItem {
2327                    expression: LogicalExpression::Variable("b".to_string()),
2328                    alias: None,
2329                },
2330            ],
2331            distinct: false,
2332            input: Box::new(LogicalOperator::Expand(ExpandOp {
2333                from_variable: "a".to_string(),
2334                to_variable: "b".to_string(),
2335                edge_variable: None,
2336                direction: ExpandDirection::Outgoing,
2337                edge_types: vec!["KNOWS".to_string()],
2338                min_hops: 1,
2339                max_hops: Some(3),
2340                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2341                    variable: "a".to_string(),
2342                    label: None,
2343                    input: None,
2344                })),
2345                path_alias: None,
2346                path_mode: PathMode::Walk,
2347            })),
2348        }));
2349
2350        let physical = planner.plan(&logical).unwrap();
2351        assert!(physical.columns().contains(&"a".to_string()));
2352        assert!(physical.columns().contains(&"b".to_string()));
2353    }
2354
2355    #[test]
2356    fn test_plan_expand_with_path_alias() {
2357        let store = create_test_store();
2358        let planner = Planner::new(store);
2359
2360        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2361        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2362            items: vec![
2363                ReturnItem {
2364                    expression: LogicalExpression::Variable("a".to_string()),
2365                    alias: None,
2366                },
2367                ReturnItem {
2368                    expression: LogicalExpression::Variable("b".to_string()),
2369                    alias: None,
2370                },
2371            ],
2372            distinct: false,
2373            input: Box::new(LogicalOperator::Expand(ExpandOp {
2374                from_variable: "a".to_string(),
2375                to_variable: "b".to_string(),
2376                edge_variable: None,
2377                direction: ExpandDirection::Outgoing,
2378                edge_types: vec!["KNOWS".to_string()],
2379                min_hops: 1,
2380                max_hops: Some(3),
2381                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2382                    variable: "a".to_string(),
2383                    label: None,
2384                    input: None,
2385                })),
2386                path_alias: Some("p".to_string()),
2387                path_mode: PathMode::Walk,
2388            })),
2389        }));
2390
2391        let physical = planner.plan(&logical).unwrap();
2392        // Verify plan was created successfully with expected output columns
2393        assert!(physical.columns().contains(&"a".to_string()));
2394        assert!(physical.columns().contains(&"b".to_string()));
2395    }
2396
2397    #[test]
2398    fn test_plan_expand_incoming() {
2399        let store = create_test_store();
2400        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2401
2402        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2403        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2404            items: vec![
2405                ReturnItem {
2406                    expression: LogicalExpression::Variable("a".to_string()),
2407                    alias: None,
2408                },
2409                ReturnItem {
2410                    expression: LogicalExpression::Variable("b".to_string()),
2411                    alias: None,
2412                },
2413            ],
2414            distinct: false,
2415            input: Box::new(LogicalOperator::Expand(ExpandOp {
2416                from_variable: "a".to_string(),
2417                to_variable: "b".to_string(),
2418                edge_variable: None,
2419                direction: ExpandDirection::Incoming,
2420                edge_types: vec!["KNOWS".to_string()],
2421                min_hops: 1,
2422                max_hops: Some(1),
2423                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2424                    variable: "a".to_string(),
2425                    label: None,
2426                    input: None,
2427                })),
2428                path_alias: None,
2429                path_mode: PathMode::Walk,
2430            })),
2431        }));
2432
2433        let physical = planner.plan(&logical).unwrap();
2434        assert!(physical.columns().contains(&"a".to_string()));
2435        assert!(physical.columns().contains(&"b".to_string()));
2436    }
2437
2438    #[test]
2439    fn test_plan_expand_both_directions() {
2440        let store = create_test_store();
2441        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2442
2443        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2444        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2445            items: vec![
2446                ReturnItem {
2447                    expression: LogicalExpression::Variable("a".to_string()),
2448                    alias: None,
2449                },
2450                ReturnItem {
2451                    expression: LogicalExpression::Variable("b".to_string()),
2452                    alias: None,
2453                },
2454            ],
2455            distinct: false,
2456            input: Box::new(LogicalOperator::Expand(ExpandOp {
2457                from_variable: "a".to_string(),
2458                to_variable: "b".to_string(),
2459                edge_variable: None,
2460                direction: ExpandDirection::Both,
2461                edge_types: vec!["KNOWS".to_string()],
2462                min_hops: 1,
2463                max_hops: Some(1),
2464                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2465                    variable: "a".to_string(),
2466                    label: None,
2467                    input: None,
2468                })),
2469                path_alias: None,
2470                path_mode: PathMode::Walk,
2471            })),
2472        }));
2473
2474        let physical = planner.plan(&logical).unwrap();
2475        assert!(physical.columns().contains(&"a".to_string()));
2476        assert!(physical.columns().contains(&"b".to_string()));
2477    }
2478
2479    // ==================== With Context Tests ====================
2480
2481    #[test]
2482    fn test_planner_with_context() {
2483        use crate::transaction::TransactionManager;
2484
2485        let store = create_test_store();
2486        let tx_manager = Arc::new(TransactionManager::new());
2487        let tx_id = tx_manager.begin();
2488        let epoch = tx_manager.current_epoch();
2489
2490        let planner = Planner::with_context(
2491            Arc::clone(&store),
2492            Arc::clone(&tx_manager),
2493            Some(tx_id),
2494            epoch,
2495        );
2496
2497        assert_eq!(planner.tx_id(), Some(tx_id));
2498        assert!(planner.tx_manager().is_some());
2499        assert_eq!(planner.viewing_epoch(), epoch);
2500    }
2501
2502    #[test]
2503    fn test_planner_with_factorized_execution_disabled() {
2504        let store = create_test_store();
2505        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2506
2507        // Two consecutive expands - should NOT use factorized execution
2508        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2509            items: vec![
2510                ReturnItem {
2511                    expression: LogicalExpression::Variable("a".to_string()),
2512                    alias: None,
2513                },
2514                ReturnItem {
2515                    expression: LogicalExpression::Variable("c".to_string()),
2516                    alias: None,
2517                },
2518            ],
2519            distinct: false,
2520            input: Box::new(LogicalOperator::Expand(ExpandOp {
2521                from_variable: "b".to_string(),
2522                to_variable: "c".to_string(),
2523                edge_variable: None,
2524                direction: ExpandDirection::Outgoing,
2525                edge_types: vec![],
2526                min_hops: 1,
2527                max_hops: Some(1),
2528                input: Box::new(LogicalOperator::Expand(ExpandOp {
2529                    from_variable: "a".to_string(),
2530                    to_variable: "b".to_string(),
2531                    edge_variable: None,
2532                    direction: ExpandDirection::Outgoing,
2533                    edge_types: vec![],
2534                    min_hops: 1,
2535                    max_hops: Some(1),
2536                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2537                        variable: "a".to_string(),
2538                        label: None,
2539                        input: None,
2540                    })),
2541                    path_alias: None,
2542                    path_mode: PathMode::Walk,
2543                })),
2544                path_alias: None,
2545                path_mode: PathMode::Walk,
2546            })),
2547        }));
2548
2549        let physical = planner.plan(&logical).unwrap();
2550        assert!(physical.columns().contains(&"a".to_string()));
2551        assert!(physical.columns().contains(&"c".to_string()));
2552    }
2553
2554    // ==================== Sort with Property Tests ====================
2555
2556    #[test]
2557    fn test_plan_sort_by_property() {
2558        let store = create_test_store();
2559        let planner = Planner::new(store);
2560
2561        // MATCH (n) RETURN n ORDER BY n.name ASC
2562        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2563            items: vec![ReturnItem {
2564                expression: LogicalExpression::Variable("n".to_string()),
2565                alias: None,
2566            }],
2567            distinct: false,
2568            input: Box::new(LogicalOperator::Sort(SortOp {
2569                keys: vec![SortKey {
2570                    expression: LogicalExpression::Property {
2571                        variable: "n".to_string(),
2572                        property: "name".to_string(),
2573                    },
2574                    order: SortOrder::Ascending,
2575                }],
2576                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2577                    variable: "n".to_string(),
2578                    label: None,
2579                    input: None,
2580                })),
2581            })),
2582        }));
2583
2584        let physical = planner.plan(&logical).unwrap();
2585        // Should have the property column projected
2586        assert!(physical.columns().contains(&"n".to_string()));
2587    }
2588
2589    // ==================== Scan with Input Tests ====================
2590
2591    #[test]
2592    fn test_plan_scan_with_input() {
2593        let store = create_test_store();
2594        let planner = Planner::new(store);
2595
2596        // A scan with another scan as input (for chained patterns)
2597        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2598            items: vec![
2599                ReturnItem {
2600                    expression: LogicalExpression::Variable("a".to_string()),
2601                    alias: None,
2602                },
2603                ReturnItem {
2604                    expression: LogicalExpression::Variable("b".to_string()),
2605                    alias: None,
2606                },
2607            ],
2608            distinct: false,
2609            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2610                variable: "b".to_string(),
2611                label: Some("Company".to_string()),
2612                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2613                    variable: "a".to_string(),
2614                    label: Some("Person".to_string()),
2615                    input: None,
2616                }))),
2617            })),
2618        }));
2619
2620        let physical = planner.plan(&logical).unwrap();
2621        assert!(physical.columns().contains(&"a".to_string()));
2622        assert!(physical.columns().contains(&"b".to_string()));
2623    }
2624}