Skip to main content

grafeo_engine/query/
planner.rs

//! Converts logical plans into physical execution trees.
//!
//! The optimizer produces a logical plan (what data you want); the planner
//! converts it into a physical plan (how to actually get it). This means
//! choosing hash joins vs. nested-loop joins, index scans vs. full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22    ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
23    JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NestedLoopJoinOperator, NullOrder,
24    Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator,
25    SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
26    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
27    UnwindOperator, VariableLengthExpandOperator,
28};
29use grafeo_core::graph::{Direction, lpg::LpgStore};
30use std::collections::HashMap;
31use std::sync::Arc;
32
33use crate::transaction::TransactionManager;
34
35/// Converts a logical plan to a physical operator tree.
36pub struct Planner {
37    /// The graph store to scan from.
38    store: Arc<LpgStore>,
39    /// Transaction manager for MVCC operations.
40    tx_manager: Option<Arc<TransactionManager>>,
41    /// Current transaction ID (if in a transaction).
42    tx_id: Option<TxId>,
43    /// Epoch to use for visibility checks.
44    viewing_epoch: EpochId,
45    /// Counter for generating unique anonymous edge column names.
46    anon_edge_counter: std::cell::Cell<u32>,
47}
48
49impl Planner {
50    /// Creates a new planner with the given store.
51    ///
52    /// This creates a planner without transaction context, using the current
53    /// epoch from the store for visibility.
54    #[must_use]
55    pub fn new(store: Arc<LpgStore>) -> Self {
56        let epoch = store.current_epoch();
57        Self {
58            store,
59            tx_manager: None,
60            tx_id: None,
61            viewing_epoch: epoch,
62            anon_edge_counter: std::cell::Cell::new(0),
63        }
64    }
65
66    /// Creates a new planner with transaction context for MVCC-aware planning.
67    ///
68    /// # Arguments
69    ///
70    /// * `store` - The graph store
71    /// * `tx_manager` - Transaction manager for recording reads/writes
72    /// * `tx_id` - Current transaction ID (None for auto-commit)
73    /// * `viewing_epoch` - Epoch to use for version visibility
74    #[must_use]
75    pub fn with_context(
76        store: Arc<LpgStore>,
77        tx_manager: Arc<TransactionManager>,
78        tx_id: Option<TxId>,
79        viewing_epoch: EpochId,
80    ) -> Self {
81        Self {
82            store,
83            tx_manager: Some(tx_manager),
84            tx_id,
85            viewing_epoch,
86            anon_edge_counter: std::cell::Cell::new(0),
87        }
88    }
89
90    /// Returns the viewing epoch for this planner.
91    #[must_use]
92    pub fn viewing_epoch(&self) -> EpochId {
93        self.viewing_epoch
94    }
95
96    /// Returns the transaction ID for this planner, if any.
97    #[must_use]
98    pub fn tx_id(&self) -> Option<TxId> {
99        self.tx_id
100    }
101
102    /// Returns a reference to the transaction manager, if available.
103    #[must_use]
104    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
105        self.tx_manager.as_ref()
106    }
107
108    /// Plans a logical plan into a physical operator.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if planning fails.
113    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
114        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
115        Ok(PhysicalPlan {
116            operator,
117            columns,
118            adaptive_context: None,
119        })
120    }
121
122    /// Plans a logical plan with adaptive execution support.
123    ///
124    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
125    /// joins) that can be monitored during execution to detect estimate errors.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if planning fails.
130    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
131        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
132
133        // Build adaptive context with cardinality estimates
134        let mut adaptive_context = AdaptiveContext::new();
135        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
136
137        Ok(PhysicalPlan {
138            operator,
139            columns,
140            adaptive_context: Some(adaptive_context),
141        })
142    }
143
144    /// Collects cardinality estimates from the logical plan into an adaptive context.
145    fn collect_cardinality_estimates(
146        &self,
147        op: &LogicalOperator,
148        ctx: &mut AdaptiveContext,
149        depth: usize,
150    ) {
151        match op {
152            LogicalOperator::NodeScan(scan) => {
153                // Estimate based on label statistics
154                let estimate = if let Some(label) = &scan.label {
155                    self.store.nodes_by_label(label).len() as f64
156                } else {
157                    self.store.node_count() as f64
158                };
159                let id = format!("scan_{}", scan.variable);
160                ctx.set_estimate(&id, estimate);
161
162                // Recurse into input if present
163                if let Some(input) = &scan.input {
164                    self.collect_cardinality_estimates(input, ctx, depth + 1);
165                }
166            }
167            LogicalOperator::Filter(filter) => {
168                // Default selectivity estimate for filters (30%)
169                let input_estimate = self.estimate_cardinality(&filter.input);
170                let estimate = input_estimate * 0.3;
171                let id = format!("filter_{depth}");
172                ctx.set_estimate(&id, estimate);
173
174                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
175            }
176            LogicalOperator::Expand(expand) => {
177                // Estimate based on average degree
178                let input_estimate = self.estimate_cardinality(&expand.input);
179                let avg_degree = 10.0; // Default estimate
180                let estimate = input_estimate * avg_degree;
181                let id = format!("expand_{}", expand.to_variable);
182                ctx.set_estimate(&id, estimate);
183
184                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
185            }
186            LogicalOperator::Join(join) => {
187                // Estimate join output (product with selectivity)
188                let left_est = self.estimate_cardinality(&join.left);
189                let right_est = self.estimate_cardinality(&join.right);
190                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
191                let id = format!("join_{depth}");
192                ctx.set_estimate(&id, estimate);
193
194                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
195                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
196            }
197            LogicalOperator::Aggregate(agg) => {
198                // Aggregates typically reduce cardinality
199                let input_estimate = self.estimate_cardinality(&agg.input);
200                let estimate = if agg.group_by.is_empty() {
201                    1.0 // Scalar aggregate
202                } else {
203                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
204                };
205                let id = format!("aggregate_{depth}");
206                ctx.set_estimate(&id, estimate);
207
208                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
209            }
210            LogicalOperator::Distinct(distinct) => {
211                let input_estimate = self.estimate_cardinality(&distinct.input);
212                let estimate = (input_estimate * 0.5).max(1.0);
213                let id = format!("distinct_{depth}");
214                ctx.set_estimate(&id, estimate);
215
216                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
217            }
218            LogicalOperator::Return(ret) => {
219                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
220            }
221            LogicalOperator::Limit(limit) => {
222                let input_estimate = self.estimate_cardinality(&limit.input);
223                let estimate = (input_estimate).min(limit.count as f64);
224                let id = format!("limit_{depth}");
225                ctx.set_estimate(&id, estimate);
226
227                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
228            }
229            LogicalOperator::Skip(skip) => {
230                let input_estimate = self.estimate_cardinality(&skip.input);
231                let estimate = (input_estimate - skip.count as f64).max(0.0);
232                let id = format!("skip_{depth}");
233                ctx.set_estimate(&id, estimate);
234
235                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
236            }
237            LogicalOperator::Sort(sort) => {
238                // Sort doesn't change cardinality
239                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
240            }
241            LogicalOperator::Union(union) => {
242                let estimate: f64 = union
243                    .inputs
244                    .iter()
245                    .map(|input| self.estimate_cardinality(input))
246                    .sum();
247                let id = format!("union_{depth}");
248                ctx.set_estimate(&id, estimate);
249
250                for input in &union.inputs {
251                    self.collect_cardinality_estimates(input, ctx, depth + 1);
252                }
253            }
254            _ => {
255                // For other operators, try to recurse into known input patterns
256            }
257        }
258    }
259
260    /// Estimates cardinality for a logical operator subtree.
261    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
262        match op {
263            LogicalOperator::NodeScan(scan) => {
264                if let Some(label) = &scan.label {
265                    self.store.nodes_by_label(label).len() as f64
266                } else {
267                    self.store.node_count() as f64
268                }
269            }
270            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
271            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
272            LogicalOperator::Join(join) => {
273                let left = self.estimate_cardinality(&join.left);
274                let right = self.estimate_cardinality(&join.right);
275                (left * right).sqrt()
276            }
277            LogicalOperator::Aggregate(agg) => {
278                if agg.group_by.is_empty() {
279                    1.0
280                } else {
281                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
282                }
283            }
284            LogicalOperator::Distinct(distinct) => {
285                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
286            }
287            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
288            LogicalOperator::Limit(limit) => self
289                .estimate_cardinality(&limit.input)
290                .min(limit.count as f64),
291            LogicalOperator::Skip(skip) => {
292                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
293            }
294            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
295            LogicalOperator::Union(union) => union
296                .inputs
297                .iter()
298                .map(|input| self.estimate_cardinality(input))
299                .sum(),
300            _ => 1000.0, // Default estimate for unknown operators
301        }
302    }
303
304    /// Plans a single logical operator.
305    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
306        match op {
307            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
308            LogicalOperator::Expand(expand) => self.plan_expand(expand),
309            LogicalOperator::Return(ret) => self.plan_return(ret),
310            LogicalOperator::Filter(filter) => self.plan_filter(filter),
311            LogicalOperator::Project(project) => self.plan_project(project),
312            LogicalOperator::Limit(limit) => self.plan_limit(limit),
313            LogicalOperator::Skip(skip) => self.plan_skip(skip),
314            LogicalOperator::Sort(sort) => self.plan_sort(sort),
315            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
316            LogicalOperator::Join(join) => self.plan_join(join),
317            LogicalOperator::Union(union) => self.plan_union(union),
318            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
319            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
320            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
321            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
322            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
323            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
324            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
325            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
326            LogicalOperator::Merge(merge) => self.plan_merge(merge),
327            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
328            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
329            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
330            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
331            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
332            _ => Err(Error::Internal(format!(
333                "Unsupported operator: {:?}",
334                std::mem::discriminant(op)
335            ))),
336        }
337    }
338
339    /// Plans a node scan operator.
340    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
341        let scan_op = if let Some(label) = &scan.label {
342            ScanOperator::with_label(Arc::clone(&self.store), label)
343        } else {
344            ScanOperator::new(Arc::clone(&self.store))
345        };
346
347        // Apply MVCC context if available
348        let scan_operator: Box<dyn Operator> =
349            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
350
351        // If there's an input, chain operators with a nested loop join (cross join)
352        if let Some(input) = &scan.input {
353            let (input_op, mut input_columns) = self.plan_operator(input)?;
354
355            // Build output schema: input columns + scan column
356            let mut output_schema: Vec<LogicalType> =
357                input_columns.iter().map(|_| LogicalType::Any).collect();
358            output_schema.push(LogicalType::Node);
359
360            // Add scan column to input columns
361            input_columns.push(scan.variable.clone());
362
363            // Use nested loop join to combine input rows with scanned nodes
364            let join_op = Box::new(NestedLoopJoinOperator::new(
365                input_op,
366                scan_operator,
367                None, // No join condition (cross join)
368                PhysicalJoinType::Cross,
369                output_schema,
370            ));
371
372            Ok((join_op, input_columns))
373        } else {
374            let columns = vec![scan.variable.clone()];
375            Ok((scan_operator, columns))
376        }
377    }
378
379    /// Plans an expand operator.
380    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
381        // Plan the input operator first
382        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
383
384        // Find the source column index
385        let source_column = input_columns
386            .iter()
387            .position(|c| c == &expand.from_variable)
388            .ok_or_else(|| {
389                Error::Internal(format!(
390                    "Source variable '{}' not found in input columns",
391                    expand.from_variable
392                ))
393            })?;
394
395        // Convert expand direction
396        let direction = match expand.direction {
397            ExpandDirection::Outgoing => Direction::Outgoing,
398            ExpandDirection::Incoming => Direction::Incoming,
399            ExpandDirection::Both => Direction::Both,
400        };
401
402        // Check if this is a variable-length path
403        let is_variable_length =
404            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
405
406        let operator: Box<dyn Operator> = if is_variable_length {
407            // Use VariableLengthExpandOperator for multi-hop paths
408            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
409            let mut expand_op = VariableLengthExpandOperator::new(
410                Arc::clone(&self.store),
411                input_op,
412                source_column,
413                direction,
414                expand.edge_type.clone(),
415                expand.min_hops,
416                max_hops,
417            )
418            .with_tx_context(self.viewing_epoch, self.tx_id);
419
420            // If a path alias is set, enable path length output
421            if expand.path_alias.is_some() {
422                expand_op = expand_op.with_path_length_output();
423            }
424
425            Box::new(expand_op)
426        } else {
427            // Use simple ExpandOperator for single-hop paths
428            let expand_op = ExpandOperator::new(
429                Arc::clone(&self.store),
430                input_op,
431                source_column,
432                direction,
433                expand.edge_type.clone(),
434            )
435            .with_tx_context(self.viewing_epoch, self.tx_id);
436            Box::new(expand_op)
437        };
438
439        // Build output columns: [input_columns..., edge, target, (path_length)?]
440        // Preserve all input columns and add edge + target to match ExpandOperator output
441        let mut columns = input_columns;
442
443        // Generate edge column name - use provided name or generate anonymous name
444        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
445            let count = self.anon_edge_counter.get();
446            self.anon_edge_counter.set(count + 1);
447            format!("_anon_edge_{}", count)
448        });
449        columns.push(edge_col_name);
450
451        columns.push(expand.to_variable.clone());
452
453        // If a path alias is set, add a column for the path length
454        if let Some(ref path_alias) = expand.path_alias {
455            columns.push(format!("_path_length_{}", path_alias));
456        }
457
458        Ok((operator, columns))
459    }
460
461    /// Plans a RETURN clause.
462    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
463        // Plan the input operator
464        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
465
466        // Build variable to column index mapping
467        let variable_columns: HashMap<String, usize> = input_columns
468            .iter()
469            .enumerate()
470            .map(|(i, name)| (name.clone(), i))
471            .collect();
472
473        // Extract column names from return items
474        let columns: Vec<String> = ret
475            .items
476            .iter()
477            .map(|item| {
478                item.alias.clone().unwrap_or_else(|| {
479                    // Generate a default name from the expression
480                    expression_to_string(&item.expression)
481                })
482            })
483            .collect();
484
485        // Check if we need a project operator (for property access or expression evaluation)
486        let needs_project = ret
487            .items
488            .iter()
489            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
490
491        if needs_project {
492            // Build project expressions
493            let mut projections = Vec::with_capacity(ret.items.len());
494            let mut output_types = Vec::with_capacity(ret.items.len());
495
496            for item in &ret.items {
497                match &item.expression {
498                    LogicalExpression::Variable(name) => {
499                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
500                            Error::Internal(format!("Variable '{}' not found in input", name))
501                        })?;
502                        projections.push(ProjectExpr::Column(col_idx));
503                        // Use Node type for variables (they could be nodes, edges, or values)
504                        output_types.push(LogicalType::Node);
505                    }
506                    LogicalExpression::Property { variable, property } => {
507                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
508                            Error::Internal(format!("Variable '{}' not found in input", variable))
509                        })?;
510                        projections.push(ProjectExpr::PropertyAccess {
511                            column: col_idx,
512                            property: property.clone(),
513                        });
514                        // Property could be any type - use Any/Generic to preserve type
515                        output_types.push(LogicalType::Any);
516                    }
517                    LogicalExpression::Literal(value) => {
518                        projections.push(ProjectExpr::Constant(value.clone()));
519                        output_types.push(value_to_logical_type(value));
520                    }
521                    LogicalExpression::FunctionCall { name, args, .. } => {
522                        // Handle built-in functions
523                        match name.to_lowercase().as_str() {
524                            "type" => {
525                                // type(r) returns the edge type string
526                                if args.len() != 1 {
527                                    return Err(Error::Internal(
528                                        "type() requires exactly one argument".to_string(),
529                                    ));
530                                }
531                                if let LogicalExpression::Variable(var_name) = &args[0] {
532                                    let col_idx =
533                                        *variable_columns.get(var_name).ok_or_else(|| {
534                                            Error::Internal(format!(
535                                                "Variable '{}' not found in input",
536                                                var_name
537                                            ))
538                                        })?;
539                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
540                                    output_types.push(LogicalType::String);
541                                } else {
542                                    return Err(Error::Internal(
543                                        "type() argument must be a variable".to_string(),
544                                    ));
545                                }
546                            }
547                            "length" => {
548                                // length(p) returns the path length
549                                // For shortestPath results, the path column already contains the length
550                                if args.len() != 1 {
551                                    return Err(Error::Internal(
552                                        "length() requires exactly one argument".to_string(),
553                                    ));
554                                }
555                                if let LogicalExpression::Variable(var_name) = &args[0] {
556                                    let col_idx =
557                                        *variable_columns.get(var_name).ok_or_else(|| {
558                                            Error::Internal(format!(
559                                                "Variable '{}' not found in input",
560                                                var_name
561                                            ))
562                                        })?;
563                                    // Pass through the column value directly
564                                    projections.push(ProjectExpr::Column(col_idx));
565                                    output_types.push(LogicalType::Int64);
566                                } else {
567                                    return Err(Error::Internal(
568                                        "length() argument must be a variable".to_string(),
569                                    ));
570                                }
571                            }
572                            // For other functions (head, tail, size, etc.), use expression evaluation
573                            _ => {
574                                let filter_expr = self.convert_expression(&item.expression)?;
575                                projections.push(ProjectExpr::Expression {
576                                    expr: filter_expr,
577                                    variable_columns: variable_columns.clone(),
578                                });
579                                output_types.push(LogicalType::Any);
580                            }
581                        }
582                    }
583                    LogicalExpression::Case { .. } => {
584                        // Convert CASE expression to FilterExpression for evaluation
585                        let filter_expr = self.convert_expression(&item.expression)?;
586                        projections.push(ProjectExpr::Expression {
587                            expr: filter_expr,
588                            variable_columns: variable_columns.clone(),
589                        });
590                        // CASE can return any type - use Any
591                        output_types.push(LogicalType::Any);
592                    }
593                    _ => {
594                        return Err(Error::Internal(format!(
595                            "Unsupported RETURN expression: {:?}",
596                            item.expression
597                        )));
598                    }
599                }
600            }
601
602            let operator = Box::new(ProjectOperator::with_store(
603                input_op,
604                projections,
605                output_types,
606                Arc::clone(&self.store),
607            ));
608
609            Ok((operator, columns))
610        } else {
611            // Simple case: just return variables
612            // Re-order columns to match return items if needed
613            let mut projections = Vec::with_capacity(ret.items.len());
614            let mut output_types = Vec::with_capacity(ret.items.len());
615
616            for item in &ret.items {
617                if let LogicalExpression::Variable(name) = &item.expression {
618                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
619                        Error::Internal(format!("Variable '{}' not found in input", name))
620                    })?;
621                    projections.push(ProjectExpr::Column(col_idx));
622                    output_types.push(LogicalType::Node);
623                }
624            }
625
626            // Only add ProjectOperator if reordering is needed
627            if projections.len() == input_columns.len()
628                && projections
629                    .iter()
630                    .enumerate()
631                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
632            {
633                // No reordering needed
634                Ok((input_op, columns))
635            } else {
636                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
637                Ok((operator, columns))
638            }
639        }
640    }
641
642    /// Plans a project operator (for WITH clause).
643    fn plan_project(
644        &self,
645        project: &crate::query::plan::ProjectOp,
646    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
647        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
648        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
649            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
650                // Create a single-row operator for projecting literals
651                let single_row_op: Box<dyn Operator> = Box::new(
652                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
653                );
654                (single_row_op, Vec::new())
655            } else {
656                self.plan_operator(&project.input)?
657            };
658
659        // Build variable to column index mapping
660        let variable_columns: HashMap<String, usize> = input_columns
661            .iter()
662            .enumerate()
663            .map(|(i, name)| (name.clone(), i))
664            .collect();
665
666        // Build projections and new column names
667        let mut projections = Vec::with_capacity(project.projections.len());
668        let mut output_types = Vec::with_capacity(project.projections.len());
669        let mut output_columns = Vec::with_capacity(project.projections.len());
670
671        for projection in &project.projections {
672            // Determine the output column name (alias or expression string)
673            let col_name = projection
674                .alias
675                .clone()
676                .unwrap_or_else(|| expression_to_string(&projection.expression));
677            output_columns.push(col_name);
678
679            match &projection.expression {
680                LogicalExpression::Variable(name) => {
681                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
682                        Error::Internal(format!("Variable '{}' not found in input", name))
683                    })?;
684                    projections.push(ProjectExpr::Column(col_idx));
685                    output_types.push(LogicalType::Node);
686                }
687                LogicalExpression::Property { variable, property } => {
688                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
689                        Error::Internal(format!("Variable '{}' not found in input", variable))
690                    })?;
691                    projections.push(ProjectExpr::PropertyAccess {
692                        column: col_idx,
693                        property: property.clone(),
694                    });
695                    output_types.push(LogicalType::Any);
696                }
697                LogicalExpression::Literal(value) => {
698                    projections.push(ProjectExpr::Constant(value.clone()));
699                    output_types.push(value_to_logical_type(value));
700                }
701                _ => {
702                    // For complex expressions, use full expression evaluation
703                    let filter_expr = self.convert_expression(&projection.expression)?;
704                    projections.push(ProjectExpr::Expression {
705                        expr: filter_expr,
706                        variable_columns: variable_columns.clone(),
707                    });
708                    output_types.push(LogicalType::Any);
709                }
710            }
711        }
712
713        let operator = Box::new(ProjectOperator::with_store(
714            input_op,
715            projections,
716            output_types,
717            Arc::clone(&self.store),
718        ));
719
720        Ok((operator, output_columns))
721    }
722
723    /// Plans a filter operator.
724    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
725        // Plan the input operator first
726        let (input_op, columns) = self.plan_operator(&filter.input)?;
727
728        // Build variable to column index mapping
729        let variable_columns: HashMap<String, usize> = columns
730            .iter()
731            .enumerate()
732            .map(|(i, name)| (name.clone(), i))
733            .collect();
734
735        // Convert logical expression to filter expression
736        let filter_expr = self.convert_expression(&filter.predicate)?;
737
738        // Create the predicate
739        let predicate =
740            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
741
742        // Create the filter operator
743        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
744
745        Ok((operator, columns))
746    }
747
748    /// Plans a LIMIT operator.
749    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
750        let (input_op, columns) = self.plan_operator(&limit.input)?;
751        let output_schema = self.derive_schema_from_columns(&columns);
752        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
753        Ok((operator, columns))
754    }
755
756    /// Plans a SKIP operator.
757    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
758        let (input_op, columns) = self.plan_operator(&skip.input)?;
759        let output_schema = self.derive_schema_from_columns(&columns);
760        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
761        Ok((operator, columns))
762    }
763
764    /// Plans a SORT (ORDER BY) operator.
765    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
766        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
767
768        // Build variable to column index mapping
769        let mut variable_columns: HashMap<String, usize> = input_columns
770            .iter()
771            .enumerate()
772            .map(|(i, name)| (name.clone(), i))
773            .collect();
774
775        // Collect property expressions that need to be projected before sorting
776        let mut property_projections: Vec<(String, String, String)> = Vec::new();
777        let mut next_col_idx = input_columns.len();
778
779        for key in &sort.keys {
780            if let LogicalExpression::Property { variable, property } = &key.expression {
781                let col_name = format!("{}_{}", variable, property);
782                if !variable_columns.contains_key(&col_name) {
783                    property_projections.push((
784                        variable.clone(),
785                        property.clone(),
786                        col_name.clone(),
787                    ));
788                    variable_columns.insert(col_name, next_col_idx);
789                    next_col_idx += 1;
790                }
791            }
792        }
793
794        // Track output columns
795        let mut output_columns = input_columns.clone();
796
797        // If we have property expressions, add a projection to materialize them
798        if !property_projections.is_empty() {
799            let mut projections = Vec::new();
800            let mut output_types = Vec::new();
801
802            // First, pass through all existing columns (use Node type to preserve node IDs
803            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
804            for (i, _) in input_columns.iter().enumerate() {
805                projections.push(ProjectExpr::Column(i));
806                output_types.push(LogicalType::Node);
807            }
808
809            // Then add property access projections
810            for (variable, property, col_name) in &property_projections {
811                let source_col = *variable_columns.get(variable).ok_or_else(|| {
812                    Error::Internal(format!(
813                        "Variable '{}' not found for ORDER BY property projection",
814                        variable
815                    ))
816                })?;
817                projections.push(ProjectExpr::PropertyAccess {
818                    column: source_col,
819                    property: property.clone(),
820                });
821                output_types.push(LogicalType::Any);
822                output_columns.push(col_name.clone());
823            }
824
825            input_op = Box::new(ProjectOperator::with_store(
826                input_op,
827                projections,
828                output_types,
829                Arc::clone(&self.store),
830            ));
831        }
832
833        // Convert logical sort keys to physical sort keys
834        let physical_keys: Vec<PhysicalSortKey> = sort
835            .keys
836            .iter()
837            .map(|key| {
838                let col_idx = self
839                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
840                Ok(PhysicalSortKey {
841                    column: col_idx,
842                    direction: match key.order {
843                        SortOrder::Ascending => SortDirection::Ascending,
844                        SortOrder::Descending => SortDirection::Descending,
845                    },
846                    null_order: NullOrder::NullsLast,
847                })
848            })
849            .collect::<Result<Vec<_>>>()?;
850
851        let output_schema = self.derive_schema_from_columns(&output_columns);
852        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
853        Ok((operator, output_columns))
854    }
855
856    /// Resolves a sort expression to a column index, using projected property columns.
857    fn resolve_sort_expression_with_properties(
858        &self,
859        expr: &LogicalExpression,
860        variable_columns: &HashMap<String, usize>,
861    ) -> Result<usize> {
862        match expr {
863            LogicalExpression::Variable(name) => {
864                variable_columns.get(name).copied().ok_or_else(|| {
865                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
866                })
867            }
868            LogicalExpression::Property { variable, property } => {
869                // Look up the projected property column (e.g., "p_age" for p.age)
870                let col_name = format!("{}_{}", variable, property);
871                variable_columns.get(&col_name).copied().ok_or_else(|| {
872                    Error::Internal(format!(
873                        "Property column '{}' not found for ORDER BY (from {}.{})",
874                        col_name, variable, property
875                    ))
876                })
877            }
878            _ => Err(Error::Internal(format!(
879                "Unsupported ORDER BY expression: {:?}",
880                expr
881            ))),
882        }
883    }
884
885    /// Derives a schema from column names (uses Any type to handle all value types).
886    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
887        columns.iter().map(|_| LogicalType::Any).collect()
888    }
889
890    /// Plans an AGGREGATE operator.
891    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
892        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
893
894        // Build variable to column index mapping
895        let mut variable_columns: HashMap<String, usize> = input_columns
896            .iter()
897            .enumerate()
898            .map(|(i, name)| (name.clone(), i))
899            .collect();
900
901        // Collect all property expressions that need to be projected before aggregation
902        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
903        let mut next_col_idx = input_columns.len();
904
905        // Check group-by expressions for properties
906        for expr in &agg.group_by {
907            if let LogicalExpression::Property { variable, property } = expr {
908                let col_name = format!("{}_{}", variable, property);
909                if !variable_columns.contains_key(&col_name) {
910                    property_projections.push((
911                        variable.clone(),
912                        property.clone(),
913                        col_name.clone(),
914                    ));
915                    variable_columns.insert(col_name, next_col_idx);
916                    next_col_idx += 1;
917                }
918            }
919        }
920
921        // Check aggregate expressions for properties
922        for agg_expr in &agg.aggregates {
923            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
924                let col_name = format!("{}_{}", variable, property);
925                if !variable_columns.contains_key(&col_name) {
926                    property_projections.push((
927                        variable.clone(),
928                        property.clone(),
929                        col_name.clone(),
930                    ));
931                    variable_columns.insert(col_name, next_col_idx);
932                    next_col_idx += 1;
933                }
934            }
935        }
936
937        // If we have property expressions, add a projection to materialize them
938        if !property_projections.is_empty() {
939            let mut projections = Vec::new();
940            let mut output_types = Vec::new();
941
942            // First, pass through all existing columns (use Node type to preserve node IDs
943            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
944            for (i, _) in input_columns.iter().enumerate() {
945                projections.push(ProjectExpr::Column(i));
946                output_types.push(LogicalType::Node);
947            }
948
949            // Then add property access projections
950            for (variable, property, _col_name) in &property_projections {
951                let source_col = *variable_columns.get(variable).ok_or_else(|| {
952                    Error::Internal(format!(
953                        "Variable '{}' not found for property projection",
954                        variable
955                    ))
956                })?;
957                projections.push(ProjectExpr::PropertyAccess {
958                    column: source_col,
959                    property: property.clone(),
960                });
961                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
962            }
963
964            input_op = Box::new(ProjectOperator::with_store(
965                input_op,
966                projections,
967                output_types,
968                Arc::clone(&self.store),
969            ));
970        }
971
972        // Convert group-by expressions to column indices
973        let group_columns: Vec<usize> = agg
974            .group_by
975            .iter()
976            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
977            .collect::<Result<Vec<_>>>()?;
978
979        // Convert aggregate expressions to physical form
980        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
981            .aggregates
982            .iter()
983            .map(|agg_expr| {
984                let column = agg_expr
985                    .expression
986                    .as_ref()
987                    .map(|e| {
988                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
989                    })
990                    .transpose()?;
991
992                Ok(PhysicalAggregateExpr {
993                    function: convert_aggregate_function(agg_expr.function),
994                    column,
995                    distinct: agg_expr.distinct,
996                    alias: agg_expr.alias.clone(),
997                    percentile: agg_expr.percentile,
998                })
999            })
1000            .collect::<Result<Vec<_>>>()?;
1001
1002        // Build output schema and column names
1003        let mut output_schema = Vec::new();
1004        let mut output_columns = Vec::new();
1005
1006        // Add group-by columns
1007        for expr in &agg.group_by {
1008            output_schema.push(LogicalType::Any); // Group-by values can be any type
1009            output_columns.push(expression_to_string(expr));
1010        }
1011
1012        // Add aggregate result columns
1013        for agg_expr in &agg.aggregates {
1014            let result_type = match agg_expr.function {
1015                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1016                    LogicalType::Int64
1017                }
1018                LogicalAggregateFunction::Sum => LogicalType::Int64,
1019                LogicalAggregateFunction::Avg => LogicalType::Float64,
1020                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1021                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1022                    // since the aggregate can return any Value type, but the most common case
1023                    // is numeric values from property expressions
1024                    LogicalType::Int64
1025                }
1026                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1027                // Statistical functions return Float64
1028                LogicalAggregateFunction::StdDev
1029                | LogicalAggregateFunction::StdDevPop
1030                | LogicalAggregateFunction::PercentileDisc
1031                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1032            };
1033            output_schema.push(result_type);
1034            output_columns.push(
1035                agg_expr
1036                    .alias
1037                    .clone()
1038                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1039            );
1040        }
1041
1042        // Choose operator based on whether there are group-by columns
1043        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1044            Box::new(SimpleAggregateOperator::new(
1045                input_op,
1046                physical_aggregates,
1047                output_schema,
1048            ))
1049        } else {
1050            Box::new(HashAggregateOperator::new(
1051                input_op,
1052                group_columns,
1053                physical_aggregates,
1054                output_schema,
1055            ))
1056        };
1057
1058        // Apply HAVING clause filter if present
1059        if let Some(having_expr) = &agg.having {
1060            // Build variable to column mapping for the aggregate output
1061            let having_var_columns: HashMap<String, usize> = output_columns
1062                .iter()
1063                .enumerate()
1064                .map(|(i, name)| (name.clone(), i))
1065                .collect();
1066
1067            let filter_expr = self.convert_expression(having_expr)?;
1068            let predicate =
1069                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1070            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1071        }
1072
1073        Ok((operator, output_columns))
1074    }
1075
1076    /// Resolves a logical expression to a column index.
1077    #[allow(dead_code)]
1078    fn resolve_expression_to_column(
1079        &self,
1080        expr: &LogicalExpression,
1081        variable_columns: &HashMap<String, usize>,
1082    ) -> Result<usize> {
1083        match expr {
1084            LogicalExpression::Variable(name) => variable_columns
1085                .get(name)
1086                .copied()
1087                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1088            LogicalExpression::Property { variable, .. } => variable_columns
1089                .get(variable)
1090                .copied()
1091                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1092            _ => Err(Error::Internal(format!(
1093                "Cannot resolve expression to column: {:?}",
1094                expr
1095            ))),
1096        }
1097    }
1098
1099    /// Resolves a logical expression to a column index, using projected property columns.
1100    ///
1101    /// This is used for aggregations where properties have been projected into their own columns.
1102    fn resolve_expression_to_column_with_properties(
1103        &self,
1104        expr: &LogicalExpression,
1105        variable_columns: &HashMap<String, usize>,
1106    ) -> Result<usize> {
1107        match expr {
1108            LogicalExpression::Variable(name) => variable_columns
1109                .get(name)
1110                .copied()
1111                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1112            LogicalExpression::Property { variable, property } => {
1113                // Look up the projected property column (e.g., "p_price" for p.price)
1114                let col_name = format!("{}_{}", variable, property);
1115                variable_columns.get(&col_name).copied().ok_or_else(|| {
1116                    Error::Internal(format!(
1117                        "Property column '{}' not found (from {}.{})",
1118                        col_name, variable, property
1119                    ))
1120                })
1121            }
1122            _ => Err(Error::Internal(format!(
1123                "Cannot resolve expression to column: {:?}",
1124                expr
1125            ))),
1126        }
1127    }
1128
    /// Converts a logical expression to a filter expression.
    ///
    /// Walks the `LogicalExpression` tree recursively and produces the equivalent
    /// `FilterExpression` tree used by the executor. Variables and properties stay
    /// symbolic (by name). Callers such as `ExpressionPredicate::new` and
    /// `ProjectExpr::Expression` pair the result with a name → column index map.
    ///
    /// # Errors
    ///
    /// Returns an error for:
    /// - `Parameter` and `CountSubquery` expressions, which are not supported here;
    /// - operators rejected by `convert_binary_op` / `convert_unary_op`;
    /// - `EXISTS` subplans that `extract_exists_pattern` cannot reduce to a single
    ///   edge pattern.
    ///
    /// Errors from any nested sub-expression are propagated.
    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
        match expr {
            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
                variable: variable.clone(),
                property: property.clone(),
            }),
            LogicalExpression::Binary { left, op, right } => {
                // Operands are converted before the operator, so an operand error is
                // reported ahead of an unsupported-operator error.
                let left_expr = self.convert_expression(left)?;
                let right_expr = self.convert_expression(right)?;
                let filter_op = convert_binary_op(*op)?;
                Ok(FilterExpression::Binary {
                    left: Box::new(left_expr),
                    op: filter_op,
                    right: Box::new(right_expr),
                })
            }
            LogicalExpression::Unary { op, operand } => {
                let operand_expr = self.convert_expression(operand)?;
                let filter_op = convert_unary_op(*op)?;
                Ok(FilterExpression::Unary {
                    op: filter_op,
                    operand: Box::new(operand_expr),
                })
            }
            LogicalExpression::FunctionCall { name, args, .. } => {
                // Fields of the call other than `name` and `args` are not carried over.
                let filter_args: Vec<FilterExpression> = args
                    .iter()
                    .map(|a| self.convert_expression(a))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::FunctionCall {
                    name: name.clone(),
                    args: filter_args,
                })
            }
            LogicalExpression::Case {
                operand,
                when_clauses,
                else_clause,
            } => {
                // Optional pieces (operand / ELSE) stay `None` when absent; a
                // conversion error in any of them aborts the whole CASE.
                let filter_operand = operand
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
                    .iter()
                    .map(|(cond, result)| {
                        Ok((
                            self.convert_expression(cond)?,
                            self.convert_expression(result)?,
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?;
                let filter_else = else_clause
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                Ok(FilterExpression::Case {
                    operand: filter_operand,
                    when_clauses: filter_when_clauses,
                    else_clause: filter_else,
                })
            }
            LogicalExpression::List(items) => {
                let filter_items: Vec<FilterExpression> = items
                    .iter()
                    .map(|item| self.convert_expression(item))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::List(filter_items))
            }
            LogicalExpression::Map(pairs) => {
                // Keys are copied as-is; only the values are converted.
                let filter_pairs: Vec<(String, FilterExpression)> = pairs
                    .iter()
                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::Map(filter_pairs))
            }
            LogicalExpression::IndexAccess { base, index } => {
                let base_expr = self.convert_expression(base)?;
                let index_expr = self.convert_expression(index)?;
                Ok(FilterExpression::IndexAccess {
                    base: Box::new(base_expr),
                    index: Box::new(index_expr),
                })
            }
            LogicalExpression::SliceAccess { base, start, end } => {
                // Either slice bound may be omitted (`list[..end]`, `list[start..]`).
                let base_expr = self.convert_expression(base)?;
                let start_expr = start
                    .as_ref()
                    .map(|s| self.convert_expression(s))
                    .transpose()?
                    .map(Box::new);
                let end_expr = end
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                Ok(FilterExpression::SliceAccess {
                    base: Box::new(base_expr),
                    start: start_expr,
                    end: end_expr,
                })
            }
            LogicalExpression::Parameter(_) => Err(Error::Internal(
                "Parameters not yet supported in filters".to_string(),
            )),
            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
            LogicalExpression::ListComprehension {
                variable,
                list_expr,
                filter_expr,
                map_expr,
            } => {
                // The `WHERE` part of the comprehension is optional; source list and
                // mapping expression are always present.
                let list = self.convert_expression(list_expr)?;
                let filter = filter_expr
                    .as_ref()
                    .map(|f| self.convert_expression(f))
                    .transpose()?
                    .map(Box::new);
                let map = self.convert_expression(map_expr)?;
                Ok(FilterExpression::ListComprehension {
                    variable: variable.clone(),
                    list_expr: Box::new(list),
                    filter_expr: filter,
                    map_expr: Box::new(map),
                })
            }
            LogicalExpression::ExistsSubquery(subplan) => {
                // Extract the pattern from the subplan
                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
                let (start_var, direction, edge_type, end_labels) =
                    self.extract_exists_pattern(subplan)?;

                // Hop bounds are not read from the subplan; both are always `None` here.
                Ok(FilterExpression::ExistsSubquery {
                    start_var,
                    direction,
                    edge_type,
                    end_labels,
                    min_hops: None,
                    max_hops: None,
                })
            }
            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
                "COUNT subqueries not yet supported".to_string(),
            )),
        }
    }
