Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
/// Range bounds for property-based range queries.
///
/// Both endpoints are optional and borrowed for `'a`; each has a flag saying
/// whether the endpoint value itself belongs to the range.
struct RangeBounds<'a> {
    /// Lower endpoint of the range.
    // NOTE(review): `None` presumably means "unbounded below" — confirm at the use site.
    min: Option<&'a Value>,
    /// Upper endpoint of the range.
    // NOTE(review): `None` presumably means "unbounded above" — confirm at the use site.
    max: Option<&'a Value>,
    /// Whether the `min` endpoint is included in the range.
    min_inclusive: bool,
    /// Whether the `max` endpoint is included in the range.
    max_inclusive: bool,
}
44
/// Converts a logical plan to a physical operator tree.
///
/// A planner is bound to one store and one visibility snapshot (a viewing
/// epoch plus an optional transaction ID).
pub struct Planner {
    /// The graph store to scan from.
    store: Arc<LpgStore>,
    /// Transaction manager for MVCC operations.
    ///
    /// `None` when the planner was created without transaction context.
    tx_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    tx_id: Option<TxId>,
    /// Epoch to use for visibility checks.
    viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    ///
    /// Held in a `Cell` so that planning methods taking `&self` can advance it.
    anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let avg_degree = 10.0; // Default estimate
244                let estimate = input_estimate * avg_degree;
245                let id = format!("expand_{}", expand.to_variable);
246                ctx.set_estimate(&id, estimate);
247
248                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
249            }
250            LogicalOperator::Join(join) => {
251                // Estimate join output (product with selectivity)
252                let left_est = self.estimate_cardinality(&join.left);
253                let right_est = self.estimate_cardinality(&join.right);
254                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
255                let id = format!("join_{depth}");
256                ctx.set_estimate(&id, estimate);
257
258                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
259                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
260            }
261            LogicalOperator::Aggregate(agg) => {
262                // Aggregates typically reduce cardinality
263                let input_estimate = self.estimate_cardinality(&agg.input);
264                let estimate = if agg.group_by.is_empty() {
265                    1.0 // Scalar aggregate
266                } else {
267                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
268                };
269                let id = format!("aggregate_{depth}");
270                ctx.set_estimate(&id, estimate);
271
272                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
273            }
274            LogicalOperator::Distinct(distinct) => {
275                let input_estimate = self.estimate_cardinality(&distinct.input);
276                let estimate = (input_estimate * 0.5).max(1.0);
277                let id = format!("distinct_{depth}");
278                ctx.set_estimate(&id, estimate);
279
280                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
281            }
282            LogicalOperator::Return(ret) => {
283                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
284            }
285            LogicalOperator::Limit(limit) => {
286                let input_estimate = self.estimate_cardinality(&limit.input);
287                let estimate = (input_estimate).min(limit.count as f64);
288                let id = format!("limit_{depth}");
289                ctx.set_estimate(&id, estimate);
290
291                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
292            }
293            LogicalOperator::Skip(skip) => {
294                let input_estimate = self.estimate_cardinality(&skip.input);
295                let estimate = (input_estimate - skip.count as f64).max(0.0);
296                let id = format!("skip_{depth}");
297                ctx.set_estimate(&id, estimate);
298
299                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
300            }
301            LogicalOperator::Sort(sort) => {
302                // Sort doesn't change cardinality
303                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
304            }
305            LogicalOperator::Union(union) => {
306                let estimate: f64 = union
307                    .inputs
308                    .iter()
309                    .map(|input| self.estimate_cardinality(input))
310                    .sum();
311                let id = format!("union_{depth}");
312                ctx.set_estimate(&id, estimate);
313
314                for input in &union.inputs {
315                    self.collect_cardinality_estimates(input, ctx, depth + 1);
316                }
317            }
318            _ => {
319                // For other operators, try to recurse into known input patterns
320            }
321        }
322    }
323
324    /// Estimates cardinality for a logical operator subtree.
325    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
326        match op {
327            LogicalOperator::NodeScan(scan) => {
328                if let Some(label) = &scan.label {
329                    self.store.nodes_by_label(label).len() as f64
330                } else {
331                    self.store.node_count() as f64
332                }
333            }
334            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
335            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
336            LogicalOperator::Join(join) => {
337                let left = self.estimate_cardinality(&join.left);
338                let right = self.estimate_cardinality(&join.right);
339                (left * right).sqrt()
340            }
341            LogicalOperator::Aggregate(agg) => {
342                if agg.group_by.is_empty() {
343                    1.0
344                } else {
345                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
346                }
347            }
348            LogicalOperator::Distinct(distinct) => {
349                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
350            }
351            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
352            LogicalOperator::Limit(limit) => self
353                .estimate_cardinality(&limit.input)
354                .min(limit.count as f64),
355            LogicalOperator::Skip(skip) => {
356                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
357            }
358            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
359            LogicalOperator::Union(union) => union
360                .inputs
361                .iter()
362                .map(|input| self.estimate_cardinality(input))
363                .sum(),
364            _ => 1000.0, // Default estimate for unknown operators
365        }
366    }
367
368    /// Plans a single logical operator.
369    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
370        match op {
371            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
372            LogicalOperator::Expand(expand) => {
373                // Check for expand chains when factorized execution is enabled
374                if self.factorized_execution {
375                    let (chain_len, _base) = Self::count_expand_chain(op);
376                    if chain_len >= 2 {
377                        // Use factorized chain for 2+ consecutive single-hop expands
378                        return self.plan_expand_chain(op);
379                    }
380                }
381                self.plan_expand(expand)
382            }
383            LogicalOperator::Return(ret) => self.plan_return(ret),
384            LogicalOperator::Filter(filter) => self.plan_filter(filter),
385            LogicalOperator::Project(project) => self.plan_project(project),
386            LogicalOperator::Limit(limit) => self.plan_limit(limit),
387            LogicalOperator::Skip(skip) => self.plan_skip(skip),
388            LogicalOperator::Sort(sort) => self.plan_sort(sort),
389            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
390            LogicalOperator::Join(join) => self.plan_join(join),
391            LogicalOperator::Union(union) => self.plan_union(union),
392            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
393            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
394            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
395            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
396            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
397            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
398            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
399            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
400            LogicalOperator::Merge(merge) => self.plan_merge(merge),
401            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
402            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
403            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
404            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
405            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
406            _ => Err(Error::Internal(format!(
407                "Unsupported operator: {:?}",
408                std::mem::discriminant(op)
409            ))),
410        }
411    }
412
413    /// Plans a node scan operator.
414    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
415        let scan_op = if let Some(label) = &scan.label {
416            ScanOperator::with_label(Arc::clone(&self.store), label)
417        } else {
418            ScanOperator::new(Arc::clone(&self.store))
419        };
420
421        // Apply MVCC context if available
422        let scan_operator: Box<dyn Operator> =
423            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
424
425        // If there's an input, chain operators with a nested loop join (cross join)
426        if let Some(input) = &scan.input {
427            let (input_op, mut input_columns) = self.plan_operator(input)?;
428
429            // Build output schema: input columns + scan column
430            let mut output_schema: Vec<LogicalType> =
431                input_columns.iter().map(|_| LogicalType::Any).collect();
432            output_schema.push(LogicalType::Node);
433
434            // Add scan column to input columns
435            input_columns.push(scan.variable.clone());
436
437            // Use nested loop join to combine input rows with scanned nodes
438            let join_op = Box::new(NestedLoopJoinOperator::new(
439                input_op,
440                scan_operator,
441                None, // No join condition (cross join)
442                PhysicalJoinType::Cross,
443                output_schema,
444            ));
445
446            Ok((join_op, input_columns))
447        } else {
448            let columns = vec![scan.variable.clone()];
449            Ok((scan_operator, columns))
450        }
451    }
452
453    /// Plans an expand operator.
454    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
455        // Plan the input operator first
456        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
457
458        // Find the source column index
459        let source_column = input_columns
460            .iter()
461            .position(|c| c == &expand.from_variable)
462            .ok_or_else(|| {
463                Error::Internal(format!(
464                    "Source variable '{}' not found in input columns",
465                    expand.from_variable
466                ))
467            })?;
468
469        // Convert expand direction
470        let direction = match expand.direction {
471            ExpandDirection::Outgoing => Direction::Outgoing,
472            ExpandDirection::Incoming => Direction::Incoming,
473            ExpandDirection::Both => Direction::Both,
474        };
475
476        // Check if this is a variable-length path
477        let is_variable_length =
478            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
479
480        let operator: Box<dyn Operator> = if is_variable_length {
481            // Use VariableLengthExpandOperator for multi-hop paths
482            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
483            let mut expand_op = VariableLengthExpandOperator::new(
484                Arc::clone(&self.store),
485                input_op,
486                source_column,
487                direction,
488                expand.edge_type.clone(),
489                expand.min_hops,
490                max_hops,
491            )
492            .with_tx_context(self.viewing_epoch, self.tx_id);
493
494            // If a path alias is set, enable path length output
495            if expand.path_alias.is_some() {
496                expand_op = expand_op.with_path_length_output();
497            }
498
499            Box::new(expand_op)
500        } else {
501            // Use simple ExpandOperator for single-hop paths
502            let expand_op = ExpandOperator::new(
503                Arc::clone(&self.store),
504                input_op,
505                source_column,
506                direction,
507                expand.edge_type.clone(),
508            )
509            .with_tx_context(self.viewing_epoch, self.tx_id);
510            Box::new(expand_op)
511        };
512
513        // Build output columns: [input_columns..., edge, target, (path_length)?]
514        // Preserve all input columns and add edge + target to match ExpandOperator output
515        let mut columns = input_columns;
516
517        // Generate edge column name - use provided name or generate anonymous name
518        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
519            let count = self.anon_edge_counter.get();
520            self.anon_edge_counter.set(count + 1);
521            format!("_anon_edge_{}", count)
522        });
523        columns.push(edge_col_name);
524
525        columns.push(expand.to_variable.clone());
526
527        // If a path alias is set, add a column for the path length
528        if let Some(ref path_alias) = expand.path_alias {
529            columns.push(format!("_path_length_{}", path_alias));
530        }
531
532        Ok((operator, columns))
533    }
534
535    /// Plans a chain of consecutive expand operations using factorized execution.
536    ///
537    /// This avoids the Cartesian product explosion that occurs with separate expands.
538    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
539    ///
540    /// The chain is executed lazily at query time, not during planning. This ensures
541    /// that any filters applied above the expand chain are properly respected.
542    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
543        let expands = Self::collect_expand_chain(op);
544        if expands.is_empty() {
545            return Err(Error::Internal("Empty expand chain".to_string()));
546        }
547
548        // Get the base operator (before first expand)
549        let first_expand = expands[0];
550        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
551
552        let mut columns = base_columns.clone();
553        let mut steps = Vec::new();
554
555        // Track the level-local source column for each expand
556        // For the first expand, it's the column in the input (base_columns)
557        // For subsequent expands, the target from the previous level is always at index 1
558        // (each level adds [edge, target], so target is at index 1)
559        let mut is_first = true;
560
561        for expand in &expands {
562            // Find source column for this expand
563            let source_column = if is_first {
564                // For first expand, find in base columns
565                base_columns
566                    .iter()
567                    .position(|c| c == &expand.from_variable)
568                    .ok_or_else(|| {
569                        Error::Internal(format!(
570                            "Source variable '{}' not found in base columns",
571                            expand.from_variable
572                        ))
573                    })?
574            } else {
575                // For subsequent expands, the target from the previous level is at index 1
576                // (each level adds [edge, target], so target is the second column)
577                1
578            };
579
580            // Convert direction
581            let direction = match expand.direction {
582                ExpandDirection::Outgoing => Direction::Outgoing,
583                ExpandDirection::Incoming => Direction::Incoming,
584                ExpandDirection::Both => Direction::Both,
585            };
586
587            // Add expand step configuration
588            steps.push(ExpandStep {
589                source_column,
590                direction,
591                edge_type: expand.edge_type.clone(),
592            });
593
594            // Add edge and target columns
595            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
596                let count = self.anon_edge_counter.get();
597                self.anon_edge_counter.set(count + 1);
598                format!("_anon_edge_{}", count)
599            });
600            columns.push(edge_col_name);
601            columns.push(expand.to_variable.clone());
602
603            is_first = false;
604        }
605
606        // Create lazy operator that executes at query time, not planning time
607        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
608
609        if let Some(tx_id) = self.tx_id {
610            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
611        } else {
612            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
613        }
614
615        Ok((Box::new(lazy_op), columns))
616    }
617
618    /// Plans a RETURN clause.
619    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
620        // Plan the input operator
621        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
622
623        // Build variable to column index mapping
624        let variable_columns: HashMap<String, usize> = input_columns
625            .iter()
626            .enumerate()
627            .map(|(i, name)| (name.clone(), i))
628            .collect();
629
630        // Extract column names from return items
631        let columns: Vec<String> = ret
632            .items
633            .iter()
634            .map(|item| {
635                item.alias.clone().unwrap_or_else(|| {
636                    // Generate a default name from the expression
637                    expression_to_string(&item.expression)
638                })
639            })
640            .collect();
641
642        // Check if we need a project operator (for property access or expression evaluation)
643        let needs_project = ret
644            .items
645            .iter()
646            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
647
648        if needs_project {
649            // Build project expressions
650            let mut projections = Vec::with_capacity(ret.items.len());
651            let mut output_types = Vec::with_capacity(ret.items.len());
652
653            for item in &ret.items {
654                match &item.expression {
655                    LogicalExpression::Variable(name) => {
656                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
657                            Error::Internal(format!("Variable '{}' not found in input", name))
658                        })?;
659                        projections.push(ProjectExpr::Column(col_idx));
660                        // Use Node type for variables (they could be nodes, edges, or values)
661                        output_types.push(LogicalType::Node);
662                    }
663                    LogicalExpression::Property { variable, property } => {
664                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
665                            Error::Internal(format!("Variable '{}' not found in input", variable))
666                        })?;
667                        projections.push(ProjectExpr::PropertyAccess {
668                            column: col_idx,
669                            property: property.clone(),
670                        });
671                        // Property could be any type - use Any/Generic to preserve type
672                        output_types.push(LogicalType::Any);
673                    }
674                    LogicalExpression::Literal(value) => {
675                        projections.push(ProjectExpr::Constant(value.clone()));
676                        output_types.push(value_to_logical_type(value));
677                    }
678                    LogicalExpression::FunctionCall { name, args, .. } => {
679                        // Handle built-in functions
680                        match name.to_lowercase().as_str() {
681                            "type" => {
682                                // type(r) returns the edge type string
683                                if args.len() != 1 {
684                                    return Err(Error::Internal(
685                                        "type() requires exactly one argument".to_string(),
686                                    ));
687                                }
688                                if let LogicalExpression::Variable(var_name) = &args[0] {
689                                    let col_idx =
690                                        *variable_columns.get(var_name).ok_or_else(|| {
691                                            Error::Internal(format!(
692                                                "Variable '{}' not found in input",
693                                                var_name
694                                            ))
695                                        })?;
696                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
697                                    output_types.push(LogicalType::String);
698                                } else {
699                                    return Err(Error::Internal(
700                                        "type() argument must be a variable".to_string(),
701                                    ));
702                                }
703                            }
704                            "length" => {
705                                // length(p) returns the path length
706                                // For shortestPath results, the path column already contains the length
707                                if args.len() != 1 {
708                                    return Err(Error::Internal(
709                                        "length() requires exactly one argument".to_string(),
710                                    ));
711                                }
712                                if let LogicalExpression::Variable(var_name) = &args[0] {
713                                    let col_idx =
714                                        *variable_columns.get(var_name).ok_or_else(|| {
715                                            Error::Internal(format!(
716                                                "Variable '{}' not found in input",
717                                                var_name
718                                            ))
719                                        })?;
720                                    // Pass through the column value directly
721                                    projections.push(ProjectExpr::Column(col_idx));
722                                    output_types.push(LogicalType::Int64);
723                                } else {
724                                    return Err(Error::Internal(
725                                        "length() argument must be a variable".to_string(),
726                                    ));
727                                }
728                            }
729                            // For other functions (head, tail, size, etc.), use expression evaluation
730                            _ => {
731                                let filter_expr = self.convert_expression(&item.expression)?;
732                                projections.push(ProjectExpr::Expression {
733                                    expr: filter_expr,
734                                    variable_columns: variable_columns.clone(),
735                                });
736                                output_types.push(LogicalType::Any);
737                            }
738                        }
739                    }
740                    LogicalExpression::Case { .. } => {
741                        // Convert CASE expression to FilterExpression for evaluation
742                        let filter_expr = self.convert_expression(&item.expression)?;
743                        projections.push(ProjectExpr::Expression {
744                            expr: filter_expr,
745                            variable_columns: variable_columns.clone(),
746                        });
747                        // CASE can return any type - use Any
748                        output_types.push(LogicalType::Any);
749                    }
750                    _ => {
751                        return Err(Error::Internal(format!(
752                            "Unsupported RETURN expression: {:?}",
753                            item.expression
754                        )));
755                    }
756                }
757            }
758
759            let operator = Box::new(ProjectOperator::with_store(
760                input_op,
761                projections,
762                output_types,
763                Arc::clone(&self.store),
764            ));
765
766            Ok((operator, columns))
767        } else {
768            // Simple case: just return variables
769            // Re-order columns to match return items if needed
770            let mut projections = Vec::with_capacity(ret.items.len());
771            let mut output_types = Vec::with_capacity(ret.items.len());
772
773            for item in &ret.items {
774                if let LogicalExpression::Variable(name) = &item.expression {
775                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
776                        Error::Internal(format!("Variable '{}' not found in input", name))
777                    })?;
778                    projections.push(ProjectExpr::Column(col_idx));
779                    output_types.push(LogicalType::Node);
780                }
781            }
782
783            // Only add ProjectOperator if reordering is needed
784            if projections.len() == input_columns.len()
785                && projections
786                    .iter()
787                    .enumerate()
788                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
789            {
790                // No reordering needed
791                Ok((input_op, columns))
792            } else {
793                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
794                Ok((operator, columns))
795            }
796        }
797    }
798
799    /// Plans a project operator (for WITH clause).
800    fn plan_project(
801        &self,
802        project: &crate::query::plan::ProjectOp,
803    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
804        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
805        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
806            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
807                // Create a single-row operator for projecting literals
808                let single_row_op: Box<dyn Operator> = Box::new(
809                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
810                );
811                (single_row_op, Vec::new())
812            } else {
813                self.plan_operator(&project.input)?
814            };
815
816        // Build variable to column index mapping
817        let variable_columns: HashMap<String, usize> = input_columns
818            .iter()
819            .enumerate()
820            .map(|(i, name)| (name.clone(), i))
821            .collect();
822
823        // Build projections and new column names
824        let mut projections = Vec::with_capacity(project.projections.len());
825        let mut output_types = Vec::with_capacity(project.projections.len());
826        let mut output_columns = Vec::with_capacity(project.projections.len());
827
828        for projection in &project.projections {
829            // Determine the output column name (alias or expression string)
830            let col_name = projection
831                .alias
832                .clone()
833                .unwrap_or_else(|| expression_to_string(&projection.expression));
834            output_columns.push(col_name);
835
836            match &projection.expression {
837                LogicalExpression::Variable(name) => {
838                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
839                        Error::Internal(format!("Variable '{}' not found in input", name))
840                    })?;
841                    projections.push(ProjectExpr::Column(col_idx));
842                    output_types.push(LogicalType::Node);
843                }
844                LogicalExpression::Property { variable, property } => {
845                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
846                        Error::Internal(format!("Variable '{}' not found in input", variable))
847                    })?;
848                    projections.push(ProjectExpr::PropertyAccess {
849                        column: col_idx,
850                        property: property.clone(),
851                    });
852                    output_types.push(LogicalType::Any);
853                }
854                LogicalExpression::Literal(value) => {
855                    projections.push(ProjectExpr::Constant(value.clone()));
856                    output_types.push(value_to_logical_type(value));
857                }
858                _ => {
859                    // For complex expressions, use full expression evaluation
860                    let filter_expr = self.convert_expression(&projection.expression)?;
861                    projections.push(ProjectExpr::Expression {
862                        expr: filter_expr,
863                        variable_columns: variable_columns.clone(),
864                    });
865                    output_types.push(LogicalType::Any);
866                }
867            }
868        }
869
870        let operator = Box::new(ProjectOperator::with_store(
871            input_op,
872            projections,
873            output_types,
874            Arc::clone(&self.store),
875        ));
876
877        Ok((operator, output_columns))
878    }
879
880    /// Plans a filter operator.
881    ///
882    /// Uses zone map pre-filtering to potentially skip scans when predicates
883    /// definitely won't match any data. Also uses property indexes when available
884    /// for O(1) lookups instead of full scans.
885    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
886        // Check zone maps for simple property predicates before scanning
887        // If zone map says "definitely no matches", we can short-circuit
888        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
889            // Zone map says no matches possible - return empty result
890            let (_, columns) = self.plan_operator(&filter.input)?;
891            let schema = self.derive_schema_from_columns(&columns);
892            let empty_op = Box::new(EmptyOperator::new(schema));
893            return Ok((empty_op, columns));
894        }
895
896        // Try to use property index for equality predicates on indexed properties
897        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
898            return Ok(result);
899        }
900
901        // Try to use range optimization for range predicates (>, <, >=, <=)
902        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
903            return Ok(result);
904        }
905
906        // Plan the input operator first
907        let (input_op, columns) = self.plan_operator(&filter.input)?;
908
909        // Build variable to column index mapping
910        let variable_columns: HashMap<String, usize> = columns
911            .iter()
912            .enumerate()
913            .map(|(i, name)| (name.clone(), i))
914            .collect();
915
916        // Convert logical expression to filter expression
917        let filter_expr = self.convert_expression(&filter.predicate)?;
918
919        // Create the predicate
920        let predicate =
921            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
922
923        // Create the filter operator
924        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
925
926        Ok((operator, columns))
927    }
928
929    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
930    ///
931    /// Returns:
932    /// - `Some(false)` if zone map proves no matches possible (can skip)
933    /// - `Some(true)` if zone map says matches might exist
934    /// - `None` if zone map check not applicable
935    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
936        use grafeo_core::graph::lpg::CompareOp;
937
938        match predicate {
939            LogicalExpression::Binary { left, op, right } => {
940                // Check for AND/OR first (compound conditions)
941                match op {
942                    BinaryOp::And => {
943                        let left_result = self.check_zone_map_for_predicate(left);
944                        let right_result = self.check_zone_map_for_predicate(right);
945
946                        return match (left_result, right_result) {
947                            // If either side definitely won't match, the AND won't match
948                            (Some(false), _) | (_, Some(false)) => Some(false),
949                            // If both might match, might match overall
950                            (Some(true), Some(true)) => Some(true),
951                            // Otherwise, can't determine
952                            _ => None,
953                        };
954                    }
955                    BinaryOp::Or => {
956                        let left_result = self.check_zone_map_for_predicate(left);
957                        let right_result = self.check_zone_map_for_predicate(right);
958
959                        return match (left_result, right_result) {
960                            // Both sides definitely won't match
961                            (Some(false), Some(false)) => Some(false),
962                            // At least one side might match
963                            (Some(true), _) | (_, Some(true)) => Some(true),
964                            // Otherwise, can't determine
965                            _ => None,
966                        };
967                    }
968                    _ => {}
969                }
970
971                // Simple property comparison: n.property op value
972                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
973                    (
974                        LogicalExpression::Property { property, .. },
975                        LogicalExpression::Literal(val),
976                    ) => {
977                        let cmp = match op {
978                            BinaryOp::Eq => CompareOp::Eq,
979                            BinaryOp::Ne => CompareOp::Ne,
980                            BinaryOp::Lt => CompareOp::Lt,
981                            BinaryOp::Le => CompareOp::Le,
982                            BinaryOp::Gt => CompareOp::Gt,
983                            BinaryOp::Ge => CompareOp::Ge,
984                            _ => return None,
985                        };
986                        (property.clone(), cmp, val.clone())
987                    }
988                    (
989                        LogicalExpression::Literal(val),
990                        LogicalExpression::Property { property, .. },
991                    ) => {
992                        // Flip comparison for reversed operands
993                        let cmp = match op {
994                            BinaryOp::Eq => CompareOp::Eq,
995                            BinaryOp::Ne => CompareOp::Ne,
996                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
997                            BinaryOp::Le => CompareOp::Ge,
998                            BinaryOp::Gt => CompareOp::Lt,
999                            BinaryOp::Ge => CompareOp::Le,
1000                            _ => return None,
1001                        };
1002                        (property.clone(), cmp, val.clone())
1003                    }
1004                    _ => return None,
1005                };
1006
1007                // Check zone map for node properties
1008                let might_match =
1009                    self.store
1010                        .node_property_might_match(&property.into(), compare_op, &value);
1011
1012                Some(might_match)
1013            }
1014
1015            _ => None,
1016        }
1017    }
1018
1019    /// Tries to use a property index for filter optimization.
1020    ///
1021    /// When a filter predicate is an equality check on an indexed property,
1022    /// and the input is a simple NodeScan, we can use the index to look up
1023    /// matching nodes directly instead of scanning all nodes.
1024    ///
1025    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1026    /// `Ok(None)` if not applicable, or `Err` on error.
1027    fn try_plan_filter_with_property_index(
1028        &self,
1029        filter: &FilterOp,
1030    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1031        // Only optimize if input is a simple NodeScan (not nested)
1032        let (scan_variable, scan_label) = match filter.input.as_ref() {
1033            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1034                (scan.variable.clone(), scan.label.clone())
1035            }
1036            _ => return Ok(None),
1037        };
1038
1039        // Extract property equality conditions from the predicate
1040        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1041        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1042
1043        if conditions.is_empty() {
1044            return Ok(None);
1045        }
1046
1047        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1048        let has_indexed_condition = conditions
1049            .iter()
1050            .any(|(prop, _)| self.store.has_property_index(prop));
1051
1052        if !has_indexed_condition {
1053            return Ok(None);
1054        }
1055
1056        // Use the optimized batch lookup for multiple conditions
1057        let conditions_ref: Vec<(&str, Value)> = conditions
1058            .iter()
1059            .map(|(p, v)| (p.as_str(), v.clone()))
1060            .collect();
1061        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1062
1063        // If there's a label filter, also filter by label
1064        if let Some(label) = &scan_label {
1065            let label_nodes: std::collections::HashSet<_> =
1066                self.store.nodes_by_label(label).into_iter().collect();
1067            matching_nodes.retain(|n| label_nodes.contains(n));
1068        }
1069
1070        // Create a NodeListOperator with the matching nodes
1071        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1072        let columns = vec![scan_variable];
1073
1074        Ok(Some((node_list_op, columns)))
1075    }
1076
1077    /// Extracts equality conditions (property = literal) from a predicate.
1078    ///
1079    /// Handles both simple predicates and AND chains:
1080    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1081    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1082    fn extract_equality_conditions(
1083        &self,
1084        predicate: &LogicalExpression,
1085        target_variable: &str,
1086    ) -> Vec<(String, Value)> {
1087        let mut conditions = Vec::new();
1088        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1089        conditions
1090    }
1091
1092    /// Recursively collects equality conditions from AND expressions.
1093    fn collect_equality_conditions(
1094        &self,
1095        expr: &LogicalExpression,
1096        target_variable: &str,
1097        conditions: &mut Vec<(String, Value)>,
1098    ) {
1099        match expr {
1100            // Handle AND: recurse into both sides
1101            LogicalExpression::Binary {
1102                left,
1103                op: BinaryOp::And,
1104                right,
1105            } => {
1106                self.collect_equality_conditions(left, target_variable, conditions);
1107                self.collect_equality_conditions(right, target_variable, conditions);
1108            }
1109
1110            // Handle equality: extract property and value
1111            LogicalExpression::Binary {
1112                left,
1113                op: BinaryOp::Eq,
1114                right,
1115            } => {
1116                if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1117                    if var == target_variable {
1118                        conditions.push((prop, val));
1119                    }
1120                }
1121            }
1122
1123            _ => {}
1124        }
1125    }
1126
1127    /// Extracts (variable, property, value) from a property equality expression.
1128    fn extract_property_equality(
1129        &self,
1130        left: &LogicalExpression,
1131        right: &LogicalExpression,
1132    ) -> Option<(String, String, Value)> {
1133        match (left, right) {
1134            (
1135                LogicalExpression::Property { variable, property },
1136                LogicalExpression::Literal(val),
1137            ) => Some((variable.clone(), property.clone(), val.clone())),
1138            (
1139                LogicalExpression::Literal(val),
1140                LogicalExpression::Property { variable, property },
1141            ) => Some((variable.clone(), property.clone(), val.clone())),
1142            _ => None,
1143        }
1144    }
1145
1146    /// Tries to optimize a filter using range queries on properties.
1147    ///
1148    /// This optimization is applied when:
1149    /// - The input is a simple NodeScan (no nested operations)
1150    /// - The predicate contains range comparisons (>, <, >=, <=)
1151    /// - The same variable and property are being filtered
1152    ///
1153    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1154    /// (`n.age >= 30 AND n.age <= 50`).
1155    ///
1156    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1157    /// `Ok(None)` if not applicable, or `Err` on error.
1158    fn try_plan_filter_with_range_index(
1159        &self,
1160        filter: &FilterOp,
1161    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1162        // Only optimize if input is a simple NodeScan (not nested)
1163        let (scan_variable, scan_label) = match filter.input.as_ref() {
1164            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1165                (scan.variable.clone(), scan.label.clone())
1166            }
1167            _ => return Ok(None),
1168        };
1169
1170        // Try to extract BETWEEN pattern first (more efficient)
1171        if let Some((variable, property, min, max, min_inc, max_inc)) =
1172            self.extract_between_predicate(&filter.predicate)
1173        {
1174            if variable == scan_variable {
1175                return self.plan_range_filter(
1176                    &scan_variable,
1177                    &scan_label,
1178                    &property,
1179                    RangeBounds {
1180                        min: Some(&min),
1181                        max: Some(&max),
1182                        min_inclusive: min_inc,
1183                        max_inclusive: max_inc,
1184                    },
1185                );
1186            }
1187        }
1188
1189        // Try to extract simple range predicate
1190        if let Some((variable, property, op, value)) =
1191            self.extract_range_predicate(&filter.predicate)
1192        {
1193            if variable == scan_variable {
1194                let (min, max, min_inc, max_inc) = match op {
1195                    BinaryOp::Lt => (None, Some(value), false, false),
1196                    BinaryOp::Le => (None, Some(value), false, true),
1197                    BinaryOp::Gt => (Some(value), None, false, false),
1198                    BinaryOp::Ge => (Some(value), None, true, false),
1199                    _ => return Ok(None),
1200                };
1201                return self.plan_range_filter(
1202                    &scan_variable,
1203                    &scan_label,
1204                    &property,
1205                    RangeBounds {
1206                        min: min.as_ref(),
1207                        max: max.as_ref(),
1208                        min_inclusive: min_inc,
1209                        max_inclusive: max_inc,
1210                    },
1211                );
1212            }
1213        }
1214
1215        Ok(None)
1216    }
1217
1218    /// Plans a range filter using `find_nodes_in_range`.
1219    fn plan_range_filter(
1220        &self,
1221        scan_variable: &str,
1222        scan_label: &Option<String>,
1223        property: &str,
1224        bounds: RangeBounds<'_>,
1225    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1226        // Use the store's range query method
1227        let mut matching_nodes = self.store.find_nodes_in_range(
1228            property,
1229            bounds.min,
1230            bounds.max,
1231            bounds.min_inclusive,
1232            bounds.max_inclusive,
1233        );
1234
1235        // If there's a label filter, also filter by label
1236        if let Some(label) = scan_label {
1237            let label_nodes: std::collections::HashSet<_> =
1238                self.store.nodes_by_label(label).into_iter().collect();
1239            matching_nodes.retain(|n| label_nodes.contains(n));
1240        }
1241
1242        // Create a NodeListOperator with the matching nodes
1243        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1244        let columns = vec![scan_variable.to_string()];
1245
1246        Ok(Some((node_list_op, columns)))
1247    }
1248
1249    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1250    ///
1251    /// Returns `(variable, property, operator, value)` if found.
1252    fn extract_range_predicate(
1253        &self,
1254        predicate: &LogicalExpression,
1255    ) -> Option<(String, String, BinaryOp, Value)> {
1256        match predicate {
1257            LogicalExpression::Binary { left, op, right } => {
1258                match op {
1259                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1260                        // Try property on left: n.age > 30
1261                        if let (
1262                            LogicalExpression::Property { variable, property },
1263                            LogicalExpression::Literal(val),
1264                        ) = (left.as_ref(), right.as_ref())
1265                        {
1266                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1267                        }
1268
1269                        // Try property on right: 30 < n.age (flip operator)
1270                        if let (
1271                            LogicalExpression::Literal(val),
1272                            LogicalExpression::Property { variable, property },
1273                        ) = (left.as_ref(), right.as_ref())
1274                        {
1275                            let flipped_op = match op {
1276                                BinaryOp::Lt => BinaryOp::Gt,
1277                                BinaryOp::Le => BinaryOp::Ge,
1278                                BinaryOp::Gt => BinaryOp::Lt,
1279                                BinaryOp::Ge => BinaryOp::Le,
1280                                _ => return None,
1281                            };
1282                            return Some((
1283                                variable.clone(),
1284                                property.clone(),
1285                                flipped_op,
1286                                val.clone(),
1287                            ));
1288                        }
1289                    }
1290                    _ => {}
1291                }
1292            }
1293            _ => {}
1294        }
1295        None
1296    }
1297
1298    /// Extracts a BETWEEN pattern from compound predicates.
1299    ///
1300    /// Recognizes patterns like:
1301    /// - `n.age >= 30 AND n.age <= 50`
1302    /// - `n.age > 30 AND n.age < 50`
1303    ///
1304    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1305    fn extract_between_predicate(
1306        &self,
1307        predicate: &LogicalExpression,
1308    ) -> Option<(String, String, Value, Value, bool, bool)> {
1309        // Must be an AND expression
1310        let (left, right) = match predicate {
1311            LogicalExpression::Binary {
1312                left,
1313                op: BinaryOp::And,
1314                right,
1315            } => (left.as_ref(), right.as_ref()),
1316            _ => return None,
1317        };
1318
1319        // Extract range predicates from both sides
1320        let left_range = self.extract_range_predicate(left);
1321        let right_range = self.extract_range_predicate(right);
1322
1323        let (left_var, left_prop, left_op, left_val) = left_range?;
1324        let (right_var, right_prop, right_op, right_val) = right_range?;
1325
1326        // Must be same variable and property
1327        if left_var != right_var || left_prop != right_prop {
1328            return None;
1329        }
1330
1331        // Determine which is lower bound and which is upper bound
1332        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1333            // n.x >= min AND n.x <= max
1334            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1335            // n.x >= min AND n.x < max
1336            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1337            // n.x > min AND n.x <= max
1338            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1339            // n.x > min AND n.x < max
1340            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1341            // Reversed order: n.x <= max AND n.x >= min
1342            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1343            // n.x < max AND n.x >= min
1344            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1345            // n.x <= max AND n.x > min
1346            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1347            // n.x < max AND n.x > min
1348            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1349            _ => return None,
1350        };
1351
1352        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1353    }
1354
1355    /// Plans a LIMIT operator.
1356    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1357        let (input_op, columns) = self.plan_operator(&limit.input)?;
1358        let output_schema = self.derive_schema_from_columns(&columns);
1359        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1360        Ok((operator, columns))
1361    }
1362
1363    /// Plans a SKIP operator.
1364    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1365        let (input_op, columns) = self.plan_operator(&skip.input)?;
1366        let output_schema = self.derive_schema_from_columns(&columns);
1367        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1368        Ok((operator, columns))
1369    }
1370
1371    /// Plans a SORT (ORDER BY) operator.
1372    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1373        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1374
1375        // Build variable to column index mapping
1376        let mut variable_columns: HashMap<String, usize> = input_columns
1377            .iter()
1378            .enumerate()
1379            .map(|(i, name)| (name.clone(), i))
1380            .collect();
1381
1382        // Collect property expressions that need to be projected before sorting
1383        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1384        let mut next_col_idx = input_columns.len();
1385
1386        for key in &sort.keys {
1387            if let LogicalExpression::Property { variable, property } = &key.expression {
1388                let col_name = format!("{}_{}", variable, property);
1389                if !variable_columns.contains_key(&col_name) {
1390                    property_projections.push((
1391                        variable.clone(),
1392                        property.clone(),
1393                        col_name.clone(),
1394                    ));
1395                    variable_columns.insert(col_name, next_col_idx);
1396                    next_col_idx += 1;
1397                }
1398            }
1399        }
1400
1401        // Track output columns
1402        let mut output_columns = input_columns.clone();
1403
1404        // If we have property expressions, add a projection to materialize them
1405        if !property_projections.is_empty() {
1406            let mut projections = Vec::new();
1407            let mut output_types = Vec::new();
1408
1409            // First, pass through all existing columns (use Node type to preserve node IDs
1410            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1411            for (i, _) in input_columns.iter().enumerate() {
1412                projections.push(ProjectExpr::Column(i));
1413                output_types.push(LogicalType::Node);
1414            }
1415
1416            // Then add property access projections
1417            for (variable, property, col_name) in &property_projections {
1418                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1419                    Error::Internal(format!(
1420                        "Variable '{}' not found for ORDER BY property projection",
1421                        variable
1422                    ))
1423                })?;
1424                projections.push(ProjectExpr::PropertyAccess {
1425                    column: source_col,
1426                    property: property.clone(),
1427                });
1428                output_types.push(LogicalType::Any);
1429                output_columns.push(col_name.clone());
1430            }
1431
1432            input_op = Box::new(ProjectOperator::with_store(
1433                input_op,
1434                projections,
1435                output_types,
1436                Arc::clone(&self.store),
1437            ));
1438        }
1439
1440        // Convert logical sort keys to physical sort keys
1441        let physical_keys: Vec<PhysicalSortKey> = sort
1442            .keys
1443            .iter()
1444            .map(|key| {
1445                let col_idx = self
1446                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1447                Ok(PhysicalSortKey {
1448                    column: col_idx,
1449                    direction: match key.order {
1450                        SortOrder::Ascending => SortDirection::Ascending,
1451                        SortOrder::Descending => SortDirection::Descending,
1452                    },
1453                    null_order: NullOrder::NullsLast,
1454                })
1455            })
1456            .collect::<Result<Vec<_>>>()?;
1457
1458        let output_schema = self.derive_schema_from_columns(&output_columns);
1459        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1460        Ok((operator, output_columns))
1461    }
1462
1463    /// Resolves a sort expression to a column index, using projected property columns.
1464    fn resolve_sort_expression_with_properties(
1465        &self,
1466        expr: &LogicalExpression,
1467        variable_columns: &HashMap<String, usize>,
1468    ) -> Result<usize> {
1469        match expr {
1470            LogicalExpression::Variable(name) => {
1471                variable_columns.get(name).copied().ok_or_else(|| {
1472                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1473                })
1474            }
1475            LogicalExpression::Property { variable, property } => {
1476                // Look up the projected property column (e.g., "p_age" for p.age)
1477                let col_name = format!("{}_{}", variable, property);
1478                variable_columns.get(&col_name).copied().ok_or_else(|| {
1479                    Error::Internal(format!(
1480                        "Property column '{}' not found for ORDER BY (from {}.{})",
1481                        col_name, variable, property
1482                    ))
1483                })
1484            }
1485            _ => Err(Error::Internal(format!(
1486                "Unsupported ORDER BY expression: {:?}",
1487                expr
1488            ))),
1489        }
1490    }
1491
1492    /// Derives a schema from column names (uses Any type to handle all value types).
1493    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1494        columns.iter().map(|_| LogicalType::Any).collect()
1495    }
1496
1497    /// Plans an AGGREGATE operator.
1498    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1499        // Check if we can use factorized aggregation for speedup
1500        // Conditions:
1501        // 1. Factorized execution is enabled
1502        // 2. Input is an expand chain (multi-hop)
1503        // 3. No GROUP BY
1504        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1505        if self.factorized_execution
1506            && agg.group_by.is_empty()
1507            && Self::count_expand_chain(&agg.input).0 >= 2
1508            && self.is_simple_aggregate(agg)
1509        {
1510            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1511                return Ok((op, cols));
1512            }
1513            // Fall through to regular aggregate if factorized planning fails
1514        }
1515
1516        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1517
1518        // Build variable to column index mapping
1519        let mut variable_columns: HashMap<String, usize> = input_columns
1520            .iter()
1521            .enumerate()
1522            .map(|(i, name)| (name.clone(), i))
1523            .collect();
1524
1525        // Collect all property expressions that need to be projected before aggregation
1526        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1527        let mut next_col_idx = input_columns.len();
1528
1529        // Check group-by expressions for properties
1530        for expr in &agg.group_by {
1531            if let LogicalExpression::Property { variable, property } = expr {
1532                let col_name = format!("{}_{}", variable, property);
1533                if !variable_columns.contains_key(&col_name) {
1534                    property_projections.push((
1535                        variable.clone(),
1536                        property.clone(),
1537                        col_name.clone(),
1538                    ));
1539                    variable_columns.insert(col_name, next_col_idx);
1540                    next_col_idx += 1;
1541                }
1542            }
1543        }
1544
1545        // Check aggregate expressions for properties
1546        for agg_expr in &agg.aggregates {
1547            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1548                let col_name = format!("{}_{}", variable, property);
1549                if !variable_columns.contains_key(&col_name) {
1550                    property_projections.push((
1551                        variable.clone(),
1552                        property.clone(),
1553                        col_name.clone(),
1554                    ));
1555                    variable_columns.insert(col_name, next_col_idx);
1556                    next_col_idx += 1;
1557                }
1558            }
1559        }
1560
1561        // If we have property expressions, add a projection to materialize them
1562        if !property_projections.is_empty() {
1563            let mut projections = Vec::new();
1564            let mut output_types = Vec::new();
1565
1566            // First, pass through all existing columns (use Node type to preserve node IDs
1567            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1568            for (i, _) in input_columns.iter().enumerate() {
1569                projections.push(ProjectExpr::Column(i));
1570                output_types.push(LogicalType::Node);
1571            }
1572
1573            // Then add property access projections
1574            for (variable, property, _col_name) in &property_projections {
1575                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1576                    Error::Internal(format!(
1577                        "Variable '{}' not found for property projection",
1578                        variable
1579                    ))
1580                })?;
1581                projections.push(ProjectExpr::PropertyAccess {
1582                    column: source_col,
1583                    property: property.clone(),
1584                });
1585                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1586            }
1587
1588            input_op = Box::new(ProjectOperator::with_store(
1589                input_op,
1590                projections,
1591                output_types,
1592                Arc::clone(&self.store),
1593            ));
1594        }
1595
1596        // Convert group-by expressions to column indices
1597        let group_columns: Vec<usize> = agg
1598            .group_by
1599            .iter()
1600            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1601            .collect::<Result<Vec<_>>>()?;
1602
1603        // Convert aggregate expressions to physical form
1604        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1605            .aggregates
1606            .iter()
1607            .map(|agg_expr| {
1608                let column = agg_expr
1609                    .expression
1610                    .as_ref()
1611                    .map(|e| {
1612                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1613                    })
1614                    .transpose()?;
1615
1616                Ok(PhysicalAggregateExpr {
1617                    function: convert_aggregate_function(agg_expr.function),
1618                    column,
1619                    distinct: agg_expr.distinct,
1620                    alias: agg_expr.alias.clone(),
1621                    percentile: agg_expr.percentile,
1622                })
1623            })
1624            .collect::<Result<Vec<_>>>()?;
1625
1626        // Build output schema and column names
1627        let mut output_schema = Vec::new();
1628        let mut output_columns = Vec::new();
1629
1630        // Add group-by columns
1631        for expr in &agg.group_by {
1632            output_schema.push(LogicalType::Any); // Group-by values can be any type
1633            output_columns.push(expression_to_string(expr));
1634        }
1635
1636        // Add aggregate result columns
1637        for agg_expr in &agg.aggregates {
1638            let result_type = match agg_expr.function {
1639                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1640                    LogicalType::Int64
1641                }
1642                LogicalAggregateFunction::Sum => LogicalType::Int64,
1643                LogicalAggregateFunction::Avg => LogicalType::Float64,
1644                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1645                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1646                    // since the aggregate can return any Value type, but the most common case
1647                    // is numeric values from property expressions
1648                    LogicalType::Int64
1649                }
1650                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1651                // Statistical functions return Float64
1652                LogicalAggregateFunction::StdDev
1653                | LogicalAggregateFunction::StdDevPop
1654                | LogicalAggregateFunction::PercentileDisc
1655                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1656            };
1657            output_schema.push(result_type);
1658            output_columns.push(
1659                agg_expr
1660                    .alias
1661                    .clone()
1662                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1663            );
1664        }
1665
1666        // Choose operator based on whether there are group-by columns
1667        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1668            Box::new(SimpleAggregateOperator::new(
1669                input_op,
1670                physical_aggregates,
1671                output_schema,
1672            ))
1673        } else {
1674            Box::new(HashAggregateOperator::new(
1675                input_op,
1676                group_columns,
1677                physical_aggregates,
1678                output_schema,
1679            ))
1680        };
1681
1682        // Apply HAVING clause filter if present
1683        if let Some(having_expr) = &agg.having {
1684            // Build variable to column mapping for the aggregate output
1685            let having_var_columns: HashMap<String, usize> = output_columns
1686                .iter()
1687                .enumerate()
1688                .map(|(i, name)| (name.clone(), i))
1689                .collect();
1690
1691            let filter_expr = self.convert_expression(having_expr)?;
1692            let predicate =
1693                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1694            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1695        }
1696
1697        Ok((operator, output_columns))
1698    }
1699
1700    /// Checks if an aggregate is simple enough for factorized execution.
1701    ///
1702    /// Simple aggregates:
1703    /// - COUNT(*) or COUNT(variable)
1704    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1705    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1706        agg.aggregates.iter().all(|agg_expr| {
1707            match agg_expr.function {
1708                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1709                    // COUNT(*) is always OK, COUNT(var) is OK
1710                    agg_expr.expression.is_none()
1711                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1712                }
1713                LogicalAggregateFunction::Sum
1714                | LogicalAggregateFunction::Avg
1715                | LogicalAggregateFunction::Min
1716                | LogicalAggregateFunction::Max => {
1717                    // For now, only support when expression is a variable
1718                    // (property access would require flattening first)
1719                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1720                }
1721                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1722                _ => false,
1723            }
1724        })
1725    }
1726
1727    /// Plans a factorized aggregate that operates directly on factorized data.
1728    ///
1729    /// This avoids the O(n²) cost of flattening before aggregation.
1730    fn plan_factorized_aggregate(
1731        &self,
1732        agg: &AggregateOp,
1733    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1734        // Build the expand chain - this returns a LazyFactorizedChainOperator
1735        let expands = Self::collect_expand_chain(&agg.input);
1736        if expands.is_empty() {
1737            return Err(Error::Internal(
1738                "Expected expand chain for factorized aggregate".to_string(),
1739            ));
1740        }
1741
1742        // Get the base operator (before first expand)
1743        let first_expand = expands[0];
1744        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1745
1746        let mut columns = base_columns.clone();
1747        let mut steps = Vec::new();
1748        let mut is_first = true;
1749
1750        for expand in &expands {
1751            // Find source column for this expand
1752            let source_column = if is_first {
1753                base_columns
1754                    .iter()
1755                    .position(|c| c == &expand.from_variable)
1756                    .ok_or_else(|| {
1757                        Error::Internal(format!(
1758                            "Source variable '{}' not found in base columns",
1759                            expand.from_variable
1760                        ))
1761                    })?
1762            } else {
1763                1 // Target from previous level
1764            };
1765
1766            let direction = match expand.direction {
1767                ExpandDirection::Outgoing => Direction::Outgoing,
1768                ExpandDirection::Incoming => Direction::Incoming,
1769                ExpandDirection::Both => Direction::Both,
1770            };
1771
1772            steps.push(ExpandStep {
1773                source_column,
1774                direction,
1775                edge_type: expand.edge_type.clone(),
1776            });
1777
1778            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1779                let count = self.anon_edge_counter.get();
1780                self.anon_edge_counter.set(count + 1);
1781                format!("_anon_edge_{}", count)
1782            });
1783            columns.push(edge_col_name);
1784            columns.push(expand.to_variable.clone());
1785
1786            is_first = false;
1787        }
1788
1789        // Create the lazy factorized chain operator
1790        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1791
1792        if let Some(tx_id) = self.tx_id {
1793            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1794        } else {
1795            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1796        }
1797
1798        // Convert logical aggregates to factorized aggregates
1799        let factorized_aggs: Vec<FactorizedAggregate> = agg
1800            .aggregates
1801            .iter()
1802            .map(|agg_expr| {
1803                match agg_expr.function {
1804                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1805                        // COUNT(*) uses simple count, COUNT(col) uses column count
1806                        if agg_expr.expression.is_none() {
1807                            FactorizedAggregate::count()
1808                        } else {
1809                            // For COUNT(variable), we use the deepest level's target column
1810                            // which is the last column added to the schema
1811                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1812                        }
1813                    }
1814                    LogicalAggregateFunction::Sum => {
1815                        // SUM on deepest level target
1816                        FactorizedAggregate::sum(1)
1817                    }
1818                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1819                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1820                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1821                    _ => {
1822                        // Shouldn't reach here due to is_simple_aggregate check
1823                        FactorizedAggregate::count()
1824                    }
1825                }
1826            })
1827            .collect();
1828
1829        // Build output column names
1830        let output_columns: Vec<String> = agg
1831            .aggregates
1832            .iter()
1833            .map(|agg_expr| {
1834                agg_expr
1835                    .alias
1836                    .clone()
1837                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1838            })
1839            .collect();
1840
1841        // Create the factorized aggregate operator
1842        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1843
1844        Ok((Box::new(factorized_agg_op), output_columns))
1845    }
1846
1847    /// Resolves a logical expression to a column index.
1848    #[allow(dead_code)]
1849    fn resolve_expression_to_column(
1850        &self,
1851        expr: &LogicalExpression,
1852        variable_columns: &HashMap<String, usize>,
1853    ) -> Result<usize> {
1854        match expr {
1855            LogicalExpression::Variable(name) => variable_columns
1856                .get(name)
1857                .copied()
1858                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1859            LogicalExpression::Property { variable, .. } => variable_columns
1860                .get(variable)
1861                .copied()
1862                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1863            _ => Err(Error::Internal(format!(
1864                "Cannot resolve expression to column: {:?}",
1865                expr
1866            ))),
1867        }
1868    }
1869
1870    /// Resolves a logical expression to a column index, using projected property columns.
1871    ///
1872    /// This is used for aggregations where properties have been projected into their own columns.
1873    fn resolve_expression_to_column_with_properties(
1874        &self,
1875        expr: &LogicalExpression,
1876        variable_columns: &HashMap<String, usize>,
1877    ) -> Result<usize> {
1878        match expr {
1879            LogicalExpression::Variable(name) => variable_columns
1880                .get(name)
1881                .copied()
1882                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1883            LogicalExpression::Property { variable, property } => {
1884                // Look up the projected property column (e.g., "p_price" for p.price)
1885                let col_name = format!("{}_{}", variable, property);
1886                variable_columns.get(&col_name).copied().ok_or_else(|| {
1887                    Error::Internal(format!(
1888                        "Property column '{}' not found (from {}.{})",
1889                        col_name, variable, property
1890                    ))
1891                })
1892            }
1893            _ => Err(Error::Internal(format!(
1894                "Cannot resolve expression to column: {:?}",
1895                expr
1896            ))),
1897        }
1898    }
1899
1900    /// Converts a logical expression to a filter expression.
1901    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1902        match expr {
1903            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1904            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1905            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1906                variable: variable.clone(),
1907                property: property.clone(),
1908            }),
1909            LogicalExpression::Binary { left, op, right } => {
1910                let left_expr = self.convert_expression(left)?;
1911                let right_expr = self.convert_expression(right)?;
1912                let filter_op = convert_binary_op(*op)?;
1913                Ok(FilterExpression::Binary {
1914                    left: Box::new(left_expr),
1915                    op: filter_op,
1916                    right: Box::new(right_expr),
1917                })
1918            }
1919            LogicalExpression::Unary { op, operand } => {
1920                let operand_expr = self.convert_expression(operand)?;
1921                let filter_op = convert_unary_op(*op)?;
1922                Ok(FilterExpression::Unary {
1923                    op: filter_op,
1924                    operand: Box::new(operand_expr),
1925                })
1926            }
1927            LogicalExpression::FunctionCall { name, args, .. } => {
1928                let filter_args: Vec<FilterExpression> = args
1929                    .iter()
1930                    .map(|a| self.convert_expression(a))
1931                    .collect::<Result<Vec<_>>>()?;
1932                Ok(FilterExpression::FunctionCall {
1933                    name: name.clone(),
1934                    args: filter_args,
1935                })
1936            }
1937            LogicalExpression::Case {
1938                operand,
1939                when_clauses,
1940                else_clause,
1941            } => {
1942                let filter_operand = operand
1943                    .as_ref()
1944                    .map(|e| self.convert_expression(e))
1945                    .transpose()?
1946                    .map(Box::new);
1947                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1948                    .iter()
1949                    .map(|(cond, result)| {
1950                        Ok((
1951                            self.convert_expression(cond)?,
1952                            self.convert_expression(result)?,
1953                        ))
1954                    })
1955                    .collect::<Result<Vec<_>>>()?;
1956                let filter_else = else_clause
1957                    .as_ref()
1958                    .map(|e| self.convert_expression(e))
1959                    .transpose()?
1960                    .map(Box::new);
1961                Ok(FilterExpression::Case {
1962                    operand: filter_operand,
1963                    when_clauses: filter_when_clauses,
1964                    else_clause: filter_else,
1965                })
1966            }
1967            LogicalExpression::List(items) => {
1968                let filter_items: Vec<FilterExpression> = items
1969                    .iter()
1970                    .map(|item| self.convert_expression(item))
1971                    .collect::<Result<Vec<_>>>()?;
1972                Ok(FilterExpression::List(filter_items))
1973            }
1974            LogicalExpression::Map(pairs) => {
1975                let filter_pairs: Vec<(String, FilterExpression)> = pairs
1976                    .iter()
1977                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1978                    .collect::<Result<Vec<_>>>()?;
1979                Ok(FilterExpression::Map(filter_pairs))
1980            }
1981            LogicalExpression::IndexAccess { base, index } => {
1982                let base_expr = self.convert_expression(base)?;
1983                let index_expr = self.convert_expression(index)?;
1984                Ok(FilterExpression::IndexAccess {
1985                    base: Box::new(base_expr),
1986                    index: Box::new(index_expr),
1987                })
1988            }
1989            LogicalExpression::SliceAccess { base, start, end } => {
1990                let base_expr = self.convert_expression(base)?;
1991                let start_expr = start
1992                    .as_ref()
1993                    .map(|s| self.convert_expression(s))
1994                    .transpose()?
1995                    .map(Box::new);
1996                let end_expr = end
1997                    .as_ref()
1998                    .map(|e| self.convert_expression(e))
1999                    .transpose()?
2000                    .map(Box::new);
2001                Ok(FilterExpression::SliceAccess {
2002                    base: Box::new(base_expr),
2003                    start: start_expr,
2004                    end: end_expr,
2005                })
2006            }
2007            LogicalExpression::Parameter(_) => Err(Error::Internal(
2008                "Parameters not yet supported in filters".to_string(),
2009            )),
2010            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2011            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2012            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2013            LogicalExpression::ListComprehension {
2014                variable,
2015                list_expr,
2016                filter_expr,
2017                map_expr,
2018            } => {
2019                let list = self.convert_expression(list_expr)?;
2020                let filter = filter_expr
2021                    .as_ref()
2022                    .map(|f| self.convert_expression(f))
2023                    .transpose()?
2024                    .map(Box::new);
2025                let map = self.convert_expression(map_expr)?;
2026                Ok(FilterExpression::ListComprehension {
2027                    variable: variable.clone(),
2028                    list_expr: Box::new(list),
2029                    filter_expr: filter,
2030                    map_expr: Box::new(map),
2031                })
2032            }
2033            LogicalExpression::ExistsSubquery(subplan) => {
2034                // Extract the pattern from the subplan
2035                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2036                let (start_var, direction, edge_type, end_labels) =
2037                    self.extract_exists_pattern(subplan)?;
2038
2039                Ok(FilterExpression::ExistsSubquery {
2040                    start_var,
2041                    direction,
2042                    edge_type,
2043                    end_labels,
2044                    min_hops: None,
2045                    max_hops: None,
2046                })
2047            }
2048            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2049                "COUNT subqueries not yet supported".to_string(),
2050            )),
2051        }
2052    }
2053
2054    /// Extracts the pattern from an EXISTS subplan.
2055    /// Returns (start_variable, direction, edge_type, end_labels).
2056    fn extract_exists_pattern(
2057        &self,
2058        subplan: &LogicalOperator,
2059    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2060        match subplan {
2061            LogicalOperator::Expand(expand) => {
2062                // Get end node labels from the to_variable if there's a node scan input
2063                let end_labels = self.extract_end_labels_from_expand(expand);
2064                let direction = match expand.direction {
2065                    ExpandDirection::Outgoing => Direction::Outgoing,
2066                    ExpandDirection::Incoming => Direction::Incoming,
2067                    ExpandDirection::Both => Direction::Both,
2068                };
2069                Ok((
2070                    expand.from_variable.clone(),
2071                    direction,
2072                    expand.edge_type.clone(),
2073                    end_labels,
2074                ))
2075            }
2076            LogicalOperator::NodeScan(scan) => {
2077                if let Some(input) = &scan.input {
2078                    self.extract_exists_pattern(input)
2079                } else {
2080                    Err(Error::Internal(
2081                        "EXISTS subquery must contain an edge pattern".to_string(),
2082                    ))
2083                }
2084            }
2085            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2086            _ => Err(Error::Internal(
2087                "Unsupported EXISTS subquery pattern".to_string(),
2088            )),
2089        }
2090    }
2091
2092    /// Extracts end node labels from an Expand operator if present.
2093    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2094        // Check if the expand has a NodeScan input with a label filter
2095        match expand.input.as_ref() {
2096            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2097            _ => None,
2098        }
2099    }
2100
2101    /// Plans a JOIN operator.
2102    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2103        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2104        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2105
2106        // Build combined output columns
2107        let mut columns = left_columns.clone();
2108        columns.extend(right_columns.clone());
2109
2110        // Convert join type
2111        let physical_join_type = match join.join_type {
2112            JoinType::Inner => PhysicalJoinType::Inner,
2113            JoinType::Left => PhysicalJoinType::Left,
2114            JoinType::Right => PhysicalJoinType::Right,
2115            JoinType::Full => PhysicalJoinType::Full,
2116            JoinType::Cross => PhysicalJoinType::Cross,
2117            JoinType::Semi => PhysicalJoinType::Semi,
2118            JoinType::Anti => PhysicalJoinType::Anti,
2119        };
2120
2121        // Build key columns from join conditions
2122        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2123            // Cross join - no keys
2124            (vec![], vec![])
2125        } else {
2126            join.conditions
2127                .iter()
2128                .filter_map(|cond| {
2129                    // Try to extract column indices from expressions
2130                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2131                    let right_idx = self
2132                        .expression_to_column(&cond.right, &right_columns)
2133                        .ok()?;
2134                    Some((left_idx, right_idx))
2135                })
2136                .unzip()
2137        };
2138
2139        let output_schema = self.derive_schema_from_columns(&columns);
2140
2141        // Check if we should use leapfrog join for cyclic patterns
2142        // Currently we use hash join by default; leapfrog is available but
2143        // requires explicit multi-way join detection which will be added
2144        // when we have proper cyclic pattern detection in the optimizer.
2145        // For now, LeapfrogJoinOperator is available for direct use.
2146        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2147
2148        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2149            left_op,
2150            right_op,
2151            probe_keys,
2152            build_keys,
2153            physical_join_type,
2154            output_schema,
2155        ));
2156
2157        Ok((operator, columns))
2158    }
2159
2160    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2161    ///
2162    /// A cyclic pattern occurs when join conditions reference variables
2163    /// that create a cycle in the join graph. For example, a triangle
2164    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2165    ///
2166    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2167    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2168    #[allow(dead_code)]
2169    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2170        // Build adjacency list for join variables
2171        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2172        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2173
2174        // Collect edges from join conditions
2175        Self::collect_join_edges(
2176            &LogicalOperator::Join(join.clone()),
2177            &mut edges,
2178            &mut all_vars,
2179        );
2180
2181        // Need at least 3 variables to form a cycle
2182        if all_vars.len() < 3 {
2183            return false;
2184        }
2185
2186        // Detect cycle using DFS with coloring
2187        Self::has_cycle(&edges, &all_vars)
2188    }
2189
2190    /// Collects edges from join conditions into an adjacency list.
2191    fn collect_join_edges(
2192        op: &LogicalOperator,
2193        edges: &mut HashMap<String, Vec<String>>,
2194        vars: &mut std::collections::HashSet<String>,
2195    ) {
2196        match op {
2197            LogicalOperator::Join(join) => {
2198                // Process join conditions
2199                for cond in &join.conditions {
2200                    if let (Some(left_var), Some(right_var)) = (
2201                        Self::extract_join_variable(&cond.left),
2202                        Self::extract_join_variable(&cond.right),
2203                    ) {
2204                        if left_var != right_var {
2205                            vars.insert(left_var.clone());
2206                            vars.insert(right_var.clone());
2207
2208                            // Add bidirectional edge
2209                            edges
2210                                .entry(left_var.clone())
2211                                .or_default()
2212                                .push(right_var.clone());
2213                            edges.entry(right_var).or_default().push(left_var);
2214                        }
2215                    }
2216                }
2217
2218                // Recurse into children
2219                Self::collect_join_edges(&join.left, edges, vars);
2220                Self::collect_join_edges(&join.right, edges, vars);
2221            }
2222            LogicalOperator::Expand(expand) => {
2223                // Expand creates implicit join between from_variable and to_variable
2224                vars.insert(expand.from_variable.clone());
2225                vars.insert(expand.to_variable.clone());
2226
2227                edges
2228                    .entry(expand.from_variable.clone())
2229                    .or_default()
2230                    .push(expand.to_variable.clone());
2231                edges
2232                    .entry(expand.to_variable.clone())
2233                    .or_default()
2234                    .push(expand.from_variable.clone());
2235
2236                Self::collect_join_edges(&expand.input, edges, vars);
2237            }
2238            LogicalOperator::Filter(filter) => {
2239                Self::collect_join_edges(&filter.input, edges, vars);
2240            }
2241            LogicalOperator::NodeScan(scan) => {
2242                vars.insert(scan.variable.clone());
2243            }
2244            _ => {}
2245        }
2246    }
2247
2248    /// Extracts the variable name from a join expression.
2249    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2250        match expr {
2251            LogicalExpression::Variable(v) => Some(v.clone()),
2252            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2253            LogicalExpression::Id(v) => Some(v.clone()),
2254            _ => None,
2255        }
2256    }
2257
2258    /// Detects if the graph has a cycle using DFS coloring.
2259    ///
2260    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2261    fn has_cycle(
2262        edges: &HashMap<String, Vec<String>>,
2263        vars: &std::collections::HashSet<String>,
2264    ) -> bool {
2265        let mut color: HashMap<&String, u8> = HashMap::new();
2266
2267        for var in vars {
2268            color.insert(var, 0);
2269        }
2270
2271        for start in vars {
2272            if color[start] == 0 {
2273                if Self::dfs_cycle(start, None, edges, &mut color) {
2274                    return true;
2275                }
2276            }
2277        }
2278
2279        false
2280    }
2281
/// Depth-first search for a cycle in an undirected graph, starting at `node`.
///
/// Colors in `color`: 0 = white (unvisited), 1 = gray (on the current search
/// path), 2 = black (fully explored). Reaching a gray node through an edge that
/// is neither a self-loop nor the edge back to the node we arrived from means a
/// cycle exists.
///
/// * `node` - start of the search; it must already be a key of `color`.
/// * `parent` - node the search arrived from, whose edge is not followed back.
/// * `edges` - adjacency list; neighbors missing from `color` are ignored.
/// * `color` - visitation state, updated in place.
///
/// Returns `true` as soon as a cycle is found (nodes on the search path are then
/// left gray); returns `false` once everything reachable has been colored black.
///
/// Self-loops are skipped so that a node adjacent to itself is not mistaken for
/// a multi-node cycle. The traversal uses an explicit stack instead of recursion.
///
/// # Panics
///
/// Panics if `node` is not present in `color`.
fn dfs_cycle(
    node: &String,
    parent: Option<&String>,
    edges: &HashMap<String, Vec<String>>,
    color: &mut HashMap<&String, u8>,
) -> bool {
    const WHITE: u8 = 0;
    const GRAY: u8 = 1;
    const BLACK: u8 = 2;

    *color
        .get_mut(node)
        .expect("dfs_cycle: start node must be present in the color map") = GRAY;

    // Each frame is (vertex, vertex we arrived from, index of the next neighbor).
    let mut stack: Vec<(&String, Option<&String>, usize)> = vec![(node, parent, 0)];

    while let Some(frame) = stack.last_mut() {
        let (current, came_from, cursor) = *frame;
        frame.2 += 1;

        let neighbor = match edges.get(current).and_then(|adjacent| adjacent.get(cursor)) {
            Some(neighbor) => neighbor,
            None => {
                // Every neighbor has been examined: this vertex is finished.
                if let Some(state) = color.get_mut(current) {
                    *state = BLACK;
                }
                stack.pop();
                continue;
            }
        };

        // Ignore self-loops and the edge leading back to where we came from.
        if neighbor == current || came_from == Some(neighbor) {
            continue;
        }

        match color.get(neighbor).copied() {
            // Back edge onto the current search path: cycle detected.
            Some(GRAY) => return true,
            Some(WHITE) => {
                if let Some(state) = color.get_mut(neighbor) {
                    *state = GRAY;
                }
                stack.push((neighbor, Some(current), 0));
            }
            // Already finished, or not a tracked variable.
            _ => {}
        }
    }

    false
}
2313
2314    /// Counts the number of base relations in a logical operator tree.
2315    #[allow(dead_code)]
2316    fn count_relations(op: &LogicalOperator) -> usize {
2317        match op {
2318            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2319            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2320            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2321            LogicalOperator::Join(j) => {
2322                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2323            }
2324            _ => 0,
2325        }
2326    }
2327
2328    /// Extracts a column index from an expression.
2329    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2330        match expr {
2331            LogicalExpression::Variable(name) => columns
2332                .iter()
2333                .position(|c| c == name)
2334                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2335            _ => Err(Error::Internal(
2336                "Only variables supported in join conditions".to_string(),
2337            )),
2338        }
2339    }
2340
2341    /// Plans a UNION operator.
2342    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2343        if union.inputs.is_empty() {
2344            return Err(Error::Internal(
2345                "Union requires at least one input".to_string(),
2346            ));
2347        }
2348
2349        let mut inputs = Vec::with_capacity(union.inputs.len());
2350        let mut columns = Vec::new();
2351
2352        for (i, input) in union.inputs.iter().enumerate() {
2353            let (op, cols) = self.plan_operator(input)?;
2354            if i == 0 {
2355                columns = cols;
2356            }
2357            inputs.push(op);
2358        }
2359
2360        let output_schema = self.derive_schema_from_columns(&columns);
2361        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2362
2363        Ok((operator, columns))
2364    }
2365
2366    /// Plans a DISTINCT operator.
2367    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2368        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2369        let output_schema = self.derive_schema_from_columns(&columns);
2370        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2371        Ok((operator, columns))
2372    }
2373
2374    /// Plans a CREATE NODE operator.
2375    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2376        // Plan input if present
2377        let (input_op, mut columns) = if let Some(ref input) = create.input {
2378            let (op, cols) = self.plan_operator(input)?;
2379            (Some(op), cols)
2380        } else {
2381            (None, vec![])
2382        };
2383
2384        // Output column for the created node
2385        let output_column = columns.len();
2386        columns.push(create.variable.clone());
2387
2388        // Convert properties
2389        let properties: Vec<(String, PropertySource)> = create
2390            .properties
2391            .iter()
2392            .map(|(name, expr)| {
2393                let source = match expr {
2394                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2395                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2396                };
2397                (name.clone(), source)
2398            })
2399            .collect();
2400
2401        let output_schema = self.derive_schema_from_columns(&columns);
2402
2403        let operator = Box::new(
2404            CreateNodeOperator::new(
2405                Arc::clone(&self.store),
2406                input_op,
2407                create.labels.clone(),
2408                properties,
2409                output_schema,
2410                output_column,
2411            )
2412            .with_tx_context(self.viewing_epoch, self.tx_id),
2413        );
2414
2415        Ok((operator, columns))
2416    }
2417
2418    /// Plans a CREATE EDGE operator.
2419    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2420        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2421
2422        // Find source and target columns
2423        let from_column = columns
2424            .iter()
2425            .position(|c| c == &create.from_variable)
2426            .ok_or_else(|| {
2427                Error::Internal(format!(
2428                    "Source variable '{}' not found",
2429                    create.from_variable
2430                ))
2431            })?;
2432
2433        let to_column = columns
2434            .iter()
2435            .position(|c| c == &create.to_variable)
2436            .ok_or_else(|| {
2437                Error::Internal(format!(
2438                    "Target variable '{}' not found",
2439                    create.to_variable
2440                ))
2441            })?;
2442
2443        // Output column for the created edge (if named)
2444        let output_column = create.variable.as_ref().map(|v| {
2445            let idx = columns.len();
2446            columns.push(v.clone());
2447            idx
2448        });
2449
2450        // Convert properties
2451        let properties: Vec<(String, PropertySource)> = create
2452            .properties
2453            .iter()
2454            .map(|(name, expr)| {
2455                let source = match expr {
2456                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2457                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2458                };
2459                (name.clone(), source)
2460            })
2461            .collect();
2462
2463        let output_schema = self.derive_schema_from_columns(&columns);
2464
2465        let operator = Box::new(
2466            CreateEdgeOperator::new(
2467                Arc::clone(&self.store),
2468                input_op,
2469                from_column,
2470                to_column,
2471                create.edge_type.clone(),
2472                properties,
2473                output_schema,
2474                output_column,
2475            )
2476            .with_tx_context(self.viewing_epoch, self.tx_id),
2477        );
2478
2479        Ok((operator, columns))
2480    }
2481
2482    /// Plans a DELETE NODE operator.
2483    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2484        let (input_op, columns) = self.plan_operator(&delete.input)?;
2485
2486        let node_column = columns
2487            .iter()
2488            .position(|c| c == &delete.variable)
2489            .ok_or_else(|| {
2490                Error::Internal(format!(
2491                    "Variable '{}' not found for delete",
2492                    delete.variable
2493                ))
2494            })?;
2495
2496        // Output schema for delete count
2497        let output_schema = vec![LogicalType::Int64];
2498        let output_columns = vec!["deleted_count".to_string()];
2499
2500        let operator = Box::new(
2501            DeleteNodeOperator::new(
2502                Arc::clone(&self.store),
2503                input_op,
2504                node_column,
2505                output_schema,
2506                delete.detach, // DETACH DELETE deletes connected edges first
2507            )
2508            .with_tx_context(self.viewing_epoch, self.tx_id),
2509        );
2510
2511        Ok((operator, output_columns))
2512    }
2513
2514    /// Plans a DELETE EDGE operator.
2515    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2516        let (input_op, columns) = self.plan_operator(&delete.input)?;
2517
2518        let edge_column = columns
2519            .iter()
2520            .position(|c| c == &delete.variable)
2521            .ok_or_else(|| {
2522                Error::Internal(format!(
2523                    "Variable '{}' not found for delete",
2524                    delete.variable
2525                ))
2526            })?;
2527
2528        // Output schema for delete count
2529        let output_schema = vec![LogicalType::Int64];
2530        let output_columns = vec!["deleted_count".to_string()];
2531
2532        let operator = Box::new(
2533            DeleteEdgeOperator::new(
2534                Arc::clone(&self.store),
2535                input_op,
2536                edge_column,
2537                output_schema,
2538            )
2539            .with_tx_context(self.viewing_epoch, self.tx_id),
2540        );
2541
2542        Ok((operator, output_columns))
2543    }
2544
2545    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2546    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2547        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2548        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2549
2550        // Build combined output columns (left + right)
2551        let mut columns = left_columns.clone();
2552        columns.extend(right_columns.clone());
2553
2554        // Find common variables between left and right for join keys
2555        let mut probe_keys = Vec::new();
2556        let mut build_keys = Vec::new();
2557
2558        for (right_idx, right_col) in right_columns.iter().enumerate() {
2559            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2560                probe_keys.push(left_idx);
2561                build_keys.push(right_idx);
2562            }
2563        }
2564
2565        let output_schema = self.derive_schema_from_columns(&columns);
2566
2567        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2568            left_op,
2569            right_op,
2570            probe_keys,
2571            build_keys,
2572            PhysicalJoinType::Left,
2573            output_schema,
2574        ));
2575
2576        Ok((operator, columns))
2577    }
2578
2579    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2580    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2581        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2582        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2583
2584        // Anti-join only keeps left columns (filters out matching rows)
2585        let columns = left_columns.clone();
2586
2587        // Find common variables between left and right for join keys
2588        let mut probe_keys = Vec::new();
2589        let mut build_keys = Vec::new();
2590
2591        for (right_idx, right_col) in right_columns.iter().enumerate() {
2592            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2593                probe_keys.push(left_idx);
2594                build_keys.push(right_idx);
2595            }
2596        }
2597
2598        let output_schema = self.derive_schema_from_columns(&columns);
2599
2600        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2601            left_op,
2602            right_op,
2603            probe_keys,
2604            build_keys,
2605            PhysicalJoinType::Anti,
2606            output_schema,
2607        ));
2608
2609        Ok((operator, columns))
2610    }
2611
2612    /// Plans an unwind operator.
2613    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2614        // Plan the input operator first
2615        // Handle Empty specially - use a single-row operator
2616        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2617            if matches!(&*unwind.input, LogicalOperator::Empty) {
2618                // For UNWIND without prior MATCH, create a single-row input
2619                // We need an operator that produces one row with the list to unwind
2620                // For now, use EmptyScan which produces no rows - we'll handle the literal
2621                // list in the unwind operator itself
2622                let literal_list = self.convert_expression(&unwind.expression)?;
2623
2624                // Create a project operator that produces a single row with the list
2625                let single_row_op: Box<dyn Operator> = Box::new(
2626                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2627                );
2628                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2629                    single_row_op,
2630                    vec![ProjectExpr::Expression {
2631                        expr: literal_list,
2632                        variable_columns: HashMap::new(),
2633                    }],
2634                    vec![LogicalType::Any],
2635                    Arc::clone(&self.store),
2636                ));
2637
2638                (project_op, vec!["__list__".to_string()])
2639            } else {
2640                self.plan_operator(&unwind.input)?
2641            };
2642
2643        // The UNWIND expression should be a list - we need to find/evaluate it
2644        // For now, we handle the case where the expression references an existing column
2645        // or is a literal list
2646
2647        // Find if the expression references an existing column (like a list property)
2648        let list_col_idx = match &unwind.expression {
2649            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2650            LogicalExpression::Property { variable, .. } => {
2651                // Property access needs to be evaluated - for now we'll need the filter predicate
2652                // to evaluate this. For simple cases, we treat it as a list column.
2653                input_columns.iter().position(|c| c == variable)
2654            }
2655            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2656                // Literal list expression - we'll add it as a virtual column
2657                None
2658            }
2659            _ => None,
2660        };
2661
2662        // Build output columns: all input columns plus the new variable
2663        let mut columns = input_columns.clone();
2664        columns.push(unwind.variable.clone());
2665
2666        // Build output schema
2667        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2668        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2669
2670        // Use the list column index if found, otherwise default to 0
2671        // (in which case the first column should contain the list)
2672        let col_idx = list_col_idx.unwrap_or(0);
2673
2674        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2675            input_op,
2676            col_idx,
2677            unwind.variable.clone(),
2678            output_schema,
2679        ));
2680
2681        Ok((operator, columns))
2682    }
2683
2684    /// Plans a MERGE operator.
2685    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2686        // Plan the input operator if present (skip if Empty)
2687        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2688            Vec::new()
2689        } else {
2690            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2691            cols
2692        };
2693
2694        // Convert match properties from LogicalExpression to Value
2695        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2696            .match_properties
2697            .iter()
2698            .filter_map(|(name, expr)| {
2699                if let LogicalExpression::Literal(v) = expr {
2700                    Some((name.clone(), v.clone()))
2701                } else {
2702                    None // Skip non-literal expressions for now
2703                }
2704            })
2705            .collect();
2706
2707        // Convert ON CREATE properties
2708        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2709            .on_create
2710            .iter()
2711            .filter_map(|(name, expr)| {
2712                if let LogicalExpression::Literal(v) = expr {
2713                    Some((name.clone(), v.clone()))
2714                } else {
2715                    None
2716                }
2717            })
2718            .collect();
2719
2720        // Convert ON MATCH properties
2721        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2722            .on_match
2723            .iter()
2724            .filter_map(|(name, expr)| {
2725                if let LogicalExpression::Literal(v) = expr {
2726                    Some((name.clone(), v.clone()))
2727                } else {
2728                    None
2729                }
2730            })
2731            .collect();
2732
2733        // Add the merged node variable to output columns
2734        columns.push(merge.variable.clone());
2735
2736        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2737            Arc::clone(&self.store),
2738            merge.variable.clone(),
2739            merge.labels.clone(),
2740            match_properties,
2741            on_create_properties,
2742            on_match_properties,
2743        ));
2744
2745        Ok((operator, columns))
2746    }
2747
2748    /// Plans a SHORTEST PATH operator.
2749    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2750        // Plan the input operator
2751        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2752
2753        // Find source and target node columns
2754        let source_column = columns
2755            .iter()
2756            .position(|c| c == &sp.source_var)
2757            .ok_or_else(|| {
2758                Error::Internal(format!(
2759                    "Source variable '{}' not found for shortestPath",
2760                    sp.source_var
2761                ))
2762            })?;
2763
2764        let target_column = columns
2765            .iter()
2766            .position(|c| c == &sp.target_var)
2767            .ok_or_else(|| {
2768                Error::Internal(format!(
2769                    "Target variable '{}' not found for shortestPath",
2770                    sp.target_var
2771                ))
2772            })?;
2773
2774        // Convert direction
2775        let direction = match sp.direction {
2776            ExpandDirection::Outgoing => Direction::Outgoing,
2777            ExpandDirection::Incoming => Direction::Incoming,
2778            ExpandDirection::Both => Direction::Both,
2779        };
2780
2781        // Create the shortest path operator
2782        let operator: Box<dyn Operator> = Box::new(
2783            ShortestPathOperator::new(
2784                Arc::clone(&self.store),
2785                input_op,
2786                source_column,
2787                target_column,
2788                sp.edge_type.clone(),
2789                direction,
2790            )
2791            .with_all_paths(sp.all_paths),
2792        );
2793
2794        // Add path length column with the expected naming convention
2795        // The translator expects _path_length_{alias} format for length(p) calls
2796        columns.push(format!("_path_length_{}", sp.path_alias));
2797
2798        Ok((operator, columns))
2799    }
2800
2801    /// Plans an ADD LABEL operator.
2802    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2803        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2804
2805        // Find the node column
2806        let node_column = columns
2807            .iter()
2808            .position(|c| c == &add_label.variable)
2809            .ok_or_else(|| {
2810                Error::Internal(format!(
2811                    "Variable '{}' not found for ADD LABEL",
2812                    add_label.variable
2813                ))
2814            })?;
2815
2816        // Output schema for update count
2817        let output_schema = vec![LogicalType::Int64];
2818        let output_columns = vec!["labels_added".to_string()];
2819
2820        let operator = Box::new(AddLabelOperator::new(
2821            Arc::clone(&self.store),
2822            input_op,
2823            node_column,
2824            add_label.labels.clone(),
2825            output_schema,
2826        ));
2827
2828        Ok((operator, output_columns))
2829    }
2830
2831    /// Plans a REMOVE LABEL operator.
2832    fn plan_remove_label(
2833        &self,
2834        remove_label: &RemoveLabelOp,
2835    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2836        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2837
2838        // Find the node column
2839        let node_column = columns
2840            .iter()
2841            .position(|c| c == &remove_label.variable)
2842            .ok_or_else(|| {
2843                Error::Internal(format!(
2844                    "Variable '{}' not found for REMOVE LABEL",
2845                    remove_label.variable
2846                ))
2847            })?;
2848
2849        // Output schema for update count
2850        let output_schema = vec![LogicalType::Int64];
2851        let output_columns = vec!["labels_removed".to_string()];
2852
2853        let operator = Box::new(RemoveLabelOperator::new(
2854            Arc::clone(&self.store),
2855            input_op,
2856            node_column,
2857            remove_label.labels.clone(),
2858            output_schema,
2859        ));
2860
2861        Ok((operator, output_columns))
2862    }
2863
2864    /// Plans a SET PROPERTY operator.
2865    fn plan_set_property(
2866        &self,
2867        set_prop: &SetPropertyOp,
2868    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2869        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2870
2871        // Find the entity column (node or edge variable)
2872        let entity_column = columns
2873            .iter()
2874            .position(|c| c == &set_prop.variable)
2875            .ok_or_else(|| {
2876                Error::Internal(format!(
2877                    "Variable '{}' not found for SET",
2878                    set_prop.variable
2879                ))
2880            })?;
2881
2882        // Convert properties to PropertySource
2883        let properties: Vec<(String, PropertySource)> = set_prop
2884            .properties
2885            .iter()
2886            .map(|(name, expr)| {
2887                let source = self.expression_to_property_source(expr, &columns)?;
2888                Ok((name.clone(), source))
2889            })
2890            .collect::<Result<Vec<_>>>()?;
2891
2892        // Output schema preserves input schema (passes through)
2893        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2894        let output_columns = columns.clone();
2895
2896        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2897        let operator = Box::new(SetPropertyOperator::new_for_node(
2898            Arc::clone(&self.store),
2899            input_op,
2900            entity_column,
2901            properties,
2902            output_schema,
2903        ));
2904
2905        Ok((operator, output_columns))
2906    }
2907
2908    /// Converts a logical expression to a PropertySource.
2909    fn expression_to_property_source(
2910        &self,
2911        expr: &LogicalExpression,
2912        columns: &[String],
2913    ) -> Result<PropertySource> {
2914        match expr {
2915            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2916            LogicalExpression::Variable(name) => {
2917                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2918                    Error::Internal(format!("Variable '{}' not found for property source", name))
2919                })?;
2920                Ok(PropertySource::Column(col_idx))
2921            }
2922            LogicalExpression::Parameter(name) => {
2923                // Parameters should be resolved before planning
2924                // For now, treat as a placeholder
2925                Ok(PropertySource::Constant(
2926                    grafeo_common::types::Value::String(format!("${}", name).into()),
2927                ))
2928            }
2929            _ => Err(Error::Internal(format!(
2930                "Unsupported expression type for property source: {:?}",
2931                expr
2932            ))),
2933        }
2934    }
2935}
2936
2937/// Converts a logical binary operator to a filter binary operator.
2938pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2939    match op {
2940        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2941        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2942        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2943        BinaryOp::Le => Ok(BinaryFilterOp::Le),
2944        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2945        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2946        BinaryOp::And => Ok(BinaryFilterOp::And),
2947        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2948        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2949        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2950        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2951        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2952        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2953        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2954        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2955        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2956        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2957        BinaryOp::In => Ok(BinaryFilterOp::In),
2958        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2959        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2960        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2961            "Binary operator {:?} not yet supported in filters",
2962            op
2963        ))),
2964    }
2965}
2966
2967/// Converts a logical unary operator to a filter unary operator.
2968pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2969    match op {
2970        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2971        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2972        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2973        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2974    }
2975}
2976
2977/// Converts a logical aggregate function to a physical aggregate function.
2978pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2979    match func {
2980        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2981        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2982        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2983        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2984        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2985        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2986        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2987        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2988        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2989        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2990        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2991    }
2992}
2993
2994/// Converts a logical expression to a filter expression.
2995///
2996/// This is a standalone function that can be used by both LPG and RDF planners.
2997pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2998    match expr {
2999        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3000        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3001        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3002            variable: variable.clone(),
3003            property: property.clone(),
3004        }),
3005        LogicalExpression::Binary { left, op, right } => {
3006            let left_expr = convert_filter_expression(left)?;
3007            let right_expr = convert_filter_expression(right)?;
3008            let filter_op = convert_binary_op(*op)?;
3009            Ok(FilterExpression::Binary {
3010                left: Box::new(left_expr),
3011                op: filter_op,
3012                right: Box::new(right_expr),
3013            })
3014        }
3015        LogicalExpression::Unary { op, operand } => {
3016            let operand_expr = convert_filter_expression(operand)?;
3017            let filter_op = convert_unary_op(*op)?;
3018            Ok(FilterExpression::Unary {
3019                op: filter_op,
3020                operand: Box::new(operand_expr),
3021            })
3022        }
3023        LogicalExpression::FunctionCall { name, args, .. } => {
3024            let filter_args: Vec<FilterExpression> = args
3025                .iter()
3026                .map(|a| convert_filter_expression(a))
3027                .collect::<Result<Vec<_>>>()?;
3028            Ok(FilterExpression::FunctionCall {
3029                name: name.clone(),
3030                args: filter_args,
3031            })
3032        }
3033        LogicalExpression::Case {
3034            operand,
3035            when_clauses,
3036            else_clause,
3037        } => {
3038            let filter_operand = operand
3039                .as_ref()
3040                .map(|e| convert_filter_expression(e))
3041                .transpose()?
3042                .map(Box::new);
3043            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3044                .iter()
3045                .map(|(cond, result)| {
3046                    Ok((
3047                        convert_filter_expression(cond)?,
3048                        convert_filter_expression(result)?,
3049                    ))
3050                })
3051                .collect::<Result<Vec<_>>>()?;
3052            let filter_else = else_clause
3053                .as_ref()
3054                .map(|e| convert_filter_expression(e))
3055                .transpose()?
3056                .map(Box::new);
3057            Ok(FilterExpression::Case {
3058                operand: filter_operand,
3059                when_clauses: filter_when_clauses,
3060                else_clause: filter_else,
3061            })
3062        }
3063        LogicalExpression::List(items) => {
3064            let filter_items: Vec<FilterExpression> = items
3065                .iter()
3066                .map(|item| convert_filter_expression(item))
3067                .collect::<Result<Vec<_>>>()?;
3068            Ok(FilterExpression::List(filter_items))
3069        }
3070        LogicalExpression::Map(pairs) => {
3071            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3072                .iter()
3073                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3074                .collect::<Result<Vec<_>>>()?;
3075            Ok(FilterExpression::Map(filter_pairs))
3076        }
3077        LogicalExpression::IndexAccess { base, index } => {
3078            let base_expr = convert_filter_expression(base)?;
3079            let index_expr = convert_filter_expression(index)?;
3080            Ok(FilterExpression::IndexAccess {
3081                base: Box::new(base_expr),
3082                index: Box::new(index_expr),
3083            })
3084        }
3085        LogicalExpression::SliceAccess { base, start, end } => {
3086            let base_expr = convert_filter_expression(base)?;
3087            let start_expr = start
3088                .as_ref()
3089                .map(|s| convert_filter_expression(s))
3090                .transpose()?
3091                .map(Box::new);
3092            let end_expr = end
3093                .as_ref()
3094                .map(|e| convert_filter_expression(e))
3095                .transpose()?
3096                .map(Box::new);
3097            Ok(FilterExpression::SliceAccess {
3098                base: Box::new(base_expr),
3099                start: start_expr,
3100                end: end_expr,
3101            })
3102        }
3103        LogicalExpression::Parameter(_) => Err(Error::Internal(
3104            "Parameters not yet supported in filters".to_string(),
3105        )),
3106        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3107        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3108        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3109        LogicalExpression::ListComprehension {
3110            variable,
3111            list_expr,
3112            filter_expr,
3113            map_expr,
3114        } => {
3115            let list = convert_filter_expression(list_expr)?;
3116            let filter = filter_expr
3117                .as_ref()
3118                .map(|f| convert_filter_expression(f))
3119                .transpose()?
3120                .map(Box::new);
3121            let map = convert_filter_expression(map_expr)?;
3122            Ok(FilterExpression::ListComprehension {
3123                variable: variable.clone(),
3124                list_expr: Box::new(list),
3125                filter_expr: filter,
3126                map_expr: Box::new(map),
3127            })
3128        }
3129        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3130            Error::Internal("Subqueries not yet supported in filters".to_string()),
3131        ),
3132    }
3133}
3134
3135/// Infers the logical type from a value.
3136fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3137    use grafeo_common::types::Value;
3138    match value {
3139        Value::Null => LogicalType::String, // Default type for null
3140        Value::Bool(_) => LogicalType::Bool,
3141        Value::Int64(_) => LogicalType::Int64,
3142        Value::Float64(_) => LogicalType::Float64,
3143        Value::String(_) => LogicalType::String,
3144        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3145        Value::Timestamp(_) => LogicalType::Timestamp,
3146        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3147        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3148    }
3149}
3150
3151/// Converts an expression to a string for column naming.
3152fn expression_to_string(expr: &LogicalExpression) -> String {
3153    match expr {
3154        LogicalExpression::Variable(name) => name.clone(),
3155        LogicalExpression::Property { variable, property } => {
3156            format!("{variable}.{property}")
3157        }
3158        LogicalExpression::Literal(value) => format!("{value:?}"),
3159        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3160        _ => "expr".to_string(),
3161    }
3162}
3163
3164/// A physical plan ready for execution.
3165pub struct PhysicalPlan {
3166    /// The root physical operator.
3167    pub operator: Box<dyn Operator>,
3168    /// Column names for the result.
3169    pub columns: Vec<String>,
3170    /// Adaptive execution context with cardinality estimates.
3171    ///
3172    /// When adaptive execution is enabled, this context contains estimated
3173    /// cardinalities at various checkpoints in the plan. During execution,
3174    /// actual row counts are recorded and compared against estimates.
3175    pub adaptive_context: Option<AdaptiveContext>,
3176}
3177
3178impl PhysicalPlan {
3179    /// Returns the column names.
3180    #[must_use]
3181    pub fn columns(&self) -> &[String] {
3182        &self.columns
3183    }
3184
3185    /// Consumes the plan and returns the operator.
3186    pub fn into_operator(self) -> Box<dyn Operator> {
3187        self.operator
3188    }
3189
3190    /// Returns the adaptive context, if adaptive execution is enabled.
3191    #[must_use]
3192    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3193        self.adaptive_context.as_ref()
3194    }
3195
3196    /// Takes ownership of the adaptive context.
3197    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3198        self.adaptive_context.take()
3199    }
3200}
3201
3202/// Helper operator that returns a single result chunk once.
3203///
3204/// Used by the factorized expand chain to wrap the final result.
3205#[allow(dead_code)]
3206struct SingleResultOperator {
3207    result: Option<grafeo_core::execution::DataChunk>,
3208}
3209
3210impl SingleResultOperator {
3211    #[allow(dead_code)]
3212    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3213        Self { result }
3214    }
3215}
3216
3217impl Operator for SingleResultOperator {
3218    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3219        Ok(self.result.take())
3220    }
3221
3222    fn reset(&mut self) {
3223        // Cannot reset - result is consumed
3224    }
3225
3226    fn name(&self) -> &'static str {
3227        "SingleResult"
3228    }
3229}
3230
3231#[cfg(test)]
3232mod tests {
3233    use super::*;
3234    use crate::query::plan::{
3235        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3236        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3237        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3238        SortKey, SortOp,
3239    };
3240    use grafeo_common::types::Value;
3241
3242    fn create_test_store() -> Arc<LpgStore> {
3243        let store = Arc::new(LpgStore::new());
3244        store.create_node(&["Person"]);
3245        store.create_node(&["Person"]);
3246        store.create_node(&["Company"]);
3247        store
3248    }
3249
3250    // ==================== Simple Scan Tests ====================
3251
3252    #[test]
3253    fn test_plan_simple_scan() {
3254        let store = create_test_store();
3255        let planner = Planner::new(store);
3256
3257        // MATCH (n:Person) RETURN n
3258        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3259            items: vec![ReturnItem {
3260                expression: LogicalExpression::Variable("n".to_string()),
3261                alias: None,
3262            }],
3263            distinct: false,
3264            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3265                variable: "n".to_string(),
3266                label: Some("Person".to_string()),
3267                input: None,
3268            })),
3269        }));
3270
3271        let physical = planner.plan(&logical).unwrap();
3272        assert_eq!(physical.columns(), &["n"]);
3273    }
3274
3275    #[test]
3276    fn test_plan_scan_without_label() {
3277        let store = create_test_store();
3278        let planner = Planner::new(store);
3279
3280        // MATCH (n) RETURN n
3281        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3282            items: vec![ReturnItem {
3283                expression: LogicalExpression::Variable("n".to_string()),
3284                alias: None,
3285            }],
3286            distinct: false,
3287            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3288                variable: "n".to_string(),
3289                label: None,
3290                input: None,
3291            })),
3292        }));
3293
3294        let physical = planner.plan(&logical).unwrap();
3295        assert_eq!(physical.columns(), &["n"]);
3296    }
3297
3298    #[test]
3299    fn test_plan_return_with_alias() {
3300        let store = create_test_store();
3301        let planner = Planner::new(store);
3302
3303        // MATCH (n:Person) RETURN n AS person
3304        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3305            items: vec![ReturnItem {
3306                expression: LogicalExpression::Variable("n".to_string()),
3307                alias: Some("person".to_string()),
3308            }],
3309            distinct: false,
3310            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3311                variable: "n".to_string(),
3312                label: Some("Person".to_string()),
3313                input: None,
3314            })),
3315        }));
3316
3317        let physical = planner.plan(&logical).unwrap();
3318        assert_eq!(physical.columns(), &["person"]);
3319    }
3320
3321    #[test]
3322    fn test_plan_return_property() {
3323        let store = create_test_store();
3324        let planner = Planner::new(store);
3325
3326        // MATCH (n:Person) RETURN n.name
3327        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3328            items: vec![ReturnItem {
3329                expression: LogicalExpression::Property {
3330                    variable: "n".to_string(),
3331                    property: "name".to_string(),
3332                },
3333                alias: None,
3334            }],
3335            distinct: false,
3336            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3337                variable: "n".to_string(),
3338                label: Some("Person".to_string()),
3339                input: None,
3340            })),
3341        }));
3342
3343        let physical = planner.plan(&logical).unwrap();
3344        assert_eq!(physical.columns(), &["n.name"]);
3345    }
3346
3347    #[test]
3348    fn test_plan_return_literal() {
3349        let store = create_test_store();
3350        let planner = Planner::new(store);
3351
3352        // MATCH (n) RETURN 42 AS answer
3353        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3354            items: vec![ReturnItem {
3355                expression: LogicalExpression::Literal(Value::Int64(42)),
3356                alias: Some("answer".to_string()),
3357            }],
3358            distinct: false,
3359            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3360                variable: "n".to_string(),
3361                label: None,
3362                input: None,
3363            })),
3364        }));
3365
3366        let physical = planner.plan(&logical).unwrap();
3367        assert_eq!(physical.columns(), &["answer"]);
3368    }
3369
3370    // ==================== Filter Tests ====================
3371
3372    #[test]
3373    fn test_plan_filter_equality() {
3374        let store = create_test_store();
3375        let planner = Planner::new(store);
3376
3377        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3378        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3379            items: vec![ReturnItem {
3380                expression: LogicalExpression::Variable("n".to_string()),
3381                alias: None,
3382            }],
3383            distinct: false,
3384            input: Box::new(LogicalOperator::Filter(FilterOp {
3385                predicate: LogicalExpression::Binary {
3386                    left: Box::new(LogicalExpression::Property {
3387                        variable: "n".to_string(),
3388                        property: "age".to_string(),
3389                    }),
3390                    op: BinaryOp::Eq,
3391                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3392                },
3393                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3394                    variable: "n".to_string(),
3395                    label: Some("Person".to_string()),
3396                    input: None,
3397                })),
3398            })),
3399        }));
3400
3401        let physical = planner.plan(&logical).unwrap();
3402        assert_eq!(physical.columns(), &["n"]);
3403    }
3404
3405    #[test]
3406    fn test_plan_filter_compound_and() {
3407        let store = create_test_store();
3408        let planner = Planner::new(store);
3409
3410        // WHERE n.age > 20 AND n.age < 40
3411        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3412            items: vec![ReturnItem {
3413                expression: LogicalExpression::Variable("n".to_string()),
3414                alias: None,
3415            }],
3416            distinct: false,
3417            input: Box::new(LogicalOperator::Filter(FilterOp {
3418                predicate: LogicalExpression::Binary {
3419                    left: Box::new(LogicalExpression::Binary {
3420                        left: Box::new(LogicalExpression::Property {
3421                            variable: "n".to_string(),
3422                            property: "age".to_string(),
3423                        }),
3424                        op: BinaryOp::Gt,
3425                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3426                    }),
3427                    op: BinaryOp::And,
3428                    right: Box::new(LogicalExpression::Binary {
3429                        left: Box::new(LogicalExpression::Property {
3430                            variable: "n".to_string(),
3431                            property: "age".to_string(),
3432                        }),
3433                        op: BinaryOp::Lt,
3434                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3435                    }),
3436                },
3437                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3438                    variable: "n".to_string(),
3439                    label: None,
3440                    input: None,
3441                })),
3442            })),
3443        }));
3444
3445        let physical = planner.plan(&logical).unwrap();
3446        assert_eq!(physical.columns(), &["n"]);
3447    }
3448
3449    #[test]
3450    fn test_plan_filter_unary_not() {
3451        let store = create_test_store();
3452        let planner = Planner::new(store);
3453
3454        // WHERE NOT n.active
3455        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3456            items: vec![ReturnItem {
3457                expression: LogicalExpression::Variable("n".to_string()),
3458                alias: None,
3459            }],
3460            distinct: false,
3461            input: Box::new(LogicalOperator::Filter(FilterOp {
3462                predicate: LogicalExpression::Unary {
3463                    op: UnaryOp::Not,
3464                    operand: Box::new(LogicalExpression::Property {
3465                        variable: "n".to_string(),
3466                        property: "active".to_string(),
3467                    }),
3468                },
3469                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3470                    variable: "n".to_string(),
3471                    label: None,
3472                    input: None,
3473                })),
3474            })),
3475        }));
3476
3477        let physical = planner.plan(&logical).unwrap();
3478        assert_eq!(physical.columns(), &["n"]);
3479    }
3480
3481    #[test]
3482    fn test_plan_filter_is_null() {
3483        let store = create_test_store();
3484        let planner = Planner::new(store);
3485
3486        // WHERE n.email IS NULL
3487        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3488            items: vec![ReturnItem {
3489                expression: LogicalExpression::Variable("n".to_string()),
3490                alias: None,
3491            }],
3492            distinct: false,
3493            input: Box::new(LogicalOperator::Filter(FilterOp {
3494                predicate: LogicalExpression::Unary {
3495                    op: UnaryOp::IsNull,
3496                    operand: Box::new(LogicalExpression::Property {
3497                        variable: "n".to_string(),
3498                        property: "email".to_string(),
3499                    }),
3500                },
3501                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3502                    variable: "n".to_string(),
3503                    label: None,
3504                    input: None,
3505                })),
3506            })),
3507        }));
3508
3509        let physical = planner.plan(&logical).unwrap();
3510        assert_eq!(physical.columns(), &["n"]);
3511    }
3512
3513    #[test]
3514    fn test_plan_filter_function_call() {
3515        let store = create_test_store();
3516        let planner = Planner::new(store);
3517
3518        // WHERE size(n.friends) > 0
3519        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3520            items: vec![ReturnItem {
3521                expression: LogicalExpression::Variable("n".to_string()),
3522                alias: None,
3523            }],
3524            distinct: false,
3525            input: Box::new(LogicalOperator::Filter(FilterOp {
3526                predicate: LogicalExpression::Binary {
3527                    left: Box::new(LogicalExpression::FunctionCall {
3528                        name: "size".to_string(),
3529                        args: vec![LogicalExpression::Property {
3530                            variable: "n".to_string(),
3531                            property: "friends".to_string(),
3532                        }],
3533                        distinct: false,
3534                    }),
3535                    op: BinaryOp::Gt,
3536                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3537                },
3538                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3539                    variable: "n".to_string(),
3540                    label: None,
3541                    input: None,
3542                })),
3543            })),
3544        }));
3545
3546        let physical = planner.plan(&logical).unwrap();
3547        assert_eq!(physical.columns(), &["n"]);
3548    }
3549
3550    // ==================== Expand Tests ====================
3551
3552    #[test]
3553    fn test_plan_expand_outgoing() {
3554        let store = create_test_store();
3555        let planner = Planner::new(store);
3556
3557        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3558        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3559            items: vec![
3560                ReturnItem {
3561                    expression: LogicalExpression::Variable("a".to_string()),
3562                    alias: None,
3563                },
3564                ReturnItem {
3565                    expression: LogicalExpression::Variable("b".to_string()),
3566                    alias: None,
3567                },
3568            ],
3569            distinct: false,
3570            input: Box::new(LogicalOperator::Expand(ExpandOp {
3571                from_variable: "a".to_string(),
3572                to_variable: "b".to_string(),
3573                edge_variable: None,
3574                direction: ExpandDirection::Outgoing,
3575                edge_type: Some("KNOWS".to_string()),
3576                min_hops: 1,
3577                max_hops: Some(1),
3578                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3579                    variable: "a".to_string(),
3580                    label: Some("Person".to_string()),
3581                    input: None,
3582                })),
3583                path_alias: None,
3584            })),
3585        }));
3586
3587        let physical = planner.plan(&logical).unwrap();
3588        // The return should have columns [a, b]
3589        assert!(physical.columns().contains(&"a".to_string()));
3590        assert!(physical.columns().contains(&"b".to_string()));
3591    }
3592
3593    #[test]
3594    fn test_plan_expand_with_edge_variable() {
3595        let store = create_test_store();
3596        let planner = Planner::new(store);
3597
3598        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3599        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3600            items: vec![
3601                ReturnItem {
3602                    expression: LogicalExpression::Variable("a".to_string()),
3603                    alias: None,
3604                },
3605                ReturnItem {
3606                    expression: LogicalExpression::Variable("r".to_string()),
3607                    alias: None,
3608                },
3609                ReturnItem {
3610                    expression: LogicalExpression::Variable("b".to_string()),
3611                    alias: None,
3612                },
3613            ],
3614            distinct: false,
3615            input: Box::new(LogicalOperator::Expand(ExpandOp {
3616                from_variable: "a".to_string(),
3617                to_variable: "b".to_string(),
3618                edge_variable: Some("r".to_string()),
3619                direction: ExpandDirection::Outgoing,
3620                edge_type: Some("KNOWS".to_string()),
3621                min_hops: 1,
3622                max_hops: Some(1),
3623                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3624                    variable: "a".to_string(),
3625                    label: None,
3626                    input: None,
3627                })),
3628                path_alias: None,
3629            })),
3630        }));
3631
3632        let physical = planner.plan(&logical).unwrap();
3633        assert!(physical.columns().contains(&"a".to_string()));
3634        assert!(physical.columns().contains(&"r".to_string()));
3635        assert!(physical.columns().contains(&"b".to_string()));
3636    }
3637
3638    // ==================== Limit/Skip/Sort Tests ====================
3639
3640    #[test]
3641    fn test_plan_limit() {
3642        let store = create_test_store();
3643        let planner = Planner::new(store);
3644
3645        // MATCH (n) RETURN n LIMIT 10
3646        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3647            items: vec![ReturnItem {
3648                expression: LogicalExpression::Variable("n".to_string()),
3649                alias: None,
3650            }],
3651            distinct: false,
3652            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3653                count: 10,
3654                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3655                    variable: "n".to_string(),
3656                    label: None,
3657                    input: None,
3658                })),
3659            })),
3660        }));
3661
3662        let physical = planner.plan(&logical).unwrap();
3663        assert_eq!(physical.columns(), &["n"]);
3664    }
3665
3666    #[test]
3667    fn test_plan_skip() {
3668        let store = create_test_store();
3669        let planner = Planner::new(store);
3670
3671        // MATCH (n) RETURN n SKIP 5
3672        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3673            items: vec![ReturnItem {
3674                expression: LogicalExpression::Variable("n".to_string()),
3675                alias: None,
3676            }],
3677            distinct: false,
3678            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3679                count: 5,
3680                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3681                    variable: "n".to_string(),
3682                    label: None,
3683                    input: None,
3684                })),
3685            })),
3686        }));
3687
3688        let physical = planner.plan(&logical).unwrap();
3689        assert_eq!(physical.columns(), &["n"]);
3690    }
3691
3692    #[test]
3693    fn test_plan_sort() {
3694        let store = create_test_store();
3695        let planner = Planner::new(store);
3696
3697        // MATCH (n) RETURN n ORDER BY n.name ASC
3698        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3699            items: vec![ReturnItem {
3700                expression: LogicalExpression::Variable("n".to_string()),
3701                alias: None,
3702            }],
3703            distinct: false,
3704            input: Box::new(LogicalOperator::Sort(SortOp {
3705                keys: vec![SortKey {
3706                    expression: LogicalExpression::Variable("n".to_string()),
3707                    order: SortOrder::Ascending,
3708                }],
3709                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3710                    variable: "n".to_string(),
3711                    label: None,
3712                    input: None,
3713                })),
3714            })),
3715        }));
3716
3717        let physical = planner.plan(&logical).unwrap();
3718        assert_eq!(physical.columns(), &["n"]);
3719    }
3720
3721    #[test]
3722    fn test_plan_sort_descending() {
3723        let store = create_test_store();
3724        let planner = Planner::new(store);
3725
3726        // ORDER BY n DESC
3727        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3728            items: vec![ReturnItem {
3729                expression: LogicalExpression::Variable("n".to_string()),
3730                alias: None,
3731            }],
3732            distinct: false,
3733            input: Box::new(LogicalOperator::Sort(SortOp {
3734                keys: vec![SortKey {
3735                    expression: LogicalExpression::Variable("n".to_string()),
3736                    order: SortOrder::Descending,
3737                }],
3738                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3739                    variable: "n".to_string(),
3740                    label: None,
3741                    input: None,
3742                })),
3743            })),
3744        }));
3745
3746        let physical = planner.plan(&logical).unwrap();
3747        assert_eq!(physical.columns(), &["n"]);
3748    }
3749
3750    #[test]
3751    fn test_plan_distinct() {
3752        let store = create_test_store();
3753        let planner = Planner::new(store);
3754
3755        // MATCH (n) RETURN DISTINCT n
3756        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3757            items: vec![ReturnItem {
3758                expression: LogicalExpression::Variable("n".to_string()),
3759                alias: None,
3760            }],
3761            distinct: false,
3762            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3763                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3764                    variable: "n".to_string(),
3765                    label: None,
3766                    input: None,
3767                })),
3768                columns: None,
3769            })),
3770        }));
3771
3772        let physical = planner.plan(&logical).unwrap();
3773        assert_eq!(physical.columns(), &["n"]);
3774    }
3775
3776    // ==================== Aggregate Tests ====================
3777
3778    #[test]
3779    fn test_plan_aggregate_count() {
3780        let store = create_test_store();
3781        let planner = Planner::new(store);
3782
3783        // MATCH (n) RETURN count(n)
3784        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3785            items: vec![ReturnItem {
3786                expression: LogicalExpression::Variable("cnt".to_string()),
3787                alias: None,
3788            }],
3789            distinct: false,
3790            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3791                group_by: vec![],
3792                aggregates: vec![LogicalAggregateExpr {
3793                    function: LogicalAggregateFunction::Count,
3794                    expression: Some(LogicalExpression::Variable("n".to_string())),
3795                    distinct: false,
3796                    alias: Some("cnt".to_string()),
3797                    percentile: None,
3798                }],
3799                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3800                    variable: "n".to_string(),
3801                    label: None,
3802                    input: None,
3803                })),
3804                having: None,
3805            })),
3806        }));
3807
3808        let physical = planner.plan(&logical).unwrap();
3809        assert!(physical.columns().contains(&"cnt".to_string()));
3810    }
3811
3812    #[test]
3813    fn test_plan_aggregate_with_group_by() {
3814        let store = create_test_store();
3815        let planner = Planner::new(store);
3816
3817        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3818        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3819            group_by: vec![LogicalExpression::Property {
3820                variable: "n".to_string(),
3821                property: "city".to_string(),
3822            }],
3823            aggregates: vec![LogicalAggregateExpr {
3824                function: LogicalAggregateFunction::Count,
3825                expression: Some(LogicalExpression::Variable("n".to_string())),
3826                distinct: false,
3827                alias: Some("cnt".to_string()),
3828                percentile: None,
3829            }],
3830            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3831                variable: "n".to_string(),
3832                label: Some("Person".to_string()),
3833                input: None,
3834            })),
3835            having: None,
3836        }));
3837
3838        let physical = planner.plan(&logical).unwrap();
3839        assert_eq!(physical.columns().len(), 2);
3840    }
3841
3842    #[test]
3843    fn test_plan_aggregate_sum() {
3844        let store = create_test_store();
3845        let planner = Planner::new(store);
3846
3847        // SUM(n.value)
3848        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3849            group_by: vec![],
3850            aggregates: vec![LogicalAggregateExpr {
3851                function: LogicalAggregateFunction::Sum,
3852                expression: Some(LogicalExpression::Property {
3853                    variable: "n".to_string(),
3854                    property: "value".to_string(),
3855                }),
3856                distinct: false,
3857                alias: Some("total".to_string()),
3858                percentile: None,
3859            }],
3860            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3861                variable: "n".to_string(),
3862                label: None,
3863                input: None,
3864            })),
3865            having: None,
3866        }));
3867
3868        let physical = planner.plan(&logical).unwrap();
3869        assert!(physical.columns().contains(&"total".to_string()));
3870    }
3871
3872    #[test]
3873    fn test_plan_aggregate_avg() {
3874        let store = create_test_store();
3875        let planner = Planner::new(store);
3876
3877        // AVG(n.score)
3878        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3879            group_by: vec![],
3880            aggregates: vec![LogicalAggregateExpr {
3881                function: LogicalAggregateFunction::Avg,
3882                expression: Some(LogicalExpression::Property {
3883                    variable: "n".to_string(),
3884                    property: "score".to_string(),
3885                }),
3886                distinct: false,
3887                alias: Some("average".to_string()),
3888                percentile: None,
3889            }],
3890            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3891                variable: "n".to_string(),
3892                label: None,
3893                input: None,
3894            })),
3895            having: None,
3896        }));
3897
3898        let physical = planner.plan(&logical).unwrap();
3899        assert!(physical.columns().contains(&"average".to_string()));
3900    }
3901
3902    #[test]
3903    fn test_plan_aggregate_min_max() {
3904        let store = create_test_store();
3905        let planner = Planner::new(store);
3906
3907        // MIN(n.age), MAX(n.age)
3908        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3909            group_by: vec![],
3910            aggregates: vec![
3911                LogicalAggregateExpr {
3912                    function: LogicalAggregateFunction::Min,
3913                    expression: Some(LogicalExpression::Property {
3914                        variable: "n".to_string(),
3915                        property: "age".to_string(),
3916                    }),
3917                    distinct: false,
3918                    alias: Some("youngest".to_string()),
3919                    percentile: None,
3920                },
3921                LogicalAggregateExpr {
3922                    function: LogicalAggregateFunction::Max,
3923                    expression: Some(LogicalExpression::Property {
3924                        variable: "n".to_string(),
3925                        property: "age".to_string(),
3926                    }),
3927                    distinct: false,
3928                    alias: Some("oldest".to_string()),
3929                    percentile: None,
3930                },
3931            ],
3932            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3933                variable: "n".to_string(),
3934                label: None,
3935                input: None,
3936            })),
3937            having: None,
3938        }));
3939
3940        let physical = planner.plan(&logical).unwrap();
3941        assert!(physical.columns().contains(&"youngest".to_string()));
3942        assert!(physical.columns().contains(&"oldest".to_string()));
3943    }
3944
3945    // ==================== Join Tests ====================
3946
3947    #[test]
3948    fn test_plan_inner_join() {
3949        let store = create_test_store();
3950        let planner = Planner::new(store);
3951
3952        // Inner join between two scans
3953        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3954            items: vec![
3955                ReturnItem {
3956                    expression: LogicalExpression::Variable("a".to_string()),
3957                    alias: None,
3958                },
3959                ReturnItem {
3960                    expression: LogicalExpression::Variable("b".to_string()),
3961                    alias: None,
3962                },
3963            ],
3964            distinct: false,
3965            input: Box::new(LogicalOperator::Join(JoinOp {
3966                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3967                    variable: "a".to_string(),
3968                    label: Some("Person".to_string()),
3969                    input: None,
3970                })),
3971                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3972                    variable: "b".to_string(),
3973                    label: Some("Company".to_string()),
3974                    input: None,
3975                })),
3976                join_type: JoinType::Inner,
3977                conditions: vec![JoinCondition {
3978                    left: LogicalExpression::Variable("a".to_string()),
3979                    right: LogicalExpression::Variable("b".to_string()),
3980                }],
3981            })),
3982        }));
3983
3984        let physical = planner.plan(&logical).unwrap();
3985        assert!(physical.columns().contains(&"a".to_string()));
3986        assert!(physical.columns().contains(&"b".to_string()));
3987    }
3988
3989    #[test]
3990    fn test_plan_cross_join() {
3991        let store = create_test_store();
3992        let planner = Planner::new(store);
3993
3994        // Cross join (no conditions)
3995        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3996            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3997                variable: "a".to_string(),
3998                label: None,
3999                input: None,
4000            })),
4001            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4002                variable: "b".to_string(),
4003                label: None,
4004                input: None,
4005            })),
4006            join_type: JoinType::Cross,
4007            conditions: vec![],
4008        }));
4009
4010        let physical = planner.plan(&logical).unwrap();
4011        assert_eq!(physical.columns().len(), 2);
4012    }
4013
4014    #[test]
4015    fn test_plan_left_join() {
4016        let store = create_test_store();
4017        let planner = Planner::new(store);
4018
4019        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4020            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4021                variable: "a".to_string(),
4022                label: None,
4023                input: None,
4024            })),
4025            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4026                variable: "b".to_string(),
4027                label: None,
4028                input: None,
4029            })),
4030            join_type: JoinType::Left,
4031            conditions: vec![],
4032        }));
4033
4034        let physical = planner.plan(&logical).unwrap();
4035        assert_eq!(physical.columns().len(), 2);
4036    }
4037
4038    // ==================== Mutation Tests ====================
4039
4040    #[test]
4041    fn test_plan_create_node() {
4042        let store = create_test_store();
4043        let planner = Planner::new(store);
4044
4045        // CREATE (n:Person {name: 'Alice'})
4046        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4047            variable: "n".to_string(),
4048            labels: vec!["Person".to_string()],
4049            properties: vec![(
4050                "name".to_string(),
4051                LogicalExpression::Literal(Value::String("Alice".into())),
4052            )],
4053            input: None,
4054        }));
4055
4056        let physical = planner.plan(&logical).unwrap();
4057        assert!(physical.columns().contains(&"n".to_string()));
4058    }
4059
4060    #[test]
4061    fn test_plan_create_edge() {
4062        let store = create_test_store();
4063        let planner = Planner::new(store);
4064
4065        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4066        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4067            variable: Some("r".to_string()),
4068            from_variable: "a".to_string(),
4069            to_variable: "b".to_string(),
4070            edge_type: "KNOWS".to_string(),
4071            properties: vec![],
4072            input: Box::new(LogicalOperator::Join(JoinOp {
4073                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4074                    variable: "a".to_string(),
4075                    label: None,
4076                    input: None,
4077                })),
4078                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4079                    variable: "b".to_string(),
4080                    label: None,
4081                    input: None,
4082                })),
4083                join_type: JoinType::Cross,
4084                conditions: vec![],
4085            })),
4086        }));
4087
4088        let physical = planner.plan(&logical).unwrap();
4089        assert!(physical.columns().contains(&"r".to_string()));
4090    }
4091
4092    #[test]
4093    fn test_plan_delete_node() {
4094        let store = create_test_store();
4095        let planner = Planner::new(store);
4096
4097        // MATCH (n) DELETE n
4098        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4099            variable: "n".to_string(),
4100            detach: false,
4101            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4102                variable: "n".to_string(),
4103                label: None,
4104                input: None,
4105            })),
4106        }));
4107
4108        let physical = planner.plan(&logical).unwrap();
4109        assert!(physical.columns().contains(&"deleted_count".to_string()));
4110    }
4111
4112    // ==================== Error Cases ====================
4113
4114    #[test]
4115    fn test_plan_empty_errors() {
4116        let store = create_test_store();
4117        let planner = Planner::new(store);
4118
4119        let logical = LogicalPlan::new(LogicalOperator::Empty);
4120        let result = planner.plan(&logical);
4121        assert!(result.is_err());
4122    }
4123
4124    #[test]
4125    fn test_plan_missing_variable_in_return() {
4126        let store = create_test_store();
4127        let planner = Planner::new(store);
4128
4129        // Return variable that doesn't exist in input
4130        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4131            items: vec![ReturnItem {
4132                expression: LogicalExpression::Variable("missing".to_string()),
4133                alias: None,
4134            }],
4135            distinct: false,
4136            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4137                variable: "n".to_string(),
4138                label: None,
4139                input: None,
4140            })),
4141        }));
4142
4143        let result = planner.plan(&logical);
4144        assert!(result.is_err());
4145    }
4146
4147    // ==================== Helper Function Tests ====================
4148
4149    #[test]
4150    fn test_convert_binary_ops() {
4151        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4152        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4153        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4154        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4155        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4156        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4157        assert!(convert_binary_op(BinaryOp::And).is_ok());
4158        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4159        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4160        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4161        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4162        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4163    }
4164
4165    #[test]
4166    fn test_convert_unary_ops() {
4167        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4168        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4169        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4170        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4171    }
4172
4173    #[test]
4174    fn test_convert_aggregate_functions() {
4175        assert!(matches!(
4176            convert_aggregate_function(LogicalAggregateFunction::Count),
4177            PhysicalAggregateFunction::Count
4178        ));
4179        assert!(matches!(
4180            convert_aggregate_function(LogicalAggregateFunction::Sum),
4181            PhysicalAggregateFunction::Sum
4182        ));
4183        assert!(matches!(
4184            convert_aggregate_function(LogicalAggregateFunction::Avg),
4185            PhysicalAggregateFunction::Avg
4186        ));
4187        assert!(matches!(
4188            convert_aggregate_function(LogicalAggregateFunction::Min),
4189            PhysicalAggregateFunction::Min
4190        ));
4191        assert!(matches!(
4192            convert_aggregate_function(LogicalAggregateFunction::Max),
4193            PhysicalAggregateFunction::Max
4194        ));
4195    }
4196
4197    #[test]
4198    fn test_planner_accessors() {
4199        let store = create_test_store();
4200        let planner = Planner::new(Arc::clone(&store));
4201
4202        assert!(planner.tx_id().is_none());
4203        assert!(planner.tx_manager().is_none());
4204        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4205    }
4206
4207    #[test]
4208    fn test_physical_plan_accessors() {
4209        let store = create_test_store();
4210        let planner = Planner::new(store);
4211
4212        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4213            variable: "n".to_string(),
4214            label: None,
4215            input: None,
4216        }));
4217
4218        let physical = planner.plan(&logical).unwrap();
4219        assert_eq!(physical.columns(), &["n"]);
4220
4221        // Test into_operator
4222        let _ = physical.into_operator();
4223    }
4224
4225    // ==================== Adaptive Planning Tests ====================
4226
4227    #[test]
4228    fn test_plan_adaptive_with_scan() {
4229        let store = create_test_store();
4230        let planner = Planner::new(store);
4231
4232        // MATCH (n:Person) RETURN n
4233        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4234            items: vec![ReturnItem {
4235                expression: LogicalExpression::Variable("n".to_string()),
4236                alias: None,
4237            }],
4238            distinct: false,
4239            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4240                variable: "n".to_string(),
4241                label: Some("Person".to_string()),
4242                input: None,
4243            })),
4244        }));
4245
4246        let physical = planner.plan_adaptive(&logical).unwrap();
4247        assert_eq!(physical.columns(), &["n"]);
4248        // Should have adaptive context with estimates
4249        assert!(physical.adaptive_context.is_some());
4250    }
4251
4252    #[test]
4253    fn test_plan_adaptive_with_filter() {
4254        let store = create_test_store();
4255        let planner = Planner::new(store);
4256
4257        // MATCH (n) WHERE n.age > 30 RETURN n
4258        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4259            items: vec![ReturnItem {
4260                expression: LogicalExpression::Variable("n".to_string()),
4261                alias: None,
4262            }],
4263            distinct: false,
4264            input: Box::new(LogicalOperator::Filter(FilterOp {
4265                predicate: LogicalExpression::Binary {
4266                    left: Box::new(LogicalExpression::Property {
4267                        variable: "n".to_string(),
4268                        property: "age".to_string(),
4269                    }),
4270                    op: BinaryOp::Gt,
4271                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4272                },
4273                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4274                    variable: "n".to_string(),
4275                    label: None,
4276                    input: None,
4277                })),
4278            })),
4279        }));
4280
4281        let physical = planner.plan_adaptive(&logical).unwrap();
4282        assert!(physical.adaptive_context.is_some());
4283    }
4284
4285    #[test]
4286    fn test_plan_adaptive_with_expand() {
4287        let store = create_test_store();
4288        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4289
4290        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4291        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4292            items: vec![
4293                ReturnItem {
4294                    expression: LogicalExpression::Variable("a".to_string()),
4295                    alias: None,
4296                },
4297                ReturnItem {
4298                    expression: LogicalExpression::Variable("b".to_string()),
4299                    alias: None,
4300                },
4301            ],
4302            distinct: false,
4303            input: Box::new(LogicalOperator::Expand(ExpandOp {
4304                from_variable: "a".to_string(),
4305                to_variable: "b".to_string(),
4306                edge_variable: None,
4307                direction: ExpandDirection::Outgoing,
4308                edge_type: Some("KNOWS".to_string()),
4309                min_hops: 1,
4310                max_hops: Some(1),
4311                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4312                    variable: "a".to_string(),
4313                    label: None,
4314                    input: None,
4315                })),
4316                path_alias: None,
4317            })),
4318        }));
4319
4320        let physical = planner.plan_adaptive(&logical).unwrap();
4321        assert!(physical.adaptive_context.is_some());
4322    }
4323
4324    #[test]
4325    fn test_plan_adaptive_with_join() {
4326        let store = create_test_store();
4327        let planner = Planner::new(store);
4328
4329        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4330            items: vec![
4331                ReturnItem {
4332                    expression: LogicalExpression::Variable("a".to_string()),
4333                    alias: None,
4334                },
4335                ReturnItem {
4336                    expression: LogicalExpression::Variable("b".to_string()),
4337                    alias: None,
4338                },
4339            ],
4340            distinct: false,
4341            input: Box::new(LogicalOperator::Join(JoinOp {
4342                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4343                    variable: "a".to_string(),
4344                    label: None,
4345                    input: None,
4346                })),
4347                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4348                    variable: "b".to_string(),
4349                    label: None,
4350                    input: None,
4351                })),
4352                join_type: JoinType::Cross,
4353                conditions: vec![],
4354            })),
4355        }));
4356
4357        let physical = planner.plan_adaptive(&logical).unwrap();
4358        assert!(physical.adaptive_context.is_some());
4359    }
4360
4361    #[test]
4362    fn test_plan_adaptive_with_aggregate() {
4363        let store = create_test_store();
4364        let planner = Planner::new(store);
4365
4366        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4367            group_by: vec![],
4368            aggregates: vec![LogicalAggregateExpr {
4369                function: LogicalAggregateFunction::Count,
4370                expression: Some(LogicalExpression::Variable("n".to_string())),
4371                distinct: false,
4372                alias: Some("cnt".to_string()),
4373                percentile: None,
4374            }],
4375            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4376                variable: "n".to_string(),
4377                label: None,
4378                input: None,
4379            })),
4380            having: None,
4381        }));
4382
4383        let physical = planner.plan_adaptive(&logical).unwrap();
4384        assert!(physical.adaptive_context.is_some());
4385    }
4386
4387    #[test]
4388    fn test_plan_adaptive_with_distinct() {
4389        let store = create_test_store();
4390        let planner = Planner::new(store);
4391
4392        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4393            items: vec![ReturnItem {
4394                expression: LogicalExpression::Variable("n".to_string()),
4395                alias: None,
4396            }],
4397            distinct: false,
4398            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4399                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4400                    variable: "n".to_string(),
4401                    label: None,
4402                    input: None,
4403                })),
4404                columns: None,
4405            })),
4406        }));
4407
4408        let physical = planner.plan_adaptive(&logical).unwrap();
4409        assert!(physical.adaptive_context.is_some());
4410    }
4411
4412    #[test]
4413    fn test_plan_adaptive_with_limit() {
4414        let store = create_test_store();
4415        let planner = Planner::new(store);
4416
4417        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4418            items: vec![ReturnItem {
4419                expression: LogicalExpression::Variable("n".to_string()),
4420                alias: None,
4421            }],
4422            distinct: false,
4423            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4424                count: 10,
4425                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4426                    variable: "n".to_string(),
4427                    label: None,
4428                    input: None,
4429                })),
4430            })),
4431        }));
4432
4433        let physical = planner.plan_adaptive(&logical).unwrap();
4434        assert!(physical.adaptive_context.is_some());
4435    }
4436
/// Adaptive planning of `Return -> Skip(5) -> NodeScan(n)` must produce a
/// physical plan whose `adaptive_context` is populated.
#[test]
fn test_plan_adaptive_with_skip() {
    let planner = Planner::new(create_test_store());

    // Assemble the logical tree bottom-up: scan, then skip, then projection.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4461
/// Adaptive planning of `Return -> Sort(n ASC) -> NodeScan(n)` must produce a
/// physical plan whose `adaptive_context` is populated.
#[test]
fn test_plan_adaptive_with_sort() {
    let planner = Planner::new(create_test_store());

    // Assemble the logical tree bottom-up: scan, then sort, then projection.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4489
/// Adaptive planning of a `Return` over a `Union` of two labelled node scans
/// (`Person` and `Company`, both bound to `n`) must produce a physical plan
/// whose `adaptive_context` is populated.
#[test]
fn test_plan_adaptive_with_union() {
    let planner = Planner::new(create_test_store());

    // Both union branches are identical apart from the label they scan.
    let labelled_scan = |label: &str| {
        LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from("n"),
            label: Some(String::from(label)),
            input: None,
        })
    };

    let union = LogicalOperator::Union(UnionOp {
        inputs: vec![labelled_scan("Person"), labelled_scan("Company")],
    });
    let projection = LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(union),
    });
    let logical = LogicalPlan::new(projection);

    let physical = planner.plan_adaptive(&logical).unwrap();
    assert!(physical.adaptive_context.is_some());
}
4520
4521    // ==================== Variable Length Path Tests ====================
4522
/// Plans `MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b` and checks that both
/// returned variables appear in the physical plan's columns.
#[test]
fn test_plan_expand_variable_length() {
    let planner = Planner::new(create_test_store());

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Outgoing KNOWS expansion spanning one to three hops.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan_a),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4562
/// Plans `MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b` and checks that
/// planning succeeds with `a` and `b` among the output columns.
///
/// Only the returned variables are asserted; the path alias `p` itself is not
/// inspected here.
#[test]
fn test_plan_expand_with_path_alias() {
    let planner = Planner::new(create_test_store());

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Same one-to-three hop expansion as the plain variable-length case, but
    // with the whole path bound to `p`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(3),
        input: Box::new(scan_a),
        path_alias: Some(String::from("p")),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4603
/// Plans `MATCH (a)<-[:KNOWS]-(b) RETURN a, b` with factorized execution
/// turned off and checks that both variables appear in the output columns.
#[test]
fn test_plan_expand_incoming() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop KNOWS expansion followed against the edge direction.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Incoming,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4643
/// Plans `MATCH (a)-[:KNOWS]-(b) RETURN a, b` with factorized execution
/// turned off and checks that both variables appear in the output columns.
#[test]
fn test_plan_expand_both_directions() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop KNOWS expansion with `ExpandDirection::Both`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Both,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan_a),
        path_alias: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4683
