Skip to main content

grafeo_engine/query/
planner.rs

1//! Logical query planner.
2//!
3//! Converts a logical plan into a physical plan (tree of operators).
4
5use crate::query::plan::{
6    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
7    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
8    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
9    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, SkipOp, SortOp,
10    SortOrder, UnaryOp, UnionOp, UnwindOp,
11};
12use grafeo_common::types::LogicalType;
13use grafeo_common::types::{EpochId, TxId};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::execution::operators::{
16    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
17    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
18    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
19    ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
20    JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NullOrder, Operator, ProjectExpr,
21    ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator,
22    SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
23    UnaryFilterOp, UnionOperator, UnwindOperator,
24};
25use grafeo_core::graph::{Direction, lpg::LpgStore};
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use crate::transaction::TransactionManager;
30
31/// Converts a logical plan to a physical operator tree.
32pub struct Planner {
33    /// The graph store to scan from.
34    store: Arc<LpgStore>,
35    /// Transaction manager for MVCC operations.
36    tx_manager: Option<Arc<TransactionManager>>,
37    /// Current transaction ID (if in a transaction).
38    tx_id: Option<TxId>,
39    /// Epoch to use for visibility checks.
40    viewing_epoch: EpochId,
41    /// Counter for generating unique anonymous edge column names.
42    anon_edge_counter: std::cell::Cell<u32>,
43}
44
45impl Planner {
46    /// Creates a new planner with the given store.
47    ///
48    /// This creates a planner without transaction context, using the current
49    /// epoch from the store for visibility.
50    #[must_use]
51    pub fn new(store: Arc<LpgStore>) -> Self {
52        let epoch = store.current_epoch();
53        Self {
54            store,
55            tx_manager: None,
56            tx_id: None,
57            viewing_epoch: epoch,
58            anon_edge_counter: std::cell::Cell::new(0),
59        }
60    }
61
62    /// Creates a new planner with transaction context for MVCC-aware planning.
63    ///
64    /// # Arguments
65    ///
66    /// * `store` - The graph store
67    /// * `tx_manager` - Transaction manager for recording reads/writes
68    /// * `tx_id` - Current transaction ID (None for auto-commit)
69    /// * `viewing_epoch` - Epoch to use for version visibility
70    #[must_use]
71    pub fn with_context(
72        store: Arc<LpgStore>,
73        tx_manager: Arc<TransactionManager>,
74        tx_id: Option<TxId>,
75        viewing_epoch: EpochId,
76    ) -> Self {
77        Self {
78            store,
79            tx_manager: Some(tx_manager),
80            tx_id,
81            viewing_epoch,
82            anon_edge_counter: std::cell::Cell::new(0),
83        }
84    }
85
86    /// Returns the viewing epoch for this planner.
87    #[must_use]
88    pub fn viewing_epoch(&self) -> EpochId {
89        self.viewing_epoch
90    }
91
92    /// Returns the transaction ID for this planner, if any.
93    #[must_use]
94    pub fn tx_id(&self) -> Option<TxId> {
95        self.tx_id
96    }
97
98    /// Returns a reference to the transaction manager, if available.
99    #[must_use]
100    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
101        self.tx_manager.as_ref()
102    }
103
104    /// Plans a logical plan into a physical operator.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if planning fails.
109    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
110        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
111        Ok(PhysicalPlan { operator, columns })
112    }
113
114    /// Plans a single logical operator.
115    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
116        match op {
117            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
118            LogicalOperator::Expand(expand) => self.plan_expand(expand),
119            LogicalOperator::Return(ret) => self.plan_return(ret),
120            LogicalOperator::Filter(filter) => self.plan_filter(filter),
121            LogicalOperator::Project(project) => {
122                // For now, just plan the input
123                self.plan_operator(&project.input)
124            }
125            LogicalOperator::Limit(limit) => self.plan_limit(limit),
126            LogicalOperator::Skip(skip) => self.plan_skip(skip),
127            LogicalOperator::Sort(sort) => self.plan_sort(sort),
128            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
129            LogicalOperator::Join(join) => self.plan_join(join),
130            LogicalOperator::Union(union) => self.plan_union(union),
131            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
132            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
133            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
134            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
135            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
136            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
137            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
138            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
139            LogicalOperator::Merge(merge) => self.plan_merge(merge),
140            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
141            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
142            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
143            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
144            _ => Err(Error::Internal(format!(
145                "Unsupported operator: {:?}",
146                std::mem::discriminant(op)
147            ))),
148        }
149    }
150
151    /// Plans a node scan operator.
152    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
153        let scan_op = if let Some(label) = &scan.label {
154            ScanOperator::with_label(Arc::clone(&self.store), label)
155        } else {
156            ScanOperator::new(Arc::clone(&self.store))
157        };
158
159        // Apply MVCC context if available
160        let operator: Box<dyn Operator> =
161            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
162
163        let columns = vec![scan.variable.clone()];
164
165        // If there's an input, we'd need to chain operators
166        // For now, just return the scan
167        Ok((operator, columns))
168    }
169
170    /// Plans an expand operator.
171    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
172        // Plan the input operator first
173        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
174
175        // Find the source column index
176        let source_column = input_columns
177            .iter()
178            .position(|c| c == &expand.from_variable)
179            .ok_or_else(|| {
180                Error::Internal(format!(
181                    "Source variable '{}' not found in input columns",
182                    expand.from_variable
183                ))
184            })?;
185
186        // Convert expand direction
187        let direction = match expand.direction {
188            ExpandDirection::Outgoing => Direction::Outgoing,
189            ExpandDirection::Incoming => Direction::Incoming,
190            ExpandDirection::Both => Direction::Both,
191        };
192
193        // Create the expand operator with MVCC context
194        let expand_op = ExpandOperator::new(
195            Arc::clone(&self.store),
196            input_op,
197            source_column,
198            direction,
199            expand.edge_type.clone(),
200        )
201        .with_tx_context(self.viewing_epoch, self.tx_id);
202
203        let operator: Box<dyn Operator> = Box::new(expand_op);
204
205        // Build output columns: [input_columns..., edge, target]
206        // Preserve all input columns and add edge + target to match ExpandOperator output
207        let mut columns = input_columns;
208
209        // Generate edge column name - use provided name or generate anonymous name
210        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
211            let count = self.anon_edge_counter.get();
212            self.anon_edge_counter.set(count + 1);
213            format!("_anon_edge_{}", count)
214        });
215        columns.push(edge_col_name);
216
217        columns.push(expand.to_variable.clone());
218
219        Ok((operator, columns))
220    }
221
222    /// Plans a RETURN clause.
223    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
224        // Plan the input operator
225        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
226
227        // Build variable to column index mapping
228        let variable_columns: HashMap<String, usize> = input_columns
229            .iter()
230            .enumerate()
231            .map(|(i, name)| (name.clone(), i))
232            .collect();
233
234        // Extract column names from return items
235        let columns: Vec<String> = ret
236            .items
237            .iter()
238            .map(|item| {
239                item.alias.clone().unwrap_or_else(|| {
240                    // Generate a default name from the expression
241                    expression_to_string(&item.expression)
242                })
243            })
244            .collect();
245
246        // Check if we need a project operator (for property access or expression evaluation)
247        let needs_project = ret
248            .items
249            .iter()
250            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
251
252        if needs_project {
253            // Build project expressions
254            let mut projections = Vec::with_capacity(ret.items.len());
255            let mut output_types = Vec::with_capacity(ret.items.len());
256
257            for item in &ret.items {
258                match &item.expression {
259                    LogicalExpression::Variable(name) => {
260                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
261                            Error::Internal(format!("Variable '{}' not found in input", name))
262                        })?;
263                        projections.push(ProjectExpr::Column(col_idx));
264                        // Use Node type for variables (they could be nodes, edges, or values)
265                        output_types.push(LogicalType::Node);
266                    }
267                    LogicalExpression::Property { variable, property } => {
268                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
269                            Error::Internal(format!("Variable '{}' not found in input", variable))
270                        })?;
271                        projections.push(ProjectExpr::PropertyAccess {
272                            column: col_idx,
273                            property: property.clone(),
274                        });
275                        // Property could be any type - use String as default
276                        output_types.push(LogicalType::String);
277                    }
278                    LogicalExpression::Literal(value) => {
279                        projections.push(ProjectExpr::Constant(value.clone()));
280                        output_types.push(value_to_logical_type(value));
281                    }
282                    _ => {
283                        return Err(Error::Internal(format!(
284                            "Unsupported RETURN expression: {:?}",
285                            item.expression
286                        )));
287                    }
288                }
289            }
290
291            let operator = Box::new(ProjectOperator::with_store(
292                input_op,
293                projections,
294                output_types,
295                Arc::clone(&self.store),
296            ));
297
298            Ok((operator, columns))
299        } else {
300            // Simple case: just return variables
301            // Re-order columns to match return items if needed
302            let mut projections = Vec::with_capacity(ret.items.len());
303            let mut output_types = Vec::with_capacity(ret.items.len());
304
305            for item in &ret.items {
306                if let LogicalExpression::Variable(name) = &item.expression {
307                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
308                        Error::Internal(format!("Variable '{}' not found in input", name))
309                    })?;
310                    projections.push(ProjectExpr::Column(col_idx));
311                    output_types.push(LogicalType::Node);
312                }
313            }
314
315            // Only add ProjectOperator if reordering is needed
316            if projections.len() == input_columns.len()
317                && projections
318                    .iter()
319                    .enumerate()
320                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
321            {
322                // No reordering needed
323                Ok((input_op, columns))
324            } else {
325                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
326                Ok((operator, columns))
327            }
328        }
329    }
330
331    /// Plans a filter operator.
332    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
333        // Plan the input operator first
334        let (input_op, columns) = self.plan_operator(&filter.input)?;
335
336        // Build variable to column index mapping
337        let variable_columns: HashMap<String, usize> = columns
338            .iter()
339            .enumerate()
340            .map(|(i, name)| (name.clone(), i))
341            .collect();
342
343        // Convert logical expression to filter expression
344        let filter_expr = self.convert_expression(&filter.predicate)?;
345
346        // Create the predicate
347        let predicate =
348            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
349
350        // Create the filter operator
351        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
352
353        Ok((operator, columns))
354    }
355
356    /// Plans a LIMIT operator.
357    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
358        let (input_op, columns) = self.plan_operator(&limit.input)?;
359        let output_schema = self.derive_schema_from_columns(&columns);
360        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
361        Ok((operator, columns))
362    }
363
364    /// Plans a SKIP operator.
365    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
366        let (input_op, columns) = self.plan_operator(&skip.input)?;
367        let output_schema = self.derive_schema_from_columns(&columns);
368        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
369        Ok((operator, columns))
370    }
371
372    /// Plans a SORT (ORDER BY) operator.
373    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
374        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
375
376        // Build variable to column index mapping
377        let mut variable_columns: HashMap<String, usize> = input_columns
378            .iter()
379            .enumerate()
380            .map(|(i, name)| (name.clone(), i))
381            .collect();
382
383        // Collect property expressions that need to be projected before sorting
384        let mut property_projections: Vec<(String, String, String)> = Vec::new();
385        let mut next_col_idx = input_columns.len();
386
387        for key in &sort.keys {
388            if let LogicalExpression::Property { variable, property } = &key.expression {
389                let col_name = format!("{}_{}", variable, property);
390                if !variable_columns.contains_key(&col_name) {
391                    property_projections.push((
392                        variable.clone(),
393                        property.clone(),
394                        col_name.clone(),
395                    ));
396                    variable_columns.insert(col_name, next_col_idx);
397                    next_col_idx += 1;
398                }
399            }
400        }
401
402        // Track output columns
403        let mut output_columns = input_columns.clone();
404
405        // If we have property expressions, add a projection to materialize them
406        if !property_projections.is_empty() {
407            let mut projections = Vec::new();
408            let mut output_types = Vec::new();
409
410            // First, pass through all existing columns
411            for (i, _) in input_columns.iter().enumerate() {
412                projections.push(ProjectExpr::Column(i));
413                output_types.push(LogicalType::Node);
414            }
415
416            // Then add property access projections
417            for (variable, property, col_name) in &property_projections {
418                let source_col = *variable_columns.get(variable).ok_or_else(|| {
419                    Error::Internal(format!(
420                        "Variable '{}' not found for ORDER BY property projection",
421                        variable
422                    ))
423                })?;
424                projections.push(ProjectExpr::PropertyAccess {
425                    column: source_col,
426                    property: property.clone(),
427                });
428                output_types.push(LogicalType::Any);
429                output_columns.push(col_name.clone());
430            }
431
432            input_op = Box::new(ProjectOperator::with_store(
433                input_op,
434                projections,
435                output_types,
436                Arc::clone(&self.store),
437            ));
438        }
439
440        // Convert logical sort keys to physical sort keys
441        let physical_keys: Vec<PhysicalSortKey> = sort
442            .keys
443            .iter()
444            .map(|key| {
445                let col_idx =
446                    self.resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
447                Ok(PhysicalSortKey {
448                    column: col_idx,
449                    direction: match key.order {
450                        SortOrder::Ascending => SortDirection::Ascending,
451                        SortOrder::Descending => SortDirection::Descending,
452                    },
453                    null_order: NullOrder::NullsLast,
454                })
455            })
456            .collect::<Result<Vec<_>>>()?;
457
458        let output_schema = self.derive_schema_from_columns(&output_columns);
459        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
460        Ok((operator, output_columns))
461    }
462
463    /// Resolves a sort expression to a column index, using projected property columns.
464    fn resolve_sort_expression_with_properties(
465        &self,
466        expr: &LogicalExpression,
467        variable_columns: &HashMap<String, usize>,
468    ) -> Result<usize> {
469        match expr {
470            LogicalExpression::Variable(name) => variable_columns
471                .get(name)
472                .copied()
473                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found for ORDER BY", name))),
474            LogicalExpression::Property { variable, property } => {
475                // Look up the projected property column (e.g., "p_age" for p.age)
476                let col_name = format!("{}_{}", variable, property);
477                variable_columns.get(&col_name).copied().ok_or_else(|| {
478                    Error::Internal(format!(
479                        "Property column '{}' not found for ORDER BY (from {}.{})",
480                        col_name, variable, property
481                    ))
482                })
483            }
484            _ => Err(Error::Internal(format!(
485                "Unsupported ORDER BY expression: {:?}",
486                expr
487            ))),
488        }
489    }
490
491    /// Derives a schema from column names (assumes Node type as default).
492    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
493        columns.iter().map(|_| LogicalType::Node).collect()
494    }
495
496    /// Plans an AGGREGATE operator.
497    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
498        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
499
500        // Build variable to column index mapping
501        let mut variable_columns: HashMap<String, usize> = input_columns
502            .iter()
503            .enumerate()
504            .map(|(i, name)| (name.clone(), i))
505            .collect();
506
507        // Collect all property expressions that need to be projected before aggregation
508        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
509        let mut next_col_idx = input_columns.len();
510
511        // Check group-by expressions for properties
512        for expr in &agg.group_by {
513            if let LogicalExpression::Property { variable, property } = expr {
514                let col_name = format!("{}_{}", variable, property);
515                if !variable_columns.contains_key(&col_name) {
516                    property_projections.push((
517                        variable.clone(),
518                        property.clone(),
519                        col_name.clone(),
520                    ));
521                    variable_columns.insert(col_name, next_col_idx);
522                    next_col_idx += 1;
523                }
524            }
525        }
526
527        // Check aggregate expressions for properties
528        for agg_expr in &agg.aggregates {
529            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
530                let col_name = format!("{}_{}", variable, property);
531                if !variable_columns.contains_key(&col_name) {
532                    property_projections.push((
533                        variable.clone(),
534                        property.clone(),
535                        col_name.clone(),
536                    ));
537                    variable_columns.insert(col_name, next_col_idx);
538                    next_col_idx += 1;
539                }
540            }
541        }
542
543        // If we have property expressions, add a projection to materialize them
544        if !property_projections.is_empty() {
545            let mut projections = Vec::new();
546            let mut output_types = Vec::new();
547
548            // First, pass through all existing columns
549            for (i, _) in input_columns.iter().enumerate() {
550                projections.push(ProjectExpr::Column(i));
551                output_types.push(LogicalType::Node);
552            }
553
554            // Then add property access projections
555            for (variable, property, _col_name) in &property_projections {
556                let source_col = *variable_columns.get(variable).ok_or_else(|| {
557                    Error::Internal(format!(
558                        "Variable '{}' not found for property projection",
559                        variable
560                    ))
561                })?;
562                projections.push(ProjectExpr::PropertyAccess {
563                    column: source_col,
564                    property: property.clone(),
565                });
566                output_types.push(LogicalType::Int64); // Properties are typically numeric for aggregates
567            }
568
569            input_op = Box::new(ProjectOperator::with_store(
570                input_op,
571                projections,
572                output_types,
573                Arc::clone(&self.store),
574            ));
575        }
576
577        // Convert group-by expressions to column indices
578        let group_columns: Vec<usize> = agg
579            .group_by
580            .iter()
581            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
582            .collect::<Result<Vec<_>>>()?;
583
584        // Convert aggregate expressions to physical form
585        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
586            .aggregates
587            .iter()
588            .map(|agg_expr| {
589                let column = agg_expr
590                    .expression
591                    .as_ref()
592                    .map(|e| {
593                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
594                    })
595                    .transpose()?;
596
597                Ok(PhysicalAggregateExpr {
598                    function: convert_aggregate_function(agg_expr.function),
599                    column,
600                    distinct: agg_expr.distinct,
601                    alias: agg_expr.alias.clone(),
602                })
603            })
604            .collect::<Result<Vec<_>>>()?;
605
606        // Build output schema and column names
607        let mut output_schema = Vec::new();
608        let mut output_columns = Vec::new();
609
610        // Add group-by columns
611        for (idx, expr) in agg.group_by.iter().enumerate() {
612            output_schema.push(LogicalType::Node); // Default type
613            output_columns.push(expression_to_string(expr));
614            // If there's a group column, we need to track it
615            if idx < group_columns.len() {
616                // Group column preserved
617            }
618        }
619
620        // Add aggregate result columns
621        for agg_expr in &agg.aggregates {
622            let result_type = match agg_expr.function {
623                LogicalAggregateFunction::Count => LogicalType::Int64,
624                LogicalAggregateFunction::Sum => LogicalType::Int64,
625                LogicalAggregateFunction::Avg => LogicalType::Float64,
626                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
627                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
628                    // since the aggregate can return any Value type, but the most common case
629                    // is numeric values from property expressions
630                    LogicalType::Int64
631                }
632                LogicalAggregateFunction::Collect => LogicalType::String, // List type
633            };
634            output_schema.push(result_type);
635            output_columns.push(
636                agg_expr
637                    .alias
638                    .clone()
639                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
640            );
641        }
642
643        // Choose operator based on whether there are group-by columns
644        let operator: Box<dyn Operator> = if group_columns.is_empty() {
645            Box::new(SimpleAggregateOperator::new(
646                input_op,
647                physical_aggregates,
648                output_schema,
649            ))
650        } else {
651            Box::new(HashAggregateOperator::new(
652                input_op,
653                group_columns,
654                physical_aggregates,
655                output_schema,
656            ))
657        };
658
659        Ok((operator, output_columns))
660    }
661
662    /// Resolves a logical expression to a column index.
663    #[allow(dead_code)]
664    fn resolve_expression_to_column(
665        &self,
666        expr: &LogicalExpression,
667        variable_columns: &HashMap<String, usize>,
668    ) -> Result<usize> {
669        match expr {
670            LogicalExpression::Variable(name) => variable_columns
671                .get(name)
672                .copied()
673                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
674            LogicalExpression::Property { variable, .. } => variable_columns
675                .get(variable)
676                .copied()
677                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
678            _ => Err(Error::Internal(format!(
679                "Cannot resolve expression to column: {:?}",
680                expr
681            ))),
682        }
683    }
684
685    /// Resolves a logical expression to a column index, using projected property columns.
686    ///
687    /// This is used for aggregations where properties have been projected into their own columns.
688    fn resolve_expression_to_column_with_properties(
689        &self,
690        expr: &LogicalExpression,
691        variable_columns: &HashMap<String, usize>,
692    ) -> Result<usize> {
693        match expr {
694            LogicalExpression::Variable(name) => variable_columns
695                .get(name)
696                .copied()
697                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
698            LogicalExpression::Property { variable, property } => {
699                // Look up the projected property column (e.g., "p_price" for p.price)
700                let col_name = format!("{}_{}", variable, property);
701                variable_columns.get(&col_name).copied().ok_or_else(|| {
702                    Error::Internal(format!(
703                        "Property column '{}' not found (from {}.{})",
704                        col_name, variable, property
705                    ))
706                })
707            }
708            _ => Err(Error::Internal(format!(
709                "Cannot resolve expression to column: {:?}",
710                expr
711            ))),
712        }
713    }
714
715    /// Converts a logical expression to a filter expression.
716    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
717        match expr {
718            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
719            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
720            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
721                variable: variable.clone(),
722                property: property.clone(),
723            }),
724            LogicalExpression::Binary { left, op, right } => {
725                let left_expr = self.convert_expression(left)?;
726                let right_expr = self.convert_expression(right)?;
727                let filter_op = convert_binary_op(*op)?;
728                Ok(FilterExpression::Binary {
729                    left: Box::new(left_expr),
730                    op: filter_op,
731                    right: Box::new(right_expr),
732                })
733            }
734            LogicalExpression::Unary { op, operand } => {
735                let operand_expr = self.convert_expression(operand)?;
736                let filter_op = convert_unary_op(*op)?;
737                Ok(FilterExpression::Unary {
738                    op: filter_op,
739                    operand: Box::new(operand_expr),
740                })
741            }
742            LogicalExpression::FunctionCall { name, args } => {
743                let filter_args: Vec<FilterExpression> = args
744                    .iter()
745                    .map(|a| self.convert_expression(a))
746                    .collect::<Result<Vec<_>>>()?;
747                Ok(FilterExpression::FunctionCall {
748                    name: name.clone(),
749                    args: filter_args,
750                })
751            }
752            LogicalExpression::Case {
753                operand,
754                when_clauses,
755                else_clause,
756            } => {
757                let filter_operand = operand
758                    .as_ref()
759                    .map(|e| self.convert_expression(e))
760                    .transpose()?
761                    .map(Box::new);
762                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
763                    .iter()
764                    .map(|(cond, result)| {
765                        Ok((
766                            self.convert_expression(cond)?,
767                            self.convert_expression(result)?,
768                        ))
769                    })
770                    .collect::<Result<Vec<_>>>()?;
771                let filter_else = else_clause
772                    .as_ref()
773                    .map(|e| self.convert_expression(e))
774                    .transpose()?
775                    .map(Box::new);
776                Ok(FilterExpression::Case {
777                    operand: filter_operand,
778                    when_clauses: filter_when_clauses,
779                    else_clause: filter_else,
780                })
781            }
782            LogicalExpression::List(items) => {
783                let filter_items: Vec<FilterExpression> = items
784                    .iter()
785                    .map(|item| self.convert_expression(item))
786                    .collect::<Result<Vec<_>>>()?;
787                Ok(FilterExpression::List(filter_items))
788            }
789            LogicalExpression::Map(pairs) => {
790                let filter_pairs: Vec<(String, FilterExpression)> = pairs
791                    .iter()
792                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
793                    .collect::<Result<Vec<_>>>()?;
794                Ok(FilterExpression::Map(filter_pairs))
795            }
796            LogicalExpression::IndexAccess { base, index } => {
797                let base_expr = self.convert_expression(base)?;
798                let index_expr = self.convert_expression(index)?;
799                Ok(FilterExpression::IndexAccess {
800                    base: Box::new(base_expr),
801                    index: Box::new(index_expr),
802                })
803            }
804            LogicalExpression::SliceAccess { base, start, end } => {
805                let base_expr = self.convert_expression(base)?;
806                let start_expr = start
807                    .as_ref()
808                    .map(|s| self.convert_expression(s))
809                    .transpose()?
810                    .map(Box::new);
811                let end_expr = end
812                    .as_ref()
813                    .map(|e| self.convert_expression(e))
814                    .transpose()?
815                    .map(Box::new);
816                Ok(FilterExpression::SliceAccess {
817                    base: Box::new(base_expr),
818                    start: start_expr,
819                    end: end_expr,
820                })
821            }
822            LogicalExpression::Parameter(_) => Err(Error::Internal(
823                "Parameters not yet supported in filters".to_string(),
824            )),
825            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
826            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
827            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
828            LogicalExpression::ListComprehension {
829                variable,
830                list_expr,
831                filter_expr,
832                map_expr,
833            } => {
834                let list = self.convert_expression(list_expr)?;
835                let filter = filter_expr
836                    .as_ref()
837                    .map(|f| self.convert_expression(f))
838                    .transpose()?
839                    .map(Box::new);
840                let map = self.convert_expression(map_expr)?;
841                Ok(FilterExpression::ListComprehension {
842                    variable: variable.clone(),
843                    list_expr: Box::new(list),
844                    filter_expr: filter,
845                    map_expr: Box::new(map),
846                })
847            }
848            LogicalExpression::ExistsSubquery(subplan) => {
849                // Extract the pattern from the subplan
850                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
851                let (start_var, direction, edge_type, end_labels) =
852                    self.extract_exists_pattern(subplan)?;
853
854                Ok(FilterExpression::ExistsSubquery {
855                    start_var,
856                    direction,
857                    edge_type,
858                    end_labels,
859                    min_hops: None,
860                    max_hops: None,
861                })
862            }
863            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
864                "COUNT subqueries not yet supported".to_string(),
865            )),
866        }
867    }
868
869    /// Extracts the pattern from an EXISTS subplan.
870    /// Returns (start_variable, direction, edge_type, end_labels).
871    fn extract_exists_pattern(
872        &self,
873        subplan: &LogicalOperator,
874    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
875        match subplan {
876            LogicalOperator::Expand(expand) => {
877                // Get end node labels from the to_variable if there's a node scan input
878                let end_labels = self.extract_end_labels_from_expand(expand);
879                let direction = match expand.direction {
880                    ExpandDirection::Outgoing => Direction::Outgoing,
881                    ExpandDirection::Incoming => Direction::Incoming,
882                    ExpandDirection::Both => Direction::Both,
883                };
884                Ok((
885                    expand.from_variable.clone(),
886                    direction,
887                    expand.edge_type.clone(),
888                    end_labels,
889                ))
890            }
891            LogicalOperator::NodeScan(scan) => {
892                if let Some(input) = &scan.input {
893                    self.extract_exists_pattern(input)
894                } else {
895                    Err(Error::Internal(
896                        "EXISTS subquery must contain an edge pattern".to_string(),
897                    ))
898                }
899            }
900            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
901            _ => Err(Error::Internal(
902                "Unsupported EXISTS subquery pattern".to_string(),
903            )),
904        }
905    }
906
907    /// Extracts end node labels from an Expand operator if present.
908    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
909        // Check if the expand has a NodeScan input with a label filter
910        match expand.input.as_ref() {
911            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
912            _ => None,
913        }
914    }
915
916    /// Plans a JOIN operator.
917    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
918        let (left_op, left_columns) = self.plan_operator(&join.left)?;
919        let (right_op, right_columns) = self.plan_operator(&join.right)?;
920
921        // Build combined output columns
922        let mut columns = left_columns.clone();
923        columns.extend(right_columns.clone());
924
925        // Convert join type
926        let physical_join_type = match join.join_type {
927            JoinType::Inner => PhysicalJoinType::Inner,
928            JoinType::Left => PhysicalJoinType::Left,
929            JoinType::Right => PhysicalJoinType::Right,
930            JoinType::Full => PhysicalJoinType::Full,
931            JoinType::Cross => PhysicalJoinType::Cross,
932            JoinType::Semi => PhysicalJoinType::Semi,
933            JoinType::Anti => PhysicalJoinType::Anti,
934        };
935
936        // Build key columns from join conditions
937        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
938            // Cross join - no keys
939            (vec![], vec![])
940        } else {
941            join.conditions
942                .iter()
943                .filter_map(|cond| {
944                    // Try to extract column indices from expressions
945                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
946                    let right_idx = self
947                        .expression_to_column(&cond.right, &right_columns)
948                        .ok()?;
949                    Some((left_idx, right_idx))
950                })
951                .unzip()
952        };
953
954        let output_schema = self.derive_schema_from_columns(&columns);
955
956        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
957            left_op,
958            right_op,
959            probe_keys,
960            build_keys,
961            physical_join_type,
962            output_schema,
963        ));
964
965        Ok((operator, columns))
966    }
967
968    /// Extracts a column index from an expression.
969    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
970        match expr {
971            LogicalExpression::Variable(name) => columns
972                .iter()
973                .position(|c| c == name)
974                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
975            _ => Err(Error::Internal(
976                "Only variables supported in join conditions".to_string(),
977            )),
978        }
979    }
980
981    /// Plans a UNION operator.
982    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
983        if union.inputs.is_empty() {
984            return Err(Error::Internal(
985                "Union requires at least one input".to_string(),
986            ));
987        }
988
989        let mut inputs = Vec::with_capacity(union.inputs.len());
990        let mut columns = Vec::new();
991
992        for (i, input) in union.inputs.iter().enumerate() {
993            let (op, cols) = self.plan_operator(input)?;
994            if i == 0 {
995                columns = cols;
996            }
997            inputs.push(op);
998        }
999
1000        let output_schema = self.derive_schema_from_columns(&columns);
1001        let operator = Box::new(UnionOperator::new(inputs, output_schema));
1002
1003        Ok((operator, columns))
1004    }
1005
1006    /// Plans a DISTINCT operator.
1007    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1008        let (input_op, columns) = self.plan_operator(&distinct.input)?;
1009        let output_schema = self.derive_schema_from_columns(&columns);
1010        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1011        Ok((operator, columns))
1012    }
1013
1014    /// Plans a CREATE NODE operator.
1015    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1016        // Plan input if present
1017        let (input_op, mut columns) = if let Some(ref input) = create.input {
1018            let (op, cols) = self.plan_operator(input)?;
1019            (Some(op), cols)
1020        } else {
1021            (None, vec![])
1022        };
1023
1024        // Output column for the created node
1025        let output_column = columns.len();
1026        columns.push(create.variable.clone());
1027
1028        // Convert properties
1029        let properties: Vec<(String, PropertySource)> = create
1030            .properties
1031            .iter()
1032            .map(|(name, expr)| {
1033                let source = match expr {
1034                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1035                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1036                };
1037                (name.clone(), source)
1038            })
1039            .collect();
1040
1041        let output_schema = self.derive_schema_from_columns(&columns);
1042
1043        let operator = Box::new(
1044            CreateNodeOperator::new(
1045                Arc::clone(&self.store),
1046                input_op,
1047                create.labels.clone(),
1048                properties,
1049                output_schema,
1050                output_column,
1051            )
1052            .with_tx_context(self.viewing_epoch, self.tx_id),
1053        );
1054
1055        Ok((operator, columns))
1056    }
1057
1058    /// Plans a CREATE EDGE operator.
1059    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1060        let (input_op, mut columns) = self.plan_operator(&create.input)?;
1061
1062        // Find source and target columns
1063        let from_column = columns
1064            .iter()
1065            .position(|c| c == &create.from_variable)
1066            .ok_or_else(|| {
1067                Error::Internal(format!(
1068                    "Source variable '{}' not found",
1069                    create.from_variable
1070                ))
1071            })?;
1072
1073        let to_column = columns
1074            .iter()
1075            .position(|c| c == &create.to_variable)
1076            .ok_or_else(|| {
1077                Error::Internal(format!(
1078                    "Target variable '{}' not found",
1079                    create.to_variable
1080                ))
1081            })?;
1082
1083        // Output column for the created edge (if named)
1084        let output_column = create.variable.as_ref().map(|v| {
1085            let idx = columns.len();
1086            columns.push(v.clone());
1087            idx
1088        });
1089
1090        // Convert properties
1091        let properties: Vec<(String, PropertySource)> = create
1092            .properties
1093            .iter()
1094            .map(|(name, expr)| {
1095                let source = match expr {
1096                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1097                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1098                };
1099                (name.clone(), source)
1100            })
1101            .collect();
1102
1103        let output_schema = self.derive_schema_from_columns(&columns);
1104
1105        let operator = Box::new(
1106            CreateEdgeOperator::new(
1107                Arc::clone(&self.store),
1108                input_op,
1109                from_column,
1110                to_column,
1111                create.edge_type.clone(),
1112                properties,
1113                output_schema,
1114                output_column,
1115            )
1116            .with_tx_context(self.viewing_epoch, self.tx_id),
1117        );
1118
1119        Ok((operator, columns))
1120    }
1121
1122    /// Plans a DELETE NODE operator.
1123    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1124        let (input_op, columns) = self.plan_operator(&delete.input)?;
1125
1126        let node_column = columns
1127            .iter()
1128            .position(|c| c == &delete.variable)
1129            .ok_or_else(|| {
1130                Error::Internal(format!(
1131                    "Variable '{}' not found for delete",
1132                    delete.variable
1133                ))
1134            })?;
1135
1136        // Output schema for delete count
1137        let output_schema = vec![LogicalType::Int64];
1138        let output_columns = vec!["deleted_count".to_string()];
1139
1140        let operator = Box::new(
1141            DeleteNodeOperator::new(
1142                Arc::clone(&self.store),
1143                input_op,
1144                node_column,
1145                output_schema,
1146                true, // detach = true to delete connected edges
1147            )
1148            .with_tx_context(self.viewing_epoch, self.tx_id),
1149        );
1150
1151        Ok((operator, output_columns))
1152    }
1153
1154    /// Plans a DELETE EDGE operator.
1155    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1156        let (input_op, columns) = self.plan_operator(&delete.input)?;
1157
1158        let edge_column = columns
1159            .iter()
1160            .position(|c| c == &delete.variable)
1161            .ok_or_else(|| {
1162                Error::Internal(format!(
1163                    "Variable '{}' not found for delete",
1164                    delete.variable
1165                ))
1166            })?;
1167
1168        // Output schema for delete count
1169        let output_schema = vec![LogicalType::Int64];
1170        let output_columns = vec!["deleted_count".to_string()];
1171
1172        let operator = Box::new(
1173            DeleteEdgeOperator::new(
1174                Arc::clone(&self.store),
1175                input_op,
1176                edge_column,
1177                output_schema,
1178            )
1179            .with_tx_context(self.viewing_epoch, self.tx_id),
1180        );
1181
1182        Ok((operator, output_columns))
1183    }
1184
1185    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
1186    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1187        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1188        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1189
1190        // Build combined output columns (left + right)
1191        let mut columns = left_columns.clone();
1192        columns.extend(right_columns.clone());
1193
1194        // Find common variables between left and right for join keys
1195        let mut probe_keys = Vec::new();
1196        let mut build_keys = Vec::new();
1197
1198        for (right_idx, right_col) in right_columns.iter().enumerate() {
1199            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1200                probe_keys.push(left_idx);
1201                build_keys.push(right_idx);
1202            }
1203        }
1204
1205        let output_schema = self.derive_schema_from_columns(&columns);
1206
1207        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1208            left_op,
1209            right_op,
1210            probe_keys,
1211            build_keys,
1212            PhysicalJoinType::Left,
1213            output_schema,
1214        ));
1215
1216        Ok((operator, columns))
1217    }
1218
1219    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
1220    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1221        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1222        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1223
1224        // Anti-join only keeps left columns (filters out matching rows)
1225        let columns = left_columns.clone();
1226
1227        // Find common variables between left and right for join keys
1228        let mut probe_keys = Vec::new();
1229        let mut build_keys = Vec::new();
1230
1231        for (right_idx, right_col) in right_columns.iter().enumerate() {
1232            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1233                probe_keys.push(left_idx);
1234                build_keys.push(right_idx);
1235            }
1236        }
1237
1238        let output_schema = self.derive_schema_from_columns(&columns);
1239
1240        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1241            left_op,
1242            right_op,
1243            probe_keys,
1244            build_keys,
1245            PhysicalJoinType::Anti,
1246            output_schema,
1247        ));
1248
1249        Ok((operator, columns))
1250    }
1251
1252    /// Plans an unwind operator.
1253    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1254        // Plan the input operator first
1255        let (input_op, input_columns) = self.plan_operator(&unwind.input)?;
1256
1257        // The UNWIND expression should be a list - we need to find/evaluate it
1258        // For now, we handle the case where the expression references an existing column
1259        // or is a literal list
1260
1261        // Find if the expression references an existing column (like a list property)
1262        let list_col_idx = match &unwind.expression {
1263            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1264            LogicalExpression::Property { variable, .. } => {
1265                // Property access needs to be evaluated - for now we'll need the filter predicate
1266                // to evaluate this. For simple cases, we treat it as a list column.
1267                input_columns.iter().position(|c| c == variable)
1268            }
1269            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1270                // Literal list expression - we'll add it as a virtual column
1271                None
1272            }
1273            _ => None,
1274        };
1275
1276        // Build output columns: all input columns plus the new variable
1277        let mut columns = input_columns.clone();
1278        columns.push(unwind.variable.clone());
1279
1280        // Build output schema
1281        let mut output_schema = self.derive_schema_from_columns(&input_columns);
1282        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
1283
1284        // Use the list column index if found, otherwise default to 0
1285        // (in which case the first column should contain the list)
1286        let col_idx = list_col_idx.unwrap_or(0);
1287
1288        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1289            input_op,
1290            col_idx,
1291            unwind.variable.clone(),
1292            output_schema,
1293        ));
1294
1295        Ok((operator, columns))
1296    }
1297
1298    /// Plans a MERGE operator.
1299    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1300        // Plan the input operator first (if any)
1301        let (_input_op, mut columns) = self.plan_operator(&merge.input)?;
1302
1303        // Convert match properties from LogicalExpression to Value
1304        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1305            .match_properties
1306            .iter()
1307            .filter_map(|(name, expr)| {
1308                if let LogicalExpression::Literal(v) = expr {
1309                    Some((name.clone(), v.clone()))
1310                } else {
1311                    None // Skip non-literal expressions for now
1312                }
1313            })
1314            .collect();
1315
1316        // Convert ON CREATE properties
1317        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
1318            .on_create
1319            .iter()
1320            .filter_map(|(name, expr)| {
1321                if let LogicalExpression::Literal(v) = expr {
1322                    Some((name.clone(), v.clone()))
1323                } else {
1324                    None
1325                }
1326            })
1327            .collect();
1328
1329        // Convert ON MATCH properties
1330        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
1331            .on_match
1332            .iter()
1333            .filter_map(|(name, expr)| {
1334                if let LogicalExpression::Literal(v) = expr {
1335                    Some((name.clone(), v.clone()))
1336                } else {
1337                    None
1338                }
1339            })
1340            .collect();
1341
1342        // Add the merged node variable to output columns
1343        columns.push(merge.variable.clone());
1344
1345        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1346            Arc::clone(&self.store),
1347            merge.variable.clone(),
1348            merge.labels.clone(),
1349            match_properties,
1350            on_create_properties,
1351            on_match_properties,
1352        ));
1353
1354        Ok((operator, columns))
1355    }
1356
1357    /// Plans an ADD LABEL operator.
1358    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1359        let (input_op, columns) = self.plan_operator(&add_label.input)?;
1360
1361        // Find the node column
1362        let node_column = columns
1363            .iter()
1364            .position(|c| c == &add_label.variable)
1365            .ok_or_else(|| {
1366                Error::Internal(format!(
1367                    "Variable '{}' not found for ADD LABEL",
1368                    add_label.variable
1369                ))
1370            })?;
1371
1372        // Output schema for update count
1373        let output_schema = vec![LogicalType::Int64];
1374        let output_columns = vec!["labels_added".to_string()];
1375
1376        let operator = Box::new(AddLabelOperator::new(
1377            Arc::clone(&self.store),
1378            input_op,
1379            node_column,
1380            add_label.labels.clone(),
1381            output_schema,
1382        ));
1383
1384        Ok((operator, output_columns))
1385    }
1386
1387    /// Plans a REMOVE LABEL operator.
1388    fn plan_remove_label(
1389        &self,
1390        remove_label: &RemoveLabelOp,
1391    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1392        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1393
1394        // Find the node column
1395        let node_column = columns
1396            .iter()
1397            .position(|c| c == &remove_label.variable)
1398            .ok_or_else(|| {
1399                Error::Internal(format!(
1400                    "Variable '{}' not found for REMOVE LABEL",
1401                    remove_label.variable
1402                ))
1403            })?;
1404
1405        // Output schema for update count
1406        let output_schema = vec![LogicalType::Int64];
1407        let output_columns = vec!["labels_removed".to_string()];
1408
1409        let operator = Box::new(RemoveLabelOperator::new(
1410            Arc::clone(&self.store),
1411            input_op,
1412            node_column,
1413            remove_label.labels.clone(),
1414            output_schema,
1415        ));
1416
1417        Ok((operator, output_columns))
1418    }
1419
1420    /// Plans a SET PROPERTY operator.
1421    fn plan_set_property(
1422        &self,
1423        set_prop: &SetPropertyOp,
1424    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1425        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1426
1427        // Find the entity column (node or edge variable)
1428        let entity_column = columns
1429            .iter()
1430            .position(|c| c == &set_prop.variable)
1431            .ok_or_else(|| {
1432                Error::Internal(format!(
1433                    "Variable '{}' not found for SET",
1434                    set_prop.variable
1435                ))
1436            })?;
1437
1438        // Convert properties to PropertySource
1439        let properties: Vec<(String, PropertySource)> = set_prop
1440            .properties
1441            .iter()
1442            .map(|(name, expr)| {
1443                let source = self.expression_to_property_source(expr, &columns)?;
1444                Ok((name.clone(), source))
1445            })
1446            .collect::<Result<Vec<_>>>()?;
1447
1448        // Output schema preserves input schema (passes through)
1449        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1450        let output_columns = columns.clone();
1451
1452        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
1453        let operator = Box::new(SetPropertyOperator::new_for_node(
1454            Arc::clone(&self.store),
1455            input_op,
1456            entity_column,
1457            properties,
1458            output_schema,
1459        ));
1460
1461        Ok((operator, output_columns))
1462    }
1463
1464    /// Converts a logical expression to a PropertySource.
1465    fn expression_to_property_source(
1466        &self,
1467        expr: &LogicalExpression,
1468        columns: &[String],
1469    ) -> Result<PropertySource> {
1470        match expr {
1471            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1472            LogicalExpression::Variable(name) => {
1473                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1474                    Error::Internal(format!("Variable '{}' not found for property source", name))
1475                })?;
1476                Ok(PropertySource::Column(col_idx))
1477            }
1478            LogicalExpression::Parameter(name) => {
1479                // Parameters should be resolved before planning
1480                // For now, treat as a placeholder
1481                Ok(PropertySource::Constant(
1482                    grafeo_common::types::Value::String(format!("${}", name).into()),
1483                ))
1484            }
1485            _ => Err(Error::Internal(format!(
1486                "Unsupported expression type for property source: {:?}",
1487                expr
1488            ))),
1489        }
1490    }
1491}
1492
1493/// Converts a logical binary operator to a filter binary operator.
1494pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1495    match op {
1496        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1497        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1498        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1499        BinaryOp::Le => Ok(BinaryFilterOp::Le),
1500        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1501        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
1502        BinaryOp::And => Ok(BinaryFilterOp::And),
1503        BinaryOp::Or => Ok(BinaryFilterOp::Or),
1504        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
1505        BinaryOp::Add => Ok(BinaryFilterOp::Add),
1506        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
1507        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
1508        BinaryOp::Div => Ok(BinaryFilterOp::Div),
1509        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
1510        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
1511        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
1512        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
1513        BinaryOp::In => Ok(BinaryFilterOp::In),
1514        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
1515        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
1516        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
1517            "Binary operator {:?} not yet supported in filters",
1518            op
1519        ))),
1520    }
1521}
1522
1523/// Converts a logical unary operator to a filter unary operator.
1524pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
1525    match op {
1526        UnaryOp::Not => Ok(UnaryFilterOp::Not),
1527        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
1528        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
1529        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
1530    }
1531}
1532
1533/// Converts a logical aggregate function to a physical aggregate function.
1534pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
1535    match func {
1536        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
1537        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
1538        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
1539        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
1540        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
1541        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
1542    }
1543}
1544
1545/// Converts a logical expression to a filter expression.
1546///
1547/// This is a standalone function that can be used by both LPG and RDF planners.
1548pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
1549    match expr {
1550        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1551        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1552        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1553            variable: variable.clone(),
1554            property: property.clone(),
1555        }),
1556        LogicalExpression::Binary { left, op, right } => {
1557            let left_expr = convert_filter_expression(left)?;
1558            let right_expr = convert_filter_expression(right)?;
1559            let filter_op = convert_binary_op(*op)?;
1560            Ok(FilterExpression::Binary {
1561                left: Box::new(left_expr),
1562                op: filter_op,
1563                right: Box::new(right_expr),
1564            })
1565        }
1566        LogicalExpression::Unary { op, operand } => {
1567            let operand_expr = convert_filter_expression(operand)?;
1568            let filter_op = convert_unary_op(*op)?;
1569            Ok(FilterExpression::Unary {
1570                op: filter_op,
1571                operand: Box::new(operand_expr),
1572            })
1573        }
1574        LogicalExpression::FunctionCall { name, args } => {
1575            let filter_args: Vec<FilterExpression> = args
1576                .iter()
1577                .map(|a| convert_filter_expression(a))
1578                .collect::<Result<Vec<_>>>()?;
1579            Ok(FilterExpression::FunctionCall {
1580                name: name.clone(),
1581                args: filter_args,
1582            })
1583        }
1584        LogicalExpression::Case {
1585            operand,
1586            when_clauses,
1587            else_clause,
1588        } => {
1589            let filter_operand = operand
1590                .as_ref()
1591                .map(|e| convert_filter_expression(e))
1592                .transpose()?
1593                .map(Box::new);
1594            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1595                .iter()
1596                .map(|(cond, result)| {
1597                    Ok((
1598                        convert_filter_expression(cond)?,
1599                        convert_filter_expression(result)?,
1600                    ))
1601                })
1602                .collect::<Result<Vec<_>>>()?;
1603            let filter_else = else_clause
1604                .as_ref()
1605                .map(|e| convert_filter_expression(e))
1606                .transpose()?
1607                .map(Box::new);
1608            Ok(FilterExpression::Case {
1609                operand: filter_operand,
1610                when_clauses: filter_when_clauses,
1611                else_clause: filter_else,
1612            })
1613        }
1614        LogicalExpression::List(items) => {
1615            let filter_items: Vec<FilterExpression> = items
1616                .iter()
1617                .map(|item| convert_filter_expression(item))
1618                .collect::<Result<Vec<_>>>()?;
1619            Ok(FilterExpression::List(filter_items))
1620        }
1621        LogicalExpression::Map(pairs) => {
1622            let filter_pairs: Vec<(String, FilterExpression)> = pairs
1623                .iter()
1624                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
1625                .collect::<Result<Vec<_>>>()?;
1626            Ok(FilterExpression::Map(filter_pairs))
1627        }
1628        LogicalExpression::IndexAccess { base, index } => {
1629            let base_expr = convert_filter_expression(base)?;
1630            let index_expr = convert_filter_expression(index)?;
1631            Ok(FilterExpression::IndexAccess {
1632                base: Box::new(base_expr),
1633                index: Box::new(index_expr),
1634            })
1635        }
1636        LogicalExpression::SliceAccess { base, start, end } => {
1637            let base_expr = convert_filter_expression(base)?;
1638            let start_expr = start
1639                .as_ref()
1640                .map(|s| convert_filter_expression(s))
1641                .transpose()?
1642                .map(Box::new);
1643            let end_expr = end
1644                .as_ref()
1645                .map(|e| convert_filter_expression(e))
1646                .transpose()?
1647                .map(Box::new);
1648            Ok(FilterExpression::SliceAccess {
1649                base: Box::new(base_expr),
1650                start: start_expr,
1651                end: end_expr,
1652            })
1653        }
1654        LogicalExpression::Parameter(_) => Err(Error::Internal(
1655            "Parameters not yet supported in filters".to_string(),
1656        )),
1657        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1658        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1659        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1660        LogicalExpression::ListComprehension {
1661            variable,
1662            list_expr,
1663            filter_expr,
1664            map_expr,
1665        } => {
1666            let list = convert_filter_expression(list_expr)?;
1667            let filter = filter_expr
1668                .as_ref()
1669                .map(|f| convert_filter_expression(f))
1670                .transpose()?
1671                .map(Box::new);
1672            let map = convert_filter_expression(map_expr)?;
1673            Ok(FilterExpression::ListComprehension {
1674                variable: variable.clone(),
1675                list_expr: Box::new(list),
1676                filter_expr: filter,
1677                map_expr: Box::new(map),
1678            })
1679        }
1680        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
1681            Error::Internal("Subqueries not yet supported in filters".to_string()),
1682        ),
1683    }
1684}
1685
1686/// Infers the logical type from a value.
1687fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
1688    use grafeo_common::types::Value;
1689    match value {
1690        Value::Null => LogicalType::String, // Default type for null
1691        Value::Bool(_) => LogicalType::Bool,
1692        Value::Int64(_) => LogicalType::Int64,
1693        Value::Float64(_) => LogicalType::Float64,
1694        Value::String(_) => LogicalType::String,
1695        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
1696        Value::Timestamp(_) => LogicalType::Timestamp,
1697        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
1698        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
1699    }
1700}
1701
1702/// Converts an expression to a string for column naming.
1703fn expression_to_string(expr: &LogicalExpression) -> String {
1704    match expr {
1705        LogicalExpression::Variable(name) => name.clone(),
1706        LogicalExpression::Property { variable, property } => {
1707            format!("{variable}.{property}")
1708        }
1709        LogicalExpression::Literal(value) => format!("{value:?}"),
1710        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
1711        _ => "expr".to_string(),
1712    }
1713}
1714
1715/// A physical plan ready for execution.
1716pub struct PhysicalPlan {
1717    /// The root physical operator.
1718    pub operator: Box<dyn Operator>,
1719    /// Column names for the result.
1720    pub columns: Vec<String>,
1721}
1722
1723impl PhysicalPlan {
1724    /// Returns the column names.
1725    #[must_use]
1726    pub fn columns(&self) -> &[String] {
1727        &self.columns
1728    }
1729
1730    /// Consumes the plan and returns the operator.
1731    pub fn into_operator(self) -> Box<dyn Operator> {
1732        self.operator
1733    }
1734}
1735
1736#[cfg(test)]
1737mod tests {
1738    use super::*;
1739    use crate::query::plan::{
1740        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1741        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1742        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
1743        SortKey, SortOp,
1744    };
1745    use grafeo_common::types::Value;
1746
1747    fn create_test_store() -> Arc<LpgStore> {
1748        let store = Arc::new(LpgStore::new());
1749        store.create_node(&["Person"]);
1750        store.create_node(&["Person"]);
1751        store.create_node(&["Company"]);
1752        store
1753    }
1754
1755    // ==================== Simple Scan Tests ====================
1756
1757    #[test]
1758    fn test_plan_simple_scan() {
1759        let store = create_test_store();
1760        let planner = Planner::new(store);
1761
1762        // MATCH (n:Person) RETURN n
1763        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1764            items: vec![ReturnItem {
1765                expression: LogicalExpression::Variable("n".to_string()),
1766                alias: None,
1767            }],
1768            distinct: false,
1769            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1770                variable: "n".to_string(),
1771                label: Some("Person".to_string()),
1772                input: None,
1773            })),
1774        }));
1775
1776        let physical = planner.plan(&logical).unwrap();
1777        assert_eq!(physical.columns(), &["n"]);
1778    }
1779
1780    #[test]
1781    fn test_plan_scan_without_label() {
1782        let store = create_test_store();
1783        let planner = Planner::new(store);
1784
1785        // MATCH (n) RETURN n
1786        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1787            items: vec![ReturnItem {
1788                expression: LogicalExpression::Variable("n".to_string()),
1789                alias: None,
1790            }],
1791            distinct: false,
1792            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1793                variable: "n".to_string(),
1794                label: None,
1795                input: None,
1796            })),
1797        }));
1798
1799        let physical = planner.plan(&logical).unwrap();
1800        assert_eq!(physical.columns(), &["n"]);
1801    }
1802
1803    #[test]
1804    fn test_plan_return_with_alias() {
1805        let store = create_test_store();
1806        let planner = Planner::new(store);
1807
1808        // MATCH (n:Person) RETURN n AS person
1809        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1810            items: vec![ReturnItem {
1811                expression: LogicalExpression::Variable("n".to_string()),
1812                alias: Some("person".to_string()),
1813            }],
1814            distinct: false,
1815            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816                variable: "n".to_string(),
1817                label: Some("Person".to_string()),
1818                input: None,
1819            })),
1820        }));
1821
1822        let physical = planner.plan(&logical).unwrap();
1823        assert_eq!(physical.columns(), &["person"]);
1824    }
1825
1826    #[test]
1827    fn test_plan_return_property() {
1828        let store = create_test_store();
1829        let planner = Planner::new(store);
1830
1831        // MATCH (n:Person) RETURN n.name
1832        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1833            items: vec![ReturnItem {
1834                expression: LogicalExpression::Property {
1835                    variable: "n".to_string(),
1836                    property: "name".to_string(),
1837                },
1838                alias: None,
1839            }],
1840            distinct: false,
1841            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1842                variable: "n".to_string(),
1843                label: Some("Person".to_string()),
1844                input: None,
1845            })),
1846        }));
1847
1848        let physical = planner.plan(&logical).unwrap();
1849        assert_eq!(physical.columns(), &["n.name"]);
1850    }
1851
1852    #[test]
1853    fn test_plan_return_literal() {
1854        let store = create_test_store();
1855        let planner = Planner::new(store);
1856
1857        // MATCH (n) RETURN 42 AS answer
1858        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1859            items: vec![ReturnItem {
1860                expression: LogicalExpression::Literal(Value::Int64(42)),
1861                alias: Some("answer".to_string()),
1862            }],
1863            distinct: false,
1864            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1865                variable: "n".to_string(),
1866                label: None,
1867                input: None,
1868            })),
1869        }));
1870
1871        let physical = planner.plan(&logical).unwrap();
1872        assert_eq!(physical.columns(), &["answer"]);
1873    }
1874
1875    // ==================== Filter Tests ====================
1876
1877    #[test]
1878    fn test_plan_filter_equality() {
1879        let store = create_test_store();
1880        let planner = Planner::new(store);
1881
1882        // MATCH (n:Person) WHERE n.age = 30 RETURN n
1883        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1884            items: vec![ReturnItem {
1885                expression: LogicalExpression::Variable("n".to_string()),
1886                alias: None,
1887            }],
1888            distinct: false,
1889            input: Box::new(LogicalOperator::Filter(FilterOp {
1890                predicate: LogicalExpression::Binary {
1891                    left: Box::new(LogicalExpression::Property {
1892                        variable: "n".to_string(),
1893                        property: "age".to_string(),
1894                    }),
1895                    op: BinaryOp::Eq,
1896                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1897                },
1898                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1899                    variable: "n".to_string(),
1900                    label: Some("Person".to_string()),
1901                    input: None,
1902                })),
1903            })),
1904        }));
1905
1906        let physical = planner.plan(&logical).unwrap();
1907        assert_eq!(physical.columns(), &["n"]);
1908    }
1909
1910    #[test]
1911    fn test_plan_filter_compound_and() {
1912        let store = create_test_store();
1913        let planner = Planner::new(store);
1914
1915        // WHERE n.age > 20 AND n.age < 40
1916        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1917            items: vec![ReturnItem {
1918                expression: LogicalExpression::Variable("n".to_string()),
1919                alias: None,
1920            }],
1921            distinct: false,
1922            input: Box::new(LogicalOperator::Filter(FilterOp {
1923                predicate: LogicalExpression::Binary {
1924                    left: Box::new(LogicalExpression::Binary {
1925                        left: Box::new(LogicalExpression::Property {
1926                            variable: "n".to_string(),
1927                            property: "age".to_string(),
1928                        }),
1929                        op: BinaryOp::Gt,
1930                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1931                    }),
1932                    op: BinaryOp::And,
1933                    right: Box::new(LogicalExpression::Binary {
1934                        left: Box::new(LogicalExpression::Property {
1935                            variable: "n".to_string(),
1936                            property: "age".to_string(),
1937                        }),
1938                        op: BinaryOp::Lt,
1939                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1940                    }),
1941                },
1942                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1943                    variable: "n".to_string(),
1944                    label: None,
1945                    input: None,
1946                })),
1947            })),
1948        }));
1949
1950        let physical = planner.plan(&logical).unwrap();
1951        assert_eq!(physical.columns(), &["n"]);
1952    }
1953
1954    #[test]
1955    fn test_plan_filter_unary_not() {
1956        let store = create_test_store();
1957        let planner = Planner::new(store);
1958
1959        // WHERE NOT n.active
1960        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1961            items: vec![ReturnItem {
1962                expression: LogicalExpression::Variable("n".to_string()),
1963                alias: None,
1964            }],
1965            distinct: false,
1966            input: Box::new(LogicalOperator::Filter(FilterOp {
1967                predicate: LogicalExpression::Unary {
1968                    op: UnaryOp::Not,
1969                    operand: Box::new(LogicalExpression::Property {
1970                        variable: "n".to_string(),
1971                        property: "active".to_string(),
1972                    }),
1973                },
1974                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1975                    variable: "n".to_string(),
1976                    label: None,
1977                    input: None,
1978                })),
1979            })),
1980        }));
1981
1982        let physical = planner.plan(&logical).unwrap();
1983        assert_eq!(physical.columns(), &["n"]);
1984    }
1985
1986    #[test]
1987    fn test_plan_filter_is_null() {
1988        let store = create_test_store();
1989        let planner = Planner::new(store);
1990
1991        // WHERE n.email IS NULL
1992        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1993            items: vec![ReturnItem {
1994                expression: LogicalExpression::Variable("n".to_string()),
1995                alias: None,
1996            }],
1997            distinct: false,
1998            input: Box::new(LogicalOperator::Filter(FilterOp {
1999                predicate: LogicalExpression::Unary {
2000                    op: UnaryOp::IsNull,
2001                    operand: Box::new(LogicalExpression::Property {
2002                        variable: "n".to_string(),
2003                        property: "email".to_string(),
2004                    }),
2005                },
2006                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2007                    variable: "n".to_string(),
2008                    label: None,
2009                    input: None,
2010                })),
2011            })),
2012        }));
2013
2014        let physical = planner.plan(&logical).unwrap();
2015        assert_eq!(physical.columns(), &["n"]);
2016    }
2017
2018    #[test]
2019    fn test_plan_filter_function_call() {
2020        let store = create_test_store();
2021        let planner = Planner::new(store);
2022
2023        // WHERE size(n.friends) > 0
2024        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2025            items: vec![ReturnItem {
2026                expression: LogicalExpression::Variable("n".to_string()),
2027                alias: None,
2028            }],
2029            distinct: false,
2030            input: Box::new(LogicalOperator::Filter(FilterOp {
2031                predicate: LogicalExpression::Binary {
2032                    left: Box::new(LogicalExpression::FunctionCall {
2033                        name: "size".to_string(),
2034                        args: vec![LogicalExpression::Property {
2035                            variable: "n".to_string(),
2036                            property: "friends".to_string(),
2037                        }],
2038                    }),
2039                    op: BinaryOp::Gt,
2040                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
2041                },
2042                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2043                    variable: "n".to_string(),
2044                    label: None,
2045                    input: None,
2046                })),
2047            })),
2048        }));
2049
2050        let physical = planner.plan(&logical).unwrap();
2051        assert_eq!(physical.columns(), &["n"]);
2052    }
2053
2054    // ==================== Expand Tests ====================
2055
2056    #[test]
2057    fn test_plan_expand_outgoing() {
2058        let store = create_test_store();
2059        let planner = Planner::new(store);
2060
2061        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
2062        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2063            items: vec![
2064                ReturnItem {
2065                    expression: LogicalExpression::Variable("a".to_string()),
2066                    alias: None,
2067                },
2068                ReturnItem {
2069                    expression: LogicalExpression::Variable("b".to_string()),
2070                    alias: None,
2071                },
2072            ],
2073            distinct: false,
2074            input: Box::new(LogicalOperator::Expand(ExpandOp {
2075                from_variable: "a".to_string(),
2076                to_variable: "b".to_string(),
2077                edge_variable: None,
2078                direction: ExpandDirection::Outgoing,
2079                edge_type: Some("KNOWS".to_string()),
2080                min_hops: 1,
2081                max_hops: Some(1),
2082                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2083                    variable: "a".to_string(),
2084                    label: Some("Person".to_string()),
2085                    input: None,
2086                })),
2087            })),
2088        }));
2089
2090        let physical = planner.plan(&logical).unwrap();
2091        // The return should have columns [a, b]
2092        assert!(physical.columns().contains(&"a".to_string()));
2093        assert!(physical.columns().contains(&"b".to_string()));
2094    }
2095
2096    #[test]
2097    fn test_plan_expand_with_edge_variable() {
2098        let store = create_test_store();
2099        let planner = Planner::new(store);
2100
2101        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
2102        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2103            items: vec![
2104                ReturnItem {
2105                    expression: LogicalExpression::Variable("a".to_string()),
2106                    alias: None,
2107                },
2108                ReturnItem {
2109                    expression: LogicalExpression::Variable("r".to_string()),
2110                    alias: None,
2111                },
2112                ReturnItem {
2113                    expression: LogicalExpression::Variable("b".to_string()),
2114                    alias: None,
2115                },
2116            ],
2117            distinct: false,
2118            input: Box::new(LogicalOperator::Expand(ExpandOp {
2119                from_variable: "a".to_string(),
2120                to_variable: "b".to_string(),
2121                edge_variable: Some("r".to_string()),
2122                direction: ExpandDirection::Outgoing,
2123                edge_type: Some("KNOWS".to_string()),
2124                min_hops: 1,
2125                max_hops: Some(1),
2126                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2127                    variable: "a".to_string(),
2128                    label: None,
2129                    input: None,
2130                })),
2131            })),
2132        }));
2133
2134        let physical = planner.plan(&logical).unwrap();
2135        assert!(physical.columns().contains(&"a".to_string()));
2136        assert!(physical.columns().contains(&"r".to_string()));
2137        assert!(physical.columns().contains(&"b".to_string()));
2138    }
2139
2140    // ==================== Limit/Skip/Sort Tests ====================
2141
2142    #[test]
2143    fn test_plan_limit() {
2144        let store = create_test_store();
2145        let planner = Planner::new(store);
2146
2147        // MATCH (n) RETURN n LIMIT 10
2148        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2149            items: vec![ReturnItem {
2150                expression: LogicalExpression::Variable("n".to_string()),
2151                alias: None,
2152            }],
2153            distinct: false,
2154            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2155                count: 10,
2156                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2157                    variable: "n".to_string(),
2158                    label: None,
2159                    input: None,
2160                })),
2161            })),
2162        }));
2163
2164        let physical = planner.plan(&logical).unwrap();
2165        assert_eq!(physical.columns(), &["n"]);
2166    }
2167
2168    #[test]
2169    fn test_plan_skip() {
2170        let store = create_test_store();
2171        let planner = Planner::new(store);
2172
2173        // MATCH (n) RETURN n SKIP 5
2174        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2175            items: vec![ReturnItem {
2176                expression: LogicalExpression::Variable("n".to_string()),
2177                alias: None,
2178            }],
2179            distinct: false,
2180            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2181                count: 5,
2182                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2183                    variable: "n".to_string(),
2184                    label: None,
2185                    input: None,
2186                })),
2187            })),
2188        }));
2189
2190        let physical = planner.plan(&logical).unwrap();
2191        assert_eq!(physical.columns(), &["n"]);
2192    }
2193
2194    #[test]
2195    fn test_plan_sort() {
2196        let store = create_test_store();
2197        let planner = Planner::new(store);
2198
2199        // MATCH (n) RETURN n ORDER BY n.name ASC
2200        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2201            items: vec![ReturnItem {
2202                expression: LogicalExpression::Variable("n".to_string()),
2203                alias: None,
2204            }],
2205            distinct: false,
2206            input: Box::new(LogicalOperator::Sort(SortOp {
2207                keys: vec![SortKey {
2208                    expression: LogicalExpression::Variable("n".to_string()),
2209                    order: SortOrder::Ascending,
2210                }],
2211                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2212                    variable: "n".to_string(),
2213                    label: None,
2214                    input: None,
2215                })),
2216            })),
2217        }));
2218
2219        let physical = planner.plan(&logical).unwrap();
2220        assert_eq!(physical.columns(), &["n"]);
2221    }
2222
2223    #[test]
2224    fn test_plan_sort_descending() {
2225        let store = create_test_store();
2226        let planner = Planner::new(store);
2227
2228        // ORDER BY n DESC
2229        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2230            items: vec![ReturnItem {
2231                expression: LogicalExpression::Variable("n".to_string()),
2232                alias: None,
2233            }],
2234            distinct: false,
2235            input: Box::new(LogicalOperator::Sort(SortOp {
2236                keys: vec![SortKey {
2237                    expression: LogicalExpression::Variable("n".to_string()),
2238                    order: SortOrder::Descending,
2239                }],
2240                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2241                    variable: "n".to_string(),
2242                    label: None,
2243                    input: None,
2244                })),
2245            })),
2246        }));
2247
2248        let physical = planner.plan(&logical).unwrap();
2249        assert_eq!(physical.columns(), &["n"]);
2250    }
2251
2252    #[test]
2253    fn test_plan_distinct() {
2254        let store = create_test_store();
2255        let planner = Planner::new(store);
2256
2257        // MATCH (n) RETURN DISTINCT n
2258        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2259            items: vec![ReturnItem {
2260                expression: LogicalExpression::Variable("n".to_string()),
2261                alias: None,
2262            }],
2263            distinct: false,
2264            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2265                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2266                    variable: "n".to_string(),
2267                    label: None,
2268                    input: None,
2269                })),
2270            })),
2271        }));
2272
2273        let physical = planner.plan(&logical).unwrap();
2274        assert_eq!(physical.columns(), &["n"]);
2275    }
2276
2277    // ==================== Aggregate Tests ====================
2278
2279    #[test]
2280    fn test_plan_aggregate_count() {
2281        let store = create_test_store();
2282        let planner = Planner::new(store);
2283
2284        // MATCH (n) RETURN count(n)
2285        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2286            items: vec![ReturnItem {
2287                expression: LogicalExpression::Variable("cnt".to_string()),
2288                alias: None,
2289            }],
2290            distinct: false,
2291            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
2292                group_by: vec![],
2293                aggregates: vec![LogicalAggregateExpr {
2294                    function: LogicalAggregateFunction::Count,
2295                    expression: Some(LogicalExpression::Variable("n".to_string())),
2296                    distinct: false,
2297                    alias: Some("cnt".to_string()),
2298                }],
2299                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2300                    variable: "n".to_string(),
2301                    label: None,
2302                    input: None,
2303                })),
2304            })),
2305        }));
2306
2307        let physical = planner.plan(&logical).unwrap();
2308        assert!(physical.columns().contains(&"cnt".to_string()));
2309    }
2310
2311    #[test]
2312    fn test_plan_aggregate_with_group_by() {
2313        let store = create_test_store();
2314        let planner = Planner::new(store);
2315
2316        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
2317        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2318            group_by: vec![LogicalExpression::Property {
2319                variable: "n".to_string(),
2320                property: "city".to_string(),
2321            }],
2322            aggregates: vec![LogicalAggregateExpr {
2323                function: LogicalAggregateFunction::Count,
2324                expression: Some(LogicalExpression::Variable("n".to_string())),
2325                distinct: false,
2326                alias: Some("cnt".to_string()),
2327            }],
2328            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2329                variable: "n".to_string(),
2330                label: Some("Person".to_string()),
2331                input: None,
2332            })),
2333        }));
2334
2335        let physical = planner.plan(&logical).unwrap();
2336        assert_eq!(physical.columns().len(), 2);
2337    }
2338
2339    #[test]
2340    fn test_plan_aggregate_sum() {
2341        let store = create_test_store();
2342        let planner = Planner::new(store);
2343
2344        // SUM(n.value)
2345        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2346            group_by: vec![],
2347            aggregates: vec![LogicalAggregateExpr {
2348                function: LogicalAggregateFunction::Sum,
2349                expression: Some(LogicalExpression::Property {
2350                    variable: "n".to_string(),
2351                    property: "value".to_string(),
2352                }),
2353                distinct: false,
2354                alias: Some("total".to_string()),
2355            }],
2356            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2357                variable: "n".to_string(),
2358                label: None,
2359                input: None,
2360            })),
2361        }));
2362
2363        let physical = planner.plan(&logical).unwrap();
2364        assert!(physical.columns().contains(&"total".to_string()));
2365    }
2366
2367    #[test]
2368    fn test_plan_aggregate_avg() {
2369        let store = create_test_store();
2370        let planner = Planner::new(store);
2371
2372        // AVG(n.score)
2373        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2374            group_by: vec![],
2375            aggregates: vec![LogicalAggregateExpr {
2376                function: LogicalAggregateFunction::Avg,
2377                expression: Some(LogicalExpression::Property {
2378                    variable: "n".to_string(),
2379                    property: "score".to_string(),
2380                }),
2381                distinct: false,
2382                alias: Some("average".to_string()),
2383            }],
2384            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2385                variable: "n".to_string(),
2386                label: None,
2387                input: None,
2388            })),
2389        }));
2390
2391        let physical = planner.plan(&logical).unwrap();
2392        assert!(physical.columns().contains(&"average".to_string()));
2393    }
2394
2395    #[test]
2396    fn test_plan_aggregate_min_max() {
2397        let store = create_test_store();
2398        let planner = Planner::new(store);
2399
2400        // MIN(n.age), MAX(n.age)
2401        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2402            group_by: vec![],
2403            aggregates: vec![
2404                LogicalAggregateExpr {
2405                    function: LogicalAggregateFunction::Min,
2406                    expression: Some(LogicalExpression::Property {
2407                        variable: "n".to_string(),
2408                        property: "age".to_string(),
2409                    }),
2410                    distinct: false,
2411                    alias: Some("youngest".to_string()),
2412                },
2413                LogicalAggregateExpr {
2414                    function: LogicalAggregateFunction::Max,
2415                    expression: Some(LogicalExpression::Property {
2416                        variable: "n".to_string(),
2417                        property: "age".to_string(),
2418                    }),
2419                    distinct: false,
2420                    alias: Some("oldest".to_string()),
2421                },
2422            ],
2423            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2424                variable: "n".to_string(),
2425                label: None,
2426                input: None,
2427            })),
2428        }));
2429
2430        let physical = planner.plan(&logical).unwrap();
2431        assert!(physical.columns().contains(&"youngest".to_string()));
2432        assert!(physical.columns().contains(&"oldest".to_string()));
2433    }
2434
2435    // ==================== Join Tests ====================
2436
2437    #[test]
2438    fn test_plan_inner_join() {
2439        let store = create_test_store();
2440        let planner = Planner::new(store);
2441
2442        // Inner join between two scans
2443        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2444            items: vec![
2445                ReturnItem {
2446                    expression: LogicalExpression::Variable("a".to_string()),
2447                    alias: None,
2448                },
2449                ReturnItem {
2450                    expression: LogicalExpression::Variable("b".to_string()),
2451                    alias: None,
2452                },
2453            ],
2454            distinct: false,
2455            input: Box::new(LogicalOperator::Join(JoinOp {
2456                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2457                    variable: "a".to_string(),
2458                    label: Some("Person".to_string()),
2459                    input: None,
2460                })),
2461                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2462                    variable: "b".to_string(),
2463                    label: Some("Company".to_string()),
2464                    input: None,
2465                })),
2466                join_type: JoinType::Inner,
2467                conditions: vec![JoinCondition {
2468                    left: LogicalExpression::Variable("a".to_string()),
2469                    right: LogicalExpression::Variable("b".to_string()),
2470                }],
2471            })),
2472        }));
2473
2474        let physical = planner.plan(&logical).unwrap();
2475        assert!(physical.columns().contains(&"a".to_string()));
2476        assert!(physical.columns().contains(&"b".to_string()));
2477    }
2478
2479    #[test]
2480    fn test_plan_cross_join() {
2481        let store = create_test_store();
2482        let planner = Planner::new(store);
2483
2484        // Cross join (no conditions)
2485        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2486            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2487                variable: "a".to_string(),
2488                label: None,
2489                input: None,
2490            })),
2491            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2492                variable: "b".to_string(),
2493                label: None,
2494                input: None,
2495            })),
2496            join_type: JoinType::Cross,
2497            conditions: vec![],
2498        }));
2499
2500        let physical = planner.plan(&logical).unwrap();
2501        assert_eq!(physical.columns().len(), 2);
2502    }
2503
2504    #[test]
2505    fn test_plan_left_join() {
2506        let store = create_test_store();
2507        let planner = Planner::new(store);
2508
2509        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2510            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2511                variable: "a".to_string(),
2512                label: None,
2513                input: None,
2514            })),
2515            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2516                variable: "b".to_string(),
2517                label: None,
2518                input: None,
2519            })),
2520            join_type: JoinType::Left,
2521            conditions: vec![],
2522        }));
2523
2524        let physical = planner.plan(&logical).unwrap();
2525        assert_eq!(physical.columns().len(), 2);
2526    }
2527
2528    // ==================== Mutation Tests ====================
2529
2530    #[test]
2531    fn test_plan_create_node() {
2532        let store = create_test_store();
2533        let planner = Planner::new(store);
2534
2535        // CREATE (n:Person {name: 'Alice'})
2536        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2537            variable: "n".to_string(),
2538            labels: vec!["Person".to_string()],
2539            properties: vec![(
2540                "name".to_string(),
2541                LogicalExpression::Literal(Value::String("Alice".into())),
2542            )],
2543            input: None,
2544        }));
2545
2546        let physical = planner.plan(&logical).unwrap();
2547        assert!(physical.columns().contains(&"n".to_string()));
2548    }
2549
2550    #[test]
2551    fn test_plan_create_edge() {
2552        let store = create_test_store();
2553        let planner = Planner::new(store);
2554
2555        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
2556        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
2557            variable: Some("r".to_string()),
2558            from_variable: "a".to_string(),
2559            to_variable: "b".to_string(),
2560            edge_type: "KNOWS".to_string(),
2561            properties: vec![],
2562            input: Box::new(LogicalOperator::Join(JoinOp {
2563                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2564                    variable: "a".to_string(),
2565                    label: None,
2566                    input: None,
2567                })),
2568                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2569                    variable: "b".to_string(),
2570                    label: None,
2571                    input: None,
2572                })),
2573                join_type: JoinType::Cross,
2574                conditions: vec![],
2575            })),
2576        }));
2577
2578        let physical = planner.plan(&logical).unwrap();
2579        assert!(physical.columns().contains(&"r".to_string()));
2580    }
2581
2582    #[test]
2583    fn test_plan_delete_node() {
2584        let store = create_test_store();
2585        let planner = Planner::new(store);
2586
2587        // MATCH (n) DELETE n
2588        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
2589            variable: "n".to_string(),
2590            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2591                variable: "n".to_string(),
2592                label: None,
2593                input: None,
2594            })),
2595        }));
2596
2597        let physical = planner.plan(&logical).unwrap();
2598        assert!(physical.columns().contains(&"deleted_count".to_string()));
2599    }
2600
2601    // ==================== Error Cases ====================
2602
2603    #[test]
2604    fn test_plan_empty_errors() {
2605        let store = create_test_store();
2606        let planner = Planner::new(store);
2607
2608        let logical = LogicalPlan::new(LogicalOperator::Empty);
2609        let result = planner.plan(&logical);
2610        assert!(result.is_err());
2611    }
2612
2613    #[test]
2614    fn test_plan_missing_variable_in_return() {
2615        let store = create_test_store();
2616        let planner = Planner::new(store);
2617
2618        // Return variable that doesn't exist in input
2619        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2620            items: vec![ReturnItem {
2621                expression: LogicalExpression::Variable("missing".to_string()),
2622                alias: None,
2623            }],
2624            distinct: false,
2625            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2626                variable: "n".to_string(),
2627                label: None,
2628                input: None,
2629            })),
2630        }));
2631
2632        let result = planner.plan(&logical);
2633        assert!(result.is_err());
2634    }
2635
2636    // ==================== Helper Function Tests ====================
2637
2638    #[test]
2639    fn test_convert_binary_ops() {
2640        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
2641        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
2642        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
2643        assert!(convert_binary_op(BinaryOp::Le).is_ok());
2644        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
2645        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
2646        assert!(convert_binary_op(BinaryOp::And).is_ok());
2647        assert!(convert_binary_op(BinaryOp::Or).is_ok());
2648        assert!(convert_binary_op(BinaryOp::Add).is_ok());
2649        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2650        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2651        assert!(convert_binary_op(BinaryOp::Div).is_ok());
2652    }
2653
2654    #[test]
2655    fn test_convert_unary_ops() {
2656        assert!(convert_unary_op(UnaryOp::Not).is_ok());
2657        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2658        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2659        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2660    }
2661
2662    #[test]
2663    fn test_convert_aggregate_functions() {
2664        assert!(matches!(
2665            convert_aggregate_function(LogicalAggregateFunction::Count),
2666            PhysicalAggregateFunction::Count
2667        ));
2668        assert!(matches!(
2669            convert_aggregate_function(LogicalAggregateFunction::Sum),
2670            PhysicalAggregateFunction::Sum
2671        ));
2672        assert!(matches!(
2673            convert_aggregate_function(LogicalAggregateFunction::Avg),
2674            PhysicalAggregateFunction::Avg
2675        ));
2676        assert!(matches!(
2677            convert_aggregate_function(LogicalAggregateFunction::Min),
2678            PhysicalAggregateFunction::Min
2679        ));
2680        assert!(matches!(
2681            convert_aggregate_function(LogicalAggregateFunction::Max),
2682            PhysicalAggregateFunction::Max
2683        ));
2684    }
2685
2686    #[test]
2687    fn test_planner_accessors() {
2688        let store = create_test_store();
2689        let planner = Planner::new(Arc::clone(&store));
2690
2691        assert!(planner.tx_id().is_none());
2692        assert!(planner.tx_manager().is_none());
2693        let _ = planner.viewing_epoch(); // Just ensure it's accessible
2694    }
2695
2696    #[test]
2697    fn test_physical_plan_accessors() {
2698        let store = create_test_store();
2699        let planner = Planner::new(store);
2700
2701        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2702            variable: "n".to_string(),
2703            label: None,
2704            input: None,
2705        }));
2706
2707        let physical = planner.plan(&logical).unwrap();
2708        assert_eq!(physical.columns(), &["n"]);
2709
2710        // Test into_operator
2711        let _ = physical.into_operator();
2712    }
2713}