Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
10    ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
11    LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
12    ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37/// Range bounds for property-based range queries.
38struct RangeBounds<'a> {
39    min: Option<&'a Value>,
40    max: Option<&'a Value>,
41    min_inclusive: bool,
42    max_inclusive: bool,
43}
44
45/// Converts a logical plan to a physical operator tree.
46pub struct Planner {
47    /// The graph store to scan from.
48    store: Arc<LpgStore>,
49    /// Transaction manager for MVCC operations.
50    tx_manager: Option<Arc<TransactionManager>>,
51    /// Current transaction ID (if in a transaction).
52    tx_id: Option<TxId>,
53    /// Epoch to use for visibility checks.
54    viewing_epoch: EpochId,
55    /// Counter for generating unique anonymous edge column names.
56    anon_edge_counter: std::cell::Cell<u32>,
57    /// Whether to use factorized execution for multi-hop queries.
58    factorized_execution: bool,
59}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree from store statistics
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let stats = self.store.statistics();
244                let avg_degree = self.estimate_expand_degree(&stats, expand);
245                let estimate = input_estimate * avg_degree;
246                let id = format!("expand_{}", expand.to_variable);
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250            }
251            LogicalOperator::Join(join) => {
252                // Estimate join output (product with selectivity)
253                let left_est = self.estimate_cardinality(&join.left);
254                let right_est = self.estimate_cardinality(&join.right);
255                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
256                let id = format!("join_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261            }
262            LogicalOperator::Aggregate(agg) => {
263                // Aggregates typically reduce cardinality
264                let input_estimate = self.estimate_cardinality(&agg.input);
265                let estimate = if agg.group_by.is_empty() {
266                    1.0 // Scalar aggregate
267                } else {
268                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
269                };
270                let id = format!("aggregate_{depth}");
271                ctx.set_estimate(&id, estimate);
272
273                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274            }
275            LogicalOperator::Distinct(distinct) => {
276                let input_estimate = self.estimate_cardinality(&distinct.input);
277                let estimate = (input_estimate * 0.5).max(1.0);
278                let id = format!("distinct_{depth}");
279                ctx.set_estimate(&id, estimate);
280
281                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282            }
283            LogicalOperator::Return(ret) => {
284                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285            }
286            LogicalOperator::Limit(limit) => {
287                let input_estimate = self.estimate_cardinality(&limit.input);
288                let estimate = (input_estimate).min(limit.count as f64);
289                let id = format!("limit_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293            }
294            LogicalOperator::Skip(skip) => {
295                let input_estimate = self.estimate_cardinality(&skip.input);
296                let estimate = (input_estimate - skip.count as f64).max(0.0);
297                let id = format!("skip_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301            }
302            LogicalOperator::Sort(sort) => {
303                // Sort doesn't change cardinality
304                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305            }
306            LogicalOperator::Union(union) => {
307                let estimate: f64 = union
308                    .inputs
309                    .iter()
310                    .map(|input| self.estimate_cardinality(input))
311                    .sum();
312                let id = format!("union_{depth}");
313                ctx.set_estimate(&id, estimate);
314
315                for input in &union.inputs {
316                    self.collect_cardinality_estimates(input, ctx, depth + 1);
317                }
318            }
319            _ => {
320                // For other operators, try to recurse into known input patterns
321            }
322        }
323    }
324
325    /// Estimates cardinality for a logical operator subtree.
326    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327        match op {
328            LogicalOperator::NodeScan(scan) => {
329                if let Some(label) = &scan.label {
330                    self.store.nodes_by_label(label).len() as f64
331                } else {
332                    self.store.node_count() as f64
333                }
334            }
335            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336            LogicalOperator::Expand(expand) => {
337                let stats = self.store.statistics();
338                let avg_degree = self.estimate_expand_degree(&stats, expand);
339                self.estimate_cardinality(&expand.input) * avg_degree
340            }
341            LogicalOperator::Join(join) => {
342                let left = self.estimate_cardinality(&join.left);
343                let right = self.estimate_cardinality(&join.right);
344                (left * right).sqrt()
345            }
346            LogicalOperator::Aggregate(agg) => {
347                if agg.group_by.is_empty() {
348                    1.0
349                } else {
350                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351                }
352            }
353            LogicalOperator::Distinct(distinct) => {
354                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355            }
356            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357            LogicalOperator::Limit(limit) => self
358                .estimate_cardinality(&limit.input)
359                .min(limit.count as f64),
360            LogicalOperator::Skip(skip) => {
361                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362            }
363            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364            LogicalOperator::Union(union) => union
365                .inputs
366                .iter()
367                .map(|input| self.estimate_cardinality(input))
368                .sum(),
369            _ => 1000.0, // Default estimate for unknown operators
370        }
371    }
372
373    /// Estimates the average edge degree for an expand operation using store statistics.
374    fn estimate_expand_degree(
375        &self,
376        stats: &grafeo_core::statistics::Statistics,
377        expand: &ExpandOp,
378    ) -> f64 {
379        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380        if let Some(edge_type) = &expand.edge_type {
381            stats.estimate_avg_degree(edge_type, outgoing)
382        } else if stats.total_nodes > 0 {
383            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384        } else {
385            10.0 // fallback for empty graph
386        }
387    }
388
389    /// Plans a single logical operator.
390    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391        match op {
392            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393            LogicalOperator::Expand(expand) => {
394                // Check for expand chains when factorized execution is enabled
395                if self.factorized_execution {
396                    let (chain_len, _base) = Self::count_expand_chain(op);
397                    if chain_len >= 2 {
398                        // Use factorized chain for 2+ consecutive single-hop expands
399                        return self.plan_expand_chain(op);
400                    }
401                }
402                self.plan_expand(expand)
403            }
404            LogicalOperator::Return(ret) => self.plan_return(ret),
405            LogicalOperator::Filter(filter) => self.plan_filter(filter),
406            LogicalOperator::Project(project) => self.plan_project(project),
407            LogicalOperator::Limit(limit) => self.plan_limit(limit),
408            LogicalOperator::Skip(skip) => self.plan_skip(skip),
409            LogicalOperator::Sort(sort) => self.plan_sort(sort),
410            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411            LogicalOperator::Join(join) => self.plan_join(join),
412            LogicalOperator::Union(union) => self.plan_union(union),
413            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421            LogicalOperator::Merge(merge) => self.plan_merge(merge),
422            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
427            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
428            LogicalOperator::VectorScan(_) => Err(Error::Internal(
429                "VectorScan requires vector-index feature".to_string(),
430            )),
431            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
432                "VectorJoin requires vector-index feature".to_string(),
433            )),
434            _ => Err(Error::Internal(format!(
435                "Unsupported operator: {:?}",
436                std::mem::discriminant(op)
437            ))),
438        }
439    }
440
441    /// Plans a node scan operator.
442    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
443        let scan_op = if let Some(label) = &scan.label {
444            ScanOperator::with_label(Arc::clone(&self.store), label)
445        } else {
446            ScanOperator::new(Arc::clone(&self.store))
447        };
448
449        // Apply MVCC context if available
450        let scan_operator: Box<dyn Operator> =
451            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
452
453        // If there's an input, chain operators with a nested loop join (cross join)
454        if let Some(input) = &scan.input {
455            let (input_op, mut input_columns) = self.plan_operator(input)?;
456
457            // Build output schema: input columns + scan column
458            let mut output_schema: Vec<LogicalType> =
459                input_columns.iter().map(|_| LogicalType::Any).collect();
460            output_schema.push(LogicalType::Node);
461
462            // Add scan column to input columns
463            input_columns.push(scan.variable.clone());
464
465            // Use nested loop join to combine input rows with scanned nodes
466            let join_op = Box::new(NestedLoopJoinOperator::new(
467                input_op,
468                scan_operator,
469                None, // No join condition (cross join)
470                PhysicalJoinType::Cross,
471                output_schema,
472            ));
473
474            Ok((join_op, input_columns))
475        } else {
476            let columns = vec![scan.variable.clone()];
477            Ok((scan_operator, columns))
478        }
479    }
480
481    /// Plans an expand operator.
482    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
483        // Plan the input operator first
484        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
485
486        // Find the source column index
487        let source_column = input_columns
488            .iter()
489            .position(|c| c == &expand.from_variable)
490            .ok_or_else(|| {
491                Error::Internal(format!(
492                    "Source variable '{}' not found in input columns",
493                    expand.from_variable
494                ))
495            })?;
496
497        // Convert expand direction
498        let direction = match expand.direction {
499            ExpandDirection::Outgoing => Direction::Outgoing,
500            ExpandDirection::Incoming => Direction::Incoming,
501            ExpandDirection::Both => Direction::Both,
502        };
503
504        // Check if this is a variable-length path
505        let is_variable_length =
506            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
507
508        let operator: Box<dyn Operator> = if is_variable_length {
509            // Use VariableLengthExpandOperator for multi-hop paths
510            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
511            let mut expand_op = VariableLengthExpandOperator::new(
512                Arc::clone(&self.store),
513                input_op,
514                source_column,
515                direction,
516                expand.edge_type.clone(),
517                expand.min_hops,
518                max_hops,
519            )
520            .with_tx_context(self.viewing_epoch, self.tx_id);
521
522            // If a path alias is set, enable path length and detail output
523            if expand.path_alias.is_some() {
524                expand_op = expand_op
525                    .with_path_length_output()
526                    .with_path_detail_output();
527            }
528
529            Box::new(expand_op)
530        } else {
531            // Use simple ExpandOperator for single-hop paths
532            let expand_op = ExpandOperator::new(
533                Arc::clone(&self.store),
534                input_op,
535                source_column,
536                direction,
537                expand.edge_type.clone(),
538            )
539            .with_tx_context(self.viewing_epoch, self.tx_id);
540            Box::new(expand_op)
541        };
542
543        // Build output columns: [input_columns..., edge, target, (path_length)?]
544        // Preserve all input columns and add edge + target to match ExpandOperator output
545        let mut columns = input_columns;
546
547        // Generate edge column name - use provided name or generate anonymous name
548        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
549            let count = self.anon_edge_counter.get();
550            self.anon_edge_counter.set(count + 1);
551            format!("_anon_edge_{}", count)
552        });
553        columns.push(edge_col_name);
554
555        columns.push(expand.to_variable.clone());
556
557        // If a path alias is set, add columns for path length, nodes, and edges
558        if let Some(ref path_alias) = expand.path_alias {
559            columns.push(format!("_path_length_{}", path_alias));
560            columns.push(format!("_path_nodes_{}", path_alias));
561            columns.push(format!("_path_edges_{}", path_alias));
562        }
563
564        Ok((operator, columns))
565    }
566
567    /// Plans a chain of consecutive expand operations using factorized execution.
568    ///
569    /// This avoids the Cartesian product explosion that occurs with separate expands.
570    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
571    ///
572    /// The chain is executed lazily at query time, not during planning. This ensures
573    /// that any filters applied above the expand chain are properly respected.
574    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
575        let expands = Self::collect_expand_chain(op);
576        if expands.is_empty() {
577            return Err(Error::Internal("Empty expand chain".to_string()));
578        }
579
580        // Get the base operator (before first expand)
581        let first_expand = expands[0];
582        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
583
584        let mut columns = base_columns.clone();
585        let mut steps = Vec::new();
586
587        // Track the level-local source column for each expand
588        // For the first expand, it's the column in the input (base_columns)
589        // For subsequent expands, the target from the previous level is always at index 1
590        // (each level adds [edge, target], so target is at index 1)
591        let mut is_first = true;
592
593        for expand in &expands {
594            // Find source column for this expand
595            let source_column = if is_first {
596                // For first expand, find in base columns
597                base_columns
598                    .iter()
599                    .position(|c| c == &expand.from_variable)
600                    .ok_or_else(|| {
601                        Error::Internal(format!(
602                            "Source variable '{}' not found in base columns",
603                            expand.from_variable
604                        ))
605                    })?
606            } else {
607                // For subsequent expands, the target from the previous level is at index 1
608                // (each level adds [edge, target], so target is the second column)
609                1
610            };
611
612            // Convert direction
613            let direction = match expand.direction {
614                ExpandDirection::Outgoing => Direction::Outgoing,
615                ExpandDirection::Incoming => Direction::Incoming,
616                ExpandDirection::Both => Direction::Both,
617            };
618
619            // Add expand step configuration
620            steps.push(ExpandStep {
621                source_column,
622                direction,
623                edge_type: expand.edge_type.clone(),
624            });
625
626            // Add edge and target columns
627            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
628                let count = self.anon_edge_counter.get();
629                self.anon_edge_counter.set(count + 1);
630                format!("_anon_edge_{}", count)
631            });
632            columns.push(edge_col_name);
633            columns.push(expand.to_variable.clone());
634
635            is_first = false;
636        }
637
638        // Create lazy operator that executes at query time, not planning time
639        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
640
641        if let Some(tx_id) = self.tx_id {
642            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
643        } else {
644            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
645        }
646
647        Ok((Box::new(lazy_op), columns))
648    }
649
650    /// Plans a RETURN clause.
651    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
652        // Plan the input operator
653        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
654
655        // Build variable to column index mapping
656        let variable_columns: HashMap<String, usize> = input_columns
657            .iter()
658            .enumerate()
659            .map(|(i, name)| (name.clone(), i))
660            .collect();
661
662        // Extract column names from return items
663        let columns: Vec<String> = ret
664            .items
665            .iter()
666            .map(|item| {
667                item.alias.clone().unwrap_or_else(|| {
668                    // Generate a default name from the expression
669                    expression_to_string(&item.expression)
670                })
671            })
672            .collect();
673
674        // Check if we need a project operator (for property access or expression evaluation)
675        let needs_project = ret
676            .items
677            .iter()
678            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
679
680        if needs_project {
681            // Build project expressions
682            let mut projections = Vec::with_capacity(ret.items.len());
683            let mut output_types = Vec::with_capacity(ret.items.len());
684
685            for item in &ret.items {
686                match &item.expression {
687                    LogicalExpression::Variable(name) => {
688                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
689                            Error::Internal(format!("Variable '{}' not found in input", name))
690                        })?;
691                        projections.push(ProjectExpr::Column(col_idx));
692                        // Path detail variables carry List values — use Any/Generic
693                        if name.starts_with("_path_nodes_")
694                            || name.starts_with("_path_edges_")
695                            || name.starts_with("_path_length_")
696                        {
697                            output_types.push(LogicalType::Any);
698                        } else {
699                            output_types.push(LogicalType::Node);
700                        }
701                    }
702                    LogicalExpression::Property { variable, property } => {
703                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
704                            Error::Internal(format!("Variable '{}' not found in input", variable))
705                        })?;
706                        projections.push(ProjectExpr::PropertyAccess {
707                            column: col_idx,
708                            property: property.clone(),
709                        });
710                        // Property could be any type - use Any/Generic to preserve type
711                        output_types.push(LogicalType::Any);
712                    }
713                    LogicalExpression::Literal(value) => {
714                        projections.push(ProjectExpr::Constant(value.clone()));
715                        output_types.push(value_to_logical_type(value));
716                    }
717                    LogicalExpression::FunctionCall { name, args, .. } => {
718                        // Handle built-in functions
719                        match name.to_lowercase().as_str() {
720                            "type" => {
721                                // type(r) returns the edge type string
722                                if args.len() != 1 {
723                                    return Err(Error::Internal(
724                                        "type() requires exactly one argument".to_string(),
725                                    ));
726                                }
727                                if let LogicalExpression::Variable(var_name) = &args[0] {
728                                    let col_idx =
729                                        *variable_columns.get(var_name).ok_or_else(|| {
730                                            Error::Internal(format!(
731                                                "Variable '{}' not found in input",
732                                                var_name
733                                            ))
734                                        })?;
735                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
736                                    output_types.push(LogicalType::String);
737                                } else {
738                                    return Err(Error::Internal(
739                                        "type() argument must be a variable".to_string(),
740                                    ));
741                                }
742                            }
743                            "length" => {
744                                // length(p) returns the path length
745                                // For shortestPath results, the path column already contains the length
746                                if args.len() != 1 {
747                                    return Err(Error::Internal(
748                                        "length() requires exactly one argument".to_string(),
749                                    ));
750                                }
751                                if let LogicalExpression::Variable(var_name) = &args[0] {
752                                    let col_idx =
753                                        *variable_columns.get(var_name).ok_or_else(|| {
754                                            Error::Internal(format!(
755                                                "Variable '{}' not found in input",
756                                                var_name
757                                            ))
758                                        })?;
759                                    // Pass through the column value directly
760                                    projections.push(ProjectExpr::Column(col_idx));
761                                    output_types.push(LogicalType::Int64);
762                                } else {
763                                    return Err(Error::Internal(
764                                        "length() argument must be a variable".to_string(),
765                                    ));
766                                }
767                            }
768                            "nodes" | "edges" => {
769                                // nodes(p) / edges(p) returns the list of nodes/edges in the path
770                                if args.len() != 1 {
771                                    return Err(Error::Internal(format!(
772                                        "{}() requires exactly one argument",
773                                        name
774                                    )));
775                                }
776                                if let LogicalExpression::Variable(var_name) = &args[0] {
777                                    let col_idx =
778                                        *variable_columns.get(var_name).ok_or_else(|| {
779                                            Error::Internal(format!(
780                                                "Variable '{var_name}' not found in input",
781                                            ))
782                                        })?;
783                                    projections.push(ProjectExpr::Column(col_idx));
784                                    output_types.push(LogicalType::Any);
785                                } else {
786                                    return Err(Error::Internal(format!(
787                                        "{}() argument must be a variable",
788                                        name
789                                    )));
790                                }
791                            }
792                            // For other functions (head, tail, size, etc.), use expression evaluation
793                            _ => {
794                                let filter_expr = self.convert_expression(&item.expression)?;
795                                projections.push(ProjectExpr::Expression {
796                                    expr: filter_expr,
797                                    variable_columns: variable_columns.clone(),
798                                });
799                                output_types.push(LogicalType::Any);
800                            }
801                        }
802                    }
803                    LogicalExpression::Case { .. } => {
804                        // Convert CASE expression to FilterExpression for evaluation
805                        let filter_expr = self.convert_expression(&item.expression)?;
806                        projections.push(ProjectExpr::Expression {
807                            expr: filter_expr,
808                            variable_columns: variable_columns.clone(),
809                        });
810                        // CASE can return any type - use Any
811                        output_types.push(LogicalType::Any);
812                    }
813                    _ => {
814                        return Err(Error::Internal(format!(
815                            "Unsupported RETURN expression: {:?}",
816                            item.expression
817                        )));
818                    }
819                }
820            }
821
822            let operator = Box::new(ProjectOperator::with_store(
823                input_op,
824                projections,
825                output_types,
826                Arc::clone(&self.store),
827            ));
828
829            Ok((operator, columns))
830        } else {
831            // Simple case: just return variables
832            // Re-order columns to match return items if needed
833            let mut projections = Vec::with_capacity(ret.items.len());
834            let mut output_types = Vec::with_capacity(ret.items.len());
835
836            for item in &ret.items {
837                if let LogicalExpression::Variable(name) = &item.expression {
838                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
839                        Error::Internal(format!("Variable '{}' not found in input", name))
840                    })?;
841                    projections.push(ProjectExpr::Column(col_idx));
842                    output_types.push(LogicalType::Node);
843                }
844            }
845
846            // Only add ProjectOperator if reordering is needed
847            if projections.len() == input_columns.len()
848                && projections
849                    .iter()
850                    .enumerate()
851                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
852            {
853                // No reordering needed
854                Ok((input_op, columns))
855            } else {
856                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
857                Ok((operator, columns))
858            }
859        }
860    }
861
862    /// Plans a project operator (for WITH clause).
863    fn plan_project(
864        &self,
865        project: &crate::query::plan::ProjectOp,
866    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
867        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
868        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
869            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
870                // Create a single-row operator for projecting literals
871                let single_row_op: Box<dyn Operator> = Box::new(
872                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
873                );
874                (single_row_op, Vec::new())
875            } else {
876                self.plan_operator(&project.input)?
877            };
878
879        // Build variable to column index mapping
880        let variable_columns: HashMap<String, usize> = input_columns
881            .iter()
882            .enumerate()
883            .map(|(i, name)| (name.clone(), i))
884            .collect();
885
886        // Build projections and new column names
887        let mut projections = Vec::with_capacity(project.projections.len());
888        let mut output_types = Vec::with_capacity(project.projections.len());
889        let mut output_columns = Vec::with_capacity(project.projections.len());
890
891        for projection in &project.projections {
892            // Determine the output column name (alias or expression string)
893            let col_name = projection
894                .alias
895                .clone()
896                .unwrap_or_else(|| expression_to_string(&projection.expression));
897            output_columns.push(col_name);
898
899            match &projection.expression {
900                LogicalExpression::Variable(name) => {
901                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
902                        Error::Internal(format!("Variable '{}' not found in input", name))
903                    })?;
904                    projections.push(ProjectExpr::Column(col_idx));
905                    output_types.push(LogicalType::Node);
906                }
907                LogicalExpression::Property { variable, property } => {
908                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
909                        Error::Internal(format!("Variable '{}' not found in input", variable))
910                    })?;
911                    projections.push(ProjectExpr::PropertyAccess {
912                        column: col_idx,
913                        property: property.clone(),
914                    });
915                    output_types.push(LogicalType::Any);
916                }
917                LogicalExpression::Literal(value) => {
918                    projections.push(ProjectExpr::Constant(value.clone()));
919                    output_types.push(value_to_logical_type(value));
920                }
921                _ => {
922                    // For complex expressions, use full expression evaluation
923                    let filter_expr = self.convert_expression(&projection.expression)?;
924                    projections.push(ProjectExpr::Expression {
925                        expr: filter_expr,
926                        variable_columns: variable_columns.clone(),
927                    });
928                    output_types.push(LogicalType::Any);
929                }
930            }
931        }
932
933        let operator = Box::new(ProjectOperator::with_store(
934            input_op,
935            projections,
936            output_types,
937            Arc::clone(&self.store),
938        ));
939
940        Ok((operator, output_columns))
941    }
942
943    /// Plans a filter operator.
944    ///
945    /// Uses zone map pre-filtering to potentially skip scans when predicates
946    /// definitely won't match any data. Also uses property indexes when available
947    /// for O(1) lookups instead of full scans.
948    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
949        // Check zone maps for simple property predicates before scanning
950        // If zone map says "definitely no matches", we can short-circuit
951        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
952            // Zone map says no matches possible - return empty result
953            let (_, columns) = self.plan_operator(&filter.input)?;
954            let schema = self.derive_schema_from_columns(&columns);
955            let empty_op = Box::new(EmptyOperator::new(schema));
956            return Ok((empty_op, columns));
957        }
958
959        // Try to use property index for equality predicates on indexed properties
960        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
961            return Ok(result);
962        }
963
964        // Try to use range optimization for range predicates (>, <, >=, <=)
965        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
966            return Ok(result);
967        }
968
969        // Plan the input operator first
970        let (input_op, columns) = self.plan_operator(&filter.input)?;
971
972        // Build variable to column index mapping
973        let variable_columns: HashMap<String, usize> = columns
974            .iter()
975            .enumerate()
976            .map(|(i, name)| (name.clone(), i))
977            .collect();
978
979        // Convert logical expression to filter expression
980        let filter_expr = self.convert_expression(&filter.predicate)?;
981
982        // Create the predicate
983        let predicate =
984            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
985
986        // Create the filter operator
987        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
988
989        Ok((operator, columns))
990    }
991
992    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
993    ///
994    /// Returns:
995    /// - `Some(false)` if zone map proves no matches possible (can skip)
996    /// - `Some(true)` if zone map says matches might exist
997    /// - `None` if zone map check not applicable
998    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
999        use grafeo_core::graph::lpg::CompareOp;
1000
1001        match predicate {
1002            LogicalExpression::Binary { left, op, right } => {
1003                // Check for AND/OR first (compound conditions)
1004                match op {
1005                    BinaryOp::And => {
1006                        let left_result = self.check_zone_map_for_predicate(left);
1007                        let right_result = self.check_zone_map_for_predicate(right);
1008
1009                        return match (left_result, right_result) {
1010                            // If either side definitely won't match, the AND won't match
1011                            (Some(false), _) | (_, Some(false)) => Some(false),
1012                            // If both might match, might match overall
1013                            (Some(true), Some(true)) => Some(true),
1014                            // Otherwise, can't determine
1015                            _ => None,
1016                        };
1017                    }
1018                    BinaryOp::Or => {
1019                        let left_result = self.check_zone_map_for_predicate(left);
1020                        let right_result = self.check_zone_map_for_predicate(right);
1021
1022                        return match (left_result, right_result) {
1023                            // Both sides definitely won't match
1024                            (Some(false), Some(false)) => Some(false),
1025                            // At least one side might match
1026                            (Some(true), _) | (_, Some(true)) => Some(true),
1027                            // Otherwise, can't determine
1028                            _ => None,
1029                        };
1030                    }
1031                    _ => {}
1032                }
1033
1034                // Simple property comparison: n.property op value
1035                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1036                    (
1037                        LogicalExpression::Property { property, .. },
1038                        LogicalExpression::Literal(val),
1039                    ) => {
1040                        let cmp = match op {
1041                            BinaryOp::Eq => CompareOp::Eq,
1042                            BinaryOp::Ne => CompareOp::Ne,
1043                            BinaryOp::Lt => CompareOp::Lt,
1044                            BinaryOp::Le => CompareOp::Le,
1045                            BinaryOp::Gt => CompareOp::Gt,
1046                            BinaryOp::Ge => CompareOp::Ge,
1047                            _ => return None,
1048                        };
1049                        (property.clone(), cmp, val.clone())
1050                    }
1051                    (
1052                        LogicalExpression::Literal(val),
1053                        LogicalExpression::Property { property, .. },
1054                    ) => {
1055                        // Flip comparison for reversed operands
1056                        let cmp = match op {
1057                            BinaryOp::Eq => CompareOp::Eq,
1058                            BinaryOp::Ne => CompareOp::Ne,
1059                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1060                            BinaryOp::Le => CompareOp::Ge,
1061                            BinaryOp::Gt => CompareOp::Lt,
1062                            BinaryOp::Ge => CompareOp::Le,
1063                            _ => return None,
1064                        };
1065                        (property.clone(), cmp, val.clone())
1066                    }
1067                    _ => return None,
1068                };
1069
1070                // Check zone map for node properties
1071                let might_match =
1072                    self.store
1073                        .node_property_might_match(&property.into(), compare_op, &value);
1074
1075                Some(might_match)
1076            }
1077
1078            _ => None,
1079        }
1080    }
1081
1082    /// Tries to use a property index for filter optimization.
1083    ///
1084    /// When a filter predicate is an equality check on an indexed property,
1085    /// and the input is a simple NodeScan, we can use the index to look up
1086    /// matching nodes directly instead of scanning all nodes.
1087    ///
1088    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1089    /// `Ok(None)` if not applicable, or `Err` on error.
1090    fn try_plan_filter_with_property_index(
1091        &self,
1092        filter: &FilterOp,
1093    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1094        // Only optimize if input is a simple NodeScan (not nested)
1095        let (scan_variable, scan_label) = match filter.input.as_ref() {
1096            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1097                (scan.variable.clone(), scan.label.clone())
1098            }
1099            _ => return Ok(None),
1100        };
1101
1102        // Extract property equality conditions from the predicate
1103        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1104        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1105
1106        if conditions.is_empty() {
1107            return Ok(None);
1108        }
1109
1110        // Check if at least one condition has an index
1111        let has_indexed_condition = conditions
1112            .iter()
1113            .any(|(prop, _)| self.store.has_property_index(prop));
1114
1115        // Without an index we can still optimize when there's a label constraint:
1116        // label-first scan + property check avoids DataChunk/expression overhead.
1117        if !has_indexed_condition && scan_label.is_none() {
1118            return Ok(None);
1119        }
1120
1121        let mut matching_nodes = if has_indexed_condition {
1122            // Use index-based batch lookup
1123            let conditions_ref: Vec<(&str, Value)> = conditions
1124                .iter()
1125                .map(|(p, v)| (p.as_str(), v.clone()))
1126                .collect();
1127            let mut nodes = self.store.find_nodes_by_properties(&conditions_ref);
1128
1129            // Intersect with label if present
1130            if let Some(label) = &scan_label {
1131                let label_nodes: std::collections::HashSet<_> =
1132                    self.store.nodes_by_label(label).into_iter().collect();
1133                nodes.retain(|n| label_nodes.contains(n));
1134            }
1135            nodes
1136        } else {
1137            // No index but we have a label — scan label first, then check properties.
1138            // This is more efficient than ScanOperator → DataChunk → FilterOperator
1139            // because it avoids DataChunk materialization and expression evaluation.
1140            let label = scan_label.as_ref().expect("label checked above");
1141            let label_nodes = self.store.nodes_by_label(label);
1142            label_nodes
1143                .into_iter()
1144                .filter(|&node_id| {
1145                    conditions.iter().all(|(prop, val)| {
1146                        let key = grafeo_common::types::PropertyKey::new(prop);
1147                        self.store
1148                            .get_node_property(node_id, &key)
1149                            .is_some_and(|v| v == *val)
1150                    })
1151                })
1152                .collect()
1153        };
1154
1155        // MVCC visibility: filter out nodes not visible at the current epoch/tx.
1156        // Without this, rolled-back or uncommitted nodes could leak through.
1157        let epoch = self.viewing_epoch;
1158        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
1159        matching_nodes.retain(|id| self.store.get_node_versioned(*id, epoch, tx).is_some());
1160
1161        let columns = vec![scan_variable.clone()];
1162        let node_list_op: Box<dyn Operator> = Box::new(NodeListOperator::new(matching_nodes, 2048));
1163
1164        // Check for remaining predicate parts that weren't pushed down
1165        // (e.g., range conditions in a compound predicate like `n.name = 'Alice' AND n.age > 30`)
1166        if let Some(remaining) =
1167            self.extract_remaining_predicate(&filter.predicate, &scan_variable, &conditions)
1168        {
1169            let variable_columns: HashMap<String, usize> = columns
1170                .iter()
1171                .enumerate()
1172                .map(|(i, name)| (name.clone(), i))
1173                .collect();
1174            let filter_expr = self.convert_expression(&remaining)?;
1175            let predicate =
1176                ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
1177            let filtered = Box::new(FilterOperator::new(node_list_op, Box::new(predicate)));
1178            Ok(Some((filtered, columns)))
1179        } else {
1180            Ok(Some((node_list_op, columns)))
1181        }
1182    }
1183
1184    /// Extracts the remaining predicate after removing pushed-down equality conditions.
1185    ///
1186    /// Given `n.name = 'Alice' AND n.age > 30` with pushed conditions `[("name", "Alice")]`,
1187    /// returns `Some(n.age > 30)`. Returns `None` when all conditions were pushed down.
1188    fn extract_remaining_predicate(
1189        &self,
1190        predicate: &LogicalExpression,
1191        target_variable: &str,
1192        pushed_conditions: &[(String, Value)],
1193    ) -> Option<LogicalExpression> {
1194        match predicate {
1195            LogicalExpression::Binary {
1196                left,
1197                op: BinaryOp::And,
1198                right,
1199            } => {
1200                let left_remaining =
1201                    self.extract_remaining_predicate(left, target_variable, pushed_conditions);
1202                let right_remaining =
1203                    self.extract_remaining_predicate(right, target_variable, pushed_conditions);
1204
1205                match (left_remaining, right_remaining) {
1206                    (Some(l), Some(r)) => Some(LogicalExpression::Binary {
1207                        left: Box::new(l),
1208                        op: BinaryOp::And,
1209                        right: Box::new(r),
1210                    }),
1211                    (Some(l), None) => Some(l),
1212                    (None, Some(r)) => Some(r),
1213                    (None, None) => None,
1214                }
1215            }
1216            LogicalExpression::Binary {
1217                left,
1218                op: BinaryOp::Eq,
1219                right,
1220            } => {
1221                // Check if this equality was pushed down
1222                if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1223                    && var == target_variable
1224                    && pushed_conditions
1225                        .iter()
1226                        .any(|(p, v)| *p == prop && *v == val)
1227                {
1228                    None // Already handled at the store level
1229                } else {
1230                    Some(predicate.clone())
1231                }
1232            }
1233            _ => Some(predicate.clone()),
1234        }
1235    }
1236
1237    /// Extracts equality conditions (property = literal) from a predicate.
1238    ///
1239    /// Handles both simple predicates and AND chains:
1240    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1241    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1242    fn extract_equality_conditions(
1243        &self,
1244        predicate: &LogicalExpression,
1245        target_variable: &str,
1246    ) -> Vec<(String, Value)> {
1247        let mut conditions = Vec::new();
1248        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1249        conditions
1250    }
1251
1252    /// Recursively collects equality conditions from AND expressions.
1253    fn collect_equality_conditions(
1254        &self,
1255        expr: &LogicalExpression,
1256        target_variable: &str,
1257        conditions: &mut Vec<(String, Value)>,
1258    ) {
1259        match expr {
1260            // Handle AND: recurse into both sides
1261            LogicalExpression::Binary {
1262                left,
1263                op: BinaryOp::And,
1264                right,
1265            } => {
1266                self.collect_equality_conditions(left, target_variable, conditions);
1267                self.collect_equality_conditions(right, target_variable, conditions);
1268            }
1269
1270            // Handle equality: extract property and value
1271            LogicalExpression::Binary {
1272                left,
1273                op: BinaryOp::Eq,
1274                right,
1275            } => {
1276                if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1277                    && var == target_variable
1278                {
1279                    conditions.push((prop, val));
1280                }
1281            }
1282
1283            _ => {}
1284        }
1285    }
1286
1287    /// Extracts (variable, property, value) from a property equality expression.
1288    fn extract_property_equality(
1289        &self,
1290        left: &LogicalExpression,
1291        right: &LogicalExpression,
1292    ) -> Option<(String, String, Value)> {
1293        match (left, right) {
1294            (
1295                LogicalExpression::Property { variable, property },
1296                LogicalExpression::Literal(val),
1297            ) => Some((variable.clone(), property.clone(), val.clone())),
1298            (
1299                LogicalExpression::Literal(val),
1300                LogicalExpression::Property { variable, property },
1301            ) => Some((variable.clone(), property.clone(), val.clone())),
1302            _ => None,
1303        }
1304    }
1305
1306    /// Tries to optimize a filter using range queries on properties.
1307    ///
1308    /// This optimization is applied when:
1309    /// - The input is a simple NodeScan (no nested operations)
1310    /// - The predicate contains range comparisons (>, <, >=, <=)
1311    /// - The same variable and property are being filtered
1312    ///
1313    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1314    /// (`n.age >= 30 AND n.age <= 50`).
1315    ///
1316    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1317    /// `Ok(None)` if not applicable, or `Err` on error.
1318    fn try_plan_filter_with_range_index(
1319        &self,
1320        filter: &FilterOp,
1321    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1322        // Only optimize if input is a simple NodeScan (not nested)
1323        let (scan_variable, scan_label) = match filter.input.as_ref() {
1324            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1325                (scan.variable.clone(), scan.label.clone())
1326            }
1327            _ => return Ok(None),
1328        };
1329
1330        // Try to extract BETWEEN pattern first (more efficient)
1331        if let Some((variable, property, min, max, min_inc, max_inc)) =
1332            self.extract_between_predicate(&filter.predicate)
1333            && variable == scan_variable
1334        {
1335            return self.plan_range_filter(
1336                &scan_variable,
1337                &scan_label,
1338                &property,
1339                RangeBounds {
1340                    min: Some(&min),
1341                    max: Some(&max),
1342                    min_inclusive: min_inc,
1343                    max_inclusive: max_inc,
1344                },
1345            );
1346        }
1347
1348        // Try to extract simple range predicate
1349        if let Some((variable, property, op, value)) =
1350            self.extract_range_predicate(&filter.predicate)
1351            && variable == scan_variable
1352        {
1353            let (min, max, min_inc, max_inc) = match op {
1354                BinaryOp::Lt => (None, Some(value), false, false),
1355                BinaryOp::Le => (None, Some(value), false, true),
1356                BinaryOp::Gt => (Some(value), None, false, false),
1357                BinaryOp::Ge => (Some(value), None, true, false),
1358                _ => return Ok(None),
1359            };
1360            return self.plan_range_filter(
1361                &scan_variable,
1362                &scan_label,
1363                &property,
1364                RangeBounds {
1365                    min: min.as_ref(),
1366                    max: max.as_ref(),
1367                    min_inclusive: min_inc,
1368                    max_inclusive: max_inc,
1369                },
1370            );
1371        }
1372
1373        Ok(None)
1374    }
1375
1376    /// Plans a range filter using `find_nodes_in_range`.
1377    fn plan_range_filter(
1378        &self,
1379        scan_variable: &str,
1380        scan_label: &Option<String>,
1381        property: &str,
1382        bounds: RangeBounds<'_>,
1383    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1384        // Use the store's range query method
1385        let mut matching_nodes = self.store.find_nodes_in_range(
1386            property,
1387            bounds.min,
1388            bounds.max,
1389            bounds.min_inclusive,
1390            bounds.max_inclusive,
1391        );
1392
1393        // If there's a label filter, also filter by label
1394        if let Some(label) = scan_label {
1395            let label_nodes: std::collections::HashSet<_> =
1396                self.store.nodes_by_label(label).into_iter().collect();
1397            matching_nodes.retain(|n| label_nodes.contains(n));
1398        }
1399
1400        // MVCC visibility: filter out nodes not visible at the current epoch/tx.
1401        let epoch = self.viewing_epoch;
1402        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
1403        matching_nodes.retain(|id| self.store.get_node_versioned(*id, epoch, tx).is_some());
1404
1405        // Create a NodeListOperator with the matching nodes
1406        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1407        let columns = vec![scan_variable.to_string()];
1408
1409        Ok(Some((node_list_op, columns)))
1410    }
1411
1412    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1413    ///
1414    /// Returns `(variable, property, operator, value)` if found.
1415    fn extract_range_predicate(
1416        &self,
1417        predicate: &LogicalExpression,
1418    ) -> Option<(String, String, BinaryOp, Value)> {
1419        match predicate {
1420            LogicalExpression::Binary { left, op, right } => {
1421                match op {
1422                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1423                        // Try property on left: n.age > 30
1424                        if let (
1425                            LogicalExpression::Property { variable, property },
1426                            LogicalExpression::Literal(val),
1427                        ) = (left.as_ref(), right.as_ref())
1428                        {
1429                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1430                        }
1431
1432                        // Try property on right: 30 < n.age (flip operator)
1433                        if let (
1434                            LogicalExpression::Literal(val),
1435                            LogicalExpression::Property { variable, property },
1436                        ) = (left.as_ref(), right.as_ref())
1437                        {
1438                            let flipped_op = match op {
1439                                BinaryOp::Lt => BinaryOp::Gt,
1440                                BinaryOp::Le => BinaryOp::Ge,
1441                                BinaryOp::Gt => BinaryOp::Lt,
1442                                BinaryOp::Ge => BinaryOp::Le,
1443                                _ => return None,
1444                            };
1445                            return Some((
1446                                variable.clone(),
1447                                property.clone(),
1448                                flipped_op,
1449                                val.clone(),
1450                            ));
1451                        }
1452                    }
1453                    _ => {}
1454                }
1455            }
1456            _ => {}
1457        }
1458        None
1459    }
1460
1461    /// Extracts a BETWEEN pattern from compound predicates.
1462    ///
1463    /// Recognizes patterns like:
1464    /// - `n.age >= 30 AND n.age <= 50`
1465    /// - `n.age > 30 AND n.age < 50`
1466    ///
1467    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1468    fn extract_between_predicate(
1469        &self,
1470        predicate: &LogicalExpression,
1471    ) -> Option<(String, String, Value, Value, bool, bool)> {
1472        // Must be an AND expression
1473        let (left, right) = match predicate {
1474            LogicalExpression::Binary {
1475                left,
1476                op: BinaryOp::And,
1477                right,
1478            } => (left.as_ref(), right.as_ref()),
1479            _ => return None,
1480        };
1481
1482        // Extract range predicates from both sides
1483        let left_range = self.extract_range_predicate(left);
1484        let right_range = self.extract_range_predicate(right);
1485
1486        let (left_var, left_prop, left_op, left_val) = left_range?;
1487        let (right_var, right_prop, right_op, right_val) = right_range?;
1488
1489        // Must be same variable and property
1490        if left_var != right_var || left_prop != right_prop {
1491            return None;
1492        }
1493
1494        // Determine which is lower bound and which is upper bound
1495        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1496            // n.x >= min AND n.x <= max
1497            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1498            // n.x >= min AND n.x < max
1499            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1500            // n.x > min AND n.x <= max
1501            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1502            // n.x > min AND n.x < max
1503            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1504            // Reversed order: n.x <= max AND n.x >= min
1505            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1506            // n.x < max AND n.x >= min
1507            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1508            // n.x <= max AND n.x > min
1509            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1510            // n.x < max AND n.x > min
1511            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1512            _ => return None,
1513        };
1514
1515        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1516    }
1517
1518    /// Plans a LIMIT operator.
1519    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1520        let (input_op, columns) = self.plan_operator(&limit.input)?;
1521        let output_schema = self.derive_schema_from_columns(&columns);
1522        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1523        Ok((operator, columns))
1524    }
1525
1526    /// Plans a SKIP operator.
1527    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1528        let (input_op, columns) = self.plan_operator(&skip.input)?;
1529        let output_schema = self.derive_schema_from_columns(&columns);
1530        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1531        Ok((operator, columns))
1532    }
1533
1534    /// Plans a SORT (ORDER BY) operator.
1535    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1536        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1537
1538        // Build variable to column index mapping
1539        let mut variable_columns: HashMap<String, usize> = input_columns
1540            .iter()
1541            .enumerate()
1542            .map(|(i, name)| (name.clone(), i))
1543            .collect();
1544
1545        // Collect property expressions that need to be projected before sorting
1546        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1547        let mut next_col_idx = input_columns.len();
1548
1549        for key in &sort.keys {
1550            if let LogicalExpression::Property { variable, property } = &key.expression {
1551                let col_name = format!("{}_{}", variable, property);
1552                if !variable_columns.contains_key(&col_name) {
1553                    property_projections.push((
1554                        variable.clone(),
1555                        property.clone(),
1556                        col_name.clone(),
1557                    ));
1558                    variable_columns.insert(col_name, next_col_idx);
1559                    next_col_idx += 1;
1560                }
1561            }
1562        }
1563
1564        // Track output columns
1565        let mut output_columns = input_columns.clone();
1566
1567        // If we have property expressions, add a projection to materialize them
1568        if !property_projections.is_empty() {
1569            let mut projections = Vec::new();
1570            let mut output_types = Vec::new();
1571
1572            // First, pass through all existing columns (use Node type to preserve node IDs
1573            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1574            for (i, _) in input_columns.iter().enumerate() {
1575                projections.push(ProjectExpr::Column(i));
1576                output_types.push(LogicalType::Node);
1577            }
1578
1579            // Then add property access projections
1580            for (variable, property, col_name) in &property_projections {
1581                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1582                    Error::Internal(format!(
1583                        "Variable '{}' not found for ORDER BY property projection",
1584                        variable
1585                    ))
1586                })?;
1587                projections.push(ProjectExpr::PropertyAccess {
1588                    column: source_col,
1589                    property: property.clone(),
1590                });
1591                output_types.push(LogicalType::Any);
1592                output_columns.push(col_name.clone());
1593            }
1594
1595            input_op = Box::new(ProjectOperator::with_store(
1596                input_op,
1597                projections,
1598                output_types,
1599                Arc::clone(&self.store),
1600            ));
1601        }
1602
1603        // Convert logical sort keys to physical sort keys
1604        let physical_keys: Vec<PhysicalSortKey> = sort
1605            .keys
1606            .iter()
1607            .map(|key| {
1608                let col_idx = self
1609                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1610                Ok(PhysicalSortKey {
1611                    column: col_idx,
1612                    direction: match key.order {
1613                        SortOrder::Ascending => SortDirection::Ascending,
1614                        SortOrder::Descending => SortDirection::Descending,
1615                    },
1616                    null_order: NullOrder::NullsLast,
1617                })
1618            })
1619            .collect::<Result<Vec<_>>>()?;
1620
1621        let output_schema = self.derive_schema_from_columns(&output_columns);
1622        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1623        Ok((operator, output_columns))
1624    }
1625
1626    /// Resolves a sort expression to a column index, using projected property columns.
1627    fn resolve_sort_expression_with_properties(
1628        &self,
1629        expr: &LogicalExpression,
1630        variable_columns: &HashMap<String, usize>,
1631    ) -> Result<usize> {
1632        match expr {
1633            LogicalExpression::Variable(name) => {
1634                variable_columns.get(name).copied().ok_or_else(|| {
1635                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1636                })
1637            }
1638            LogicalExpression::Property { variable, property } => {
1639                // Look up the projected property column (e.g., "p_age" for p.age)
1640                let col_name = format!("{}_{}", variable, property);
1641                variable_columns.get(&col_name).copied().ok_or_else(|| {
1642                    Error::Internal(format!(
1643                        "Property column '{}' not found for ORDER BY (from {}.{})",
1644                        col_name, variable, property
1645                    ))
1646                })
1647            }
1648            _ => Err(Error::Internal(format!(
1649                "Unsupported ORDER BY expression: {:?}",
1650                expr
1651            ))),
1652        }
1653    }
1654
1655    /// Derives a schema from column names (uses Any type to handle all value types).
1656    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1657        columns.iter().map(|_| LogicalType::Any).collect()
1658    }
1659
    /// Plans an AGGREGATE operator.
    ///
    /// When factorized execution is enabled and the aggregation qualifies, this first tries
    /// `plan_factorized_aggregate`. If that returns an error, it falls back to the regular
    /// path, which works in four steps:
    /// 1. plans the input;
    /// 2. materializes every property referenced by a GROUP BY key or an aggregate argument
    ///    as an extra `"{variable}_{property}"` column, via a `ProjectOperator` appended
    ///    after the input columns;
    /// 3. builds a `SimpleAggregateOperator` (no GROUP BY) or a `HashAggregateOperator`;
    /// 4. wraps the result in a `FilterOperator` when a HAVING clause is present.
    ///
    /// Output columns are the GROUP BY keys (named by `expression_to_string`), followed by
    /// one column per aggregate. Each aggregate column is named by its alias. If there is no
    /// alias, the name is the lowercased `Debug` name of the function followed by `(...)`.
    ///
    /// # Errors
    /// Returns an error in three cases:
    /// - planning the input fails;
    /// - a referenced variable or projected property column cannot be resolved;
    /// - the HAVING expression cannot be converted.
    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
        // Check if we can use factorized aggregation for speedup
        // Conditions:
        // 1. Factorized execution is enabled
        // 2. Input is an expand chain (multi-hop): `count_expand_chain` reports >= 2 hops
        // 3. No GROUP BY
        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX), as judged by
        //    `is_simple_aggregate`
        if self.factorized_execution
            && agg.group_by.is_empty()
            && Self::count_expand_chain(&agg.input).0 >= 2
            && self.is_simple_aggregate(agg)
            && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
        {
            return Ok((op, cols));
        }
        // Fall through to regular aggregate if factorized planning fails
        // (the error from `plan_factorized_aggregate` is discarded here).

        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;

        // Build variable to column index mapping
        let mut variable_columns: HashMap<String, usize> = input_columns
            .iter()
            .enumerate()
            .map(|(i, name)| (name.clone(), i))
            .collect();

        // Collect all property expressions that need to be projected before aggregation.
        // Each gets a "{variable}_{property}" column appended after the input columns.
        // NOTE(review): an existing column that happens to carry that name (e.g. a variable
        // literally called `p_age`) is reused instead of projecting `p.age`. Confirm that
        // names cannot collide.
        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
        let mut next_col_idx = input_columns.len();

        // Check group-by expressions for properties
        for expr in &agg.group_by {
            if let LogicalExpression::Property { variable, property } = expr {
                let col_name = format!("{}_{}", variable, property);
                if !variable_columns.contains_key(&col_name) {
                    property_projections.push((
                        variable.clone(),
                        property.clone(),
                        col_name.clone(),
                    ));
                    variable_columns.insert(col_name, next_col_idx);
                    next_col_idx += 1;
                }
            }
        }

        // Check aggregate expressions for properties
        for agg_expr in &agg.aggregates {
            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
                let col_name = format!("{}_{}", variable, property);
                if !variable_columns.contains_key(&col_name) {
                    property_projections.push((
                        variable.clone(),
                        property.clone(),
                        col_name.clone(),
                    ));
                    variable_columns.insert(col_name, next_col_idx);
                    next_col_idx += 1;
                }
            }
        }

        // If we have property expressions, add a projection to materialize them
        if !property_projections.is_empty() {
            let mut projections = Vec::new();
            let mut output_types = Vec::new();

            // First, pass through all existing columns (use Node type to preserve node IDs
            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
            for (i, _) in input_columns.iter().enumerate() {
                projections.push(ProjectExpr::Column(i));
                output_types.push(LogicalType::Node);
            }

            // Then add property access projections, in the same order their column indices
            // were assigned above (so index `next_col_idx` matches the projection position).
            for (variable, property, _col_name) in &property_projections {
                let source_col = *variable_columns.get(variable).ok_or_else(|| {
                    Error::Internal(format!(
                        "Variable '{}' not found for property projection",
                        variable
                    ))
                })?;
                projections.push(ProjectExpr::PropertyAccess {
                    column: source_col,
                    property: property.clone(),
                });
                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
            }

            input_op = Box::new(ProjectOperator::with_store(
                input_op,
                projections,
                output_types,
                Arc::clone(&self.store),
            ));
        }

        // Convert group-by expressions to column indices
        let group_columns: Vec<usize> = agg
            .group_by
            .iter()
            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
            .collect::<Result<Vec<_>>>()?;

        // Convert aggregate expressions to physical form; COUNT(*) has no expression and
        // therefore no input column.
        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
            .aggregates
            .iter()
            .map(|agg_expr| {
                let column = agg_expr
                    .expression
                    .as_ref()
                    .map(|e| {
                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
                    })
                    .transpose()?;

                Ok(PhysicalAggregateExpr {
                    function: convert_aggregate_function(agg_expr.function),
                    column,
                    distinct: agg_expr.distinct,
                    alias: agg_expr.alias.clone(),
                    percentile: agg_expr.percentile,
                })
            })
            .collect::<Result<Vec<_>>>()?;

        // Build output schema and column names
        let mut output_schema = Vec::new();
        let mut output_columns = Vec::new();

        // Add group-by columns
        for expr in &agg.group_by {
            output_schema.push(LogicalType::Any); // Group-by values can be any type
            output_columns.push(expression_to_string(expr));
        }

        // Add aggregate result columns
        // NOTE(review): SUM and MIN/MAX are declared Int64 even though float or string inputs
        // are possible. Confirm the aggregate operators tolerate values whose type differs
        // from the declared schema.
        for agg_expr in &agg.aggregates {
            let result_type = match agg_expr.function {
                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
                    LogicalType::Int64
                }
                LogicalAggregateFunction::Sum => LogicalType::Int64,
                LogicalAggregateFunction::Avg => LogicalType::Float64,
                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
                    // since the aggregate can return any Value type, but the most common case
                    // is numeric values from property expressions
                    LogicalType::Int64
                }
                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
                // Statistical functions return Float64
                LogicalAggregateFunction::StdDev
                | LogicalAggregateFunction::StdDevPop
                | LogicalAggregateFunction::PercentileDisc
                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
            };
            output_schema.push(result_type);
            output_columns.push(
                agg_expr
                    .alias
                    .clone()
                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
            );
        }

        // Choose operator based on whether there are group-by columns
        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
            Box::new(SimpleAggregateOperator::new(
                input_op,
                physical_aggregates,
                output_schema,
            ))
        } else {
            Box::new(HashAggregateOperator::new(
                input_op,
                group_columns,
                physical_aggregates,
                output_schema,
            ))
        };

        // Apply HAVING clause filter if present
        if let Some(having_expr) = &agg.having {
            // Build variable to column mapping for the aggregate output, so HAVING refers to
            // group-by keys and aggregate aliases by their output column names.
            let having_var_columns: HashMap<String, usize> = output_columns
                .iter()
                .enumerate()
                .map(|(i, name)| (name.clone(), i))
                .collect();

            let filter_expr = self.convert_expression(having_expr)?;
            let predicate =
                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
        }

        Ok((operator, output_columns))
    }
