Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
/// Range bounds for property-based range queries.
///
/// Each end of the range is optional and borrows its value for `'a`; each end
/// also carries its own inclusivity flag.
struct RangeBounds<'a> {
    /// Lower bound value, if one is set.
    min: Option<&'a Value>,
    /// Upper bound value, if one is set.
    max: Option<&'a Value>,
    /// Inclusivity flag for `min` (presumably `>=` when true; confirm at the use site).
    min_inclusive: bool,
    /// Inclusivity flag for `max` (presumably `<=` when true; confirm at the use site).
    max_inclusive: bool,
}
44
/// Converts a logical plan to a physical operator tree.
///
/// Built either with [`Planner::new`] (no transaction context, viewing epoch
/// taken from the store) or [`Planner::with_context`] (transaction manager,
/// optional transaction ID and an explicit viewing epoch).
pub struct Planner {
    /// The graph store to scan from.
    store: Arc<LpgStore>,
    /// Transaction manager for MVCC operations (`None` when built via `new`).
    tx_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    tx_id: Option<TxId>,
    /// Epoch to use for visibility checks.
    viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    ///
    /// Held in a `Cell` so planning methods can advance it through `&self`.
    anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    ///
    /// Both constructors start with this enabled.
    factorized_execution: bool,
}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree from store statistics
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let stats = self.store.statistics();
244                let avg_degree = self.estimate_expand_degree(&stats, expand);
245                let estimate = input_estimate * avg_degree;
246                let id = format!("expand_{}", expand.to_variable);
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250            }
251            LogicalOperator::Join(join) => {
252                // Estimate join output (product with selectivity)
253                let left_est = self.estimate_cardinality(&join.left);
254                let right_est = self.estimate_cardinality(&join.right);
255                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
256                let id = format!("join_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261            }
262            LogicalOperator::Aggregate(agg) => {
263                // Aggregates typically reduce cardinality
264                let input_estimate = self.estimate_cardinality(&agg.input);
265                let estimate = if agg.group_by.is_empty() {
266                    1.0 // Scalar aggregate
267                } else {
268                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
269                };
270                let id = format!("aggregate_{depth}");
271                ctx.set_estimate(&id, estimate);
272
273                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274            }
275            LogicalOperator::Distinct(distinct) => {
276                let input_estimate = self.estimate_cardinality(&distinct.input);
277                let estimate = (input_estimate * 0.5).max(1.0);
278                let id = format!("distinct_{depth}");
279                ctx.set_estimate(&id, estimate);
280
281                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282            }
283            LogicalOperator::Return(ret) => {
284                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285            }
286            LogicalOperator::Limit(limit) => {
287                let input_estimate = self.estimate_cardinality(&limit.input);
288                let estimate = (input_estimate).min(limit.count as f64);
289                let id = format!("limit_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293            }
294            LogicalOperator::Skip(skip) => {
295                let input_estimate = self.estimate_cardinality(&skip.input);
296                let estimate = (input_estimate - skip.count as f64).max(0.0);
297                let id = format!("skip_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301            }
302            LogicalOperator::Sort(sort) => {
303                // Sort doesn't change cardinality
304                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305            }
306            LogicalOperator::Union(union) => {
307                let estimate: f64 = union
308                    .inputs
309                    .iter()
310                    .map(|input| self.estimate_cardinality(input))
311                    .sum();
312                let id = format!("union_{depth}");
313                ctx.set_estimate(&id, estimate);
314
315                for input in &union.inputs {
316                    self.collect_cardinality_estimates(input, ctx, depth + 1);
317                }
318            }
319            _ => {
320                // For other operators, try to recurse into known input patterns
321            }
322        }
323    }
324
325    /// Estimates cardinality for a logical operator subtree.
326    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327        match op {
328            LogicalOperator::NodeScan(scan) => {
329                if let Some(label) = &scan.label {
330                    self.store.nodes_by_label(label).len() as f64
331                } else {
332                    self.store.node_count() as f64
333                }
334            }
335            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336            LogicalOperator::Expand(expand) => {
337                let stats = self.store.statistics();
338                let avg_degree = self.estimate_expand_degree(&stats, expand);
339                self.estimate_cardinality(&expand.input) * avg_degree
340            }
341            LogicalOperator::Join(join) => {
342                let left = self.estimate_cardinality(&join.left);
343                let right = self.estimate_cardinality(&join.right);
344                (left * right).sqrt()
345            }
346            LogicalOperator::Aggregate(agg) => {
347                if agg.group_by.is_empty() {
348                    1.0
349                } else {
350                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351                }
352            }
353            LogicalOperator::Distinct(distinct) => {
354                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355            }
356            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357            LogicalOperator::Limit(limit) => self
358                .estimate_cardinality(&limit.input)
359                .min(limit.count as f64),
360            LogicalOperator::Skip(skip) => {
361                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362            }
363            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364            LogicalOperator::Union(union) => union
365                .inputs
366                .iter()
367                .map(|input| self.estimate_cardinality(input))
368                .sum(),
369            _ => 1000.0, // Default estimate for unknown operators
370        }
371    }
372
373    /// Estimates the average edge degree for an expand operation using store statistics.
374    fn estimate_expand_degree(
375        &self,
376        stats: &grafeo_core::statistics::Statistics,
377        expand: &ExpandOp,
378    ) -> f64 {
379        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380        if let Some(edge_type) = &expand.edge_type {
381            stats.estimate_avg_degree(edge_type, outgoing)
382        } else if stats.total_nodes > 0 {
383            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384        } else {
385            10.0 // fallback for empty graph
386        }
387    }
388
389    /// Plans a single logical operator.
390    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391        match op {
392            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393            LogicalOperator::Expand(expand) => {
394                // Check for expand chains when factorized execution is enabled
395                if self.factorized_execution {
396                    let (chain_len, _base) = Self::count_expand_chain(op);
397                    if chain_len >= 2 {
398                        // Use factorized chain for 2+ consecutive single-hop expands
399                        return self.plan_expand_chain(op);
400                    }
401                }
402                self.plan_expand(expand)
403            }
404            LogicalOperator::Return(ret) => self.plan_return(ret),
405            LogicalOperator::Filter(filter) => self.plan_filter(filter),
406            LogicalOperator::Project(project) => self.plan_project(project),
407            LogicalOperator::Limit(limit) => self.plan_limit(limit),
408            LogicalOperator::Skip(skip) => self.plan_skip(skip),
409            LogicalOperator::Sort(sort) => self.plan_sort(sort),
410            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411            LogicalOperator::Join(join) => self.plan_join(join),
412            LogicalOperator::Union(union) => self.plan_union(union),
413            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421            LogicalOperator::Merge(merge) => self.plan_merge(merge),
422            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427            LogicalOperator::VectorScan(_) => Err(Error::Internal(
428                "VectorScan requires vector-index feature".to_string(),
429            )),
430            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431                "VectorJoin requires vector-index feature".to_string(),
432            )),
433            _ => Err(Error::Internal(format!(
434                "Unsupported operator: {:?}",
435                std::mem::discriminant(op)
436            ))),
437        }
438    }
439
440    /// Plans a node scan operator.
441    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442        let scan_op = if let Some(label) = &scan.label {
443            ScanOperator::with_label(Arc::clone(&self.store), label)
444        } else {
445            ScanOperator::new(Arc::clone(&self.store))
446        };
447
448        // Apply MVCC context if available
449        let scan_operator: Box<dyn Operator> =
450            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452        // If there's an input, chain operators with a nested loop join (cross join)
453        if let Some(input) = &scan.input {
454            let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456            // Build output schema: input columns + scan column
457            let mut output_schema: Vec<LogicalType> =
458                input_columns.iter().map(|_| LogicalType::Any).collect();
459            output_schema.push(LogicalType::Node);
460
461            // Add scan column to input columns
462            input_columns.push(scan.variable.clone());
463
464            // Use nested loop join to combine input rows with scanned nodes
465            let join_op = Box::new(NestedLoopJoinOperator::new(
466                input_op,
467                scan_operator,
468                None, // No join condition (cross join)
469                PhysicalJoinType::Cross,
470                output_schema,
471            ));
472
473            Ok((join_op, input_columns))
474        } else {
475            let columns = vec![scan.variable.clone()];
476            Ok((scan_operator, columns))
477        }
478    }
479
480    /// Plans an expand operator.
481    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482        // Plan the input operator first
483        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485        // Find the source column index
486        let source_column = input_columns
487            .iter()
488            .position(|c| c == &expand.from_variable)
489            .ok_or_else(|| {
490                Error::Internal(format!(
491                    "Source variable '{}' not found in input columns",
492                    expand.from_variable
493                ))
494            })?;
495
496        // Convert expand direction
497        let direction = match expand.direction {
498            ExpandDirection::Outgoing => Direction::Outgoing,
499            ExpandDirection::Incoming => Direction::Incoming,
500            ExpandDirection::Both => Direction::Both,
501        };
502
503        // Check if this is a variable-length path
504        let is_variable_length =
505            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507        let operator: Box<dyn Operator> = if is_variable_length {
508            // Use VariableLengthExpandOperator for multi-hop paths
509            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
510            let mut expand_op = VariableLengthExpandOperator::new(
511                Arc::clone(&self.store),
512                input_op,
513                source_column,
514                direction,
515                expand.edge_type.clone(),
516                expand.min_hops,
517                max_hops,
518            )
519            .with_tx_context(self.viewing_epoch, self.tx_id);
520
521            // If a path alias is set, enable path length and detail output
522            if expand.path_alias.is_some() {
523                expand_op = expand_op
524                    .with_path_length_output()
525                    .with_path_detail_output();
526            }
527
528            Box::new(expand_op)
529        } else {
530            // Use simple ExpandOperator for single-hop paths
531            let expand_op = ExpandOperator::new(
532                Arc::clone(&self.store),
533                input_op,
534                source_column,
535                direction,
536                expand.edge_type.clone(),
537            )
538            .with_tx_context(self.viewing_epoch, self.tx_id);
539            Box::new(expand_op)
540        };
541
542        // Build output columns: [input_columns..., edge, target, (path_length)?]
543        // Preserve all input columns and add edge + target to match ExpandOperator output
544        let mut columns = input_columns;
545
546        // Generate edge column name - use provided name or generate anonymous name
547        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
548            let count = self.anon_edge_counter.get();
549            self.anon_edge_counter.set(count + 1);
550            format!("_anon_edge_{}", count)
551        });
552        columns.push(edge_col_name);
553
554        columns.push(expand.to_variable.clone());
555
556        // If a path alias is set, add columns for path length, nodes, and edges
557        if let Some(ref path_alias) = expand.path_alias {
558            columns.push(format!("_path_length_{}", path_alias));
559            columns.push(format!("_path_nodes_{}", path_alias));
560            columns.push(format!("_path_edges_{}", path_alias));
561        }
562
563        Ok((operator, columns))
564    }
565
566    /// Plans a chain of consecutive expand operations using factorized execution.
567    ///
568    /// This avoids the Cartesian product explosion that occurs with separate expands.
569    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
570    ///
571    /// The chain is executed lazily at query time, not during planning. This ensures
572    /// that any filters applied above the expand chain are properly respected.
573    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
574        let expands = Self::collect_expand_chain(op);
575        if expands.is_empty() {
576            return Err(Error::Internal("Empty expand chain".to_string()));
577        }
578
579        // Get the base operator (before first expand)
580        let first_expand = expands[0];
581        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
582
583        let mut columns = base_columns.clone();
584        let mut steps = Vec::new();
585
586        // Track the level-local source column for each expand
587        // For the first expand, it's the column in the input (base_columns)
588        // For subsequent expands, the target from the previous level is always at index 1
589        // (each level adds [edge, target], so target is at index 1)
590        let mut is_first = true;
591
592        for expand in &expands {
593            // Find source column for this expand
594            let source_column = if is_first {
595                // For first expand, find in base columns
596                base_columns
597                    .iter()
598                    .position(|c| c == &expand.from_variable)
599                    .ok_or_else(|| {
600                        Error::Internal(format!(
601                            "Source variable '{}' not found in base columns",
602                            expand.from_variable
603                        ))
604                    })?
605            } else {
606                // For subsequent expands, the target from the previous level is at index 1
607                // (each level adds [edge, target], so target is the second column)
608                1
609            };
610
611            // Convert direction
612            let direction = match expand.direction {
613                ExpandDirection::Outgoing => Direction::Outgoing,
614                ExpandDirection::Incoming => Direction::Incoming,
615                ExpandDirection::Both => Direction::Both,
616            };
617
618            // Add expand step configuration
619            steps.push(ExpandStep {
620                source_column,
621                direction,
622                edge_type: expand.edge_type.clone(),
623            });
624
625            // Add edge and target columns
626            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
627                let count = self.anon_edge_counter.get();
628                self.anon_edge_counter.set(count + 1);
629                format!("_anon_edge_{}", count)
630            });
631            columns.push(edge_col_name);
632            columns.push(expand.to_variable.clone());
633
634            is_first = false;
635        }
636
637        // Create lazy operator that executes at query time, not planning time
638        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
639
640        if let Some(tx_id) = self.tx_id {
641            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
642        } else {
643            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
644        }
645
646        Ok((Box::new(lazy_op), columns))
647    }
648
649    /// Plans a RETURN clause.
650    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
651        // Plan the input operator
652        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
653
654        // Build variable to column index mapping
655        let variable_columns: HashMap<String, usize> = input_columns
656            .iter()
657            .enumerate()
658            .map(|(i, name)| (name.clone(), i))
659            .collect();
660
661        // Extract column names from return items
662        let columns: Vec<String> = ret
663            .items
664            .iter()
665            .map(|item| {
666                item.alias.clone().unwrap_or_else(|| {
667                    // Generate a default name from the expression
668                    expression_to_string(&item.expression)
669                })
670            })
671            .collect();
672
673        // Check if we need a project operator (for property access or expression evaluation)
674        let needs_project = ret
675            .items
676            .iter()
677            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
678
679        if needs_project {
680            // Build project expressions
681            let mut projections = Vec::with_capacity(ret.items.len());
682            let mut output_types = Vec::with_capacity(ret.items.len());
683
684            for item in &ret.items {
685                match &item.expression {
686                    LogicalExpression::Variable(name) => {
687                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
688                            Error::Internal(format!("Variable '{}' not found in input", name))
689                        })?;
690                        projections.push(ProjectExpr::Column(col_idx));
691                        // Path detail variables carry List values — use Any/Generic
692                        if name.starts_with("_path_nodes_")
693                            || name.starts_with("_path_edges_")
694                            || name.starts_with("_path_length_")
695                        {
696                            output_types.push(LogicalType::Any);
697                        } else {
698                            output_types.push(LogicalType::Node);
699                        }
700                    }
701                    LogicalExpression::Property { variable, property } => {
702                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
703                            Error::Internal(format!("Variable '{}' not found in input", variable))
704                        })?;
705                        projections.push(ProjectExpr::PropertyAccess {
706                            column: col_idx,
707                            property: property.clone(),
708                        });
709                        // Property could be any type - use Any/Generic to preserve type
710                        output_types.push(LogicalType::Any);
711                    }
712                    LogicalExpression::Literal(value) => {
713                        projections.push(ProjectExpr::Constant(value.clone()));
714                        output_types.push(value_to_logical_type(value));
715                    }
716                    LogicalExpression::FunctionCall { name, args, .. } => {
717                        // Handle built-in functions
718                        match name.to_lowercase().as_str() {
719                            "type" => {
720                                // type(r) returns the edge type string
721                                if args.len() != 1 {
722                                    return Err(Error::Internal(
723                                        "type() requires exactly one argument".to_string(),
724                                    ));
725                                }
726                                if let LogicalExpression::Variable(var_name) = &args[0] {
727                                    let col_idx =
728                                        *variable_columns.get(var_name).ok_or_else(|| {
729                                            Error::Internal(format!(
730                                                "Variable '{}' not found in input",
731                                                var_name
732                                            ))
733                                        })?;
734                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
735                                    output_types.push(LogicalType::String);
736                                } else {
737                                    return Err(Error::Internal(
738                                        "type() argument must be a variable".to_string(),
739                                    ));
740                                }
741                            }
742                            "length" => {
743                                // length(p) returns the path length
744                                // For shortestPath results, the path column already contains the length
745                                if args.len() != 1 {
746                                    return Err(Error::Internal(
747                                        "length() requires exactly one argument".to_string(),
748                                    ));
749                                }
750                                if let LogicalExpression::Variable(var_name) = &args[0] {
751                                    let col_idx =
752                                        *variable_columns.get(var_name).ok_or_else(|| {
753                                            Error::Internal(format!(
754                                                "Variable '{}' not found in input",
755                                                var_name
756                                            ))
757                                        })?;
758                                    // Pass through the column value directly
759                                    projections.push(ProjectExpr::Column(col_idx));
760                                    output_types.push(LogicalType::Int64);
761                                } else {
762                                    return Err(Error::Internal(
763                                        "length() argument must be a variable".to_string(),
764                                    ));
765                                }
766                            }
767                            "nodes" | "edges" => {
768                                // nodes(p) / edges(p) returns the list of nodes/edges in the path
769                                if args.len() != 1 {
770                                    return Err(Error::Internal(format!(
771                                        "{}() requires exactly one argument",
772                                        name
773                                    )));
774                                }
775                                if let LogicalExpression::Variable(var_name) = &args[0] {
776                                    let col_idx =
777                                        *variable_columns.get(var_name).ok_or_else(|| {
778                                            Error::Internal(format!(
779                                                "Variable '{var_name}' not found in input",
780                                            ))
781                                        })?;
782                                    projections.push(ProjectExpr::Column(col_idx));
783                                    output_types.push(LogicalType::Any);
784                                } else {
785                                    return Err(Error::Internal(format!(
786                                        "{}() argument must be a variable",
787                                        name
788                                    )));
789                                }
790                            }
791                            // For other functions (head, tail, size, etc.), use expression evaluation
792                            _ => {
793                                let filter_expr = self.convert_expression(&item.expression)?;
794                                projections.push(ProjectExpr::Expression {
795                                    expr: filter_expr,
796                                    variable_columns: variable_columns.clone(),
797                                });
798                                output_types.push(LogicalType::Any);
799                            }
800                        }
801                    }
802                    LogicalExpression::Case { .. } => {
803                        // Convert CASE expression to FilterExpression for evaluation
804                        let filter_expr = self.convert_expression(&item.expression)?;
805                        projections.push(ProjectExpr::Expression {
806                            expr: filter_expr,
807                            variable_columns: variable_columns.clone(),
808                        });
809                        // CASE can return any type - use Any
810                        output_types.push(LogicalType::Any);
811                    }
812                    _ => {
813                        return Err(Error::Internal(format!(
814                            "Unsupported RETURN expression: {:?}",
815                            item.expression
816                        )));
817                    }
818                }
819            }
820
821            let operator = Box::new(ProjectOperator::with_store(
822                input_op,
823                projections,
824                output_types,
825                Arc::clone(&self.store),
826            ));
827
828            Ok((operator, columns))
829        } else {
830            // Simple case: just return variables
831            // Re-order columns to match return items if needed
832            let mut projections = Vec::with_capacity(ret.items.len());
833            let mut output_types = Vec::with_capacity(ret.items.len());
834
835            for item in &ret.items {
836                if let LogicalExpression::Variable(name) = &item.expression {
837                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
838                        Error::Internal(format!("Variable '{}' not found in input", name))
839                    })?;
840                    projections.push(ProjectExpr::Column(col_idx));
841                    output_types.push(LogicalType::Node);
842                }
843            }
844
845            // Only add ProjectOperator if reordering is needed
846            if projections.len() == input_columns.len()
847                && projections
848                    .iter()
849                    .enumerate()
850                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
851            {
852                // No reordering needed
853                Ok((input_op, columns))
854            } else {
855                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
856                Ok((operator, columns))
857            }
858        }
859    }
860
861    /// Plans a project operator (for WITH clause).
862    fn plan_project(
863        &self,
864        project: &crate::query::plan::ProjectOp,
865    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
866        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
867        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
868            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
869                // Create a single-row operator for projecting literals
870                let single_row_op: Box<dyn Operator> = Box::new(
871                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
872                );
873                (single_row_op, Vec::new())
874            } else {
875                self.plan_operator(&project.input)?
876            };
877
878        // Build variable to column index mapping
879        let variable_columns: HashMap<String, usize> = input_columns
880            .iter()
881            .enumerate()
882            .map(|(i, name)| (name.clone(), i))
883            .collect();
884
885        // Build projections and new column names
886        let mut projections = Vec::with_capacity(project.projections.len());
887        let mut output_types = Vec::with_capacity(project.projections.len());
888        let mut output_columns = Vec::with_capacity(project.projections.len());
889
890        for projection in &project.projections {
891            // Determine the output column name (alias or expression string)
892            let col_name = projection
893                .alias
894                .clone()
895                .unwrap_or_else(|| expression_to_string(&projection.expression));
896            output_columns.push(col_name);
897
898            match &projection.expression {
899                LogicalExpression::Variable(name) => {
900                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
901                        Error::Internal(format!("Variable '{}' not found in input", name))
902                    })?;
903                    projections.push(ProjectExpr::Column(col_idx));
904                    output_types.push(LogicalType::Node);
905                }
906                LogicalExpression::Property { variable, property } => {
907                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
908                        Error::Internal(format!("Variable '{}' not found in input", variable))
909                    })?;
910                    projections.push(ProjectExpr::PropertyAccess {
911                        column: col_idx,
912                        property: property.clone(),
913                    });
914                    output_types.push(LogicalType::Any);
915                }
916                LogicalExpression::Literal(value) => {
917                    projections.push(ProjectExpr::Constant(value.clone()));
918                    output_types.push(value_to_logical_type(value));
919                }
920                _ => {
921                    // For complex expressions, use full expression evaluation
922                    let filter_expr = self.convert_expression(&projection.expression)?;
923                    projections.push(ProjectExpr::Expression {
924                        expr: filter_expr,
925                        variable_columns: variable_columns.clone(),
926                    });
927                    output_types.push(LogicalType::Any);
928                }
929            }
930        }
931
932        let operator = Box::new(ProjectOperator::with_store(
933            input_op,
934            projections,
935            output_types,
936            Arc::clone(&self.store),
937        ));
938
939        Ok((operator, output_columns))
940    }
941
942    /// Plans a filter operator.
943    ///
944    /// Uses zone map pre-filtering to potentially skip scans when predicates
945    /// definitely won't match any data. Also uses property indexes when available
946    /// for O(1) lookups instead of full scans.
947    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
948        // Check zone maps for simple property predicates before scanning
949        // If zone map says "definitely no matches", we can short-circuit
950        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
951            // Zone map says no matches possible - return empty result
952            let (_, columns) = self.plan_operator(&filter.input)?;
953            let schema = self.derive_schema_from_columns(&columns);
954            let empty_op = Box::new(EmptyOperator::new(schema));
955            return Ok((empty_op, columns));
956        }
957
958        // Try to use property index for equality predicates on indexed properties
959        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
960            return Ok(result);
961        }
962
963        // Try to use range optimization for range predicates (>, <, >=, <=)
964        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
965            return Ok(result);
966        }
967
968        // Plan the input operator first
969        let (input_op, columns) = self.plan_operator(&filter.input)?;
970
971        // Build variable to column index mapping
972        let variable_columns: HashMap<String, usize> = columns
973            .iter()
974            .enumerate()
975            .map(|(i, name)| (name.clone(), i))
976            .collect();
977
978        // Convert logical expression to filter expression
979        let filter_expr = self.convert_expression(&filter.predicate)?;
980
981        // Create the predicate
982        let predicate =
983            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
984
985        // Create the filter operator
986        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
987
988        Ok((operator, columns))
989    }
990
991    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
992    ///
993    /// Returns:
994    /// - `Some(false)` if zone map proves no matches possible (can skip)
995    /// - `Some(true)` if zone map says matches might exist
996    /// - `None` if zone map check not applicable
997    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
998        use grafeo_core::graph::lpg::CompareOp;
999
1000        match predicate {
1001            LogicalExpression::Binary { left, op, right } => {
1002                // Check for AND/OR first (compound conditions)
1003                match op {
1004                    BinaryOp::And => {
1005                        let left_result = self.check_zone_map_for_predicate(left);
1006                        let right_result = self.check_zone_map_for_predicate(right);
1007
1008                        return match (left_result, right_result) {
1009                            // If either side definitely won't match, the AND won't match
1010                            (Some(false), _) | (_, Some(false)) => Some(false),
1011                            // If both might match, might match overall
1012                            (Some(true), Some(true)) => Some(true),
1013                            // Otherwise, can't determine
1014                            _ => None,
1015                        };
1016                    }
1017                    BinaryOp::Or => {
1018                        let left_result = self.check_zone_map_for_predicate(left);
1019                        let right_result = self.check_zone_map_for_predicate(right);
1020
1021                        return match (left_result, right_result) {
1022                            // Both sides definitely won't match
1023                            (Some(false), Some(false)) => Some(false),
1024                            // At least one side might match
1025                            (Some(true), _) | (_, Some(true)) => Some(true),
1026                            // Otherwise, can't determine
1027                            _ => None,
1028                        };
1029                    }
1030                    _ => {}
1031                }
1032
1033                // Simple property comparison: n.property op value
1034                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1035                    (
1036                        LogicalExpression::Property { property, .. },
1037                        LogicalExpression::Literal(val),
1038                    ) => {
1039                        let cmp = match op {
1040                            BinaryOp::Eq => CompareOp::Eq,
1041                            BinaryOp::Ne => CompareOp::Ne,
1042                            BinaryOp::Lt => CompareOp::Lt,
1043                            BinaryOp::Le => CompareOp::Le,
1044                            BinaryOp::Gt => CompareOp::Gt,
1045                            BinaryOp::Ge => CompareOp::Ge,
1046                            _ => return None,
1047                        };
1048                        (property.clone(), cmp, val.clone())
1049                    }
1050                    (
1051                        LogicalExpression::Literal(val),
1052                        LogicalExpression::Property { property, .. },
1053                    ) => {
1054                        // Flip comparison for reversed operands
1055                        let cmp = match op {
1056                            BinaryOp::Eq => CompareOp::Eq,
1057                            BinaryOp::Ne => CompareOp::Ne,
1058                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1059                            BinaryOp::Le => CompareOp::Ge,
1060                            BinaryOp::Gt => CompareOp::Lt,
1061                            BinaryOp::Ge => CompareOp::Le,
1062                            _ => return None,
1063                        };
1064                        (property.clone(), cmp, val.clone())
1065                    }
1066                    _ => return None,
1067                };
1068
1069                // Check zone map for node properties
1070                let might_match =
1071                    self.store
1072                        .node_property_might_match(&property.into(), compare_op, &value);
1073
1074                Some(might_match)
1075            }
1076
1077            _ => None,
1078        }
1079    }
1080
1081    /// Tries to use a property index for filter optimization.
1082    ///
1083    /// When a filter predicate is an equality check on an indexed property,
1084    /// and the input is a simple NodeScan, we can use the index to look up
1085    /// matching nodes directly instead of scanning all nodes.
1086    ///
1087    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1088    /// `Ok(None)` if not applicable, or `Err` on error.
1089    fn try_plan_filter_with_property_index(
1090        &self,
1091        filter: &FilterOp,
1092    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1093        // Only optimize if input is a simple NodeScan (not nested)
1094        let (scan_variable, scan_label) = match filter.input.as_ref() {
1095            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1096                (scan.variable.clone(), scan.label.clone())
1097            }
1098            _ => return Ok(None),
1099        };
1100
1101        // Extract property equality conditions from the predicate
1102        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1103        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1104
1105        if conditions.is_empty() {
1106            return Ok(None);
1107        }
1108
1109        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1110        let has_indexed_condition = conditions
1111            .iter()
1112            .any(|(prop, _)| self.store.has_property_index(prop));
1113
1114        if !has_indexed_condition {
1115            return Ok(None);
1116        }
1117
1118        // Use the optimized batch lookup for multiple conditions
1119        let conditions_ref: Vec<(&str, Value)> = conditions
1120            .iter()
1121            .map(|(p, v)| (p.as_str(), v.clone()))
1122            .collect();
1123        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1124
1125        // If there's a label filter, also filter by label
1126        if let Some(label) = &scan_label {
1127            let label_nodes: std::collections::HashSet<_> =
1128                self.store.nodes_by_label(label).into_iter().collect();
1129            matching_nodes.retain(|n| label_nodes.contains(n));
1130        }
1131
1132        // Create a NodeListOperator with the matching nodes
1133        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1134        let columns = vec![scan_variable];
1135
1136        Ok(Some((node_list_op, columns)))
1137    }
1138
1139    /// Extracts equality conditions (property = literal) from a predicate.
1140    ///
1141    /// Handles both simple predicates and AND chains:
1142    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1143    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1144    fn extract_equality_conditions(
1145        &self,
1146        predicate: &LogicalExpression,
1147        target_variable: &str,
1148    ) -> Vec<(String, Value)> {
1149        let mut conditions = Vec::new();
1150        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1151        conditions
1152    }
1153
1154    /// Recursively collects equality conditions from AND expressions.
1155    fn collect_equality_conditions(
1156        &self,
1157        expr: &LogicalExpression,
1158        target_variable: &str,
1159        conditions: &mut Vec<(String, Value)>,
1160    ) {
1161        match expr {
1162            // Handle AND: recurse into both sides
1163            LogicalExpression::Binary {
1164                left,
1165                op: BinaryOp::And,
1166                right,
1167            } => {
1168                self.collect_equality_conditions(left, target_variable, conditions);
1169                self.collect_equality_conditions(right, target_variable, conditions);
1170            }
1171
1172            // Handle equality: extract property and value
1173            LogicalExpression::Binary {
1174                left,
1175                op: BinaryOp::Eq,
1176                right,
1177            } => {
1178                if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1179                    && var == target_variable
1180                {
1181                    conditions.push((prop, val));
1182                }
1183            }
1184
1185            _ => {}
1186        }
1187    }
1188
1189    /// Extracts (variable, property, value) from a property equality expression.
1190    fn extract_property_equality(
1191        &self,
1192        left: &LogicalExpression,
1193        right: &LogicalExpression,
1194    ) -> Option<(String, String, Value)> {
1195        match (left, right) {
1196            (
1197                LogicalExpression::Property { variable, property },
1198                LogicalExpression::Literal(val),
1199            ) => Some((variable.clone(), property.clone(), val.clone())),
1200            (
1201                LogicalExpression::Literal(val),
1202                LogicalExpression::Property { variable, property },
1203            ) => Some((variable.clone(), property.clone(), val.clone())),
1204            _ => None,
1205        }
1206    }
1207
1208    /// Tries to optimize a filter using range queries on properties.
1209    ///
1210    /// This optimization is applied when:
1211    /// - The input is a simple NodeScan (no nested operations)
1212    /// - The predicate contains range comparisons (>, <, >=, <=)
1213    /// - The same variable and property are being filtered
1214    ///
1215    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1216    /// (`n.age >= 30 AND n.age <= 50`).
1217    ///
1218    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1219    /// `Ok(None)` if not applicable, or `Err` on error.
1220    fn try_plan_filter_with_range_index(
1221        &self,
1222        filter: &FilterOp,
1223    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1224        // Only optimize if input is a simple NodeScan (not nested)
1225        let (scan_variable, scan_label) = match filter.input.as_ref() {
1226            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1227                (scan.variable.clone(), scan.label.clone())
1228            }
1229            _ => return Ok(None),
1230        };
1231
1232        // Try to extract BETWEEN pattern first (more efficient)
1233        if let Some((variable, property, min, max, min_inc, max_inc)) =
1234            self.extract_between_predicate(&filter.predicate)
1235            && variable == scan_variable
1236        {
1237            return self.plan_range_filter(
1238                &scan_variable,
1239                &scan_label,
1240                &property,
1241                RangeBounds {
1242                    min: Some(&min),
1243                    max: Some(&max),
1244                    min_inclusive: min_inc,
1245                    max_inclusive: max_inc,
1246                },
1247            );
1248        }
1249
1250        // Try to extract simple range predicate
1251        if let Some((variable, property, op, value)) =
1252            self.extract_range_predicate(&filter.predicate)
1253            && variable == scan_variable
1254        {
1255            let (min, max, min_inc, max_inc) = match op {
1256                BinaryOp::Lt => (None, Some(value), false, false),
1257                BinaryOp::Le => (None, Some(value), false, true),
1258                BinaryOp::Gt => (Some(value), None, false, false),
1259                BinaryOp::Ge => (Some(value), None, true, false),
1260                _ => return Ok(None),
1261            };
1262            return self.plan_range_filter(
1263                &scan_variable,
1264                &scan_label,
1265                &property,
1266                RangeBounds {
1267                    min: min.as_ref(),
1268                    max: max.as_ref(),
1269                    min_inclusive: min_inc,
1270                    max_inclusive: max_inc,
1271                },
1272            );
1273        }
1274
1275        Ok(None)
1276    }
1277
1278    /// Plans a range filter using `find_nodes_in_range`.
1279    fn plan_range_filter(
1280        &self,
1281        scan_variable: &str,
1282        scan_label: &Option<String>,
1283        property: &str,
1284        bounds: RangeBounds<'_>,
1285    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1286        // Use the store's range query method
1287        let mut matching_nodes = self.store.find_nodes_in_range(
1288            property,
1289            bounds.min,
1290            bounds.max,
1291            bounds.min_inclusive,
1292            bounds.max_inclusive,
1293        );
1294
1295        // If there's a label filter, also filter by label
1296        if let Some(label) = scan_label {
1297            let label_nodes: std::collections::HashSet<_> =
1298                self.store.nodes_by_label(label).into_iter().collect();
1299            matching_nodes.retain(|n| label_nodes.contains(n));
1300        }
1301
1302        // Create a NodeListOperator with the matching nodes
1303        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1304        let columns = vec![scan_variable.to_string()];
1305
1306        Ok(Some((node_list_op, columns)))
1307    }
1308
1309    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1310    ///
1311    /// Returns `(variable, property, operator, value)` if found.
1312    fn extract_range_predicate(
1313        &self,
1314        predicate: &LogicalExpression,
1315    ) -> Option<(String, String, BinaryOp, Value)> {
1316        match predicate {
1317            LogicalExpression::Binary { left, op, right } => {
1318                match op {
1319                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1320                        // Try property on left: n.age > 30
1321                        if let (
1322                            LogicalExpression::Property { variable, property },
1323                            LogicalExpression::Literal(val),
1324                        ) = (left.as_ref(), right.as_ref())
1325                        {
1326                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1327                        }
1328
1329                        // Try property on right: 30 < n.age (flip operator)
1330                        if let (
1331                            LogicalExpression::Literal(val),
1332                            LogicalExpression::Property { variable, property },
1333                        ) = (left.as_ref(), right.as_ref())
1334                        {
1335                            let flipped_op = match op {
1336                                BinaryOp::Lt => BinaryOp::Gt,
1337                                BinaryOp::Le => BinaryOp::Ge,
1338                                BinaryOp::Gt => BinaryOp::Lt,
1339                                BinaryOp::Ge => BinaryOp::Le,
1340                                _ => return None,
1341                            };
1342                            return Some((
1343                                variable.clone(),
1344                                property.clone(),
1345                                flipped_op,
1346                                val.clone(),
1347                            ));
1348                        }
1349                    }
1350                    _ => {}
1351                }
1352            }
1353            _ => {}
1354        }
1355        None
1356    }
1357
1358    /// Extracts a BETWEEN pattern from compound predicates.
1359    ///
1360    /// Recognizes patterns like:
1361    /// - `n.age >= 30 AND n.age <= 50`
1362    /// - `n.age > 30 AND n.age < 50`
1363    ///
1364    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1365    fn extract_between_predicate(
1366        &self,
1367        predicate: &LogicalExpression,
1368    ) -> Option<(String, String, Value, Value, bool, bool)> {
1369        // Must be an AND expression
1370        let (left, right) = match predicate {
1371            LogicalExpression::Binary {
1372                left,
1373                op: BinaryOp::And,
1374                right,
1375            } => (left.as_ref(), right.as_ref()),
1376            _ => return None,
1377        };
1378
1379        // Extract range predicates from both sides
1380        let left_range = self.extract_range_predicate(left);
1381        let right_range = self.extract_range_predicate(right);
1382
1383        let (left_var, left_prop, left_op, left_val) = left_range?;
1384        let (right_var, right_prop, right_op, right_val) = right_range?;
1385
1386        // Must be same variable and property
1387        if left_var != right_var || left_prop != right_prop {
1388            return None;
1389        }
1390
1391        // Determine which is lower bound and which is upper bound
1392        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1393            // n.x >= min AND n.x <= max
1394            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1395            // n.x >= min AND n.x < max
1396            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1397            // n.x > min AND n.x <= max
1398            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1399            // n.x > min AND n.x < max
1400            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1401            // Reversed order: n.x <= max AND n.x >= min
1402            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1403            // n.x < max AND n.x >= min
1404            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1405            // n.x <= max AND n.x > min
1406            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1407            // n.x < max AND n.x > min
1408            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1409            _ => return None,
1410        };
1411
1412        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1413    }
1414
1415    /// Plans a LIMIT operator.
1416    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1417        let (input_op, columns) = self.plan_operator(&limit.input)?;
1418        let output_schema = self.derive_schema_from_columns(&columns);
1419        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1420        Ok((operator, columns))
1421    }
1422
1423    /// Plans a SKIP operator.
1424    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1425        let (input_op, columns) = self.plan_operator(&skip.input)?;
1426        let output_schema = self.derive_schema_from_columns(&columns);
1427        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1428        Ok((operator, columns))
1429    }
1430
1431    /// Plans a SORT (ORDER BY) operator.
1432    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1433        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1434
1435        // Build variable to column index mapping
1436        let mut variable_columns: HashMap<String, usize> = input_columns
1437            .iter()
1438            .enumerate()
1439            .map(|(i, name)| (name.clone(), i))
1440            .collect();
1441
1442        // Collect property expressions that need to be projected before sorting
1443        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1444        let mut next_col_idx = input_columns.len();
1445
1446        for key in &sort.keys {
1447            if let LogicalExpression::Property { variable, property } = &key.expression {
1448                let col_name = format!("{}_{}", variable, property);
1449                if !variable_columns.contains_key(&col_name) {
1450                    property_projections.push((
1451                        variable.clone(),
1452                        property.clone(),
1453                        col_name.clone(),
1454                    ));
1455                    variable_columns.insert(col_name, next_col_idx);
1456                    next_col_idx += 1;
1457                }
1458            }
1459        }
1460
1461        // Track output columns
1462        let mut output_columns = input_columns.clone();
1463
1464        // If we have property expressions, add a projection to materialize them
1465        if !property_projections.is_empty() {
1466            let mut projections = Vec::new();
1467            let mut output_types = Vec::new();
1468
1469            // First, pass through all existing columns (use Node type to preserve node IDs
1470            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1471            for (i, _) in input_columns.iter().enumerate() {
1472                projections.push(ProjectExpr::Column(i));
1473                output_types.push(LogicalType::Node);
1474            }
1475
1476            // Then add property access projections
1477            for (variable, property, col_name) in &property_projections {
1478                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1479                    Error::Internal(format!(
1480                        "Variable '{}' not found for ORDER BY property projection",
1481                        variable
1482                    ))
1483                })?;
1484                projections.push(ProjectExpr::PropertyAccess {
1485                    column: source_col,
1486                    property: property.clone(),
1487                });
1488                output_types.push(LogicalType::Any);
1489                output_columns.push(col_name.clone());
1490            }
1491
1492            input_op = Box::new(ProjectOperator::with_store(
1493                input_op,
1494                projections,
1495                output_types,
1496                Arc::clone(&self.store),
1497            ));
1498        }
1499
1500        // Convert logical sort keys to physical sort keys
1501        let physical_keys: Vec<PhysicalSortKey> = sort
1502            .keys
1503            .iter()
1504            .map(|key| {
1505                let col_idx = self
1506                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1507                Ok(PhysicalSortKey {
1508                    column: col_idx,
1509                    direction: match key.order {
1510                        SortOrder::Ascending => SortDirection::Ascending,
1511                        SortOrder::Descending => SortDirection::Descending,
1512                    },
1513                    null_order: NullOrder::NullsLast,
1514                })
1515            })
1516            .collect::<Result<Vec<_>>>()?;
1517
1518        let output_schema = self.derive_schema_from_columns(&output_columns);
1519        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1520        Ok((operator, output_columns))
1521    }
1522
1523    /// Resolves a sort expression to a column index, using projected property columns.
1524    fn resolve_sort_expression_with_properties(
1525        &self,
1526        expr: &LogicalExpression,
1527        variable_columns: &HashMap<String, usize>,
1528    ) -> Result<usize> {
1529        match expr {
1530            LogicalExpression::Variable(name) => {
1531                variable_columns.get(name).copied().ok_or_else(|| {
1532                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1533                })
1534            }
1535            LogicalExpression::Property { variable, property } => {
1536                // Look up the projected property column (e.g., "p_age" for p.age)
1537                let col_name = format!("{}_{}", variable, property);
1538                variable_columns.get(&col_name).copied().ok_or_else(|| {
1539                    Error::Internal(format!(
1540                        "Property column '{}' not found for ORDER BY (from {}.{})",
1541                        col_name, variable, property
1542                    ))
1543                })
1544            }
1545            _ => Err(Error::Internal(format!(
1546                "Unsupported ORDER BY expression: {:?}",
1547                expr
1548            ))),
1549        }
1550    }
1551
1552    /// Derives a schema from column names (uses Any type to handle all value types).
1553    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1554        columns.iter().map(|_| LogicalType::Any).collect()
1555    }
1556
1557    /// Plans an AGGREGATE operator.
1558    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1559        // Check if we can use factorized aggregation for speedup
1560        // Conditions:
1561        // 1. Factorized execution is enabled
1562        // 2. Input is an expand chain (multi-hop)
1563        // 3. No GROUP BY
1564        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1565        if self.factorized_execution
1566            && agg.group_by.is_empty()
1567            && Self::count_expand_chain(&agg.input).0 >= 2
1568            && self.is_simple_aggregate(agg)
1569            && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1570        {
1571            return Ok((op, cols));
1572        }
1573        // Fall through to regular aggregate if factorized planning fails
1574
1575        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1576
1577        // Build variable to column index mapping
1578        let mut variable_columns: HashMap<String, usize> = input_columns
1579            .iter()
1580            .enumerate()
1581            .map(|(i, name)| (name.clone(), i))
1582            .collect();
1583
1584        // Collect all property expressions that need to be projected before aggregation
1585        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1586        let mut next_col_idx = input_columns.len();
1587
1588        // Check group-by expressions for properties
1589        for expr in &agg.group_by {
1590            if let LogicalExpression::Property { variable, property } = expr {
1591                let col_name = format!("{}_{}", variable, property);
1592                if !variable_columns.contains_key(&col_name) {
1593                    property_projections.push((
1594                        variable.clone(),
1595                        property.clone(),
1596                        col_name.clone(),
1597                    ));
1598                    variable_columns.insert(col_name, next_col_idx);
1599                    next_col_idx += 1;
1600                }
1601            }
1602        }
1603
1604        // Check aggregate expressions for properties
1605        for agg_expr in &agg.aggregates {
1606            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1607                let col_name = format!("{}_{}", variable, property);
1608                if !variable_columns.contains_key(&col_name) {
1609                    property_projections.push((
1610                        variable.clone(),
1611                        property.clone(),
1612                        col_name.clone(),
1613                    ));
1614                    variable_columns.insert(col_name, next_col_idx);
1615                    next_col_idx += 1;
1616                }
1617            }
1618        }
1619
1620        // If we have property expressions, add a projection to materialize them
1621        if !property_projections.is_empty() {
1622            let mut projections = Vec::new();
1623            let mut output_types = Vec::new();
1624
1625            // First, pass through all existing columns (use Node type to preserve node IDs
1626            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1627            for (i, _) in input_columns.iter().enumerate() {
1628                projections.push(ProjectExpr::Column(i));
1629                output_types.push(LogicalType::Node);
1630            }
1631
1632            // Then add property access projections
1633            for (variable, property, _col_name) in &property_projections {
1634                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1635                    Error::Internal(format!(
1636                        "Variable '{}' not found for property projection",
1637                        variable
1638                    ))
1639                })?;
1640                projections.push(ProjectExpr::PropertyAccess {
1641                    column: source_col,
1642                    property: property.clone(),
1643                });
1644                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1645            }
1646
1647            input_op = Box::new(ProjectOperator::with_store(
1648                input_op,
1649                projections,
1650                output_types,
1651                Arc::clone(&self.store),
1652            ));
1653        }
1654
1655        // Convert group-by expressions to column indices
1656        let group_columns: Vec<usize> = agg
1657            .group_by
1658            .iter()
1659            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1660            .collect::<Result<Vec<_>>>()?;
1661
1662        // Convert aggregate expressions to physical form
1663        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1664            .aggregates
1665            .iter()
1666            .map(|agg_expr| {
1667                let column = agg_expr
1668                    .expression
1669                    .as_ref()
1670                    .map(|e| {
1671                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1672                    })
1673                    .transpose()?;
1674
1675                Ok(PhysicalAggregateExpr {
1676                    function: convert_aggregate_function(agg_expr.function),
1677                    column,
1678                    distinct: agg_expr.distinct,
1679                    alias: agg_expr.alias.clone(),
1680                    percentile: agg_expr.percentile,
1681                })
1682            })
1683            .collect::<Result<Vec<_>>>()?;
1684
1685        // Build output schema and column names
1686        let mut output_schema = Vec::new();
1687        let mut output_columns = Vec::new();
1688
1689        // Add group-by columns
1690        for expr in &agg.group_by {
1691            output_schema.push(LogicalType::Any); // Group-by values can be any type
1692            output_columns.push(expression_to_string(expr));
1693        }
1694
1695        // Add aggregate result columns
1696        for agg_expr in &agg.aggregates {
1697            let result_type = match agg_expr.function {
1698                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1699                    LogicalType::Int64
1700                }
1701                LogicalAggregateFunction::Sum => LogicalType::Int64,
1702                LogicalAggregateFunction::Avg => LogicalType::Float64,
1703                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1704                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1705                    // since the aggregate can return any Value type, but the most common case
1706                    // is numeric values from property expressions
1707                    LogicalType::Int64
1708                }
1709                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1710                // Statistical functions return Float64
1711                LogicalAggregateFunction::StdDev
1712                | LogicalAggregateFunction::StdDevPop
1713                | LogicalAggregateFunction::PercentileDisc
1714                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1715            };
1716            output_schema.push(result_type);
1717            output_columns.push(
1718                agg_expr
1719                    .alias
1720                    .clone()
1721                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1722            );
1723        }
1724
1725        // Choose operator based on whether there are group-by columns
1726        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1727            Box::new(SimpleAggregateOperator::new(
1728                input_op,
1729                physical_aggregates,
1730                output_schema,
1731            ))
1732        } else {
1733            Box::new(HashAggregateOperator::new(
1734                input_op,
1735                group_columns,
1736                physical_aggregates,
1737                output_schema,
1738            ))
1739        };
1740
1741        // Apply HAVING clause filter if present
1742        if let Some(having_expr) = &agg.having {
1743            // Build variable to column mapping for the aggregate output
1744            let having_var_columns: HashMap<String, usize> = output_columns
1745                .iter()
1746                .enumerate()
1747                .map(|(i, name)| (name.clone(), i))
1748                .collect();
1749
1750            let filter_expr = self.convert_expression(having_expr)?;
1751            let predicate =
1752                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1753            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1754        }
1755
1756        Ok((operator, output_columns))
1757    }
1758
1759    /// Checks if an aggregate is simple enough for factorized execution.
1760    ///
1761    /// Simple aggregates:
1762    /// - COUNT(*) or COUNT(variable)
1763    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1764    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1765        agg.aggregates.iter().all(|agg_expr| {
1766            match agg_expr.function {
1767                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1768                    // COUNT(*) is always OK, COUNT(var) is OK
1769                    agg_expr.expression.is_none()
1770                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1771                }
1772                LogicalAggregateFunction::Sum
1773                | LogicalAggregateFunction::Avg
1774                | LogicalAggregateFunction::Min
1775                | LogicalAggregateFunction::Max => {
1776                    // For now, only support when expression is a variable
1777                    // (property access would require flattening first)
1778                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1779                }
1780                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1781                _ => false,
1782            }
1783        })
1784    }
1785
1786    /// Plans a factorized aggregate that operates directly on factorized data.
1787    ///
1788    /// This avoids the O(n²) cost of flattening before aggregation.
1789    fn plan_factorized_aggregate(
1790        &self,
1791        agg: &AggregateOp,
1792    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1793        // Build the expand chain - this returns a LazyFactorizedChainOperator
1794        let expands = Self::collect_expand_chain(&agg.input);
1795        if expands.is_empty() {
1796            return Err(Error::Internal(
1797                "Expected expand chain for factorized aggregate".to_string(),
1798            ));
1799        }
1800
1801        // Get the base operator (before first expand)
1802        let first_expand = expands[0];
1803        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1804
1805        let mut columns = base_columns.clone();
1806        let mut steps = Vec::new();
1807        let mut is_first = true;
1808
1809        for expand in &expands {
1810            // Find source column for this expand
1811            let source_column = if is_first {
1812                base_columns
1813                    .iter()
1814                    .position(|c| c == &expand.from_variable)
1815                    .ok_or_else(|| {
1816                        Error::Internal(format!(
1817                            "Source variable '{}' not found in base columns",
1818                            expand.from_variable
1819                        ))
1820                    })?
1821            } else {
1822                1 // Target from previous level
1823            };
1824
1825            let direction = match expand.direction {
1826                ExpandDirection::Outgoing => Direction::Outgoing,
1827                ExpandDirection::Incoming => Direction::Incoming,
1828                ExpandDirection::Both => Direction::Both,
1829            };
1830
1831            steps.push(ExpandStep {
1832                source_column,
1833                direction,
1834                edge_type: expand.edge_type.clone(),
1835            });
1836
1837            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1838                let count = self.anon_edge_counter.get();
1839                self.anon_edge_counter.set(count + 1);
1840                format!("_anon_edge_{}", count)
1841            });
1842            columns.push(edge_col_name);
1843            columns.push(expand.to_variable.clone());
1844
1845            is_first = false;
1846        }
1847
1848        // Create the lazy factorized chain operator
1849        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1850
1851        if let Some(tx_id) = self.tx_id {
1852            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1853        } else {
1854            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1855        }
1856
1857        // Convert logical aggregates to factorized aggregates
1858        let factorized_aggs: Vec<FactorizedAggregate> = agg
1859            .aggregates
1860            .iter()
1861            .map(|agg_expr| {
1862                match agg_expr.function {
1863                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1864                        // COUNT(*) uses simple count, COUNT(col) uses column count
1865                        if agg_expr.expression.is_none() {
1866                            FactorizedAggregate::count()
1867                        } else {
1868                            // For COUNT(variable), we use the deepest level's target column
1869                            // which is the last column added to the schema
1870                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1871                        }
1872                    }
1873                    LogicalAggregateFunction::Sum => {
1874                        // SUM on deepest level target
1875                        FactorizedAggregate::sum(1)
1876                    }
1877                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1878                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1879                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1880                    _ => {
1881                        // Shouldn't reach here due to is_simple_aggregate check
1882                        FactorizedAggregate::count()
1883                    }
1884                }
1885            })
1886            .collect();
1887
1888        // Build output column names
1889        let output_columns: Vec<String> = agg
1890            .aggregates
1891            .iter()
1892            .map(|agg_expr| {
1893                agg_expr
1894                    .alias
1895                    .clone()
1896                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1897            })
1898            .collect();
1899
1900        // Create the factorized aggregate operator
1901        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1902
1903        Ok((Box::new(factorized_agg_op), output_columns))
1904    }
1905
1906    /// Resolves a logical expression to a column index.
1907    #[allow(dead_code)]
1908    fn resolve_expression_to_column(
1909        &self,
1910        expr: &LogicalExpression,
1911        variable_columns: &HashMap<String, usize>,
1912    ) -> Result<usize> {
1913        match expr {
1914            LogicalExpression::Variable(name) => variable_columns
1915                .get(name)
1916                .copied()
1917                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1918            LogicalExpression::Property { variable, .. } => variable_columns
1919                .get(variable)
1920                .copied()
1921                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1922            _ => Err(Error::Internal(format!(
1923                "Cannot resolve expression to column: {:?}",
1924                expr
1925            ))),
1926        }
1927    }
1928
1929    /// Resolves a logical expression to a column index, using projected property columns.
1930    ///
1931    /// This is used for aggregations where properties have been projected into their own columns.
1932    fn resolve_expression_to_column_with_properties(
1933        &self,
1934        expr: &LogicalExpression,
1935        variable_columns: &HashMap<String, usize>,
1936    ) -> Result<usize> {
1937        match expr {
1938            LogicalExpression::Variable(name) => variable_columns
1939                .get(name)
1940                .copied()
1941                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1942            LogicalExpression::Property { variable, property } => {
1943                // Look up the projected property column (e.g., "p_price" for p.price)
1944                let col_name = format!("{}_{}", variable, property);
1945                variable_columns.get(&col_name).copied().ok_or_else(|| {
1946                    Error::Internal(format!(
1947                        "Property column '{}' not found (from {}.{})",
1948                        col_name, variable, property
1949                    ))
1950                })
1951            }
1952            _ => Err(Error::Internal(format!(
1953                "Cannot resolve expression to column: {:?}",
1954                expr
1955            ))),
1956        }
1957    }
1958
1959    /// Converts a logical expression to a filter expression.
1960    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1961        match expr {
1962            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1963            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1964            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1965                variable: variable.clone(),
1966                property: property.clone(),
1967            }),
1968            LogicalExpression::Binary { left, op, right } => {
1969                let left_expr = self.convert_expression(left)?;
1970                let right_expr = self.convert_expression(right)?;
1971                let filter_op = convert_binary_op(*op)?;
1972                Ok(FilterExpression::Binary {
1973                    left: Box::new(left_expr),
1974                    op: filter_op,
1975                    right: Box::new(right_expr),
1976                })
1977            }
1978            LogicalExpression::Unary { op, operand } => {
1979                let operand_expr = self.convert_expression(operand)?;
1980                let filter_op = convert_unary_op(*op)?;
1981                Ok(FilterExpression::Unary {
1982                    op: filter_op,
1983                    operand: Box::new(operand_expr),
1984                })
1985            }
1986            LogicalExpression::FunctionCall { name, args, .. } => {
1987                let filter_args: Vec<FilterExpression> = args
1988                    .iter()
1989                    .map(|a| self.convert_expression(a))
1990                    .collect::<Result<Vec<_>>>()?;
1991                Ok(FilterExpression::FunctionCall {
1992                    name: name.clone(),
1993                    args: filter_args,
1994                })
1995            }
1996            LogicalExpression::Case {
1997                operand,
1998                when_clauses,
1999                else_clause,
2000            } => {
2001                let filter_operand = operand
2002                    .as_ref()
2003                    .map(|e| self.convert_expression(e))
2004                    .transpose()?
2005                    .map(Box::new);
2006                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2007                    .iter()
2008                    .map(|(cond, result)| {
2009                        Ok((
2010                            self.convert_expression(cond)?,
2011                            self.convert_expression(result)?,
2012                        ))
2013                    })
2014                    .collect::<Result<Vec<_>>>()?;
2015                let filter_else = else_clause
2016                    .as_ref()
2017                    .map(|e| self.convert_expression(e))
2018                    .transpose()?
2019                    .map(Box::new);
2020                Ok(FilterExpression::Case {
2021                    operand: filter_operand,
2022                    when_clauses: filter_when_clauses,
2023                    else_clause: filter_else,
2024                })
2025            }
2026            LogicalExpression::List(items) => {
2027                let filter_items: Vec<FilterExpression> = items
2028                    .iter()
2029                    .map(|item| self.convert_expression(item))
2030                    .collect::<Result<Vec<_>>>()?;
2031                Ok(FilterExpression::List(filter_items))
2032            }
2033            LogicalExpression::Map(pairs) => {
2034                let filter_pairs: Vec<(String, FilterExpression)> = pairs
2035                    .iter()
2036                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2037                    .collect::<Result<Vec<_>>>()?;
2038                Ok(FilterExpression::Map(filter_pairs))
2039            }
2040            LogicalExpression::IndexAccess { base, index } => {
2041                let base_expr = self.convert_expression(base)?;
2042                let index_expr = self.convert_expression(index)?;
2043                Ok(FilterExpression::IndexAccess {
2044                    base: Box::new(base_expr),
2045                    index: Box::new(index_expr),
2046                })
2047            }
2048            LogicalExpression::SliceAccess { base, start, end } => {
2049                let base_expr = self.convert_expression(base)?;
2050                let start_expr = start
2051                    .as_ref()
2052                    .map(|s| self.convert_expression(s))
2053                    .transpose()?
2054                    .map(Box::new);
2055                let end_expr = end
2056                    .as_ref()
2057                    .map(|e| self.convert_expression(e))
2058                    .transpose()?
2059                    .map(Box::new);
2060                Ok(FilterExpression::SliceAccess {
2061                    base: Box::new(base_expr),
2062                    start: start_expr,
2063                    end: end_expr,
2064                })
2065            }
2066            LogicalExpression::Parameter(_) => Err(Error::Internal(
2067                "Parameters not yet supported in filters".to_string(),
2068            )),
2069            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2070            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2071            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2072            LogicalExpression::ListComprehension {
2073                variable,
2074                list_expr,
2075                filter_expr,
2076                map_expr,
2077            } => {
2078                let list = self.convert_expression(list_expr)?;
2079                let filter = filter_expr
2080                    .as_ref()
2081                    .map(|f| self.convert_expression(f))
2082                    .transpose()?
2083                    .map(Box::new);
2084                let map = self.convert_expression(map_expr)?;
2085                Ok(FilterExpression::ListComprehension {
2086                    variable: variable.clone(),
2087                    list_expr: Box::new(list),
2088                    filter_expr: filter,
2089                    map_expr: Box::new(map),
2090                })
2091            }
2092            LogicalExpression::ExistsSubquery(subplan) => {
2093                // Extract the pattern from the subplan
2094                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2095                let (start_var, direction, edge_type, end_labels) =
2096                    self.extract_exists_pattern(subplan)?;
2097
2098                Ok(FilterExpression::ExistsSubquery {
2099                    start_var,
2100                    direction,
2101                    edge_type,
2102                    end_labels,
2103                    min_hops: None,
2104                    max_hops: None,
2105                })
2106            }
2107            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2108                "COUNT subqueries not yet supported".to_string(),
2109            )),
2110        }
2111    }
2112
2113    /// Extracts the pattern from an EXISTS subplan.
2114    /// Returns (start_variable, direction, edge_type, end_labels).
2115    fn extract_exists_pattern(
2116        &self,
2117        subplan: &LogicalOperator,
2118    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2119        match subplan {
2120            LogicalOperator::Expand(expand) => {
2121                // Get end node labels from the to_variable if there's a node scan input
2122                let end_labels = self.extract_end_labels_from_expand(expand);
2123                let direction = match expand.direction {
2124                    ExpandDirection::Outgoing => Direction::Outgoing,
2125                    ExpandDirection::Incoming => Direction::Incoming,
2126                    ExpandDirection::Both => Direction::Both,
2127                };
2128                Ok((
2129                    expand.from_variable.clone(),
2130                    direction,
2131                    expand.edge_type.clone(),
2132                    end_labels,
2133                ))
2134            }
2135            LogicalOperator::NodeScan(scan) => {
2136                if let Some(input) = &scan.input {
2137                    self.extract_exists_pattern(input)
2138                } else {
2139                    Err(Error::Internal(
2140                        "EXISTS subquery must contain an edge pattern".to_string(),
2141                    ))
2142                }
2143            }
2144            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2145            _ => Err(Error::Internal(
2146                "Unsupported EXISTS subquery pattern".to_string(),
2147            )),
2148        }
2149    }
2150
2151    /// Extracts end node labels from an Expand operator if present.
2152    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2153        // Check if the expand has a NodeScan input with a label filter
2154        match expand.input.as_ref() {
2155            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2156            _ => None,
2157        }
2158    }
2159
2160    /// Plans a JOIN operator.
2161    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2162        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2163        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2164
2165        // Build combined output columns
2166        let mut columns = left_columns.clone();
2167        columns.extend(right_columns.clone());
2168
2169        // Convert join type
2170        let physical_join_type = match join.join_type {
2171            JoinType::Inner => PhysicalJoinType::Inner,
2172            JoinType::Left => PhysicalJoinType::Left,
2173            JoinType::Right => PhysicalJoinType::Right,
2174            JoinType::Full => PhysicalJoinType::Full,
2175            JoinType::Cross => PhysicalJoinType::Cross,
2176            JoinType::Semi => PhysicalJoinType::Semi,
2177            JoinType::Anti => PhysicalJoinType::Anti,
2178        };
2179
2180        // Build key columns from join conditions
2181        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2182            // Cross join - no keys
2183            (vec![], vec![])
2184        } else {
2185            join.conditions
2186                .iter()
2187                .filter_map(|cond| {
2188                    // Try to extract column indices from expressions
2189                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2190                    let right_idx = self
2191                        .expression_to_column(&cond.right, &right_columns)
2192                        .ok()?;
2193                    Some((left_idx, right_idx))
2194                })
2195                .unzip()
2196        };
2197
2198        let output_schema = self.derive_schema_from_columns(&columns);
2199
2200        // Check if we should use leapfrog join for cyclic patterns
2201        // Currently we use hash join by default; leapfrog is available but
2202        // requires explicit multi-way join detection which will be added
2203        // when we have proper cyclic pattern detection in the optimizer.
2204        // For now, LeapfrogJoinOperator is available for direct use.
2205        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2206
2207        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2208            left_op,
2209            right_op,
2210            probe_keys,
2211            build_keys,
2212            physical_join_type,
2213            output_schema,
2214        ));
2215
2216        Ok((operator, columns))
2217    }
2218
2219    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2220    ///
2221    /// A cyclic pattern occurs when join conditions reference variables
2222    /// that create a cycle in the join graph. For example, a triangle
2223    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2224    ///
2225    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2226    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2227    #[allow(dead_code)]
2228    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2229        // Build adjacency list for join variables
2230        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2231        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2232
2233        // Collect edges from join conditions
2234        Self::collect_join_edges(
2235            &LogicalOperator::Join(join.clone()),
2236            &mut edges,
2237            &mut all_vars,
2238        );
2239
2240        // Need at least 3 variables to form a cycle
2241        if all_vars.len() < 3 {
2242            return false;
2243        }
2244
2245        // Detect cycle using DFS with coloring
2246        Self::has_cycle(&edges, &all_vars)
2247    }
2248
2249    /// Collects edges from join conditions into an adjacency list.
2250    fn collect_join_edges(
2251        op: &LogicalOperator,
2252        edges: &mut HashMap<String, Vec<String>>,
2253        vars: &mut std::collections::HashSet<String>,
2254    ) {
2255        match op {
2256            LogicalOperator::Join(join) => {
2257                // Process join conditions
2258                for cond in &join.conditions {
2259                    if let (Some(left_var), Some(right_var)) = (
2260                        Self::extract_join_variable(&cond.left),
2261                        Self::extract_join_variable(&cond.right),
2262                    ) && left_var != right_var
2263                    {
2264                        vars.insert(left_var.clone());
2265                        vars.insert(right_var.clone());
2266
2267                        // Add bidirectional edge
2268                        edges
2269                            .entry(left_var.clone())
2270                            .or_default()
2271                            .push(right_var.clone());
2272                        edges.entry(right_var).or_default().push(left_var);
2273                    }
2274                }
2275
2276                // Recurse into children
2277                Self::collect_join_edges(&join.left, edges, vars);
2278                Self::collect_join_edges(&join.right, edges, vars);
2279            }
2280            LogicalOperator::Expand(expand) => {
2281                // Expand creates implicit join between from_variable and to_variable
2282                vars.insert(expand.from_variable.clone());
2283                vars.insert(expand.to_variable.clone());
2284
2285                edges
2286                    .entry(expand.from_variable.clone())
2287                    .or_default()
2288                    .push(expand.to_variable.clone());
2289                edges
2290                    .entry(expand.to_variable.clone())
2291                    .or_default()
2292                    .push(expand.from_variable.clone());
2293
2294                Self::collect_join_edges(&expand.input, edges, vars);
2295            }
2296            LogicalOperator::Filter(filter) => {
2297                Self::collect_join_edges(&filter.input, edges, vars);
2298            }
2299            LogicalOperator::NodeScan(scan) => {
2300                vars.insert(scan.variable.clone());
2301            }
2302            _ => {}
2303        }
2304    }
2305
2306    /// Extracts the variable name from a join expression.
2307    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2308        match expr {
2309            LogicalExpression::Variable(v) => Some(v.clone()),
2310            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2311            LogicalExpression::Id(v) => Some(v.clone()),
2312            _ => None,
2313        }
2314    }
2315
2316    /// Detects if the graph has a cycle using DFS coloring.
2317    ///
2318    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2319    fn has_cycle(
2320        edges: &HashMap<String, Vec<String>>,
2321        vars: &std::collections::HashSet<String>,
2322    ) -> bool {
2323        let mut color: HashMap<&String, u8> = HashMap::new();
2324
2325        for var in vars {
2326            color.insert(var, 0);
2327        }
2328
2329        for start in vars {
2330            if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2331                return true;
2332            }
2333        }
2334
2335        false
2336    }
2337
2338    /// DFS helper for cycle detection.
2339    fn dfs_cycle(
2340        node: &String,
2341        parent: Option<&String>,
2342        edges: &HashMap<String, Vec<String>>,
2343        color: &mut HashMap<&String, u8>,
2344    ) -> bool {
2345        *color.get_mut(node).unwrap() = 1; // Gray
2346
2347        if let Some(neighbors) = edges.get(node) {
2348            for neighbor in neighbors {
2349                // Skip the edge back to parent (undirected graph)
2350                if parent == Some(neighbor) {
2351                    continue;
2352                }
2353
2354                if let Some(&c) = color.get(neighbor) {
2355                    if c == 1 {
2356                        // Found a back edge - cycle detected
2357                        return true;
2358                    }
2359                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2360                        return true;
2361                    }
2362                }
2363            }
2364        }
2365
2366        *color.get_mut(node).unwrap() = 2; // Black
2367        false
2368    }
2369
2370    /// Counts the number of base relations in a logical operator tree.
2371    #[allow(dead_code)]
2372    fn count_relations(op: &LogicalOperator) -> usize {
2373        match op {
2374            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2375            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2376            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2377            LogicalOperator::Join(j) => {
2378                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2379            }
2380            _ => 0,
2381        }
2382    }
2383
2384    /// Extracts a column index from an expression.
2385    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2386        match expr {
2387            LogicalExpression::Variable(name) => columns
2388                .iter()
2389                .position(|c| c == name)
2390                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2391            _ => Err(Error::Internal(
2392                "Only variables supported in join conditions".to_string(),
2393            )),
2394        }
2395    }
2396
2397    /// Plans a UNION operator.
2398    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2399        if union.inputs.is_empty() {
2400            return Err(Error::Internal(
2401                "Union requires at least one input".to_string(),
2402            ));
2403        }
2404
2405        let mut inputs = Vec::with_capacity(union.inputs.len());
2406        let mut columns = Vec::new();
2407
2408        for (i, input) in union.inputs.iter().enumerate() {
2409            let (op, cols) = self.plan_operator(input)?;
2410            if i == 0 {
2411                columns = cols;
2412            }
2413            inputs.push(op);
2414        }
2415
2416        let output_schema = self.derive_schema_from_columns(&columns);
2417        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2418
2419        Ok((operator, columns))
2420    }
2421
2422    /// Plans a DISTINCT operator.
2423    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2424        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2425        let output_schema = self.derive_schema_from_columns(&columns);
2426        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2427        Ok((operator, columns))
2428    }
2429
2430    /// Plans a CREATE NODE operator.
2431    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2432        // Plan input if present
2433        let (input_op, mut columns) = if let Some(ref input) = create.input {
2434            let (op, cols) = self.plan_operator(input)?;
2435            (Some(op), cols)
2436        } else {
2437            (None, vec![])
2438        };
2439
2440        // Output column for the created node
2441        let output_column = columns.len();
2442        columns.push(create.variable.clone());
2443
2444        // Convert properties (with constant-folding for lists and function calls)
2445        let properties: Vec<(String, PropertySource)> = create
2446            .properties
2447            .iter()
2448            .map(|(name, expr)| {
2449                let source = match Self::try_fold_expression(expr) {
2450                    Some(value) => PropertySource::Constant(value),
2451                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2452                };
2453                (name.clone(), source)
2454            })
2455            .collect();
2456
2457        let output_schema = self.derive_schema_from_columns(&columns);
2458
2459        let operator = Box::new(
2460            CreateNodeOperator::new(
2461                Arc::clone(&self.store),
2462                input_op,
2463                create.labels.clone(),
2464                properties,
2465                output_schema,
2466                output_column,
2467            )
2468            .with_tx_context(self.viewing_epoch, self.tx_id),
2469        );
2470
2471        Ok((operator, columns))
2472    }
2473
2474    /// Plans a CREATE EDGE operator.
2475    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2476        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2477
2478        // Find source and target columns
2479        let from_column = columns
2480            .iter()
2481            .position(|c| c == &create.from_variable)
2482            .ok_or_else(|| {
2483                Error::Internal(format!(
2484                    "Source variable '{}' not found",
2485                    create.from_variable
2486                ))
2487            })?;
2488
2489        let to_column = columns
2490            .iter()
2491            .position(|c| c == &create.to_variable)
2492            .ok_or_else(|| {
2493                Error::Internal(format!(
2494                    "Target variable '{}' not found",
2495                    create.to_variable
2496                ))
2497            })?;
2498
2499        // Output column for the created edge (if named)
2500        let output_column = create.variable.as_ref().map(|v| {
2501            let idx = columns.len();
2502            columns.push(v.clone());
2503            idx
2504        });
2505
2506        // Convert properties (with constant-folding for function calls like vector())
2507        let properties: Vec<(String, PropertySource)> = create
2508            .properties
2509            .iter()
2510            .map(|(name, expr)| {
2511                let source = match Self::try_fold_expression(expr) {
2512                    Some(value) => PropertySource::Constant(value),
2513                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2514                };
2515                (name.clone(), source)
2516            })
2517            .collect();
2518
2519        let output_schema = self.derive_schema_from_columns(&columns);
2520
2521        let mut operator = CreateEdgeOperator::new(
2522            Arc::clone(&self.store),
2523            input_op,
2524            from_column,
2525            to_column,
2526            create.edge_type.clone(),
2527            output_schema,
2528        )
2529        .with_properties(properties)
2530        .with_tx_context(self.viewing_epoch, self.tx_id);
2531
2532        if let Some(col) = output_column {
2533            operator = operator.with_output_column(col);
2534        }
2535
2536        let operator = Box::new(operator);
2537
2538        Ok((operator, columns))
2539    }
2540
2541    /// Plans a DELETE NODE operator.
2542    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2543        let (input_op, columns) = self.plan_operator(&delete.input)?;
2544
2545        let node_column = columns
2546            .iter()
2547            .position(|c| c == &delete.variable)
2548            .ok_or_else(|| {
2549                Error::Internal(format!(
2550                    "Variable '{}' not found for delete",
2551                    delete.variable
2552                ))
2553            })?;
2554
2555        // Output schema for delete count
2556        let output_schema = vec![LogicalType::Int64];
2557        let output_columns = vec!["deleted_count".to_string()];
2558
2559        let operator = Box::new(
2560            DeleteNodeOperator::new(
2561                Arc::clone(&self.store),
2562                input_op,
2563                node_column,
2564                output_schema,
2565                delete.detach, // DETACH DELETE deletes connected edges first
2566            )
2567            .with_tx_context(self.viewing_epoch, self.tx_id),
2568        );
2569
2570        Ok((operator, output_columns))
2571    }
2572
2573    /// Plans a DELETE EDGE operator.
2574    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2575        let (input_op, columns) = self.plan_operator(&delete.input)?;
2576
2577        let edge_column = columns
2578            .iter()
2579            .position(|c| c == &delete.variable)
2580            .ok_or_else(|| {
2581                Error::Internal(format!(
2582                    "Variable '{}' not found for delete",
2583                    delete.variable
2584                ))
2585            })?;
2586
2587        // Output schema for delete count
2588        let output_schema = vec![LogicalType::Int64];
2589        let output_columns = vec!["deleted_count".to_string()];
2590
2591        let operator = Box::new(
2592            DeleteEdgeOperator::new(
2593                Arc::clone(&self.store),
2594                input_op,
2595                edge_column,
2596                output_schema,
2597            )
2598            .with_tx_context(self.viewing_epoch, self.tx_id),
2599        );
2600
2601        Ok((operator, output_columns))
2602    }
2603
2604    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2605    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2606        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2607        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2608
2609        // Build combined output columns (left + right)
2610        let mut columns = left_columns.clone();
2611        columns.extend(right_columns.clone());
2612
2613        // Find common variables between left and right for join keys
2614        let mut probe_keys = Vec::new();
2615        let mut build_keys = Vec::new();
2616
2617        for (right_idx, right_col) in right_columns.iter().enumerate() {
2618            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2619                probe_keys.push(left_idx);
2620                build_keys.push(right_idx);
2621            }
2622        }
2623
2624        let output_schema = self.derive_schema_from_columns(&columns);
2625
2626        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2627            left_op,
2628            right_op,
2629            probe_keys,
2630            build_keys,
2631            PhysicalJoinType::Left,
2632            output_schema,
2633        ));
2634
2635        Ok((operator, columns))
2636    }
2637
2638    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2639    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2640        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2641        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2642
2643        // Anti-join only keeps left columns (filters out matching rows)
2644        let columns = left_columns.clone();
2645
2646        // Find common variables between left and right for join keys
2647        let mut probe_keys = Vec::new();
2648        let mut build_keys = Vec::new();
2649
2650        for (right_idx, right_col) in right_columns.iter().enumerate() {
2651            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2652                probe_keys.push(left_idx);
2653                build_keys.push(right_idx);
2654            }
2655        }
2656
2657        let output_schema = self.derive_schema_from_columns(&columns);
2658
2659        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2660            left_op,
2661            right_op,
2662            probe_keys,
2663            build_keys,
2664            PhysicalJoinType::Anti,
2665            output_schema,
2666        ));
2667
2668        Ok((operator, columns))
2669    }
2670
2671    /// Plans an unwind operator.
2672    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2673        // Plan the input operator first
2674        // Handle Empty specially - use a single-row operator
2675        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2676            if matches!(&*unwind.input, LogicalOperator::Empty) {
2677                // For UNWIND without prior MATCH, create a single-row input
2678                // We need an operator that produces one row with the list to unwind
2679                // For now, use EmptyScan which produces no rows - we'll handle the literal
2680                // list in the unwind operator itself
2681                let literal_list = self.convert_expression(&unwind.expression)?;
2682
2683                // Create a project operator that produces a single row with the list
2684                let single_row_op: Box<dyn Operator> = Box::new(
2685                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2686                );
2687                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2688                    single_row_op,
2689                    vec![ProjectExpr::Expression {
2690                        expr: literal_list,
2691                        variable_columns: HashMap::new(),
2692                    }],
2693                    vec![LogicalType::Any],
2694                    Arc::clone(&self.store),
2695                ));
2696
2697                (project_op, vec!["__list__".to_string()])
2698            } else {
2699                self.plan_operator(&unwind.input)?
2700            };
2701
2702        // The UNWIND expression should be a list - we need to find/evaluate it
2703        // For now, we handle the case where the expression references an existing column
2704        // or is a literal list
2705
2706        // Find if the expression references an existing column (like a list property)
2707        let list_col_idx = match &unwind.expression {
2708            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2709            LogicalExpression::Property { variable, .. } => {
2710                // Property access needs to be evaluated - for now we'll need the filter predicate
2711                // to evaluate this. For simple cases, we treat it as a list column.
2712                input_columns.iter().position(|c| c == variable)
2713            }
2714            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2715                // Literal list expression - we'll add it as a virtual column
2716                None
2717            }
2718            _ => None,
2719        };
2720
2721        // Build output columns: all input columns plus the new variable
2722        let mut columns = input_columns.clone();
2723        columns.push(unwind.variable.clone());
2724
2725        // Build output schema
2726        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2727        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2728
2729        // Use the list column index if found, otherwise default to 0
2730        // (in which case the first column should contain the list)
2731        let col_idx = list_col_idx.unwrap_or(0);
2732
2733        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2734            input_op,
2735            col_idx,
2736            unwind.variable.clone(),
2737            output_schema,
2738        ));
2739
2740        Ok((operator, columns))
2741    }
2742
2743    /// Plans a MERGE operator.
2744    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2745        // Plan the input operator if present (skip if Empty)
2746        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2747            Vec::new()
2748        } else {
2749            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2750            cols
2751        };
2752
2753        // Convert match properties from LogicalExpression to Value
2754        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2755            .match_properties
2756            .iter()
2757            .filter_map(|(name, expr)| {
2758                if let LogicalExpression::Literal(v) = expr {
2759                    Some((name.clone(), v.clone()))
2760                } else {
2761                    None // Skip non-literal expressions for now
2762                }
2763            })
2764            .collect();
2765
2766        // Convert ON CREATE properties
2767        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2768            .on_create
2769            .iter()
2770            .filter_map(|(name, expr)| {
2771                if let LogicalExpression::Literal(v) = expr {
2772                    Some((name.clone(), v.clone()))
2773                } else {
2774                    None
2775                }
2776            })
2777            .collect();
2778
2779        // Convert ON MATCH properties
2780        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2781            .on_match
2782            .iter()
2783            .filter_map(|(name, expr)| {
2784                if let LogicalExpression::Literal(v) = expr {
2785                    Some((name.clone(), v.clone()))
2786                } else {
2787                    None
2788                }
2789            })
2790            .collect();
2791
2792        // Add the merged node variable to output columns
2793        columns.push(merge.variable.clone());
2794
2795        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2796            Arc::clone(&self.store),
2797            merge.variable.clone(),
2798            merge.labels.clone(),
2799            match_properties,
2800            on_create_properties,
2801            on_match_properties,
2802        ));
2803
2804        Ok((operator, columns))
2805    }
2806
2807    /// Plans a SHORTEST PATH operator.
2808    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2809        // Plan the input operator
2810        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2811
2812        // Find source and target node columns
2813        let source_column = columns
2814            .iter()
2815            .position(|c| c == &sp.source_var)
2816            .ok_or_else(|| {
2817                Error::Internal(format!(
2818                    "Source variable '{}' not found for shortestPath",
2819                    sp.source_var
2820                ))
2821            })?;
2822
2823        let target_column = columns
2824            .iter()
2825            .position(|c| c == &sp.target_var)
2826            .ok_or_else(|| {
2827                Error::Internal(format!(
2828                    "Target variable '{}' not found for shortestPath",
2829                    sp.target_var
2830                ))
2831            })?;
2832
2833        // Convert direction
2834        let direction = match sp.direction {
2835            ExpandDirection::Outgoing => Direction::Outgoing,
2836            ExpandDirection::Incoming => Direction::Incoming,
2837            ExpandDirection::Both => Direction::Both,
2838        };
2839
2840        // Create the shortest path operator
2841        let operator: Box<dyn Operator> = Box::new(
2842            ShortestPathOperator::new(
2843                Arc::clone(&self.store),
2844                input_op,
2845                source_column,
2846                target_column,
2847                sp.edge_type.clone(),
2848                direction,
2849            )
2850            .with_all_paths(sp.all_paths),
2851        );
2852
2853        // Add path length column with the expected naming convention
2854        // The translator expects _path_length_{alias} format for length(p) calls
2855        columns.push(format!("_path_length_{}", sp.path_alias));
2856
2857        Ok((operator, columns))
2858    }
2859
2860    /// Plans an ADD LABEL operator.
2861    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2862        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2863
2864        // Find the node column
2865        let node_column = columns
2866            .iter()
2867            .position(|c| c == &add_label.variable)
2868            .ok_or_else(|| {
2869                Error::Internal(format!(
2870                    "Variable '{}' not found for ADD LABEL",
2871                    add_label.variable
2872                ))
2873            })?;
2874
2875        // Output schema for update count
2876        let output_schema = vec![LogicalType::Int64];
2877        let output_columns = vec!["labels_added".to_string()];
2878
2879        let operator = Box::new(AddLabelOperator::new(
2880            Arc::clone(&self.store),
2881            input_op,
2882            node_column,
2883            add_label.labels.clone(),
2884            output_schema,
2885        ));
2886
2887        Ok((operator, output_columns))
2888    }
2889
2890    /// Plans a REMOVE LABEL operator.
2891    fn plan_remove_label(
2892        &self,
2893        remove_label: &RemoveLabelOp,
2894    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2895        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2896
2897        // Find the node column
2898        let node_column = columns
2899            .iter()
2900            .position(|c| c == &remove_label.variable)
2901            .ok_or_else(|| {
2902                Error::Internal(format!(
2903                    "Variable '{}' not found for REMOVE LABEL",
2904                    remove_label.variable
2905                ))
2906            })?;
2907
2908        // Output schema for update count
2909        let output_schema = vec![LogicalType::Int64];
2910        let output_columns = vec!["labels_removed".to_string()];
2911
2912        let operator = Box::new(RemoveLabelOperator::new(
2913            Arc::clone(&self.store),
2914            input_op,
2915            node_column,
2916            remove_label.labels.clone(),
2917            output_schema,
2918        ));
2919
2920        Ok((operator, output_columns))
2921    }
2922
2923    /// Plans a SET PROPERTY operator.
2924    fn plan_set_property(
2925        &self,
2926        set_prop: &SetPropertyOp,
2927    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2928        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2929
2930        // Find the entity column (node or edge variable)
2931        let entity_column = columns
2932            .iter()
2933            .position(|c| c == &set_prop.variable)
2934            .ok_or_else(|| {
2935                Error::Internal(format!(
2936                    "Variable '{}' not found for SET",
2937                    set_prop.variable
2938                ))
2939            })?;
2940
2941        // Convert properties to PropertySource
2942        let properties: Vec<(String, PropertySource)> = set_prop
2943            .properties
2944            .iter()
2945            .map(|(name, expr)| {
2946                let source = self.expression_to_property_source(expr, &columns)?;
2947                Ok((name.clone(), source))
2948            })
2949            .collect::<Result<Vec<_>>>()?;
2950
2951        // Output schema preserves input schema (passes through)
2952        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2953        let output_columns = columns.clone();
2954
2955        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2956        let operator = Box::new(SetPropertyOperator::new_for_node(
2957            Arc::clone(&self.store),
2958            input_op,
2959            entity_column,
2960            properties,
2961            output_schema,
2962        ));
2963
2964        Ok((operator, output_columns))
2965    }
2966
2967    /// Converts a logical expression to a PropertySource.
2968    fn expression_to_property_source(
2969        &self,
2970        expr: &LogicalExpression,
2971        columns: &[String],
2972    ) -> Result<PropertySource> {
2973        match expr {
2974            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2975            LogicalExpression::Variable(name) => {
2976                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2977                    Error::Internal(format!("Variable '{}' not found for property source", name))
2978                })?;
2979                Ok(PropertySource::Column(col_idx))
2980            }
2981            LogicalExpression::Parameter(name) => {
2982                // Parameters should be resolved before planning
2983                // For now, treat as a placeholder
2984                Ok(PropertySource::Constant(
2985                    grafeo_common::types::Value::String(format!("${}", name).into()),
2986                ))
2987            }
2988            _ => {
2989                if let Some(value) = Self::try_fold_expression(expr) {
2990                    Ok(PropertySource::Constant(value))
2991                } else {
2992                    Err(Error::Internal(format!(
2993                        "Unsupported expression type for property source: {:?}",
2994                        expr
2995                    )))
2996                }
2997            }
2998        }
2999    }
3000
3001    /// Tries to evaluate a constant expression at plan time.
3002    ///
3003    /// Recursively folds literals, lists, and known function calls (like `vector()`)
3004    /// into concrete values. Returns `None` if the expression contains non-constant
3005    /// parts (variables, property accesses, etc.).
3006    fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3007        match expr {
3008            LogicalExpression::Literal(v) => Some(v.clone()),
3009            LogicalExpression::List(items) => {
3010                let values: Option<Vec<Value>> =
3011                    items.iter().map(Self::try_fold_expression).collect();
3012                let values = values?;
3013                // All-numeric lists become vectors (matches Python list[float] behavior)
3014                let all_numeric = !values.is_empty()
3015                    && values
3016                        .iter()
3017                        .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3018                if all_numeric {
3019                    let floats: Vec<f32> = values
3020                        .iter()
3021                        .filter_map(|v| match v {
3022                            Value::Float64(f) => Some(*f as f32),
3023                            Value::Int64(i) => Some(*i as f32),
3024                            _ => None,
3025                        })
3026                        .collect();
3027                    Some(Value::Vector(floats.into()))
3028                } else {
3029                    Some(Value::List(values.into()))
3030                }
3031            }
3032            LogicalExpression::FunctionCall { name, args, .. } => {
3033                match name.to_lowercase().as_str() {
3034                    "vector" => {
3035                        if args.len() != 1 {
3036                            return None;
3037                        }
3038                        let val = Self::try_fold_expression(&args[0])?;
3039                        match val {
3040                            Value::List(items) => {
3041                                let floats: Vec<f32> = items
3042                                    .iter()
3043                                    .filter_map(|v| match v {
3044                                        Value::Float64(f) => Some(*f as f32),
3045                                        Value::Int64(i) => Some(*i as f32),
3046                                        _ => None,
3047                                    })
3048                                    .collect();
3049                                if floats.len() == items.len() {
3050                                    Some(Value::Vector(floats.into()))
3051                                } else {
3052                                    None
3053                                }
3054                            }
3055                            // Already a vector (from all-numeric list folding)
3056                            Value::Vector(v) => Some(Value::Vector(v)),
3057                            _ => None,
3058                        }
3059                    }
3060                    _ => None,
3061                }
3062            }
3063            _ => None,
3064        }
3065    }
3066}
3067
3068/// Converts a logical binary operator to a filter binary operator.
3069pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3070    match op {
3071        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3072        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3073        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3074        BinaryOp::Le => Ok(BinaryFilterOp::Le),
3075        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3076        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3077        BinaryOp::And => Ok(BinaryFilterOp::And),
3078        BinaryOp::Or => Ok(BinaryFilterOp::Or),
3079        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3080        BinaryOp::Add => Ok(BinaryFilterOp::Add),
3081        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3082        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3083        BinaryOp::Div => Ok(BinaryFilterOp::Div),
3084        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3085        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3086        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3087        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3088        BinaryOp::In => Ok(BinaryFilterOp::In),
3089        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3090        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3091        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3092            "Binary operator {:?} not yet supported in filters",
3093            op
3094        ))),
3095    }
3096}
3097
3098/// Converts a logical unary operator to a filter unary operator.
3099pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3100    match op {
3101        UnaryOp::Not => Ok(UnaryFilterOp::Not),
3102        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3103        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3104        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3105    }
3106}
3107
3108/// Converts a logical aggregate function to a physical aggregate function.
3109pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3110    match func {
3111        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3112        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3113        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3114        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3115        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3116        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3117        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3118        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3119        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3120        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3121        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3122    }
3123}
3124
3125/// Converts a logical expression to a filter expression.
3126///
3127/// This is a standalone function that can be used by both LPG and RDF planners.
3128pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3129    match expr {
3130        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3131        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3132        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3133            variable: variable.clone(),
3134            property: property.clone(),
3135        }),
3136        LogicalExpression::Binary { left, op, right } => {
3137            let left_expr = convert_filter_expression(left)?;
3138            let right_expr = convert_filter_expression(right)?;
3139            let filter_op = convert_binary_op(*op)?;
3140            Ok(FilterExpression::Binary {
3141                left: Box::new(left_expr),
3142                op: filter_op,
3143                right: Box::new(right_expr),
3144            })
3145        }
3146        LogicalExpression::Unary { op, operand } => {
3147            let operand_expr = convert_filter_expression(operand)?;
3148            let filter_op = convert_unary_op(*op)?;
3149            Ok(FilterExpression::Unary {
3150                op: filter_op,
3151                operand: Box::new(operand_expr),
3152            })
3153        }
3154        LogicalExpression::FunctionCall { name, args, .. } => {
3155            let filter_args: Vec<FilterExpression> = args
3156                .iter()
3157                .map(convert_filter_expression)
3158                .collect::<Result<Vec<_>>>()?;
3159            Ok(FilterExpression::FunctionCall {
3160                name: name.clone(),
3161                args: filter_args,
3162            })
3163        }
3164        LogicalExpression::Case {
3165            operand,
3166            when_clauses,
3167            else_clause,
3168        } => {
3169            let filter_operand = operand
3170                .as_ref()
3171                .map(|e| convert_filter_expression(e))
3172                .transpose()?
3173                .map(Box::new);
3174            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3175                .iter()
3176                .map(|(cond, result)| {
3177                    Ok((
3178                        convert_filter_expression(cond)?,
3179                        convert_filter_expression(result)?,
3180                    ))
3181                })
3182                .collect::<Result<Vec<_>>>()?;
3183            let filter_else = else_clause
3184                .as_ref()
3185                .map(|e| convert_filter_expression(e))
3186                .transpose()?
3187                .map(Box::new);
3188            Ok(FilterExpression::Case {
3189                operand: filter_operand,
3190                when_clauses: filter_when_clauses,
3191                else_clause: filter_else,
3192            })
3193        }
3194        LogicalExpression::List(items) => {
3195            let filter_items: Vec<FilterExpression> = items
3196                .iter()
3197                .map(convert_filter_expression)
3198                .collect::<Result<Vec<_>>>()?;
3199            Ok(FilterExpression::List(filter_items))
3200        }
3201        LogicalExpression::Map(pairs) => {
3202            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3203                .iter()
3204                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3205                .collect::<Result<Vec<_>>>()?;
3206            Ok(FilterExpression::Map(filter_pairs))
3207        }
3208        LogicalExpression::IndexAccess { base, index } => {
3209            let base_expr = convert_filter_expression(base)?;
3210            let index_expr = convert_filter_expression(index)?;
3211            Ok(FilterExpression::IndexAccess {
3212                base: Box::new(base_expr),
3213                index: Box::new(index_expr),
3214            })
3215        }
3216        LogicalExpression::SliceAccess { base, start, end } => {
3217            let base_expr = convert_filter_expression(base)?;
3218            let start_expr = start
3219                .as_ref()
3220                .map(|s| convert_filter_expression(s))
3221                .transpose()?
3222                .map(Box::new);
3223            let end_expr = end
3224                .as_ref()
3225                .map(|e| convert_filter_expression(e))
3226                .transpose()?
3227                .map(Box::new);
3228            Ok(FilterExpression::SliceAccess {
3229                base: Box::new(base_expr),
3230                start: start_expr,
3231                end: end_expr,
3232            })
3233        }
3234        LogicalExpression::Parameter(_) => Err(Error::Internal(
3235            "Parameters not yet supported in filters".to_string(),
3236        )),
3237        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3238        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3239        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3240        LogicalExpression::ListComprehension {
3241            variable,
3242            list_expr,
3243            filter_expr,
3244            map_expr,
3245        } => {
3246            let list = convert_filter_expression(list_expr)?;
3247            let filter = filter_expr
3248                .as_ref()
3249                .map(|f| convert_filter_expression(f))
3250                .transpose()?
3251                .map(Box::new);
3252            let map = convert_filter_expression(map_expr)?;
3253            Ok(FilterExpression::ListComprehension {
3254                variable: variable.clone(),
3255                list_expr: Box::new(list),
3256                filter_expr: filter,
3257                map_expr: Box::new(map),
3258            })
3259        }
3260        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3261            Error::Internal("Subqueries not yet supported in filters".to_string()),
3262        ),
3263    }
3264}
3265
3266/// Infers the logical type from a value.
3267fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3268    use grafeo_common::types::Value;
3269    match value {
3270        Value::Null => LogicalType::String, // Default type for null
3271        Value::Bool(_) => LogicalType::Bool,
3272        Value::Int64(_) => LogicalType::Int64,
3273        Value::Float64(_) => LogicalType::Float64,
3274        Value::String(_) => LogicalType::String,
3275        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3276        Value::Timestamp(_) => LogicalType::Timestamp,
3277        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3278        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3279        Value::Vector(v) => LogicalType::Vector(v.len()),
3280    }
3281}
3282
3283/// Converts an expression to a string for column naming.
3284fn expression_to_string(expr: &LogicalExpression) -> String {
3285    match expr {
3286        LogicalExpression::Variable(name) => name.clone(),
3287        LogicalExpression::Property { variable, property } => {
3288            format!("{variable}.{property}")
3289        }
3290        LogicalExpression::Literal(value) => format!("{value:?}"),
3291        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3292        _ => "expr".to_string(),
3293    }
3294}
3295
3296/// A physical plan ready for execution.
3297pub struct PhysicalPlan {
3298    /// The root physical operator.
3299    pub operator: Box<dyn Operator>,
3300    /// Column names for the result.
3301    pub columns: Vec<String>,
3302    /// Adaptive execution context with cardinality estimates.
3303    ///
3304    /// When adaptive execution is enabled, this context contains estimated
3305    /// cardinalities at various checkpoints in the plan. During execution,
3306    /// actual row counts are recorded and compared against estimates.
3307    pub adaptive_context: Option<AdaptiveContext>,
3308}
3309
3310impl PhysicalPlan {
3311    /// Returns the column names.
3312    #[must_use]
3313    pub fn columns(&self) -> &[String] {
3314        &self.columns
3315    }
3316
3317    /// Consumes the plan and returns the operator.
3318    pub fn into_operator(self) -> Box<dyn Operator> {
3319        self.operator
3320    }
3321
3322    /// Returns the adaptive context, if adaptive execution is enabled.
3323    #[must_use]
3324    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3325        self.adaptive_context.as_ref()
3326    }
3327
3328    /// Takes ownership of the adaptive context.
3329    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3330        self.adaptive_context.take()
3331    }
3332}
3333
3334/// Helper operator that returns a single result chunk once.
3335///
3336/// Used by the factorized expand chain to wrap the final result.
3337#[allow(dead_code)]
3338struct SingleResultOperator {
3339    result: Option<grafeo_core::execution::DataChunk>,
3340}
3341
3342impl SingleResultOperator {
3343    #[allow(dead_code)]
3344    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3345        Self { result }
3346    }
3347}
3348
3349impl Operator for SingleResultOperator {
3350    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3351        Ok(self.result.take())
3352    }
3353
3354    fn reset(&mut self) {
3355        // Cannot reset - result is consumed
3356    }
3357
3358    fn name(&self) -> &'static str {
3359        "SingleResult"
3360    }
3361}
3362
3363#[cfg(test)]
3364mod tests {
3365    use super::*;
3366    use crate::query::plan::{
3367        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3368        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3369        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3370        SortKey, SortOp,
3371    };
3372    use grafeo_common::types::Value;
3373
3374    fn create_test_store() -> Arc<LpgStore> {
3375        let store = Arc::new(LpgStore::new());
3376        store.create_node(&["Person"]);
3377        store.create_node(&["Person"]);
3378        store.create_node(&["Company"]);
3379        store
3380    }
3381
3382    // ==================== Simple Scan Tests ====================
3383
3384    #[test]
3385    fn test_plan_simple_scan() {
3386        let store = create_test_store();
3387        let planner = Planner::new(store);
3388
3389        // MATCH (n:Person) RETURN n
3390        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3391            items: vec![ReturnItem {
3392                expression: LogicalExpression::Variable("n".to_string()),
3393                alias: None,
3394            }],
3395            distinct: false,
3396            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3397                variable: "n".to_string(),
3398                label: Some("Person".to_string()),
3399                input: None,
3400            })),
3401        }));
3402
3403        let physical = planner.plan(&logical).unwrap();
3404        assert_eq!(physical.columns(), &["n"]);
3405    }
3406
3407    #[test]
3408    fn test_plan_scan_without_label() {
3409        let store = create_test_store();
3410        let planner = Planner::new(store);
3411
3412        // MATCH (n) RETURN n
3413        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3414            items: vec![ReturnItem {
3415                expression: LogicalExpression::Variable("n".to_string()),
3416                alias: None,
3417            }],
3418            distinct: false,
3419            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3420                variable: "n".to_string(),
3421                label: None,
3422                input: None,
3423            })),
3424        }));
3425
3426        let physical = planner.plan(&logical).unwrap();
3427        assert_eq!(physical.columns(), &["n"]);
3428    }
3429
3430    #[test]
3431    fn test_plan_return_with_alias() {
3432        let store = create_test_store();
3433        let planner = Planner::new(store);
3434
3435        // MATCH (n:Person) RETURN n AS person
3436        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3437            items: vec![ReturnItem {
3438                expression: LogicalExpression::Variable("n".to_string()),
3439                alias: Some("person".to_string()),
3440            }],
3441            distinct: false,
3442            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3443                variable: "n".to_string(),
3444                label: Some("Person".to_string()),
3445                input: None,
3446            })),
3447        }));
3448
3449        let physical = planner.plan(&logical).unwrap();
3450        assert_eq!(physical.columns(), &["person"]);
3451    }
3452
3453    #[test]
3454    fn test_plan_return_property() {
3455        let store = create_test_store();
3456        let planner = Planner::new(store);
3457
3458        // MATCH (n:Person) RETURN n.name
3459        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3460            items: vec![ReturnItem {
3461                expression: LogicalExpression::Property {
3462                    variable: "n".to_string(),
3463                    property: "name".to_string(),
3464                },
3465                alias: None,
3466            }],
3467            distinct: false,
3468            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3469                variable: "n".to_string(),
3470                label: Some("Person".to_string()),
3471                input: None,
3472            })),
3473        }));
3474
3475        let physical = planner.plan(&logical).unwrap();
3476        assert_eq!(physical.columns(), &["n.name"]);
3477    }
3478
3479    #[test]
3480    fn test_plan_return_literal() {
3481        let store = create_test_store();
3482        let planner = Planner::new(store);
3483
3484        // MATCH (n) RETURN 42 AS answer
3485        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3486            items: vec![ReturnItem {
3487                expression: LogicalExpression::Literal(Value::Int64(42)),
3488                alias: Some("answer".to_string()),
3489            }],
3490            distinct: false,
3491            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3492                variable: "n".to_string(),
3493                label: None,
3494                input: None,
3495            })),
3496        }));
3497
3498        let physical = planner.plan(&logical).unwrap();
3499        assert_eq!(physical.columns(), &["answer"]);
3500    }
3501
3502    // ==================== Filter Tests ====================
3503
3504    #[test]
3505    fn test_plan_filter_equality() {
3506        let store = create_test_store();
3507        let planner = Planner::new(store);
3508
3509        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3510        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3511            items: vec![ReturnItem {
3512                expression: LogicalExpression::Variable("n".to_string()),
3513                alias: None,
3514            }],
3515            distinct: false,
3516            input: Box::new(LogicalOperator::Filter(FilterOp {
3517                predicate: LogicalExpression::Binary {
3518                    left: Box::new(LogicalExpression::Property {
3519                        variable: "n".to_string(),
3520                        property: "age".to_string(),
3521                    }),
3522                    op: BinaryOp::Eq,
3523                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3524                },
3525                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3526                    variable: "n".to_string(),
3527                    label: Some("Person".to_string()),
3528                    input: None,
3529                })),
3530            })),
3531        }));
3532
3533        let physical = planner.plan(&logical).unwrap();
3534        assert_eq!(physical.columns(), &["n"]);
3535    }
3536
3537    #[test]
3538    fn test_plan_filter_compound_and() {
3539        let store = create_test_store();
3540        let planner = Planner::new(store);
3541
3542        // WHERE n.age > 20 AND n.age < 40
3543        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3544            items: vec![ReturnItem {
3545                expression: LogicalExpression::Variable("n".to_string()),
3546                alias: None,
3547            }],
3548            distinct: false,
3549            input: Box::new(LogicalOperator::Filter(FilterOp {
3550                predicate: LogicalExpression::Binary {
3551                    left: Box::new(LogicalExpression::Binary {
3552                        left: Box::new(LogicalExpression::Property {
3553                            variable: "n".to_string(),
3554                            property: "age".to_string(),
3555                        }),
3556                        op: BinaryOp::Gt,
3557                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3558                    }),
3559                    op: BinaryOp::And,
3560                    right: Box::new(LogicalExpression::Binary {
3561                        left: Box::new(LogicalExpression::Property {
3562                            variable: "n".to_string(),
3563                            property: "age".to_string(),
3564                        }),
3565                        op: BinaryOp::Lt,
3566                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3567                    }),
3568                },
3569                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3570                    variable: "n".to_string(),
3571                    label: None,
3572                    input: None,
3573                })),
3574            })),
3575        }));
3576
3577        let physical = planner.plan(&logical).unwrap();
3578        assert_eq!(physical.columns(), &["n"]);
3579    }
3580
3581    #[test]
3582    fn test_plan_filter_unary_not() {
3583        let store = create_test_store();
3584        let planner = Planner::new(store);
3585
3586        // WHERE NOT n.active
3587        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3588            items: vec![ReturnItem {
3589                expression: LogicalExpression::Variable("n".to_string()),
3590                alias: None,
3591            }],
3592            distinct: false,
3593            input: Box::new(LogicalOperator::Filter(FilterOp {
3594                predicate: LogicalExpression::Unary {
3595                    op: UnaryOp::Not,
3596                    operand: Box::new(LogicalExpression::Property {
3597                        variable: "n".to_string(),
3598                        property: "active".to_string(),
3599                    }),
3600                },
3601                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3602                    variable: "n".to_string(),
3603                    label: None,
3604                    input: None,
3605                })),
3606            })),
3607        }));
3608
3609        let physical = planner.plan(&logical).unwrap();
3610        assert_eq!(physical.columns(), &["n"]);
3611    }
3612
3613    #[test]
3614    fn test_plan_filter_is_null() {
3615        let store = create_test_store();
3616        let planner = Planner::new(store);
3617
3618        // WHERE n.email IS NULL
3619        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3620            items: vec![ReturnItem {
3621                expression: LogicalExpression::Variable("n".to_string()),
3622                alias: None,
3623            }],
3624            distinct: false,
3625            input: Box::new(LogicalOperator::Filter(FilterOp {
3626                predicate: LogicalExpression::Unary {
3627                    op: UnaryOp::IsNull,
3628                    operand: Box::new(LogicalExpression::Property {
3629                        variable: "n".to_string(),
3630                        property: "email".to_string(),
3631                    }),
3632                },
3633                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3634                    variable: "n".to_string(),
3635                    label: None,
3636                    input: None,
3637                })),
3638            })),
3639        }));
3640
3641        let physical = planner.plan(&logical).unwrap();
3642        assert_eq!(physical.columns(), &["n"]);
3643    }
3644
3645    #[test]
3646    fn test_plan_filter_function_call() {
3647        let store = create_test_store();
3648        let planner = Planner::new(store);
3649
3650        // WHERE size(n.friends) > 0
3651        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3652            items: vec![ReturnItem {
3653                expression: LogicalExpression::Variable("n".to_string()),
3654                alias: None,
3655            }],
3656            distinct: false,
3657            input: Box::new(LogicalOperator::Filter(FilterOp {
3658                predicate: LogicalExpression::Binary {
3659                    left: Box::new(LogicalExpression::FunctionCall {
3660                        name: "size".to_string(),
3661                        args: vec![LogicalExpression::Property {
3662                            variable: "n".to_string(),
3663                            property: "friends".to_string(),
3664                        }],
3665                        distinct: false,
3666                    }),
3667                    op: BinaryOp::Gt,
3668                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3669                },
3670                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3671                    variable: "n".to_string(),
3672                    label: None,
3673                    input: None,
3674                })),
3675            })),
3676        }));
3677
3678        let physical = planner.plan(&logical).unwrap();
3679        assert_eq!(physical.columns(), &["n"]);
3680    }
3681
3682    // ==================== Expand Tests ====================
3683
3684    #[test]
3685    fn test_plan_expand_outgoing() {
3686        let store = create_test_store();
3687        let planner = Planner::new(store);
3688
3689        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3690        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3691            items: vec![
3692                ReturnItem {
3693                    expression: LogicalExpression::Variable("a".to_string()),
3694                    alias: None,
3695                },
3696                ReturnItem {
3697                    expression: LogicalExpression::Variable("b".to_string()),
3698                    alias: None,
3699                },
3700            ],
3701            distinct: false,
3702            input: Box::new(LogicalOperator::Expand(ExpandOp {
3703                from_variable: "a".to_string(),
3704                to_variable: "b".to_string(),
3705                edge_variable: None,
3706                direction: ExpandDirection::Outgoing,
3707                edge_type: Some("KNOWS".to_string()),
3708                min_hops: 1,
3709                max_hops: Some(1),
3710                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3711                    variable: "a".to_string(),
3712                    label: Some("Person".to_string()),
3713                    input: None,
3714                })),
3715                path_alias: None,
3716            })),
3717        }));
3718
3719        let physical = planner.plan(&logical).unwrap();
3720        // The return should have columns [a, b]
3721        assert!(physical.columns().contains(&"a".to_string()));
3722        assert!(physical.columns().contains(&"b".to_string()));
3723    }
3724
3725    #[test]
3726    fn test_plan_expand_with_edge_variable() {
3727        let store = create_test_store();
3728        let planner = Planner::new(store);
3729
3730        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3731        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3732            items: vec![
3733                ReturnItem {
3734                    expression: LogicalExpression::Variable("a".to_string()),
3735                    alias: None,
3736                },
3737                ReturnItem {
3738                    expression: LogicalExpression::Variable("r".to_string()),
3739                    alias: None,
3740                },
3741                ReturnItem {
3742                    expression: LogicalExpression::Variable("b".to_string()),
3743                    alias: None,
3744                },
3745            ],
3746            distinct: false,
3747            input: Box::new(LogicalOperator::Expand(ExpandOp {
3748                from_variable: "a".to_string(),
3749                to_variable: "b".to_string(),
3750                edge_variable: Some("r".to_string()),
3751                direction: ExpandDirection::Outgoing,
3752                edge_type: Some("KNOWS".to_string()),
3753                min_hops: 1,
3754                max_hops: Some(1),
3755                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3756                    variable: "a".to_string(),
3757                    label: None,
3758                    input: None,
3759                })),
3760                path_alias: None,
3761            })),
3762        }));
3763
3764        let physical = planner.plan(&logical).unwrap();
3765        assert!(physical.columns().contains(&"a".to_string()));
3766        assert!(physical.columns().contains(&"r".to_string()));
3767        assert!(physical.columns().contains(&"b".to_string()));
3768    }
3769
3770    // ==================== Limit/Skip/Sort Tests ====================
3771
3772    #[test]
3773    fn test_plan_limit() {
3774        let store = create_test_store();
3775        let planner = Planner::new(store);
3776
3777        // MATCH (n) RETURN n LIMIT 10
3778        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3779            items: vec![ReturnItem {
3780                expression: LogicalExpression::Variable("n".to_string()),
3781                alias: None,
3782            }],
3783            distinct: false,
3784            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3785                count: 10,
3786                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3787                    variable: "n".to_string(),
3788                    label: None,
3789                    input: None,
3790                })),
3791            })),
3792        }));
3793
3794        let physical = planner.plan(&logical).unwrap();
3795        assert_eq!(physical.columns(), &["n"]);
3796    }
3797
3798    #[test]
3799    fn test_plan_skip() {
3800        let store = create_test_store();
3801        let planner = Planner::new(store);
3802
3803        // MATCH (n) RETURN n SKIP 5
3804        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3805            items: vec![ReturnItem {
3806                expression: LogicalExpression::Variable("n".to_string()),
3807                alias: None,
3808            }],
3809            distinct: false,
3810            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3811                count: 5,
3812                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3813                    variable: "n".to_string(),
3814                    label: None,
3815                    input: None,
3816                })),
3817            })),
3818        }));
3819
3820        let physical = planner.plan(&logical).unwrap();
3821        assert_eq!(physical.columns(), &["n"]);
3822    }
3823
3824    #[test]
3825    fn test_plan_sort() {
3826        let store = create_test_store();
3827        let planner = Planner::new(store);
3828
3829        // MATCH (n) RETURN n ORDER BY n.name ASC
3830        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3831            items: vec![ReturnItem {
3832                expression: LogicalExpression::Variable("n".to_string()),
3833                alias: None,
3834            }],
3835            distinct: false,
3836            input: Box::new(LogicalOperator::Sort(SortOp {
3837                keys: vec![SortKey {
3838                    expression: LogicalExpression::Variable("n".to_string()),
3839                    order: SortOrder::Ascending,
3840                }],
3841                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3842                    variable: "n".to_string(),
3843                    label: None,
3844                    input: None,
3845                })),
3846            })),
3847        }));
3848
3849        let physical = planner.plan(&logical).unwrap();
3850        assert_eq!(physical.columns(), &["n"]);
3851    }
3852
3853    #[test]
3854    fn test_plan_sort_descending() {
3855        let store = create_test_store();
3856        let planner = Planner::new(store);
3857
3858        // ORDER BY n DESC
3859        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3860            items: vec![ReturnItem {
3861                expression: LogicalExpression::Variable("n".to_string()),
3862                alias: None,
3863            }],
3864            distinct: false,
3865            input: Box::new(LogicalOperator::Sort(SortOp {
3866                keys: vec![SortKey {
3867                    expression: LogicalExpression::Variable("n".to_string()),
3868                    order: SortOrder::Descending,
3869                }],
3870                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3871                    variable: "n".to_string(),
3872                    label: None,
3873                    input: None,
3874                })),
3875            })),
3876        }));
3877
3878        let physical = planner.plan(&logical).unwrap();
3879        assert_eq!(physical.columns(), &["n"]);
3880    }
3881
3882    #[test]
3883    fn test_plan_distinct() {
3884        let store = create_test_store();
3885        let planner = Planner::new(store);
3886
3887        // MATCH (n) RETURN DISTINCT n
3888        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3889            items: vec![ReturnItem {
3890                expression: LogicalExpression::Variable("n".to_string()),
3891                alias: None,
3892            }],
3893            distinct: false,
3894            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3895                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3896                    variable: "n".to_string(),
3897                    label: None,
3898                    input: None,
3899                })),
3900                columns: None,
3901            })),
3902        }));
3903
3904        let physical = planner.plan(&logical).unwrap();
3905        assert_eq!(physical.columns(), &["n"]);
3906    }
3907
3908    // ==================== Aggregate Tests ====================
3909
3910    #[test]
3911    fn test_plan_aggregate_count() {
3912        let store = create_test_store();
3913        let planner = Planner::new(store);
3914
3915        // MATCH (n) RETURN count(n)
3916        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3917            items: vec![ReturnItem {
3918                expression: LogicalExpression::Variable("cnt".to_string()),
3919                alias: None,
3920            }],
3921            distinct: false,
3922            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3923                group_by: vec![],
3924                aggregates: vec![LogicalAggregateExpr {
3925                    function: LogicalAggregateFunction::Count,
3926                    expression: Some(LogicalExpression::Variable("n".to_string())),
3927                    distinct: false,
3928                    alias: Some("cnt".to_string()),
3929                    percentile: None,
3930                }],
3931                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3932                    variable: "n".to_string(),
3933                    label: None,
3934                    input: None,
3935                })),
3936                having: None,
3937            })),
3938        }));
3939
3940        let physical = planner.plan(&logical).unwrap();
3941        assert!(physical.columns().contains(&"cnt".to_string()));
3942    }
3943
3944    #[test]
3945    fn test_plan_aggregate_with_group_by() {
3946        let store = create_test_store();
3947        let planner = Planner::new(store);
3948
3949        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3950        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3951            group_by: vec![LogicalExpression::Property {
3952                variable: "n".to_string(),
3953                property: "city".to_string(),
3954            }],
3955            aggregates: vec![LogicalAggregateExpr {
3956                function: LogicalAggregateFunction::Count,
3957                expression: Some(LogicalExpression::Variable("n".to_string())),
3958                distinct: false,
3959                alias: Some("cnt".to_string()),
3960                percentile: None,
3961            }],
3962            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3963                variable: "n".to_string(),
3964                label: Some("Person".to_string()),
3965                input: None,
3966            })),
3967            having: None,
3968        }));
3969
3970        let physical = planner.plan(&logical).unwrap();
3971        assert_eq!(physical.columns().len(), 2);
3972    }
3973
3974    #[test]
3975    fn test_plan_aggregate_sum() {
3976        let store = create_test_store();
3977        let planner = Planner::new(store);
3978
3979        // SUM(n.value)
3980        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3981            group_by: vec![],
3982            aggregates: vec![LogicalAggregateExpr {
3983                function: LogicalAggregateFunction::Sum,
3984                expression: Some(LogicalExpression::Property {
3985                    variable: "n".to_string(),
3986                    property: "value".to_string(),
3987                }),
3988                distinct: false,
3989                alias: Some("total".to_string()),
3990                percentile: None,
3991            }],
3992            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3993                variable: "n".to_string(),
3994                label: None,
3995                input: None,
3996            })),
3997            having: None,
3998        }));
3999
4000        let physical = planner.plan(&logical).unwrap();
4001        assert!(physical.columns().contains(&"total".to_string()));
4002    }
4003
4004    #[test]
4005    fn test_plan_aggregate_avg() {
4006        let store = create_test_store();
4007        let planner = Planner::new(store);
4008
4009        // AVG(n.score)
4010        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4011            group_by: vec![],
4012            aggregates: vec![LogicalAggregateExpr {
4013                function: LogicalAggregateFunction::Avg,
4014                expression: Some(LogicalExpression::Property {
4015                    variable: "n".to_string(),
4016                    property: "score".to_string(),
4017                }),
4018                distinct: false,
4019                alias: Some("average".to_string()),
4020                percentile: None,
4021            }],
4022            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4023                variable: "n".to_string(),
4024                label: None,
4025                input: None,
4026            })),
4027            having: None,
4028        }));
4029
4030        let physical = planner.plan(&logical).unwrap();
4031        assert!(physical.columns().contains(&"average".to_string()));
4032    }
4033
4034    #[test]
4035    fn test_plan_aggregate_min_max() {
4036        let store = create_test_store();
4037        let planner = Planner::new(store);
4038
4039        // MIN(n.age), MAX(n.age)
4040        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4041            group_by: vec![],
4042            aggregates: vec![
4043                LogicalAggregateExpr {
4044                    function: LogicalAggregateFunction::Min,
4045                    expression: Some(LogicalExpression::Property {
4046                        variable: "n".to_string(),
4047                        property: "age".to_string(),
4048                    }),
4049                    distinct: false,
4050                    alias: Some("youngest".to_string()),
4051                    percentile: None,
4052                },
4053                LogicalAggregateExpr {
4054                    function: LogicalAggregateFunction::Max,
4055                    expression: Some(LogicalExpression::Property {
4056                        variable: "n".to_string(),
4057                        property: "age".to_string(),
4058                    }),
4059                    distinct: false,
4060                    alias: Some("oldest".to_string()),
4061                    percentile: None,
4062                },
4063            ],
4064            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4065                variable: "n".to_string(),
4066                label: None,
4067                input: None,
4068            })),
4069            having: None,
4070        }));
4071
4072        let physical = planner.plan(&logical).unwrap();
4073        assert!(physical.columns().contains(&"youngest".to_string()));
4074        assert!(physical.columns().contains(&"oldest".to_string()));
4075    }
4076
4077    // ==================== Join Tests ====================
4078
4079    #[test]
4080    fn test_plan_inner_join() {
4081        let store = create_test_store();
4082        let planner = Planner::new(store);
4083
4084        // Inner join between two scans
4085        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4086            items: vec![
4087                ReturnItem {
4088                    expression: LogicalExpression::Variable("a".to_string()),
4089                    alias: None,
4090                },
4091                ReturnItem {
4092                    expression: LogicalExpression::Variable("b".to_string()),
4093                    alias: None,
4094                },
4095            ],
4096            distinct: false,
4097            input: Box::new(LogicalOperator::Join(JoinOp {
4098                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4099                    variable: "a".to_string(),
4100                    label: Some("Person".to_string()),
4101                    input: None,
4102                })),
4103                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4104                    variable: "b".to_string(),
4105                    label: Some("Company".to_string()),
4106                    input: None,
4107                })),
4108                join_type: JoinType::Inner,
4109                conditions: vec![JoinCondition {
4110                    left: LogicalExpression::Variable("a".to_string()),
4111                    right: LogicalExpression::Variable("b".to_string()),
4112                }],
4113            })),
4114        }));
4115
4116        let physical = planner.plan(&logical).unwrap();
4117        assert!(physical.columns().contains(&"a".to_string()));
4118        assert!(physical.columns().contains(&"b".to_string()));
4119    }
4120
4121    #[test]
4122    fn test_plan_cross_join() {
4123        let store = create_test_store();
4124        let planner = Planner::new(store);
4125
4126        // Cross join (no conditions)
4127        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4128            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4129                variable: "a".to_string(),
4130                label: None,
4131                input: None,
4132            })),
4133            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4134                variable: "b".to_string(),
4135                label: None,
4136                input: None,
4137            })),
4138            join_type: JoinType::Cross,
4139            conditions: vec![],
4140        }));
4141
4142        let physical = planner.plan(&logical).unwrap();
4143        assert_eq!(physical.columns().len(), 2);
4144    }
4145
4146    #[test]
4147    fn test_plan_left_join() {
4148        let store = create_test_store();
4149        let planner = Planner::new(store);
4150
4151        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4152            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4153                variable: "a".to_string(),
4154                label: None,
4155                input: None,
4156            })),
4157            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4158                variable: "b".to_string(),
4159                label: None,
4160                input: None,
4161            })),
4162            join_type: JoinType::Left,
4163            conditions: vec![],
4164        }));
4165
4166        let physical = planner.plan(&logical).unwrap();
4167        assert_eq!(physical.columns().len(), 2);
4168    }
4169
4170    // ==================== Mutation Tests ====================
4171
4172    #[test]
4173    fn test_plan_create_node() {
4174        let store = create_test_store();
4175        let planner = Planner::new(store);
4176
4177        // CREATE (n:Person {name: 'Alice'})
4178        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4179            variable: "n".to_string(),
4180            labels: vec!["Person".to_string()],
4181            properties: vec![(
4182                "name".to_string(),
4183                LogicalExpression::Literal(Value::String("Alice".into())),
4184            )],
4185            input: None,
4186        }));
4187
4188        let physical = planner.plan(&logical).unwrap();
4189        assert!(physical.columns().contains(&"n".to_string()));
4190    }
4191
4192    #[test]
4193    fn test_plan_create_edge() {
4194        let store = create_test_store();
4195        let planner = Planner::new(store);
4196
4197        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4198        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4199            variable: Some("r".to_string()),
4200            from_variable: "a".to_string(),
4201            to_variable: "b".to_string(),
4202            edge_type: "KNOWS".to_string(),
4203            properties: vec![],
4204            input: Box::new(LogicalOperator::Join(JoinOp {
4205                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4206                    variable: "a".to_string(),
4207                    label: None,
4208                    input: None,
4209                })),
4210                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4211                    variable: "b".to_string(),
4212                    label: None,
4213                    input: None,
4214                })),
4215                join_type: JoinType::Cross,
4216                conditions: vec![],
4217            })),
4218        }));
4219
4220        let physical = planner.plan(&logical).unwrap();
4221        assert!(physical.columns().contains(&"r".to_string()));
4222    }
4223
4224    #[test]
4225    fn test_plan_delete_node() {
4226        let store = create_test_store();
4227        let planner = Planner::new(store);
4228
4229        // MATCH (n) DELETE n
4230        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4231            variable: "n".to_string(),
4232            detach: false,
4233            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4234                variable: "n".to_string(),
4235                label: None,
4236                input: None,
4237            })),
4238        }));
4239
4240        let physical = planner.plan(&logical).unwrap();
4241        assert!(physical.columns().contains(&"deleted_count".to_string()));
4242    }
4243
4244    // ==================== Error Cases ====================
4245
4246    #[test]
4247    fn test_plan_empty_errors() {
4248        let store = create_test_store();
4249        let planner = Planner::new(store);
4250
4251        let logical = LogicalPlan::new(LogicalOperator::Empty);
4252        let result = planner.plan(&logical);
4253        assert!(result.is_err());
4254    }
4255
4256    #[test]
4257    fn test_plan_missing_variable_in_return() {
4258        let store = create_test_store();
4259        let planner = Planner::new(store);
4260
4261        // Return variable that doesn't exist in input
4262        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4263            items: vec![ReturnItem {
4264                expression: LogicalExpression::Variable("missing".to_string()),
4265                alias: None,
4266            }],
4267            distinct: false,
4268            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4269                variable: "n".to_string(),
4270                label: None,
4271                input: None,
4272            })),
4273        }));
4274
4275        let result = planner.plan(&logical);
4276        assert!(result.is_err());
4277    }
4278
4279    // ==================== Helper Function Tests ====================
4280
4281    #[test]
4282    fn test_convert_binary_ops() {
4283        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4284        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4285        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4286        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4287        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4288        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4289        assert!(convert_binary_op(BinaryOp::And).is_ok());
4290        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4291        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4292        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4293        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4294        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4295    }
4296
4297    #[test]
4298    fn test_convert_unary_ops() {
4299        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4300        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4301        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4302        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4303    }
4304
4305    #[test]
4306    fn test_convert_aggregate_functions() {
4307        assert!(matches!(
4308            convert_aggregate_function(LogicalAggregateFunction::Count),
4309            PhysicalAggregateFunction::Count
4310        ));
4311        assert!(matches!(
4312            convert_aggregate_function(LogicalAggregateFunction::Sum),
4313            PhysicalAggregateFunction::Sum
4314        ));
4315        assert!(matches!(
4316            convert_aggregate_function(LogicalAggregateFunction::Avg),
4317            PhysicalAggregateFunction::Avg
4318        ));
4319        assert!(matches!(
4320            convert_aggregate_function(LogicalAggregateFunction::Min),
4321            PhysicalAggregateFunction::Min
4322        ));
4323        assert!(matches!(
4324            convert_aggregate_function(LogicalAggregateFunction::Max),
4325            PhysicalAggregateFunction::Max
4326        ));
4327    }
4328
4329    #[test]
4330    fn test_planner_accessors() {
4331        let store = create_test_store();
4332        let planner = Planner::new(Arc::clone(&store));
4333
4334        assert!(planner.tx_id().is_none());
4335        assert!(planner.tx_manager().is_none());
4336        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4337    }
4338
4339    #[test]
4340    fn test_physical_plan_accessors() {
4341        let store = create_test_store();
4342        let planner = Planner::new(store);
4343
4344        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4345            variable: "n".to_string(),
4346            label: None,
4347            input: None,
4348        }));
4349
4350        let physical = planner.plan(&logical).unwrap();
4351        assert_eq!(physical.columns(), &["n"]);
4352
4353        // Test into_operator
4354        let _ = physical.into_operator();
4355    }
4356
4357    // ==================== Adaptive Planning Tests ====================
4358
4359    #[test]
4360    fn test_plan_adaptive_with_scan() {
4361        let store = create_test_store();
4362        let planner = Planner::new(store);
4363
4364        // MATCH (n:Person) RETURN n
4365        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4366            items: vec![ReturnItem {
4367                expression: LogicalExpression::Variable("n".to_string()),
4368                alias: None,
4369            }],
4370            distinct: false,
4371            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4372                variable: "n".to_string(),
4373                label: Some("Person".to_string()),
4374                input: None,
4375            })),
4376        }));
4377
4378        let physical = planner.plan_adaptive(&logical).unwrap();
4379        assert_eq!(physical.columns(), &["n"]);
4380        // Should have adaptive context with estimates
4381        assert!(physical.adaptive_context.is_some());
4382    }
4383
4384    #[test]
4385    fn test_plan_adaptive_with_filter() {
4386        let store = create_test_store();
4387        let planner = Planner::new(store);
4388
4389        // MATCH (n) WHERE n.age > 30 RETURN n
4390        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4391            items: vec![ReturnItem {
4392                expression: LogicalExpression::Variable("n".to_string()),
4393                alias: None,
4394            }],
4395            distinct: false,
4396            input: Box::new(LogicalOperator::Filter(FilterOp {
4397                predicate: LogicalExpression::Binary {
4398                    left: Box::new(LogicalExpression::Property {
4399                        variable: "n".to_string(),
4400                        property: "age".to_string(),
4401                    }),
4402                    op: BinaryOp::Gt,
4403                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4404                },
4405                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4406                    variable: "n".to_string(),
4407                    label: None,
4408                    input: None,
4409                })),
4410            })),
4411        }));
4412
4413        let physical = planner.plan_adaptive(&logical).unwrap();
4414        assert!(physical.adaptive_context.is_some());
4415    }
4416
4417    #[test]
4418    fn test_plan_adaptive_with_expand() {
4419        let store = create_test_store();
4420        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4421
4422        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4423        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4424            items: vec![
4425                ReturnItem {
4426                    expression: LogicalExpression::Variable("a".to_string()),
4427                    alias: None,
4428                },
4429                ReturnItem {
4430                    expression: LogicalExpression::Variable("b".to_string()),
4431                    alias: None,
4432                },
4433            ],
4434            distinct: false,
4435            input: Box::new(LogicalOperator::Expand(ExpandOp {
4436                from_variable: "a".to_string(),
4437                to_variable: "b".to_string(),
4438                edge_variable: None,
4439                direction: ExpandDirection::Outgoing,
4440                edge_type: Some("KNOWS".to_string()),
4441                min_hops: 1,
4442                max_hops: Some(1),
4443                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4444                    variable: "a".to_string(),
4445                    label: None,
4446                    input: None,
4447                })),
4448                path_alias: None,
4449            })),
4450        }));
4451
4452        let physical = planner.plan_adaptive(&logical).unwrap();
4453        assert!(physical.adaptive_context.is_some());
4454    }
4455
4456    #[test]
4457    fn test_plan_adaptive_with_join() {
4458        let store = create_test_store();
4459        let planner = Planner::new(store);
4460
4461        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4462            items: vec![
4463                ReturnItem {
4464                    expression: LogicalExpression::Variable("a".to_string()),
4465                    alias: None,
4466                },
4467                ReturnItem {
4468                    expression: LogicalExpression::Variable("b".to_string()),
4469                    alias: None,
4470                },
4471            ],
4472            distinct: false,
4473            input: Box::new(LogicalOperator::Join(JoinOp {
4474                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4475                    variable: "a".to_string(),
4476                    label: None,
4477                    input: None,
4478                })),
4479                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4480                    variable: "b".to_string(),
4481                    label: None,
4482                    input: None,
4483                })),
4484                join_type: JoinType::Cross,
4485                conditions: vec![],
4486            })),
4487        }));
4488
4489        let physical = planner.plan_adaptive(&logical).unwrap();
4490        assert!(physical.adaptive_context.is_some());
4491    }
4492
4493    #[test]
4494    fn test_plan_adaptive_with_aggregate() {
4495        let store = create_test_store();
4496        let planner = Planner::new(store);
4497
4498        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4499            group_by: vec![],
4500            aggregates: vec![LogicalAggregateExpr {
4501                function: LogicalAggregateFunction::Count,
4502                expression: Some(LogicalExpression::Variable("n".to_string())),
4503                distinct: false,
4504                alias: Some("cnt".to_string()),
4505                percentile: None,
4506            }],
4507            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4508                variable: "n".to_string(),
4509                label: None,
4510                input: None,
4511            })),
4512            having: None,
4513        }));
4514
4515        let physical = planner.plan_adaptive(&logical).unwrap();
4516        assert!(physical.adaptive_context.is_some());
4517    }
4518
4519    #[test]
4520    fn test_plan_adaptive_with_distinct() {
4521        let store = create_test_store();
4522        let planner = Planner::new(store);
4523
4524        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4525            items: vec![ReturnItem {
4526                expression: LogicalExpression::Variable("n".to_string()),
4527                alias: None,
4528            }],
4529            distinct: false,
4530            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4531                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4532                    variable: "n".to_string(),
4533                    label: None,
4534                    input: None,
4535                })),
4536                columns: None,
4537            })),
4538        }));
4539
4540        let physical = planner.plan_adaptive(&logical).unwrap();
4541        assert!(physical.adaptive_context.is_some());
4542    }
4543
4544    #[test]
4545    fn test_plan_adaptive_with_limit() {
4546        let store = create_test_store();
4547        let planner = Planner::new(store);
4548
4549        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4550            items: vec![ReturnItem {
4551                expression: LogicalExpression::Variable("n".to_string()),
4552                alias: None,
4553            }],
4554            distinct: false,
4555            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4556                count: 10,
4557                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4558                    variable: "n".to_string(),
4559                    label: None,
4560                    input: None,
4561                })),
4562            })),
4563        }));
4564
4565        let physical = planner.plan_adaptive(&logical).unwrap();
4566        assert!(physical.adaptive_context.is_some());
4567    }
4568
4569    #[test]
4570    fn test_plan_adaptive_with_skip() {
4571        let store = create_test_store();
4572        let planner = Planner::new(store);
4573
4574        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4575            items: vec![ReturnItem {
4576                expression: LogicalExpression::Variable("n".to_string()),
4577                alias: None,
4578            }],
4579            distinct: false,
4580            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4581                count: 5,
4582                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4583                    variable: "n".to_string(),
4584                    label: None,
4585                    input: None,
4586                })),
4587            })),
4588        }));
4589
4590        let physical = planner.plan_adaptive(&logical).unwrap();
4591        assert!(physical.adaptive_context.is_some());
4592    }
4593
4594    #[test]
4595    fn test_plan_adaptive_with_sort() {
4596        let store = create_test_store();
4597        let planner = Planner::new(store);
4598
4599        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4600            items: vec![ReturnItem {
4601                expression: LogicalExpression::Variable("n".to_string()),
4602                alias: None,
4603            }],
4604            distinct: false,
4605            input: Box::new(LogicalOperator::Sort(SortOp {
4606                keys: vec![SortKey {
4607                    expression: LogicalExpression::Variable("n".to_string()),
4608                    order: SortOrder::Ascending,
4609                }],
4610                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4611                    variable: "n".to_string(),
4612                    label: None,
4613                    input: None,
4614                })),
4615            })),
4616        }));
4617
4618        let physical = planner.plan_adaptive(&logical).unwrap();
4619        assert!(physical.adaptive_context.is_some());
4620    }
4621
4622    #[test]
4623    fn test_plan_adaptive_with_union() {
4624        let store = create_test_store();
4625        let planner = Planner::new(store);
4626
4627        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4628            items: vec![ReturnItem {
4629                expression: LogicalExpression::Variable("n".to_string()),
4630                alias: None,
4631            }],
4632            distinct: false,
4633            input: Box::new(LogicalOperator::Union(UnionOp {
4634                inputs: vec![
4635                    LogicalOperator::NodeScan(NodeScanOp {
4636                        variable: "n".to_string(),
4637                        label: Some("Person".to_string()),
4638                        input: None,
4639                    }),
4640                    LogicalOperator::NodeScan(NodeScanOp {
4641                        variable: "n".to_string(),
4642                        label: Some("Company".to_string()),
4643                        input: None,
4644                    }),
4645                ],
4646            })),
4647        }));
4648
4649        let physical = planner.plan_adaptive(&logical).unwrap();
4650        assert!(physical.adaptive_context.is_some());
4651    }
4652
4653    // ==================== Variable Length Path Tests ====================
4654
4655    #[test]
4656    fn test_plan_expand_variable_length() {
4657        let store = create_test_store();
4658        let planner = Planner::new(store);
4659
4660        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4661        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4662            items: vec![
4663                ReturnItem {
4664                    expression: LogicalExpression::Variable("a".to_string()),
4665                    alias: None,
4666                },
4667                ReturnItem {
4668                    expression: LogicalExpression::Variable("b".to_string()),
4669                    alias: None,
4670                },
4671            ],
4672            distinct: false,
4673            input: Box::new(LogicalOperator::Expand(ExpandOp {
4674                from_variable: "a".to_string(),
4675                to_variable: "b".to_string(),
4676                edge_variable: None,
4677                direction: ExpandDirection::Outgoing,
4678                edge_type: Some("KNOWS".to_string()),
4679                min_hops: 1,
4680                max_hops: Some(3),
4681                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4682                    variable: "a".to_string(),
4683                    label: None,
4684                    input: None,
4685                })),
4686                path_alias: None,
4687            })),
4688        }));
4689
4690        let physical = planner.plan(&logical).unwrap();
4691        assert!(physical.columns().contains(&"a".to_string()));
4692        assert!(physical.columns().contains(&"b".to_string()));
4693    }
4694
4695    #[test]
4696    fn test_plan_expand_with_path_alias() {
4697        let store = create_test_store();
4698        let planner = Planner::new(store);
4699
4700        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4701        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4702            items: vec![
4703                ReturnItem {
4704                    expression: LogicalExpression::Variable("a".to_string()),
4705                    alias: None,
4706                },
4707                ReturnItem {
4708                    expression: LogicalExpression::Variable("b".to_string()),
4709                    alias: None,
4710                },
4711            ],
4712            distinct: false,
4713            input: Box::new(LogicalOperator::Expand(ExpandOp {
4714                from_variable: "a".to_string(),
4715                to_variable: "b".to_string(),
4716                edge_variable: None,
4717                direction: ExpandDirection::Outgoing,
4718                edge_type: Some("KNOWS".to_string()),
4719                min_hops: 1,
4720                max_hops: Some(3),
4721                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4722                    variable: "a".to_string(),
4723                    label: None,
4724                    input: None,
4725                })),
4726                path_alias: Some("p".to_string()),
4727            })),
4728        }));
4729
4730        let physical = planner.plan(&logical).unwrap();
4731        // Verify plan was created successfully with expected output columns
4732        assert!(physical.columns().contains(&"a".to_string()));
4733        assert!(physical.columns().contains(&"b".to_string()));
4734    }
4735
4736    #[test]
4737    fn test_plan_expand_incoming() {
4738        let store = create_test_store();
4739        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4740
4741        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4742        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4743            items: vec![
4744                ReturnItem {
4745                    expression: LogicalExpression::Variable("a".to_string()),
4746                    alias: None,
4747                },
4748                ReturnItem {
4749                    expression: LogicalExpression::Variable("b".to_string()),
4750                    alias: None,
4751                },
4752            ],
4753            distinct: false,
4754            input: Box::new(LogicalOperator::Expand(ExpandOp {
4755                from_variable: "a".to_string(),
4756                to_variable: "b".to_string(),
4757                edge_variable: None,
4758                direction: ExpandDirection::Incoming,
4759                edge_type: Some("KNOWS".to_string()),
4760                min_hops: 1,
4761                max_hops: Some(1),
4762                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4763                    variable: "a".to_string(),
4764                    label: None,
4765                    input: None,
4766                })),
4767                path_alias: None,
4768            })),
4769        }));
4770
4771        let physical = planner.plan(&logical).unwrap();
4772        assert!(physical.columns().contains(&"a".to_string()));
4773        assert!(physical.columns().contains(&"b".to_string()));
4774    }
4775
4776    #[test]
4777    fn test_plan_expand_both_directions() {
4778        let store = create_test_store();
4779        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4780
4781        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4782        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4783            items: vec![
4784                ReturnItem {
4785                    expression: LogicalExpression::Variable("a".to_string()),
4786                    alias: None,
4787                },
4788                ReturnItem {
4789                    expression: LogicalExpression::Variable("b".to_string()),
4790                    alias: None,
4791                },
4792            ],
4793            distinct: false,
4794            input: Box::new(LogicalOperator::Expand(ExpandOp {
4795                from_variable: "a".to_string(),
4796                to_variable: "b".to_string(),
4797                edge_variable: None,
4798                direction: ExpandDirection::Both,
4799                edge_type: Some("KNOWS".to_string()),
4800                min_hops: 1,
4801                max_hops: Some(1),
4802                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4803                    variable: "a".to_string(),
4804                    label: None,
4805                    input: None,
4806                })),
4807                path_alias: None,
4808            })),
4809        }));
4810
4811        let physical = planner.plan(&logical).unwrap();
4812        assert!(physical.columns().contains(&"a".to_string()));
4813        assert!(physical.columns().contains(&"b".to_string()));
4814    }
4815
4816    // ==================== With Context Tests ====================
4817
4818    #[test]
4819    fn test_planner_with_context() {
4820        use crate::transaction::TransactionManager;
4821
4822        let store = create_test_store();
4823        let tx_manager = Arc::new(TransactionManager::new());
4824        let tx_id = tx_manager.begin();
4825        let epoch = tx_manager.current_epoch();
4826
4827        let planner = Planner::with_context(
4828            Arc::clone(&store),
4829            Arc::clone(&tx_manager),
4830            Some(tx_id),
4831            epoch,
4832        );
4833
4834        assert_eq!(planner.tx_id(), Some(tx_id));
4835        assert!(planner.tx_manager().is_some());
4836        assert_eq!(planner.viewing_epoch(), epoch);
4837    }
4838
4839    #[test]
4840    fn test_planner_with_factorized_execution_disabled() {
4841        let store = create_test_store();
4842        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4843
4844        // Two consecutive expands - should NOT use factorized execution
4845        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4846            items: vec![
4847                ReturnItem {
4848                    expression: LogicalExpression::Variable("a".to_string()),
4849                    alias: None,
4850                },
4851                ReturnItem {
4852                    expression: LogicalExpression::Variable("c".to_string()),
4853                    alias: None,
4854                },
4855            ],
4856            distinct: false,
4857            input: Box::new(LogicalOperator::Expand(ExpandOp {
4858                from_variable: "b".to_string(),
4859                to_variable: "c".to_string(),
4860                edge_variable: None,
4861                direction: ExpandDirection::Outgoing,
4862                edge_type: None,
4863                min_hops: 1,
4864                max_hops: Some(1),
4865                input: Box::new(LogicalOperator::Expand(ExpandOp {
4866                    from_variable: "a".to_string(),
4867                    to_variable: "b".to_string(),
4868                    edge_variable: None,
4869                    direction: ExpandDirection::Outgoing,
4870                    edge_type: None,
4871                    min_hops: 1,
4872                    max_hops: Some(1),
4873                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4874                        variable: "a".to_string(),
4875                        label: None,
4876                        input: None,
4877                    })),
4878                    path_alias: None,
4879                })),
4880                path_alias: None,
4881            })),
4882        }));
4883
4884        let physical = planner.plan(&logical).unwrap();
4885        assert!(physical.columns().contains(&"a".to_string()));
4886        assert!(physical.columns().contains(&"c".to_string()));
4887    }
4888
4889    // ==================== Sort with Property Tests ====================
4890
4891    #[test]
4892    fn test_plan_sort_by_property() {
4893        let store = create_test_store();
4894        let planner = Planner::new(store);
4895
4896        // MATCH (n) RETURN n ORDER BY n.name ASC
4897        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4898            items: vec![ReturnItem {
4899                expression: LogicalExpression::Variable("n".to_string()),
4900                alias: None,
4901            }],
4902            distinct: false,
4903            input: Box::new(LogicalOperator::Sort(SortOp {
4904                keys: vec![SortKey {
4905                    expression: LogicalExpression::Property {
4906                        variable: "n".to_string(),
4907                        property: "name".to_string(),
4908                    },
4909                    order: SortOrder::Ascending,
4910                }],
4911                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4912                    variable: "n".to_string(),
4913                    label: None,
4914                    input: None,
4915                })),
4916            })),
4917        }));
4918
4919        let physical = planner.plan(&logical).unwrap();
4920        // Should have the property column projected
4921        assert!(physical.columns().contains(&"n".to_string()));
4922    }
4923
4924    // ==================== Scan with Input Tests ====================
4925
4926    #[test]
4927    fn test_plan_scan_with_input() {
4928        let store = create_test_store();
4929        let planner = Planner::new(store);
4930
4931        // A scan with another scan as input (for chained patterns)
4932        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4933            items: vec![
4934                ReturnItem {
4935                    expression: LogicalExpression::Variable("a".to_string()),
4936                    alias: None,
4937                },
4938                ReturnItem {
4939                    expression: LogicalExpression::Variable("b".to_string()),
4940                    alias: None,
4941                },
4942            ],
4943            distinct: false,
4944            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4945                variable: "b".to_string(),
4946                label: Some("Company".to_string()),
4947                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4948                    variable: "a".to_string(),
4949                    label: Some("Person".to_string()),
4950                    input: None,
4951                }))),
4952            })),
4953        }));
4954
4955        let physical = planner.plan(&logical).unwrap();
4956        assert!(physical.columns().contains(&"a".to_string()));
4957        assert!(physical.columns().contains(&"b".to_string()));
4958    }
4959}