Skip to main content

grafeo_engine/query/planner/
mod.rs

//! Converts logical plans into physical execution trees.
//!
//! The optimizer produces a logical plan (what data you want), but the planner
//! converts it to a physical plan (how to actually get it). This means choosing
//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18    CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19    ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20    LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
21    ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
22};
23use grafeo_common::types::{EpochId, TxId};
24use grafeo_common::types::{LogicalType, Value};
25use grafeo_common::utils::error::{Error, Result};
26use grafeo_core::execution::AdaptiveContext;
27use grafeo_core::execution::operators::{
28    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
29    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
30    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
31    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
32    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
33    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
34    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
35    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
36    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
37    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
38    UnwindOperator, VariableLengthExpandOperator,
39};
40use grafeo_core::graph::{Direction, lpg::LpgStore};
41use std::collections::HashMap;
42use std::sync::Arc;
43
44use crate::transaction::TransactionManager;
45
46/// Range bounds for property-based range queries.
47struct RangeBounds<'a> {
48    min: Option<&'a Value>,
49    max: Option<&'a Value>,
50    min_inclusive: bool,
51    max_inclusive: bool,
52}
53
54/// Converts a logical plan to a physical operator tree.
55pub struct Planner {
56    /// The graph store to scan from.
57    pub(super) store: Arc<LpgStore>,
58    /// Transaction manager for MVCC operations.
59    pub(super) tx_manager: Option<Arc<TransactionManager>>,
60    /// Current transaction ID (if in a transaction).
61    pub(super) tx_id: Option<TxId>,
62    /// Epoch to use for visibility checks.
63    pub(super) viewing_epoch: EpochId,
64    /// Counter for generating unique anonymous edge column names.
65    pub(super) anon_edge_counter: std::cell::Cell<u32>,
66    /// Whether to use factorized execution for multi-hop queries.
67    pub(super) factorized_execution: bool,
68    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
69    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
70    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
71    /// Variables that hold edge IDs (from MATCH edge patterns).
72    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
73    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
74}
75
76impl Planner {
77    /// Creates a new planner with the given store.
78    ///
79    /// This creates a planner without transaction context, using the current
80    /// epoch from the store for visibility.
81    #[must_use]
82    pub fn new(store: Arc<LpgStore>) -> Self {
83        let epoch = store.current_epoch();
84        Self {
85            store,
86            tx_manager: None,
87            tx_id: None,
88            viewing_epoch: epoch,
89            anon_edge_counter: std::cell::Cell::new(0),
90            factorized_execution: true,
91            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
92            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
93        }
94    }
95
96    /// Creates a new planner with transaction context for MVCC-aware planning.
97    ///
98    /// # Arguments
99    ///
100    /// * `store` - The graph store
101    /// * `tx_manager` - Transaction manager for recording reads/writes
102    /// * `tx_id` - Current transaction ID (None for auto-commit)
103    /// * `viewing_epoch` - Epoch to use for version visibility
104    #[must_use]
105    pub fn with_context(
106        store: Arc<LpgStore>,
107        tx_manager: Arc<TransactionManager>,
108        tx_id: Option<TxId>,
109        viewing_epoch: EpochId,
110    ) -> Self {
111        Self {
112            store,
113            tx_manager: Some(tx_manager),
114            tx_id,
115            viewing_epoch,
116            anon_edge_counter: std::cell::Cell::new(0),
117            factorized_execution: true,
118            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
120        }
121    }
122
123    /// Returns the viewing epoch for this planner.
124    #[must_use]
125    pub fn viewing_epoch(&self) -> EpochId {
126        self.viewing_epoch
127    }
128
129    /// Returns the transaction ID for this planner, if any.
130    #[must_use]
131    pub fn tx_id(&self) -> Option<TxId> {
132        self.tx_id
133    }
134
135    /// Returns a reference to the transaction manager, if available.
136    #[must_use]
137    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
138        self.tx_manager.as_ref()
139    }
140
141    /// Enables or disables factorized execution for multi-hop queries.
142    #[must_use]
143    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
144        self.factorized_execution = enabled;
145        self
146    }
147
148    /// Counts consecutive single-hop expand operations.
149    ///
150    /// Returns the count and the deepest non-expand operator (the base of the chain).
151    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
152        match op {
153            LogicalOperator::Expand(expand) => {
154                // Only count single-hop expands (factorization doesn't apply to variable-length)
155                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
156
157                if is_single_hop {
158                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
159                    (inner_count + 1, base)
160                } else {
161                    // Variable-length path breaks the chain
162                    (0, op)
163                }
164            }
165            _ => (0, op),
166        }
167    }
168
169    /// Collects expand operations from the outermost down to the base.
170    ///
171    /// Returns expands in order from innermost (base) to outermost.
172    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
173        let mut chain = Vec::new();
174        let mut current = op;
175
176        while let LogicalOperator::Expand(expand) = current {
177            // Only include single-hop expands
178            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
179            if !is_single_hop {
180                break;
181            }
182            chain.push(expand);
183            current = &expand.input;
184        }
185
186        // Reverse so we go from base to outer
187        chain.reverse();
188        chain
189    }
190
191    /// Plans a logical plan into a physical operator.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if planning fails.
196    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
197        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
198        Ok(PhysicalPlan {
199            operator,
200            columns,
201            adaptive_context: None,
202        })
203    }
204
205    /// Plans a logical plan with adaptive execution support.
206    ///
207    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
208    /// joins) that can be monitored during execution to detect estimate errors.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if planning fails.
213    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
214        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
215
216        // Build adaptive context with cardinality estimates
217        let mut adaptive_context = AdaptiveContext::new();
218        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
219
220        Ok(PhysicalPlan {
221            operator,
222            columns,
223            adaptive_context: Some(adaptive_context),
224        })
225    }
226
227    /// Collects cardinality estimates from the logical plan into an adaptive context.
228    fn collect_cardinality_estimates(
229        &self,
230        op: &LogicalOperator,
231        ctx: &mut AdaptiveContext,
232        depth: usize,
233    ) {
234        match op {
235            LogicalOperator::NodeScan(scan) => {
236                // Estimate based on label statistics
237                let estimate = if let Some(label) = &scan.label {
238                    self.store.nodes_by_label(label).len() as f64
239                } else {
240                    self.store.node_count() as f64
241                };
242                let id = format!("scan_{}", scan.variable);
243                ctx.set_estimate(&id, estimate);
244
245                // Recurse into input if present
246                if let Some(input) = &scan.input {
247                    self.collect_cardinality_estimates(input, ctx, depth + 1);
248                }
249            }
250            LogicalOperator::Filter(filter) => {
251                // Default selectivity estimate for filters (30%)
252                let input_estimate = self.estimate_cardinality(&filter.input);
253                let estimate = input_estimate * 0.3;
254                let id = format!("filter_{depth}");
255                ctx.set_estimate(&id, estimate);
256
257                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
258            }
259            LogicalOperator::Expand(expand) => {
260                // Estimate based on average degree from store statistics
261                let input_estimate = self.estimate_cardinality(&expand.input);
262                let stats = self.store.statistics();
263                let avg_degree = self.estimate_expand_degree(&stats, expand);
264                let estimate = input_estimate * avg_degree;
265                let id = format!("expand_{}", expand.to_variable);
266                ctx.set_estimate(&id, estimate);
267
268                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
269            }
270            LogicalOperator::Join(join) => {
271                // Estimate join output (product with selectivity)
272                let left_est = self.estimate_cardinality(&join.left);
273                let right_est = self.estimate_cardinality(&join.right);
274                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
275                let id = format!("join_{depth}");
276                ctx.set_estimate(&id, estimate);
277
278                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
279                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
280            }
281            LogicalOperator::Aggregate(agg) => {
282                // Aggregates typically reduce cardinality
283                let input_estimate = self.estimate_cardinality(&agg.input);
284                let estimate = if agg.group_by.is_empty() {
285                    1.0 // Scalar aggregate
286                } else {
287                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
288                };
289                let id = format!("aggregate_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
293            }
294            LogicalOperator::Distinct(distinct) => {
295                let input_estimate = self.estimate_cardinality(&distinct.input);
296                let estimate = (input_estimate * 0.5).max(1.0);
297                let id = format!("distinct_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
301            }
302            LogicalOperator::Return(ret) => {
303                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
304            }
305            LogicalOperator::Limit(limit) => {
306                let input_estimate = self.estimate_cardinality(&limit.input);
307                let estimate = (input_estimate).min(limit.count as f64);
308                let id = format!("limit_{depth}");
309                ctx.set_estimate(&id, estimate);
310
311                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
312            }
313            LogicalOperator::Skip(skip) => {
314                let input_estimate = self.estimate_cardinality(&skip.input);
315                let estimate = (input_estimate - skip.count as f64).max(0.0);
316                let id = format!("skip_{depth}");
317                ctx.set_estimate(&id, estimate);
318
319                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
320            }
321            LogicalOperator::Sort(sort) => {
322                // Sort doesn't change cardinality
323                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
324            }
325            LogicalOperator::Union(union) => {
326                let estimate: f64 = union
327                    .inputs
328                    .iter()
329                    .map(|input| self.estimate_cardinality(input))
330                    .sum();
331                let id = format!("union_{depth}");
332                ctx.set_estimate(&id, estimate);
333
334                for input in &union.inputs {
335                    self.collect_cardinality_estimates(input, ctx, depth + 1);
336                }
337            }
338            _ => {
339                // For other operators, try to recurse into known input patterns
340            }
341        }
342    }
343
344    /// Estimates cardinality for a logical operator subtree.
345    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
346        match op {
347            LogicalOperator::NodeScan(scan) => {
348                if let Some(label) = &scan.label {
349                    self.store.nodes_by_label(label).len() as f64
350                } else {
351                    self.store.node_count() as f64
352                }
353            }
354            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
355            LogicalOperator::Expand(expand) => {
356                let stats = self.store.statistics();
357                let avg_degree = self.estimate_expand_degree(&stats, expand);
358                self.estimate_cardinality(&expand.input) * avg_degree
359            }
360            LogicalOperator::Join(join) => {
361                let left = self.estimate_cardinality(&join.left);
362                let right = self.estimate_cardinality(&join.right);
363                (left * right).sqrt()
364            }
365            LogicalOperator::Aggregate(agg) => {
366                if agg.group_by.is_empty() {
367                    1.0
368                } else {
369                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
370                }
371            }
372            LogicalOperator::Distinct(distinct) => {
373                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
374            }
375            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
376            LogicalOperator::Limit(limit) => self
377                .estimate_cardinality(&limit.input)
378                .min(limit.count as f64),
379            LogicalOperator::Skip(skip) => {
380                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
381            }
382            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
383            LogicalOperator::Union(union) => union
384                .inputs
385                .iter()
386                .map(|input| self.estimate_cardinality(input))
387                .sum(),
388            _ => 1000.0, // Default estimate for unknown operators
389        }
390    }
391
392    /// Estimates the average edge degree for an expand operation using store statistics.
393    fn estimate_expand_degree(
394        &self,
395        stats: &grafeo_core::statistics::Statistics,
396        expand: &ExpandOp,
397    ) -> f64 {
398        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
399        if let Some(edge_type) = &expand.edge_type {
400            stats.estimate_avg_degree(edge_type, outgoing)
401        } else if stats.total_nodes > 0 {
402            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
403        } else {
404            10.0 // fallback for empty graph
405        }
406    }
407
408    /// Plans a single logical operator.
409    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
410        match op {
411            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
412            LogicalOperator::Expand(expand) => {
413                // Check for expand chains when factorized execution is enabled
414                if self.factorized_execution {
415                    let (chain_len, _base) = Self::count_expand_chain(op);
416                    if chain_len >= 2 {
417                        // Use factorized chain for 2+ consecutive single-hop expands
418                        return self.plan_expand_chain(op);
419                    }
420                }
421                self.plan_expand(expand)
422            }
423            LogicalOperator::Return(ret) => self.plan_return(ret),
424            LogicalOperator::Filter(filter) => self.plan_filter(filter),
425            LogicalOperator::Project(project) => self.plan_project(project),
426            LogicalOperator::Limit(limit) => self.plan_limit(limit),
427            LogicalOperator::Skip(skip) => self.plan_skip(skip),
428            LogicalOperator::Sort(sort) => self.plan_sort(sort),
429            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
430            LogicalOperator::Join(join) => self.plan_join(join),
431            LogicalOperator::Union(union) => self.plan_union(union),
432            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
433            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
434            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
435            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
436            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
437            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
438            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
439            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
440            LogicalOperator::Merge(merge) => self.plan_merge(merge),
441            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
442            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
443            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
444            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
445            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
446            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
447            LogicalOperator::VectorScan(_) => Err(Error::Internal(
448                "VectorScan requires vector-index feature".to_string(),
449            )),
450            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
451                "VectorJoin requires vector-index feature".to_string(),
452            )),
453            _ => Err(Error::Internal(format!(
454                "Unsupported operator: {:?}",
455                std::mem::discriminant(op)
456            ))),
457        }
458    }
459}
460
461/// Converts a logical binary operator to a filter binary operator.
462pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
463    match op {
464        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
465        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
466        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
467        BinaryOp::Le => Ok(BinaryFilterOp::Le),
468        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
469        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
470        BinaryOp::And => Ok(BinaryFilterOp::And),
471        BinaryOp::Or => Ok(BinaryFilterOp::Or),
472        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
473        BinaryOp::Add => Ok(BinaryFilterOp::Add),
474        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
475        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
476        BinaryOp::Div => Ok(BinaryFilterOp::Div),
477        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
478        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
479        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
480        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
481        BinaryOp::In => Ok(BinaryFilterOp::In),
482        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
483        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
484        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
485            "Binary operator {:?} not yet supported in filters",
486            op
487        ))),
488    }
489}
490
491/// Converts a logical unary operator to a filter unary operator.
492pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
493    match op {
494        UnaryOp::Not => Ok(UnaryFilterOp::Not),
495        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
496        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
497        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
498    }
499}
500
501/// Converts a logical aggregate function to a physical aggregate function.
502pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
503    match func {
504        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
505        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
506        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
507        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
508        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
509        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
510        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
511        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
512        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
513        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
514        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
515    }
516}
517
518/// Converts a logical expression to a filter expression.
519///
520/// This is a standalone function that can be used by both LPG and RDF planners.
521pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
522    match expr {
523        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
524        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
525        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
526            variable: variable.clone(),
527            property: property.clone(),
528        }),
529        LogicalExpression::Binary { left, op, right } => {
530            let left_expr = convert_filter_expression(left)?;
531            let right_expr = convert_filter_expression(right)?;
532            let filter_op = convert_binary_op(*op)?;
533            Ok(FilterExpression::Binary {
534                left: Box::new(left_expr),
535                op: filter_op,
536                right: Box::new(right_expr),
537            })
538        }
539        LogicalExpression::Unary { op, operand } => {
540            let operand_expr = convert_filter_expression(operand)?;
541            let filter_op = convert_unary_op(*op)?;
542            Ok(FilterExpression::Unary {
543                op: filter_op,
544                operand: Box::new(operand_expr),
545            })
546        }
547        LogicalExpression::FunctionCall { name, args, .. } => {
548            let filter_args: Vec<FilterExpression> = args
549                .iter()
550                .map(convert_filter_expression)
551                .collect::<Result<Vec<_>>>()?;
552            Ok(FilterExpression::FunctionCall {
553                name: name.clone(),
554                args: filter_args,
555            })
556        }
557        LogicalExpression::Case {
558            operand,
559            when_clauses,
560            else_clause,
561        } => {
562            let filter_operand = operand
563                .as_ref()
564                .map(|e| convert_filter_expression(e))
565                .transpose()?
566                .map(Box::new);
567            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
568                .iter()
569                .map(|(cond, result)| {
570                    Ok((
571                        convert_filter_expression(cond)?,
572                        convert_filter_expression(result)?,
573                    ))
574                })
575                .collect::<Result<Vec<_>>>()?;
576            let filter_else = else_clause
577                .as_ref()
578                .map(|e| convert_filter_expression(e))
579                .transpose()?
580                .map(Box::new);
581            Ok(FilterExpression::Case {
582                operand: filter_operand,
583                when_clauses: filter_when_clauses,
584                else_clause: filter_else,
585            })
586        }
587        LogicalExpression::List(items) => {
588            let filter_items: Vec<FilterExpression> = items
589                .iter()
590                .map(convert_filter_expression)
591                .collect::<Result<Vec<_>>>()?;
592            Ok(FilterExpression::List(filter_items))
593        }
594        LogicalExpression::Map(pairs) => {
595            let filter_pairs: Vec<(String, FilterExpression)> = pairs
596                .iter()
597                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
598                .collect::<Result<Vec<_>>>()?;
599            Ok(FilterExpression::Map(filter_pairs))
600        }
601        LogicalExpression::IndexAccess { base, index } => {
602            let base_expr = convert_filter_expression(base)?;
603            let index_expr = convert_filter_expression(index)?;
604            Ok(FilterExpression::IndexAccess {
605                base: Box::new(base_expr),
606                index: Box::new(index_expr),
607            })
608        }
609        LogicalExpression::SliceAccess { base, start, end } => {
610            let base_expr = convert_filter_expression(base)?;
611            let start_expr = start
612                .as_ref()
613                .map(|s| convert_filter_expression(s))
614                .transpose()?
615                .map(Box::new);
616            let end_expr = end
617                .as_ref()
618                .map(|e| convert_filter_expression(e))
619                .transpose()?
620                .map(Box::new);
621            Ok(FilterExpression::SliceAccess {
622                base: Box::new(base_expr),
623                start: start_expr,
624                end: end_expr,
625            })
626        }
627        LogicalExpression::Parameter(_) => Err(Error::Internal(
628            "Parameters not yet supported in filters".to_string(),
629        )),
630        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
631        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
632        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
633        LogicalExpression::ListComprehension {
634            variable,
635            list_expr,
636            filter_expr,
637            map_expr,
638        } => {
639            let list = convert_filter_expression(list_expr)?;
640            let filter = filter_expr
641                .as_ref()
642                .map(|f| convert_filter_expression(f))
643                .transpose()?
644                .map(Box::new);
645            let map = convert_filter_expression(map_expr)?;
646            Ok(FilterExpression::ListComprehension {
647                variable: variable.clone(),
648                list_expr: Box::new(list),
649                filter_expr: filter,
650                map_expr: Box::new(map),
651            })
652        }
653        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
654            Error::Internal("Subqueries not yet supported in filters".to_string()),
655        ),
656    }
657}
658
659/// Infers the logical type from a value.
660fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
661    use grafeo_common::types::Value;
662    match value {
663        Value::Null => LogicalType::String, // Default type for null
664        Value::Bool(_) => LogicalType::Bool,
665        Value::Int64(_) => LogicalType::Int64,
666        Value::Float64(_) => LogicalType::Float64,
667        Value::String(_) => LogicalType::String,
668        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
669        Value::Timestamp(_) => LogicalType::Timestamp,
670        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
671        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
672        Value::Vector(v) => LogicalType::Vector(v.len()),
673    }
674}
675
676/// Converts an expression to a string for column naming.
677fn expression_to_string(expr: &LogicalExpression) -> String {
678    match expr {
679        LogicalExpression::Variable(name) => name.clone(),
680        LogicalExpression::Property { variable, property } => {
681            format!("{variable}.{property}")
682        }
683        LogicalExpression::Literal(value) => format!("{value:?}"),
684        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
685        _ => "expr".to_string(),
686    }
687}
688
689/// A physical plan ready for execution.
690pub struct PhysicalPlan {
691    /// The root physical operator.
692    pub operator: Box<dyn Operator>,
693    /// Column names for the result.
694    pub columns: Vec<String>,
695    /// Adaptive execution context with cardinality estimates.
696    ///
697    /// When adaptive execution is enabled, this context contains estimated
698    /// cardinalities at various checkpoints in the plan. During execution,
699    /// actual row counts are recorded and compared against estimates.
700    pub adaptive_context: Option<AdaptiveContext>,
701}
702
703impl PhysicalPlan {
704    /// Returns the column names.
705    #[must_use]
706    pub fn columns(&self) -> &[String] {
707        &self.columns
708    }
709
710    /// Consumes the plan and returns the operator.
711    pub fn into_operator(self) -> Box<dyn Operator> {
712        self.operator
713    }
714
715    /// Returns the adaptive context, if adaptive execution is enabled.
716    #[must_use]
717    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
718        self.adaptive_context.as_ref()
719    }
720
721    /// Takes ownership of the adaptive context.
722    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
723        self.adaptive_context.take()
724    }
725}
726
727/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
728struct StaticResultOperator {
729    rows: Vec<Vec<Value>>,
730    column_indices: Vec<usize>,
731    row_index: usize,
732}
733
734impl Operator for StaticResultOperator {
735    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
736        use grafeo_core::execution::DataChunk;
737
738        if self.row_index >= self.rows.len() {
739            return Ok(None);
740        }
741
742        let remaining = self.rows.len() - self.row_index;
743        let chunk_rows = remaining.min(1024);
744        let col_count = self.column_indices.len();
745
746        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
747        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
748
749        for row_offset in 0..chunk_rows {
750            let row = &self.rows[self.row_index + row_offset];
751            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
752                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
753                if let Some(col) = chunk.column_mut(col_idx) {
754                    col.push_value(value);
755                }
756            }
757        }
758        chunk.set_count(chunk_rows);
759
760        self.row_index += chunk_rows;
761        Ok(Some(chunk))
762    }
763
764    fn reset(&mut self) {
765        self.row_index = 0;
766    }
767
768    fn name(&self) -> &'static str {
769        "StaticResult"
770    }
771}
772
773#[cfg(test)]
774mod tests {
775    use super::*;
776    use crate::query::plan::{
777        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
778        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
779        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
780        SortKey, SortOp,
781    };
782    use grafeo_common::types::Value;
783
784    fn create_test_store() -> Arc<LpgStore> {
785        let store = Arc::new(LpgStore::new());
786        store.create_node(&["Person"]);
787        store.create_node(&["Person"]);
788        store.create_node(&["Company"]);
789        store
790    }
791
792    // ==================== Simple Scan Tests ====================
793
794    #[test]
795    fn test_plan_simple_scan() {
796        let store = create_test_store();
797        let planner = Planner::new(store);
798
799        // MATCH (n:Person) RETURN n
800        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
801            items: vec![ReturnItem {
802                expression: LogicalExpression::Variable("n".to_string()),
803                alias: None,
804            }],
805            distinct: false,
806            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
807                variable: "n".to_string(),
808                label: Some("Person".to_string()),
809                input: None,
810            })),
811        }));
812
813        let physical = planner.plan(&logical).unwrap();
814        assert_eq!(physical.columns(), &["n"]);
815    }
816
817    #[test]
818    fn test_plan_scan_without_label() {
819        let store = create_test_store();
820        let planner = Planner::new(store);
821
822        // MATCH (n) RETURN n
823        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
824            items: vec![ReturnItem {
825                expression: LogicalExpression::Variable("n".to_string()),
826                alias: None,
827            }],
828            distinct: false,
829            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
830                variable: "n".to_string(),
831                label: None,
832                input: None,
833            })),
834        }));
835
836        let physical = planner.plan(&logical).unwrap();
837        assert_eq!(physical.columns(), &["n"]);
838    }
839
840    #[test]
841    fn test_plan_return_with_alias() {
842        let store = create_test_store();
843        let planner = Planner::new(store);
844
845        // MATCH (n:Person) RETURN n AS person
846        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
847            items: vec![ReturnItem {
848                expression: LogicalExpression::Variable("n".to_string()),
849                alias: Some("person".to_string()),
850            }],
851            distinct: false,
852            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
853                variable: "n".to_string(),
854                label: Some("Person".to_string()),
855                input: None,
856            })),
857        }));
858
859        let physical = planner.plan(&logical).unwrap();
860        assert_eq!(physical.columns(), &["person"]);
861    }
862
863    #[test]
864    fn test_plan_return_property() {
865        let store = create_test_store();
866        let planner = Planner::new(store);
867
868        // MATCH (n:Person) RETURN n.name
869        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
870            items: vec![ReturnItem {
871                expression: LogicalExpression::Property {
872                    variable: "n".to_string(),
873                    property: "name".to_string(),
874                },
875                alias: None,
876            }],
877            distinct: false,
878            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
879                variable: "n".to_string(),
880                label: Some("Person".to_string()),
881                input: None,
882            })),
883        }));
884
885        let physical = planner.plan(&logical).unwrap();
886        assert_eq!(physical.columns(), &["n.name"]);
887    }
888
889    #[test]
890    fn test_plan_return_literal() {
891        let store = create_test_store();
892        let planner = Planner::new(store);
893
894        // MATCH (n) RETURN 42 AS answer
895        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
896            items: vec![ReturnItem {
897                expression: LogicalExpression::Literal(Value::Int64(42)),
898                alias: Some("answer".to_string()),
899            }],
900            distinct: false,
901            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
902                variable: "n".to_string(),
903                label: None,
904                input: None,
905            })),
906        }));
907
908        let physical = planner.plan(&logical).unwrap();
909        assert_eq!(physical.columns(), &["answer"]);
910    }
911
912    // ==================== Filter Tests ====================
913
914    #[test]
915    fn test_plan_filter_equality() {
916        let store = create_test_store();
917        let planner = Planner::new(store);
918
919        // MATCH (n:Person) WHERE n.age = 30 RETURN n
920        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
921            items: vec![ReturnItem {
922                expression: LogicalExpression::Variable("n".to_string()),
923                alias: None,
924            }],
925            distinct: false,
926            input: Box::new(LogicalOperator::Filter(FilterOp {
927                predicate: LogicalExpression::Binary {
928                    left: Box::new(LogicalExpression::Property {
929                        variable: "n".to_string(),
930                        property: "age".to_string(),
931                    }),
932                    op: BinaryOp::Eq,
933                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
934                },
935                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
936                    variable: "n".to_string(),
937                    label: Some("Person".to_string()),
938                    input: None,
939                })),
940            })),
941        }));
942
943        let physical = planner.plan(&logical).unwrap();
944        assert_eq!(physical.columns(), &["n"]);
945    }
946
947    #[test]
948    fn test_plan_filter_compound_and() {
949        let store = create_test_store();
950        let planner = Planner::new(store);
951
952        // WHERE n.age > 20 AND n.age < 40
953        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
954            items: vec![ReturnItem {
955                expression: LogicalExpression::Variable("n".to_string()),
956                alias: None,
957            }],
958            distinct: false,
959            input: Box::new(LogicalOperator::Filter(FilterOp {
960                predicate: LogicalExpression::Binary {
961                    left: Box::new(LogicalExpression::Binary {
962                        left: Box::new(LogicalExpression::Property {
963                            variable: "n".to_string(),
964                            property: "age".to_string(),
965                        }),
966                        op: BinaryOp::Gt,
967                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
968                    }),
969                    op: BinaryOp::And,
970                    right: Box::new(LogicalExpression::Binary {
971                        left: Box::new(LogicalExpression::Property {
972                            variable: "n".to_string(),
973                            property: "age".to_string(),
974                        }),
975                        op: BinaryOp::Lt,
976                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
977                    }),
978                },
979                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
980                    variable: "n".to_string(),
981                    label: None,
982                    input: None,
983                })),
984            })),
985        }));
986
987        let physical = planner.plan(&logical).unwrap();
988        assert_eq!(physical.columns(), &["n"]);
989    }
990
991    #[test]
992    fn test_plan_filter_unary_not() {
993        let store = create_test_store();
994        let planner = Planner::new(store);
995
996        // WHERE NOT n.active
997        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
998            items: vec![ReturnItem {
999                expression: LogicalExpression::Variable("n".to_string()),
1000                alias: None,
1001            }],
1002            distinct: false,
1003            input: Box::new(LogicalOperator::Filter(FilterOp {
1004                predicate: LogicalExpression::Unary {
1005                    op: UnaryOp::Not,
1006                    operand: Box::new(LogicalExpression::Property {
1007                        variable: "n".to_string(),
1008                        property: "active".to_string(),
1009                    }),
1010                },
1011                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1012                    variable: "n".to_string(),
1013                    label: None,
1014                    input: None,
1015                })),
1016            })),
1017        }));
1018
1019        let physical = planner.plan(&logical).unwrap();
1020        assert_eq!(physical.columns(), &["n"]);
1021    }
1022
1023    #[test]
1024    fn test_plan_filter_is_null() {
1025        let store = create_test_store();
1026        let planner = Planner::new(store);
1027
1028        // WHERE n.email IS NULL
1029        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1030            items: vec![ReturnItem {
1031                expression: LogicalExpression::Variable("n".to_string()),
1032                alias: None,
1033            }],
1034            distinct: false,
1035            input: Box::new(LogicalOperator::Filter(FilterOp {
1036                predicate: LogicalExpression::Unary {
1037                    op: UnaryOp::IsNull,
1038                    operand: Box::new(LogicalExpression::Property {
1039                        variable: "n".to_string(),
1040                        property: "email".to_string(),
1041                    }),
1042                },
1043                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1044                    variable: "n".to_string(),
1045                    label: None,
1046                    input: None,
1047                })),
1048            })),
1049        }));
1050
1051        let physical = planner.plan(&logical).unwrap();
1052        assert_eq!(physical.columns(), &["n"]);
1053    }
1054
1055    #[test]
1056    fn test_plan_filter_function_call() {
1057        let store = create_test_store();
1058        let planner = Planner::new(store);
1059
1060        // WHERE size(n.friends) > 0
1061        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1062            items: vec![ReturnItem {
1063                expression: LogicalExpression::Variable("n".to_string()),
1064                alias: None,
1065            }],
1066            distinct: false,
1067            input: Box::new(LogicalOperator::Filter(FilterOp {
1068                predicate: LogicalExpression::Binary {
1069                    left: Box::new(LogicalExpression::FunctionCall {
1070                        name: "size".to_string(),
1071                        args: vec![LogicalExpression::Property {
1072                            variable: "n".to_string(),
1073                            property: "friends".to_string(),
1074                        }],
1075                        distinct: false,
1076                    }),
1077                    op: BinaryOp::Gt,
1078                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1079                },
1080                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1081                    variable: "n".to_string(),
1082                    label: None,
1083                    input: None,
1084                })),
1085            })),
1086        }));
1087
1088        let physical = planner.plan(&logical).unwrap();
1089        assert_eq!(physical.columns(), &["n"]);
1090    }
1091
1092    // ==================== Expand Tests ====================
1093
1094    #[test]
1095    fn test_plan_expand_outgoing() {
1096        let store = create_test_store();
1097        let planner = Planner::new(store);
1098
1099        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1100        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1101            items: vec![
1102                ReturnItem {
1103                    expression: LogicalExpression::Variable("a".to_string()),
1104                    alias: None,
1105                },
1106                ReturnItem {
1107                    expression: LogicalExpression::Variable("b".to_string()),
1108                    alias: None,
1109                },
1110            ],
1111            distinct: false,
1112            input: Box::new(LogicalOperator::Expand(ExpandOp {
1113                from_variable: "a".to_string(),
1114                to_variable: "b".to_string(),
1115                edge_variable: None,
1116                direction: ExpandDirection::Outgoing,
1117                edge_type: Some("KNOWS".to_string()),
1118                min_hops: 1,
1119                max_hops: Some(1),
1120                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1121                    variable: "a".to_string(),
1122                    label: Some("Person".to_string()),
1123                    input: None,
1124                })),
1125                path_alias: None,
1126            })),
1127        }));
1128
1129        let physical = planner.plan(&logical).unwrap();
1130        // The return should have columns [a, b]
1131        assert!(physical.columns().contains(&"a".to_string()));
1132        assert!(physical.columns().contains(&"b".to_string()));
1133    }
1134
1135    #[test]
1136    fn test_plan_expand_with_edge_variable() {
1137        let store = create_test_store();
1138        let planner = Planner::new(store);
1139
1140        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1141        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1142            items: vec![
1143                ReturnItem {
1144                    expression: LogicalExpression::Variable("a".to_string()),
1145                    alias: None,
1146                },
1147                ReturnItem {
1148                    expression: LogicalExpression::Variable("r".to_string()),
1149                    alias: None,
1150                },
1151                ReturnItem {
1152                    expression: LogicalExpression::Variable("b".to_string()),
1153                    alias: None,
1154                },
1155            ],
1156            distinct: false,
1157            input: Box::new(LogicalOperator::Expand(ExpandOp {
1158                from_variable: "a".to_string(),
1159                to_variable: "b".to_string(),
1160                edge_variable: Some("r".to_string()),
1161                direction: ExpandDirection::Outgoing,
1162                edge_type: Some("KNOWS".to_string()),
1163                min_hops: 1,
1164                max_hops: Some(1),
1165                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1166                    variable: "a".to_string(),
1167                    label: None,
1168                    input: None,
1169                })),
1170                path_alias: None,
1171            })),
1172        }));
1173
1174        let physical = planner.plan(&logical).unwrap();
1175        assert!(physical.columns().contains(&"a".to_string()));
1176        assert!(physical.columns().contains(&"r".to_string()));
1177        assert!(physical.columns().contains(&"b".to_string()));
1178    }
1179
1180    // ==================== Limit/Skip/Sort Tests ====================
1181
1182    #[test]
1183    fn test_plan_limit() {
1184        let store = create_test_store();
1185        let planner = Planner::new(store);
1186
1187        // MATCH (n) RETURN n LIMIT 10
1188        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1189            items: vec![ReturnItem {
1190                expression: LogicalExpression::Variable("n".to_string()),
1191                alias: None,
1192            }],
1193            distinct: false,
1194            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1195                count: 10,
1196                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1197                    variable: "n".to_string(),
1198                    label: None,
1199                    input: None,
1200                })),
1201            })),
1202        }));
1203
1204        let physical = planner.plan(&logical).unwrap();
1205        assert_eq!(physical.columns(), &["n"]);
1206    }
1207
1208    #[test]
1209    fn test_plan_skip() {
1210        let store = create_test_store();
1211        let planner = Planner::new(store);
1212
1213        // MATCH (n) RETURN n SKIP 5
1214        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1215            items: vec![ReturnItem {
1216                expression: LogicalExpression::Variable("n".to_string()),
1217                alias: None,
1218            }],
1219            distinct: false,
1220            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1221                count: 5,
1222                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1223                    variable: "n".to_string(),
1224                    label: None,
1225                    input: None,
1226                })),
1227            })),
1228        }));
1229
1230        let physical = planner.plan(&logical).unwrap();
1231        assert_eq!(physical.columns(), &["n"]);
1232    }
1233
1234    #[test]
1235    fn test_plan_sort() {
1236        let store = create_test_store();
1237        let planner = Planner::new(store);
1238
1239        // MATCH (n) RETURN n ORDER BY n.name ASC
1240        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1241            items: vec![ReturnItem {
1242                expression: LogicalExpression::Variable("n".to_string()),
1243                alias: None,
1244            }],
1245            distinct: false,
1246            input: Box::new(LogicalOperator::Sort(SortOp {
1247                keys: vec![SortKey {
1248                    expression: LogicalExpression::Variable("n".to_string()),
1249                    order: SortOrder::Ascending,
1250                }],
1251                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1252                    variable: "n".to_string(),
1253                    label: None,
1254                    input: None,
1255                })),
1256            })),
1257        }));
1258
1259        let physical = planner.plan(&logical).unwrap();
1260        assert_eq!(physical.columns(), &["n"]);
1261    }
1262
1263    #[test]
1264    fn test_plan_sort_descending() {
1265        let store = create_test_store();
1266        let planner = Planner::new(store);
1267
1268        // ORDER BY n DESC
1269        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1270            items: vec![ReturnItem {
1271                expression: LogicalExpression::Variable("n".to_string()),
1272                alias: None,
1273            }],
1274            distinct: false,
1275            input: Box::new(LogicalOperator::Sort(SortOp {
1276                keys: vec![SortKey {
1277                    expression: LogicalExpression::Variable("n".to_string()),
1278                    order: SortOrder::Descending,
1279                }],
1280                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1281                    variable: "n".to_string(),
1282                    label: None,
1283                    input: None,
1284                })),
1285            })),
1286        }));
1287
1288        let physical = planner.plan(&logical).unwrap();
1289        assert_eq!(physical.columns(), &["n"]);
1290    }
1291
1292    #[test]
1293    fn test_plan_distinct() {
1294        let store = create_test_store();
1295        let planner = Planner::new(store);
1296
1297        // MATCH (n) RETURN DISTINCT n
1298        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1299            items: vec![ReturnItem {
1300                expression: LogicalExpression::Variable("n".to_string()),
1301                alias: None,
1302            }],
1303            distinct: false,
1304            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1305                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1306                    variable: "n".to_string(),
1307                    label: None,
1308                    input: None,
1309                })),
1310                columns: None,
1311            })),
1312        }));
1313
1314        let physical = planner.plan(&logical).unwrap();
1315        assert_eq!(physical.columns(), &["n"]);
1316    }
1317
1318    // ==================== Aggregate Tests ====================
1319
1320    #[test]
1321    fn test_plan_aggregate_count() {
1322        let store = create_test_store();
1323        let planner = Planner::new(store);
1324
1325        // MATCH (n) RETURN count(n)
1326        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1327            items: vec![ReturnItem {
1328                expression: LogicalExpression::Variable("cnt".to_string()),
1329                alias: None,
1330            }],
1331            distinct: false,
1332            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1333                group_by: vec![],
1334                aggregates: vec![LogicalAggregateExpr {
1335                    function: LogicalAggregateFunction::Count,
1336                    expression: Some(LogicalExpression::Variable("n".to_string())),
1337                    distinct: false,
1338                    alias: Some("cnt".to_string()),
1339                    percentile: None,
1340                }],
1341                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1342                    variable: "n".to_string(),
1343                    label: None,
1344                    input: None,
1345                })),
1346                having: None,
1347            })),
1348        }));
1349
1350        let physical = planner.plan(&logical).unwrap();
1351        assert!(physical.columns().contains(&"cnt".to_string()));
1352    }
1353
1354    #[test]
1355    fn test_plan_aggregate_with_group_by() {
1356        let store = create_test_store();
1357        let planner = Planner::new(store);
1358
1359        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1360        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1361            group_by: vec![LogicalExpression::Property {
1362                variable: "n".to_string(),
1363                property: "city".to_string(),
1364            }],
1365            aggregates: vec![LogicalAggregateExpr {
1366                function: LogicalAggregateFunction::Count,
1367                expression: Some(LogicalExpression::Variable("n".to_string())),
1368                distinct: false,
1369                alias: Some("cnt".to_string()),
1370                percentile: None,
1371            }],
1372            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1373                variable: "n".to_string(),
1374                label: Some("Person".to_string()),
1375                input: None,
1376            })),
1377            having: None,
1378        }));
1379
1380        let physical = planner.plan(&logical).unwrap();
1381        assert_eq!(physical.columns().len(), 2);
1382    }
1383
1384    #[test]
1385    fn test_plan_aggregate_sum() {
1386        let store = create_test_store();
1387        let planner = Planner::new(store);
1388
1389        // SUM(n.value)
1390        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1391            group_by: vec![],
1392            aggregates: vec![LogicalAggregateExpr {
1393                function: LogicalAggregateFunction::Sum,
1394                expression: Some(LogicalExpression::Property {
1395                    variable: "n".to_string(),
1396                    property: "value".to_string(),
1397                }),
1398                distinct: false,
1399                alias: Some("total".to_string()),
1400                percentile: None,
1401            }],
1402            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1403                variable: "n".to_string(),
1404                label: None,
1405                input: None,
1406            })),
1407            having: None,
1408        }));
1409
1410        let physical = planner.plan(&logical).unwrap();
1411        assert!(physical.columns().contains(&"total".to_string()));
1412    }
1413
1414    #[test]
1415    fn test_plan_aggregate_avg() {
1416        let store = create_test_store();
1417        let planner = Planner::new(store);
1418
1419        // AVG(n.score)
1420        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1421            group_by: vec![],
1422            aggregates: vec![LogicalAggregateExpr {
1423                function: LogicalAggregateFunction::Avg,
1424                expression: Some(LogicalExpression::Property {
1425                    variable: "n".to_string(),
1426                    property: "score".to_string(),
1427                }),
1428                distinct: false,
1429                alias: Some("average".to_string()),
1430                percentile: None,
1431            }],
1432            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1433                variable: "n".to_string(),
1434                label: None,
1435                input: None,
1436            })),
1437            having: None,
1438        }));
1439
1440        let physical = planner.plan(&logical).unwrap();
1441        assert!(physical.columns().contains(&"average".to_string()));
1442    }
1443
1444    #[test]
1445    fn test_plan_aggregate_min_max() {
1446        let store = create_test_store();
1447        let planner = Planner::new(store);
1448
1449        // MIN(n.age), MAX(n.age)
1450        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1451            group_by: vec![],
1452            aggregates: vec![
1453                LogicalAggregateExpr {
1454                    function: LogicalAggregateFunction::Min,
1455                    expression: Some(LogicalExpression::Property {
1456                        variable: "n".to_string(),
1457                        property: "age".to_string(),
1458                    }),
1459                    distinct: false,
1460                    alias: Some("youngest".to_string()),
1461                    percentile: None,
1462                },
1463                LogicalAggregateExpr {
1464                    function: LogicalAggregateFunction::Max,
1465                    expression: Some(LogicalExpression::Property {
1466                        variable: "n".to_string(),
1467                        property: "age".to_string(),
1468                    }),
1469                    distinct: false,
1470                    alias: Some("oldest".to_string()),
1471                    percentile: None,
1472                },
1473            ],
1474            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1475                variable: "n".to_string(),
1476                label: None,
1477                input: None,
1478            })),
1479            having: None,
1480        }));
1481
1482        let physical = planner.plan(&logical).unwrap();
1483        assert!(physical.columns().contains(&"youngest".to_string()));
1484        assert!(physical.columns().contains(&"oldest".to_string()));
1485    }
1486
1487    // ==================== Join Tests ====================
1488
1489    #[test]
1490    fn test_plan_inner_join() {
1491        let store = create_test_store();
1492        let planner = Planner::new(store);
1493
1494        // Inner join between two scans
1495        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1496            items: vec![
1497                ReturnItem {
1498                    expression: LogicalExpression::Variable("a".to_string()),
1499                    alias: None,
1500                },
1501                ReturnItem {
1502                    expression: LogicalExpression::Variable("b".to_string()),
1503                    alias: None,
1504                },
1505            ],
1506            distinct: false,
1507            input: Box::new(LogicalOperator::Join(JoinOp {
1508                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509                    variable: "a".to_string(),
1510                    label: Some("Person".to_string()),
1511                    input: None,
1512                })),
1513                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1514                    variable: "b".to_string(),
1515                    label: Some("Company".to_string()),
1516                    input: None,
1517                })),
1518                join_type: JoinType::Inner,
1519                conditions: vec![JoinCondition {
1520                    left: LogicalExpression::Variable("a".to_string()),
1521                    right: LogicalExpression::Variable("b".to_string()),
1522                }],
1523            })),
1524        }));
1525
1526        let physical = planner.plan(&logical).unwrap();
1527        assert!(physical.columns().contains(&"a".to_string()));
1528        assert!(physical.columns().contains(&"b".to_string()));
1529    }
1530
1531    #[test]
1532    fn test_plan_cross_join() {
1533        let store = create_test_store();
1534        let planner = Planner::new(store);
1535
1536        // Cross join (no conditions)
1537        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1538            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1539                variable: "a".to_string(),
1540                label: None,
1541                input: None,
1542            })),
1543            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1544                variable: "b".to_string(),
1545                label: None,
1546                input: None,
1547            })),
1548            join_type: JoinType::Cross,
1549            conditions: vec![],
1550        }));
1551
1552        let physical = planner.plan(&logical).unwrap();
1553        assert_eq!(physical.columns().len(), 2);
1554    }
1555
1556    #[test]
1557    fn test_plan_left_join() {
1558        let store = create_test_store();
1559        let planner = Planner::new(store);
1560
1561        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1562            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1563                variable: "a".to_string(),
1564                label: None,
1565                input: None,
1566            })),
1567            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1568                variable: "b".to_string(),
1569                label: None,
1570                input: None,
1571            })),
1572            join_type: JoinType::Left,
1573            conditions: vec![],
1574        }));
1575
1576        let physical = planner.plan(&logical).unwrap();
1577        assert_eq!(physical.columns().len(), 2);
1578    }
1579
1580    // ==================== Mutation Tests ====================
1581
1582    #[test]
1583    fn test_plan_create_node() {
1584        let store = create_test_store();
1585        let planner = Planner::new(store);
1586
1587        // CREATE (n:Person {name: 'Alice'})
1588        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1589            variable: "n".to_string(),
1590            labels: vec!["Person".to_string()],
1591            properties: vec![(
1592                "name".to_string(),
1593                LogicalExpression::Literal(Value::String("Alice".into())),
1594            )],
1595            input: None,
1596        }));
1597
1598        let physical = planner.plan(&logical).unwrap();
1599        assert!(physical.columns().contains(&"n".to_string()));
1600    }
1601
1602    #[test]
1603    fn test_plan_create_edge() {
1604        let store = create_test_store();
1605        let planner = Planner::new(store);
1606
1607        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1608        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1609            variable: Some("r".to_string()),
1610            from_variable: "a".to_string(),
1611            to_variable: "b".to_string(),
1612            edge_type: "KNOWS".to_string(),
1613            properties: vec![],
1614            input: Box::new(LogicalOperator::Join(JoinOp {
1615                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1616                    variable: "a".to_string(),
1617                    label: None,
1618                    input: None,
1619                })),
1620                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1621                    variable: "b".to_string(),
1622                    label: None,
1623                    input: None,
1624                })),
1625                join_type: JoinType::Cross,
1626                conditions: vec![],
1627            })),
1628        }));
1629
1630        let physical = planner.plan(&logical).unwrap();
1631        assert!(physical.columns().contains(&"r".to_string()));
1632    }
1633
1634    #[test]
1635    fn test_plan_delete_node() {
1636        let store = create_test_store();
1637        let planner = Planner::new(store);
1638
1639        // MATCH (n) DELETE n
1640        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1641            variable: "n".to_string(),
1642            detach: false,
1643            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1644                variable: "n".to_string(),
1645                label: None,
1646                input: None,
1647            })),
1648        }));
1649
1650        let physical = planner.plan(&logical).unwrap();
1651        assert!(physical.columns().contains(&"deleted_count".to_string()));
1652    }
1653
1654    // ==================== Error Cases ====================
1655
1656    #[test]
1657    fn test_plan_empty_errors() {
1658        let store = create_test_store();
1659        let planner = Planner::new(store);
1660
1661        let logical = LogicalPlan::new(LogicalOperator::Empty);
1662        let result = planner.plan(&logical);
1663        assert!(result.is_err());
1664    }
1665
1666    #[test]
1667    fn test_plan_missing_variable_in_return() {
1668        let store = create_test_store();
1669        let planner = Planner::new(store);
1670
1671        // Return variable that doesn't exist in input
1672        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1673            items: vec![ReturnItem {
1674                expression: LogicalExpression::Variable("missing".to_string()),
1675                alias: None,
1676            }],
1677            distinct: false,
1678            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1679                variable: "n".to_string(),
1680                label: None,
1681                input: None,
1682            })),
1683        }));
1684
1685        let result = planner.plan(&logical);
1686        assert!(result.is_err());
1687    }
1688
1689    // ==================== Helper Function Tests ====================
1690
1691    #[test]
1692    fn test_convert_binary_ops() {
1693        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1694        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1695        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1696        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1697        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1698        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1699        assert!(convert_binary_op(BinaryOp::And).is_ok());
1700        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1701        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1702        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1703        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1704        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1705    }
1706
1707    #[test]
1708    fn test_convert_unary_ops() {
1709        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1710        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1711        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1712        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1713    }
1714
1715    #[test]
1716    fn test_convert_aggregate_functions() {
1717        assert!(matches!(
1718            convert_aggregate_function(LogicalAggregateFunction::Count),
1719            PhysicalAggregateFunction::Count
1720        ));
1721        assert!(matches!(
1722            convert_aggregate_function(LogicalAggregateFunction::Sum),
1723            PhysicalAggregateFunction::Sum
1724        ));
1725        assert!(matches!(
1726            convert_aggregate_function(LogicalAggregateFunction::Avg),
1727            PhysicalAggregateFunction::Avg
1728        ));
1729        assert!(matches!(
1730            convert_aggregate_function(LogicalAggregateFunction::Min),
1731            PhysicalAggregateFunction::Min
1732        ));
1733        assert!(matches!(
1734            convert_aggregate_function(LogicalAggregateFunction::Max),
1735            PhysicalAggregateFunction::Max
1736        ));
1737    }
1738
1739    #[test]
1740    fn test_planner_accessors() {
1741        let store = create_test_store();
1742        let planner = Planner::new(Arc::clone(&store));
1743
1744        assert!(planner.tx_id().is_none());
1745        assert!(planner.tx_manager().is_none());
1746        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1747    }
1748
1749    #[test]
1750    fn test_physical_plan_accessors() {
1751        let store = create_test_store();
1752        let planner = Planner::new(store);
1753
1754        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1755            variable: "n".to_string(),
1756            label: None,
1757            input: None,
1758        }));
1759
1760        let physical = planner.plan(&logical).unwrap();
1761        assert_eq!(physical.columns(), &["n"]);
1762
1763        // Test into_operator
1764        let _ = physical.into_operator();
1765    }
1766
1767    // ==================== Adaptive Planning Tests ====================
1768
1769    #[test]
1770    fn test_plan_adaptive_with_scan() {
1771        let store = create_test_store();
1772        let planner = Planner::new(store);
1773
1774        // MATCH (n:Person) RETURN n
1775        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1776            items: vec![ReturnItem {
1777                expression: LogicalExpression::Variable("n".to_string()),
1778                alias: None,
1779            }],
1780            distinct: false,
1781            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1782                variable: "n".to_string(),
1783                label: Some("Person".to_string()),
1784                input: None,
1785            })),
1786        }));
1787
1788        let physical = planner.plan_adaptive(&logical).unwrap();
1789        assert_eq!(physical.columns(), &["n"]);
1790        // Should have adaptive context with estimates
1791        assert!(physical.adaptive_context.is_some());
1792    }
1793
1794    #[test]
1795    fn test_plan_adaptive_with_filter() {
1796        let store = create_test_store();
1797        let planner = Planner::new(store);
1798
1799        // MATCH (n) WHERE n.age > 30 RETURN n
1800        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1801            items: vec![ReturnItem {
1802                expression: LogicalExpression::Variable("n".to_string()),
1803                alias: None,
1804            }],
1805            distinct: false,
1806            input: Box::new(LogicalOperator::Filter(FilterOp {
1807                predicate: LogicalExpression::Binary {
1808                    left: Box::new(LogicalExpression::Property {
1809                        variable: "n".to_string(),
1810                        property: "age".to_string(),
1811                    }),
1812                    op: BinaryOp::Gt,
1813                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1814                },
1815                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816                    variable: "n".to_string(),
1817                    label: None,
1818                    input: None,
1819                })),
1820            })),
1821        }));
1822
1823        let physical = planner.plan_adaptive(&logical).unwrap();
1824        assert!(physical.adaptive_context.is_some());
1825    }
1826
1827    #[test]
1828    fn test_plan_adaptive_with_expand() {
1829        let store = create_test_store();
1830        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1831
1832        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1833        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1834            items: vec![
1835                ReturnItem {
1836                    expression: LogicalExpression::Variable("a".to_string()),
1837                    alias: None,
1838                },
1839                ReturnItem {
1840                    expression: LogicalExpression::Variable("b".to_string()),
1841                    alias: None,
1842                },
1843            ],
1844            distinct: false,
1845            input: Box::new(LogicalOperator::Expand(ExpandOp {
1846                from_variable: "a".to_string(),
1847                to_variable: "b".to_string(),
1848                edge_variable: None,
1849                direction: ExpandDirection::Outgoing,
1850                edge_type: Some("KNOWS".to_string()),
1851                min_hops: 1,
1852                max_hops: Some(1),
1853                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1854                    variable: "a".to_string(),
1855                    label: None,
1856                    input: None,
1857                })),
1858                path_alias: None,
1859            })),
1860        }));
1861
1862        let physical = planner.plan_adaptive(&logical).unwrap();
1863        assert!(physical.adaptive_context.is_some());
1864    }
1865
1866    #[test]
1867    fn test_plan_adaptive_with_join() {
1868        let store = create_test_store();
1869        let planner = Planner::new(store);
1870
1871        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1872            items: vec![
1873                ReturnItem {
1874                    expression: LogicalExpression::Variable("a".to_string()),
1875                    alias: None,
1876                },
1877                ReturnItem {
1878                    expression: LogicalExpression::Variable("b".to_string()),
1879                    alias: None,
1880                },
1881            ],
1882            distinct: false,
1883            input: Box::new(LogicalOperator::Join(JoinOp {
1884                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1885                    variable: "a".to_string(),
1886                    label: None,
1887                    input: None,
1888                })),
1889                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1890                    variable: "b".to_string(),
1891                    label: None,
1892                    input: None,
1893                })),
1894                join_type: JoinType::Cross,
1895                conditions: vec![],
1896            })),
1897        }));
1898
1899        let physical = planner.plan_adaptive(&logical).unwrap();
1900        assert!(physical.adaptive_context.is_some());
1901    }
1902
1903    #[test]
1904    fn test_plan_adaptive_with_aggregate() {
1905        let store = create_test_store();
1906        let planner = Planner::new(store);
1907
1908        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1909            group_by: vec![],
1910            aggregates: vec![LogicalAggregateExpr {
1911                function: LogicalAggregateFunction::Count,
1912                expression: Some(LogicalExpression::Variable("n".to_string())),
1913                distinct: false,
1914                alias: Some("cnt".to_string()),
1915                percentile: None,
1916            }],
1917            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1918                variable: "n".to_string(),
1919                label: None,
1920                input: None,
1921            })),
1922            having: None,
1923        }));
1924
1925        let physical = planner.plan_adaptive(&logical).unwrap();
1926        assert!(physical.adaptive_context.is_some());
1927    }
1928
1929    #[test]
1930    fn test_plan_adaptive_with_distinct() {
1931        let store = create_test_store();
1932        let planner = Planner::new(store);
1933
1934        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1935            items: vec![ReturnItem {
1936                expression: LogicalExpression::Variable("n".to_string()),
1937                alias: None,
1938            }],
1939            distinct: false,
1940            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1941                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1942                    variable: "n".to_string(),
1943                    label: None,
1944                    input: None,
1945                })),
1946                columns: None,
1947            })),
1948        }));
1949
1950        let physical = planner.plan_adaptive(&logical).unwrap();
1951        assert!(physical.adaptive_context.is_some());
1952    }
1953
1954    #[test]
1955    fn test_plan_adaptive_with_limit() {
1956        let store = create_test_store();
1957        let planner = Planner::new(store);
1958
1959        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1960            items: vec![ReturnItem {
1961                expression: LogicalExpression::Variable("n".to_string()),
1962                alias: None,
1963            }],
1964            distinct: false,
1965            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1966                count: 10,
1967                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1968                    variable: "n".to_string(),
1969                    label: None,
1970                    input: None,
1971                })),
1972            })),
1973        }));
1974
1975        let physical = planner.plan_adaptive(&logical).unwrap();
1976        assert!(physical.adaptive_context.is_some());
1977    }
1978
1979    #[test]
1980    fn test_plan_adaptive_with_skip() {
1981        let store = create_test_store();
1982        let planner = Planner::new(store);
1983
1984        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1985            items: vec![ReturnItem {
1986                expression: LogicalExpression::Variable("n".to_string()),
1987                alias: None,
1988            }],
1989            distinct: false,
1990            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1991                count: 5,
1992                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1993                    variable: "n".to_string(),
1994                    label: None,
1995                    input: None,
1996                })),
1997            })),
1998        }));
1999
2000        let physical = planner.plan_adaptive(&logical).unwrap();
2001        assert!(physical.adaptive_context.is_some());
2002    }
2003
2004    #[test]
2005    fn test_plan_adaptive_with_sort() {
2006        let store = create_test_store();
2007        let planner = Planner::new(store);
2008
2009        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2010            items: vec![ReturnItem {
2011                expression: LogicalExpression::Variable("n".to_string()),
2012                alias: None,
2013            }],
2014            distinct: false,
2015            input: Box::new(LogicalOperator::Sort(SortOp {
2016                keys: vec![SortKey {
2017                    expression: LogicalExpression::Variable("n".to_string()),
2018                    order: SortOrder::Ascending,
2019                }],
2020                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2021                    variable: "n".to_string(),
2022                    label: None,
2023                    input: None,
2024                })),
2025            })),
2026        }));
2027
2028        let physical = planner.plan_adaptive(&logical).unwrap();
2029        assert!(physical.adaptive_context.is_some());
2030    }
2031
2032    #[test]
2033    fn test_plan_adaptive_with_union() {
2034        let store = create_test_store();
2035        let planner = Planner::new(store);
2036
2037        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2038            items: vec![ReturnItem {
2039                expression: LogicalExpression::Variable("n".to_string()),
2040                alias: None,
2041            }],
2042            distinct: false,
2043            input: Box::new(LogicalOperator::Union(UnionOp {
2044                inputs: vec![
2045                    LogicalOperator::NodeScan(NodeScanOp {
2046                        variable: "n".to_string(),
2047                        label: Some("Person".to_string()),
2048                        input: None,
2049                    }),
2050                    LogicalOperator::NodeScan(NodeScanOp {
2051                        variable: "n".to_string(),
2052                        label: Some("Company".to_string()),
2053                        input: None,
2054                    }),
2055                ],
2056            })),
2057        }));
2058
2059        let physical = planner.plan_adaptive(&logical).unwrap();
2060        assert!(physical.adaptive_context.is_some());
2061    }
2062
2063    // ==================== Variable Length Path Tests ====================
2064
2065    #[test]
2066    fn test_plan_expand_variable_length() {
2067        let store = create_test_store();
2068        let planner = Planner::new(store);
2069
2070        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2071        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2072            items: vec![
2073                ReturnItem {
2074                    expression: LogicalExpression::Variable("a".to_string()),
2075                    alias: None,
2076                },
2077                ReturnItem {
2078                    expression: LogicalExpression::Variable("b".to_string()),
2079                    alias: None,
2080                },
2081            ],
2082            distinct: false,
2083            input: Box::new(LogicalOperator::Expand(ExpandOp {
2084                from_variable: "a".to_string(),
2085                to_variable: "b".to_string(),
2086                edge_variable: None,
2087                direction: ExpandDirection::Outgoing,
2088                edge_type: Some("KNOWS".to_string()),
2089                min_hops: 1,
2090                max_hops: Some(3),
2091                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2092                    variable: "a".to_string(),
2093                    label: None,
2094                    input: None,
2095                })),
2096                path_alias: None,
2097            })),
2098        }));
2099
2100        let physical = planner.plan(&logical).unwrap();
2101        assert!(physical.columns().contains(&"a".to_string()));
2102        assert!(physical.columns().contains(&"b".to_string()));
2103    }
2104
2105    #[test]
2106    fn test_plan_expand_with_path_alias() {
2107        let store = create_test_store();
2108        let planner = Planner::new(store);
2109
2110        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2111        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2112            items: vec![
2113                ReturnItem {
2114                    expression: LogicalExpression::Variable("a".to_string()),
2115                    alias: None,
2116                },
2117                ReturnItem {
2118                    expression: LogicalExpression::Variable("b".to_string()),
2119                    alias: None,
2120                },
2121            ],
2122            distinct: false,
2123            input: Box::new(LogicalOperator::Expand(ExpandOp {
2124                from_variable: "a".to_string(),
2125                to_variable: "b".to_string(),
2126                edge_variable: None,
2127                direction: ExpandDirection::Outgoing,
2128                edge_type: Some("KNOWS".to_string()),
2129                min_hops: 1,
2130                max_hops: Some(3),
2131                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2132                    variable: "a".to_string(),
2133                    label: None,
2134                    input: None,
2135                })),
2136                path_alias: Some("p".to_string()),
2137            })),
2138        }));
2139
2140        let physical = planner.plan(&logical).unwrap();
2141        // Verify plan was created successfully with expected output columns
2142        assert!(physical.columns().contains(&"a".to_string()));
2143        assert!(physical.columns().contains(&"b".to_string()));
2144    }
2145
2146    #[test]
2147    fn test_plan_expand_incoming() {
2148        let store = create_test_store();
2149        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2150
2151        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2152        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2153            items: vec![
2154                ReturnItem {
2155                    expression: LogicalExpression::Variable("a".to_string()),
2156                    alias: None,
2157                },
2158                ReturnItem {
2159                    expression: LogicalExpression::Variable("b".to_string()),
2160                    alias: None,
2161                },
2162            ],
2163            distinct: false,
2164            input: Box::new(LogicalOperator::Expand(ExpandOp {
2165                from_variable: "a".to_string(),
2166                to_variable: "b".to_string(),
2167                edge_variable: None,
2168                direction: ExpandDirection::Incoming,
2169                edge_type: Some("KNOWS".to_string()),
2170                min_hops: 1,
2171                max_hops: Some(1),
2172                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2173                    variable: "a".to_string(),
2174                    label: None,
2175                    input: None,
2176                })),
2177                path_alias: None,
2178            })),
2179        }));
2180
2181        let physical = planner.plan(&logical).unwrap();
2182        assert!(physical.columns().contains(&"a".to_string()));
2183        assert!(physical.columns().contains(&"b".to_string()));
2184    }
2185
2186    #[test]
2187    fn test_plan_expand_both_directions() {
2188        let store = create_test_store();
2189        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2190
2191        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2192        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2193            items: vec![
2194                ReturnItem {
2195                    expression: LogicalExpression::Variable("a".to_string()),
2196                    alias: None,
2197                },
2198                ReturnItem {
2199                    expression: LogicalExpression::Variable("b".to_string()),
2200                    alias: None,
2201                },
2202            ],
2203            distinct: false,
2204            input: Box::new(LogicalOperator::Expand(ExpandOp {
2205                from_variable: "a".to_string(),
2206                to_variable: "b".to_string(),
2207                edge_variable: None,
2208                direction: ExpandDirection::Both,
2209                edge_type: Some("KNOWS".to_string()),
2210                min_hops: 1,
2211                max_hops: Some(1),
2212                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2213                    variable: "a".to_string(),
2214                    label: None,
2215                    input: None,
2216                })),
2217                path_alias: None,
2218            })),
2219        }));
2220
2221        let physical = planner.plan(&logical).unwrap();
2222        assert!(physical.columns().contains(&"a".to_string()));
2223        assert!(physical.columns().contains(&"b".to_string()));
2224    }
2225
2226    // ==================== With Context Tests ====================
2227
2228    #[test]
2229    fn test_planner_with_context() {
2230        use crate::transaction::TransactionManager;
2231
2232        let store = create_test_store();
2233        let tx_manager = Arc::new(TransactionManager::new());
2234        let tx_id = tx_manager.begin();
2235        let epoch = tx_manager.current_epoch();
2236
2237        let planner = Planner::with_context(
2238            Arc::clone(&store),
2239            Arc::clone(&tx_manager),
2240            Some(tx_id),
2241            epoch,
2242        );
2243
2244        assert_eq!(planner.tx_id(), Some(tx_id));
2245        assert!(planner.tx_manager().is_some());
2246        assert_eq!(planner.viewing_epoch(), epoch);
2247    }
2248
2249    #[test]
2250    fn test_planner_with_factorized_execution_disabled() {
2251        let store = create_test_store();
2252        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2253
2254        // Two consecutive expands - should NOT use factorized execution
2255        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2256            items: vec![
2257                ReturnItem {
2258                    expression: LogicalExpression::Variable("a".to_string()),
2259                    alias: None,
2260                },
2261                ReturnItem {
2262                    expression: LogicalExpression::Variable("c".to_string()),
2263                    alias: None,
2264                },
2265            ],
2266            distinct: false,
2267            input: Box::new(LogicalOperator::Expand(ExpandOp {
2268                from_variable: "b".to_string(),
2269                to_variable: "c".to_string(),
2270                edge_variable: None,
2271                direction: ExpandDirection::Outgoing,
2272                edge_type: None,
2273                min_hops: 1,
2274                max_hops: Some(1),
2275                input: Box::new(LogicalOperator::Expand(ExpandOp {
2276                    from_variable: "a".to_string(),
2277                    to_variable: "b".to_string(),
2278                    edge_variable: None,
2279                    direction: ExpandDirection::Outgoing,
2280                    edge_type: None,
2281                    min_hops: 1,
2282                    max_hops: Some(1),
2283                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2284                        variable: "a".to_string(),
2285                        label: None,
2286                        input: None,
2287                    })),
2288                    path_alias: None,
2289                })),
2290                path_alias: None,
2291            })),
2292        }));
2293
2294        let physical = planner.plan(&logical).unwrap();
2295        assert!(physical.columns().contains(&"a".to_string()));
2296        assert!(physical.columns().contains(&"c".to_string()));
2297    }
2298
2299    // ==================== Sort with Property Tests ====================
2300
2301    #[test]
        // Verifies that a `Return` over a `Sort` keyed on a property expression
        // over a `NodeScan` can be planned, and that the returned variable `n`
        // is present in the physical plan's output columns.
