Skip to main content

grafeo_engine/query/
planner.rs

//! Converts logical plans into physical execution trees.
//!
//! The optimizer produces a logical plan (what data you want); the planner
//! converts it into a physical plan (how to actually get it). This means choosing
//! hash joins vs. nested loops, picking index scans vs. full scans, and so on.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37/// Range bounds for property-based range queries.
38struct RangeBounds<'a> {
39    min: Option<&'a Value>,
40    max: Option<&'a Value>,
41    min_inclusive: bool,
42    max_inclusive: bool,
43}
44
45/// Converts a logical plan to a physical operator tree.
46pub struct Planner {
47    /// The graph store to scan from.
48    store: Arc<LpgStore>,
49    /// Transaction manager for MVCC operations.
50    tx_manager: Option<Arc<TransactionManager>>,
51    /// Current transaction ID (if in a transaction).
52    tx_id: Option<TxId>,
53    /// Epoch to use for visibility checks.
54    viewing_epoch: EpochId,
55    /// Counter for generating unique anonymous edge column names.
56    anon_edge_counter: std::cell::Cell<u32>,
57    /// Whether to use factorized execution for multi-hop queries.
58    factorized_execution: bool,
59}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let avg_degree = 10.0; // Default estimate
244                let estimate = input_estimate * avg_degree;
245                let id = format!("expand_{}", expand.to_variable);
246                ctx.set_estimate(&id, estimate);
247
248                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
249            }
250            LogicalOperator::Join(join) => {
251                // Estimate join output (product with selectivity)
252                let left_est = self.estimate_cardinality(&join.left);
253                let right_est = self.estimate_cardinality(&join.right);
254                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
255                let id = format!("join_{depth}");
256                ctx.set_estimate(&id, estimate);
257
258                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
259                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
260            }
261            LogicalOperator::Aggregate(agg) => {
262                // Aggregates typically reduce cardinality
263                let input_estimate = self.estimate_cardinality(&agg.input);
264                let estimate = if agg.group_by.is_empty() {
265                    1.0 // Scalar aggregate
266                } else {
267                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
268                };
269                let id = format!("aggregate_{depth}");
270                ctx.set_estimate(&id, estimate);
271
272                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
273            }
274            LogicalOperator::Distinct(distinct) => {
275                let input_estimate = self.estimate_cardinality(&distinct.input);
276                let estimate = (input_estimate * 0.5).max(1.0);
277                let id = format!("distinct_{depth}");
278                ctx.set_estimate(&id, estimate);
279
280                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
281            }
282            LogicalOperator::Return(ret) => {
283                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
284            }
285            LogicalOperator::Limit(limit) => {
286                let input_estimate = self.estimate_cardinality(&limit.input);
287                let estimate = (input_estimate).min(limit.count as f64);
288                let id = format!("limit_{depth}");
289                ctx.set_estimate(&id, estimate);
290
291                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
292            }
293            LogicalOperator::Skip(skip) => {
294                let input_estimate = self.estimate_cardinality(&skip.input);
295                let estimate = (input_estimate - skip.count as f64).max(0.0);
296                let id = format!("skip_{depth}");
297                ctx.set_estimate(&id, estimate);
298
299                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
300            }
301            LogicalOperator::Sort(sort) => {
302                // Sort doesn't change cardinality
303                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
304            }
305            LogicalOperator::Union(union) => {
306                let estimate: f64 = union
307                    .inputs
308                    .iter()
309                    .map(|input| self.estimate_cardinality(input))
310                    .sum();
311                let id = format!("union_{depth}");
312                ctx.set_estimate(&id, estimate);
313
314                for input in &union.inputs {
315                    self.collect_cardinality_estimates(input, ctx, depth + 1);
316                }
317            }
318            _ => {
319                // For other operators, try to recurse into known input patterns
320            }
321        }
322    }
323
324    /// Estimates cardinality for a logical operator subtree.
325    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
326        match op {
327            LogicalOperator::NodeScan(scan) => {
328                if let Some(label) = &scan.label {
329                    self.store.nodes_by_label(label).len() as f64
330                } else {
331                    self.store.node_count() as f64
332                }
333            }
334            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
335            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
336            LogicalOperator::Join(join) => {
337                let left = self.estimate_cardinality(&join.left);
338                let right = self.estimate_cardinality(&join.right);
339                (left * right).sqrt()
340            }
341            LogicalOperator::Aggregate(agg) => {
342                if agg.group_by.is_empty() {
343                    1.0
344                } else {
345                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
346                }
347            }
348            LogicalOperator::Distinct(distinct) => {
349                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
350            }
351            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
352            LogicalOperator::Limit(limit) => self
353                .estimate_cardinality(&limit.input)
354                .min(limit.count as f64),
355            LogicalOperator::Skip(skip) => {
356                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
357            }
358            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
359            LogicalOperator::Union(union) => union
360                .inputs
361                .iter()
362                .map(|input| self.estimate_cardinality(input))
363                .sum(),
364            _ => 1000.0, // Default estimate for unknown operators
365        }
366    }
367
368    /// Plans a single logical operator.
369    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
370        match op {
371            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
372            LogicalOperator::Expand(expand) => {
373                // Check for expand chains when factorized execution is enabled
374                if self.factorized_execution {
375                    let (chain_len, _base) = Self::count_expand_chain(op);
376                    if chain_len >= 2 {
377                        // Use factorized chain for 2+ consecutive single-hop expands
378                        return self.plan_expand_chain(op);
379                    }
380                }
381                self.plan_expand(expand)
382            }
383            LogicalOperator::Return(ret) => self.plan_return(ret),
384            LogicalOperator::Filter(filter) => self.plan_filter(filter),
385            LogicalOperator::Project(project) => self.plan_project(project),
386            LogicalOperator::Limit(limit) => self.plan_limit(limit),
387            LogicalOperator::Skip(skip) => self.plan_skip(skip),
388            LogicalOperator::Sort(sort) => self.plan_sort(sort),
389            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
390            LogicalOperator::Join(join) => self.plan_join(join),
391            LogicalOperator::Union(union) => self.plan_union(union),
392            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
393            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
394            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
395            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
396            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
397            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
398            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
399            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
400            LogicalOperator::Merge(merge) => self.plan_merge(merge),
401            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
402            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
403            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
404            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
405            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
406            LogicalOperator::VectorScan(_) => Err(Error::Internal(
407                "VectorScan requires vector-index feature".to_string(),
408            )),
409            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
410                "VectorJoin requires vector-index feature".to_string(),
411            )),
412            _ => Err(Error::Internal(format!(
413                "Unsupported operator: {:?}",
414                std::mem::discriminant(op)
415            ))),
416        }
417    }
418
419    /// Plans a node scan operator.
420    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
421        let scan_op = if let Some(label) = &scan.label {
422            ScanOperator::with_label(Arc::clone(&self.store), label)
423        } else {
424            ScanOperator::new(Arc::clone(&self.store))
425        };
426
427        // Apply MVCC context if available
428        let scan_operator: Box<dyn Operator> =
429            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
430
431        // If there's an input, chain operators with a nested loop join (cross join)
432        if let Some(input) = &scan.input {
433            let (input_op, mut input_columns) = self.plan_operator(input)?;
434
435            // Build output schema: input columns + scan column
436            let mut output_schema: Vec<LogicalType> =
437                input_columns.iter().map(|_| LogicalType::Any).collect();
438            output_schema.push(LogicalType::Node);
439
440            // Add scan column to input columns
441            input_columns.push(scan.variable.clone());
442
443            // Use nested loop join to combine input rows with scanned nodes
444            let join_op = Box::new(NestedLoopJoinOperator::new(
445                input_op,
446                scan_operator,
447                None, // No join condition (cross join)
448                PhysicalJoinType::Cross,
449                output_schema,
450            ));
451
452            Ok((join_op, input_columns))
453        } else {
454            let columns = vec![scan.variable.clone()];
455            Ok((scan_operator, columns))
456        }
457    }
458
459    /// Plans an expand operator.
460    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
461        // Plan the input operator first
462        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
463
464        // Find the source column index
465        let source_column = input_columns
466            .iter()
467            .position(|c| c == &expand.from_variable)
468            .ok_or_else(|| {
469                Error::Internal(format!(
470                    "Source variable '{}' not found in input columns",
471                    expand.from_variable
472                ))
473            })?;
474
475        // Convert expand direction
476        let direction = match expand.direction {
477            ExpandDirection::Outgoing => Direction::Outgoing,
478            ExpandDirection::Incoming => Direction::Incoming,
479            ExpandDirection::Both => Direction::Both,
480        };
481
482        // Check if this is a variable-length path
483        let is_variable_length =
484            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
485
486        let operator: Box<dyn Operator> = if is_variable_length {
487            // Use VariableLengthExpandOperator for multi-hop paths
488            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
489            let mut expand_op = VariableLengthExpandOperator::new(
490                Arc::clone(&self.store),
491                input_op,
492                source_column,
493                direction,
494                expand.edge_type.clone(),
495                expand.min_hops,
496                max_hops,
497            )
498            .with_tx_context(self.viewing_epoch, self.tx_id);
499
500            // If a path alias is set, enable path length output
501            if expand.path_alias.is_some() {
502                expand_op = expand_op.with_path_length_output();
503            }
504
505            Box::new(expand_op)
506        } else {
507            // Use simple ExpandOperator for single-hop paths
508            let expand_op = ExpandOperator::new(
509                Arc::clone(&self.store),
510                input_op,
511                source_column,
512                direction,
513                expand.edge_type.clone(),
514            )
515            .with_tx_context(self.viewing_epoch, self.tx_id);
516            Box::new(expand_op)
517        };
518
519        // Build output columns: [input_columns..., edge, target, (path_length)?]
520        // Preserve all input columns and add edge + target to match ExpandOperator output
521        let mut columns = input_columns;
522
523        // Generate edge column name - use provided name or generate anonymous name
524        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
525            let count = self.anon_edge_counter.get();
526            self.anon_edge_counter.set(count + 1);
527            format!("_anon_edge_{}", count)
528        });
529        columns.push(edge_col_name);
530
531        columns.push(expand.to_variable.clone());
532
533        // If a path alias is set, add a column for the path length
534        if let Some(ref path_alias) = expand.path_alias {
535            columns.push(format!("_path_length_{}", path_alias));
536        }
537
538        Ok((operator, columns))
539    }
540
541    /// Plans a chain of consecutive expand operations using factorized execution.
542    ///
543    /// This avoids the Cartesian product explosion that occurs with separate expands.
544    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
545    ///
546    /// The chain is executed lazily at query time, not during planning. This ensures
547    /// that any filters applied above the expand chain are properly respected.
548    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
549        let expands = Self::collect_expand_chain(op);
550        if expands.is_empty() {
551            return Err(Error::Internal("Empty expand chain".to_string()));
552        }
553
554        // Get the base operator (before first expand)
555        let first_expand = expands[0];
556        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
557
558        let mut columns = base_columns.clone();
559        let mut steps = Vec::new();
560
561        // Track the level-local source column for each expand
562        // For the first expand, it's the column in the input (base_columns)
563        // For subsequent expands, the target from the previous level is always at index 1
564        // (each level adds [edge, target], so target is at index 1)
565        let mut is_first = true;
566
567        for expand in &expands {
568            // Find source column for this expand
569            let source_column = if is_first {
570                // For first expand, find in base columns
571                base_columns
572                    .iter()
573                    .position(|c| c == &expand.from_variable)
574                    .ok_or_else(|| {
575                        Error::Internal(format!(
576                            "Source variable '{}' not found in base columns",
577                            expand.from_variable
578                        ))
579                    })?
580            } else {
581                // For subsequent expands, the target from the previous level is at index 1
582                // (each level adds [edge, target], so target is the second column)
583                1
584            };
585
586            // Convert direction
587            let direction = match expand.direction {
588                ExpandDirection::Outgoing => Direction::Outgoing,
589                ExpandDirection::Incoming => Direction::Incoming,
590                ExpandDirection::Both => Direction::Both,
591            };
592
593            // Add expand step configuration
594            steps.push(ExpandStep {
595                source_column,
596                direction,
597                edge_type: expand.edge_type.clone(),
598            });
599
600            // Add edge and target columns
601            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
602                let count = self.anon_edge_counter.get();
603                self.anon_edge_counter.set(count + 1);
604                format!("_anon_edge_{}", count)
605            });
606            columns.push(edge_col_name);
607            columns.push(expand.to_variable.clone());
608
609            is_first = false;
610        }
611
612        // Create lazy operator that executes at query time, not planning time
613        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
614
615        if let Some(tx_id) = self.tx_id {
616            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
617        } else {
618            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
619        }
620
621        Ok((Box::new(lazy_op), columns))
622    }
623
624    /// Plans a RETURN clause.
625    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
626        // Plan the input operator
627        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
628
629        // Build variable to column index mapping
630        let variable_columns: HashMap<String, usize> = input_columns
631            .iter()
632            .enumerate()
633            .map(|(i, name)| (name.clone(), i))
634            .collect();
635
636        // Extract column names from return items
637        let columns: Vec<String> = ret
638            .items
639            .iter()
640            .map(|item| {
641                item.alias.clone().unwrap_or_else(|| {
642                    // Generate a default name from the expression
643                    expression_to_string(&item.expression)
644                })
645            })
646            .collect();
647
648        // Check if we need a project operator (for property access or expression evaluation)
649        let needs_project = ret
650            .items
651            .iter()
652            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
653
654        if needs_project {
655            // Build project expressions
656            let mut projections = Vec::with_capacity(ret.items.len());
657            let mut output_types = Vec::with_capacity(ret.items.len());
658
659            for item in &ret.items {
660                match &item.expression {
661                    LogicalExpression::Variable(name) => {
662                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
663                            Error::Internal(format!("Variable '{}' not found in input", name))
664                        })?;
665                        projections.push(ProjectExpr::Column(col_idx));
666                        // Use Node type for variables (they could be nodes, edges, or values)
667                        output_types.push(LogicalType::Node);
668                    }
669                    LogicalExpression::Property { variable, property } => {
670                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
671                            Error::Internal(format!("Variable '{}' not found in input", variable))
672                        })?;
673                        projections.push(ProjectExpr::PropertyAccess {
674                            column: col_idx,
675                            property: property.clone(),
676                        });
677                        // Property could be any type - use Any/Generic to preserve type
678                        output_types.push(LogicalType::Any);
679                    }
680                    LogicalExpression::Literal(value) => {
681                        projections.push(ProjectExpr::Constant(value.clone()));
682                        output_types.push(value_to_logical_type(value));
683                    }
684                    LogicalExpression::FunctionCall { name, args, .. } => {
685                        // Handle built-in functions
686                        match name.to_lowercase().as_str() {
687                            "type" => {
688                                // type(r) returns the edge type string
689                                if args.len() != 1 {
690                                    return Err(Error::Internal(
691                                        "type() requires exactly one argument".to_string(),
692                                    ));
693                                }
694                                if let LogicalExpression::Variable(var_name) = &args[0] {
695                                    let col_idx =
696                                        *variable_columns.get(var_name).ok_or_else(|| {
697                                            Error::Internal(format!(
698                                                "Variable '{}' not found in input",
699                                                var_name
700                                            ))
701                                        })?;
702                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
703                                    output_types.push(LogicalType::String);
704                                } else {
705                                    return Err(Error::Internal(
706                                        "type() argument must be a variable".to_string(),
707                                    ));
708                                }
709                            }
710                            "length" => {
711                                // length(p) returns the path length
712                                // For shortestPath results, the path column already contains the length
713                                if args.len() != 1 {
714                                    return Err(Error::Internal(
715                                        "length() requires exactly one argument".to_string(),
716                                    ));
717                                }
718                                if let LogicalExpression::Variable(var_name) = &args[0] {
719                                    let col_idx =
720                                        *variable_columns.get(var_name).ok_or_else(|| {
721                                            Error::Internal(format!(
722                                                "Variable '{}' not found in input",
723                                                var_name
724                                            ))
725                                        })?;
726                                    // Pass through the column value directly
727                                    projections.push(ProjectExpr::Column(col_idx));
728                                    output_types.push(LogicalType::Int64);
729                                } else {
730                                    return Err(Error::Internal(
731                                        "length() argument must be a variable".to_string(),
732                                    ));
733                                }
734                            }
735                            // For other functions (head, tail, size, etc.), use expression evaluation
736                            _ => {
737                                let filter_expr = self.convert_expression(&item.expression)?;
738                                projections.push(ProjectExpr::Expression {
739                                    expr: filter_expr,
740                                    variable_columns: variable_columns.clone(),
741                                });
742                                output_types.push(LogicalType::Any);
743                            }
744                        }
745                    }
746                    LogicalExpression::Case { .. } => {
747                        // Convert CASE expression to FilterExpression for evaluation
748                        let filter_expr = self.convert_expression(&item.expression)?;
749                        projections.push(ProjectExpr::Expression {
750                            expr: filter_expr,
751                            variable_columns: variable_columns.clone(),
752                        });
753                        // CASE can return any type - use Any
754                        output_types.push(LogicalType::Any);
755                    }
756                    _ => {
757                        return Err(Error::Internal(format!(
758                            "Unsupported RETURN expression: {:?}",
759                            item.expression
760                        )));
761                    }
762                }
763            }
764
765            let operator = Box::new(ProjectOperator::with_store(
766                input_op,
767                projections,
768                output_types,
769                Arc::clone(&self.store),
770            ));
771
772            Ok((operator, columns))
773        } else {
774            // Simple case: just return variables
775            // Re-order columns to match return items if needed
776            let mut projections = Vec::with_capacity(ret.items.len());
777            let mut output_types = Vec::with_capacity(ret.items.len());
778
779            for item in &ret.items {
780                if let LogicalExpression::Variable(name) = &item.expression {
781                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
782                        Error::Internal(format!("Variable '{}' not found in input", name))
783                    })?;
784                    projections.push(ProjectExpr::Column(col_idx));
785                    output_types.push(LogicalType::Node);
786                }
787            }
788
789            // Only add ProjectOperator if reordering is needed
790            if projections.len() == input_columns.len()
791                && projections
792                    .iter()
793                    .enumerate()
794                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
795            {
796                // No reordering needed
797                Ok((input_op, columns))
798            } else {
799                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
800                Ok((operator, columns))
801            }
802        }
803    }
804
805    /// Plans a project operator (for WITH clause).
806    fn plan_project(
807        &self,
808        project: &crate::query::plan::ProjectOp,
809    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
810        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
811        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
812            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
813                // Create a single-row operator for projecting literals
814                let single_row_op: Box<dyn Operator> = Box::new(
815                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
816                );
817                (single_row_op, Vec::new())
818            } else {
819                self.plan_operator(&project.input)?
820            };
821
822        // Build variable to column index mapping
823        let variable_columns: HashMap<String, usize> = input_columns
824            .iter()
825            .enumerate()
826            .map(|(i, name)| (name.clone(), i))
827            .collect();
828
829        // Build projections and new column names
830        let mut projections = Vec::with_capacity(project.projections.len());
831        let mut output_types = Vec::with_capacity(project.projections.len());
832        let mut output_columns = Vec::with_capacity(project.projections.len());
833
834        for projection in &project.projections {
835            // Determine the output column name (alias or expression string)
836            let col_name = projection
837                .alias
838                .clone()
839                .unwrap_or_else(|| expression_to_string(&projection.expression));
840            output_columns.push(col_name);
841
842            match &projection.expression {
843                LogicalExpression::Variable(name) => {
844                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
845                        Error::Internal(format!("Variable '{}' not found in input", name))
846                    })?;
847                    projections.push(ProjectExpr::Column(col_idx));
848                    output_types.push(LogicalType::Node);
849                }
850                LogicalExpression::Property { variable, property } => {
851                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
852                        Error::Internal(format!("Variable '{}' not found in input", variable))
853                    })?;
854                    projections.push(ProjectExpr::PropertyAccess {
855                        column: col_idx,
856                        property: property.clone(),
857                    });
858                    output_types.push(LogicalType::Any);
859                }
860                LogicalExpression::Literal(value) => {
861                    projections.push(ProjectExpr::Constant(value.clone()));
862                    output_types.push(value_to_logical_type(value));
863                }
864                _ => {
865                    // For complex expressions, use full expression evaluation
866                    let filter_expr = self.convert_expression(&projection.expression)?;
867                    projections.push(ProjectExpr::Expression {
868                        expr: filter_expr,
869                        variable_columns: variable_columns.clone(),
870                    });
871                    output_types.push(LogicalType::Any);
872                }
873            }
874        }
875
876        let operator = Box::new(ProjectOperator::with_store(
877            input_op,
878            projections,
879            output_types,
880            Arc::clone(&self.store),
881        ));
882
883        Ok((operator, output_columns))
884    }
885
886    /// Plans a filter operator.
887    ///
888    /// Uses zone map pre-filtering to potentially skip scans when predicates
889    /// definitely won't match any data. Also uses property indexes when available
890    /// for O(1) lookups instead of full scans.
891    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
892        // Check zone maps for simple property predicates before scanning
893        // If zone map says "definitely no matches", we can short-circuit
894        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
895            // Zone map says no matches possible - return empty result
896            let (_, columns) = self.plan_operator(&filter.input)?;
897            let schema = self.derive_schema_from_columns(&columns);
898            let empty_op = Box::new(EmptyOperator::new(schema));
899            return Ok((empty_op, columns));
900        }
901
902        // Try to use property index for equality predicates on indexed properties
903        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
904            return Ok(result);
905        }
906
907        // Try to use range optimization for range predicates (>, <, >=, <=)
908        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
909            return Ok(result);
910        }
911
912        // Plan the input operator first
913        let (input_op, columns) = self.plan_operator(&filter.input)?;
914
915        // Build variable to column index mapping
916        let variable_columns: HashMap<String, usize> = columns
917            .iter()
918            .enumerate()
919            .map(|(i, name)| (name.clone(), i))
920            .collect();
921
922        // Convert logical expression to filter expression
923        let filter_expr = self.convert_expression(&filter.predicate)?;
924
925        // Create the predicate
926        let predicate =
927            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
928
929        // Create the filter operator
930        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
931
932        Ok((operator, columns))
933    }
934
935    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
936    ///
937    /// Returns:
938    /// - `Some(false)` if zone map proves no matches possible (can skip)
939    /// - `Some(true)` if zone map says matches might exist
940    /// - `None` if zone map check not applicable
941    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
942        use grafeo_core::graph::lpg::CompareOp;
943
944        match predicate {
945            LogicalExpression::Binary { left, op, right } => {
946                // Check for AND/OR first (compound conditions)
947                match op {
948                    BinaryOp::And => {
949                        let left_result = self.check_zone_map_for_predicate(left);
950                        let right_result = self.check_zone_map_for_predicate(right);
951
952                        return match (left_result, right_result) {
953                            // If either side definitely won't match, the AND won't match
954                            (Some(false), _) | (_, Some(false)) => Some(false),
955                            // If both might match, might match overall
956                            (Some(true), Some(true)) => Some(true),
957                            // Otherwise, can't determine
958                            _ => None,
959                        };
960                    }
961                    BinaryOp::Or => {
962                        let left_result = self.check_zone_map_for_predicate(left);
963                        let right_result = self.check_zone_map_for_predicate(right);
964
965                        return match (left_result, right_result) {
966                            // Both sides definitely won't match
967                            (Some(false), Some(false)) => Some(false),
968                            // At least one side might match
969                            (Some(true), _) | (_, Some(true)) => Some(true),
970                            // Otherwise, can't determine
971                            _ => None,
972                        };
973                    }
974                    _ => {}
975                }
976
977                // Simple property comparison: n.property op value
978                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
979                    (
980                        LogicalExpression::Property { property, .. },
981                        LogicalExpression::Literal(val),
982                    ) => {
983                        let cmp = match op {
984                            BinaryOp::Eq => CompareOp::Eq,
985                            BinaryOp::Ne => CompareOp::Ne,
986                            BinaryOp::Lt => CompareOp::Lt,
987                            BinaryOp::Le => CompareOp::Le,
988                            BinaryOp::Gt => CompareOp::Gt,
989                            BinaryOp::Ge => CompareOp::Ge,
990                            _ => return None,
991                        };
992                        (property.clone(), cmp, val.clone())
993                    }
994                    (
995                        LogicalExpression::Literal(val),
996                        LogicalExpression::Property { property, .. },
997                    ) => {
998                        // Flip comparison for reversed operands
999                        let cmp = match op {
1000                            BinaryOp::Eq => CompareOp::Eq,
1001                            BinaryOp::Ne => CompareOp::Ne,
1002                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1003                            BinaryOp::Le => CompareOp::Ge,
1004                            BinaryOp::Gt => CompareOp::Lt,
1005                            BinaryOp::Ge => CompareOp::Le,
1006                            _ => return None,
1007                        };
1008                        (property.clone(), cmp, val.clone())
1009                    }
1010                    _ => return None,
1011                };
1012
1013                // Check zone map for node properties
1014                let might_match =
1015                    self.store
1016                        .node_property_might_match(&property.into(), compare_op, &value);
1017
1018                Some(might_match)
1019            }
1020
1021            _ => None,
1022        }
1023    }
1024
1025    /// Tries to use a property index for filter optimization.
1026    ///
1027    /// When a filter predicate is an equality check on an indexed property,
1028    /// and the input is a simple NodeScan, we can use the index to look up
1029    /// matching nodes directly instead of scanning all nodes.
1030    ///
1031    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1032    /// `Ok(None)` if not applicable, or `Err` on error.
1033    fn try_plan_filter_with_property_index(
1034        &self,
1035        filter: &FilterOp,
1036    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1037        // Only optimize if input is a simple NodeScan (not nested)
1038        let (scan_variable, scan_label) = match filter.input.as_ref() {
1039            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1040                (scan.variable.clone(), scan.label.clone())
1041            }
1042            _ => return Ok(None),
1043        };
1044
1045        // Extract property equality conditions from the predicate
1046        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1047        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1048
1049        if conditions.is_empty() {
1050            return Ok(None);
1051        }
1052
1053        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1054        let has_indexed_condition = conditions
1055            .iter()
1056            .any(|(prop, _)| self.store.has_property_index(prop));
1057
1058        if !has_indexed_condition {
1059            return Ok(None);
1060        }
1061
1062        // Use the optimized batch lookup for multiple conditions
1063        let conditions_ref: Vec<(&str, Value)> = conditions
1064            .iter()
1065            .map(|(p, v)| (p.as_str(), v.clone()))
1066            .collect();
1067        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1068
1069        // If there's a label filter, also filter by label
1070        if let Some(label) = &scan_label {
1071            let label_nodes: std::collections::HashSet<_> =
1072                self.store.nodes_by_label(label).into_iter().collect();
1073            matching_nodes.retain(|n| label_nodes.contains(n));
1074        }
1075
1076        // Create a NodeListOperator with the matching nodes
1077        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1078        let columns = vec![scan_variable];
1079
1080        Ok(Some((node_list_op, columns)))
1081    }
1082
1083    /// Extracts equality conditions (property = literal) from a predicate.
1084    ///
1085    /// Handles both simple predicates and AND chains:
1086    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1087    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1088    fn extract_equality_conditions(
1089        &self,
1090        predicate: &LogicalExpression,
1091        target_variable: &str,
1092    ) -> Vec<(String, Value)> {
1093        let mut conditions = Vec::new();
1094        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1095        conditions
1096    }
1097
1098    /// Recursively collects equality conditions from AND expressions.
1099    fn collect_equality_conditions(
1100        &self,
1101        expr: &LogicalExpression,
1102        target_variable: &str,
1103        conditions: &mut Vec<(String, Value)>,
1104    ) {
1105        match expr {
1106            // Handle AND: recurse into both sides
1107            LogicalExpression::Binary {
1108                left,
1109                op: BinaryOp::And,
1110                right,
1111            } => {
1112                self.collect_equality_conditions(left, target_variable, conditions);
1113                self.collect_equality_conditions(right, target_variable, conditions);
1114            }
1115
1116            // Handle equality: extract property and value
1117            LogicalExpression::Binary {
1118                left,
1119                op: BinaryOp::Eq,
1120                right,
1121            } => {
1122                if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1123                    if var == target_variable {
1124                        conditions.push((prop, val));
1125                    }
1126                }
1127            }
1128
1129            _ => {}
1130        }
1131    }
1132
1133    /// Extracts (variable, property, value) from a property equality expression.
1134    fn extract_property_equality(
1135        &self,
1136        left: &LogicalExpression,
1137        right: &LogicalExpression,
1138    ) -> Option<(String, String, Value)> {
1139        match (left, right) {
1140            (
1141                LogicalExpression::Property { variable, property },
1142                LogicalExpression::Literal(val),
1143            ) => Some((variable.clone(), property.clone(), val.clone())),
1144            (
1145                LogicalExpression::Literal(val),
1146                LogicalExpression::Property { variable, property },
1147            ) => Some((variable.clone(), property.clone(), val.clone())),
1148            _ => None,
1149        }
1150    }
1151
1152    /// Tries to optimize a filter using range queries on properties.
1153    ///
1154    /// This optimization is applied when:
1155    /// - The input is a simple NodeScan (no nested operations)
1156    /// - The predicate contains range comparisons (>, <, >=, <=)
1157    /// - The same variable and property are being filtered
1158    ///
1159    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1160    /// (`n.age >= 30 AND n.age <= 50`).
1161    ///
1162    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1163    /// `Ok(None)` if not applicable, or `Err` on error.
1164    fn try_plan_filter_with_range_index(
1165        &self,
1166        filter: &FilterOp,
1167    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1168        // Only optimize if input is a simple NodeScan (not nested)
1169        let (scan_variable, scan_label) = match filter.input.as_ref() {
1170            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1171                (scan.variable.clone(), scan.label.clone())
1172            }
1173            _ => return Ok(None),
1174        };
1175
1176        // Try to extract BETWEEN pattern first (more efficient)
1177        if let Some((variable, property, min, max, min_inc, max_inc)) =
1178            self.extract_between_predicate(&filter.predicate)
1179        {
1180            if variable == scan_variable {
1181                return self.plan_range_filter(
1182                    &scan_variable,
1183                    &scan_label,
1184                    &property,
1185                    RangeBounds {
1186                        min: Some(&min),
1187                        max: Some(&max),
1188                        min_inclusive: min_inc,
1189                        max_inclusive: max_inc,
1190                    },
1191                );
1192            }
1193        }
1194
1195        // Try to extract simple range predicate
1196        if let Some((variable, property, op, value)) =
1197            self.extract_range_predicate(&filter.predicate)
1198        {
1199            if variable == scan_variable {
1200                let (min, max, min_inc, max_inc) = match op {
1201                    BinaryOp::Lt => (None, Some(value), false, false),
1202                    BinaryOp::Le => (None, Some(value), false, true),
1203                    BinaryOp::Gt => (Some(value), None, false, false),
1204                    BinaryOp::Ge => (Some(value), None, true, false),
1205                    _ => return Ok(None),
1206                };
1207                return self.plan_range_filter(
1208                    &scan_variable,
1209                    &scan_label,
1210                    &property,
1211                    RangeBounds {
1212                        min: min.as_ref(),
1213                        max: max.as_ref(),
1214                        min_inclusive: min_inc,
1215                        max_inclusive: max_inc,
1216                    },
1217                );
1218            }
1219        }
1220
1221        Ok(None)
1222    }
1223
1224    /// Plans a range filter using `find_nodes_in_range`.
1225    fn plan_range_filter(
1226        &self,
1227        scan_variable: &str,
1228        scan_label: &Option<String>,
1229        property: &str,
1230        bounds: RangeBounds<'_>,
1231    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1232        // Use the store's range query method
1233        let mut matching_nodes = self.store.find_nodes_in_range(
1234            property,
1235            bounds.min,
1236            bounds.max,
1237            bounds.min_inclusive,
1238            bounds.max_inclusive,
1239        );
1240
1241        // If there's a label filter, also filter by label
1242        if let Some(label) = scan_label {
1243            let label_nodes: std::collections::HashSet<_> =
1244                self.store.nodes_by_label(label).into_iter().collect();
1245            matching_nodes.retain(|n| label_nodes.contains(n));
1246        }
1247
1248        // Create a NodeListOperator with the matching nodes
1249        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1250        let columns = vec![scan_variable.to_string()];
1251
1252        Ok(Some((node_list_op, columns)))
1253    }
1254
1255    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1256    ///
1257    /// Returns `(variable, property, operator, value)` if found.
1258    fn extract_range_predicate(
1259        &self,
1260        predicate: &LogicalExpression,
1261    ) -> Option<(String, String, BinaryOp, Value)> {
1262        match predicate {
1263            LogicalExpression::Binary { left, op, right } => {
1264                match op {
1265                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1266                        // Try property on left: n.age > 30
1267                        if let (
1268                            LogicalExpression::Property { variable, property },
1269                            LogicalExpression::Literal(val),
1270                        ) = (left.as_ref(), right.as_ref())
1271                        {
1272                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1273                        }
1274
1275                        // Try property on right: 30 < n.age (flip operator)
1276                        if let (
1277                            LogicalExpression::Literal(val),
1278                            LogicalExpression::Property { variable, property },
1279                        ) = (left.as_ref(), right.as_ref())
1280                        {
1281                            let flipped_op = match op {
1282                                BinaryOp::Lt => BinaryOp::Gt,
1283                                BinaryOp::Le => BinaryOp::Ge,
1284                                BinaryOp::Gt => BinaryOp::Lt,
1285                                BinaryOp::Ge => BinaryOp::Le,
1286                                _ => return None,
1287                            };
1288                            return Some((
1289                                variable.clone(),
1290                                property.clone(),
1291                                flipped_op,
1292                                val.clone(),
1293                            ));
1294                        }
1295                    }
1296                    _ => {}
1297                }
1298            }
1299            _ => {}
1300        }
1301        None
1302    }
1303
1304    /// Extracts a BETWEEN pattern from compound predicates.
1305    ///
1306    /// Recognizes patterns like:
1307    /// - `n.age >= 30 AND n.age <= 50`
1308    /// - `n.age > 30 AND n.age < 50`
1309    ///
1310    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1311    fn extract_between_predicate(
1312        &self,
1313        predicate: &LogicalExpression,
1314    ) -> Option<(String, String, Value, Value, bool, bool)> {
1315        // Must be an AND expression
1316        let (left, right) = match predicate {
1317            LogicalExpression::Binary {
1318                left,
1319                op: BinaryOp::And,
1320                right,
1321            } => (left.as_ref(), right.as_ref()),
1322            _ => return None,
1323        };
1324
1325        // Extract range predicates from both sides
1326        let left_range = self.extract_range_predicate(left);
1327        let right_range = self.extract_range_predicate(right);
1328
1329        let (left_var, left_prop, left_op, left_val) = left_range?;
1330        let (right_var, right_prop, right_op, right_val) = right_range?;
1331
1332        // Must be same variable and property
1333        if left_var != right_var || left_prop != right_prop {
1334            return None;
1335        }
1336
1337        // Determine which is lower bound and which is upper bound
1338        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1339            // n.x >= min AND n.x <= max
1340            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1341            // n.x >= min AND n.x < max
1342            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1343            // n.x > min AND n.x <= max
1344            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1345            // n.x > min AND n.x < max
1346            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1347            // Reversed order: n.x <= max AND n.x >= min
1348            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1349            // n.x < max AND n.x >= min
1350            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1351            // n.x <= max AND n.x > min
1352            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1353            // n.x < max AND n.x > min
1354            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1355            _ => return None,
1356        };
1357
1358        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1359    }
1360
1361    /// Plans a LIMIT operator.
1362    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1363        let (input_op, columns) = self.plan_operator(&limit.input)?;
1364        let output_schema = self.derive_schema_from_columns(&columns);
1365        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1366        Ok((operator, columns))
1367    }
1368
1369    /// Plans a SKIP operator.
1370    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1371        let (input_op, columns) = self.plan_operator(&skip.input)?;
1372        let output_schema = self.derive_schema_from_columns(&columns);
1373        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1374        Ok((operator, columns))
1375    }
1376
1377    /// Plans a SORT (ORDER BY) operator.
1378    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1379        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1380
1381        // Build variable to column index mapping
1382        let mut variable_columns: HashMap<String, usize> = input_columns
1383            .iter()
1384            .enumerate()
1385            .map(|(i, name)| (name.clone(), i))
1386            .collect();
1387
1388        // Collect property expressions that need to be projected before sorting
1389        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1390        let mut next_col_idx = input_columns.len();
1391
1392        for key in &sort.keys {
1393            if let LogicalExpression::Property { variable, property } = &key.expression {
1394                let col_name = format!("{}_{}", variable, property);
1395                if !variable_columns.contains_key(&col_name) {
1396                    property_projections.push((
1397                        variable.clone(),
1398                        property.clone(),
1399                        col_name.clone(),
1400                    ));
1401                    variable_columns.insert(col_name, next_col_idx);
1402                    next_col_idx += 1;
1403                }
1404            }
1405        }
1406
1407        // Track output columns
1408        let mut output_columns = input_columns.clone();
1409
1410        // If we have property expressions, add a projection to materialize them
1411        if !property_projections.is_empty() {
1412            let mut projections = Vec::new();
1413            let mut output_types = Vec::new();
1414
1415            // First, pass through all existing columns (use Node type to preserve node IDs
1416            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1417            for (i, _) in input_columns.iter().enumerate() {
1418                projections.push(ProjectExpr::Column(i));
1419                output_types.push(LogicalType::Node);
1420            }
1421
1422            // Then add property access projections
1423            for (variable, property, col_name) in &property_projections {
1424                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1425                    Error::Internal(format!(
1426                        "Variable '{}' not found for ORDER BY property projection",
1427                        variable
1428                    ))
1429                })?;
1430                projections.push(ProjectExpr::PropertyAccess {
1431                    column: source_col,
1432                    property: property.clone(),
1433                });
1434                output_types.push(LogicalType::Any);
1435                output_columns.push(col_name.clone());
1436            }
1437
1438            input_op = Box::new(ProjectOperator::with_store(
1439                input_op,
1440                projections,
1441                output_types,
1442                Arc::clone(&self.store),
1443            ));
1444        }
1445
1446        // Convert logical sort keys to physical sort keys
1447        let physical_keys: Vec<PhysicalSortKey> = sort
1448            .keys
1449            .iter()
1450            .map(|key| {
1451                let col_idx = self
1452                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1453                Ok(PhysicalSortKey {
1454                    column: col_idx,
1455                    direction: match key.order {
1456                        SortOrder::Ascending => SortDirection::Ascending,
1457                        SortOrder::Descending => SortDirection::Descending,
1458                    },
1459                    null_order: NullOrder::NullsLast,
1460                })
1461            })
1462            .collect::<Result<Vec<_>>>()?;
1463
1464        let output_schema = self.derive_schema_from_columns(&output_columns);
1465        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1466        Ok((operator, output_columns))
1467    }
1468
1469    /// Resolves a sort expression to a column index, using projected property columns.
1470    fn resolve_sort_expression_with_properties(
1471        &self,
1472        expr: &LogicalExpression,
1473        variable_columns: &HashMap<String, usize>,
1474    ) -> Result<usize> {
1475        match expr {
1476            LogicalExpression::Variable(name) => {
1477                variable_columns.get(name).copied().ok_or_else(|| {
1478                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1479                })
1480            }
1481            LogicalExpression::Property { variable, property } => {
1482                // Look up the projected property column (e.g., "p_age" for p.age)
1483                let col_name = format!("{}_{}", variable, property);
1484                variable_columns.get(&col_name).copied().ok_or_else(|| {
1485                    Error::Internal(format!(
1486                        "Property column '{}' not found for ORDER BY (from {}.{})",
1487                        col_name, variable, property
1488                    ))
1489                })
1490            }
1491            _ => Err(Error::Internal(format!(
1492                "Unsupported ORDER BY expression: {:?}",
1493                expr
1494            ))),
1495        }
1496    }
1497
1498    /// Derives a schema from column names (uses Any type to handle all value types).
1499    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1500        columns.iter().map(|_| LogicalType::Any).collect()
1501    }
1502
1503    /// Plans an AGGREGATE operator.
1504    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1505        // Check if we can use factorized aggregation for speedup
1506        // Conditions:
1507        // 1. Factorized execution is enabled
1508        // 2. Input is an expand chain (multi-hop)
1509        // 3. No GROUP BY
1510        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1511        if self.factorized_execution
1512            && agg.group_by.is_empty()
1513            && Self::count_expand_chain(&agg.input).0 >= 2
1514            && self.is_simple_aggregate(agg)
1515        {
1516            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1517                return Ok((op, cols));
1518            }
1519            // Fall through to regular aggregate if factorized planning fails
1520        }
1521
1522        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1523
1524        // Build variable to column index mapping
1525        let mut variable_columns: HashMap<String, usize> = input_columns
1526            .iter()
1527            .enumerate()
1528            .map(|(i, name)| (name.clone(), i))
1529            .collect();
1530
1531        // Collect all property expressions that need to be projected before aggregation
1532        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1533        let mut next_col_idx = input_columns.len();
1534
1535        // Check group-by expressions for properties
1536        for expr in &agg.group_by {
1537            if let LogicalExpression::Property { variable, property } = expr {
1538                let col_name = format!("{}_{}", variable, property);
1539                if !variable_columns.contains_key(&col_name) {
1540                    property_projections.push((
1541                        variable.clone(),
1542                        property.clone(),
1543                        col_name.clone(),
1544                    ));
1545                    variable_columns.insert(col_name, next_col_idx);
1546                    next_col_idx += 1;
1547                }
1548            }
1549        }
1550
1551        // Check aggregate expressions for properties
1552        for agg_expr in &agg.aggregates {
1553            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1554                let col_name = format!("{}_{}", variable, property);
1555                if !variable_columns.contains_key(&col_name) {
1556                    property_projections.push((
1557                        variable.clone(),
1558                        property.clone(),
1559                        col_name.clone(),
1560                    ));
1561                    variable_columns.insert(col_name, next_col_idx);
1562                    next_col_idx += 1;
1563                }
1564            }
1565        }
1566
1567        // If we have property expressions, add a projection to materialize them
1568        if !property_projections.is_empty() {
1569            let mut projections = Vec::new();
1570            let mut output_types = Vec::new();
1571
1572            // First, pass through all existing columns (use Node type to preserve node IDs
1573            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1574            for (i, _) in input_columns.iter().enumerate() {
1575                projections.push(ProjectExpr::Column(i));
1576                output_types.push(LogicalType::Node);
1577            }
1578
1579            // Then add property access projections
1580            for (variable, property, _col_name) in &property_projections {
1581                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1582                    Error::Internal(format!(
1583                        "Variable '{}' not found for property projection",
1584                        variable
1585                    ))
1586                })?;
1587                projections.push(ProjectExpr::PropertyAccess {
1588                    column: source_col,
1589                    property: property.clone(),
1590                });
1591                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1592            }
1593
1594            input_op = Box::new(ProjectOperator::with_store(
1595                input_op,
1596                projections,
1597                output_types,
1598                Arc::clone(&self.store),
1599            ));
1600        }
1601
1602        // Convert group-by expressions to column indices
1603        let group_columns: Vec<usize> = agg
1604            .group_by
1605            .iter()
1606            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1607            .collect::<Result<Vec<_>>>()?;
1608
1609        // Convert aggregate expressions to physical form
1610        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1611            .aggregates
1612            .iter()
1613            .map(|agg_expr| {
1614                let column = agg_expr
1615                    .expression
1616                    .as_ref()
1617                    .map(|e| {
1618                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1619                    })
1620                    .transpose()?;
1621
1622                Ok(PhysicalAggregateExpr {
1623                    function: convert_aggregate_function(agg_expr.function),
1624                    column,
1625                    distinct: agg_expr.distinct,
1626                    alias: agg_expr.alias.clone(),
1627                    percentile: agg_expr.percentile,
1628                })
1629            })
1630            .collect::<Result<Vec<_>>>()?;
1631
1632        // Build output schema and column names
1633        let mut output_schema = Vec::new();
1634        let mut output_columns = Vec::new();
1635
1636        // Add group-by columns
1637        for expr in &agg.group_by {
1638            output_schema.push(LogicalType::Any); // Group-by values can be any type
1639            output_columns.push(expression_to_string(expr));
1640        }
1641
1642        // Add aggregate result columns
1643        for agg_expr in &agg.aggregates {
1644            let result_type = match agg_expr.function {
1645                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1646                    LogicalType::Int64
1647                }
1648                LogicalAggregateFunction::Sum => LogicalType::Int64,
1649                LogicalAggregateFunction::Avg => LogicalType::Float64,
1650                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1651                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1652                    // since the aggregate can return any Value type, but the most common case
1653                    // is numeric values from property expressions
1654                    LogicalType::Int64
1655                }
1656                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1657                // Statistical functions return Float64
1658                LogicalAggregateFunction::StdDev
1659                | LogicalAggregateFunction::StdDevPop
1660                | LogicalAggregateFunction::PercentileDisc
1661                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1662            };
1663            output_schema.push(result_type);
1664            output_columns.push(
1665                agg_expr
1666                    .alias
1667                    .clone()
1668                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1669            );
1670        }
1671
1672        // Choose operator based on whether there are group-by columns
1673        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1674            Box::new(SimpleAggregateOperator::new(
1675                input_op,
1676                physical_aggregates,
1677                output_schema,
1678            ))
1679        } else {
1680            Box::new(HashAggregateOperator::new(
1681                input_op,
1682                group_columns,
1683                physical_aggregates,
1684                output_schema,
1685            ))
1686        };
1687
1688        // Apply HAVING clause filter if present
1689        if let Some(having_expr) = &agg.having {
1690            // Build variable to column mapping for the aggregate output
1691            let having_var_columns: HashMap<String, usize> = output_columns
1692                .iter()
1693                .enumerate()
1694                .map(|(i, name)| (name.clone(), i))
1695                .collect();
1696
1697            let filter_expr = self.convert_expression(having_expr)?;
1698            let predicate =
1699                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1700            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1701        }
1702
1703        Ok((operator, output_columns))
1704    }
1705
1706    /// Checks if an aggregate is simple enough for factorized execution.
1707    ///
1708    /// Simple aggregates:
1709    /// - COUNT(*) or COUNT(variable)
1710    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1711    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1712        agg.aggregates.iter().all(|agg_expr| {
1713            match agg_expr.function {
1714                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1715                    // COUNT(*) is always OK, COUNT(var) is OK
1716                    agg_expr.expression.is_none()
1717                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1718                }
1719                LogicalAggregateFunction::Sum
1720                | LogicalAggregateFunction::Avg
1721                | LogicalAggregateFunction::Min
1722                | LogicalAggregateFunction::Max => {
1723                    // For now, only support when expression is a variable
1724                    // (property access would require flattening first)
1725                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1726                }
1727                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1728                _ => false,
1729            }
1730        })
1731    }
1732
1733    /// Plans a factorized aggregate that operates directly on factorized data.
1734    ///
1735    /// This avoids the O(n²) cost of flattening before aggregation.
1736    fn plan_factorized_aggregate(
1737        &self,
1738        agg: &AggregateOp,
1739    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1740        // Build the expand chain - this returns a LazyFactorizedChainOperator
1741        let expands = Self::collect_expand_chain(&agg.input);
1742        if expands.is_empty() {
1743            return Err(Error::Internal(
1744                "Expected expand chain for factorized aggregate".to_string(),
1745            ));
1746        }
1747
1748        // Get the base operator (before first expand)
1749        let first_expand = expands[0];
1750        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1751
1752        let mut columns = base_columns.clone();
1753        let mut steps = Vec::new();
1754        let mut is_first = true;
1755
1756        for expand in &expands {
1757            // Find source column for this expand
1758            let source_column = if is_first {
1759                base_columns
1760                    .iter()
1761                    .position(|c| c == &expand.from_variable)
1762                    .ok_or_else(|| {
1763                        Error::Internal(format!(
1764                            "Source variable '{}' not found in base columns",
1765                            expand.from_variable
1766                        ))
1767                    })?
1768            } else {
1769                1 // Target from previous level
1770            };
1771
1772            let direction = match expand.direction {
1773                ExpandDirection::Outgoing => Direction::Outgoing,
1774                ExpandDirection::Incoming => Direction::Incoming,
1775                ExpandDirection::Both => Direction::Both,
1776            };
1777
1778            steps.push(ExpandStep {
1779                source_column,
1780                direction,
1781                edge_type: expand.edge_type.clone(),
1782            });
1783
1784            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1785                let count = self.anon_edge_counter.get();
1786                self.anon_edge_counter.set(count + 1);
1787                format!("_anon_edge_{}", count)
1788            });
1789            columns.push(edge_col_name);
1790            columns.push(expand.to_variable.clone());
1791
1792            is_first = false;
1793        }
1794
1795        // Create the lazy factorized chain operator
1796        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1797
1798        if let Some(tx_id) = self.tx_id {
1799            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1800        } else {
1801            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1802        }
1803
1804        // Convert logical aggregates to factorized aggregates
1805        let factorized_aggs: Vec<FactorizedAggregate> = agg
1806            .aggregates
1807            .iter()
1808            .map(|agg_expr| {
1809                match agg_expr.function {
1810                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1811                        // COUNT(*) uses simple count, COUNT(col) uses column count
1812                        if agg_expr.expression.is_none() {
1813                            FactorizedAggregate::count()
1814                        } else {
1815                            // For COUNT(variable), we use the deepest level's target column
1816                            // which is the last column added to the schema
1817                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1818                        }
1819                    }
1820                    LogicalAggregateFunction::Sum => {
1821                        // SUM on deepest level target
1822                        FactorizedAggregate::sum(1)
1823                    }
1824                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1825                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1826                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1827                    _ => {
1828                        // Shouldn't reach here due to is_simple_aggregate check
1829                        FactorizedAggregate::count()
1830                    }
1831                }
1832            })
1833            .collect();
1834
1835        // Build output column names
1836        let output_columns: Vec<String> = agg
1837            .aggregates
1838            .iter()
1839            .map(|agg_expr| {
1840                agg_expr
1841                    .alias
1842                    .clone()
1843                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1844            })
1845            .collect();
1846
1847        // Create the factorized aggregate operator
1848        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1849
1850        Ok((Box::new(factorized_agg_op), output_columns))
1851    }
1852
1853    /// Resolves a logical expression to a column index.
1854    #[allow(dead_code)]
1855    fn resolve_expression_to_column(
1856        &self,
1857        expr: &LogicalExpression,
1858        variable_columns: &HashMap<String, usize>,
1859    ) -> Result<usize> {
1860        match expr {
1861            LogicalExpression::Variable(name) => variable_columns
1862                .get(name)
1863                .copied()
1864                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1865            LogicalExpression::Property { variable, .. } => variable_columns
1866                .get(variable)
1867                .copied()
1868                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1869            _ => Err(Error::Internal(format!(
1870                "Cannot resolve expression to column: {:?}",
1871                expr
1872            ))),
1873        }
1874    }
1875
1876    /// Resolves a logical expression to a column index, using projected property columns.
1877    ///
1878    /// This is used for aggregations where properties have been projected into their own columns.
1879    fn resolve_expression_to_column_with_properties(
1880        &self,
1881        expr: &LogicalExpression,
1882        variable_columns: &HashMap<String, usize>,
1883    ) -> Result<usize> {
1884        match expr {
1885            LogicalExpression::Variable(name) => variable_columns
1886                .get(name)
1887                .copied()
1888                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1889            LogicalExpression::Property { variable, property } => {
1890                // Look up the projected property column (e.g., "p_price" for p.price)
1891                let col_name = format!("{}_{}", variable, property);
1892                variable_columns.get(&col_name).copied().ok_or_else(|| {
1893                    Error::Internal(format!(
1894                        "Property column '{}' not found (from {}.{})",
1895                        col_name, variable, property
1896                    ))
1897                })
1898            }
1899            _ => Err(Error::Internal(format!(
1900                "Cannot resolve expression to column: {:?}",
1901                expr
1902            ))),
1903        }
1904    }
1905
1906    /// Converts a logical expression to a filter expression.
1907    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1908        match expr {
1909            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1910            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1911            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1912                variable: variable.clone(),
1913                property: property.clone(),
1914            }),
1915            LogicalExpression::Binary { left, op, right } => {
1916                let left_expr = self.convert_expression(left)?;
1917                let right_expr = self.convert_expression(right)?;
1918                let filter_op = convert_binary_op(*op)?;
1919                Ok(FilterExpression::Binary {
1920                    left: Box::new(left_expr),
1921                    op: filter_op,
1922                    right: Box::new(right_expr),
1923                })
1924            }
1925            LogicalExpression::Unary { op, operand } => {
1926                let operand_expr = self.convert_expression(operand)?;
1927                let filter_op = convert_unary_op(*op)?;
1928                Ok(FilterExpression::Unary {
1929                    op: filter_op,
1930                    operand: Box::new(operand_expr),
1931                })
1932            }
1933            LogicalExpression::FunctionCall { name, args, .. } => {
1934                let filter_args: Vec<FilterExpression> = args
1935                    .iter()
1936                    .map(|a| self.convert_expression(a))
1937                    .collect::<Result<Vec<_>>>()?;
1938                Ok(FilterExpression::FunctionCall {
1939                    name: name.clone(),
1940                    args: filter_args,
1941                })
1942            }
1943            LogicalExpression::Case {
1944                operand,
1945                when_clauses,
1946                else_clause,
1947            } => {
1948                let filter_operand = operand
1949                    .as_ref()
1950                    .map(|e| self.convert_expression(e))
1951                    .transpose()?
1952                    .map(Box::new);
1953                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1954                    .iter()
1955                    .map(|(cond, result)| {
1956                        Ok((
1957                            self.convert_expression(cond)?,
1958                            self.convert_expression(result)?,
1959                        ))
1960                    })
1961                    .collect::<Result<Vec<_>>>()?;
1962                let filter_else = else_clause
1963                    .as_ref()
1964                    .map(|e| self.convert_expression(e))
1965                    .transpose()?
1966                    .map(Box::new);
1967                Ok(FilterExpression::Case {
1968                    operand: filter_operand,
1969                    when_clauses: filter_when_clauses,
1970                    else_clause: filter_else,
1971                })
1972            }
1973            LogicalExpression::List(items) => {
1974                let filter_items: Vec<FilterExpression> = items
1975                    .iter()
1976                    .map(|item| self.convert_expression(item))
1977                    .collect::<Result<Vec<_>>>()?;
1978                Ok(FilterExpression::List(filter_items))
1979            }
1980            LogicalExpression::Map(pairs) => {
1981                let filter_pairs: Vec<(String, FilterExpression)> = pairs
1982                    .iter()
1983                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1984                    .collect::<Result<Vec<_>>>()?;
1985                Ok(FilterExpression::Map(filter_pairs))
1986            }
1987            LogicalExpression::IndexAccess { base, index } => {
1988                let base_expr = self.convert_expression(base)?;
1989                let index_expr = self.convert_expression(index)?;
1990                Ok(FilterExpression::IndexAccess {
1991                    base: Box::new(base_expr),
1992                    index: Box::new(index_expr),
1993                })
1994            }
1995            LogicalExpression::SliceAccess { base, start, end } => {
1996                let base_expr = self.convert_expression(base)?;
1997                let start_expr = start
1998                    .as_ref()
1999                    .map(|s| self.convert_expression(s))
2000                    .transpose()?
2001                    .map(Box::new);
2002                let end_expr = end
2003                    .as_ref()
2004                    .map(|e| self.convert_expression(e))
2005                    .transpose()?
2006                    .map(Box::new);
2007                Ok(FilterExpression::SliceAccess {
2008                    base: Box::new(base_expr),
2009                    start: start_expr,
2010                    end: end_expr,
2011                })
2012            }
2013            LogicalExpression::Parameter(_) => Err(Error::Internal(
2014                "Parameters not yet supported in filters".to_string(),
2015            )),
2016            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2017            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2018            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2019            LogicalExpression::ListComprehension {
2020                variable,
2021                list_expr,
2022                filter_expr,
2023                map_expr,
2024            } => {
2025                let list = self.convert_expression(list_expr)?;
2026                let filter = filter_expr
2027                    .as_ref()
2028                    .map(|f| self.convert_expression(f))
2029                    .transpose()?
2030                    .map(Box::new);
2031                let map = self.convert_expression(map_expr)?;
2032                Ok(FilterExpression::ListComprehension {
2033                    variable: variable.clone(),
2034                    list_expr: Box::new(list),
2035                    filter_expr: filter,
2036                    map_expr: Box::new(map),
2037                })
2038            }
2039            LogicalExpression::ExistsSubquery(subplan) => {
2040                // Extract the pattern from the subplan
2041                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2042                let (start_var, direction, edge_type, end_labels) =
2043                    self.extract_exists_pattern(subplan)?;
2044
2045                Ok(FilterExpression::ExistsSubquery {
2046                    start_var,
2047                    direction,
2048                    edge_type,
2049                    end_labels,
2050                    min_hops: None,
2051                    max_hops: None,
2052                })
2053            }
2054            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2055                "COUNT subqueries not yet supported".to_string(),
2056            )),
2057        }
2058    }
2059
2060    /// Extracts the pattern from an EXISTS subplan.
2061    /// Returns (start_variable, direction, edge_type, end_labels).
2062    fn extract_exists_pattern(
2063        &self,
2064        subplan: &LogicalOperator,
2065    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2066        match subplan {
2067            LogicalOperator::Expand(expand) => {
2068                // Get end node labels from the to_variable if there's a node scan input
2069                let end_labels = self.extract_end_labels_from_expand(expand);
2070                let direction = match expand.direction {
2071                    ExpandDirection::Outgoing => Direction::Outgoing,
2072                    ExpandDirection::Incoming => Direction::Incoming,
2073                    ExpandDirection::Both => Direction::Both,
2074                };
2075                Ok((
2076                    expand.from_variable.clone(),
2077                    direction,
2078                    expand.edge_type.clone(),
2079                    end_labels,
2080                ))
2081            }
2082            LogicalOperator::NodeScan(scan) => {
2083                if let Some(input) = &scan.input {
2084                    self.extract_exists_pattern(input)
2085                } else {
2086                    Err(Error::Internal(
2087                        "EXISTS subquery must contain an edge pattern".to_string(),
2088                    ))
2089                }
2090            }
2091            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2092            _ => Err(Error::Internal(
2093                "Unsupported EXISTS subquery pattern".to_string(),
2094            )),
2095        }
2096    }
2097
2098    /// Extracts end node labels from an Expand operator if present.
2099    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2100        // Check if the expand has a NodeScan input with a label filter
2101        match expand.input.as_ref() {
2102            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2103            _ => None,
2104        }
2105    }
2106
2107    /// Plans a JOIN operator.
2108    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2109        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2110        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2111
2112        // Build combined output columns
2113        let mut columns = left_columns.clone();
2114        columns.extend(right_columns.clone());
2115
2116        // Convert join type
2117        let physical_join_type = match join.join_type {
2118            JoinType::Inner => PhysicalJoinType::Inner,
2119            JoinType::Left => PhysicalJoinType::Left,
2120            JoinType::Right => PhysicalJoinType::Right,
2121            JoinType::Full => PhysicalJoinType::Full,
2122            JoinType::Cross => PhysicalJoinType::Cross,
2123            JoinType::Semi => PhysicalJoinType::Semi,
2124            JoinType::Anti => PhysicalJoinType::Anti,
2125        };
2126
2127        // Build key columns from join conditions
2128        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2129            // Cross join - no keys
2130            (vec![], vec![])
2131        } else {
2132            join.conditions
2133                .iter()
2134                .filter_map(|cond| {
2135                    // Try to extract column indices from expressions
2136                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2137                    let right_idx = self
2138                        .expression_to_column(&cond.right, &right_columns)
2139                        .ok()?;
2140                    Some((left_idx, right_idx))
2141                })
2142                .unzip()
2143        };
2144
2145        let output_schema = self.derive_schema_from_columns(&columns);
2146
2147        // Check if we should use leapfrog join for cyclic patterns
2148        // Currently we use hash join by default; leapfrog is available but
2149        // requires explicit multi-way join detection which will be added
2150        // when we have proper cyclic pattern detection in the optimizer.
2151        // For now, LeapfrogJoinOperator is available for direct use.
2152        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2153
2154        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2155            left_op,
2156            right_op,
2157            probe_keys,
2158            build_keys,
2159            physical_join_type,
2160            output_schema,
2161        ));
2162
2163        Ok((operator, columns))
2164    }
2165
2166    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2167    ///
2168    /// A cyclic pattern occurs when join conditions reference variables
2169    /// that create a cycle in the join graph. For example, a triangle
2170    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2171    ///
2172    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2173    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2174    #[allow(dead_code)]
2175    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2176        // Build adjacency list for join variables
2177        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2178        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2179
2180        // Collect edges from join conditions
2181        Self::collect_join_edges(
2182            &LogicalOperator::Join(join.clone()),
2183            &mut edges,
2184            &mut all_vars,
2185        );
2186
2187        // Need at least 3 variables to form a cycle
2188        if all_vars.len() < 3 {
2189            return false;
2190        }
2191
2192        // Detect cycle using DFS with coloring
2193        Self::has_cycle(&edges, &all_vars)
2194    }
2195
2196    /// Collects edges from join conditions into an adjacency list.
2197    fn collect_join_edges(
2198        op: &LogicalOperator,
2199        edges: &mut HashMap<String, Vec<String>>,
2200        vars: &mut std::collections::HashSet<String>,
2201    ) {
2202        match op {
2203            LogicalOperator::Join(join) => {
2204                // Process join conditions
2205                for cond in &join.conditions {
2206                    if let (Some(left_var), Some(right_var)) = (
2207                        Self::extract_join_variable(&cond.left),
2208                        Self::extract_join_variable(&cond.right),
2209                    ) {
2210                        if left_var != right_var {
2211                            vars.insert(left_var.clone());
2212                            vars.insert(right_var.clone());
2213
2214                            // Add bidirectional edge
2215                            edges
2216                                .entry(left_var.clone())
2217                                .or_default()
2218                                .push(right_var.clone());
2219                            edges.entry(right_var).or_default().push(left_var);
2220                        }
2221                    }
2222                }
2223
2224                // Recurse into children
2225                Self::collect_join_edges(&join.left, edges, vars);
2226                Self::collect_join_edges(&join.right, edges, vars);
2227            }
2228            LogicalOperator::Expand(expand) => {
2229                // Expand creates implicit join between from_variable and to_variable
2230                vars.insert(expand.from_variable.clone());
2231                vars.insert(expand.to_variable.clone());
2232
2233                edges
2234                    .entry(expand.from_variable.clone())
2235                    .or_default()
2236                    .push(expand.to_variable.clone());
2237                edges
2238                    .entry(expand.to_variable.clone())
2239                    .or_default()
2240                    .push(expand.from_variable.clone());
2241
2242                Self::collect_join_edges(&expand.input, edges, vars);
2243            }
2244            LogicalOperator::Filter(filter) => {
2245                Self::collect_join_edges(&filter.input, edges, vars);
2246            }
2247            LogicalOperator::NodeScan(scan) => {
2248                vars.insert(scan.variable.clone());
2249            }
2250            _ => {}
2251        }
2252    }
2253
2254    /// Extracts the variable name from a join expression.
2255    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2256        match expr {
2257            LogicalExpression::Variable(v) => Some(v.clone()),
2258            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2259            LogicalExpression::Id(v) => Some(v.clone()),
2260            _ => None,
2261        }
2262    }
2263
2264    /// Detects if the graph has a cycle using DFS coloring.
2265    ///
2266    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2267    fn has_cycle(
2268        edges: &HashMap<String, Vec<String>>,
2269        vars: &std::collections::HashSet<String>,
2270    ) -> bool {
2271        let mut color: HashMap<&String, u8> = HashMap::new();
2272
2273        for var in vars {
2274            color.insert(var, 0);
2275        }
2276
2277        for start in vars {
2278            if color[start] == 0 {
2279                if Self::dfs_cycle(start, None, edges, &mut color) {
2280                    return true;
2281                }
2282            }
2283        }
2284
2285        false
2286    }
2287
2288    /// DFS helper for cycle detection.
2289    fn dfs_cycle(
2290        node: &String,
2291        parent: Option<&String>,
2292        edges: &HashMap<String, Vec<String>>,
2293        color: &mut HashMap<&String, u8>,
2294    ) -> bool {
2295        *color.get_mut(node).unwrap() = 1; // Gray
2296
2297        if let Some(neighbors) = edges.get(node) {
2298            for neighbor in neighbors {
2299                // Skip the edge back to parent (undirected graph)
2300                if parent == Some(neighbor) {
2301                    continue;
2302                }
2303
2304                if let Some(&c) = color.get(neighbor) {
2305                    if c == 1 {
2306                        // Found a back edge - cycle detected
2307                        return true;
2308                    }
2309                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2310                        return true;
2311                    }
2312                }
2313            }
2314        }
2315
2316        *color.get_mut(node).unwrap() = 2; // Black
2317        false
2318    }
2319
2320    /// Counts the number of base relations in a logical operator tree.
2321    #[allow(dead_code)]
2322    fn count_relations(op: &LogicalOperator) -> usize {
2323        match op {
2324            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2325            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2326            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2327            LogicalOperator::Join(j) => {
2328                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2329            }
2330            _ => 0,
2331        }
2332    }
2333
2334    /// Extracts a column index from an expression.
2335    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2336        match expr {
2337            LogicalExpression::Variable(name) => columns
2338                .iter()
2339                .position(|c| c == name)
2340                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2341            _ => Err(Error::Internal(
2342                "Only variables supported in join conditions".to_string(),
2343            )),
2344        }
2345    }
2346
2347    /// Plans a UNION operator.
2348    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2349        if union.inputs.is_empty() {
2350            return Err(Error::Internal(
2351                "Union requires at least one input".to_string(),
2352            ));
2353        }
2354
2355        let mut inputs = Vec::with_capacity(union.inputs.len());
2356        let mut columns = Vec::new();
2357
2358        for (i, input) in union.inputs.iter().enumerate() {
2359            let (op, cols) = self.plan_operator(input)?;
2360            if i == 0 {
2361                columns = cols;
2362            }
2363            inputs.push(op);
2364        }
2365
2366        let output_schema = self.derive_schema_from_columns(&columns);
2367        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2368
2369        Ok((operator, columns))
2370    }
2371
2372    /// Plans a DISTINCT operator.
2373    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2374        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2375        let output_schema = self.derive_schema_from_columns(&columns);
2376        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2377        Ok((operator, columns))
2378    }
2379
2380    /// Plans a CREATE NODE operator.
2381    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2382        // Plan input if present
2383        let (input_op, mut columns) = if let Some(ref input) = create.input {
2384            let (op, cols) = self.plan_operator(input)?;
2385            (Some(op), cols)
2386        } else {
2387            (None, vec![])
2388        };
2389
2390        // Output column for the created node
2391        let output_column = columns.len();
2392        columns.push(create.variable.clone());
2393
2394        // Convert properties
2395        let properties: Vec<(String, PropertySource)> = create
2396            .properties
2397            .iter()
2398            .map(|(name, expr)| {
2399                let source = match expr {
2400                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2401                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2402                };
2403                (name.clone(), source)
2404            })
2405            .collect();
2406
2407        let output_schema = self.derive_schema_from_columns(&columns);
2408
2409        let operator = Box::new(
2410            CreateNodeOperator::new(
2411                Arc::clone(&self.store),
2412                input_op,
2413                create.labels.clone(),
2414                properties,
2415                output_schema,
2416                output_column,
2417            )
2418            .with_tx_context(self.viewing_epoch, self.tx_id),
2419        );
2420
2421        Ok((operator, columns))
2422    }
2423
2424    /// Plans a CREATE EDGE operator.
2425    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2426        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2427
2428        // Find source and target columns
2429        let from_column = columns
2430            .iter()
2431            .position(|c| c == &create.from_variable)
2432            .ok_or_else(|| {
2433                Error::Internal(format!(
2434                    "Source variable '{}' not found",
2435                    create.from_variable
2436                ))
2437            })?;
2438
2439        let to_column = columns
2440            .iter()
2441            .position(|c| c == &create.to_variable)
2442            .ok_or_else(|| {
2443                Error::Internal(format!(
2444                    "Target variable '{}' not found",
2445                    create.to_variable
2446                ))
2447            })?;
2448
2449        // Output column for the created edge (if named)
2450        let output_column = create.variable.as_ref().map(|v| {
2451            let idx = columns.len();
2452            columns.push(v.clone());
2453            idx
2454        });
2455
2456        // Convert properties
2457        let properties: Vec<(String, PropertySource)> = create
2458            .properties
2459            .iter()
2460            .map(|(name, expr)| {
2461                let source = match expr {
2462                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2463                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2464                };
2465                (name.clone(), source)
2466            })
2467            .collect();
2468
2469        let output_schema = self.derive_schema_from_columns(&columns);
2470
2471        let mut operator = CreateEdgeOperator::new(
2472            Arc::clone(&self.store),
2473            input_op,
2474            from_column,
2475            to_column,
2476            create.edge_type.clone(),
2477            output_schema,
2478        )
2479        .with_properties(properties)
2480        .with_tx_context(self.viewing_epoch, self.tx_id);
2481
2482        if let Some(col) = output_column {
2483            operator = operator.with_output_column(col);
2484        }
2485
2486        let operator = Box::new(operator);
2487
2488        Ok((operator, columns))
2489    }
2490
2491    /// Plans a DELETE NODE operator.
2492    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2493        let (input_op, columns) = self.plan_operator(&delete.input)?;
2494
2495        let node_column = columns
2496            .iter()
2497            .position(|c| c == &delete.variable)
2498            .ok_or_else(|| {
2499                Error::Internal(format!(
2500                    "Variable '{}' not found for delete",
2501                    delete.variable
2502                ))
2503            })?;
2504
2505        // Output schema for delete count
2506        let output_schema = vec![LogicalType::Int64];
2507        let output_columns = vec!["deleted_count".to_string()];
2508
2509        let operator = Box::new(
2510            DeleteNodeOperator::new(
2511                Arc::clone(&self.store),
2512                input_op,
2513                node_column,
2514                output_schema,
2515                delete.detach, // DETACH DELETE deletes connected edges first
2516            )
2517            .with_tx_context(self.viewing_epoch, self.tx_id),
2518        );
2519
2520        Ok((operator, output_columns))
2521    }
2522
2523    /// Plans a DELETE EDGE operator.
2524    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2525        let (input_op, columns) = self.plan_operator(&delete.input)?;
2526
2527        let edge_column = columns
2528            .iter()
2529            .position(|c| c == &delete.variable)
2530            .ok_or_else(|| {
2531                Error::Internal(format!(
2532                    "Variable '{}' not found for delete",
2533                    delete.variable
2534                ))
2535            })?;
2536
2537        // Output schema for delete count
2538        let output_schema = vec![LogicalType::Int64];
2539        let output_columns = vec!["deleted_count".to_string()];
2540
2541        let operator = Box::new(
2542            DeleteEdgeOperator::new(
2543                Arc::clone(&self.store),
2544                input_op,
2545                edge_column,
2546                output_schema,
2547            )
2548            .with_tx_context(self.viewing_epoch, self.tx_id),
2549        );
2550
2551        Ok((operator, output_columns))
2552    }
2553
2554    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2555    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2556        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2557        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2558
2559        // Build combined output columns (left + right)
2560        let mut columns = left_columns.clone();
2561        columns.extend(right_columns.clone());
2562
2563        // Find common variables between left and right for join keys
2564        let mut probe_keys = Vec::new();
2565        let mut build_keys = Vec::new();
2566
2567        for (right_idx, right_col) in right_columns.iter().enumerate() {
2568            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2569                probe_keys.push(left_idx);
2570                build_keys.push(right_idx);
2571            }
2572        }
2573
2574        let output_schema = self.derive_schema_from_columns(&columns);
2575
2576        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2577            left_op,
2578            right_op,
2579            probe_keys,
2580            build_keys,
2581            PhysicalJoinType::Left,
2582            output_schema,
2583        ));
2584
2585        Ok((operator, columns))
2586    }
2587
2588    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2589    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2590        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2591        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2592
2593        // Anti-join only keeps left columns (filters out matching rows)
2594        let columns = left_columns.clone();
2595
2596        // Find common variables between left and right for join keys
2597        let mut probe_keys = Vec::new();
2598        let mut build_keys = Vec::new();
2599
2600        for (right_idx, right_col) in right_columns.iter().enumerate() {
2601            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2602                probe_keys.push(left_idx);
2603                build_keys.push(right_idx);
2604            }
2605        }
2606
2607        let output_schema = self.derive_schema_from_columns(&columns);
2608
2609        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2610            left_op,
2611            right_op,
2612            probe_keys,
2613            build_keys,
2614            PhysicalJoinType::Anti,
2615            output_schema,
2616        ));
2617
2618        Ok((operator, columns))
2619    }
2620
2621    /// Plans an unwind operator.
2622    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2623        // Plan the input operator first
2624        // Handle Empty specially - use a single-row operator
2625        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2626            if matches!(&*unwind.input, LogicalOperator::Empty) {
2627                // For UNWIND without prior MATCH, create a single-row input
2628                // We need an operator that produces one row with the list to unwind
2629                // For now, use EmptyScan which produces no rows - we'll handle the literal
2630                // list in the unwind operator itself
2631                let literal_list = self.convert_expression(&unwind.expression)?;
2632
2633                // Create a project operator that produces a single row with the list
2634                let single_row_op: Box<dyn Operator> = Box::new(
2635                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2636                );
2637                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2638                    single_row_op,
2639                    vec![ProjectExpr::Expression {
2640                        expr: literal_list,
2641                        variable_columns: HashMap::new(),
2642                    }],
2643                    vec![LogicalType::Any],
2644                    Arc::clone(&self.store),
2645                ));
2646
2647                (project_op, vec!["__list__".to_string()])
2648            } else {
2649                self.plan_operator(&unwind.input)?
2650            };
2651
2652        // The UNWIND expression should be a list - we need to find/evaluate it
2653        // For now, we handle the case where the expression references an existing column
2654        // or is a literal list
2655
2656        // Find if the expression references an existing column (like a list property)
2657        let list_col_idx = match &unwind.expression {
2658            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2659            LogicalExpression::Property { variable, .. } => {
2660                // Property access needs to be evaluated - for now we'll need the filter predicate
2661                // to evaluate this. For simple cases, we treat it as a list column.
2662                input_columns.iter().position(|c| c == variable)
2663            }
2664            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2665                // Literal list expression - we'll add it as a virtual column
2666                None
2667            }
2668            _ => None,
2669        };
2670
2671        // Build output columns: all input columns plus the new variable
2672        let mut columns = input_columns.clone();
2673        columns.push(unwind.variable.clone());
2674
2675        // Build output schema
2676        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2677        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2678
2679        // Use the list column index if found, otherwise default to 0
2680        // (in which case the first column should contain the list)
2681        let col_idx = list_col_idx.unwrap_or(0);
2682
2683        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2684            input_op,
2685            col_idx,
2686            unwind.variable.clone(),
2687            output_schema,
2688        ));
2689
2690        Ok((operator, columns))
2691    }
2692
2693    /// Plans a MERGE operator.
2694    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2695        // Plan the input operator if present (skip if Empty)
2696        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2697            Vec::new()
2698        } else {
2699            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2700            cols
2701        };
2702
2703        // Convert match properties from LogicalExpression to Value
2704        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2705            .match_properties
2706            .iter()
2707            .filter_map(|(name, expr)| {
2708                if let LogicalExpression::Literal(v) = expr {
2709                    Some((name.clone(), v.clone()))
2710                } else {
2711                    None // Skip non-literal expressions for now
2712                }
2713            })
2714            .collect();
2715
2716        // Convert ON CREATE properties
2717        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2718            .on_create
2719            .iter()
2720            .filter_map(|(name, expr)| {
2721                if let LogicalExpression::Literal(v) = expr {
2722                    Some((name.clone(), v.clone()))
2723                } else {
2724                    None
2725                }
2726            })
2727            .collect();
2728
2729        // Convert ON MATCH properties
2730        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2731            .on_match
2732            .iter()
2733            .filter_map(|(name, expr)| {
2734                if let LogicalExpression::Literal(v) = expr {
2735                    Some((name.clone(), v.clone()))
2736                } else {
2737                    None
2738                }
2739            })
2740            .collect();
2741
2742        // Add the merged node variable to output columns
2743        columns.push(merge.variable.clone());
2744
2745        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2746            Arc::clone(&self.store),
2747            merge.variable.clone(),
2748            merge.labels.clone(),
2749            match_properties,
2750            on_create_properties,
2751            on_match_properties,
2752        ));
2753
2754        Ok((operator, columns))
2755    }
2756
2757    /// Plans a SHORTEST PATH operator.
2758    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2759        // Plan the input operator
2760        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2761
2762        // Find source and target node columns
2763        let source_column = columns
2764            .iter()
2765            .position(|c| c == &sp.source_var)
2766            .ok_or_else(|| {
2767                Error::Internal(format!(
2768                    "Source variable '{}' not found for shortestPath",
2769                    sp.source_var
2770                ))
2771            })?;
2772
2773        let target_column = columns
2774            .iter()
2775            .position(|c| c == &sp.target_var)
2776            .ok_or_else(|| {
2777                Error::Internal(format!(
2778                    "Target variable '{}' not found for shortestPath",
2779                    sp.target_var
2780                ))
2781            })?;
2782
2783        // Convert direction
2784        let direction = match sp.direction {
2785            ExpandDirection::Outgoing => Direction::Outgoing,
2786            ExpandDirection::Incoming => Direction::Incoming,
2787            ExpandDirection::Both => Direction::Both,
2788        };
2789
2790        // Create the shortest path operator
2791        let operator: Box<dyn Operator> = Box::new(
2792            ShortestPathOperator::new(
2793                Arc::clone(&self.store),
2794                input_op,
2795                source_column,
2796                target_column,
2797                sp.edge_type.clone(),
2798                direction,
2799            )
2800            .with_all_paths(sp.all_paths),
2801        );
2802
2803        // Add path length column with the expected naming convention
2804        // The translator expects _path_length_{alias} format for length(p) calls
2805        columns.push(format!("_path_length_{}", sp.path_alias));
2806
2807        Ok((operator, columns))
2808    }
2809
2810    /// Plans an ADD LABEL operator.
2811    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2812        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2813
2814        // Find the node column
2815        let node_column = columns
2816            .iter()
2817            .position(|c| c == &add_label.variable)
2818            .ok_or_else(|| {
2819                Error::Internal(format!(
2820                    "Variable '{}' not found for ADD LABEL",
2821                    add_label.variable
2822                ))
2823            })?;
2824
2825        // Output schema for update count
2826        let output_schema = vec![LogicalType::Int64];
2827        let output_columns = vec!["labels_added".to_string()];
2828
2829        let operator = Box::new(AddLabelOperator::new(
2830            Arc::clone(&self.store),
2831            input_op,
2832            node_column,
2833            add_label.labels.clone(),
2834            output_schema,
2835        ));
2836
2837        Ok((operator, output_columns))
2838    }
2839
2840    /// Plans a REMOVE LABEL operator.
2841    fn plan_remove_label(
2842        &self,
2843        remove_label: &RemoveLabelOp,
2844    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2845        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2846
2847        // Find the node column
2848        let node_column = columns
2849            .iter()
2850            .position(|c| c == &remove_label.variable)
2851            .ok_or_else(|| {
2852                Error::Internal(format!(
2853                    "Variable '{}' not found for REMOVE LABEL",
2854                    remove_label.variable
2855                ))
2856            })?;
2857
2858        // Output schema for update count
2859        let output_schema = vec![LogicalType::Int64];
2860        let output_columns = vec!["labels_removed".to_string()];
2861
2862        let operator = Box::new(RemoveLabelOperator::new(
2863            Arc::clone(&self.store),
2864            input_op,
2865            node_column,
2866            remove_label.labels.clone(),
2867            output_schema,
2868        ));
2869
2870        Ok((operator, output_columns))
2871    }
2872
2873    /// Plans a SET PROPERTY operator.
2874    fn plan_set_property(
2875        &self,
2876        set_prop: &SetPropertyOp,
2877    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2878        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2879
2880        // Find the entity column (node or edge variable)
2881        let entity_column = columns
2882            .iter()
2883            .position(|c| c == &set_prop.variable)
2884            .ok_or_else(|| {
2885                Error::Internal(format!(
2886                    "Variable '{}' not found for SET",
2887                    set_prop.variable
2888                ))
2889            })?;
2890
2891        // Convert properties to PropertySource
2892        let properties: Vec<(String, PropertySource)> = set_prop
2893            .properties
2894            .iter()
2895            .map(|(name, expr)| {
2896                let source = self.expression_to_property_source(expr, &columns)?;
2897                Ok((name.clone(), source))
2898            })
2899            .collect::<Result<Vec<_>>>()?;
2900
2901        // Output schema preserves input schema (passes through)
2902        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2903        let output_columns = columns.clone();
2904
2905        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2906        let operator = Box::new(SetPropertyOperator::new_for_node(
2907            Arc::clone(&self.store),
2908            input_op,
2909            entity_column,
2910            properties,
2911            output_schema,
2912        ));
2913
2914        Ok((operator, output_columns))
2915    }
2916
2917    /// Converts a logical expression to a PropertySource.
2918    fn expression_to_property_source(
2919        &self,
2920        expr: &LogicalExpression,
2921        columns: &[String],
2922    ) -> Result<PropertySource> {
2923        match expr {
2924            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2925            LogicalExpression::Variable(name) => {
2926                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2927                    Error::Internal(format!("Variable '{}' not found for property source", name))
2928                })?;
2929                Ok(PropertySource::Column(col_idx))
2930            }
2931            LogicalExpression::Parameter(name) => {
2932                // Parameters should be resolved before planning
2933                // For now, treat as a placeholder
2934                Ok(PropertySource::Constant(
2935                    grafeo_common::types::Value::String(format!("${}", name).into()),
2936                ))
2937            }
2938            _ => Err(Error::Internal(format!(
2939                "Unsupported expression type for property source: {:?}",
2940                expr
2941            ))),
2942        }
2943    }
2944}
2945
2946/// Converts a logical binary operator to a filter binary operator.
2947pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2948    match op {
2949        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2950        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2951        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2952        BinaryOp::Le => Ok(BinaryFilterOp::Le),
2953        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2954        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2955        BinaryOp::And => Ok(BinaryFilterOp::And),
2956        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2957        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2958        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2959        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2960        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2961        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2962        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2963        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2964        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2965        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2966        BinaryOp::In => Ok(BinaryFilterOp::In),
2967        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2968        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2969        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2970            "Binary operator {:?} not yet supported in filters",
2971            op
2972        ))),
2973    }
2974}
2975
2976/// Converts a logical unary operator to a filter unary operator.
2977pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2978    match op {
2979        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2980        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2981        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2982        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2983    }
2984}
2985
2986/// Converts a logical aggregate function to a physical aggregate function.
2987pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2988    match func {
2989        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2990        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2991        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2992        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2993        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2994        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2995        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2996        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2997        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2998        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2999        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3000    }
3001}
3002
3003/// Converts a logical expression to a filter expression.
3004///
3005/// This is a standalone function that can be used by both LPG and RDF planners.
3006pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3007    match expr {
3008        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3009        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3010        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3011            variable: variable.clone(),
3012            property: property.clone(),
3013        }),
3014        LogicalExpression::Binary { left, op, right } => {
3015            let left_expr = convert_filter_expression(left)?;
3016            let right_expr = convert_filter_expression(right)?;
3017            let filter_op = convert_binary_op(*op)?;
3018            Ok(FilterExpression::Binary {
3019                left: Box::new(left_expr),
3020                op: filter_op,
3021                right: Box::new(right_expr),
3022            })
3023        }
3024        LogicalExpression::Unary { op, operand } => {
3025            let operand_expr = convert_filter_expression(operand)?;
3026            let filter_op = convert_unary_op(*op)?;
3027            Ok(FilterExpression::Unary {
3028                op: filter_op,
3029                operand: Box::new(operand_expr),
3030            })
3031        }
3032        LogicalExpression::FunctionCall { name, args, .. } => {
3033            let filter_args: Vec<FilterExpression> = args
3034                .iter()
3035                .map(|a| convert_filter_expression(a))
3036                .collect::<Result<Vec<_>>>()?;
3037            Ok(FilterExpression::FunctionCall {
3038                name: name.clone(),
3039                args: filter_args,
3040            })
3041        }
3042        LogicalExpression::Case {
3043            operand,
3044            when_clauses,
3045            else_clause,
3046        } => {
3047            let filter_operand = operand
3048                .as_ref()
3049                .map(|e| convert_filter_expression(e))
3050                .transpose()?
3051                .map(Box::new);
3052            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3053                .iter()
3054                .map(|(cond, result)| {
3055                    Ok((
3056                        convert_filter_expression(cond)?,
3057                        convert_filter_expression(result)?,
3058                    ))
3059                })
3060                .collect::<Result<Vec<_>>>()?;
3061            let filter_else = else_clause
3062                .as_ref()
3063                .map(|e| convert_filter_expression(e))
3064                .transpose()?
3065                .map(Box::new);
3066            Ok(FilterExpression::Case {
3067                operand: filter_operand,
3068                when_clauses: filter_when_clauses,
3069                else_clause: filter_else,
3070            })
3071        }
3072        LogicalExpression::List(items) => {
3073            let filter_items: Vec<FilterExpression> = items
3074                .iter()
3075                .map(|item| convert_filter_expression(item))
3076                .collect::<Result<Vec<_>>>()?;
3077            Ok(FilterExpression::List(filter_items))
3078        }
3079        LogicalExpression::Map(pairs) => {
3080            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3081                .iter()
3082                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3083                .collect::<Result<Vec<_>>>()?;
3084            Ok(FilterExpression::Map(filter_pairs))
3085        }
3086        LogicalExpression::IndexAccess { base, index } => {
3087            let base_expr = convert_filter_expression(base)?;
3088            let index_expr = convert_filter_expression(index)?;
3089            Ok(FilterExpression::IndexAccess {
3090                base: Box::new(base_expr),
3091                index: Box::new(index_expr),
3092            })
3093        }
3094        LogicalExpression::SliceAccess { base, start, end } => {
3095            let base_expr = convert_filter_expression(base)?;
3096            let start_expr = start
3097                .as_ref()
3098                .map(|s| convert_filter_expression(s))
3099                .transpose()?
3100                .map(Box::new);
3101            let end_expr = end
3102                .as_ref()
3103                .map(|e| convert_filter_expression(e))
3104                .transpose()?
3105                .map(Box::new);
3106            Ok(FilterExpression::SliceAccess {
3107                base: Box::new(base_expr),
3108                start: start_expr,
3109                end: end_expr,
3110            })
3111        }
3112        LogicalExpression::Parameter(_) => Err(Error::Internal(
3113            "Parameters not yet supported in filters".to_string(),
3114        )),
3115        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3116        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3117        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3118        LogicalExpression::ListComprehension {
3119            variable,
3120            list_expr,
3121            filter_expr,
3122            map_expr,
3123        } => {
3124            let list = convert_filter_expression(list_expr)?;
3125            let filter = filter_expr
3126                .as_ref()
3127                .map(|f| convert_filter_expression(f))
3128                .transpose()?
3129                .map(Box::new);
3130            let map = convert_filter_expression(map_expr)?;
3131            Ok(FilterExpression::ListComprehension {
3132                variable: variable.clone(),
3133                list_expr: Box::new(list),
3134                filter_expr: filter,
3135                map_expr: Box::new(map),
3136            })
3137        }
3138        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3139            Error::Internal("Subqueries not yet supported in filters".to_string()),
3140        ),
3141    }
3142}
3143
3144/// Infers the logical type from a value.
3145fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3146    use grafeo_common::types::Value;
3147    match value {
3148        Value::Null => LogicalType::String, // Default type for null
3149        Value::Bool(_) => LogicalType::Bool,
3150        Value::Int64(_) => LogicalType::Int64,
3151        Value::Float64(_) => LogicalType::Float64,
3152        Value::String(_) => LogicalType::String,
3153        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3154        Value::Timestamp(_) => LogicalType::Timestamp,
3155        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3156        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3157        Value::Vector(v) => LogicalType::Vector(v.len()),
3158    }
3159}
3160
3161/// Converts an expression to a string for column naming.
3162fn expression_to_string(expr: &LogicalExpression) -> String {
3163    match expr {
3164        LogicalExpression::Variable(name) => name.clone(),
3165        LogicalExpression::Property { variable, property } => {
3166            format!("{variable}.{property}")
3167        }
3168        LogicalExpression::Literal(value) => format!("{value:?}"),
3169        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3170        _ => "expr".to_string(),
3171    }
3172}
3173
3174/// A physical plan ready for execution.
3175pub struct PhysicalPlan {
3176    /// The root physical operator.
3177    pub operator: Box<dyn Operator>,
3178    /// Column names for the result.
3179    pub columns: Vec<String>,
3180    /// Adaptive execution context with cardinality estimates.
3181    ///
3182    /// When adaptive execution is enabled, this context contains estimated
3183    /// cardinalities at various checkpoints in the plan. During execution,
3184    /// actual row counts are recorded and compared against estimates.
3185    pub adaptive_context: Option<AdaptiveContext>,
3186}
3187
3188impl PhysicalPlan {
3189    /// Returns the column names.
3190    #[must_use]
3191    pub fn columns(&self) -> &[String] {
3192        &self.columns
3193    }
3194
3195    /// Consumes the plan and returns the operator.
3196    pub fn into_operator(self) -> Box<dyn Operator> {
3197        self.operator
3198    }
3199
3200    /// Returns the adaptive context, if adaptive execution is enabled.
3201    #[must_use]
3202    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3203        self.adaptive_context.as_ref()
3204    }
3205
3206    /// Takes ownership of the adaptive context.
3207    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3208        self.adaptive_context.take()
3209    }
3210}
3211
3212/// Helper operator that returns a single result chunk once.
3213///
3214/// Used by the factorized expand chain to wrap the final result.
3215#[allow(dead_code)]
3216struct SingleResultOperator {
3217    result: Option<grafeo_core::execution::DataChunk>,
3218}
3219
3220impl SingleResultOperator {
3221    #[allow(dead_code)]
3222    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3223        Self { result }
3224    }
3225}
3226
3227impl Operator for SingleResultOperator {
3228    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3229        Ok(self.result.take())
3230    }
3231
3232    fn reset(&mut self) {
3233        // Cannot reset - result is consumed
3234    }
3235
3236    fn name(&self) -> &'static str {
3237        "SingleResult"
3238    }
3239}
3240
3241#[cfg(test)]
3242mod tests {
3243    use super::*;
3244    use crate::query::plan::{
3245        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3246        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3247        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3248        SortKey, SortOp,
3249    };
3250    use grafeo_common::types::Value;
3251
3252    fn create_test_store() -> Arc<LpgStore> {
3253        let store = Arc::new(LpgStore::new());
3254        store.create_node(&["Person"]);
3255        store.create_node(&["Person"]);
3256        store.create_node(&["Company"]);
3257        store
3258    }
3259
3260    // ==================== Simple Scan Tests ====================
3261
3262    #[test]
3263    fn test_plan_simple_scan() {
3264        let store = create_test_store();
3265        let planner = Planner::new(store);
3266
3267        // MATCH (n:Person) RETURN n
3268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3269            items: vec![ReturnItem {
3270                expression: LogicalExpression::Variable("n".to_string()),
3271                alias: None,
3272            }],
3273            distinct: false,
3274            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3275                variable: "n".to_string(),
3276                label: Some("Person".to_string()),
3277                input: None,
3278            })),
3279        }));
3280
3281        let physical = planner.plan(&logical).unwrap();
3282        assert_eq!(physical.columns(), &["n"]);
3283    }
3284
3285    #[test]
3286    fn test_plan_scan_without_label() {
3287        let store = create_test_store();
3288        let planner = Planner::new(store);
3289
3290        // MATCH (n) RETURN n
3291        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3292            items: vec![ReturnItem {
3293                expression: LogicalExpression::Variable("n".to_string()),
3294                alias: None,
3295            }],
3296            distinct: false,
3297            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3298                variable: "n".to_string(),
3299                label: None,
3300                input: None,
3301            })),
3302        }));
3303
3304        let physical = planner.plan(&logical).unwrap();
3305        assert_eq!(physical.columns(), &["n"]);
3306    }
3307
3308    #[test]
3309    fn test_plan_return_with_alias() {
3310        let store = create_test_store();
3311        let planner = Planner::new(store);
3312
3313        // MATCH (n:Person) RETURN n AS person
3314        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3315            items: vec![ReturnItem {
3316                expression: LogicalExpression::Variable("n".to_string()),
3317                alias: Some("person".to_string()),
3318            }],
3319            distinct: false,
3320            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3321                variable: "n".to_string(),
3322                label: Some("Person".to_string()),
3323                input: None,
3324            })),
3325        }));
3326
3327        let physical = planner.plan(&logical).unwrap();
3328        assert_eq!(physical.columns(), &["person"]);
3329    }
3330
3331    #[test]
3332    fn test_plan_return_property() {
3333        let store = create_test_store();
3334        let planner = Planner::new(store);
3335
3336        // MATCH (n:Person) RETURN n.name
3337        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3338            items: vec![ReturnItem {
3339                expression: LogicalExpression::Property {
3340                    variable: "n".to_string(),
3341                    property: "name".to_string(),
3342                },
3343                alias: None,
3344            }],
3345            distinct: false,
3346            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3347                variable: "n".to_string(),
3348                label: Some("Person".to_string()),
3349                input: None,
3350            })),
3351        }));
3352
3353        let physical = planner.plan(&logical).unwrap();
3354        assert_eq!(physical.columns(), &["n.name"]);
3355    }
3356
3357    #[test]
3358    fn test_plan_return_literal() {
3359        let store = create_test_store();
3360        let planner = Planner::new(store);
3361
3362        // MATCH (n) RETURN 42 AS answer
3363        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3364            items: vec![ReturnItem {
3365                expression: LogicalExpression::Literal(Value::Int64(42)),
3366                alias: Some("answer".to_string()),
3367            }],
3368            distinct: false,
3369            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3370                variable: "n".to_string(),
3371                label: None,
3372                input: None,
3373            })),
3374        }));
3375
3376        let physical = planner.plan(&logical).unwrap();
3377        assert_eq!(physical.columns(), &["answer"]);
3378    }
3379
3380    // ==================== Filter Tests ====================
3381
3382    #[test]
3383    fn test_plan_filter_equality() {
3384        let store = create_test_store();
3385        let planner = Planner::new(store);
3386
3387        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3388        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3389            items: vec![ReturnItem {
3390                expression: LogicalExpression::Variable("n".to_string()),
3391                alias: None,
3392            }],
3393            distinct: false,
3394            input: Box::new(LogicalOperator::Filter(FilterOp {
3395                predicate: LogicalExpression::Binary {
3396                    left: Box::new(LogicalExpression::Property {
3397                        variable: "n".to_string(),
3398                        property: "age".to_string(),
3399                    }),
3400                    op: BinaryOp::Eq,
3401                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3402                },
3403                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3404                    variable: "n".to_string(),
3405                    label: Some("Person".to_string()),
3406                    input: None,
3407                })),
3408            })),
3409        }));
3410
3411        let physical = planner.plan(&logical).unwrap();
3412        assert_eq!(physical.columns(), &["n"]);
3413    }
3414
3415    #[test]
3416    fn test_plan_filter_compound_and() {
3417        let store = create_test_store();
3418        let planner = Planner::new(store);
3419
3420        // WHERE n.age > 20 AND n.age < 40
3421        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3422            items: vec![ReturnItem {
3423                expression: LogicalExpression::Variable("n".to_string()),
3424                alias: None,
3425            }],
3426            distinct: false,
3427            input: Box::new(LogicalOperator::Filter(FilterOp {
3428                predicate: LogicalExpression::Binary {
3429                    left: Box::new(LogicalExpression::Binary {
3430                        left: Box::new(LogicalExpression::Property {
3431                            variable: "n".to_string(),
3432                            property: "age".to_string(),
3433                        }),
3434                        op: BinaryOp::Gt,
3435                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3436                    }),
3437                    op: BinaryOp::And,
3438                    right: Box::new(LogicalExpression::Binary {
3439                        left: Box::new(LogicalExpression::Property {
3440                            variable: "n".to_string(),
3441                            property: "age".to_string(),
3442                        }),
3443                        op: BinaryOp::Lt,
3444                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3445                    }),
3446                },
3447                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3448                    variable: "n".to_string(),
3449                    label: None,
3450                    input: None,
3451                })),
3452            })),
3453        }));
3454
3455        let physical = planner.plan(&logical).unwrap();
3456        assert_eq!(physical.columns(), &["n"]);
3457    }
3458
3459    #[test]
3460    fn test_plan_filter_unary_not() {
3461        let store = create_test_store();
3462        let planner = Planner::new(store);
3463
3464        // WHERE NOT n.active
3465        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3466            items: vec![ReturnItem {
3467                expression: LogicalExpression::Variable("n".to_string()),
3468                alias: None,
3469            }],
3470            distinct: false,
3471            input: Box::new(LogicalOperator::Filter(FilterOp {
3472                predicate: LogicalExpression::Unary {
3473                    op: UnaryOp::Not,
3474                    operand: Box::new(LogicalExpression::Property {
3475                        variable: "n".to_string(),
3476                        property: "active".to_string(),
3477                    }),
3478                },
3479                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3480                    variable: "n".to_string(),
3481                    label: None,
3482                    input: None,
3483                })),
3484            })),
3485        }));
3486
3487        let physical = planner.plan(&logical).unwrap();
3488        assert_eq!(physical.columns(), &["n"]);
3489    }
3490
3491    #[test]
3492    fn test_plan_filter_is_null() {
3493        let store = create_test_store();
3494        let planner = Planner::new(store);
3495
3496        // WHERE n.email IS NULL
3497        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3498            items: vec![ReturnItem {
3499                expression: LogicalExpression::Variable("n".to_string()),
3500                alias: None,
3501            }],
3502            distinct: false,
3503            input: Box::new(LogicalOperator::Filter(FilterOp {
3504                predicate: LogicalExpression::Unary {
3505                    op: UnaryOp::IsNull,
3506                    operand: Box::new(LogicalExpression::Property {
3507                        variable: "n".to_string(),
3508                        property: "email".to_string(),
3509                    }),
3510                },
3511                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3512                    variable: "n".to_string(),
3513                    label: None,
3514                    input: None,
3515                })),
3516            })),
3517        }));
3518
3519        let physical = planner.plan(&logical).unwrap();
3520        assert_eq!(physical.columns(), &["n"]);
3521    }
3522
3523    #[test]
3524    fn test_plan_filter_function_call() {
3525        let store = create_test_store();
3526        let planner = Planner::new(store);
3527
3528        // WHERE size(n.friends) > 0
3529        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3530            items: vec![ReturnItem {
3531                expression: LogicalExpression::Variable("n".to_string()),
3532                alias: None,
3533            }],
3534            distinct: false,
3535            input: Box::new(LogicalOperator::Filter(FilterOp {
3536                predicate: LogicalExpression::Binary {
3537                    left: Box::new(LogicalExpression::FunctionCall {
3538                        name: "size".to_string(),
3539                        args: vec![LogicalExpression::Property {
3540                            variable: "n".to_string(),
3541                            property: "friends".to_string(),
3542                        }],
3543                        distinct: false,
3544                    }),
3545                    op: BinaryOp::Gt,
3546                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3547                },
3548                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3549                    variable: "n".to_string(),
3550                    label: None,
3551                    input: None,
3552                })),
3553            })),
3554        }));
3555
3556        let physical = planner.plan(&logical).unwrap();
3557        assert_eq!(physical.columns(), &["n"]);
3558    }
3559
3560    // ==================== Expand Tests ====================
3561
3562    #[test]
3563    fn test_plan_expand_outgoing() {
3564        let store = create_test_store();
3565        let planner = Planner::new(store);
3566
3567        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3568        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3569            items: vec![
3570                ReturnItem {
3571                    expression: LogicalExpression::Variable("a".to_string()),
3572                    alias: None,
3573                },
3574                ReturnItem {
3575                    expression: LogicalExpression::Variable("b".to_string()),
3576                    alias: None,
3577                },
3578            ],
3579            distinct: false,
3580            input: Box::new(LogicalOperator::Expand(ExpandOp {
3581                from_variable: "a".to_string(),
3582                to_variable: "b".to_string(),
3583                edge_variable: None,
3584                direction: ExpandDirection::Outgoing,
3585                edge_type: Some("KNOWS".to_string()),
3586                min_hops: 1,
3587                max_hops: Some(1),
3588                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3589                    variable: "a".to_string(),
3590                    label: Some("Person".to_string()),
3591                    input: None,
3592                })),
3593                path_alias: None,
3594            })),
3595        }));
3596
3597        let physical = planner.plan(&logical).unwrap();
3598        // The return should have columns [a, b]
3599        assert!(physical.columns().contains(&"a".to_string()));
3600        assert!(physical.columns().contains(&"b".to_string()));
3601    }
3602
3603    #[test]
3604    fn test_plan_expand_with_edge_variable() {
3605        let store = create_test_store();
3606        let planner = Planner::new(store);
3607
3608        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3609        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3610            items: vec![
3611                ReturnItem {
3612                    expression: LogicalExpression::Variable("a".to_string()),
3613                    alias: None,
3614                },
3615                ReturnItem {
3616                    expression: LogicalExpression::Variable("r".to_string()),
3617                    alias: None,
3618                },
3619                ReturnItem {
3620                    expression: LogicalExpression::Variable("b".to_string()),
3621                    alias: None,
3622                },
3623            ],
3624            distinct: false,
3625            input: Box::new(LogicalOperator::Expand(ExpandOp {
3626                from_variable: "a".to_string(),
3627                to_variable: "b".to_string(),
3628                edge_variable: Some("r".to_string()),
3629                direction: ExpandDirection::Outgoing,
3630                edge_type: Some("KNOWS".to_string()),
3631                min_hops: 1,
3632                max_hops: Some(1),
3633                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3634                    variable: "a".to_string(),
3635                    label: None,
3636                    input: None,
3637                })),
3638                path_alias: None,
3639            })),
3640        }));
3641
3642        let physical = planner.plan(&logical).unwrap();
3643        assert!(physical.columns().contains(&"a".to_string()));
3644        assert!(physical.columns().contains(&"r".to_string()));
3645        assert!(physical.columns().contains(&"b".to_string()));
3646    }
3647
3648    // ==================== Limit/Skip/Sort Tests ====================
3649
3650    #[test]
3651    fn test_plan_limit() {
3652        let store = create_test_store();
3653        let planner = Planner::new(store);
3654
3655        // MATCH (n) RETURN n LIMIT 10
3656        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3657            items: vec![ReturnItem {
3658                expression: LogicalExpression::Variable("n".to_string()),
3659                alias: None,
3660            }],
3661            distinct: false,
3662            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3663                count: 10,
3664                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3665                    variable: "n".to_string(),
3666                    label: None,
3667                    input: None,
3668                })),
3669            })),
3670        }));
3671
3672        let physical = planner.plan(&logical).unwrap();
3673        assert_eq!(physical.columns(), &["n"]);
3674    }
3675
3676    #[test]
3677    fn test_plan_skip() {
3678        let store = create_test_store();
3679        let planner = Planner::new(store);
3680
3681        // MATCH (n) RETURN n SKIP 5
3682        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3683            items: vec![ReturnItem {
3684                expression: LogicalExpression::Variable("n".to_string()),
3685                alias: None,
3686            }],
3687            distinct: false,
3688            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3689                count: 5,
3690                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3691                    variable: "n".to_string(),
3692                    label: None,
3693                    input: None,
3694                })),
3695            })),
3696        }));
3697
3698        let physical = planner.plan(&logical).unwrap();
3699        assert_eq!(physical.columns(), &["n"]);
3700    }
3701
3702    #[test]
3703    fn test_plan_sort() {
3704        let store = create_test_store();
3705        let planner = Planner::new(store);
3706
3707        // MATCH (n) RETURN n ORDER BY n.name ASC
3708        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3709            items: vec![ReturnItem {
3710                expression: LogicalExpression::Variable("n".to_string()),
3711                alias: None,
3712            }],
3713            distinct: false,
3714            input: Box::new(LogicalOperator::Sort(SortOp {
3715                keys: vec![SortKey {
3716                    expression: LogicalExpression::Variable("n".to_string()),
3717                    order: SortOrder::Ascending,
3718                }],
3719                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3720                    variable: "n".to_string(),
3721                    label: None,
3722                    input: None,
3723                })),
3724            })),
3725        }));
3726
3727        let physical = planner.plan(&logical).unwrap();
3728        assert_eq!(physical.columns(), &["n"]);
3729    }
3730
3731    #[test]
3732    fn test_plan_sort_descending() {
3733        let store = create_test_store();
3734        let planner = Planner::new(store);
3735
3736        // ORDER BY n DESC
3737        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3738            items: vec![ReturnItem {
3739                expression: LogicalExpression::Variable("n".to_string()),
3740                alias: None,
3741            }],
3742            distinct: false,
3743            input: Box::new(LogicalOperator::Sort(SortOp {
3744                keys: vec![SortKey {
3745                    expression: LogicalExpression::Variable("n".to_string()),
3746                    order: SortOrder::Descending,
3747                }],
3748                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3749                    variable: "n".to_string(),
3750                    label: None,
3751                    input: None,
3752                })),
3753            })),
3754        }));
3755
3756        let physical = planner.plan(&logical).unwrap();
3757        assert_eq!(physical.columns(), &["n"]);
3758    }
3759
3760    #[test]
3761    fn test_plan_distinct() {
3762        let store = create_test_store();
3763        let planner = Planner::new(store);
3764
3765        // MATCH (n) RETURN DISTINCT n
3766        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3767            items: vec![ReturnItem {
3768                expression: LogicalExpression::Variable("n".to_string()),
3769                alias: None,
3770            }],
3771            distinct: false,
3772            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3773                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3774                    variable: "n".to_string(),
3775                    label: None,
3776                    input: None,
3777                })),
3778                columns: None,
3779            })),
3780        }));
3781
3782        let physical = planner.plan(&logical).unwrap();
3783        assert_eq!(physical.columns(), &["n"]);
3784    }
3785
3786    // ==================== Aggregate Tests ====================
3787
3788    #[test]
3789    fn test_plan_aggregate_count() {
3790        let store = create_test_store();
3791        let planner = Planner::new(store);
3792
3793        // MATCH (n) RETURN count(n)
3794        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3795            items: vec![ReturnItem {
3796                expression: LogicalExpression::Variable("cnt".to_string()),
3797                alias: None,
3798            }],
3799            distinct: false,
3800            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3801                group_by: vec![],
3802                aggregates: vec![LogicalAggregateExpr {
3803                    function: LogicalAggregateFunction::Count,
3804                    expression: Some(LogicalExpression::Variable("n".to_string())),
3805                    distinct: false,
3806                    alias: Some("cnt".to_string()),
3807                    percentile: None,
3808                }],
3809                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3810                    variable: "n".to_string(),
3811                    label: None,
3812                    input: None,
3813                })),
3814                having: None,
3815            })),
3816        }));
3817
3818        let physical = planner.plan(&logical).unwrap();
3819        assert!(physical.columns().contains(&"cnt".to_string()));
3820    }
3821
3822    #[test]
3823    fn test_plan_aggregate_with_group_by() {
3824        let store = create_test_store();
3825        let planner = Planner::new(store);
3826
3827        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3828        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3829            group_by: vec![LogicalExpression::Property {
3830                variable: "n".to_string(),
3831                property: "city".to_string(),
3832            }],
3833            aggregates: vec![LogicalAggregateExpr {
3834                function: LogicalAggregateFunction::Count,
3835                expression: Some(LogicalExpression::Variable("n".to_string())),
3836                distinct: false,
3837                alias: Some("cnt".to_string()),
3838                percentile: None,
3839            }],
3840            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3841                variable: "n".to_string(),
3842                label: Some("Person".to_string()),
3843                input: None,
3844            })),
3845            having: None,
3846        }));
3847
3848        let physical = planner.plan(&logical).unwrap();
3849        assert_eq!(physical.columns().len(), 2);
3850    }
3851
3852    #[test]
3853    fn test_plan_aggregate_sum() {
3854        let store = create_test_store();
3855        let planner = Planner::new(store);
3856
3857        // SUM(n.value)
3858        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3859            group_by: vec![],
3860            aggregates: vec![LogicalAggregateExpr {
3861                function: LogicalAggregateFunction::Sum,
3862                expression: Some(LogicalExpression::Property {
3863                    variable: "n".to_string(),
3864                    property: "value".to_string(),
3865                }),
3866                distinct: false,
3867                alias: Some("total".to_string()),
3868                percentile: None,
3869            }],
3870            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3871                variable: "n".to_string(),
3872                label: None,
3873                input: None,
3874            })),
3875            having: None,
3876        }));
3877
3878        let physical = planner.plan(&logical).unwrap();
3879        assert!(physical.columns().contains(&"total".to_string()));
3880    }
3881
3882    #[test]
3883    fn test_plan_aggregate_avg() {
3884        let store = create_test_store();
3885        let planner = Planner::new(store);
3886
3887        // AVG(n.score)
3888        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3889            group_by: vec![],
3890            aggregates: vec![LogicalAggregateExpr {
3891                function: LogicalAggregateFunction::Avg,
3892                expression: Some(LogicalExpression::Property {
3893                    variable: "n".to_string(),
3894                    property: "score".to_string(),
3895                }),
3896                distinct: false,
3897                alias: Some("average".to_string()),
3898                percentile: None,
3899            }],
3900            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3901                variable: "n".to_string(),
3902                label: None,
3903                input: None,
3904            })),
3905            having: None,
3906        }));
3907
3908        let physical = planner.plan(&logical).unwrap();
3909        assert!(physical.columns().contains(&"average".to_string()));
3910    }
3911
3912    #[test]
3913    fn test_plan_aggregate_min_max() {
3914        let store = create_test_store();
3915        let planner = Planner::new(store);
3916
3917        // MIN(n.age), MAX(n.age)
3918        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3919            group_by: vec![],
3920            aggregates: vec![
3921                LogicalAggregateExpr {
3922                    function: LogicalAggregateFunction::Min,
3923                    expression: Some(LogicalExpression::Property {
3924                        variable: "n".to_string(),
3925                        property: "age".to_string(),
3926                    }),
3927                    distinct: false,
3928                    alias: Some("youngest".to_string()),
3929                    percentile: None,
3930                },
3931                LogicalAggregateExpr {
3932                    function: LogicalAggregateFunction::Max,
3933                    expression: Some(LogicalExpression::Property {
3934                        variable: "n".to_string(),
3935                        property: "age".to_string(),
3936                    }),
3937                    distinct: false,
3938                    alias: Some("oldest".to_string()),
3939                    percentile: None,
3940                },
3941            ],
3942            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3943                variable: "n".to_string(),
3944                label: None,
3945                input: None,
3946            })),
3947            having: None,
3948        }));
3949
3950        let physical = planner.plan(&logical).unwrap();
3951        assert!(physical.columns().contains(&"youngest".to_string()));
3952        assert!(physical.columns().contains(&"oldest".to_string()));
3953    }
3954
3955    // ==================== Join Tests ====================
3956
3957    #[test]
3958    fn test_plan_inner_join() {
3959        let store = create_test_store();
3960        let planner = Planner::new(store);
3961
3962        // Inner join between two scans
3963        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3964            items: vec![
3965                ReturnItem {
3966                    expression: LogicalExpression::Variable("a".to_string()),
3967                    alias: None,
3968                },
3969                ReturnItem {
3970                    expression: LogicalExpression::Variable("b".to_string()),
3971                    alias: None,
3972                },
3973            ],
3974            distinct: false,
3975            input: Box::new(LogicalOperator::Join(JoinOp {
3976                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3977                    variable: "a".to_string(),
3978                    label: Some("Person".to_string()),
3979                    input: None,
3980                })),
3981                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3982                    variable: "b".to_string(),
3983                    label: Some("Company".to_string()),
3984                    input: None,
3985                })),
3986                join_type: JoinType::Inner,
3987                conditions: vec![JoinCondition {
3988                    left: LogicalExpression::Variable("a".to_string()),
3989                    right: LogicalExpression::Variable("b".to_string()),
3990                }],
3991            })),
3992        }));
3993
3994        let physical = planner.plan(&logical).unwrap();
3995        assert!(physical.columns().contains(&"a".to_string()));
3996        assert!(physical.columns().contains(&"b".to_string()));
3997    }
3998
3999    #[test]
4000    fn test_plan_cross_join() {
4001        let store = create_test_store();
4002        let planner = Planner::new(store);
4003
4004        // Cross join (no conditions)
4005        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4006            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4007                variable: "a".to_string(),
4008                label: None,
4009                input: None,
4010            })),
4011            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4012                variable: "b".to_string(),
4013                label: None,
4014                input: None,
4015            })),
4016            join_type: JoinType::Cross,
4017            conditions: vec![],
4018        }));
4019
4020        let physical = planner.plan(&logical).unwrap();
4021        assert_eq!(physical.columns().len(), 2);
4022    }
4023
4024    #[test]
4025    fn test_plan_left_join() {
4026        let store = create_test_store();
4027        let planner = Planner::new(store);
4028
4029        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4030            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4031                variable: "a".to_string(),
4032                label: None,
4033                input: None,
4034            })),
4035            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4036                variable: "b".to_string(),
4037                label: None,
4038                input: None,
4039            })),
4040            join_type: JoinType::Left,
4041            conditions: vec![],
4042        }));
4043
4044        let physical = planner.plan(&logical).unwrap();
4045        assert_eq!(physical.columns().len(), 2);
4046    }
4047
4048    // ==================== Mutation Tests ====================
4049
4050    #[test]
4051    fn test_plan_create_node() {
4052        let store = create_test_store();
4053        let planner = Planner::new(store);
4054
4055        // CREATE (n:Person {name: 'Alice'})
4056        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4057            variable: "n".to_string(),
4058            labels: vec!["Person".to_string()],
4059            properties: vec![(
4060                "name".to_string(),
4061                LogicalExpression::Literal(Value::String("Alice".into())),
4062            )],
4063            input: None,
4064        }));
4065
4066        let physical = planner.plan(&logical).unwrap();
4067        assert!(physical.columns().contains(&"n".to_string()));
4068    }
4069
4070    #[test]
4071    fn test_plan_create_edge() {
4072        let store = create_test_store();
4073        let planner = Planner::new(store);
4074
4075        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4076        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4077            variable: Some("r".to_string()),
4078            from_variable: "a".to_string(),
4079            to_variable: "b".to_string(),
4080            edge_type: "KNOWS".to_string(),
4081            properties: vec![],
4082            input: Box::new(LogicalOperator::Join(JoinOp {
4083                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4084                    variable: "a".to_string(),
4085                    label: None,
4086                    input: None,
4087                })),
4088                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4089                    variable: "b".to_string(),
4090                    label: None,
4091                    input: None,
4092                })),
4093                join_type: JoinType::Cross,
4094                conditions: vec![],
4095            })),
4096        }));
4097
4098        let physical = planner.plan(&logical).unwrap();
4099        assert!(physical.columns().contains(&"r".to_string()));
4100    }
4101
4102    #[test]
4103    fn test_plan_delete_node() {
4104        let store = create_test_store();
4105        let planner = Planner::new(store);
4106
4107        // MATCH (n) DELETE n
4108        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4109            variable: "n".to_string(),
4110            detach: false,
4111            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4112                variable: "n".to_string(),
4113                label: None,
4114                input: None,
4115            })),
4116        }));
4117
4118        let physical = planner.plan(&logical).unwrap();
4119        assert!(physical.columns().contains(&"deleted_count".to_string()));
4120    }
4121
4122    // ==================== Error Cases ====================
4123
4124    #[test]
4125    fn test_plan_empty_errors() {
4126        let store = create_test_store();
4127        let planner = Planner::new(store);
4128
4129        let logical = LogicalPlan::new(LogicalOperator::Empty);
4130        let result = planner.plan(&logical);
4131        assert!(result.is_err());
4132    }
4133
4134    #[test]
4135    fn test_plan_missing_variable_in_return() {
4136        let store = create_test_store();
4137        let planner = Planner::new(store);
4138
4139        // Return variable that doesn't exist in input
4140        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4141            items: vec![ReturnItem {
4142                expression: LogicalExpression::Variable("missing".to_string()),
4143                alias: None,
4144            }],
4145            distinct: false,
4146            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4147                variable: "n".to_string(),
4148                label: None,
4149                input: None,
4150            })),
4151        }));
4152
4153        let result = planner.plan(&logical);
4154        assert!(result.is_err());
4155    }
4156
4157    // ==================== Helper Function Tests ====================
4158
4159    #[test]
4160    fn test_convert_binary_ops() {
4161        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4162        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4163        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4164        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4165        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4166        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4167        assert!(convert_binary_op(BinaryOp::And).is_ok());
4168        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4169        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4170        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4171        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4172        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4173    }
4174
4175    #[test]
4176    fn test_convert_unary_ops() {
4177        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4178        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4179        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4180        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4181    }
4182
4183    #[test]
4184    fn test_convert_aggregate_functions() {
4185        assert!(matches!(
4186            convert_aggregate_function(LogicalAggregateFunction::Count),
4187            PhysicalAggregateFunction::Count
4188        ));
4189        assert!(matches!(
4190            convert_aggregate_function(LogicalAggregateFunction::Sum),
4191            PhysicalAggregateFunction::Sum
4192        ));
4193        assert!(matches!(
4194            convert_aggregate_function(LogicalAggregateFunction::Avg),
4195            PhysicalAggregateFunction::Avg
4196        ));
4197        assert!(matches!(
4198            convert_aggregate_function(LogicalAggregateFunction::Min),
4199            PhysicalAggregateFunction::Min
4200        ));
4201        assert!(matches!(
4202            convert_aggregate_function(LogicalAggregateFunction::Max),
4203            PhysicalAggregateFunction::Max
4204        ));
4205    }
4206
4207    #[test]
4208    fn test_planner_accessors() {
4209        let store = create_test_store();
4210        let planner = Planner::new(Arc::clone(&store));
4211
4212        assert!(planner.tx_id().is_none());
4213        assert!(planner.tx_manager().is_none());
4214        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4215    }
4216
4217    #[test]
4218    fn test_physical_plan_accessors() {
4219        let store = create_test_store();
4220        let planner = Planner::new(store);
4221
4222        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4223            variable: "n".to_string(),
4224            label: None,
4225            input: None,
4226        }));
4227
4228        let physical = planner.plan(&logical).unwrap();
4229        assert_eq!(physical.columns(), &["n"]);
4230
4231        // Test into_operator
4232        let _ = physical.into_operator();
4233    }
4234
4235    // ==================== Adaptive Planning Tests ====================
4236
4237    #[test]
4238    fn test_plan_adaptive_with_scan() {
4239        let store = create_test_store();
4240        let planner = Planner::new(store);
4241
4242        // MATCH (n:Person) RETURN n
4243        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4244            items: vec![ReturnItem {
4245                expression: LogicalExpression::Variable("n".to_string()),
4246                alias: None,
4247            }],
4248            distinct: false,
4249            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4250                variable: "n".to_string(),
4251                label: Some("Person".to_string()),
4252                input: None,
4253            })),
4254        }));
4255
4256        let physical = planner.plan_adaptive(&logical).unwrap();
4257        assert_eq!(physical.columns(), &["n"]);
4258        // Should have adaptive context with estimates
4259        assert!(physical.adaptive_context.is_some());
4260    }
4261
4262    #[test]
4263    fn test_plan_adaptive_with_filter() {
4264        let store = create_test_store();
4265        let planner = Planner::new(store);
4266
4267        // MATCH (n) WHERE n.age > 30 RETURN n
4268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4269            items: vec![ReturnItem {
4270                expression: LogicalExpression::Variable("n".to_string()),
4271                alias: None,
4272            }],
4273            distinct: false,
4274            input: Box::new(LogicalOperator::Filter(FilterOp {
4275                predicate: LogicalExpression::Binary {
4276                    left: Box::new(LogicalExpression::Property {
4277                        variable: "n".to_string(),
4278                        property: "age".to_string(),
4279                    }),
4280                    op: BinaryOp::Gt,
4281                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4282                },
4283                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4284                    variable: "n".to_string(),
4285                    label: None,
4286                    input: None,
4287                })),
4288            })),
4289        }));
4290
4291        let physical = planner.plan_adaptive(&logical).unwrap();
4292        assert!(physical.adaptive_context.is_some());
4293    }
4294
4295    #[test]
4296    fn test_plan_adaptive_with_expand() {
4297        let store = create_test_store();
4298        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4299
4300        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4301        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4302            items: vec![
4303                ReturnItem {
4304                    expression: LogicalExpression::Variable("a".to_string()),
4305                    alias: None,
4306                },
4307                ReturnItem {
4308                    expression: LogicalExpression::Variable("b".to_string()),
4309                    alias: None,
4310                },
4311            ],
4312            distinct: false,
4313            input: Box::new(LogicalOperator::Expand(ExpandOp {
4314                from_variable: "a".to_string(),
4315                to_variable: "b".to_string(),
4316                edge_variable: None,
4317                direction: ExpandDirection::Outgoing,
4318                edge_type: Some("KNOWS".to_string()),
4319                min_hops: 1,
4320                max_hops: Some(1),
4321                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4322                    variable: "a".to_string(),
4323                    label: None,
4324                    input: None,
4325                })),
4326                path_alias: None,
4327            })),
4328        }));
4329
4330        let physical = planner.plan_adaptive(&logical).unwrap();
4331        assert!(physical.adaptive_context.is_some());
4332    }
4333
4334    #[test]
4335    fn test_plan_adaptive_with_join() {
4336        let store = create_test_store();
4337        let planner = Planner::new(store);
4338
4339        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4340            items: vec![
4341                ReturnItem {
4342                    expression: LogicalExpression::Variable("a".to_string()),
4343                    alias: None,
4344                },
4345                ReturnItem {
4346                    expression: LogicalExpression::Variable("b".to_string()),
4347                    alias: None,
4348                },
4349            ],
4350            distinct: false,
4351            input: Box::new(LogicalOperator::Join(JoinOp {
4352                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4353                    variable: "a".to_string(),
4354                    label: None,
4355                    input: None,
4356                })),
4357                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4358                    variable: "b".to_string(),
4359                    label: None,
4360                    input: None,
4361                })),
4362                join_type: JoinType::Cross,
4363                conditions: vec![],
4364            })),
4365        }));
4366
4367        let physical = planner.plan_adaptive(&logical).unwrap();
4368        assert!(physical.adaptive_context.is_some());
4369    }
4370
4371    #[test]
4372    fn test_plan_adaptive_with_aggregate() {
4373        let store = create_test_store();
4374        let planner = Planner::new(store);
4375
4376        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4377            group_by: vec![],
4378            aggregates: vec![LogicalAggregateExpr {
4379                function: LogicalAggregateFunction::Count,
4380                expression: Some(LogicalExpression::Variable("n".to_string())),
4381                distinct: false,
4382                alias: Some("cnt".to_string()),
4383                percentile: None,
4384            }],
4385            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4386                variable: "n".to_string(),
4387                label: None,
4388                input: None,
4389            })),
4390            having: None,
4391        }));
4392
4393        let physical = planner.plan_adaptive(&logical).unwrap();
4394        assert!(physical.adaptive_context.is_some());
4395    }
4396
4397    #[test]
4398    fn test_plan_adaptive_with_distinct() {
4399        let store = create_test_store();
4400        let planner = Planner::new(store);
4401
4402        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4403            items: vec![ReturnItem {
4404                expression: LogicalExpression::Variable("n".to_string()),
4405                alias: None,
4406            }],
4407            distinct: false,
4408            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4409                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4410                    variable: "n".to_string(),
4411                    label: None,
4412                    input: None,
4413                })),
4414                columns: None,
4415            })),
4416        }));
4417
4418        let physical = planner.plan_adaptive(&logical).unwrap();
4419        assert!(physical.adaptive_context.is_some());
4420    }
4421
4422    #[test]
4423    fn test_plan_adaptive_with_limit() {
4424        let store = create_test_store();
4425        let planner = Planner::new(store);
4426
4427        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4428            items: vec![ReturnItem {
4429                expression: LogicalExpression::Variable("n".to_string()),
4430                alias: None,
4431            }],
4432            distinct: false,
4433            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4434                count: 10,
4435                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4436                    variable: "n".to_string(),
4437                    label: None,
4438                    input: None,
4439                })),
4440            })),
4441        }));
4442
4443        let physical = planner.plan_adaptive(&logical).unwrap();
4444        assert!(physical.adaptive_context.is_some());
4445    }
4446
/// Adaptive planning of `RETURN n` over a `Skip(5)` on an unlabeled node
/// scan must succeed and attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    // Assemble the logical tree bottom-up: scan -> skip -> return.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
4471
/// Adaptive planning of `RETURN n` over an ascending sort on variable `n`
/// must succeed and attach an adaptive context to the physical plan.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    // Assemble the logical tree bottom-up: scan -> sort -> return.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
4499
/// Adaptive planning of `RETURN n` over a union of a `Person` scan and a
/// `Company` scan must succeed and attach an adaptive context.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches bind the same variable `n`, differing only by label.
    let labelled_scan = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from("n"),
            label: Some(label.to_string()),
            input: None,
        })
    };
    let unioned = LogicalOperator::Union(UnionOp {
        inputs: vec![labelled_scan("Person"), labelled_scan("Company")],
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(unioned),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let planned = planner.plan_adaptive(&plan).unwrap();
    assert!(planned.adaptive_context.is_some());
}
4530
4531    // ==================== Variable Length Path Tests ====================
4532
/// Plans `MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b` and checks that both
/// returned variables appear among the physical plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Outgoing KNOWS expansion spanning one to three hops.
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4572
/// Plans `MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b` and checks that
/// planning succeeds with `a` and `b` among the output columns.
///
/// Only the returned variables are asserted; the path alias `p` itself is
/// not inspected here.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Same 1..3-hop outgoing KNOWS expansion, this time bound to path alias `p`.
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(source_scan),
        path_alias: Some(String::from("p")),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4613
/// Plans `MATCH (a)<-[:KNOWS]-(b) RETURN a, b` with factorized execution
/// switched off and checks that `a` and `b` are output columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop expansion following KNOWS edges in the incoming direction.
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4653
/// Plans `MATCH (a)-[:KNOWS]-(b) RETURN a, b` with factorized execution
/// switched off and checks that `a` and `b` are output columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop expansion following KNOWS edges in either direction.
    let expansion = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expansion),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4693
