Skip to main content

grafeo_engine/query/planner/
mod.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7mod aggregate;
8mod expand;
9mod expression;
10mod filter;
11mod join;
12mod mutation;
13mod project;
14mod scan;
15
16use crate::query::plan::{
17    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
18    CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
19    ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
20    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, NodeScanOp,
21    RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp,
22    UnionOp, UnwindOp,
23};
24use grafeo_common::types::{EpochId, TxId};
25use grafeo_common::types::{LogicalType, Value};
26use grafeo_common::utils::error::{Error, Result};
27use grafeo_core::execution::AdaptiveContext;
28use grafeo_core::execution::operators::{
29    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
30    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
31    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
32    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
33    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
34    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
35    LeapfrogJoinOperator, LimitOperator, MapCollectOperator, MergeOperator,
36    MergeRelationshipConfig, MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator,
37    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
38    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
39    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
40    UnwindOperator, VariableLengthExpandOperator,
41};
42use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
43use std::collections::HashMap;
44use std::sync::Arc;
45
46use crate::transaction::TransactionManager;
47
48/// Range bounds for property-based range queries.
49struct RangeBounds<'a> {
50    min: Option<&'a Value>,
51    max: Option<&'a Value>,
52    min_inclusive: bool,
53    max_inclusive: bool,
54}
55
56/// Converts a logical plan to a physical operator tree.
57pub struct Planner {
58    /// The graph store (supports both read and write operations).
59    pub(super) store: Arc<dyn GraphStoreMut>,
60    /// Transaction manager for MVCC operations.
61    pub(super) tx_manager: Option<Arc<TransactionManager>>,
62    /// Current transaction ID (if in a transaction).
63    pub(super) tx_id: Option<TxId>,
64    /// Epoch to use for visibility checks.
65    pub(super) viewing_epoch: EpochId,
66    /// Counter for generating unique anonymous edge column names.
67    pub(super) anon_edge_counter: std::cell::Cell<u32>,
68    /// Whether to use factorized execution for multi-hop queries.
69    pub(super) factorized_execution: bool,
70    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
71    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
72    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
73    /// Variables that hold edge IDs (from MATCH edge patterns).
74    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
75    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
76}
77
78impl Planner {
79    /// Creates a new planner with the given store.
80    ///
81    /// This creates a planner without transaction context, using the current
82    /// epoch from the store for visibility.
83    #[must_use]
84    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
85        let epoch = store.current_epoch();
86        Self {
87            store,
88            tx_manager: None,
89            tx_id: None,
90            viewing_epoch: epoch,
91            anon_edge_counter: std::cell::Cell::new(0),
92            factorized_execution: true,
93            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
94            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
95        }
96    }
97
98    /// Creates a new planner with transaction context for MVCC-aware planning.
99    ///
100    /// # Arguments
101    ///
102    /// * `store` - The graph store
103    /// * `tx_manager` - Transaction manager for recording reads/writes
104    /// * `tx_id` - Current transaction ID (None for auto-commit)
105    /// * `viewing_epoch` - Epoch to use for version visibility
106    #[must_use]
107    pub fn with_context(
108        store: Arc<dyn GraphStoreMut>,
109        tx_manager: Arc<TransactionManager>,
110        tx_id: Option<TxId>,
111        viewing_epoch: EpochId,
112    ) -> Self {
113        Self {
114            store,
115            tx_manager: Some(tx_manager),
116            tx_id,
117            viewing_epoch,
118            anon_edge_counter: std::cell::Cell::new(0),
119            factorized_execution: true,
120            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
121            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
122        }
123    }
124
125    /// Returns the viewing epoch for this planner.
126    #[must_use]
127    pub fn viewing_epoch(&self) -> EpochId {
128        self.viewing_epoch
129    }
130
131    /// Returns the transaction ID for this planner, if any.
132    #[must_use]
133    pub fn tx_id(&self) -> Option<TxId> {
134        self.tx_id
135    }
136
137    /// Returns a reference to the transaction manager, if available.
138    #[must_use]
139    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
140        self.tx_manager.as_ref()
141    }
142
143    /// Enables or disables factorized execution for multi-hop queries.
144    #[must_use]
145    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
146        self.factorized_execution = enabled;
147        self
148    }
149
150    /// Counts consecutive single-hop expand operations.
151    ///
152    /// Returns the count and the deepest non-expand operator (the base of the chain).
153    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
154        match op {
155            LogicalOperator::Expand(expand) => {
156                // Only count single-hop expands (factorization doesn't apply to variable-length)
157                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
158
159                if is_single_hop {
160                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
161                    (inner_count + 1, base)
162                } else {
163                    // Variable-length path breaks the chain
164                    (0, op)
165                }
166            }
167            _ => (0, op),
168        }
169    }
170
171    /// Collects expand operations from the outermost down to the base.
172    ///
173    /// Returns expands in order from innermost (base) to outermost.
174    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
175        let mut chain = Vec::new();
176        let mut current = op;
177
178        while let LogicalOperator::Expand(expand) = current {
179            // Only include single-hop expands
180            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
181            if !is_single_hop {
182                break;
183            }
184            chain.push(expand);
185            current = &expand.input;
186        }
187
188        // Reverse so we go from base to outer
189        chain.reverse();
190        chain
191    }
192
193    /// Plans a logical plan into a physical operator.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if planning fails.
198    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
199        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
200        Ok(PhysicalPlan {
201            operator,
202            columns,
203            adaptive_context: None,
204        })
205    }
206
207    /// Plans a logical plan with adaptive execution support.
208    ///
209    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
210    /// joins) that can be monitored during execution to detect estimate errors.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if planning fails.
215    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
216        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
217
218        // Build adaptive context with cardinality estimates
219        let mut adaptive_context = AdaptiveContext::new();
220        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
221
222        Ok(PhysicalPlan {
223            operator,
224            columns,
225            adaptive_context: Some(adaptive_context),
226        })
227    }
228
229    /// Collects cardinality estimates from the logical plan into an adaptive context.
230    fn collect_cardinality_estimates(
231        &self,
232        op: &LogicalOperator,
233        ctx: &mut AdaptiveContext,
234        depth: usize,
235    ) {
236        match op {
237            LogicalOperator::NodeScan(scan) => {
238                // Estimate based on label statistics
239                let estimate = if let Some(label) = &scan.label {
240                    self.store.nodes_by_label(label).len() as f64
241                } else {
242                    self.store.node_count() as f64
243                };
244                let id = format!("scan_{}", scan.variable);
245                ctx.set_estimate(&id, estimate);
246
247                // Recurse into input if present
248                if let Some(input) = &scan.input {
249                    self.collect_cardinality_estimates(input, ctx, depth + 1);
250                }
251            }
252            LogicalOperator::Filter(filter) => {
253                // Default selectivity estimate for filters (30%)
254                let input_estimate = self.estimate_cardinality(&filter.input);
255                let estimate = input_estimate * 0.3;
256                let id = format!("filter_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
260            }
261            LogicalOperator::Expand(expand) => {
262                // Estimate based on average degree from store statistics
263                let input_estimate = self.estimate_cardinality(&expand.input);
264                let stats = self.store.statistics();
265                let avg_degree = self.estimate_expand_degree(&stats, expand);
266                let estimate = input_estimate * avg_degree;
267                let id = format!("expand_{}", expand.to_variable);
268                ctx.set_estimate(&id, estimate);
269
270                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
271            }
272            LogicalOperator::Join(join) => {
273                // Estimate join output (product with selectivity)
274                let left_est = self.estimate_cardinality(&join.left);
275                let right_est = self.estimate_cardinality(&join.right);
276                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
277                let id = format!("join_{depth}");
278                ctx.set_estimate(&id, estimate);
279
280                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
281                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
282            }
283            LogicalOperator::Aggregate(agg) => {
284                // Aggregates typically reduce cardinality
285                let input_estimate = self.estimate_cardinality(&agg.input);
286                let estimate = if agg.group_by.is_empty() {
287                    1.0 // Scalar aggregate
288                } else {
289                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
290                };
291                let id = format!("aggregate_{depth}");
292                ctx.set_estimate(&id, estimate);
293
294                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
295            }
296            LogicalOperator::Distinct(distinct) => {
297                let input_estimate = self.estimate_cardinality(&distinct.input);
298                let estimate = (input_estimate * 0.5).max(1.0);
299                let id = format!("distinct_{depth}");
300                ctx.set_estimate(&id, estimate);
301
302                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
303            }
304            LogicalOperator::Return(ret) => {
305                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
306            }
307            LogicalOperator::Limit(limit) => {
308                let input_estimate = self.estimate_cardinality(&limit.input);
309                let estimate = (input_estimate).min(limit.count as f64);
310                let id = format!("limit_{depth}");
311                ctx.set_estimate(&id, estimate);
312
313                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
314            }
315            LogicalOperator::Skip(skip) => {
316                let input_estimate = self.estimate_cardinality(&skip.input);
317                let estimate = (input_estimate - skip.count as f64).max(0.0);
318                let id = format!("skip_{depth}");
319                ctx.set_estimate(&id, estimate);
320
321                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
322            }
323            LogicalOperator::Sort(sort) => {
324                // Sort doesn't change cardinality
325                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
326            }
327            LogicalOperator::Union(union) => {
328                let estimate: f64 = union
329                    .inputs
330                    .iter()
331                    .map(|input| self.estimate_cardinality(input))
332                    .sum();
333                let id = format!("union_{depth}");
334                ctx.set_estimate(&id, estimate);
335
336                for input in &union.inputs {
337                    self.collect_cardinality_estimates(input, ctx, depth + 1);
338                }
339            }
340            _ => {
341                // For other operators, try to recurse into known input patterns
342            }
343        }
344    }
345
346    /// Estimates cardinality for a logical operator subtree.
347    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
348        match op {
349            LogicalOperator::NodeScan(scan) => {
350                if let Some(label) = &scan.label {
351                    self.store.nodes_by_label(label).len() as f64
352                } else {
353                    self.store.node_count() as f64
354                }
355            }
356            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
357            LogicalOperator::Expand(expand) => {
358                let stats = self.store.statistics();
359                let avg_degree = self.estimate_expand_degree(&stats, expand);
360                self.estimate_cardinality(&expand.input) * avg_degree
361            }
362            LogicalOperator::Join(join) => {
363                let left = self.estimate_cardinality(&join.left);
364                let right = self.estimate_cardinality(&join.right);
365                (left * right).sqrt()
366            }
367            LogicalOperator::Aggregate(agg) => {
368                if agg.group_by.is_empty() {
369                    1.0
370                } else {
371                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
372                }
373            }
374            LogicalOperator::Distinct(distinct) => {
375                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
376            }
377            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
378            LogicalOperator::Limit(limit) => self
379                .estimate_cardinality(&limit.input)
380                .min(limit.count as f64),
381            LogicalOperator::Skip(skip) => {
382                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
383            }
384            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
385            LogicalOperator::Union(union) => union
386                .inputs
387                .iter()
388                .map(|input| self.estimate_cardinality(input))
389                .sum(),
390            _ => 1000.0, // Default estimate for unknown operators
391        }
392    }
393
394    /// Estimates the average edge degree for an expand operation using store statistics.
395    fn estimate_expand_degree(
396        &self,
397        stats: &grafeo_core::statistics::Statistics,
398        expand: &ExpandOp,
399    ) -> f64 {
400        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
401        if let Some(edge_type) = &expand.edge_type {
402            stats.estimate_avg_degree(edge_type, outgoing)
403        } else if stats.total_nodes > 0 {
404            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
405        } else {
406            10.0 // fallback for empty graph
407        }
408    }
409
410    /// Plans a single logical operator.
411    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
412        match op {
413            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
414            LogicalOperator::Expand(expand) => {
415                // Check for expand chains when factorized execution is enabled
416                if self.factorized_execution {
417                    let (chain_len, _base) = Self::count_expand_chain(op);
418                    if chain_len >= 2 {
419                        // Use factorized chain for 2+ consecutive single-hop expands
420                        return self.plan_expand_chain(op);
421                    }
422                }
423                self.plan_expand(expand)
424            }
425            LogicalOperator::Return(ret) => self.plan_return(ret),
426            LogicalOperator::Filter(filter) => self.plan_filter(filter),
427            LogicalOperator::Project(project) => self.plan_project(project),
428            LogicalOperator::Limit(limit) => self.plan_limit(limit),
429            LogicalOperator::Skip(skip) => self.plan_skip(skip),
430            LogicalOperator::Sort(sort) => self.plan_sort(sort),
431            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
432            LogicalOperator::Join(join) => self.plan_join(join),
433            LogicalOperator::Union(union) => self.plan_union(union),
434            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
435            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
436            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
437            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
438            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
439            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
440            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
441            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
442            LogicalOperator::Merge(merge) => self.plan_merge(merge),
443            LogicalOperator::MergeRelationship(merge_rel) => {
444                self.plan_merge_relationship(merge_rel)
445            }
446            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
447            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
448            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
449            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
450            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
451            #[cfg(feature = "algos")]
452            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
453            #[cfg(not(feature = "algos"))]
454            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
455                "CALL procedures require the 'algos' feature".to_string(),
456            )),
457            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
458            LogicalOperator::VectorScan(_) => Err(Error::Internal(
459                "VectorScan requires vector-index feature".to_string(),
460            )),
461            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
462                "VectorJoin requires vector-index feature".to_string(),
463            )),
464            _ => Err(Error::Internal(format!(
465                "Unsupported operator: {:?}",
466                std::mem::discriminant(op)
467            ))),
468        }
469    }
470
471    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
472    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
473        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
474        let key_idx = child_columns
475            .iter()
476            .position(|c| c == &mc.key_var)
477            .ok_or_else(|| {
478                Error::Internal(format!(
479                    "MapCollect key '{}' not in columns {:?}",
480                    mc.key_var, child_columns
481                ))
482            })?;
483        let value_idx = child_columns
484            .iter()
485            .position(|c| c == &mc.value_var)
486            .ok_or_else(|| {
487                Error::Internal(format!(
488                    "MapCollect value '{}' not in columns {:?}",
489                    mc.value_var, child_columns
490                ))
491            })?;
492        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
493        // Register the output alias as a scalar column so that the Return
494        // planner emits a plain Column pass-through instead of NodeResolve.
495        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
496        Ok((operator, vec![mc.alias.clone()]))
497    }
498}
499
500/// Converts a logical binary operator to a filter binary operator.
501pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
502    match op {
503        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
504        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
505        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
506        BinaryOp::Le => Ok(BinaryFilterOp::Le),
507        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
508        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
509        BinaryOp::And => Ok(BinaryFilterOp::And),
510        BinaryOp::Or => Ok(BinaryFilterOp::Or),
511        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
512        BinaryOp::Add => Ok(BinaryFilterOp::Add),
513        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
514        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
515        BinaryOp::Div => Ok(BinaryFilterOp::Div),
516        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
517        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
518        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
519        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
520        BinaryOp::In => Ok(BinaryFilterOp::In),
521        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
522        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
523        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
524            "Binary operator {:?} not yet supported in filters",
525            op
526        ))),
527    }
528}
529
530/// Converts a logical unary operator to a filter unary operator.
531pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
532    match op {
533        UnaryOp::Not => Ok(UnaryFilterOp::Not),
534        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
535        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
536        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
537    }
538}
539
540/// Converts a logical aggregate function to a physical aggregate function.
541pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
542    match func {
543        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
544        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
545        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
546        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
547        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
548        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
549        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
550        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
551        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
552        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
553        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
554    }
555}
556
557/// Converts a logical expression to a filter expression.
558///
559/// This is a standalone function that can be used by both LPG and RDF planners.
560pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
561    match expr {
562        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
563        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
564        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
565            variable: variable.clone(),
566            property: property.clone(),
567        }),
568        LogicalExpression::Binary { left, op, right } => {
569            let left_expr = convert_filter_expression(left)?;
570            let right_expr = convert_filter_expression(right)?;
571            let filter_op = convert_binary_op(*op)?;
572            Ok(FilterExpression::Binary {
573                left: Box::new(left_expr),
574                op: filter_op,
575                right: Box::new(right_expr),
576            })
577        }
578        LogicalExpression::Unary { op, operand } => {
579            let operand_expr = convert_filter_expression(operand)?;
580            let filter_op = convert_unary_op(*op)?;
581            Ok(FilterExpression::Unary {
582                op: filter_op,
583                operand: Box::new(operand_expr),
584            })
585        }
586        LogicalExpression::FunctionCall { name, args, .. } => {
587            let filter_args: Vec<FilterExpression> = args
588                .iter()
589                .map(convert_filter_expression)
590                .collect::<Result<Vec<_>>>()?;
591            Ok(FilterExpression::FunctionCall {
592                name: name.clone(),
593                args: filter_args,
594            })
595        }
596        LogicalExpression::Case {
597            operand,
598            when_clauses,
599            else_clause,
600        } => {
601            let filter_operand = operand
602                .as_ref()
603                .map(|e| convert_filter_expression(e))
604                .transpose()?
605                .map(Box::new);
606            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
607                .iter()
608                .map(|(cond, result)| {
609                    Ok((
610                        convert_filter_expression(cond)?,
611                        convert_filter_expression(result)?,
612                    ))
613                })
614                .collect::<Result<Vec<_>>>()?;
615            let filter_else = else_clause
616                .as_ref()
617                .map(|e| convert_filter_expression(e))
618                .transpose()?
619                .map(Box::new);
620            Ok(FilterExpression::Case {
621                operand: filter_operand,
622                when_clauses: filter_when_clauses,
623                else_clause: filter_else,
624            })
625        }
626        LogicalExpression::List(items) => {
627            let filter_items: Vec<FilterExpression> = items
628                .iter()
629                .map(convert_filter_expression)
630                .collect::<Result<Vec<_>>>()?;
631            Ok(FilterExpression::List(filter_items))
632        }
633        LogicalExpression::Map(pairs) => {
634            let filter_pairs: Vec<(String, FilterExpression)> = pairs
635                .iter()
636                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
637                .collect::<Result<Vec<_>>>()?;
638            Ok(FilterExpression::Map(filter_pairs))
639        }
640        LogicalExpression::IndexAccess { base, index } => {
641            let base_expr = convert_filter_expression(base)?;
642            let index_expr = convert_filter_expression(index)?;
643            Ok(FilterExpression::IndexAccess {
644                base: Box::new(base_expr),
645                index: Box::new(index_expr),
646            })
647        }
648        LogicalExpression::SliceAccess { base, start, end } => {
649            let base_expr = convert_filter_expression(base)?;
650            let start_expr = start
651                .as_ref()
652                .map(|s| convert_filter_expression(s))
653                .transpose()?
654                .map(Box::new);
655            let end_expr = end
656                .as_ref()
657                .map(|e| convert_filter_expression(e))
658                .transpose()?
659                .map(Box::new);
660            Ok(FilterExpression::SliceAccess {
661                base: Box::new(base_expr),
662                start: start_expr,
663                end: end_expr,
664            })
665        }
666        LogicalExpression::Parameter(_) => Err(Error::Internal(
667            "Parameters not yet supported in filters".to_string(),
668        )),
669        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
670        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
671        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
672        LogicalExpression::ListComprehension {
673            variable,
674            list_expr,
675            filter_expr,
676            map_expr,
677        } => {
678            let list = convert_filter_expression(list_expr)?;
679            let filter = filter_expr
680                .as_ref()
681                .map(|f| convert_filter_expression(f))
682                .transpose()?
683                .map(Box::new);
684            let map = convert_filter_expression(map_expr)?;
685            Ok(FilterExpression::ListComprehension {
686                variable: variable.clone(),
687                list_expr: Box::new(list),
688                filter_expr: filter,
689                map_expr: Box::new(map),
690            })
691        }
692        LogicalExpression::ListPredicate {
693            kind,
694            variable,
695            list_expr,
696            predicate,
697        } => {
698            use crate::query::plan::ListPredicateKind as LPK;
699            let filter_kind = match kind {
700                LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
701                LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
702                LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
703                LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
704            };
705            let list = convert_filter_expression(list_expr)?;
706            let pred = convert_filter_expression(predicate)?;
707            Ok(FilterExpression::ListPredicate {
708                kind: filter_kind,
709                variable: variable.clone(),
710                list_expr: Box::new(list),
711                predicate: Box::new(pred),
712            })
713        }
714        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
715            Error::Internal("Subqueries not yet supported in filters".to_string()),
716        ),
717    }
718}
719
720/// Infers the logical type from a value.
721fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
722    use grafeo_common::types::Value;
723    match value {
724        Value::Null => LogicalType::String, // Default type for null
725        Value::Bool(_) => LogicalType::Bool,
726        Value::Int64(_) => LogicalType::Int64,
727        Value::Float64(_) => LogicalType::Float64,
728        Value::String(_) => LogicalType::String,
729        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
730        Value::Timestamp(_) => LogicalType::Timestamp,
731        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
732        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
733        Value::Vector(v) => LogicalType::Vector(v.len()),
734    }
735}
736
737/// Converts an expression to a string for column naming.
738fn expression_to_string(expr: &LogicalExpression) -> String {
739    match expr {
740        LogicalExpression::Variable(name) => name.clone(),
741        LogicalExpression::Property { variable, property } => {
742            format!("{variable}.{property}")
743        }
744        LogicalExpression::Literal(value) => format!("{value:?}"),
745        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
746        _ => "expr".to_string(),
747    }
748}
749
750/// A physical plan ready for execution.
751pub struct PhysicalPlan {
752    /// The root physical operator.
753    pub operator: Box<dyn Operator>,
754    /// Column names for the result.
755    pub columns: Vec<String>,
756    /// Adaptive execution context with cardinality estimates.
757    ///
758    /// When adaptive execution is enabled, this context contains estimated
759    /// cardinalities at various checkpoints in the plan. During execution,
760    /// actual row counts are recorded and compared against estimates.
761    pub adaptive_context: Option<AdaptiveContext>,
762}
763
764impl PhysicalPlan {
765    /// Returns the column names.
766    #[must_use]
767    pub fn columns(&self) -> &[String] {
768        &self.columns
769    }
770
771    /// Consumes the plan and returns the operator.
772    pub fn into_operator(self) -> Box<dyn Operator> {
773        self.operator
774    }
775
776    /// Returns the adaptive context, if adaptive execution is enabled.
777    #[must_use]
778    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
779        self.adaptive_context.as_ref()
780    }
781
782    /// Takes ownership of the adaptive context.
783    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
784        self.adaptive_context.take()
785    }
786}
787
788/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
789struct StaticResultOperator {
790    rows: Vec<Vec<Value>>,
791    column_indices: Vec<usize>,
792    row_index: usize,
793}
794
795impl Operator for StaticResultOperator {
796    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
797        use grafeo_core::execution::DataChunk;
798
799        if self.row_index >= self.rows.len() {
800            return Ok(None);
801        }
802
803        let remaining = self.rows.len() - self.row_index;
804        let chunk_rows = remaining.min(1024);
805        let col_count = self.column_indices.len();
806
807        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
808        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
809
810        for row_offset in 0..chunk_rows {
811            let row = &self.rows[self.row_index + row_offset];
812            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
813                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
814                if let Some(col) = chunk.column_mut(col_idx) {
815                    col.push_value(value);
816                }
817            }
818        }
819        chunk.set_count(chunk_rows);
820
821        self.row_index += chunk_rows;
822        Ok(Some(chunk))
823    }
824
825    fn reset(&mut self) {
826        self.row_index = 0;
827    }
828
829    fn name(&self) -> &'static str {
830        "StaticResult"
831    }
832}
833
834#[cfg(test)]
835mod tests {
836    use super::*;
837    use crate::query::plan::{
838        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
839        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
840        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
841        SortKey, SortOp,
842    };
843    use grafeo_common::types::Value;
844    use grafeo_core::graph::GraphStoreMut;
845    use grafeo_core::graph::lpg::LpgStore;
846
847    fn create_test_store() -> Arc<dyn GraphStoreMut> {
848        let store = Arc::new(LpgStore::new());
849        store.create_node(&["Person"]);
850        store.create_node(&["Person"]);
851        store.create_node(&["Company"]);
852        store
853    }
854
855    // ==================== Simple Scan Tests ====================
856
857    #[test]
858    fn test_plan_simple_scan() {
859        let store = create_test_store();
860        let planner = Planner::new(store);
861
862        // MATCH (n:Person) RETURN n
863        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
864            items: vec![ReturnItem {
865                expression: LogicalExpression::Variable("n".to_string()),
866                alias: None,
867            }],
868            distinct: false,
869            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
870                variable: "n".to_string(),
871                label: Some("Person".to_string()),
872                input: None,
873            })),
874        }));
875
876        let physical = planner.plan(&logical).unwrap();
877        assert_eq!(physical.columns(), &["n"]);
878    }
879
880    #[test]
881    fn test_plan_scan_without_label() {
882        let store = create_test_store();
883        let planner = Planner::new(store);
884
885        // MATCH (n) RETURN n
886        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
887            items: vec![ReturnItem {
888                expression: LogicalExpression::Variable("n".to_string()),
889                alias: None,
890            }],
891            distinct: false,
892            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
893                variable: "n".to_string(),
894                label: None,
895                input: None,
896            })),
897        }));
898
899        let physical = planner.plan(&logical).unwrap();
900        assert_eq!(physical.columns(), &["n"]);
901    }
902
903    #[test]
904    fn test_plan_return_with_alias() {
905        let store = create_test_store();
906        let planner = Planner::new(store);
907
908        // MATCH (n:Person) RETURN n AS person
909        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
910            items: vec![ReturnItem {
911                expression: LogicalExpression::Variable("n".to_string()),
912                alias: Some("person".to_string()),
913            }],
914            distinct: false,
915            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
916                variable: "n".to_string(),
917                label: Some("Person".to_string()),
918                input: None,
919            })),
920        }));
921
922        let physical = planner.plan(&logical).unwrap();
923        assert_eq!(physical.columns(), &["person"]);
924    }
925
926    #[test]
927    fn test_plan_return_property() {
928        let store = create_test_store();
929        let planner = Planner::new(store);
930
931        // MATCH (n:Person) RETURN n.name
932        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
933            items: vec![ReturnItem {
934                expression: LogicalExpression::Property {
935                    variable: "n".to_string(),
936                    property: "name".to_string(),
937                },
938                alias: None,
939            }],
940            distinct: false,
941            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
942                variable: "n".to_string(),
943                label: Some("Person".to_string()),
944                input: None,
945            })),
946        }));
947
948        let physical = planner.plan(&logical).unwrap();
949        assert_eq!(physical.columns(), &["n.name"]);
950    }
951
952    #[test]
953    fn test_plan_return_literal() {
954        let store = create_test_store();
955        let planner = Planner::new(store);
956
957        // MATCH (n) RETURN 42 AS answer
958        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
959            items: vec![ReturnItem {
960                expression: LogicalExpression::Literal(Value::Int64(42)),
961                alias: Some("answer".to_string()),
962            }],
963            distinct: false,
964            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
965                variable: "n".to_string(),
966                label: None,
967                input: None,
968            })),
969        }));
970
971        let physical = planner.plan(&logical).unwrap();
972        assert_eq!(physical.columns(), &["answer"]);
973    }
974
975    // ==================== Filter Tests ====================
976
977    #[test]
978    fn test_plan_filter_equality() {
979        let store = create_test_store();
980        let planner = Planner::new(store);
981
982        // MATCH (n:Person) WHERE n.age = 30 RETURN n
983        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
984            items: vec![ReturnItem {
985                expression: LogicalExpression::Variable("n".to_string()),
986                alias: None,
987            }],
988            distinct: false,
989            input: Box::new(LogicalOperator::Filter(FilterOp {
990                predicate: LogicalExpression::Binary {
991                    left: Box::new(LogicalExpression::Property {
992                        variable: "n".to_string(),
993                        property: "age".to_string(),
994                    }),
995                    op: BinaryOp::Eq,
996                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
997                },
998                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
999                    variable: "n".to_string(),
1000                    label: Some("Person".to_string()),
1001                    input: None,
1002                })),
1003            })),
1004        }));
1005
1006        let physical = planner.plan(&logical).unwrap();
1007        assert_eq!(physical.columns(), &["n"]);
1008    }
1009
1010    #[test]
1011    fn test_plan_filter_compound_and() {
1012        let store = create_test_store();
1013        let planner = Planner::new(store);
1014
1015        // WHERE n.age > 20 AND n.age < 40
1016        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1017            items: vec![ReturnItem {
1018                expression: LogicalExpression::Variable("n".to_string()),
1019                alias: None,
1020            }],
1021            distinct: false,
1022            input: Box::new(LogicalOperator::Filter(FilterOp {
1023                predicate: LogicalExpression::Binary {
1024                    left: Box::new(LogicalExpression::Binary {
1025                        left: Box::new(LogicalExpression::Property {
1026                            variable: "n".to_string(),
1027                            property: "age".to_string(),
1028                        }),
1029                        op: BinaryOp::Gt,
1030                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1031                    }),
1032                    op: BinaryOp::And,
1033                    right: Box::new(LogicalExpression::Binary {
1034                        left: Box::new(LogicalExpression::Property {
1035                            variable: "n".to_string(),
1036                            property: "age".to_string(),
1037                        }),
1038                        op: BinaryOp::Lt,
1039                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1040                    }),
1041                },
1042                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1043                    variable: "n".to_string(),
1044                    label: None,
1045                    input: None,
1046                })),
1047            })),
1048        }));
1049
1050        let physical = planner.plan(&logical).unwrap();
1051        assert_eq!(physical.columns(), &["n"]);
1052    }
1053
1054    #[test]
1055    fn test_plan_filter_unary_not() {
1056        let store = create_test_store();
1057        let planner = Planner::new(store);
1058
1059        // WHERE NOT n.active
1060        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1061            items: vec![ReturnItem {
1062                expression: LogicalExpression::Variable("n".to_string()),
1063                alias: None,
1064            }],
1065            distinct: false,
1066            input: Box::new(LogicalOperator::Filter(FilterOp {
1067                predicate: LogicalExpression::Unary {
1068                    op: UnaryOp::Not,
1069                    operand: Box::new(LogicalExpression::Property {
1070                        variable: "n".to_string(),
1071                        property: "active".to_string(),
1072                    }),
1073                },
1074                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1075                    variable: "n".to_string(),
1076                    label: None,
1077                    input: None,
1078                })),
1079            })),
1080        }));
1081
1082        let physical = planner.plan(&logical).unwrap();
1083        assert_eq!(physical.columns(), &["n"]);
1084    }
1085
1086    #[test]
1087    fn test_plan_filter_is_null() {
1088        let store = create_test_store();
1089        let planner = Planner::new(store);
1090
1091        // WHERE n.email IS NULL
1092        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1093            items: vec![ReturnItem {
1094                expression: LogicalExpression::Variable("n".to_string()),
1095                alias: None,
1096            }],
1097            distinct: false,
1098            input: Box::new(LogicalOperator::Filter(FilterOp {
1099                predicate: LogicalExpression::Unary {
1100                    op: UnaryOp::IsNull,
1101                    operand: Box::new(LogicalExpression::Property {
1102                        variable: "n".to_string(),
1103                        property: "email".to_string(),
1104                    }),
1105                },
1106                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1107                    variable: "n".to_string(),
1108                    label: None,
1109                    input: None,
1110                })),
1111            })),
1112        }));
1113
1114        let physical = planner.plan(&logical).unwrap();
1115        assert_eq!(physical.columns(), &["n"]);
1116    }
1117
1118    #[test]
1119    fn test_plan_filter_function_call() {
1120        let store = create_test_store();
1121        let planner = Planner::new(store);
1122
1123        // WHERE size(n.friends) > 0
1124        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1125            items: vec![ReturnItem {
1126                expression: LogicalExpression::Variable("n".to_string()),
1127                alias: None,
1128            }],
1129            distinct: false,
1130            input: Box::new(LogicalOperator::Filter(FilterOp {
1131                predicate: LogicalExpression::Binary {
1132                    left: Box::new(LogicalExpression::FunctionCall {
1133                        name: "size".to_string(),
1134                        args: vec![LogicalExpression::Property {
1135                            variable: "n".to_string(),
1136                            property: "friends".to_string(),
1137                        }],
1138                        distinct: false,
1139                    }),
1140                    op: BinaryOp::Gt,
1141                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1142                },
1143                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1144                    variable: "n".to_string(),
1145                    label: None,
1146                    input: None,
1147                })),
1148            })),
1149        }));
1150
1151        let physical = planner.plan(&logical).unwrap();
1152        assert_eq!(physical.columns(), &["n"]);
1153    }
1154
1155    // ==================== Expand Tests ====================
1156
1157    #[test]
1158    fn test_plan_expand_outgoing() {
1159        let store = create_test_store();
1160        let planner = Planner::new(store);
1161
1162        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1163        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1164            items: vec![
1165                ReturnItem {
1166                    expression: LogicalExpression::Variable("a".to_string()),
1167                    alias: None,
1168                },
1169                ReturnItem {
1170                    expression: LogicalExpression::Variable("b".to_string()),
1171                    alias: None,
1172                },
1173            ],
1174            distinct: false,
1175            input: Box::new(LogicalOperator::Expand(ExpandOp {
1176                from_variable: "a".to_string(),
1177                to_variable: "b".to_string(),
1178                edge_variable: None,
1179                direction: ExpandDirection::Outgoing,
1180                edge_type: Some("KNOWS".to_string()),
1181                min_hops: 1,
1182                max_hops: Some(1),
1183                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1184                    variable: "a".to_string(),
1185                    label: Some("Person".to_string()),
1186                    input: None,
1187                })),
1188                path_alias: None,
1189            })),
1190        }));
1191
1192        let physical = planner.plan(&logical).unwrap();
1193        // The return should have columns [a, b]
1194        assert!(physical.columns().contains(&"a".to_string()));
1195        assert!(physical.columns().contains(&"b".to_string()));
1196    }
1197
1198    #[test]
1199    fn test_plan_expand_with_edge_variable() {
1200        let store = create_test_store();
1201        let planner = Planner::new(store);
1202
1203        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1204        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1205            items: vec![
1206                ReturnItem {
1207                    expression: LogicalExpression::Variable("a".to_string()),
1208                    alias: None,
1209                },
1210                ReturnItem {
1211                    expression: LogicalExpression::Variable("r".to_string()),
1212                    alias: None,
1213                },
1214                ReturnItem {
1215                    expression: LogicalExpression::Variable("b".to_string()),
1216                    alias: None,
1217                },
1218            ],
1219            distinct: false,
1220            input: Box::new(LogicalOperator::Expand(ExpandOp {
1221                from_variable: "a".to_string(),
1222                to_variable: "b".to_string(),
1223                edge_variable: Some("r".to_string()),
1224                direction: ExpandDirection::Outgoing,
1225                edge_type: Some("KNOWS".to_string()),
1226                min_hops: 1,
1227                max_hops: Some(1),
1228                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1229                    variable: "a".to_string(),
1230                    label: None,
1231                    input: None,
1232                })),
1233                path_alias: None,
1234            })),
1235        }));
1236
1237        let physical = planner.plan(&logical).unwrap();
1238        assert!(physical.columns().contains(&"a".to_string()));
1239        assert!(physical.columns().contains(&"r".to_string()));
1240        assert!(physical.columns().contains(&"b".to_string()));
1241    }
1242
1243    // ==================== Limit/Skip/Sort Tests ====================
1244
1245    #[test]
1246    fn test_plan_limit() {
1247        let store = create_test_store();
1248        let planner = Planner::new(store);
1249
1250        // MATCH (n) RETURN n LIMIT 10
1251        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1252            items: vec![ReturnItem {
1253                expression: LogicalExpression::Variable("n".to_string()),
1254                alias: None,
1255            }],
1256            distinct: false,
1257            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1258                count: 10,
1259                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1260                    variable: "n".to_string(),
1261                    label: None,
1262                    input: None,
1263                })),
1264            })),
1265        }));
1266
1267        let physical = planner.plan(&logical).unwrap();
1268        assert_eq!(physical.columns(), &["n"]);
1269    }
1270
1271    #[test]
1272    fn test_plan_skip() {
1273        let store = create_test_store();
1274        let planner = Planner::new(store);
1275
1276        // MATCH (n) RETURN n SKIP 5
1277        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1278            items: vec![ReturnItem {
1279                expression: LogicalExpression::Variable("n".to_string()),
1280                alias: None,
1281            }],
1282            distinct: false,
1283            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1284                count: 5,
1285                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1286                    variable: "n".to_string(),
1287                    label: None,
1288                    input: None,
1289                })),
1290            })),
1291        }));
1292
1293        let physical = planner.plan(&logical).unwrap();
1294        assert_eq!(physical.columns(), &["n"]);
1295    }
1296
1297    #[test]
1298    fn test_plan_sort() {
1299        let store = create_test_store();
1300        let planner = Planner::new(store);
1301
1302        // MATCH (n) RETURN n ORDER BY n.name ASC
1303        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1304            items: vec![ReturnItem {
1305                expression: LogicalExpression::Variable("n".to_string()),
1306                alias: None,
1307            }],
1308            distinct: false,
1309            input: Box::new(LogicalOperator::Sort(SortOp {
1310                keys: vec![SortKey {
1311                    expression: LogicalExpression::Variable("n".to_string()),
1312                    order: SortOrder::Ascending,
1313                }],
1314                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1315                    variable: "n".to_string(),
1316                    label: None,
1317                    input: None,
1318                })),
1319            })),
1320        }));
1321
1322        let physical = planner.plan(&logical).unwrap();
1323        assert_eq!(physical.columns(), &["n"]);
1324    }
1325
1326    #[test]
1327    fn test_plan_sort_descending() {
1328        let store = create_test_store();
1329        let planner = Planner::new(store);
1330
1331        // ORDER BY n DESC
1332        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1333            items: vec![ReturnItem {
1334                expression: LogicalExpression::Variable("n".to_string()),
1335                alias: None,
1336            }],
1337            distinct: false,
1338            input: Box::new(LogicalOperator::Sort(SortOp {
1339                keys: vec![SortKey {
1340                    expression: LogicalExpression::Variable("n".to_string()),
1341                    order: SortOrder::Descending,
1342                }],
1343                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1344                    variable: "n".to_string(),
1345                    label: None,
1346                    input: None,
1347                })),
1348            })),
1349        }));
1350
1351        let physical = planner.plan(&logical).unwrap();
1352        assert_eq!(physical.columns(), &["n"]);
1353    }
1354
1355    #[test]
1356    fn test_plan_distinct() {
1357        let store = create_test_store();
1358        let planner = Planner::new(store);
1359
1360        // MATCH (n) RETURN DISTINCT n
1361        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1362            items: vec![ReturnItem {
1363                expression: LogicalExpression::Variable("n".to_string()),
1364                alias: None,
1365            }],
1366            distinct: false,
1367            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1368                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1369                    variable: "n".to_string(),
1370                    label: None,
1371                    input: None,
1372                })),
1373                columns: None,
1374            })),
1375        }));
1376
1377        let physical = planner.plan(&logical).unwrap();
1378        assert_eq!(physical.columns(), &["n"]);
1379    }
1380
1381    #[test]
1382    fn test_plan_distinct_with_columns() {
1383        let store = create_test_store();
1384        let planner = Planner::new(store);
1385
1386        // DISTINCT on specific columns (column-specific dedup)
1387        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1388            items: vec![ReturnItem {
1389                expression: LogicalExpression::Variable("n".to_string()),
1390                alias: None,
1391            }],
1392            distinct: false,
1393            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1394                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1395                    variable: "n".to_string(),
1396                    label: None,
1397                    input: None,
1398                })),
1399                columns: Some(vec!["n".to_string()]),
1400            })),
1401        }));
1402
1403        let physical = planner.plan(&logical).unwrap();
1404        assert_eq!(physical.columns(), &["n"]);
1405    }
1406
1407    #[test]
1408    fn test_plan_distinct_with_nonexistent_columns() {
1409        let store = create_test_store();
1410        let planner = Planner::new(store);
1411
1412        // When distinct columns don't match any output columns,
1413        // it falls back to full-row distinct.
1414        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1415            items: vec![ReturnItem {
1416                expression: LogicalExpression::Variable("n".to_string()),
1417                alias: None,
1418            }],
1419            distinct: false,
1420            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1421                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1422                    variable: "n".to_string(),
1423                    label: None,
1424                    input: None,
1425                })),
1426                columns: Some(vec!["nonexistent".to_string()]),
1427            })),
1428        }));
1429
1430        let physical = planner.plan(&logical).unwrap();
1431        assert_eq!(physical.columns(), &["n"]);
1432    }
1433
1434    // ==================== Aggregate Tests ====================
1435
1436    #[test]
1437    fn test_plan_aggregate_count() {
1438        let store = create_test_store();
1439        let planner = Planner::new(store);
1440
1441        // MATCH (n) RETURN count(n)
1442        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1443            items: vec![ReturnItem {
1444                expression: LogicalExpression::Variable("cnt".to_string()),
1445                alias: None,
1446            }],
1447            distinct: false,
1448            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1449                group_by: vec![],
1450                aggregates: vec![LogicalAggregateExpr {
1451                    function: LogicalAggregateFunction::Count,
1452                    expression: Some(LogicalExpression::Variable("n".to_string())),
1453                    distinct: false,
1454                    alias: Some("cnt".to_string()),
1455                    percentile: None,
1456                }],
1457                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1458                    variable: "n".to_string(),
1459                    label: None,
1460                    input: None,
1461                })),
1462                having: None,
1463            })),
1464        }));
1465
1466        let physical = planner.plan(&logical).unwrap();
1467        assert!(physical.columns().contains(&"cnt".to_string()));
1468    }
1469
1470    #[test]
1471    fn test_plan_aggregate_with_group_by() {
1472        let store = create_test_store();
1473        let planner = Planner::new(store);
1474
1475        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1476        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1477            group_by: vec![LogicalExpression::Property {
1478                variable: "n".to_string(),
1479                property: "city".to_string(),
1480            }],
1481            aggregates: vec![LogicalAggregateExpr {
1482                function: LogicalAggregateFunction::Count,
1483                expression: Some(LogicalExpression::Variable("n".to_string())),
1484                distinct: false,
1485                alias: Some("cnt".to_string()),
1486                percentile: None,
1487            }],
1488            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1489                variable: "n".to_string(),
1490                label: Some("Person".to_string()),
1491                input: None,
1492            })),
1493            having: None,
1494        }));
1495
1496        let physical = planner.plan(&logical).unwrap();
1497        assert_eq!(physical.columns().len(), 2);
1498    }
1499
1500    #[test]
1501    fn test_plan_aggregate_sum() {
1502        let store = create_test_store();
1503        let planner = Planner::new(store);
1504
1505        // SUM(n.value)
1506        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1507            group_by: vec![],
1508            aggregates: vec![LogicalAggregateExpr {
1509                function: LogicalAggregateFunction::Sum,
1510                expression: Some(LogicalExpression::Property {
1511                    variable: "n".to_string(),
1512                    property: "value".to_string(),
1513                }),
1514                distinct: false,
1515                alias: Some("total".to_string()),
1516                percentile: None,
1517            }],
1518            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1519                variable: "n".to_string(),
1520                label: None,
1521                input: None,
1522            })),
1523            having: None,
1524        }));
1525
1526        let physical = planner.plan(&logical).unwrap();
1527        assert!(physical.columns().contains(&"total".to_string()));
1528    }
1529
1530    #[test]
1531    fn test_plan_aggregate_avg() {
1532        let store = create_test_store();
1533        let planner = Planner::new(store);
1534
1535        // AVG(n.score)
1536        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1537            group_by: vec![],
1538            aggregates: vec![LogicalAggregateExpr {
1539                function: LogicalAggregateFunction::Avg,
1540                expression: Some(LogicalExpression::Property {
1541                    variable: "n".to_string(),
1542                    property: "score".to_string(),
1543                }),
1544                distinct: false,
1545                alias: Some("average".to_string()),
1546                percentile: None,
1547            }],
1548            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1549                variable: "n".to_string(),
1550                label: None,
1551                input: None,
1552            })),
1553            having: None,
1554        }));
1555
1556        let physical = planner.plan(&logical).unwrap();
1557        assert!(physical.columns().contains(&"average".to_string()));
1558    }
1559
1560    #[test]
1561    fn test_plan_aggregate_min_max() {
1562        let store = create_test_store();
1563        let planner = Planner::new(store);
1564
1565        // MIN(n.age), MAX(n.age)
1566        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1567            group_by: vec![],
1568            aggregates: vec![
1569                LogicalAggregateExpr {
1570                    function: LogicalAggregateFunction::Min,
1571                    expression: Some(LogicalExpression::Property {
1572                        variable: "n".to_string(),
1573                        property: "age".to_string(),
1574                    }),
1575                    distinct: false,
1576                    alias: Some("youngest".to_string()),
1577                    percentile: None,
1578                },
1579                LogicalAggregateExpr {
1580                    function: LogicalAggregateFunction::Max,
1581                    expression: Some(LogicalExpression::Property {
1582                        variable: "n".to_string(),
1583                        property: "age".to_string(),
1584                    }),
1585                    distinct: false,
1586                    alias: Some("oldest".to_string()),
1587                    percentile: None,
1588                },
1589            ],
1590            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1591                variable: "n".to_string(),
1592                label: None,
1593                input: None,
1594            })),
1595            having: None,
1596        }));
1597
1598        let physical = planner.plan(&logical).unwrap();
1599        assert!(physical.columns().contains(&"youngest".to_string()));
1600        assert!(physical.columns().contains(&"oldest".to_string()));
1601    }
1602
1603    // ==================== Join Tests ====================
1604
1605    #[test]
1606    fn test_plan_inner_join() {
1607        let store = create_test_store();
1608        let planner = Planner::new(store);
1609
1610        // Inner join between two scans
1611        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1612            items: vec![
1613                ReturnItem {
1614                    expression: LogicalExpression::Variable("a".to_string()),
1615                    alias: None,
1616                },
1617                ReturnItem {
1618                    expression: LogicalExpression::Variable("b".to_string()),
1619                    alias: None,
1620                },
1621            ],
1622            distinct: false,
1623            input: Box::new(LogicalOperator::Join(JoinOp {
1624                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1625                    variable: "a".to_string(),
1626                    label: Some("Person".to_string()),
1627                    input: None,
1628                })),
1629                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1630                    variable: "b".to_string(),
1631                    label: Some("Company".to_string()),
1632                    input: None,
1633                })),
1634                join_type: JoinType::Inner,
1635                conditions: vec![JoinCondition {
1636                    left: LogicalExpression::Variable("a".to_string()),
1637                    right: LogicalExpression::Variable("b".to_string()),
1638                }],
1639            })),
1640        }));
1641
1642        let physical = planner.plan(&logical).unwrap();
1643        assert!(physical.columns().contains(&"a".to_string()));
1644        assert!(physical.columns().contains(&"b".to_string()));
1645    }
1646
1647    #[test]
1648    fn test_plan_cross_join() {
1649        let store = create_test_store();
1650        let planner = Planner::new(store);
1651
1652        // Cross join (no conditions)
1653        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1654            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1655                variable: "a".to_string(),
1656                label: None,
1657                input: None,
1658            })),
1659            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1660                variable: "b".to_string(),
1661                label: None,
1662                input: None,
1663            })),
1664            join_type: JoinType::Cross,
1665            conditions: vec![],
1666        }));
1667
1668        let physical = planner.plan(&logical).unwrap();
1669        assert_eq!(physical.columns().len(), 2);
1670    }
1671
1672    #[test]
1673    fn test_plan_left_join() {
1674        let store = create_test_store();
1675        let planner = Planner::new(store);
1676
1677        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1678            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1679                variable: "a".to_string(),
1680                label: None,
1681                input: None,
1682            })),
1683            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1684                variable: "b".to_string(),
1685                label: None,
1686                input: None,
1687            })),
1688            join_type: JoinType::Left,
1689            conditions: vec![],
1690        }));
1691
1692        let physical = planner.plan(&logical).unwrap();
1693        assert_eq!(physical.columns().len(), 2);
1694    }
1695
1696    // ==================== Mutation Tests ====================
1697
1698    #[test]
1699    fn test_plan_create_node() {
1700        let store = create_test_store();
1701        let planner = Planner::new(store);
1702
1703        // CREATE (n:Person {name: 'Alice'})
1704        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1705            variable: "n".to_string(),
1706            labels: vec!["Person".to_string()],
1707            properties: vec![(
1708                "name".to_string(),
1709                LogicalExpression::Literal(Value::String("Alice".into())),
1710            )],
1711            input: None,
1712        }));
1713
1714        let physical = planner.plan(&logical).unwrap();
1715        assert!(physical.columns().contains(&"n".to_string()));
1716    }
1717
1718    #[test]
1719    fn test_plan_create_edge() {
1720        let store = create_test_store();
1721        let planner = Planner::new(store);
1722
1723        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1724        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1725            variable: Some("r".to_string()),
1726            from_variable: "a".to_string(),
1727            to_variable: "b".to_string(),
1728            edge_type: "KNOWS".to_string(),
1729            properties: vec![],
1730            input: Box::new(LogicalOperator::Join(JoinOp {
1731                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1732                    variable: "a".to_string(),
1733                    label: None,
1734                    input: None,
1735                })),
1736                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1737                    variable: "b".to_string(),
1738                    label: None,
1739                    input: None,
1740                })),
1741                join_type: JoinType::Cross,
1742                conditions: vec![],
1743            })),
1744        }));
1745
1746        let physical = planner.plan(&logical).unwrap();
1747        assert!(physical.columns().contains(&"r".to_string()));
1748    }
1749
1750    #[test]
1751    fn test_plan_delete_node() {
1752        let store = create_test_store();
1753        let planner = Planner::new(store);
1754
1755        // MATCH (n) DELETE n
1756        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1757            variable: "n".to_string(),
1758            detach: false,
1759            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1760                variable: "n".to_string(),
1761                label: None,
1762                input: None,
1763            })),
1764        }));
1765
1766        let physical = planner.plan(&logical).unwrap();
1767        assert!(physical.columns().contains(&"deleted_count".to_string()));
1768    }
1769
1770    // ==================== Error Cases ====================
1771
1772    #[test]
1773    fn test_plan_empty_errors() {
1774        let store = create_test_store();
1775        let planner = Planner::new(store);
1776
1777        let logical = LogicalPlan::new(LogicalOperator::Empty);
1778        let result = planner.plan(&logical);
1779        assert!(result.is_err());
1780    }
1781
1782    #[test]
1783    fn test_plan_missing_variable_in_return() {
1784        let store = create_test_store();
1785        let planner = Planner::new(store);
1786
1787        // Return variable that doesn't exist in input
1788        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1789            items: vec![ReturnItem {
1790                expression: LogicalExpression::Variable("missing".to_string()),
1791                alias: None,
1792            }],
1793            distinct: false,
1794            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1795                variable: "n".to_string(),
1796                label: None,
1797                input: None,
1798            })),
1799        }));
1800
1801        let result = planner.plan(&logical);
1802        assert!(result.is_err());
1803    }
1804
1805    // ==================== Helper Function Tests ====================
1806
1807    #[test]
1808    fn test_convert_binary_ops() {
1809        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1810        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1811        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1812        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1813        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1814        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1815        assert!(convert_binary_op(BinaryOp::And).is_ok());
1816        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1817        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1818        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1819        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1820        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1821    }
1822
1823    #[test]
1824    fn test_convert_unary_ops() {
1825        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1826        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1827        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1828        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1829    }
1830
1831    #[test]
1832    fn test_convert_aggregate_functions() {
1833        assert!(matches!(
1834            convert_aggregate_function(LogicalAggregateFunction::Count),
1835            PhysicalAggregateFunction::Count
1836        ));
1837        assert!(matches!(
1838            convert_aggregate_function(LogicalAggregateFunction::Sum),
1839            PhysicalAggregateFunction::Sum
1840        ));
1841        assert!(matches!(
1842            convert_aggregate_function(LogicalAggregateFunction::Avg),
1843            PhysicalAggregateFunction::Avg
1844        ));
1845        assert!(matches!(
1846            convert_aggregate_function(LogicalAggregateFunction::Min),
1847            PhysicalAggregateFunction::Min
1848        ));
1849        assert!(matches!(
1850            convert_aggregate_function(LogicalAggregateFunction::Max),
1851            PhysicalAggregateFunction::Max
1852        ));
1853    }
1854
1855    #[test]
1856    fn test_planner_accessors() {
1857        let store = create_test_store();
1858        let planner = Planner::new(Arc::clone(&store));
1859
1860        assert!(planner.tx_id().is_none());
1861        assert!(planner.tx_manager().is_none());
1862        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1863    }
1864
1865    #[test]
1866    fn test_physical_plan_accessors() {
1867        let store = create_test_store();
1868        let planner = Planner::new(store);
1869
1870        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1871            variable: "n".to_string(),
1872            label: None,
1873            input: None,
1874        }));
1875
1876        let physical = planner.plan(&logical).unwrap();
1877        assert_eq!(physical.columns(), &["n"]);
1878
1879        // Test into_operator
1880        let _ = physical.into_operator();
1881    }
1882
1883    // ==================== Adaptive Planning Tests ====================
1884
1885    #[test]
1886    fn test_plan_adaptive_with_scan() {
1887        let store = create_test_store();
1888        let planner = Planner::new(store);
1889
1890        // MATCH (n:Person) RETURN n
1891        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1892            items: vec![ReturnItem {
1893                expression: LogicalExpression::Variable("n".to_string()),
1894                alias: None,
1895            }],
1896            distinct: false,
1897            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1898                variable: "n".to_string(),
1899                label: Some("Person".to_string()),
1900                input: None,
1901            })),
1902        }));
1903
1904        let physical = planner.plan_adaptive(&logical).unwrap();
1905        assert_eq!(physical.columns(), &["n"]);
1906        // Should have adaptive context with estimates
1907        assert!(physical.adaptive_context.is_some());
1908    }
1909
1910    #[test]
1911    fn test_plan_adaptive_with_filter() {
1912        let store = create_test_store();
1913        let planner = Planner::new(store);
1914
1915        // MATCH (n) WHERE n.age > 30 RETURN n
1916        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1917            items: vec![ReturnItem {
1918                expression: LogicalExpression::Variable("n".to_string()),
1919                alias: None,
1920            }],
1921            distinct: false,
1922            input: Box::new(LogicalOperator::Filter(FilterOp {
1923                predicate: LogicalExpression::Binary {
1924                    left: Box::new(LogicalExpression::Property {
1925                        variable: "n".to_string(),
1926                        property: "age".to_string(),
1927                    }),
1928                    op: BinaryOp::Gt,
1929                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1930                },
1931                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1932                    variable: "n".to_string(),
1933                    label: None,
1934                    input: None,
1935                })),
1936            })),
1937        }));
1938
1939        let physical = planner.plan_adaptive(&logical).unwrap();
1940        assert!(physical.adaptive_context.is_some());
1941    }
1942
1943    #[test]
1944    fn test_plan_adaptive_with_expand() {
1945        let store = create_test_store();
1946        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1947
1948        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1949        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1950            items: vec![
1951                ReturnItem {
1952                    expression: LogicalExpression::Variable("a".to_string()),
1953                    alias: None,
1954                },
1955                ReturnItem {
1956                    expression: LogicalExpression::Variable("b".to_string()),
1957                    alias: None,
1958                },
1959            ],
1960            distinct: false,
1961            input: Box::new(LogicalOperator::Expand(ExpandOp {
1962                from_variable: "a".to_string(),
1963                to_variable: "b".to_string(),
1964                edge_variable: None,
1965                direction: ExpandDirection::Outgoing,
1966                edge_type: Some("KNOWS".to_string()),
1967                min_hops: 1,
1968                max_hops: Some(1),
1969                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1970                    variable: "a".to_string(),
1971                    label: None,
1972                    input: None,
1973                })),
1974                path_alias: None,
1975            })),
1976        }));
1977
1978        let physical = planner.plan_adaptive(&logical).unwrap();
1979        assert!(physical.adaptive_context.is_some());
1980    }
1981
1982    #[test]
1983    fn test_plan_adaptive_with_join() {
1984        let store = create_test_store();
1985        let planner = Planner::new(store);
1986
1987        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1988            items: vec![
1989                ReturnItem {
1990                    expression: LogicalExpression::Variable("a".to_string()),
1991                    alias: None,
1992                },
1993                ReturnItem {
1994                    expression: LogicalExpression::Variable("b".to_string()),
1995                    alias: None,
1996                },
1997            ],
1998            distinct: false,
1999            input: Box::new(LogicalOperator::Join(JoinOp {
2000                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2001                    variable: "a".to_string(),
2002                    label: None,
2003                    input: None,
2004                })),
2005                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2006                    variable: "b".to_string(),
2007                    label: None,
2008                    input: None,
2009                })),
2010                join_type: JoinType::Cross,
2011                conditions: vec![],
2012            })),
2013        }));
2014
2015        let physical = planner.plan_adaptive(&logical).unwrap();
2016        assert!(physical.adaptive_context.is_some());
2017    }
2018
2019    #[test]
2020    fn test_plan_adaptive_with_aggregate() {
2021        let store = create_test_store();
2022        let planner = Planner::new(store);
2023
2024        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2025            group_by: vec![],
2026            aggregates: vec![LogicalAggregateExpr {
2027                function: LogicalAggregateFunction::Count,
2028                expression: Some(LogicalExpression::Variable("n".to_string())),
2029                distinct: false,
2030                alias: Some("cnt".to_string()),
2031                percentile: None,
2032            }],
2033            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2034                variable: "n".to_string(),
2035                label: None,
2036                input: None,
2037            })),
2038            having: None,
2039        }));
2040
2041        let physical = planner.plan_adaptive(&logical).unwrap();
2042        assert!(physical.adaptive_context.is_some());
2043    }
2044
2045    #[test]
2046    fn test_plan_adaptive_with_distinct() {
2047        let store = create_test_store();
2048        let planner = Planner::new(store);
2049
2050        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2051            items: vec![ReturnItem {
2052                expression: LogicalExpression::Variable("n".to_string()),
2053                alias: None,
2054            }],
2055            distinct: false,
2056            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2057                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2058                    variable: "n".to_string(),
2059                    label: None,
2060                    input: None,
2061                })),
2062                columns: None,
2063            })),
2064        }));
2065
2066        let physical = planner.plan_adaptive(&logical).unwrap();
2067        assert!(physical.adaptive_context.is_some());
2068    }
2069
2070    #[test]
2071    fn test_plan_adaptive_with_limit() {
2072        let store = create_test_store();
2073        let planner = Planner::new(store);
2074
2075        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2076            items: vec![ReturnItem {
2077                expression: LogicalExpression::Variable("n".to_string()),
2078                alias: None,
2079            }],
2080            distinct: false,
2081            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2082                count: 10,
2083                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2084                    variable: "n".to_string(),
2085                    label: None,
2086                    input: None,
2087                })),
2088            })),
2089        }));
2090
2091        let physical = planner.plan_adaptive(&logical).unwrap();
2092        assert!(physical.adaptive_context.is_some());
2093    }
2094
2095    #[test]
2096    fn test_plan_adaptive_with_skip() {
2097        let store = create_test_store();
2098        let planner = Planner::new(store);
2099
2100        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2101            items: vec![ReturnItem {
2102                expression: LogicalExpression::Variable("n".to_string()),
2103                alias: None,
2104            }],
2105            distinct: false,
2106            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2107                count: 5,
2108                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2109                    variable: "n".to_string(),
2110                    label: None,
2111                    input: None,
2112                })),
2113            })),
2114        }));
2115
2116        let physical = planner.plan_adaptive(&logical).unwrap();
2117        assert!(physical.adaptive_context.is_some());
2118    }
2119
2120    #[test]
2121    fn test_plan_adaptive_with_sort() {
2122        let store = create_test_store();
2123        let planner = Planner::new(store);
2124
2125        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2126            items: vec![ReturnItem {
2127                expression: LogicalExpression::Variable("n".to_string()),
2128                alias: None,
2129            }],
2130            distinct: false,
2131            input: Box::new(LogicalOperator::Sort(SortOp {
2132                keys: vec![SortKey {
2133                    expression: LogicalExpression::Variable("n".to_string()),
2134                    order: SortOrder::Ascending,
2135                }],
2136                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2137                    variable: "n".to_string(),
2138                    label: None,
2139                    input: None,
2140                })),
2141            })),
2142        }));
2143
2144        let physical = planner.plan_adaptive(&logical).unwrap();
2145        assert!(physical.adaptive_context.is_some());
2146    }
2147
2148    #[test]
2149    fn test_plan_adaptive_with_union() {
2150        let store = create_test_store();
2151        let planner = Planner::new(store);
2152
2153        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2154            items: vec![ReturnItem {
2155                expression: LogicalExpression::Variable("n".to_string()),
2156                alias: None,
2157            }],
2158            distinct: false,
2159            input: Box::new(LogicalOperator::Union(UnionOp {
2160                inputs: vec![
2161                    LogicalOperator::NodeScan(NodeScanOp {
2162                        variable: "n".to_string(),
2163                        label: Some("Person".to_string()),
2164                        input: None,
2165                    }),
2166                    LogicalOperator::NodeScan(NodeScanOp {
2167                        variable: "n".to_string(),
2168                        label: Some("Company".to_string()),
2169                        input: None,
2170                    }),
2171                ],
2172            })),
2173        }));
2174
2175        let physical = planner.plan_adaptive(&logical).unwrap();
2176        assert!(physical.adaptive_context.is_some());
2177    }
2178
2179    // ==================== Variable Length Path Tests ====================
2180
2181    #[test]
2182    fn test_plan_expand_variable_length() {
2183        let store = create_test_store();
2184        let planner = Planner::new(store);
2185
2186        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2187        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2188            items: vec![
2189                ReturnItem {
2190                    expression: LogicalExpression::Variable("a".to_string()),
2191                    alias: None,
2192                },
2193                ReturnItem {
2194                    expression: LogicalExpression::Variable("b".to_string()),
2195                    alias: None,
2196                },
2197            ],
2198            distinct: false,
2199            input: Box::new(LogicalOperator::Expand(ExpandOp {
2200                from_variable: "a".to_string(),
2201                to_variable: "b".to_string(),
2202                edge_variable: None,
2203                direction: ExpandDirection::Outgoing,
2204                edge_type: Some("KNOWS".to_string()),
2205                min_hops: 1,
2206                max_hops: Some(3),
2207                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2208                    variable: "a".to_string(),
2209                    label: None,
2210                    input: None,
2211                })),
2212                path_alias: None,
2213            })),
2214        }));
2215
2216        let physical = planner.plan(&logical).unwrap();
2217        assert!(physical.columns().contains(&"a".to_string()));
2218        assert!(physical.columns().contains(&"b".to_string()));
2219    }
2220
2221    #[test]
2222    fn test_plan_expand_with_path_alias() {
2223        let store = create_test_store();
2224        let planner = Planner::new(store);
2225
2226        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2227        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2228            items: vec![
2229                ReturnItem {
2230                    expression: LogicalExpression::Variable("a".to_string()),
2231                    alias: None,
2232                },
2233                ReturnItem {
2234                    expression: LogicalExpression::Variable("b".to_string()),
2235                    alias: None,
2236                },
2237            ],
2238            distinct: false,
2239            input: Box::new(LogicalOperator::Expand(ExpandOp {
2240                from_variable: "a".to_string(),
2241                to_variable: "b".to_string(),
2242                edge_variable: None,
2243                direction: ExpandDirection::Outgoing,
2244                edge_type: Some("KNOWS".to_string()),
2245                min_hops: 1,
2246                max_hops: Some(3),
2247                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2248                    variable: "a".to_string(),
2249                    label: None,
2250                    input: None,
2251                })),
2252                path_alias: Some("p".to_string()),
2253            })),
2254        }));
2255
2256        let physical = planner.plan(&logical).unwrap();
2257        // Verify plan was created successfully with expected output columns
2258        assert!(physical.columns().contains(&"a".to_string()));
2259        assert!(physical.columns().contains(&"b".to_string()));
2260    }
2261
2262    #[test]
2263    fn test_plan_expand_incoming() {
2264        let store = create_test_store();
2265        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2266
2267        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2269            items: vec![
2270                ReturnItem {
2271                    expression: LogicalExpression::Variable("a".to_string()),
2272                    alias: None,
2273                },
2274                ReturnItem {
2275                    expression: LogicalExpression::Variable("b".to_string()),
2276                    alias: None,
2277                },
2278            ],
2279            distinct: false,
2280            input: Box::new(LogicalOperator::Expand(ExpandOp {
2281                from_variable: "a".to_string(),
2282                to_variable: "b".to_string(),
2283                edge_variable: None,
2284                direction: ExpandDirection::Incoming,
2285                edge_type: Some("KNOWS".to_string()),
2286                min_hops: 1,
2287                max_hops: Some(1),
2288                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2289                    variable: "a".to_string(),
2290                    label: None,
2291                    input: None,
2292                })),
2293                path_alias: None,
2294            })),
2295        }));
2296
2297        let physical = planner.plan(&logical).unwrap();
2298        assert!(physical.columns().contains(&"a".to_string()));
2299        assert!(physical.columns().contains(&"b".to_string()));
2300    }
2301
2302    #[test]
2303    fn test_plan_expand_both_directions() {
2304        let store = create_test_store();
2305        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2306
2307        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2308        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2309            items: vec![
2310                ReturnItem {
2311                    expression: LogicalExpression::Variable("a".to_string()),
2312                    alias: None,
2313                },
2314                ReturnItem {
2315                    expression: LogicalExpression::Variable("b".to_string()),
2316                    alias: None,
2317                },
2318            ],
2319            distinct: false,
2320            input: Box::new(LogicalOperator::Expand(ExpandOp {
2321                from_variable: "a".to_string(),
2322                to_variable: "b".to_string(),
2323                edge_variable: None,
2324                direction: ExpandDirection::Both,
2325                edge_type: Some("KNOWS".to_string()),
2326                min_hops: 1,
2327                max_hops: Some(1),
2328                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2329                    variable: "a".to_string(),
2330                    label: None,
2331                    input: None,
2332                })),
2333                path_alias: None,
2334            })),
2335        }));
2336
2337        let physical = planner.plan(&logical).unwrap();
2338        assert!(physical.columns().contains(&"a".to_string()));
2339        assert!(physical.columns().contains(&"b".to_string()));
2340    }
2341
2342    // ==================== With Context Tests ====================
2343
2344    #[test]
2345    fn test_planner_with_context() {
2346        use crate::transaction::TransactionManager;
2347
2348        let store = create_test_store();
2349        let tx_manager = Arc::new(TransactionManager::new());
2350        let tx_id = tx_manager.begin();
2351        let epoch = tx_manager.current_epoch();
2352
2353        let planner = Planner::with_context(
2354            Arc::clone(&store),
2355            Arc::clone(&tx_manager),
2356            Some(tx_id),
2357            epoch,
2358        );
2359
2360        assert_eq!(planner.tx_id(), Some(tx_id));
2361        assert!(planner.tx_manager().is_some());
2362        assert_eq!(planner.viewing_epoch(), epoch);
2363    }
2364
2365    #[test]
2366    fn test_planner_with_factorized_execution_disabled() {
2367        let store = create_test_store();
2368        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2369
2370        // Two consecutive expands - should NOT use factorized execution
2371        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2372            items: vec![
2373                ReturnItem {
2374                    expression: LogicalExpression::Variable("a".to_string()),
2375                    alias: None,
2376                },
2377                ReturnItem {
2378                    expression: LogicalExpression::Variable("c".to_string()),
2379                    alias: None,
2380                },
2381            ],
2382            distinct: false,
2383            input: Box::new(LogicalOperator::Expand(ExpandOp {
2384                from_variable: "b".to_string(),
2385                to_variable: "c".to_string(),
2386                edge_variable: None,
2387                direction: ExpandDirection::Outgoing,
2388                edge_type: None,
2389                min_hops: 1,
2390                max_hops: Some(1),
2391                input: Box::new(LogicalOperator::Expand(ExpandOp {
2392                    from_variable: "a".to_string(),
2393                    to_variable: "b".to_string(),
2394                    edge_variable: None,
2395                    direction: ExpandDirection::Outgoing,
2396                    edge_type: None,
2397                    min_hops: 1,
2398                    max_hops: Some(1),
2399                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2400                        variable: "a".to_string(),
2401                        label: None,
2402                        input: None,
2403                    })),
2404                    path_alias: None,
2405                })),
2406                path_alias: None,
2407            })),
2408        }));
2409
2410        let physical = planner.plan(&logical).unwrap();
2411        assert!(physical.columns().contains(&"a".to_string()));
2412        assert!(physical.columns().contains(&"c".to_string()));
2413    }
2414
2415    // ==================== Sort with Property Tests ====================
2416
2417    #[test]
2418    fn test_plan_sort_by_property() {
2419        let store = create_test_store();
2420        let planner = Planner::new(store);
2421
2422        // MATCH (n) RETURN n ORDER BY n.name ASC
2423        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2424            items: vec![ReturnItem {
2425                expression: LogicalExpression::Variable("n".to_string()),
2426                alias: None,
2427            }],
2428            distinct: false,
2429            input: Box::new(LogicalOperator::Sort(SortOp {
2430                keys: vec![SortKey {
2431                    expression: LogicalExpression::Property {
2432                        variable: "n".to_string(),
2433                        property: "name".to_string(),
2434                    },
2435                    order: SortOrder::Ascending,
2436                }],
2437                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2438                    variable: "n".to_string(),
2439                    label: None,
2440                    input: None,
2441                })),
2442            })),
2443        }));
2444
2445        let physical = planner.plan(&logical).unwrap();
2446        // Should have the property column projected
2447        assert!(physical.columns().contains(&"n".to_string()));
2448    }
2449
2450    // ==================== Scan with Input Tests ====================
2451
2452    #[test]
2453    fn test_plan_scan_with_input() {
2454        let store = create_test_store();
2455        let planner = Planner::new(store);
2456
2457        // A scan with another scan as input (for chained patterns)
2458        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2459            items: vec![
2460                ReturnItem {
2461                    expression: LogicalExpression::Variable("a".to_string()),
2462                    alias: None,
2463                },
2464                ReturnItem {
2465                    expression: LogicalExpression::Variable("b".to_string()),
2466                    alias: None,
2467                },
2468            ],
2469            distinct: false,
2470            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2471                variable: "b".to_string(),
2472                label: Some("Company".to_string()),
2473                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2474                    variable: "a".to_string(),
2475                    label: Some("Person".to_string()),
2476                    input: None,
2477                }))),
2478            })),
2479        }));
2480
2481        let physical = planner.plan(&logical).unwrap();
2482        assert!(physical.columns().contains(&"a".to_string()));
2483        assert!(physical.columns().contains(&"b".to_string()));
2484    }
2485}