Skip to main content

graphos_engine/query/
planner.rs

1//! Logical query planner.
2//!
3//! Converts a logical plan into a physical plan (tree of operators).
4
5use crate::query::plan::{
6    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
7    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
8    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
9    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, SkipOp, SortOp,
10    SortOrder, UnaryOp, UnionOp, UnwindOp,
11};
12use graphos_common::types::LogicalType;
13use graphos_common::types::{EpochId, TxId};
14use graphos_common::utils::error::{Error, Result};
15use graphos_core::execution::operators::{
16    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
17    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
18    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
19    ExpressionPredicate, FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
20    JoinType as PhysicalJoinType, LimitOperator, MergeOperator, NullOrder, Operator, ProjectExpr,
21    ProjectOperator, PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator,
22    SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
23    UnaryFilterOp, UnionOperator, UnwindOperator,
24};
25use graphos_core::graph::{Direction, lpg::LpgStore};
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use crate::transaction::TransactionManager;
30
31/// Converts a logical plan to a physical operator tree.
32pub struct Planner {
33    /// The graph store to scan from.
34    store: Arc<LpgStore>,
35    /// Transaction manager for MVCC operations.
36    tx_manager: Option<Arc<TransactionManager>>,
37    /// Current transaction ID (if in a transaction).
38    tx_id: Option<TxId>,
39    /// Epoch to use for visibility checks.
40    viewing_epoch: EpochId,
41    /// Counter for generating unique anonymous edge column names.
42    anon_edge_counter: std::cell::Cell<u32>,
43}
44
45impl Planner {
46    /// Creates a new planner with the given store.
47    ///
48    /// This creates a planner without transaction context, using the current
49    /// epoch from the store for visibility.
50    #[must_use]
51    pub fn new(store: Arc<LpgStore>) -> Self {
52        let epoch = store.current_epoch();
53        Self {
54            store,
55            tx_manager: None,
56            tx_id: None,
57            viewing_epoch: epoch,
58            anon_edge_counter: std::cell::Cell::new(0),
59        }
60    }
61
62    /// Creates a new planner with transaction context for MVCC-aware planning.
63    ///
64    /// # Arguments
65    ///
66    /// * `store` - The graph store
67    /// * `tx_manager` - Transaction manager for recording reads/writes
68    /// * `tx_id` - Current transaction ID (None for auto-commit)
69    /// * `viewing_epoch` - Epoch to use for version visibility
70    #[must_use]
71    pub fn with_context(
72        store: Arc<LpgStore>,
73        tx_manager: Arc<TransactionManager>,
74        tx_id: Option<TxId>,
75        viewing_epoch: EpochId,
76    ) -> Self {
77        Self {
78            store,
79            tx_manager: Some(tx_manager),
80            tx_id,
81            viewing_epoch,
82            anon_edge_counter: std::cell::Cell::new(0),
83        }
84    }
85
86    /// Returns the viewing epoch for this planner.
87    #[must_use]
88    pub fn viewing_epoch(&self) -> EpochId {
89        self.viewing_epoch
90    }
91
92    /// Returns the transaction ID for this planner, if any.
93    #[must_use]
94    pub fn tx_id(&self) -> Option<TxId> {
95        self.tx_id
96    }
97
98    /// Returns a reference to the transaction manager, if available.
99    #[must_use]
100    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
101        self.tx_manager.as_ref()
102    }
103
104    /// Plans a logical plan into a physical operator.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if planning fails.
109    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
110        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
111        Ok(PhysicalPlan { operator, columns })
112    }
113
114    /// Plans a single logical operator.
115    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
116        match op {
117            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
118            LogicalOperator::Expand(expand) => self.plan_expand(expand),
119            LogicalOperator::Return(ret) => self.plan_return(ret),
120            LogicalOperator::Filter(filter) => self.plan_filter(filter),
121            LogicalOperator::Project(project) => {
122                // For now, just plan the input
123                self.plan_operator(&project.input)
124            }
125            LogicalOperator::Limit(limit) => self.plan_limit(limit),
126            LogicalOperator::Skip(skip) => self.plan_skip(skip),
127            LogicalOperator::Sort(sort) => self.plan_sort(sort),
128            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
129            LogicalOperator::Join(join) => self.plan_join(join),
130            LogicalOperator::Union(union) => self.plan_union(union),
131            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
132            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
133            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
134            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
135            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
136            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
137            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
138            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
139            LogicalOperator::Merge(merge) => self.plan_merge(merge),
140            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
141            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
142            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
143            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
144            _ => Err(Error::Internal(format!(
145                "Unsupported operator: {:?}",
146                std::mem::discriminant(op)
147            ))),
148        }
149    }
150
151    /// Plans a node scan operator.
152    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
153        let scan_op = if let Some(label) = &scan.label {
154            ScanOperator::with_label(Arc::clone(&self.store), label)
155        } else {
156            ScanOperator::new(Arc::clone(&self.store))
157        };
158
159        // Apply MVCC context if available
160        let operator: Box<dyn Operator> =
161            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
162
163        let columns = vec![scan.variable.clone()];
164
165        // If there's an input, we'd need to chain operators
166        // For now, just return the scan
167        Ok((operator, columns))
168    }
169
170    /// Plans an expand operator.
171    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
172        // Plan the input operator first
173        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
174
175        // Find the source column index
176        let source_column = input_columns
177            .iter()
178            .position(|c| c == &expand.from_variable)
179            .ok_or_else(|| {
180                Error::Internal(format!(
181                    "Source variable '{}' not found in input columns",
182                    expand.from_variable
183                ))
184            })?;
185
186        // Convert expand direction
187        let direction = match expand.direction {
188            ExpandDirection::Outgoing => Direction::Outgoing,
189            ExpandDirection::Incoming => Direction::Incoming,
190            ExpandDirection::Both => Direction::Both,
191        };
192
193        // Create the expand operator with MVCC context
194        let expand_op = ExpandOperator::new(
195            Arc::clone(&self.store),
196            input_op,
197            source_column,
198            direction,
199            expand.edge_type.clone(),
200        )
201        .with_tx_context(self.viewing_epoch, self.tx_id);
202
203        let operator: Box<dyn Operator> = Box::new(expand_op);
204
205        // Build output columns: [input_columns..., edge, target]
206        // Preserve all input columns and add edge + target to match ExpandOperator output
207        let mut columns = input_columns;
208
209        // Generate edge column name - use provided name or generate anonymous name
210        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
211            let count = self.anon_edge_counter.get();
212            self.anon_edge_counter.set(count + 1);
213            format!("_anon_edge_{}", count)
214        });
215        columns.push(edge_col_name);
216
217        columns.push(expand.to_variable.clone());
218
219        Ok((operator, columns))
220    }
221
222    /// Plans a RETURN clause.
223    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
224        // Plan the input operator
225        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
226
227        // Build variable to column index mapping
228        let variable_columns: HashMap<String, usize> = input_columns
229            .iter()
230            .enumerate()
231            .map(|(i, name)| (name.clone(), i))
232            .collect();
233
234        // Extract column names from return items
235        let columns: Vec<String> = ret
236            .items
237            .iter()
238            .map(|item| {
239                item.alias.clone().unwrap_or_else(|| {
240                    // Generate a default name from the expression
241                    expression_to_string(&item.expression)
242                })
243            })
244            .collect();
245
246        // Check if we need a project operator (for property access or expression evaluation)
247        let needs_project = ret
248            .items
249            .iter()
250            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
251
252        if needs_project {
253            // Build project expressions
254            let mut projections = Vec::with_capacity(ret.items.len());
255            let mut output_types = Vec::with_capacity(ret.items.len());
256
257            for item in &ret.items {
258                match &item.expression {
259                    LogicalExpression::Variable(name) => {
260                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
261                            Error::Internal(format!("Variable '{}' not found in input", name))
262                        })?;
263                        projections.push(ProjectExpr::Column(col_idx));
264                        // Use Node type for variables (they could be nodes, edges, or values)
265                        output_types.push(LogicalType::Node);
266                    }
267                    LogicalExpression::Property { variable, property } => {
268                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
269                            Error::Internal(format!("Variable '{}' not found in input", variable))
270                        })?;
271                        projections.push(ProjectExpr::PropertyAccess {
272                            column: col_idx,
273                            property: property.clone(),
274                        });
275                        // Property could be any type - use String as default
276                        output_types.push(LogicalType::String);
277                    }
278                    LogicalExpression::Literal(value) => {
279                        projections.push(ProjectExpr::Constant(value.clone()));
280                        output_types.push(value_to_logical_type(value));
281                    }
282                    _ => {
283                        return Err(Error::Internal(format!(
284                            "Unsupported RETURN expression: {:?}",
285                            item.expression
286                        )));
287                    }
288                }
289            }
290
291            let operator = Box::new(ProjectOperator::with_store(
292                input_op,
293                projections,
294                output_types,
295                Arc::clone(&self.store),
296            ));
297
298            Ok((operator, columns))
299        } else {
300            // Simple case: just return variables
301            // Re-order columns to match return items if needed
302            let mut projections = Vec::with_capacity(ret.items.len());
303            let mut output_types = Vec::with_capacity(ret.items.len());
304
305            for item in &ret.items {
306                if let LogicalExpression::Variable(name) = &item.expression {
307                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
308                        Error::Internal(format!("Variable '{}' not found in input", name))
309                    })?;
310                    projections.push(ProjectExpr::Column(col_idx));
311                    output_types.push(LogicalType::Node);
312                }
313            }
314
315            // Only add ProjectOperator if reordering is needed
316            if projections.len() == input_columns.len()
317                && projections
318                    .iter()
319                    .enumerate()
320                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
321            {
322                // No reordering needed
323                Ok((input_op, columns))
324            } else {
325                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
326                Ok((operator, columns))
327            }
328        }
329    }
330
331    /// Plans a filter operator.
332    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
333        // Plan the input operator first
334        let (input_op, columns) = self.plan_operator(&filter.input)?;
335
336        // Build variable to column index mapping
337        let variable_columns: HashMap<String, usize> = columns
338            .iter()
339            .enumerate()
340            .map(|(i, name)| (name.clone(), i))
341            .collect();
342
343        // Convert logical expression to filter expression
344        let filter_expr = self.convert_expression(&filter.predicate)?;
345
346        // Create the predicate
347        let predicate =
348            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
349
350        // Create the filter operator
351        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
352
353        Ok((operator, columns))
354    }
355
356    /// Plans a LIMIT operator.
357    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
358        let (input_op, columns) = self.plan_operator(&limit.input)?;
359        let output_schema = self.derive_schema_from_columns(&columns);
360        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
361        Ok((operator, columns))
362    }
363
364    /// Plans a SKIP operator.
365    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
366        let (input_op, columns) = self.plan_operator(&skip.input)?;
367        let output_schema = self.derive_schema_from_columns(&columns);
368        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
369        Ok((operator, columns))
370    }
371
372    /// Plans a SORT (ORDER BY) operator.
373    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
374        let (input_op, columns) = self.plan_operator(&sort.input)?;
375
376        // Build variable to column index mapping
377        let variable_columns: HashMap<String, usize> = columns
378            .iter()
379            .enumerate()
380            .map(|(i, name)| (name.clone(), i))
381            .collect();
382
383        // Convert logical sort keys to physical sort keys
384        let physical_keys: Vec<PhysicalSortKey> = sort
385            .keys
386            .iter()
387            .map(|key| {
388                let col_idx = self.resolve_sort_expression(&key.expression, &variable_columns)?;
389                Ok(PhysicalSortKey {
390                    column: col_idx,
391                    direction: match key.order {
392                        SortOrder::Ascending => SortDirection::Ascending,
393                        SortOrder::Descending => SortDirection::Descending,
394                    },
395                    null_order: NullOrder::NullsLast,
396                })
397            })
398            .collect::<Result<Vec<_>>>()?;
399
400        let output_schema = self.derive_schema_from_columns(&columns);
401        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
402        Ok((operator, columns))
403    }
404
405    /// Resolves a sort expression to a column index.
406    fn resolve_sort_expression(
407        &self,
408        expr: &LogicalExpression,
409        variable_columns: &HashMap<String, usize>,
410    ) -> Result<usize> {
411        match expr {
412            LogicalExpression::Variable(name) => {
413                variable_columns.get(name).copied().ok_or_else(|| {
414                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
415                })
416            }
417            LogicalExpression::Property { variable, .. } => {
418                // For property access, find the variable column
419                variable_columns.get(variable).copied().ok_or_else(|| {
420                    Error::Internal(format!("Variable '{}' not found for ORDER BY", variable))
421                })
422            }
423            _ => Err(Error::Internal(format!(
424                "Unsupported ORDER BY expression: {:?}",
425                expr
426            ))),
427        }
428    }
429
430    /// Derives a schema from column names (assumes Node type as default).
431    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
432        columns.iter().map(|_| LogicalType::Node).collect()
433    }
434
435    /// Plans an AGGREGATE operator.
436    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
437        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
438
439        // Build variable to column index mapping
440        let mut variable_columns: HashMap<String, usize> = input_columns
441            .iter()
442            .enumerate()
443            .map(|(i, name)| (name.clone(), i))
444            .collect();
445
446        // Collect all property expressions that need to be projected before aggregation
447        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
448        let mut next_col_idx = input_columns.len();
449
450        // Check group-by expressions for properties
451        for expr in &agg.group_by {
452            if let LogicalExpression::Property { variable, property } = expr {
453                let col_name = format!("{}_{}", variable, property);
454                if !variable_columns.contains_key(&col_name) {
455                    property_projections.push((
456                        variable.clone(),
457                        property.clone(),
458                        col_name.clone(),
459                    ));
460                    variable_columns.insert(col_name, next_col_idx);
461                    next_col_idx += 1;
462                }
463            }
464        }
465
466        // Check aggregate expressions for properties
467        for agg_expr in &agg.aggregates {
468            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
469                let col_name = format!("{}_{}", variable, property);
470                if !variable_columns.contains_key(&col_name) {
471                    property_projections.push((
472                        variable.clone(),
473                        property.clone(),
474                        col_name.clone(),
475                    ));
476                    variable_columns.insert(col_name, next_col_idx);
477                    next_col_idx += 1;
478                }
479            }
480        }
481
482        // If we have property expressions, add a projection to materialize them
483        if !property_projections.is_empty() {
484            let mut projections = Vec::new();
485            let mut output_types = Vec::new();
486
487            // First, pass through all existing columns
488            for (i, _) in input_columns.iter().enumerate() {
489                projections.push(ProjectExpr::Column(i));
490                output_types.push(LogicalType::Node);
491            }
492
493            // Then add property access projections
494            for (variable, property, _col_name) in &property_projections {
495                let source_col = *variable_columns.get(variable).ok_or_else(|| {
496                    Error::Internal(format!(
497                        "Variable '{}' not found for property projection",
498                        variable
499                    ))
500                })?;
501                projections.push(ProjectExpr::PropertyAccess {
502                    column: source_col,
503                    property: property.clone(),
504                });
505                output_types.push(LogicalType::Int64); // Properties are typically numeric for aggregates
506            }
507
508            input_op = Box::new(ProjectOperator::with_store(
509                input_op,
510                projections,
511                output_types,
512                Arc::clone(&self.store),
513            ));
514        }
515
516        // Convert group-by expressions to column indices
517        let group_columns: Vec<usize> = agg
518            .group_by
519            .iter()
520            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
521            .collect::<Result<Vec<_>>>()?;
522
523        // Convert aggregate expressions to physical form
524        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
525            .aggregates
526            .iter()
527            .map(|agg_expr| {
528                let column = agg_expr
529                    .expression
530                    .as_ref()
531                    .map(|e| {
532                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
533                    })
534                    .transpose()?;
535
536                Ok(PhysicalAggregateExpr {
537                    function: convert_aggregate_function(agg_expr.function),
538                    column,
539                    distinct: agg_expr.distinct,
540                    alias: agg_expr.alias.clone(),
541                })
542            })
543            .collect::<Result<Vec<_>>>()?;
544
545        // Build output schema and column names
546        let mut output_schema = Vec::new();
547        let mut output_columns = Vec::new();
548
549        // Add group-by columns
550        for (idx, expr) in agg.group_by.iter().enumerate() {
551            output_schema.push(LogicalType::Node); // Default type
552            output_columns.push(expression_to_string(expr));
553            // If there's a group column, we need to track it
554            if idx < group_columns.len() {
555                // Group column preserved
556            }
557        }
558
559        // Add aggregate result columns
560        for agg_expr in &agg.aggregates {
561            let result_type = match agg_expr.function {
562                LogicalAggregateFunction::Count => LogicalType::Int64,
563                LogicalAggregateFunction::Sum => LogicalType::Int64,
564                LogicalAggregateFunction::Avg => LogicalType::Float64,
565                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
566                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
567                    // since the aggregate can return any Value type, but the most common case
568                    // is numeric values from property expressions
569                    LogicalType::Int64
570                }
571                LogicalAggregateFunction::Collect => LogicalType::String, // List type
572            };
573            output_schema.push(result_type);
574            output_columns.push(
575                agg_expr
576                    .alias
577                    .clone()
578                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
579            );
580        }
581
582        // Choose operator based on whether there are group-by columns
583        let operator: Box<dyn Operator> = if group_columns.is_empty() {
584            Box::new(SimpleAggregateOperator::new(
585                input_op,
586                physical_aggregates,
587                output_schema,
588            ))
589        } else {
590            Box::new(HashAggregateOperator::new(
591                input_op,
592                group_columns,
593                physical_aggregates,
594                output_schema,
595            ))
596        };
597
598        Ok((operator, output_columns))
599    }
600
601    /// Resolves a logical expression to a column index.
602    #[allow(dead_code)]
603    fn resolve_expression_to_column(
604        &self,
605        expr: &LogicalExpression,
606        variable_columns: &HashMap<String, usize>,
607    ) -> Result<usize> {
608        match expr {
609            LogicalExpression::Variable(name) => variable_columns
610                .get(name)
611                .copied()
612                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
613            LogicalExpression::Property { variable, .. } => variable_columns
614                .get(variable)
615                .copied()
616                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
617            _ => Err(Error::Internal(format!(
618                "Cannot resolve expression to column: {:?}",
619                expr
620            ))),
621        }
622    }
623
624    /// Resolves a logical expression to a column index, using projected property columns.
625    ///
626    /// This is used for aggregations where properties have been projected into their own columns.
627    fn resolve_expression_to_column_with_properties(
628        &self,
629        expr: &LogicalExpression,
630        variable_columns: &HashMap<String, usize>,
631    ) -> Result<usize> {
632        match expr {
633            LogicalExpression::Variable(name) => variable_columns
634                .get(name)
635                .copied()
636                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
637            LogicalExpression::Property { variable, property } => {
638                // Look up the projected property column (e.g., "p_price" for p.price)
639                let col_name = format!("{}_{}", variable, property);
640                variable_columns.get(&col_name).copied().ok_or_else(|| {
641                    Error::Internal(format!(
642                        "Property column '{}' not found (from {}.{})",
643                        col_name, variable, property
644                    ))
645                })
646            }
647            _ => Err(Error::Internal(format!(
648                "Cannot resolve expression to column: {:?}",
649                expr
650            ))),
651        }
652    }
653
654    /// Converts a logical expression to a filter expression.
655    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
656        match expr {
657            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
658            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
659            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
660                variable: variable.clone(),
661                property: property.clone(),
662            }),
663            LogicalExpression::Binary { left, op, right } => {
664                let left_expr = self.convert_expression(left)?;
665                let right_expr = self.convert_expression(right)?;
666                let filter_op = convert_binary_op(*op)?;
667                Ok(FilterExpression::Binary {
668                    left: Box::new(left_expr),
669                    op: filter_op,
670                    right: Box::new(right_expr),
671                })
672            }
673            LogicalExpression::Unary { op, operand } => {
674                let operand_expr = self.convert_expression(operand)?;
675                let filter_op = convert_unary_op(*op)?;
676                Ok(FilterExpression::Unary {
677                    op: filter_op,
678                    operand: Box::new(operand_expr),
679                })
680            }
681            LogicalExpression::FunctionCall { name, args } => {
682                let filter_args: Vec<FilterExpression> = args
683                    .iter()
684                    .map(|a| self.convert_expression(a))
685                    .collect::<Result<Vec<_>>>()?;
686                Ok(FilterExpression::FunctionCall {
687                    name: name.clone(),
688                    args: filter_args,
689                })
690            }
691            LogicalExpression::Case {
692                operand,
693                when_clauses,
694                else_clause,
695            } => {
696                let filter_operand = operand
697                    .as_ref()
698                    .map(|e| self.convert_expression(e))
699                    .transpose()?
700                    .map(Box::new);
701                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
702                    .iter()
703                    .map(|(cond, result)| {
704                        Ok((
705                            self.convert_expression(cond)?,
706                            self.convert_expression(result)?,
707                        ))
708                    })
709                    .collect::<Result<Vec<_>>>()?;
710                let filter_else = else_clause
711                    .as_ref()
712                    .map(|e| self.convert_expression(e))
713                    .transpose()?
714                    .map(Box::new);
715                Ok(FilterExpression::Case {
716                    operand: filter_operand,
717                    when_clauses: filter_when_clauses,
718                    else_clause: filter_else,
719                })
720            }
721            LogicalExpression::List(items) => {
722                let filter_items: Vec<FilterExpression> = items
723                    .iter()
724                    .map(|item| self.convert_expression(item))
725                    .collect::<Result<Vec<_>>>()?;
726                Ok(FilterExpression::List(filter_items))
727            }
728            LogicalExpression::Map(pairs) => {
729                let filter_pairs: Vec<(String, FilterExpression)> = pairs
730                    .iter()
731                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
732                    .collect::<Result<Vec<_>>>()?;
733                Ok(FilterExpression::Map(filter_pairs))
734            }
735            LogicalExpression::IndexAccess { base, index } => {
736                let base_expr = self.convert_expression(base)?;
737                let index_expr = self.convert_expression(index)?;
738                Ok(FilterExpression::IndexAccess {
739                    base: Box::new(base_expr),
740                    index: Box::new(index_expr),
741                })
742            }
743            LogicalExpression::SliceAccess { base, start, end } => {
744                let base_expr = self.convert_expression(base)?;
745                let start_expr = start
746                    .as_ref()
747                    .map(|s| self.convert_expression(s))
748                    .transpose()?
749                    .map(Box::new);
750                let end_expr = end
751                    .as_ref()
752                    .map(|e| self.convert_expression(e))
753                    .transpose()?
754                    .map(Box::new);
755                Ok(FilterExpression::SliceAccess {
756                    base: Box::new(base_expr),
757                    start: start_expr,
758                    end: end_expr,
759                })
760            }
761            LogicalExpression::Parameter(_) => Err(Error::Internal(
762                "Parameters not yet supported in filters".to_string(),
763            )),
764            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
765            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
766            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
767            LogicalExpression::ListComprehension {
768                variable,
769                list_expr,
770                filter_expr,
771                map_expr,
772            } => {
773                let list = self.convert_expression(list_expr)?;
774                let filter = filter_expr
775                    .as_ref()
776                    .map(|f| self.convert_expression(f))
777                    .transpose()?
778                    .map(Box::new);
779                let map = self.convert_expression(map_expr)?;
780                Ok(FilterExpression::ListComprehension {
781                    variable: variable.clone(),
782                    list_expr: Box::new(list),
783                    filter_expr: filter,
784                    map_expr: Box::new(map),
785                })
786            }
787            LogicalExpression::ExistsSubquery(subplan) => {
788                // Extract the pattern from the subplan
789                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
790                let (start_var, direction, edge_type, end_labels) =
791                    self.extract_exists_pattern(subplan)?;
792
793                Ok(FilterExpression::ExistsSubquery {
794                    start_var,
795                    direction,
796                    edge_type,
797                    end_labels,
798                    min_hops: None,
799                    max_hops: None,
800                })
801            }
802            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
803                "COUNT subqueries not yet supported".to_string(),
804            )),
805        }
806    }
807
808    /// Extracts the pattern from an EXISTS subplan.
809    /// Returns (start_variable, direction, edge_type, end_labels).
810    fn extract_exists_pattern(
811        &self,
812        subplan: &LogicalOperator,
813    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
814        match subplan {
815            LogicalOperator::Expand(expand) => {
816                // Get end node labels from the to_variable if there's a node scan input
817                let end_labels = self.extract_end_labels_from_expand(expand);
818                let direction = match expand.direction {
819                    ExpandDirection::Outgoing => Direction::Outgoing,
820                    ExpandDirection::Incoming => Direction::Incoming,
821                    ExpandDirection::Both => Direction::Both,
822                };
823                Ok((
824                    expand.from_variable.clone(),
825                    direction,
826                    expand.edge_type.clone(),
827                    end_labels,
828                ))
829            }
830            LogicalOperator::NodeScan(scan) => {
831                if let Some(input) = &scan.input {
832                    self.extract_exists_pattern(input)
833                } else {
834                    Err(Error::Internal(
835                        "EXISTS subquery must contain an edge pattern".to_string(),
836                    ))
837                }
838            }
839            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
840            _ => Err(Error::Internal(
841                "Unsupported EXISTS subquery pattern".to_string(),
842            )),
843        }
844    }
845
846    /// Extracts end node labels from an Expand operator if present.
847    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
848        // Check if the expand has a NodeScan input with a label filter
849        match expand.input.as_ref() {
850            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
851            _ => None,
852        }
853    }
854
855    /// Plans a JOIN operator.
856    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
857        let (left_op, left_columns) = self.plan_operator(&join.left)?;
858        let (right_op, right_columns) = self.plan_operator(&join.right)?;
859
860        // Build combined output columns
861        let mut columns = left_columns.clone();
862        columns.extend(right_columns.clone());
863
864        // Convert join type
865        let physical_join_type = match join.join_type {
866            JoinType::Inner => PhysicalJoinType::Inner,
867            JoinType::Left => PhysicalJoinType::Left,
868            JoinType::Right => PhysicalJoinType::Right,
869            JoinType::Full => PhysicalJoinType::Full,
870            JoinType::Cross => PhysicalJoinType::Cross,
871            JoinType::Semi => PhysicalJoinType::Semi,
872            JoinType::Anti => PhysicalJoinType::Anti,
873        };
874
875        // Build key columns from join conditions
876        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
877            // Cross join - no keys
878            (vec![], vec![])
879        } else {
880            join.conditions
881                .iter()
882                .filter_map(|cond| {
883                    // Try to extract column indices from expressions
884                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
885                    let right_idx = self
886                        .expression_to_column(&cond.right, &right_columns)
887                        .ok()?;
888                    Some((left_idx, right_idx))
889                })
890                .unzip()
891        };
892
893        let output_schema = self.derive_schema_from_columns(&columns);
894
895        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
896            left_op,
897            right_op,
898            probe_keys,
899            build_keys,
900            physical_join_type,
901            output_schema,
902        ));
903
904        Ok((operator, columns))
905    }
906
907    /// Extracts a column index from an expression.
908    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
909        match expr {
910            LogicalExpression::Variable(name) => columns
911                .iter()
912                .position(|c| c == name)
913                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
914            _ => Err(Error::Internal(
915                "Only variables supported in join conditions".to_string(),
916            )),
917        }
918    }
919
920    /// Plans a UNION operator.
921    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
922        if union.inputs.is_empty() {
923            return Err(Error::Internal(
924                "Union requires at least one input".to_string(),
925            ));
926        }
927
928        let mut inputs = Vec::with_capacity(union.inputs.len());
929        let mut columns = Vec::new();
930
931        for (i, input) in union.inputs.iter().enumerate() {
932            let (op, cols) = self.plan_operator(input)?;
933            if i == 0 {
934                columns = cols;
935            }
936            inputs.push(op);
937        }
938
939        let output_schema = self.derive_schema_from_columns(&columns);
940        let operator = Box::new(UnionOperator::new(inputs, output_schema));
941
942        Ok((operator, columns))
943    }
944
945    /// Plans a DISTINCT operator.
946    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
947        let (input_op, columns) = self.plan_operator(&distinct.input)?;
948        let output_schema = self.derive_schema_from_columns(&columns);
949        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
950        Ok((operator, columns))
951    }
952
953    /// Plans a CREATE NODE operator.
954    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
955        // Plan input if present
956        let (input_op, mut columns) = if let Some(ref input) = create.input {
957            let (op, cols) = self.plan_operator(input)?;
958            (Some(op), cols)
959        } else {
960            (None, vec![])
961        };
962
963        // Output column for the created node
964        let output_column = columns.len();
965        columns.push(create.variable.clone());
966
967        // Convert properties
968        let properties: Vec<(String, PropertySource)> = create
969            .properties
970            .iter()
971            .map(|(name, expr)| {
972                let source = match expr {
973                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
974                    _ => PropertySource::Constant(graphos_common::types::Value::Null),
975                };
976                (name.clone(), source)
977            })
978            .collect();
979
980        let output_schema = self.derive_schema_from_columns(&columns);
981
982        let operator = Box::new(
983            CreateNodeOperator::new(
984                Arc::clone(&self.store),
985                input_op,
986                create.labels.clone(),
987                properties,
988                output_schema,
989                output_column,
990            )
991            .with_tx_context(self.viewing_epoch, self.tx_id),
992        );
993
994        Ok((operator, columns))
995    }
996
997    /// Plans a CREATE EDGE operator.
998    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
999        let (input_op, mut columns) = self.plan_operator(&create.input)?;
1000
1001        // Find source and target columns
1002        let from_column = columns
1003            .iter()
1004            .position(|c| c == &create.from_variable)
1005            .ok_or_else(|| {
1006                Error::Internal(format!(
1007                    "Source variable '{}' not found",
1008                    create.from_variable
1009                ))
1010            })?;
1011
1012        let to_column = columns
1013            .iter()
1014            .position(|c| c == &create.to_variable)
1015            .ok_or_else(|| {
1016                Error::Internal(format!(
1017                    "Target variable '{}' not found",
1018                    create.to_variable
1019                ))
1020            })?;
1021
1022        // Output column for the created edge (if named)
1023        let output_column = create.variable.as_ref().map(|v| {
1024            let idx = columns.len();
1025            columns.push(v.clone());
1026            idx
1027        });
1028
1029        // Convert properties
1030        let properties: Vec<(String, PropertySource)> = create
1031            .properties
1032            .iter()
1033            .map(|(name, expr)| {
1034                let source = match expr {
1035                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1036                    _ => PropertySource::Constant(graphos_common::types::Value::Null),
1037                };
1038                (name.clone(), source)
1039            })
1040            .collect();
1041
1042        let output_schema = self.derive_schema_from_columns(&columns);
1043
1044        let operator = Box::new(
1045            CreateEdgeOperator::new(
1046                Arc::clone(&self.store),
1047                input_op,
1048                from_column,
1049                to_column,
1050                create.edge_type.clone(),
1051                properties,
1052                output_schema,
1053                output_column,
1054            )
1055            .with_tx_context(self.viewing_epoch, self.tx_id),
1056        );
1057
1058        Ok((operator, columns))
1059    }
1060
1061    /// Plans a DELETE NODE operator.
1062    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1063        let (input_op, columns) = self.plan_operator(&delete.input)?;
1064
1065        let node_column = columns
1066            .iter()
1067            .position(|c| c == &delete.variable)
1068            .ok_or_else(|| {
1069                Error::Internal(format!(
1070                    "Variable '{}' not found for delete",
1071                    delete.variable
1072                ))
1073            })?;
1074
1075        // Output schema for delete count
1076        let output_schema = vec![LogicalType::Int64];
1077        let output_columns = vec!["deleted_count".to_string()];
1078
1079        let operator = Box::new(
1080            DeleteNodeOperator::new(
1081                Arc::clone(&self.store),
1082                input_op,
1083                node_column,
1084                output_schema,
1085                true, // detach = true to delete connected edges
1086            )
1087            .with_tx_context(self.viewing_epoch, self.tx_id),
1088        );
1089
1090        Ok((operator, output_columns))
1091    }
1092
1093    /// Plans a DELETE EDGE operator.
1094    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1095        let (input_op, columns) = self.plan_operator(&delete.input)?;
1096
1097        let edge_column = columns
1098            .iter()
1099            .position(|c| c == &delete.variable)
1100            .ok_or_else(|| {
1101                Error::Internal(format!(
1102                    "Variable '{}' not found for delete",
1103                    delete.variable
1104                ))
1105            })?;
1106
1107        // Output schema for delete count
1108        let output_schema = vec![LogicalType::Int64];
1109        let output_columns = vec!["deleted_count".to_string()];
1110
1111        let operator = Box::new(
1112            DeleteEdgeOperator::new(
1113                Arc::clone(&self.store),
1114                input_op,
1115                edge_column,
1116                output_schema,
1117            )
1118            .with_tx_context(self.viewing_epoch, self.tx_id),
1119        );
1120
1121        Ok((operator, output_columns))
1122    }
1123
1124    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
1125    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1126        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
1127        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
1128
1129        // Build combined output columns (left + right)
1130        let mut columns = left_columns.clone();
1131        columns.extend(right_columns.clone());
1132
1133        // Find common variables between left and right for join keys
1134        let mut probe_keys = Vec::new();
1135        let mut build_keys = Vec::new();
1136
1137        for (right_idx, right_col) in right_columns.iter().enumerate() {
1138            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1139                probe_keys.push(left_idx);
1140                build_keys.push(right_idx);
1141            }
1142        }
1143
1144        let output_schema = self.derive_schema_from_columns(&columns);
1145
1146        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1147            left_op,
1148            right_op,
1149            probe_keys,
1150            build_keys,
1151            PhysicalJoinType::Left,
1152            output_schema,
1153        ));
1154
1155        Ok((operator, columns))
1156    }
1157
1158    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
1159    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1160        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
1161        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
1162
1163        // Anti-join only keeps left columns (filters out matching rows)
1164        let columns = left_columns.clone();
1165
1166        // Find common variables between left and right for join keys
1167        let mut probe_keys = Vec::new();
1168        let mut build_keys = Vec::new();
1169
1170        for (right_idx, right_col) in right_columns.iter().enumerate() {
1171            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
1172                probe_keys.push(left_idx);
1173                build_keys.push(right_idx);
1174            }
1175        }
1176
1177        let output_schema = self.derive_schema_from_columns(&columns);
1178
1179        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1180            left_op,
1181            right_op,
1182            probe_keys,
1183            build_keys,
1184            PhysicalJoinType::Anti,
1185            output_schema,
1186        ));
1187
1188        Ok((operator, columns))
1189    }
1190
1191    /// Plans an unwind operator.
1192    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1193        // Plan the input operator first
1194        let (input_op, input_columns) = self.plan_operator(&unwind.input)?;
1195
1196        // The UNWIND expression should be a list - we need to find/evaluate it
1197        // For now, we handle the case where the expression references an existing column
1198        // or is a literal list
1199
1200        // Find if the expression references an existing column (like a list property)
1201        let list_col_idx = match &unwind.expression {
1202            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
1203            LogicalExpression::Property { variable, .. } => {
1204                // Property access needs to be evaluated - for now we'll need the filter predicate
1205                // to evaluate this. For simple cases, we treat it as a list column.
1206                input_columns.iter().position(|c| c == variable)
1207            }
1208            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
1209                // Literal list expression - we'll add it as a virtual column
1210                None
1211            }
1212            _ => None,
1213        };
1214
1215        // Build output columns: all input columns plus the new variable
1216        let mut columns = input_columns.clone();
1217        columns.push(unwind.variable.clone());
1218
1219        // Build output schema
1220        let mut output_schema = self.derive_schema_from_columns(&input_columns);
1221        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
1222
1223        // Use the list column index if found, otherwise default to 0
1224        // (in which case the first column should contain the list)
1225        let col_idx = list_col_idx.unwrap_or(0);
1226
1227        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
1228            input_op,
1229            col_idx,
1230            unwind.variable.clone(),
1231            output_schema,
1232        ));
1233
1234        Ok((operator, columns))
1235    }
1236
1237    /// Plans a MERGE operator.
1238    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1239        // Plan the input operator first (if any)
1240        let (_input_op, mut columns) = self.plan_operator(&merge.input)?;
1241
1242        // Convert match properties from LogicalExpression to Value
1243        let match_properties: Vec<(String, graphos_common::types::Value)> = merge
1244            .match_properties
1245            .iter()
1246            .filter_map(|(name, expr)| {
1247                if let LogicalExpression::Literal(v) = expr {
1248                    Some((name.clone(), v.clone()))
1249                } else {
1250                    None // Skip non-literal expressions for now
1251                }
1252            })
1253            .collect();
1254
1255        // Convert ON CREATE properties
1256        let on_create_properties: Vec<(String, graphos_common::types::Value)> = merge
1257            .on_create
1258            .iter()
1259            .filter_map(|(name, expr)| {
1260                if let LogicalExpression::Literal(v) = expr {
1261                    Some((name.clone(), v.clone()))
1262                } else {
1263                    None
1264                }
1265            })
1266            .collect();
1267
1268        // Convert ON MATCH properties
1269        let on_match_properties: Vec<(String, graphos_common::types::Value)> = merge
1270            .on_match
1271            .iter()
1272            .filter_map(|(name, expr)| {
1273                if let LogicalExpression::Literal(v) = expr {
1274                    Some((name.clone(), v.clone()))
1275                } else {
1276                    None
1277                }
1278            })
1279            .collect();
1280
1281        // Add the merged node variable to output columns
1282        columns.push(merge.variable.clone());
1283
1284        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
1285            Arc::clone(&self.store),
1286            merge.variable.clone(),
1287            merge.labels.clone(),
1288            match_properties,
1289            on_create_properties,
1290            on_match_properties,
1291        ));
1292
1293        Ok((operator, columns))
1294    }
1295
1296    /// Plans an ADD LABEL operator.
1297    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1298        let (input_op, columns) = self.plan_operator(&add_label.input)?;
1299
1300        // Find the node column
1301        let node_column = columns
1302            .iter()
1303            .position(|c| c == &add_label.variable)
1304            .ok_or_else(|| {
1305                Error::Internal(format!(
1306                    "Variable '{}' not found for ADD LABEL",
1307                    add_label.variable
1308                ))
1309            })?;
1310
1311        // Output schema for update count
1312        let output_schema = vec![LogicalType::Int64];
1313        let output_columns = vec!["labels_added".to_string()];
1314
1315        let operator = Box::new(AddLabelOperator::new(
1316            Arc::clone(&self.store),
1317            input_op,
1318            node_column,
1319            add_label.labels.clone(),
1320            output_schema,
1321        ));
1322
1323        Ok((operator, output_columns))
1324    }
1325
1326    /// Plans a REMOVE LABEL operator.
1327    fn plan_remove_label(
1328        &self,
1329        remove_label: &RemoveLabelOp,
1330    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1331        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
1332
1333        // Find the node column
1334        let node_column = columns
1335            .iter()
1336            .position(|c| c == &remove_label.variable)
1337            .ok_or_else(|| {
1338                Error::Internal(format!(
1339                    "Variable '{}' not found for REMOVE LABEL",
1340                    remove_label.variable
1341                ))
1342            })?;
1343
1344        // Output schema for update count
1345        let output_schema = vec![LogicalType::Int64];
1346        let output_columns = vec!["labels_removed".to_string()];
1347
1348        let operator = Box::new(RemoveLabelOperator::new(
1349            Arc::clone(&self.store),
1350            input_op,
1351            node_column,
1352            remove_label.labels.clone(),
1353            output_schema,
1354        ));
1355
1356        Ok((operator, output_columns))
1357    }
1358
1359    /// Plans a SET PROPERTY operator.
1360    fn plan_set_property(
1361        &self,
1362        set_prop: &SetPropertyOp,
1363    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1364        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
1365
1366        // Find the entity column (node or edge variable)
1367        let entity_column = columns
1368            .iter()
1369            .position(|c| c == &set_prop.variable)
1370            .ok_or_else(|| {
1371                Error::Internal(format!(
1372                    "Variable '{}' not found for SET",
1373                    set_prop.variable
1374                ))
1375            })?;
1376
1377        // Convert properties to PropertySource
1378        let properties: Vec<(String, PropertySource)> = set_prop
1379            .properties
1380            .iter()
1381            .map(|(name, expr)| {
1382                let source = self.expression_to_property_source(expr, &columns)?;
1383                Ok((name.clone(), source))
1384            })
1385            .collect::<Result<Vec<_>>>()?;
1386
1387        // Output schema preserves input schema (passes through)
1388        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
1389        let output_columns = columns.clone();
1390
1391        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
1392        let operator = Box::new(SetPropertyOperator::new_for_node(
1393            Arc::clone(&self.store),
1394            input_op,
1395            entity_column,
1396            properties,
1397            output_schema,
1398        ));
1399
1400        Ok((operator, output_columns))
1401    }
1402
1403    /// Converts a logical expression to a PropertySource.
1404    fn expression_to_property_source(
1405        &self,
1406        expr: &LogicalExpression,
1407        columns: &[String],
1408    ) -> Result<PropertySource> {
1409        match expr {
1410            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
1411            LogicalExpression::Variable(name) => {
1412                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
1413                    Error::Internal(format!("Variable '{}' not found for property source", name))
1414                })?;
1415                Ok(PropertySource::Column(col_idx))
1416            }
1417            LogicalExpression::Parameter(name) => {
1418                // Parameters should be resolved before planning
1419                // For now, treat as a placeholder
1420                Ok(PropertySource::Constant(
1421                    graphos_common::types::Value::String(format!("${}", name).into()),
1422                ))
1423            }
1424            _ => Err(Error::Internal(format!(
1425                "Unsupported expression type for property source: {:?}",
1426                expr
1427            ))),
1428        }
1429    }
1430}
1431
1432/// Converts a logical binary operator to a filter binary operator.
1433pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
1434    match op {
1435        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
1436        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
1437        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
1438        BinaryOp::Le => Ok(BinaryFilterOp::Le),
1439        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
1440        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
1441        BinaryOp::And => Ok(BinaryFilterOp::And),
1442        BinaryOp::Or => Ok(BinaryFilterOp::Or),
1443        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
1444        BinaryOp::Add => Ok(BinaryFilterOp::Add),
1445        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
1446        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
1447        BinaryOp::Div => Ok(BinaryFilterOp::Div),
1448        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
1449        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
1450        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
1451        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
1452        BinaryOp::In => Ok(BinaryFilterOp::In),
1453        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
1454        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
1455        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
1456            "Binary operator {:?} not yet supported in filters",
1457            op
1458        ))),
1459    }
1460}
1461
1462/// Converts a logical unary operator to a filter unary operator.
1463pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
1464    match op {
1465        UnaryOp::Not => Ok(UnaryFilterOp::Not),
1466        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
1467        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
1468        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
1469    }
1470}
1471
1472/// Converts a logical aggregate function to a physical aggregate function.
1473pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
1474    match func {
1475        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
1476        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
1477        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
1478        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
1479        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
1480        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
1481    }
1482}
1483
1484/// Converts a logical expression to a filter expression.
1485///
1486/// This is a standalone function that can be used by both LPG and RDF planners.
1487pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
1488    match expr {
1489        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1490        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1491        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1492            variable: variable.clone(),
1493            property: property.clone(),
1494        }),
1495        LogicalExpression::Binary { left, op, right } => {
1496            let left_expr = convert_filter_expression(left)?;
1497            let right_expr = convert_filter_expression(right)?;
1498            let filter_op = convert_binary_op(*op)?;
1499            Ok(FilterExpression::Binary {
1500                left: Box::new(left_expr),
1501                op: filter_op,
1502                right: Box::new(right_expr),
1503            })
1504        }
1505        LogicalExpression::Unary { op, operand } => {
1506            let operand_expr = convert_filter_expression(operand)?;
1507            let filter_op = convert_unary_op(*op)?;
1508            Ok(FilterExpression::Unary {
1509                op: filter_op,
1510                operand: Box::new(operand_expr),
1511            })
1512        }
1513        LogicalExpression::FunctionCall { name, args } => {
1514            let filter_args: Vec<FilterExpression> = args
1515                .iter()
1516                .map(|a| convert_filter_expression(a))
1517                .collect::<Result<Vec<_>>>()?;
1518            Ok(FilterExpression::FunctionCall {
1519                name: name.clone(),
1520                args: filter_args,
1521            })
1522        }
1523        LogicalExpression::Case {
1524            operand,
1525            when_clauses,
1526            else_clause,
1527        } => {
1528            let filter_operand = operand
1529                .as_ref()
1530                .map(|e| convert_filter_expression(e))
1531                .transpose()?
1532                .map(Box::new);
1533            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1534                .iter()
1535                .map(|(cond, result)| {
1536                    Ok((
1537                        convert_filter_expression(cond)?,
1538                        convert_filter_expression(result)?,
1539                    ))
1540                })
1541                .collect::<Result<Vec<_>>>()?;
1542            let filter_else = else_clause
1543                .as_ref()
1544                .map(|e| convert_filter_expression(e))
1545                .transpose()?
1546                .map(Box::new);
1547            Ok(FilterExpression::Case {
1548                operand: filter_operand,
1549                when_clauses: filter_when_clauses,
1550                else_clause: filter_else,
1551            })
1552        }
1553        LogicalExpression::List(items) => {
1554            let filter_items: Vec<FilterExpression> = items
1555                .iter()
1556                .map(|item| convert_filter_expression(item))
1557                .collect::<Result<Vec<_>>>()?;
1558            Ok(FilterExpression::List(filter_items))
1559        }
1560        LogicalExpression::Map(pairs) => {
1561            let filter_pairs: Vec<(String, FilterExpression)> = pairs
1562                .iter()
1563                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
1564                .collect::<Result<Vec<_>>>()?;
1565            Ok(FilterExpression::Map(filter_pairs))
1566        }
1567        LogicalExpression::IndexAccess { base, index } => {
1568            let base_expr = convert_filter_expression(base)?;
1569            let index_expr = convert_filter_expression(index)?;
1570            Ok(FilterExpression::IndexAccess {
1571                base: Box::new(base_expr),
1572                index: Box::new(index_expr),
1573            })
1574        }
1575        LogicalExpression::SliceAccess { base, start, end } => {
1576            let base_expr = convert_filter_expression(base)?;
1577            let start_expr = start
1578                .as_ref()
1579                .map(|s| convert_filter_expression(s))
1580                .transpose()?
1581                .map(Box::new);
1582            let end_expr = end
1583                .as_ref()
1584                .map(|e| convert_filter_expression(e))
1585                .transpose()?
1586                .map(Box::new);
1587            Ok(FilterExpression::SliceAccess {
1588                base: Box::new(base_expr),
1589                start: start_expr,
1590                end: end_expr,
1591            })
1592        }
1593        LogicalExpression::Parameter(_) => Err(Error::Internal(
1594            "Parameters not yet supported in filters".to_string(),
1595        )),
1596        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1597        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1598        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1599        LogicalExpression::ListComprehension {
1600            variable,
1601            list_expr,
1602            filter_expr,
1603            map_expr,
1604        } => {
1605            let list = convert_filter_expression(list_expr)?;
1606            let filter = filter_expr
1607                .as_ref()
1608                .map(|f| convert_filter_expression(f))
1609                .transpose()?
1610                .map(Box::new);
1611            let map = convert_filter_expression(map_expr)?;
1612            Ok(FilterExpression::ListComprehension {
1613                variable: variable.clone(),
1614                list_expr: Box::new(list),
1615                filter_expr: filter,
1616                map_expr: Box::new(map),
1617            })
1618        }
1619        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
1620            Error::Internal("Subqueries not yet supported in filters".to_string()),
1621        ),
1622    }
1623}
1624
1625/// Infers the logical type from a value.
1626fn value_to_logical_type(value: &graphos_common::types::Value) -> LogicalType {
1627    use graphos_common::types::Value;
1628    match value {
1629        Value::Null => LogicalType::String, // Default type for null
1630        Value::Bool(_) => LogicalType::Bool,
1631        Value::Int64(_) => LogicalType::Int64,
1632        Value::Float64(_) => LogicalType::Float64,
1633        Value::String(_) => LogicalType::String,
1634        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
1635        Value::Timestamp(_) => LogicalType::Timestamp,
1636        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
1637        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
1638    }
1639}
1640
1641/// Converts an expression to a string for column naming.
1642fn expression_to_string(expr: &LogicalExpression) -> String {
1643    match expr {
1644        LogicalExpression::Variable(name) => name.clone(),
1645        LogicalExpression::Property { variable, property } => {
1646            format!("{variable}.{property}")
1647        }
1648        LogicalExpression::Literal(value) => format!("{value:?}"),
1649        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
1650        _ => "expr".to_string(),
1651    }
1652}
1653
/// A physical plan ready for execution.
///
/// Pairs the root physical operator with the names of the result columns.
pub struct PhysicalPlan {
    /// The root physical operator.
    pub operator: Box<dyn Operator>,
    /// Column names for the result.
    pub columns: Vec<String>,
}
1661
1662impl PhysicalPlan {
1663    /// Returns the column names.
1664    #[must_use]
1665    pub fn columns(&self) -> &[String] {
1666        &self.columns
1667    }
1668
1669    /// Consumes the plan and returns the operator.
1670    pub fn into_operator(self) -> Box<dyn Operator> {
1671        self.operator
1672    }
1673}
1674
1675#[cfg(test)]
1676mod tests {
1677    use super::*;
1678    use crate::query::plan::{
1679        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
1680        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
1681        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
1682        SortKey, SortOp,
1683    };
1684    use graphos_common::types::Value;
1685
1686    fn create_test_store() -> Arc<LpgStore> {
1687        let store = Arc::new(LpgStore::new());
1688        store.create_node(&["Person"]);
1689        store.create_node(&["Person"]);
1690        store.create_node(&["Company"]);
1691        store
1692    }
1693
1694    // ==================== Simple Scan Tests ====================
1695
1696    #[test]
1697    fn test_plan_simple_scan() {
1698        let store = create_test_store();
1699        let planner = Planner::new(store);
1700
1701        // MATCH (n:Person) RETURN n
1702        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1703            items: vec![ReturnItem {
1704                expression: LogicalExpression::Variable("n".to_string()),
1705                alias: None,
1706            }],
1707            distinct: false,
1708            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1709                variable: "n".to_string(),
1710                label: Some("Person".to_string()),
1711                input: None,
1712            })),
1713        }));
1714
1715        let physical = planner.plan(&logical).unwrap();
1716        assert_eq!(physical.columns(), &["n"]);
1717    }
1718
1719    #[test]
1720    fn test_plan_scan_without_label() {
1721        let store = create_test_store();
1722        let planner = Planner::new(store);
1723
1724        // MATCH (n) RETURN n
1725        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1726            items: vec![ReturnItem {
1727                expression: LogicalExpression::Variable("n".to_string()),
1728                alias: None,
1729            }],
1730            distinct: false,
1731            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1732                variable: "n".to_string(),
1733                label: None,
1734                input: None,
1735            })),
1736        }));
1737
1738        let physical = planner.plan(&logical).unwrap();
1739        assert_eq!(physical.columns(), &["n"]);
1740    }
1741
1742    #[test]
1743    fn test_plan_return_with_alias() {
1744        let store = create_test_store();
1745        let planner = Planner::new(store);
1746
1747        // MATCH (n:Person) RETURN n AS person
1748        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1749            items: vec![ReturnItem {
1750                expression: LogicalExpression::Variable("n".to_string()),
1751                alias: Some("person".to_string()),
1752            }],
1753            distinct: false,
1754            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1755                variable: "n".to_string(),
1756                label: Some("Person".to_string()),
1757                input: None,
1758            })),
1759        }));
1760
1761        let physical = planner.plan(&logical).unwrap();
1762        assert_eq!(physical.columns(), &["person"]);
1763    }
1764
1765    #[test]
1766    fn test_plan_return_property() {
1767        let store = create_test_store();
1768        let planner = Planner::new(store);
1769
1770        // MATCH (n:Person) RETURN n.name
1771        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1772            items: vec![ReturnItem {
1773                expression: LogicalExpression::Property {
1774                    variable: "n".to_string(),
1775                    property: "name".to_string(),
1776                },
1777                alias: None,
1778            }],
1779            distinct: false,
1780            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1781                variable: "n".to_string(),
1782                label: Some("Person".to_string()),
1783                input: None,
1784            })),
1785        }));
1786
1787        let physical = planner.plan(&logical).unwrap();
1788        assert_eq!(physical.columns(), &["n.name"]);
1789    }
1790
1791    #[test]
1792    fn test_plan_return_literal() {
1793        let store = create_test_store();
1794        let planner = Planner::new(store);
1795
1796        // MATCH (n) RETURN 42 AS answer
1797        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1798            items: vec![ReturnItem {
1799                expression: LogicalExpression::Literal(Value::Int64(42)),
1800                alias: Some("answer".to_string()),
1801            }],
1802            distinct: false,
1803            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1804                variable: "n".to_string(),
1805                label: None,
1806                input: None,
1807            })),
1808        }));
1809
1810        let physical = planner.plan(&logical).unwrap();
1811        assert_eq!(physical.columns(), &["answer"]);
1812    }
1813
1814    // ==================== Filter Tests ====================
1815
1816    #[test]
1817    fn test_plan_filter_equality() {
1818        let store = create_test_store();
1819        let planner = Planner::new(store);
1820
1821        // MATCH (n:Person) WHERE n.age = 30 RETURN n
1822        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1823            items: vec![ReturnItem {
1824                expression: LogicalExpression::Variable("n".to_string()),
1825                alias: None,
1826            }],
1827            distinct: false,
1828            input: Box::new(LogicalOperator::Filter(FilterOp {
1829                predicate: LogicalExpression::Binary {
1830                    left: Box::new(LogicalExpression::Property {
1831                        variable: "n".to_string(),
1832                        property: "age".to_string(),
1833                    }),
1834                    op: BinaryOp::Eq,
1835                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1836                },
1837                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1838                    variable: "n".to_string(),
1839                    label: Some("Person".to_string()),
1840                    input: None,
1841                })),
1842            })),
1843        }));
1844
1845        let physical = planner.plan(&logical).unwrap();
1846        assert_eq!(physical.columns(), &["n"]);
1847    }
1848
1849    #[test]
1850    fn test_plan_filter_compound_and() {
1851        let store = create_test_store();
1852        let planner = Planner::new(store);
1853
1854        // WHERE n.age > 20 AND n.age < 40
1855        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1856            items: vec![ReturnItem {
1857                expression: LogicalExpression::Variable("n".to_string()),
1858                alias: None,
1859            }],
1860            distinct: false,
1861            input: Box::new(LogicalOperator::Filter(FilterOp {
1862                predicate: LogicalExpression::Binary {
1863                    left: Box::new(LogicalExpression::Binary {
1864                        left: Box::new(LogicalExpression::Property {
1865                            variable: "n".to_string(),
1866                            property: "age".to_string(),
1867                        }),
1868                        op: BinaryOp::Gt,
1869                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
1870                    }),
1871                    op: BinaryOp::And,
1872                    right: Box::new(LogicalExpression::Binary {
1873                        left: Box::new(LogicalExpression::Property {
1874                            variable: "n".to_string(),
1875                            property: "age".to_string(),
1876                        }),
1877                        op: BinaryOp::Lt,
1878                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
1879                    }),
1880                },
1881                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1882                    variable: "n".to_string(),
1883                    label: None,
1884                    input: None,
1885                })),
1886            })),
1887        }));
1888
1889        let physical = planner.plan(&logical).unwrap();
1890        assert_eq!(physical.columns(), &["n"]);
1891    }
1892
1893    #[test]
1894    fn test_plan_filter_unary_not() {
1895        let store = create_test_store();
1896        let planner = Planner::new(store);
1897
1898        // WHERE NOT n.active
1899        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1900            items: vec![ReturnItem {
1901                expression: LogicalExpression::Variable("n".to_string()),
1902                alias: None,
1903            }],
1904            distinct: false,
1905            input: Box::new(LogicalOperator::Filter(FilterOp {
1906                predicate: LogicalExpression::Unary {
1907                    op: UnaryOp::Not,
1908                    operand: Box::new(LogicalExpression::Property {
1909                        variable: "n".to_string(),
1910                        property: "active".to_string(),
1911                    }),
1912                },
1913                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1914                    variable: "n".to_string(),
1915                    label: None,
1916                    input: None,
1917                })),
1918            })),
1919        }));
1920
1921        let physical = planner.plan(&logical).unwrap();
1922        assert_eq!(physical.columns(), &["n"]);
1923    }
1924
1925    #[test]
1926    fn test_plan_filter_is_null() {
1927        let store = create_test_store();
1928        let planner = Planner::new(store);
1929
1930        // WHERE n.email IS NULL
1931        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1932            items: vec![ReturnItem {
1933                expression: LogicalExpression::Variable("n".to_string()),
1934                alias: None,
1935            }],
1936            distinct: false,
1937            input: Box::new(LogicalOperator::Filter(FilterOp {
1938                predicate: LogicalExpression::Unary {
1939                    op: UnaryOp::IsNull,
1940                    operand: Box::new(LogicalExpression::Property {
1941                        variable: "n".to_string(),
1942                        property: "email".to_string(),
1943                    }),
1944                },
1945                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1946                    variable: "n".to_string(),
1947                    label: None,
1948                    input: None,
1949                })),
1950            })),
1951        }));
1952
1953        let physical = planner.plan(&logical).unwrap();
1954        assert_eq!(physical.columns(), &["n"]);
1955    }
1956
1957    #[test]
1958    fn test_plan_filter_function_call() {
1959        let store = create_test_store();
1960        let planner = Planner::new(store);
1961
1962        // WHERE size(n.friends) > 0
1963        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1964            items: vec![ReturnItem {
1965                expression: LogicalExpression::Variable("n".to_string()),
1966                alias: None,
1967            }],
1968            distinct: false,
1969            input: Box::new(LogicalOperator::Filter(FilterOp {
1970                predicate: LogicalExpression::Binary {
1971                    left: Box::new(LogicalExpression::FunctionCall {
1972                        name: "size".to_string(),
1973                        args: vec![LogicalExpression::Property {
1974                            variable: "n".to_string(),
1975                            property: "friends".to_string(),
1976                        }],
1977                    }),
1978                    op: BinaryOp::Gt,
1979                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1980                },
1981                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1982                    variable: "n".to_string(),
1983                    label: None,
1984                    input: None,
1985                })),
1986            })),
1987        }));
1988
1989        let physical = planner.plan(&logical).unwrap();
1990        assert_eq!(physical.columns(), &["n"]);
1991    }
1992
1993    // ==================== Expand Tests ====================
1994
1995    #[test]
1996    fn test_plan_expand_outgoing() {
1997        let store = create_test_store();
1998        let planner = Planner::new(store);
1999
2000        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
2001        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2002            items: vec![
2003                ReturnItem {
2004                    expression: LogicalExpression::Variable("a".to_string()),
2005                    alias: None,
2006                },
2007                ReturnItem {
2008                    expression: LogicalExpression::Variable("b".to_string()),
2009                    alias: None,
2010                },
2011            ],
2012            distinct: false,
2013            input: Box::new(LogicalOperator::Expand(ExpandOp {
2014                from_variable: "a".to_string(),
2015                to_variable: "b".to_string(),
2016                edge_variable: None,
2017                direction: ExpandDirection::Outgoing,
2018                edge_type: Some("KNOWS".to_string()),
2019                min_hops: 1,
2020                max_hops: Some(1),
2021                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2022                    variable: "a".to_string(),
2023                    label: Some("Person".to_string()),
2024                    input: None,
2025                })),
2026            })),
2027        }));
2028
2029        let physical = planner.plan(&logical).unwrap();
2030        // The return should have columns [a, b]
2031        assert!(physical.columns().contains(&"a".to_string()));
2032        assert!(physical.columns().contains(&"b".to_string()));
2033    }
2034
2035    #[test]
2036    fn test_plan_expand_with_edge_variable() {
2037        let store = create_test_store();
2038        let planner = Planner::new(store);
2039
2040        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
2041        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2042            items: vec![
2043                ReturnItem {
2044                    expression: LogicalExpression::Variable("a".to_string()),
2045                    alias: None,
2046                },
2047                ReturnItem {
2048                    expression: LogicalExpression::Variable("r".to_string()),
2049                    alias: None,
2050                },
2051                ReturnItem {
2052                    expression: LogicalExpression::Variable("b".to_string()),
2053                    alias: None,
2054                },
2055            ],
2056            distinct: false,
2057            input: Box::new(LogicalOperator::Expand(ExpandOp {
2058                from_variable: "a".to_string(),
2059                to_variable: "b".to_string(),
2060                edge_variable: Some("r".to_string()),
2061                direction: ExpandDirection::Outgoing,
2062                edge_type: Some("KNOWS".to_string()),
2063                min_hops: 1,
2064                max_hops: Some(1),
2065                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2066                    variable: "a".to_string(),
2067                    label: None,
2068                    input: None,
2069                })),
2070            })),
2071        }));
2072
2073        let physical = planner.plan(&logical).unwrap();
2074        assert!(physical.columns().contains(&"a".to_string()));
2075        assert!(physical.columns().contains(&"r".to_string()));
2076        assert!(physical.columns().contains(&"b".to_string()));
2077    }
2078
2079    // ==================== Limit/Skip/Sort Tests ====================
2080
2081    #[test]
2082    fn test_plan_limit() {
2083        let store = create_test_store();
2084        let planner = Planner::new(store);
2085
2086        // MATCH (n) RETURN n LIMIT 10
2087        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2088            items: vec![ReturnItem {
2089                expression: LogicalExpression::Variable("n".to_string()),
2090                alias: None,
2091            }],
2092            distinct: false,
2093            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2094                count: 10,
2095                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2096                    variable: "n".to_string(),
2097                    label: None,
2098                    input: None,
2099                })),
2100            })),
2101        }));
2102
2103        let physical = planner.plan(&logical).unwrap();
2104        assert_eq!(physical.columns(), &["n"]);
2105    }
2106
2107    #[test]
2108    fn test_plan_skip() {
2109        let store = create_test_store();
2110        let planner = Planner::new(store);
2111
2112        // MATCH (n) RETURN n SKIP 5
2113        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2114            items: vec![ReturnItem {
2115                expression: LogicalExpression::Variable("n".to_string()),
2116                alias: None,
2117            }],
2118            distinct: false,
2119            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2120                count: 5,
2121                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2122                    variable: "n".to_string(),
2123                    label: None,
2124                    input: None,
2125                })),
2126            })),
2127        }));
2128
2129        let physical = planner.plan(&logical).unwrap();
2130        assert_eq!(physical.columns(), &["n"]);
2131    }
2132
2133    #[test]
2134    fn test_plan_sort() {
2135        let store = create_test_store();
2136        let planner = Planner::new(store);
2137
2138        // MATCH (n) RETURN n ORDER BY n.name ASC
2139        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2140            items: vec![ReturnItem {
2141                expression: LogicalExpression::Variable("n".to_string()),
2142                alias: None,
2143            }],
2144            distinct: false,
2145            input: Box::new(LogicalOperator::Sort(SortOp {
2146                keys: vec![SortKey {
2147                    expression: LogicalExpression::Variable("n".to_string()),
2148                    order: SortOrder::Ascending,
2149                }],
2150                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2151                    variable: "n".to_string(),
2152                    label: None,
2153                    input: None,
2154                })),
2155            })),
2156        }));
2157
2158        let physical = planner.plan(&logical).unwrap();
2159        assert_eq!(physical.columns(), &["n"]);
2160    }
2161
2162    #[test]
2163    fn test_plan_sort_descending() {
2164        let store = create_test_store();
2165        let planner = Planner::new(store);
2166
2167        // ORDER BY n DESC
2168        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2169            items: vec![ReturnItem {
2170                expression: LogicalExpression::Variable("n".to_string()),
2171                alias: None,
2172            }],
2173            distinct: false,
2174            input: Box::new(LogicalOperator::Sort(SortOp {
2175                keys: vec![SortKey {
2176                    expression: LogicalExpression::Variable("n".to_string()),
2177                    order: SortOrder::Descending,
2178                }],
2179                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2180                    variable: "n".to_string(),
2181                    label: None,
2182                    input: None,
2183                })),
2184            })),
2185        }));
2186
2187        let physical = planner.plan(&logical).unwrap();
2188        assert_eq!(physical.columns(), &["n"]);
2189    }
2190
2191    #[test]
2192    fn test_plan_distinct() {
2193        let store = create_test_store();
2194        let planner = Planner::new(store);
2195
2196        // MATCH (n) RETURN DISTINCT n
2197        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2198            items: vec![ReturnItem {
2199                expression: LogicalExpression::Variable("n".to_string()),
2200                alias: None,
2201            }],
2202            distinct: false,
2203            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2204                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2205                    variable: "n".to_string(),
2206                    label: None,
2207                    input: None,
2208                })),
2209            })),
2210        }));
2211
2212        let physical = planner.plan(&logical).unwrap();
2213        assert_eq!(physical.columns(), &["n"]);
2214    }
2215
2216    // ==================== Aggregate Tests ====================
2217
2218    #[test]
2219    fn test_plan_aggregate_count() {
2220        let store = create_test_store();
2221        let planner = Planner::new(store);
2222
2223        // MATCH (n) RETURN count(n)
2224        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2225            items: vec![ReturnItem {
2226                expression: LogicalExpression::Variable("cnt".to_string()),
2227                alias: None,
2228            }],
2229            distinct: false,
2230            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
2231                group_by: vec![],
2232                aggregates: vec![LogicalAggregateExpr {
2233                    function: LogicalAggregateFunction::Count,
2234                    expression: Some(LogicalExpression::Variable("n".to_string())),
2235                    distinct: false,
2236                    alias: Some("cnt".to_string()),
2237                }],
2238                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2239                    variable: "n".to_string(),
2240                    label: None,
2241                    input: None,
2242                })),
2243            })),
2244        }));
2245
2246        let physical = planner.plan(&logical).unwrap();
2247        assert!(physical.columns().contains(&"cnt".to_string()));
2248    }
2249
2250    #[test]
2251    fn test_plan_aggregate_with_group_by() {
2252        let store = create_test_store();
2253        let planner = Planner::new(store);
2254
2255        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
2256        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2257            group_by: vec![LogicalExpression::Property {
2258                variable: "n".to_string(),
2259                property: "city".to_string(),
2260            }],
2261            aggregates: vec![LogicalAggregateExpr {
2262                function: LogicalAggregateFunction::Count,
2263                expression: Some(LogicalExpression::Variable("n".to_string())),
2264                distinct: false,
2265                alias: Some("cnt".to_string()),
2266            }],
2267            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2268                variable: "n".to_string(),
2269                label: Some("Person".to_string()),
2270                input: None,
2271            })),
2272        }));
2273
2274        let physical = planner.plan(&logical).unwrap();
2275        assert_eq!(physical.columns().len(), 2);
2276    }
2277
2278    #[test]
2279    fn test_plan_aggregate_sum() {
2280        let store = create_test_store();
2281        let planner = Planner::new(store);
2282
2283        // SUM(n.value)
2284        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2285            group_by: vec![],
2286            aggregates: vec![LogicalAggregateExpr {
2287                function: LogicalAggregateFunction::Sum,
2288                expression: Some(LogicalExpression::Property {
2289                    variable: "n".to_string(),
2290                    property: "value".to_string(),
2291                }),
2292                distinct: false,
2293                alias: Some("total".to_string()),
2294            }],
2295            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2296                variable: "n".to_string(),
2297                label: None,
2298                input: None,
2299            })),
2300        }));
2301
2302        let physical = planner.plan(&logical).unwrap();
2303        assert!(physical.columns().contains(&"total".to_string()));
2304    }
2305
2306    #[test]
2307    fn test_plan_aggregate_avg() {
2308        let store = create_test_store();
2309        let planner = Planner::new(store);
2310
2311        // AVG(n.score)
2312        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2313            group_by: vec![],
2314            aggregates: vec![LogicalAggregateExpr {
2315                function: LogicalAggregateFunction::Avg,
2316                expression: Some(LogicalExpression::Property {
2317                    variable: "n".to_string(),
2318                    property: "score".to_string(),
2319                }),
2320                distinct: false,
2321                alias: Some("average".to_string()),
2322            }],
2323            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2324                variable: "n".to_string(),
2325                label: None,
2326                input: None,
2327            })),
2328        }));
2329
2330        let physical = planner.plan(&logical).unwrap();
2331        assert!(physical.columns().contains(&"average".to_string()));
2332    }
2333
2334    #[test]
2335    fn test_plan_aggregate_min_max() {
2336        let store = create_test_store();
2337        let planner = Planner::new(store);
2338
2339        // MIN(n.age), MAX(n.age)
2340        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
2341            group_by: vec![],
2342            aggregates: vec![
2343                LogicalAggregateExpr {
2344                    function: LogicalAggregateFunction::Min,
2345                    expression: Some(LogicalExpression::Property {
2346                        variable: "n".to_string(),
2347                        property: "age".to_string(),
2348                    }),
2349                    distinct: false,
2350                    alias: Some("youngest".to_string()),
2351                },
2352                LogicalAggregateExpr {
2353                    function: LogicalAggregateFunction::Max,
2354                    expression: Some(LogicalExpression::Property {
2355                        variable: "n".to_string(),
2356                        property: "age".to_string(),
2357                    }),
2358                    distinct: false,
2359                    alias: Some("oldest".to_string()),
2360                },
2361            ],
2362            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2363                variable: "n".to_string(),
2364                label: None,
2365                input: None,
2366            })),
2367        }));
2368
2369        let physical = planner.plan(&logical).unwrap();
2370        assert!(physical.columns().contains(&"youngest".to_string()));
2371        assert!(physical.columns().contains(&"oldest".to_string()));
2372    }
2373
2374    // ==================== Join Tests ====================
2375
2376    #[test]
2377    fn test_plan_inner_join() {
2378        let store = create_test_store();
2379        let planner = Planner::new(store);
2380
2381        // Inner join between two scans
2382        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2383            items: vec![
2384                ReturnItem {
2385                    expression: LogicalExpression::Variable("a".to_string()),
2386                    alias: None,
2387                },
2388                ReturnItem {
2389                    expression: LogicalExpression::Variable("b".to_string()),
2390                    alias: None,
2391                },
2392            ],
2393            distinct: false,
2394            input: Box::new(LogicalOperator::Join(JoinOp {
2395                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2396                    variable: "a".to_string(),
2397                    label: Some("Person".to_string()),
2398                    input: None,
2399                })),
2400                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2401                    variable: "b".to_string(),
2402                    label: Some("Company".to_string()),
2403                    input: None,
2404                })),
2405                join_type: JoinType::Inner,
2406                conditions: vec![JoinCondition {
2407                    left: LogicalExpression::Variable("a".to_string()),
2408                    right: LogicalExpression::Variable("b".to_string()),
2409                }],
2410            })),
2411        }));
2412
2413        let physical = planner.plan(&logical).unwrap();
2414        assert!(physical.columns().contains(&"a".to_string()));
2415        assert!(physical.columns().contains(&"b".to_string()));
2416    }
2417
2418    #[test]
2419    fn test_plan_cross_join() {
2420        let store = create_test_store();
2421        let planner = Planner::new(store);
2422
2423        // Cross join (no conditions)
2424        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2425            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2426                variable: "a".to_string(),
2427                label: None,
2428                input: None,
2429            })),
2430            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2431                variable: "b".to_string(),
2432                label: None,
2433                input: None,
2434            })),
2435            join_type: JoinType::Cross,
2436            conditions: vec![],
2437        }));
2438
2439        let physical = planner.plan(&logical).unwrap();
2440        assert_eq!(physical.columns().len(), 2);
2441    }
2442
2443    #[test]
2444    fn test_plan_left_join() {
2445        let store = create_test_store();
2446        let planner = Planner::new(store);
2447
2448        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
2449            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2450                variable: "a".to_string(),
2451                label: None,
2452                input: None,
2453            })),
2454            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2455                variable: "b".to_string(),
2456                label: None,
2457                input: None,
2458            })),
2459            join_type: JoinType::Left,
2460            conditions: vec![],
2461        }));
2462
2463        let physical = planner.plan(&logical).unwrap();
2464        assert_eq!(physical.columns().len(), 2);
2465    }
2466
2467    // ==================== Mutation Tests ====================
2468
2469    #[test]
2470    fn test_plan_create_node() {
2471        let store = create_test_store();
2472        let planner = Planner::new(store);
2473
2474        // CREATE (n:Person {name: 'Alice'})
2475        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
2476            variable: "n".to_string(),
2477            labels: vec!["Person".to_string()],
2478            properties: vec![(
2479                "name".to_string(),
2480                LogicalExpression::Literal(Value::String("Alice".into())),
2481            )],
2482            input: None,
2483        }));
2484
2485        let physical = planner.plan(&logical).unwrap();
2486        assert!(physical.columns().contains(&"n".to_string()));
2487    }
2488
2489    #[test]
2490    fn test_plan_create_edge() {
2491        let store = create_test_store();
2492        let planner = Planner::new(store);
2493
2494        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
2495        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
2496            variable: Some("r".to_string()),
2497            from_variable: "a".to_string(),
2498            to_variable: "b".to_string(),
2499            edge_type: "KNOWS".to_string(),
2500            properties: vec![],
2501            input: Box::new(LogicalOperator::Join(JoinOp {
2502                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2503                    variable: "a".to_string(),
2504                    label: None,
2505                    input: None,
2506                })),
2507                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2508                    variable: "b".to_string(),
2509                    label: None,
2510                    input: None,
2511                })),
2512                join_type: JoinType::Cross,
2513                conditions: vec![],
2514            })),
2515        }));
2516
2517        let physical = planner.plan(&logical).unwrap();
2518        assert!(physical.columns().contains(&"r".to_string()));
2519    }
2520
2521    #[test]
2522    fn test_plan_delete_node() {
2523        let store = create_test_store();
2524        let planner = Planner::new(store);
2525
2526        // MATCH (n) DELETE n
2527        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
2528            variable: "n".to_string(),
2529            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2530                variable: "n".to_string(),
2531                label: None,
2532                input: None,
2533            })),
2534        }));
2535
2536        let physical = planner.plan(&logical).unwrap();
2537        assert!(physical.columns().contains(&"deleted_count".to_string()));
2538    }
2539
2540    // ==================== Error Cases ====================
2541
2542    #[test]
2543    fn test_plan_empty_errors() {
2544        let store = create_test_store();
2545        let planner = Planner::new(store);
2546
2547        let logical = LogicalPlan::new(LogicalOperator::Empty);
2548        let result = planner.plan(&logical);
2549        assert!(result.is_err());
2550    }
2551
2552    #[test]
2553    fn test_plan_missing_variable_in_return() {
2554        let store = create_test_store();
2555        let planner = Planner::new(store);
2556
2557        // Return variable that doesn't exist in input
2558        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2559            items: vec![ReturnItem {
2560                expression: LogicalExpression::Variable("missing".to_string()),
2561                alias: None,
2562            }],
2563            distinct: false,
2564            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2565                variable: "n".to_string(),
2566                label: None,
2567                input: None,
2568            })),
2569        }));
2570
2571        let result = planner.plan(&logical);
2572        assert!(result.is_err());
2573    }
2574
2575    // ==================== Helper Function Tests ====================
2576
2577    #[test]
2578    fn test_convert_binary_ops() {
2579        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
2580        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
2581        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
2582        assert!(convert_binary_op(BinaryOp::Le).is_ok());
2583        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
2584        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
2585        assert!(convert_binary_op(BinaryOp::And).is_ok());
2586        assert!(convert_binary_op(BinaryOp::Or).is_ok());
2587        assert!(convert_binary_op(BinaryOp::Add).is_ok());
2588        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
2589        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
2590        assert!(convert_binary_op(BinaryOp::Div).is_ok());
2591    }
2592
2593    #[test]
2594    fn test_convert_unary_ops() {
2595        assert!(convert_unary_op(UnaryOp::Not).is_ok());
2596        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
2597        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
2598        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
2599    }
2600
2601    #[test]
2602    fn test_convert_aggregate_functions() {
2603        assert!(matches!(
2604            convert_aggregate_function(LogicalAggregateFunction::Count),
2605            PhysicalAggregateFunction::Count
2606        ));
2607        assert!(matches!(
2608            convert_aggregate_function(LogicalAggregateFunction::Sum),
2609            PhysicalAggregateFunction::Sum
2610        ));
2611        assert!(matches!(
2612            convert_aggregate_function(LogicalAggregateFunction::Avg),
2613            PhysicalAggregateFunction::Avg
2614        ));
2615        assert!(matches!(
2616            convert_aggregate_function(LogicalAggregateFunction::Min),
2617            PhysicalAggregateFunction::Min
2618        ));
2619        assert!(matches!(
2620            convert_aggregate_function(LogicalAggregateFunction::Max),
2621            PhysicalAggregateFunction::Max
2622        ));
2623    }
2624
2625    #[test]
2626    fn test_planner_accessors() {
2627        let store = create_test_store();
2628        let planner = Planner::new(Arc::clone(&store));
2629
2630        assert!(planner.tx_id().is_none());
2631        assert!(planner.tx_manager().is_none());
2632        let _ = planner.viewing_epoch(); // Just ensure it's accessible
2633    }
2634
2635    #[test]
2636    fn test_physical_plan_accessors() {
2637        let store = create_test_store();
2638        let planner = Planner::new(store);
2639
2640        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
2641            variable: "n".to_string(),
2642            label: None,
2643            input: None,
2644        }));
2645
2646        let physical = planner.plan(&logical).unwrap();
2647        assert_eq!(physical.columns(), &["n"]);
2648
2649        // Test into_operator
2650        let _ = physical.into_operator();
2651    }
2652}