Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
/// Range bounds for property-based range queries.
///
/// Both ends are optional and borrow their [`Value`] for the lifetime `'a`,
/// so building a `RangeBounds` never copies the bound values.
struct RangeBounds<'a> {
    /// Lower bound value, or `None` when no lower bound is given.
    min: Option<&'a Value>,
    /// Upper bound value, or `None` when no upper bound is given.
    max: Option<&'a Value>,
    /// Presumably `true` when `min` itself belongs to the range (consumer is outside this view).
    min_inclusive: bool,
    /// Presumably `true` when `max` itself belongs to the range (consumer is outside this view).
    max_inclusive: bool,
}
44
/// Converts a logical plan to a physical operator tree.
///
/// A planner holds the store it plans against plus the MVCC context
/// (transaction id and viewing epoch) that is handed to the operators it builds.
pub struct Planner {
    /// The graph store to scan from; shared with the operators the planner creates.
    store: Arc<LpgStore>,
    /// Transaction manager for MVCC operations (`None` when built via `Planner::new`).
    tx_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    tx_id: Option<TxId>,
    /// Epoch to use for visibility checks.
    viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    ///
    /// Kept in a `Cell` so planning methods can bump it through `&self`.
    anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    factorized_execution: bool,
}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
    /// Returns the epoch this planner uses for visibility checks.
    ///
    /// The constructors set it either from the store's current epoch
    /// ([`Planner::new`]) or from the caller-supplied value
    /// ([`Planner::with_context`]).
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
109
    /// Returns the transaction ID this planner was configured with.
    ///
    /// `None` means no transaction id was supplied (always the case for
    /// planners built with [`Planner::new`]).
    #[must_use]
    pub fn tx_id(&self) -> Option<TxId> {
        self.tx_id
    }
