Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37/// Range bounds for property-based range queries.
38struct RangeBounds<'a> {
39    min: Option<&'a Value>,
40    max: Option<&'a Value>,
41    min_inclusive: bool,
42    max_inclusive: bool,
43}
44
45/// Converts a logical plan to a physical operator tree.
46pub struct Planner {
47    /// The graph store to scan from.
48    store: Arc<LpgStore>,
49    /// Transaction manager for MVCC operations.
50    tx_manager: Option<Arc<TransactionManager>>,
51    /// Current transaction ID (if in a transaction).
52    tx_id: Option<TxId>,
53    /// Epoch to use for visibility checks.
54    viewing_epoch: EpochId,
55    /// Counter for generating unique anonymous edge column names.
56    anon_edge_counter: std::cell::Cell<u32>,
57    /// Whether to use factorized execution for multi-hop queries.
58    factorized_execution: bool,
59}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree from store statistics
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let stats = self.store.statistics();
244                let avg_degree = self.estimate_expand_degree(&stats, expand);
245                let estimate = input_estimate * avg_degree;
246                let id = format!("expand_{}", expand.to_variable);
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250            }
251            LogicalOperator::Join(join) => {
252                // Estimate join output (product with selectivity)
253                let left_est = self.estimate_cardinality(&join.left);
254                let right_est = self.estimate_cardinality(&join.right);
255                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
256                let id = format!("join_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261            }
262            LogicalOperator::Aggregate(agg) => {
263                // Aggregates typically reduce cardinality
264                let input_estimate = self.estimate_cardinality(&agg.input);
265                let estimate = if agg.group_by.is_empty() {
266                    1.0 // Scalar aggregate
267                } else {
268                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
269                };
270                let id = format!("aggregate_{depth}");
271                ctx.set_estimate(&id, estimate);
272
273                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274            }
275            LogicalOperator::Distinct(distinct) => {
276                let input_estimate = self.estimate_cardinality(&distinct.input);
277                let estimate = (input_estimate * 0.5).max(1.0);
278                let id = format!("distinct_{depth}");
279                ctx.set_estimate(&id, estimate);
280
281                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282            }
283            LogicalOperator::Return(ret) => {
284                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285            }
286            LogicalOperator::Limit(limit) => {
287                let input_estimate = self.estimate_cardinality(&limit.input);
288                let estimate = (input_estimate).min(limit.count as f64);
289                let id = format!("limit_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293            }
294            LogicalOperator::Skip(skip) => {
295                let input_estimate = self.estimate_cardinality(&skip.input);
296                let estimate = (input_estimate - skip.count as f64).max(0.0);
297                let id = format!("skip_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301            }
302            LogicalOperator::Sort(sort) => {
303                // Sort doesn't change cardinality
304                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305            }
306            LogicalOperator::Union(union) => {
307                let estimate: f64 = union
308                    .inputs
309                    .iter()
310                    .map(|input| self.estimate_cardinality(input))
311                    .sum();
312                let id = format!("union_{depth}");
313                ctx.set_estimate(&id, estimate);
314
315                for input in &union.inputs {
316                    self.collect_cardinality_estimates(input, ctx, depth + 1);
317                }
318            }
319            _ => {
320                // For other operators, try to recurse into known input patterns
321            }
322        }
323    }
324
325    /// Estimates cardinality for a logical operator subtree.
326    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327        match op {
328            LogicalOperator::NodeScan(scan) => {
329                if let Some(label) = &scan.label {
330                    self.store.nodes_by_label(label).len() as f64
331                } else {
332                    self.store.node_count() as f64
333                }
334            }
335            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336            LogicalOperator::Expand(expand) => {
337                let stats = self.store.statistics();
338                let avg_degree = self.estimate_expand_degree(&stats, expand);
339                self.estimate_cardinality(&expand.input) * avg_degree
340            }
341            LogicalOperator::Join(join) => {
342                let left = self.estimate_cardinality(&join.left);
343                let right = self.estimate_cardinality(&join.right);
344                (left * right).sqrt()
345            }
346            LogicalOperator::Aggregate(agg) => {
347                if agg.group_by.is_empty() {
348                    1.0
349                } else {
350                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351                }
352            }
353            LogicalOperator::Distinct(distinct) => {
354                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355            }
356            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357            LogicalOperator::Limit(limit) => self
358                .estimate_cardinality(&limit.input)
359                .min(limit.count as f64),
360            LogicalOperator::Skip(skip) => {
361                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362            }
363            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364            LogicalOperator::Union(union) => union
365                .inputs
366                .iter()
367                .map(|input| self.estimate_cardinality(input))
368                .sum(),
369            _ => 1000.0, // Default estimate for unknown operators
370        }
371    }
372
373    /// Estimates the average edge degree for an expand operation using store statistics.
374    fn estimate_expand_degree(
375        &self,
376        stats: &grafeo_core::statistics::Statistics,
377        expand: &ExpandOp,
378    ) -> f64 {
379        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380        if let Some(edge_type) = &expand.edge_type {
381            stats.estimate_avg_degree(edge_type, outgoing)
382        } else if stats.total_nodes > 0 {
383            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384        } else {
385            10.0 // fallback for empty graph
386        }
387    }
388
389    /// Plans a single logical operator.
390    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391        match op {
392            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393            LogicalOperator::Expand(expand) => {
394                // Check for expand chains when factorized execution is enabled
395                if self.factorized_execution {
396                    let (chain_len, _base) = Self::count_expand_chain(op);
397                    if chain_len >= 2 {
398                        // Use factorized chain for 2+ consecutive single-hop expands
399                        return self.plan_expand_chain(op);
400                    }
401                }
402                self.plan_expand(expand)
403            }
404            LogicalOperator::Return(ret) => self.plan_return(ret),
405            LogicalOperator::Filter(filter) => self.plan_filter(filter),
406            LogicalOperator::Project(project) => self.plan_project(project),
407            LogicalOperator::Limit(limit) => self.plan_limit(limit),
408            LogicalOperator::Skip(skip) => self.plan_skip(skip),
409            LogicalOperator::Sort(sort) => self.plan_sort(sort),
410            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411            LogicalOperator::Join(join) => self.plan_join(join),
412            LogicalOperator::Union(union) => self.plan_union(union),
413            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421            LogicalOperator::Merge(merge) => self.plan_merge(merge),
422            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
427            LogicalOperator::VectorScan(_) => Err(Error::Internal(
428                "VectorScan requires vector-index feature".to_string(),
429            )),
430            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
431                "VectorJoin requires vector-index feature".to_string(),
432            )),
433            _ => Err(Error::Internal(format!(
434                "Unsupported operator: {:?}",
435                std::mem::discriminant(op)
436            ))),
437        }
438    }
439
440    /// Plans a node scan operator.
441    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
442        let scan_op = if let Some(label) = &scan.label {
443            ScanOperator::with_label(Arc::clone(&self.store), label)
444        } else {
445            ScanOperator::new(Arc::clone(&self.store))
446        };
447
448        // Apply MVCC context if available
449        let scan_operator: Box<dyn Operator> =
450            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
451
452        // If there's an input, chain operators with a nested loop join (cross join)
453        if let Some(input) = &scan.input {
454            let (input_op, mut input_columns) = self.plan_operator(input)?;
455
456            // Build output schema: input columns + scan column
457            let mut output_schema: Vec<LogicalType> =
458                input_columns.iter().map(|_| LogicalType::Any).collect();
459            output_schema.push(LogicalType::Node);
460
461            // Add scan column to input columns
462            input_columns.push(scan.variable.clone());
463
464            // Use nested loop join to combine input rows with scanned nodes
465            let join_op = Box::new(NestedLoopJoinOperator::new(
466                input_op,
467                scan_operator,
468                None, // No join condition (cross join)
469                PhysicalJoinType::Cross,
470                output_schema,
471            ));
472
473            Ok((join_op, input_columns))
474        } else {
475            let columns = vec![scan.variable.clone()];
476            Ok((scan_operator, columns))
477        }
478    }
479
480    /// Plans an expand operator.
481    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
482        // Plan the input operator first
483        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
484
485        // Find the source column index
486        let source_column = input_columns
487            .iter()
488            .position(|c| c == &expand.from_variable)
489            .ok_or_else(|| {
490                Error::Internal(format!(
491                    "Source variable '{}' not found in input columns",
492                    expand.from_variable
493                ))
494            })?;
495
496        // Convert expand direction
497        let direction = match expand.direction {
498            ExpandDirection::Outgoing => Direction::Outgoing,
499            ExpandDirection::Incoming => Direction::Incoming,
500            ExpandDirection::Both => Direction::Both,
501        };
502
503        // Check if this is a variable-length path
504        let is_variable_length =
505            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
506
507        let operator: Box<dyn Operator> = if is_variable_length {
508            // Use VariableLengthExpandOperator for multi-hop paths
509            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
510            let mut expand_op = VariableLengthExpandOperator::new(
511                Arc::clone(&self.store),
512                input_op,
513                source_column,
514                direction,
515                expand.edge_type.clone(),
516                expand.min_hops,
517                max_hops,
518            )
519            .with_tx_context(self.viewing_epoch, self.tx_id);
520
521            // If a path alias is set, enable path length output
522            if expand.path_alias.is_some() {
523                expand_op = expand_op.with_path_length_output();
524            }
525
526            Box::new(expand_op)
527        } else {
528            // Use simple ExpandOperator for single-hop paths
529            let expand_op = ExpandOperator::new(
530                Arc::clone(&self.store),
531                input_op,
532                source_column,
533                direction,
534                expand.edge_type.clone(),
535            )
536            .with_tx_context(self.viewing_epoch, self.tx_id);
537            Box::new(expand_op)
538        };
539
540        // Build output columns: [input_columns..., edge, target, (path_length)?]
541        // Preserve all input columns and add edge + target to match ExpandOperator output
542        let mut columns = input_columns;
543
544        // Generate edge column name - use provided name or generate anonymous name
545        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
546            let count = self.anon_edge_counter.get();
547            self.anon_edge_counter.set(count + 1);
548            format!("_anon_edge_{}", count)
549        });
550        columns.push(edge_col_name);
551
552        columns.push(expand.to_variable.clone());
553
554        // If a path alias is set, add a column for the path length
555        if let Some(ref path_alias) = expand.path_alias {
556            columns.push(format!("_path_length_{}", path_alias));
557        }
558
559        Ok((operator, columns))
560    }
561
562    /// Plans a chain of consecutive expand operations using factorized execution.
563    ///
564    /// This avoids the Cartesian product explosion that occurs with separate expands.
565    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
566    ///
567    /// The chain is executed lazily at query time, not during planning. This ensures
568    /// that any filters applied above the expand chain are properly respected.
569    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
570        let expands = Self::collect_expand_chain(op);
571        if expands.is_empty() {
572            return Err(Error::Internal("Empty expand chain".to_string()));
573        }
574
575        // Get the base operator (before first expand)
576        let first_expand = expands[0];
577        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
578
579        let mut columns = base_columns.clone();
580        let mut steps = Vec::new();
581
582        // Track the level-local source column for each expand
583        // For the first expand, it's the column in the input (base_columns)
584        // For subsequent expands, the target from the previous level is always at index 1
585        // (each level adds [edge, target], so target is at index 1)
586        let mut is_first = true;
587
588        for expand in &expands {
589            // Find source column for this expand
590            let source_column = if is_first {
591                // For first expand, find in base columns
592                base_columns
593                    .iter()
594                    .position(|c| c == &expand.from_variable)
595                    .ok_or_else(|| {
596                        Error::Internal(format!(
597                            "Source variable '{}' not found in base columns",
598                            expand.from_variable
599                        ))
600                    })?
601            } else {
602                // For subsequent expands, the target from the previous level is at index 1
603                // (each level adds [edge, target], so target is the second column)
604                1
605            };
606
607            // Convert direction
608            let direction = match expand.direction {
609                ExpandDirection::Outgoing => Direction::Outgoing,
610                ExpandDirection::Incoming => Direction::Incoming,
611                ExpandDirection::Both => Direction::Both,
612            };
613
614            // Add expand step configuration
615            steps.push(ExpandStep {
616                source_column,
617                direction,
618                edge_type: expand.edge_type.clone(),
619            });
620
621            // Add edge and target columns
622            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
623                let count = self.anon_edge_counter.get();
624                self.anon_edge_counter.set(count + 1);
625                format!("_anon_edge_{}", count)
626            });
627            columns.push(edge_col_name);
628            columns.push(expand.to_variable.clone());
629
630            is_first = false;
631        }
632
633        // Create lazy operator that executes at query time, not planning time
634        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
635
636        if let Some(tx_id) = self.tx_id {
637            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
638        } else {
639            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
640        }
641
642        Ok((Box::new(lazy_op), columns))
643    }
644
645    /// Plans a RETURN clause.
646    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
647        // Plan the input operator
648        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
649
650        // Build variable to column index mapping
651        let variable_columns: HashMap<String, usize> = input_columns
652            .iter()
653            .enumerate()
654            .map(|(i, name)| (name.clone(), i))
655            .collect();
656
657        // Extract column names from return items
658        let columns: Vec<String> = ret
659            .items
660            .iter()
661            .map(|item| {
662                item.alias.clone().unwrap_or_else(|| {
663                    // Generate a default name from the expression
664                    expression_to_string(&item.expression)
665                })
666            })
667            .collect();
668
669        // Check if we need a project operator (for property access or expression evaluation)
670        let needs_project = ret
671            .items
672            .iter()
673            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
674
675        if needs_project {
676            // Build project expressions
677            let mut projections = Vec::with_capacity(ret.items.len());
678            let mut output_types = Vec::with_capacity(ret.items.len());
679
680            for item in &ret.items {
681                match &item.expression {
682                    LogicalExpression::Variable(name) => {
683                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
684                            Error::Internal(format!("Variable '{}' not found in input", name))
685                        })?;
686                        projections.push(ProjectExpr::Column(col_idx));
687                        // Use Node type for variables (they could be nodes, edges, or values)
688                        output_types.push(LogicalType::Node);
689                    }
690                    LogicalExpression::Property { variable, property } => {
691                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
692                            Error::Internal(format!("Variable '{}' not found in input", variable))
693                        })?;
694                        projections.push(ProjectExpr::PropertyAccess {
695                            column: col_idx,
696                            property: property.clone(),
697                        });
698                        // Property could be any type - use Any/Generic to preserve type
699                        output_types.push(LogicalType::Any);
700                    }
701                    LogicalExpression::Literal(value) => {
702                        projections.push(ProjectExpr::Constant(value.clone()));
703                        output_types.push(value_to_logical_type(value));
704                    }
705                    LogicalExpression::FunctionCall { name, args, .. } => {
706                        // Handle built-in functions
707                        match name.to_lowercase().as_str() {
708                            "type" => {
709                                // type(r) returns the edge type string
710                                if args.len() != 1 {
711                                    return Err(Error::Internal(
712                                        "type() requires exactly one argument".to_string(),
713                                    ));
714                                }
715                                if let LogicalExpression::Variable(var_name) = &args[0] {
716                                    let col_idx =
717                                        *variable_columns.get(var_name).ok_or_else(|| {
718                                            Error::Internal(format!(
719                                                "Variable '{}' not found in input",
720                                                var_name
721                                            ))
722                                        })?;
723                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
724                                    output_types.push(LogicalType::String);
725                                } else {
726                                    return Err(Error::Internal(
727                                        "type() argument must be a variable".to_string(),
728                                    ));
729                                }
730                            }
731                            "length" => {
732                                // length(p) returns the path length
733                                // For shortestPath results, the path column already contains the length
734                                if args.len() != 1 {
735                                    return Err(Error::Internal(
736                                        "length() requires exactly one argument".to_string(),
737                                    ));
738                                }
739                                if let LogicalExpression::Variable(var_name) = &args[0] {
740                                    let col_idx =
741                                        *variable_columns.get(var_name).ok_or_else(|| {
742                                            Error::Internal(format!(
743                                                "Variable '{}' not found in input",
744                                                var_name
745                                            ))
746                                        })?;
747                                    // Pass through the column value directly
748                                    projections.push(ProjectExpr::Column(col_idx));
749                                    output_types.push(LogicalType::Int64);
750                                } else {
751                                    return Err(Error::Internal(
752                                        "length() argument must be a variable".to_string(),
753                                    ));
754                                }
755                            }
756                            // For other functions (head, tail, size, etc.), use expression evaluation
757                            _ => {
758                                let filter_expr = self.convert_expression(&item.expression)?;
759                                projections.push(ProjectExpr::Expression {
760                                    expr: filter_expr,
761                                    variable_columns: variable_columns.clone(),
762                                });
763                                output_types.push(LogicalType::Any);
764                            }
765                        }
766                    }
767                    LogicalExpression::Case { .. } => {
768                        // Convert CASE expression to FilterExpression for evaluation
769                        let filter_expr = self.convert_expression(&item.expression)?;
770                        projections.push(ProjectExpr::Expression {
771                            expr: filter_expr,
772                            variable_columns: variable_columns.clone(),
773                        });
774                        // CASE can return any type - use Any
775                        output_types.push(LogicalType::Any);
776                    }
777                    _ => {
778                        return Err(Error::Internal(format!(
779                            "Unsupported RETURN expression: {:?}",
780                            item.expression
781                        )));
782                    }
783                }
784            }
785
786            let operator = Box::new(ProjectOperator::with_store(
787                input_op,
788                projections,
789                output_types,
790                Arc::clone(&self.store),
791            ));
792
793            Ok((operator, columns))
794        } else {
795            // Simple case: just return variables
796            // Re-order columns to match return items if needed
797            let mut projections = Vec::with_capacity(ret.items.len());
798            let mut output_types = Vec::with_capacity(ret.items.len());
799
800            for item in &ret.items {
801                if let LogicalExpression::Variable(name) = &item.expression {
802                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
803                        Error::Internal(format!("Variable '{}' not found in input", name))
804                    })?;
805                    projections.push(ProjectExpr::Column(col_idx));
806                    output_types.push(LogicalType::Node);
807                }
808            }
809
810            // Only add ProjectOperator if reordering is needed
811            if projections.len() == input_columns.len()
812                && projections
813                    .iter()
814                    .enumerate()
815                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
816            {
817                // No reordering needed
818                Ok((input_op, columns))
819            } else {
820                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
821                Ok((operator, columns))
822            }
823        }
824    }
825
826    /// Plans a project operator (for WITH clause).
827    fn plan_project(
828        &self,
829        project: &crate::query::plan::ProjectOp,
830    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
831        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
832        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
833            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
834                // Create a single-row operator for projecting literals
835                let single_row_op: Box<dyn Operator> = Box::new(
836                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
837                );
838                (single_row_op, Vec::new())
839            } else {
840                self.plan_operator(&project.input)?
841            };
842
843        // Build variable to column index mapping
844        let variable_columns: HashMap<String, usize> = input_columns
845            .iter()
846            .enumerate()
847            .map(|(i, name)| (name.clone(), i))
848            .collect();
849
850        // Build projections and new column names
851        let mut projections = Vec::with_capacity(project.projections.len());
852        let mut output_types = Vec::with_capacity(project.projections.len());
853        let mut output_columns = Vec::with_capacity(project.projections.len());
854
855        for projection in &project.projections {
856            // Determine the output column name (alias or expression string)
857            let col_name = projection
858                .alias
859                .clone()
860                .unwrap_or_else(|| expression_to_string(&projection.expression));
861            output_columns.push(col_name);
862
863            match &projection.expression {
864                LogicalExpression::Variable(name) => {
865                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
866                        Error::Internal(format!("Variable '{}' not found in input", name))
867                    })?;
868                    projections.push(ProjectExpr::Column(col_idx));
869                    output_types.push(LogicalType::Node);
870                }
871                LogicalExpression::Property { variable, property } => {
872                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
873                        Error::Internal(format!("Variable '{}' not found in input", variable))
874                    })?;
875                    projections.push(ProjectExpr::PropertyAccess {
876                        column: col_idx,
877                        property: property.clone(),
878                    });
879                    output_types.push(LogicalType::Any);
880                }
881                LogicalExpression::Literal(value) => {
882                    projections.push(ProjectExpr::Constant(value.clone()));
883                    output_types.push(value_to_logical_type(value));
884                }
885                _ => {
886                    // For complex expressions, use full expression evaluation
887                    let filter_expr = self.convert_expression(&projection.expression)?;
888                    projections.push(ProjectExpr::Expression {
889                        expr: filter_expr,
890                        variable_columns: variable_columns.clone(),
891                    });
892                    output_types.push(LogicalType::Any);
893                }
894            }
895        }
896
897        let operator = Box::new(ProjectOperator::with_store(
898            input_op,
899            projections,
900            output_types,
901            Arc::clone(&self.store),
902        ));
903
904        Ok((operator, output_columns))
905    }
906
907    /// Plans a filter operator.
908    ///
909    /// Uses zone map pre-filtering to potentially skip scans when predicates
910    /// definitely won't match any data. Also uses property indexes when available
911    /// for O(1) lookups instead of full scans.
912    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
913        // Check zone maps for simple property predicates before scanning
914        // If zone map says "definitely no matches", we can short-circuit
915        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
916            // Zone map says no matches possible - return empty result
917            let (_, columns) = self.plan_operator(&filter.input)?;
918            let schema = self.derive_schema_from_columns(&columns);
919            let empty_op = Box::new(EmptyOperator::new(schema));
920            return Ok((empty_op, columns));
921        }
922
923        // Try to use property index for equality predicates on indexed properties
924        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
925            return Ok(result);
926        }
927
928        // Try to use range optimization for range predicates (>, <, >=, <=)
929        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
930            return Ok(result);
931        }
932
933        // Plan the input operator first
934        let (input_op, columns) = self.plan_operator(&filter.input)?;
935
936        // Build variable to column index mapping
937        let variable_columns: HashMap<String, usize> = columns
938            .iter()
939            .enumerate()
940            .map(|(i, name)| (name.clone(), i))
941            .collect();
942
943        // Convert logical expression to filter expression
944        let filter_expr = self.convert_expression(&filter.predicate)?;
945
946        // Create the predicate
947        let predicate =
948            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
949
950        // Create the filter operator
951        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
952
953        Ok((operator, columns))
954    }
955
956    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
957    ///
958    /// Returns:
959    /// - `Some(false)` if zone map proves no matches possible (can skip)
960    /// - `Some(true)` if zone map says matches might exist
961    /// - `None` if zone map check not applicable
962    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
963        use grafeo_core::graph::lpg::CompareOp;
964
965        match predicate {
966            LogicalExpression::Binary { left, op, right } => {
967                // Check for AND/OR first (compound conditions)
968                match op {
969                    BinaryOp::And => {
970                        let left_result = self.check_zone_map_for_predicate(left);
971                        let right_result = self.check_zone_map_for_predicate(right);
972
973                        return match (left_result, right_result) {
974                            // If either side definitely won't match, the AND won't match
975                            (Some(false), _) | (_, Some(false)) => Some(false),
976                            // If both might match, might match overall
977                            (Some(true), Some(true)) => Some(true),
978                            // Otherwise, can't determine
979                            _ => None,
980                        };
981                    }
982                    BinaryOp::Or => {
983                        let left_result = self.check_zone_map_for_predicate(left);
984                        let right_result = self.check_zone_map_for_predicate(right);
985
986                        return match (left_result, right_result) {
987                            // Both sides definitely won't match
988                            (Some(false), Some(false)) => Some(false),
989                            // At least one side might match
990                            (Some(true), _) | (_, Some(true)) => Some(true),
991                            // Otherwise, can't determine
992                            _ => None,
993                        };
994                    }
995                    _ => {}
996                }
997
998                // Simple property comparison: n.property op value
999                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1000                    (
1001                        LogicalExpression::Property { property, .. },
1002                        LogicalExpression::Literal(val),
1003                    ) => {
1004                        let cmp = match op {
1005                            BinaryOp::Eq => CompareOp::Eq,
1006                            BinaryOp::Ne => CompareOp::Ne,
1007                            BinaryOp::Lt => CompareOp::Lt,
1008                            BinaryOp::Le => CompareOp::Le,
1009                            BinaryOp::Gt => CompareOp::Gt,
1010                            BinaryOp::Ge => CompareOp::Ge,
1011                            _ => return None,
1012                        };
1013                        (property.clone(), cmp, val.clone())
1014                    }
1015                    (
1016                        LogicalExpression::Literal(val),
1017                        LogicalExpression::Property { property, .. },
1018                    ) => {
1019                        // Flip comparison for reversed operands
1020                        let cmp = match op {
1021                            BinaryOp::Eq => CompareOp::Eq,
1022                            BinaryOp::Ne => CompareOp::Ne,
1023                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1024                            BinaryOp::Le => CompareOp::Ge,
1025                            BinaryOp::Gt => CompareOp::Lt,
1026                            BinaryOp::Ge => CompareOp::Le,
1027                            _ => return None,
1028                        };
1029                        (property.clone(), cmp, val.clone())
1030                    }
1031                    _ => return None,
1032                };
1033
1034                // Check zone map for node properties
1035                let might_match =
1036                    self.store
1037                        .node_property_might_match(&property.into(), compare_op, &value);
1038
1039                Some(might_match)
1040            }
1041
1042            _ => None,
1043        }
1044    }
1045
1046    /// Tries to use a property index for filter optimization.
1047    ///
1048    /// When a filter predicate is an equality check on an indexed property,
1049    /// and the input is a simple NodeScan, we can use the index to look up
1050    /// matching nodes directly instead of scanning all nodes.
1051    ///
1052    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1053    /// `Ok(None)` if not applicable, or `Err` on error.
1054    fn try_plan_filter_with_property_index(
1055        &self,
1056        filter: &FilterOp,
1057    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1058        // Only optimize if input is a simple NodeScan (not nested)
1059        let (scan_variable, scan_label) = match filter.input.as_ref() {
1060            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1061                (scan.variable.clone(), scan.label.clone())
1062            }
1063            _ => return Ok(None),
1064        };
1065
1066        // Extract property equality conditions from the predicate
1067        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1068        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1069
1070        if conditions.is_empty() {
1071            return Ok(None);
1072        }
1073
1074        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1075        let has_indexed_condition = conditions
1076            .iter()
1077            .any(|(prop, _)| self.store.has_property_index(prop));
1078
1079        if !has_indexed_condition {
1080            return Ok(None);
1081        }
1082
1083        // Use the optimized batch lookup for multiple conditions
1084        let conditions_ref: Vec<(&str, Value)> = conditions
1085            .iter()
1086            .map(|(p, v)| (p.as_str(), v.clone()))
1087            .collect();
1088        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1089
1090        // If there's a label filter, also filter by label
1091        if let Some(label) = &scan_label {
1092            let label_nodes: std::collections::HashSet<_> =
1093                self.store.nodes_by_label(label).into_iter().collect();
1094            matching_nodes.retain(|n| label_nodes.contains(n));
1095        }
1096
1097        // Create a NodeListOperator with the matching nodes
1098        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1099        let columns = vec![scan_variable];
1100
1101        Ok(Some((node_list_op, columns)))
1102    }
1103
1104    /// Extracts equality conditions (property = literal) from a predicate.
1105    ///
1106    /// Handles both simple predicates and AND chains:
1107    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1108    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1109    fn extract_equality_conditions(
1110        &self,
1111        predicate: &LogicalExpression,
1112        target_variable: &str,
1113    ) -> Vec<(String, Value)> {
1114        let mut conditions = Vec::new();
1115        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1116        conditions
1117    }
1118
1119    /// Recursively collects equality conditions from AND expressions.
1120    fn collect_equality_conditions(
1121        &self,
1122        expr: &LogicalExpression,
1123        target_variable: &str,
1124        conditions: &mut Vec<(String, Value)>,
1125    ) {
1126        match expr {
1127            // Handle AND: recurse into both sides
1128            LogicalExpression::Binary {
1129                left,
1130                op: BinaryOp::And,
1131                right,
1132            } => {
1133                self.collect_equality_conditions(left, target_variable, conditions);
1134                self.collect_equality_conditions(right, target_variable, conditions);
1135            }
1136
1137            // Handle equality: extract property and value
1138            LogicalExpression::Binary {
1139                left,
1140                op: BinaryOp::Eq,
1141                right,
1142            } => {
1143                if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1144                    && var == target_variable
1145                {
1146                    conditions.push((prop, val));
1147                }
1148            }
1149
1150            _ => {}
1151        }
1152    }
1153
1154    /// Extracts (variable, property, value) from a property equality expression.
1155    fn extract_property_equality(
1156        &self,
1157        left: &LogicalExpression,
1158        right: &LogicalExpression,
1159    ) -> Option<(String, String, Value)> {
1160        match (left, right) {
1161            (
1162                LogicalExpression::Property { variable, property },
1163                LogicalExpression::Literal(val),
1164            ) => Some((variable.clone(), property.clone(), val.clone())),
1165            (
1166                LogicalExpression::Literal(val),
1167                LogicalExpression::Property { variable, property },
1168            ) => Some((variable.clone(), property.clone(), val.clone())),
1169            _ => None,
1170        }
1171    }
1172
1173    /// Tries to optimize a filter using range queries on properties.
1174    ///
1175    /// This optimization is applied when:
1176    /// - The input is a simple NodeScan (no nested operations)
1177    /// - The predicate contains range comparisons (>, <, >=, <=)
1178    /// - The same variable and property are being filtered
1179    ///
1180    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1181    /// (`n.age >= 30 AND n.age <= 50`).
1182    ///
1183    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1184    /// `Ok(None)` if not applicable, or `Err` on error.
1185    fn try_plan_filter_with_range_index(
1186        &self,
1187        filter: &FilterOp,
1188    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1189        // Only optimize if input is a simple NodeScan (not nested)
1190        let (scan_variable, scan_label) = match filter.input.as_ref() {
1191            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1192                (scan.variable.clone(), scan.label.clone())
1193            }
1194            _ => return Ok(None),
1195        };
1196
1197        // Try to extract BETWEEN pattern first (more efficient)
1198        if let Some((variable, property, min, max, min_inc, max_inc)) =
1199            self.extract_between_predicate(&filter.predicate)
1200            && variable == scan_variable
1201        {
1202            return self.plan_range_filter(
1203                &scan_variable,
1204                &scan_label,
1205                &property,
1206                RangeBounds {
1207                    min: Some(&min),
1208                    max: Some(&max),
1209                    min_inclusive: min_inc,
1210                    max_inclusive: max_inc,
1211                },
1212            );
1213        }
1214
1215        // Try to extract simple range predicate
1216        if let Some((variable, property, op, value)) =
1217            self.extract_range_predicate(&filter.predicate)
1218            && variable == scan_variable
1219        {
1220            let (min, max, min_inc, max_inc) = match op {
1221                BinaryOp::Lt => (None, Some(value), false, false),
1222                BinaryOp::Le => (None, Some(value), false, true),
1223                BinaryOp::Gt => (Some(value), None, false, false),
1224                BinaryOp::Ge => (Some(value), None, true, false),
1225                _ => return Ok(None),
1226            };
1227            return self.plan_range_filter(
1228                &scan_variable,
1229                &scan_label,
1230                &property,
1231                RangeBounds {
1232                    min: min.as_ref(),
1233                    max: max.as_ref(),
1234                    min_inclusive: min_inc,
1235                    max_inclusive: max_inc,
1236                },
1237            );
1238        }
1239
1240        Ok(None)
1241    }
1242
1243    /// Plans a range filter using `find_nodes_in_range`.
1244    fn plan_range_filter(
1245        &self,
1246        scan_variable: &str,
1247        scan_label: &Option<String>,
1248        property: &str,
1249        bounds: RangeBounds<'_>,
1250    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1251        // Use the store's range query method
1252        let mut matching_nodes = self.store.find_nodes_in_range(
1253            property,
1254            bounds.min,
1255            bounds.max,
1256            bounds.min_inclusive,
1257            bounds.max_inclusive,
1258        );
1259
1260        // If there's a label filter, also filter by label
1261        if let Some(label) = scan_label {
1262            let label_nodes: std::collections::HashSet<_> =
1263                self.store.nodes_by_label(label).into_iter().collect();
1264            matching_nodes.retain(|n| label_nodes.contains(n));
1265        }
1266
1267        // Create a NodeListOperator with the matching nodes
1268        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1269        let columns = vec![scan_variable.to_string()];
1270
1271        Ok(Some((node_list_op, columns)))
1272    }
1273
1274    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1275    ///
1276    /// Returns `(variable, property, operator, value)` if found.
1277    fn extract_range_predicate(
1278        &self,
1279        predicate: &LogicalExpression,
1280    ) -> Option<(String, String, BinaryOp, Value)> {
1281        match predicate {
1282            LogicalExpression::Binary { left, op, right } => {
1283                match op {
1284                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1285                        // Try property on left: n.age > 30
1286                        if let (
1287                            LogicalExpression::Property { variable, property },
1288                            LogicalExpression::Literal(val),
1289                        ) = (left.as_ref(), right.as_ref())
1290                        {
1291                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1292                        }
1293
1294                        // Try property on right: 30 < n.age (flip operator)
1295                        if let (
1296                            LogicalExpression::Literal(val),
1297                            LogicalExpression::Property { variable, property },
1298                        ) = (left.as_ref(), right.as_ref())
1299                        {
1300                            let flipped_op = match op {
1301                                BinaryOp::Lt => BinaryOp::Gt,
1302                                BinaryOp::Le => BinaryOp::Ge,
1303                                BinaryOp::Gt => BinaryOp::Lt,
1304                                BinaryOp::Ge => BinaryOp::Le,
1305                                _ => return None,
1306                            };
1307                            return Some((
1308                                variable.clone(),
1309                                property.clone(),
1310                                flipped_op,
1311                                val.clone(),
1312                            ));
1313                        }
1314                    }
1315                    _ => {}
1316                }
1317            }
1318            _ => {}
1319        }
1320        None
1321    }
1322
1323    /// Extracts a BETWEEN pattern from compound predicates.
1324    ///
1325    /// Recognizes patterns like:
1326    /// - `n.age >= 30 AND n.age <= 50`
1327    /// - `n.age > 30 AND n.age < 50`
1328    ///
1329    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1330    fn extract_between_predicate(
1331        &self,
1332        predicate: &LogicalExpression,
1333    ) -> Option<(String, String, Value, Value, bool, bool)> {
1334        // Must be an AND expression
1335        let (left, right) = match predicate {
1336            LogicalExpression::Binary {
1337                left,
1338                op: BinaryOp::And,
1339                right,
1340            } => (left.as_ref(), right.as_ref()),
1341            _ => return None,
1342        };
1343
1344        // Extract range predicates from both sides
1345        let left_range = self.extract_range_predicate(left);
1346        let right_range = self.extract_range_predicate(right);
1347
1348        let (left_var, left_prop, left_op, left_val) = left_range?;
1349        let (right_var, right_prop, right_op, right_val) = right_range?;
1350
1351        // Must be same variable and property
1352        if left_var != right_var || left_prop != right_prop {
1353            return None;
1354        }
1355
1356        // Determine which is lower bound and which is upper bound
1357        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1358            // n.x >= min AND n.x <= max
1359            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1360            // n.x >= min AND n.x < max
1361            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1362            // n.x > min AND n.x <= max
1363            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1364            // n.x > min AND n.x < max
1365            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1366            // Reversed order: n.x <= max AND n.x >= min
1367            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1368            // n.x < max AND n.x >= min
1369            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1370            // n.x <= max AND n.x > min
1371            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1372            // n.x < max AND n.x > min
1373            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1374            _ => return None,
1375        };
1376
1377        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1378    }
1379
1380    /// Plans a LIMIT operator.
1381    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1382        let (input_op, columns) = self.plan_operator(&limit.input)?;
1383        let output_schema = self.derive_schema_from_columns(&columns);
1384        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1385        Ok((operator, columns))
1386    }
1387
1388    /// Plans a SKIP operator.
1389    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1390        let (input_op, columns) = self.plan_operator(&skip.input)?;
1391        let output_schema = self.derive_schema_from_columns(&columns);
1392        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1393        Ok((operator, columns))
1394    }
1395
1396    /// Plans a SORT (ORDER BY) operator.
1397    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1398        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1399
1400        // Build variable to column index mapping
1401        let mut variable_columns: HashMap<String, usize> = input_columns
1402            .iter()
1403            .enumerate()
1404            .map(|(i, name)| (name.clone(), i))
1405            .collect();
1406
1407        // Collect property expressions that need to be projected before sorting
1408        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1409        let mut next_col_idx = input_columns.len();
1410
1411        for key in &sort.keys {
1412            if let LogicalExpression::Property { variable, property } = &key.expression {
1413                let col_name = format!("{}_{}", variable, property);
1414                if !variable_columns.contains_key(&col_name) {
1415                    property_projections.push((
1416                        variable.clone(),
1417                        property.clone(),
1418                        col_name.clone(),
1419                    ));
1420                    variable_columns.insert(col_name, next_col_idx);
1421                    next_col_idx += 1;
1422                }
1423            }
1424        }
1425
1426        // Track output columns
1427        let mut output_columns = input_columns.clone();
1428
1429        // If we have property expressions, add a projection to materialize them
1430        if !property_projections.is_empty() {
1431            let mut projections = Vec::new();
1432            let mut output_types = Vec::new();
1433
1434            // First, pass through all existing columns (use Node type to preserve node IDs
1435            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1436            for (i, _) in input_columns.iter().enumerate() {
1437                projections.push(ProjectExpr::Column(i));
1438                output_types.push(LogicalType::Node);
1439            }
1440
1441            // Then add property access projections
1442            for (variable, property, col_name) in &property_projections {
1443                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1444                    Error::Internal(format!(
1445                        "Variable '{}' not found for ORDER BY property projection",
1446                        variable
1447                    ))
1448                })?;
1449                projections.push(ProjectExpr::PropertyAccess {
1450                    column: source_col,
1451                    property: property.clone(),
1452                });
1453                output_types.push(LogicalType::Any);
1454                output_columns.push(col_name.clone());
1455            }
1456
1457            input_op = Box::new(ProjectOperator::with_store(
1458                input_op,
1459                projections,
1460                output_types,
1461                Arc::clone(&self.store),
1462            ));
1463        }
1464
1465        // Convert logical sort keys to physical sort keys
1466        let physical_keys: Vec<PhysicalSortKey> = sort
1467            .keys
1468            .iter()
1469            .map(|key| {
1470                let col_idx = self
1471                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1472                Ok(PhysicalSortKey {
1473                    column: col_idx,
1474                    direction: match key.order {
1475                        SortOrder::Ascending => SortDirection::Ascending,
1476                        SortOrder::Descending => SortDirection::Descending,
1477                    },
1478                    null_order: NullOrder::NullsLast,
1479                })
1480            })
1481            .collect::<Result<Vec<_>>>()?;
1482
1483        let output_schema = self.derive_schema_from_columns(&output_columns);
1484        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1485        Ok((operator, output_columns))
1486    }
1487
1488    /// Resolves a sort expression to a column index, using projected property columns.
1489    fn resolve_sort_expression_with_properties(
1490        &self,
1491        expr: &LogicalExpression,
1492        variable_columns: &HashMap<String, usize>,
1493    ) -> Result<usize> {
1494        match expr {
1495            LogicalExpression::Variable(name) => {
1496                variable_columns.get(name).copied().ok_or_else(|| {
1497                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1498                })
1499            }
1500            LogicalExpression::Property { variable, property } => {
1501                // Look up the projected property column (e.g., "p_age" for p.age)
1502                let col_name = format!("{}_{}", variable, property);
1503                variable_columns.get(&col_name).copied().ok_or_else(|| {
1504                    Error::Internal(format!(
1505                        "Property column '{}' not found for ORDER BY (from {}.{})",
1506                        col_name, variable, property
1507                    ))
1508                })
1509            }
1510            _ => Err(Error::Internal(format!(
1511                "Unsupported ORDER BY expression: {:?}",
1512                expr
1513            ))),
1514        }
1515    }
1516
1517    /// Derives a schema from column names (uses Any type to handle all value types).
1518    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1519        columns.iter().map(|_| LogicalType::Any).collect()
1520    }
1521
1522    /// Plans an AGGREGATE operator.
1523    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1524        // Check if we can use factorized aggregation for speedup
1525        // Conditions:
1526        // 1. Factorized execution is enabled
1527        // 2. Input is an expand chain (multi-hop)
1528        // 3. No GROUP BY
1529        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1530        if self.factorized_execution
1531            && agg.group_by.is_empty()
1532            && Self::count_expand_chain(&agg.input).0 >= 2
1533            && self.is_simple_aggregate(agg)
1534            && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1535        {
1536            return Ok((op, cols));
1537        }
1538        // Fall through to regular aggregate if factorized planning fails
1539
1540        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1541
1542        // Build variable to column index mapping
1543        let mut variable_columns: HashMap<String, usize> = input_columns
1544            .iter()
1545            .enumerate()
1546            .map(|(i, name)| (name.clone(), i))
1547            .collect();
1548
1549        // Collect all property expressions that need to be projected before aggregation
1550        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1551        let mut next_col_idx = input_columns.len();
1552
1553        // Check group-by expressions for properties
1554        for expr in &agg.group_by {
1555            if let LogicalExpression::Property { variable, property } = expr {
1556                let col_name = format!("{}_{}", variable, property);
1557                if !variable_columns.contains_key(&col_name) {
1558                    property_projections.push((
1559                        variable.clone(),
1560                        property.clone(),
1561                        col_name.clone(),
1562                    ));
1563                    variable_columns.insert(col_name, next_col_idx);
1564                    next_col_idx += 1;
1565                }
1566            }
1567        }
1568
1569        // Check aggregate expressions for properties
1570        for agg_expr in &agg.aggregates {
1571            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1572                let col_name = format!("{}_{}", variable, property);
1573                if !variable_columns.contains_key(&col_name) {
1574                    property_projections.push((
1575                        variable.clone(),
1576                        property.clone(),
1577                        col_name.clone(),
1578                    ));
1579                    variable_columns.insert(col_name, next_col_idx);
1580                    next_col_idx += 1;
1581                }
1582            }
1583        }
1584
1585        // If we have property expressions, add a projection to materialize them
1586        if !property_projections.is_empty() {
1587            let mut projections = Vec::new();
1588            let mut output_types = Vec::new();
1589
1590            // First, pass through all existing columns (use Node type to preserve node IDs
1591            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1592            for (i, _) in input_columns.iter().enumerate() {
1593                projections.push(ProjectExpr::Column(i));
1594                output_types.push(LogicalType::Node);
1595            }
1596
1597            // Then add property access projections
1598            for (variable, property, _col_name) in &property_projections {
1599                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1600                    Error::Internal(format!(
1601                        "Variable '{}' not found for property projection",
1602                        variable
1603                    ))
1604                })?;
1605                projections.push(ProjectExpr::PropertyAccess {
1606                    column: source_col,
1607                    property: property.clone(),
1608                });
1609                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1610            }
1611
1612            input_op = Box::new(ProjectOperator::with_store(
1613                input_op,
1614                projections,
1615                output_types,
1616                Arc::clone(&self.store),
1617            ));
1618        }
1619
1620        // Convert group-by expressions to column indices
1621        let group_columns: Vec<usize> = agg
1622            .group_by
1623            .iter()
1624            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1625            .collect::<Result<Vec<_>>>()?;
1626
1627        // Convert aggregate expressions to physical form
1628        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1629            .aggregates
1630            .iter()
1631            .map(|agg_expr| {
1632                let column = agg_expr
1633                    .expression
1634                    .as_ref()
1635                    .map(|e| {
1636                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1637                    })
1638                    .transpose()?;
1639
1640                Ok(PhysicalAggregateExpr {
1641                    function: convert_aggregate_function(agg_expr.function),
1642                    column,
1643                    distinct: agg_expr.distinct,
1644                    alias: agg_expr.alias.clone(),
1645                    percentile: agg_expr.percentile,
1646                })
1647            })
1648            .collect::<Result<Vec<_>>>()?;
1649
1650        // Build output schema and column names
1651        let mut output_schema = Vec::new();
1652        let mut output_columns = Vec::new();
1653
1654        // Add group-by columns
1655        for expr in &agg.group_by {
1656            output_schema.push(LogicalType::Any); // Group-by values can be any type
1657            output_columns.push(expression_to_string(expr));
1658        }
1659
1660        // Add aggregate result columns
1661        for agg_expr in &agg.aggregates {
1662            let result_type = match agg_expr.function {
1663                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1664                    LogicalType::Int64
1665                }
1666                LogicalAggregateFunction::Sum => LogicalType::Int64,
1667                LogicalAggregateFunction::Avg => LogicalType::Float64,
1668                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1669                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1670                    // since the aggregate can return any Value type, but the most common case
1671                    // is numeric values from property expressions
1672                    LogicalType::Int64
1673                }
1674                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1675                // Statistical functions return Float64
1676                LogicalAggregateFunction::StdDev
1677                | LogicalAggregateFunction::StdDevPop
1678                | LogicalAggregateFunction::PercentileDisc
1679                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1680            };
1681            output_schema.push(result_type);
1682            output_columns.push(
1683                agg_expr
1684                    .alias
1685                    .clone()
1686                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1687            );
1688        }
1689
1690        // Choose operator based on whether there are group-by columns
1691        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1692            Box::new(SimpleAggregateOperator::new(
1693                input_op,
1694                physical_aggregates,
1695                output_schema,
1696            ))
1697        } else {
1698            Box::new(HashAggregateOperator::new(
1699                input_op,
1700                group_columns,
1701                physical_aggregates,
1702                output_schema,
1703            ))
1704        };
1705
1706        // Apply HAVING clause filter if present
1707        if let Some(having_expr) = &agg.having {
1708            // Build variable to column mapping for the aggregate output
1709            let having_var_columns: HashMap<String, usize> = output_columns
1710                .iter()
1711                .enumerate()
1712                .map(|(i, name)| (name.clone(), i))
1713                .collect();
1714
1715            let filter_expr = self.convert_expression(having_expr)?;
1716            let predicate =
1717                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1718            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1719        }
1720
1721        Ok((operator, output_columns))
1722    }
1723
1724    /// Checks if an aggregate is simple enough for factorized execution.
1725    ///
1726    /// Simple aggregates:
1727    /// - COUNT(*) or COUNT(variable)
1728    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1729    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1730        agg.aggregates.iter().all(|agg_expr| {
1731            match agg_expr.function {
1732                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1733                    // COUNT(*) is always OK, COUNT(var) is OK
1734                    agg_expr.expression.is_none()
1735                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1736                }
1737                LogicalAggregateFunction::Sum
1738                | LogicalAggregateFunction::Avg
1739                | LogicalAggregateFunction::Min
1740                | LogicalAggregateFunction::Max => {
1741                    // For now, only support when expression is a variable
1742                    // (property access would require flattening first)
1743                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1744                }
1745                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1746                _ => false,
1747            }
1748        })
1749    }
1750
1751    /// Plans a factorized aggregate that operates directly on factorized data.
1752    ///
1753    /// This avoids the O(n²) cost of flattening before aggregation.
1754    fn plan_factorized_aggregate(
1755        &self,
1756        agg: &AggregateOp,
1757    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1758        // Build the expand chain - this returns a LazyFactorizedChainOperator
1759        let expands = Self::collect_expand_chain(&agg.input);
1760        if expands.is_empty() {
1761            return Err(Error::Internal(
1762                "Expected expand chain for factorized aggregate".to_string(),
1763            ));
1764        }
1765
1766        // Get the base operator (before first expand)
1767        let first_expand = expands[0];
1768        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1769
1770        let mut columns = base_columns.clone();
1771        let mut steps = Vec::new();
1772        let mut is_first = true;
1773
1774        for expand in &expands {
1775            // Find source column for this expand
1776            let source_column = if is_first {
1777                base_columns
1778                    .iter()
1779                    .position(|c| c == &expand.from_variable)
1780                    .ok_or_else(|| {
1781                        Error::Internal(format!(
1782                            "Source variable '{}' not found in base columns",
1783                            expand.from_variable
1784                        ))
1785                    })?
1786            } else {
1787                1 // Target from previous level
1788            };
1789
1790            let direction = match expand.direction {
1791                ExpandDirection::Outgoing => Direction::Outgoing,
1792                ExpandDirection::Incoming => Direction::Incoming,
1793                ExpandDirection::Both => Direction::Both,
1794            };
1795
1796            steps.push(ExpandStep {
1797                source_column,
1798                direction,
1799                edge_type: expand.edge_type.clone(),
1800            });
1801
1802            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1803                let count = self.anon_edge_counter.get();
1804                self.anon_edge_counter.set(count + 1);
1805                format!("_anon_edge_{}", count)
1806            });
1807            columns.push(edge_col_name);
1808            columns.push(expand.to_variable.clone());
1809
1810            is_first = false;
1811        }
1812
1813        // Create the lazy factorized chain operator
1814        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1815
1816        if let Some(tx_id) = self.tx_id {
1817            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1818        } else {
1819            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1820        }
1821
1822        // Convert logical aggregates to factorized aggregates
1823        let factorized_aggs: Vec<FactorizedAggregate> = agg
1824            .aggregates
1825            .iter()
1826            .map(|agg_expr| {
1827                match agg_expr.function {
1828                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1829                        // COUNT(*) uses simple count, COUNT(col) uses column count
1830                        if agg_expr.expression.is_none() {
1831                            FactorizedAggregate::count()
1832                        } else {
1833                            // For COUNT(variable), we use the deepest level's target column
1834                            // which is the last column added to the schema
1835                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1836                        }
1837                    }
1838                    LogicalAggregateFunction::Sum => {
1839                        // SUM on deepest level target
1840                        FactorizedAggregate::sum(1)
1841                    }
1842                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1843                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1844                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1845                    _ => {
1846                        // Shouldn't reach here due to is_simple_aggregate check
1847                        FactorizedAggregate::count()
1848                    }
1849                }
1850            })
1851            .collect();
1852
1853        // Build output column names
1854        let output_columns: Vec<String> = agg
1855            .aggregates
1856            .iter()
1857            .map(|agg_expr| {
1858                agg_expr
1859                    .alias
1860                    .clone()
1861                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1862            })
1863            .collect();
1864
1865        // Create the factorized aggregate operator
1866        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1867
1868        Ok((Box::new(factorized_agg_op), output_columns))
1869    }
1870
1871    /// Resolves a logical expression to a column index.
1872    #[allow(dead_code)]
1873    fn resolve_expression_to_column(
1874        &self,
1875        expr: &LogicalExpression,
1876        variable_columns: &HashMap<String, usize>,
1877    ) -> Result<usize> {
1878        match expr {
1879            LogicalExpression::Variable(name) => variable_columns
1880                .get(name)
1881                .copied()
1882                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1883            LogicalExpression::Property { variable, .. } => variable_columns
1884                .get(variable)
1885                .copied()
1886                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1887            _ => Err(Error::Internal(format!(
1888                "Cannot resolve expression to column: {:?}",
1889                expr
1890            ))),
1891        }
1892    }
1893
1894    /// Resolves a logical expression to a column index, using projected property columns.
1895    ///
1896    /// This is used for aggregations where properties have been projected into their own columns.
1897    fn resolve_expression_to_column_with_properties(
1898        &self,
1899        expr: &LogicalExpression,
1900        variable_columns: &HashMap<String, usize>,
1901    ) -> Result<usize> {
1902        match expr {
1903            LogicalExpression::Variable(name) => variable_columns
1904                .get(name)
1905                .copied()
1906                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1907            LogicalExpression::Property { variable, property } => {
1908                // Look up the projected property column (e.g., "p_price" for p.price)
1909                let col_name = format!("{}_{}", variable, property);
1910                variable_columns.get(&col_name).copied().ok_or_else(|| {
1911                    Error::Internal(format!(
1912                        "Property column '{}' not found (from {}.{})",
1913                        col_name, variable, property
1914                    ))
1915                })
1916            }
1917            _ => Err(Error::Internal(format!(
1918                "Cannot resolve expression to column: {:?}",
1919                expr
1920            ))),
1921        }
1922    }
1923
1924    /// Converts a logical expression to a filter expression.
1925    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1926        match expr {
1927            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1928            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1929            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1930                variable: variable.clone(),
1931                property: property.clone(),
1932            }),
1933            LogicalExpression::Binary { left, op, right } => {
1934                let left_expr = self.convert_expression(left)?;
1935                let right_expr = self.convert_expression(right)?;
1936                let filter_op = convert_binary_op(*op)?;
1937                Ok(FilterExpression::Binary {
1938                    left: Box::new(left_expr),
1939                    op: filter_op,
1940                    right: Box::new(right_expr),
1941                })
1942            }
1943            LogicalExpression::Unary { op, operand } => {
1944                let operand_expr = self.convert_expression(operand)?;
1945                let filter_op = convert_unary_op(*op)?;
1946                Ok(FilterExpression::Unary {
1947                    op: filter_op,
1948                    operand: Box::new(operand_expr),
1949                })
1950            }
1951            LogicalExpression::FunctionCall { name, args, .. } => {
1952                let filter_args: Vec<FilterExpression> = args
1953                    .iter()
1954                    .map(|a| self.convert_expression(a))
1955                    .collect::<Result<Vec<_>>>()?;
1956                Ok(FilterExpression::FunctionCall {
1957                    name: name.clone(),
1958                    args: filter_args,
1959                })
1960            }
1961            LogicalExpression::Case {
1962                operand,
1963                when_clauses,
1964                else_clause,
1965            } => {
1966                let filter_operand = operand
1967                    .as_ref()
1968                    .map(|e| self.convert_expression(e))
1969                    .transpose()?
1970                    .map(Box::new);
1971                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1972                    .iter()
1973                    .map(|(cond, result)| {
1974                        Ok((
1975                            self.convert_expression(cond)?,
1976                            self.convert_expression(result)?,
1977                        ))
1978                    })
1979                    .collect::<Result<Vec<_>>>()?;
1980                let filter_else = else_clause
1981                    .as_ref()
1982                    .map(|e| self.convert_expression(e))
1983                    .transpose()?
1984                    .map(Box::new);
1985                Ok(FilterExpression::Case {
1986                    operand: filter_operand,
1987                    when_clauses: filter_when_clauses,
1988                    else_clause: filter_else,
1989                })
1990            }
1991            LogicalExpression::List(items) => {
1992                let filter_items: Vec<FilterExpression> = items
1993                    .iter()
1994                    .map(|item| self.convert_expression(item))
1995                    .collect::<Result<Vec<_>>>()?;
1996                Ok(FilterExpression::List(filter_items))
1997            }
1998            LogicalExpression::Map(pairs) => {
1999                let filter_pairs: Vec<(String, FilterExpression)> = pairs
2000                    .iter()
2001                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2002                    .collect::<Result<Vec<_>>>()?;
2003                Ok(FilterExpression::Map(filter_pairs))
2004            }
2005            LogicalExpression::IndexAccess { base, index } => {
2006                let base_expr = self.convert_expression(base)?;
2007                let index_expr = self.convert_expression(index)?;
2008                Ok(FilterExpression::IndexAccess {
2009                    base: Box::new(base_expr),
2010                    index: Box::new(index_expr),
2011                })
2012            }
2013            LogicalExpression::SliceAccess { base, start, end } => {
2014                let base_expr = self.convert_expression(base)?;
2015                let start_expr = start
2016                    .as_ref()
2017                    .map(|s| self.convert_expression(s))
2018                    .transpose()?
2019                    .map(Box::new);
2020                let end_expr = end
2021                    .as_ref()
2022                    .map(|e| self.convert_expression(e))
2023                    .transpose()?
2024                    .map(Box::new);
2025                Ok(FilterExpression::SliceAccess {
2026                    base: Box::new(base_expr),
2027                    start: start_expr,
2028                    end: end_expr,
2029                })
2030            }
2031            LogicalExpression::Parameter(_) => Err(Error::Internal(
2032                "Parameters not yet supported in filters".to_string(),
2033            )),
2034            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2035            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2036            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2037            LogicalExpression::ListComprehension {
2038                variable,
2039                list_expr,
2040                filter_expr,
2041                map_expr,
2042            } => {
2043                let list = self.convert_expression(list_expr)?;
2044                let filter = filter_expr
2045                    .as_ref()
2046                    .map(|f| self.convert_expression(f))
2047                    .transpose()?
2048                    .map(Box::new);
2049                let map = self.convert_expression(map_expr)?;
2050                Ok(FilterExpression::ListComprehension {
2051                    variable: variable.clone(),
2052                    list_expr: Box::new(list),
2053                    filter_expr: filter,
2054                    map_expr: Box::new(map),
2055                })
2056            }
2057            LogicalExpression::ExistsSubquery(subplan) => {
2058                // Extract the pattern from the subplan
2059                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2060                let (start_var, direction, edge_type, end_labels) =
2061                    self.extract_exists_pattern(subplan)?;
2062
2063                Ok(FilterExpression::ExistsSubquery {
2064                    start_var,
2065                    direction,
2066                    edge_type,
2067                    end_labels,
2068                    min_hops: None,
2069                    max_hops: None,
2070                })
2071            }
2072            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2073                "COUNT subqueries not yet supported".to_string(),
2074            )),
2075        }
2076    }
2077
2078    /// Extracts the pattern from an EXISTS subplan.
2079    /// Returns (start_variable, direction, edge_type, end_labels).
2080    fn extract_exists_pattern(
2081        &self,
2082        subplan: &LogicalOperator,
2083    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2084        match subplan {
2085            LogicalOperator::Expand(expand) => {
2086                // Get end node labels from the to_variable if there's a node scan input
2087                let end_labels = self.extract_end_labels_from_expand(expand);
2088                let direction = match expand.direction {
2089                    ExpandDirection::Outgoing => Direction::Outgoing,
2090                    ExpandDirection::Incoming => Direction::Incoming,
2091                    ExpandDirection::Both => Direction::Both,
2092                };
2093                Ok((
2094                    expand.from_variable.clone(),
2095                    direction,
2096                    expand.edge_type.clone(),
2097                    end_labels,
2098                ))
2099            }
2100            LogicalOperator::NodeScan(scan) => {
2101                if let Some(input) = &scan.input {
2102                    self.extract_exists_pattern(input)
2103                } else {
2104                    Err(Error::Internal(
2105                        "EXISTS subquery must contain an edge pattern".to_string(),
2106                    ))
2107                }
2108            }
2109            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2110            _ => Err(Error::Internal(
2111                "Unsupported EXISTS subquery pattern".to_string(),
2112            )),
2113        }
2114    }
2115
2116    /// Extracts end node labels from an Expand operator if present.
2117    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2118        // Check if the expand has a NodeScan input with a label filter
2119        match expand.input.as_ref() {
2120            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2121            _ => None,
2122        }
2123    }
2124
2125    /// Plans a JOIN operator.
2126    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2127        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2128        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2129
2130        // Build combined output columns
2131        let mut columns = left_columns.clone();
2132        columns.extend(right_columns.clone());
2133
2134        // Convert join type
2135        let physical_join_type = match join.join_type {
2136            JoinType::Inner => PhysicalJoinType::Inner,
2137            JoinType::Left => PhysicalJoinType::Left,
2138            JoinType::Right => PhysicalJoinType::Right,
2139            JoinType::Full => PhysicalJoinType::Full,
2140            JoinType::Cross => PhysicalJoinType::Cross,
2141            JoinType::Semi => PhysicalJoinType::Semi,
2142            JoinType::Anti => PhysicalJoinType::Anti,
2143        };
2144
2145        // Build key columns from join conditions
2146        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2147            // Cross join - no keys
2148            (vec![], vec![])
2149        } else {
2150            join.conditions
2151                .iter()
2152                .filter_map(|cond| {
2153                    // Try to extract column indices from expressions
2154                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2155                    let right_idx = self
2156                        .expression_to_column(&cond.right, &right_columns)
2157                        .ok()?;
2158                    Some((left_idx, right_idx))
2159                })
2160                .unzip()
2161        };
2162
2163        let output_schema = self.derive_schema_from_columns(&columns);
2164
2165        // Check if we should use leapfrog join for cyclic patterns
2166        // Currently we use hash join by default; leapfrog is available but
2167        // requires explicit multi-way join detection which will be added
2168        // when we have proper cyclic pattern detection in the optimizer.
2169        // For now, LeapfrogJoinOperator is available for direct use.
2170        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2171
2172        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2173            left_op,
2174            right_op,
2175            probe_keys,
2176            build_keys,
2177            physical_join_type,
2178            output_schema,
2179        ));
2180
2181        Ok((operator, columns))
2182    }
2183
2184    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2185    ///
2186    /// A cyclic pattern occurs when join conditions reference variables
2187    /// that create a cycle in the join graph. For example, a triangle
2188    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2189    ///
2190    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2191    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2192    #[allow(dead_code)]
2193    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2194        // Build adjacency list for join variables
2195        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2196        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2197
2198        // Collect edges from join conditions
2199        Self::collect_join_edges(
2200            &LogicalOperator::Join(join.clone()),
2201            &mut edges,
2202            &mut all_vars,
2203        );
2204
2205        // Need at least 3 variables to form a cycle
2206        if all_vars.len() < 3 {
2207            return false;
2208        }
2209
2210        // Detect cycle using DFS with coloring
2211        Self::has_cycle(&edges, &all_vars)
2212    }
2213
2214    /// Collects edges from join conditions into an adjacency list.
2215    fn collect_join_edges(
2216        op: &LogicalOperator,
2217        edges: &mut HashMap<String, Vec<String>>,
2218        vars: &mut std::collections::HashSet<String>,
2219    ) {
2220        match op {
2221            LogicalOperator::Join(join) => {
2222                // Process join conditions
2223                for cond in &join.conditions {
2224                    if let (Some(left_var), Some(right_var)) = (
2225                        Self::extract_join_variable(&cond.left),
2226                        Self::extract_join_variable(&cond.right),
2227                    ) && left_var != right_var
2228                    {
2229                        vars.insert(left_var.clone());
2230                        vars.insert(right_var.clone());
2231
2232                        // Add bidirectional edge
2233                        edges
2234                            .entry(left_var.clone())
2235                            .or_default()
2236                            .push(right_var.clone());
2237                        edges.entry(right_var).or_default().push(left_var);
2238                    }
2239                }
2240
2241                // Recurse into children
2242                Self::collect_join_edges(&join.left, edges, vars);
2243                Self::collect_join_edges(&join.right, edges, vars);
2244            }
2245            LogicalOperator::Expand(expand) => {
2246                // Expand creates implicit join between from_variable and to_variable
2247                vars.insert(expand.from_variable.clone());
2248                vars.insert(expand.to_variable.clone());
2249
2250                edges
2251                    .entry(expand.from_variable.clone())
2252                    .or_default()
2253                    .push(expand.to_variable.clone());
2254                edges
2255                    .entry(expand.to_variable.clone())
2256                    .or_default()
2257                    .push(expand.from_variable.clone());
2258
2259                Self::collect_join_edges(&expand.input, edges, vars);
2260            }
2261            LogicalOperator::Filter(filter) => {
2262                Self::collect_join_edges(&filter.input, edges, vars);
2263            }
2264            LogicalOperator::NodeScan(scan) => {
2265                vars.insert(scan.variable.clone());
2266            }
2267            _ => {}
2268        }
2269    }
2270
2271    /// Extracts the variable name from a join expression.
2272    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2273        match expr {
2274            LogicalExpression::Variable(v) => Some(v.clone()),
2275            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2276            LogicalExpression::Id(v) => Some(v.clone()),
2277            _ => None,
2278        }
2279    }
2280
2281    /// Detects if the graph has a cycle using DFS coloring.
2282    ///
2283    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2284    fn has_cycle(
2285        edges: &HashMap<String, Vec<String>>,
2286        vars: &std::collections::HashSet<String>,
2287    ) -> bool {
2288        let mut color: HashMap<&String, u8> = HashMap::new();
2289
2290        for var in vars {
2291            color.insert(var, 0);
2292        }
2293
2294        for start in vars {
2295            if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2296                return true;
2297            }
2298        }
2299
2300        false
2301    }
2302
2303    /// DFS helper for cycle detection.
2304    fn dfs_cycle(
2305        node: &String,
2306        parent: Option<&String>,
2307        edges: &HashMap<String, Vec<String>>,
2308        color: &mut HashMap<&String, u8>,
2309    ) -> bool {
2310        *color.get_mut(node).unwrap() = 1; // Gray
2311
2312        if let Some(neighbors) = edges.get(node) {
2313            for neighbor in neighbors {
2314                // Skip the edge back to parent (undirected graph)
2315                if parent == Some(neighbor) {
2316                    continue;
2317                }
2318
2319                if let Some(&c) = color.get(neighbor) {
2320                    if c == 1 {
2321                        // Found a back edge - cycle detected
2322                        return true;
2323                    }
2324                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2325                        return true;
2326                    }
2327                }
2328            }
2329        }
2330
2331        *color.get_mut(node).unwrap() = 2; // Black
2332        false
2333    }
2334
2335    /// Counts the number of base relations in a logical operator tree.
2336    #[allow(dead_code)]
2337    fn count_relations(op: &LogicalOperator) -> usize {
2338        match op {
2339            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2340            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2341            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2342            LogicalOperator::Join(j) => {
2343                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2344            }
2345            _ => 0,
2346        }
2347    }
2348
2349    /// Extracts a column index from an expression.
2350    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2351        match expr {
2352            LogicalExpression::Variable(name) => columns
2353                .iter()
2354                .position(|c| c == name)
2355                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2356            _ => Err(Error::Internal(
2357                "Only variables supported in join conditions".to_string(),
2358            )),
2359        }
2360    }
2361
2362    /// Plans a UNION operator.
2363    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2364        if union.inputs.is_empty() {
2365            return Err(Error::Internal(
2366                "Union requires at least one input".to_string(),
2367            ));
2368        }
2369
2370        let mut inputs = Vec::with_capacity(union.inputs.len());
2371        let mut columns = Vec::new();
2372
2373        for (i, input) in union.inputs.iter().enumerate() {
2374            let (op, cols) = self.plan_operator(input)?;
2375            if i == 0 {
2376                columns = cols;
2377            }
2378            inputs.push(op);
2379        }
2380
2381        let output_schema = self.derive_schema_from_columns(&columns);
2382        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2383
2384        Ok((operator, columns))
2385    }
2386
2387    /// Plans a DISTINCT operator.
2388    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2389        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2390        let output_schema = self.derive_schema_from_columns(&columns);
2391        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2392        Ok((operator, columns))
2393    }
2394
2395    /// Plans a CREATE NODE operator.
2396    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2397        // Plan input if present
2398        let (input_op, mut columns) = if let Some(ref input) = create.input {
2399            let (op, cols) = self.plan_operator(input)?;
2400            (Some(op), cols)
2401        } else {
2402            (None, vec![])
2403        };
2404
2405        // Output column for the created node
2406        let output_column = columns.len();
2407        columns.push(create.variable.clone());
2408
2409        // Convert properties (with constant-folding for lists and function calls)
2410        let properties: Vec<(String, PropertySource)> = create
2411            .properties
2412            .iter()
2413            .map(|(name, expr)| {
2414                let source = match Self::try_fold_expression(expr) {
2415                    Some(value) => PropertySource::Constant(value),
2416                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2417                };
2418                (name.clone(), source)
2419            })
2420            .collect();
2421
2422        let output_schema = self.derive_schema_from_columns(&columns);
2423
2424        let operator = Box::new(
2425            CreateNodeOperator::new(
2426                Arc::clone(&self.store),
2427                input_op,
2428                create.labels.clone(),
2429                properties,
2430                output_schema,
2431                output_column,
2432            )
2433            .with_tx_context(self.viewing_epoch, self.tx_id),
2434        );
2435
2436        Ok((operator, columns))
2437    }
2438
2439    /// Plans a CREATE EDGE operator.
2440    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2441        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2442
2443        // Find source and target columns
2444        let from_column = columns
2445            .iter()
2446            .position(|c| c == &create.from_variable)
2447            .ok_or_else(|| {
2448                Error::Internal(format!(
2449                    "Source variable '{}' not found",
2450                    create.from_variable
2451                ))
2452            })?;
2453
2454        let to_column = columns
2455            .iter()
2456            .position(|c| c == &create.to_variable)
2457            .ok_or_else(|| {
2458                Error::Internal(format!(
2459                    "Target variable '{}' not found",
2460                    create.to_variable
2461                ))
2462            })?;
2463
2464        // Output column for the created edge (if named)
2465        let output_column = create.variable.as_ref().map(|v| {
2466            let idx = columns.len();
2467            columns.push(v.clone());
2468            idx
2469        });
2470
2471        // Convert properties (with constant-folding for function calls like vector())
2472        let properties: Vec<(String, PropertySource)> = create
2473            .properties
2474            .iter()
2475            .map(|(name, expr)| {
2476                let source = match Self::try_fold_expression(expr) {
2477                    Some(value) => PropertySource::Constant(value),
2478                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2479                };
2480                (name.clone(), source)
2481            })
2482            .collect();
2483
2484        let output_schema = self.derive_schema_from_columns(&columns);
2485
2486        let mut operator = CreateEdgeOperator::new(
2487            Arc::clone(&self.store),
2488            input_op,
2489            from_column,
2490            to_column,
2491            create.edge_type.clone(),
2492            output_schema,
2493        )
2494        .with_properties(properties)
2495        .with_tx_context(self.viewing_epoch, self.tx_id);
2496
2497        if let Some(col) = output_column {
2498            operator = operator.with_output_column(col);
2499        }
2500
2501        let operator = Box::new(operator);
2502
2503        Ok((operator, columns))
2504    }
2505
2506    /// Plans a DELETE NODE operator.
2507    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2508        let (input_op, columns) = self.plan_operator(&delete.input)?;
2509
2510        let node_column = columns
2511            .iter()
2512            .position(|c| c == &delete.variable)
2513            .ok_or_else(|| {
2514                Error::Internal(format!(
2515                    "Variable '{}' not found for delete",
2516                    delete.variable
2517                ))
2518            })?;
2519
2520        // Output schema for delete count
2521        let output_schema = vec![LogicalType::Int64];
2522        let output_columns = vec!["deleted_count".to_string()];
2523
2524        let operator = Box::new(
2525            DeleteNodeOperator::new(
2526                Arc::clone(&self.store),
2527                input_op,
2528                node_column,
2529                output_schema,
2530                delete.detach, // DETACH DELETE deletes connected edges first
2531            )
2532            .with_tx_context(self.viewing_epoch, self.tx_id),
2533        );
2534
2535        Ok((operator, output_columns))
2536    }
2537
2538    /// Plans a DELETE EDGE operator.
2539    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2540        let (input_op, columns) = self.plan_operator(&delete.input)?;
2541
2542        let edge_column = columns
2543            .iter()
2544            .position(|c| c == &delete.variable)
2545            .ok_or_else(|| {
2546                Error::Internal(format!(
2547                    "Variable '{}' not found for delete",
2548                    delete.variable
2549                ))
2550            })?;
2551
2552        // Output schema for delete count
2553        let output_schema = vec![LogicalType::Int64];
2554        let output_columns = vec!["deleted_count".to_string()];
2555
2556        let operator = Box::new(
2557            DeleteEdgeOperator::new(
2558                Arc::clone(&self.store),
2559                input_op,
2560                edge_column,
2561                output_schema,
2562            )
2563            .with_tx_context(self.viewing_epoch, self.tx_id),
2564        );
2565
2566        Ok((operator, output_columns))
2567    }
2568
2569    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2570    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2571        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2572        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2573
2574        // Build combined output columns (left + right)
2575        let mut columns = left_columns.clone();
2576        columns.extend(right_columns.clone());
2577
2578        // Find common variables between left and right for join keys
2579        let mut probe_keys = Vec::new();
2580        let mut build_keys = Vec::new();
2581
2582        for (right_idx, right_col) in right_columns.iter().enumerate() {
2583            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2584                probe_keys.push(left_idx);
2585                build_keys.push(right_idx);
2586            }
2587        }
2588
2589        let output_schema = self.derive_schema_from_columns(&columns);
2590
2591        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2592            left_op,
2593            right_op,
2594            probe_keys,
2595            build_keys,
2596            PhysicalJoinType::Left,
2597            output_schema,
2598        ));
2599
2600        Ok((operator, columns))
2601    }
2602
2603    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2604    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2605        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2606        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2607
2608        // Anti-join only keeps left columns (filters out matching rows)
2609        let columns = left_columns.clone();
2610
2611        // Find common variables between left and right for join keys
2612        let mut probe_keys = Vec::new();
2613        let mut build_keys = Vec::new();
2614
2615        for (right_idx, right_col) in right_columns.iter().enumerate() {
2616            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2617                probe_keys.push(left_idx);
2618                build_keys.push(right_idx);
2619            }
2620        }
2621
2622        let output_schema = self.derive_schema_from_columns(&columns);
2623
2624        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2625            left_op,
2626            right_op,
2627            probe_keys,
2628            build_keys,
2629            PhysicalJoinType::Anti,
2630            output_schema,
2631        ));
2632
2633        Ok((operator, columns))
2634    }
2635
2636    /// Plans an unwind operator.
2637    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2638        // Plan the input operator first
2639        // Handle Empty specially - use a single-row operator
2640        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2641            if matches!(&*unwind.input, LogicalOperator::Empty) {
2642                // For UNWIND without prior MATCH, create a single-row input
2643                // We need an operator that produces one row with the list to unwind
2644                // For now, use EmptyScan which produces no rows - we'll handle the literal
2645                // list in the unwind operator itself
2646                let literal_list = self.convert_expression(&unwind.expression)?;
2647
2648                // Create a project operator that produces a single row with the list
2649                let single_row_op: Box<dyn Operator> = Box::new(
2650                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2651                );
2652                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2653                    single_row_op,
2654                    vec![ProjectExpr::Expression {
2655                        expr: literal_list,
2656                        variable_columns: HashMap::new(),
2657                    }],
2658                    vec![LogicalType::Any],
2659                    Arc::clone(&self.store),
2660                ));
2661
2662                (project_op, vec!["__list__".to_string()])
2663            } else {
2664                self.plan_operator(&unwind.input)?
2665            };
2666
2667        // The UNWIND expression should be a list - we need to find/evaluate it
2668        // For now, we handle the case where the expression references an existing column
2669        // or is a literal list
2670
2671        // Find if the expression references an existing column (like a list property)
2672        let list_col_idx = match &unwind.expression {
2673            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2674            LogicalExpression::Property { variable, .. } => {
2675                // Property access needs to be evaluated - for now we'll need the filter predicate
2676                // to evaluate this. For simple cases, we treat it as a list column.
2677                input_columns.iter().position(|c| c == variable)
2678            }
2679            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2680                // Literal list expression - we'll add it as a virtual column
2681                None
2682            }
2683            _ => None,
2684        };
2685
2686        // Build output columns: all input columns plus the new variable
2687        let mut columns = input_columns.clone();
2688        columns.push(unwind.variable.clone());
2689
2690        // Build output schema
2691        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2692        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2693
2694        // Use the list column index if found, otherwise default to 0
2695        // (in which case the first column should contain the list)
2696        let col_idx = list_col_idx.unwrap_or(0);
2697
2698        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2699            input_op,
2700            col_idx,
2701            unwind.variable.clone(),
2702            output_schema,
2703        ));
2704
2705        Ok((operator, columns))
2706    }
2707
2708    /// Plans a MERGE operator.
2709    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2710        // Plan the input operator if present (skip if Empty)
2711        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2712            Vec::new()
2713        } else {
2714            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2715            cols
2716        };
2717
2718        // Convert match properties from LogicalExpression to Value
2719        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2720            .match_properties
2721            .iter()
2722            .filter_map(|(name, expr)| {
2723                if let LogicalExpression::Literal(v) = expr {
2724                    Some((name.clone(), v.clone()))
2725                } else {
2726                    None // Skip non-literal expressions for now
2727                }
2728            })
2729            .collect();
2730
2731        // Convert ON CREATE properties
2732        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2733            .on_create
2734            .iter()
2735            .filter_map(|(name, expr)| {
2736                if let LogicalExpression::Literal(v) = expr {
2737                    Some((name.clone(), v.clone()))
2738                } else {
2739                    None
2740                }
2741            })
2742            .collect();
2743
2744        // Convert ON MATCH properties
2745        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2746            .on_match
2747            .iter()
2748            .filter_map(|(name, expr)| {
2749                if let LogicalExpression::Literal(v) = expr {
2750                    Some((name.clone(), v.clone()))
2751                } else {
2752                    None
2753                }
2754            })
2755            .collect();
2756
2757        // Add the merged node variable to output columns
2758        columns.push(merge.variable.clone());
2759
2760        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2761            Arc::clone(&self.store),
2762            merge.variable.clone(),
2763            merge.labels.clone(),
2764            match_properties,
2765            on_create_properties,
2766            on_match_properties,
2767        ));
2768
2769        Ok((operator, columns))
2770    }
2771
2772    /// Plans a SHORTEST PATH operator.
2773    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2774        // Plan the input operator
2775        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2776
2777        // Find source and target node columns
2778        let source_column = columns
2779            .iter()
2780            .position(|c| c == &sp.source_var)
2781            .ok_or_else(|| {
2782                Error::Internal(format!(
2783                    "Source variable '{}' not found for shortestPath",
2784                    sp.source_var
2785                ))
2786            })?;
2787
2788        let target_column = columns
2789            .iter()
2790            .position(|c| c == &sp.target_var)
2791            .ok_or_else(|| {
2792                Error::Internal(format!(
2793                    "Target variable '{}' not found for shortestPath",
2794                    sp.target_var
2795                ))
2796            })?;
2797
2798        // Convert direction
2799        let direction = match sp.direction {
2800            ExpandDirection::Outgoing => Direction::Outgoing,
2801            ExpandDirection::Incoming => Direction::Incoming,
2802            ExpandDirection::Both => Direction::Both,
2803        };
2804
2805        // Create the shortest path operator
2806        let operator: Box<dyn Operator> = Box::new(
2807            ShortestPathOperator::new(
2808                Arc::clone(&self.store),
2809                input_op,
2810                source_column,
2811                target_column,
2812                sp.edge_type.clone(),
2813                direction,
2814            )
2815            .with_all_paths(sp.all_paths),
2816        );
2817
2818        // Add path length column with the expected naming convention
2819        // The translator expects _path_length_{alias} format for length(p) calls
2820        columns.push(format!("_path_length_{}", sp.path_alias));
2821
2822        Ok((operator, columns))
2823    }
2824
2825    /// Plans an ADD LABEL operator.
2826    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2827        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2828
2829        // Find the node column
2830        let node_column = columns
2831            .iter()
2832            .position(|c| c == &add_label.variable)
2833            .ok_or_else(|| {
2834                Error::Internal(format!(
2835                    "Variable '{}' not found for ADD LABEL",
2836                    add_label.variable
2837                ))
2838            })?;
2839
2840        // Output schema for update count
2841        let output_schema = vec![LogicalType::Int64];
2842        let output_columns = vec!["labels_added".to_string()];
2843
2844        let operator = Box::new(AddLabelOperator::new(
2845            Arc::clone(&self.store),
2846            input_op,
2847            node_column,
2848            add_label.labels.clone(),
2849            output_schema,
2850        ));
2851
2852        Ok((operator, output_columns))
2853    }
2854
2855    /// Plans a REMOVE LABEL operator.
2856    fn plan_remove_label(
2857        &self,
2858        remove_label: &RemoveLabelOp,
2859    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2860        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2861
2862        // Find the node column
2863        let node_column = columns
2864            .iter()
2865            .position(|c| c == &remove_label.variable)
2866            .ok_or_else(|| {
2867                Error::Internal(format!(
2868                    "Variable '{}' not found for REMOVE LABEL",
2869                    remove_label.variable
2870                ))
2871            })?;
2872
2873        // Output schema for update count
2874        let output_schema = vec![LogicalType::Int64];
2875        let output_columns = vec!["labels_removed".to_string()];
2876
2877        let operator = Box::new(RemoveLabelOperator::new(
2878            Arc::clone(&self.store),
2879            input_op,
2880            node_column,
2881            remove_label.labels.clone(),
2882            output_schema,
2883        ));
2884
2885        Ok((operator, output_columns))
2886    }
2887
2888    /// Plans a SET PROPERTY operator.
2889    fn plan_set_property(
2890        &self,
2891        set_prop: &SetPropertyOp,
2892    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2893        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2894
2895        // Find the entity column (node or edge variable)
2896        let entity_column = columns
2897            .iter()
2898            .position(|c| c == &set_prop.variable)
2899            .ok_or_else(|| {
2900                Error::Internal(format!(
2901                    "Variable '{}' not found for SET",
2902                    set_prop.variable
2903                ))
2904            })?;
2905
2906        // Convert properties to PropertySource
2907        let properties: Vec<(String, PropertySource)> = set_prop
2908            .properties
2909            .iter()
2910            .map(|(name, expr)| {
2911                let source = self.expression_to_property_source(expr, &columns)?;
2912                Ok((name.clone(), source))
2913            })
2914            .collect::<Result<Vec<_>>>()?;
2915
2916        // Output schema preserves input schema (passes through)
2917        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2918        let output_columns = columns.clone();
2919
2920        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2921        let operator = Box::new(SetPropertyOperator::new_for_node(
2922            Arc::clone(&self.store),
2923            input_op,
2924            entity_column,
2925            properties,
2926            output_schema,
2927        ));
2928
2929        Ok((operator, output_columns))
2930    }
2931
2932    /// Converts a logical expression to a PropertySource.
2933    fn expression_to_property_source(
2934        &self,
2935        expr: &LogicalExpression,
2936        columns: &[String],
2937    ) -> Result<PropertySource> {
2938        match expr {
2939            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2940            LogicalExpression::Variable(name) => {
2941                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2942                    Error::Internal(format!("Variable '{}' not found for property source", name))
2943                })?;
2944                Ok(PropertySource::Column(col_idx))
2945            }
2946            LogicalExpression::Parameter(name) => {
2947                // Parameters should be resolved before planning
2948                // For now, treat as a placeholder
2949                Ok(PropertySource::Constant(
2950                    grafeo_common::types::Value::String(format!("${}", name).into()),
2951                ))
2952            }
2953            _ => {
2954                if let Some(value) = Self::try_fold_expression(expr) {
2955                    Ok(PropertySource::Constant(value))
2956                } else {
2957                    Err(Error::Internal(format!(
2958                        "Unsupported expression type for property source: {:?}",
2959                        expr
2960                    )))
2961                }
2962            }
2963        }
2964    }
2965
2966    /// Tries to evaluate a constant expression at plan time.
2967    ///
2968    /// Recursively folds literals, lists, and known function calls (like `vector()`)
2969    /// into concrete values. Returns `None` if the expression contains non-constant
2970    /// parts (variables, property accesses, etc.).
2971    fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
2972        match expr {
2973            LogicalExpression::Literal(v) => Some(v.clone()),
2974            LogicalExpression::List(items) => {
2975                let values: Option<Vec<Value>> =
2976                    items.iter().map(Self::try_fold_expression).collect();
2977                let values = values?;
2978                // All-numeric lists become vectors (matches Python list[float] behavior)
2979                let all_numeric = !values.is_empty()
2980                    && values
2981                        .iter()
2982                        .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
2983                if all_numeric {
2984                    let floats: Vec<f32> = values
2985                        .iter()
2986                        .filter_map(|v| match v {
2987                            Value::Float64(f) => Some(*f as f32),
2988                            Value::Int64(i) => Some(*i as f32),
2989                            _ => None,
2990                        })
2991                        .collect();
2992                    Some(Value::Vector(floats.into()))
2993                } else {
2994                    Some(Value::List(values.into()))
2995                }
2996            }
2997            LogicalExpression::FunctionCall { name, args, .. } => {
2998                match name.to_lowercase().as_str() {
2999                    "vector" => {
3000                        if args.len() != 1 {
3001                            return None;
3002                        }
3003                        let val = Self::try_fold_expression(&args[0])?;
3004                        match val {
3005                            Value::List(items) => {
3006                                let floats: Vec<f32> = items
3007                                    .iter()
3008                                    .filter_map(|v| match v {
3009                                        Value::Float64(f) => Some(*f as f32),
3010                                        Value::Int64(i) => Some(*i as f32),
3011                                        _ => None,
3012                                    })
3013                                    .collect();
3014                                if floats.len() == items.len() {
3015                                    Some(Value::Vector(floats.into()))
3016                                } else {
3017                                    None
3018                                }
3019                            }
3020                            // Already a vector (from all-numeric list folding)
3021                            Value::Vector(v) => Some(Value::Vector(v)),
3022                            _ => None,
3023                        }
3024                    }
3025                    _ => None,
3026                }
3027            }
3028            _ => None,
3029        }
3030    }
3031}
3032
3033/// Converts a logical binary operator to a filter binary operator.
3034pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3035    match op {
3036        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3037        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3038        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3039        BinaryOp::Le => Ok(BinaryFilterOp::Le),
3040        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3041        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3042        BinaryOp::And => Ok(BinaryFilterOp::And),
3043        BinaryOp::Or => Ok(BinaryFilterOp::Or),
3044        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3045        BinaryOp::Add => Ok(BinaryFilterOp::Add),
3046        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3047        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3048        BinaryOp::Div => Ok(BinaryFilterOp::Div),
3049        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3050        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3051        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3052        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3053        BinaryOp::In => Ok(BinaryFilterOp::In),
3054        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3055        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3056        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3057            "Binary operator {:?} not yet supported in filters",
3058            op
3059        ))),
3060    }
3061}
3062
3063/// Converts a logical unary operator to a filter unary operator.
3064pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3065    match op {
3066        UnaryOp::Not => Ok(UnaryFilterOp::Not),
3067        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3068        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3069        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3070    }
3071}
3072
3073/// Converts a logical aggregate function to a physical aggregate function.
3074pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3075    match func {
3076        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3077        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3078        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3079        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3080        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3081        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3082        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3083        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3084        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3085        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3086        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3087    }
3088}
3089
3090/// Converts a logical expression to a filter expression.
3091///
3092/// This is a standalone function that can be used by both LPG and RDF planners.
3093pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3094    match expr {
3095        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3096        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3097        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3098            variable: variable.clone(),
3099            property: property.clone(),
3100        }),
3101        LogicalExpression::Binary { left, op, right } => {
3102            let left_expr = convert_filter_expression(left)?;
3103            let right_expr = convert_filter_expression(right)?;
3104            let filter_op = convert_binary_op(*op)?;
3105            Ok(FilterExpression::Binary {
3106                left: Box::new(left_expr),
3107                op: filter_op,
3108                right: Box::new(right_expr),
3109            })
3110        }
3111        LogicalExpression::Unary { op, operand } => {
3112            let operand_expr = convert_filter_expression(operand)?;
3113            let filter_op = convert_unary_op(*op)?;
3114            Ok(FilterExpression::Unary {
3115                op: filter_op,
3116                operand: Box::new(operand_expr),
3117            })
3118        }
3119        LogicalExpression::FunctionCall { name, args, .. } => {
3120            let filter_args: Vec<FilterExpression> = args
3121                .iter()
3122                .map(convert_filter_expression)
3123                .collect::<Result<Vec<_>>>()?;
3124            Ok(FilterExpression::FunctionCall {
3125                name: name.clone(),
3126                args: filter_args,
3127            })
3128        }
3129        LogicalExpression::Case {
3130            operand,
3131            when_clauses,
3132            else_clause,
3133        } => {
3134            let filter_operand = operand
3135                .as_ref()
3136                .map(|e| convert_filter_expression(e))
3137                .transpose()?
3138                .map(Box::new);
3139            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3140                .iter()
3141                .map(|(cond, result)| {
3142                    Ok((
3143                        convert_filter_expression(cond)?,
3144                        convert_filter_expression(result)?,
3145                    ))
3146                })
3147                .collect::<Result<Vec<_>>>()?;
3148            let filter_else = else_clause
3149                .as_ref()
3150                .map(|e| convert_filter_expression(e))
3151                .transpose()?
3152                .map(Box::new);
3153            Ok(FilterExpression::Case {
3154                operand: filter_operand,
3155                when_clauses: filter_when_clauses,
3156                else_clause: filter_else,
3157            })
3158        }
3159        LogicalExpression::List(items) => {
3160            let filter_items: Vec<FilterExpression> = items
3161                .iter()
3162                .map(convert_filter_expression)
3163                .collect::<Result<Vec<_>>>()?;
3164            Ok(FilterExpression::List(filter_items))
3165        }
3166        LogicalExpression::Map(pairs) => {
3167            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3168                .iter()
3169                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3170                .collect::<Result<Vec<_>>>()?;
3171            Ok(FilterExpression::Map(filter_pairs))
3172        }
3173        LogicalExpression::IndexAccess { base, index } => {
3174            let base_expr = convert_filter_expression(base)?;
3175            let index_expr = convert_filter_expression(index)?;
3176            Ok(FilterExpression::IndexAccess {
3177                base: Box::new(base_expr),
3178                index: Box::new(index_expr),
3179            })
3180        }
3181        LogicalExpression::SliceAccess { base, start, end } => {
3182            let base_expr = convert_filter_expression(base)?;
3183            let start_expr = start
3184                .as_ref()
3185                .map(|s| convert_filter_expression(s))
3186                .transpose()?
3187                .map(Box::new);
3188            let end_expr = end
3189                .as_ref()
3190                .map(|e| convert_filter_expression(e))
3191                .transpose()?
3192                .map(Box::new);
3193            Ok(FilterExpression::SliceAccess {
3194                base: Box::new(base_expr),
3195                start: start_expr,
3196                end: end_expr,
3197            })
3198        }
3199        LogicalExpression::Parameter(_) => Err(Error::Internal(
3200            "Parameters not yet supported in filters".to_string(),
3201        )),
3202        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3203        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3204        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3205        LogicalExpression::ListComprehension {
3206            variable,
3207            list_expr,
3208            filter_expr,
3209            map_expr,
3210        } => {
3211            let list = convert_filter_expression(list_expr)?;
3212            let filter = filter_expr
3213                .as_ref()
3214                .map(|f| convert_filter_expression(f))
3215                .transpose()?
3216                .map(Box::new);
3217            let map = convert_filter_expression(map_expr)?;
3218            Ok(FilterExpression::ListComprehension {
3219                variable: variable.clone(),
3220                list_expr: Box::new(list),
3221                filter_expr: filter,
3222                map_expr: Box::new(map),
3223            })
3224        }
3225        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3226            Error::Internal("Subqueries not yet supported in filters".to_string()),
3227        ),
3228    }
3229}
3230
3231/// Infers the logical type from a value.
3232fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3233    use grafeo_common::types::Value;
3234    match value {
3235        Value::Null => LogicalType::String, // Default type for null
3236        Value::Bool(_) => LogicalType::Bool,
3237        Value::Int64(_) => LogicalType::Int64,
3238        Value::Float64(_) => LogicalType::Float64,
3239        Value::String(_) => LogicalType::String,
3240        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3241        Value::Timestamp(_) => LogicalType::Timestamp,
3242        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3243        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3244        Value::Vector(v) => LogicalType::Vector(v.len()),
3245    }
3246}
3247
3248/// Converts an expression to a string for column naming.
3249fn expression_to_string(expr: &LogicalExpression) -> String {
3250    match expr {
3251        LogicalExpression::Variable(name) => name.clone(),
3252        LogicalExpression::Property { variable, property } => {
3253            format!("{variable}.{property}")
3254        }
3255        LogicalExpression::Literal(value) => format!("{value:?}"),
3256        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3257        _ => "expr".to_string(),
3258    }
3259}
3260
3261/// A physical plan ready for execution.
3262pub struct PhysicalPlan {
3263    /// The root physical operator.
3264    pub operator: Box<dyn Operator>,
3265    /// Column names for the result.
3266    pub columns: Vec<String>,
3267    /// Adaptive execution context with cardinality estimates.
3268    ///
3269    /// When adaptive execution is enabled, this context contains estimated
3270    /// cardinalities at various checkpoints in the plan. During execution,
3271    /// actual row counts are recorded and compared against estimates.
3272    pub adaptive_context: Option<AdaptiveContext>,
3273}
3274
3275impl PhysicalPlan {
3276    /// Returns the column names.
3277    #[must_use]
3278    pub fn columns(&self) -> &[String] {
3279        &self.columns
3280    }
3281
3282    /// Consumes the plan and returns the operator.
3283    pub fn into_operator(self) -> Box<dyn Operator> {
3284        self.operator
3285    }
3286
3287    /// Returns the adaptive context, if adaptive execution is enabled.
3288    #[must_use]
3289    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3290        self.adaptive_context.as_ref()
3291    }
3292
3293    /// Takes ownership of the adaptive context.
3294    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3295        self.adaptive_context.take()
3296    }
3297}
3298
3299/// Helper operator that returns a single result chunk once.
3300///
3301/// Used by the factorized expand chain to wrap the final result.
3302#[allow(dead_code)]
3303struct SingleResultOperator {
3304    result: Option<grafeo_core::execution::DataChunk>,
3305}
3306
3307impl SingleResultOperator {
3308    #[allow(dead_code)]
3309    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3310        Self { result }
3311    }
3312}
3313
3314impl Operator for SingleResultOperator {
3315    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3316        Ok(self.result.take())
3317    }
3318
3319    fn reset(&mut self) {
3320        // Cannot reset - result is consumed
3321    }
3322
3323    fn name(&self) -> &'static str {
3324        "SingleResult"
3325    }
3326}
3327
3328#[cfg(test)]
3329mod tests {
3330    use super::*;
3331    use crate::query::plan::{
3332        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3333        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3334        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3335        SortKey, SortOp,
3336    };
3337    use grafeo_common::types::Value;
3338
3339    fn create_test_store() -> Arc<LpgStore> {
3340        let store = Arc::new(LpgStore::new());
3341        store.create_node(&["Person"]);
3342        store.create_node(&["Person"]);
3343        store.create_node(&["Company"]);
3344        store
3345    }
3346
3347    // ==================== Simple Scan Tests ====================
3348
3349    #[test]
3350    fn test_plan_simple_scan() {
3351        let store = create_test_store();
3352        let planner = Planner::new(store);
3353
3354        // MATCH (n:Person) RETURN n
3355        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3356            items: vec![ReturnItem {
3357                expression: LogicalExpression::Variable("n".to_string()),
3358                alias: None,
3359            }],
3360            distinct: false,
3361            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3362                variable: "n".to_string(),
3363                label: Some("Person".to_string()),
3364                input: None,
3365            })),
3366        }));
3367
3368        let physical = planner.plan(&logical).unwrap();
3369        assert_eq!(physical.columns(), &["n"]);
3370    }
3371
3372    #[test]
3373    fn test_plan_scan_without_label() {
3374        let store = create_test_store();
3375        let planner = Planner::new(store);
3376
3377        // MATCH (n) RETURN n
3378        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3379            items: vec![ReturnItem {
3380                expression: LogicalExpression::Variable("n".to_string()),
3381                alias: None,
3382            }],
3383            distinct: false,
3384            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3385                variable: "n".to_string(),
3386                label: None,
3387                input: None,
3388            })),
3389        }));
3390
3391        let physical = planner.plan(&logical).unwrap();
3392        assert_eq!(physical.columns(), &["n"]);
3393    }
3394
3395    #[test]
3396    fn test_plan_return_with_alias() {
3397        let store = create_test_store();
3398        let planner = Planner::new(store);
3399
3400        // MATCH (n:Person) RETURN n AS person
3401        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3402            items: vec![ReturnItem {
3403                expression: LogicalExpression::Variable("n".to_string()),
3404                alias: Some("person".to_string()),
3405            }],
3406            distinct: false,
3407            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3408                variable: "n".to_string(),
3409                label: Some("Person".to_string()),
3410                input: None,
3411            })),
3412        }));
3413
3414        let physical = planner.plan(&logical).unwrap();
3415        assert_eq!(physical.columns(), &["person"]);
3416    }
3417
3418    #[test]
3419    fn test_plan_return_property() {
3420        let store = create_test_store();
3421        let planner = Planner::new(store);
3422
3423        // MATCH (n:Person) RETURN n.name
3424        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3425            items: vec![ReturnItem {
3426                expression: LogicalExpression::Property {
3427                    variable: "n".to_string(),
3428                    property: "name".to_string(),
3429                },
3430                alias: None,
3431            }],
3432            distinct: false,
3433            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3434                variable: "n".to_string(),
3435                label: Some("Person".to_string()),
3436                input: None,
3437            })),
3438        }));
3439
3440        let physical = planner.plan(&logical).unwrap();
3441        assert_eq!(physical.columns(), &["n.name"]);
3442    }
3443
3444    #[test]
3445    fn test_plan_return_literal() {
3446        let store = create_test_store();
3447        let planner = Planner::new(store);
3448
3449        // MATCH (n) RETURN 42 AS answer
3450        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3451            items: vec![ReturnItem {
3452                expression: LogicalExpression::Literal(Value::Int64(42)),
3453                alias: Some("answer".to_string()),
3454            }],
3455            distinct: false,
3456            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3457                variable: "n".to_string(),
3458                label: None,
3459                input: None,
3460            })),
3461        }));
3462
3463        let physical = planner.plan(&logical).unwrap();
3464        assert_eq!(physical.columns(), &["answer"]);
3465    }
3466
3467    // ==================== Filter Tests ====================
3468
3469    #[test]
3470    fn test_plan_filter_equality() {
3471        let store = create_test_store();
3472        let planner = Planner::new(store);
3473
3474        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3475        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3476            items: vec![ReturnItem {
3477                expression: LogicalExpression::Variable("n".to_string()),
3478                alias: None,
3479            }],
3480            distinct: false,
3481            input: Box::new(LogicalOperator::Filter(FilterOp {
3482                predicate: LogicalExpression::Binary {
3483                    left: Box::new(LogicalExpression::Property {
3484                        variable: "n".to_string(),
3485                        property: "age".to_string(),
3486                    }),
3487                    op: BinaryOp::Eq,
3488                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3489                },
3490                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3491                    variable: "n".to_string(),
3492                    label: Some("Person".to_string()),
3493                    input: None,
3494                })),
3495            })),
3496        }));
3497
3498        let physical = planner.plan(&logical).unwrap();
3499        assert_eq!(physical.columns(), &["n"]);
3500    }
3501
3502    #[test]
3503    fn test_plan_filter_compound_and() {
3504        let store = create_test_store();
3505        let planner = Planner::new(store);
3506
3507        // WHERE n.age > 20 AND n.age < 40
3508        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3509            items: vec![ReturnItem {
3510                expression: LogicalExpression::Variable("n".to_string()),
3511                alias: None,
3512            }],
3513            distinct: false,
3514            input: Box::new(LogicalOperator::Filter(FilterOp {
3515                predicate: LogicalExpression::Binary {
3516                    left: Box::new(LogicalExpression::Binary {
3517                        left: Box::new(LogicalExpression::Property {
3518                            variable: "n".to_string(),
3519                            property: "age".to_string(),
3520                        }),
3521                        op: BinaryOp::Gt,
3522                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3523                    }),
3524                    op: BinaryOp::And,
3525                    right: Box::new(LogicalExpression::Binary {
3526                        left: Box::new(LogicalExpression::Property {
3527                            variable: "n".to_string(),
3528                            property: "age".to_string(),
3529                        }),
3530                        op: BinaryOp::Lt,
3531                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3532                    }),
3533                },
3534                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3535                    variable: "n".to_string(),
3536                    label: None,
3537                    input: None,
3538                })),
3539            })),
3540        }));
3541
3542        let physical = planner.plan(&logical).unwrap();
3543        assert_eq!(physical.columns(), &["n"]);
3544    }
3545
3546    #[test]
3547    fn test_plan_filter_unary_not() {
3548        let store = create_test_store();
3549        let planner = Planner::new(store);
3550
3551        // WHERE NOT n.active
3552        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3553            items: vec![ReturnItem {
3554                expression: LogicalExpression::Variable("n".to_string()),
3555                alias: None,
3556            }],
3557            distinct: false,
3558            input: Box::new(LogicalOperator::Filter(FilterOp {
3559                predicate: LogicalExpression::Unary {
3560                    op: UnaryOp::Not,
3561                    operand: Box::new(LogicalExpression::Property {
3562                        variable: "n".to_string(),
3563                        property: "active".to_string(),
3564                    }),
3565                },
3566                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3567                    variable: "n".to_string(),
3568                    label: None,
3569                    input: None,
3570                })),
3571            })),
3572        }));
3573
3574        let physical = planner.plan(&logical).unwrap();
3575        assert_eq!(physical.columns(), &["n"]);
3576    }
3577
3578    #[test]
3579    fn test_plan_filter_is_null() {
3580        let store = create_test_store();
3581        let planner = Planner::new(store);
3582
3583        // WHERE n.email IS NULL
3584        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3585            items: vec![ReturnItem {
3586                expression: LogicalExpression::Variable("n".to_string()),
3587                alias: None,
3588            }],
3589            distinct: false,
3590            input: Box::new(LogicalOperator::Filter(FilterOp {
3591                predicate: LogicalExpression::Unary {
3592                    op: UnaryOp::IsNull,
3593                    operand: Box::new(LogicalExpression::Property {
3594                        variable: "n".to_string(),
3595                        property: "email".to_string(),
3596                    }),
3597                },
3598                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3599                    variable: "n".to_string(),
3600                    label: None,
3601                    input: None,
3602                })),
3603            })),
3604        }));
3605
3606        let physical = planner.plan(&logical).unwrap();
3607        assert_eq!(physical.columns(), &["n"]);
3608    }
3609
3610    #[test]
3611    fn test_plan_filter_function_call() {
3612        let store = create_test_store();
3613        let planner = Planner::new(store);
3614
3615        // WHERE size(n.friends) > 0
3616        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3617            items: vec![ReturnItem {
3618                expression: LogicalExpression::Variable("n".to_string()),
3619                alias: None,
3620            }],
3621            distinct: false,
3622            input: Box::new(LogicalOperator::Filter(FilterOp {
3623                predicate: LogicalExpression::Binary {
3624                    left: Box::new(LogicalExpression::FunctionCall {
3625                        name: "size".to_string(),
3626                        args: vec![LogicalExpression::Property {
3627                            variable: "n".to_string(),
3628                            property: "friends".to_string(),
3629                        }],
3630                        distinct: false,
3631                    }),
3632                    op: BinaryOp::Gt,
3633                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3634                },
3635                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3636                    variable: "n".to_string(),
3637                    label: None,
3638                    input: None,
3639                })),
3640            })),
3641        }));
3642
3643        let physical = planner.plan(&logical).unwrap();
3644        assert_eq!(physical.columns(), &["n"]);
3645    }
3646
3647    // ==================== Expand Tests ====================
3648
3649    #[test]
3650    fn test_plan_expand_outgoing() {
3651        let store = create_test_store();
3652        let planner = Planner::new(store);
3653
3654        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3655        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3656            items: vec![
3657                ReturnItem {
3658                    expression: LogicalExpression::Variable("a".to_string()),
3659                    alias: None,
3660                },
3661                ReturnItem {
3662                    expression: LogicalExpression::Variable("b".to_string()),
3663                    alias: None,
3664                },
3665            ],
3666            distinct: false,
3667            input: Box::new(LogicalOperator::Expand(ExpandOp {
3668                from_variable: "a".to_string(),
3669                to_variable: "b".to_string(),
3670                edge_variable: None,
3671                direction: ExpandDirection::Outgoing,
3672                edge_type: Some("KNOWS".to_string()),
3673                min_hops: 1,
3674                max_hops: Some(1),
3675                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3676                    variable: "a".to_string(),
3677                    label: Some("Person".to_string()),
3678                    input: None,
3679                })),
3680                path_alias: None,
3681            })),
3682        }));
3683
3684        let physical = planner.plan(&logical).unwrap();
3685        // The return should have columns [a, b]
3686        assert!(physical.columns().contains(&"a".to_string()));
3687        assert!(physical.columns().contains(&"b".to_string()));
3688    }
3689
3690    #[test]
3691    fn test_plan_expand_with_edge_variable() {
3692        let store = create_test_store();
3693        let planner = Planner::new(store);
3694
3695        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3696        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3697            items: vec![
3698                ReturnItem {
3699                    expression: LogicalExpression::Variable("a".to_string()),
3700                    alias: None,
3701                },
3702                ReturnItem {
3703                    expression: LogicalExpression::Variable("r".to_string()),
3704                    alias: None,
3705                },
3706                ReturnItem {
3707                    expression: LogicalExpression::Variable("b".to_string()),
3708                    alias: None,
3709                },
3710            ],
3711            distinct: false,
3712            input: Box::new(LogicalOperator::Expand(ExpandOp {
3713                from_variable: "a".to_string(),
3714                to_variable: "b".to_string(),
3715                edge_variable: Some("r".to_string()),
3716                direction: ExpandDirection::Outgoing,
3717                edge_type: Some("KNOWS".to_string()),
3718                min_hops: 1,
3719                max_hops: Some(1),
3720                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3721                    variable: "a".to_string(),
3722                    label: None,
3723                    input: None,
3724                })),
3725                path_alias: None,
3726            })),
3727        }));
3728
3729        let physical = planner.plan(&logical).unwrap();
3730        assert!(physical.columns().contains(&"a".to_string()));
3731        assert!(physical.columns().contains(&"r".to_string()));
3732        assert!(physical.columns().contains(&"b".to_string()));
3733    }
3734
3735    // ==================== Limit/Skip/Sort Tests ====================
3736
3737    #[test]
3738    fn test_plan_limit() {
3739        let store = create_test_store();
3740        let planner = Planner::new(store);
3741
3742        // MATCH (n) RETURN n LIMIT 10
3743        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3744            items: vec![ReturnItem {
3745                expression: LogicalExpression::Variable("n".to_string()),
3746                alias: None,
3747            }],
3748            distinct: false,
3749            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3750                count: 10,
3751                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3752                    variable: "n".to_string(),
3753                    label: None,
3754                    input: None,
3755                })),
3756            })),
3757        }));
3758
3759        let physical = planner.plan(&logical).unwrap();
3760        assert_eq!(physical.columns(), &["n"]);
3761    }
3762
3763    #[test]
3764    fn test_plan_skip() {
3765        let store = create_test_store();
3766        let planner = Planner::new(store);
3767
3768        // MATCH (n) RETURN n SKIP 5
3769        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3770            items: vec![ReturnItem {
3771                expression: LogicalExpression::Variable("n".to_string()),
3772                alias: None,
3773            }],
3774            distinct: false,
3775            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3776                count: 5,
3777                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3778                    variable: "n".to_string(),
3779                    label: None,
3780                    input: None,
3781                })),
3782            })),
3783        }));
3784
3785        let physical = planner.plan(&logical).unwrap();
3786        assert_eq!(physical.columns(), &["n"]);
3787    }
3788
3789    #[test]
3790    fn test_plan_sort() {
3791        let store = create_test_store();
3792        let planner = Planner::new(store);
3793
3794        // MATCH (n) RETURN n ORDER BY n.name ASC
3795        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3796            items: vec![ReturnItem {
3797                expression: LogicalExpression::Variable("n".to_string()),
3798                alias: None,
3799            }],
3800            distinct: false,
3801            input: Box::new(LogicalOperator::Sort(SortOp {
3802                keys: vec![SortKey {
3803                    expression: LogicalExpression::Variable("n".to_string()),
3804                    order: SortOrder::Ascending,
3805                }],
3806                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3807                    variable: "n".to_string(),
3808                    label: None,
3809                    input: None,
3810                })),
3811            })),
3812        }));
3813
3814        let physical = planner.plan(&logical).unwrap();
3815        assert_eq!(physical.columns(), &["n"]);
3816    }
3817
3818    #[test]
3819    fn test_plan_sort_descending() {
3820        let store = create_test_store();
3821        let planner = Planner::new(store);
3822
3823        // ORDER BY n DESC
3824        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3825            items: vec![ReturnItem {
3826                expression: LogicalExpression::Variable("n".to_string()),
3827                alias: None,
3828            }],
3829            distinct: false,
3830            input: Box::new(LogicalOperator::Sort(SortOp {
3831                keys: vec![SortKey {
3832                    expression: LogicalExpression::Variable("n".to_string()),
3833                    order: SortOrder::Descending,
3834                }],
3835                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3836                    variable: "n".to_string(),
3837                    label: None,
3838                    input: None,
3839                })),
3840            })),
3841        }));
3842
3843        let physical = planner.plan(&logical).unwrap();
3844        assert_eq!(physical.columns(), &["n"]);
3845    }
3846
3847    #[test]
3848    fn test_plan_distinct() {
3849        let store = create_test_store();
3850        let planner = Planner::new(store);
3851
3852        // MATCH (n) RETURN DISTINCT n
3853        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3854            items: vec![ReturnItem {
3855                expression: LogicalExpression::Variable("n".to_string()),
3856                alias: None,
3857            }],
3858            distinct: false,
3859            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3860                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3861                    variable: "n".to_string(),
3862                    label: None,
3863                    input: None,
3864                })),
3865                columns: None,
3866            })),
3867        }));
3868
3869        let physical = planner.plan(&logical).unwrap();
3870        assert_eq!(physical.columns(), &["n"]);
3871    }
3872
3873    // ==================== Aggregate Tests ====================
3874
3875    #[test]
3876    fn test_plan_aggregate_count() {
3877        let store = create_test_store();
3878        let planner = Planner::new(store);
3879
3880        // MATCH (n) RETURN count(n)
3881        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3882            items: vec![ReturnItem {
3883                expression: LogicalExpression::Variable("cnt".to_string()),
3884                alias: None,
3885            }],
3886            distinct: false,
3887            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3888                group_by: vec![],
3889                aggregates: vec![LogicalAggregateExpr {
3890                    function: LogicalAggregateFunction::Count,
3891                    expression: Some(LogicalExpression::Variable("n".to_string())),
3892                    distinct: false,
3893                    alias: Some("cnt".to_string()),
3894                    percentile: None,
3895                }],
3896                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3897                    variable: "n".to_string(),
3898                    label: None,
3899                    input: None,
3900                })),
3901                having: None,
3902            })),
3903        }));
3904
3905        let physical = planner.plan(&logical).unwrap();
3906        assert!(physical.columns().contains(&"cnt".to_string()));
3907    }
3908
3909    #[test]
3910    fn test_plan_aggregate_with_group_by() {
3911        let store = create_test_store();
3912        let planner = Planner::new(store);
3913
3914        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3915        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3916            group_by: vec![LogicalExpression::Property {
3917                variable: "n".to_string(),
3918                property: "city".to_string(),
3919            }],
3920            aggregates: vec![LogicalAggregateExpr {
3921                function: LogicalAggregateFunction::Count,
3922                expression: Some(LogicalExpression::Variable("n".to_string())),
3923                distinct: false,
3924                alias: Some("cnt".to_string()),
3925                percentile: None,
3926            }],
3927            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3928                variable: "n".to_string(),
3929                label: Some("Person".to_string()),
3930                input: None,
3931            })),
3932            having: None,
3933        }));
3934
3935        let physical = planner.plan(&logical).unwrap();
3936        assert_eq!(physical.columns().len(), 2);
3937    }
3938
3939    #[test]
3940    fn test_plan_aggregate_sum() {
3941        let store = create_test_store();
3942        let planner = Planner::new(store);
3943
3944        // SUM(n.value)
3945        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3946            group_by: vec![],
3947            aggregates: vec![LogicalAggregateExpr {
3948                function: LogicalAggregateFunction::Sum,
3949                expression: Some(LogicalExpression::Property {
3950                    variable: "n".to_string(),
3951                    property: "value".to_string(),
3952                }),
3953                distinct: false,
3954                alias: Some("total".to_string()),
3955                percentile: None,
3956            }],
3957            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3958                variable: "n".to_string(),
3959                label: None,
3960                input: None,
3961            })),
3962            having: None,
3963        }));
3964
3965        let physical = planner.plan(&logical).unwrap();
3966        assert!(physical.columns().contains(&"total".to_string()));
3967    }
3968
3969    #[test]
3970    fn test_plan_aggregate_avg() {
3971        let store = create_test_store();
3972        let planner = Planner::new(store);
3973
3974        // AVG(n.score)
3975        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3976            group_by: vec![],
3977            aggregates: vec![LogicalAggregateExpr {
3978                function: LogicalAggregateFunction::Avg,
3979                expression: Some(LogicalExpression::Property {
3980                    variable: "n".to_string(),
3981                    property: "score".to_string(),
3982                }),
3983                distinct: false,
3984                alias: Some("average".to_string()),
3985                percentile: None,
3986            }],
3987            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3988                variable: "n".to_string(),
3989                label: None,
3990                input: None,
3991            })),
3992            having: None,
3993        }));
3994
3995        let physical = planner.plan(&logical).unwrap();
3996        assert!(physical.columns().contains(&"average".to_string()));
3997    }
3998
3999    #[test]
4000    fn test_plan_aggregate_min_max() {
4001        let store = create_test_store();
4002        let planner = Planner::new(store);
4003
4004        // MIN(n.age), MAX(n.age)
4005        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4006            group_by: vec![],
4007            aggregates: vec![
4008                LogicalAggregateExpr {
4009                    function: LogicalAggregateFunction::Min,
4010                    expression: Some(LogicalExpression::Property {
4011                        variable: "n".to_string(),
4012                        property: "age".to_string(),
4013                    }),
4014                    distinct: false,
4015                    alias: Some("youngest".to_string()),
4016                    percentile: None,
4017                },
4018                LogicalAggregateExpr {
4019                    function: LogicalAggregateFunction::Max,
4020                    expression: Some(LogicalExpression::Property {
4021                        variable: "n".to_string(),
4022                        property: "age".to_string(),
4023                    }),
4024                    distinct: false,
4025                    alias: Some("oldest".to_string()),
4026                    percentile: None,
4027                },
4028            ],
4029            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4030                variable: "n".to_string(),
4031                label: None,
4032                input: None,
4033            })),
4034            having: None,
4035        }));
4036
4037        let physical = planner.plan(&logical).unwrap();
4038        assert!(physical.columns().contains(&"youngest".to_string()));
4039        assert!(physical.columns().contains(&"oldest".to_string()));
4040    }
4041
4042    // ==================== Join Tests ====================
4043
4044    #[test]
4045    fn test_plan_inner_join() {
4046        let store = create_test_store();
4047        let planner = Planner::new(store);
4048
4049        // Inner join between two scans
4050        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4051            items: vec![
4052                ReturnItem {
4053                    expression: LogicalExpression::Variable("a".to_string()),
4054                    alias: None,
4055                },
4056                ReturnItem {
4057                    expression: LogicalExpression::Variable("b".to_string()),
4058                    alias: None,
4059                },
4060            ],
4061            distinct: false,
4062            input: Box::new(LogicalOperator::Join(JoinOp {
4063                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4064                    variable: "a".to_string(),
4065                    label: Some("Person".to_string()),
4066                    input: None,
4067                })),
4068                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4069                    variable: "b".to_string(),
4070                    label: Some("Company".to_string()),
4071                    input: None,
4072                })),
4073                join_type: JoinType::Inner,
4074                conditions: vec![JoinCondition {
4075                    left: LogicalExpression::Variable("a".to_string()),
4076                    right: LogicalExpression::Variable("b".to_string()),
4077                }],
4078            })),
4079        }));
4080
4081        let physical = planner.plan(&logical).unwrap();
4082        assert!(physical.columns().contains(&"a".to_string()));
4083        assert!(physical.columns().contains(&"b".to_string()));
4084    }
4085
4086    #[test]
4087    fn test_plan_cross_join() {
4088        let store = create_test_store();
4089        let planner = Planner::new(store);
4090
4091        // Cross join (no conditions)
4092        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4093            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4094                variable: "a".to_string(),
4095                label: None,
4096                input: None,
4097            })),
4098            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4099                variable: "b".to_string(),
4100                label: None,
4101                input: None,
4102            })),
4103            join_type: JoinType::Cross,
4104            conditions: vec![],
4105        }));
4106
4107        let physical = planner.plan(&logical).unwrap();
4108        assert_eq!(physical.columns().len(), 2);
4109    }
4110
4111    #[test]
4112    fn test_plan_left_join() {
4113        let store = create_test_store();
4114        let planner = Planner::new(store);
4115
4116        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4117            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4118                variable: "a".to_string(),
4119                label: None,
4120                input: None,
4121            })),
4122            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4123                variable: "b".to_string(),
4124                label: None,
4125                input: None,
4126            })),
4127            join_type: JoinType::Left,
4128            conditions: vec![],
4129        }));
4130
4131        let physical = planner.plan(&logical).unwrap();
4132        assert_eq!(physical.columns().len(), 2);
4133    }
4134
4135    // ==================== Mutation Tests ====================
4136
4137    #[test]
4138    fn test_plan_create_node() {
4139        let store = create_test_store();
4140        let planner = Planner::new(store);
4141
4142        // CREATE (n:Person {name: 'Alice'})
4143        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4144            variable: "n".to_string(),
4145            labels: vec!["Person".to_string()],
4146            properties: vec![(
4147                "name".to_string(),
4148                LogicalExpression::Literal(Value::String("Alice".into())),
4149            )],
4150            input: None,
4151        }));
4152
4153        let physical = planner.plan(&logical).unwrap();
4154        assert!(physical.columns().contains(&"n".to_string()));
4155    }
4156
4157    #[test]
4158    fn test_plan_create_edge() {
4159        let store = create_test_store();
4160        let planner = Planner::new(store);
4161
4162        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4163        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4164            variable: Some("r".to_string()),
4165            from_variable: "a".to_string(),
4166            to_variable: "b".to_string(),
4167            edge_type: "KNOWS".to_string(),
4168            properties: vec![],
4169            input: Box::new(LogicalOperator::Join(JoinOp {
4170                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4171                    variable: "a".to_string(),
4172                    label: None,
4173                    input: None,
4174                })),
4175                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4176                    variable: "b".to_string(),
4177                    label: None,
4178                    input: None,
4179                })),
4180                join_type: JoinType::Cross,
4181                conditions: vec![],
4182            })),
4183        }));
4184
4185        let physical = planner.plan(&logical).unwrap();
4186        assert!(physical.columns().contains(&"r".to_string()));
4187    }
4188
4189    #[test]
4190    fn test_plan_delete_node() {
4191        let store = create_test_store();
4192        let planner = Planner::new(store);
4193
4194        // MATCH (n) DELETE n
4195        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4196            variable: "n".to_string(),
4197            detach: false,
4198            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4199                variable: "n".to_string(),
4200                label: None,
4201                input: None,
4202            })),
4203        }));
4204
4205        let physical = planner.plan(&logical).unwrap();
4206        assert!(physical.columns().contains(&"deleted_count".to_string()));
4207    }
4208
4209    // ==================== Error Cases ====================
4210
4211    #[test]
4212    fn test_plan_empty_errors() {
4213        let store = create_test_store();
4214        let planner = Planner::new(store);
4215
4216        let logical = LogicalPlan::new(LogicalOperator::Empty);
4217        let result = planner.plan(&logical);
4218        assert!(result.is_err());
4219    }
4220
4221    #[test]
4222    fn test_plan_missing_variable_in_return() {
4223        let store = create_test_store();
4224        let planner = Planner::new(store);
4225
4226        // Return variable that doesn't exist in input
4227        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4228            items: vec![ReturnItem {
4229                expression: LogicalExpression::Variable("missing".to_string()),
4230                alias: None,
4231            }],
4232            distinct: false,
4233            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4234                variable: "n".to_string(),
4235                label: None,
4236                input: None,
4237            })),
4238        }));
4239
4240        let result = planner.plan(&logical);
4241        assert!(result.is_err());
4242    }
4243
4244    // ==================== Helper Function Tests ====================
4245
4246    #[test]
4247    fn test_convert_binary_ops() {
4248        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4249        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4250        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4251        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4252        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4253        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4254        assert!(convert_binary_op(BinaryOp::And).is_ok());
4255        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4256        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4257        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4258        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4259        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4260    }
4261
4262    #[test]
4263    fn test_convert_unary_ops() {
4264        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4265        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4266        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4267        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4268    }
4269
4270    #[test]
4271    fn test_convert_aggregate_functions() {
4272        assert!(matches!(
4273            convert_aggregate_function(LogicalAggregateFunction::Count),
4274            PhysicalAggregateFunction::Count
4275        ));
4276        assert!(matches!(
4277            convert_aggregate_function(LogicalAggregateFunction::Sum),
4278            PhysicalAggregateFunction::Sum
4279        ));
4280        assert!(matches!(
4281            convert_aggregate_function(LogicalAggregateFunction::Avg),
4282            PhysicalAggregateFunction::Avg
4283        ));
4284        assert!(matches!(
4285            convert_aggregate_function(LogicalAggregateFunction::Min),
4286            PhysicalAggregateFunction::Min
4287        ));
4288        assert!(matches!(
4289            convert_aggregate_function(LogicalAggregateFunction::Max),
4290            PhysicalAggregateFunction::Max
4291        ));
4292    }
4293
4294    #[test]
4295    fn test_planner_accessors() {
4296        let store = create_test_store();
4297        let planner = Planner::new(Arc::clone(&store));
4298
4299        assert!(planner.tx_id().is_none());
4300        assert!(planner.tx_manager().is_none());
4301        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4302    }
4303
4304    #[test]
4305    fn test_physical_plan_accessors() {
4306        let store = create_test_store();
4307        let planner = Planner::new(store);
4308
4309        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4310            variable: "n".to_string(),
4311            label: None,
4312            input: None,
4313        }));
4314
4315        let physical = planner.plan(&logical).unwrap();
4316        assert_eq!(physical.columns(), &["n"]);
4317
4318        // Test into_operator
4319        let _ = physical.into_operator();
4320    }
4321
4322    // ==================== Adaptive Planning Tests ====================
4323
4324    #[test]
4325    fn test_plan_adaptive_with_scan() {
4326        let store = create_test_store();
4327        let planner = Planner::new(store);
4328
4329        // MATCH (n:Person) RETURN n
4330        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4331            items: vec![ReturnItem {
4332                expression: LogicalExpression::Variable("n".to_string()),
4333                alias: None,
4334            }],
4335            distinct: false,
4336            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4337                variable: "n".to_string(),
4338                label: Some("Person".to_string()),
4339                input: None,
4340            })),
4341        }));
4342
4343        let physical = planner.plan_adaptive(&logical).unwrap();
4344        assert_eq!(physical.columns(), &["n"]);
4345        // Should have adaptive context with estimates
4346        assert!(physical.adaptive_context.is_some());
4347    }
4348
4349    #[test]
4350    fn test_plan_adaptive_with_filter() {
4351        let store = create_test_store();
4352        let planner = Planner::new(store);
4353
4354        // MATCH (n) WHERE n.age > 30 RETURN n
4355        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4356            items: vec![ReturnItem {
4357                expression: LogicalExpression::Variable("n".to_string()),
4358                alias: None,
4359            }],
4360            distinct: false,
4361            input: Box::new(LogicalOperator::Filter(FilterOp {
4362                predicate: LogicalExpression::Binary {
4363                    left: Box::new(LogicalExpression::Property {
4364                        variable: "n".to_string(),
4365                        property: "age".to_string(),
4366                    }),
4367                    op: BinaryOp::Gt,
4368                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4369                },
4370                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4371                    variable: "n".to_string(),
4372                    label: None,
4373                    input: None,
4374                })),
4375            })),
4376        }));
4377
4378        let physical = planner.plan_adaptive(&logical).unwrap();
4379        assert!(physical.adaptive_context.is_some());
4380    }
4381
4382    #[test]
4383    fn test_plan_adaptive_with_expand() {
4384        let store = create_test_store();
4385        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4386
4387        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4388        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4389            items: vec![
4390                ReturnItem {
4391                    expression: LogicalExpression::Variable("a".to_string()),
4392                    alias: None,
4393                },
4394                ReturnItem {
4395                    expression: LogicalExpression::Variable("b".to_string()),
4396                    alias: None,
4397                },
4398            ],
4399            distinct: false,
4400            input: Box::new(LogicalOperator::Expand(ExpandOp {
4401                from_variable: "a".to_string(),
4402                to_variable: "b".to_string(),
4403                edge_variable: None,
4404                direction: ExpandDirection::Outgoing,
4405                edge_type: Some("KNOWS".to_string()),
4406                min_hops: 1,
4407                max_hops: Some(1),
4408                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4409                    variable: "a".to_string(),
4410                    label: None,
4411                    input: None,
4412                })),
4413                path_alias: None,
4414            })),
4415        }));
4416
4417        let physical = planner.plan_adaptive(&logical).unwrap();
4418        assert!(physical.adaptive_context.is_some());
4419    }
4420
4421    #[test]
4422    fn test_plan_adaptive_with_join() {
4423        let store = create_test_store();
4424        let planner = Planner::new(store);
4425
4426        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4427            items: vec![
4428                ReturnItem {
4429                    expression: LogicalExpression::Variable("a".to_string()),
4430                    alias: None,
4431                },
4432                ReturnItem {
4433                    expression: LogicalExpression::Variable("b".to_string()),
4434                    alias: None,
4435                },
4436            ],
4437            distinct: false,
4438            input: Box::new(LogicalOperator::Join(JoinOp {
4439                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4440                    variable: "a".to_string(),
4441                    label: None,
4442                    input: None,
4443                })),
4444                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4445                    variable: "b".to_string(),
4446                    label: None,
4447                    input: None,
4448                })),
4449                join_type: JoinType::Cross,
4450                conditions: vec![],
4451            })),
4452        }));
4453
4454        let physical = planner.plan_adaptive(&logical).unwrap();
4455        assert!(physical.adaptive_context.is_some());
4456    }
4457
4458    #[test]
4459    fn test_plan_adaptive_with_aggregate() {
4460        let store = create_test_store();
4461        let planner = Planner::new(store);
4462
4463        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4464            group_by: vec![],
4465            aggregates: vec![LogicalAggregateExpr {
4466                function: LogicalAggregateFunction::Count,
4467                expression: Some(LogicalExpression::Variable("n".to_string())),
4468                distinct: false,
4469                alias: Some("cnt".to_string()),
4470                percentile: None,
4471            }],
4472            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4473                variable: "n".to_string(),
4474                label: None,
4475                input: None,
4476            })),
4477            having: None,
4478        }));
4479
4480        let physical = planner.plan_adaptive(&logical).unwrap();
4481        assert!(physical.adaptive_context.is_some());
4482    }
4483
4484    #[test]
4485    fn test_plan_adaptive_with_distinct() {
4486        let store = create_test_store();
4487        let planner = Planner::new(store);
4488
4489        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4490            items: vec![ReturnItem {
4491                expression: LogicalExpression::Variable("n".to_string()),
4492                alias: None,
4493            }],
4494            distinct: false,
4495            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4496                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4497                    variable: "n".to_string(),
4498                    label: None,
4499                    input: None,
4500                })),
4501                columns: None,
4502            })),
4503        }));
4504
4505        let physical = planner.plan_adaptive(&logical).unwrap();
4506        assert!(physical.adaptive_context.is_some());
4507    }
4508
4509    #[test]
4510    fn test_plan_adaptive_with_limit() {
4511        let store = create_test_store();
4512        let planner = Planner::new(store);
4513
4514        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4515            items: vec![ReturnItem {
4516                expression: LogicalExpression::Variable("n".to_string()),
4517                alias: None,
4518            }],
4519            distinct: false,
4520            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4521                count: 10,
4522                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4523                    variable: "n".to_string(),
4524                    label: None,
4525                    input: None,
4526                })),
4527            })),
4528        }));
4529
4530        let physical = planner.plan_adaptive(&logical).unwrap();
4531        assert!(physical.adaptive_context.is_some());
4532    }
4533
4534    #[test]
4535    fn test_plan_adaptive_with_skip() {
4536        let store = create_test_store();
4537        let planner = Planner::new(store);
4538
4539        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4540            items: vec![ReturnItem {
4541                expression: LogicalExpression::Variable("n".to_string()),
4542                alias: None,
4543            }],
4544            distinct: false,
4545            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4546                count: 5,
4547                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4548                    variable: "n".to_string(),
4549                    label: None,
4550                    input: None,
4551                })),
4552            })),
4553        }));
4554
4555        let physical = planner.plan_adaptive(&logical).unwrap();
4556        assert!(physical.adaptive_context.is_some());
4557    }
4558
4559    #[test]
4560    fn test_plan_adaptive_with_sort() {
4561        let store = create_test_store();
4562        let planner = Planner::new(store);
4563
4564        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4565            items: vec![ReturnItem {
4566                expression: LogicalExpression::Variable("n".to_string()),
4567                alias: None,
4568            }],
4569            distinct: false,
4570            input: Box::new(LogicalOperator::Sort(SortOp {
4571                keys: vec![SortKey {
4572                    expression: LogicalExpression::Variable("n".to_string()),
4573                    order: SortOrder::Ascending,
4574                }],
4575                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4576                    variable: "n".to_string(),
4577                    label: None,
4578                    input: None,
4579                })),
4580            })),
4581        }));
4582
4583        let physical = planner.plan_adaptive(&logical).unwrap();
4584        assert!(physical.adaptive_context.is_some());
4585    }
4586
4587    #[test]
4588    fn test_plan_adaptive_with_union() {
4589        let store = create_test_store();
4590        let planner = Planner::new(store);
4591
4592        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4593            items: vec![ReturnItem {
4594                expression: LogicalExpression::Variable("n".to_string()),
4595                alias: None,
4596            }],
4597            distinct: false,
4598            input: Box::new(LogicalOperator::Union(UnionOp {
4599                inputs: vec![
4600                    LogicalOperator::NodeScan(NodeScanOp {
4601                        variable: "n".to_string(),
4602                        label: Some("Person".to_string()),
4603                        input: None,
4604                    }),
4605                    LogicalOperator::NodeScan(NodeScanOp {
4606                        variable: "n".to_string(),
4607                        label: Some("Company".to_string()),
4608                        input: None,
4609                    }),
4610                ],
4611            })),
4612        }));
4613
4614        let physical = planner.plan_adaptive(&logical).unwrap();
4615        assert!(physical.adaptive_context.is_some());
4616    }
4617
4618    // ==================== Variable Length Path Tests ====================
4619
4620    #[test]
4621    fn test_plan_expand_variable_length() {
4622        let store = create_test_store();
4623        let planner = Planner::new(store);
4624
4625        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4626        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4627            items: vec![
4628                ReturnItem {
4629                    expression: LogicalExpression::Variable("a".to_string()),
4630                    alias: None,
4631                },
4632                ReturnItem {
4633                    expression: LogicalExpression::Variable("b".to_string()),
4634                    alias: None,
4635                },
4636            ],
4637            distinct: false,
4638            input: Box::new(LogicalOperator::Expand(ExpandOp {
4639                from_variable: "a".to_string(),
4640                to_variable: "b".to_string(),
4641                edge_variable: None,
4642                direction: ExpandDirection::Outgoing,
4643                edge_type: Some("KNOWS".to_string()),
4644                min_hops: 1,
4645                max_hops: Some(3),
4646                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4647                    variable: "a".to_string(),
4648                    label: None,
4649                    input: None,
4650                })),
4651                path_alias: None,
4652            })),
4653        }));
4654
4655        let physical = planner.plan(&logical).unwrap();
4656        assert!(physical.columns().contains(&"a".to_string()));
4657        assert!(physical.columns().contains(&"b".to_string()));
4658    }
4659
4660    #[test]
4661    fn test_plan_expand_with_path_alias() {
4662        let store = create_test_store();
4663        let planner = Planner::new(store);
4664
4665        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4666        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4667            items: vec![
4668                ReturnItem {
4669                    expression: LogicalExpression::Variable("a".to_string()),
4670                    alias: None,
4671                },
4672                ReturnItem {
4673                    expression: LogicalExpression::Variable("b".to_string()),
4674                    alias: None,
4675                },
4676            ],
4677            distinct: false,
4678            input: Box::new(LogicalOperator::Expand(ExpandOp {
4679                from_variable: "a".to_string(),
4680                to_variable: "b".to_string(),
4681                edge_variable: None,
4682                direction: ExpandDirection::Outgoing,
4683                edge_type: Some("KNOWS".to_string()),
4684                min_hops: 1,
4685                max_hops: Some(3),
4686                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4687                    variable: "a".to_string(),
4688                    label: None,
4689                    input: None,
4690                })),
4691                path_alias: Some("p".to_string()),
4692            })),
4693        }));
4694
4695        let physical = planner.plan(&logical).unwrap();
4696        // Verify plan was created successfully with expected output columns
4697        assert!(physical.columns().contains(&"a".to_string()));
4698        assert!(physical.columns().contains(&"b".to_string()));
4699    }
4700
4701    #[test]
4702    fn test_plan_expand_incoming() {
4703        let store = create_test_store();
4704        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4705
4706        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4707        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4708            items: vec![
4709                ReturnItem {
4710                    expression: LogicalExpression::Variable("a".to_string()),
4711                    alias: None,
4712                },
4713                ReturnItem {
4714                    expression: LogicalExpression::Variable("b".to_string()),
4715                    alias: None,
4716                },
4717            ],
4718            distinct: false,
4719            input: Box::new(LogicalOperator::Expand(ExpandOp {
4720                from_variable: "a".to_string(),
4721                to_variable: "b".to_string(),
4722                edge_variable: None,
4723                direction: ExpandDirection::Incoming,
4724                edge_type: Some("KNOWS".to_string()),
4725                min_hops: 1,
4726                max_hops: Some(1),
4727                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4728                    variable: "a".to_string(),
4729                    label: None,
4730                    input: None,
4731                })),
4732                path_alias: None,
4733            })),
4734        }));
4735
4736        let physical = planner.plan(&logical).unwrap();
4737        assert!(physical.columns().contains(&"a".to_string()));
4738        assert!(physical.columns().contains(&"b".to_string()));
4739    }
4740
4741    #[test]
4742    fn test_plan_expand_both_directions() {
4743        let store = create_test_store();
4744        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4745
4746        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4747        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4748            items: vec![
4749                ReturnItem {
4750                    expression: LogicalExpression::Variable("a".to_string()),
4751                    alias: None,
4752                },
4753                ReturnItem {
4754                    expression: LogicalExpression::Variable("b".to_string()),
4755                    alias: None,
4756                },
4757            ],
4758            distinct: false,
4759            input: Box::new(LogicalOperator::Expand(ExpandOp {
4760                from_variable: "a".to_string(),
4761                to_variable: "b".to_string(),
4762                edge_variable: None,
4763                direction: ExpandDirection::Both,
4764                edge_type: Some("KNOWS".to_string()),
4765                min_hops: 1,
4766                max_hops: Some(1),
4767                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4768                    variable: "a".to_string(),
4769                    label: None,
4770                    input: None,
4771                })),
4772                path_alias: None,
4773            })),
4774        }));
4775
4776        let physical = planner.plan(&logical).unwrap();
4777        assert!(physical.columns().contains(&"a".to_string()));
4778        assert!(physical.columns().contains(&"b".to_string()));
4779    }
4780
4781    // ==================== With Context Tests ====================
4782
4783    #[test]
4784    fn test_planner_with_context() {
4785        use crate::transaction::TransactionManager;
4786
4787        let store = create_test_store();
4788        let tx_manager = Arc::new(TransactionManager::new());
4789        let tx_id = tx_manager.begin();
4790        let epoch = tx_manager.current_epoch();
4791
4792        let planner = Planner::with_context(
4793            Arc::clone(&store),
4794            Arc::clone(&tx_manager),
4795            Some(tx_id),
4796            epoch,
4797        );
4798
4799        assert_eq!(planner.tx_id(), Some(tx_id));
4800        assert!(planner.tx_manager().is_some());
4801        assert_eq!(planner.viewing_epoch(), epoch);
4802    }
4803
4804    #[test]
4805    fn test_planner_with_factorized_execution_disabled() {
4806        let store = create_test_store();
4807        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4808
4809        // Two consecutive expands - should NOT use factorized execution
4810        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4811            items: vec![
4812                ReturnItem {
4813                    expression: LogicalExpression::Variable("a".to_string()),
4814                    alias: None,
4815                },
4816                ReturnItem {
4817                    expression: LogicalExpression::Variable("c".to_string()),
4818                    alias: None,
4819                },
4820            ],
4821            distinct: false,
4822            input: Box::new(LogicalOperator::Expand(ExpandOp {
4823                from_variable: "b".to_string(),
4824                to_variable: "c".to_string(),
4825                edge_variable: None,
4826                direction: ExpandDirection::Outgoing,
4827                edge_type: None,
4828                min_hops: 1,
4829                max_hops: Some(1),
4830                input: Box::new(LogicalOperator::Expand(ExpandOp {
4831                    from_variable: "a".to_string(),
4832                    to_variable: "b".to_string(),
4833                    edge_variable: None,
4834                    direction: ExpandDirection::Outgoing,
4835                    edge_type: None,
4836                    min_hops: 1,
4837                    max_hops: Some(1),
4838                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4839                        variable: "a".to_string(),
4840                        label: None,
4841                        input: None,
4842                    })),
4843                    path_alias: None,
4844                })),
4845                path_alias: None,
4846            })),
4847        }));
4848
4849        let physical = planner.plan(&logical).unwrap();
4850        assert!(physical.columns().contains(&"a".to_string()));
4851        assert!(physical.columns().contains(&"c".to_string()));
4852    }
4853
4854    // ==================== Sort with Property Tests ====================
4855
4856    #[test]
4857    fn test_plan_sort_by_property() {
4858        let store = create_test_store();
4859        let planner = Planner::new(store);
4860
4861        // MATCH (n) RETURN n ORDER BY n.name ASC
4862        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4863            items: vec![ReturnItem {
4864                expression: LogicalExpression::Variable("n".to_string()),
4865                alias: None,
4866            }],
4867            distinct: false,
4868            input: Box::new(LogicalOperator::Sort(SortOp {
4869                keys: vec![SortKey {
4870                    expression: LogicalExpression::Property {
4871                        variable: "n".to_string(),
4872                        property: "name".to_string(),
4873                    },
4874                    order: SortOrder::Ascending,
4875                }],
4876                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4877                    variable: "n".to_string(),
4878                    label: None,
4879                    input: None,
4880                })),
4881            })),
4882        }));
4883
4884        let physical = planner.plan(&logical).unwrap();
4885        // Should have the property column projected
4886        assert!(physical.columns().contains(&"n".to_string()));
4887    }
4888
4889    // ==================== Scan with Input Tests ====================
4890
4891    #[test]
4892    fn test_plan_scan_with_input() {
4893        let store = create_test_store();
4894        let planner = Planner::new(store);
4895
4896        // A scan with another scan as input (for chained patterns)
4897        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4898            items: vec![
4899                ReturnItem {
4900                    expression: LogicalExpression::Variable("a".to_string()),
4901                    alias: None,
4902                },
4903                ReturnItem {
4904                    expression: LogicalExpression::Variable("b".to_string()),
4905                    alias: None,
4906                },
4907            ],
4908            distinct: false,
4909            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4910                variable: "b".to_string(),
4911                label: Some("Company".to_string()),
4912                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4913                    variable: "a".to_string(),
4914                    label: Some("Person".to_string()),
4915                    input: None,
4916                }))),
4917            })),
4918        }));
4919
4920        let physical = planner.plan(&logical).unwrap();
4921        assert!(physical.columns().contains(&"a".to_string()));
4922        assert!(physical.columns().contains(&"b".to_string()));
4923    }
4924}