4694    // ==================== With Context Tests ====================
4695
/// A planner built through `Planner::with_context` must report back the
/// transaction id, transaction manager and viewing epoch it was given.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());

    // Start the transaction before sampling the epoch; the values captured
    // here are the ones the accessors are compared against below.
    let txn = manager.begin();
    let seen_epoch = manager.current_epoch();

    let shared_store = Arc::clone(&store);
    let shared_manager = Arc::clone(&manager);
    let planner = Planner::with_context(shared_store, shared_manager, Some(txn), seen_epoch);

    assert_eq!(planner.tx_id(), Some(txn));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), seen_epoch);
}
4716
/// Plans a two-hop chain `a -> b -> c` with factorized execution disabled
/// and checks that the returned variables `a` and `c` are output columns.
///
/// Only the output columns are asserted; which physical operators were
/// chosen is not inspected here.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };
    // Helper producing an untyped, single-hop outgoing expansion on top of `input`.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: from.to_string(),
            to_variable: to.to_string(),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
        })
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Two consecutive expands stacked on the scan: a -> b, then b -> c.
    let first_hop = single_hop("a", "b", source_scan);
    let second_hop = single_hop("b", "c", first_hop);

    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "c"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4766
4767    // ==================== Sort with Property Tests ====================
4768
/// Plans `MATCH (n) RETURN n ORDER BY n.name ASC` and checks that planning
/// succeeds with `n` among the output columns.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Sort key is a property access (`n.name`) rather than a plain variable.
    let by_name_ascending = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name_ascending],
        input: Box::new(scan),
    });
    let projection = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(projection));

    let planned = planner.plan(&plan).unwrap();
    // The returned variable must still be exposed after sorting on its property.
    assert!(planned.columns().contains(&String::from("n")));
}
4801
4802    // ==================== Scan with Input Tests ====================
4803
/// Plans a `Company` scan for `b` whose input is a `Person` scan for `a`
/// and checks that both variables are exposed as output columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Helper producing an un-aliased `RETURN <name>` item.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(name.to_string()),
        alias: None,
    };

    // Inner scan feeds the outer one (a scan chained onto another scan).
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let planned = planner.plan(&plan).unwrap();
    for expected in ["a", "b"].iter() {
        assert!(planned.columns().contains(&expected.to_string()));
    }
}
4837}