Skip to main content

grafeo_engine/query/planner/
mod.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18    CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19    ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20    LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
21    ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
22};
23use grafeo_common::types::{EpochId, TxId};
24use grafeo_common::types::{LogicalType, Value};
25use grafeo_common::utils::error::{Error, Result};
26use grafeo_core::execution::AdaptiveContext;
27use grafeo_core::execution::operators::{
28    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
29    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
30    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
31    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
32    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
33    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
34    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
35    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
36    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
37    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
38    UnwindOperator, VariableLengthExpandOperator,
39};
40use grafeo_core::graph::{Direction, lpg::LpgStore};
41use std::collections::HashMap;
42use std::sync::Arc;
43
44use crate::transaction::TransactionManager;
45
46/// Range bounds for property-based range queries.
47struct RangeBounds<'a> {
48    min: Option<&'a Value>,
49    max: Option<&'a Value>,
50    min_inclusive: bool,
51    max_inclusive: bool,
52}
53
54/// Converts a logical plan to a physical operator tree.
55pub struct Planner {
56    /// The graph store to scan from.
57    pub(super) store: Arc<LpgStore>,
58    /// Transaction manager for MVCC operations.
59    pub(super) tx_manager: Option<Arc<TransactionManager>>,
60    /// Current transaction ID (if in a transaction).
61    pub(super) tx_id: Option<TxId>,
62    /// Epoch to use for visibility checks.
63    pub(super) viewing_epoch: EpochId,
64    /// Counter for generating unique anonymous edge column names.
65    pub(super) anon_edge_counter: std::cell::Cell<u32>,
66    /// Whether to use factorized execution for multi-hop queries.
67    pub(super) factorized_execution: bool,
68    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
69    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
70    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
71    /// Variables that hold edge IDs (from MATCH edge patterns).
72    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
73    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
74}
75
76impl Planner {
77    /// Creates a new planner with the given store.
78    ///
79    /// This creates a planner without transaction context, using the current
80    /// epoch from the store for visibility.
81    #[must_use]
82    pub fn new(store: Arc<LpgStore>) -> Self {
83        let epoch = store.current_epoch();
84        Self {
85            store,
86            tx_manager: None,
87            tx_id: None,
88            viewing_epoch: epoch,
89            anon_edge_counter: std::cell::Cell::new(0),
90            factorized_execution: true,
91            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
92            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
93        }
94    }
95
96    /// Creates a new planner with transaction context for MVCC-aware planning.
97    ///
98    /// # Arguments
99    ///
100    /// * `store` - The graph store
101    /// * `tx_manager` - Transaction manager for recording reads/writes
102    /// * `tx_id` - Current transaction ID (None for auto-commit)
103    /// * `viewing_epoch` - Epoch to use for version visibility
104    #[must_use]
105    pub fn with_context(
106        store: Arc<LpgStore>,
107        tx_manager: Arc<TransactionManager>,
108        tx_id: Option<TxId>,
109        viewing_epoch: EpochId,
110    ) -> Self {
111        Self {
112            store,
113            tx_manager: Some(tx_manager),
114            tx_id,
115            viewing_epoch,
116            anon_edge_counter: std::cell::Cell::new(0),
117            factorized_execution: true,
118            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120        }
121    }
122
123    /// Returns the viewing epoch for this planner.
124    #[must_use]
125    pub fn viewing_epoch(&self) -> EpochId {
126        self.viewing_epoch
127    }
128
129    /// Returns the transaction ID for this planner, if any.
130    #[must_use]
131    pub fn tx_id(&self) -> Option<TxId> {
132        self.tx_id
133    }
134
135    /// Returns a reference to the transaction manager, if available.
136    #[must_use]
137    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
138        self.tx_manager.as_ref()
139    }
140
141    /// Enables or disables factorized execution for multi-hop queries.
142    #[must_use]
143    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
144        self.factorized_execution = enabled;
145        self
146    }
147
148    /// Counts consecutive single-hop expand operations.
149    ///
150    /// Returns the count and the deepest non-expand operator (the base of the chain).
151    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
152        match op {
153            LogicalOperator::Expand(expand) => {
154                // Only count single-hop expands (factorization doesn't apply to variable-length)
155                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
156
157                if is_single_hop {
158                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
159                    (inner_count + 1, base)
160                } else {
161                    // Variable-length path breaks the chain
162                    (0, op)
163                }
164            }
165            _ => (0, op),
166        }
167    }
168
169    /// Collects expand operations from the outermost down to the base.
170    ///
171    /// Returns expands in order from innermost (base) to outermost.
172    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
173        let mut chain = Vec::new();
174        let mut current = op;
175
176        while let LogicalOperator::Expand(expand) = current {
177            // Only include single-hop expands
178            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
179            if !is_single_hop {
180                break;
181            }
182            chain.push(expand);
183            current = &expand.input;
184        }
185
186        // Reverse so we go from base to outer
187        chain.reverse();
188        chain
189    }
190
191    /// Plans a logical plan into a physical operator.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if planning fails.
196    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
197        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
198        Ok(PhysicalPlan {
199            operator,
200            columns,
201            adaptive_context: None,
202        })
203    }
204
205    /// Plans a logical plan with adaptive execution support.
206    ///
207    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
208    /// joins) that can be monitored during execution to detect estimate errors.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if planning fails.
213    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
214        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
215
216        // Build adaptive context with cardinality estimates
217        let mut adaptive_context = AdaptiveContext::new();
218        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
219
220        Ok(PhysicalPlan {
221            operator,
222            columns,
223            adaptive_context: Some(adaptive_context),
224        })
225    }
226
227    /// Collects cardinality estimates from the logical plan into an adaptive context.
228    fn collect_cardinality_estimates(
229        &self,
230        op: &LogicalOperator,
231        ctx: &mut AdaptiveContext,
232        depth: usize,
233    ) {
234        match op {
235            LogicalOperator::NodeScan(scan) => {
236                // Estimate based on label statistics
237                let estimate = if let Some(label) = &scan.label {
238                    self.store.nodes_by_label(label).len() as f64
239                } else {
240                    self.store.node_count() as f64
241                };
242                let id = format!("scan_{}", scan.variable);
243                ctx.set_estimate(&id, estimate);
244
245                // Recurse into input if present
246                if let Some(input) = &scan.input {
247                    self.collect_cardinality_estimates(input, ctx, depth + 1);
248                }
249            }
250            LogicalOperator::Filter(filter) => {
251                // Default selectivity estimate for filters (30%)
252                let input_estimate = self.estimate_cardinality(&filter.input);
253                let estimate = input_estimate * 0.3;
254                let id = format!("filter_{depth}");
255                ctx.set_estimate(&id, estimate);
256
257                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
258            }
259            LogicalOperator::Expand(expand) => {
260                // Estimate based on average degree from store statistics
261                let input_estimate = self.estimate_cardinality(&expand.input);
262                let stats = self.store.statistics();
263                let avg_degree = self.estimate_expand_degree(&stats, expand);
264                let estimate = input_estimate * avg_degree;
265                let id = format!("expand_{}", expand.to_variable);
266                ctx.set_estimate(&id, estimate);
267
268                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
269            }
270            LogicalOperator::Join(join) => {
271                // Estimate join output (product with selectivity)
272                let left_est = self.estimate_cardinality(&join.left);
273                let right_est = self.estimate_cardinality(&join.right);
274                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
275                let id = format!("join_{depth}");
276                ctx.set_estimate(&id, estimate);
277
278                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
279                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
280            }
281            LogicalOperator::Aggregate(agg) => {
282                // Aggregates typically reduce cardinality
283                let input_estimate = self.estimate_cardinality(&agg.input);
284                let estimate = if agg.group_by.is_empty() {
285                    1.0 // Scalar aggregate
286                } else {
287                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
288                };
289                let id = format!("aggregate_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
293            }
294            LogicalOperator::Distinct(distinct) => {
295                let input_estimate = self.estimate_cardinality(&distinct.input);
296                let estimate = (input_estimate * 0.5).max(1.0);
297                let id = format!("distinct_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
301            }
302            LogicalOperator::Return(ret) => {
303                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
304            }
305            LogicalOperator::Limit(limit) => {
306                let input_estimate = self.estimate_cardinality(&limit.input);
307                let estimate = (input_estimate).min(limit.count as f64);
308                let id = format!("limit_{depth}");
309                ctx.set_estimate(&id, estimate);
310
311                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
312            }
313            LogicalOperator::Skip(skip) => {
314                let input_estimate = self.estimate_cardinality(&skip.input);
315                let estimate = (input_estimate - skip.count as f64).max(0.0);
316                let id = format!("skip_{depth}");
317                ctx.set_estimate(&id, estimate);
318
319                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
320            }
321            LogicalOperator::Sort(sort) => {
322                // Sort doesn't change cardinality
323                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
324            }
325            LogicalOperator::Union(union) => {
326                let estimate: f64 = union
327                    .inputs
328                    .iter()
329                    .map(|input| self.estimate_cardinality(input))
330                    .sum();
331                let id = format!("union_{depth}");
332                ctx.set_estimate(&id, estimate);
333
334                for input in &union.inputs {
335                    self.collect_cardinality_estimates(input, ctx, depth + 1);
336                }
337            }
338            _ => {
339                // For other operators, try to recurse into known input patterns
340            }
341        }
342    }
343
344    /// Estimates cardinality for a logical operator subtree.
345    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
346        match op {
347            LogicalOperator::NodeScan(scan) => {
348                if let Some(label) = &scan.label {
349                    self.store.nodes_by_label(label).len() as f64
350                } else {
351                    self.store.node_count() as f64
352                }
353            }
354            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
355            LogicalOperator::Expand(expand) => {
356                let stats = self.store.statistics();
357                let avg_degree = self.estimate_expand_degree(&stats, expand);
358                self.estimate_cardinality(&expand.input) * avg_degree
359            }
360            LogicalOperator::Join(join) => {
361                let left = self.estimate_cardinality(&join.left);
362                let right = self.estimate_cardinality(&join.right);
363                (left * right).sqrt()
364            }
365            LogicalOperator::Aggregate(agg) => {
366                if agg.group_by.is_empty() {
367                    1.0
368                } else {
369                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
370                }
371            }
372            LogicalOperator::Distinct(distinct) => {
373                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
374            }
375            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
376            LogicalOperator::Limit(limit) => self
377                .estimate_cardinality(&limit.input)
378                .min(limit.count as f64),
379            LogicalOperator::Skip(skip) => {
380                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
381            }
382            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
383            LogicalOperator::Union(union) => union
384                .inputs
385                .iter()
386                .map(|input| self.estimate_cardinality(input))
387                .sum(),
388            _ => 1000.0, // Default estimate for unknown operators
389        }
390    }
391
392    /// Estimates the average edge degree for an expand operation using store statistics.
393    fn estimate_expand_degree(
394        &self,
395        stats: &grafeo_core::statistics::Statistics,
396        expand: &ExpandOp,
397    ) -> f64 {
398        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
399        if let Some(edge_type) = &expand.edge_type {
400            stats.estimate_avg_degree(edge_type, outgoing)
401        } else if stats.total_nodes > 0 {
402            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
403        } else {
404            10.0 // fallback for empty graph
405        }
406    }
407
408    /// Plans a single logical operator.
409    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
410        match op {
411            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
412            LogicalOperator::Expand(expand) => {
413                // Check for expand chains when factorized execution is enabled
414                if self.factorized_execution {
415                    let (chain_len, _base) = Self::count_expand_chain(op);
416                    if chain_len >= 2 {
417                        // Use factorized chain for 2+ consecutive single-hop expands
418                        return self.plan_expand_chain(op);
419                    }
420                }
421                self.plan_expand(expand)
422            }
423            LogicalOperator::Return(ret) => self.plan_return(ret),
424            LogicalOperator::Filter(filter) => self.plan_filter(filter),
425            LogicalOperator::Project(project) => self.plan_project(project),
426            LogicalOperator::Limit(limit) => self.plan_limit(limit),
427            LogicalOperator::Skip(skip) => self.plan_skip(skip),
428            LogicalOperator::Sort(sort) => self.plan_sort(sort),
429            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
430            LogicalOperator::Join(join) => self.plan_join(join),
431            LogicalOperator::Union(union) => self.plan_union(union),
432            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
433            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
434            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
435            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
436            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
437            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
438            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
439            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
440            LogicalOperator::Merge(merge) => self.plan_merge(merge),
441            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
442            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
443            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
444            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
445            #[cfg(feature = "algos")]
446            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
447            #[cfg(not(feature = "algos"))]
448            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
449                "CALL procedures require the 'algos' feature".to_string(),
450            )),
451            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
452            LogicalOperator::VectorScan(_) => Err(Error::Internal(
453                "VectorScan requires vector-index feature".to_string(),
454            )),
455            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
456                "VectorJoin requires vector-index feature".to_string(),
457            )),
458            _ => Err(Error::Internal(format!(
459                "Unsupported operator: {:?}",
460                std::mem::discriminant(op)
461            ))),
462        }
463    }
464}
465
466/// Converts a logical binary operator to a filter binary operator.
467pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
468    match op {
469        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
470        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
471        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
472        BinaryOp::Le => Ok(BinaryFilterOp::Le),
473        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
474        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
475        BinaryOp::And => Ok(BinaryFilterOp::And),
476        BinaryOp::Or => Ok(BinaryFilterOp::Or),
477        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
478        BinaryOp::Add => Ok(BinaryFilterOp::Add),
479        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
480        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
481        BinaryOp::Div => Ok(BinaryFilterOp::Div),
482        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
483        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
484        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
485        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
486        BinaryOp::In => Ok(BinaryFilterOp::In),
487        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
488        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
489        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
490            "Binary operator {:?} not yet supported in filters",
491            op
492        ))),
493    }
494}
495
496/// Converts a logical unary operator to a filter unary operator.
497pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
498    match op {
499        UnaryOp::Not => Ok(UnaryFilterOp::Not),
500        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
501        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
502        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
503    }
504}
505
506/// Converts a logical aggregate function to a physical aggregate function.
507pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
508    match func {
509        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
510        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
511        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
512        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
513        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
514        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
515        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
516        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
517        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
518        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
519        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
520    }
521}
522
523/// Converts a logical expression to a filter expression.
524///
525/// This is a standalone function that can be used by both LPG and RDF planners.
526pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
527    match expr {
528        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
529        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
530        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
531            variable: variable.clone(),
532            property: property.clone(),
533        }),
534        LogicalExpression::Binary { left, op, right } => {
535            let left_expr = convert_filter_expression(left)?;
536            let right_expr = convert_filter_expression(right)?;
537            let filter_op = convert_binary_op(*op)?;
538            Ok(FilterExpression::Binary {
539                left: Box::new(left_expr),
540                op: filter_op,
541                right: Box::new(right_expr),
542            })
543        }
544        LogicalExpression::Unary { op, operand } => {
545            let operand_expr = convert_filter_expression(operand)?;
546            let filter_op = convert_unary_op(*op)?;
547            Ok(FilterExpression::Unary {
548                op: filter_op,
549                operand: Box::new(operand_expr),
550            })
551        }
552        LogicalExpression::FunctionCall { name, args, .. } => {
553            let filter_args: Vec<FilterExpression> = args
554                .iter()
555                .map(convert_filter_expression)
556                .collect::<Result<Vec<_>>>()?;
557            Ok(FilterExpression::FunctionCall {
558                name: name.clone(),
559                args: filter_args,
560            })
561        }
562        LogicalExpression::Case {
563            operand,
564            when_clauses,
565            else_clause,
566        } => {
567            let filter_operand = operand
568                .as_ref()
569                .map(|e| convert_filter_expression(e))
570                .transpose()?
571                .map(Box::new);
572            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
573                .iter()
574                .map(|(cond, result)| {
575                    Ok((
576                        convert_filter_expression(cond)?,
577                        convert_filter_expression(result)?,
578                    ))
579                })
580                .collect::<Result<Vec<_>>>()?;
581            let filter_else = else_clause
582                .as_ref()
583                .map(|e| convert_filter_expression(e))
584                .transpose()?
585                .map(Box::new);
586            Ok(FilterExpression::Case {
587                operand: filter_operand,
588                when_clauses: filter_when_clauses,
589                else_clause: filter_else,
590            })
591        }
592        LogicalExpression::List(items) => {
593            let filter_items: Vec<FilterExpression> = items
594                .iter()
595                .map(convert_filter_expression)
596                .collect::<Result<Vec<_>>>()?;
597            Ok(FilterExpression::List(filter_items))
598        }
599        LogicalExpression::Map(pairs) => {
600            let filter_pairs: Vec<(String, FilterExpression)> = pairs
601                .iter()
602                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
603                .collect::<Result<Vec<_>>>()?;
604            Ok(FilterExpression::Map(filter_pairs))
605        }
606        LogicalExpression::IndexAccess { base, index } => {
607            let base_expr = convert_filter_expression(base)?;
608            let index_expr = convert_filter_expression(index)?;
609            Ok(FilterExpression::IndexAccess {
610                base: Box::new(base_expr),
611                index: Box::new(index_expr),
612            })
613        }
614        LogicalExpression::SliceAccess { base, start, end } => {
615            let base_expr = convert_filter_expression(base)?;
616            let start_expr = start
617                .as_ref()
618                .map(|s| convert_filter_expression(s))
619                .transpose()?
620                .map(Box::new);
621            let end_expr = end
622                .as_ref()
623                .map(|e| convert_filter_expression(e))
624                .transpose()?
625                .map(Box::new);
626            Ok(FilterExpression::SliceAccess {
627                base: Box::new(base_expr),
628                start: start_expr,
629                end: end_expr,
630            })
631        }
632        LogicalExpression::Parameter(_) => Err(Error::Internal(
633            "Parameters not yet supported in filters".to_string(),
634        )),
635        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
636        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
637        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
638        LogicalExpression::ListComprehension {
639            variable,
640            list_expr,
641            filter_expr,
642            map_expr,
643        } => {
644            let list = convert_filter_expression(list_expr)?;
645            let filter = filter_expr
646                .as_ref()
647                .map(|f| convert_filter_expression(f))
648                .transpose()?
649                .map(Box::new);
650            let map = convert_filter_expression(map_expr)?;
651            Ok(FilterExpression::ListComprehension {
652                variable: variable.clone(),
653                list_expr: Box::new(list),
654                filter_expr: filter,
655                map_expr: Box::new(map),
656            })
657        }
658        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
659            Error::Internal("Subqueries not yet supported in filters".to_string()),
660        ),
661    }
662}
663
664/// Infers the logical type from a value.
665fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
666    use grafeo_common::types::Value;
667    match value {
668        Value::Null => LogicalType::String, // Default type for null
669        Value::Bool(_) => LogicalType::Bool,
670        Value::Int64(_) => LogicalType::Int64,
671        Value::Float64(_) => LogicalType::Float64,
672        Value::String(_) => LogicalType::String,
673        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
674        Value::Timestamp(_) => LogicalType::Timestamp,
675        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
676        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
677        Value::Vector(v) => LogicalType::Vector(v.len()),
678    }
679}
680
681/// Converts an expression to a string for column naming.
682fn expression_to_string(expr: &LogicalExpression) -> String {
683    match expr {
684        LogicalExpression::Variable(name) => name.clone(),
685        LogicalExpression::Property { variable, property } => {
686            format!("{variable}.{property}")
687        }
688        LogicalExpression::Literal(value) => format!("{value:?}"),
689        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
690        _ => "expr".to_string(),
691    }
692}
693
694/// A physical plan ready for execution.
695pub struct PhysicalPlan {
696    /// The root physical operator.
697    pub operator: Box<dyn Operator>,
698    /// Column names for the result.
699    pub columns: Vec<String>,
700    /// Adaptive execution context with cardinality estimates.
701    ///
702    /// When adaptive execution is enabled, this context contains estimated
703    /// cardinalities at various checkpoints in the plan. During execution,
704    /// actual row counts are recorded and compared against estimates.
705    pub adaptive_context: Option<AdaptiveContext>,
706}
707
708impl PhysicalPlan {
709    /// Returns the column names.
710    #[must_use]
711    pub fn columns(&self) -> &[String] {
712        &self.columns
713    }
714
715    /// Consumes the plan and returns the operator.
716    pub fn into_operator(self) -> Box<dyn Operator> {
717        self.operator
718    }
719
720    /// Returns the adaptive context, if adaptive execution is enabled.
721    #[must_use]
722    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
723        self.adaptive_context.as_ref()
724    }
725
726    /// Takes ownership of the adaptive context.
727    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
728        self.adaptive_context.take()
729    }
730}
731
732/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
733struct StaticResultOperator {
734    rows: Vec<Vec<Value>>,
735    column_indices: Vec<usize>,
736    row_index: usize,
737}
738
739impl Operator for StaticResultOperator {
740    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
741        use grafeo_core::execution::DataChunk;
742
743        if self.row_index >= self.rows.len() {
744            return Ok(None);
745        }
746
747        let remaining = self.rows.len() - self.row_index;
748        let chunk_rows = remaining.min(1024);
749        let col_count = self.column_indices.len();
750
751        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
752        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
753
754        for row_offset in 0..chunk_rows {
755            let row = &self.rows[self.row_index + row_offset];
756            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
757                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
758                if let Some(col) = chunk.column_mut(col_idx) {
759                    col.push_value(value);
760                }
761            }
762        }
763        chunk.set_count(chunk_rows);
764
765        self.row_index += chunk_rows;
766        Ok(Some(chunk))
767    }
768
769    fn reset(&mut self) {
770        self.row_index = 0;
771    }
772
773    fn name(&self) -> &'static str {
774        "StaticResult"
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781    use crate::query::plan::{
782        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
783        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
784        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
785        SortKey, SortOp,
786    };
787    use grafeo_common::types::Value;
788
789    fn create_test_store() -> Arc<LpgStore> {
790        let store = Arc::new(LpgStore::new());
791        store.create_node(&["Person"]);
792        store.create_node(&["Person"]);
793        store.create_node(&["Company"]);
794        store
795    }
796
797    // ==================== Simple Scan Tests ====================
798
799    #[test]
800    fn test_plan_simple_scan() {
801        let store = create_test_store();
802        let planner = Planner::new(store);
803
804        // MATCH (n:Person) RETURN n
805        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
806            items: vec![ReturnItem {
807                expression: LogicalExpression::Variable("n".to_string()),
808                alias: None,
809            }],
810            distinct: false,
811            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
812                variable: "n".to_string(),
813                label: Some("Person".to_string()),
814                input: None,
815            })),
816        }));
817
818        let physical = planner.plan(&logical).unwrap();
819        assert_eq!(physical.columns(), &["n"]);
820    }
821
822    #[test]
823    fn test_plan_scan_without_label() {
824        let store = create_test_store();
825        let planner = Planner::new(store);
826
827        // MATCH (n) RETURN n
828        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
829            items: vec![ReturnItem {
830                expression: LogicalExpression::Variable("n".to_string()),
831                alias: None,
832            }],
833            distinct: false,
834            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
835                variable: "n".to_string(),
836                label: None,
837                input: None,
838            })),
839        }));
840
841        let physical = planner.plan(&logical).unwrap();
842        assert_eq!(physical.columns(), &["n"]);
843    }
844
845    #[test]
846    fn test_plan_return_with_alias() {
847        let store = create_test_store();
848        let planner = Planner::new(store);
849
850        // MATCH (n:Person) RETURN n AS person
851        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
852            items: vec![ReturnItem {
853                expression: LogicalExpression::Variable("n".to_string()),
854                alias: Some("person".to_string()),
855            }],
856            distinct: false,
857            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
858                variable: "n".to_string(),
859                label: Some("Person".to_string()),
860                input: None,
861            })),
862        }));
863
864        let physical = planner.plan(&logical).unwrap();
865        assert_eq!(physical.columns(), &["person"]);
866    }
867
868    #[test]
869    fn test_plan_return_property() {
870        let store = create_test_store();
871        let planner = Planner::new(store);
872
873        // MATCH (n:Person) RETURN n.name
874        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
875            items: vec![ReturnItem {
876                expression: LogicalExpression::Property {
877                    variable: "n".to_string(),
878                    property: "name".to_string(),
879                },
880                alias: None,
881            }],
882            distinct: false,
883            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
884                variable: "n".to_string(),
885                label: Some("Person".to_string()),
886                input: None,
887            })),
888        }));
889
890        let physical = planner.plan(&logical).unwrap();
891        assert_eq!(physical.columns(), &["n.name"]);
892    }
893
894    #[test]
895    fn test_plan_return_literal() {
896        let store = create_test_store();
897        let planner = Planner::new(store);
898
899        // MATCH (n) RETURN 42 AS answer
900        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
901            items: vec![ReturnItem {
902                expression: LogicalExpression::Literal(Value::Int64(42)),
903                alias: Some("answer".to_string()),
904            }],
905            distinct: false,
906            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
907                variable: "n".to_string(),
908                label: None,
909                input: None,
910            })),
911        }));
912
913        let physical = planner.plan(&logical).unwrap();
914        assert_eq!(physical.columns(), &["answer"]);
915    }
916
917    // ==================== Filter Tests ====================
918
919    #[test]
920    fn test_plan_filter_equality() {
921        let store = create_test_store();
922        let planner = Planner::new(store);
923
924        // MATCH (n:Person) WHERE n.age = 30 RETURN n
925        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
926            items: vec![ReturnItem {
927                expression: LogicalExpression::Variable("n".to_string()),
928                alias: None,
929            }],
930            distinct: false,
931            input: Box::new(LogicalOperator::Filter(FilterOp {
932                predicate: LogicalExpression::Binary {
933                    left: Box::new(LogicalExpression::Property {
934                        variable: "n".to_string(),
935                        property: "age".to_string(),
936                    }),
937                    op: BinaryOp::Eq,
938                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
939                },
940                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
941                    variable: "n".to_string(),
942                    label: Some("Person".to_string()),
943                    input: None,
944                })),
945            })),
946        }));
947
948        let physical = planner.plan(&logical).unwrap();
949        assert_eq!(physical.columns(), &["n"]);
950    }
951
952    #[test]
953    fn test_plan_filter_compound_and() {
954        let store = create_test_store();
955        let planner = Planner::new(store);
956
957        // WHERE n.age > 20 AND n.age < 40
958        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
959            items: vec![ReturnItem {
960                expression: LogicalExpression::Variable("n".to_string()),
961                alias: None,
962            }],
963            distinct: false,
964            input: Box::new(LogicalOperator::Filter(FilterOp {
965                predicate: LogicalExpression::Binary {
966                    left: Box::new(LogicalExpression::Binary {
967                        left: Box::new(LogicalExpression::Property {
968                            variable: "n".to_string(),
969                            property: "age".to_string(),
970                        }),
971                        op: BinaryOp::Gt,
972                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
973                    }),
974                    op: BinaryOp::And,
975                    right: Box::new(LogicalExpression::Binary {
976                        left: Box::new(LogicalExpression::Property {
977                            variable: "n".to_string(),
978                            property: "age".to_string(),
979                        }),
980                        op: BinaryOp::Lt,
981                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
982                    }),
983                },
984                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
985                    variable: "n".to_string(),
986                    label: None,
987                    input: None,
988                })),
989            })),
990        }));
991
992        let physical = planner.plan(&logical).unwrap();
993        assert_eq!(physical.columns(), &["n"]);
994    }
995
996    #[test]
997    fn test_plan_filter_unary_not() {
998        let store = create_test_store();
999        let planner = Planner::new(store);
1000
1001        // WHERE NOT n.active
1002        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1003            items: vec![ReturnItem {
1004                expression: LogicalExpression::Variable("n".to_string()),
1005                alias: None,
1006            }],
1007            distinct: false,
1008            input: Box::new(LogicalOperator::Filter(FilterOp {
1009                predicate: LogicalExpression::Unary {
1010                    op: UnaryOp::Not,
1011                    operand: Box::new(LogicalExpression::Property {
1012                        variable: "n".to_string(),
1013                        property: "active".to_string(),
1014                    }),
1015                },
1016                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1017                    variable: "n".to_string(),
1018                    label: None,
1019                    input: None,
1020                })),
1021            })),
1022        }));
1023
1024        let physical = planner.plan(&logical).unwrap();
1025        assert_eq!(physical.columns(), &["n"]);
1026    }
1027
1028    #[test]
1029    fn test_plan_filter_is_null() {
1030        let store = create_test_store();
1031        let planner = Planner::new(store);
1032
1033        // WHERE n.email IS NULL
1034        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1035            items: vec![ReturnItem {
1036                expression: LogicalExpression::Variable("n".to_string()),
1037                alias: None,
1038            }],
1039            distinct: false,
1040            input: Box::new(LogicalOperator::Filter(FilterOp {
1041                predicate: LogicalExpression::Unary {
1042                    op: UnaryOp::IsNull,
1043                    operand: Box::new(LogicalExpression::Property {
1044                        variable: "n".to_string(),
1045                        property: "email".to_string(),
1046                    }),
1047                },
1048                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1049                    variable: "n".to_string(),
1050                    label: None,
1051                    input: None,
1052                })),
1053            })),
1054        }));
1055
1056        let physical = planner.plan(&logical).unwrap();
1057        assert_eq!(physical.columns(), &["n"]);
1058    }
1059
1060    #[test]
1061    fn test_plan_filter_function_call() {
1062        let store = create_test_store();
1063        let planner = Planner::new(store);
1064
1065        // WHERE size(n.friends) > 0
1066        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1067            items: vec![ReturnItem {
1068                expression: LogicalExpression::Variable("n".to_string()),
1069                alias: None,
1070            }],
1071            distinct: false,
1072            input: Box::new(LogicalOperator::Filter(FilterOp {
1073                predicate: LogicalExpression::Binary {
1074                    left: Box::new(LogicalExpression::FunctionCall {
1075                        name: "size".to_string(),
1076                        args: vec![LogicalExpression::Property {
1077                            variable: "n".to_string(),
1078                            property: "friends".to_string(),
1079                        }],
1080                        distinct: false,
1081                    }),
1082                    op: BinaryOp::Gt,
1083                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1084                },
1085                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1086                    variable: "n".to_string(),
1087                    label: None,
1088                    input: None,
1089                })),
1090            })),
1091        }));
1092
1093        let physical = planner.plan(&logical).unwrap();
1094        assert_eq!(physical.columns(), &["n"]);
1095    }
1096
1097    // ==================== Expand Tests ====================
1098
1099    #[test]
1100    fn test_plan_expand_outgoing() {
1101        let store = create_test_store();
1102        let planner = Planner::new(store);
1103
1104        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1105        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1106            items: vec![
1107                ReturnItem {
1108                    expression: LogicalExpression::Variable("a".to_string()),
1109                    alias: None,
1110                },
1111                ReturnItem {
1112                    expression: LogicalExpression::Variable("b".to_string()),
1113                    alias: None,
1114                },
1115            ],
1116            distinct: false,
1117            input: Box::new(LogicalOperator::Expand(ExpandOp {
1118                from_variable: "a".to_string(),
1119                to_variable: "b".to_string(),
1120                edge_variable: None,
1121                direction: ExpandDirection::Outgoing,
1122                edge_type: Some("KNOWS".to_string()),
1123                min_hops: 1,
1124                max_hops: Some(1),
1125                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1126                    variable: "a".to_string(),
1127                    label: Some("Person".to_string()),
1128                    input: None,
1129                })),
1130                path_alias: None,
1131            })),
1132        }));
1133
1134        let physical = planner.plan(&logical).unwrap();
1135        // The return should have columns [a, b]
1136        assert!(physical.columns().contains(&"a".to_string()));
1137        assert!(physical.columns().contains(&"b".to_string()));
1138    }
1139
1140    #[test]
1141    fn test_plan_expand_with_edge_variable() {
1142        let store = create_test_store();
1143        let planner = Planner::new(store);
1144
1145        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1146        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1147            items: vec![
1148                ReturnItem {
1149                    expression: LogicalExpression::Variable("a".to_string()),
1150                    alias: None,
1151                },
1152                ReturnItem {
1153                    expression: LogicalExpression::Variable("r".to_string()),
1154                    alias: None,
1155                },
1156                ReturnItem {
1157                    expression: LogicalExpression::Variable("b".to_string()),
1158                    alias: None,
1159                },
1160            ],
1161            distinct: false,
1162            input: Box::new(LogicalOperator::Expand(ExpandOp {
1163                from_variable: "a".to_string(),
1164                to_variable: "b".to_string(),
1165                edge_variable: Some("r".to_string()),
1166                direction: ExpandDirection::Outgoing,
1167                edge_type: Some("KNOWS".to_string()),
1168                min_hops: 1,
1169                max_hops: Some(1),
1170                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1171                    variable: "a".to_string(),
1172                    label: None,
1173                    input: None,
1174                })),
1175                path_alias: None,
1176            })),
1177        }));
1178
1179        let physical = planner.plan(&logical).unwrap();
1180        assert!(physical.columns().contains(&"a".to_string()));
1181        assert!(physical.columns().contains(&"r".to_string()));
1182        assert!(physical.columns().contains(&"b".to_string()));
1183    }
1184
1185    // ==================== Limit/Skip/Sort Tests ====================
1186
1187    #[test]
1188    fn test_plan_limit() {
1189        let store = create_test_store();
1190        let planner = Planner::new(store);
1191
1192        // MATCH (n) RETURN n LIMIT 10
1193        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1194            items: vec![ReturnItem {
1195                expression: LogicalExpression::Variable("n".to_string()),
1196                alias: None,
1197            }],
1198            distinct: false,
1199            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1200                count: 10,
1201                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1202                    variable: "n".to_string(),
1203                    label: None,
1204                    input: None,
1205                })),
1206            })),
1207        }));
1208
1209        let physical = planner.plan(&logical).unwrap();
1210        assert_eq!(physical.columns(), &["n"]);
1211    }
1212
1213    #[test]
1214    fn test_plan_skip() {
1215        let store = create_test_store();
1216        let planner = Planner::new(store);
1217
1218        // MATCH (n) RETURN n SKIP 5
1219        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1220            items: vec![ReturnItem {
1221                expression: LogicalExpression::Variable("n".to_string()),
1222                alias: None,
1223            }],
1224            distinct: false,
1225            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1226                count: 5,
1227                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1228                    variable: "n".to_string(),
1229                    label: None,
1230                    input: None,
1231                })),
1232            })),
1233        }));
1234
1235        let physical = planner.plan(&logical).unwrap();
1236        assert_eq!(physical.columns(), &["n"]);
1237    }
1238
1239    #[test]
1240    fn test_plan_sort() {
1241        let store = create_test_store();
1242        let planner = Planner::new(store);
1243
1244        // MATCH (n) RETURN n ORDER BY n.name ASC
1245        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1246            items: vec![ReturnItem {
1247                expression: LogicalExpression::Variable("n".to_string()),
1248                alias: None,
1249            }],
1250            distinct: false,
1251            input: Box::new(LogicalOperator::Sort(SortOp {
1252                keys: vec![SortKey {
1253                    expression: LogicalExpression::Variable("n".to_string()),
1254                    order: SortOrder::Ascending,
1255                }],
1256                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1257                    variable: "n".to_string(),
1258                    label: None,
1259                    input: None,
1260                })),
1261            })),
1262        }));
1263
1264        let physical = planner.plan(&logical).unwrap();
1265        assert_eq!(physical.columns(), &["n"]);
1266    }
1267
1268    #[test]
1269    fn test_plan_sort_descending() {
1270        let store = create_test_store();
1271        let planner = Planner::new(store);
1272
1273        // ORDER BY n DESC
1274        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1275            items: vec![ReturnItem {
1276                expression: LogicalExpression::Variable("n".to_string()),
1277                alias: None,
1278            }],
1279            distinct: false,
1280            input: Box::new(LogicalOperator::Sort(SortOp {
1281                keys: vec![SortKey {
1282                    expression: LogicalExpression::Variable("n".to_string()),
1283                    order: SortOrder::Descending,
1284                }],
1285                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1286                    variable: "n".to_string(),
1287                    label: None,
1288                    input: None,
1289                })),
1290            })),
1291        }));
1292
1293        let physical = planner.plan(&logical).unwrap();
1294        assert_eq!(physical.columns(), &["n"]);
1295    }
1296
1297    #[test]
1298    fn test_plan_distinct() {
1299        let store = create_test_store();
1300        let planner = Planner::new(store);
1301
1302        // MATCH (n) RETURN DISTINCT n
1303        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1304            items: vec![ReturnItem {
1305                expression: LogicalExpression::Variable("n".to_string()),
1306                alias: None,
1307            }],
1308            distinct: false,
1309            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1310                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1311                    variable: "n".to_string(),
1312                    label: None,
1313                    input: None,
1314                })),
1315                columns: None,
1316            })),
1317        }));
1318
1319        let physical = planner.plan(&logical).unwrap();
1320        assert_eq!(physical.columns(), &["n"]);
1321    }
1322
1323    // ==================== Aggregate Tests ====================
1324
1325    #[test]
1326    fn test_plan_aggregate_count() {
1327        let store = create_test_store();
1328        let planner = Planner::new(store);
1329
1330        // MATCH (n) RETURN count(n)
1331        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1332            items: vec![ReturnItem {
1333                expression: LogicalExpression::Variable("cnt".to_string()),
1334                alias: None,
1335            }],
1336            distinct: false,
1337            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1338                group_by: vec![],
1339                aggregates: vec![LogicalAggregateExpr {
1340                    function: LogicalAggregateFunction::Count,
1341                    expression: Some(LogicalExpression::Variable("n".to_string())),
1342                    distinct: false,
1343                    alias: Some("cnt".to_string()),
1344                    percentile: None,
1345                }],
1346                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1347                    variable: "n".to_string(),
1348                    label: None,
1349                    input: None,
1350                })),
1351                having: None,
1352            })),
1353        }));
1354
1355        let physical = planner.plan(&logical).unwrap();
1356        assert!(physical.columns().contains(&"cnt".to_string()));
1357    }
1358
1359    #[test]
1360    fn test_plan_aggregate_with_group_by() {
1361        let store = create_test_store();
1362        let planner = Planner::new(store);
1363
1364        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1365        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1366            group_by: vec![LogicalExpression::Property {
1367                variable: "n".to_string(),
1368                property: "city".to_string(),
1369            }],
1370            aggregates: vec![LogicalAggregateExpr {
1371                function: LogicalAggregateFunction::Count,
1372                expression: Some(LogicalExpression::Variable("n".to_string())),
1373                distinct: false,
1374                alias: Some("cnt".to_string()),
1375                percentile: None,
1376            }],
1377            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1378                variable: "n".to_string(),
1379                label: Some("Person".to_string()),
1380                input: None,
1381            })),
1382            having: None,
1383        }));
1384
1385        let physical = planner.plan(&logical).unwrap();
1386        assert_eq!(physical.columns().len(), 2);
1387    }
1388
1389    #[test]
1390    fn test_plan_aggregate_sum() {
1391        let store = create_test_store();
1392        let planner = Planner::new(store);
1393
1394        // SUM(n.value)
1395        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1396            group_by: vec![],
1397            aggregates: vec![LogicalAggregateExpr {
1398                function: LogicalAggregateFunction::Sum,
1399                expression: Some(LogicalExpression::Property {
1400                    variable: "n".to_string(),
1401                    property: "value".to_string(),
1402                }),
1403                distinct: false,
1404                alias: Some("total".to_string()),
1405                percentile: None,
1406            }],
1407            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1408                variable: "n".to_string(),
1409                label: None,
1410                input: None,
1411            })),
1412            having: None,
1413        }));
1414
1415        let physical = planner.plan(&logical).unwrap();
1416        assert!(physical.columns().contains(&"total".to_string()));
1417    }
1418
1419    #[test]
1420    fn test_plan_aggregate_avg() {
1421        let store = create_test_store();
1422        let planner = Planner::new(store);
1423
1424        // AVG(n.score)
1425        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1426            group_by: vec![],
1427            aggregates: vec![LogicalAggregateExpr {
1428                function: LogicalAggregateFunction::Avg,
1429                expression: Some(LogicalExpression::Property {
1430                    variable: "n".to_string(),
1431                    property: "score".to_string(),
1432                }),
1433                distinct: false,
1434                alias: Some("average".to_string()),
1435                percentile: None,
1436            }],
1437            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1438                variable: "n".to_string(),
1439                label: None,
1440                input: None,
1441            })),
1442            having: None,
1443        }));
1444
1445        let physical = planner.plan(&logical).unwrap();
1446        assert!(physical.columns().contains(&"average".to_string()));
1447    }
1448
1449    #[test]
1450    fn test_plan_aggregate_min_max() {
1451        let store = create_test_store();
1452        let planner = Planner::new(store);
1453
1454        // MIN(n.age), MAX(n.age)
1455        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1456            group_by: vec![],
1457            aggregates: vec![
1458                LogicalAggregateExpr {
1459                    function: LogicalAggregateFunction::Min,
1460                    expression: Some(LogicalExpression::Property {
1461                        variable: "n".to_string(),
1462                        property: "age".to_string(),
1463                    }),
1464                    distinct: false,
1465                    alias: Some("youngest".to_string()),
1466                    percentile: None,
1467                },
1468                LogicalAggregateExpr {
1469                    function: LogicalAggregateFunction::Max,
1470                    expression: Some(LogicalExpression::Property {
1471                        variable: "n".to_string(),
1472                        property: "age".to_string(),
1473                    }),
1474                    distinct: false,
1475                    alias: Some("oldest".to_string()),
1476                    percentile: None,
1477                },
1478            ],
1479            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1480                variable: "n".to_string(),
1481                label: None,
1482                input: None,
1483            })),
1484            having: None,
1485        }));
1486
1487        let physical = planner.plan(&logical).unwrap();
1488        assert!(physical.columns().contains(&"youngest".to_string()));
1489        assert!(physical.columns().contains(&"oldest".to_string()));
1490    }
1491
1492    // ==================== Join Tests ====================
1493
1494    #[test]
1495    fn test_plan_inner_join() {
1496        let store = create_test_store();
1497        let planner = Planner::new(store);
1498
1499        // Inner join between two scans
1500        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1501            items: vec![
1502                ReturnItem {
1503                    expression: LogicalExpression::Variable("a".to_string()),
1504                    alias: None,
1505                },
1506                ReturnItem {
1507                    expression: LogicalExpression::Variable("b".to_string()),
1508                    alias: None,
1509                },
1510            ],
1511            distinct: false,
1512            input: Box::new(LogicalOperator::Join(JoinOp {
1513                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1514                    variable: "a".to_string(),
1515                    label: Some("Person".to_string()),
1516                    input: None,
1517                })),
1518                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1519                    variable: "b".to_string(),
1520                    label: Some("Company".to_string()),
1521                    input: None,
1522                })),
1523                join_type: JoinType::Inner,
1524                conditions: vec![JoinCondition {
1525                    left: LogicalExpression::Variable("a".to_string()),
1526                    right: LogicalExpression::Variable("b".to_string()),
1527                }],
1528            })),
1529        }));
1530
1531        let physical = planner.plan(&logical).unwrap();
1532        assert!(physical.columns().contains(&"a".to_string()));
1533        assert!(physical.columns().contains(&"b".to_string()));
1534    }
1535
1536    #[test]
1537    fn test_plan_cross_join() {
1538        let store = create_test_store();
1539        let planner = Planner::new(store);
1540
1541        // Cross join (no conditions)
1542        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1543            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1544                variable: "a".to_string(),
1545                label: None,
1546                input: None,
1547            })),
1548            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1549                variable: "b".to_string(),
1550                label: None,
1551                input: None,
1552            })),
1553            join_type: JoinType::Cross,
1554            conditions: vec![],
1555        }));
1556
1557        let physical = planner.plan(&logical).unwrap();
1558        assert_eq!(physical.columns().len(), 2);
1559    }
1560
1561    #[test]
1562    fn test_plan_left_join() {
1563        let store = create_test_store();
1564        let planner = Planner::new(store);
1565
1566        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1567            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1568                variable: "a".to_string(),
1569                label: None,
1570                input: None,
1571            })),
1572            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573                variable: "b".to_string(),
1574                label: None,
1575                input: None,
1576            })),
1577            join_type: JoinType::Left,
1578            conditions: vec![],
1579        }));
1580
1581        let physical = planner.plan(&logical).unwrap();
1582        assert_eq!(physical.columns().len(), 2);
1583    }
1584
1585    // ==================== Mutation Tests ====================
1586
1587    #[test]
1588    fn test_plan_create_node() {
1589        let store = create_test_store();
1590        let planner = Planner::new(store);
1591
1592        // CREATE (n:Person {name: 'Alice'})
1593        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1594            variable: "n".to_string(),
1595            labels: vec!["Person".to_string()],
1596            properties: vec![(
1597                "name".to_string(),
1598                LogicalExpression::Literal(Value::String("Alice".into())),
1599            )],
1600            input: None,
1601        }));
1602
1603        let physical = planner.plan(&logical).unwrap();
1604        assert!(physical.columns().contains(&"n".to_string()));
1605    }
1606
1607    #[test]
1608    fn test_plan_create_edge() {
1609        let store = create_test_store();
1610        let planner = Planner::new(store);
1611
1612        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1613        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1614            variable: Some("r".to_string()),
1615            from_variable: "a".to_string(),
1616            to_variable: "b".to_string(),
1617            edge_type: "KNOWS".to_string(),
1618            properties: vec![],
1619            input: Box::new(LogicalOperator::Join(JoinOp {
1620                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1621                    variable: "a".to_string(),
1622                    label: None,
1623                    input: None,
1624                })),
1625                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1626                    variable: "b".to_string(),
1627                    label: None,
1628                    input: None,
1629                })),
1630                join_type: JoinType::Cross,
1631                conditions: vec![],
1632            })),
1633        }));
1634
1635        let physical = planner.plan(&logical).unwrap();
1636        assert!(physical.columns().contains(&"r".to_string()));
1637    }
1638
1639    #[test]
1640    fn test_plan_delete_node() {
1641        let store = create_test_store();
1642        let planner = Planner::new(store);
1643
1644        // MATCH (n) DELETE n
1645        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1646            variable: "n".to_string(),
1647            detach: false,
1648            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1649                variable: "n".to_string(),
1650                label: None,
1651                input: None,
1652            })),
1653        }));
1654
1655        let physical = planner.plan(&logical).unwrap();
1656        assert!(physical.columns().contains(&"deleted_count".to_string()));
1657    }
1658
1659    // ==================== Error Cases ====================
1660
1661    #[test]
1662    fn test_plan_empty_errors() {
1663        let store = create_test_store();
1664        let planner = Planner::new(store);
1665
1666        let logical = LogicalPlan::new(LogicalOperator::Empty);
1667        let result = planner.plan(&logical);
1668        assert!(result.is_err());
1669    }
1670
1671    #[test]
1672    fn test_plan_missing_variable_in_return() {
1673        let store = create_test_store();
1674        let planner = Planner::new(store);
1675
1676        // Return variable that doesn't exist in input
1677        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1678            items: vec![ReturnItem {
1679                expression: LogicalExpression::Variable("missing".to_string()),
1680                alias: None,
1681            }],
1682            distinct: false,
1683            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1684                variable: "n".to_string(),
1685                label: None,
1686                input: None,
1687            })),
1688        }));
1689
1690        let result = planner.plan(&logical);
1691        assert!(result.is_err());
1692    }
1693
1694    // ==================== Helper Function Tests ====================
1695
1696    #[test]
1697    fn test_convert_binary_ops() {
1698        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1699        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1700        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1701        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1702        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1703        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1704        assert!(convert_binary_op(BinaryOp::And).is_ok());
1705        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1706        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1707        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1708        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1709        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1710    }
1711
1712    #[test]
1713    fn test_convert_unary_ops() {
1714        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1715        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1716        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1717        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1718    }
1719
1720    #[test]
1721    fn test_convert_aggregate_functions() {
1722        assert!(matches!(
1723            convert_aggregate_function(LogicalAggregateFunction::Count),
1724            PhysicalAggregateFunction::Count
1725        ));
1726        assert!(matches!(
1727            convert_aggregate_function(LogicalAggregateFunction::Sum),
1728            PhysicalAggregateFunction::Sum
1729        ));
1730        assert!(matches!(
1731            convert_aggregate_function(LogicalAggregateFunction::Avg),
1732            PhysicalAggregateFunction::Avg
1733        ));
1734        assert!(matches!(
1735            convert_aggregate_function(LogicalAggregateFunction::Min),
1736            PhysicalAggregateFunction::Min
1737        ));
1738        assert!(matches!(
1739            convert_aggregate_function(LogicalAggregateFunction::Max),
1740            PhysicalAggregateFunction::Max
1741        ));
1742    }
1743
1744    #[test]
1745    fn test_planner_accessors() {
1746        let store = create_test_store();
1747        let planner = Planner::new(Arc::clone(&store));
1748
1749        assert!(planner.tx_id().is_none());
1750        assert!(planner.tx_manager().is_none());
1751        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1752    }
1753
1754    #[test]
1755    fn test_physical_plan_accessors() {
1756        let store = create_test_store();
1757        let planner = Planner::new(store);
1758
1759        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1760            variable: "n".to_string(),
1761            label: None,
1762            input: None,
1763        }));
1764
1765        let physical = planner.plan(&logical).unwrap();
1766        assert_eq!(physical.columns(), &["n"]);
1767
1768        // Test into_operator
1769        let _ = physical.into_operator();
1770    }
1771
1772    // ==================== Adaptive Planning Tests ====================
1773
1774    #[test]
1775    fn test_plan_adaptive_with_scan() {
1776        let store = create_test_store();
1777        let planner = Planner::new(store);
1778
1779        // MATCH (n:Person) RETURN n
1780        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1781            items: vec![ReturnItem {
1782                expression: LogicalExpression::Variable("n".to_string()),
1783                alias: None,
1784            }],
1785            distinct: false,
1786            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1787                variable: "n".to_string(),
1788                label: Some("Person".to_string()),
1789                input: None,
1790            })),
1791        }));
1792
1793        let physical = planner.plan_adaptive(&logical).unwrap();
1794        assert_eq!(physical.columns(), &["n"]);
1795        // Should have adaptive context with estimates
1796        assert!(physical.adaptive_context.is_some());
1797    }
1798
1799    #[test]
1800    fn test_plan_adaptive_with_filter() {
1801        let store = create_test_store();
1802        let planner = Planner::new(store);
1803
1804        // MATCH (n) WHERE n.age > 30 RETURN n
1805        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1806            items: vec![ReturnItem {
1807                expression: LogicalExpression::Variable("n".to_string()),
1808                alias: None,
1809            }],
1810            distinct: false,
1811            input: Box::new(LogicalOperator::Filter(FilterOp {
1812                predicate: LogicalExpression::Binary {
1813                    left: Box::new(LogicalExpression::Property {
1814                        variable: "n".to_string(),
1815                        property: "age".to_string(),
1816                    }),
1817                    op: BinaryOp::Gt,
1818                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1819                },
1820                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1821                    variable: "n".to_string(),
1822                    label: None,
1823                    input: None,
1824                })),
1825            })),
1826        }));
1827
1828        let physical = planner.plan_adaptive(&logical).unwrap();
1829        assert!(physical.adaptive_context.is_some());
1830    }
1831
1832    #[test]
1833    fn test_plan_adaptive_with_expand() {
1834        let store = create_test_store();
1835        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1836
1837        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1838        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1839            items: vec![
1840                ReturnItem {
1841                    expression: LogicalExpression::Variable("a".to_string()),
1842                    alias: None,
1843                },
1844                ReturnItem {
1845                    expression: LogicalExpression::Variable("b".to_string()),
1846                    alias: None,
1847                },
1848            ],
1849            distinct: false,
1850            input: Box::new(LogicalOperator::Expand(ExpandOp {
1851                from_variable: "a".to_string(),
1852                to_variable: "b".to_string(),
1853                edge_variable: None,
1854                direction: ExpandDirection::Outgoing,
1855                edge_type: Some("KNOWS".to_string()),
1856                min_hops: 1,
1857                max_hops: Some(1),
1858                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1859                    variable: "a".to_string(),
1860                    label: None,
1861                    input: None,
1862                })),
1863                path_alias: None,
1864            })),
1865        }));
1866
1867        let physical = planner.plan_adaptive(&logical).unwrap();
1868        assert!(physical.adaptive_context.is_some());
1869    }
1870
1871    #[test]
1872    fn test_plan_adaptive_with_join() {
1873        let store = create_test_store();
1874        let planner = Planner::new(store);
1875
1876        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1877            items: vec![
1878                ReturnItem {
1879                    expression: LogicalExpression::Variable("a".to_string()),
1880                    alias: None,
1881                },
1882                ReturnItem {
1883                    expression: LogicalExpression::Variable("b".to_string()),
1884                    alias: None,
1885                },
1886            ],
1887            distinct: false,
1888            input: Box::new(LogicalOperator::Join(JoinOp {
1889                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1890                    variable: "a".to_string(),
1891                    label: None,
1892                    input: None,
1893                })),
1894                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1895                    variable: "b".to_string(),
1896                    label: None,
1897                    input: None,
1898                })),
1899                join_type: JoinType::Cross,
1900                conditions: vec![],
1901            })),
1902        }));
1903
1904        let physical = planner.plan_adaptive(&logical).unwrap();
1905        assert!(physical.adaptive_context.is_some());
1906    }
1907
1908    #[test]
1909    fn test_plan_adaptive_with_aggregate() {
1910        let store = create_test_store();
1911        let planner = Planner::new(store);
1912
1913        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1914            group_by: vec![],
1915            aggregates: vec![LogicalAggregateExpr {
1916                function: LogicalAggregateFunction::Count,
1917                expression: Some(LogicalExpression::Variable("n".to_string())),
1918                distinct: false,
1919                alias: Some("cnt".to_string()),
1920                percentile: None,
1921            }],
1922            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1923                variable: "n".to_string(),
1924                label: None,
1925                input: None,
1926            })),
1927            having: None,
1928        }));
1929
1930        let physical = planner.plan_adaptive(&logical).unwrap();
1931        assert!(physical.adaptive_context.is_some());
1932    }
1933
1934    #[test]
1935    fn test_plan_adaptive_with_distinct() {
1936        let store = create_test_store();
1937        let planner = Planner::new(store);
1938
1939        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1940            items: vec![ReturnItem {
1941                expression: LogicalExpression::Variable("n".to_string()),
1942                alias: None,
1943            }],
1944            distinct: false,
1945            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1946                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1947                    variable: "n".to_string(),
1948                    label: None,
1949                    input: None,
1950                })),
1951                columns: None,
1952            })),
1953        }));
1954
1955        let physical = planner.plan_adaptive(&logical).unwrap();
1956        assert!(physical.adaptive_context.is_some());
1957    }
1958
1959    #[test]
1960    fn test_plan_adaptive_with_limit() {
1961        let store = create_test_store();
1962        let planner = Planner::new(store);
1963
1964        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1965            items: vec![ReturnItem {
1966                expression: LogicalExpression::Variable("n".to_string()),
1967                alias: None,
1968            }],
1969            distinct: false,
1970            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1971                count: 10,
1972                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1973                    variable: "n".to_string(),
1974                    label: None,
1975                    input: None,
1976                })),
1977            })),
1978        }));
1979
1980        let physical = planner.plan_adaptive(&logical).unwrap();
1981        assert!(physical.adaptive_context.is_some());
1982    }
1983
1984    #[test]
1985    fn test_plan_adaptive_with_skip() {
1986        let store = create_test_store();
1987        let planner = Planner::new(store);
1988
1989        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1990            items: vec![ReturnItem {
1991                expression: LogicalExpression::Variable("n".to_string()),
1992                alias: None,
1993            }],
1994            distinct: false,
1995            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1996                count: 5,
1997                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1998                    variable: "n".to_string(),
1999                    label: None,
2000                    input: None,
2001                })),
2002            })),
2003        }));
2004
2005        let physical = planner.plan_adaptive(&logical).unwrap();
2006        assert!(physical.adaptive_context.is_some());
2007    }
2008
2009    #[test]
2010    fn test_plan_adaptive_with_sort() {
2011        let store = create_test_store();
2012        let planner = Planner::new(store);
2013
2014        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2015            items: vec![ReturnItem {
2016                expression: LogicalExpression::Variable("n".to_string()),
2017                alias: None,
2018            }],
2019            distinct: false,
2020            input: Box::new(LogicalOperator::Sort(SortOp {
2021                keys: vec![SortKey {
2022                    expression: LogicalExpression::Variable("n".to_string()),
2023                    order: SortOrder::Ascending,
2024                }],
2025                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2026                    variable: "n".to_string(),
2027                    label: None,
2028                    input: None,
2029                })),
2030            })),
2031        }));
2032
2033        let physical = planner.plan_adaptive(&logical).unwrap();
2034        assert!(physical.adaptive_context.is_some());
2035    }
2036
2037    #[test]
2038    fn test_plan_adaptive_with_union() {
2039        let store = create_test_store();
2040        let planner = Planner::new(store);
2041
2042        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2043            items: vec![ReturnItem {
2044                expression: LogicalExpression::Variable("n".to_string()),
2045                alias: None,
2046            }],
2047            distinct: false,
2048            input: Box::new(LogicalOperator::Union(UnionOp {
2049                inputs: vec![
2050                    LogicalOperator::NodeScan(NodeScanOp {
2051                        variable: "n".to_string(),
2052                        label: Some("Person".to_string()),
2053                        input: None,
2054                    }),
2055                    LogicalOperator::NodeScan(NodeScanOp {
2056                        variable: "n".to_string(),
2057                        label: Some("Company".to_string()),
2058                        input: None,
2059                    }),
2060                ],
2061            })),
2062        }));
2063
2064        let physical = planner.plan_adaptive(&logical).unwrap();
2065        assert!(physical.adaptive_context.is_some());
2066    }
2067
2068    // ==================== Variable Length Path Tests ====================
2069
2070    #[test]
2071    fn test_plan_expand_variable_length() {
2072        let store = create_test_store();
2073        let planner = Planner::new(store);
2074
2075        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2076        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2077            items: vec![
2078                ReturnItem {
2079                    expression: LogicalExpression::Variable("a".to_string()),
2080                    alias: None,
2081                },
2082                ReturnItem {
2083                    expression: LogicalExpression::Variable("b".to_string()),
2084                    alias: None,
2085                },
2086            ],
2087            distinct: false,
2088            input: Box::new(LogicalOperator::Expand(ExpandOp {
2089                from_variable: "a".to_string(),
2090                to_variable: "b".to_string(),
2091                edge_variable: None,
2092                direction: ExpandDirection::Outgoing,
2093                edge_type: Some("KNOWS".to_string()),
2094                min_hops: 1,
2095                max_hops: Some(3),
2096                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2097                    variable: "a".to_string(),
2098                    label: None,
2099                    input: None,
2100                })),
2101                path_alias: None,
2102            })),
2103        }));
2104
2105        let physical = planner.plan(&logical).unwrap();
2106        assert!(physical.columns().contains(&"a".to_string()));
2107        assert!(physical.columns().contains(&"b".to_string()));
2108    }
2109
2110    #[test]
2111    fn test_plan_expand_with_path_alias() {
2112        let store = create_test_store();
2113        let planner = Planner::new(store);
2114
2115        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2116        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2117            items: vec![
2118                ReturnItem {
2119                    expression: LogicalExpression::Variable("a".to_string()),
2120                    alias: None,
2121                },
2122                ReturnItem {
2123                    expression: LogicalExpression::Variable("b".to_string()),
2124                    alias: None,
2125                },
2126            ],
2127            distinct: false,
2128            input: Box::new(LogicalOperator::Expand(ExpandOp {
2129                from_variable: "a".to_string(),
2130                to_variable: "b".to_string(),
2131                edge_variable: None,
2132                direction: ExpandDirection::Outgoing,
2133                edge_type: Some("KNOWS".to_string()),
2134                min_hops: 1,
2135                max_hops: Some(3),
2136                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2137                    variable: "a".to_string(),
2138                    label: None,
2139                    input: None,
2140                })),
2141                path_alias: Some("p".to_string()),
2142            })),
2143        }));
2144
2145        let physical = planner.plan(&logical).unwrap();
2146        // Verify plan was created successfully with expected output columns
2147        assert!(physical.columns().contains(&"a".to_string()));
2148        assert!(physical.columns().contains(&"b".to_string()));
2149    }
2150
2151    #[test]
2152    fn test_plan_expand_incoming() {
2153        let store = create_test_store();
2154        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2155
2156        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2157        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2158            items: vec![
2159                ReturnItem {
2160                    expression: LogicalExpression::Variable("a".to_string()),
2161                    alias: None,
2162                },
2163                ReturnItem {
2164                    expression: LogicalExpression::Variable("b".to_string()),
2165                    alias: None,
2166                },
2167            ],
2168            distinct: false,
2169            input: Box::new(LogicalOperator::Expand(ExpandOp {
2170                from_variable: "a".to_string(),
2171                to_variable: "b".to_string(),
2172                edge_variable: None,
2173                direction: ExpandDirection::Incoming,
2174                edge_type: Some("KNOWS".to_string()),
2175                min_hops: 1,
2176                max_hops: Some(1),
2177                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2178                    variable: "a".to_string(),
2179                    label: None,
2180                    input: None,
2181                })),
2182                path_alias: None,
2183            })),
2184        }));
2185
2186        let physical = planner.plan(&logical).unwrap();
2187        assert!(physical.columns().contains(&"a".to_string()));
2188        assert!(physical.columns().contains(&"b".to_string()));
2189    }
2190
2191    #[test]
2192    fn test_plan_expand_both_directions() {
2193        let store = create_test_store();
2194        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2195
2196        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2197        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2198            items: vec![
2199                ReturnItem {
2200                    expression: LogicalExpression::Variable("a".to_string()),
2201                    alias: None,
2202                },
2203                ReturnItem {
2204                    expression: LogicalExpression::Variable("b".to_string()),
2205                    alias: None,
2206                },
2207            ],
2208            distinct: false,
2209            input: Box::new(LogicalOperator::Expand(ExpandOp {
2210                from_variable: "a".to_string(),
2211                to_variable: "b".to_string(),
2212                edge_variable: None,
2213                direction: ExpandDirection::Both,
2214                edge_type: Some("KNOWS".to_string()),
2215                min_hops: 1,
2216                max_hops: Some(1),
2217                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2218                    variable: "a".to_string(),
2219                    label: None,
2220                    input: None,
2221                })),
2222                path_alias: None,
2223            })),
2224        }));
2225
2226        let physical = planner.plan(&logical).unwrap();
2227        assert!(physical.columns().contains(&"a".to_string()));
2228        assert!(physical.columns().contains(&"b".to_string()));
2229    }
2230
2231    // ==================== With Context Tests ====================
2232
2233    #[test]
2234    fn test_planner_with_context() {
2235        use crate::transaction::TransactionManager;
2236
2237        let store = create_test_store();
2238        let tx_manager = Arc::new(TransactionManager::new());
2239        let tx_id = tx_manager.begin();
2240        let epoch = tx_manager.current_epoch();
2241
2242        let planner = Planner::with_context(
2243            Arc::clone(&store),
2244            Arc::clone(&tx_manager),
2245            Some(tx_id),
2246            epoch,
2247        );
2248
2249        assert_eq!(planner.tx_id(), Some(tx_id));
2250        assert!(planner.tx_manager().is_some());
2251        assert_eq!(planner.viewing_epoch(), epoch);
2252    }
2253
2254    #[test]
2255    fn test_planner_with_factorized_execution_disabled() {
2256        let store = create_test_store();
2257        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2258
2259        // Two consecutive expands - should NOT use factorized execution
2260        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2261            items: vec![
2262                ReturnItem {
2263                    expression: LogicalExpression::Variable("a".to_string()),
2264                    alias: None,
2265                },
2266                ReturnItem {
2267                    expression: LogicalExpression::Variable("c".to_string()),
2268                    alias: None,
2269                },
2270            ],
2271            distinct: false,
2272            input: Box::new(LogicalOperator::Expand(ExpandOp {
2273                from_variable: "b".to_string(),
2274                to_variable: "c".to_string(),
2275                edge_variable: None,
2276                direction: ExpandDirection::Outgoing,
2277                edge_type: None,
2278                min_hops: 1,
2279                max_hops: Some(1),
2280                input: Box::new(LogicalOperator::Expand(ExpandOp {
2281                    from_variable: "a".to_string(),
2282                    to_variable: "b".to_string(),
2283                    edge_variable: None,
2284                    direction: ExpandDirection::Outgoing,
2285                    edge_type: None,
2286                    min_hops: 1,
2287                    max_hops: Some(1),
2288                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2289                        variable: "a".to_string(),
2290                        label: None,
2291                        input: None,
2292                    })),
2293                    path_alias: None,
2294                })),
2295                path_alias: None,
2296            })),
2297        }));
2298
2299        let physical = planner.plan(&logical).unwrap();
2300        assert!(physical.columns().contains(&"a".to_string()));
2301        assert!(physical.columns().contains(&"c".to_string()));
2302    }
2303
2304    // ==================== Sort with Property Tests ====================
2305
2306    #[test]
2307    fn test_plan_sort_by_property() {
2308        let store = create_test_store();
2309        let planner = Planner::new(store);
2310
2311        // MATCH (n) RETURN n ORDER BY n.name ASC
2312        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2313            items: vec![ReturnItem {
2314                expression: LogicalExpression::Variable("n".to_string()),
2315                alias: None,
2316            }],
2317            distinct: false,
2318            input: Box::new(LogicalOperator::Sort(SortOp {
2319                keys: vec![SortKey {
2320                    expression: LogicalExpression::Property {
2321                        variable: "n".to_string(),
2322                        property: "name".to_string(),
2323                    },
2324                    order: SortOrder::Ascending,
2325                }],
2326                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2327                    variable: "n".to_string(),
2328                    label: None,
2329                    input: None,
2330                })),
2331            })),
2332        }));
2333
2334        let physical = planner.plan(&logical).unwrap();
2335        // Should have the property column projected
2336        assert!(physical.columns().contains(&"n".to_string()));
2337    }
2338
2339    // ==================== Scan with Input Tests ====================
2340
2341    #[test]
2342    fn test_plan_scan_with_input() {
2343        let store = create_test_store();
2344        let planner = Planner::new(store);
2345
2346        // A scan with another scan as input (for chained patterns)
2347        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2348            items: vec![
2349                ReturnItem {
2350                    expression: LogicalExpression::Variable("a".to_string()),
2351                    alias: None,
2352                },
2353                ReturnItem {
2354                    expression: LogicalExpression::Variable("b".to_string()),
2355                    alias: None,
2356                },
2357            ],
2358            distinct: false,
2359            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2360                variable: "b".to_string(),
2361                label: Some("Company".to_string()),
2362                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2363                    variable: "a".to_string(),
2364                    label: Some("Person".to_string()),
2365                    input: None,
2366                }))),
2367            })),
2368        }));
2369
2370        let physical = planner.plan(&logical).unwrap();
2371        assert!(physical.columns().contains(&"a".to_string()));
2372        assert!(physical.columns().contains(&"b".to_string()));
2373    }
2374}