2302    fn test_plan_sort_by_property() {
2303        let store = create_test_store();
2304        let planner = Planner::new(store);
2305
2306        // MATCH (n) RETURN n ORDER BY n.name ASC
2307        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2308            items: vec![ReturnItem {
2309                expression: LogicalExpression::Variable("n".to_string()),
2310                alias: None,
2311            }],
2312            distinct: false,
2313            input: Box::new(LogicalOperator::Sort(SortOp {
                    // The sort key is the property `n.name`, not the variable `n`
                    // itself; the scan below binds only `n`.
2314                keys: vec![SortKey {
2315                    expression: LogicalExpression::Property {
2316                        variable: "n".to_string(),
2317                        property: "name".to_string(),
2318                    },
2319                    order: SortOrder::Ascending,
2320                }],
2321                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2322                    variable: "n".to_string(),
2323                    label: None,
2324                    input: None,
2325                })),
2326            })),
2327        }));
2328
            // `unwrap` panics (failing the test) if planning does not succeed.
2329        let physical = planner.plan(&logical).unwrap();
2330        // Only the returned variable `n` is asserted to be an output column.
            // NOTE(review): the planner presumably projects `n.name` for the sort,
            // but no assertion here checks for such a column - confirm or add one.
2331        assert!(physical.columns().contains(&"n".to_string()));
2332    }
2333
2334    // ==================== Scan with Input Tests ====================
2335
2336    #[test]
        // Verifies that a `NodeScan` whose `input` is another `NodeScan` can be
        // planned, and that the variables bound by both scans appear in the
        // physical plan's output columns.
2337    fn test_plan_scan_with_input() {
2338        let store = create_test_store();
2339        let planner = Planner::new(store);
2340
2341        // A scan with another scan as input (for chained patterns):
            // the outer scan binds `b` (label "Company") and takes the scan that
            // binds `a` (label "Person") as its input; both variables are returned.
2342        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2343            items: vec![
2344                ReturnItem {
2345                    expression: LogicalExpression::Variable("a".to_string()),
2346                    alias: None,
2347                },
2348                ReturnItem {
2349                    expression: LogicalExpression::Variable("b".to_string()),
2350                    alias: None,
2351                },
2352            ],
2353            distinct: false,
2354            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2355                variable: "b".to_string(),
2356                label: Some("Company".to_string()),
2357                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2358                    variable: "a".to_string(),
2359                    label: Some("Person".to_string()),
2360                    input: None,
2361                }))),
2362            })),
2363        }));
2364
            // `unwrap` panics (failing the test) if planning does not succeed.
2365        let physical = planner.plan(&logical).unwrap();
            // Membership checks only: column order and any additional columns are
            // not constrained by this test.
2366        assert!(physical.columns().contains(&"a".to_string()));
2367        assert!(physical.columns().contains(&"b".to_string()));
2368    }
2369}