Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22    ExpandStep, ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator,
23    FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
24    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LimitOperator, MergeOperator,
25    NestedLoopJoinOperator, NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource,
26    RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
27    SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
28    UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
29};
30use grafeo_core::graph::{Direction, lpg::LpgStore};
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use crate::transaction::TransactionManager;
35
36/// Converts a logical plan to a physical operator tree.
37pub struct Planner {
38    /// The graph store to scan from.
39    store: Arc<LpgStore>,
40    /// Transaction manager for MVCC operations.
41    tx_manager: Option<Arc<TransactionManager>>,
42    /// Current transaction ID (if in a transaction).
43    tx_id: Option<TxId>,
44    /// Epoch to use for visibility checks.
45    viewing_epoch: EpochId,
46    /// Counter for generating unique anonymous edge column names.
47    anon_edge_counter: std::cell::Cell<u32>,
48    /// Whether to use factorized execution for multi-hop queries.
49    factorized_execution: bool,
50}
51
52impl Planner {
53    /// Creates a new planner with the given store.
54    ///
55    /// This creates a planner without transaction context, using the current
56    /// epoch from the store for visibility.
57    #[must_use]
58    pub fn new(store: Arc<LpgStore>) -> Self {
59        let epoch = store.current_epoch();
60        Self {
61            store,
62            tx_manager: None,
63            tx_id: None,
64            viewing_epoch: epoch,
65            anon_edge_counter: std::cell::Cell::new(0),
66            factorized_execution: true,
67        }
68    }
69
70    /// Creates a new planner with transaction context for MVCC-aware planning.
71    ///
72    /// # Arguments
73    ///
74    /// * `store` - The graph store
75    /// * `tx_manager` - Transaction manager for recording reads/writes
76    /// * `tx_id` - Current transaction ID (None for auto-commit)
77    /// * `viewing_epoch` - Epoch to use for version visibility
78    #[must_use]
79    pub fn with_context(
80        store: Arc<LpgStore>,
81        tx_manager: Arc<TransactionManager>,
82        tx_id: Option<TxId>,
83        viewing_epoch: EpochId,
84    ) -> Self {
85        Self {
86            store,
87            tx_manager: Some(tx_manager),
88            tx_id,
89            viewing_epoch,
90            anon_edge_counter: std::cell::Cell::new(0),
91            factorized_execution: true,
92        }
93    }
94
95    /// Returns the viewing epoch for this planner.
96    #[must_use]
97    pub fn viewing_epoch(&self) -> EpochId {
98        self.viewing_epoch
99    }
100
101    /// Returns the transaction ID for this planner, if any.
102    #[must_use]
103    pub fn tx_id(&self) -> Option<TxId> {
104        self.tx_id
105    }
106
107    /// Returns a reference to the transaction manager, if available.
108    #[must_use]
109    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
110        self.tx_manager.as_ref()
111    }
112
113    /// Enables or disables factorized execution for multi-hop queries.
114    #[must_use]
115    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
116        self.factorized_execution = enabled;
117        self
118    }
119
120    /// Counts consecutive single-hop expand operations.
121    ///
122    /// Returns the count and the deepest non-expand operator (the base of the chain).
123    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
124        match op {
125            LogicalOperator::Expand(expand) => {
126                // Only count single-hop expands (factorization doesn't apply to variable-length)
127                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
128
129                if is_single_hop {
130                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
131                    (inner_count + 1, base)
132                } else {
133                    // Variable-length path breaks the chain
134                    (0, op)
135                }
136            }
137            _ => (0, op),
138        }
139    }
140
141    /// Collects expand operations from the outermost down to the base.
142    ///
143    /// Returns expands in order from innermost (base) to outermost.
144    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
145        let mut chain = Vec::new();
146        let mut current = op;
147
148        while let LogicalOperator::Expand(expand) = current {
149            // Only include single-hop expands
150            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
151            if !is_single_hop {
152                break;
153            }
154            chain.push(expand);
155            current = &expand.input;
156        }
157
158        // Reverse so we go from base to outer
159        chain.reverse();
160        chain
161    }
162
163    /// Plans a logical plan into a physical operator.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if planning fails.
168    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
169        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
170        Ok(PhysicalPlan {
171            operator,
172            columns,
173            adaptive_context: None,
174        })
175    }
176
177    /// Plans a logical plan with adaptive execution support.
178    ///
179    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
180    /// joins) that can be monitored during execution to detect estimate errors.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if planning fails.
185    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
186        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
187
188        // Build adaptive context with cardinality estimates
189        let mut adaptive_context = AdaptiveContext::new();
190        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
191
192        Ok(PhysicalPlan {
193            operator,
194            columns,
195            adaptive_context: Some(adaptive_context),
196        })
197    }
198
199    /// Collects cardinality estimates from the logical plan into an adaptive context.
200    fn collect_cardinality_estimates(
201        &self,
202        op: &LogicalOperator,
203        ctx: &mut AdaptiveContext,
204        depth: usize,
205    ) {
206        match op {
207            LogicalOperator::NodeScan(scan) => {
208                // Estimate based on label statistics
209                let estimate = if let Some(label) = &scan.label {
210                    self.store.nodes_by_label(label).len() as f64
211                } else {
212                    self.store.node_count() as f64
213                };
214                let id = format!("scan_{}", scan.variable);
215                ctx.set_estimate(&id, estimate);
216
217                // Recurse into input if present
218                if let Some(input) = &scan.input {
219                    self.collect_cardinality_estimates(input, ctx, depth + 1);
220                }
221            }
222            LogicalOperator::Filter(filter) => {
223                // Default selectivity estimate for filters (30%)
224                let input_estimate = self.estimate_cardinality(&filter.input);
225                let estimate = input_estimate * 0.3;
226                let id = format!("filter_{depth}");
227                ctx.set_estimate(&id, estimate);
228
229                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
230            }
231            LogicalOperator::Expand(expand) => {
232                // Estimate based on average degree
233                let input_estimate = self.estimate_cardinality(&expand.input);
234                let avg_degree = 10.0; // Default estimate
235                let estimate = input_estimate * avg_degree;
236                let id = format!("expand_{}", expand.to_variable);
237                ctx.set_estimate(&id, estimate);
238
239                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
240            }
241            LogicalOperator::Join(join) => {
242                // Estimate join output (product with selectivity)
243                let left_est = self.estimate_cardinality(&join.left);
244                let right_est = self.estimate_cardinality(&join.right);
245                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
246                let id = format!("join_{depth}");
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
250                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
251            }
252            LogicalOperator::Aggregate(agg) => {
253                // Aggregates typically reduce cardinality
254                let input_estimate = self.estimate_cardinality(&agg.input);
255                let estimate = if agg.group_by.is_empty() {
256                    1.0 // Scalar aggregate
257                } else {
258                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
259                };
260                let id = format!("aggregate_{depth}");
261                ctx.set_estimate(&id, estimate);
262
263                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
264            }
265            LogicalOperator::Distinct(distinct) => {
266                let input_estimate = self.estimate_cardinality(&distinct.input);
267                let estimate = (input_estimate * 0.5).max(1.0);
268                let id = format!("distinct_{depth}");
269                ctx.set_estimate(&id, estimate);
270
271                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
272            }
273            LogicalOperator::Return(ret) => {
274                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
275            }
276            LogicalOperator::Limit(limit) => {
277                let input_estimate = self.estimate_cardinality(&limit.input);
278                let estimate = (input_estimate).min(limit.count as f64);
279                let id = format!("limit_{depth}");
280                ctx.set_estimate(&id, estimate);
281
282                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
283            }
284            LogicalOperator::Skip(skip) => {
285                let input_estimate = self.estimate_cardinality(&skip.input);
286                let estimate = (input_estimate - skip.count as f64).max(0.0);
287                let id = format!("skip_{depth}");
288                ctx.set_estimate(&id, estimate);
289
290                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
291            }
292            LogicalOperator::Sort(sort) => {
293                // Sort doesn't change cardinality
294                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
295            }
296            LogicalOperator::Union(union) => {
297                let estimate: f64 = union
298                    .inputs
299                    .iter()
300                    .map(|input| self.estimate_cardinality(input))
301                    .sum();
302                let id = format!("union_{depth}");
303                ctx.set_estimate(&id, estimate);
304
305                for input in &union.inputs {
306                    self.collect_cardinality_estimates(input, ctx, depth + 1);
307                }
308            }
309            _ => {
310                // For other operators, try to recurse into known input patterns
311            }
312        }
313    }
314
315    /// Estimates cardinality for a logical operator subtree.
316    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
317        match op {
318            LogicalOperator::NodeScan(scan) => {
319                if let Some(label) = &scan.label {
320                    self.store.nodes_by_label(label).len() as f64
321                } else {
322                    self.store.node_count() as f64
323                }
324            }
325            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
326            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
327            LogicalOperator::Join(join) => {
328                let left = self.estimate_cardinality(&join.left);
329                let right = self.estimate_cardinality(&join.right);
330                (left * right).sqrt()
331            }
332            LogicalOperator::Aggregate(agg) => {
333                if agg.group_by.is_empty() {
334                    1.0
335                } else {
336                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
337                }
338            }
339            LogicalOperator::Distinct(distinct) => {
340                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
341            }
342            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
343            LogicalOperator::Limit(limit) => self
344                .estimate_cardinality(&limit.input)
345                .min(limit.count as f64),
346            LogicalOperator::Skip(skip) => {
347                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
348            }
349            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
350            LogicalOperator::Union(union) => union
351                .inputs
352                .iter()
353                .map(|input| self.estimate_cardinality(input))
354                .sum(),
355            _ => 1000.0, // Default estimate for unknown operators
356        }
357    }
358
359    /// Plans a single logical operator.
360    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
361        match op {
362            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
363            LogicalOperator::Expand(expand) => {
364                // Check for expand chains when factorized execution is enabled
365                if self.factorized_execution {
366                    let (chain_len, _base) = Self::count_expand_chain(op);
367                    if chain_len >= 2 {
368                        // Use factorized chain for 2+ consecutive single-hop expands
369                        return self.plan_expand_chain(op);
370                    }
371                }
372                self.plan_expand(expand)
373            }
374            LogicalOperator::Return(ret) => self.plan_return(ret),
375            LogicalOperator::Filter(filter) => self.plan_filter(filter),
376            LogicalOperator::Project(project) => self.plan_project(project),
377            LogicalOperator::Limit(limit) => self.plan_limit(limit),
378            LogicalOperator::Skip(skip) => self.plan_skip(skip),
379            LogicalOperator::Sort(sort) => self.plan_sort(sort),
380            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
381            LogicalOperator::Join(join) => self.plan_join(join),
382            LogicalOperator::Union(union) => self.plan_union(union),
383            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
384            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
385            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
386            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
387            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
388            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
389            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
390            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
391            LogicalOperator::Merge(merge) => self.plan_merge(merge),
392            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
393            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
394            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
395            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
396            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
397            _ => Err(Error::Internal(format!(
398                "Unsupported operator: {:?}",
399                std::mem::discriminant(op)
400            ))),
401        }
402    }
403
404    /// Plans a node scan operator.
405    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
406        let scan_op = if let Some(label) = &scan.label {
407            ScanOperator::with_label(Arc::clone(&self.store), label)
408        } else {
409            ScanOperator::new(Arc::clone(&self.store))
410        };
411
412        // Apply MVCC context if available
413        let scan_operator: Box<dyn Operator> =
414            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
415
416        // If there's an input, chain operators with a nested loop join (cross join)
417        if let Some(input) = &scan.input {
418            let (input_op, mut input_columns) = self.plan_operator(input)?;
419
420            // Build output schema: input columns + scan column
421            let mut output_schema: Vec<LogicalType> =
422                input_columns.iter().map(|_| LogicalType::Any).collect();
423            output_schema.push(LogicalType::Node);
424
425            // Add scan column to input columns
426            input_columns.push(scan.variable.clone());
427
428            // Use nested loop join to combine input rows with scanned nodes
429            let join_op = Box::new(NestedLoopJoinOperator::new(
430                input_op,
431                scan_operator,
432                None, // No join condition (cross join)
433                PhysicalJoinType::Cross,
434                output_schema,
435            ));
436
437            Ok((join_op, input_columns))
438        } else {
439            let columns = vec![scan.variable.clone()];
440            Ok((scan_operator, columns))
441        }
442    }
443
444    /// Plans an expand operator.
445    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
446        // Plan the input operator first
447        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
448
449        // Find the source column index
450        let source_column = input_columns
451            .iter()
452            .position(|c| c == &expand.from_variable)
453            .ok_or_else(|| {
454                Error::Internal(format!(
455                    "Source variable '{}' not found in input columns",
456                    expand.from_variable
457                ))
458            })?;
459
460        // Convert expand direction
461        let direction = match expand.direction {
462            ExpandDirection::Outgoing => Direction::Outgoing,
463            ExpandDirection::Incoming => Direction::Incoming,
464            ExpandDirection::Both => Direction::Both,
465        };
466
467        // Check if this is a variable-length path
468        let is_variable_length =
469            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
470
471        let operator: Box<dyn Operator> = if is_variable_length {
472            // Use VariableLengthExpandOperator for multi-hop paths
473            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
474            let mut expand_op = VariableLengthExpandOperator::new(
475                Arc::clone(&self.store),
476                input_op,
477                source_column,
478                direction,
479                expand.edge_type.clone(),
480                expand.min_hops,
481                max_hops,
482            )
483            .with_tx_context(self.viewing_epoch, self.tx_id);
484
485            // If a path alias is set, enable path length output
486            if expand.path_alias.is_some() {
487                expand_op = expand_op.with_path_length_output();
488            }
489
490            Box::new(expand_op)
491        } else {
492            // Use simple ExpandOperator for single-hop paths
493            let expand_op = ExpandOperator::new(
494                Arc::clone(&self.store),
495                input_op,
496                source_column,
497                direction,
498                expand.edge_type.clone(),
499            )
500            .with_tx_context(self.viewing_epoch, self.tx_id);
501            Box::new(expand_op)
502        };
503
504        // Build output columns: [input_columns..., edge, target, (path_length)?]
505        // Preserve all input columns and add edge + target to match ExpandOperator output
506        let mut columns = input_columns;
507
508        // Generate edge column name - use provided name or generate anonymous name
509        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
510            let count = self.anon_edge_counter.get();
511            self.anon_edge_counter.set(count + 1);
512            format!("_anon_edge_{}", count)
513        });
514        columns.push(edge_col_name);
515
516        columns.push(expand.to_variable.clone());
517
518        // If a path alias is set, add a column for the path length
519        if let Some(ref path_alias) = expand.path_alias {
520            columns.push(format!("_path_length_{}", path_alias));
521        }
522
523        Ok((operator, columns))
524    }
525
526    /// Plans a chain of consecutive expand operations using factorized execution.
527    ///
528    /// This avoids the Cartesian product explosion that occurs with separate expands.
529    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
530    ///
531    /// The chain is executed lazily at query time, not during planning. This ensures
532    /// that any filters applied above the expand chain are properly respected.
533    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
534        let expands = Self::collect_expand_chain(op);
535        if expands.is_empty() {
536            return Err(Error::Internal("Empty expand chain".to_string()));
537        }
538
539        // Get the base operator (before first expand)
540        let first_expand = expands[0];
541        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
542
543        let mut columns = base_columns.clone();
544        let mut steps = Vec::new();
545
546        // Track the level-local source column for each expand
547        // For the first expand, it's the column in the input (base_columns)
548        // For subsequent expands, the target from the previous level is always at index 1
549        // (each level adds [edge, target], so target is at index 1)
550        let mut is_first = true;
551
552        for expand in &expands {
553            // Find source column for this expand
554            let source_column = if is_first {
555                // For first expand, find in base columns
556                base_columns
557                    .iter()
558                    .position(|c| c == &expand.from_variable)
559                    .ok_or_else(|| {
560                        Error::Internal(format!(
561                            "Source variable '{}' not found in base columns",
562                            expand.from_variable
563                        ))
564                    })?
565            } else {
566                // For subsequent expands, the target from the previous level is at index 1
567                // (each level adds [edge, target], so target is the second column)
568                1
569            };
570
571            // Convert direction
572            let direction = match expand.direction {
573                ExpandDirection::Outgoing => Direction::Outgoing,
574                ExpandDirection::Incoming => Direction::Incoming,
575                ExpandDirection::Both => Direction::Both,
576            };
577
578            // Add expand step configuration
579            steps.push(ExpandStep {
580                source_column,
581                direction,
582                edge_type: expand.edge_type.clone(),
583            });
584
585            // Add edge and target columns
586            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
587                let count = self.anon_edge_counter.get();
588                self.anon_edge_counter.set(count + 1);
589                format!("_anon_edge_{}", count)
590            });
591            columns.push(edge_col_name);
592            columns.push(expand.to_variable.clone());
593
594            is_first = false;
595        }
596
597        // Create lazy operator that executes at query time, not planning time
598        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
599
600        if let Some(tx_id) = self.tx_id {
601            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
602        } else {
603            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
604        }
605
606        Ok((Box::new(lazy_op), columns))
607    }
608
609    /// Plans a RETURN clause.
610    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
611        // Plan the input operator
612        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
613
614        // Build variable to column index mapping
615        let variable_columns: HashMap<String, usize> = input_columns
616            .iter()
617            .enumerate()
618            .map(|(i, name)| (name.clone(), i))
619            .collect();
620
621        // Extract column names from return items
622        let columns: Vec<String> = ret
623            .items
624            .iter()
625            .map(|item| {
626                item.alias.clone().unwrap_or_else(|| {
627                    // Generate a default name from the expression
628                    expression_to_string(&item.expression)
629                })
630            })
631            .collect();
632
633        // Check if we need a project operator (for property access or expression evaluation)
634        let needs_project = ret
635            .items
636            .iter()
637            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
638
639        if needs_project {
640            // Build project expressions
641            let mut projections = Vec::with_capacity(ret.items.len());
642            let mut output_types = Vec::with_capacity(ret.items.len());
643
644            for item in &ret.items {
645                match &item.expression {
646                    LogicalExpression::Variable(name) => {
647                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
648                            Error::Internal(format!("Variable '{}' not found in input", name))
649                        })?;
650                        projections.push(ProjectExpr::Column(col_idx));
651                        // Use Node type for variables (they could be nodes, edges, or values)
652                        output_types.push(LogicalType::Node);
653                    }
654                    LogicalExpression::Property { variable, property } => {
655                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
656                            Error::Internal(format!("Variable '{}' not found in input", variable))
657                        })?;
658                        projections.push(ProjectExpr::PropertyAccess {
659                            column: col_idx,
660                            property: property.clone(),
661                        });
662                        // Property could be any type - use Any/Generic to preserve type
663                        output_types.push(LogicalType::Any);
664                    }
665                    LogicalExpression::Literal(value) => {
666                        projections.push(ProjectExpr::Constant(value.clone()));
667                        output_types.push(value_to_logical_type(value));
668                    }
669                    LogicalExpression::FunctionCall { name, args, .. } => {
670                        // Handle built-in functions
671                        match name.to_lowercase().as_str() {
672                            "type" => {
673                                // type(r) returns the edge type string
674                                if args.len() != 1 {
675                                    return Err(Error::Internal(
676                                        "type() requires exactly one argument".to_string(),
677                                    ));
678                                }
679                                if let LogicalExpression::Variable(var_name) = &args[0] {
680                                    let col_idx =
681                                        *variable_columns.get(var_name).ok_or_else(|| {
682                                            Error::Internal(format!(
683                                                "Variable '{}' not found in input",
684                                                var_name
685                                            ))
686                                        })?;
687                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
688                                    output_types.push(LogicalType::String);
689                                } else {
690                                    return Err(Error::Internal(
691                                        "type() argument must be a variable".to_string(),
692                                    ));
693                                }
694                            }
695                            "length" => {
696                                // length(p) returns the path length
697                                // For shortestPath results, the path column already contains the length
698                                if args.len() != 1 {
699                                    return Err(Error::Internal(
700                                        "length() requires exactly one argument".to_string(),
701                                    ));
702                                }
703                                if let LogicalExpression::Variable(var_name) = &args[0] {
704                                    let col_idx =
705                                        *variable_columns.get(var_name).ok_or_else(|| {
706                                            Error::Internal(format!(
707                                                "Variable '{}' not found in input",
708                                                var_name
709                                            ))
710                                        })?;
711                                    // Pass through the column value directly
712                                    projections.push(ProjectExpr::Column(col_idx));
713                                    output_types.push(LogicalType::Int64);
714                                } else {
715                                    return Err(Error::Internal(
716                                        "length() argument must be a variable".to_string(),
717                                    ));
718                                }
719                            }
720                            // For other functions (head, tail, size, etc.), use expression evaluation
721                            _ => {
722                                let filter_expr = self.convert_expression(&item.expression)?;
723                                projections.push(ProjectExpr::Expression {
724                                    expr: filter_expr,
725                                    variable_columns: variable_columns.clone(),
726                                });
727                                output_types.push(LogicalType::Any);
728                            }
729                        }
730                    }
731                    LogicalExpression::Case { .. } => {
732                        // Convert CASE expression to FilterExpression for evaluation
733                        let filter_expr = self.convert_expression(&item.expression)?;
734                        projections.push(ProjectExpr::Expression {
735                            expr: filter_expr,
736                            variable_columns: variable_columns.clone(),
737                        });
738                        // CASE can return any type - use Any
739                        output_types.push(LogicalType::Any);
740                    }
741                    _ => {
742                        return Err(Error::Internal(format!(
743                            "Unsupported RETURN expression: {:?}",
744                            item.expression
745                        )));
746                    }
747                }
748            }
749
750            let operator = Box::new(ProjectOperator::with_store(
751                input_op,
752                projections,
753                output_types,
754                Arc::clone(&self.store),
755            ));
756
757            Ok((operator, columns))
758        } else {
759            // Simple case: just return variables
760            // Re-order columns to match return items if needed
761            let mut projections = Vec::with_capacity(ret.items.len());
762            let mut output_types = Vec::with_capacity(ret.items.len());
763
764            for item in &ret.items {
765                if let LogicalExpression::Variable(name) = &item.expression {
766                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
767                        Error::Internal(format!("Variable '{}' not found in input", name))
768                    })?;
769                    projections.push(ProjectExpr::Column(col_idx));
770                    output_types.push(LogicalType::Node);
771                }
772            }
773
774            // Only add ProjectOperator if reordering is needed
775            if projections.len() == input_columns.len()
776                && projections
777                    .iter()
778                    .enumerate()
779                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
780            {
781                // No reordering needed
782                Ok((input_op, columns))
783            } else {
784                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
785                Ok((operator, columns))
786            }
787        }
788    }
789
790    /// Plans a project operator (for WITH clause).
791    fn plan_project(
792        &self,
793        project: &crate::query::plan::ProjectOp,
794    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
795        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
796        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
797            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
798                // Create a single-row operator for projecting literals
799                let single_row_op: Box<dyn Operator> = Box::new(
800                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
801                );
802                (single_row_op, Vec::new())
803            } else {
804                self.plan_operator(&project.input)?
805            };
806
807        // Build variable to column index mapping
808        let variable_columns: HashMap<String, usize> = input_columns
809            .iter()
810            .enumerate()
811            .map(|(i, name)| (name.clone(), i))
812            .collect();
813
814        // Build projections and new column names
815        let mut projections = Vec::with_capacity(project.projections.len());
816        let mut output_types = Vec::with_capacity(project.projections.len());
817        let mut output_columns = Vec::with_capacity(project.projections.len());
818
819        for projection in &project.projections {
820            // Determine the output column name (alias or expression string)
821            let col_name = projection
822                .alias
823                .clone()
824                .unwrap_or_else(|| expression_to_string(&projection.expression));
825            output_columns.push(col_name);
826
827            match &projection.expression {
828                LogicalExpression::Variable(name) => {
829                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
830                        Error::Internal(format!("Variable '{}' not found in input", name))
831                    })?;
832                    projections.push(ProjectExpr::Column(col_idx));
833                    output_types.push(LogicalType::Node);
834                }
835                LogicalExpression::Property { variable, property } => {
836                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
837                        Error::Internal(format!("Variable '{}' not found in input", variable))
838                    })?;
839                    projections.push(ProjectExpr::PropertyAccess {
840                        column: col_idx,
841                        property: property.clone(),
842                    });
843                    output_types.push(LogicalType::Any);
844                }
845                LogicalExpression::Literal(value) => {
846                    projections.push(ProjectExpr::Constant(value.clone()));
847                    output_types.push(value_to_logical_type(value));
848                }
849                _ => {
850                    // For complex expressions, use full expression evaluation
851                    let filter_expr = self.convert_expression(&projection.expression)?;
852                    projections.push(ProjectExpr::Expression {
853                        expr: filter_expr,
854                        variable_columns: variable_columns.clone(),
855                    });
856                    output_types.push(LogicalType::Any);
857                }
858            }
859        }
860
861        let operator = Box::new(ProjectOperator::with_store(
862            input_op,
863            projections,
864            output_types,
865            Arc::clone(&self.store),
866        ));
867
868        Ok((operator, output_columns))
869    }
870
871    /// Plans a filter operator.
872    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
873        // Plan the input operator first
874        let (input_op, columns) = self.plan_operator(&filter.input)?;
875
876        // Build variable to column index mapping
877        let variable_columns: HashMap<String, usize> = columns
878            .iter()
879            .enumerate()
880            .map(|(i, name)| (name.clone(), i))
881            .collect();
882
883        // Convert logical expression to filter expression
884        let filter_expr = self.convert_expression(&filter.predicate)?;
885
886        // Create the predicate
887        let predicate =
888            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
889
890        // Create the filter operator
891        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
892
893        Ok((operator, columns))
894    }
895
896    /// Plans a LIMIT operator.
897    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
898        let (input_op, columns) = self.plan_operator(&limit.input)?;
899        let output_schema = self.derive_schema_from_columns(&columns);
900        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
901        Ok((operator, columns))
902    }
903
904    /// Plans a SKIP operator.
905    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
906        let (input_op, columns) = self.plan_operator(&skip.input)?;
907        let output_schema = self.derive_schema_from_columns(&columns);
908        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
909        Ok((operator, columns))
910    }
911
912    /// Plans a SORT (ORDER BY) operator.
913    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
914        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
915
916        // Build variable to column index mapping
917        let mut variable_columns: HashMap<String, usize> = input_columns
918            .iter()
919            .enumerate()
920            .map(|(i, name)| (name.clone(), i))
921            .collect();
922
923        // Collect property expressions that need to be projected before sorting
924        let mut property_projections: Vec<(String, String, String)> = Vec::new();
925        let mut next_col_idx = input_columns.len();
926
927        for key in &sort.keys {
928            if let LogicalExpression::Property { variable, property } = &key.expression {
929                let col_name = format!("{}_{}", variable, property);
930                if !variable_columns.contains_key(&col_name) {
931                    property_projections.push((
932                        variable.clone(),
933                        property.clone(),
934                        col_name.clone(),
935                    ));
936                    variable_columns.insert(col_name, next_col_idx);
937                    next_col_idx += 1;
938                }
939            }
940        }
941
942        // Track output columns
943        let mut output_columns = input_columns.clone();
944
945        // If we have property expressions, add a projection to materialize them
946        if !property_projections.is_empty() {
947            let mut projections = Vec::new();
948            let mut output_types = Vec::new();
949
950            // First, pass through all existing columns (use Node type to preserve node IDs
951            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
952            for (i, _) in input_columns.iter().enumerate() {
953                projections.push(ProjectExpr::Column(i));
954                output_types.push(LogicalType::Node);
955            }
956
957            // Then add property access projections
958            for (variable, property, col_name) in &property_projections {
959                let source_col = *variable_columns.get(variable).ok_or_else(|| {
960                    Error::Internal(format!(
961                        "Variable '{}' not found for ORDER BY property projection",
962                        variable
963                    ))
964                })?;
965                projections.push(ProjectExpr::PropertyAccess {
966                    column: source_col,
967                    property: property.clone(),
968                });
969                output_types.push(LogicalType::Any);
970                output_columns.push(col_name.clone());
971            }
972
973            input_op = Box::new(ProjectOperator::with_store(
974                input_op,
975                projections,
976                output_types,
977                Arc::clone(&self.store),
978            ));
979        }
980
981        // Convert logical sort keys to physical sort keys
982        let physical_keys: Vec<PhysicalSortKey> = sort
983            .keys
984            .iter()
985            .map(|key| {
986                let col_idx = self
987                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
988                Ok(PhysicalSortKey {
989                    column: col_idx,
990                    direction: match key.order {
991                        SortOrder::Ascending => SortDirection::Ascending,
992                        SortOrder::Descending => SortDirection::Descending,
993                    },
994                    null_order: NullOrder::NullsLast,
995                })
996            })
997            .collect::<Result<Vec<_>>>()?;
998
999        let output_schema = self.derive_schema_from_columns(&output_columns);
1000        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1001        Ok((operator, output_columns))
1002    }
1003
1004    /// Resolves a sort expression to a column index, using projected property columns.
1005    fn resolve_sort_expression_with_properties(
1006        &self,
1007        expr: &LogicalExpression,
1008        variable_columns: &HashMap<String, usize>,
1009    ) -> Result<usize> {
1010        match expr {
1011            LogicalExpression::Variable(name) => {
1012                variable_columns.get(name).copied().ok_or_else(|| {
1013                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1014                })
1015            }
1016            LogicalExpression::Property { variable, property } => {
1017                // Look up the projected property column (e.g., "p_age" for p.age)
1018                let col_name = format!("{}_{}", variable, property);
1019                variable_columns.get(&col_name).copied().ok_or_else(|| {
1020                    Error::Internal(format!(
1021                        "Property column '{}' not found for ORDER BY (from {}.{})",
1022                        col_name, variable, property
1023                    ))
1024                })
1025            }
1026            _ => Err(Error::Internal(format!(
1027                "Unsupported ORDER BY expression: {:?}",
1028                expr
1029            ))),
1030        }
1031    }
1032
1033    /// Derives a schema from column names (uses Any type to handle all value types).
1034    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1035        columns.iter().map(|_| LogicalType::Any).collect()
1036    }
1037
1038    /// Plans an AGGREGATE operator.
1039    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1040        // Check if we can use factorized aggregation for speedup
1041        // Conditions:
1042        // 1. Factorized execution is enabled
1043        // 2. Input is an expand chain (multi-hop)
1044        // 3. No GROUP BY
1045        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1046        if self.factorized_execution
1047            && agg.group_by.is_empty()
1048            && Self::count_expand_chain(&agg.input).0 >= 2
1049            && self.is_simple_aggregate(agg)
1050        {
1051            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1052                return Ok((op, cols));
1053            }
1054            // Fall through to regular aggregate if factorized planning fails
1055        }
1056
1057        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1058
1059        // Build variable to column index mapping
1060        let mut variable_columns: HashMap<String, usize> = input_columns
1061            .iter()
1062            .enumerate()
1063            .map(|(i, name)| (name.clone(), i))
1064            .collect();
1065
1066        // Collect all property expressions that need to be projected before aggregation
1067        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1068        let mut next_col_idx = input_columns.len();
1069
1070        // Check group-by expressions for properties
1071        for expr in &agg.group_by {
1072            if let LogicalExpression::Property { variable, property } = expr {
1073                let col_name = format!("{}_{}", variable, property);
1074                if !variable_columns.contains_key(&col_name) {
1075                    property_projections.push((
1076                        variable.clone(),
1077                        property.clone(),
1078                        col_name.clone(),
1079                    ));
1080                    variable_columns.insert(col_name, next_col_idx);
1081                    next_col_idx += 1;
1082                }
1083            }
1084        }
1085
1086        // Check aggregate expressions for properties
1087        for agg_expr in &agg.aggregates {
1088            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1089                let col_name = format!("{}_{}", variable, property);
1090                if !variable_columns.contains_key(&col_name) {
1091                    property_projections.push((
1092                        variable.clone(),
1093                        property.clone(),
1094                        col_name.clone(),
1095                    ));
1096                    variable_columns.insert(col_name, next_col_idx);
1097                    next_col_idx += 1;
1098                }
1099            }
1100        }
1101
1102        // If we have property expressions, add a projection to materialize them
1103        if !property_projections.is_empty() {
1104            let mut projections = Vec::new();
1105            let mut output_types = Vec::new();
1106
1107            // First, pass through all existing columns (use Node type to preserve node IDs
1108            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1109            for (i, _) in input_columns.iter().enumerate() {
1110                projections.push(ProjectExpr::Column(i));
1111                output_types.push(LogicalType::Node);
1112            }
1113
1114            // Then add property access projections
1115            for (variable, property, _col_name) in &property_projections {
1116                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1117                    Error::Internal(format!(
1118                        "Variable '{}' not found for property projection",
1119                        variable
1120                    ))
1121                })?;
1122                projections.push(ProjectExpr::PropertyAccess {
1123                    column: source_col,
1124                    property: property.clone(),
1125                });
1126                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1127            }
1128
1129            input_op = Box::new(ProjectOperator::with_store(
1130                input_op,
1131                projections,
1132                output_types,
1133                Arc::clone(&self.store),
1134            ));
1135        }
1136
1137        // Convert group-by expressions to column indices
1138        let group_columns: Vec<usize> = agg
1139            .group_by
1140            .iter()
1141            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1142            .collect::<Result<Vec<_>>>()?;
1143
1144        // Convert aggregate expressions to physical form
1145        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1146            .aggregates
1147            .iter()
1148            .map(|agg_expr| {
1149                let column = agg_expr
1150                    .expression
1151                    .as_ref()
1152                    .map(|e| {
1153                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1154                    })
1155                    .transpose()?;
1156
1157                Ok(PhysicalAggregateExpr {
1158                    function: convert_aggregate_function(agg_expr.function),
1159                    column,
1160                    distinct: agg_expr.distinct,
1161                    alias: agg_expr.alias.clone(),
1162                    percentile: agg_expr.percentile,
1163                })
1164            })
1165            .collect::<Result<Vec<_>>>()?;
1166
1167        // Build output schema and column names
1168        let mut output_schema = Vec::new();
1169        let mut output_columns = Vec::new();
1170
1171        // Add group-by columns
1172        for expr in &agg.group_by {
1173            output_schema.push(LogicalType::Any); // Group-by values can be any type
1174            output_columns.push(expression_to_string(expr));
1175        }
1176
1177        // Add aggregate result columns
1178        for agg_expr in &agg.aggregates {
1179            let result_type = match agg_expr.function {
1180                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1181                    LogicalType::Int64
1182                }
1183                LogicalAggregateFunction::Sum => LogicalType::Int64,
1184                LogicalAggregateFunction::Avg => LogicalType::Float64,
1185                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1186                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1187                    // since the aggregate can return any Value type, but the most common case
1188                    // is numeric values from property expressions
1189                    LogicalType::Int64
1190                }
1191                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1192                // Statistical functions return Float64
1193                LogicalAggregateFunction::StdDev
1194                | LogicalAggregateFunction::StdDevPop
1195                | LogicalAggregateFunction::PercentileDisc
1196                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1197            };
1198            output_schema.push(result_type);
1199            output_columns.push(
1200                agg_expr
1201                    .alias
1202                    .clone()
1203                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1204            );
1205        }
1206
1207        // Choose operator based on whether there are group-by columns
1208        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1209            Box::new(SimpleAggregateOperator::new(
1210                input_op,
1211                physical_aggregates,
1212                output_schema,
1213            ))
1214        } else {
1215            Box::new(HashAggregateOperator::new(
1216                input_op,
1217                group_columns,
1218                physical_aggregates,
1219                output_schema,
1220            ))
1221        };
1222
1223        // Apply HAVING clause filter if present
1224        if let Some(having_expr) = &agg.having {
1225            // Build variable to column mapping for the aggregate output
1226            let having_var_columns: HashMap<String, usize> = output_columns
1227                .iter()
1228                .enumerate()
1229                .map(|(i, name)| (name.clone(), i))
1230                .collect();
1231
1232            let filter_expr = self.convert_expression(having_expr)?;
1233            let predicate =
1234                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1235            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1236        }
1237
1238        Ok((operator, output_columns))
1239    }
1240
1241    /// Checks if an aggregate is simple enough for factorized execution.
1242    ///
1243    /// Simple aggregates:
1244    /// - COUNT(*) or COUNT(variable)
1245    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1246    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1247        agg.aggregates.iter().all(|agg_expr| {
1248            match agg_expr.function {
1249                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1250                    // COUNT(*) is always OK, COUNT(var) is OK
1251                    agg_expr.expression.is_none()
1252                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1253                }
1254                LogicalAggregateFunction::Sum
1255                | LogicalAggregateFunction::Avg
1256                | LogicalAggregateFunction::Min
1257                | LogicalAggregateFunction::Max => {
1258                    // For now, only support when expression is a variable
1259                    // (property access would require flattening first)
1260                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1261                }
1262                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1263                _ => false,
1264            }
1265        })
1266    }
1267
1268    /// Plans a factorized aggregate that operates directly on factorized data.
1269    ///
1270    /// This avoids the O(n²) cost of flattening before aggregation.
1271    fn plan_factorized_aggregate(
1272        &self,
1273        agg: &AggregateOp,
1274    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1275        // Build the expand chain - this returns a LazyFactorizedChainOperator
1276        let expands = Self::collect_expand_chain(&agg.input);
1277        if expands.is_empty() {
1278            return Err(Error::Internal(
1279                "Expected expand chain for factorized aggregate".to_string(),
1280            ));
1281        }
1282
1283        // Get the base operator (before first expand)
1284        let first_expand = expands[0];
1285        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1286
1287        let mut columns = base_columns.clone();
1288        let mut steps = Vec::new();
1289        let mut is_first = true;
1290
1291        for expand in &expands {
1292            // Find source column for this expand
1293            let source_column = if is_first {
1294                base_columns
1295                    .iter()
1296                    .position(|c| c == &expand.from_variable)
1297                    .ok_or_else(|| {
1298                        Error::Internal(format!(
1299                            "Source variable '{}' not found in base columns",
1300                            expand.from_variable
1301                        ))
1302                    })?
1303            } else {
1304                1 // Target from previous level
1305            };
1306
1307            let direction = match expand.direction {
1308                ExpandDirection::Outgoing => Direction::Outgoing,
1309                ExpandDirection::Incoming => Direction::Incoming,
1310                ExpandDirection::Both => Direction::Both,
1311            };
1312
1313            steps.push(ExpandStep {
1314                source_column,
1315                direction,
1316                edge_type: expand.edge_type.clone(),
1317            });
1318
1319            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1320                let count = self.anon_edge_counter.get();
1321                self.anon_edge_counter.set(count + 1);
1322                format!("_anon_edge_{}", count)
1323            });
1324            columns.push(edge_col_name);
1325            columns.push(expand.to_variable.clone());
1326
1327            is_first = false;
1328        }
1329
1330        // Create the lazy factorized chain operator
1331        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1332
1333        if let Some(tx_id) = self.tx_id {
1334            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1335        } else {
1336            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1337        }
1338
1339        // Convert logical aggregates to factorized aggregates
1340        let factorized_aggs: Vec<FactorizedAggregate> = agg
1341            .aggregates
1342            .iter()
1343            .map(|agg_expr| {
1344                match agg_expr.function {
1345                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1346                        // COUNT(*) uses simple count, COUNT(col) uses column count
1347                        if agg_expr.expression.is_none() {
1348                            FactorizedAggregate::count()
1349                        } else {
1350                            // For COUNT(variable), we use the deepest level's target column
1351                            // which is the last column added to the schema
1352                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1353                        }
1354                    }
1355                    LogicalAggregateFunction::Sum => {
1356                        // SUM on deepest level target
1357                        FactorizedAggregate::sum(1)
1358                    }
1359                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1360                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1361                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1362                    _ => {
1363                        // Shouldn't reach here due to is_simple_aggregate check
1364                        FactorizedAggregate::count()
1365                    }
1366                }
1367            })
1368            .collect();
1369
1370        // Build output column names
1371        let output_columns: Vec<String> = agg
1372            .aggregates
1373            .iter()
1374            .map(|agg_expr| {
1375                agg_expr
1376                    .alias
1377                    .clone()
1378                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1379            })
1380            .collect();
1381
1382        // Create the factorized aggregate operator
1383        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1384
1385        Ok((Box::new(factorized_agg_op), output_columns))
1386    }
1387
1388    /// Resolves a logical expression to a column index.
1389    #[allow(dead_code)]
1390    fn resolve_expression_to_column(
1391        &self,
1392        expr: &LogicalExpression,
1393        variable_columns: &HashMap<String, usize>,
1394    ) -> Result<usize> {
1395        match expr {
1396            LogicalExpression::Variable(name) => variable_columns
1397                .get(name)
1398                .copied()
1399                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1400            LogicalExpression::Property { variable, .. } => variable_columns
1401                .get(variable)
1402                .copied()
1403                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1404            _ => Err(Error::Internal(format!(
1405                "Cannot resolve expression to column: {:?}",
1406                expr
1407            ))),
1408        }
1409    }
1410
1411    /// Resolves a logical expression to a column index, using projected property columns.
1412    ///
1413    /// This is used for aggregations where properties have been projected into their own columns.
1414    fn resolve_expression_to_column_with_properties(
1415        &self,
1416        expr: &LogicalExpression,
1417        variable_columns: &HashMap<String, usize>,
1418    ) -> Result<usize> {
1419        match expr {
1420            LogicalExpression::Variable(name) => variable_columns
1421                .get(name)
1422                .copied()
1423                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1424            LogicalExpression::Property { variable, property } => {
1425                // Look up the projected property column (e.g., "p_price" for p.price)
1426                let col_name = format!("{}_{}", variable, property);
1427                variable_columns.get(&col_name).copied().ok_or_else(|| {
1428                    Error::Internal(format!(
1429                        "Property column '{}' not found (from {}.{})",
1430                        col_name, variable, property
1431                    ))
1432                })
1433            }
1434            _ => Err(Error::Internal(format!(
1435                "Cannot resolve expression to column: {:?}",
1436                expr
1437            ))),
1438        }
1439    }
1440
1441    /// Converts a logical expression to a filter expression.
1442    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1443        match expr {
1444            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1445            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1446            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1447                variable: variable.clone(),
1448                property: property.clone(),
1449            }),
1450            LogicalExpression::Binary { left, op, right } => {
1451                let left_expr = self.convert_expression(left)?;
1452                let right_expr = self.convert_expression(right)?;
1453                let filter_op = convert_binary_op(*op)?;
1454                Ok(FilterExpression::Binary {
1455                    left: Box::new(left_expr),
1456                    op: filter_op,
1457                    right: Box::new(right_expr),
1458                })
1459            }
1460            LogicalExpression::Unary { op, operand } => {
1461                let operand_expr = self.convert_expression(operand)?;
1462                let filter_op = convert_unary_op(*op)?;
1463                Ok(FilterExpression::Unary {
1464                    op: filter_op,
1465                    operand: Box::new(operand_expr),
1466                })
1467            }
1468            LogicalExpression::FunctionCall { name, args, .. } => {
1469                let filter_args: Vec<FilterExpression> = args
1470                    .iter()
1471                    .map(|a| self.convert_expression(a))
1472                    .collect::<Result<Vec<_>>>()?;
1473                Ok(FilterExpression::FunctionCall {
1474                    name: name.clone(),
1475                    args: filter_args,
1476                })
1477            }
1478            LogicalExpression::Case {
1479                operand,
1480                when_clauses,
1481                else_clause,
1482            } => {
1483                let filter_operand = operand
1484                    .as_ref()
1485                    .map(|e| self.convert_expression(e))
1486                    .transpose()?
1487                    .map(Box::new);
1488                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1489                    .iter()
1490                    .map(|(cond, result)| {
1491                        Ok((
1492                            self.convert_expression(cond)?,
1493                            self.convert_expression(result)?,
1494                        ))
1495                    })
1496                    .collect::<Result<Vec<_>>>()?;
1497                let filter_else = else_clause
1498                    .as_ref()
1499                    .map(|e| self.convert_expression(e))
1500                    .transpose()?
1501                    .map(Box::new);
1502                Ok(FilterExpression::Case {
1503                    operand: filter_operand,
1504                    when_clauses: filter_when_clauses,
1505                    else_clause: filter_else,
1506                })
1507            }
1508            LogicalExpression::List(items) => {
1509                let filter_items: Vec<FilterExpression> = items
1510                    .iter()
1511                    .map(|item| self.convert_expression(item))
1512                    .collect::<Result<Vec<_>>>()?;
1513                Ok(FilterExpression::List(filter_items))
1514            }
1515            LogicalExpression::Map(pairs) => {
1516                let filter_pairs: Vec<(String, FilterExpression)> = pairs
1517                    .iter()
1518                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1519                    .collect::<Result<Vec<_>>>()?;
1520                Ok(FilterExpression::Map(filter_pairs))
1521            }
1522            LogicalExpression::IndexAccess { base, index } => {
1523                let base_expr = self.convert_expression(base)?;
1524                let index_expr = self.convert_expression(index)?;
1525                Ok(FilterExpression::IndexAccess {
1526                    base: Box::new(base_expr),
1527                    index: Box::new(index_expr),
1528                })
1529            }
1530            LogicalExpression::SliceAccess { base, start, end } => {
1531                let base_expr = self.convert_expression(base)?;
1532                let start_expr = start
1533                    .as_ref()
1534                    .map(|s| self.convert_expression(s))
1535                    .transpose()?
1536                    .map(Box::new);
1537                let end_expr = end
1538                    .as_ref()
1539                    .map(|e| self.convert_expression(e))
1540                    .transpose()?
1541                    .map(Box::new);
1542                Ok(FilterExpression::SliceAccess {
1543                    base: Box::new(base_expr),
1544                    start: start_expr,
1545                    end: end_expr,
1546                })
1547            }
1548            LogicalExpression::Parameter(_) => Err(Error::Internal(
1549                "Parameters not yet supported in filters".to_string(),
1550            )),
1551            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1552            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1553            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1554            LogicalExpression::ListComprehension {
1555                variable,
1556                list_expr,
1557                filter_expr,
1558                map_expr,
1559            } => {
1560                let list = self.convert_expression(list_expr)?;
1561                let filter = filter_expr
1562                    .as_ref()
1563                    .map(|f| self.convert_expression(f))
1564                    .transpose()?
1565                    .map(Box::new);
1566                let map = self.convert_expression(map_expr)?;
1567                Ok(FilterExpression::ListComprehension {
1568                    variable: variable.clone(),
1569                    list_expr: Box::new(list),
1570                    filter_expr: filter,
1571                    map_expr: Box::new(map),
1572                })
1573            }
1574            LogicalExpression::ExistsSubquery(subplan) => {
1575                // Extract the pattern from the subplan
1576                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
1577                let (start_var, direction, edge_type, end_labels) =
1578                    self.extract_exists_pattern(subplan)?;
1579
1580                Ok(FilterExpression::ExistsSubquery {
1581                    start_var,
1582                    direction,
1583                    edge_type,
1584                    end_labels,
1585                    min_hops: None,
1586                    max_hops: None,
1587                })
1588            }
1589            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1590                "COUNT subqueries not yet supported".to_string(),
1591            )),
1592        }
1593    }
1594
1595    /// Extracts the pattern from an EXISTS subplan.
1596    /// Returns (start_variable, direction, edge_type, end_labels).
1597    fn extract_exists_pattern(
1598        &self,
1599        subplan: &LogicalOperator,
1600    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1601        match subplan {
1602            LogicalOperator::Expand(expand) => {
1603                // Get end node labels from the to_variable if there's a node scan input
1604                let end_labels = self.extract_end_labels_from_expand(expand);
1605                let direction = match expand.direction {
1606                    ExpandDirection::Outgoing => Direction::Outgoing,
1607                    ExpandDirection::Incoming => Direction::Incoming,
1608                    ExpandDirection::Both => Direction::Both,
1609                };
1610                Ok((
1611                    expand.from_variable.clone(),
1612                    direction,
1613                    expand.edge_type.clone(),
1614                    end_labels,
1615                ))
1616            }
1617            LogicalOperator::NodeScan(scan) => {
1618                if let Some(input) = &scan.input {
1619                    self.extract_exists_pattern(input)
1620                } else {
1621                    Err(Error::Internal(
1622                        "EXISTS subquery must contain an edge pattern".to_string(),
1623                    ))
1624                }
1625            }
1626            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1627            _ => Err(Error::Internal(
1628                "Unsupported EXISTS subquery pattern".to_string(),
1629            )),
1630        }
1631    }
1632
1633    /// Extracts end node labels from an Expand operator if present.
1634    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1635        // Check if the expand has a NodeScan input with a label filter
1636        match expand.input.as_ref() {
1637            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1638            _ => None,
1639        }
1640    }
1641
1642    /// Plans a JOIN operator.
1643    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1644        let (left_op, left_columns) = self.plan_operator(&join.left)?;
1645        let (right_op, right_columns) = self.plan_operator(&join.right)?;
1646
1647        // Build combined output columns
1648        let mut columns = left_columns.clone();
1649        columns.extend(right_columns.clone());
1650
1651        // Convert join type
1652        let physical_join_type = match join.join_type {
1653            JoinType::Inner => PhysicalJoinType::Inner,
1654            JoinType::Left => PhysicalJoinType::Left,
1655            JoinType::Right => PhysicalJoinType::Right,
1656            JoinType::Full => PhysicalJoinType::Full,
1657            JoinType::Cross => PhysicalJoinType::Cross,
1658            JoinType::Semi => PhysicalJoinType::Semi,
1659            JoinType::Anti => PhysicalJoinType::Anti,
1660        };
1661
1662        // Build key columns from join conditions
1663        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1664            // Cross join - no keys
1665            (vec![], vec![])
1666        } else {
1667            join.conditions
1668                .iter()
1669                .filter_map(|cond| {
1670                    // Try to extract column indices from expressions
1671                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1672                    let right_idx = self
1673                        .expression_to_column(&cond.right, &right_columns)
1674                        .ok()?;
1675                    Some((left_idx, right_idx))
1676                })
1677                .unzip()
1678        };
1679
1680        let output_schema = self.derive_schema_from_columns(&columns);
1681
1682        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1683            left_op,
1684            right_op,
1685            probe_keys,
1686            build_keys,
1687            physical_join_type,
1688            output_schema,
1689        ));
1690
1691        Ok((operator, columns))
1692    }
1693
1694    /// Extracts a column index from an expression.
1695    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1696        match expr {
1697            LogicalExpression::Variable(name) => columns
1698                .iter()
1699                .position(|c| c == name)
1700                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1701            _ => Err(Error::Internal(
1702                "Only variables supported in join conditions".to_string(),
1703            )),
1704        }
1705    }
1706
1707    /// Plans a UNION operator.
1708    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1709        if union.inputs.is_empty() {
1710            return Err(Error::Internal(
1711                "Union requires at least one input".to_string(),
1712            ));
1713        }
1714
1715        let mut inputs = Vec::with_capacity(union.inputs.len());
1716        let mut columns = Vec::new();
1717
1718        for (i, input) in union.inputs.iter().enumerate() {
1719            let (op, cols) = self.plan_operator(input)?;
1720            if i == 0 {
1721                columns = cols;
1722            }
1723            inputs.push(op);
1724        }
1725
1726        let output_schema = self.derive_schema_from_columns(&columns);
1727        let operator = Box::new(UnionOperator::new(inputs, output_schema));
1728
1729        Ok((operator, columns))
1730    }
1731
1732    /// Plans a DISTINCT operator.
1733    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1734        let (input_op, columns) = self.plan_operator(&distinct.input)?;
1735        let output_schema = self.derive_schema_from_columns(&columns);
1736        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1737        Ok((operator, columns))
1738    }
1739
1740    /// Plans a CREATE NODE operator.
1741    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1742        // Plan input if present
1743        let (input_op, mut columns) = if let Some(ref input) = create.input {
1744            let (op, cols) = self.plan_operator(input)?;
1745            (Some(op), cols)
1746        } else {
1747            (None, vec![])
1748        };
1749
1750        // Output column for the created node
1751        let output_column = columns.len();
1752        columns.push(create.variable.clone());
1753
1754        // Convert properties
1755        let properties: Vec<(String, PropertySource)> = create
1756            .properties
1757            .iter()
1758            .map(|(name, expr)| {
1759                let source = match expr {
1760                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1761                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1762                };
1763                (name.clone(), source)
1764            })
1765            .collect();
1766
1767        let output_schema = self.derive_schema_from_columns(&columns);
1768
1769        let operator = Box::new(
1770            CreateNodeOperator::new(
1771                Arc::clone(&self.store),
1772                input_op,
1773                create.labels.clone(),
1774                properties,
1775                output_schema,
1776                output_column,
1777            )
1778            .with_tx_context(self.viewing_epoch, self.tx_id),
1779        );
1780
1781        Ok((operator, columns))
1782    }
1783
1784    /// Plans a CREATE EDGE operator.
1785    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1786        let (input_op, mut columns) = self.plan_operator(&create.input)?;
1787
1788        // Find source and target columns
1789        let from_column = columns
1790            .iter()
1791            .position(|c| c == &create.from_variable)
1792            .ok_or_else(|| {
1793                Error::Internal(format!(
1794                    "Source variable '{}' not found",
1795                    create.from_variable
1796                ))
1797            })?;
1798
1799        let to_column = columns
1800            .iter()
1801            .position(|c| c == &create.to_variable)
1802            .ok_or_else(|| {
1803                Error::Internal(format!(
1804                    "Target variable '{}' not found",
1805                    create.to_variable
1806                ))
1807            })?;
1808
1809        // Output column for the created edge (if named)
1810        let output_column = create.variable.as_ref().map(|v| {
1811            let idx = columns.len();
1812            columns.push(v.clone());
1813            idx
1814        });
1815
1816        // Convert properties
1817        let properties: Vec<(String, PropertySource)> = create
1818            .properties
1819            .iter()
1820            .map(|(name, expr)| {
1821                let source = match expr {
1822                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1823                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1824                };
1825                (name.clone(), source)
1826            })
1827            .collect();
1828
1829        let output_schema = self.derive_schema_from_columns(&columns);
1830
1831        let operator = Box::new(
1832            CreateEdgeOperator::new(
1833                Arc::clone(&self.store),
1834                input_op,
1835                from_column,
1836                to_column,
1837                create.edge_type.clone(),
1838                properties,
1839                output_schema,
1840                output_column,
1841            )
1842            .with_tx_context(self.viewing_epoch, self.tx_id),
1843        );
1844
1845        Ok((operator, columns))
1846    }
1847
1848    /// Plans a DELETE NODE operator.
1849    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1850        let (input_op, columns) = self.plan_operator(&delete.input)?;
1851
1852        let node_column = columns
1853            .iter()
1854            .position(|c| c == &delete.variable)
1855            .ok_or_else(|| {
1856                Error::Internal(format!(
1857                    "Variable '{}' not found for delete",
1858                    delete.variable
1859                ))
1860            })?;
1861
1862        // Output schema for delete count
1863        let output_schema = vec![LogicalType::Int64];
1864        let output_columns = vec!["deleted_count".to_string()];
1865
1866        let operator = Box::new(
1867            DeleteNodeOperator::new(
1868                Arc::clone(&self.store),
1869                input_op,
1870                node_column,
1871                output_schema,
1872                delete.detach, // DETACH DELETE deletes connected edges first
1873            )
1874            .with_tx_context(self.viewing_epoch, self.tx_id),
1875        );
1876
1877        Ok((operator, output_columns))
1878    }
1879
1880    /// Plans a DELETE EDGE operator.
1881    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1882        let (input_op, columns) = self.plan_operator(&delete.input)?;
1883
1884        let edge_column = columns
1885            .iter()
1886            .position(|c| c == &delete.variable)
1887            .ok_or_else(|| {
1888                Error::Internal(format!(
1889                    "Variable '{}' not found for delete",
1890                    delete.variable
1891                ))
1892            })?;
1893
1894        // Output schema for delete count
1895        let output_schema = vec![LogicalType::Int64];
1896        let output_columns = vec!["deleted_count".to_string()];
1897
1898        let operator = Box::new(
1899            DeleteEdgeOperator::new(
1900                Arc::clone(&self.store),
1901                input_op,
1902                edge_column,
1903                output_schema,
1904            )
1905            .with_tx_context(self.viewing_epoch, self.tx_id),
1906        );
1907
1908        Ok((operator, output_columns))
1909    }
1910
1911    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
1912    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1913        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1914        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1915
1916        // Build combined output columns (left + right)
1917        let mut columns = left_columns.clone();
1918        columns.extend(right_columns.clone());
1919
1920        // Find common variables between left and right for join keys
1921        let mut probe_keys = Vec::new();
1922        let mut build_keys = Vec::new();
1923
1924        for (right_idx, right_col) in right_columns.iter().enumerate() {
1925            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1926                probe_keys.push(left_idx);
1927                build_keys.push(right_idx);
1928            }
1929        }
1930
1931        let output_schema = self.derive_schema_from_columns(&columns);
1932
1933        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1934            left_op,
1935            right_op,
1936            probe_keys,
1937            build_keys,
1938            PhysicalJoinType::Left,
1939            output_schema,
1940        ));
1941
1942        Ok((operator, columns))
1943    }
1944
1945    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
1946    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1947        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1948        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1949
1950        // Anti-join only keeps left columns (filters out matching rows)
1951        let columns = left_columns.clone();
1952
1953        // Find common variables between left and right for join keys
1954        let mut probe_keys = Vec::new();
1955        let mut build_keys = Vec::new();
1956
1957        for (right_idx, right_col) in right_columns.iter().enumerate() {
1958            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1959                probe_keys.push(left_idx);
1960                build_keys.push(right_idx);
1961            }
1962        }
1963
1964        let output_schema = self.derive_schema_from_columns(&columns);
1965
1966        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1967            left_op,
1968            right_op,
1969            probe_keys,
1970            build_keys,
1971            PhysicalJoinType::Anti,
1972            output_schema,
1973        ));
1974
1975        Ok((operator, columns))
1976    }
1977
1978    /// Plans an unwind operator.
1979    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1980        // Plan the input operator first
1981        // Handle Empty specially - use a single-row operator
1982        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
1983            if matches!(&*unwind.input, LogicalOperator::Empty) {
1984                // For UNWIND without prior MATCH, create a single-row input
1985                // We need an operator that produces one row with the list to unwind
1986                // For now, use EmptyScan which produces no rows - we'll handle the literal
1987                // list in the unwind operator itself
1988                let literal_list = self.convert_expression(&unwind.expression)?;
1989
1990                // Create a project operator that produces a single row with the list
1991                let single_row_op: Box<dyn Operator> = Box::new(
1992                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
1993                );
1994                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
1995                    single_row_op,
1996                    vec![ProjectExpr::Expression {
1997                        expr: literal_list,
1998                        variable_columns: HashMap::new(),
1999                    }],
2000                    vec![LogicalType::Any],
2001                    Arc::clone(&self.store),
2002                ));
2003
2004                (project_op, vec!["__list__".to_string()])
2005            } else {
2006                self.plan_operator(&unwind.input)?
2007            };
2008
2009        // The UNWIND expression should be a list - we need to find/evaluate it
2010        // For now, we handle the case where the expression references an existing column
2011        // or is a literal list
2012
2013        // Find if the expression references an existing column (like a list property)
2014        let list_col_idx = match &unwind.expression {
2015            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2016            LogicalExpression::Property { variable, .. } => {
2017                // Property access needs to be evaluated - for now we'll need the filter predicate
2018                // to evaluate this. For simple cases, we treat it as a list column.
2019                input_columns.iter().position(|c| c == variable)
2020            }
2021            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2022                // Literal list expression - we'll add it as a virtual column
2023                None
2024            }
2025            _ => None,
2026        };
2027
2028        // Build output columns: all input columns plus the new variable
2029        let mut columns = input_columns.clone();
2030        columns.push(unwind.variable.clone());
2031
2032        // Build output schema
2033        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2034        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2035
2036        // Use the list column index if found, otherwise default to 0
2037        // (in which case the first column should contain the list)
2038        let col_idx = list_col_idx.unwrap_or(0);
2039
2040        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2041            input_op,
2042            col_idx,
2043            unwind.variable.clone(),
2044            output_schema,
2045        ));
2046
2047        Ok((operator, columns))
2048    }
2049
2050    /// Plans a MERGE operator.
2051    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2052        // Plan the input operator if present (skip if Empty)
2053        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2054            Vec::new()
2055        } else {
2056            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2057            cols
2058        };
2059
2060        // Convert match properties from LogicalExpression to Value
2061        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2062            .match_properties
2063            .iter()
2064            .filter_map(|(name, expr)| {
2065                if let LogicalExpression::Literal(v) = expr {
2066                    Some((name.clone(), v.clone()))
2067                } else {
2068                    None // Skip non-literal expressions for now
2069                }
2070            })
2071            .collect();
2072
2073        // Convert ON CREATE properties
2074        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2075            .on_create
2076            .iter()
2077            .filter_map(|(name, expr)| {
2078                if let LogicalExpression::Literal(v) = expr {
2079                    Some((name.clone(), v.clone()))
2080                } else {
2081                    None
2082                }
2083            })
2084            .collect();
2085
2086        // Convert ON MATCH properties
2087        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2088            .on_match
2089            .iter()
2090            .filter_map(|(name, expr)| {
2091                if let LogicalExpression::Literal(v) = expr {
2092                    Some((name.clone(), v.clone()))
2093                } else {
2094                    None
2095                }
2096            })
2097            .collect();
2098
2099        // Add the merged node variable to output columns
2100        columns.push(merge.variable.clone());
2101
2102        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2103            Arc::clone(&self.store),
2104            merge.variable.clone(),
2105            merge.labels.clone(),
2106            match_properties,
2107            on_create_properties,
2108            on_match_properties,
2109        ));
2110
2111        Ok((operator, columns))
2112    }
2113
2114    /// Plans a SHORTEST PATH operator.
2115    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2116        // Plan the input operator
2117        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2118
2119        // Find source and target node columns
2120        let source_column = columns
2121            .iter()
2122            .position(|c| c == &sp.source_var)
2123            .ok_or_else(|| {
2124                Error::Internal(format!(
2125                    "Source variable '{}' not found for shortestPath",
2126                    sp.source_var
2127                ))
2128            })?;
2129
2130        let target_column = columns
2131            .iter()
2132            .position(|c| c == &sp.target_var)
2133            .ok_or_else(|| {
2134                Error::Internal(format!(
2135                    "Target variable '{}' not found for shortestPath",
2136                    sp.target_var
2137                ))
2138            })?;
2139
2140        // Convert direction
2141        let direction = match sp.direction {
2142            ExpandDirection::Outgoing => Direction::Outgoing,
2143            ExpandDirection::Incoming => Direction::Incoming,
2144            ExpandDirection::Both => Direction::Both,
2145        };
2146
2147        // Create the shortest path operator
2148        let operator: Box<dyn Operator> = Box::new(
2149            ShortestPathOperator::new(
2150                Arc::clone(&self.store),
2151                input_op,
2152                source_column,
2153                target_column,
2154                sp.edge_type.clone(),
2155                direction,
2156            )
2157            .with_all_paths(sp.all_paths),
2158        );
2159
2160        // Add path length column with the expected naming convention
2161        // The translator expects _path_length_{alias} format for length(p) calls
2162        columns.push(format!("_path_length_{}", sp.path_alias));
2163
2164        Ok((operator, columns))
2165    }
2166
2167    /// Plans an ADD LABEL operator.
2168    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2169        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2170
2171        // Find the node column
2172        let node_column = columns
2173            .iter()
2174            .position(|c| c == &add_label.variable)
2175            .ok_or_else(|| {
2176                Error::Internal(format!(
2177                    "Variable '{}' not found for ADD LABEL",
2178                    add_label.variable
2179                ))
2180            })?;
2181
2182        // Output schema for update count
2183        let output_schema = vec![LogicalType::Int64];
2184        let output_columns = vec!["labels_added".to_string()];
2185
2186        let operator = Box::new(AddLabelOperator::new(
2187            Arc::clone(&self.store),
2188            input_op,
2189            node_column,
2190            add_label.labels.clone(),
2191            output_schema,
2192        ));
2193
2194        Ok((operator, output_columns))
2195    }
2196
2197    /// Plans a REMOVE LABEL operator.
2198    fn plan_remove_label(
2199        &self,
2200        remove_label: &RemoveLabelOp,
2201    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2202        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2203
2204        // Find the node column
2205        let node_column = columns
2206            .iter()
2207            .position(|c| c == &remove_label.variable)
2208            .ok_or_else(|| {
2209                Error::Internal(format!(
2210                    "Variable '{}' not found for REMOVE LABEL",
2211                    remove_label.variable
2212                ))
2213            })?;
2214
2215        // Output schema for update count
2216        let output_schema = vec![LogicalType::Int64];
2217        let output_columns = vec!["labels_removed".to_string()];
2218
2219        let operator = Box::new(RemoveLabelOperator::new(
2220            Arc::clone(&self.store),
2221            input_op,
2222            node_column,
2223            remove_label.labels.clone(),
2224            output_schema,
2225        ));
2226
2227        Ok((operator, output_columns))
2228    }
2229
2230    /// Plans a SET PROPERTY operator.
2231    fn plan_set_property(
2232        &self,
2233        set_prop: &SetPropertyOp,
2234    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2235        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2236
2237        // Find the entity column (node or edge variable)
2238        let entity_column = columns
2239            .iter()
2240            .position(|c| c == &set_prop.variable)
2241            .ok_or_else(|| {
2242                Error::Internal(format!(
2243                    "Variable '{}' not found for SET",
2244                    set_prop.variable
2245                ))
2246            })?;
2247
2248        // Convert properties to PropertySource
2249        let properties: Vec<(String, PropertySource)> = set_prop
2250            .properties
2251            .iter()
2252            .map(|(name, expr)| {
2253                let source = self.expression_to_property_source(expr, &columns)?;
2254                Ok((name.clone(), source))
2255            })
2256            .collect::<Result<Vec<_>>>()?;
2257
2258        // Output schema preserves input schema (passes through)
2259        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2260        let output_columns = columns.clone();
2261
2262        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2263        let operator = Box::new(SetPropertyOperator::new_for_node(
2264            Arc::clone(&self.store),
2265            input_op,
2266            entity_column,
2267            properties,
2268            output_schema,
2269        ));
2270
2271        Ok((operator, output_columns))
2272    }
2273
2274    /// Converts a logical expression to a PropertySource.
2275    fn expression_to_property_source(
2276        &self,
2277        expr: &LogicalExpression,
2278        columns: &[String],
2279    ) -> Result<PropertySource> {
2280        match expr {
2281            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2282            LogicalExpression::Variable(name) => {
2283                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2284                    Error::Internal(format!("Variable '{}' not found for property source", name))
2285                })?;
2286                Ok(PropertySource::Column(col_idx))
2287            }
2288            LogicalExpression::Parameter(name) => {
2289                // Parameters should be resolved before planning
2290                // For now, treat as a placeholder
2291                Ok(PropertySource::Constant(
2292                    grafeo_common::types::Value::String(format!("${}", name).into()),
2293                ))
2294            }
2295            _ => Err(Error::Internal(format!(
2296                "Unsupported expression type for property source: {:?}",
2297                expr
2298            ))),
2299        }
2300    }
2301}
2302
2303/// Converts a logical binary operator to a filter binary operator.
2304pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2305    match op {
2306        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2307        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2308        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2309        BinaryOp::Le => Ok(BinaryFilterOp::Le),
2310        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2311        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2312        BinaryOp::And => Ok(BinaryFilterOp::And),
2313        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2314        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2315        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2316        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2317        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2318        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2319        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2320        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2321        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2322        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2323        BinaryOp::In => Ok(BinaryFilterOp::In),
2324        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2325        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2326        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2327            "Binary operator {:?} not yet supported in filters",
2328            op
2329        ))),
2330    }
2331}
2332
2333/// Converts a logical unary operator to a filter unary operator.
2334pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2335    match op {
2336        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2337        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2338        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2339        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2340    }
2341}
2342
2343/// Converts a logical aggregate function to a physical aggregate function.
2344pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2345    match func {
2346        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2347        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2348        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2349        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2350        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2351        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2352        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2353        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2354        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2355        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2356        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2357    }
2358}
2359
2360/// Converts a logical expression to a filter expression.
2361///
2362/// This is a standalone function that can be used by both LPG and RDF planners.
2363pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2364    match expr {
2365        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2366        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2367        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2368            variable: variable.clone(),
2369            property: property.clone(),
2370        }),
2371        LogicalExpression::Binary { left, op, right } => {
2372            let left_expr = convert_filter_expression(left)?;
2373            let right_expr = convert_filter_expression(right)?;
2374            let filter_op = convert_binary_op(*op)?;
2375            Ok(FilterExpression::Binary {
2376                left: Box::new(left_expr),
2377                op: filter_op,
2378                right: Box::new(right_expr),
2379            })
2380        }
2381        LogicalExpression::Unary { op, operand } => {
2382            let operand_expr = convert_filter_expression(operand)?;
2383            let filter_op = convert_unary_op(*op)?;
2384            Ok(FilterExpression::Unary {
2385                op: filter_op,
2386                operand: Box::new(operand_expr),
2387            })
2388        }
2389        LogicalExpression::FunctionCall { name, args, .. } => {
2390            let filter_args: Vec<FilterExpression> = args
2391                .iter()
2392                .map(|a| convert_filter_expression(a))
2393                .collect::<Result<Vec<_>>>()?;
2394            Ok(FilterExpression::FunctionCall {
2395                name: name.clone(),
2396                args: filter_args,
2397            })
2398        }
2399        LogicalExpression::Case {
2400            operand,
2401            when_clauses,
2402            else_clause,
2403        } => {
2404            let filter_operand = operand
2405                .as_ref()
2406                .map(|e| convert_filter_expression(e))
2407                .transpose()?
2408                .map(Box::new);
2409            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2410                .iter()
2411                .map(|(cond, result)| {
2412                    Ok((
2413                        convert_filter_expression(cond)?,
2414                        convert_filter_expression(result)?,
2415                    ))
2416                })
2417                .collect::<Result<Vec<_>>>()?;
2418            let filter_else = else_clause
2419                .as_ref()
2420                .map(|e| convert_filter_expression(e))
2421                .transpose()?
2422                .map(Box::new);
2423            Ok(FilterExpression::Case {
2424                operand: filter_operand,
2425                when_clauses: filter_when_clauses,
2426                else_clause: filter_else,
2427            })
2428        }
2429        LogicalExpression::List(items) => {
2430            let filter_items: Vec<FilterExpression> = items
2431                .iter()
2432                .map(|item| convert_filter_expression(item))
2433                .collect::<Result<Vec<_>>>()?;
2434            Ok(FilterExpression::List(filter_items))
2435        }
2436        LogicalExpression::Map(pairs) => {
2437            let filter_pairs: Vec<(String, FilterExpression)> = pairs
2438                .iter()
2439                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2440                .collect::<Result<Vec<_>>>()?;
2441            Ok(FilterExpression::Map(filter_pairs))
2442        }
2443        LogicalExpression::IndexAccess { base, index } => {
2444            let base_expr = convert_filter_expression(base)?;
2445            let index_expr = convert_filter_expression(index)?;
2446            Ok(FilterExpression::IndexAccess {
2447                base: Box::new(base_expr),
2448                index: Box::new(index_expr),
2449            })
2450        }
2451        LogicalExpression::SliceAccess { base, start, end } => {
2452            let base_expr = convert_filter_expression(base)?;
2453            let start_expr = start
2454                .as_ref()
2455                .map(|s| convert_filter_expression(s))
2456                .transpose()?
2457                .map(Box::new);
2458            let end_expr = end
2459                .as_ref()
2460                .map(|e| convert_filter_expression(e))
2461                .transpose()?
2462                .map(Box::new);
2463            Ok(FilterExpression::SliceAccess {
2464                base: Box::new(base_expr),
2465                start: start_expr,
2466                end: end_expr,
2467            })
2468        }
2469        LogicalExpression::Parameter(_) => Err(Error::Internal(
2470            "Parameters not yet supported in filters".to_string(),
2471        )),
2472        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2473        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2474        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2475        LogicalExpression::ListComprehension {
2476            variable,
2477            list_expr,
2478            filter_expr,
2479            map_expr,
2480        } => {
2481            let list = convert_filter_expression(list_expr)?;
2482            let filter = filter_expr
2483                .as_ref()
2484                .map(|f| convert_filter_expression(f))
2485                .transpose()?
2486                .map(Box::new);
2487            let map = convert_filter_expression(map_expr)?;
2488            Ok(FilterExpression::ListComprehension {
2489                variable: variable.clone(),
2490                list_expr: Box::new(list),
2491                filter_expr: filter,
2492                map_expr: Box::new(map),
2493            })
2494        }
2495        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2496            Error::Internal("Subqueries not yet supported in filters".to_string()),
2497        ),
2498    }
2499}
2500
2501/// Infers the logical type from a value.
2502fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2503    use grafeo_common::types::Value;
2504    match value {
2505        Value::Null => LogicalType::String, // Default type for null
2506        Value::Bool(_) => LogicalType::Bool,
2507        Value::Int64(_) => LogicalType::Int64,
2508        Value::Float64(_) => LogicalType::Float64,
2509        Value::String(_) => LogicalType::String,
2510        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
2511        Value::Timestamp(_) => LogicalType::Timestamp,
2512        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
2513        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
2514    }
2515}
2516
2517/// Converts an expression to a string for column naming.
2518fn expression_to_string(expr: &LogicalExpression) -> String {
2519    match expr {
2520        LogicalExpression::Variable(name) => name.clone(),
2521        LogicalExpression::Property { variable, property } => {
2522            format!("{variable}.{property}")
2523        }
2524        LogicalExpression::Literal(value) => format!("{value:?}"),
2525        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2526        _ => "expr".to_string(),
2527    }
2528}
2529
2530/// A physical plan ready for execution.
2531pub struct PhysicalPlan {
2532    /// The root physical operator.
2533    pub operator: Box<dyn Operator>,
2534    /// Column names for the result.
2535    pub columns: Vec<String>,
2536    /// Adaptive execution context with cardinality estimates.
2537    ///
2538    /// When adaptive execution is enabled, this context contains estimated
2539    /// cardinalities at various checkpoints in the plan. During execution,
2540    /// actual row counts are recorded and compared against estimates.
2541    pub adaptive_context: Option<AdaptiveContext>,
2542}
2543
2544impl PhysicalPlan {
2545    /// Returns the column names.
2546    #[must_use]
2547    pub fn columns(&self) -> &[String] {
2548        &self.columns
2549    }
2550
2551    /// Consumes the plan and returns the operator.
2552    pub fn into_operator(self) -> Box<dyn Operator> {
2553        self.operator
2554    }
2555
2556    /// Returns the adaptive context, if adaptive execution is enabled.
2557    #[must_use]
2558    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2559        self.adaptive_context.as_ref()
2560    }
2561
2562    /// Takes ownership of the adaptive context.
2563    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2564        self.adaptive_context.take()
2565    }
2566}
2567
2568/// Helper operator that returns a single result chunk once.
2569///
2570/// Used by the factorized expand chain to wrap the final result.
2571#[allow(dead_code)]
2572struct SingleResultOperator {
2573    result: Option<grafeo_core::execution::DataChunk>,
2574}
2575
2576impl SingleResultOperator {
2577    #[allow(dead_code)]
2578    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2579        Self { result }
2580    }
2581}
2582
2583impl Operator for SingleResultOperator {
2584    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2585        Ok(self.result.take())
2586    }
2587
2588    fn reset(&mut self) {
2589        // Cannot reset - result is consumed
2590    }
2591
2592    fn name(&self) -> &'static str {
2593        "SingleResult"
2594    }
2595}
2596
2597#[cfg(test)]
2598mod tests {
2599    use super::*;
2600    use crate::query::plan::{
2601        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2602        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2603        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2604        SortKey, SortOp,
2605    };
2606    use grafeo_common::types::Value;
2607
2608    fn create_test_store() -> Arc<LpgStore> {
2609        let store = Arc::new(LpgStore::new());
2610        store.create_node(&["Person"]);
2611        store.create_node(&["Person"]);
2612        store.create_node(&["Company"]);
2613        store
2614    }
2615
2616    // ==================== Simple Scan Tests ====================
2617
2618    #[test]
2619    fn test_plan_simple_scan() {
2620        let store = create_test_store();
2621        let planner = Planner::new(store);
2622
2623        // MATCH (n:Person) RETURN n
2624        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2625            items: vec![ReturnItem {
2626                expression: LogicalExpression::Variable("n".to_string()),
2627                alias: None,
2628            }],
2629            distinct: false,
2630            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2631                variable: "n".to_string(),
2632                label: Some("Person".to_string()),
2633                input: None,
2634            })),
2635        }));
2636
2637        let physical = planner.plan(&logical).unwrap();
2638        assert_eq!(physical.columns(), &["n"]);
2639    }
2640
2641    #[test]
2642    fn test_plan_scan_without_label() {
2643        let store = create_test_store();
2644        let planner = Planner::new(store);
2645
2646        // MATCH (n) RETURN n
2647        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2648            items: vec![ReturnItem {
2649                expression: LogicalExpression::Variable("n".to_string()),
2650                alias: None,
2651            }],
2652            distinct: false,
2653            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2654                variable: "n".to_string(),
2655                label: None,
2656                input: None,
2657            })),
2658        }));
2659
2660        let physical = planner.plan(&logical).unwrap();
2661        assert_eq!(physical.columns(), &["n"]);
2662    }
2663
2664    #[test]
2665    fn test_plan_return_with_alias() {
2666        let store = create_test_store();
2667        let planner = Planner::new(store);
2668
2669        // MATCH (n:Person) RETURN n AS person
2670        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2671            items: vec![ReturnItem {
2672                expression: LogicalExpression::Variable("n".to_string()),
2673                alias: Some("person".to_string()),
2674            }],
2675            distinct: false,
2676            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2677                variable: "n".to_string(),
2678                label: Some("Person".to_string()),
2679                input: None,
2680            })),
2681        }));
2682
2683        let physical = planner.plan(&logical).unwrap();
2684        assert_eq!(physical.columns(), &["person"]);
2685    }
2686
2687    #[test]
2688    fn test_plan_return_property() {
2689        let store = create_test_store();
2690        let planner = Planner::new(store);
2691
2692        // MATCH (n:Person) RETURN n.name
2693        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2694            items: vec![ReturnItem {
2695                expression: LogicalExpression::Property {
2696                    variable: "n".to_string(),
2697                    property: "name".to_string(),
2698                },
2699                alias: None,
2700            }],
2701            distinct: false,
2702            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2703                variable: "n".to_string(),
2704                label: Some("Person".to_string()),
2705                input: None,
2706            })),
2707        }));
2708
2709        let physical = planner.plan(&logical).unwrap();
2710        assert_eq!(physical.columns(), &["n.name"]);
2711    }
2712
2713    #[test]
2714    fn test_plan_return_literal() {
2715        let store = create_test_store();
2716        let planner = Planner::new(store);
2717
2718        // MATCH (n) RETURN 42 AS answer
2719        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2720            items: vec![ReturnItem {
2721                expression: LogicalExpression::Literal(Value::Int64(42)),
2722                alias: Some("answer".to_string()),
2723            }],
2724            distinct: false,
2725            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2726                variable: "n".to_string(),
2727                label: None,
2728                input: None,
2729            })),
2730        }));
2731
2732        let physical = planner.plan(&logical).unwrap();
2733        assert_eq!(physical.columns(), &["answer"]);
2734    }
2735
2736    // ==================== Filter Tests ====================
2737
2738    #[test]
2739    fn test_plan_filter_equality() {
2740        let store = create_test_store();
2741        let planner = Planner::new(store);
2742
2743        // MATCH (n:Person) WHERE n.age = 30 RETURN n
2744        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2745            items: vec![ReturnItem {
2746                expression: LogicalExpression::Variable("n".to_string()),
2747                alias: None,
2748            }],
2749            distinct: false,
2750            input: Box::new(LogicalOperator::Filter(FilterOp {
2751                predicate: LogicalExpression::Binary {
2752                    left: Box::new(LogicalExpression::Property {
2753                        variable: "n".to_string(),
2754                        property: "age".to_string(),
2755                    }),
2756                    op: BinaryOp::Eq,
2757                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2758                },
2759                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2760                    variable: "n".to_string(),
2761                    label: Some("Person".to_string()),
2762                    input: None,
2763                })),
2764            })),
2765        }));
2766
2767        let physical = planner.plan(&logical).unwrap();
2768        assert_eq!(physical.columns(), &["n"]);
2769    }
2770
2771    #[test]
2772    fn test_plan_filter_compound_and() {
2773        let store = create_test_store();
2774        let planner = Planner::new(store);
2775
2776        // WHERE n.age > 20 AND n.age < 40
2777        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2778            items: vec![ReturnItem {
2779                expression: LogicalExpression::Variable("n".to_string()),
2780                alias: None,
2781            }],
2782            distinct: false,
2783            input: Box::new(LogicalOperator::Filter(FilterOp {
2784                predicate: LogicalExpression::Binary {
2785                    left: Box::new(LogicalExpression::Binary {
2786                        left: Box::new(LogicalExpression::Property {
2787                            variable: "n".to_string(),
2788                            property: "age".to_string(),
2789                        }),
2790                        op: BinaryOp::Gt,
2791                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2792                    }),
2793                    op: BinaryOp::And,
2794                    right: Box::new(LogicalExpression::Binary {
2795                        left: Box::new(LogicalExpression::Property {
2796                            variable: "n".to_string(),
2797                            property: "age".to_string(),
2798                        }),
2799                        op: BinaryOp::Lt,
2800                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2801                    }),
2802                },
2803                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2804                    variable: "n".to_string(),
2805                    label: None,
2806                    input: None,
2807                })),
2808            })),
2809        }));
2810
2811        let physical = planner.plan(&logical).unwrap();
2812        assert_eq!(physical.columns(), &["n"]);
2813    }
2814
2815    #[test]
2816    fn test_plan_filter_unary_not() {
2817        let store = create_test_store();
2818        let planner = Planner::new(store);
2819
2820        // WHERE NOT n.active
2821        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2822            items: vec![ReturnItem {
2823                expression: LogicalExpression::Variable("n".to_string()),
2824                alias: None,
2825            }],
2826            distinct: false,
2827            input: Box::new(LogicalOperator::Filter(FilterOp {
2828                predicate: LogicalExpression::Unary {
2829                    op: UnaryOp::Not,
2830                    operand: Box::new(LogicalExpression::Property {
2831                        variable: "n".to_string(),
2832                        property: "active".to_string(),
2833                    }),
2834                },
2835                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2836                    variable: "n".to_string(),
2837                    label: None,
2838                    input: None,
2839                })),
2840            })),
2841        }));
2842
2843        let physical = planner.plan(&logical).unwrap();
2844        assert_eq!(physical.columns(), &["n"]);
2845    }
2846
2847    #[test]
2848    fn test_plan_filter_is_null() {
2849        let store = create_test_store();
2850        let planner = Planner::new(store);
2851
2852        // WHERE n.email IS NULL
2853        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2854            items: vec![ReturnItem {
2855                expression: LogicalExpression::Variable("n".to_string()),
2856                alias: None,
2857            }],
2858            distinct: false,
2859            input: Box::new(LogicalOperator::Filter(FilterOp {
2860                predicate: LogicalExpression::Unary {
2861                    op: UnaryOp::IsNull,
2862                    operand: Box::new(LogicalExpression::Property {
2863                        variable: "n".to_string(),
2864                        property: "email".to_string(),
2865                    }),
2866                },
2867                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2868                    variable: "n".to_string(),
2869                    label: None,
2870                    input: None,
2871                })),
2872            })),
2873        }));
2874
2875        let physical = planner.plan(&logical).unwrap();
2876        assert_eq!(physical.columns(), &["n"]);
2877    }
2878
/// Plans `RETURN n` over a node scan filtered by `size(n.friends) > 0` and
/// checks that the physical plan exposes exactly the column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Leaf: unlabeled scan binding `n`.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Predicate: size(n.friends) > 0
    let friends_size = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let predicate = LogicalExpression::Binary {
        left: Box::new(friends_size),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let filtered = LogicalOperator::Filter(FilterOp {
        predicate,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
2915
2916    // ==================== Expand Tests ====================
2917
/// Plans `MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b` as a single-hop outgoing
/// expand and checks that both `a` and `b` appear among the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Source side of the pattern: (a:Person)
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });

    // -[:KNOWS]->(b), exactly one hop, edge left unnamed.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(name)),
            alias: None,
        })
        .collect();
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&plan).unwrap();
    let columns = planned.columns();
    // Only membership is checked, not column order.
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
2958
/// Plans `MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b` and checks that the named
/// edge `r` is exposed as a column alongside both endpoint variables.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });

    // Same single-hop outgoing expand as the unnamed case, but binding the edge to `r`.
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_type: Some(String::from("KNOWS")),
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
    });

    let items: Vec<ReturnItem> = ["a", "r", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(name)),
            alias: None,
        })
        .collect();
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let planned = planner.plan(&plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("r")));
    assert!(columns.contains(&String::from("b")));
}
3003
3004    // ==================== Limit/Skip/Sort Tests ====================
3005
/// Plans `MATCH (n) RETURN n LIMIT 10` and checks that wrapping the scan in a
/// limit of 10 leaves `n` as the only output column.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
3031
/// Plans `MATCH (n) RETURN n SKIP 5` and checks that wrapping the scan in a
/// skip of 5 leaves `n` as the only output column.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5,
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
3057
/// Plans `MATCH (n) RETURN n` with an ascending sort and checks that `n`
/// remains the only output column.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // The sort key is the variable `n` itself (not a property of it), ascending.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
3086
/// Plans `RETURN n ORDER BY n DESC` over an unlabeled scan and checks that `n`
/// remains the only output column.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Single sort key on the variable `n`, descending.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(scan),
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
3115
/// Plans `MATCH (n) RETURN DISTINCT n`, expressed as an explicit `Distinct`
/// operator beneath the return, and checks that `n` is the only output column.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // Deduplication is carried by this operator (no explicit column list);
    // the return's own `distinct` flag stays false.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);
}
3141
3142    // ==================== Aggregate Tests ====================
3143
/// Plans `MATCH (n) RETURN count(n)` with the aggregate aliased as `cnt` and
/// checks that `cnt` appears among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // count(n) AS cnt, with no grouping keys and no HAVING clause.
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });

    // The return refers to the aggregate through its alias.
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("cnt")));
}
3177
/// Plans `MATCH (n:Person) RETURN n.city, count(n)` grouped by `n.city` and
/// checks that the physical plan has two output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Grouping key: n.city
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    // Aggregate: count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
    };

    // The aggregate is the plan root here; there is no enclosing return.
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
3207
/// Plans an ungrouped `SUM(n.value)` aliased as `total` and checks that
/// `total` appears among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // SUM(n.value) AS total
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
    };

    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![sum_value],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("total")));
}
3237
/// Plans an ungrouped `AVG(n.score)` aliased as `average` and checks that
/// `average` appears among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });

    // AVG(n.score) AS average
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
    };

    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![avg_score],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("average")));
}
3267
/// Plans ungrouped `MIN(n.age)` and `MAX(n.age)` in one aggregate operator,
/// aliased `youngest` and `oldest`, and checks that both aliases appear among
/// the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Both aggregates read n.age and differ only in function and alias.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        distinct: false,
        alias: Some(String::from(alias)),
        percentile: None,
    };

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let plan = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: Vec::new(),
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    let planned = planner.plan(&plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("youngest")));
    assert!(columns.contains(&String::from("oldest")));
}
3310
3311    // ==================== Join Tests ====================
3312
/// Plans a return over an inner join of a `Person` scan (`a`) and a `Company`
/// scan (`b`) with one join condition pairing `a` with `b`, and checks that
/// both variables appear among the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });

    // Single condition: left key `a`, right key `b`.
    let on_a_b = JoinCondition {
        left: LogicalExpression::Variable(String::from("a")),
        right: LogicalExpression::Variable(String::from("b")),
    };
    let join = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![on_a_b],
    });

    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|&name| ReturnItem {
            expression: LogicalExpression::Variable(String::from(name)),
            alias: None,
        })
        .collect();
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(join),
    }));

    let planned = planner.plan(&plan).unwrap();
    let columns = planned.columns();
    assert!(columns.contains(&String::from("a")));
    assert!(columns.contains(&String::from("b")));
}
3354
/// Plans a cross join (no conditions) of two unlabeled scans and checks that
/// the physical plan has two output columns.
#[test]
fn test_plan_cross_join() {
    let planner = Planner::new(create_test_store());

    // Boxed unlabeled scan binding the given variable.
    let scan = |variable: &str| {
        Box::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        }))
    };

    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: scan("a"),
        right: scan("b"),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