4684    // ==================== With Context Tests ====================
4685
/// A planner built through `Planner::with_context` must report back the
/// transaction id and viewing epoch it was given, and expose a transaction
/// manager.
#[test]
fn test_planner_with_context() {
    use crate::transaction::TransactionManager;

    let store = create_test_store();
    let manager = Arc::new(TransactionManager::new());

    // Start a transaction first, then read the manager's current epoch; both
    // values are handed to the planner below.
    let active_tx = manager.begin();
    let snapshot_epoch = manager.current_epoch();

    let planner = Planner::with_context(
        Arc::clone(&store),
        Arc::clone(&manager),
        Some(active_tx),
        snapshot_epoch,
    );

    assert_eq!(planner.tx_id(), Some(active_tx));
    assert!(planner.tx_manager().is_some());
    assert_eq!(planner.viewing_epoch(), snapshot_epoch);
}
4706
/// Plans two chained single-hop expansions (`a -> b -> c`) on a planner with
/// factorized execution switched off and checks that the returned variables
/// `a` and `c` appear in the output columns.
///
/// The operator choice itself is not inspected; the test only asserts that
/// planning succeeds with the expected columns.
#[test]
fn test_planner_with_factorized_execution_disabled() {
    let store = create_test_store();
    let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };
    // Untyped, outgoing, exactly-one-hop expansion layered over `input`.
    let single_hop = |from: &str, to: &str, input: LogicalOperator| {
        LogicalOperator::Expand(ExpandOp {
            from_variable: String::from(from),
            to_variable: String::from(to),
            edge_variable: None,
            direction: ExpandDirection::Outgoing,
            edge_type: None,
            min_hops: 1,
            max_hops: Some(1),
            input: Box::new(input),
            path_alias: None,
        })
    };

    let scan_a = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let first_hop = single_hop("a", "b", scan_a);
    let second_hop = single_hop("b", "c", first_hop);

    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("c")],
        distinct: false,
        input: Box::new(second_hop),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "c"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4756
4757    // ==================== Sort with Property Tests ====================
4758
/// Plans `MATCH (n) RETURN n ORDER BY n.name ASC` and checks that the
/// returned variable `n` is among the physical plan's columns.
#[test]
fn test_plan_sort_by_property() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The sort key is a property access (`n.name`), not a bare variable.
    let by_name_ascending = SortKey {
        expression: LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("name"),
        },
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![by_name_ascending],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Sorting on a property must still leave the returned variable in the output.
    assert!(physical.columns().contains(&"n".to_string()));
}
4791
4792    // ==================== Scan with Input Tests ====================
4793
/// Plans a `Company` node scan (`b`) that takes a `Person` node scan (`a`) as
/// its input and checks that both variables appear in the output columns.
#[test]
fn test_plan_scan_with_input() {
    let planner = Planner::new(create_test_store());

    // Shorthand for a plain `RETURN <name>` item without an alias.
    let returned = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    // Inner scan: feeds the outer scan as its input (chained pattern).
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let company_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: Some(Box::new(person_scan)),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned("a"), returned("b")],
        distinct: false,
        input: Box::new(company_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
4827}