1861
1862    /// Checks if an aggregate is simple enough for factorized execution.
1863    ///
1864    /// Simple aggregates:
1865    /// - COUNT(*) or COUNT(variable)
1866    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1867    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1868        agg.aggregates.iter().all(|agg_expr| {
1869            match agg_expr.function {
1870                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1871                    // COUNT(*) is always OK, COUNT(var) is OK
1872                    agg_expr.expression.is_none()
1873                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1874                }
1875                LogicalAggregateFunction::Sum
1876                | LogicalAggregateFunction::Avg
1877                | LogicalAggregateFunction::Min
1878                | LogicalAggregateFunction::Max => {
1879                    // For now, only support when expression is a variable
1880                    // (property access would require flattening first)
1881                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1882                }
1883                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1884                _ => false,
1885            }
1886        })
1887    }
1888
1889    /// Plans a factorized aggregate that operates directly on factorized data.
1890    ///
1891    /// This avoids the O(n²) cost of flattening before aggregation.
1892    fn plan_factorized_aggregate(
1893        &self,
1894        agg: &AggregateOp,
1895    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1896        // Build the expand chain - this returns a LazyFactorizedChainOperator
1897        let expands = Self::collect_expand_chain(&agg.input);
1898        if expands.is_empty() {
1899            return Err(Error::Internal(
1900                "Expected expand chain for factorized aggregate".to_string(),
1901            ));
1902        }
1903
1904        // Get the base operator (before first expand)
1905        let first_expand = expands[0];
1906        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1907
1908        let mut columns = base_columns.clone();
1909        let mut steps = Vec::new();
1910        let mut is_first = true;
1911
1912        for expand in &expands {
1913            // Find source column for this expand
1914            let source_column = if is_first {
1915                base_columns
1916                    .iter()
1917                    .position(|c| c == &expand.from_variable)
1918                    .ok_or_else(|| {
1919                        Error::Internal(format!(
1920                            "Source variable '{}' not found in base columns",
1921                            expand.from_variable
1922                        ))
1923                    })?
1924            } else {
1925                1 // Target from previous level
1926            };
1927
1928            let direction = match expand.direction {
1929                ExpandDirection::Outgoing => Direction::Outgoing,
1930                ExpandDirection::Incoming => Direction::Incoming,
1931                ExpandDirection::Both => Direction::Both,
1932            };
1933
1934            steps.push(ExpandStep {
1935                source_column,
1936                direction,
1937                edge_type: expand.edge_type.clone(),
1938            });
1939
1940            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1941                let count = self.anon_edge_counter.get();
1942                self.anon_edge_counter.set(count + 1);
1943                format!("_anon_edge_{}", count)
1944            });
1945            columns.push(edge_col_name);
1946            columns.push(expand.to_variable.clone());
1947
1948            is_first = false;
1949        }
1950
1951        // Create the lazy factorized chain operator
1952        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1953
1954        if let Some(tx_id) = self.tx_id {
1955            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1956        } else {
1957            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1958        }
1959
1960        // Convert logical aggregates to factorized aggregates
1961        let factorized_aggs: Vec<FactorizedAggregate> = agg
1962            .aggregates
1963            .iter()
1964            .map(|agg_expr| {
1965                match agg_expr.function {
1966                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1967                        // COUNT(*) uses simple count, COUNT(col) uses column count
1968                        if agg_expr.expression.is_none() {
1969                            FactorizedAggregate::count()
1970                        } else {
1971                            // For COUNT(variable), we use the deepest level's target column
1972                            // which is the last column added to the schema
1973                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1974                        }
1975                    }
1976                    LogicalAggregateFunction::Sum => {
1977                        // SUM on deepest level target
1978                        FactorizedAggregate::sum(1)
1979                    }
1980                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1981                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1982                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1983                    _ => {
1984                        // Shouldn't reach here due to is_simple_aggregate check
1985                        FactorizedAggregate::count()
1986                    }
1987                }
1988            })
1989            .collect();
1990
1991        // Build output column names
1992        let output_columns: Vec<String> = agg
1993            .aggregates
1994            .iter()
1995            .map(|agg_expr| {
1996                agg_expr
1997                    .alias
1998                    .clone()
1999                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
2000            })
2001            .collect();
2002
2003        // Create the factorized aggregate operator
2004        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
2005
2006        Ok((Box::new(factorized_agg_op), output_columns))
2007    }
2008
2009    /// Resolves a logical expression to a column index.
2010    #[allow(dead_code)]
2011    fn resolve_expression_to_column(
2012        &self,
2013        expr: &LogicalExpression,
2014        variable_columns: &HashMap<String, usize>,
2015    ) -> Result<usize> {
2016        match expr {
2017            LogicalExpression::Variable(name) => variable_columns
2018                .get(name)
2019                .copied()
2020                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2021            LogicalExpression::Property { variable, .. } => variable_columns
2022                .get(variable)
2023                .copied()
2024                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
2025            _ => Err(Error::Internal(format!(
2026                "Cannot resolve expression to column: {:?}",
2027                expr
2028            ))),
2029        }
2030    }
2031
2032    /// Resolves a logical expression to a column index, using projected property columns.
2033    ///
2034    /// This is used for aggregations where properties have been projected into their own columns.
2035    fn resolve_expression_to_column_with_properties(
2036        &self,
2037        expr: &LogicalExpression,
2038        variable_columns: &HashMap<String, usize>,
2039    ) -> Result<usize> {
2040        match expr {
2041            LogicalExpression::Variable(name) => variable_columns
2042                .get(name)
2043                .copied()
2044                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2045            LogicalExpression::Property { variable, property } => {
2046                // Look up the projected property column (e.g., "p_price" for p.price)
2047                let col_name = format!("{}_{}", variable, property);
2048                variable_columns.get(&col_name).copied().ok_or_else(|| {
2049                    Error::Internal(format!(
2050                        "Property column '{}' not found (from {}.{})",
2051                        col_name, variable, property
2052                    ))
2053                })
2054            }
2055            _ => Err(Error::Internal(format!(
2056                "Cannot resolve expression to column: {:?}",
2057                expr
2058            ))),
2059        }
2060    }
2061
2062    /// Converts a logical expression to a filter expression.
2063    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
2064        match expr {
2065            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2066            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2067            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2068                variable: variable.clone(),
2069                property: property.clone(),
2070            }),
2071            LogicalExpression::Binary { left, op, right } => {
2072                let left_expr = self.convert_expression(left)?;
2073                let right_expr = self.convert_expression(right)?;
2074                let filter_op = convert_binary_op(*op)?;
2075                Ok(FilterExpression::Binary {
2076                    left: Box::new(left_expr),
2077                    op: filter_op,
2078                    right: Box::new(right_expr),
2079                })
2080            }
2081            LogicalExpression::Unary { op, operand } => {
2082                let operand_expr = self.convert_expression(operand)?;
2083                let filter_op = convert_unary_op(*op)?;
2084                Ok(FilterExpression::Unary {
2085                    op: filter_op,
2086                    operand: Box::new(operand_expr),
2087                })
2088            }
2089            LogicalExpression::FunctionCall { name, args, .. } => {
2090                let filter_args: Vec<FilterExpression> = args
2091                    .iter()
2092                    .map(|a| self.convert_expression(a))
2093                    .collect::<Result<Vec<_>>>()?;
2094                Ok(FilterExpression::FunctionCall {
2095                    name: name.clone(),
2096                    args: filter_args,
2097                })
2098            }
2099            LogicalExpression::Case {
2100                operand,
2101                when_clauses,
2102                else_clause,
2103            } => {
2104                let filter_operand = operand
2105                    .as_ref()
2106                    .map(|e| self.convert_expression(e))
2107                    .transpose()?
2108                    .map(Box::new);
2109                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2110                    .iter()
2111                    .map(|(cond, result)| {
2112                        Ok((
2113                            self.convert_expression(cond)?,
2114                            self.convert_expression(result)?,
2115                        ))
2116                    })
2117                    .collect::<Result<Vec<_>>>()?;
2118                let filter_else = else_clause
2119                    .as_ref()
2120                    .map(|e| self.convert_expression(e))
2121                    .transpose()?
2122                    .map(Box::new);
2123                Ok(FilterExpression::Case {
2124                    operand: filter_operand,
2125                    when_clauses: filter_when_clauses,
2126                    else_clause: filter_else,
2127                })
2128            }
2129            LogicalExpression::List(items) => {
2130                let filter_items: Vec<FilterExpression> = items
2131                    .iter()
2132                    .map(|item| self.convert_expression(item))
2133                    .collect::<Result<Vec<_>>>()?;
2134                Ok(FilterExpression::List(filter_items))
2135            }
2136            LogicalExpression::Map(pairs) => {
2137                let filter_pairs: Vec<(String, FilterExpression)> = pairs
2138                    .iter()
2139                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2140                    .collect::<Result<Vec<_>>>()?;
2141                Ok(FilterExpression::Map(filter_pairs))
2142            }
2143            LogicalExpression::IndexAccess { base, index } => {
2144                let base_expr = self.convert_expression(base)?;
2145                let index_expr = self.convert_expression(index)?;
2146                Ok(FilterExpression::IndexAccess {
2147                    base: Box::new(base_expr),
2148                    index: Box::new(index_expr),
2149                })
2150            }
2151            LogicalExpression::SliceAccess { base, start, end } => {
2152                let base_expr = self.convert_expression(base)?;
2153                let start_expr = start
2154                    .as_ref()
2155                    .map(|s| self.convert_expression(s))
2156                    .transpose()?
2157                    .map(Box::new);
2158                let end_expr = end
2159                    .as_ref()
2160                    .map(|e| self.convert_expression(e))
2161                    .transpose()?
2162                    .map(Box::new);
2163                Ok(FilterExpression::SliceAccess {
2164                    base: Box::new(base_expr),
2165                    start: start_expr,
2166                    end: end_expr,
2167                })
2168            }
2169            LogicalExpression::Parameter(_) => Err(Error::Internal(
2170                "Parameters not yet supported in filters".to_string(),
2171            )),
2172            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2173            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2174            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2175            LogicalExpression::ListComprehension {
2176                variable,
2177                list_expr,
2178                filter_expr,
2179                map_expr,
2180            } => {
2181                let list = self.convert_expression(list_expr)?;
2182                let filter = filter_expr
2183                    .as_ref()
2184                    .map(|f| self.convert_expression(f))
2185                    .transpose()?
2186                    .map(Box::new);
2187                let map = self.convert_expression(map_expr)?;
2188                Ok(FilterExpression::ListComprehension {
2189                    variable: variable.clone(),
2190                    list_expr: Box::new(list),
2191                    filter_expr: filter,
2192                    map_expr: Box::new(map),
2193                })
2194            }
2195            LogicalExpression::ExistsSubquery(subplan) => {
2196                // Extract the pattern from the subplan
2197                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2198                let (start_var, direction, edge_type, end_labels) =
2199                    self.extract_exists_pattern(subplan)?;
2200
2201                Ok(FilterExpression::ExistsSubquery {
2202                    start_var,
2203                    direction,
2204                    edge_type,
2205                    end_labels,
2206                    min_hops: None,
2207                    max_hops: None,
2208                })
2209            }
2210            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2211                "COUNT subqueries not yet supported".to_string(),
2212            )),
2213        }
2214    }
2215
2216    /// Extracts the pattern from an EXISTS subplan.
2217    /// Returns (start_variable, direction, edge_type, end_labels).
2218    fn extract_exists_pattern(
2219        &self,
2220        subplan: &LogicalOperator,
2221    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2222        match subplan {
2223            LogicalOperator::Expand(expand) => {
2224                // Get end node labels from the to_variable if there's a node scan input
2225                let end_labels = self.extract_end_labels_from_expand(expand);
2226                let direction = match expand.direction {
2227                    ExpandDirection::Outgoing => Direction::Outgoing,
2228                    ExpandDirection::Incoming => Direction::Incoming,
2229                    ExpandDirection::Both => Direction::Both,
2230                };
2231                Ok((
2232                    expand.from_variable.clone(),
2233                    direction,
2234                    expand.edge_type.clone(),
2235                    end_labels,
2236                ))
2237            }
2238            LogicalOperator::NodeScan(scan) => {
2239                if let Some(input) = &scan.input {
2240                    self.extract_exists_pattern(input)
2241                } else {
2242                    Err(Error::Internal(
2243                        "EXISTS subquery must contain an edge pattern".to_string(),
2244                    ))
2245                }
2246            }
2247            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2248            _ => Err(Error::Internal(
2249                "Unsupported EXISTS subquery pattern".to_string(),
2250            )),
2251        }
2252    }
2253
2254    /// Extracts end node labels from an Expand operator if present.
2255    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2256        // Check if the expand has a NodeScan input with a label filter
2257        match expand.input.as_ref() {
2258            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2259            _ => None,
2260        }
2261    }
2262
2263    /// Plans a JOIN operator.
2264    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2265        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2266        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2267
2268        // Build combined output columns
2269        let mut columns = left_columns.clone();
2270        columns.extend(right_columns.clone());
2271
2272        // Convert join type
2273        let physical_join_type = match join.join_type {
2274            JoinType::Inner => PhysicalJoinType::Inner,
2275            JoinType::Left => PhysicalJoinType::Left,
2276            JoinType::Right => PhysicalJoinType::Right,
2277            JoinType::Full => PhysicalJoinType::Full,
2278            JoinType::Cross => PhysicalJoinType::Cross,
2279            JoinType::Semi => PhysicalJoinType::Semi,
2280            JoinType::Anti => PhysicalJoinType::Anti,
2281        };
2282
2283        // Build key columns from join conditions
2284        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2285            // Cross join - no keys
2286            (vec![], vec![])
2287        } else {
2288            join.conditions
2289                .iter()
2290                .filter_map(|cond| {
2291                    // Try to extract column indices from expressions
2292                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2293                    let right_idx = self
2294                        .expression_to_column(&cond.right, &right_columns)
2295                        .ok()?;
2296                    Some((left_idx, right_idx))
2297                })
2298                .unzip()
2299        };
2300
2301        let output_schema = self.derive_schema_from_columns(&columns);
2302
2303        // Check if we should use leapfrog join for cyclic patterns
2304        // Currently we use hash join by default; leapfrog is available but
2305        // requires explicit multi-way join detection which will be added
2306        // when we have proper cyclic pattern detection in the optimizer.
2307        // For now, LeapfrogJoinOperator is available for direct use.
2308        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2309
2310        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2311            left_op,
2312            right_op,
2313            probe_keys,
2314            build_keys,
2315            physical_join_type,
2316            output_schema,
2317        ));
2318
2319        Ok((operator, columns))
2320    }
2321
2322    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2323    ///
2324    /// A cyclic pattern occurs when join conditions reference variables
2325    /// that create a cycle in the join graph. For example, a triangle
2326    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2327    ///
2328    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2329    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2330    #[allow(dead_code)]
2331    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2332        // Build adjacency list for join variables
2333        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2334        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2335
2336        // Collect edges from join conditions
2337        Self::collect_join_edges(
2338            &LogicalOperator::Join(join.clone()),
2339            &mut edges,
2340            &mut all_vars,
2341        );
2342
2343        // Need at least 3 variables to form a cycle
2344        if all_vars.len() < 3 {
2345            return false;
2346        }
2347
2348        // Detect cycle using DFS with coloring
2349        Self::has_cycle(&edges, &all_vars)
2350    }
2351
2352    /// Collects edges from join conditions into an adjacency list.
2353    fn collect_join_edges(
2354        op: &LogicalOperator,
2355        edges: &mut HashMap<String, Vec<String>>,
2356        vars: &mut std::collections::HashSet<String>,
2357    ) {
2358        match op {
2359            LogicalOperator::Join(join) => {
2360                // Process join conditions
2361                for cond in &join.conditions {
2362                    if let (Some(left_var), Some(right_var)) = (
2363                        Self::extract_join_variable(&cond.left),
2364                        Self::extract_join_variable(&cond.right),
2365                    ) && left_var != right_var
2366                    {
2367                        vars.insert(left_var.clone());
2368                        vars.insert(right_var.clone());
2369
2370                        // Add bidirectional edge
2371                        edges
2372                            .entry(left_var.clone())
2373                            .or_default()
2374                            .push(right_var.clone());
2375                        edges.entry(right_var).or_default().push(left_var);
2376                    }
2377                }
2378
2379                // Recurse into children
2380                Self::collect_join_edges(&join.left, edges, vars);
2381                Self::collect_join_edges(&join.right, edges, vars);
2382            }
2383            LogicalOperator::Expand(expand) => {
2384                // Expand creates implicit join between from_variable and to_variable
2385                vars.insert(expand.from_variable.clone());
2386                vars.insert(expand.to_variable.clone());
2387
2388                edges
2389                    .entry(expand.from_variable.clone())
2390                    .or_default()
2391                    .push(expand.to_variable.clone());
2392                edges
2393                    .entry(expand.to_variable.clone())
2394                    .or_default()
2395                    .push(expand.from_variable.clone());
2396
2397                Self::collect_join_edges(&expand.input, edges, vars);
2398            }
2399            LogicalOperator::Filter(filter) => {
2400                Self::collect_join_edges(&filter.input, edges, vars);
2401            }
2402            LogicalOperator::NodeScan(scan) => {
2403                vars.insert(scan.variable.clone());
2404            }
2405            _ => {}
2406        }
2407    }
2408
2409    /// Extracts the variable name from a join expression.
2410    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2411        match expr {
2412            LogicalExpression::Variable(v) => Some(v.clone()),
2413            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2414            LogicalExpression::Id(v) => Some(v.clone()),
2415            _ => None,
2416        }
2417    }
2418
2419    /// Detects if the graph has a cycle using DFS coloring.
2420    ///
2421    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2422    fn has_cycle(
2423        edges: &HashMap<String, Vec<String>>,
2424        vars: &std::collections::HashSet<String>,
2425    ) -> bool {
2426        let mut color: HashMap<&String, u8> = HashMap::new();
2427
2428        for var in vars {
2429            color.insert(var, 0);
2430        }
2431
2432        for start in vars {
2433            if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2434                return true;
2435            }
2436        }
2437
2438        false
2439    }
2440
2441    /// DFS helper for cycle detection.
2442    fn dfs_cycle(
2443        node: &String,
2444        parent: Option<&String>,
2445        edges: &HashMap<String, Vec<String>>,
2446        color: &mut HashMap<&String, u8>,
2447    ) -> bool {
2448        *color.get_mut(node).unwrap() = 1; // Gray
2449
2450        if let Some(neighbors) = edges.get(node) {
2451            for neighbor in neighbors {
2452                // Skip the edge back to parent (undirected graph)
2453                if parent == Some(neighbor) {
2454                    continue;
2455                }
2456
2457                if let Some(&c) = color.get(neighbor) {
2458                    if c == 1 {
2459                        // Found a back edge - cycle detected
2460                        return true;
2461                    }
2462                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2463                        return true;
2464                    }
2465                }
2466            }
2467        }
2468
2469        *color.get_mut(node).unwrap() = 2; // Black
2470        false
2471    }
2472
2473    /// Counts the number of base relations in a logical operator tree.
2474    #[allow(dead_code)]
2475    fn count_relations(op: &LogicalOperator) -> usize {
2476        match op {
2477            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2478            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2479            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2480            LogicalOperator::Join(j) => {
2481                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2482            }
2483            _ => 0,
2484        }
2485    }
2486
2487    /// Extracts a column index from an expression.
2488    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2489        match expr {
2490            LogicalExpression::Variable(name) => columns
2491                .iter()
2492                .position(|c| c == name)
2493                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2494            _ => Err(Error::Internal(
2495                "Only variables supported in join conditions".to_string(),
2496            )),
2497        }
2498    }
2499
2500    /// Plans a UNION operator.
2501    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2502        if union.inputs.is_empty() {
2503            return Err(Error::Internal(
2504                "Union requires at least one input".to_string(),
2505            ));
2506        }
2507
2508        let mut inputs = Vec::with_capacity(union.inputs.len());
2509        let mut columns = Vec::new();
2510
2511        for (i, input) in union.inputs.iter().enumerate() {
2512            let (op, cols) = self.plan_operator(input)?;
2513            if i == 0 {
2514                columns = cols;
2515            }
2516            inputs.push(op);
2517        }
2518
2519        let output_schema = self.derive_schema_from_columns(&columns);
2520        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2521
2522        Ok((operator, columns))
2523    }
2524
2525    /// Plans a DISTINCT operator.
2526    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2527        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2528        let output_schema = self.derive_schema_from_columns(&columns);
2529        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2530        Ok((operator, columns))
2531    }
2532
2533    /// Plans a CREATE NODE operator.
2534    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2535        // Plan input if present
2536        let (input_op, mut columns) = if let Some(ref input) = create.input {
2537            let (op, cols) = self.plan_operator(input)?;
2538            (Some(op), cols)
2539        } else {
2540            (None, vec![])
2541        };
2542
2543        // Output column for the created node
2544        let output_column = columns.len();
2545        columns.push(create.variable.clone());
2546
2547        // Convert properties (with constant-folding for lists and function calls)
2548        let properties: Vec<(String, PropertySource)> = create
2549            .properties
2550            .iter()
2551            .map(|(name, expr)| {
2552                let source = match Self::try_fold_expression(expr) {
2553                    Some(value) => PropertySource::Constant(value),
2554                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2555                };
2556                (name.clone(), source)
2557            })
2558            .collect();
2559
2560        let output_schema = self.derive_schema_from_columns(&columns);
2561
2562        let operator = Box::new(
2563            CreateNodeOperator::new(
2564                Arc::clone(&self.store),
2565                input_op,
2566                create.labels.clone(),
2567                properties,
2568                output_schema,
2569                output_column,
2570            )
2571            .with_tx_context(self.viewing_epoch, self.tx_id),
2572        );
2573
2574        Ok((operator, columns))
2575    }
2576
2577    /// Plans a CREATE EDGE operator.
2578    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2579        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2580
2581        // Find source and target columns
2582        let from_column = columns
2583            .iter()
2584            .position(|c| c == &create.from_variable)
2585            .ok_or_else(|| {
2586                Error::Internal(format!(
2587                    "Source variable '{}' not found",
2588                    create.from_variable
2589                ))
2590            })?;
2591
2592        let to_column = columns
2593            .iter()
2594            .position(|c| c == &create.to_variable)
2595            .ok_or_else(|| {
2596                Error::Internal(format!(
2597                    "Target variable '{}' not found",
2598                    create.to_variable
2599                ))
2600            })?;
2601
2602        // Output column for the created edge (if named)
2603        let output_column = create.variable.as_ref().map(|v| {
2604            let idx = columns.len();
2605            columns.push(v.clone());
2606            idx
2607        });
2608
2609        // Convert properties (with constant-folding for function calls like vector())
2610        let properties: Vec<(String, PropertySource)> = create
2611            .properties
2612            .iter()
2613            .map(|(name, expr)| {
2614                let source = match Self::try_fold_expression(expr) {
2615                    Some(value) => PropertySource::Constant(value),
2616                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2617                };
2618                (name.clone(), source)
2619            })
2620            .collect();
2621
2622        let output_schema = self.derive_schema_from_columns(&columns);
2623
2624        let mut operator = CreateEdgeOperator::new(
2625            Arc::clone(&self.store),
2626            input_op,
2627            from_column,
2628            to_column,
2629            create.edge_type.clone(),
2630            output_schema,
2631        )
2632        .with_properties(properties)
2633        .with_tx_context(self.viewing_epoch, self.tx_id);
2634
2635        if let Some(col) = output_column {
2636            operator = operator.with_output_column(col);
2637        }
2638
2639        let operator = Box::new(operator);
2640
2641        Ok((operator, columns))
2642    }
2643
2644    /// Plans a DELETE NODE operator.
2645    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2646        let (input_op, columns) = self.plan_operator(&delete.input)?;
2647
2648        let node_column = columns
2649            .iter()
2650            .position(|c| c == &delete.variable)
2651            .ok_or_else(|| {
2652                Error::Internal(format!(
2653                    "Variable '{}' not found for delete",
2654                    delete.variable
2655                ))
2656            })?;
2657
2658        // Output schema for delete count
2659        let output_schema = vec![LogicalType::Int64];
2660        let output_columns = vec!["deleted_count".to_string()];
2661
2662        let operator = Box::new(
2663            DeleteNodeOperator::new(
2664                Arc::clone(&self.store),
2665                input_op,
2666                node_column,
2667                output_schema,
2668                delete.detach, // DETACH DELETE deletes connected edges first
2669            )
2670            .with_tx_context(self.viewing_epoch, self.tx_id),
2671        );
2672
2673        Ok((operator, output_columns))
2674    }
2675
2676    /// Plans a DELETE EDGE operator.
2677    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2678        let (input_op, columns) = self.plan_operator(&delete.input)?;
2679
2680        let edge_column = columns
2681            .iter()
2682            .position(|c| c == &delete.variable)
2683            .ok_or_else(|| {
2684                Error::Internal(format!(
2685                    "Variable '{}' not found for delete",
2686                    delete.variable
2687                ))
2688            })?;
2689
2690        // Output schema for delete count
2691        let output_schema = vec![LogicalType::Int64];
2692        let output_columns = vec!["deleted_count".to_string()];
2693
2694        let operator = Box::new(
2695            DeleteEdgeOperator::new(
2696                Arc::clone(&self.store),
2697                input_op,
2698                edge_column,
2699                output_schema,
2700            )
2701            .with_tx_context(self.viewing_epoch, self.tx_id),
2702        );
2703
2704        Ok((operator, output_columns))
2705    }
2706
2707    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2708    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2709        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2710        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2711
2712        // Build combined output columns (left + right)
2713        let mut columns = left_columns.clone();
2714        columns.extend(right_columns.clone());
2715
2716        // Find common variables between left and right for join keys
2717        let mut probe_keys = Vec::new();
2718        let mut build_keys = Vec::new();
2719
2720        for (right_idx, right_col) in right_columns.iter().enumerate() {
2721            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2722                probe_keys.push(left_idx);
2723                build_keys.push(right_idx);
2724            }
2725        }
2726
2727        let output_schema = self.derive_schema_from_columns(&columns);
2728
2729        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2730            left_op,
2731            right_op,
2732            probe_keys,
2733            build_keys,
2734            PhysicalJoinType::Left,
2735            output_schema,
2736        ));
2737
2738        Ok((operator, columns))
2739    }
2740
2741    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2742    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2743        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2744        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2745
2746        // Anti-join only keeps left columns (filters out matching rows)
2747        let columns = left_columns.clone();
2748
2749        // Find common variables between left and right for join keys
2750        let mut probe_keys = Vec::new();
2751        let mut build_keys = Vec::new();
2752
2753        for (right_idx, right_col) in right_columns.iter().enumerate() {
2754            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2755                probe_keys.push(left_idx);
2756                build_keys.push(right_idx);
2757            }
2758        }
2759
2760        let output_schema = self.derive_schema_from_columns(&columns);
2761
2762        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2763            left_op,
2764            right_op,
2765            probe_keys,
2766            build_keys,
2767            PhysicalJoinType::Anti,
2768            output_schema,
2769        ));
2770
2771        Ok((operator, columns))
2772    }
2773
2774    /// Plans an unwind operator.
2775    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2776        // Plan the input operator first
2777        // Handle Empty specially - use a single-row operator
2778        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2779            if matches!(&*unwind.input, LogicalOperator::Empty) {
2780                // For UNWIND without prior MATCH, create a single-row input
2781                // We need an operator that produces one row with the list to unwind
2782                // For now, use EmptyScan which produces no rows - we'll handle the literal
2783                // list in the unwind operator itself
2784                let literal_list = self.convert_expression(&unwind.expression)?;
2785
2786                // Create a project operator that produces a single row with the list
2787                let single_row_op: Box<dyn Operator> = Box::new(
2788                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2789                );
2790                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2791                    single_row_op,
2792                    vec![ProjectExpr::Expression {
2793                        expr: literal_list,
2794                        variable_columns: HashMap::new(),
2795                    }],
2796                    vec![LogicalType::Any],
2797                    Arc::clone(&self.store),
2798                ));
2799
2800                (project_op, vec!["__list__".to_string()])
2801            } else {
2802                self.plan_operator(&unwind.input)?
2803            };
2804
2805        // The UNWIND expression should be a list - we need to find/evaluate it
2806        // For now, we handle the case where the expression references an existing column
2807        // or is a literal list
2808
2809        // Find if the expression references an existing column (like a list property)
2810        let list_col_idx = match &unwind.expression {
2811            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2812            LogicalExpression::Property { variable, .. } => {
2813                // Property access needs to be evaluated - for now we'll need the filter predicate
2814                // to evaluate this. For simple cases, we treat it as a list column.
2815                input_columns.iter().position(|c| c == variable)
2816            }
2817            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2818                // Literal list expression - we'll add it as a virtual column
2819                None
2820            }
2821            _ => None,
2822        };
2823
2824        // Build output columns: all input columns plus the new variable
2825        let mut columns = input_columns.clone();
2826        columns.push(unwind.variable.clone());
2827
2828        // Build output schema
2829        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2830        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2831
2832        // Use the list column index if found, otherwise default to 0
2833        // (in which case the first column should contain the list)
2834        let col_idx = list_col_idx.unwrap_or(0);
2835
2836        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2837            input_op,
2838            col_idx,
2839            unwind.variable.clone(),
2840            output_schema,
2841        ));
2842
2843        Ok((operator, columns))
2844    }
2845
2846    /// Plans a MERGE operator.
2847    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2848        // Plan the input operator if present (skip if Empty)
2849        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2850            Vec::new()
2851        } else {
2852            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2853            cols
2854        };
2855
2856        // Convert match properties from LogicalExpression to Value
2857        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2858            .match_properties
2859            .iter()
2860            .filter_map(|(name, expr)| {
2861                if let LogicalExpression::Literal(v) = expr {
2862                    Some((name.clone(), v.clone()))
2863                } else {
2864                    None // Skip non-literal expressions for now
2865                }
2866            })
2867            .collect();
2868
2869        // Convert ON CREATE properties
2870        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2871            .on_create
2872            .iter()
2873            .filter_map(|(name, expr)| {
2874                if let LogicalExpression::Literal(v) = expr {
2875                    Some((name.clone(), v.clone()))
2876                } else {
2877                    None
2878                }
2879            })
2880            .collect();
2881
2882        // Convert ON MATCH properties
2883        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2884            .on_match
2885            .iter()
2886            .filter_map(|(name, expr)| {
2887                if let LogicalExpression::Literal(v) = expr {
2888                    Some((name.clone(), v.clone()))
2889                } else {
2890                    None
2891                }
2892            })
2893            .collect();
2894
2895        // Add the merged node variable to output columns
2896        columns.push(merge.variable.clone());
2897
2898        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2899            Arc::clone(&self.store),
2900            merge.variable.clone(),
2901            merge.labels.clone(),
2902            match_properties,
2903            on_create_properties,
2904            on_match_properties,
2905        ));
2906
2907        Ok((operator, columns))
2908    }
2909
2910    /// Plans a SHORTEST PATH operator.
2911    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2912        // Plan the input operator
2913        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2914
2915        // Find source and target node columns
2916        let source_column = columns
2917            .iter()
2918            .position(|c| c == &sp.source_var)
2919            .ok_or_else(|| {
2920                Error::Internal(format!(
2921                    "Source variable '{}' not found for shortestPath",
2922                    sp.source_var
2923                ))
2924            })?;
2925
2926        let target_column = columns
2927            .iter()
2928            .position(|c| c == &sp.target_var)
2929            .ok_or_else(|| {
2930                Error::Internal(format!(
2931                    "Target variable '{}' not found for shortestPath",
2932                    sp.target_var
2933                ))
2934            })?;
2935
2936        // Convert direction
2937        let direction = match sp.direction {
2938            ExpandDirection::Outgoing => Direction::Outgoing,
2939            ExpandDirection::Incoming => Direction::Incoming,
2940            ExpandDirection::Both => Direction::Both,
2941        };
2942
2943        // Create the shortest path operator
2944        let operator: Box<dyn Operator> = Box::new(
2945            ShortestPathOperator::new(
2946                Arc::clone(&self.store),
2947                input_op,
2948                source_column,
2949                target_column,
2950                sp.edge_type.clone(),
2951                direction,
2952            )
2953            .with_all_paths(sp.all_paths),
2954        );
2955
2956        // Add path length column with the expected naming convention
2957        // The translator expects _path_length_{alias} format for length(p) calls
2958        columns.push(format!("_path_length_{}", sp.path_alias));
2959
2960        Ok((operator, columns))
2961    }
2962
2963    /// Plans a CALL procedure operator.
2964    fn plan_call_procedure(
2965        &self,
2966        call: &CallProcedureOp,
2967    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2968        use crate::procedures::{self, BuiltinProcedures};
2969
2970        static PROCEDURES: std::sync::OnceLock<BuiltinProcedures> = std::sync::OnceLock::new();
2971        let registry = PROCEDURES.get_or_init(BuiltinProcedures::new);
2972
2973        // Special case: grafeo.procedures() lists all procedures
2974        let resolved_name = call.name.join(".");
2975        if resolved_name == "grafeo.procedures" || resolved_name == "procedures" {
2976            let result = procedures::procedures_result(registry);
2977            return self.plan_static_result(result, &call.yield_items);
2978        }
2979
2980        // Look up the algorithm
2981        let algorithm = registry.get(&call.name).ok_or_else(|| {
2982            Error::Internal(format!(
2983                "Unknown procedure: '{}'. Use CALL grafeo.procedures() to list available procedures.",
2984                call.name.join(".")
2985            ))
2986        })?;
2987
2988        // Evaluate arguments to Parameters
2989        let params = procedures::evaluate_arguments(&call.arguments, algorithm.parameters());
2990
2991        // Canonical column names for this algorithm (user-facing names)
2992        let canonical_columns = procedures::output_columns_for_name(algorithm.as_ref());
2993
2994        // Determine output columns from YIELD or algorithm defaults
2995        let yield_columns = call.yield_items.as_ref().map(|items| {
2996            items
2997                .iter()
2998                .map(|item| (item.field_name.clone(), item.alias.clone()))
2999                .collect::<Vec<_>>()
3000        });
3001
3002        let output_columns = if let Some(yield_cols) = &yield_columns {
3003            yield_cols
3004                .iter()
3005                .map(|(name, alias)| alias.clone().unwrap_or_else(|| name.clone()))
3006                .collect()
3007        } else {
3008            canonical_columns.clone()
3009        };
3010
3011        let operator = Box::new(
3012            crate::query::executor::procedure_call::ProcedureCallOperator::new(
3013                Arc::clone(&self.store),
3014                algorithm,
3015                params,
3016                yield_columns,
3017                canonical_columns,
3018            ),
3019        );
3020
3021        Ok((operator, output_columns))
3022    }
3023
3024    /// Plans a static result set (e.g., from `grafeo.procedures()`).
3025    fn plan_static_result(
3026        &self,
3027        result: grafeo_adapters::plugins::AlgorithmResult,
3028        yield_items: &Option<Vec<crate::query::plan::ProcedureYield>>,
3029    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3030        // Determine output columns and column indices
3031        let (output_columns, column_indices) = if let Some(items) = yield_items {
3032            let mut cols = Vec::new();
3033            let mut indices = Vec::new();
3034            for item in items {
3035                let idx = result
3036                    .columns
3037                    .iter()
3038                    .position(|c| c == &item.field_name)
3039                    .ok_or_else(|| {
3040                        Error::Internal(format!(
3041                            "YIELD column '{}' not found (available: {})",
3042                            item.field_name,
3043                            result.columns.join(", ")
3044                        ))
3045                    })?;
3046                indices.push(idx);
3047                cols.push(
3048                    item.alias
3049                        .clone()
3050                        .unwrap_or_else(|| item.field_name.clone()),
3051                );
3052            }
3053            (cols, indices)
3054        } else {
3055            let indices: Vec<usize> = (0..result.columns.len()).collect();
3056            (result.columns.clone(), indices)
3057        };
3058
3059        let operator = Box::new(StaticResultOperator {
3060            rows: result.rows,
3061            column_indices,
3062            row_index: 0,
3063        });
3064
3065        Ok((operator, output_columns))
3066    }
3067
3068    /// Plans an ADD LABEL operator.
3069    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
3070        let (input_op, columns) = self.plan_operator(&add_label.input)?;
3071
3072        // Find the node column
3073        let node_column = columns
3074            .iter()
3075            .position(|c| c == &add_label.variable)
3076            .ok_or_else(|| {
3077                Error::Internal(format!(
3078                    "Variable '{}' not found for ADD LABEL",
3079                    add_label.variable
3080                ))
3081            })?;
3082
3083        // Output schema for update count
3084        let output_schema = vec![LogicalType::Int64];
3085        let output_columns = vec!["labels_added".to_string()];
3086
3087        let operator = Box::new(AddLabelOperator::new(
3088            Arc::clone(&self.store),
3089            input_op,
3090            node_column,
3091            add_label.labels.clone(),
3092            output_schema,
3093        ));
3094
3095        Ok((operator, output_columns))
3096    }
3097
3098    /// Plans a REMOVE LABEL operator.
3099    fn plan_remove_label(
3100        &self,
3101        remove_label: &RemoveLabelOp,
3102    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3103        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
3104
3105        // Find the node column
3106        let node_column = columns
3107            .iter()
3108            .position(|c| c == &remove_label.variable)
3109            .ok_or_else(|| {
3110                Error::Internal(format!(
3111                    "Variable '{}' not found for REMOVE LABEL",
3112                    remove_label.variable
3113                ))
3114            })?;
3115
3116        // Output schema for update count
3117        let output_schema = vec![LogicalType::Int64];
3118        let output_columns = vec!["labels_removed".to_string()];
3119
3120        let operator = Box::new(RemoveLabelOperator::new(
3121            Arc::clone(&self.store),
3122            input_op,
3123            node_column,
3124            remove_label.labels.clone(),
3125            output_schema,
3126        ));
3127
3128        Ok((operator, output_columns))
3129    }
3130
3131    /// Plans a SET PROPERTY operator.
3132    fn plan_set_property(
3133        &self,
3134        set_prop: &SetPropertyOp,
3135    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3136        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
3137
3138        // Find the entity column (node or edge variable)
3139        let entity_column = columns
3140            .iter()
3141            .position(|c| c == &set_prop.variable)
3142            .ok_or_else(|| {
3143                Error::Internal(format!(
3144                    "Variable '{}' not found for SET",
3145                    set_prop.variable
3146                ))
3147            })?;
3148
3149        // Convert properties to PropertySource
3150        let properties: Vec<(String, PropertySource)> = set_prop
3151            .properties
3152            .iter()
3153            .map(|(name, expr)| {
3154                let source = self.expression_to_property_source(expr, &columns)?;
3155                Ok((name.clone(), source))
3156            })
3157            .collect::<Result<Vec<_>>>()?;
3158
3159        // Output schema preserves input schema (passes through)
3160        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
3161        let output_columns = columns.clone();
3162
3163        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
3164        let operator = Box::new(SetPropertyOperator::new_for_node(
3165            Arc::clone(&self.store),
3166            input_op,
3167            entity_column,
3168            properties,
3169            output_schema,
3170        ));
3171
3172        Ok((operator, output_columns))
3173    }
3174
3175    /// Converts a logical expression to a PropertySource.
3176    fn expression_to_property_source(
3177        &self,
3178        expr: &LogicalExpression,
3179        columns: &[String],
3180    ) -> Result<PropertySource> {
3181        match expr {
3182            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
3183            LogicalExpression::Variable(name) => {
3184                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
3185                    Error::Internal(format!("Variable '{}' not found for property source", name))
3186                })?;
3187                Ok(PropertySource::Column(col_idx))
3188            }
3189            LogicalExpression::Parameter(name) => {
3190                // Parameters should be resolved before planning
3191                // For now, treat as a placeholder
3192                Ok(PropertySource::Constant(
3193                    grafeo_common::types::Value::String(format!("${}", name).into()),
3194                ))
3195            }
3196            _ => {
3197                if let Some(value) = Self::try_fold_expression(expr) {
3198                    Ok(PropertySource::Constant(value))
3199                } else {
3200                    Err(Error::Internal(format!(
3201                        "Unsupported expression type for property source: {:?}",
3202                        expr
3203                    )))
3204                }
3205            }
3206        }
3207    }
3208
3209    /// Tries to evaluate a constant expression at plan time.
3210    ///
3211    /// Recursively folds literals, lists, and known function calls (like `vector()`)
3212    /// into concrete values. Returns `None` if the expression contains non-constant
3213    /// parts (variables, property accesses, etc.).
3214    fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3215        match expr {
3216            LogicalExpression::Literal(v) => Some(v.clone()),
3217            LogicalExpression::List(items) => {
3218                let values: Option<Vec<Value>> =
3219                    items.iter().map(Self::try_fold_expression).collect();
3220                let values = values?;
3221                // All-numeric lists become vectors (matches Python list[float] behavior)
3222                let all_numeric = !values.is_empty()
3223                    && values
3224                        .iter()
3225                        .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3226                if all_numeric {
3227                    let floats: Vec<f32> = values
3228                        .iter()
3229                        .filter_map(|v| match v {
3230                            Value::Float64(f) => Some(*f as f32),
3231                            Value::Int64(i) => Some(*i as f32),
3232                            _ => None,
3233                        })
3234                        .collect();
3235                    Some(Value::Vector(floats.into()))
3236                } else {
3237                    Some(Value::List(values.into()))
3238                }
3239            }
3240            LogicalExpression::FunctionCall { name, args, .. } => {
3241                match name.to_lowercase().as_str() {
3242                    "vector" => {
3243                        if args.len() != 1 {
3244                            return None;
3245                        }
3246                        let val = Self::try_fold_expression(&args[0])?;
3247                        match val {
3248                            Value::List(items) => {
3249                                let floats: Vec<f32> = items
3250                                    .iter()
3251                                    .filter_map(|v| match v {
3252                                        Value::Float64(f) => Some(*f as f32),
3253                                        Value::Int64(i) => Some(*i as f32),
3254                                        _ => None,
3255                                    })
3256                                    .collect();
3257                                if floats.len() == items.len() {
3258                                    Some(Value::Vector(floats.into()))
3259                                } else {
3260                                    None
3261                                }
3262                            }
3263                            // Already a vector (from all-numeric list folding)
3264                            Value::Vector(v) => Some(Value::Vector(v)),
3265                            _ => None,
3266                        }
3267                    }
3268                    _ => None,
3269                }
3270            }
3271            _ => None,
3272        }
3273    }
3274}
3275
3276/// Converts a logical binary operator to a filter binary operator.
3277pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3278    match op {
3279        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3280        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3281        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3282        BinaryOp::Le => Ok(BinaryFilterOp::Le),
3283        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3284        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3285        BinaryOp::And => Ok(BinaryFilterOp::And),
3286        BinaryOp::Or => Ok(BinaryFilterOp::Or),
3287        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3288        BinaryOp::Add => Ok(BinaryFilterOp::Add),
3289        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3290        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3291        BinaryOp::Div => Ok(BinaryFilterOp::Div),
3292        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3293        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3294        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3295        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3296        BinaryOp::In => Ok(BinaryFilterOp::In),
3297        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3298        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3299        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3300            "Binary operator {:?} not yet supported in filters",
3301            op
3302        ))),
3303    }
3304}
3305
3306/// Converts a logical unary operator to a filter unary operator.
3307pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3308    match op {
3309        UnaryOp::Not => Ok(UnaryFilterOp::Not),
3310        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3311        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3312        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3313    }
3314}
3315
3316/// Converts a logical aggregate function to a physical aggregate function.
3317pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3318    match func {
3319        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3320        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3321        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3322        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3323        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3324        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3325        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3326        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3327        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3328        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3329        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3330    }
3331}
3332
3333/// Converts a logical expression to a filter expression.
3334///
3335/// This is a standalone function that can be used by both LPG and RDF planners.
3336pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3337    match expr {
3338        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3339        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3340        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3341            variable: variable.clone(),
3342            property: property.clone(),
3343        }),
3344        LogicalExpression::Binary { left, op, right } => {
3345            let left_expr = convert_filter_expression(left)?;
3346            let right_expr = convert_filter_expression(right)?;
3347            let filter_op = convert_binary_op(*op)?;
3348            Ok(FilterExpression::Binary {
3349                left: Box::new(left_expr),
3350                op: filter_op,
3351                right: Box::new(right_expr),
3352            })
3353        }
3354        LogicalExpression::Unary { op, operand } => {
3355            let operand_expr = convert_filter_expression(operand)?;
3356            let filter_op = convert_unary_op(*op)?;
3357            Ok(FilterExpression::Unary {
3358                op: filter_op,
3359                operand: Box::new(operand_expr),
3360            })
3361        }
3362        LogicalExpression::FunctionCall { name, args, .. } => {
3363            let filter_args: Vec<FilterExpression> = args
3364                .iter()
3365                .map(convert_filter_expression)
3366                .collect::<Result<Vec<_>>>()?;
3367            Ok(FilterExpression::FunctionCall {
3368                name: name.clone(),
3369                args: filter_args,
3370            })
3371        }
3372        LogicalExpression::Case {
3373            operand,
3374            when_clauses,
3375            else_clause,
3376        } => {
3377            let filter_operand = operand
3378                .as_ref()
3379                .map(|e| convert_filter_expression(e))
3380                .transpose()?
3381                .map(Box::new);
3382            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3383                .iter()
3384                .map(|(cond, result)| {
3385                    Ok((
3386                        convert_filter_expression(cond)?,
3387                        convert_filter_expression(result)?,
3388                    ))
3389                })
3390                .collect::<Result<Vec<_>>>()?;
3391            let filter_else = else_clause
3392                .as_ref()
3393                .map(|e| convert_filter_expression(e))
3394                .transpose()?
3395                .map(Box::new);
3396            Ok(FilterExpression::Case {
3397                operand: filter_operand,
3398                when_clauses: filter_when_clauses,
3399                else_clause: filter_else,
3400            })
3401        }
3402        LogicalExpression::List(items) => {
3403            let filter_items: Vec<FilterExpression> = items
3404                .iter()
3405                .map(convert_filter_expression)
3406                .collect::<Result<Vec<_>>>()?;
3407            Ok(FilterExpression::List(filter_items))
3408        }
3409        LogicalExpression::Map(pairs) => {
3410            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3411                .iter()
3412                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3413                .collect::<Result<Vec<_>>>()?;
3414            Ok(FilterExpression::Map(filter_pairs))
3415        }
3416        LogicalExpression::IndexAccess { base, index } => {
3417            let base_expr = convert_filter_expression(base)?;
3418            let index_expr = convert_filter_expression(index)?;
3419            Ok(FilterExpression::IndexAccess {
3420                base: Box::new(base_expr),
3421                index: Box::new(index_expr),
3422            })
3423        }
3424        LogicalExpression::SliceAccess { base, start, end } => {
3425            let base_expr = convert_filter_expression(base)?;
3426            let start_expr = start
3427                .as_ref()
3428                .map(|s| convert_filter_expression(s))
3429                .transpose()?
3430                .map(Box::new);
3431            let end_expr = end
3432                .as_ref()
3433                .map(|e| convert_filter_expression(e))
3434                .transpose()?
3435                .map(Box::new);
3436            Ok(FilterExpression::SliceAccess {
3437                base: Box::new(base_expr),
3438                start: start_expr,
3439                end: end_expr,
3440            })
3441        }
3442        LogicalExpression::Parameter(_) => Err(Error::Internal(
3443            "Parameters not yet supported in filters".to_string(),
3444        )),
3445        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3446        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3447        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3448        LogicalExpression::ListComprehension {
3449            variable,
3450            list_expr,
3451            filter_expr,
3452            map_expr,
3453        } => {
3454            let list = convert_filter_expression(list_expr)?;
3455            let filter = filter_expr
3456                .as_ref()
3457                .map(|f| convert_filter_expression(f))
3458                .transpose()?
3459                .map(Box::new);
3460            let map = convert_filter_expression(map_expr)?;
3461            Ok(FilterExpression::ListComprehension {
3462                variable: variable.clone(),
3463                list_expr: Box::new(list),
3464                filter_expr: filter,
3465                map_expr: Box::new(map),
3466            })
3467        }
3468        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3469            Error::Internal("Subqueries not yet supported in filters".to_string()),
3470        ),
3471    }
3472}
3473
3474/// Infers the logical type from a value.
3475fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3476    use grafeo_common::types::Value;
3477    match value {
3478        Value::Null => LogicalType::String, // Default type for null
3479        Value::Bool(_) => LogicalType::Bool,
3480        Value::Int64(_) => LogicalType::Int64,
3481        Value::Float64(_) => LogicalType::Float64,
3482        Value::String(_) => LogicalType::String,
3483        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3484        Value::Timestamp(_) => LogicalType::Timestamp,
3485        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3486        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3487        Value::Vector(v) => LogicalType::Vector(v.len()),
3488    }
3489}
3490
3491/// Converts an expression to a string for column naming.
3492fn expression_to_string(expr: &LogicalExpression) -> String {
3493    match expr {
3494        LogicalExpression::Variable(name) => name.clone(),
3495        LogicalExpression::Property { variable, property } => {
3496            format!("{variable}.{property}")
3497        }
3498        LogicalExpression::Literal(value) => format!("{value:?}"),
3499        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3500        _ => "expr".to_string(),
3501    }
3502}
3503
3504/// A physical plan ready for execution.
3505pub struct PhysicalPlan {
3506    /// The root physical operator.
3507    pub operator: Box<dyn Operator>,
3508    /// Column names for the result.
3509    pub columns: Vec<String>,
3510    /// Adaptive execution context with cardinality estimates.
3511    ///
3512    /// When adaptive execution is enabled, this context contains estimated
3513    /// cardinalities at various checkpoints in the plan. During execution,
3514    /// actual row counts are recorded and compared against estimates.
3515    pub adaptive_context: Option<AdaptiveContext>,
3516}
3517
3518impl PhysicalPlan {
3519    /// Returns the column names.
3520    #[must_use]
3521    pub fn columns(&self) -> &[String] {
3522        &self.columns
3523    }
3524
3525    /// Consumes the plan and returns the operator.
3526    pub fn into_operator(self) -> Box<dyn Operator> {
3527        self.operator
3528    }
3529
3530    /// Returns the adaptive context, if adaptive execution is enabled.
3531    #[must_use]
3532    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3533        self.adaptive_context.as_ref()
3534    }
3535
3536    /// Takes ownership of the adaptive context.
3537    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3538        self.adaptive_context.take()
3539    }
3540}
3541
3542/// Helper operator that returns a single result chunk once.
3543///
3544/// Used by the factorized expand chain to wrap the final result.
3545#[allow(dead_code)]
3546struct SingleResultOperator {
3547    result: Option<grafeo_core::execution::DataChunk>,
3548}
3549
3550impl SingleResultOperator {
3551    #[allow(dead_code)]
3552    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3553        Self { result }
3554    }
3555}
3556
3557impl Operator for SingleResultOperator {
3558    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3559        Ok(self.result.take())
3560    }
3561
3562    fn reset(&mut self) {
3563        // Cannot reset - result is consumed
3564    }
3565
3566    fn name(&self) -> &'static str {
3567        "SingleResult"
3568    }
3569}
3570
3571/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
3572struct StaticResultOperator {
3573    rows: Vec<Vec<Value>>,
3574    column_indices: Vec<usize>,
3575    row_index: usize,
3576}
3577
3578impl Operator for StaticResultOperator {
3579    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3580        use grafeo_core::execution::DataChunk;
3581
3582        if self.row_index >= self.rows.len() {
3583            return Ok(None);
3584        }
3585
3586        let remaining = self.rows.len() - self.row_index;
3587        let chunk_rows = remaining.min(1024);
3588        let col_count = self.column_indices.len();
3589
3590        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
3591        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
3592
3593        for row_offset in 0..chunk_rows {
3594            let row = &self.rows[self.row_index + row_offset];
3595            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
3596                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
3597                if let Some(col) = chunk.column_mut(col_idx) {
3598                    col.push_value(value);
3599                }
3600            }
3601        }
3602        chunk.set_count(chunk_rows);
3603
3604        self.row_index += chunk_rows;
3605        Ok(Some(chunk))
3606    }
3607
3608    fn reset(&mut self) {
3609        self.row_index = 0;
3610    }
3611
3612    fn name(&self) -> &'static str {
3613        "StaticResult"
3614    }
3615}
3616
3617#[cfg(test)]
3618mod tests {
3619    use super::*;
3620    use crate::query::plan::{
3621        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3622        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3623        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3624        SortKey, SortOp,
3625    };
3626    use grafeo_common::types::Value;
3627
3628    fn create_test_store() -> Arc<LpgStore> {
3629        let store = Arc::new(LpgStore::new());
3630        store.create_node(&["Person"]);
3631        store.create_node(&["Person"]);
3632        store.create_node(&["Company"]);
3633        store
3634    }
3635
3636    // ==================== Simple Scan Tests ====================
3637
3638    #[test]
3639    fn test_plan_simple_scan() {
3640        let store = create_test_store();
3641        let planner = Planner::new(store);
3642
3643        // MATCH (n:Person) RETURN n
3644        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3645            items: vec![ReturnItem {
3646                expression: LogicalExpression::Variable("n".to_string()),
3647                alias: None,
3648            }],
3649            distinct: false,
3650            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3651                variable: "n".to_string(),
3652                label: Some("Person".to_string()),
3653                input: None,
3654            })),
3655        }));
3656
3657        let physical = planner.plan(&logical).unwrap();
3658        assert_eq!(physical.columns(), &["n"]);
3659    }
3660
3661    #[test]
3662    fn test_plan_scan_without_label() {
3663        let store = create_test_store();
3664        let planner = Planner::new(store);
3665
3666        // MATCH (n) RETURN n
3667        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3668            items: vec![ReturnItem {
3669                expression: LogicalExpression::Variable("n".to_string()),
3670                alias: None,
3671            }],
3672            distinct: false,
3673            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3674                variable: "n".to_string(),
3675                label: None,
3676                input: None,
3677            })),
3678        }));
3679
3680        let physical = planner.plan(&logical).unwrap();
3681        assert_eq!(physical.columns(), &["n"]);
3682    }
3683
3684    #[test]
3685    fn test_plan_return_with_alias() {
3686        let store = create_test_store();
3687        let planner = Planner::new(store);
3688
3689        // MATCH (n:Person) RETURN n AS person
3690        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3691            items: vec![ReturnItem {
3692                expression: LogicalExpression::Variable("n".to_string()),
3693                alias: Some("person".to_string()),
3694            }],
3695            distinct: false,
3696            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3697                variable: "n".to_string(),
3698                label: Some("Person".to_string()),
3699                input: None,
3700            })),
3701        }));
3702
3703        let physical = planner.plan(&logical).unwrap();
3704        assert_eq!(physical.columns(), &["person"]);
3705    }
3706
3707    #[test]
3708    fn test_plan_return_property() {
3709        let store = create_test_store();
3710        let planner = Planner::new(store);
3711
3712        // MATCH (n:Person) RETURN n.name
3713        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3714            items: vec![ReturnItem {
3715                expression: LogicalExpression::Property {
3716                    variable: "n".to_string(),
3717                    property: "name".to_string(),
3718                },
3719                alias: None,
3720            }],
3721            distinct: false,
3722            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3723                variable: "n".to_string(),
3724                label: Some("Person".to_string()),
3725                input: None,
3726            })),
3727        }));
3728
3729        let physical = planner.plan(&logical).unwrap();
3730        assert_eq!(physical.columns(), &["n.name"]);
3731    }
3732
3733    #[test]
3734    fn test_plan_return_literal() {
3735        let store = create_test_store();
3736        let planner = Planner::new(store);
3737
3738        // MATCH (n) RETURN 42 AS answer
3739        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3740            items: vec![ReturnItem {
3741                expression: LogicalExpression::Literal(Value::Int64(42)),
3742                alias: Some("answer".to_string()),
3743            }],
3744            distinct: false,
3745            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3746                variable: "n".to_string(),
3747                label: None,
3748                input: None,
3749            })),
3750        }));
3751
3752        let physical = planner.plan(&logical).unwrap();
3753        assert_eq!(physical.columns(), &["answer"]);
3754    }
3755
3756    // ==================== Filter Tests ====================
3757
3758    #[test]
3759    fn test_plan_filter_equality() {
3760        let store = create_test_store();
3761        let planner = Planner::new(store);
3762
3763        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3764        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3765            items: vec![ReturnItem {
3766                expression: LogicalExpression::Variable("n".to_string()),
3767                alias: None,
3768            }],
3769            distinct: false,
3770            input: Box::new(LogicalOperator::Filter(FilterOp {
3771                predicate: LogicalExpression::Binary {
3772                    left: Box::new(LogicalExpression::Property {
3773                        variable: "n".to_string(),
3774                        property: "age".to_string(),
3775                    }),
3776                    op: BinaryOp::Eq,
3777                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3778                },
3779                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3780                    variable: "n".to_string(),
3781                    label: Some("Person".to_string()),
3782                    input: None,
3783                })),
3784            })),
3785        }));
3786
3787        let physical = planner.plan(&logical).unwrap();
3788        assert_eq!(physical.columns(), &["n"]);
3789    }
3790
3791    #[test]
3792    fn test_plan_filter_compound_and() {
3793        let store = create_test_store();
3794        let planner = Planner::new(store);
3795
3796        // WHERE n.age > 20 AND n.age < 40
3797        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3798            items: vec![ReturnItem {
3799                expression: LogicalExpression::Variable("n".to_string()),
3800                alias: None,
3801            }],
3802            distinct: false,
3803            input: Box::new(LogicalOperator::Filter(FilterOp {
3804                predicate: LogicalExpression::Binary {
3805                    left: Box::new(LogicalExpression::Binary {
3806                        left: Box::new(LogicalExpression::Property {
3807                            variable: "n".to_string(),
3808                            property: "age".to_string(),
3809                        }),
3810                        op: BinaryOp::Gt,
3811                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3812                    }),
3813                    op: BinaryOp::And,
3814                    right: Box::new(LogicalExpression::Binary {
3815                        left: Box::new(LogicalExpression::Property {
3816                            variable: "n".to_string(),
3817                            property: "age".to_string(),
3818                        }),
3819                        op: BinaryOp::Lt,
3820                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3821                    }),
3822                },
3823                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3824                    variable: "n".to_string(),
3825                    label: None,
3826                    input: None,
3827                })),
3828            })),
3829        }));
3830
3831        let physical = planner.plan(&logical).unwrap();
3832        assert_eq!(physical.columns(), &["n"]);
3833    }
3834
3835    #[test]
3836    fn test_plan_filter_unary_not() {
3837        let store = create_test_store();
3838        let planner = Planner::new(store);
3839
3840        // WHERE NOT n.active
3841        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3842            items: vec![ReturnItem {
3843                expression: LogicalExpression::Variable("n".to_string()),
3844                alias: None,
3845            }],
3846            distinct: false,
3847            input: Box::new(LogicalOperator::Filter(FilterOp {
3848                predicate: LogicalExpression::Unary {
3849                    op: UnaryOp::Not,
3850                    operand: Box::new(LogicalExpression::Property {
3851                        variable: "n".to_string(),
3852                        property: "active".to_string(),
3853                    }),
3854                },
3855                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3856                    variable: "n".to_string(),
3857                    label: None,
3858                    input: None,
3859                })),
3860            })),
3861        }));
3862
3863        let physical = planner.plan(&logical).unwrap();
3864        assert_eq!(physical.columns(), &["n"]);
3865    }
3866
3867    #[test]
3868    fn test_plan_filter_is_null() {
3869        let store = create_test_store();
3870        let planner = Planner::new(store);
3871
3872        // WHERE n.email IS NULL
3873        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3874            items: vec![ReturnItem {
3875                expression: LogicalExpression::Variable("n".to_string()),
3876                alias: None,
3877            }],
3878            distinct: false,
3879            input: Box::new(LogicalOperator::Filter(FilterOp {
3880                predicate: LogicalExpression::Unary {
3881                    op: UnaryOp::IsNull,
3882                    operand: Box::new(LogicalExpression::Property {
3883                        variable: "n".to_string(),
3884                        property: "email".to_string(),
3885                    }),
3886                },
3887                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3888                    variable: "n".to_string(),
3889                    label: None,
3890                    input: None,
3891                })),
3892            })),
3893        }));
3894
3895        let physical = planner.plan(&logical).unwrap();
3896        assert_eq!(physical.columns(), &["n"]);
3897    }
3898
3899    #[test]
3900    fn test_plan_filter_function_call() {
3901        let store = create_test_store();
3902        let planner = Planner::new(store);
3903
3904        // WHERE size(n.friends) > 0
3905        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3906            items: vec![ReturnItem {
3907                expression: LogicalExpression::Variable("n".to_string()),
3908                alias: None,
3909            }],
3910            distinct: false,
3911            input: Box::new(LogicalOperator::Filter(FilterOp {
3912                predicate: LogicalExpression::Binary {
3913                    left: Box::new(LogicalExpression::FunctionCall {
3914                        name: "size".to_string(),
3915                        args: vec![LogicalExpression::Property {
3916                            variable: "n".to_string(),
3917                            property: "friends".to_string(),
3918                        }],
3919                        distinct: false,
3920                    }),
3921                    op: BinaryOp::Gt,
3922                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3923                },
3924                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3925                    variable: "n".to_string(),
3926                    label: None,
3927                    input: None,
3928                })),
3929            })),
3930        }));
3931
3932        let physical = planner.plan(&logical).unwrap();
3933        assert_eq!(physical.columns(), &["n"]);
3934    }
3935
3936    // ==================== Expand Tests ====================
3937
3938    #[test]
3939    fn test_plan_expand_outgoing() {
3940        let store = create_test_store();
3941        let planner = Planner::new(store);
3942
3943        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3944        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3945            items: vec![
3946                ReturnItem {
3947                    expression: LogicalExpression::Variable("a".to_string()),
3948                    alias: None,
3949                },
3950                ReturnItem {
3951                    expression: LogicalExpression::Variable("b".to_string()),
3952                    alias: None,
3953                },
3954            ],
3955            distinct: false,
3956            input: Box::new(LogicalOperator::Expand(ExpandOp {
3957                from_variable: "a".to_string(),
3958                to_variable: "b".to_string(),
3959                edge_variable: None,
3960                direction: ExpandDirection::Outgoing,
3961                edge_type: Some("KNOWS".to_string()),
3962                min_hops: 1,
3963                max_hops: Some(1),
3964                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3965                    variable: "a".to_string(),
3966                    label: Some("Person".to_string()),
3967                    input: None,
3968                })),
3969                path_alias: None,
3970            })),
3971        }));
3972
3973        let physical = planner.plan(&logical).unwrap();
3974        // The return should have columns [a, b]
3975        assert!(physical.columns().contains(&"a".to_string()));
3976        assert!(physical.columns().contains(&"b".to_string()));
3977    }
3978
3979    #[test]
3980    fn test_plan_expand_with_edge_variable() {
3981        let store = create_test_store();
3982        let planner = Planner::new(store);
3983
3984        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3985        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3986            items: vec![
3987                ReturnItem {
3988                    expression: LogicalExpression::Variable("a".to_string()),
3989                    alias: None,
3990                },
3991                ReturnItem {
3992                    expression: LogicalExpression::Variable("r".to_string()),
3993                    alias: None,
3994                },
3995                ReturnItem {
3996                    expression: LogicalExpression::Variable("b".to_string()),
3997                    alias: None,
3998                },
3999            ],
4000            distinct: false,
4001            input: Box::new(LogicalOperator::Expand(ExpandOp {
4002                from_variable: "a".to_string(),
4003                to_variable: "b".to_string(),
4004                edge_variable: Some("r".to_string()),
4005                direction: ExpandDirection::Outgoing,
4006                edge_type: Some("KNOWS".to_string()),
4007                min_hops: 1,
4008                max_hops: Some(1),
4009                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4010                    variable: "a".to_string(),
4011                    label: None,
4012                    input: None,
4013                })),
4014                path_alias: None,
4015            })),
4016        }));
4017
4018        let physical = planner.plan(&logical).unwrap();
4019        assert!(physical.columns().contains(&"a".to_string()));
4020        assert!(physical.columns().contains(&"r".to_string()));
4021        assert!(physical.columns().contains(&"b".to_string()));
4022    }
4023
4024    // ==================== Limit/Skip/Sort Tests ====================
4025
4026    #[test]
4027    fn test_plan_limit() {
4028        let store = create_test_store();
4029        let planner = Planner::new(store);
4030
4031        // MATCH (n) RETURN n LIMIT 10
4032        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4033            items: vec![ReturnItem {
4034                expression: LogicalExpression::Variable("n".to_string()),
4035                alias: None,
4036            }],
4037            distinct: false,
4038            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4039                count: 10,
4040                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4041                    variable: "n".to_string(),
4042                    label: None,
4043                    input: None,
4044                })),
4045            })),
4046        }));
4047
4048        let physical = planner.plan(&logical).unwrap();
4049        assert_eq!(physical.columns(), &["n"]);
4050    }
4051
4052    #[test]
4053    fn test_plan_skip() {
4054        let store = create_test_store();
4055        let planner = Planner::new(store);
4056
4057        // MATCH (n) RETURN n SKIP 5
4058        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4059            items: vec![ReturnItem {
4060                expression: LogicalExpression::Variable("n".to_string()),
4061                alias: None,
4062            }],
4063            distinct: false,
4064            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4065                count: 5,
4066                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4067                    variable: "n".to_string(),
4068                    label: None,
4069                    input: None,
4070                })),
4071            })),
4072        }));
4073
4074        let physical = planner.plan(&logical).unwrap();
4075        assert_eq!(physical.columns(), &["n"]);
4076    }
4077
4078    #[test]
4079    fn test_plan_sort() {
4080        let store = create_test_store();
4081        let planner = Planner::new(store);
4082
4083        // MATCH (n) RETURN n ORDER BY n.name ASC
4084        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4085            items: vec![ReturnItem {
4086                expression: LogicalExpression::Variable("n".to_string()),
4087                alias: None,
4088            }],
4089            distinct: false,
4090            input: Box::new(LogicalOperator::Sort(SortOp {
4091                keys: vec![SortKey {
4092                    expression: LogicalExpression::Variable("n".to_string()),
4093                    order: SortOrder::Ascending,
4094                }],
4095                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4096                    variable: "n".to_string(),
4097                    label: None,
4098                    input: None,
4099                })),
4100            })),
4101        }));
4102
4103        let physical = planner.plan(&logical).unwrap();
4104        assert_eq!(physical.columns(), &["n"]);
4105    }
4106
4107    #[test]
4108    fn test_plan_sort_descending() {
4109        let store = create_test_store();
4110        let planner = Planner::new(store);
4111
4112        // ORDER BY n DESC
4113        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4114            items: vec![ReturnItem {
4115                expression: LogicalExpression::Variable("n".to_string()),
4116                alias: None,
4117            }],
4118            distinct: false,
4119            input: Box::new(LogicalOperator::Sort(SortOp {
4120                keys: vec![SortKey {
4121                    expression: LogicalExpression::Variable("n".to_string()),
4122                    order: SortOrder::Descending,
4123                }],
4124                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4125                    variable: "n".to_string(),
4126                    label: None,
4127                    input: None,
4128                })),
4129            })),
4130        }));
4131
4132        let physical = planner.plan(&logical).unwrap();
4133        assert_eq!(physical.columns(), &["n"]);
4134    }
4135
4136    #[test]
4137    fn test_plan_distinct() {
4138        let store = create_test_store();
4139        let planner = Planner::new(store);
4140
4141        // MATCH (n) RETURN DISTINCT n
4142        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4143            items: vec![ReturnItem {
4144                expression: LogicalExpression::Variable("n".to_string()),
4145                alias: None,
4146            }],
4147            distinct: false,
4148            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4149                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4150                    variable: "n".to_string(),
4151                    label: None,
4152                    input: None,
4153                })),
4154                columns: None,
4155            })),
4156        }));
4157
4158        let physical = planner.plan(&logical).unwrap();
4159        assert_eq!(physical.columns(), &["n"]);
4160    }
4161
4162    // ==================== Aggregate Tests ====================
4163
4164    #[test]
4165    fn test_plan_aggregate_count() {
4166        let store = create_test_store();
4167        let planner = Planner::new(store);
4168
4169        // MATCH (n) RETURN count(n)
4170        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4171            items: vec![ReturnItem {
4172                expression: LogicalExpression::Variable("cnt".to_string()),
4173                alias: None,
4174            }],
4175            distinct: false,
4176            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
4177                group_by: vec![],
4178                aggregates: vec![LogicalAggregateExpr {
4179                    function: LogicalAggregateFunction::Count,
4180                    expression: Some(LogicalExpression::Variable("n".to_string())),
4181                    distinct: false,
4182                    alias: Some("cnt".to_string()),
4183                    percentile: None,
4184                }],
4185                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4186                    variable: "n".to_string(),
4187                    label: None,
4188                    input: None,
4189                })),
4190                having: None,
4191            })),
4192        }));
4193
4194        let physical = planner.plan(&logical).unwrap();
4195        assert!(physical.columns().contains(&"cnt".to_string()));
4196    }
4197
4198    #[test]
4199    fn test_plan_aggregate_with_group_by() {
4200        let store = create_test_store();
4201        let planner = Planner::new(store);
4202
4203        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
4204        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4205            group_by: vec![LogicalExpression::Property {
4206                variable: "n".to_string(),
4207                property: "city".to_string(),
4208            }],
4209            aggregates: vec![LogicalAggregateExpr {
4210                function: LogicalAggregateFunction::Count,
4211                expression: Some(LogicalExpression::Variable("n".to_string())),
4212                distinct: false,
4213                alias: Some("cnt".to_string()),
4214                percentile: None,
4215            }],
4216            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4217                variable: "n".to_string(),
4218                label: Some("Person".to_string()),
4219                input: None,
4220            })),
4221            having: None,
4222        }));
4223
4224        let physical = planner.plan(&logical).unwrap();
4225        assert_eq!(physical.columns().len(), 2);
4226    }
4227
4228    #[test]
4229    fn test_plan_aggregate_sum() {
4230        let store = create_test_store();
4231        let planner = Planner::new(store);
4232
4233        // SUM(n.value)
4234        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4235            group_by: vec![],
4236            aggregates: vec![LogicalAggregateExpr {
4237                function: LogicalAggregateFunction::Sum,
4238                expression: Some(LogicalExpression::Property {
4239                    variable: "n".to_string(),
4240                    property: "value".to_string(),
4241                }),
4242                distinct: false,
4243                alias: Some("total".to_string()),
4244                percentile: None,
4245            }],
4246            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4247                variable: "n".to_string(),
4248                label: None,
4249                input: None,
4250            })),
4251            having: None,
4252        }));
4253
4254        let physical = planner.plan(&logical).unwrap();
4255        assert!(physical.columns().contains(&"total".to_string()));
4256    }
4257
4258    #[test]
4259    fn test_plan_aggregate_avg() {
4260        let store = create_test_store();
4261        let planner = Planner::new(store);
4262
4263        // AVG(n.score)
4264        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4265            group_by: vec![],
4266            aggregates: vec![LogicalAggregateExpr {
4267                function: LogicalAggregateFunction::Avg,
4268                expression: Some(LogicalExpression::Property {
4269                    variable: "n".to_string(),
4270                    property: "score".to_string(),
4271                }),
4272                distinct: false,
4273                alias: Some("average".to_string()),
4274                percentile: None,
4275            }],
4276            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4277                variable: "n".to_string(),
4278                label: None,
4279                input: None,
4280            })),
4281            having: None,
4282        }));
4283
4284        let physical = planner.plan(&logical).unwrap();
4285        assert!(physical.columns().contains(&"average".to_string()));
4286    }
4287
4288    #[test]
4289    fn test_plan_aggregate_min_max() {
4290        let store = create_test_store();
4291        let planner = Planner::new(store);
4292
4293        // MIN(n.age), MAX(n.age)
4294        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4295            group_by: vec![],
4296            aggregates: vec![
4297                LogicalAggregateExpr {
4298                    function: LogicalAggregateFunction::Min,
4299                    expression: Some(LogicalExpression::Property {
4300                        variable: "n".to_string(),
4301                        property: "age".to_string(),
4302                    }),
4303                    distinct: false,
4304                    alias: Some("youngest".to_string()),
4305                    percentile: None,
4306                },
4307                LogicalAggregateExpr {
4308                    function: LogicalAggregateFunction::Max,
4309                    expression: Some(LogicalExpression::Property {
4310                        variable: "n".to_string(),
4311                        property: "age".to_string(),
4312                    }),
4313                    distinct: false,
4314                    alias: Some("oldest".to_string()),
4315                    percentile: None,
4316                },
4317            ],
4318            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4319                variable: "n".to_string(),
4320                label: None,
4321                input: None,
4322            })),
4323            having: None,
4324        }));
4325
4326        let physical = planner.plan(&logical).unwrap();
4327        assert!(physical.columns().contains(&"youngest".to_string()));
4328        assert!(physical.columns().contains(&"oldest".to_string()));
4329    }
4330
4331    // ==================== Join Tests ====================
4332
4333    #[test]
4334    fn test_plan_inner_join() {
4335        let store = create_test_store();
4336        let planner = Planner::new(store);
4337
4338        // Inner join between two scans
4339        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4340            items: vec![
4341                ReturnItem {
4342                    expression: LogicalExpression::Variable("a".to_string()),
4343                    alias: None,
4344                },
4345                ReturnItem {
4346                    expression: LogicalExpression::Variable("b".to_string()),
4347                    alias: None,
4348                },
4349            ],
4350            distinct: false,
4351            input: Box::new(LogicalOperator::Join(JoinOp {
4352                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4353                    variable: "a".to_string(),
4354                    label: Some("Person".to_string()),
4355                    input: None,
4356                })),
4357                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4358                    variable: "b".to_string(),
4359                    label: Some("Company".to_string()),
4360                    input: None,
4361                })),
4362                join_type: JoinType::Inner,
4363                conditions: vec![JoinCondition {
4364                    left: LogicalExpression::Variable("a".to_string()),
4365                    right: LogicalExpression::Variable("b".to_string()),
4366                }],
4367            })),
4368        }));
4369
4370        let physical = planner.plan(&logical).unwrap();
4371        assert!(physical.columns().contains(&"a".to_string()));
4372        assert!(physical.columns().contains(&"b".to_string()));
4373    }
4374
4375    #[test]
4376    fn test_plan_cross_join() {
4377        let store = create_test_store();
4378        let planner = Planner::new(store);
4379
4380        // Cross join (no conditions)
4381        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4382            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4383                variable: "a".to_string(),
4384                label: None,
4385                input: None,
4386            })),
4387            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4388                variable: "b".to_string(),
4389                label: None,
4390                input: None,
4391            })),
4392            join_type: JoinType::Cross,
4393            conditions: vec![],
4394        }));
4395
4396        let physical = planner.plan(&logical).unwrap();
4397        assert_eq!(physical.columns().len(), 2);
4398    }
4399
4400    #[test]
4401    fn test_plan_left_join() {
4402        let store = create_test_store();
4403        let planner = Planner::new(store);
4404
4405        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4406            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4407                variable: "a".to_string(),
4408                label: None,
4409                input: None,
4410            })),
4411            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4412                variable: "b".to_string(),
4413                label: None,
4414                input: None,
4415            })),
4416            join_type: JoinType::Left,
4417            conditions: vec![],
4418        }));
4419
4420        let physical = planner.plan(&logical).unwrap();
4421        assert_eq!(physical.columns().len(), 2);
4422    }
4423
4424    // ==================== Mutation Tests ====================
4425
4426    #[test]
4427    fn test_plan_create_node() {
4428        let store = create_test_store();
4429        let planner = Planner::new(store);
4430
4431        // CREATE (n:Person {name: 'Alice'})
4432        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4433            variable: "n".to_string(),
4434            labels: vec!["Person".to_string()],
4435            properties: vec![(
4436                "name".to_string(),
4437                LogicalExpression::Literal(Value::String("Alice".into())),
4438            )],
4439            input: None,
4440        }));
4441
4442        let physical = planner.plan(&logical).unwrap();
4443        assert!(physical.columns().contains(&"n".to_string()));
4444    }
4445
4446    #[test]
4447    fn test_plan_create_edge() {
4448        let store = create_test_store();
4449        let planner = Planner::new(store);
4450
4451        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4452        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4453            variable: Some("r".to_string()),
4454            from_variable: "a".to_string(),
4455            to_variable: "b".to_string(),
4456            edge_type: "KNOWS".to_string(),
4457            properties: vec![],
4458            input: Box::new(LogicalOperator::Join(JoinOp {
4459                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4460                    variable: "a".to_string(),
4461                    label: None,
4462                    input: None,
4463                })),
4464                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4465                    variable: "b".to_string(),
4466                    label: None,
4467                    input: None,
4468                })),
4469                join_type: JoinType::Cross,
4470                conditions: vec![],
4471            })),
4472        }));
4473
4474        let physical = planner.plan(&logical).unwrap();
4475        assert!(physical.columns().contains(&"r".to_string()));
4476    }
4477
4478    #[test]
4479    fn test_plan_delete_node() {
4480        let store = create_test_store();
4481        let planner = Planner::new(store);
4482
4483        // MATCH (n) DELETE n
4484        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4485            variable: "n".to_string(),
4486            detach: false,
4487            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4488                variable: "n".to_string(),
4489                label: None,
4490                input: None,
4491            })),
4492        }));
4493
4494        let physical = planner.plan(&logical).unwrap();
4495        assert!(physical.columns().contains(&"deleted_count".to_string()));
4496    }
4497
4498    // ==================== Error Cases ====================
4499
4500    #[test]
4501    fn test_plan_empty_errors() {
4502        let store = create_test_store();
4503        let planner = Planner::new(store);
4504
4505        let logical = LogicalPlan::new(LogicalOperator::Empty);
4506        let result = planner.plan(&logical);
4507        assert!(result.is_err());
4508    }
4509
4510    #[test]
4511    fn test_plan_missing_variable_in_return() {
4512        let store = create_test_store();
4513        let planner = Planner::new(store);
4514
4515        // Return variable that doesn't exist in input
4516        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4517            items: vec![ReturnItem {
4518                expression: LogicalExpression::Variable("missing".to_string()),
4519                alias: None,
4520            }],
4521            distinct: false,
4522            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4523                variable: "n".to_string(),
4524                label: None,
4525                input: None,
4526            })),
4527        }));
4528
4529        let result = planner.plan(&logical);
4530        assert!(result.is_err());
4531    }
4532
4533    // ==================== Helper Function Tests ====================
4534
4535    #[test]
4536    fn test_convert_binary_ops() {
4537        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4538        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4539        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4540        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4541        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4542        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4543        assert!(convert_binary_op(BinaryOp::And).is_ok());
4544        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4545        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4546        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4547        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4548        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4549    }
4550
4551    #[test]
4552    fn test_convert_unary_ops() {
4553        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4554        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4555        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4556        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4557    }
4558
4559    #[test]
4560    fn test_convert_aggregate_functions() {
4561        assert!(matches!(
4562            convert_aggregate_function(LogicalAggregateFunction::Count),
4563            PhysicalAggregateFunction::Count
4564        ));
4565        assert!(matches!(
4566            convert_aggregate_function(LogicalAggregateFunction::Sum),
4567            PhysicalAggregateFunction::Sum
4568        ));
4569        assert!(matches!(
4570            convert_aggregate_function(LogicalAggregateFunction::Avg),
4571            PhysicalAggregateFunction::Avg
4572        ));
4573        assert!(matches!(
4574            convert_aggregate_function(LogicalAggregateFunction::Min),
4575            PhysicalAggregateFunction::Min
4576        ));
4577        assert!(matches!(
4578            convert_aggregate_function(LogicalAggregateFunction::Max),
4579            PhysicalAggregateFunction::Max
4580        ));
4581    }
4582
4583    #[test]
4584    fn test_planner_accessors() {
4585        let store = create_test_store();
4586        let planner = Planner::new(Arc::clone(&store));
4587
4588        assert!(planner.tx_id().is_none());
4589        assert!(planner.tx_manager().is_none());
4590        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4591    }
4592
4593    #[test]
4594    fn test_physical_plan_accessors() {
4595        let store = create_test_store();
4596        let planner = Planner::new(store);
4597
4598        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4599            variable: "n".to_string(),
4600            label: None,
4601            input: None,
4602        }));
4603
4604        let physical = planner.plan(&logical).unwrap();
4605        assert_eq!(physical.columns(), &["n"]);
4606
4607        // Test into_operator
4608        let _ = physical.into_operator();
4609    }
4610
4611    // ==================== Adaptive Planning Tests ====================
4612
4613    #[test]
4614    fn test_plan_adaptive_with_scan() {
4615        let store = create_test_store();
4616        let planner = Planner::new(store);
4617
4618        // MATCH (n:Person) RETURN n
4619        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4620            items: vec![ReturnItem {
4621                expression: LogicalExpression::Variable("n".to_string()),
4622                alias: None,
4623            }],
4624            distinct: false,
4625            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4626                variable: "n".to_string(),
4627                label: Some("Person".to_string()),
4628                input: None,
4629            })),
4630        }));
4631
4632        let physical = planner.plan_adaptive(&logical).unwrap();
4633        assert_eq!(physical.columns(), &["n"]);
4634        // Should have adaptive context with estimates
4635        assert!(physical.adaptive_context.is_some());
4636    }
4637
4638    #[test]
4639    fn test_plan_adaptive_with_filter() {
4640        let store = create_test_store();
4641        let planner = Planner::new(store);
4642
4643        // MATCH (n) WHERE n.age > 30 RETURN n
4644        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4645            items: vec![ReturnItem {
4646                expression: LogicalExpression::Variable("n".to_string()),
4647                alias: None,
4648            }],
4649            distinct: false,
4650            input: Box::new(LogicalOperator::Filter(FilterOp {
4651                predicate: LogicalExpression::Binary {
4652                    left: Box::new(LogicalExpression::Property {
4653                        variable: "n".to_string(),
4654                        property: "age".to_string(),
4655                    }),
4656                    op: BinaryOp::Gt,
4657                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4658                },
4659                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4660                    variable: "n".to_string(),
4661                    label: None,
4662                    input: None,
4663                })),
4664            })),
4665        }));
4666
4667        let physical = planner.plan_adaptive(&logical).unwrap();
4668        assert!(physical.adaptive_context.is_some());
4669    }
4670
4671    #[test]
4672    fn test_plan_adaptive_with_expand() {
4673        let store = create_test_store();
4674        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4675
4676        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4677        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4678            items: vec![
4679                ReturnItem {
4680                    expression: LogicalExpression::Variable("a".to_string()),
4681                    alias: None,
4682                },
4683                ReturnItem {
4684                    expression: LogicalExpression::Variable("b".to_string()),
4685                    alias: None,
4686                },
4687            ],
4688            distinct: false,
4689            input: Box::new(LogicalOperator::Expand(ExpandOp {
4690                from_variable: "a".to_string(),
4691                to_variable: "b".to_string(),
4692                edge_variable: None,
4693                direction: ExpandDirection::Outgoing,
4694                edge_type: Some("KNOWS".to_string()),
4695                min_hops: 1,
4696                max_hops: Some(1),
4697                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4698                    variable: "a".to_string(),
4699                    label: None,
4700                    input: None,
4701                })),
4702                path_alias: None,
4703            })),
4704        }));
4705
4706        let physical = planner.plan_adaptive(&logical).unwrap();
4707        assert!(physical.adaptive_context.is_some());
4708    }
4709
4710    #[test]
4711    fn test_plan_adaptive_with_join() {
4712        let store = create_test_store();
4713        let planner = Planner::new(store);
4714
4715        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4716            items: vec![
4717                ReturnItem {
4718                    expression: LogicalExpression::Variable("a".to_string()),
4719                    alias: None,
4720                },
4721                ReturnItem {
4722                    expression: LogicalExpression::Variable("b".to_string()),
4723                    alias: None,
4724                },
4725            ],
4726            distinct: false,
4727            input: Box::new(LogicalOperator::Join(JoinOp {
4728                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4729                    variable: "a".to_string(),
4730                    label: None,
4731                    input: None,
4732                })),
4733                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4734                    variable: "b".to_string(),
4735                    label: None,
4736                    input: None,
4737                })),
4738                join_type: JoinType::Cross,
4739                conditions: vec![],
4740            })),
4741        }));
4742
4743        let physical = planner.plan_adaptive(&logical).unwrap();
4744        assert!(physical.adaptive_context.is_some());
4745    }
4746
4747    #[test]
4748    fn test_plan_adaptive_with_aggregate() {
4749        let store = create_test_store();
4750        let planner = Planner::new(store);
4751
4752        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4753            group_by: vec![],
4754            aggregates: vec![LogicalAggregateExpr {
4755                function: LogicalAggregateFunction::Count,
4756                expression: Some(LogicalExpression::Variable("n".to_string())),
4757                distinct: false,
4758                alias: Some("cnt".to_string()),
4759                percentile: None,
4760            }],
4761            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4762                variable: "n".to_string(),
4763                label: None,
4764                input: None,
4765            })),
4766            having: None,
4767        }));
4768
4769        let physical = planner.plan_adaptive(&logical).unwrap();
4770        assert!(physical.adaptive_context.is_some());
4771    }
4772
4773    #[test]
4774    fn test_plan_adaptive_with_distinct() {
4775        let store = create_test_store();
4776        let planner = Planner::new(store);
4777
4778        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4779            items: vec![ReturnItem {
4780                expression: LogicalExpression::Variable("n".to_string()),
4781                alias: None,
4782            }],
4783            distinct: false,
4784            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4785                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4786                    variable: "n".to_string(),
4787                    label: None,
4788                    input: None,
4789                })),
4790                columns: None,
4791            })),
4792        }));
4793
4794        let physical = planner.plan_adaptive(&logical).unwrap();
4795        assert!(physical.adaptive_context.is_some());
4796    }
4797
4798    #[test]
4799    fn test_plan_adaptive_with_limit() {
4800        let store = create_test_store();
4801        let planner = Planner::new(store);
4802
4803        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4804            items: vec![ReturnItem {
4805                expression: LogicalExpression::Variable("n".to_string()),
4806                alias: None,
4807            }],
4808            distinct: false,
4809            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4810                count: 10,
4811                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4812                    variable: "n".to_string(),
4813                    label: None,
4814                    input: None,
4815                })),
4816            })),
4817        }));
4818
4819        let physical = planner.plan_adaptive(&logical).unwrap();
4820        assert!(physical.adaptive_context.is_some());
4821    }
4822
4823    #[test]
4824    fn test_plan_adaptive_with_skip() {
4825        let store = create_test_store();
4826        let planner = Planner::new(store);
4827
4828        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4829            items: vec![ReturnItem {
4830                expression: LogicalExpression::Variable("n".to_string()),
4831                alias: None,
4832            }],
4833            distinct: false,
4834            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4835                count: 5,
4836                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4837                    variable: "n".to_string(),
4838                    label: None,
4839                    input: None,
4840                })),
4841            })),
4842        }));
4843
4844        let physical = planner.plan_adaptive(&logical).unwrap();
4845        assert!(physical.adaptive_context.is_some());
4846    }
4847
4848    #[test]
4849    fn test_plan_adaptive_with_sort() {
4850        let store = create_test_store();
4851        let planner = Planner::new(store);
4852
4853        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4854            items: vec![ReturnItem {
4855                expression: LogicalExpression::Variable("n".to_string()),
4856                alias: None,
4857            }],
4858            distinct: false,
4859            input: Box::new(LogicalOperator::Sort(SortOp {
4860                keys: vec![SortKey {
4861                    expression: LogicalExpression::Variable("n".to_string()),
4862                    order: SortOrder::Ascending,
4863                }],
4864                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4865                    variable: "n".to_string(),
4866                    label: None,
4867                    input: None,
4868                })),
4869            })),
4870        }));
4871
4872        let physical = planner.plan_adaptive(&logical).unwrap();
4873        assert!(physical.adaptive_context.is_some());
4874    }
4875
4876    #[test]
4877    fn test_plan_adaptive_with_union() {
4878        let store = create_test_store();
4879        let planner = Planner::new(store);
4880
4881        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4882            items: vec![ReturnItem {
4883                expression: LogicalExpression::Variable("n".to_string()),
4884                alias: None,
4885            }],
4886            distinct: false,
4887            input: Box::new(LogicalOperator::Union(UnionOp {
4888                inputs: vec![
4889                    LogicalOperator::NodeScan(NodeScanOp {
4890                        variable: "n".to_string(),
4891                        label: Some("Person".to_string()),
4892                        input: None,
4893                    }),
4894                    LogicalOperator::NodeScan(NodeScanOp {
4895                        variable: "n".to_string(),
4896                        label: Some("Company".to_string()),
4897                        input: None,
4898                    }),
4899                ],
4900            })),
4901        }));
4902
4903        let physical = planner.plan_adaptive(&logical).unwrap();
4904        assert!(physical.adaptive_context.is_some());
4905    }
4906
4907    // ==================== Variable Length Path Tests ====================
4908
4909    #[test]
4910    fn test_plan_expand_variable_length() {
4911        let store = create_test_store();
4912        let planner = Planner::new(store);
4913
4914        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4915        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4916            items: vec![
4917                ReturnItem {
4918                    expression: LogicalExpression::Variable("a".to_string()),
4919                    alias: None,
4920                },
4921                ReturnItem {
4922                    expression: LogicalExpression::Variable("b".to_string()),
4923                    alias: None,
4924                },
4925            ],
4926            distinct: false,
4927            input: Box::new(LogicalOperator::Expand(ExpandOp {
4928                from_variable: "a".to_string(),
4929                to_variable: "b".to_string(),
4930                edge_variable: None,
4931                direction: ExpandDirection::Outgoing,
4932                edge_type: Some("KNOWS".to_string()),
4933                min_hops: 1,
4934                max_hops: Some(3),
4935                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4936                    variable: "a".to_string(),
4937                    label: None,
4938                    input: None,
4939                })),
4940                path_alias: None,
4941            })),
4942        }));
4943
4944        let physical = planner.plan(&logical).unwrap();
4945        assert!(physical.columns().contains(&"a".to_string()));
4946        assert!(physical.columns().contains(&"b".to_string()));
4947    }
4948
4949    #[test]
4950    fn test_plan_expand_with_path_alias() {
4951        let store = create_test_store();
4952        let planner = Planner::new(store);
4953
4954        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4955        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4956            items: vec![
4957                ReturnItem {
4958                    expression: LogicalExpression::Variable("a".to_string()),
4959                    alias: None,
4960                },
4961                ReturnItem {
4962                    expression: LogicalExpression::Variable("b".to_string()),
4963                    alias: None,
4964                },
4965            ],
4966            distinct: false,
4967            input: Box::new(LogicalOperator::Expand(ExpandOp {
4968                from_variable: "a".to_string(),
4969                to_variable: "b".to_string(),
4970                edge_variable: None,
4971                direction: ExpandDirection::Outgoing,
4972                edge_type: Some("KNOWS".to_string()),
4973                min_hops: 1,
4974                max_hops: Some(3),
4975                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4976                    variable: "a".to_string(),
4977                    label: None,
4978                    input: None,
4979                })),
4980                path_alias: Some("p".to_string()),
4981            })),
4982        }));
4983
4984        let physical = planner.plan(&logical).unwrap();
4985        // Verify plan was created successfully with expected output columns
4986        assert!(physical.columns().contains(&"a".to_string()));
4987        assert!(physical.columns().contains(&"b".to_string()));
4988    }
4989
4990    #[test]
4991    fn test_plan_expand_incoming() {
4992        let store = create_test_store();
4993        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4994
4995        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4996        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4997            items: vec![
4998                ReturnItem {
4999                    expression: LogicalExpression::Variable("a".to_string()),
5000                    alias: None,
5001                },
5002                ReturnItem {
5003                    expression: LogicalExpression::Variable("b".to_string()),
5004                    alias: None,
5005                },
5006            ],
5007            distinct: false,
5008            input: Box::new(LogicalOperator::Expand(ExpandOp {
5009                from_variable: "a".to_string(),
5010                to_variable: "b".to_string(),
5011                edge_variable: None,
5012                direction: ExpandDirection::Incoming,
5013                edge_type: Some("KNOWS".to_string()),
5014                min_hops: 1,
5015                max_hops: Some(1),
5016                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5017                    variable: "a".to_string(),
5018                    label: None,
5019                    input: None,
5020                })),
5021                path_alias: None,
5022            })),
5023        }));
5024
5025        let physical = planner.plan(&logical).unwrap();
5026        assert!(physical.columns().contains(&"a".to_string()));
5027        assert!(physical.columns().contains(&"b".to_string()));
5028    }
5029
5030    #[test]
5031    fn test_plan_expand_both_directions() {
5032        let store = create_test_store();
5033        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
5034
5035        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
5036        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5037            items: vec![
5038                ReturnItem {
5039                    expression: LogicalExpression::Variable("a".to_string()),
5040                    alias: None,
5041                },
5042                ReturnItem {
5043                    expression: LogicalExpression::Variable("b".to_string()),
5044                    alias: None,
5045                },
5046            ],
5047            distinct: false,
5048            input: Box::new(LogicalOperator::Expand(ExpandOp {
5049                from_variable: "a".to_string(),
5050                to_variable: "b".to_string(),
5051                edge_variable: None,
5052                direction: ExpandDirection::Both,
5053                edge_type: Some("KNOWS".to_string()),
5054                min_hops: 1,
5055                max_hops: Some(1),
5056                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5057                    variable: "a".to_string(),
5058                    label: None,
5059                    input: None,
5060                })),
5061                path_alias: None,
5062            })),
5063        }));
5064
5065        let physical = planner.plan(&logical).unwrap();
5066        assert!(physical.columns().contains(&"a".to_string()));
5067        assert!(physical.columns().contains(&"b".to_string()));
5068    }
5069
5070    // ==================== With Context Tests ====================
5071
5072    #[test]
5073    fn test_planner_with_context() {
5074        use crate::transaction::TransactionManager;
5075
5076        let store = create_test_store();
5077        let tx_manager = Arc::new(TransactionManager::new());
5078        let tx_id = tx_manager.begin();
5079        let epoch = tx_manager.current_epoch();
5080
5081        let planner = Planner::with_context(
5082            Arc::clone(&store),
5083            Arc::clone(&tx_manager),
5084            Some(tx_id),
5085            epoch,
5086        );
5087
5088        assert_eq!(planner.tx_id(), Some(tx_id));
5089        assert!(planner.tx_manager().is_some());
5090        assert_eq!(planner.viewing_epoch(), epoch);
5091    }
5092
5093    #[test]
5094    fn test_planner_with_factorized_execution_disabled() {
5095        let store = create_test_store();
5096        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
5097
5098        // Two consecutive expands - should NOT use factorized execution
5099        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5100            items: vec![
5101                ReturnItem {
5102                    expression: LogicalExpression::Variable("a".to_string()),
5103                    alias: None,
5104                },
5105                ReturnItem {
5106                    expression: LogicalExpression::Variable("c".to_string()),
5107                    alias: None,
5108                },
5109            ],
5110            distinct: false,
5111            input: Box::new(LogicalOperator::Expand(ExpandOp {
5112                from_variable: "b".to_string(),
5113                to_variable: "c".to_string(),
5114                edge_variable: None,
5115                direction: ExpandDirection::Outgoing,
5116                edge_type: None,
5117                min_hops: 1,
5118                max_hops: Some(1),
5119                input: Box::new(LogicalOperator::Expand(ExpandOp {
5120                    from_variable: "a".to_string(),
5121                    to_variable: "b".to_string(),
5122                    edge_variable: None,
5123                    direction: ExpandDirection::Outgoing,
5124                    edge_type: None,
5125                    min_hops: 1,
5126                    max_hops: Some(1),
5127                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5128                        variable: "a".to_string(),
5129                        label: None,
5130                        input: None,
5131                    })),
5132                    path_alias: None,
5133                })),
5134                path_alias: None,
5135            })),
5136        }));
5137
5138        let physical = planner.plan(&logical).unwrap();
5139        assert!(physical.columns().contains(&"a".to_string()));
5140        assert!(physical.columns().contains(&"c".to_string()));
5141    }
5142
5143    // ==================== Sort with Property Tests ====================
5144
5145    #[test]
5146    fn test_plan_sort_by_property() {
5147        let store = create_test_store();
5148        let planner = Planner::new(store);
5149
5150        // MATCH (n) RETURN n ORDER BY n.name ASC
5151        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5152            items: vec![ReturnItem {
5153                expression: LogicalExpression::Variable("n".to_string()),
5154                alias: None,
5155            }],
5156            distinct: false,
5157            input: Box::new(LogicalOperator::Sort(SortOp {
5158                keys: vec![SortKey {
5159                    expression: LogicalExpression::Property {
5160                        variable: "n".to_string(),
5161                        property: "name".to_string(),
5162                    },
5163                    order: SortOrder::Ascending,
5164                }],
5165                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5166                    variable: "n".to_string(),
5167                    label: None,
5168                    input: None,
5169                })),
5170            })),
5171        }));
5172
5173        let physical = planner.plan(&logical).unwrap();
5174        // Should have the property column projected
5175        assert!(physical.columns().contains(&"n".to_string()));
5176    }
5177
5178    // ==================== Scan with Input Tests ====================
5179
5180    #[test]
5181    fn test_plan_scan_with_input() {
5182        let store = create_test_store();
5183        let planner = Planner::new(store);
5184
5185        // A scan with another scan as input (for chained patterns)
5186        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5187            items: vec![
5188                ReturnItem {
5189                    expression: LogicalExpression::Variable("a".to_string()),
5190                    alias: None,
5191                },
5192                ReturnItem {
5193                    expression: LogicalExpression::Variable("b".to_string()),
5194                    alias: None,
5195                },
5196            ],
5197            distinct: false,
5198            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5199                variable: "b".to_string(),
5200                label: Some("Company".to_string()),
5201                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
5202                    variable: "a".to_string(),
5203                    label: Some("Person".to_string()),
5204                    input: None,
5205                }))),
5206            })),
5207        }));
5208
5209        let physical = planner.plan(&logical).unwrap();
5210        assert!(physical.columns().contains(&"a".to_string()));
5211        assert!(physical.columns().contains(&"b".to_string()));
5212    }
5213}