1282
1283    /// Extracts the pattern from an EXISTS subplan.
1284    /// Returns (start_variable, direction, edge_type, end_labels).
1285    fn extract_exists_pattern(
1286        &self,
1287        subplan: &LogicalOperator,
1288    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1289        match subplan {
1290            LogicalOperator::Expand(expand) => {
1291                // Get end node labels from the to_variable if there's a node scan input
1292                let end_labels = self.extract_end_labels_from_expand(expand);
1293                let direction = match expand.direction {
1294                    ExpandDirection::Outgoing => Direction::Outgoing,
1295                    ExpandDirection::Incoming => Direction::Incoming,
1296                    ExpandDirection::Both => Direction::Both,
1297                };
1298                Ok((
1299                    expand.from_variable.clone(),
1300                    direction,
1301                    expand.edge_type.clone(),
1302                    end_labels,
1303                ))
1304            }
1305            LogicalOperator::NodeScan(scan) => {
1306                if let Some(input) = &scan.input {
1307                    self.extract_exists_pattern(input)
1308                } else {
1309                    Err(Error::Internal(
1310                        "EXISTS subquery must contain an edge pattern".to_string(),
1311                    ))
1312                }
1313            }
1314            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1315            _ => Err(Error::Internal(
1316                "Unsupported EXISTS subquery pattern".to_string(),
1317            )),
1318        }
1319    }
1320
1321    /// Extracts end node labels from an Expand operator if present.
1322    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1323        // Check if the expand has a NodeScan input with a label filter
1324        match expand.input.as_ref() {
1325            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1326            _ => None,
1327        }
1328    }
1329
1330    /// Plans a JOIN operator.
1331    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1332        let (left_op, left_columns) = self.plan_operator(&join.left)?;
1333        let (right_op, right_columns) = self.plan_operator(&join.right)?;
1334
1335        // Build combined output columns
1336        let mut columns = left_columns.clone();
1337        columns.extend(right_columns.clone());
1338
1339        // Convert join type
1340        let physical_join_type = match join.join_type {
1341            JoinType::Inner => PhysicalJoinType::Inner,
1342            JoinType::Left => PhysicalJoinType::Left,
1343            JoinType::Right => PhysicalJoinType::Right,
1344            JoinType::Full => PhysicalJoinType::Full,
1345            JoinType::Cross => PhysicalJoinType::Cross,
1346            JoinType::Semi => PhysicalJoinType::Semi,
1347            JoinType::Anti => PhysicalJoinType::Anti,
1348        };
1349
1350        // Build key columns from join conditions
1351        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1352            // Cross join - no keys
1353            (vec![], vec![])
1354        } else {
1355            join.conditions
1356                .iter()
1357                .filter_map(|cond| {
1358                    // Try to extract column indices from expressions
1359                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1360                    let right_idx = self
1361                        .expression_to_column(&cond.right, &right_columns)
1362                        .ok()?;
1363                    Some((left_idx, right_idx))
1364                })
1365                .unzip()
1366        };
1367
1368        let output_schema = self.derive_schema_from_columns(&columns);
1369
1370        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1371            left_op,
1372            right_op,
1373            probe_keys,
1374            build_keys,
1375            physical_join_type,
1376            output_schema,
1377        ));
1378
1379        Ok((operator, columns))
1380    }
1381
1382    /// Extracts a column index from an expression.
1383    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1384        match expr {
1385            LogicalExpression::Variable(name) => columns
1386                .iter()
1387                .position(|c| c == name)
1388                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1389            _ => Err(Error::Internal(
1390                "Only variables supported in join conditions".to_string(),
1391            )),
1392        }
1393    }
1394
1395    /// Plans a UNION operator.
1396    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1397        if union.inputs.is_empty() {
1398            return Err(Error::Internal(
1399                "Union requires at least one input".to_string(),
1400            ));
1401        }
1402
1403        let mut inputs = Vec::with_capacity(union.inputs.len());
1404        let mut columns = Vec::new();
1405
1406        for (i, input) in union.inputs.iter().enumerate() {
1407            let (op, cols) = self.plan_operator(input)?;
1408            if i == 0 {
1409                columns = cols;
1410            }
1411            inputs.push(op);
1412        }
1413
1414        let output_schema = self.derive_schema_from_columns(&columns);
1415        let operator = Box::new(UnionOperator::new(inputs, output_schema));
1416
1417        Ok((operator, columns))
1418    }
1419
1420    /// Plans a DISTINCT operator.
1421    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1422        let (input_op, columns) = self.plan_operator(&distinct.input)?;
1423        let output_schema = self.derive_schema_from_columns(&columns);
1424        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1425        Ok((operator, columns))
1426    }
1427
1428    /// Plans a CREATE NODE operator.
1429    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1430        // Plan input if present
1431        let (input_op, mut columns) = if let Some(ref input) = create.input {
1432            let (op, cols) = self.plan_operator(input)?;
1433            (Some(op), cols)
1434        } else {
1435            (None, vec![])
1436        };
1437
1438        // Output column for the created node
1439        let output_column = columns.len();
1440        columns.push(create.variable.clone());
1441
1442        // Convert properties
1443        let properties: Vec<(String, PropertySource)> = create
1444            .properties
1445            .iter()
1446            .map(|(name, expr)| {
1447                let source = match expr {
1448                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1449                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1450                };
1451                (name.clone(), source)
1452            })
1453            .collect();
1454
1455        let output_schema = self.derive_schema_from_columns(&columns);
1456
1457        let operator = Box::new(
1458            CreateNodeOperator::new(
1459                Arc::clone(&self.store),
1460                input_op,
1461                create.labels.clone(),
1462                properties,
1463                output_schema,
1464                output_column,
1465            )
1466            .with_tx_context(self.viewing_epoch, self.tx_id),
1467        );
1468
1469        Ok((operator, columns))
1470    }
1471
1472    /// Plans a CREATE EDGE operator.
1473    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1474        let (input_op, mut columns) = self.plan_operator(&create.input)?;
1475
1476        // Find source and target columns
1477        let from_column = columns
1478            .iter()
1479            .position(|c| c == &create.from_variable)
1480            .ok_or_else(|| {
1481                Error::Internal(format!(
1482                    "Source variable '{}' not found",
1483                    create.from_variable
1484                ))
1485            })?;
1486
1487        let to_column = columns
1488            .iter()
1489            .position(|c| c == &create.to_variable)
1490            .ok_or_else(|| {
1491                Error::Internal(format!(
1492                    "Target variable '{}' not found",
1493                    create.to_variable
1494                ))
1495            })?;
1496
1497        // Output column for the created edge (if named)
1498        let output_column = create.variable.as_ref().map(|v| {
1499            let idx = columns.len();
1500            columns.push(v.clone());
1501            idx
1502        });
1503
1504        // Convert properties
1505        let properties: Vec<(String, PropertySource)> = create
1506            .properties
1507            .iter()
1508            .map(|(name, expr)| {
1509                let source = match expr {
1510                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1511                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1512                };
1513                (name.clone(), source)
1514            })
1515            .collect();
1516
1517        let output_schema = self.derive_schema_from_columns(&columns);
1518
1519        let operator = Box::new(
1520            CreateEdgeOperator::new(
1521                Arc::clone(&self.store),
1522                input_op,
1523                from_column,
1524                to_column,
1525                create.edge_type.clone(),
1526                properties,
1527                output_schema,
1528                output_column,
1529            )
1530            .with_tx_context(self.viewing_epoch, self.tx_id),
1531        );
1532
1533        Ok((operator, columns))
1534    }
1535
1536    /// Plans a DELETE NODE operator.
1537    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1538        let (input_op, columns) = self.plan_operator(&delete.input)?;
1539
1540        let node_column = columns
1541            .iter()
1542            .position(|c| c == &delete.variable)
1543            .ok_or_else(|| {
1544                Error::Internal(format!(
1545                    "Variable '{}' not found for delete",
1546                    delete.variable
1547                ))
1548            })?;
1549
1550        // Output schema for delete count
1551        let output_schema = vec![LogicalType::Int64];
1552        let output_columns = vec!["deleted_count".to_string()];
1553
1554        let operator = Box::new(
1555            DeleteNodeOperator::new(
1556                Arc::clone(&self.store),
1557                input_op,
1558                node_column,
1559                output_schema,
1560                delete.detach, // DETACH DELETE deletes connected edges first
1561            )
1562            .with_tx_context(self.viewing_epoch, self.tx_id),
1563        );
1564
1565        Ok((operator, output_columns))
1566    }
1567
1568    /// Plans a DELETE EDGE operator.
1569    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1570        let (input_op, columns) = self.plan_operator(&delete.input)?;
1571
1572        let edge_column = columns
1573            .iter()
1574            .position(|c| c == &delete.variable)
1575            .ok_or_else(|| {
1576                Error::Internal(format!(
1577                    "Variable '{}' not found for delete",
1578                    delete.variable
1579                ))
1580            })?;
1581
1582        // Output schema for delete count
1583        let output_schema = vec![LogicalType::Int64];
1584        let output_columns = vec!["deleted_count".to_string()];
1585
1586        let operator = Box::new(
1587            DeleteEdgeOperator::new(
1588                Arc::clone(&self.store),
1589                input_op,
1590                edge_column,
1591                output_schema,
1592            )
1593            .with_tx_context(self.viewing_epoch, self.tx_id),
1594        );
1595
1596        Ok((operator, output_columns))
1597    }
1598
1599    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
1600    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1601        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1602        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1603
1604        // Build combined output columns (left + right)
1605        let mut columns = left_columns.clone();
1606        columns.extend(right_columns.clone());
1607
1608        // Find common variables between left and right for join keys
1609        let mut probe_keys = Vec::new();
1610        let mut build_keys = Vec::new();
1611
1612        for (right_idx, right_col) in right_columns.iter().enumerate() {
1613            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1614                probe_keys.push(left_idx);
1615                build_keys.push(right_idx);
1616            }
1617        }
1618
1619        let output_schema = self.derive_schema_from_columns(&columns);
1620
1621        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1622            left_op,
1623            right_op,
1624            probe_keys,
1625            build_keys,
1626            PhysicalJoinType::Left,
1627            output_schema,
1628        ));
1629
1630        Ok((operator, columns))
1631    }
1632
1633    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
1634    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1635        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1636        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1637
1638        // Anti-join only keeps left columns (filters out matching rows)
1639        let columns = left_columns.clone();
1640
1641        // Find common variables between left and right for join keys
1642        let mut probe_keys = Vec::new();
1643        let mut build_keys = Vec::new();
1644
1645        for (right_idx, right_col) in right_columns.iter().enumerate() {
1646            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1647                probe_keys.push(left_idx);
1648                build_keys.push(right_idx);
1649            }
1650        }
1651
1652        let output_schema = self.derive_schema_from_columns(&columns);
1653
1654        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1655            left_op,
1656            right_op,
1657            probe_keys,
1658            build_keys,
1659            PhysicalJoinType::Anti,
1660            output_schema,
1661        ));
1662
1663        Ok((operator, columns))
1664    }
1665
1666    /// Plans an unwind operator.
1667    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1668        // Plan the input operator first
1669        // Handle Empty specially - use a single-row operator
1670        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
1671            if matches!(&*unwind.input, LogicalOperator::Empty) {
1672                // For UNWIND without prior MATCH, create a single-row input
1673                // We need an operator that produces one row with the list to unwind
1674                // For now, use EmptyScan which produces no rows - we'll handle the literal
1675                // list in the unwind operator itself
1676                let literal_list = self.convert_expression(&unwind.expression)?;
1677
1678                // Create a project operator that produces a single row with the list
1679                let single_row_op: Box<dyn Operator> = Box::new(
1680                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
1681                );
1682                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
1683                    single_row_op,
1684                    vec![ProjectExpr::Expression {
1685                        expr: literal_list,
1686                        variable_columns: HashMap::new(),
1687                    }],
1688                    vec![LogicalType::Any],
1689                    Arc::clone(&self.store),
1690                ));
1691
1692                (project_op, vec!["__list__".to_string()])
1693            } else {
1694                self.plan_operator(&unwind.input)?
1695            };
1696
1697        // The UNWIND expression should be a list - we need to find/evaluate it
1698        // For now, we handle the case where the expression references an existing column
1699        // or is a literal list
1700
1701        // Find if the expression references an existing column (like a list property)
1702        let list_col_idx = match &unwind.expression {
1703            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1704            LogicalExpression::Property { variable, .. } => {
1705                // Property access needs to be evaluated - for now we'll need the filter predicate
1706                // to evaluate this. For simple cases, we treat it as a list column.
1707                input_columns.iter().position(|c| c == variable)
1708            }
1709            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1710                // Literal list expression - we'll add it as a virtual column
1711                None
1712            }
1713            _ => None,
1714        };
1715
1716        // Build output columns: all input columns plus the new variable
1717        let mut columns = input_columns.clone();
1718        columns.push(unwind.variable.clone());
1719
1720        // Build output schema
1721        let mut output_schema = self.derive_schema_from_columns(&input_columns);
1722        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
1723
1724        // Use the list column index if found, otherwise default to 0
1725        // (in which case the first column should contain the list)
1726        let col_idx = list_col_idx.unwrap_or(0);
1727
1728        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1729            input_op,
1730            col_idx,
1731            unwind.variable.clone(),
1732            output_schema,
1733        ));
1734
1735        Ok((operator, columns))
1736    }
1737
1738    /// Plans a MERGE operator.
1739    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1740        // Plan the input operator if present (skip if Empty)
1741        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
1742            Vec::new()
1743        } else {
1744            let (_input_op, cols) = self.plan_operator(&merge.input)?;
1745            cols
1746        };
1747
1748        // Convert match properties from LogicalExpression to Value
1749        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1750            .match_properties
1751            .iter()
1752            .filter_map(|(name, expr)| {
1753                if let LogicalExpression::Literal(v) = expr {
1754                    Some((name.clone(), v.clone()))
1755                } else {
1756                    None // Skip non-literal expressions for now
1757                }
1758            })
1759            .collect();
1760
1761        // Convert ON CREATE properties
1762        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
1763            .on_create
1764            .iter()
1765            .filter_map(|(name, expr)| {
1766                if let LogicalExpression::Literal(v) = expr {
1767                    Some((name.clone(), v.clone()))
1768                } else {
1769                    None
1770                }
1771            })
1772            .collect();
1773
1774        // Convert ON MATCH properties
1775        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1776            .on_match
1777            .iter()
1778            .filter_map(|(name, expr)| {
1779                if let LogicalExpression::Literal(v) = expr {
1780                    Some((name.clone(), v.clone()))
1781                } else {
1782                    None
1783                }
1784            })
1785            .collect();
1786
1787        // Add the merged node variable to output columns
1788        columns.push(merge.variable.clone());
1789
1790        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1791            Arc::clone(&self.store),
1792            merge.variable.clone(),
1793            merge.labels.clone(),
1794            match_properties,
1795            on_create_properties,
1796            on_match_properties,
1797        ));
1798
1799        Ok((operator, columns))
1800    }
1801
1802    /// Plans a SHORTEST PATH operator.
1803    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1804        // Plan the input operator
1805        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
1806
1807        // Find source and target node columns
1808        let source_column = columns
1809            .iter()
1810            .position(|c| c == &sp.source_var)
1811            .ok_or_else(|| {
1812                Error::Internal(format!(
1813                    "Source variable '{}' not found for shortestPath",
1814                    sp.source_var
1815                ))
1816            })?;
1817
1818        let target_column = columns
1819            .iter()
1820            .position(|c| c == &sp.target_var)
1821            .ok_or_else(|| {
1822                Error::Internal(format!(
1823                    "Target variable '{}' not found for shortestPath",
1824                    sp.target_var
1825                ))
1826            })?;
1827
1828        // Convert direction
1829        let direction = match sp.direction {
1830            ExpandDirection::Outgoing => Direction::Outgoing,
1831            ExpandDirection::Incoming => Direction::Incoming,
1832            ExpandDirection::Both => Direction::Both,
1833        };
1834
1835        // Create the shortest path operator
1836        let operator: Box<dyn Operator> = Box::new(
1837            ShortestPathOperator::new(
1838                Arc::clone(&self.store),
1839                input_op,
1840                source_column,
1841                target_column,
1842                sp.edge_type.clone(),
1843                direction,
1844            )
1845            .with_all_paths(sp.all_paths),
1846        );
1847
1848        // Add path length column with the expected naming convention
1849        // The translator expects _path_length_{alias} format for length(p) calls
1850        columns.push(format!("_path_length_{}", sp.path_alias));
1851
1852        Ok((operator, columns))
1853    }
1854
1855    /// Plans an ADD LABEL operator.
1856    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1857        let (input_op, columns) = self.plan_operator(&add_label.input)?;
1858
1859        // Find the node column
1860        let node_column = columns
1861            .iter()
1862            .position(|c| c == &add_label.variable)
1863            .ok_or_else(|| {
1864                Error::Internal(format!(
1865                    "Variable '{}' not found for ADD LABEL",
1866                    add_label.variable
1867                ))
1868            })?;
1869
1870        // Output schema for update count
1871        let output_schema = vec![LogicalType::Int64];
1872        let output_columns = vec!["labels_added".to_string()];
1873
1874        let operator = Box::new(AddLabelOperator::new(
1875            Arc::clone(&self.store),
1876            input_op,
1877            node_column,
1878            add_label.labels.clone(),
1879            output_schema,
1880        ));
1881
1882        Ok((operator, output_columns))
1883    }
1884
1885    /// Plans a REMOVE LABEL operator.
1886    fn plan_remove_label(
1887        &self,
1888        remove_label: &RemoveLabelOp,
1889    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1890        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1891
1892        // Find the node column
1893        let node_column = columns
1894            .iter()
1895            .position(|c| c == &remove_label.variable)
1896            .ok_or_else(|| {
1897                Error::Internal(format!(
1898                    "Variable '{}' not found for REMOVE LABEL",
1899                    remove_label.variable
1900                ))
1901            })?;
1902
1903        // Output schema for update count
1904        let output_schema = vec![LogicalType::Int64];
1905        let output_columns = vec!["labels_removed".to_string()];
1906
1907        let operator = Box::new(RemoveLabelOperator::new(
1908            Arc::clone(&self.store),
1909            input_op,
1910            node_column,
1911            remove_label.labels.clone(),
1912            output_schema,
1913        ));
1914
1915        Ok((operator, output_columns))
1916    }
1917
1918    /// Plans a SET PROPERTY operator.
1919    fn plan_set_property(
1920        &self,
1921        set_prop: &SetPropertyOp,
1922    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1923        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1924
1925        // Find the entity column (node or edge variable)
1926        let entity_column = columns
1927            .iter()
1928            .position(|c| c == &set_prop.variable)
1929            .ok_or_else(|| {
1930                Error::Internal(format!(
1931                    "Variable '{}' not found for SET",
1932                    set_prop.variable
1933                ))
1934            })?;
1935
1936        // Convert properties to PropertySource
1937        let properties: Vec<(String, PropertySource)> = set_prop
1938            .properties
1939            .iter()
1940            .map(|(name, expr)| {
1941                let source = self.expression_to_property_source(expr, &columns)?;
1942                Ok((name.clone(), source))
1943            })
1944            .collect::<Result<Vec<_>>>()?;
1945
1946        // Output schema preserves input schema (passes through)
1947        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1948        let output_columns = columns.clone();
1949
1950        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
1951        let operator = Box::new(SetPropertyOperator::new_for_node(
1952            Arc::clone(&self.store),
1953            input_op,
1954            entity_column,
1955            properties,
1956            output_schema,
1957        ));
1958
1959        Ok((operator, output_columns))
1960    }
1961
1962    /// Converts a logical expression to a PropertySource.
1963    fn expression_to_property_source(
1964        &self,
1965        expr: &LogicalExpression,
1966        columns: &[String],
1967    ) -> Result<PropertySource> {
1968        match expr {
1969            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1970            LogicalExpression::Variable(name) => {
1971                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1972                    Error::Internal(format!("Variable '{}' not found for property source", name))
1973                })?;
1974                Ok(PropertySource::Column(col_idx))
1975            }
1976            LogicalExpression::Parameter(name) => {
1977                // Parameters should be resolved before planning
1978                // For now, treat as a placeholder
1979                Ok(PropertySource::Constant(
1980                    grafeo_common::types::Value::String(format!("${}", name).into()),
1981                ))
1982            }
1983            _ => Err(Error::Internal(format!(
1984                "Unsupported expression type for property source: {:?}",
1985                expr
1986            ))),
1987        }
1988    }
1989}
1990
1991/// Converts a logical binary operator to a filter binary operator.
1992pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1993    match op {
1994        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1995        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1996        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1997        BinaryOp::Le => Ok(BinaryFilterOp::Le),
1998        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1999        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2000        BinaryOp::And => Ok(BinaryFilterOp::And),
2001        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2002        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2003        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2004        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2005        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2006        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2007        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2008        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2009        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2010        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2011        BinaryOp::In => Ok(BinaryFilterOp::In),
2012        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2013        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2014        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2015            "Binary operator {:?} not yet supported in filters",
2016            op
2017        ))),
2018    }
2019}
2020
2021/// Converts a logical unary operator to a filter unary operator.
2022pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2023    match op {
2024        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2025        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2026        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2027        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2028    }
2029}
2030
2031/// Converts a logical aggregate function to a physical aggregate function.
2032pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2033    match func {
2034        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2035        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2036        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2037        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2038        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2039        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2040        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2041        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2042        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2043        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2044        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2045    }
2046}
2047
2048/// Converts a logical expression to a filter expression.
2049///
2050/// This is a standalone function that can be used by both LPG and RDF planners.
2051pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2052    match expr {
2053        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2054        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2055        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2056            variable: variable.clone(),
2057            property: property.clone(),
2058        }),
2059        LogicalExpression::Binary { left, op, right } => {
2060            let left_expr = convert_filter_expression(left)?;
2061            let right_expr = convert_filter_expression(right)?;
2062            let filter_op = convert_binary_op(*op)?;
2063            Ok(FilterExpression::Binary {
2064                left: Box::new(left_expr),
2065                op: filter_op,
2066                right: Box::new(right_expr),
2067            })
2068        }
2069        LogicalExpression::Unary { op, operand } => {
2070            let operand_expr = convert_filter_expression(operand)?;
2071            let filter_op = convert_unary_op(*op)?;
2072            Ok(FilterExpression::Unary {
2073                op: filter_op,
2074                operand: Box::new(operand_expr),
2075            })
2076        }
2077        LogicalExpression::FunctionCall { name, args, .. } => {
2078            let filter_args: Vec<FilterExpression> = args
2079                .iter()
2080                .map(|a| convert_filter_expression(a))
2081                .collect::<Result<Vec<_>>>()?;
2082            Ok(FilterExpression::FunctionCall {
2083                name: name.clone(),
2084                args: filter_args,
2085            })
2086        }
2087        LogicalExpression::Case {
2088            operand,
2089            when_clauses,
2090            else_clause,
2091        } => {
2092            let filter_operand = operand
2093                .as_ref()
2094                .map(|e| convert_filter_expression(e))
2095                .transpose()?
2096                .map(Box::new);
2097            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2098                .iter()
2099                .map(|(cond, result)| {
2100                    Ok((
2101                        convert_filter_expression(cond)?,
2102                        convert_filter_expression(result)?,
2103                    ))
2104                })
2105                .collect::<Result<Vec<_>>>()?;
2106            let filter_else = else_clause
2107                .as_ref()
2108                .map(|e| convert_filter_expression(e))
2109                .transpose()?
2110                .map(Box::new);
2111            Ok(FilterExpression::Case {
2112                operand: filter_operand,
2113                when_clauses: filter_when_clauses,
2114                else_clause: filter_else,
2115            })
2116        }
2117        LogicalExpression::List(items) => {
2118            let filter_items: Vec<FilterExpression> = items
2119                .iter()
2120                .map(|item| convert_filter_expression(item))
2121                .collect::<Result<Vec<_>>>()?;
2122            Ok(FilterExpression::List(filter_items))
2123        }
2124        LogicalExpression::Map(pairs) => {
2125            let filter_pairs: Vec<(String, FilterExpression)> = pairs
2126                .iter()
2127                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2128                .collect::<Result<Vec<_>>>()?;
2129            Ok(FilterExpression::Map(filter_pairs))
2130        }
2131        LogicalExpression::IndexAccess { base, index } => {
2132            let base_expr = convert_filter_expression(base)?;
2133            let index_expr = convert_filter_expression(index)?;
2134            Ok(FilterExpression::IndexAccess {
2135                base: Box::new(base_expr),
2136                index: Box::new(index_expr),
2137            })
2138        }
2139        LogicalExpression::SliceAccess { base, start, end } => {
2140            let base_expr = convert_filter_expression(base)?;
2141            let start_expr = start
2142                .as_ref()
2143                .map(|s| convert_filter_expression(s))
2144                .transpose()?
2145                .map(Box::new);
2146            let end_expr = end
2147                .as_ref()
2148                .map(|e| convert_filter_expression(e))
2149                .transpose()?
2150                .map(Box::new);
2151            Ok(FilterExpression::SliceAccess {
2152                base: Box::new(base_expr),
2153                start: start_expr,
2154                end: end_expr,
2155            })
2156        }
2157        LogicalExpression::Parameter(_) => Err(Error::Internal(
2158            "Parameters not yet supported in filters".to_string(),
2159        )),
2160        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2161        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2162        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2163        LogicalExpression::ListComprehension {
2164            variable,
2165            list_expr,
2166            filter_expr,
2167            map_expr,
2168        } => {
2169            let list = convert_filter_expression(list_expr)?;
2170            let filter = filter_expr
2171                .as_ref()
2172                .map(|f| convert_filter_expression(f))
2173                .transpose()?
2174                .map(Box::new);
2175            let map = convert_filter_expression(map_expr)?;
2176            Ok(FilterExpression::ListComprehension {
2177                variable: variable.clone(),
2178                list_expr: Box::new(list),
2179                filter_expr: filter,
2180                map_expr: Box::new(map),
2181            })
2182        }
2183        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2184            Error::Internal("Subqueries not yet supported in filters".to_string()),
2185        ),
2186    }
2187}
2188
2189/// Infers the logical type from a value.
2190fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2191    use grafeo_common::types::Value;
2192    match value {
2193        Value::Null => LogicalType::String, // Default type for null
2194        Value::Bool(_) => LogicalType::Bool,
2195        Value::Int64(_) => LogicalType::Int64,
2196        Value::Float64(_) => LogicalType::Float64,
2197        Value::String(_) => LogicalType::String,
2198        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
2199        Value::Timestamp(_) => LogicalType::Timestamp,
2200        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
2201        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
2202    }
2203}
2204
2205/// Converts an expression to a string for column naming.
2206fn expression_to_string(expr: &LogicalExpression) -> String {
2207    match expr {
2208        LogicalExpression::Variable(name) => name.clone(),
2209        LogicalExpression::Property { variable, property } => {
2210            format!("{variable}.{property}")
2211        }
2212        LogicalExpression::Literal(value) => format!("{value:?}"),
2213        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2214        _ => "expr".to_string(),
2215    }
2216}
2217
2218/// A physical plan ready for execution.
2219pub struct PhysicalPlan {
2220    /// The root physical operator.
2221    pub operator: Box<dyn Operator>,
2222    /// Column names for the result.
2223    pub columns: Vec<String>,
2224    /// Adaptive execution context with cardinality estimates.
2225    ///
2226    /// When adaptive execution is enabled, this context contains estimated
2227    /// cardinalities at various checkpoints in the plan. During execution,
2228    /// actual row counts are recorded and compared against estimates.
2229    pub adaptive_context: Option<AdaptiveContext>,
2230}
2231
2232impl PhysicalPlan {
2233    /// Returns the column names.
2234    #[must_use]
2235    pub fn columns(&self) -> &[String] {
2236        &self.columns
2237    }
2238
2239    /// Consumes the plan and returns the operator.
2240    pub fn into_operator(self) -> Box<dyn Operator> {
2241        self.operator
2242    }
2243
2244    /// Returns the adaptive context, if adaptive execution is enabled.
2245    #[must_use]
2246    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2247        self.adaptive_context.as_ref()
2248    }
2249
2250    /// Takes ownership of the adaptive context.
2251    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2252        self.adaptive_context.take()
2253    }
2254}
2255
2256#[cfg(test)]
2257mod tests {
2258    use super::*;
2259    use crate::query::plan::{
2260        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2261        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2262        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2263        SortKey, SortOp,
2264    };
2265    use grafeo_common::types::Value;
2266
2267    fn create_test_store() -> Arc<LpgStore> {
2268        let store = Arc::new(LpgStore::new());
2269        store.create_node(&["Person"]);
2270        store.create_node(&["Person"]);
2271        store.create_node(&["Company"]);
2272        store
2273    }
2274
2275    // ==================== Simple Scan Tests ====================
2276
2277    #[test]
2278    fn test_plan_simple_scan() {
2279        let store = create_test_store();
2280        let planner = Planner::new(store);
2281
2282        // MATCH (n:Person) RETURN n
2283        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2284            items: vec![ReturnItem {
2285                expression: LogicalExpression::Variable("n".to_string()),
2286                alias: None,
2287            }],
2288            distinct: false,
2289            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2290                variable: "n".to_string(),
2291                label: Some("Person".to_string()),
2292                input: None,
2293            })),
2294        }));
2295
2296        let physical = planner.plan(&logical).unwrap();
2297        assert_eq!(physical.columns(), &["n"]);
2298    }
2299
2300    #[test]
2301    fn test_plan_scan_without_label() {
2302        let store = create_test_store();
2303        let planner = Planner::new(store);
2304
2305        // MATCH (n) RETURN n
2306        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2307            items: vec![ReturnItem {
2308                expression: LogicalExpression::Variable("n".to_string()),
2309                alias: None,
2310            }],
2311            distinct: false,
2312            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2313                variable: "n".to_string(),
2314                label: None,
2315                input: None,
2316            })),
2317        }));
2318
2319        let physical = planner.plan(&logical).unwrap();
2320        assert_eq!(physical.columns(), &["n"]);
2321    }
2322
2323    #[test]
2324    fn test_plan_return_with_alias() {
2325        let store = create_test_store();
2326        let planner = Planner::new(store);
2327
2328        // MATCH (n:Person) RETURN n AS person
2329        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2330            items: vec![ReturnItem {
2331                expression: LogicalExpression::Variable("n".to_string()),
2332                alias: Some("person".to_string()),
2333            }],
2334            distinct: false,
2335            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2336                variable: "n".to_string(),
2337                label: Some("Person".to_string()),
2338                input: None,
2339            })),
2340        }));
2341
2342        let physical = planner.plan(&logical).unwrap();
2343        assert_eq!(physical.columns(), &["person"]);
2344    }
2345
2346    #[test]
2347    fn test_plan_return_property() {
2348        let store = create_test_store();
2349        let planner = Planner::new(store);
2350
2351        // MATCH (n:Person) RETURN n.name
2352        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2353            items: vec![ReturnItem {
2354                expression: LogicalExpression::Property {
2355                    variable: "n".to_string(),
2356                    property: "name".to_string(),
2357                },
2358                alias: None,
2359            }],
2360            distinct: false,
2361            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2362                variable: "n".to_string(),
2363                label: Some("Person".to_string()),
2364                input: None,
2365            })),
2366        }));
2367
2368        let physical = planner.plan(&logical).unwrap();
2369        assert_eq!(physical.columns(), &["n.name"]);
2370    }
2371
2372    #[test]
2373    fn test_plan_return_literal() {
2374        let store = create_test_store();
2375        let planner = Planner::new(store);
2376
2377        // MATCH (n) RETURN 42 AS answer
2378        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2379            items: vec![ReturnItem {
2380                expression: LogicalExpression::Literal(Value::Int64(42)),
2381                alias: Some("answer".to_string()),
2382            }],
2383            distinct: false,
2384            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2385                variable: "n".to_string(),
2386                label: None,
2387                input: None,
2388            })),
2389        }));
2390
2391        let physical = planner.plan(&logical).unwrap();
2392        assert_eq!(physical.columns(), &["answer"]);
2393    }
2394
2395    // ==================== Filter Tests ====================
2396
2397    #[test]
2398    fn test_plan_filter_equality() {
2399        let store = create_test_store();
2400        let planner = Planner::new(store);
2401
2402        // MATCH (n:Person) WHERE n.age = 30 RETURN n
2403        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2404            items: vec![ReturnItem {
2405                expression: LogicalExpression::Variable("n".to_string()),
2406                alias: None,
2407            }],
2408            distinct: false,
2409            input: Box::new(LogicalOperator::Filter(FilterOp {
2410                predicate: LogicalExpression::Binary {
2411                    left: Box::new(LogicalExpression::Property {
2412                        variable: "n".to_string(),
2413                        property: "age".to_string(),
2414                    }),
2415                    op: BinaryOp::Eq,
2416                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2417                },
2418                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2419                    variable: "n".to_string(),
2420                    label: Some("Person".to_string()),
2421                    input: None,
2422                })),
2423            })),
2424        }));
2425
2426        let physical = planner.plan(&logical).unwrap();
2427        assert_eq!(physical.columns(), &["n"]);
2428    }
2429
2430    #[test]
2431    fn test_plan_filter_compound_and() {
2432        let store = create_test_store();
2433        let planner = Planner::new(store);
2434
2435        // WHERE n.age > 20 AND n.age < 40
2436        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2437            items: vec![ReturnItem {
2438                expression: LogicalExpression::Variable("n".to_string()),
2439                alias: None,
2440            }],
2441            distinct: false,
2442            input: Box::new(LogicalOperator::Filter(FilterOp {
2443                predicate: LogicalExpression::Binary {
2444                    left: Box::new(LogicalExpression::Binary {
2445                        left: Box::new(LogicalExpression::Property {
2446                            variable: "n".to_string(),
2447                            property: "age".to_string(),
2448                        }),
2449                        op: BinaryOp::Gt,
2450                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2451                    }),
2452                    op: BinaryOp::And,
2453                    right: Box::new(LogicalExpression::Binary {
2454                        left: Box::new(LogicalExpression::Property {
2455                            variable: "n".to_string(),
2456                            property: "age".to_string(),
2457                        }),
2458                        op: BinaryOp::Lt,
2459                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2460                    }),
2461                },
2462                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2463                    variable: "n".to_string(),
2464                    label: None,
2465                    input: None,
2466                })),
2467            })),
2468        }));
2469
2470        let physical = planner.plan(&logical).unwrap();
2471        assert_eq!(physical.columns(), &["n"]);
2472    }
2473
2474    #[test]
2475    fn test_plan_filter_unary_not() {
2476        let store = create_test_store();
2477        let planner = Planner::new(store);
2478
2479        // WHERE NOT n.active
2480        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2481            items: vec![ReturnItem {
2482                expression: LogicalExpression::Variable("n".to_string()),
2483                alias: None,
2484            }],
2485            distinct: false,
2486            input: Box::new(LogicalOperator::Filter(FilterOp {
2487                predicate: LogicalExpression::Unary {
2488                    op: UnaryOp::Not,
2489                    operand: Box::new(LogicalExpression::Property {
2490                        variable: "n".to_string(),
2491                        property: "active".to_string(),
2492                    }),
2493                },
2494                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2495                    variable: "n".to_string(),
2496                    label: None,
2497                    input: None,
2498                })),
2499            })),
2500        }));
2501
2502        let physical = planner.plan(&logical).unwrap();
2503        assert_eq!(physical.columns(), &["n"]);
2504    }
2505
2506    #[test]
2507    fn test_plan_filter_is_null() {
2508        let store = create_test_store();
2509        let planner = Planner::new(store);
2510
2511        // WHERE n.email IS NULL
2512        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2513            items: vec![ReturnItem {
2514                expression: LogicalExpression::Variable("n".to_string()),
2515                alias: None,
2516            }],
2517            distinct: false,
2518            input: Box::new(LogicalOperator::Filter(FilterOp {
2519                predicate: LogicalExpression::Unary {
2520                    op: UnaryOp::IsNull,
2521                    operand: Box::new(LogicalExpression::Property {
2522                        variable: "n".to_string(),
2523                        property: "email".to_string(),
2524                    }),
2525                },
2526                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2527                    variable: "n".to_string(),
2528                    label: None,
2529                    input: None,
2530                })),
2531            })),
2532        }));
2533
2534        let physical = planner.plan(&logical).unwrap();
2535        assert_eq!(physical.columns(), &["n"]);
2536    }
2537
2538    #[test]
2539    fn test_plan_filter_function_call() {
2540        let store = create_test_store();
2541        let planner = Planner::new(store);
2542
2543        // WHERE size(n.friends) > 0
2544        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2545            items: vec![ReturnItem {
2546                expression: LogicalExpression::Variable("n".to_string()),
2547                alias: None,
2548            }],
2549            distinct: false,
2550            input: Box::new(LogicalOperator::Filter(FilterOp {
2551                predicate: LogicalExpression::Binary {
2552                    left: Box::new(LogicalExpression::FunctionCall {
2553                        name: "size".to_string(),
2554                        args: vec![LogicalExpression::Property {
2555                            variable: "n".to_string(),
2556                            property: "friends".to_string(),
2557                        }],
2558                        distinct: false,
2559                    }),
2560                    op: BinaryOp::Gt,
2561                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
2562                },
2563                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2564                    variable: "n".to_string(),
2565                    label: None,
2566                    input: None,
2567                })),
2568            })),
2569        }));
2570
2571        let physical = planner.plan(&logical).unwrap();
2572        assert_eq!(physical.columns(), &["n"]);
2573    }
2574
2575    // ==================== Expand Tests ====================
2576
2577    #[test]
2578    fn test_plan_expand_outgoing() {
2579        let store = create_test_store();
2580        let planner = Planner::new(store);
2581
2582        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
2583        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2584            items: vec![
2585                ReturnItem {
2586                    expression: LogicalExpression::Variable("a".to_string()),
2587                    alias: None,
2588                },
2589                ReturnItem {
2590                    expression: LogicalExpression::Variable("b".to_string()),
2591                    alias: None,
2592                },
2593            ],
2594            distinct: false,
2595            input: Box::new(LogicalOperator::Expand(ExpandOp {
2596                from_variable: "a".to_string(),
2597                to_variable: "b".to_string(),
2598                edge_variable: None,
2599                direction: ExpandDirection::Outgoing,
2600                edge_type: Some("KNOWS".to_string()),
2601                min_hops: 1,
2602                max_hops: Some(1),
2603                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2604                    variable: "a".to_string(),
2605                    label: Some("Person".to_string()),
2606                    input: None,
2607                })),
2608                path_alias: None,
2609            })),
2610        }));
2611
2612        let physical = planner.plan(&logical).unwrap();
2613        // The return should have columns [a, b]
2614        assert!(physical.columns().contains(&"a".to_string()));
2615        assert!(physical.columns().contains(&"b".to_string()));
2616    }
2617
2618    #[test]
2619    fn test_plan_expand_with_edge_variable() {
2620        let store = create_test_store();
2621        let planner = Planner::new(store);
2622
2623        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
2624        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2625            items: vec![
2626                ReturnItem {
2627                    expression: LogicalExpression::Variable("a".to_string()),
2628                    alias: None,
2629                },
2630                ReturnItem {
2631                    expression: LogicalExpression::Variable("r".to_string()),
2632                    alias: None,
2633                },
2634                ReturnItem {
2635                    expression: LogicalExpression::Variable("b".to_string()),
2636                    alias: None,
2637                },
2638            ],
2639            distinct: false,
2640            input: Box::new(LogicalOperator::Expand(ExpandOp {
2641                from_variable: "a".to_string(),
2642                to_variable: "b".to_string(),
2643                edge_variable: Some("r".to_string()),
2644                direction: ExpandDirection::Outgoing,
2645                edge_type: Some("KNOWS".to_string()),
2646                min_hops: 1,
2647                max_hops: Some(1),
2648                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2649                    variable: "a".to_string(),
2650                    label: None,
2651                    input: None,
2652                })),
2653                path_alias: None,
2654            })),
2655        }));
2656
2657        let physical = planner.plan(&logical).unwrap();
2658        assert!(physical.columns().contains(&"a".to_string()));
2659        assert!(physical.columns().contains(&"r".to_string()));
2660        assert!(physical.columns().contains(&"b".to_string()));
2661    }
2662
2663    // ==================== Limit/Skip/Sort Tests ====================
2664
2665    #[test]
2666    fn test_plan_limit() {
2667        let store = create_test_store();
2668        let planner = Planner::new(store);
2669
2670        // MATCH (n) RETURN n LIMIT 10
2671        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2672            items: vec![ReturnItem {
2673                expression: LogicalExpression::Variable("n".to_string()),
2674                alias: None,
2675            }],
2676            distinct: false,
2677            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2678                count: 10,
2679                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2680                    variable: "n".to_string(),
2681                    label: None,
2682                    input: None,
2683                })),
2684            })),
2685        }));
2686
2687        let physical = planner.plan(&logical).unwrap();
2688        assert_eq!(physical.columns(), &["n"]);
2689    }
2690
2691    #[test]
2692    fn test_plan_skip() {
2693        let store = create_test_store();
2694        let planner = Planner::new(store);
2695
2696        // MATCH (n) RETURN n SKIP 5
2697        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2698            items: vec![ReturnItem {
2699                expression: LogicalExpression::Variable("n".to_string()),
2700                alias: None,
2701            }],
2702            distinct: false,
2703            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2704                count: 5,
2705                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2706                    variable: "n".to_string(),
2707                    label: None,
2708                    input: None,
2709                })),
2710            })),
2711        }));
2712
2713        let physical = planner.plan(&logical).unwrap();
2714        assert_eq!(physical.columns(), &["n"]);
2715    }
2716
2717    #[test]
2718    fn test_plan_sort() {
2719        let store = create_test_store();
2720        let planner = Planner::new(store);
2721
2722        // MATCH (n) RETURN n ORDER BY n.name ASC
2723        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2724            items: vec![ReturnItem {
2725                expression: LogicalExpression::Variable("n".to_string()),
2726                alias: None,
2727            }],
2728            distinct: false,
2729            input: Box::new(LogicalOperator::Sort(SortOp {
2730                keys: vec![SortKey {
2731                    expression: LogicalExpression::Variable("n".to_string()),
2732                    order: SortOrder::Ascending,
2733                }],
2734                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2735                    variable: "n".to_string(),
2736                    label: None,
2737                    input: None,
2738                })),
2739            })),
2740        }));
2741
2742        let physical = planner.plan(&logical).unwrap();
2743        assert_eq!(physical.columns(), &["n"]);
2744    }
2745
2746    #[test]
2747    fn test_plan_sort_descending() {
2748        let store = create_test_store();
2749        let planner = Planner::new(store);
2750
2751        // ORDER BY n DESC
2752        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2753            items: vec![ReturnItem {
2754                expression: LogicalExpression::Variable("n".to_string()),
2755                alias: None,
2756            }],
2757            distinct: false,
2758            input: Box::new(LogicalOperator::Sort(SortOp {
2759                keys: vec![SortKey {
2760                    expression: LogicalExpression::Variable("n".to_string()),
2761                    order: SortOrder::Descending,
2762                }],
2763                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2764                    variable: "n".to_string(),
2765                    label: None,
2766                    input: None,
2767                })),
2768            })),
2769        }));
2770
2771        let physical = planner.plan(&logical).unwrap();
2772        assert_eq!(physical.columns(), &["n"]);
2773    }
2774
2775    #[test]
2776    fn test_plan_distinct() {
2777        let store = create_test_store();
2778        let planner = Planner::new(store);
2779
2780        // MATCH (n) RETURN DISTINCT n
2781        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2782            items: vec![ReturnItem {
2783                expression: LogicalExpression::Variable("n".to_string()),
2784                alias: None,
2785            }],
2786            distinct: false,
2787            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2788                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2789                    variable: "n".to_string(),
2790                    label: None,
2791                    input: None,
2792                })),
2793                columns: None,
2794            })),
2795        }));
2796
2797        let physical = planner.plan(&logical).unwrap();
2798        assert_eq!(physical.columns(), &["n"]);
2799    }
2800
2801    // ==================== Aggregate Tests ====================
2802
2803    #[test]
2804    fn test_plan_aggregate_count() {
2805        let store = create_test_store();
2806        let planner = Planner::new(store);
2807
2808        // MATCH (n) RETURN count(n)
2809        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2810            items: vec![ReturnItem {
2811                expression: LogicalExpression::Variable("cnt".to_string()),
2812                alias: None,
2813            }],
2814            distinct: false,
2815            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
2816                group_by: vec![],
2817                aggregates: vec![LogicalAggregateExpr {
2818                    function: LogicalAggregateFunction::Count,
2819                    expression: Some(LogicalExpression::Variable("n".to_string())),
2820                    distinct: false,
2821                    alias: Some("cnt".to_string()),
2822                    percentile: None,
2823                }],
2824                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2825                    variable: "n".to_string(),
2826                    label: None,
2827                    input: None,
2828                })),
2829                having: None,
2830            })),
2831        }));
2832
2833        let physical = planner.plan(&logical).unwrap();
2834        assert!(physical.columns().contains(&"cnt".to_string()));
2835    }
2836
2837    #[test]
2838    fn test_plan_aggregate_with_group_by() {
2839        let store = create_test_store();
2840        let planner = Planner::new(store);
2841
2842        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
2843        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2844            group_by: vec![LogicalExpression::Property {
2845                variable: "n".to_string(),
2846                property: "city".to_string(),
2847            }],
2848            aggregates: vec![LogicalAggregateExpr {
2849                function: LogicalAggregateFunction::Count,
2850                expression: Some(LogicalExpression::Variable("n".to_string())),
2851                distinct: false,
2852                alias: Some("cnt".to_string()),
2853                percentile: None,
2854            }],
2855            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2856                variable: "n".to_string(),
2857                label: Some("Person".to_string()),
2858                input: None,
2859            })),
2860            having: None,
2861        }));
2862
2863        let physical = planner.plan(&logical).unwrap();
2864        assert_eq!(physical.columns().len(), 2);
2865    }
2866
2867    #[test]
2868    fn test_plan_aggregate_sum() {
2869        let store = create_test_store();
2870        let planner = Planner::new(store);
2871
2872        // SUM(n.value)
2873        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2874            group_by: vec![],
2875            aggregates: vec![LogicalAggregateExpr {
2876                function: LogicalAggregateFunction::Sum,
2877                expression: Some(LogicalExpression::Property {
2878                    variable: "n".to_string(),
2879                    property: "value".to_string(),
2880                }),
2881                distinct: false,
2882                alias: Some("total".to_string()),
2883                percentile: None,
2884            }],
2885            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2886                variable: "n".to_string(),
2887                label: None,
2888                input: None,
2889            })),
2890            having: None,
2891        }));
2892
2893        let physical = planner.plan(&logical).unwrap();
2894        assert!(physical.columns().contains(&"total".to_string()));
2895    }
2896
2897    #[test]
2898    fn test_plan_aggregate_avg() {
2899        let store = create_test_store();
2900        let planner = Planner::new(store);
2901
2902        // AVG(n.score)
2903        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2904            group_by: vec![],
2905            aggregates: vec![LogicalAggregateExpr {
2906                function: LogicalAggregateFunction::Avg,
2907                expression: Some(LogicalExpression::Property {
2908                    variable: "n".to_string(),
2909                    property: "score".to_string(),
2910                }),
2911                distinct: false,
2912                alias: Some("average".to_string()),
2913                percentile: None,
2914            }],
2915            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2916                variable: "n".to_string(),
2917                label: None,
2918                input: None,
2919            })),
2920            having: None,
2921        }));
2922
2923        let physical = planner.plan(&logical).unwrap();
2924        assert!(physical.columns().contains(&"average".to_string()));
2925    }
2926
2927    #[test]
2928    fn test_plan_aggregate_min_max() {
2929        let store = create_test_store();
2930        let planner = Planner::new(store);
2931
2932        // MIN(n.age), MAX(n.age)
2933        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2934            group_by: vec![],
2935            aggregates: vec![
2936                LogicalAggregateExpr {
2937                    function: LogicalAggregateFunction::Min,
2938                    expression: Some(LogicalExpression::Property {
2939                        variable: "n".to_string(),
2940                        property: "age".to_string(),
2941                    }),
2942                    distinct: false,
2943                    alias: Some("youngest".to_string()),
2944                    percentile: None,
2945                },
2946                LogicalAggregateExpr {
2947                    function: LogicalAggregateFunction::Max,
2948                    expression: Some(LogicalExpression::Property {
2949                        variable: "n".to_string(),
2950                        property: "age".to_string(),
2951                    }),
2952                    distinct: false,
2953                    alias: Some("oldest".to_string()),
2954                    percentile: None,
2955                },
2956            ],
2957            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2958                variable: "n".to_string(),
2959                label: None,
2960                input: None,
2961            })),
2962            having: None,
2963        }));
2964
2965        let physical = planner.plan(&logical).unwrap();
2966        assert!(physical.columns().contains(&"youngest".to_string()));
2967        assert!(physical.columns().contains(&"oldest".to_string()));
2968    }
2969
2970    // ==================== Join Tests ====================
2971
2972    #[test]
2973    fn test_plan_inner_join() {
2974        let store = create_test_store();
2975        let planner = Planner::new(store);
2976
2977        // Inner join between two scans
2978        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2979            items: vec![
2980                ReturnItem {
2981                    expression: LogicalExpression::Variable("a".to_string()),
2982                    alias: None,
2983                },
2984                ReturnItem {
2985                    expression: LogicalExpression::Variable("b".to_string()),
2986                    alias: None,
2987                },
2988            ],
2989            distinct: false,
2990            input: Box::new(LogicalOperator::Join(JoinOp {
2991                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2992                    variable: "a".to_string(),
2993                    label: Some("Person".to_string()),
2994                    input: None,
2995                })),
2996                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2997                    variable: "b".to_string(),
2998                    label: Some("Company".to_string()),
2999                    input: None,
3000                })),
3001                join_type: JoinType::Inner,
3002                conditions: vec![JoinCondition {
3003                    left: LogicalExpression::Variable("a".to_string()),
3004                    right: LogicalExpression::Variable("b".to_string()),
3005                }],
3006            })),
3007        }));
3008
3009        let physical = planner.plan(&logical).unwrap();
3010        assert!(physical.columns().contains(&"a".to_string()));
3011        assert!(physical.columns().contains(&"b".to_string()));
3012    }
3013
3014    #[test]
3015    fn test_plan_cross_join() {
3016        let store = create_test_store();
3017        let planner = Planner::new(store);
3018
3019        // Cross join (no conditions)
3020        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3021            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3022                variable: "a".to_string(),
3023                label: None,
3024                input: None,
3025            })),
3026            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3027                variable: "b".to_string(),
3028                label: None,
3029                input: None,
3030            })),
3031            join_type: JoinType::Cross,
3032            conditions: vec![],
3033        }));
3034
3035        let physical = planner.plan(&logical).unwrap();
3036        assert_eq!(physical.columns().len(), 2);
3037    }
3038
3039    #[test]
3040    fn test_plan_left_join() {
3041        let store = create_test_store();
3042        let planner = Planner::new(store);
3043
3044        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3045            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3046                variable: "a".to_string(),
3047                label: None,
3048                input: None,
3049            })),
3050            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3051                variable: "b".to_string(),
3052                label: None,
3053                input: None,
3054            })),
3055            join_type: JoinType::Left,
3056            conditions: vec![],
3057        }));
3058
3059        let physical = planner.plan(&logical).unwrap();
3060        assert_eq!(physical.columns().len(), 2);
3061    }
3062
3063    // ==================== Mutation Tests ====================
3064
3065    #[test]
3066    fn test_plan_create_node() {
3067        let store = create_test_store();
3068        let planner = Planner::new(store);
3069
3070        // CREATE (n:Person {name: 'Alice'})
3071        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3072            variable: "n".to_string(),
3073            labels: vec!["Person".to_string()],
3074            properties: vec![(
3075                "name".to_string(),
3076                LogicalExpression::Literal(Value::String("Alice".into())),
3077            )],
3078            input: None,
3079        }));
3080
3081        let physical = planner.plan(&logical).unwrap();
3082        assert!(physical.columns().contains(&"n".to_string()));
3083    }
3084
3085    #[test]
3086    fn test_plan_create_edge() {
3087        let store = create_test_store();
3088        let planner = Planner::new(store);
3089
3090        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
3091        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3092            variable: Some("r".to_string()),
3093            from_variable: "a".to_string(),
3094            to_variable: "b".to_string(),
3095            edge_type: "KNOWS".to_string(),
3096            properties: vec![],
3097            input: Box::new(LogicalOperator::Join(JoinOp {
3098                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3099                    variable: "a".to_string(),
3100                    label: None,
3101                    input: None,
3102                })),
3103                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3104                    variable: "b".to_string(),
3105                    label: None,
3106                    input: None,
3107                })),
3108                join_type: JoinType::Cross,
3109                conditions: vec![],
3110            })),
3111        }));
3112
3113        let physical = planner.plan(&logical).unwrap();
3114        assert!(physical.columns().contains(&"r".to_string()));
3115    }
3116
3117    #[test]
3118    fn test_plan_delete_node() {
3119        let store = create_test_store();
3120        let planner = Planner::new(store);
3121
3122        // MATCH (n) DELETE n
3123        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3124            variable: "n".to_string(),
3125            detach: false,
3126            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3127                variable: "n".to_string(),
3128                label: None,
3129                input: None,
3130            })),
3131        }));
3132
3133        let physical = planner.plan(&logical).unwrap();
3134        assert!(physical.columns().contains(&"deleted_count".to_string()));
3135    }
3136
3137    // ==================== Error Cases ====================
3138
3139    #[test]
3140    fn test_plan_empty_errors() {
3141        let store = create_test_store();
3142        let planner = Planner::new(store);
3143
3144        let logical = LogicalPlan::new(LogicalOperator::Empty);
3145        let result = planner.plan(&logical);
3146        assert!(result.is_err());
3147    }
3148
3149    #[test]
3150    fn test_plan_missing_variable_in_return() {
3151        let store = create_test_store();
3152        let planner = Planner::new(store);
3153
3154        // Return variable that doesn't exist in input
3155        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3156            items: vec![ReturnItem {
3157                expression: LogicalExpression::Variable("missing".to_string()),
3158                alias: None,
3159            }],
3160            distinct: false,
3161            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3162                variable: "n".to_string(),
3163                label: None,
3164                input: None,
3165            })),
3166        }));
3167
3168        let result = planner.plan(&logical);
3169        assert!(result.is_err());
3170    }
3171
3172    // ==================== Helper Function Tests ====================
3173
3174    #[test]
3175    fn test_convert_binary_ops() {
3176        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3177        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3178        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3179        assert!(convert_binary_op(BinaryOp::Le).is_ok());
3180        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3181        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3182        assert!(convert_binary_op(BinaryOp::And).is_ok());
3183        assert!(convert_binary_op(BinaryOp::Or).is_ok());
3184        assert!(convert_binary_op(BinaryOp::Add).is_ok());
3185        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3186        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3187        assert!(convert_binary_op(BinaryOp::Div).is_ok());
3188    }
3189
3190    #[test]
3191    fn test_convert_unary_ops() {
3192        assert!(convert_unary_op(UnaryOp::Not).is_ok());
3193        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3194        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3195        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3196    }
3197
3198    #[test]
3199    fn test_convert_aggregate_functions() {
3200        assert!(matches!(
3201            convert_aggregate_function(LogicalAggregateFunction::Count),
3202            PhysicalAggregateFunction::Count
3203        ));
3204        assert!(matches!(
3205            convert_aggregate_function(LogicalAggregateFunction::Sum),
3206            PhysicalAggregateFunction::Sum
3207        ));
3208        assert!(matches!(
3209            convert_aggregate_function(LogicalAggregateFunction::Avg),
3210            PhysicalAggregateFunction::Avg
3211        ));
3212        assert!(matches!(
3213            convert_aggregate_function(LogicalAggregateFunction::Min),
3214            PhysicalAggregateFunction::Min
3215        ));
3216        assert!(matches!(
3217            convert_aggregate_function(LogicalAggregateFunction::Max),
3218            PhysicalAggregateFunction::Max
3219        ));
3220    }
3221
3222    #[test]
3223    fn test_planner_accessors() {
3224        let store = create_test_store();
3225        let planner = Planner::new(Arc::clone(&store));
3226
3227        assert!(planner.tx_id().is_none());
3228        assert!(planner.tx_manager().is_none());
3229        let _ = planner.viewing_epoch(); // Just ensure it's accessible
3230    }
3231
3232    #[test]
3233    fn test_physical_plan_accessors() {
3234        let store = create_test_store();
3235        let planner = Planner::new(store);
3236
3237        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3238            variable: "n".to_string(),
3239            label: None,
3240            input: None,
3241        }));
3242
3243        let physical = planner.plan(&logical).unwrap();
3244        assert_eq!(physical.columns(), &["n"]);
3245
3246        // Test into_operator
3247        let _ = physical.into_operator();
3248    }
3249}