115
    /// Returns a borrowed handle to the transaction manager, if one was supplied.
    ///
    /// The `Arc` is not cloned; callers that need ownership can clone the
    /// returned reference themselves.
    #[must_use]
    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.tx_manager.as_ref()
    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree from store statistics
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let stats = self.store.statistics();
244                let avg_degree = self.estimate_expand_degree(&stats, expand);
245                let estimate = input_estimate * avg_degree;
246                let id = format!("expand_{}", expand.to_variable);
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250            }
251            LogicalOperator::Join(join) => {
252                // Estimate join output (product with selectivity)
253                let left_est = self.estimate_cardinality(&join.left);
254                let right_est = self.estimate_cardinality(&join.right);
255                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
256                let id = format!("join_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261            }
262            LogicalOperator::Aggregate(agg) => {
263                // Aggregates typically reduce cardinality
264                let input_estimate = self.estimate_cardinality(&agg.input);
265                let estimate = if agg.group_by.is_empty() {
266                    1.0 // Scalar aggregate
267                } else {
268                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
269                };
270                let id = format!("aggregate_{depth}");
271                ctx.set_estimate(&id, estimate);
272
273                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274            }
275            LogicalOperator::Distinct(distinct) => {
276                let input_estimate = self.estimate_cardinality(&distinct.input);
277                let estimate = (input_estimate * 0.5).max(1.0);
278                let id = format!("distinct_{depth}");
279                ctx.set_estimate(&id, estimate);
280
281                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282            }
283            LogicalOperator::Return(ret) => {
284                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285            }
286            LogicalOperator::Limit(limit) => {
287                let input_estimate = self.estimate_cardinality(&limit.input);
288                let estimate = (input_estimate).min(limit.count as f64);
289                let id = format!("limit_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293            }
294            LogicalOperator::Skip(skip) => {
295                let input_estimate = self.estimate_cardinality(&skip.input);
296                let estimate = (input_estimate - skip.count as f64).max(0.0);
297                let id = format!("skip_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301            }
302            LogicalOperator::Sort(sort) => {
303                // Sort doesn't change cardinality
304                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305            }
306            LogicalOperator::Union(union) => {
307                let estimate: f64 = union
308                    .inputs
309                    .iter()
310                    .map(|input| self.estimate_cardinality(input))
311                    .sum();
312                let id = format!("union_{depth}");
313                ctx.set_estimate(&id, estimate);
314
315                for input in &union.inputs {
316                    self.collect_cardinality_estimates(input, ctx, depth + 1);
317                }
318            }
319            _ => {
320                // For other operators, try to recurse into known input patterns
321            }
322        }
323    }
324
325    /// Estimates cardinality for a logical operator subtree.
326    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327        match op {
328            LogicalOperator::NodeScan(scan) => {
329                if let Some(label) = &scan.label {
330                    self.store.nodes_by_label(label).len() as f64
331                } else {
332                    self.store.node_count() as f64
333                }
334            }
335            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336            LogicalOperator::Expand(expand) => {
337                let stats = self.store.statistics();
338                let avg_degree = self.estimate_expand_degree(&stats, expand);
339                self.estimate_cardinality(&expand.input) * avg_degree
340            }
341            LogicalOperator::Join(join) => {
342                let left = self.estimate_cardinality(&join.left);
343                let right = self.estimate_cardinality(&join.right);
344                (left * right).sqrt()
345            }
346            LogicalOperator::Aggregate(agg) => {
347                if agg.group_by.is_empty() {
348                    1.0
349                } else {
350                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351                }
352            }
353            LogicalOperator::Distinct(distinct) => {
354                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355            }
356            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357            LogicalOperator::Limit(limit) => self
358                .estimate_cardinality(&limit.input)
359                .min(limit.count as f64),
360            LogicalOperator::Skip(skip) => {
361                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362            }
363            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364            LogicalOperator::Union(union) => union
365                .inputs
366                .iter()
367                .map(|input| self.estimate_cardinality(input))
368                .sum(),
369            _ => 1000.0, // Default estimate for unknown operators
370        }
371    }
372
373    /// Estimates the average edge degree for an expand operation using store statistics.
374    fn estimate_expand_degree(
375        &self,
376        stats: &grafeo_core::statistics::Statistics,
377        expand: &ExpandOp,
378    ) -> f64 {
379        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380        if let Some(edge_type) = &expand.edge_type {
381            stats.estimate_avg_degree(edge_type, outgoing)
382        } else if stats.total_nodes > 0 {
383            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384        } else {
385            10.0 // fallback for empty graph
386        }
387    }
388
389    /// Plans a single logical operator.
390    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391        match op {
392            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393            LogicalOperator::Expand(expand) => {
394                // Check for expand chains when factorized execution is enabled
395                if self.factorized_execution {
396                    let (chain_len, _base) = Self::count_expand_chain(op);
397                    if chain_len >= 2 {
398                        // Use factorized chain for 2+ consecutive single-hop expands
399                        return self.plan_expand_chain(op);
400                    }
401                }
402                self.plan_expand(expand)
403            }
404            LogicalOperator::Return(ret) => self.plan_return(ret),
405            LogicalOperator::Filter(filter) => self.plan_filter(filter),
406            LogicalOperator::Project(project) => self.plan_project(project),
407            LogicalOperator::Limit(limit) => self.plan_limit(limit),
408            LogicalOperator::Skip(skip) => self.plan_skip(skip),
409            LogicalOperator::Sort(sort) => self.plan_sort(sort),
410            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411            LogicalOperator::Join(join) => self.plan_join(join),
412            LogicalOperator::Union(union) => self.plan_union(union),
413            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421            LogicalOperator::Merge(merge) => self.plan_merge(merge),
422            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427            LogicalOperator::VectorScan(_) => Err(Error::Internal(
428                "VectorScan requires vector-index feature".to_string(),
429            )),
430            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431                "VectorJoin requires vector-index feature".to_string(),
432            )),
433            _ => Err(Error::Internal(format!(
434                "Unsupported operator: {:?}",
435                std::mem::discriminant(op)
436            ))),
437        }
438    }
439
440    /// Plans a node scan operator.
441    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442        let scan_op = if let Some(label) = &scan.label {
443            ScanOperator::with_label(Arc::clone(&self.store), label)
444        } else {
445            ScanOperator::new(Arc::clone(&self.store))
446        };
447
448        // Apply MVCC context if available
449        let scan_operator: Box<dyn Operator> =
450            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452        // If there's an input, chain operators with a nested loop join (cross join)
453        if let Some(input) = &scan.input {
454            let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456            // Build output schema: input columns + scan column
457            let mut output_schema: Vec<LogicalType> =
458                input_columns.iter().map(|_| LogicalType::Any).collect();
459            output_schema.push(LogicalType::Node);
460
461            // Add scan column to input columns
462            input_columns.push(scan.variable.clone());
463
464            // Use nested loop join to combine input rows with scanned nodes
465            let join_op = Box::new(NestedLoopJoinOperator::new(
466                input_op,
467                scan_operator,
468                None, // No join condition (cross join)
469                PhysicalJoinType::Cross,
470                output_schema,
471            ));
472
473            Ok((join_op, input_columns))
474        } else {
475            let columns = vec![scan.variable.clone()];
476            Ok((scan_operator, columns))
477        }
478    }
479
480    /// Plans an expand operator.
481    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482        // Plan the input operator first
483        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485        // Find the source column index
486        let source_column = input_columns
487            .iter()
488            .position(|c| c == &expand.from_variable)
489            .ok_or_else(|| {
490                Error::Internal(format!(
491                    "Source variable '{}' not found in input columns",
492                    expand.from_variable
493                ))
494            })?;
495
496        // Convert expand direction
497        let direction = match expand.direction {
498            ExpandDirection::Outgoing => Direction::Outgoing,
499            ExpandDirection::Incoming => Direction::Incoming,
500            ExpandDirection::Both => Direction::Both,
501        };
502
503        // Check if this is a variable-length path
504        let is_variable_length =
505            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507        let operator: Box<dyn Operator> = if is_variable_length {
508            // Use VariableLengthExpandOperator for multi-hop paths
509            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
510            let mut expand_op = VariableLengthExpandOperator::new(
511                Arc::clone(&self.store),
512                input_op,
513                source_column,
514                direction,
515                expand.edge_type.clone(),
516                expand.min_hops,
517                max_hops,
518            )
519            .with_tx_context(self.viewing_epoch, self.tx_id);
520
521            // If a path alias is set, enable path length output
522            if expand.path_alias.is_some() {
523                expand_op = expand_op.with_path_length_output();
524            }
525
526            Box::new(expand_op)
527        } else {
528            // Use simple ExpandOperator for single-hop paths
529            let expand_op = ExpandOperator::new(
530                Arc::clone(&self.store),
531                input_op,
532                source_column,
533                direction,
534                expand.edge_type.clone(),
535            )
536            .with_tx_context(self.viewing_epoch, self.tx_id);
537            Box::new(expand_op)
538        };
539
540        // Build output columns: [input_columns..., edge, target, (path_length)?]
541        // Preserve all input columns and add edge + target to match ExpandOperator output
542        let mut columns = input_columns;
543
544        // Generate edge column name - use provided name or generate anonymous name
545        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
546            let count = self.anon_edge_counter.get();
547            self.anon_edge_counter.set(count + 1);
548            format!("_anon_edge_{}", count)
549        });
550        columns.push(edge_col_name);
551
552        columns.push(expand.to_variable.clone());
553
554        // If a path alias is set, add a column for the path length
555        if let Some(ref path_alias) = expand.path_alias {
556            columns.push(format!("_path_length_{}", path_alias));
557        }
558
559        Ok((operator, columns))
560    }
561
562    /// Plans a chain of consecutive expand operations using factorized execution.
563    ///
564    /// This avoids the Cartesian product explosion that occurs with separate expands.
565    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
566    ///
567    /// The chain is executed lazily at query time, not during planning. This ensures
568    /// that any filters applied above the expand chain are properly respected.
569    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
570        let expands = Self::collect_expand_chain(op);
571        if expands.is_empty() {
572            return Err(Error::Internal("Empty expand chain".to_string()));
573        }
574
575        // Get the base operator (before first expand)
576        let first_expand = expands[0];
577        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
578
579        let mut columns = base_columns.clone();
580        let mut steps = Vec::new();
581
582        // Track the level-local source column for each expand
583        // For the first expand, it's the column in the input (base_columns)
584        // For subsequent expands, the target from the previous level is always at index 1
585        // (each level adds [edge, target], so target is at index 1)
586        let mut is_first = true;
587
588        for expand in &expands {
589            // Find source column for this expand
590            let source_column = if is_first {
591                // For first expand, find in base columns
592                base_columns
593                    .iter()
594                    .position(|c| c == &expand.from_variable)
595                    .ok_or_else(|| {
596                        Error::Internal(format!(
597                            "Source variable '{}' not found in base columns",
598                            expand.from_variable
599                        ))
600                    })?
601            } else {
602                // For subsequent expands, the target from the previous level is at index 1
603                // (each level adds [edge, target], so target is the second column)
604                1
605            };
606
607            // Convert direction
608            let direction = match expand.direction {
609                ExpandDirection::Outgoing => Direction::Outgoing,
610                ExpandDirection::Incoming => Direction::Incoming,
611                ExpandDirection::Both => Direction::Both,
612            };
613
614            // Add expand step configuration
615            steps.push(ExpandStep {
616                source_column,
617                direction,
618                edge_type: expand.edge_type.clone(),
619            });
620
621            // Add edge and target columns
622            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
623                let count = self.anon_edge_counter.get();
624                self.anon_edge_counter.set(count + 1);
625                format!("_anon_edge_{}", count)
626            });
627            columns.push(edge_col_name);
628            columns.push(expand.to_variable.clone());
629
630            is_first = false;
631        }
632
633        // Create lazy operator that executes at query time, not planning time
634        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
635
636        if let Some(tx_id) = self.tx_id {
637            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
638        } else {
639            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
640        }
641
642        Ok((Box::new(lazy_op), columns))
643    }
644
645    /// Plans a RETURN clause.
646    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
647        // Plan the input operator
648        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
649
650        // Build variable to column index mapping
651        let variable_columns: HashMap<String, usize> = input_columns
652            .iter()
653            .enumerate()
654            .map(|(i, name)| (name.clone(), i))
655            .collect();
656
657        // Extract column names from return items
658        let columns: Vec<String> = ret
659            .items
660            .iter()
661            .map(|item| {
662                item.alias.clone().unwrap_or_else(|| {
663                    // Generate a default name from the expression
664                    expression_to_string(&item.expression)
665                })
666            })
667            .collect();
668
669        // Check if we need a project operator (for property access or expression evaluation)
670        let needs_project = ret
671            .items
672            .iter()
673            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
674
675        if needs_project {
676            // Build project expressions
677            let mut projections = Vec::with_capacity(ret.items.len());
678            let mut output_types = Vec::with_capacity(ret.items.len());
679
680            for item in &ret.items {
681                match &item.expression {
682                    LogicalExpression::Variable(name) => {
683                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
684                            Error::Internal(format!("Variable '{}' not found in input", name))
685                        })?;
686                        projections.push(ProjectExpr::Column(col_idx));
687                        // Use Node type for variables (they could be nodes, edges, or values)
688                        output_types.push(LogicalType::Node);
689                    }
690                    LogicalExpression::Property { variable, property } => {
691                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
692                            Error::Internal(format!("Variable '{}' not found in input", variable))
693                        })?;
694                        projections.push(ProjectExpr::PropertyAccess {
695                            column: col_idx,
696                            property: property.clone(),
697                        });
698                        // Property could be any type - use Any/Generic to preserve type
699                        output_types.push(LogicalType::Any);
700                    }
701                    LogicalExpression::Literal(value) => {
702                        projections.push(ProjectExpr::Constant(value.clone()));
703                        output_types.push(value_to_logical_type(value));
704                    }
705                    LogicalExpression::FunctionCall { name, args, .. } => {
706                        // Handle built-in functions
707                        match name.to_lowercase().as_str() {
708                            "type" => {
709                                // type(r) returns the edge type string
710                                if args.len() != 1 {
711                                    return Err(Error::Internal(
712                                        "type() requires exactly one argument".to_string(),
713                                    ));
714                                }
715                                if let LogicalExpression::Variable(var_name) = &args[0] {
716                                    let col_idx =
717                                        *variable_columns.get(var_name).ok_or_else(|| {
718                                            Error::Internal(format!(
719                                                "Variable '{}' not found in input",
720                                                var_name
721                                            ))
722                                        })?;
723                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
724                                    output_types.push(LogicalType::String);
725                                } else {
726                                    return Err(Error::Internal(
727                                        "type() argument must be a variable".to_string(),
728                                    ));
729                                }
730                            }
731                            "length" => {
732                                // length(p) returns the path length
733                                // For shortestPath results, the path column already contains the length
734                                if args.len() != 1 {
735                                    return Err(Error::Internal(
736                                        "length() requires exactly one argument".to_string(),
737                                    ));
738                                }
739                                if let LogicalExpression::Variable(var_name) = &args[0] {
740                                    let col_idx =
741                                        *variable_columns.get(var_name).ok_or_else(|| {
742                                            Error::Internal(format!(
743                                                "Variable '{}' not found in input",
744                                                var_name
745                                            ))
746                                        })?;
747                                    // Pass through the column value directly
748                                    projections.push(ProjectExpr::Column(col_idx));
749                                    output_types.push(LogicalType::Int64);
750                                } else {
751                                    return Err(Error::Internal(
752                                        "length() argument must be a variable".to_string(),
753                                    ));
754                                }
755                            }
756                            // For other functions (head, tail, size, etc.), use expression evaluation
757                            _ => {
758                                let filter_expr = self.convert_expression(&item.expression)?;
759                                projections.push(ProjectExpr::Expression {
760                                    expr: filter_expr,
761                                    variable_columns: variable_columns.clone(),
762                                });
763                                output_types.push(LogicalType::Any);
764                            }
765                        }
766                    }
767                    LogicalExpression::Case { .. } => {
768                        // Convert CASE expression to FilterExpression for evaluation
769                        let filter_expr = self.convert_expression(&item.expression)?;
770                        projections.push(ProjectExpr::Expression {
771                            expr: filter_expr,
772                            variable_columns: variable_columns.clone(),
773                        });
774                        // CASE can return any type - use Any
775                        output_types.push(LogicalType::Any);
776                    }
777                    _ => {
778                        return Err(Error::Internal(format!(
779                            "Unsupported RETURN expression: {:?}",
780                            item.expression
781                        )));
782                    }
783                }
784            }
785
786            let operator = Box::new(ProjectOperator::with_store(
787                input_op,
788                projections,
789                output_types,
790                Arc::clone(&self.store),
791            ));
792
793            Ok((operator, columns))
794        } else {
795            // Simple case: just return variables
796            // Re-order columns to match return items if needed
797            let mut projections = Vec::with_capacity(ret.items.len());
798            let mut output_types = Vec::with_capacity(ret.items.len());
799
800            for item in &ret.items {
801                if let LogicalExpression::Variable(name) = &item.expression {
802                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
803                        Error::Internal(format!("Variable '{}' not found in input", name))
804                    })?;
805                    projections.push(ProjectExpr::Column(col_idx));
806                    output_types.push(LogicalType::Node);
807                }
808            }
809
810            // Only add ProjectOperator if reordering is needed
811            if projections.len() == input_columns.len()
812                && projections
813                    .iter()
814                    .enumerate()
815                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
816            {
817                // No reordering needed
818                Ok((input_op, columns))
819            } else {
820                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
821                Ok((operator, columns))
822            }
823        }
824    }
825
826    /// Plans a project operator (for WITH clause).
827    fn plan_project(
828        &self,
829        project: &crate::query::plan::ProjectOp,
830    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
831        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
832        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
833            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
834                // Create a single-row operator for projecting literals
835                let single_row_op: Box<dyn Operator> = Box::new(
836                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
837                );
838                (single_row_op, Vec::new())
839            } else {
840                self.plan_operator(&project.input)?
841            };
842
843        // Build variable to column index mapping
844        let variable_columns: HashMap<String, usize> = input_columns
845            .iter()
846            .enumerate()
847            .map(|(i, name)| (name.clone(), i))
848            .collect();
849
850        // Build projections and new column names
851        let mut projections = Vec::with_capacity(project.projections.len());
852        let mut output_types = Vec::with_capacity(project.projections.len());
853        let mut output_columns = Vec::with_capacity(project.projections.len());
854
855        for projection in &project.projections {
856            // Determine the output column name (alias or expression string)
857            let col_name = projection
858                .alias
859                .clone()
860                .unwrap_or_else(|| expression_to_string(&projection.expression));
861            output_columns.push(col_name);
862
863            match &projection.expression {
864                LogicalExpression::Variable(name) => {
865                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
866                        Error::Internal(format!("Variable '{}' not found in input", name))
867                    })?;
868                    projections.push(ProjectExpr::Column(col_idx));
869                    output_types.push(LogicalType::Node);
870                }
871                LogicalExpression::Property { variable, property } => {
872                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
873                        Error::Internal(format!("Variable '{}' not found in input", variable))
874                    })?;
875                    projections.push(ProjectExpr::PropertyAccess {
876                        column: col_idx,
877                        property: property.clone(),
878                    });
879                    output_types.push(LogicalType::Any);
880                }
881                LogicalExpression::Literal(value) => {
882                    projections.push(ProjectExpr::Constant(value.clone()));
883                    output_types.push(value_to_logical_type(value));
884                }
885                _ => {
886                    // For complex expressions, use full expression evaluation
887                    let filter_expr = self.convert_expression(&projection.expression)?;
888                    projections.push(ProjectExpr::Expression {
889                        expr: filter_expr,
890                        variable_columns: variable_columns.clone(),
891                    });
892                    output_types.push(LogicalType::Any);
893                }
894            }
895        }
896
897        let operator = Box::new(ProjectOperator::with_store(
898            input_op,
899            projections,
900            output_types,
901            Arc::clone(&self.store),
902        ));
903
904        Ok((operator, output_columns))
905    }
906
907    /// Plans a filter operator.
908    ///
909    /// Uses zone map pre-filtering to potentially skip scans when predicates
910    /// definitely won't match any data. Also uses property indexes when available
911    /// for O(1) lookups instead of full scans.
912    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
913        // Check zone maps for simple property predicates before scanning
914        // If zone map says "definitely no matches", we can short-circuit
915        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
916            // Zone map says no matches possible - return empty result
917            let (_, columns) = self.plan_operator(&filter.input)?;
918            let schema = self.derive_schema_from_columns(&columns);
919            let empty_op = Box::new(EmptyOperator::new(schema));
920            return Ok((empty_op, columns));
921        }
922
923        // Try to use property index for equality predicates on indexed properties
924        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
925            return Ok(result);
926        }
927
928        // Try to use range optimization for range predicates (>, <, >=, <=)
929        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
930            return Ok(result);
931        }
932
933        // Plan the input operator first
934        let (input_op, columns) = self.plan_operator(&filter.input)?;
935
936        // Build variable to column index mapping
937        let variable_columns: HashMap<String, usize> = columns
938            .iter()
939            .enumerate()
940            .map(|(i, name)| (name.clone(), i))
941            .collect();
942
943        // Convert logical expression to filter expression
944        let filter_expr = self.convert_expression(&filter.predicate)?;
945
946        // Create the predicate
947        let predicate =
948            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
949
950        // Create the filter operator
951        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
952
953        Ok((operator, columns))
954    }
955
956    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
957    ///
958    /// Returns:
959    /// - `Some(false)` if zone map proves no matches possible (can skip)
960    /// - `Some(true)` if zone map says matches might exist
961    /// - `None` if zone map check not applicable
962    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
963        use grafeo_core::graph::lpg::CompareOp;
964
965        match predicate {
966            LogicalExpression::Binary { left, op, right } => {
967                // Check for AND/OR first (compound conditions)
968                match op {
969                    BinaryOp::And => {
970                        let left_result = self.check_zone_map_for_predicate(left);
971                        let right_result = self.check_zone_map_for_predicate(right);
972
973                        return match (left_result, right_result) {
974                            // If either side definitely won't match, the AND won't match
975                            (Some(false), _) | (_, Some(false)) => Some(false),
976                            // If both might match, might match overall
977                            (Some(true), Some(true)) => Some(true),
978                            // Otherwise, can't determine
979                            _ => None,
980                        };
981                    }
982                    BinaryOp::Or => {
983                        let left_result = self.check_zone_map_for_predicate(left);
984                        let right_result = self.check_zone_map_for_predicate(right);
985
986                        return match (left_result, right_result) {
987                            // Both sides definitely won't match
988                            (Some(false), Some(false)) => Some(false),
989                            // At least one side might match
990                            (Some(true), _) | (_, Some(true)) => Some(true),
991                            // Otherwise, can't determine
992                            _ => None,
993                        };
994                    }
995                    _ => {}
996                }
997
998                // Simple property comparison: n.property op value
999                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1000                    (
1001                        LogicalExpression::Property { property, .. },
1002                        LogicalExpression::Literal(val),
1003                    ) => {
1004                        let cmp = match op {
1005                            BinaryOp::Eq => CompareOp::Eq,
1006                            BinaryOp::Ne => CompareOp::Ne,
1007                            BinaryOp::Lt => CompareOp::Lt,
1008                            BinaryOp::Le => CompareOp::Le,
1009                            BinaryOp::Gt => CompareOp::Gt,
1010                            BinaryOp::Ge => CompareOp::Ge,
1011                            _ => return None,
1012                        };
1013                        (property.clone(), cmp, val.clone())
1014                    }
1015                    (
1016                        LogicalExpression::Literal(val),
1017                        LogicalExpression::Property { property, .. },
1018                    ) => {
1019                        // Flip comparison for reversed operands
1020                        let cmp = match op {
1021                            BinaryOp::Eq => CompareOp::Eq,
1022                            BinaryOp::Ne => CompareOp::Ne,
1023                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1024                            BinaryOp::Le => CompareOp::Ge,
1025                            BinaryOp::Gt => CompareOp::Lt,
1026                            BinaryOp::Ge => CompareOp::Le,
1027                            _ => return None,
1028                        };
1029                        (property.clone(), cmp, val.clone())
1030                    }
1031                    _ => return None,
1032                };
1033
1034                // Check zone map for node properties
1035                let might_match =
1036                    self.store
1037                        .node_property_might_match(&property.into(), compare_op, &value);
1038
1039                Some(might_match)
1040            }
1041
1042            _ => None,
1043        }
1044    }
1045
1046    /// Tries to use a property index for filter optimization.
1047    ///
1048    /// When a filter predicate is an equality check on an indexed property,
1049    /// and the input is a simple NodeScan, we can use the index to look up
1050    /// matching nodes directly instead of scanning all nodes.
1051    ///
1052    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1053    /// `Ok(None)` if not applicable, or `Err` on error.
1054    fn try_plan_filter_with_property_index(
1055        &self,
1056        filter: &FilterOp,
1057    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1058        // Only optimize if input is a simple NodeScan (not nested)
1059        let (scan_variable, scan_label) = match filter.input.as_ref() {
1060            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1061                (scan.variable.clone(), scan.label.clone())
1062            }
1063            _ => return Ok(None),
1064        };
1065
1066        // Extract property equality conditions from the predicate
1067        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1068        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1069
1070        if conditions.is_empty() {
1071            return Ok(None);
1072        }
1073
1074        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1075        let has_indexed_condition = conditions
1076            .iter()
1077            .any(|(prop, _)| self.store.has_property_index(prop));
1078
1079        if !has_indexed_condition {
1080            return Ok(None);
1081        }
1082
1083        // Use the optimized batch lookup for multiple conditions
1084        let conditions_ref: Vec<(&str, Value)> = conditions
1085            .iter()
1086            .map(|(p, v)| (p.as_str(), v.clone()))
1087            .collect();
1088        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1089
1090        // If there's a label filter, also filter by label
1091        if let Some(label) = &scan_label {
1092            let label_nodes: std::collections::HashSet<_> =
1093                self.store.nodes_by_label(label).into_iter().collect();
1094            matching_nodes.retain(|n| label_nodes.contains(n));
1095        }
1096
1097        // Create a NodeListOperator with the matching nodes
1098        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1099        let columns = vec![scan_variable];
1100
1101        Ok(Some((node_list_op, columns)))
1102    }
1103
1104    /// Extracts equality conditions (property = literal) from a predicate.
1105    ///
1106    /// Handles both simple predicates and AND chains:
1107    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1108    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1109    fn extract_equality_conditions(
1110        &self,
1111        predicate: &LogicalExpression,
1112        target_variable: &str,
1113    ) -> Vec<(String, Value)> {
1114        let mut conditions = Vec::new();
1115        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1116        conditions
1117    }
1118
1119    /// Recursively collects equality conditions from AND expressions.
1120    fn collect_equality_conditions(
1121        &self,
1122        expr: &LogicalExpression,
1123        target_variable: &str,
1124        conditions: &mut Vec<(String, Value)>,
1125    ) {
1126        match expr {
1127            // Handle AND: recurse into both sides
1128            LogicalExpression::Binary {
1129                left,
1130                op: BinaryOp::And,
1131                right,
1132            } => {
1133                self.collect_equality_conditions(left, target_variable, conditions);
1134                self.collect_equality_conditions(right, target_variable, conditions);
1135            }
1136
1137            // Handle equality: extract property and value
1138            LogicalExpression::Binary {
1139                left,
1140                op: BinaryOp::Eq,
1141                right,
1142            } => {
1143                if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1144                    if var == target_variable {
1145                        conditions.push((prop, val));
1146                    }
1147                }
1148            }
1149
1150            _ => {}
1151        }
1152    }
1153
1154    /// Extracts (variable, property, value) from a property equality expression.
1155    fn extract_property_equality(
1156        &self,
1157        left: &LogicalExpression,
1158        right: &LogicalExpression,
1159    ) -> Option<(String, String, Value)> {
1160        match (left, right) {
1161            (
1162                LogicalExpression::Property { variable, property },
1163                LogicalExpression::Literal(val),
1164            ) => Some((variable.clone(), property.clone(), val.clone())),
1165            (
1166                LogicalExpression::Literal(val),
1167                LogicalExpression::Property { variable, property },
1168            ) => Some((variable.clone(), property.clone(), val.clone())),
1169            _ => None,
1170        }
1171    }
1172
1173    /// Tries to optimize a filter using range queries on properties.
1174    ///
1175    /// This optimization is applied when:
1176    /// - The input is a simple NodeScan (no nested operations)
1177    /// - The predicate contains range comparisons (>, <, >=, <=)
1178    /// - The same variable and property are being filtered
1179    ///
1180    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1181    /// (`n.age >= 30 AND n.age <= 50`).
1182    ///
1183    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1184    /// `Ok(None)` if not applicable, or `Err` on error.
1185    fn try_plan_filter_with_range_index(
1186        &self,
1187        filter: &FilterOp,
1188    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1189        // Only optimize if input is a simple NodeScan (not nested)
1190        let (scan_variable, scan_label) = match filter.input.as_ref() {
1191            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1192                (scan.variable.clone(), scan.label.clone())
1193            }
1194            _ => return Ok(None),
1195        };
1196
1197        // Try to extract BETWEEN pattern first (more efficient)
1198        if let Some((variable, property, min, max, min_inc, max_inc)) =
1199            self.extract_between_predicate(&filter.predicate)
1200        {
1201            if variable == scan_variable {
1202                return self.plan_range_filter(
1203                    &scan_variable,
1204                    &scan_label,
1205                    &property,
1206                    RangeBounds {
1207                        min: Some(&min),
1208                        max: Some(&max),
1209                        min_inclusive: min_inc,
1210                        max_inclusive: max_inc,
1211                    },
1212                );
1213            }
1214        }
1215
1216        // Try to extract simple range predicate
1217        if let Some((variable, property, op, value)) =
1218            self.extract_range_predicate(&filter.predicate)
1219        {
1220            if variable == scan_variable {
1221                let (min, max, min_inc, max_inc) = match op {
1222                    BinaryOp::Lt => (None, Some(value), false, false),
1223                    BinaryOp::Le => (None, Some(value), false, true),
1224                    BinaryOp::Gt => (Some(value), None, false, false),
1225                    BinaryOp::Ge => (Some(value), None, true, false),
1226                    _ => return Ok(None),
1227                };
1228                return self.plan_range_filter(
1229                    &scan_variable,
1230                    &scan_label,
1231                    &property,
1232                    RangeBounds {
1233                        min: min.as_ref(),
1234                        max: max.as_ref(),
1235                        min_inclusive: min_inc,
1236                        max_inclusive: max_inc,
1237                    },
1238                );
1239            }
1240        }
1241
1242        Ok(None)
1243    }
1244
1245    /// Plans a range filter using `find_nodes_in_range`.
1246    fn plan_range_filter(
1247        &self,
1248        scan_variable: &str,
1249        scan_label: &Option<String>,
1250        property: &str,
1251        bounds: RangeBounds<'_>,
1252    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1253        // Use the store's range query method
1254        let mut matching_nodes = self.store.find_nodes_in_range(
1255            property,
1256            bounds.min,
1257            bounds.max,
1258            bounds.min_inclusive,
1259            bounds.max_inclusive,
1260        );
1261
1262        // If there's a label filter, also filter by label
1263        if let Some(label) = scan_label {
1264            let label_nodes: std::collections::HashSet<_> =
1265                self.store.nodes_by_label(label).into_iter().collect();
1266            matching_nodes.retain(|n| label_nodes.contains(n));
1267        }
1268
1269        // Create a NodeListOperator with the matching nodes
1270        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1271        let columns = vec![scan_variable.to_string()];
1272
1273        Ok(Some((node_list_op, columns)))
1274    }
1275
1276    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1277    ///
1278    /// Returns `(variable, property, operator, value)` if found.
1279    fn extract_range_predicate(
1280        &self,
1281        predicate: &LogicalExpression,
1282    ) -> Option<(String, String, BinaryOp, Value)> {
1283        match predicate {
1284            LogicalExpression::Binary { left, op, right } => {
1285                match op {
1286                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1287                        // Try property on left: n.age > 30
1288                        if let (
1289                            LogicalExpression::Property { variable, property },
1290                            LogicalExpression::Literal(val),
1291                        ) = (left.as_ref(), right.as_ref())
1292                        {
1293                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1294                        }
1295
1296                        // Try property on right: 30 < n.age (flip operator)
1297                        if let (
1298                            LogicalExpression::Literal(val),
1299                            LogicalExpression::Property { variable, property },
1300                        ) = (left.as_ref(), right.as_ref())
1301                        {
1302                            let flipped_op = match op {
1303                                BinaryOp::Lt => BinaryOp::Gt,
1304                                BinaryOp::Le => BinaryOp::Ge,
1305                                BinaryOp::Gt => BinaryOp::Lt,
1306                                BinaryOp::Ge => BinaryOp::Le,
1307                                _ => return None,
1308                            };
1309                            return Some((
1310                                variable.clone(),
1311                                property.clone(),
1312                                flipped_op,
1313                                val.clone(),
1314                            ));
1315                        }
1316                    }
1317                    _ => {}
1318                }
1319            }
1320            _ => {}
1321        }
1322        None
1323    }
1324
1325    /// Extracts a BETWEEN pattern from compound predicates.
1326    ///
1327    /// Recognizes patterns like:
1328    /// - `n.age >= 30 AND n.age <= 50`
1329    /// - `n.age > 30 AND n.age < 50`
1330    ///
1331    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1332    fn extract_between_predicate(
1333        &self,
1334        predicate: &LogicalExpression,
1335    ) -> Option<(String, String, Value, Value, bool, bool)> {
1336        // Must be an AND expression
1337        let (left, right) = match predicate {
1338            LogicalExpression::Binary {
1339                left,
1340                op: BinaryOp::And,
1341                right,
1342            } => (left.as_ref(), right.as_ref()),
1343            _ => return None,
1344        };
1345
1346        // Extract range predicates from both sides
1347        let left_range = self.extract_range_predicate(left);
1348        let right_range = self.extract_range_predicate(right);
1349
1350        let (left_var, left_prop, left_op, left_val) = left_range?;
1351        let (right_var, right_prop, right_op, right_val) = right_range?;
1352
1353        // Must be same variable and property
1354        if left_var != right_var || left_prop != right_prop {
1355            return None;
1356        }
1357
1358        // Determine which is lower bound and which is upper bound
1359        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1360            // n.x >= min AND n.x <= max
1361            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1362            // n.x >= min AND n.x < max
1363            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1364            // n.x > min AND n.x <= max
1365            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1366            // n.x > min AND n.x < max
1367            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1368            // Reversed order: n.x <= max AND n.x >= min
1369            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1370            // n.x < max AND n.x >= min
1371            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1372            // n.x <= max AND n.x > min
1373            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1374            // n.x < max AND n.x > min
1375            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1376            _ => return None,
1377        };
1378
1379        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1380    }
1381
1382    /// Plans a LIMIT operator.
1383    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1384        let (input_op, columns) = self.plan_operator(&limit.input)?;
1385        let output_schema = self.derive_schema_from_columns(&columns);
1386        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1387        Ok((operator, columns))
1388    }
1389
1390    /// Plans a SKIP operator.
1391    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1392        let (input_op, columns) = self.plan_operator(&skip.input)?;
1393        let output_schema = self.derive_schema_from_columns(&columns);
1394        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1395        Ok((operator, columns))
1396    }
1397
1398    /// Plans a SORT (ORDER BY) operator.
1399    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1400        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1401
1402        // Build variable to column index mapping
1403        let mut variable_columns: HashMap<String, usize> = input_columns
1404            .iter()
1405            .enumerate()
1406            .map(|(i, name)| (name.clone(), i))
1407            .collect();
1408
1409        // Collect property expressions that need to be projected before sorting
1410        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1411        let mut next_col_idx = input_columns.len();
1412
1413        for key in &sort.keys {
1414            if let LogicalExpression::Property { variable, property } = &key.expression {
1415                let col_name = format!("{}_{}", variable, property);
1416                if !variable_columns.contains_key(&col_name) {
1417                    property_projections.push((
1418                        variable.clone(),
1419                        property.clone(),
1420                        col_name.clone(),
1421                    ));
1422                    variable_columns.insert(col_name, next_col_idx);
1423                    next_col_idx += 1;
1424                }
1425            }
1426        }
1427
1428        // Track output columns
1429        let mut output_columns = input_columns.clone();
1430
1431        // If we have property expressions, add a projection to materialize them
1432        if !property_projections.is_empty() {
1433            let mut projections = Vec::new();
1434            let mut output_types = Vec::new();
1435
1436            // First, pass through all existing columns (use Node type to preserve node IDs
1437            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1438            for (i, _) in input_columns.iter().enumerate() {
1439                projections.push(ProjectExpr::Column(i));
1440                output_types.push(LogicalType::Node);
1441            }
1442
1443            // Then add property access projections
1444            for (variable, property, col_name) in &property_projections {
1445                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1446                    Error::Internal(format!(
1447                        "Variable '{}' not found for ORDER BY property projection",
1448                        variable
1449                    ))
1450                })?;
1451                projections.push(ProjectExpr::PropertyAccess {
1452                    column: source_col,
1453                    property: property.clone(),
1454                });
1455                output_types.push(LogicalType::Any);
1456                output_columns.push(col_name.clone());
1457            }
1458
1459            input_op = Box::new(ProjectOperator::with_store(
1460                input_op,
1461                projections,
1462                output_types,
1463                Arc::clone(&self.store),
1464            ));
1465        }
1466
1467        // Convert logical sort keys to physical sort keys
1468        let physical_keys: Vec<PhysicalSortKey> = sort
1469            .keys
1470            .iter()
1471            .map(|key| {
1472                let col_idx = self
1473                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1474                Ok(PhysicalSortKey {
1475                    column: col_idx,
1476                    direction: match key.order {
1477                        SortOrder::Ascending => SortDirection::Ascending,
1478                        SortOrder::Descending => SortDirection::Descending,
1479                    },
1480                    null_order: NullOrder::NullsLast,
1481                })
1482            })
1483            .collect::<Result<Vec<_>>>()?;
1484
1485        let output_schema = self.derive_schema_from_columns(&output_columns);
1486        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1487        Ok((operator, output_columns))
1488    }
1489
1490    /// Resolves a sort expression to a column index, using projected property columns.
1491    fn resolve_sort_expression_with_properties(
1492        &self,
1493        expr: &LogicalExpression,
1494        variable_columns: &HashMap<String, usize>,
1495    ) -> Result<usize> {
1496        match expr {
1497            LogicalExpression::Variable(name) => {
1498                variable_columns.get(name).copied().ok_or_else(|| {
1499                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1500                })
1501            }
1502            LogicalExpression::Property { variable, property } => {
1503                // Look up the projected property column (e.g., "p_age" for p.age)
1504                let col_name = format!("{}_{}", variable, property);
1505                variable_columns.get(&col_name).copied().ok_or_else(|| {
1506                    Error::Internal(format!(
1507                        "Property column '{}' not found for ORDER BY (from {}.{})",
1508                        col_name, variable, property
1509                    ))
1510                })
1511            }
1512            _ => Err(Error::Internal(format!(
1513                "Unsupported ORDER BY expression: {:?}",
1514                expr
1515            ))),
1516        }
1517    }
1518
1519    /// Derives a schema from column names (uses Any type to handle all value types).
1520    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1521        columns.iter().map(|_| LogicalType::Any).collect()
1522    }
1523
1524    /// Plans an AGGREGATE operator.
1525    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1526        // Check if we can use factorized aggregation for speedup
1527        // Conditions:
1528        // 1. Factorized execution is enabled
1529        // 2. Input is an expand chain (multi-hop)
1530        // 3. No GROUP BY
1531        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1532        if self.factorized_execution
1533            && agg.group_by.is_empty()
1534            && Self::count_expand_chain(&agg.input).0 >= 2
1535            && self.is_simple_aggregate(agg)
1536        {
1537            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1538                return Ok((op, cols));
1539            }
1540            // Fall through to regular aggregate if factorized planning fails
1541        }
1542
1543        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1544
1545        // Build variable to column index mapping
1546        let mut variable_columns: HashMap<String, usize> = input_columns
1547            .iter()
1548            .enumerate()
1549            .map(|(i, name)| (name.clone(), i))
1550            .collect();
1551
1552        // Collect all property expressions that need to be projected before aggregation
1553        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1554        let mut next_col_idx = input_columns.len();
1555
1556        // Check group-by expressions for properties
1557        for expr in &agg.group_by {
1558            if let LogicalExpression::Property { variable, property } = expr {
1559                let col_name = format!("{}_{}", variable, property);
1560                if !variable_columns.contains_key(&col_name) {
1561                    property_projections.push((
1562                        variable.clone(),
1563                        property.clone(),
1564                        col_name.clone(),
1565                    ));
1566                    variable_columns.insert(col_name, next_col_idx);
1567                    next_col_idx += 1;
1568                }
1569            }
1570        }
1571
1572        // Check aggregate expressions for properties
1573        for agg_expr in &agg.aggregates {
1574            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1575                let col_name = format!("{}_{}", variable, property);
1576                if !variable_columns.contains_key(&col_name) {
1577                    property_projections.push((
1578                        variable.clone(),
1579                        property.clone(),
1580                        col_name.clone(),
1581                    ));
1582                    variable_columns.insert(col_name, next_col_idx);
1583                    next_col_idx += 1;
1584                }
1585            }
1586        }
1587
1588        // If we have property expressions, add a projection to materialize them
1589        if !property_projections.is_empty() {
1590            let mut projections = Vec::new();
1591            let mut output_types = Vec::new();
1592
1593            // First, pass through all existing columns (use Node type to preserve node IDs
1594            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1595            for (i, _) in input_columns.iter().enumerate() {
1596                projections.push(ProjectExpr::Column(i));
1597                output_types.push(LogicalType::Node);
1598            }
1599
1600            // Then add property access projections
1601            for (variable, property, _col_name) in &property_projections {
1602                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1603                    Error::Internal(format!(
1604                        "Variable '{}' not found for property projection",
1605                        variable
1606                    ))
1607                })?;
1608                projections.push(ProjectExpr::PropertyAccess {
1609                    column: source_col,
1610                    property: property.clone(),
1611                });
1612                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1613            }
1614
1615            input_op = Box::new(ProjectOperator::with_store(
1616                input_op,
1617                projections,
1618                output_types,
1619                Arc::clone(&self.store),
1620            ));
1621        }
1622
1623        // Convert group-by expressions to column indices
1624        let group_columns: Vec<usize> = agg
1625            .group_by
1626            .iter()
1627            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1628            .collect::<Result<Vec<_>>>()?;
1629
1630        // Convert aggregate expressions to physical form
1631        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1632            .aggregates
1633            .iter()
1634            .map(|agg_expr| {
1635                let column = agg_expr
1636                    .expression
1637                    .as_ref()
1638                    .map(|e| {
1639                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1640                    })
1641                    .transpose()?;
1642
1643                Ok(PhysicalAggregateExpr {
1644                    function: convert_aggregate_function(agg_expr.function),
1645                    column,
1646                    distinct: agg_expr.distinct,
1647                    alias: agg_expr.alias.clone(),
1648                    percentile: agg_expr.percentile,
1649                })
1650            })
1651            .collect::<Result<Vec<_>>>()?;
1652
1653        // Build output schema and column names
1654        let mut output_schema = Vec::new();
1655        let mut output_columns = Vec::new();
1656
1657        // Add group-by columns
1658        for expr in &agg.group_by {
1659            output_schema.push(LogicalType::Any); // Group-by values can be any type
1660            output_columns.push(expression_to_string(expr));
1661        }
1662
1663        // Add aggregate result columns
1664        for agg_expr in &agg.aggregates {
1665            let result_type = match agg_expr.function {
1666                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1667                    LogicalType::Int64
1668                }
1669                LogicalAggregateFunction::Sum => LogicalType::Int64,
1670                LogicalAggregateFunction::Avg => LogicalType::Float64,
1671                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1672                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1673                    // since the aggregate can return any Value type, but the most common case
1674                    // is numeric values from property expressions
1675                    LogicalType::Int64
1676                }
1677                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1678                // Statistical functions return Float64
1679                LogicalAggregateFunction::StdDev
1680                | LogicalAggregateFunction::StdDevPop
1681                | LogicalAggregateFunction::PercentileDisc
1682                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1683            };
1684            output_schema.push(result_type);
1685            output_columns.push(
1686                agg_expr
1687                    .alias
1688                    .clone()
1689                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1690            );
1691        }
1692
1693        // Choose operator based on whether there are group-by columns
1694        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1695            Box::new(SimpleAggregateOperator::new(
1696                input_op,
1697                physical_aggregates,
1698                output_schema,
1699            ))
1700        } else {
1701            Box::new(HashAggregateOperator::new(
1702                input_op,
1703                group_columns,
1704                physical_aggregates,
1705                output_schema,
1706            ))
1707        };
1708
1709        // Apply HAVING clause filter if present
1710        if let Some(having_expr) = &agg.having {
1711            // Build variable to column mapping for the aggregate output
1712            let having_var_columns: HashMap<String, usize> = output_columns
1713                .iter()
1714                .enumerate()
1715                .map(|(i, name)| (name.clone(), i))
1716                .collect();
1717
1718            let filter_expr = self.convert_expression(having_expr)?;
1719            let predicate =
1720                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1721            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1722        }
1723
1724        Ok((operator, output_columns))
1725    }
1726
1727    /// Checks if an aggregate is simple enough for factorized execution.
1728    ///
1729    /// Simple aggregates:
1730    /// - COUNT(*) or COUNT(variable)
1731    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1732    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1733        agg.aggregates.iter().all(|agg_expr| {
1734            match agg_expr.function {
1735                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1736                    // COUNT(*) is always OK, COUNT(var) is OK
1737                    agg_expr.expression.is_none()
1738                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1739                }
1740                LogicalAggregateFunction::Sum
1741                | LogicalAggregateFunction::Avg
1742                | LogicalAggregateFunction::Min
1743                | LogicalAggregateFunction::Max => {
1744                    // For now, only support when expression is a variable
1745                    // (property access would require flattening first)
1746                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1747                }
1748                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1749                _ => false,
1750            }
1751        })
1752    }
1753
1754    /// Plans a factorized aggregate that operates directly on factorized data.
1755    ///
1756    /// This avoids the O(n²) cost of flattening before aggregation.
1757    fn plan_factorized_aggregate(
1758        &self,
1759        agg: &AggregateOp,
1760    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1761        // Build the expand chain - this returns a LazyFactorizedChainOperator
1762        let expands = Self::collect_expand_chain(&agg.input);
1763        if expands.is_empty() {
1764            return Err(Error::Internal(
1765                "Expected expand chain for factorized aggregate".to_string(),
1766            ));
1767        }
1768
1769        // Get the base operator (before first expand)
1770        let first_expand = expands[0];
1771        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1772
1773        let mut columns = base_columns.clone();
1774        let mut steps = Vec::new();
1775        let mut is_first = true;
1776
1777        for expand in &expands {
1778            // Find source column for this expand
1779            let source_column = if is_first {
1780                base_columns
1781                    .iter()
1782                    .position(|c| c == &expand.from_variable)
1783                    .ok_or_else(|| {
1784                        Error::Internal(format!(
1785                            "Source variable '{}' not found in base columns",
1786                            expand.from_variable
1787                        ))
1788                    })?
1789            } else {
1790                1 // Target from previous level
1791            };
1792
1793            let direction = match expand.direction {
1794                ExpandDirection::Outgoing => Direction::Outgoing,
1795                ExpandDirection::Incoming => Direction::Incoming,
1796                ExpandDirection::Both => Direction::Both,
1797            };
1798
1799            steps.push(ExpandStep {
1800                source_column,
1801                direction,
1802                edge_type: expand.edge_type.clone(),
1803            });
1804
1805            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1806                let count = self.anon_edge_counter.get();
1807                self.anon_edge_counter.set(count + 1);
1808                format!("_anon_edge_{}", count)
1809            });
1810            columns.push(edge_col_name);
1811            columns.push(expand.to_variable.clone());
1812
1813            is_first = false;
1814        }
1815
1816        // Create the lazy factorized chain operator
1817        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1818
1819        if let Some(tx_id) = self.tx_id {
1820            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1821        } else {
1822            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1823        }
1824
1825        // Convert logical aggregates to factorized aggregates
1826        let factorized_aggs: Vec<FactorizedAggregate> = agg
1827            .aggregates
1828            .iter()
1829            .map(|agg_expr| {
1830                match agg_expr.function {
1831                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1832                        // COUNT(*) uses simple count, COUNT(col) uses column count
1833                        if agg_expr.expression.is_none() {
1834                            FactorizedAggregate::count()
1835                        } else {
1836                            // For COUNT(variable), we use the deepest level's target column
1837                            // which is the last column added to the schema
1838                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1839                        }
1840                    }
1841                    LogicalAggregateFunction::Sum => {
1842                        // SUM on deepest level target
1843                        FactorizedAggregate::sum(1)
1844                    }
1845                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1846                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1847                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1848                    _ => {
1849                        // Shouldn't reach here due to is_simple_aggregate check
1850                        FactorizedAggregate::count()
1851                    }
1852                }
1853            })
1854            .collect();
1855
1856        // Build output column names
1857        let output_columns: Vec<String> = agg
1858            .aggregates
1859            .iter()
1860            .map(|agg_expr| {
1861                agg_expr
1862                    .alias
1863                    .clone()
1864                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1865            })
1866            .collect();
1867
1868        // Create the factorized aggregate operator
1869        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1870
1871        Ok((Box::new(factorized_agg_op), output_columns))
1872    }
1873
1874    /// Resolves a logical expression to a column index.
1875    #[allow(dead_code)]
1876    fn resolve_expression_to_column(
1877        &self,
1878        expr: &LogicalExpression,
1879        variable_columns: &HashMap<String, usize>,
1880    ) -> Result<usize> {
1881        match expr {
1882            LogicalExpression::Variable(name) => variable_columns
1883                .get(name)
1884                .copied()
1885                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1886            LogicalExpression::Property { variable, .. } => variable_columns
1887                .get(variable)
1888                .copied()
1889                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1890            _ => Err(Error::Internal(format!(
1891                "Cannot resolve expression to column: {:?}",
1892                expr
1893            ))),
1894        }
1895    }
1896
1897    /// Resolves a logical expression to a column index, using projected property columns.
1898    ///
1899    /// This is used for aggregations where properties have been projected into their own columns.
1900    fn resolve_expression_to_column_with_properties(
1901        &self,
1902        expr: &LogicalExpression,
1903        variable_columns: &HashMap<String, usize>,
1904    ) -> Result<usize> {
1905        match expr {
1906            LogicalExpression::Variable(name) => variable_columns
1907                .get(name)
1908                .copied()
1909                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1910            LogicalExpression::Property { variable, property } => {
1911                // Look up the projected property column (e.g., "p_price" for p.price)
1912                let col_name = format!("{}_{}", variable, property);
1913                variable_columns.get(&col_name).copied().ok_or_else(|| {
1914                    Error::Internal(format!(
1915                        "Property column '{}' not found (from {}.{})",
1916                        col_name, variable, property
1917                    ))
1918                })
1919            }
1920            _ => Err(Error::Internal(format!(
1921                "Cannot resolve expression to column: {:?}",
1922                expr
1923            ))),
1924        }
1925    }
1926
1927    /// Converts a logical expression to a filter expression.
1928    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1929        match expr {
1930            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1931            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1932            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1933                variable: variable.clone(),
1934                property: property.clone(),
1935            }),
1936            LogicalExpression::Binary { left, op, right } => {
1937                let left_expr = self.convert_expression(left)?;
1938                let right_expr = self.convert_expression(right)?;
1939                let filter_op = convert_binary_op(*op)?;
1940                Ok(FilterExpression::Binary {
1941                    left: Box::new(left_expr),
1942                    op: filter_op,
1943                    right: Box::new(right_expr),
1944                })
1945            }
1946            LogicalExpression::Unary { op, operand } => {
1947                let operand_expr = self.convert_expression(operand)?;
1948                let filter_op = convert_unary_op(*op)?;
1949                Ok(FilterExpression::Unary {
1950                    op: filter_op,
1951                    operand: Box::new(operand_expr),
1952                })
1953            }
1954            LogicalExpression::FunctionCall { name, args, .. } => {
1955                let filter_args: Vec<FilterExpression> = args
1956                    .iter()
1957                    .map(|a| self.convert_expression(a))
1958                    .collect::<Result<Vec<_>>>()?;
1959                Ok(FilterExpression::FunctionCall {
1960                    name: name.clone(),
1961                    args: filter_args,
1962                })
1963            }
1964            LogicalExpression::Case {
1965                operand,
1966                when_clauses,
1967                else_clause,
1968            } => {
1969                let filter_operand = operand
1970                    .as_ref()
1971                    .map(|e| self.convert_expression(e))
1972                    .transpose()?
1973                    .map(Box::new);
1974                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1975                    .iter()
1976                    .map(|(cond, result)| {
1977                        Ok((
1978                            self.convert_expression(cond)?,
1979                            self.convert_expression(result)?,
1980                        ))
1981                    })
1982                    .collect::<Result<Vec<_>>>()?;
1983                let filter_else = else_clause
1984                    .as_ref()
1985                    .map(|e| self.convert_expression(e))
1986                    .transpose()?
1987                    .map(Box::new);
1988                Ok(FilterExpression::Case {
1989                    operand: filter_operand,
1990                    when_clauses: filter_when_clauses,
1991                    else_clause: filter_else,
1992                })
1993            }
1994            LogicalExpression::List(items) => {
1995                let filter_items: Vec<FilterExpression> = items
1996                    .iter()
1997                    .map(|item| self.convert_expression(item))
1998                    .collect::<Result<Vec<_>>>()?;
1999                Ok(FilterExpression::List(filter_items))
2000            }
2001            LogicalExpression::Map(pairs) => {
2002                let filter_pairs: Vec<(String, FilterExpression)> = pairs
2003                    .iter()
2004                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2005                    .collect::<Result<Vec<_>>>()?;
2006                Ok(FilterExpression::Map(filter_pairs))
2007            }
2008            LogicalExpression::IndexAccess { base, index } => {
2009                let base_expr = self.convert_expression(base)?;
2010                let index_expr = self.convert_expression(index)?;
2011                Ok(FilterExpression::IndexAccess {
2012                    base: Box::new(base_expr),
2013                    index: Box::new(index_expr),
2014                })
2015            }
2016            LogicalExpression::SliceAccess { base, start, end } => {
2017                let base_expr = self.convert_expression(base)?;
2018                let start_expr = start
2019                    .as_ref()
2020                    .map(|s| self.convert_expression(s))
2021                    .transpose()?
2022                    .map(Box::new);
2023                let end_expr = end
2024                    .as_ref()
2025                    .map(|e| self.convert_expression(e))
2026                    .transpose()?
2027                    .map(Box::new);
2028                Ok(FilterExpression::SliceAccess {
2029                    base: Box::new(base_expr),
2030                    start: start_expr,
2031                    end: end_expr,
2032                })
2033            }
2034            LogicalExpression::Parameter(_) => Err(Error::Internal(
2035                "Parameters not yet supported in filters".to_string(),
2036            )),
2037            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2038            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2039            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2040            LogicalExpression::ListComprehension {
2041                variable,
2042                list_expr,
2043                filter_expr,
2044                map_expr,
2045            } => {
2046                let list = self.convert_expression(list_expr)?;
2047                let filter = filter_expr
2048                    .as_ref()
2049                    .map(|f| self.convert_expression(f))
2050                    .transpose()?
2051                    .map(Box::new);
2052                let map = self.convert_expression(map_expr)?;
2053                Ok(FilterExpression::ListComprehension {
2054                    variable: variable.clone(),
2055                    list_expr: Box::new(list),
2056                    filter_expr: filter,
2057                    map_expr: Box::new(map),
2058                })
2059            }
2060            LogicalExpression::ExistsSubquery(subplan) => {
2061                // Extract the pattern from the subplan
2062                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2063                let (start_var, direction, edge_type, end_labels) =
2064                    self.extract_exists_pattern(subplan)?;
2065
2066                Ok(FilterExpression::ExistsSubquery {
2067                    start_var,
2068                    direction,
2069                    edge_type,
2070                    end_labels,
2071                    min_hops: None,
2072                    max_hops: None,
2073                })
2074            }
2075            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2076                "COUNT subqueries not yet supported".to_string(),
2077            )),
2078        }
2079    }
2080
2081    /// Extracts the pattern from an EXISTS subplan.
2082    /// Returns (start_variable, direction, edge_type, end_labels).
2083    fn extract_exists_pattern(
2084        &self,
2085        subplan: &LogicalOperator,
2086    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2087        match subplan {
2088            LogicalOperator::Expand(expand) => {
2089                // Get end node labels from the to_variable if there's a node scan input
2090                let end_labels = self.extract_end_labels_from_expand(expand);
2091                let direction = match expand.direction {
2092                    ExpandDirection::Outgoing => Direction::Outgoing,
2093                    ExpandDirection::Incoming => Direction::Incoming,
2094                    ExpandDirection::Both => Direction::Both,
2095                };
2096                Ok((
2097                    expand.from_variable.clone(),
2098                    direction,
2099                    expand.edge_type.clone(),
2100                    end_labels,
2101                ))
2102            }
2103            LogicalOperator::NodeScan(scan) => {
2104                if let Some(input) = &scan.input {
2105                    self.extract_exists_pattern(input)
2106                } else {
2107                    Err(Error::Internal(
2108                        "EXISTS subquery must contain an edge pattern".to_string(),
2109                    ))
2110                }
2111            }
2112            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2113            _ => Err(Error::Internal(
2114                "Unsupported EXISTS subquery pattern".to_string(),
2115            )),
2116        }
2117    }
2118
2119    /// Extracts end node labels from an Expand operator if present.
2120    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2121        // Check if the expand has a NodeScan input with a label filter
2122        match expand.input.as_ref() {
2123            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2124            _ => None,
2125        }
2126    }
2127
2128    /// Plans a JOIN operator.
2129    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2130        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2131        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2132
2133        // Build combined output columns
2134        let mut columns = left_columns.clone();
2135        columns.extend(right_columns.clone());
2136
2137        // Convert join type
2138        let physical_join_type = match join.join_type {
2139            JoinType::Inner => PhysicalJoinType::Inner,
2140            JoinType::Left => PhysicalJoinType::Left,
2141            JoinType::Right => PhysicalJoinType::Right,
2142            JoinType::Full => PhysicalJoinType::Full,
2143            JoinType::Cross => PhysicalJoinType::Cross,
2144            JoinType::Semi => PhysicalJoinType::Semi,
2145            JoinType::Anti => PhysicalJoinType::Anti,
2146        };
2147
2148        // Build key columns from join conditions
2149        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2150            // Cross join - no keys
2151            (vec![], vec![])
2152        } else {
2153            join.conditions
2154                .iter()
2155                .filter_map(|cond| {
2156                    // Try to extract column indices from expressions
2157                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2158                    let right_idx = self
2159                        .expression_to_column(&cond.right, &right_columns)
2160                        .ok()?;
2161                    Some((left_idx, right_idx))
2162                })
2163                .unzip()
2164        };
2165
2166        let output_schema = self.derive_schema_from_columns(&columns);
2167
2168        // Check if we should use leapfrog join for cyclic patterns
2169        // Currently we use hash join by default; leapfrog is available but
2170        // requires explicit multi-way join detection which will be added
2171        // when we have proper cyclic pattern detection in the optimizer.
2172        // For now, LeapfrogJoinOperator is available for direct use.
2173        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2174
2175        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2176            left_op,
2177            right_op,
2178            probe_keys,
2179            build_keys,
2180            physical_join_type,
2181            output_schema,
2182        ));
2183
2184        Ok((operator, columns))
2185    }
2186
2187    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2188    ///
2189    /// A cyclic pattern occurs when join conditions reference variables
2190    /// that create a cycle in the join graph. For example, a triangle
2191    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2192    ///
2193    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2194    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2195    #[allow(dead_code)]
2196    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2197        // Build adjacency list for join variables
2198        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2199        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2200
2201        // Collect edges from join conditions
2202        Self::collect_join_edges(
2203            &LogicalOperator::Join(join.clone()),
2204            &mut edges,
2205            &mut all_vars,
2206        );
2207
2208        // Need at least 3 variables to form a cycle
2209        if all_vars.len() < 3 {
2210            return false;
2211        }
2212
2213        // Detect cycle using DFS with coloring
2214        Self::has_cycle(&edges, &all_vars)
2215    }
2216
2217    /// Collects edges from join conditions into an adjacency list.
2218    fn collect_join_edges(
2219        op: &LogicalOperator,
2220        edges: &mut HashMap<String, Vec<String>>,
2221        vars: &mut std::collections::HashSet<String>,
2222    ) {
2223        match op {
2224            LogicalOperator::Join(join) => {
2225                // Process join conditions
2226                for cond in &join.conditions {
2227                    if let (Some(left_var), Some(right_var)) = (
2228                        Self::extract_join_variable(&cond.left),
2229                        Self::extract_join_variable(&cond.right),
2230                    ) {
2231                        if left_var != right_var {
2232                            vars.insert(left_var.clone());
2233                            vars.insert(right_var.clone());
2234
2235                            // Add bidirectional edge
2236                            edges
2237                                .entry(left_var.clone())
2238                                .or_default()
2239                                .push(right_var.clone());
2240                            edges.entry(right_var).or_default().push(left_var);
2241                        }
2242                    }
2243                }
2244
2245                // Recurse into children
2246                Self::collect_join_edges(&join.left, edges, vars);
2247                Self::collect_join_edges(&join.right, edges, vars);
2248            }
2249            LogicalOperator::Expand(expand) => {
2250                // Expand creates implicit join between from_variable and to_variable
2251                vars.insert(expand.from_variable.clone());
2252                vars.insert(expand.to_variable.clone());
2253
2254                edges
2255                    .entry(expand.from_variable.clone())
2256                    .or_default()
2257                    .push(expand.to_variable.clone());
2258                edges
2259                    .entry(expand.to_variable.clone())
2260                    .or_default()
2261                    .push(expand.from_variable.clone());
2262
2263                Self::collect_join_edges(&expand.input, edges, vars);
2264            }
2265            LogicalOperator::Filter(filter) => {
2266                Self::collect_join_edges(&filter.input, edges, vars);
2267            }
2268            LogicalOperator::NodeScan(scan) => {
2269                vars.insert(scan.variable.clone());
2270            }
2271            _ => {}
2272        }
2273    }
2274
2275    /// Extracts the variable name from a join expression.
2276    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2277        match expr {
2278            LogicalExpression::Variable(v) => Some(v.clone()),
2279            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2280            LogicalExpression::Id(v) => Some(v.clone()),
2281            _ => None,
2282        }
2283    }
2284
2285    /// Detects if the graph has a cycle using DFS coloring.
2286    ///
2287    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2288    fn has_cycle(
2289        edges: &HashMap<String, Vec<String>>,
2290        vars: &std::collections::HashSet<String>,
2291    ) -> bool {
2292        let mut color: HashMap<&String, u8> = HashMap::new();
2293
2294        for var in vars {
2295            color.insert(var, 0);
2296        }
2297
2298        for start in vars {
2299            if color[start] == 0 {
2300                if Self::dfs_cycle(start, None, edges, &mut color) {
2301                    return true;
2302                }
2303            }
2304        }
2305
2306        false
2307    }
2308
2309    /// DFS helper for cycle detection.
2310    fn dfs_cycle(
2311        node: &String,
2312        parent: Option<&String>,
2313        edges: &HashMap<String, Vec<String>>,
2314        color: &mut HashMap<&String, u8>,
2315    ) -> bool {
2316        *color.get_mut(node).unwrap() = 1; // Gray
2317
2318        if let Some(neighbors) = edges.get(node) {
2319            for neighbor in neighbors {
2320                // Skip the edge back to parent (undirected graph)
2321                if parent == Some(neighbor) {
2322                    continue;
2323                }
2324
2325                if let Some(&c) = color.get(neighbor) {
2326                    if c == 1 {
2327                        // Found a back edge - cycle detected
2328                        return true;
2329                    }
2330                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2331                        return true;
2332                    }
2333                }
2334            }
2335        }
2336
2337        *color.get_mut(node).unwrap() = 2; // Black
2338        false
2339    }
2340
2341    /// Counts the number of base relations in a logical operator tree.
2342    #[allow(dead_code)]
2343    fn count_relations(op: &LogicalOperator) -> usize {
2344        match op {
2345            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2346            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2347            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2348            LogicalOperator::Join(j) => {
2349                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2350            }
2351            _ => 0,
2352        }
2353    }
2354
2355    /// Extracts a column index from an expression.
2356    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2357        match expr {
2358            LogicalExpression::Variable(name) => columns
2359                .iter()
2360                .position(|c| c == name)
2361                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2362            _ => Err(Error::Internal(
2363                "Only variables supported in join conditions".to_string(),
2364            )),
2365        }
2366    }
2367
2368    /// Plans a UNION operator.
2369    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2370        if union.inputs.is_empty() {
2371            return Err(Error::Internal(
2372                "Union requires at least one input".to_string(),
2373            ));
2374        }
2375
2376        let mut inputs = Vec::with_capacity(union.inputs.len());
2377        let mut columns = Vec::new();
2378
2379        for (i, input) in union.inputs.iter().enumerate() {
2380            let (op, cols) = self.plan_operator(input)?;
2381            if i == 0 {
2382                columns = cols;
2383            }
2384            inputs.push(op);
2385        }
2386
2387        let output_schema = self.derive_schema_from_columns(&columns);
2388        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2389
2390        Ok((operator, columns))
2391    }
2392
2393    /// Plans a DISTINCT operator.
2394    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2395        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2396        let output_schema = self.derive_schema_from_columns(&columns);
2397        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2398        Ok((operator, columns))
2399    }
2400
2401    /// Plans a CREATE NODE operator.
2402    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2403        // Plan input if present
2404        let (input_op, mut columns) = if let Some(ref input) = create.input {
2405            let (op, cols) = self.plan_operator(input)?;
2406            (Some(op), cols)
2407        } else {
2408            (None, vec![])
2409        };
2410
2411        // Output column for the created node
2412        let output_column = columns.len();
2413        columns.push(create.variable.clone());
2414
2415        // Convert properties (with constant-folding for lists and function calls)
2416        let properties: Vec<(String, PropertySource)> = create
2417            .properties
2418            .iter()
2419            .map(|(name, expr)| {
2420                let source = match Self::try_fold_expression(expr) {
2421                    Some(value) => PropertySource::Constant(value),
2422                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2423                };
2424                (name.clone(), source)
2425            })
2426            .collect();
2427
2428        let output_schema = self.derive_schema_from_columns(&columns);
2429
2430        let operator = Box::new(
2431            CreateNodeOperator::new(
2432                Arc::clone(&self.store),
2433                input_op,
2434                create.labels.clone(),
2435                properties,
2436                output_schema,
2437                output_column,
2438            )
2439            .with_tx_context(self.viewing_epoch, self.tx_id),
2440        );
2441
2442        Ok((operator, columns))
2443    }
2444
2445    /// Plans a CREATE EDGE operator.
2446    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2447        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2448
2449        // Find source and target columns
2450        let from_column = columns
2451            .iter()
2452            .position(|c| c == &create.from_variable)
2453            .ok_or_else(|| {
2454                Error::Internal(format!(
2455                    "Source variable '{}' not found",
2456                    create.from_variable
2457                ))
2458            })?;
2459
2460        let to_column = columns
2461            .iter()
2462            .position(|c| c == &create.to_variable)
2463            .ok_or_else(|| {
2464                Error::Internal(format!(
2465                    "Target variable '{}' not found",
2466                    create.to_variable
2467                ))
2468            })?;
2469
2470        // Output column for the created edge (if named)
2471        let output_column = create.variable.as_ref().map(|v| {
2472            let idx = columns.len();
2473            columns.push(v.clone());
2474            idx
2475        });
2476
2477        // Convert properties (with constant-folding for function calls like vector())
2478        let properties: Vec<(String, PropertySource)> = create
2479            .properties
2480            .iter()
2481            .map(|(name, expr)| {
2482                let source = match Self::try_fold_expression(expr) {
2483                    Some(value) => PropertySource::Constant(value),
2484                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2485                };
2486                (name.clone(), source)
2487            })
2488            .collect();
2489
2490        let output_schema = self.derive_schema_from_columns(&columns);
2491
2492        let mut operator = CreateEdgeOperator::new(
2493            Arc::clone(&self.store),
2494            input_op,
2495            from_column,
2496            to_column,
2497            create.edge_type.clone(),
2498            output_schema,
2499        )
2500        .with_properties(properties)
2501        .with_tx_context(self.viewing_epoch, self.tx_id);
2502
2503        if let Some(col) = output_column {
2504            operator = operator.with_output_column(col);
2505        }
2506
2507        let operator = Box::new(operator);
2508
2509        Ok((operator, columns))
2510    }
2511
2512    /// Plans a DELETE NODE operator.
2513    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2514        let (input_op, columns) = self.plan_operator(&delete.input)?;
2515
2516        let node_column = columns
2517            .iter()
2518            .position(|c| c == &delete.variable)
2519            .ok_or_else(|| {
2520                Error::Internal(format!(
2521                    "Variable '{}' not found for delete",
2522                    delete.variable
2523                ))
2524            })?;
2525
2526        // Output schema for delete count
2527        let output_schema = vec![LogicalType::Int64];
2528        let output_columns = vec!["deleted_count".to_string()];
2529
2530        let operator = Box::new(
2531            DeleteNodeOperator::new(
2532                Arc::clone(&self.store),
2533                input_op,
2534                node_column,
2535                output_schema,
2536                delete.detach, // DETACH DELETE deletes connected edges first
2537            )
2538            .with_tx_context(self.viewing_epoch, self.tx_id),
2539        );
2540
2541        Ok((operator, output_columns))
2542    }
2543
2544    /// Plans a DELETE EDGE operator.
2545    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2546        let (input_op, columns) = self.plan_operator(&delete.input)?;
2547
2548        let edge_column = columns
2549            .iter()
2550            .position(|c| c == &delete.variable)
2551            .ok_or_else(|| {
2552                Error::Internal(format!(
2553                    "Variable '{}' not found for delete",
2554                    delete.variable
2555                ))
2556            })?;
2557
2558        // Output schema for delete count
2559        let output_schema = vec![LogicalType::Int64];
2560        let output_columns = vec!["deleted_count".to_string()];
2561
2562        let operator = Box::new(
2563            DeleteEdgeOperator::new(
2564                Arc::clone(&self.store),
2565                input_op,
2566                edge_column,
2567                output_schema,
2568            )
2569            .with_tx_context(self.viewing_epoch, self.tx_id),
2570        );
2571
2572        Ok((operator, output_columns))
2573    }
2574
2575    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2576    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2577        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2578        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2579
2580        // Build combined output columns (left + right)
2581        let mut columns = left_columns.clone();
2582        columns.extend(right_columns.clone());
2583
2584        // Find common variables between left and right for join keys
2585        let mut probe_keys = Vec::new();
2586        let mut build_keys = Vec::new();
2587
2588        for (right_idx, right_col) in right_columns.iter().enumerate() {
2589            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2590                probe_keys.push(left_idx);
2591                build_keys.push(right_idx);
2592            }
2593        }
2594
2595        let output_schema = self.derive_schema_from_columns(&columns);
2596
2597        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2598            left_op,
2599            right_op,
2600            probe_keys,
2601            build_keys,
2602            PhysicalJoinType::Left,
2603            output_schema,
2604        ));
2605
2606        Ok((operator, columns))
2607    }
2608
2609    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2610    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2611        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2612        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2613
2614        // Anti-join only keeps left columns (filters out matching rows)
2615        let columns = left_columns.clone();
2616
2617        // Find common variables between left and right for join keys
2618        let mut probe_keys = Vec::new();
2619        let mut build_keys = Vec::new();
2620
2621        for (right_idx, right_col) in right_columns.iter().enumerate() {
2622            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2623                probe_keys.push(left_idx);
2624                build_keys.push(right_idx);
2625            }
2626        }
2627
2628        let output_schema = self.derive_schema_from_columns(&columns);
2629
2630        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2631            left_op,
2632            right_op,
2633            probe_keys,
2634            build_keys,
2635            PhysicalJoinType::Anti,
2636            output_schema,
2637        ));
2638
2639        Ok((operator, columns))
2640    }
2641
2642    /// Plans an unwind operator.
2643    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2644        // Plan the input operator first
2645        // Handle Empty specially - use a single-row operator
2646        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2647            if matches!(&*unwind.input, LogicalOperator::Empty) {
2648                // For UNWIND without prior MATCH, create a single-row input
2649                // We need an operator that produces one row with the list to unwind
2650                // For now, use EmptyScan which produces no rows - we'll handle the literal
2651                // list in the unwind operator itself
2652                let literal_list = self.convert_expression(&unwind.expression)?;
2653
2654                // Create a project operator that produces a single row with the list
2655                let single_row_op: Box<dyn Operator> = Box::new(
2656                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2657                );
2658                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2659                    single_row_op,
2660                    vec![ProjectExpr::Expression {
2661                        expr: literal_list,
2662                        variable_columns: HashMap::new(),
2663                    }],
2664                    vec![LogicalType::Any],
2665                    Arc::clone(&self.store),
2666                ));
2667
2668                (project_op, vec!["__list__".to_string()])
2669            } else {
2670                self.plan_operator(&unwind.input)?
2671            };
2672
2673        // The UNWIND expression should be a list - we need to find/evaluate it
2674        // For now, we handle the case where the expression references an existing column
2675        // or is a literal list
2676
2677        // Find if the expression references an existing column (like a list property)
2678        let list_col_idx = match &unwind.expression {
2679            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2680            LogicalExpression::Property { variable, .. } => {
2681                // Property access needs to be evaluated - for now we'll need the filter predicate
2682                // to evaluate this. For simple cases, we treat it as a list column.
2683                input_columns.iter().position(|c| c == variable)
2684            }
2685            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2686                // Literal list expression - we'll add it as a virtual column
2687                None
2688            }
2689            _ => None,
2690        };
2691
2692        // Build output columns: all input columns plus the new variable
2693        let mut columns = input_columns.clone();
2694        columns.push(unwind.variable.clone());
2695
2696        // Build output schema
2697        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2698        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2699
2700        // Use the list column index if found, otherwise default to 0
2701        // (in which case the first column should contain the list)
2702        let col_idx = list_col_idx.unwrap_or(0);
2703
2704        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2705            input_op,
2706            col_idx,
2707            unwind.variable.clone(),
2708            output_schema,
2709        ));
2710
2711        Ok((operator, columns))
2712    }
2713
2714    /// Plans a MERGE operator.
2715    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2716        // Plan the input operator if present (skip if Empty)
2717        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2718            Vec::new()
2719        } else {
2720            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2721            cols
2722        };
2723
2724        // Convert match properties from LogicalExpression to Value
2725        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2726            .match_properties
2727            .iter()
2728            .filter_map(|(name, expr)| {
2729                if let LogicalExpression::Literal(v) = expr {
2730                    Some((name.clone(), v.clone()))
2731                } else {
2732                    None // Skip non-literal expressions for now
2733                }
2734            })
2735            .collect();
2736
2737        // Convert ON CREATE properties
2738        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2739            .on_create
2740            .iter()
2741            .filter_map(|(name, expr)| {
2742                if let LogicalExpression::Literal(v) = expr {
2743                    Some((name.clone(), v.clone()))
2744                } else {
2745                    None
2746                }
2747            })
2748            .collect();
2749
2750        // Convert ON MATCH properties
2751        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2752            .on_match
2753            .iter()
2754            .filter_map(|(name, expr)| {
2755                if let LogicalExpression::Literal(v) = expr {
2756                    Some((name.clone(), v.clone()))
2757                } else {
2758                    None
2759                }
2760            })
2761            .collect();
2762
2763        // Add the merged node variable to output columns
2764        columns.push(merge.variable.clone());
2765
2766        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2767            Arc::clone(&self.store),
2768            merge.variable.clone(),
2769            merge.labels.clone(),
2770            match_properties,
2771            on_create_properties,
2772            on_match_properties,
2773        ));
2774
2775        Ok((operator, columns))
2776    }
2777
2778    /// Plans a SHORTEST PATH operator.
2779    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2780        // Plan the input operator
2781        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2782
2783        // Find source and target node columns
2784        let source_column = columns
2785            .iter()
2786            .position(|c| c == &sp.source_var)
2787            .ok_or_else(|| {
2788                Error::Internal(format!(
2789                    "Source variable '{}' not found for shortestPath",
2790                    sp.source_var
2791                ))
2792            })?;
2793
2794        let target_column = columns
2795            .iter()
2796            .position(|c| c == &sp.target_var)
2797            .ok_or_else(|| {
2798                Error::Internal(format!(
2799                    "Target variable '{}' not found for shortestPath",
2800                    sp.target_var
2801                ))
2802            })?;
2803
2804        // Convert direction
2805        let direction = match sp.direction {
2806            ExpandDirection::Outgoing => Direction::Outgoing,
2807            ExpandDirection::Incoming => Direction::Incoming,
2808            ExpandDirection::Both => Direction::Both,
2809        };
2810
2811        // Create the shortest path operator
2812        let operator: Box<dyn Operator> = Box::new(
2813            ShortestPathOperator::new(
2814                Arc::clone(&self.store),
2815                input_op,
2816                source_column,
2817                target_column,
2818                sp.edge_type.clone(),
2819                direction,
2820            )
2821            .with_all_paths(sp.all_paths),
2822        );
2823
2824        // Add path length column with the expected naming convention
2825        // The translator expects _path_length_{alias} format for length(p) calls
2826        columns.push(format!("_path_length_{}", sp.path_alias));
2827
2828        Ok((operator, columns))
2829    }
2830
2831    /// Plans an ADD LABEL operator.
2832    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2833        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2834
2835        // Find the node column
2836        let node_column = columns
2837            .iter()
2838            .position(|c| c == &add_label.variable)
2839            .ok_or_else(|| {
2840                Error::Internal(format!(
2841                    "Variable '{}' not found for ADD LABEL",
2842                    add_label.variable
2843                ))
2844            })?;
2845
2846        // Output schema for update count
2847        let output_schema = vec![LogicalType::Int64];
2848        let output_columns = vec!["labels_added".to_string()];
2849
2850        let operator = Box::new(AddLabelOperator::new(
2851            Arc::clone(&self.store),
2852            input_op,
2853            node_column,
2854            add_label.labels.clone(),
2855            output_schema,
2856        ));
2857
2858        Ok((operator, output_columns))
2859    }
2860
2861    /// Plans a REMOVE LABEL operator.
2862    fn plan_remove_label(
2863        &self,
2864        remove_label: &RemoveLabelOp,
2865    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2866        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2867
2868        // Find the node column
2869        let node_column = columns
2870            .iter()
2871            .position(|c| c == &remove_label.variable)
2872            .ok_or_else(|| {
2873                Error::Internal(format!(
2874                    "Variable '{}' not found for REMOVE LABEL",
2875                    remove_label.variable
2876                ))
2877            })?;
2878
2879        // Output schema for update count
2880        let output_schema = vec![LogicalType::Int64];
2881        let output_columns = vec!["labels_removed".to_string()];
2882
2883        let operator = Box::new(RemoveLabelOperator::new(
2884            Arc::clone(&self.store),
2885            input_op,
2886            node_column,
2887            remove_label.labels.clone(),
2888            output_schema,
2889        ));
2890
2891        Ok((operator, output_columns))
2892    }
2893
2894    /// Plans a SET PROPERTY operator.
2895    fn plan_set_property(
2896        &self,
2897        set_prop: &SetPropertyOp,
2898    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2899        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2900
2901        // Find the entity column (node or edge variable)
2902        let entity_column = columns
2903            .iter()
2904            .position(|c| c == &set_prop.variable)
2905            .ok_or_else(|| {
2906                Error::Internal(format!(
2907                    "Variable '{}' not found for SET",
2908                    set_prop.variable
2909                ))
2910            })?;
2911
2912        // Convert properties to PropertySource
2913        let properties: Vec<(String, PropertySource)> = set_prop
2914            .properties
2915            .iter()
2916            .map(|(name, expr)| {
2917                let source = self.expression_to_property_source(expr, &columns)?;
2918                Ok((name.clone(), source))
2919            })
2920            .collect::<Result<Vec<_>>>()?;
2921
2922        // Output schema preserves input schema (passes through)
2923        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2924        let output_columns = columns.clone();
2925
2926        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2927        let operator = Box::new(SetPropertyOperator::new_for_node(
2928            Arc::clone(&self.store),
2929            input_op,
2930            entity_column,
2931            properties,
2932            output_schema,
2933        ));
2934
2935        Ok((operator, output_columns))
2936    }
2937
2938    /// Converts a logical expression to a PropertySource.
2939    fn expression_to_property_source(
2940        &self,
2941        expr: &LogicalExpression,
2942        columns: &[String],
2943    ) -> Result<PropertySource> {
2944        match expr {
2945            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2946            LogicalExpression::Variable(name) => {
2947                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2948                    Error::Internal(format!("Variable '{}' not found for property source", name))
2949                })?;
2950                Ok(PropertySource::Column(col_idx))
2951            }
2952            LogicalExpression::Parameter(name) => {
2953                // Parameters should be resolved before planning
2954                // For now, treat as a placeholder
2955                Ok(PropertySource::Constant(
2956                    grafeo_common::types::Value::String(format!("${}", name).into()),
2957                ))
2958            }
2959            _ => {
2960                if let Some(value) = Self::try_fold_expression(expr) {
2961                    Ok(PropertySource::Constant(value))
2962                } else {
2963                    Err(Error::Internal(format!(
2964                        "Unsupported expression type for property source: {:?}",
2965                        expr
2966                    )))
2967                }
2968            }
2969        }
2970    }
2971
2972    /// Tries to evaluate a constant expression at plan time.
2973    ///
2974    /// Recursively folds literals, lists, and known function calls (like `vector()`)
2975    /// into concrete values. Returns `None` if the expression contains non-constant
2976    /// parts (variables, property accesses, etc.).
2977    fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
2978        match expr {
2979            LogicalExpression::Literal(v) => Some(v.clone()),
2980            LogicalExpression::List(items) => {
2981                let values: Option<Vec<Value>> =
2982                    items.iter().map(Self::try_fold_expression).collect();
2983                let values = values?;
2984                // All-numeric lists become vectors (matches Python list[float] behavior)
2985                let all_numeric = !values.is_empty()
2986                    && values
2987                        .iter()
2988                        .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
2989                if all_numeric {
2990                    let floats: Vec<f32> = values
2991                        .iter()
2992                        .filter_map(|v| match v {
2993                            Value::Float64(f) => Some(*f as f32),
2994                            Value::Int64(i) => Some(*i as f32),
2995                            _ => None,
2996                        })
2997                        .collect();
2998                    Some(Value::Vector(floats.into()))
2999                } else {
3000                    Some(Value::List(values.into()))
3001                }
3002            }
3003            LogicalExpression::FunctionCall { name, args, .. } => {
3004                match name.to_lowercase().as_str() {
3005                    "vector" => {
3006                        if args.len() != 1 {
3007                            return None;
3008                        }
3009                        let val = Self::try_fold_expression(&args[0])?;
3010                        match val {
3011                            Value::List(items) => {
3012                                let floats: Vec<f32> = items
3013                                    .iter()
3014                                    .filter_map(|v| match v {
3015                                        Value::Float64(f) => Some(*f as f32),
3016                                        Value::Int64(i) => Some(*i as f32),
3017                                        _ => None,
3018                                    })
3019                                    .collect();
3020                                if floats.len() == items.len() {
3021                                    Some(Value::Vector(floats.into()))
3022                                } else {
3023                                    None
3024                                }
3025                            }
3026                            // Already a vector (from all-numeric list folding)
3027                            Value::Vector(v) => Some(Value::Vector(v)),
3028                            _ => None,
3029                        }
3030                    }
3031                    _ => None,
3032                }
3033            }
3034            _ => None,
3035        }
3036    }
3037}
3038
3039/// Converts a logical binary operator to a filter binary operator.
3040pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3041    match op {
3042        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3043        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3044        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3045        BinaryOp::Le => Ok(BinaryFilterOp::Le),
3046        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3047        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3048        BinaryOp::And => Ok(BinaryFilterOp::And),
3049        BinaryOp::Or => Ok(BinaryFilterOp::Or),
3050        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3051        BinaryOp::Add => Ok(BinaryFilterOp::Add),
3052        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3053        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3054        BinaryOp::Div => Ok(BinaryFilterOp::Div),
3055        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3056        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3057        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3058        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3059        BinaryOp::In => Ok(BinaryFilterOp::In),
3060        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3061        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3062        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3063            "Binary operator {:?} not yet supported in filters",
3064            op
3065        ))),
3066    }
3067}
3068
3069/// Converts a logical unary operator to a filter unary operator.
3070pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3071    match op {
3072        UnaryOp::Not => Ok(UnaryFilterOp::Not),
3073        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3074        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3075        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3076    }
3077}
3078
3079/// Converts a logical aggregate function to a physical aggregate function.
3080pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3081    match func {
3082        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3083        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3084        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3085        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3086        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3087        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3088        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3089        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3090        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3091        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3092        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3093    }
3094}
3095
3096/// Converts a logical expression to a filter expression.
3097///
3098/// This is a standalone function that can be used by both LPG and RDF planners.
3099pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3100    match expr {
3101        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3102        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3103        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3104            variable: variable.clone(),
3105            property: property.clone(),
3106        }),
3107        LogicalExpression::Binary { left, op, right } => {
3108            let left_expr = convert_filter_expression(left)?;
3109            let right_expr = convert_filter_expression(right)?;
3110            let filter_op = convert_binary_op(*op)?;
3111            Ok(FilterExpression::Binary {
3112                left: Box::new(left_expr),
3113                op: filter_op,
3114                right: Box::new(right_expr),
3115            })
3116        }
3117        LogicalExpression::Unary { op, operand } => {
3118            let operand_expr = convert_filter_expression(operand)?;
3119            let filter_op = convert_unary_op(*op)?;
3120            Ok(FilterExpression::Unary {
3121                op: filter_op,
3122                operand: Box::new(operand_expr),
3123            })
3124        }
3125        LogicalExpression::FunctionCall { name, args, .. } => {
3126            let filter_args: Vec<FilterExpression> = args
3127                .iter()
3128                .map(convert_filter_expression)
3129                .collect::<Result<Vec<_>>>()?;
3130            Ok(FilterExpression::FunctionCall {
3131                name: name.clone(),
3132                args: filter_args,
3133            })
3134        }
3135        LogicalExpression::Case {
3136            operand,
3137            when_clauses,
3138            else_clause,
3139        } => {
3140            let filter_operand = operand
3141                .as_ref()
3142                .map(|e| convert_filter_expression(e))
3143                .transpose()?
3144                .map(Box::new);
3145            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3146                .iter()
3147                .map(|(cond, result)| {
3148                    Ok((
3149                        convert_filter_expression(cond)?,
3150                        convert_filter_expression(result)?,
3151                    ))
3152                })
3153                .collect::<Result<Vec<_>>>()?;
3154            let filter_else = else_clause
3155                .as_ref()
3156                .map(|e| convert_filter_expression(e))
3157                .transpose()?
3158                .map(Box::new);
3159            Ok(FilterExpression::Case {
3160                operand: filter_operand,
3161                when_clauses: filter_when_clauses,
3162                else_clause: filter_else,
3163            })
3164        }
3165        LogicalExpression::List(items) => {
3166            let filter_items: Vec<FilterExpression> = items
3167                .iter()
3168                .map(convert_filter_expression)
3169                .collect::<Result<Vec<_>>>()?;
3170            Ok(FilterExpression::List(filter_items))
3171        }
3172        LogicalExpression::Map(pairs) => {
3173            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3174                .iter()
3175                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3176                .collect::<Result<Vec<_>>>()?;
3177            Ok(FilterExpression::Map(filter_pairs))
3178        }
3179        LogicalExpression::IndexAccess { base, index } => {
3180            let base_expr = convert_filter_expression(base)?;
3181            let index_expr = convert_filter_expression(index)?;
3182            Ok(FilterExpression::IndexAccess {
3183                base: Box::new(base_expr),
3184                index: Box::new(index_expr),
3185            })
3186        }
3187        LogicalExpression::SliceAccess { base, start, end } => {
3188            let base_expr = convert_filter_expression(base)?;
3189            let start_expr = start
3190                .as_ref()
3191                .map(|s| convert_filter_expression(s))
3192                .transpose()?
3193                .map(Box::new);
3194            let end_expr = end
3195                .as_ref()
3196                .map(|e| convert_filter_expression(e))
3197                .transpose()?
3198                .map(Box::new);
3199            Ok(FilterExpression::SliceAccess {
3200                base: Box::new(base_expr),
3201                start: start_expr,
3202                end: end_expr,
3203            })
3204        }
3205        LogicalExpression::Parameter(_) => Err(Error::Internal(
3206            "Parameters not yet supported in filters".to_string(),
3207        )),
3208        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3209        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3210        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3211        LogicalExpression::ListComprehension {
3212            variable,
3213            list_expr,
3214            filter_expr,
3215            map_expr,
3216        } => {
3217            let list = convert_filter_expression(list_expr)?;
3218            let filter = filter_expr
3219                .as_ref()
3220                .map(|f| convert_filter_expression(f))
3221                .transpose()?
3222                .map(Box::new);
3223            let map = convert_filter_expression(map_expr)?;
3224            Ok(FilterExpression::ListComprehension {
3225                variable: variable.clone(),
3226                list_expr: Box::new(list),
3227                filter_expr: filter,
3228                map_expr: Box::new(map),
3229            })
3230        }
3231        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3232            Error::Internal("Subqueries not yet supported in filters".to_string()),
3233        ),
3234    }
3235}
3236
3237/// Infers the logical type from a value.
3238fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3239    use grafeo_common::types::Value;
3240    match value {
3241        Value::Null => LogicalType::String, // Default type for null
3242        Value::Bool(_) => LogicalType::Bool,
3243        Value::Int64(_) => LogicalType::Int64,
3244        Value::Float64(_) => LogicalType::Float64,
3245        Value::String(_) => LogicalType::String,
3246        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3247        Value::Timestamp(_) => LogicalType::Timestamp,
3248        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3249        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3250        Value::Vector(v) => LogicalType::Vector(v.len()),
3251    }
3252}
3253
3254/// Converts an expression to a string for column naming.
3255fn expression_to_string(expr: &LogicalExpression) -> String {
3256    match expr {
3257        LogicalExpression::Variable(name) => name.clone(),
3258        LogicalExpression::Property { variable, property } => {
3259            format!("{variable}.{property}")
3260        }
3261        LogicalExpression::Literal(value) => format!("{value:?}"),
3262        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3263        _ => "expr".to_string(),
3264    }
3265}
3266
3267/// A physical plan ready for execution.
3268pub struct PhysicalPlan {
3269    /// The root physical operator.
3270    pub operator: Box<dyn Operator>,
3271    /// Column names for the result.
3272    pub columns: Vec<String>,
3273    /// Adaptive execution context with cardinality estimates.
3274    ///
3275    /// When adaptive execution is enabled, this context contains estimated
3276    /// cardinalities at various checkpoints in the plan. During execution,
3277    /// actual row counts are recorded and compared against estimates.
3278    pub adaptive_context: Option<AdaptiveContext>,
3279}
3280
3281impl PhysicalPlan {
3282    /// Returns the column names.
3283    #[must_use]
3284    pub fn columns(&self) -> &[String] {
3285        &self.columns
3286    }
3287
3288    /// Consumes the plan and returns the operator.
3289    pub fn into_operator(self) -> Box<dyn Operator> {
3290        self.operator
3291    }
3292
3293    /// Returns the adaptive context, if adaptive execution is enabled.
3294    #[must_use]
3295    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3296        self.adaptive_context.as_ref()
3297    }
3298
3299    /// Takes ownership of the adaptive context.
3300    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3301        self.adaptive_context.take()
3302    }
3303}
3304
3305/// Helper operator that returns a single result chunk once.
3306///
3307/// Used by the factorized expand chain to wrap the final result.
3308#[allow(dead_code)]
3309struct SingleResultOperator {
3310    result: Option<grafeo_core::execution::DataChunk>,
3311}
3312
3313impl SingleResultOperator {
3314    #[allow(dead_code)]
3315    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3316        Self { result }
3317    }
3318}
3319
3320impl Operator for SingleResultOperator {
3321    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3322        Ok(self.result.take())
3323    }
3324
3325    fn reset(&mut self) {
3326        // Cannot reset - result is consumed
3327    }
3328
3329    fn name(&self) -> &'static str {
3330        "SingleResult"
3331    }
3332}
3333
3334#[cfg(test)]
3335mod tests {
3336    use super::*;
3337    use crate::query::plan::{
3338        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3339        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3340        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3341        SortKey, SortOp,
3342    };
3343    use grafeo_common::types::Value;
3344
3345    fn create_test_store() -> Arc<LpgStore> {
3346        let store = Arc::new(LpgStore::new());
3347        store.create_node(&["Person"]);
3348        store.create_node(&["Person"]);
3349        store.create_node(&["Company"]);
3350        store
3351    }
3352
3353    // ==================== Simple Scan Tests ====================
3354
3355    #[test]
3356    fn test_plan_simple_scan() {
3357        let store = create_test_store();
3358        let planner = Planner::new(store);
3359
3360        // MATCH (n:Person) RETURN n
3361        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3362            items: vec![ReturnItem {
3363                expression: LogicalExpression::Variable("n".to_string()),
3364                alias: None,
3365            }],
3366            distinct: false,
3367            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3368                variable: "n".to_string(),
3369                label: Some("Person".to_string()),
3370                input: None,
3371            })),
3372        }));
3373
3374        let physical = planner.plan(&logical).unwrap();
3375        assert_eq!(physical.columns(), &["n"]);
3376    }
3377
3378    #[test]
3379    fn test_plan_scan_without_label() {
3380        let store = create_test_store();
3381        let planner = Planner::new(store);
3382
3383        // MATCH (n) RETURN n
3384        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3385            items: vec![ReturnItem {
3386                expression: LogicalExpression::Variable("n".to_string()),
3387                alias: None,
3388            }],
3389            distinct: false,
3390            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3391                variable: "n".to_string(),
3392                label: None,
3393                input: None,
3394            })),
3395        }));
3396
3397        let physical = planner.plan(&logical).unwrap();
3398        assert_eq!(physical.columns(), &["n"]);
3399    }
3400
3401    #[test]
3402    fn test_plan_return_with_alias() {
3403        let store = create_test_store();
3404        let planner = Planner::new(store);
3405
3406        // MATCH (n:Person) RETURN n AS person
3407        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3408            items: vec![ReturnItem {
3409                expression: LogicalExpression::Variable("n".to_string()),
3410                alias: Some("person".to_string()),
3411            }],
3412            distinct: false,
3413            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3414                variable: "n".to_string(),
3415                label: Some("Person".to_string()),
3416                input: None,
3417            })),
3418        }));
3419
3420        let physical = planner.plan(&logical).unwrap();
3421        assert_eq!(physical.columns(), &["person"]);
3422    }
3423
3424    #[test]
3425    fn test_plan_return_property() {
3426        let store = create_test_store();
3427        let planner = Planner::new(store);
3428
3429        // MATCH (n:Person) RETURN n.name
3430        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3431            items: vec![ReturnItem {
3432                expression: LogicalExpression::Property {
3433                    variable: "n".to_string(),
3434                    property: "name".to_string(),
3435                },
3436                alias: None,
3437            }],
3438            distinct: false,
3439            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3440                variable: "n".to_string(),
3441                label: Some("Person".to_string()),
3442                input: None,
3443            })),
3444        }));
3445
3446        let physical = planner.plan(&logical).unwrap();
3447        assert_eq!(physical.columns(), &["n.name"]);
3448    }
3449
3450    #[test]
3451    fn test_plan_return_literal() {
3452        let store = create_test_store();
3453        let planner = Planner::new(store);
3454
3455        // MATCH (n) RETURN 42 AS answer
3456        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3457            items: vec![ReturnItem {
3458                expression: LogicalExpression::Literal(Value::Int64(42)),
3459                alias: Some("answer".to_string()),
3460            }],
3461            distinct: false,
3462            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3463                variable: "n".to_string(),
3464                label: None,
3465                input: None,
3466            })),
3467        }));
3468
3469        let physical = planner.plan(&logical).unwrap();
3470        assert_eq!(physical.columns(), &["answer"]);
3471    }
3472
3473    // ==================== Filter Tests ====================
3474
3475    #[test]
3476    fn test_plan_filter_equality() {
3477        let store = create_test_store();
3478        let planner = Planner::new(store);
3479
3480        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3481        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3482            items: vec![ReturnItem {
3483                expression: LogicalExpression::Variable("n".to_string()),
3484                alias: None,
3485            }],
3486            distinct: false,
3487            input: Box::new(LogicalOperator::Filter(FilterOp {
3488                predicate: LogicalExpression::Binary {
3489                    left: Box::new(LogicalExpression::Property {
3490                        variable: "n".to_string(),
3491                        property: "age".to_string(),
3492                    }),
3493                    op: BinaryOp::Eq,
3494                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3495                },
3496                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3497                    variable: "n".to_string(),
3498                    label: Some("Person".to_string()),
3499                    input: None,
3500                })),
3501            })),
3502        }));
3503
3504        let physical = planner.plan(&logical).unwrap();
3505        assert_eq!(physical.columns(), &["n"]);
3506    }
3507
3508    #[test]
3509    fn test_plan_filter_compound_and() {
3510        let store = create_test_store();
3511        let planner = Planner::new(store);
3512
3513        // WHERE n.age > 20 AND n.age < 40
3514        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3515            items: vec![ReturnItem {
3516                expression: LogicalExpression::Variable("n".to_string()),
3517                alias: None,
3518            }],
3519            distinct: false,
3520            input: Box::new(LogicalOperator::Filter(FilterOp {
3521                predicate: LogicalExpression::Binary {
3522                    left: Box::new(LogicalExpression::Binary {
3523                        left: Box::new(LogicalExpression::Property {
3524                            variable: "n".to_string(),
3525                            property: "age".to_string(),
3526                        }),
3527                        op: BinaryOp::Gt,
3528                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3529                    }),
3530                    op: BinaryOp::And,
3531                    right: Box::new(LogicalExpression::Binary {
3532                        left: Box::new(LogicalExpression::Property {
3533                            variable: "n".to_string(),
3534                            property: "age".to_string(),
3535                        }),
3536                        op: BinaryOp::Lt,
3537                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3538                    }),
3539                },
3540                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3541                    variable: "n".to_string(),
3542                    label: None,
3543                    input: None,
3544                })),
3545            })),
3546        }));
3547
3548        let physical = planner.plan(&logical).unwrap();
3549        assert_eq!(physical.columns(), &["n"]);
3550    }
3551
3552    #[test]
3553    fn test_plan_filter_unary_not() {
3554        let store = create_test_store();
3555        let planner = Planner::new(store);
3556
3557        // WHERE NOT n.active
3558        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3559            items: vec![ReturnItem {
3560                expression: LogicalExpression::Variable("n".to_string()),
3561                alias: None,
3562            }],
3563            distinct: false,
3564            input: Box::new(LogicalOperator::Filter(FilterOp {
3565                predicate: LogicalExpression::Unary {
3566                    op: UnaryOp::Not,
3567                    operand: Box::new(LogicalExpression::Property {
3568                        variable: "n".to_string(),
3569                        property: "active".to_string(),
3570                    }),
3571                },
3572                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3573                    variable: "n".to_string(),
3574                    label: None,
3575                    input: None,
3576                })),
3577            })),
3578        }));
3579
3580        let physical = planner.plan(&logical).unwrap();
3581        assert_eq!(physical.columns(), &["n"]);
3582    }
3583
3584    #[test]
3585    fn test_plan_filter_is_null() {
3586        let store = create_test_store();
3587        let planner = Planner::new(store);
3588
3589        // WHERE n.email IS NULL
3590        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3591            items: vec![ReturnItem {
3592                expression: LogicalExpression::Variable("n".to_string()),
3593                alias: None,
3594            }],
3595            distinct: false,
3596            input: Box::new(LogicalOperator::Filter(FilterOp {
3597                predicate: LogicalExpression::Unary {
3598                    op: UnaryOp::IsNull,
3599                    operand: Box::new(LogicalExpression::Property {
3600                        variable: "n".to_string(),
3601                        property: "email".to_string(),
3602                    }),
3603                },
3604                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3605                    variable: "n".to_string(),
3606                    label: None,
3607                    input: None,
3608                })),
3609            })),
3610        }));
3611
3612        let physical = planner.plan(&logical).unwrap();
3613        assert_eq!(physical.columns(), &["n"]);
3614    }
3615
3616    #[test]
3617    fn test_plan_filter_function_call() {
3618        let store = create_test_store();
3619        let planner = Planner::new(store);
3620
3621        // WHERE size(n.friends) > 0
3622        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3623            items: vec![ReturnItem {
3624                expression: LogicalExpression::Variable("n".to_string()),
3625                alias: None,
3626            }],
3627            distinct: false,
3628            input: Box::new(LogicalOperator::Filter(FilterOp {
3629                predicate: LogicalExpression::Binary {
3630                    left: Box::new(LogicalExpression::FunctionCall {
3631                        name: "size".to_string(),
3632                        args: vec![LogicalExpression::Property {
3633                            variable: "n".to_string(),
3634                            property: "friends".to_string(),
3635                        }],
3636                        distinct: false,
3637                    }),
3638                    op: BinaryOp::Gt,
3639                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3640                },
3641                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3642                    variable: "n".to_string(),
3643                    label: None,
3644                    input: None,
3645                })),
3646            })),
3647        }));
3648
3649        let physical = planner.plan(&logical).unwrap();
3650        assert_eq!(physical.columns(), &["n"]);
3651    }
3652
3653    // ==================== Expand Tests ====================
3654
3655    #[test]
3656    fn test_plan_expand_outgoing() {
3657        let store = create_test_store();
3658        let planner = Planner::new(store);
3659
3660        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3661        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3662            items: vec![
3663                ReturnItem {
3664                    expression: LogicalExpression::Variable("a".to_string()),
3665                    alias: None,
3666                },
3667                ReturnItem {
3668                    expression: LogicalExpression::Variable("b".to_string()),
3669                    alias: None,
3670                },
3671            ],
3672            distinct: false,
3673            input: Box::new(LogicalOperator::Expand(ExpandOp {
3674                from_variable: "a".to_string(),
3675                to_variable: "b".to_string(),
3676                edge_variable: None,
3677                direction: ExpandDirection::Outgoing,
3678                edge_type: Some("KNOWS".to_string()),
3679                min_hops: 1,
3680                max_hops: Some(1),
3681                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3682                    variable: "a".to_string(),
3683                    label: Some("Person".to_string()),
3684                    input: None,
3685                })),
3686                path_alias: None,
3687            })),
3688        }));
3689
3690        let physical = planner.plan(&logical).unwrap();
3691        // The return should have columns [a, b]
3692        assert!(physical.columns().contains(&"a".to_string()));
3693        assert!(physical.columns().contains(&"b".to_string()));
3694    }
3695
3696    #[test]
3697    fn test_plan_expand_with_edge_variable() {
3698        let store = create_test_store();
3699        let planner = Planner::new(store);
3700
3701        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3702        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3703            items: vec![
3704                ReturnItem {
3705                    expression: LogicalExpression::Variable("a".to_string()),
3706                    alias: None,
3707                },
3708                ReturnItem {
3709                    expression: LogicalExpression::Variable("r".to_string()),
3710                    alias: None,
3711                },
3712                ReturnItem {
3713                    expression: LogicalExpression::Variable("b".to_string()),
3714                    alias: None,
3715                },
3716            ],
3717            distinct: false,
3718            input: Box::new(LogicalOperator::Expand(ExpandOp {
3719                from_variable: "a".to_string(),
3720                to_variable: "b".to_string(),
3721                edge_variable: Some("r".to_string()),
3722                direction: ExpandDirection::Outgoing,
3723                edge_type: Some("KNOWS".to_string()),
3724                min_hops: 1,
3725                max_hops: Some(1),
3726                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3727                    variable: "a".to_string(),
3728                    label: None,
3729                    input: None,
3730                })),
3731                path_alias: None,
3732            })),
3733        }));
3734
3735        let physical = planner.plan(&logical).unwrap();
3736        assert!(physical.columns().contains(&"a".to_string()));
3737        assert!(physical.columns().contains(&"r".to_string()));
3738        assert!(physical.columns().contains(&"b".to_string()));
3739    }
3740
3741    // ==================== Limit/Skip/Sort Tests ====================
3742
3743    #[test]
3744    fn test_plan_limit() {
3745        let store = create_test_store();
3746        let planner = Planner::new(store);
3747
3748        // MATCH (n) RETURN n LIMIT 10
3749        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3750            items: vec![ReturnItem {
3751                expression: LogicalExpression::Variable("n".to_string()),
3752                alias: None,
3753            }],
3754            distinct: false,
3755            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3756                count: 10,
3757                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3758                    variable: "n".to_string(),
3759                    label: None,
3760                    input: None,
3761                })),
3762            })),
3763        }));
3764
3765        let physical = planner.plan(&logical).unwrap();
3766        assert_eq!(physical.columns(), &["n"]);
3767    }
3768
3769    #[test]
3770    fn test_plan_skip() {
3771        let store = create_test_store();
3772        let planner = Planner::new(store);
3773
3774        // MATCH (n) RETURN n SKIP 5
3775        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3776            items: vec![ReturnItem {
3777                expression: LogicalExpression::Variable("n".to_string()),
3778                alias: None,
3779            }],
3780            distinct: false,
3781            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3782                count: 5,
3783                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3784                    variable: "n".to_string(),
3785                    label: None,
3786                    input: None,
3787                })),
3788            })),
3789        }));
3790
3791        let physical = planner.plan(&logical).unwrap();
3792        assert_eq!(physical.columns(), &["n"]);
3793    }
3794
3795    #[test]
3796    fn test_plan_sort() {
3797        let store = create_test_store();
3798        let planner = Planner::new(store);
3799
3800        // MATCH (n) RETURN n ORDER BY n.name ASC
3801        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3802            items: vec![ReturnItem {
3803                expression: LogicalExpression::Variable("n".to_string()),
3804                alias: None,
3805            }],
3806            distinct: false,
3807            input: Box::new(LogicalOperator::Sort(SortOp {
3808                keys: vec![SortKey {
3809                    expression: LogicalExpression::Variable("n".to_string()),
3810                    order: SortOrder::Ascending,
3811                }],
3812                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3813                    variable: "n".to_string(),
3814                    label: None,
3815                    input: None,
3816                })),
3817            })),
3818        }));
3819
3820        let physical = planner.plan(&logical).unwrap();
3821        assert_eq!(physical.columns(), &["n"]);
3822    }
3823
3824    #[test]
3825    fn test_plan_sort_descending() {
3826        let store = create_test_store();
3827        let planner = Planner::new(store);
3828
3829        // ORDER BY n DESC
3830        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3831            items: vec![ReturnItem {
3832                expression: LogicalExpression::Variable("n".to_string()),
3833                alias: None,
3834            }],
3835            distinct: false,
3836            input: Box::new(LogicalOperator::Sort(SortOp {
3837                keys: vec![SortKey {
3838                    expression: LogicalExpression::Variable("n".to_string()),
3839                    order: SortOrder::Descending,
3840                }],
3841                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3842                    variable: "n".to_string(),
3843                    label: None,
3844                    input: None,
3845                })),
3846            })),
3847        }));
3848
3849        let physical = planner.plan(&logical).unwrap();
3850        assert_eq!(physical.columns(), &["n"]);
3851    }
3852
3853    #[test]
3854    fn test_plan_distinct() {
3855        let store = create_test_store();
3856        let planner = Planner::new(store);
3857
3858        // MATCH (n) RETURN DISTINCT n
3859        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3860            items: vec![ReturnItem {
3861                expression: LogicalExpression::Variable("n".to_string()),
3862                alias: None,
3863            }],
3864            distinct: false,
3865            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3866                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3867                    variable: "n".to_string(),
3868                    label: None,
3869                    input: None,
3870                })),
3871                columns: None,
3872            })),
3873        }));
3874
3875        let physical = planner.plan(&logical).unwrap();
3876        assert_eq!(physical.columns(), &["n"]);
3877    }
3878
3879    // ==================== Aggregate Tests ====================
3880
3881    #[test]
3882    fn test_plan_aggregate_count() {
3883        let store = create_test_store();
3884        let planner = Planner::new(store);
3885
3886        // MATCH (n) RETURN count(n)
3887        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3888            items: vec![ReturnItem {
3889                expression: LogicalExpression::Variable("cnt".to_string()),
3890                alias: None,
3891            }],
3892            distinct: false,
3893            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3894                group_by: vec![],
3895                aggregates: vec![LogicalAggregateExpr {
3896                    function: LogicalAggregateFunction::Count,
3897                    expression: Some(LogicalExpression::Variable("n".to_string())),
3898                    distinct: false,
3899                    alias: Some("cnt".to_string()),
3900                    percentile: None,
3901                }],
3902                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3903                    variable: "n".to_string(),
3904                    label: None,
3905                    input: None,
3906                })),
3907                having: None,
3908            })),
3909        }));
3910
3911        let physical = planner.plan(&logical).unwrap();
3912        assert!(physical.columns().contains(&"cnt".to_string()));
3913    }
3914
3915    #[test]
3916    fn test_plan_aggregate_with_group_by() {
3917        let store = create_test_store();
3918        let planner = Planner::new(store);
3919
3920        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3921        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3922            group_by: vec![LogicalExpression::Property {
3923                variable: "n".to_string(),
3924                property: "city".to_string(),
3925            }],
3926            aggregates: vec![LogicalAggregateExpr {
3927                function: LogicalAggregateFunction::Count,
3928                expression: Some(LogicalExpression::Variable("n".to_string())),
3929                distinct: false,
3930                alias: Some("cnt".to_string()),
3931                percentile: None,
3932            }],
3933            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3934                variable: "n".to_string(),
3935                label: Some("Person".to_string()),
3936                input: None,
3937            })),
3938            having: None,
3939        }));
3940
3941        let physical = planner.plan(&logical).unwrap();
3942        assert_eq!(physical.columns().len(), 2);
3943    }
3944
3945    #[test]
3946    fn test_plan_aggregate_sum() {
3947        let store = create_test_store();
3948        let planner = Planner::new(store);
3949
3950        // SUM(n.value)
3951        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3952            group_by: vec![],
3953            aggregates: vec![LogicalAggregateExpr {
3954                function: LogicalAggregateFunction::Sum,
3955                expression: Some(LogicalExpression::Property {
3956                    variable: "n".to_string(),
3957                    property: "value".to_string(),
3958                }),
3959                distinct: false,
3960                alias: Some("total".to_string()),
3961                percentile: None,
3962            }],
3963            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3964                variable: "n".to_string(),
3965                label: None,
3966                input: None,
3967            })),
3968            having: None,
3969        }));
3970
3971        let physical = planner.plan(&logical).unwrap();
3972        assert!(physical.columns().contains(&"total".to_string()));
3973    }
3974
3975    #[test]
3976    fn test_plan_aggregate_avg() {
3977        let store = create_test_store();
3978        let planner = Planner::new(store);
3979
3980        // AVG(n.score)
3981        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3982            group_by: vec![],
3983            aggregates: vec![LogicalAggregateExpr {
3984                function: LogicalAggregateFunction::Avg,
3985                expression: Some(LogicalExpression::Property {
3986                    variable: "n".to_string(),
3987                    property: "score".to_string(),
3988                }),
3989                distinct: false,
3990                alias: Some("average".to_string()),
3991                percentile: None,
3992            }],
3993            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3994                variable: "n".to_string(),
3995                label: None,
3996                input: None,
3997            })),
3998            having: None,
3999        }));
4000
4001        let physical = planner.plan(&logical).unwrap();
4002        assert!(physical.columns().contains(&"average".to_string()));
4003    }
4004
4005    #[test]
4006    fn test_plan_aggregate_min_max() {
4007        let store = create_test_store();
4008        let planner = Planner::new(store);
4009
4010        // MIN(n.age), MAX(n.age)
4011        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4012            group_by: vec![],
4013            aggregates: vec![
4014                LogicalAggregateExpr {
4015                    function: LogicalAggregateFunction::Min,
4016                    expression: Some(LogicalExpression::Property {
4017                        variable: "n".to_string(),
4018                        property: "age".to_string(),
4019                    }),
4020                    distinct: false,
4021                    alias: Some("youngest".to_string()),
4022                    percentile: None,
4023                },
4024                LogicalAggregateExpr {
4025                    function: LogicalAggregateFunction::Max,
4026                    expression: Some(LogicalExpression::Property {
4027                        variable: "n".to_string(),
4028                        property: "age".to_string(),
4029                    }),
4030                    distinct: false,
4031                    alias: Some("oldest".to_string()),
4032                    percentile: None,
4033                },
4034            ],
4035            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4036                variable: "n".to_string(),
4037                label: None,
4038                input: None,
4039            })),
4040            having: None,
4041        }));
4042
4043        let physical = planner.plan(&logical).unwrap();
4044        assert!(physical.columns().contains(&"youngest".to_string()));
4045        assert!(physical.columns().contains(&"oldest".to_string()));
4046    }
4047
4048    // ==================== Join Tests ====================
4049
4050    #[test]
4051    fn test_plan_inner_join() {
4052        let store = create_test_store();
4053        let planner = Planner::new(store);
4054
4055        // Inner join between two scans
4056        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4057            items: vec![
4058                ReturnItem {
4059                    expression: LogicalExpression::Variable("a".to_string()),
4060                    alias: None,
4061                },
4062                ReturnItem {
4063                    expression: LogicalExpression::Variable("b".to_string()),
4064                    alias: None,
4065                },
4066            ],
4067            distinct: false,
4068            input: Box::new(LogicalOperator::Join(JoinOp {
4069                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4070                    variable: "a".to_string(),
4071                    label: Some("Person".to_string()),
4072                    input: None,
4073                })),
4074                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4075                    variable: "b".to_string(),
4076                    label: Some("Company".to_string()),
4077                    input: None,
4078                })),
4079                join_type: JoinType::Inner,
4080                conditions: vec![JoinCondition {
4081                    left: LogicalExpression::Variable("a".to_string()),
4082                    right: LogicalExpression::Variable("b".to_string()),
4083                }],
4084            })),
4085        }));
4086
4087        let physical = planner.plan(&logical).unwrap();
4088        assert!(physical.columns().contains(&"a".to_string()));
4089        assert!(physical.columns().contains(&"b".to_string()));
4090    }
4091
4092    #[test]
4093    fn test_plan_cross_join() {
4094        let store = create_test_store();
4095        let planner = Planner::new(store);
4096
4097        // Cross join (no conditions)
4098        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4099            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4100                variable: "a".to_string(),
4101                label: None,
4102                input: None,
4103            })),
4104            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4105                variable: "b".to_string(),
4106                label: None,
4107                input: None,
4108            })),
4109            join_type: JoinType::Cross,
4110            conditions: vec![],
4111        }));
4112
4113        let physical = planner.plan(&logical).unwrap();
4114        assert_eq!(physical.columns().len(), 2);
4115    }
4116
4117    #[test]
4118    fn test_plan_left_join() {
4119        let store = create_test_store();
4120        let planner = Planner::new(store);
4121
4122        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4123            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4124                variable: "a".to_string(),
4125                label: None,
4126                input: None,
4127            })),
4128            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4129                variable: "b".to_string(),
4130                label: None,
4131                input: None,
4132            })),
4133            join_type: JoinType::Left,
4134            conditions: vec![],
4135        }));
4136
4137        let physical = planner.plan(&logical).unwrap();
4138        assert_eq!(physical.columns().len(), 2);
4139    }
4140
4141    // ==================== Mutation Tests ====================
4142
4143    #[test]
4144    fn test_plan_create_node() {
4145        let store = create_test_store();
4146        let planner = Planner::new(store);
4147
4148        // CREATE (n:Person {name: 'Alice'})
4149        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4150            variable: "n".to_string(),
4151            labels: vec!["Person".to_string()],
4152            properties: vec![(
4153                "name".to_string(),
4154                LogicalExpression::Literal(Value::String("Alice".into())),
4155            )],
4156            input: None,
4157        }));
4158
4159        let physical = planner.plan(&logical).unwrap();
4160        assert!(physical.columns().contains(&"n".to_string()));
4161    }
4162
4163    #[test]
4164    fn test_plan_create_edge() {
4165        let store = create_test_store();
4166        let planner = Planner::new(store);
4167
4168        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4169        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4170            variable: Some("r".to_string()),
4171            from_variable: "a".to_string(),
4172            to_variable: "b".to_string(),
4173            edge_type: "KNOWS".to_string(),
4174            properties: vec![],
4175            input: Box::new(LogicalOperator::Join(JoinOp {
4176                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4177                    variable: "a".to_string(),
4178                    label: None,
4179                    input: None,
4180                })),
4181                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4182                    variable: "b".to_string(),
4183                    label: None,
4184                    input: None,
4185                })),
4186                join_type: JoinType::Cross,
4187                conditions: vec![],
4188            })),
4189        }));
4190
4191        let physical = planner.plan(&logical).unwrap();
4192        assert!(physical.columns().contains(&"r".to_string()));
4193    }
4194
4195    #[test]
4196    fn test_plan_delete_node() {
4197        let store = create_test_store();
4198        let planner = Planner::new(store);
4199
4200        // MATCH (n) DELETE n
4201        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4202            variable: "n".to_string(),
4203            detach: false,
4204            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4205                variable: "n".to_string(),
4206                label: None,
4207                input: None,
4208            })),
4209        }));
4210
4211        let physical = planner.plan(&logical).unwrap();
4212        assert!(physical.columns().contains(&"deleted_count".to_string()));
4213    }
4214
4215    // ==================== Error Cases ====================
4216
4217    #[test]
4218    fn test_plan_empty_errors() {
4219        let store = create_test_store();
4220        let planner = Planner::new(store);
4221
4222        let logical = LogicalPlan::new(LogicalOperator::Empty);
4223        let result = planner.plan(&logical);
4224        assert!(result.is_err());
4225    }
4226
4227    #[test]
4228    fn test_plan_missing_variable_in_return() {
4229        let store = create_test_store();
4230        let planner = Planner::new(store);
4231
4232        // Return variable that doesn't exist in input
4233        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4234            items: vec![ReturnItem {
4235                expression: LogicalExpression::Variable("missing".to_string()),
4236                alias: None,
4237            }],
4238            distinct: false,
4239            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4240                variable: "n".to_string(),
4241                label: None,
4242                input: None,
4243            })),
4244        }));
4245
4246        let result = planner.plan(&logical);
4247        assert!(result.is_err());
4248    }
4249
4250    // ==================== Helper Function Tests ====================
4251
4252    #[test]
4253    fn test_convert_binary_ops() {
4254        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4255        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4256        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4257        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4258        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4259        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4260        assert!(convert_binary_op(BinaryOp::And).is_ok());
4261        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4262        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4263        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4264        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4265        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4266    }
4267
4268    #[test]
4269    fn test_convert_unary_ops() {
4270        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4271        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4272        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4273        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4274    }
4275
4276    #[test]
4277    fn test_convert_aggregate_functions() {
4278        assert!(matches!(
4279            convert_aggregate_function(LogicalAggregateFunction::Count),
4280            PhysicalAggregateFunction::Count
4281        ));
4282        assert!(matches!(
4283            convert_aggregate_function(LogicalAggregateFunction::Sum),
4284            PhysicalAggregateFunction::Sum
4285        ));
4286        assert!(matches!(
4287            convert_aggregate_function(LogicalAggregateFunction::Avg),
4288            PhysicalAggregateFunction::Avg
4289        ));
4290        assert!(matches!(
4291            convert_aggregate_function(LogicalAggregateFunction::Min),
4292            PhysicalAggregateFunction::Min
4293        ));
4294        assert!(matches!(
4295            convert_aggregate_function(LogicalAggregateFunction::Max),
4296            PhysicalAggregateFunction::Max
4297        ));
4298    }
4299
4300    #[test]
4301    fn test_planner_accessors() {
4302        let store = create_test_store();
4303        let planner = Planner::new(Arc::clone(&store));
4304
4305        assert!(planner.tx_id().is_none());
4306        assert!(planner.tx_manager().is_none());
4307        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4308    }
4309
4310    #[test]
4311    fn test_physical_plan_accessors() {
4312        let store = create_test_store();
4313        let planner = Planner::new(store);
4314
4315        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4316            variable: "n".to_string(),
4317            label: None,
4318            input: None,
4319        }));
4320
4321        let physical = planner.plan(&logical).unwrap();
4322        assert_eq!(physical.columns(), &["n"]);
4323
4324        // Test into_operator
4325        let _ = physical.into_operator();
4326    }
4327
4328    // ==================== Adaptive Planning Tests ====================
4329
4330    #[test]
4331    fn test_plan_adaptive_with_scan() {
4332        let store = create_test_store();
4333        let planner = Planner::new(store);
4334
4335        // MATCH (n:Person) RETURN n
4336        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4337            items: vec![ReturnItem {
4338                expression: LogicalExpression::Variable("n".to_string()),
4339                alias: None,
4340            }],
4341            distinct: false,
4342            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4343                variable: "n".to_string(),
4344                label: Some("Person".to_string()),
4345                input: None,
4346            })),
4347        }));
4348
4349        let physical = planner.plan_adaptive(&logical).unwrap();
4350        assert_eq!(physical.columns(), &["n"]);
4351        // Should have adaptive context with estimates
4352        assert!(physical.adaptive_context.is_some());
4353    }
4354
4355    #[test]
4356    fn test_plan_adaptive_with_filter() {
4357        let store = create_test_store();
4358        let planner = Planner::new(store);
4359
4360        // MATCH (n) WHERE n.age > 30 RETURN n
4361        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4362            items: vec![ReturnItem {
4363                expression: LogicalExpression::Variable("n".to_string()),
4364                alias: None,
4365            }],
4366            distinct: false,
4367            input: Box::new(LogicalOperator::Filter(FilterOp {
4368                predicate: LogicalExpression::Binary {
4369                    left: Box::new(LogicalExpression::Property {
4370                        variable: "n".to_string(),
4371                        property: "age".to_string(),
4372                    }),
4373                    op: BinaryOp::Gt,
4374                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4375                },
4376                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4377                    variable: "n".to_string(),
4378                    label: None,
4379                    input: None,
4380                })),
4381            })),
4382        }));
4383
4384        let physical = planner.plan_adaptive(&logical).unwrap();
4385        assert!(physical.adaptive_context.is_some());
4386    }
4387
4388    #[test]
4389    fn test_plan_adaptive_with_expand() {
4390        let store = create_test_store();
4391        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4392
4393        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4394        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4395            items: vec![
4396                ReturnItem {
4397                    expression: LogicalExpression::Variable("a".to_string()),
4398                    alias: None,
4399                },
4400                ReturnItem {
4401                    expression: LogicalExpression::Variable("b".to_string()),
4402                    alias: None,
4403                },
4404            ],
4405            distinct: false,
4406            input: Box::new(LogicalOperator::Expand(ExpandOp {
4407                from_variable: "a".to_string(),
4408                to_variable: "b".to_string(),
4409                edge_variable: None,
4410                direction: ExpandDirection::Outgoing,
4411                edge_type: Some("KNOWS".to_string()),
4412                min_hops: 1,
4413                max_hops: Some(1),
4414                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4415                    variable: "a".to_string(),
4416                    label: None,
4417                    input: None,
4418                })),
4419                path_alias: None,
4420            })),
4421        }));
4422
4423        let physical = planner.plan_adaptive(&logical).unwrap();
4424        assert!(physical.adaptive_context.is_some());
4425    }
4426
4427    #[test]
4428    fn test_plan_adaptive_with_join() {
4429        let store = create_test_store();
4430        let planner = Planner::new(store);
4431
4432        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4433            items: vec![
4434                ReturnItem {
4435                    expression: LogicalExpression::Variable("a".to_string()),
4436                    alias: None,
4437                },
4438                ReturnItem {
4439                    expression: LogicalExpression::Variable("b".to_string()),
4440                    alias: None,
4441                },
4442            ],
4443            distinct: false,
4444            input: Box::new(LogicalOperator::Join(JoinOp {
4445                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4446                    variable: "a".to_string(),
4447                    label: None,
4448                    input: None,
4449                })),
4450                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4451                    variable: "b".to_string(),
4452                    label: None,
4453                    input: None,
4454                })),
4455                join_type: JoinType::Cross,
4456                conditions: vec![],
4457            })),
4458        }));
4459
4460        let physical = planner.plan_adaptive(&logical).unwrap();
4461        assert!(physical.adaptive_context.is_some());
4462    }
4463
4464    #[test]
4465    fn test_plan_adaptive_with_aggregate() {
4466        let store = create_test_store();
4467        let planner = Planner::new(store);
4468
4469        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4470            group_by: vec![],
4471            aggregates: vec![LogicalAggregateExpr {
4472                function: LogicalAggregateFunction::Count,
4473                expression: Some(LogicalExpression::Variable("n".to_string())),
4474                distinct: false,
4475                alias: Some("cnt".to_string()),
4476                percentile: None,
4477            }],
4478            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4479                variable: "n".to_string(),
4480                label: None,
4481                input: None,
4482            })),
4483            having: None,
4484        }));
4485
4486        let physical = planner.plan_adaptive(&logical).unwrap();
4487        assert!(physical.adaptive_context.is_some());
4488    }
4489
4490    #[test]
4491    fn test_plan_adaptive_with_distinct() {
4492        let store = create_test_store();
4493        let planner = Planner::new(store);
4494
4495        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4496            items: vec![ReturnItem {
4497                expression: LogicalExpression::Variable("n".to_string()),
4498                alias: None,
4499            }],
4500            distinct: false,
4501            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4502                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4503                    variable: "n".to_string(),
4504                    label: None,
4505                    input: None,
4506                })),
4507                columns: None,
4508            })),
4509        }));
4510
4511        let physical = planner.plan_adaptive(&logical).unwrap();
4512        assert!(physical.adaptive_context.is_some());
4513    }
4514
4515    #[test]
4516    fn test_plan_adaptive_with_limit() {
4517        let store = create_test_store();
4518        let planner = Planner::new(store);
4519
4520        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4521            items: vec![ReturnItem {
4522                expression: LogicalExpression::Variable("n".to_string()),
4523                alias: None,
4524            }],
4525            distinct: false,
4526            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4527                count: 10,
4528                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4529                    variable: "n".to_string(),
4530                    label: None,
4531                    input: None,
4532                })),
4533            })),
4534        }));
4535
4536        let physical = planner.plan_adaptive(&logical).unwrap();
4537        assert!(physical.adaptive_context.is_some());
4538    }
4539
4540    #[test]
4541    fn test_plan_adaptive_with_skip() {
4542        let store = create_test_store();
4543        let planner = Planner::new(store);
4544
4545        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4546            items: vec![ReturnItem {
4547                expression: LogicalExpression::Variable("n".to_string()),
4548                alias: None,
4549            }],
4550            distinct: false,
4551            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4552                count: 5,
4553                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4554                    variable: "n".to_string(),
4555                    label: None,
4556                    input: None,
4557                })),
4558            })),
4559        }));
4560
4561        let physical = planner.plan_adaptive(&logical).unwrap();
4562        assert!(physical.adaptive_context.is_some());
4563    }
4564
4565    #[test]
4566    fn test_plan_adaptive_with_sort() {
4567        let store = create_test_store();
4568        let planner = Planner::new(store);
4569
4570        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4571            items: vec![ReturnItem {
4572                expression: LogicalExpression::Variable("n".to_string()),
4573                alias: None,
4574            }],
4575            distinct: false,
4576            input: Box::new(LogicalOperator::Sort(SortOp {
4577                keys: vec![SortKey {
4578                    expression: LogicalExpression::Variable("n".to_string()),
4579                    order: SortOrder::Ascending,
4580                }],
4581                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4582                    variable: "n".to_string(),
4583                    label: None,
4584                    input: None,
4585                })),
4586            })),
4587        }));
4588
4589        let physical = planner.plan_adaptive(&logical).unwrap();
4590        assert!(physical.adaptive_context.is_some());
4591    }
4592
4593    #[test]
4594    fn test_plan_adaptive_with_union() {
4595        let store = create_test_store();
4596        let planner = Planner::new(store);
4597
4598        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4599            items: vec![ReturnItem {
4600                expression: LogicalExpression::Variable("n".to_string()),
4601                alias: None,
4602            }],
4603            distinct: false,
4604            input: Box::new(LogicalOperator::Union(UnionOp {
4605                inputs: vec![
4606                    LogicalOperator::NodeScan(NodeScanOp {
4607                        variable: "n".to_string(),
4608                        label: Some("Person".to_string()),
4609                        input: None,
4610                    }),
4611                    LogicalOperator::NodeScan(NodeScanOp {
4612                        variable: "n".to_string(),
4613                        label: Some("Company".to_string()),
4614                        input: None,
4615                    }),
4616                ],
4617            })),
4618        }));
4619
4620        let physical = planner.plan_adaptive(&logical).unwrap();
4621        assert!(physical.adaptive_context.is_some());
4622    }
4623
4624    // ==================== Variable Length Path Tests ====================
4625
4626    #[test]
4627    fn test_plan_expand_variable_length() {
4628        let store = create_test_store();
4629        let planner = Planner::new(store);
4630
4631        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4632        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4633            items: vec![
4634                ReturnItem {
4635                    expression: LogicalExpression::Variable("a".to_string()),
4636                    alias: None,
4637                },
4638                ReturnItem {
4639                    expression: LogicalExpression::Variable("b".to_string()),
4640                    alias: None,
4641                },
4642            ],
4643            distinct: false,
4644            input: Box::new(LogicalOperator::Expand(ExpandOp {
4645                from_variable: "a".to_string(),
4646                to_variable: "b".to_string(),
4647                edge_variable: None,
4648                direction: ExpandDirection::Outgoing,
4649                edge_type: Some("KNOWS".to_string()),
4650                min_hops: 1,
4651                max_hops: Some(3),
4652                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4653                    variable: "a".to_string(),
4654                    label: None,
4655                    input: None,
4656                })),
4657                path_alias: None,
4658            })),
4659        }));
4660
4661        let physical = planner.plan(&logical).unwrap();
4662        assert!(physical.columns().contains(&"a".to_string()));
4663        assert!(physical.columns().contains(&"b".to_string()));
4664    }
4665
4666    #[test]
4667    fn test_plan_expand_with_path_alias() {
4668        let store = create_test_store();
4669        let planner = Planner::new(store);
4670
4671        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4672        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4673            items: vec![
4674                ReturnItem {
4675                    expression: LogicalExpression::Variable("a".to_string()),
4676                    alias: None,
4677                },
4678                ReturnItem {
4679                    expression: LogicalExpression::Variable("b".to_string()),
4680                    alias: None,
4681                },
4682            ],
4683            distinct: false,
4684            input: Box::new(LogicalOperator::Expand(ExpandOp {
4685                from_variable: "a".to_string(),
4686                to_variable: "b".to_string(),
4687                edge_variable: None,
4688                direction: ExpandDirection::Outgoing,
4689                edge_type: Some("KNOWS".to_string()),
4690                min_hops: 1,
4691                max_hops: Some(3),
4692                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4693                    variable: "a".to_string(),
4694                    label: None,
4695                    input: None,
4696                })),
4697                path_alias: Some("p".to_string()),
4698            })),
4699        }));
4700
4701        let physical = planner.plan(&logical).unwrap();
4702        // Verify plan was created successfully with expected output columns
4703        assert!(physical.columns().contains(&"a".to_string()));
4704        assert!(physical.columns().contains(&"b".to_string()));
4705    }
4706
4707    #[test]
4708    fn test_plan_expand_incoming() {
4709        let store = create_test_store();
4710        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4711
4712        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4713        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4714            items: vec![
4715                ReturnItem {
4716                    expression: LogicalExpression::Variable("a".to_string()),
4717                    alias: None,
4718                },
4719                ReturnItem {
4720                    expression: LogicalExpression::Variable("b".to_string()),
4721                    alias: None,
4722                },
4723            ],
4724            distinct: false,
4725            input: Box::new(LogicalOperator::Expand(ExpandOp {
4726                from_variable: "a".to_string(),
4727                to_variable: "b".to_string(),
4728                edge_variable: None,
4729                direction: ExpandDirection::Incoming,
4730                edge_type: Some("KNOWS".to_string()),
4731                min_hops: 1,
4732                max_hops: Some(1),
4733                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4734                    variable: "a".to_string(),
4735                    label: None,
4736                    input: None,
4737                })),
4738                path_alias: None,
4739            })),
4740        }));
4741
4742        let physical = planner.plan(&logical).unwrap();
4743        assert!(physical.columns().contains(&"a".to_string()));
4744        assert!(physical.columns().contains(&"b".to_string()));
4745    }
4746
4747    #[test]
4748    fn test_plan_expand_both_directions() {
4749        let store = create_test_store();
4750        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4751
4752        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4753        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4754            items: vec![
4755                ReturnItem {
4756                    expression: LogicalExpression::Variable("a".to_string()),
4757                    alias: None,
4758                },
4759                ReturnItem {
4760                    expression: LogicalExpression::Variable("b".to_string()),
4761                    alias: None,
4762                },
4763            ],
4764            distinct: false,
4765            input: Box::new(LogicalOperator::Expand(ExpandOp {
4766                from_variable: "a".to_string(),
4767                to_variable: "b".to_string(),
4768                edge_variable: None,
4769                direction: ExpandDirection::Both,
4770                edge_type: Some("KNOWS".to_string()),
4771                min_hops: 1,
4772                max_hops: Some(1),
4773                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4774                    variable: "a".to_string(),
4775                    label: None,
4776                    input: None,
4777                })),
4778                path_alias: None,
4779            })),
4780        }));
4781
4782        let physical = planner.plan(&logical).unwrap();
4783        assert!(physical.columns().contains(&"a".to_string()));
4784        assert!(physical.columns().contains(&"b".to_string()));
4785    }
4786
4787    // ==================== With Context Tests ====================
4788
4789    #[test]
4790    fn test_planner_with_context() {
4791        use crate::transaction::TransactionManager;
4792
4793        let store = create_test_store();
4794        let tx_manager = Arc::new(TransactionManager::new());
4795        let tx_id = tx_manager.begin();
4796        let epoch = tx_manager.current_epoch();
4797
4798        let planner = Planner::with_context(
4799            Arc::clone(&store),
4800            Arc::clone(&tx_manager),
4801            Some(tx_id),
4802            epoch,
4803        );
4804
4805        assert_eq!(planner.tx_id(), Some(tx_id));
4806        assert!(planner.tx_manager().is_some());
4807        assert_eq!(planner.viewing_epoch(), epoch);
4808    }
4809
4810    #[test]
4811    fn test_planner_with_factorized_execution_disabled() {
4812        let store = create_test_store();
4813        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4814
4815        // Two consecutive expands - should NOT use factorized execution
4816        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4817            items: vec![
4818                ReturnItem {
4819                    expression: LogicalExpression::Variable("a".to_string()),
4820                    alias: None,
4821                },
4822                ReturnItem {
4823                    expression: LogicalExpression::Variable("c".to_string()),
4824                    alias: None,
4825                },
4826            ],
4827            distinct: false,
4828            input: Box::new(LogicalOperator::Expand(ExpandOp {
4829                from_variable: "b".to_string(),
4830                to_variable: "c".to_string(),
4831                edge_variable: None,
4832                direction: ExpandDirection::Outgoing,
4833                edge_type: None,
4834                min_hops: 1,
4835                max_hops: Some(1),
4836                input: Box::new(LogicalOperator::Expand(ExpandOp {
4837                    from_variable: "a".to_string(),
4838                    to_variable: "b".to_string(),
4839                    edge_variable: None,
4840                    direction: ExpandDirection::Outgoing,
4841                    edge_type: None,
4842                    min_hops: 1,
4843                    max_hops: Some(1),
4844                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4845                        variable: "a".to_string(),
4846                        label: None,
4847                        input: None,
4848                    })),
4849                    path_alias: None,
4850                })),
4851                path_alias: None,
4852            })),
4853        }));
4854
4855        let physical = planner.plan(&logical).unwrap();
4856        assert!(physical.columns().contains(&"a".to_string()));
4857        assert!(physical.columns().contains(&"c".to_string()));
4858    }
4859
4860    // ==================== Sort with Property Tests ====================
4861
4862    #[test]
4863    fn test_plan_sort_by_property() {
4864        let store = create_test_store();
4865        let planner = Planner::new(store);
4866
4867        // MATCH (n) RETURN n ORDER BY n.name ASC
4868        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4869            items: vec![ReturnItem {
4870                expression: LogicalExpression::Variable("n".to_string()),
4871                alias: None,
4872            }],
4873            distinct: false,
4874            input: Box::new(LogicalOperator::Sort(SortOp {
4875                keys: vec![SortKey {
4876                    expression: LogicalExpression::Property {
4877                        variable: "n".to_string(),
4878                        property: "name".to_string(),
4879                    },
4880                    order: SortOrder::Ascending,
4881                }],
4882                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4883                    variable: "n".to_string(),
4884                    label: None,
4885                    input: None,
4886                })),
4887            })),
4888        }));
4889
4890        let physical = planner.plan(&logical).unwrap();
4891        // Should have the property column projected
4892        assert!(physical.columns().contains(&"n".to_string()));
4893    }
4894
4895    // ==================== Scan with Input Tests ====================
4896
4897    #[test]
4898    fn test_plan_scan_with_input() {
4899        let store = create_test_store();
4900        let planner = Planner::new(store);
4901
4902        // A scan with another scan as input (for chained patterns)
4903        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4904            items: vec![
4905                ReturnItem {
4906                    expression: LogicalExpression::Variable("a".to_string()),
4907                    alias: None,
4908                },
4909                ReturnItem {
4910                    expression: LogicalExpression::Variable("b".to_string()),
4911                    alias: None,
4912                },
4913            ],
4914            distinct: false,
4915            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4916                variable: "b".to_string(),
4917                label: Some("Company".to_string()),
4918                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4919                    variable: "a".to_string(),
4920                    label: Some("Person".to_string()),
4921                    input: None,
4922                }))),
4923            })),
4924        }));
4925
4926        let physical = planner.plan(&logical).unwrap();
4927        assert!(physical.columns().contains(&"a".to_string()));
4928        assert!(physical.columns().contains(&"b".to_string()));
4929    }
4930}