3379
/// Plans a left join (no conditions) of two unlabeled scans and checks that
/// the physical plan has two output columns.
#[test]
fn test_plan_left_join() {
    let planner = Planner::new(create_test_store());

    // Boxed unlabeled scan binding the given variable.
    let scan = |variable: &str| {
        Box::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        }))
    };

    let plan = LogicalPlan::new(LogicalOperator::Join(JoinOp {
        left: scan("a"),
        right: scan("b"),
        join_type: JoinType::Left,
        conditions: Vec::new(),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns().len(), 2);
}
3403
3404    // ==================== Mutation Tests ====================
3405
/// Plans `CREATE (n:Person {name: 'Alice'})` with no input operator and checks
/// that the created node's variable `n` appears among the output columns.
#[test]
fn test_plan_create_node() {
    let planner = Planner::new(create_test_store());

    // Property list: name = 'Alice'
    let name_property = (
        String::from("name"),
        LogicalExpression::Literal(Value::String("Alice".into())),
    );
    let create = CreateNodeOp {
        variable: String::from("n"),
        labels: vec![String::from("Person")],
        properties: vec![name_property],
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::CreateNode(create));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("n")));
}
3425
/// Plans `MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)` with the new edge bound to
/// `r`, and checks that `r` appears among the output columns.
#[test]
fn test_plan_create_edge() {
    let planner = Planner::new(create_test_store());

    // Boxed unlabeled scan binding the given variable.
    let scan = |variable: &str| {
        Box::new(LogicalOperator::NodeScan(NodeScanOp {
            variable: String::from(variable),
            label: None,
            input: None,
        }))
    };

    // MATCH (a), (b): every pairing of the two scans feeds the edge creation.
    let endpoints = LogicalOperator::Join(JoinOp {
        left: scan("a"),
        right: scan("b"),
        join_type: JoinType::Cross,
        conditions: Vec::new(),
    });

    let plan = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
        variable: Some(String::from("r")),
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_type: String::from("KNOWS"),
        properties: Vec::new(),
        input: Box::new(endpoints),
    }));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("r")));
}
3457
/// Plans `MATCH (n) DELETE n` (non-detaching) and checks that the physical
/// plan exposes a `deleted_count` column.
#[test]
fn test_plan_delete_node() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let delete = DeleteNodeOp {
        variable: String::from("n"),
        detach: false,
        input: Box::new(scan),
    };
    let plan = LogicalPlan::new(LogicalOperator::DeleteNode(delete));

    let planned = planner.plan(&plan).unwrap();
    assert!(planned.columns().contains(&String::from("deleted_count")));
}
3477
3478    // ==================== Error Cases ====================
3479
/// Planning a logical plan whose root is `LogicalOperator::Empty` must return
/// an error.
#[test]
fn test_plan_empty_errors() {
    let planner = Planner::new(create_test_store());
    let empty_plan = LogicalPlan::new(LogicalOperator::Empty);

    assert!(planner.plan(&empty_plan).is_err());
}
3489
/// Planning a return of a variable that its input never binds must return an
/// error: the scan binds `n`, while the return asks for `missing`.
#[test]
fn test_plan_missing_variable_in_return() {
    let planner = Planner::new(create_test_store());

    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let unbound_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("missing")),
        alias: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![unbound_item],
        distinct: false,
        input: Box::new(scan),
    }));

    assert!(planner.plan(&plan).is_err());
}
3512
3513    // ==================== Helper Function Tests ====================
3514
/// Checks that `convert_binary_op` succeeds for each comparison, logical and
/// arithmetic operator listed below.
#[test]
fn test_convert_binary_ops() {
    let converts = |op: BinaryOp| convert_binary_op(op).is_ok();

    // Comparison operators.
    assert!(converts(BinaryOp::Eq));
    assert!(converts(BinaryOp::Ne));
    assert!(converts(BinaryOp::Lt));
    assert!(converts(BinaryOp::Le));
    assert!(converts(BinaryOp::Gt));
    assert!(converts(BinaryOp::Ge));

    // Logical operators.
    assert!(converts(BinaryOp::And));
    assert!(converts(BinaryOp::Or));

    // Arithmetic operators.
    assert!(converts(BinaryOp::Add));
    assert!(converts(BinaryOp::Sub));
    assert!(converts(BinaryOp::Mul));
    assert!(converts(BinaryOp::Div));
}
3530
/// Checks that `convert_unary_op` succeeds for `Not`, `IsNull`, `IsNotNull`
/// and `Neg`.
#[test]
fn test_convert_unary_ops() {
    assert!(matches!(convert_unary_op(UnaryOp::Not), Ok(_)));
    assert!(matches!(convert_unary_op(UnaryOp::IsNull), Ok(_)));
    assert!(matches!(convert_unary_op(UnaryOp::IsNotNull), Ok(_)));
    assert!(matches!(convert_unary_op(UnaryOp::Neg), Ok(_)));
}
3538
/// Checks that `convert_aggregate_function` maps `Count`, `Sum`, `Avg`, `Min`
/// and `Max` to the physical variant of the same name.
#[test]
fn test_convert_aggregate_functions() {
    let count = convert_aggregate_function(LogicalAggregateFunction::Count);
    assert!(matches!(count, PhysicalAggregateFunction::Count));

    let sum = convert_aggregate_function(LogicalAggregateFunction::Sum);
    assert!(matches!(sum, PhysicalAggregateFunction::Sum));

    let avg = convert_aggregate_function(LogicalAggregateFunction::Avg);
    assert!(matches!(avg, PhysicalAggregateFunction::Avg));

    let min = convert_aggregate_function(LogicalAggregateFunction::Min);
    assert!(matches!(min, PhysicalAggregateFunction::Min));

    let max = convert_aggregate_function(LogicalAggregateFunction::Max);
    assert!(matches!(max, PhysicalAggregateFunction::Max));
}
3562
/// Checks the accessors of a planner built with `Planner::new`: it reports no
/// transaction id and no transaction manager, and `viewing_epoch` is callable.
#[test]
fn test_planner_accessors() {
    let store = create_test_store();
    // The planner gets its own handle; `store` stays alive for the whole test.
    let planner = Planner::new(Arc::clone(&store));

    let tx_id = planner.tx_id();
    assert!(tx_id.is_none());

    let tx_manager = planner.tx_manager();
    assert!(tx_manager.is_none());

    // Only accessibility is exercised; the returned epoch is discarded.
    let _ = planner.viewing_epoch();
}
3572
/// Plans a bare unlabeled node scan and exercises the physical plan's
/// accessors: `columns` must be exactly `n`, and `into_operator` must be
/// callable.
#[test]
fn test_physical_plan_accessors() {
    let planner = Planner::new(create_test_store());

    let scan = NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    };
    let plan = LogicalPlan::new(LogicalOperator::NodeScan(scan));

    let planned = planner.plan(&plan).unwrap();
    assert_eq!(planned.columns(), &["n"]);

    // Only checks that `into_operator` can be called; its result is discarded.
    let _ = planned.into_operator();
}
3590}