Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::LogicalType;
15use grafeo_common::types::{EpochId, TxId};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, ExpandOperator,
22    ExpandStep, ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator,
23    FilterExpression, FilterOperator, HashAggregateOperator, HashJoinOperator,
24    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator, LimitOperator,
25    MergeOperator, NestedLoopJoinOperator, NullOrder, Operator, ProjectExpr, ProjectOperator,
26    PropertySource, RemoveLabelOperator, ScanOperator, SetPropertyOperator, ShortestPathOperator,
27    SimpleAggregateOperator, SkipOperator, SortDirection, SortKey as PhysicalSortKey, SortOperator,
28    UnaryFilterOp, UnionOperator, UnwindOperator, VariableLengthExpandOperator,
29};
30use grafeo_core::graph::{Direction, lpg::LpgStore};
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use crate::transaction::TransactionManager;
35
36/// Converts a logical plan to a physical operator tree.
37pub struct Planner {
38    /// The graph store to scan from.
39    store: Arc<LpgStore>,
40    /// Transaction manager for MVCC operations.
41    tx_manager: Option<Arc<TransactionManager>>,
42    /// Current transaction ID (if in a transaction).
43    tx_id: Option<TxId>,
44    /// Epoch to use for visibility checks.
45    viewing_epoch: EpochId,
46    /// Counter for generating unique anonymous edge column names.
47    anon_edge_counter: std::cell::Cell<u32>,
48    /// Whether to use factorized execution for multi-hop queries.
49    factorized_execution: bool,
50}
51
52impl Planner {
53    /// Creates a new planner with the given store.
54    ///
55    /// This creates a planner without transaction context, using the current
56    /// epoch from the store for visibility.
57    #[must_use]
58    pub fn new(store: Arc<LpgStore>) -> Self {
59        let epoch = store.current_epoch();
60        Self {
61            store,
62            tx_manager: None,
63            tx_id: None,
64            viewing_epoch: epoch,
65            anon_edge_counter: std::cell::Cell::new(0),
66            factorized_execution: true,
67        }
68    }
69
70    /// Creates a new planner with transaction context for MVCC-aware planning.
71    ///
72    /// # Arguments
73    ///
74    /// * `store` - The graph store
75    /// * `tx_manager` - Transaction manager for recording reads/writes
76    /// * `tx_id` - Current transaction ID (None for auto-commit)
77    /// * `viewing_epoch` - Epoch to use for version visibility
78    #[must_use]
79    pub fn with_context(
80        store: Arc<LpgStore>,
81        tx_manager: Arc<TransactionManager>,
82        tx_id: Option<TxId>,
83        viewing_epoch: EpochId,
84    ) -> Self {
85        Self {
86            store,
87            tx_manager: Some(tx_manager),
88            tx_id,
89            viewing_epoch,
90            anon_edge_counter: std::cell::Cell::new(0),
91            factorized_execution: true,
92        }
93    }
94
95    /// Returns the viewing epoch for this planner.
96    #[must_use]
97    pub fn viewing_epoch(&self) -> EpochId {
98        self.viewing_epoch
99    }
100
101    /// Returns the transaction ID for this planner, if any.
102    #[must_use]
103    pub fn tx_id(&self) -> Option<TxId> {
104        self.tx_id
105    }
106
107    /// Returns a reference to the transaction manager, if available.
108    #[must_use]
109    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
110        self.tx_manager.as_ref()
111    }
112
113    /// Enables or disables factorized execution for multi-hop queries.
114    #[must_use]
115    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
116        self.factorized_execution = enabled;
117        self
118    }
119
120    /// Counts consecutive single-hop expand operations.
121    ///
122    /// Returns the count and the deepest non-expand operator (the base of the chain).
123    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
124        match op {
125            LogicalOperator::Expand(expand) => {
126                // Only count single-hop expands (factorization doesn't apply to variable-length)
127                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
128
129                if is_single_hop {
130                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
131                    (inner_count + 1, base)
132                } else {
133                    // Variable-length path breaks the chain
134                    (0, op)
135                }
136            }
137            _ => (0, op),
138        }
139    }
140
141    /// Collects expand operations from the outermost down to the base.
142    ///
143    /// Returns expands in order from innermost (base) to outermost.
144    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
145        let mut chain = Vec::new();
146        let mut current = op;
147
148        while let LogicalOperator::Expand(expand) = current {
149            // Only include single-hop expands
150            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
151            if !is_single_hop {
152                break;
153            }
154            chain.push(expand);
155            current = &expand.input;
156        }
157
158        // Reverse so we go from base to outer
159        chain.reverse();
160        chain
161    }
162
163    /// Plans a logical plan into a physical operator.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if planning fails.
168    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
169        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
170        Ok(PhysicalPlan {
171            operator,
172            columns,
173            adaptive_context: None,
174        })
175    }
176
177    /// Plans a logical plan with adaptive execution support.
178    ///
179    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
180    /// joins) that can be monitored during execution to detect estimate errors.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if planning fails.
185    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
186        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
187
188        // Build adaptive context with cardinality estimates
189        let mut adaptive_context = AdaptiveContext::new();
190        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
191
192        Ok(PhysicalPlan {
193            operator,
194            columns,
195            adaptive_context: Some(adaptive_context),
196        })
197    }
198
199    /// Collects cardinality estimates from the logical plan into an adaptive context.
200    fn collect_cardinality_estimates(
201        &self,
202        op: &LogicalOperator,
203        ctx: &mut AdaptiveContext,
204        depth: usize,
205    ) {
206        match op {
207            LogicalOperator::NodeScan(scan) => {
208                // Estimate based on label statistics
209                let estimate = if let Some(label) = &scan.label {
210                    self.store.nodes_by_label(label).len() as f64
211                } else {
212                    self.store.node_count() as f64
213                };
214                let id = format!("scan_{}", scan.variable);
215                ctx.set_estimate(&id, estimate);
216
217                // Recurse into input if present
218                if let Some(input) = &scan.input {
219                    self.collect_cardinality_estimates(input, ctx, depth + 1);
220                }
221            }
222            LogicalOperator::Filter(filter) => {
223                // Default selectivity estimate for filters (30%)
224                let input_estimate = self.estimate_cardinality(&filter.input);
225                let estimate = input_estimate * 0.3;
226                let id = format!("filter_{depth}");
227                ctx.set_estimate(&id, estimate);
228
229                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
230            }
231            LogicalOperator::Expand(expand) => {
232                // Estimate based on average degree
233                let input_estimate = self.estimate_cardinality(&expand.input);
234                let avg_degree = 10.0; // Default estimate
235                let estimate = input_estimate * avg_degree;
236                let id = format!("expand_{}", expand.to_variable);
237                ctx.set_estimate(&id, estimate);
238
239                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
240            }
241            LogicalOperator::Join(join) => {
242                // Estimate join output (product with selectivity)
243                let left_est = self.estimate_cardinality(&join.left);
244                let right_est = self.estimate_cardinality(&join.right);
245                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
246                let id = format!("join_{depth}");
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
250                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
251            }
252            LogicalOperator::Aggregate(agg) => {
253                // Aggregates typically reduce cardinality
254                let input_estimate = self.estimate_cardinality(&agg.input);
255                let estimate = if agg.group_by.is_empty() {
256                    1.0 // Scalar aggregate
257                } else {
258                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
259                };
260                let id = format!("aggregate_{depth}");
261                ctx.set_estimate(&id, estimate);
262
263                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
264            }
265            LogicalOperator::Distinct(distinct) => {
266                let input_estimate = self.estimate_cardinality(&distinct.input);
267                let estimate = (input_estimate * 0.5).max(1.0);
268                let id = format!("distinct_{depth}");
269                ctx.set_estimate(&id, estimate);
270
271                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
272            }
273            LogicalOperator::Return(ret) => {
274                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
275            }
276            LogicalOperator::Limit(limit) => {
277                let input_estimate = self.estimate_cardinality(&limit.input);
278                let estimate = (input_estimate).min(limit.count as f64);
279                let id = format!("limit_{depth}");
280                ctx.set_estimate(&id, estimate);
281
282                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
283            }
284            LogicalOperator::Skip(skip) => {
285                let input_estimate = self.estimate_cardinality(&skip.input);
286                let estimate = (input_estimate - skip.count as f64).max(0.0);
287                let id = format!("skip_{depth}");
288                ctx.set_estimate(&id, estimate);
289
290                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
291            }
292            LogicalOperator::Sort(sort) => {
293                // Sort doesn't change cardinality
294                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
295            }
296            LogicalOperator::Union(union) => {
297                let estimate: f64 = union
298                    .inputs
299                    .iter()
300                    .map(|input| self.estimate_cardinality(input))
301                    .sum();
302                let id = format!("union_{depth}");
303                ctx.set_estimate(&id, estimate);
304
305                for input in &union.inputs {
306                    self.collect_cardinality_estimates(input, ctx, depth + 1);
307                }
308            }
309            _ => {
310                // For other operators, try to recurse into known input patterns
311            }
312        }
313    }
314
315    /// Estimates cardinality for a logical operator subtree.
316    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
317        match op {
318            LogicalOperator::NodeScan(scan) => {
319                if let Some(label) = &scan.label {
320                    self.store.nodes_by_label(label).len() as f64
321                } else {
322                    self.store.node_count() as f64
323                }
324            }
325            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
326            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
327            LogicalOperator::Join(join) => {
328                let left = self.estimate_cardinality(&join.left);
329                let right = self.estimate_cardinality(&join.right);
330                (left * right).sqrt()
331            }
332            LogicalOperator::Aggregate(agg) => {
333                if agg.group_by.is_empty() {
334                    1.0
335                } else {
336                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
337                }
338            }
339            LogicalOperator::Distinct(distinct) => {
340                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
341            }
342            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
343            LogicalOperator::Limit(limit) => self
344                .estimate_cardinality(&limit.input)
345                .min(limit.count as f64),
346            LogicalOperator::Skip(skip) => {
347                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
348            }
349            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
350            LogicalOperator::Union(union) => union
351                .inputs
352                .iter()
353                .map(|input| self.estimate_cardinality(input))
354                .sum(),
355            _ => 1000.0, // Default estimate for unknown operators
356        }
357    }
358
359    /// Plans a single logical operator.
360    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
361        match op {
362            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
363            LogicalOperator::Expand(expand) => {
364                // Check for expand chains when factorized execution is enabled
365                if self.factorized_execution {
366                    let (chain_len, _base) = Self::count_expand_chain(op);
367                    if chain_len >= 2 {
368                        // Use factorized chain for 2+ consecutive single-hop expands
369                        return self.plan_expand_chain(op);
370                    }
371                }
372                self.plan_expand(expand)
373            }
374            LogicalOperator::Return(ret) => self.plan_return(ret),
375            LogicalOperator::Filter(filter) => self.plan_filter(filter),
376            LogicalOperator::Project(project) => self.plan_project(project),
377            LogicalOperator::Limit(limit) => self.plan_limit(limit),
378            LogicalOperator::Skip(skip) => self.plan_skip(skip),
379            LogicalOperator::Sort(sort) => self.plan_sort(sort),
380            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
381            LogicalOperator::Join(join) => self.plan_join(join),
382            LogicalOperator::Union(union) => self.plan_union(union),
383            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
384            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
385            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
386            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
387            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
388            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
389            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
390            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
391            LogicalOperator::Merge(merge) => self.plan_merge(merge),
392            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
393            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
394            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
395            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
396            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
397            _ => Err(Error::Internal(format!(
398                "Unsupported operator: {:?}",
399                std::mem::discriminant(op)
400            ))),
401        }
402    }
403
404    /// Plans a node scan operator.
405    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
406        let scan_op = if let Some(label) = &scan.label {
407            ScanOperator::with_label(Arc::clone(&self.store), label)
408        } else {
409            ScanOperator::new(Arc::clone(&self.store))
410        };
411
412        // Apply MVCC context if available
413        let scan_operator: Box<dyn Operator> =
414            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
415
416        // If there's an input, chain operators with a nested loop join (cross join)
417        if let Some(input) = &scan.input {
418            let (input_op, mut input_columns) = self.plan_operator(input)?;
419
420            // Build output schema: input columns + scan column
421            let mut output_schema: Vec<LogicalType> =
422                input_columns.iter().map(|_| LogicalType::Any).collect();
423            output_schema.push(LogicalType::Node);
424
425            // Add scan column to input columns
426            input_columns.push(scan.variable.clone());
427
428            // Use nested loop join to combine input rows with scanned nodes
429            let join_op = Box::new(NestedLoopJoinOperator::new(
430                input_op,
431                scan_operator,
432                None, // No join condition (cross join)
433                PhysicalJoinType::Cross,
434                output_schema,
435            ));
436
437            Ok((join_op, input_columns))
438        } else {
439            let columns = vec![scan.variable.clone()];
440            Ok((scan_operator, columns))
441        }
442    }
443
444    /// Plans an expand operator.
445    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
446        // Plan the input operator first
447        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
448
449        // Find the source column index
450        let source_column = input_columns
451            .iter()
452            .position(|c| c == &expand.from_variable)
453            .ok_or_else(|| {
454                Error::Internal(format!(
455                    "Source variable '{}' not found in input columns",
456                    expand.from_variable
457                ))
458            })?;
459
460        // Convert expand direction
461        let direction = match expand.direction {
462            ExpandDirection::Outgoing => Direction::Outgoing,
463            ExpandDirection::Incoming => Direction::Incoming,
464            ExpandDirection::Both => Direction::Both,
465        };
466
467        // Check if this is a variable-length path
468        let is_variable_length =
469            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
470
471        let operator: Box<dyn Operator> = if is_variable_length {
472            // Use VariableLengthExpandOperator for multi-hop paths
473            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
474            let mut expand_op = VariableLengthExpandOperator::new(
475                Arc::clone(&self.store),
476                input_op,
477                source_column,
478                direction,
479                expand.edge_type.clone(),
480                expand.min_hops,
481                max_hops,
482            )
483            .with_tx_context(self.viewing_epoch, self.tx_id);
484
485            // If a path alias is set, enable path length output
486            if expand.path_alias.is_some() {
487                expand_op = expand_op.with_path_length_output();
488            }
489
490            Box::new(expand_op)
491        } else {
492            // Use simple ExpandOperator for single-hop paths
493            let expand_op = ExpandOperator::new(
494                Arc::clone(&self.store),
495                input_op,
496                source_column,
497                direction,
498                expand.edge_type.clone(),
499            )
500            .with_tx_context(self.viewing_epoch, self.tx_id);
501            Box::new(expand_op)
502        };
503
504        // Build output columns: [input_columns..., edge, target, (path_length)?]
505        // Preserve all input columns and add edge + target to match ExpandOperator output
506        let mut columns = input_columns;
507
508        // Generate edge column name - use provided name or generate anonymous name
509        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
510            let count = self.anon_edge_counter.get();
511            self.anon_edge_counter.set(count + 1);
512            format!("_anon_edge_{}", count)
513        });
514        columns.push(edge_col_name);
515
516        columns.push(expand.to_variable.clone());
517
518        // If a path alias is set, add a column for the path length
519        if let Some(ref path_alias) = expand.path_alias {
520            columns.push(format!("_path_length_{}", path_alias));
521        }
522
523        Ok((operator, columns))
524    }
525
526    /// Plans a chain of consecutive expand operations using factorized execution.
527    ///
528    /// This avoids the Cartesian product explosion that occurs with separate expands.
529    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
530    ///
531    /// The chain is executed lazily at query time, not during planning. This ensures
532    /// that any filters applied above the expand chain are properly respected.
533    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
534        let expands = Self::collect_expand_chain(op);
535        if expands.is_empty() {
536            return Err(Error::Internal("Empty expand chain".to_string()));
537        }
538
539        // Get the base operator (before first expand)
540        let first_expand = expands[0];
541        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
542
543        let mut columns = base_columns.clone();
544        let mut steps = Vec::new();
545
546        // Track the level-local source column for each expand
547        // For the first expand, it's the column in the input (base_columns)
548        // For subsequent expands, the target from the previous level is always at index 1
549        // (each level adds [edge, target], so target is at index 1)
550        let mut is_first = true;
551
552        for expand in &expands {
553            // Find source column for this expand
554            let source_column = if is_first {
555                // For first expand, find in base columns
556                base_columns
557                    .iter()
558                    .position(|c| c == &expand.from_variable)
559                    .ok_or_else(|| {
560                        Error::Internal(format!(
561                            "Source variable '{}' not found in base columns",
562                            expand.from_variable
563                        ))
564                    })?
565            } else {
566                // For subsequent expands, the target from the previous level is at index 1
567                // (each level adds [edge, target], so target is the second column)
568                1
569            };
570
571            // Convert direction
572            let direction = match expand.direction {
573                ExpandDirection::Outgoing => Direction::Outgoing,
574                ExpandDirection::Incoming => Direction::Incoming,
575                ExpandDirection::Both => Direction::Both,
576            };
577
578            // Add expand step configuration
579            steps.push(ExpandStep {
580                source_column,
581                direction,
582                edge_type: expand.edge_type.clone(),
583            });
584
585            // Add edge and target columns
586            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
587                let count = self.anon_edge_counter.get();
588                self.anon_edge_counter.set(count + 1);
589                format!("_anon_edge_{}", count)
590            });
591            columns.push(edge_col_name);
592            columns.push(expand.to_variable.clone());
593
594            is_first = false;
595        }
596
597        // Create lazy operator that executes at query time, not planning time
598        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
599
600        if let Some(tx_id) = self.tx_id {
601            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
602        } else {
603            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
604        }
605
606        Ok((Box::new(lazy_op), columns))
607    }
608
609    /// Plans a RETURN clause.
610    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
611        // Plan the input operator
612        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
613
614        // Build variable to column index mapping
615        let variable_columns: HashMap<String, usize> = input_columns
616            .iter()
617            .enumerate()
618            .map(|(i, name)| (name.clone(), i))
619            .collect();
620
621        // Extract column names from return items
622        let columns: Vec<String> = ret
623            .items
624            .iter()
625            .map(|item| {
626                item.alias.clone().unwrap_or_else(|| {
627                    // Generate a default name from the expression
628                    expression_to_string(&item.expression)
629                })
630            })
631            .collect();
632
633        // Check if we need a project operator (for property access or expression evaluation)
634        let needs_project = ret
635            .items
636            .iter()
637            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
638
639        if needs_project {
640            // Build project expressions
641            let mut projections = Vec::with_capacity(ret.items.len());
642            let mut output_types = Vec::with_capacity(ret.items.len());
643
644            for item in &ret.items {
645                match &item.expression {
646                    LogicalExpression::Variable(name) => {
647                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
648                            Error::Internal(format!("Variable '{}' not found in input", name))
649                        })?;
650                        projections.push(ProjectExpr::Column(col_idx));
651                        // Use Node type for variables (they could be nodes, edges, or values)
652                        output_types.push(LogicalType::Node);
653                    }
654                    LogicalExpression::Property { variable, property } => {
655                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
656                            Error::Internal(format!("Variable '{}' not found in input", variable))
657                        })?;
658                        projections.push(ProjectExpr::PropertyAccess {
659                            column: col_idx,
660                            property: property.clone(),
661                        });
662                        // Property could be any type - use Any/Generic to preserve type
663                        output_types.push(LogicalType::Any);
664                    }
665                    LogicalExpression::Literal(value) => {
666                        projections.push(ProjectExpr::Constant(value.clone()));
667                        output_types.push(value_to_logical_type(value));
668                    }
669                    LogicalExpression::FunctionCall { name, args, .. } => {
670                        // Handle built-in functions
671                        match name.to_lowercase().as_str() {
672                            "type" => {
673                                // type(r) returns the edge type string
674                                if args.len() != 1 {
675                                    return Err(Error::Internal(
676                                        "type() requires exactly one argument".to_string(),
677                                    ));
678                                }
679                                if let LogicalExpression::Variable(var_name) = &args[0] {
680                                    let col_idx =
681                                        *variable_columns.get(var_name).ok_or_else(|| {
682                                            Error::Internal(format!(
683                                                "Variable '{}' not found in input",
684                                                var_name
685                                            ))
686                                        })?;
687                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
688                                    output_types.push(LogicalType::String);
689                                } else {
690                                    return Err(Error::Internal(
691                                        "type() argument must be a variable".to_string(),
692                                    ));
693                                }
694                            }
695                            "length" => {
696                                // length(p) returns the path length
697                                // For shortestPath results, the path column already contains the length
698                                if args.len() != 1 {
699                                    return Err(Error::Internal(
700                                        "length() requires exactly one argument".to_string(),
701                                    ));
702                                }
703                                if let LogicalExpression::Variable(var_name) = &args[0] {
704                                    let col_idx =
705                                        *variable_columns.get(var_name).ok_or_else(|| {
706                                            Error::Internal(format!(
707                                                "Variable '{}' not found in input",
708                                                var_name
709                                            ))
710                                        })?;
711                                    // Pass through the column value directly
712                                    projections.push(ProjectExpr::Column(col_idx));
713                                    output_types.push(LogicalType::Int64);
714                                } else {
715                                    return Err(Error::Internal(
716                                        "length() argument must be a variable".to_string(),
717                                    ));
718                                }
719                            }
720                            // For other functions (head, tail, size, etc.), use expression evaluation
721                            _ => {
722                                let filter_expr = self.convert_expression(&item.expression)?;
723                                projections.push(ProjectExpr::Expression {
724                                    expr: filter_expr,
725                                    variable_columns: variable_columns.clone(),
726                                });
727                                output_types.push(LogicalType::Any);
728                            }
729                        }
730                    }
731                    LogicalExpression::Case { .. } => {
732                        // Convert CASE expression to FilterExpression for evaluation
733                        let filter_expr = self.convert_expression(&item.expression)?;
734                        projections.push(ProjectExpr::Expression {
735                            expr: filter_expr,
736                            variable_columns: variable_columns.clone(),
737                        });
738                        // CASE can return any type - use Any
739                        output_types.push(LogicalType::Any);
740                    }
741                    _ => {
742                        return Err(Error::Internal(format!(
743                            "Unsupported RETURN expression: {:?}",
744                            item.expression
745                        )));
746                    }
747                }
748            }
749
750            let operator = Box::new(ProjectOperator::with_store(
751                input_op,
752                projections,
753                output_types,
754                Arc::clone(&self.store),
755            ));
756
757            Ok((operator, columns))
758        } else {
759            // Simple case: just return variables
760            // Re-order columns to match return items if needed
761            let mut projections = Vec::with_capacity(ret.items.len());
762            let mut output_types = Vec::with_capacity(ret.items.len());
763
764            for item in &ret.items {
765                if let LogicalExpression::Variable(name) = &item.expression {
766                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
767                        Error::Internal(format!("Variable '{}' not found in input", name))
768                    })?;
769                    projections.push(ProjectExpr::Column(col_idx));
770                    output_types.push(LogicalType::Node);
771                }
772            }
773
774            // Only add ProjectOperator if reordering is needed
775            if projections.len() == input_columns.len()
776                && projections
777                    .iter()
778                    .enumerate()
779                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
780            {
781                // No reordering needed
782                Ok((input_op, columns))
783            } else {
784                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
785                Ok((operator, columns))
786            }
787        }
788    }
789
790    /// Plans a project operator (for WITH clause).
791    fn plan_project(
792        &self,
793        project: &crate::query::plan::ProjectOp,
794    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
795        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
796        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
797            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
798                // Create a single-row operator for projecting literals
799                let single_row_op: Box<dyn Operator> = Box::new(
800                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
801                );
802                (single_row_op, Vec::new())
803            } else {
804                self.plan_operator(&project.input)?
805            };
806
807        // Build variable to column index mapping
808        let variable_columns: HashMap<String, usize> = input_columns
809            .iter()
810            .enumerate()
811            .map(|(i, name)| (name.clone(), i))
812            .collect();
813
814        // Build projections and new column names
815        let mut projections = Vec::with_capacity(project.projections.len());
816        let mut output_types = Vec::with_capacity(project.projections.len());
817        let mut output_columns = Vec::with_capacity(project.projections.len());
818
819        for projection in &project.projections {
820            // Determine the output column name (alias or expression string)
821            let col_name = projection
822                .alias
823                .clone()
824                .unwrap_or_else(|| expression_to_string(&projection.expression));
825            output_columns.push(col_name);
826
827            match &projection.expression {
828                LogicalExpression::Variable(name) => {
829                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
830                        Error::Internal(format!("Variable '{}' not found in input", name))
831                    })?;
832                    projections.push(ProjectExpr::Column(col_idx));
833                    output_types.push(LogicalType::Node);
834                }
835                LogicalExpression::Property { variable, property } => {
836                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
837                        Error::Internal(format!("Variable '{}' not found in input", variable))
838                    })?;
839                    projections.push(ProjectExpr::PropertyAccess {
840                        column: col_idx,
841                        property: property.clone(),
842                    });
843                    output_types.push(LogicalType::Any);
844                }
845                LogicalExpression::Literal(value) => {
846                    projections.push(ProjectExpr::Constant(value.clone()));
847                    output_types.push(value_to_logical_type(value));
848                }
849                _ => {
850                    // For complex expressions, use full expression evaluation
851                    let filter_expr = self.convert_expression(&projection.expression)?;
852                    projections.push(ProjectExpr::Expression {
853                        expr: filter_expr,
854                        variable_columns: variable_columns.clone(),
855                    });
856                    output_types.push(LogicalType::Any);
857                }
858            }
859        }
860
861        let operator = Box::new(ProjectOperator::with_store(
862            input_op,
863            projections,
864            output_types,
865            Arc::clone(&self.store),
866        ));
867
868        Ok((operator, output_columns))
869    }
870
871    /// Plans a filter operator.
872    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
873        // Plan the input operator first
874        let (input_op, columns) = self.plan_operator(&filter.input)?;
875
876        // Build variable to column index mapping
877        let variable_columns: HashMap<String, usize> = columns
878            .iter()
879            .enumerate()
880            .map(|(i, name)| (name.clone(), i))
881            .collect();
882
883        // Convert logical expression to filter expression
884        let filter_expr = self.convert_expression(&filter.predicate)?;
885
886        // Create the predicate
887        let predicate =
888            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
889
890        // Create the filter operator
891        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
892
893        Ok((operator, columns))
894    }
895
896    /// Plans a LIMIT operator.
897    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
898        let (input_op, columns) = self.plan_operator(&limit.input)?;
899        let output_schema = self.derive_schema_from_columns(&columns);
900        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
901        Ok((operator, columns))
902    }
903
904    /// Plans a SKIP operator.
905    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
906        let (input_op, columns) = self.plan_operator(&skip.input)?;
907        let output_schema = self.derive_schema_from_columns(&columns);
908        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
909        Ok((operator, columns))
910    }
911
912    /// Plans a SORT (ORDER BY) operator.
913    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
914        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
915
916        // Build variable to column index mapping
917        let mut variable_columns: HashMap<String, usize> = input_columns
918            .iter()
919            .enumerate()
920            .map(|(i, name)| (name.clone(), i))
921            .collect();
922
923        // Collect property expressions that need to be projected before sorting
924        let mut property_projections: Vec<(String, String, String)> = Vec::new();
925        let mut next_col_idx = input_columns.len();
926
927        for key in &sort.keys {
928            if let LogicalExpression::Property { variable, property } = &key.expression {
929                let col_name = format!("{}_{}", variable, property);
930                if !variable_columns.contains_key(&col_name) {
931                    property_projections.push((
932                        variable.clone(),
933                        property.clone(),
934                        col_name.clone(),
935                    ));
936                    variable_columns.insert(col_name, next_col_idx);
937                    next_col_idx += 1;
938                }
939            }
940        }
941
942        // Track output columns
943        let mut output_columns = input_columns.clone();
944
945        // If we have property expressions, add a projection to materialize them
946        if !property_projections.is_empty() {
947            let mut projections = Vec::new();
948            let mut output_types = Vec::new();
949
950            // First, pass through all existing columns (use Node type to preserve node IDs
951            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
952            for (i, _) in input_columns.iter().enumerate() {
953                projections.push(ProjectExpr::Column(i));
954                output_types.push(LogicalType::Node);
955            }
956
957            // Then add property access projections
958            for (variable, property, col_name) in &property_projections {
959                let source_col = *variable_columns.get(variable).ok_or_else(|| {
960                    Error::Internal(format!(
961                        "Variable '{}' not found for ORDER BY property projection",
962                        variable
963                    ))
964                })?;
965                projections.push(ProjectExpr::PropertyAccess {
966                    column: source_col,
967                    property: property.clone(),
968                });
969                output_types.push(LogicalType::Any);
970                output_columns.push(col_name.clone());
971            }
972
973            input_op = Box::new(ProjectOperator::with_store(
974                input_op,
975                projections,
976                output_types,
977                Arc::clone(&self.store),
978            ));
979        }
980
981        // Convert logical sort keys to physical sort keys
982        let physical_keys: Vec<PhysicalSortKey> = sort
983            .keys
984            .iter()
985            .map(|key| {
986                let col_idx = self
987                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
988                Ok(PhysicalSortKey {
989                    column: col_idx,
990                    direction: match key.order {
991                        SortOrder::Ascending => SortDirection::Ascending,
992                        SortOrder::Descending => SortDirection::Descending,
993                    },
994                    null_order: NullOrder::NullsLast,
995                })
996            })
997            .collect::<Result<Vec<_>>>()?;
998
999        let output_schema = self.derive_schema_from_columns(&output_columns);
1000        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1001        Ok((operator, output_columns))
1002    }
1003
1004    /// Resolves a sort expression to a column index, using projected property columns.
1005    fn resolve_sort_expression_with_properties(
1006        &self,
1007        expr: &LogicalExpression,
1008        variable_columns: &HashMap<String, usize>,
1009    ) -> Result<usize> {
1010        match expr {
1011            LogicalExpression::Variable(name) => {
1012                variable_columns.get(name).copied().ok_or_else(|| {
1013                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1014                })
1015            }
1016            LogicalExpression::Property { variable, property } => {
1017                // Look up the projected property column (e.g., "p_age" for p.age)
1018                let col_name = format!("{}_{}", variable, property);
1019                variable_columns.get(&col_name).copied().ok_or_else(|| {
1020                    Error::Internal(format!(
1021                        "Property column '{}' not found for ORDER BY (from {}.{})",
1022                        col_name, variable, property
1023                    ))
1024                })
1025            }
1026            _ => Err(Error::Internal(format!(
1027                "Unsupported ORDER BY expression: {:?}",
1028                expr
1029            ))),
1030        }
1031    }
1032
1033    /// Derives a schema from column names (uses Any type to handle all value types).
1034    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1035        columns.iter().map(|_| LogicalType::Any).collect()
1036    }
1037
1038    /// Plans an AGGREGATE operator.
1039    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1040        // Check if we can use factorized aggregation for speedup
1041        // Conditions:
1042        // 1. Factorized execution is enabled
1043        // 2. Input is an expand chain (multi-hop)
1044        // 3. No GROUP BY
1045        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1046        if self.factorized_execution
1047            && agg.group_by.is_empty()
1048            && Self::count_expand_chain(&agg.input).0 >= 2
1049            && self.is_simple_aggregate(agg)
1050        {
1051            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1052                return Ok((op, cols));
1053            }
1054            // Fall through to regular aggregate if factorized planning fails
1055        }
1056
1057        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1058
1059        // Build variable to column index mapping
1060        let mut variable_columns: HashMap<String, usize> = input_columns
1061            .iter()
1062            .enumerate()
1063            .map(|(i, name)| (name.clone(), i))
1064            .collect();
1065
1066        // Collect all property expressions that need to be projected before aggregation
1067        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1068        let mut next_col_idx = input_columns.len();
1069
1070        // Check group-by expressions for properties
1071        for expr in &agg.group_by {
1072            if let LogicalExpression::Property { variable, property } = expr {
1073                let col_name = format!("{}_{}", variable, property);
1074                if !variable_columns.contains_key(&col_name) {
1075                    property_projections.push((
1076                        variable.clone(),
1077                        property.clone(),
1078                        col_name.clone(),
1079                    ));
1080                    variable_columns.insert(col_name, next_col_idx);
1081                    next_col_idx += 1;
1082                }
1083            }
1084        }
1085
1086        // Check aggregate expressions for properties
1087        for agg_expr in &agg.aggregates {
1088            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1089                let col_name = format!("{}_{}", variable, property);
1090                if !variable_columns.contains_key(&col_name) {
1091                    property_projections.push((
1092                        variable.clone(),
1093                        property.clone(),
1094                        col_name.clone(),
1095                    ));
1096                    variable_columns.insert(col_name, next_col_idx);
1097                    next_col_idx += 1;
1098                }
1099            }
1100        }
1101
1102        // If we have property expressions, add a projection to materialize them
1103        if !property_projections.is_empty() {
1104            let mut projections = Vec::new();
1105            let mut output_types = Vec::new();
1106
1107            // First, pass through all existing columns (use Node type to preserve node IDs
1108            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1109            for (i, _) in input_columns.iter().enumerate() {
1110                projections.push(ProjectExpr::Column(i));
1111                output_types.push(LogicalType::Node);
1112            }
1113
1114            // Then add property access projections
1115            for (variable, property, _col_name) in &property_projections {
1116                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1117                    Error::Internal(format!(
1118                        "Variable '{}' not found for property projection",
1119                        variable
1120                    ))
1121                })?;
1122                projections.push(ProjectExpr::PropertyAccess {
1123                    column: source_col,
1124                    property: property.clone(),
1125                });
1126                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1127            }
1128
1129            input_op = Box::new(ProjectOperator::with_store(
1130                input_op,
1131                projections,
1132                output_types,
1133                Arc::clone(&self.store),
1134            ));
1135        }
1136
1137        // Convert group-by expressions to column indices
1138        let group_columns: Vec<usize> = agg
1139            .group_by
1140            .iter()
1141            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1142            .collect::<Result<Vec<_>>>()?;
1143
1144        // Convert aggregate expressions to physical form
1145        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1146            .aggregates
1147            .iter()
1148            .map(|agg_expr| {
1149                let column = agg_expr
1150                    .expression
1151                    .as_ref()
1152                    .map(|e| {
1153                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1154                    })
1155                    .transpose()?;
1156
1157                Ok(PhysicalAggregateExpr {
1158                    function: convert_aggregate_function(agg_expr.function),
1159                    column,
1160                    distinct: agg_expr.distinct,
1161                    alias: agg_expr.alias.clone(),
1162                    percentile: agg_expr.percentile,
1163                })
1164            })
1165            .collect::<Result<Vec<_>>>()?;
1166
1167        // Build output schema and column names
1168        let mut output_schema = Vec::new();
1169        let mut output_columns = Vec::new();
1170
1171        // Add group-by columns
1172        for expr in &agg.group_by {
1173            output_schema.push(LogicalType::Any); // Group-by values can be any type
1174            output_columns.push(expression_to_string(expr));
1175        }
1176
1177        // Add aggregate result columns
1178        for agg_expr in &agg.aggregates {
1179            let result_type = match agg_expr.function {
1180                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1181                    LogicalType::Int64
1182                }
1183                LogicalAggregateFunction::Sum => LogicalType::Int64,
1184                LogicalAggregateFunction::Avg => LogicalType::Float64,
1185                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1186                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1187                    // since the aggregate can return any Value type, but the most common case
1188                    // is numeric values from property expressions
1189                    LogicalType::Int64
1190                }
1191                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1192                // Statistical functions return Float64
1193                LogicalAggregateFunction::StdDev
1194                | LogicalAggregateFunction::StdDevPop
1195                | LogicalAggregateFunction::PercentileDisc
1196                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1197            };
1198            output_schema.push(result_type);
1199            output_columns.push(
1200                agg_expr
1201                    .alias
1202                    .clone()
1203                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1204            );
1205        }
1206
1207        // Choose operator based on whether there are group-by columns
1208        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1209            Box::new(SimpleAggregateOperator::new(
1210                input_op,
1211                physical_aggregates,
1212                output_schema,
1213            ))
1214        } else {
1215            Box::new(HashAggregateOperator::new(
1216                input_op,
1217                group_columns,
1218                physical_aggregates,
1219                output_schema,
1220            ))
1221        };
1222
1223        // Apply HAVING clause filter if present
1224        if let Some(having_expr) = &agg.having {
1225            // Build variable to column mapping for the aggregate output
1226            let having_var_columns: HashMap<String, usize> = output_columns
1227                .iter()
1228                .enumerate()
1229                .map(|(i, name)| (name.clone(), i))
1230                .collect();
1231
1232            let filter_expr = self.convert_expression(having_expr)?;
1233            let predicate =
1234                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1235            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1236        }
1237
1238        Ok((operator, output_columns))
1239    }
1240
1241    /// Checks if an aggregate is simple enough for factorized execution.
1242    ///
1243    /// Simple aggregates:
1244    /// - COUNT(*) or COUNT(variable)
1245    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1246    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1247        agg.aggregates.iter().all(|agg_expr| {
1248            match agg_expr.function {
1249                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1250                    // COUNT(*) is always OK, COUNT(var) is OK
1251                    agg_expr.expression.is_none()
1252                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1253                }
1254                LogicalAggregateFunction::Sum
1255                | LogicalAggregateFunction::Avg
1256                | LogicalAggregateFunction::Min
1257                | LogicalAggregateFunction::Max => {
1258                    // For now, only support when expression is a variable
1259                    // (property access would require flattening first)
1260                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1261                }
1262                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1263                _ => false,
1264            }
1265        })
1266    }
1267
1268    /// Plans a factorized aggregate that operates directly on factorized data.
1269    ///
1270    /// This avoids the O(n²) cost of flattening before aggregation.
1271    fn plan_factorized_aggregate(
1272        &self,
1273        agg: &AggregateOp,
1274    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1275        // Build the expand chain - this returns a LazyFactorizedChainOperator
1276        let expands = Self::collect_expand_chain(&agg.input);
1277        if expands.is_empty() {
1278            return Err(Error::Internal(
1279                "Expected expand chain for factorized aggregate".to_string(),
1280            ));
1281        }
1282
1283        // Get the base operator (before first expand)
1284        let first_expand = expands[0];
1285        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1286
1287        let mut columns = base_columns.clone();
1288        let mut steps = Vec::new();
1289        let mut is_first = true;
1290
1291        for expand in &expands {
1292            // Find source column for this expand
1293            let source_column = if is_first {
1294                base_columns
1295                    .iter()
1296                    .position(|c| c == &expand.from_variable)
1297                    .ok_or_else(|| {
1298                        Error::Internal(format!(
1299                            "Source variable '{}' not found in base columns",
1300                            expand.from_variable
1301                        ))
1302                    })?
1303            } else {
1304                1 // Target from previous level
1305            };
1306
1307            let direction = match expand.direction {
1308                ExpandDirection::Outgoing => Direction::Outgoing,
1309                ExpandDirection::Incoming => Direction::Incoming,
1310                ExpandDirection::Both => Direction::Both,
1311            };
1312
1313            steps.push(ExpandStep {
1314                source_column,
1315                direction,
1316                edge_type: expand.edge_type.clone(),
1317            });
1318
1319            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1320                let count = self.anon_edge_counter.get();
1321                self.anon_edge_counter.set(count + 1);
1322                format!("_anon_edge_{}", count)
1323            });
1324            columns.push(edge_col_name);
1325            columns.push(expand.to_variable.clone());
1326
1327            is_first = false;
1328        }
1329
1330        // Create the lazy factorized chain operator
1331        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1332
1333        if let Some(tx_id) = self.tx_id {
1334            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1335        } else {
1336            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1337        }
1338
1339        // Convert logical aggregates to factorized aggregates
1340        let factorized_aggs: Vec<FactorizedAggregate> = agg
1341            .aggregates
1342            .iter()
1343            .map(|agg_expr| {
1344                match agg_expr.function {
1345                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1346                        // COUNT(*) uses simple count, COUNT(col) uses column count
1347                        if agg_expr.expression.is_none() {
1348                            FactorizedAggregate::count()
1349                        } else {
1350                            // For COUNT(variable), we use the deepest level's target column
1351                            // which is the last column added to the schema
1352                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1353                        }
1354                    }
1355                    LogicalAggregateFunction::Sum => {
1356                        // SUM on deepest level target
1357                        FactorizedAggregate::sum(1)
1358                    }
1359                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1360                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1361                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1362                    _ => {
1363                        // Shouldn't reach here due to is_simple_aggregate check
1364                        FactorizedAggregate::count()
1365                    }
1366                }
1367            })
1368            .collect();
1369
1370        // Build output column names
1371        let output_columns: Vec<String> = agg
1372            .aggregates
1373            .iter()
1374            .map(|agg_expr| {
1375                agg_expr
1376                    .alias
1377                    .clone()
1378                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1379            })
1380            .collect();
1381
1382        // Create the factorized aggregate operator
1383        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1384
1385        Ok((Box::new(factorized_agg_op), output_columns))
1386    }
1387
1388    /// Resolves a logical expression to a column index.
1389    #[allow(dead_code)]
1390    fn resolve_expression_to_column(
1391        &self,
1392        expr: &LogicalExpression,
1393        variable_columns: &HashMap<String, usize>,
1394    ) -> Result<usize> {
1395        match expr {
1396            LogicalExpression::Variable(name) => variable_columns
1397                .get(name)
1398                .copied()
1399                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1400            LogicalExpression::Property { variable, .. } => variable_columns
1401                .get(variable)
1402                .copied()
1403                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1404            _ => Err(Error::Internal(format!(
1405                "Cannot resolve expression to column: {:?}",
1406                expr
1407            ))),
1408        }
1409    }
1410
1411    /// Resolves a logical expression to a column index, using projected property columns.
1412    ///
1413    /// This is used for aggregations where properties have been projected into their own columns.
1414    fn resolve_expression_to_column_with_properties(
1415        &self,
1416        expr: &LogicalExpression,
1417        variable_columns: &HashMap<String, usize>,
1418    ) -> Result<usize> {
1419        match expr {
1420            LogicalExpression::Variable(name) => variable_columns
1421                .get(name)
1422                .copied()
1423                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1424            LogicalExpression::Property { variable, property } => {
1425                // Look up the projected property column (e.g., "p_price" for p.price)
1426                let col_name = format!("{}_{}", variable, property);
1427                variable_columns.get(&col_name).copied().ok_or_else(|| {
1428                    Error::Internal(format!(
1429                        "Property column '{}' not found (from {}.{})",
1430                        col_name, variable, property
1431                    ))
1432                })
1433            }
1434            _ => Err(Error::Internal(format!(
1435                "Cannot resolve expression to column: {:?}",
1436                expr
1437            ))),
1438        }
1439    }
1440
    /// Converts a logical expression to a filter expression.
    ///
    /// The conversion is structural: each `LogicalExpression` variant becomes the
    /// `FilterExpression` variant of the same shape, and sub-expressions are
    /// converted recursively. Binary and unary operators are translated with
    /// `convert_binary_op` / `convert_unary_op`. An `EXISTS { ... }` subplan is
    /// reduced to a single-edge pattern via `extract_exists_pattern`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` for query parameters and for `COUNT { ... }`
    /// subqueries (not yet supported). It also propagates the first error from
    /// converting a sub-expression, an operator, or an `EXISTS` subplan.
    /// Sub-expressions are converted in source order.
    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
        match expr {
            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
                variable: variable.clone(),
                property: property.clone(),
            }),
            LogicalExpression::Binary { left, op, right } => {
                // Operands are converted before the operator, left first, so the
                // earliest failing piece determines the returned error.
                let left_expr = self.convert_expression(left)?;
                let right_expr = self.convert_expression(right)?;
                let filter_op = convert_binary_op(*op)?;
                Ok(FilterExpression::Binary {
                    left: Box::new(left_expr),
                    op: filter_op,
                    right: Box::new(right_expr),
                })
            }
            LogicalExpression::Unary { op, operand } => {
                let operand_expr = self.convert_expression(operand)?;
                let filter_op = convert_unary_op(*op)?;
                Ok(FilterExpression::Unary {
                    op: filter_op,
                    operand: Box::new(operand_expr),
                })
            }
            LogicalExpression::FunctionCall { name, args, .. } => {
                // Function name is passed through unchanged; arguments are
                // converted in order, stopping at the first failure.
                let filter_args: Vec<FilterExpression> = args
                    .iter()
                    .map(|a| self.convert_expression(a))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::FunctionCall {
                    name: name.clone(),
                    args: filter_args,
                })
            }
            LogicalExpression::Case {
                operand,
                when_clauses,
                else_clause,
            } => {
                // Optional operand (simple CASE) converted only when present.
                let filter_operand = operand
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                // Each WHEN converts its condition, then its result.
                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
                    .iter()
                    .map(|(cond, result)| {
                        Ok((
                            self.convert_expression(cond)?,
                            self.convert_expression(result)?,
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?;
                // Optional ELSE branch converted only when present.
                let filter_else = else_clause
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                Ok(FilterExpression::Case {
                    operand: filter_operand,
                    when_clauses: filter_when_clauses,
                    else_clause: filter_else,
                })
            }
            LogicalExpression::List(items) => {
                let filter_items: Vec<FilterExpression> = items
                    .iter()
                    .map(|item| self.convert_expression(item))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::List(filter_items))
            }
            LogicalExpression::Map(pairs) => {
                // Keys are kept as-is; only values are converted.
                let filter_pairs: Vec<(String, FilterExpression)> = pairs
                    .iter()
                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
                    .collect::<Result<Vec<_>>>()?;
                Ok(FilterExpression::Map(filter_pairs))
            }
            LogicalExpression::IndexAccess { base, index } => {
                let base_expr = self.convert_expression(base)?;
                let index_expr = self.convert_expression(index)?;
                Ok(FilterExpression::IndexAccess {
                    base: Box::new(base_expr),
                    index: Box::new(index_expr),
                })
            }
            LogicalExpression::SliceAccess { base, start, end } => {
                // Slice bounds are optional and converted only when present.
                let base_expr = self.convert_expression(base)?;
                let start_expr = start
                    .as_ref()
                    .map(|s| self.convert_expression(s))
                    .transpose()?
                    .map(Box::new);
                let end_expr = end
                    .as_ref()
                    .map(|e| self.convert_expression(e))
                    .transpose()?
                    .map(Box::new);
                Ok(FilterExpression::SliceAccess {
                    base: Box::new(base_expr),
                    start: start_expr,
                    end: end_expr,
                })
            }
            LogicalExpression::Parameter(_) => Err(Error::Internal(
                "Parameters not yet supported in filters".to_string(),
            )),
            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
            LogicalExpression::ListComprehension {
                variable,
                list_expr,
                filter_expr,
                map_expr,
            } => {
                // Order: source list, optional filter predicate, then mapping expression.
                let list = self.convert_expression(list_expr)?;
                let filter = filter_expr
                    .as_ref()
                    .map(|f| self.convert_expression(f))
                    .transpose()?
                    .map(Box::new);
                let map = self.convert_expression(map_expr)?;
                Ok(FilterExpression::ListComprehension {
                    variable: variable.clone(),
                    list_expr: Box::new(list),
                    filter_expr: filter,
                    map_expr: Box::new(map),
                })
            }
            LogicalExpression::ExistsSubquery(subplan) => {
                // Extract the pattern from the subplan
                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
                let (start_var, direction, edge_type, end_labels) =
                    self.extract_exists_pattern(subplan)?;

                // NOTE: hop bounds are always left unset here; nothing from the
                // subplan is carried into min_hops/max_hops.
                Ok(FilterExpression::ExistsSubquery {
                    start_var,
                    direction,
                    edge_type,
                    end_labels,
                    min_hops: None,
                    max_hops: None,
                })
            }
            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
                "COUNT subqueries not yet supported".to_string(),
            )),
        }
    }
1594
1595    /// Extracts the pattern from an EXISTS subplan.
1596    /// Returns (start_variable, direction, edge_type, end_labels).
1597    fn extract_exists_pattern(
1598        &self,
1599        subplan: &LogicalOperator,
1600    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1601        match subplan {
1602            LogicalOperator::Expand(expand) => {
1603                // Get end node labels from the to_variable if there's a node scan input
1604                let end_labels = self.extract_end_labels_from_expand(expand);
1605                let direction = match expand.direction {
1606                    ExpandDirection::Outgoing => Direction::Outgoing,
1607                    ExpandDirection::Incoming => Direction::Incoming,
1608                    ExpandDirection::Both => Direction::Both,
1609                };
1610                Ok((
1611                    expand.from_variable.clone(),
1612                    direction,
1613                    expand.edge_type.clone(),
1614                    end_labels,
1615                ))
1616            }
1617            LogicalOperator::NodeScan(scan) => {
1618                if let Some(input) = &scan.input {
1619                    self.extract_exists_pattern(input)
1620                } else {
1621                    Err(Error::Internal(
1622                        "EXISTS subquery must contain an edge pattern".to_string(),
1623                    ))
1624                }
1625            }
1626            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1627            _ => Err(Error::Internal(
1628                "Unsupported EXISTS subquery pattern".to_string(),
1629            )),
1630        }
1631    }
1632
1633    /// Extracts end node labels from an Expand operator if present.
1634    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1635        // Check if the expand has a NodeScan input with a label filter
1636        match expand.input.as_ref() {
1637            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1638            _ => None,
1639        }
1640    }
1641
1642    /// Plans a JOIN operator.
1643    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1644        let (left_op, left_columns) = self.plan_operator(&join.left)?;
1645        let (right_op, right_columns) = self.plan_operator(&join.right)?;
1646
1647        // Build combined output columns
1648        let mut columns = left_columns.clone();
1649        columns.extend(right_columns.clone());
1650
1651        // Convert join type
1652        let physical_join_type = match join.join_type {
1653            JoinType::Inner => PhysicalJoinType::Inner,
1654            JoinType::Left => PhysicalJoinType::Left,
1655            JoinType::Right => PhysicalJoinType::Right,
1656            JoinType::Full => PhysicalJoinType::Full,
1657            JoinType::Cross => PhysicalJoinType::Cross,
1658            JoinType::Semi => PhysicalJoinType::Semi,
1659            JoinType::Anti => PhysicalJoinType::Anti,
1660        };
1661
1662        // Build key columns from join conditions
1663        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1664            // Cross join - no keys
1665            (vec![], vec![])
1666        } else {
1667            join.conditions
1668                .iter()
1669                .filter_map(|cond| {
1670                    // Try to extract column indices from expressions
1671                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1672                    let right_idx = self
1673                        .expression_to_column(&cond.right, &right_columns)
1674                        .ok()?;
1675                    Some((left_idx, right_idx))
1676                })
1677                .unzip()
1678        };
1679
1680        let output_schema = self.derive_schema_from_columns(&columns);
1681
1682        // Check if we should use leapfrog join for cyclic patterns
1683        // Currently we use hash join by default; leapfrog is available but
1684        // requires explicit multi-way join detection which will be added
1685        // when we have proper cyclic pattern detection in the optimizer.
1686        // For now, LeapfrogJoinOperator is available for direct use.
1687        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
1688
1689        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1690            left_op,
1691            right_op,
1692            probe_keys,
1693            build_keys,
1694            physical_join_type,
1695            output_schema,
1696        ));
1697
1698        Ok((operator, columns))
1699    }
1700
1701    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
1702    ///
1703    /// A cyclic pattern occurs when join conditions reference variables
1704    /// that create a cycle in the join graph. For example, a triangle
1705    /// pattern (a)->(b)->(c)->(a) creates a cycle.
1706    ///
1707    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
1708    /// indicating potential for worst-case optimal join (WCOJ) optimization.
1709    #[allow(dead_code)]
1710    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
1711        // Build adjacency list for join variables
1712        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
1713        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
1714
1715        // Collect edges from join conditions
1716        Self::collect_join_edges(
1717            &LogicalOperator::Join(join.clone()),
1718            &mut edges,
1719            &mut all_vars,
1720        );
1721
1722        // Need at least 3 variables to form a cycle
1723        if all_vars.len() < 3 {
1724            return false;
1725        }
1726
1727        // Detect cycle using DFS with coloring
1728        Self::has_cycle(&edges, &all_vars)
1729    }
1730
1731    /// Collects edges from join conditions into an adjacency list.
1732    fn collect_join_edges(
1733        op: &LogicalOperator,
1734        edges: &mut HashMap<String, Vec<String>>,
1735        vars: &mut std::collections::HashSet<String>,
1736    ) {
1737        match op {
1738            LogicalOperator::Join(join) => {
1739                // Process join conditions
1740                for cond in &join.conditions {
1741                    if let (Some(left_var), Some(right_var)) = (
1742                        Self::extract_join_variable(&cond.left),
1743                        Self::extract_join_variable(&cond.right),
1744                    ) {
1745                        if left_var != right_var {
1746                            vars.insert(left_var.clone());
1747                            vars.insert(right_var.clone());
1748
1749                            // Add bidirectional edge
1750                            edges
1751                                .entry(left_var.clone())
1752                                .or_default()
1753                                .push(right_var.clone());
1754                            edges.entry(right_var).or_default().push(left_var);
1755                        }
1756                    }
1757                }
1758
1759                // Recurse into children
1760                Self::collect_join_edges(&join.left, edges, vars);
1761                Self::collect_join_edges(&join.right, edges, vars);
1762            }
1763            LogicalOperator::Expand(expand) => {
1764                // Expand creates implicit join between from_variable and to_variable
1765                vars.insert(expand.from_variable.clone());
1766                vars.insert(expand.to_variable.clone());
1767
1768                edges
1769                    .entry(expand.from_variable.clone())
1770                    .or_default()
1771                    .push(expand.to_variable.clone());
1772                edges
1773                    .entry(expand.to_variable.clone())
1774                    .or_default()
1775                    .push(expand.from_variable.clone());
1776
1777                Self::collect_join_edges(&expand.input, edges, vars);
1778            }
1779            LogicalOperator::Filter(filter) => {
1780                Self::collect_join_edges(&filter.input, edges, vars);
1781            }
1782            LogicalOperator::NodeScan(scan) => {
1783                vars.insert(scan.variable.clone());
1784            }
1785            _ => {}
1786        }
1787    }
1788
1789    /// Extracts the variable name from a join expression.
1790    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
1791        match expr {
1792            LogicalExpression::Variable(v) => Some(v.clone()),
1793            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
1794            LogicalExpression::Id(v) => Some(v.clone()),
1795            _ => None,
1796        }
1797    }
1798
1799    /// Detects if the graph has a cycle using DFS coloring.
1800    ///
1801    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
1802    fn has_cycle(
1803        edges: &HashMap<String, Vec<String>>,
1804        vars: &std::collections::HashSet<String>,
1805    ) -> bool {
1806        let mut color: HashMap<&String, u8> = HashMap::new();
1807
1808        for var in vars {
1809            color.insert(var, 0);
1810        }
1811
1812        for start in vars {
1813            if color[start] == 0 {
1814                if Self::dfs_cycle(start, None, edges, &mut color) {
1815                    return true;
1816                }
1817            }
1818        }
1819
1820        false
1821    }
1822
1823    /// DFS helper for cycle detection.
1824    fn dfs_cycle(
1825        node: &String,
1826        parent: Option<&String>,
1827        edges: &HashMap<String, Vec<String>>,
1828        color: &mut HashMap<&String, u8>,
1829    ) -> bool {
1830        *color.get_mut(node).unwrap() = 1; // Gray
1831
1832        if let Some(neighbors) = edges.get(node) {
1833            for neighbor in neighbors {
1834                // Skip the edge back to parent (undirected graph)
1835                if parent == Some(neighbor) {
1836                    continue;
1837                }
1838
1839                if let Some(&c) = color.get(neighbor) {
1840                    if c == 1 {
1841                        // Found a back edge - cycle detected
1842                        return true;
1843                    }
1844                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
1845                        return true;
1846                    }
1847                }
1848            }
1849        }
1850
1851        *color.get_mut(node).unwrap() = 2; // Black
1852        false
1853    }
1854
1855    /// Counts the number of base relations in a logical operator tree.
1856    #[allow(dead_code)]
1857    fn count_relations(op: &LogicalOperator) -> usize {
1858        match op {
1859            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
1860            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
1861            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
1862            LogicalOperator::Join(j) => {
1863                Self::count_relations(&j.left) + Self::count_relations(&j.right)
1864            }
1865            _ => 0,
1866        }
1867    }
1868
1869    /// Extracts a column index from an expression.
1870    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
1871        match expr {
1872            LogicalExpression::Variable(name) => columns
1873                .iter()
1874                .position(|c| c == name)
1875                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1876            _ => Err(Error::Internal(
1877                "Only variables supported in join conditions".to_string(),
1878            )),
1879        }
1880    }
1881
1882    /// Plans a UNION operator.
1883    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1884        if union.inputs.is_empty() {
1885            return Err(Error::Internal(
1886                "Union requires at least one input".to_string(),
1887            ));
1888        }
1889
1890        let mut inputs = Vec::with_capacity(union.inputs.len());
1891        let mut columns = Vec::new();
1892
1893        for (i, input) in union.inputs.iter().enumerate() {
1894            let (op, cols) = self.plan_operator(input)?;
1895            if i == 0 {
1896                columns = cols;
1897            }
1898            inputs.push(op);
1899        }
1900
1901        let output_schema = self.derive_schema_from_columns(&columns);
1902        let operator = Box::new(UnionOperator::new(inputs, output_schema));
1903
1904        Ok((operator, columns))
1905    }
1906
1907    /// Plans a DISTINCT operator.
1908    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1909        let (input_op, columns) = self.plan_operator(&distinct.input)?;
1910        let output_schema = self.derive_schema_from_columns(&columns);
1911        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
1912        Ok((operator, columns))
1913    }
1914
1915    /// Plans a CREATE NODE operator.
1916    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1917        // Plan input if present
1918        let (input_op, mut columns) = if let Some(ref input) = create.input {
1919            let (op, cols) = self.plan_operator(input)?;
1920            (Some(op), cols)
1921        } else {
1922            (None, vec![])
1923        };
1924
1925        // Output column for the created node
1926        let output_column = columns.len();
1927        columns.push(create.variable.clone());
1928
1929        // Convert properties
1930        let properties: Vec<(String, PropertySource)> = create
1931            .properties
1932            .iter()
1933            .map(|(name, expr)| {
1934                let source = match expr {
1935                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1936                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1937                };
1938                (name.clone(), source)
1939            })
1940            .collect();
1941
1942        let output_schema = self.derive_schema_from_columns(&columns);
1943
1944        let operator = Box::new(
1945            CreateNodeOperator::new(
1946                Arc::clone(&self.store),
1947                input_op,
1948                create.labels.clone(),
1949                properties,
1950                output_schema,
1951                output_column,
1952            )
1953            .with_tx_context(self.viewing_epoch, self.tx_id),
1954        );
1955
1956        Ok((operator, columns))
1957    }
1958
1959    /// Plans a CREATE EDGE operator.
1960    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1961        let (input_op, mut columns) = self.plan_operator(&create.input)?;
1962
1963        // Find source and target columns
1964        let from_column = columns
1965            .iter()
1966            .position(|c| c == &create.from_variable)
1967            .ok_or_else(|| {
1968                Error::Internal(format!(
1969                    "Source variable '{}' not found",
1970                    create.from_variable
1971                ))
1972            })?;
1973
1974        let to_column = columns
1975            .iter()
1976            .position(|c| c == &create.to_variable)
1977            .ok_or_else(|| {
1978                Error::Internal(format!(
1979                    "Target variable '{}' not found",
1980                    create.to_variable
1981                ))
1982            })?;
1983
1984        // Output column for the created edge (if named)
1985        let output_column = create.variable.as_ref().map(|v| {
1986            let idx = columns.len();
1987            columns.push(v.clone());
1988            idx
1989        });
1990
1991        // Convert properties
1992        let properties: Vec<(String, PropertySource)> = create
1993            .properties
1994            .iter()
1995            .map(|(name, expr)| {
1996                let source = match expr {
1997                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
1998                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
1999                };
2000                (name.clone(), source)
2001            })
2002            .collect();
2003
2004        let output_schema = self.derive_schema_from_columns(&columns);
2005
2006        let operator = Box::new(
2007            CreateEdgeOperator::new(
2008                Arc::clone(&self.store),
2009                input_op,
2010                from_column,
2011                to_column,
2012                create.edge_type.clone(),
2013                properties,
2014                output_schema,
2015                output_column,
2016            )
2017            .with_tx_context(self.viewing_epoch, self.tx_id),
2018        );
2019
2020        Ok((operator, columns))
2021    }
2022
2023    /// Plans a DELETE NODE operator.
2024    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2025        let (input_op, columns) = self.plan_operator(&delete.input)?;
2026
2027        let node_column = columns
2028            .iter()
2029            .position(|c| c == &delete.variable)
2030            .ok_or_else(|| {
2031                Error::Internal(format!(
2032                    "Variable '{}' not found for delete",
2033                    delete.variable
2034                ))
2035            })?;
2036
2037        // Output schema for delete count
2038        let output_schema = vec![LogicalType::Int64];
2039        let output_columns = vec!["deleted_count".to_string()];
2040
2041        let operator = Box::new(
2042            DeleteNodeOperator::new(
2043                Arc::clone(&self.store),
2044                input_op,
2045                node_column,
2046                output_schema,
2047                delete.detach, // DETACH DELETE deletes connected edges first
2048            )
2049            .with_tx_context(self.viewing_epoch, self.tx_id),
2050        );
2051
2052        Ok((operator, output_columns))
2053    }
2054
2055    /// Plans a DELETE EDGE operator.
2056    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2057        let (input_op, columns) = self.plan_operator(&delete.input)?;
2058
2059        let edge_column = columns
2060            .iter()
2061            .position(|c| c == &delete.variable)
2062            .ok_or_else(|| {
2063                Error::Internal(format!(
2064                    "Variable '{}' not found for delete",
2065                    delete.variable
2066                ))
2067            })?;
2068
2069        // Output schema for delete count
2070        let output_schema = vec![LogicalType::Int64];
2071        let output_columns = vec!["deleted_count".to_string()];
2072
2073        let operator = Box::new(
2074            DeleteEdgeOperator::new(
2075                Arc::clone(&self.store),
2076                input_op,
2077                edge_column,
2078                output_schema,
2079            )
2080            .with_tx_context(self.viewing_epoch, self.tx_id),
2081        );
2082
2083        Ok((operator, output_columns))
2084    }
2085
2086    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2087    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2088        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2089        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2090
2091        // Build combined output columns (left + right)
2092        let mut columns = left_columns.clone();
2093        columns.extend(right_columns.clone());
2094
2095        // Find common variables between left and right for join keys
2096        let mut probe_keys = Vec::new();
2097        let mut build_keys = Vec::new();
2098
2099        for (right_idx, right_col) in right_columns.iter().enumerate() {
2100            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2101                probe_keys.push(left_idx);
2102                build_keys.push(right_idx);
2103            }
2104        }
2105
2106        let output_schema = self.derive_schema_from_columns(&columns);
2107
2108        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2109            left_op,
2110            right_op,
2111            probe_keys,
2112            build_keys,
2113            PhysicalJoinType::Left,
2114            output_schema,
2115        ));
2116
2117        Ok((operator, columns))
2118    }
2119
2120    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2121    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2122        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2123        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2124
2125        // Anti-join only keeps left columns (filters out matching rows)
2126        let columns = left_columns.clone();
2127
2128        // Find common variables between left and right for join keys
2129        let mut probe_keys = Vec::new();
2130        let mut build_keys = Vec::new();
2131
2132        for (right_idx, right_col) in right_columns.iter().enumerate() {
2133            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2134                probe_keys.push(left_idx);
2135                build_keys.push(right_idx);
2136            }
2137        }
2138
2139        let output_schema = self.derive_schema_from_columns(&columns);
2140
2141        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2142            left_op,
2143            right_op,
2144            probe_keys,
2145            build_keys,
2146            PhysicalJoinType::Anti,
2147            output_schema,
2148        ));
2149
2150        Ok((operator, columns))
2151    }
2152
2153    /// Plans an unwind operator.
2154    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2155        // Plan the input operator first
2156        // Handle Empty specially - use a single-row operator
2157        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2158            if matches!(&*unwind.input, LogicalOperator::Empty) {
2159                // For UNWIND without prior MATCH, create a single-row input
2160                // We need an operator that produces one row with the list to unwind
2161                // For now, use EmptyScan which produces no rows - we'll handle the literal
2162                // list in the unwind operator itself
2163                let literal_list = self.convert_expression(&unwind.expression)?;
2164
2165                // Create a project operator that produces a single row with the list
2166                let single_row_op: Box<dyn Operator> = Box::new(
2167                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2168                );
2169                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2170                    single_row_op,
2171                    vec![ProjectExpr::Expression {
2172                        expr: literal_list,
2173                        variable_columns: HashMap::new(),
2174                    }],
2175                    vec![LogicalType::Any],
2176                    Arc::clone(&self.store),
2177                ));
2178
2179                (project_op, vec!["__list__".to_string()])
2180            } else {
2181                self.plan_operator(&unwind.input)?
2182            };
2183
2184        // The UNWIND expression should be a list - we need to find/evaluate it
2185        // For now, we handle the case where the expression references an existing column
2186        // or is a literal list
2187
2188        // Find if the expression references an existing column (like a list property)
2189        let list_col_idx = match &unwind.expression {
2190            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2191            LogicalExpression::Property { variable, .. } => {
2192                // Property access needs to be evaluated - for now we'll need the filter predicate
2193                // to evaluate this. For simple cases, we treat it as a list column.
2194                input_columns.iter().position(|c| c == variable)
2195            }
2196            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2197                // Literal list expression - we'll add it as a virtual column
2198                None
2199            }
2200            _ => None,
2201        };
2202
2203        // Build output columns: all input columns plus the new variable
2204        let mut columns = input_columns.clone();
2205        columns.push(unwind.variable.clone());
2206
2207        // Build output schema
2208        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2209        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2210
2211        // Use the list column index if found, otherwise default to 0
2212        // (in which case the first column should contain the list)
2213        let col_idx = list_col_idx.unwrap_or(0);
2214
2215        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2216            input_op,
2217            col_idx,
2218            unwind.variable.clone(),
2219            output_schema,
2220        ));
2221
2222        Ok((operator, columns))
2223    }
2224
2225    /// Plans a MERGE operator.
2226    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2227        // Plan the input operator if present (skip if Empty)
2228        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2229            Vec::new()
2230        } else {
2231            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2232            cols
2233        };
2234
2235        // Convert match properties from LogicalExpression to Value
2236        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2237            .match_properties
2238            .iter()
2239            .filter_map(|(name, expr)| {
2240                if let LogicalExpression::Literal(v) = expr {
2241                    Some((name.clone(), v.clone()))
2242                } else {
2243                    None // Skip non-literal expressions for now
2244                }
2245            })
2246            .collect();
2247
2248        // Convert ON CREATE properties
2249        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2250            .on_create
2251            .iter()
2252            .filter_map(|(name, expr)| {
2253                if let LogicalExpression::Literal(v) = expr {
2254                    Some((name.clone(), v.clone()))
2255                } else {
2256                    None
2257                }
2258            })
2259            .collect();
2260
2261        // Convert ON MATCH properties
2262        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2263            .on_match
2264            .iter()
2265            .filter_map(|(name, expr)| {
2266                if let LogicalExpression::Literal(v) = expr {
2267                    Some((name.clone(), v.clone()))
2268                } else {
2269                    None
2270                }
2271            })
2272            .collect();
2273
2274        // Add the merged node variable to output columns
2275        columns.push(merge.variable.clone());
2276
2277        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2278            Arc::clone(&self.store),
2279            merge.variable.clone(),
2280            merge.labels.clone(),
2281            match_properties,
2282            on_create_properties,
2283            on_match_properties,
2284        ));
2285
2286        Ok((operator, columns))
2287    }
2288
2289    /// Plans a SHORTEST PATH operator.
2290    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2291        // Plan the input operator
2292        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2293
2294        // Find source and target node columns
2295        let source_column = columns
2296            .iter()
2297            .position(|c| c == &sp.source_var)
2298            .ok_or_else(|| {
2299                Error::Internal(format!(
2300                    "Source variable '{}' not found for shortestPath",
2301                    sp.source_var
2302                ))
2303            })?;
2304
2305        let target_column = columns
2306            .iter()
2307            .position(|c| c == &sp.target_var)
2308            .ok_or_else(|| {
2309                Error::Internal(format!(
2310                    "Target variable '{}' not found for shortestPath",
2311                    sp.target_var
2312                ))
2313            })?;
2314
2315        // Convert direction
2316        let direction = match sp.direction {
2317            ExpandDirection::Outgoing => Direction::Outgoing,
2318            ExpandDirection::Incoming => Direction::Incoming,
2319            ExpandDirection::Both => Direction::Both,
2320        };
2321
2322        // Create the shortest path operator
2323        let operator: Box<dyn Operator> = Box::new(
2324            ShortestPathOperator::new(
2325                Arc::clone(&self.store),
2326                input_op,
2327                source_column,
2328                target_column,
2329                sp.edge_type.clone(),
2330                direction,
2331            )
2332            .with_all_paths(sp.all_paths),
2333        );
2334
2335        // Add path length column with the expected naming convention
2336        // The translator expects _path_length_{alias} format for length(p) calls
2337        columns.push(format!("_path_length_{}", sp.path_alias));
2338
2339        Ok((operator, columns))
2340    }
2341
2342    /// Plans an ADD LABEL operator.
2343    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2344        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2345
2346        // Find the node column
2347        let node_column = columns
2348            .iter()
2349            .position(|c| c == &add_label.variable)
2350            .ok_or_else(|| {
2351                Error::Internal(format!(
2352                    "Variable '{}' not found for ADD LABEL",
2353                    add_label.variable
2354                ))
2355            })?;
2356
2357        // Output schema for update count
2358        let output_schema = vec![LogicalType::Int64];
2359        let output_columns = vec!["labels_added".to_string()];
2360
2361        let operator = Box::new(AddLabelOperator::new(
2362            Arc::clone(&self.store),
2363            input_op,
2364            node_column,
2365            add_label.labels.clone(),
2366            output_schema,
2367        ));
2368
2369        Ok((operator, output_columns))
2370    }
2371
2372    /// Plans a REMOVE LABEL operator.
2373    fn plan_remove_label(
2374        &self,
2375        remove_label: &RemoveLabelOp,
2376    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2377        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2378
2379        // Find the node column
2380        let node_column = columns
2381            .iter()
2382            .position(|c| c == &remove_label.variable)
2383            .ok_or_else(|| {
2384                Error::Internal(format!(
2385                    "Variable '{}' not found for REMOVE LABEL",
2386                    remove_label.variable
2387                ))
2388            })?;
2389
2390        // Output schema for update count
2391        let output_schema = vec![LogicalType::Int64];
2392        let output_columns = vec!["labels_removed".to_string()];
2393
2394        let operator = Box::new(RemoveLabelOperator::new(
2395            Arc::clone(&self.store),
2396            input_op,
2397            node_column,
2398            remove_label.labels.clone(),
2399            output_schema,
2400        ));
2401
2402        Ok((operator, output_columns))
2403    }
2404
2405    /// Plans a SET PROPERTY operator.
2406    fn plan_set_property(
2407        &self,
2408        set_prop: &SetPropertyOp,
2409    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2410        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2411
2412        // Find the entity column (node or edge variable)
2413        let entity_column = columns
2414            .iter()
2415            .position(|c| c == &set_prop.variable)
2416            .ok_or_else(|| {
2417                Error::Internal(format!(
2418                    "Variable '{}' not found for SET",
2419                    set_prop.variable
2420                ))
2421            })?;
2422
2423        // Convert properties to PropertySource
2424        let properties: Vec<(String, PropertySource)> = set_prop
2425            .properties
2426            .iter()
2427            .map(|(name, expr)| {
2428                let source = self.expression_to_property_source(expr, &columns)?;
2429                Ok((name.clone(), source))
2430            })
2431            .collect::<Result<Vec<_>>>()?;
2432
2433        // Output schema preserves input schema (passes through)
2434        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2435        let output_columns = columns.clone();
2436
2437        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2438        let operator = Box::new(SetPropertyOperator::new_for_node(
2439            Arc::clone(&self.store),
2440            input_op,
2441            entity_column,
2442            properties,
2443            output_schema,
2444        ));
2445
2446        Ok((operator, output_columns))
2447    }
2448
2449    /// Converts a logical expression to a PropertySource.
2450    fn expression_to_property_source(
2451        &self,
2452        expr: &LogicalExpression,
2453        columns: &[String],
2454    ) -> Result<PropertySource> {
2455        match expr {
2456            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2457            LogicalExpression::Variable(name) => {
2458                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2459                    Error::Internal(format!("Variable '{}' not found for property source", name))
2460                })?;
2461                Ok(PropertySource::Column(col_idx))
2462            }
2463            LogicalExpression::Parameter(name) => {
2464                // Parameters should be resolved before planning
2465                // For now, treat as a placeholder
2466                Ok(PropertySource::Constant(
2467                    grafeo_common::types::Value::String(format!("${}", name).into()),
2468                ))
2469            }
2470            _ => Err(Error::Internal(format!(
2471                "Unsupported expression type for property source: {:?}",
2472                expr
2473            ))),
2474        }
2475    }
2476}
2477
2478/// Converts a logical binary operator to a filter binary operator.
2479pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2480    match op {
2481        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2482        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2483        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2484        BinaryOp::Le => Ok(BinaryFilterOp::Le),
2485        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2486        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2487        BinaryOp::And => Ok(BinaryFilterOp::And),
2488        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2489        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2490        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2491        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2492        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2493        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2494        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2495        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2496        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2497        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2498        BinaryOp::In => Ok(BinaryFilterOp::In),
2499        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2500        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2501        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2502            "Binary operator {:?} not yet supported in filters",
2503            op
2504        ))),
2505    }
2506}
2507
2508/// Converts a logical unary operator to a filter unary operator.
2509pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2510    match op {
2511        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2512        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2513        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2514        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2515    }
2516}
2517
2518/// Converts a logical aggregate function to a physical aggregate function.
2519pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2520    match func {
2521        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2522        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2523        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2524        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2525        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2526        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2527        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2528        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2529        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2530        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2531        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2532    }
2533}
2534
2535/// Converts a logical expression to a filter expression.
2536///
2537/// This is a standalone function that can be used by both LPG and RDF planners.
2538pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2539    match expr {
2540        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2541        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2542        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2543            variable: variable.clone(),
2544            property: property.clone(),
2545        }),
2546        LogicalExpression::Binary { left, op, right } => {
2547            let left_expr = convert_filter_expression(left)?;
2548            let right_expr = convert_filter_expression(right)?;
2549            let filter_op = convert_binary_op(*op)?;
2550            Ok(FilterExpression::Binary {
2551                left: Box::new(left_expr),
2552                op: filter_op,
2553                right: Box::new(right_expr),
2554            })
2555        }
2556        LogicalExpression::Unary { op, operand } => {
2557            let operand_expr = convert_filter_expression(operand)?;
2558            let filter_op = convert_unary_op(*op)?;
2559            Ok(FilterExpression::Unary {
2560                op: filter_op,
2561                operand: Box::new(operand_expr),
2562            })
2563        }
2564        LogicalExpression::FunctionCall { name, args, .. } => {
2565            let filter_args: Vec<FilterExpression> = args
2566                .iter()
2567                .map(|a| convert_filter_expression(a))
2568                .collect::<Result<Vec<_>>>()?;
2569            Ok(FilterExpression::FunctionCall {
2570                name: name.clone(),
2571                args: filter_args,
2572            })
2573        }
2574        LogicalExpression::Case {
2575            operand,
2576            when_clauses,
2577            else_clause,
2578        } => {
2579            let filter_operand = operand
2580                .as_ref()
2581                .map(|e| convert_filter_expression(e))
2582                .transpose()?
2583                .map(Box::new);
2584            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2585                .iter()
2586                .map(|(cond, result)| {
2587                    Ok((
2588                        convert_filter_expression(cond)?,
2589                        convert_filter_expression(result)?,
2590                    ))
2591                })
2592                .collect::<Result<Vec<_>>>()?;
2593            let filter_else = else_clause
2594                .as_ref()
2595                .map(|e| convert_filter_expression(e))
2596                .transpose()?
2597                .map(Box::new);
2598            Ok(FilterExpression::Case {
2599                operand: filter_operand,
2600                when_clauses: filter_when_clauses,
2601                else_clause: filter_else,
2602            })
2603        }
2604        LogicalExpression::List(items) => {
2605            let filter_items: Vec<FilterExpression> = items
2606                .iter()
2607                .map(|item| convert_filter_expression(item))
2608                .collect::<Result<Vec<_>>>()?;
2609            Ok(FilterExpression::List(filter_items))
2610        }
2611        LogicalExpression::Map(pairs) => {
2612            let filter_pairs: Vec<(String, FilterExpression)> = pairs
2613                .iter()
2614                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2615                .collect::<Result<Vec<_>>>()?;
2616            Ok(FilterExpression::Map(filter_pairs))
2617        }
2618        LogicalExpression::IndexAccess { base, index } => {
2619            let base_expr = convert_filter_expression(base)?;
2620            let index_expr = convert_filter_expression(index)?;
2621            Ok(FilterExpression::IndexAccess {
2622                base: Box::new(base_expr),
2623                index: Box::new(index_expr),
2624            })
2625        }
2626        LogicalExpression::SliceAccess { base, start, end } => {
2627            let base_expr = convert_filter_expression(base)?;
2628            let start_expr = start
2629                .as_ref()
2630                .map(|s| convert_filter_expression(s))
2631                .transpose()?
2632                .map(Box::new);
2633            let end_expr = end
2634                .as_ref()
2635                .map(|e| convert_filter_expression(e))
2636                .transpose()?
2637                .map(Box::new);
2638            Ok(FilterExpression::SliceAccess {
2639                base: Box::new(base_expr),
2640                start: start_expr,
2641                end: end_expr,
2642            })
2643        }
2644        LogicalExpression::Parameter(_) => Err(Error::Internal(
2645            "Parameters not yet supported in filters".to_string(),
2646        )),
2647        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2648        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2649        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2650        LogicalExpression::ListComprehension {
2651            variable,
2652            list_expr,
2653            filter_expr,
2654            map_expr,
2655        } => {
2656            let list = convert_filter_expression(list_expr)?;
2657            let filter = filter_expr
2658                .as_ref()
2659                .map(|f| convert_filter_expression(f))
2660                .transpose()?
2661                .map(Box::new);
2662            let map = convert_filter_expression(map_expr)?;
2663            Ok(FilterExpression::ListComprehension {
2664                variable: variable.clone(),
2665                list_expr: Box::new(list),
2666                filter_expr: filter,
2667                map_expr: Box::new(map),
2668            })
2669        }
2670        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2671            Error::Internal("Subqueries not yet supported in filters".to_string()),
2672        ),
2673    }
2674}
2675
2676/// Infers the logical type from a value.
2677fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2678    use grafeo_common::types::Value;
2679    match value {
2680        Value::Null => LogicalType::String, // Default type for null
2681        Value::Bool(_) => LogicalType::Bool,
2682        Value::Int64(_) => LogicalType::Int64,
2683        Value::Float64(_) => LogicalType::Float64,
2684        Value::String(_) => LogicalType::String,
2685        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
2686        Value::Timestamp(_) => LogicalType::Timestamp,
2687        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
2688        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
2689    }
2690}
2691
2692/// Converts an expression to a string for column naming.
2693fn expression_to_string(expr: &LogicalExpression) -> String {
2694    match expr {
2695        LogicalExpression::Variable(name) => name.clone(),
2696        LogicalExpression::Property { variable, property } => {
2697            format!("{variable}.{property}")
2698        }
2699        LogicalExpression::Literal(value) => format!("{value:?}"),
2700        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2701        _ => "expr".to_string(),
2702    }
2703}
2704
2705/// A physical plan ready for execution.
2706pub struct PhysicalPlan {
2707    /// The root physical operator.
2708    pub operator: Box<dyn Operator>,
2709    /// Column names for the result.
2710    pub columns: Vec<String>,
2711    /// Adaptive execution context with cardinality estimates.
2712    ///
2713    /// When adaptive execution is enabled, this context contains estimated
2714    /// cardinalities at various checkpoints in the plan. During execution,
2715    /// actual row counts are recorded and compared against estimates.
2716    pub adaptive_context: Option<AdaptiveContext>,
2717}
2718
2719impl PhysicalPlan {
2720    /// Returns the column names.
2721    #[must_use]
2722    pub fn columns(&self) -> &[String] {
2723        &self.columns
2724    }
2725
2726    /// Consumes the plan and returns the operator.
2727    pub fn into_operator(self) -> Box<dyn Operator> {
2728        self.operator
2729    }
2730
2731    /// Returns the adaptive context, if adaptive execution is enabled.
2732    #[must_use]
2733    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2734        self.adaptive_context.as_ref()
2735    }
2736
2737    /// Takes ownership of the adaptive context.
2738    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2739        self.adaptive_context.take()
2740    }
2741}
2742
2743/// Helper operator that returns a single result chunk once.
2744///
2745/// Used by the factorized expand chain to wrap the final result.
2746#[allow(dead_code)]
2747struct SingleResultOperator {
2748    result: Option<grafeo_core::execution::DataChunk>,
2749}
2750
2751impl SingleResultOperator {
2752    #[allow(dead_code)]
2753    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2754        Self { result }
2755    }
2756}
2757
2758impl Operator for SingleResultOperator {
2759    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2760        Ok(self.result.take())
2761    }
2762
2763    fn reset(&mut self) {
2764        // Cannot reset - result is consumed
2765    }
2766
2767    fn name(&self) -> &'static str {
2768        "SingleResult"
2769    }
2770}
2771
2772#[cfg(test)]
2773mod tests {
2774    use super::*;
2775    use crate::query::plan::{
2776        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
2777        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
2778        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
2779        SortKey, SortOp,
2780    };
2781    use grafeo_common::types::Value;
2782
2783    fn create_test_store() -> Arc<LpgStore> {
2784        let store = Arc::new(LpgStore::new());
2785        store.create_node(&["Person"]);
2786        store.create_node(&["Person"]);
2787        store.create_node(&["Company"]);
2788        store
2789    }
2790
2791    // ==================== Simple Scan Tests ====================
2792
2793    #[test]
2794    fn test_plan_simple_scan() {
2795        let store = create_test_store();
2796        let planner = Planner::new(store);
2797
2798        // MATCH (n:Person) RETURN n
2799        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2800            items: vec![ReturnItem {
2801                expression: LogicalExpression::Variable("n".to_string()),
2802                alias: None,
2803            }],
2804            distinct: false,
2805            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2806                variable: "n".to_string(),
2807                label: Some("Person".to_string()),
2808                input: None,
2809            })),
2810        }));
2811
2812        let physical = planner.plan(&logical).unwrap();
2813        assert_eq!(physical.columns(), &["n"]);
2814    }
2815
2816    #[test]
2817    fn test_plan_scan_without_label() {
2818        let store = create_test_store();
2819        let planner = Planner::new(store);
2820
2821        // MATCH (n) RETURN n
2822        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2823            items: vec![ReturnItem {
2824                expression: LogicalExpression::Variable("n".to_string()),
2825                alias: None,
2826            }],
2827            distinct: false,
2828            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2829                variable: "n".to_string(),
2830                label: None,
2831                input: None,
2832            })),
2833        }));
2834
2835        let physical = planner.plan(&logical).unwrap();
2836        assert_eq!(physical.columns(), &["n"]);
2837    }
2838
2839    #[test]
2840    fn test_plan_return_with_alias() {
2841        let store = create_test_store();
2842        let planner = Planner::new(store);
2843
2844        // MATCH (n:Person) RETURN n AS person
2845        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2846            items: vec![ReturnItem {
2847                expression: LogicalExpression::Variable("n".to_string()),
2848                alias: Some("person".to_string()),
2849            }],
2850            distinct: false,
2851            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2852                variable: "n".to_string(),
2853                label: Some("Person".to_string()),
2854                input: None,
2855            })),
2856        }));
2857
2858        let physical = planner.plan(&logical).unwrap();
2859        assert_eq!(physical.columns(), &["person"]);
2860    }
2861
2862    #[test]
2863    fn test_plan_return_property() {
2864        let store = create_test_store();
2865        let planner = Planner::new(store);
2866
2867        // MATCH (n:Person) RETURN n.name
2868        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2869            items: vec![ReturnItem {
2870                expression: LogicalExpression::Property {
2871                    variable: "n".to_string(),
2872                    property: "name".to_string(),
2873                },
2874                alias: None,
2875            }],
2876            distinct: false,
2877            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2878                variable: "n".to_string(),
2879                label: Some("Person".to_string()),
2880                input: None,
2881            })),
2882        }));
2883
2884        let physical = planner.plan(&logical).unwrap();
2885        assert_eq!(physical.columns(), &["n.name"]);
2886    }
2887
2888    #[test]
2889    fn test_plan_return_literal() {
2890        let store = create_test_store();
2891        let planner = Planner::new(store);
2892
2893        // MATCH (n) RETURN 42 AS answer
2894        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2895            items: vec![ReturnItem {
2896                expression: LogicalExpression::Literal(Value::Int64(42)),
2897                alias: Some("answer".to_string()),
2898            }],
2899            distinct: false,
2900            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2901                variable: "n".to_string(),
2902                label: None,
2903                input: None,
2904            })),
2905        }));
2906
2907        let physical = planner.plan(&logical).unwrap();
2908        assert_eq!(physical.columns(), &["answer"]);
2909    }
2910
2911    // ==================== Filter Tests ====================
2912
2913    #[test]
2914    fn test_plan_filter_equality() {
2915        let store = create_test_store();
2916        let planner = Planner::new(store);
2917
2918        // MATCH (n:Person) WHERE n.age = 30 RETURN n
2919        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2920            items: vec![ReturnItem {
2921                expression: LogicalExpression::Variable("n".to_string()),
2922                alias: None,
2923            }],
2924            distinct: false,
2925            input: Box::new(LogicalOperator::Filter(FilterOp {
2926                predicate: LogicalExpression::Binary {
2927                    left: Box::new(LogicalExpression::Property {
2928                        variable: "n".to_string(),
2929                        property: "age".to_string(),
2930                    }),
2931                    op: BinaryOp::Eq,
2932                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2933                },
2934                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2935                    variable: "n".to_string(),
2936                    label: Some("Person".to_string()),
2937                    input: None,
2938                })),
2939            })),
2940        }));
2941
2942        let physical = planner.plan(&logical).unwrap();
2943        assert_eq!(physical.columns(), &["n"]);
2944    }
2945
2946    #[test]
2947    fn test_plan_filter_compound_and() {
2948        let store = create_test_store();
2949        let planner = Planner::new(store);
2950
2951        // WHERE n.age > 20 AND n.age < 40
2952        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2953            items: vec![ReturnItem {
2954                expression: LogicalExpression::Variable("n".to_string()),
2955                alias: None,
2956            }],
2957            distinct: false,
2958            input: Box::new(LogicalOperator::Filter(FilterOp {
2959                predicate: LogicalExpression::Binary {
2960                    left: Box::new(LogicalExpression::Binary {
2961                        left: Box::new(LogicalExpression::Property {
2962                            variable: "n".to_string(),
2963                            property: "age".to_string(),
2964                        }),
2965                        op: BinaryOp::Gt,
2966                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
2967                    }),
2968                    op: BinaryOp::And,
2969                    right: Box::new(LogicalExpression::Binary {
2970                        left: Box::new(LogicalExpression::Property {
2971                            variable: "n".to_string(),
2972                            property: "age".to_string(),
2973                        }),
2974                        op: BinaryOp::Lt,
2975                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
2976                    }),
2977                },
2978                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2979                    variable: "n".to_string(),
2980                    label: None,
2981                    input: None,
2982                })),
2983            })),
2984        }));
2985
2986        let physical = planner.plan(&logical).unwrap();
2987        assert_eq!(physical.columns(), &["n"]);
2988    }
2989
2990    #[test]
2991    fn test_plan_filter_unary_not() {
2992        let store = create_test_store();
2993        let planner = Planner::new(store);
2994
2995        // WHERE NOT n.active
2996        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2997            items: vec![ReturnItem {
2998                expression: LogicalExpression::Variable("n".to_string()),
2999                alias: None,
3000            }],
3001            distinct: false,
3002            input: Box::new(LogicalOperator::Filter(FilterOp {
3003                predicate: LogicalExpression::Unary {
3004                    op: UnaryOp::Not,
3005                    operand: Box::new(LogicalExpression::Property {
3006                        variable: "n".to_string(),
3007                        property: "active".to_string(),
3008                    }),
3009                },
3010                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3011                    variable: "n".to_string(),
3012                    label: None,
3013                    input: None,
3014                })),
3015            })),
3016        }));
3017
3018        let physical = planner.plan(&logical).unwrap();
3019        assert_eq!(physical.columns(), &["n"]);
3020    }
3021
3022    #[test]
3023    fn test_plan_filter_is_null() {
3024        let store = create_test_store();
3025        let planner = Planner::new(store);
3026
3027        // WHERE n.email IS NULL
3028        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3029            items: vec![ReturnItem {
3030                expression: LogicalExpression::Variable("n".to_string()),
3031                alias: None,
3032            }],
3033            distinct: false,
3034            input: Box::new(LogicalOperator::Filter(FilterOp {
3035                predicate: LogicalExpression::Unary {
3036                    op: UnaryOp::IsNull,
3037                    operand: Box::new(LogicalExpression::Property {
3038                        variable: "n".to_string(),
3039                        property: "email".to_string(),
3040                    }),
3041                },
3042                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3043                    variable: "n".to_string(),
3044                    label: None,
3045                    input: None,
3046                })),
3047            })),
3048        }));
3049
3050        let physical = planner.plan(&logical).unwrap();
3051        assert_eq!(physical.columns(), &["n"]);
3052    }
3053
3054    #[test]
3055    fn test_plan_filter_function_call() {
3056        let store = create_test_store();
3057        let planner = Planner::new(store);
3058
3059        // WHERE size(n.friends) > 0
3060        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3061            items: vec![ReturnItem {
3062                expression: LogicalExpression::Variable("n".to_string()),
3063                alias: None,
3064            }],
3065            distinct: false,
3066            input: Box::new(LogicalOperator::Filter(FilterOp {
3067                predicate: LogicalExpression::Binary {
3068                    left: Box::new(LogicalExpression::FunctionCall {
3069                        name: "size".to_string(),
3070                        args: vec![LogicalExpression::Property {
3071                            variable: "n".to_string(),
3072                            property: "friends".to_string(),
3073                        }],
3074                        distinct: false,
3075                    }),
3076                    op: BinaryOp::Gt,
3077                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3078                },
3079                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3080                    variable: "n".to_string(),
3081                    label: None,
3082                    input: None,
3083                })),
3084            })),
3085        }));
3086
3087        let physical = planner.plan(&logical).unwrap();
3088        assert_eq!(physical.columns(), &["n"]);
3089    }
3090
3091    // ==================== Expand Tests ====================
3092
3093    #[test]
3094    fn test_plan_expand_outgoing() {
3095        let store = create_test_store();
3096        let planner = Planner::new(store);
3097
3098        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3099        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3100            items: vec![
3101                ReturnItem {
3102                    expression: LogicalExpression::Variable("a".to_string()),
3103                    alias: None,
3104                },
3105                ReturnItem {
3106                    expression: LogicalExpression::Variable("b".to_string()),
3107                    alias: None,
3108                },
3109            ],
3110            distinct: false,
3111            input: Box::new(LogicalOperator::Expand(ExpandOp {
3112                from_variable: "a".to_string(),
3113                to_variable: "b".to_string(),
3114                edge_variable: None,
3115                direction: ExpandDirection::Outgoing,
3116                edge_type: Some("KNOWS".to_string()),
3117                min_hops: 1,
3118                max_hops: Some(1),
3119                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3120                    variable: "a".to_string(),
3121                    label: Some("Person".to_string()),
3122                    input: None,
3123                })),
3124                path_alias: None,
3125            })),
3126        }));
3127
3128        let physical = planner.plan(&logical).unwrap();
3129        // The return should have columns [a, b]
3130        assert!(physical.columns().contains(&"a".to_string()));
3131        assert!(physical.columns().contains(&"b".to_string()));
3132    }
3133
3134    #[test]
3135    fn test_plan_expand_with_edge_variable() {
3136        let store = create_test_store();
3137        let planner = Planner::new(store);
3138
3139        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3140        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3141            items: vec![
3142                ReturnItem {
3143                    expression: LogicalExpression::Variable("a".to_string()),
3144                    alias: None,
3145                },
3146                ReturnItem {
3147                    expression: LogicalExpression::Variable("r".to_string()),
3148                    alias: None,
3149                },
3150                ReturnItem {
3151                    expression: LogicalExpression::Variable("b".to_string()),
3152                    alias: None,
3153                },
3154            ],
3155            distinct: false,
3156            input: Box::new(LogicalOperator::Expand(ExpandOp {
3157                from_variable: "a".to_string(),
3158                to_variable: "b".to_string(),
3159                edge_variable: Some("r".to_string()),
3160                direction: ExpandDirection::Outgoing,
3161                edge_type: Some("KNOWS".to_string()),
3162                min_hops: 1,
3163                max_hops: Some(1),
3164                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3165                    variable: "a".to_string(),
3166                    label: None,
3167                    input: None,
3168                })),
3169                path_alias: None,
3170            })),
3171        }));
3172
3173        let physical = planner.plan(&logical).unwrap();
3174        assert!(physical.columns().contains(&"a".to_string()));
3175        assert!(physical.columns().contains(&"r".to_string()));
3176        assert!(physical.columns().contains(&"b".to_string()));
3177    }
3178
3179    // ==================== Limit/Skip/Sort Tests ====================
3180
3181    #[test]
3182    fn test_plan_limit() {
3183        let store = create_test_store();
3184        let planner = Planner::new(store);
3185
3186        // MATCH (n) RETURN n LIMIT 10
3187        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3188            items: vec![ReturnItem {
3189                expression: LogicalExpression::Variable("n".to_string()),
3190                alias: None,
3191            }],
3192            distinct: false,
3193            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3194                count: 10,
3195                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3196                    variable: "n".to_string(),
3197                    label: None,
3198                    input: None,
3199                })),
3200            })),
3201        }));
3202
3203        let physical = planner.plan(&logical).unwrap();
3204        assert_eq!(physical.columns(), &["n"]);
3205    }
3206
3207    #[test]
3208    fn test_plan_skip() {
3209        let store = create_test_store();
3210        let planner = Planner::new(store);
3211
3212        // MATCH (n) RETURN n SKIP 5
3213        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3214            items: vec![ReturnItem {
3215                expression: LogicalExpression::Variable("n".to_string()),
3216                alias: None,
3217            }],
3218            distinct: false,
3219            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3220                count: 5,
3221                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3222                    variable: "n".to_string(),
3223                    label: None,
3224                    input: None,
3225                })),
3226            })),
3227        }));
3228
3229        let physical = planner.plan(&logical).unwrap();
3230        assert_eq!(physical.columns(), &["n"]);
3231    }
3232
3233    #[test]
3234    fn test_plan_sort() {
3235        let store = create_test_store();
3236        let planner = Planner::new(store);
3237
3238        // MATCH (n) RETURN n ORDER BY n.name ASC
3239        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3240            items: vec![ReturnItem {
3241                expression: LogicalExpression::Variable("n".to_string()),
3242                alias: None,
3243            }],
3244            distinct: false,
3245            input: Box::new(LogicalOperator::Sort(SortOp {
3246                keys: vec![SortKey {
3247                    expression: LogicalExpression::Variable("n".to_string()),
3248                    order: SortOrder::Ascending,
3249                }],
3250                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3251                    variable: "n".to_string(),
3252                    label: None,
3253                    input: None,
3254                })),
3255            })),
3256        }));
3257
3258        let physical = planner.plan(&logical).unwrap();
3259        assert_eq!(physical.columns(), &["n"]);
3260    }
3261
3262    #[test]
3263    fn test_plan_sort_descending() {
3264        let store = create_test_store();
3265        let planner = Planner::new(store);
3266
3267        // ORDER BY n DESC
3268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3269            items: vec![ReturnItem {
3270                expression: LogicalExpression::Variable("n".to_string()),
3271                alias: None,
3272            }],
3273            distinct: false,
3274            input: Box::new(LogicalOperator::Sort(SortOp {
3275                keys: vec![SortKey {
3276                    expression: LogicalExpression::Variable("n".to_string()),
3277                    order: SortOrder::Descending,
3278                }],
3279                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3280                    variable: "n".to_string(),
3281                    label: None,
3282                    input: None,
3283                })),
3284            })),
3285        }));
3286
3287        let physical = planner.plan(&logical).unwrap();
3288        assert_eq!(physical.columns(), &["n"]);
3289    }
3290
3291    #[test]
3292    fn test_plan_distinct() {
3293        let store = create_test_store();
3294        let planner = Planner::new(store);
3295
3296        // MATCH (n) RETURN DISTINCT n
3297        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3298            items: vec![ReturnItem {
3299                expression: LogicalExpression::Variable("n".to_string()),
3300                alias: None,
3301            }],
3302            distinct: false,
3303            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3304                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3305                    variable: "n".to_string(),
3306                    label: None,
3307                    input: None,
3308                })),
3309                columns: None,
3310            })),
3311        }));
3312
3313        let physical = planner.plan(&logical).unwrap();
3314        assert_eq!(physical.columns(), &["n"]);
3315    }
3316
3317    // ==================== Aggregate Tests ====================
3318
3319    #[test]
3320    fn test_plan_aggregate_count() {
3321        let store = create_test_store();
3322        let planner = Planner::new(store);
3323
3324        // MATCH (n) RETURN count(n)
3325        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3326            items: vec![ReturnItem {
3327                expression: LogicalExpression::Variable("cnt".to_string()),
3328                alias: None,
3329            }],
3330            distinct: false,
3331            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3332                group_by: vec![],
3333                aggregates: vec![LogicalAggregateExpr {
3334                    function: LogicalAggregateFunction::Count,
3335                    expression: Some(LogicalExpression::Variable("n".to_string())),
3336                    distinct: false,
3337                    alias: Some("cnt".to_string()),
3338                    percentile: None,
3339                }],
3340                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3341                    variable: "n".to_string(),
3342                    label: None,
3343                    input: None,
3344                })),
3345                having: None,
3346            })),
3347        }));
3348
3349        let physical = planner.plan(&logical).unwrap();
3350        assert!(physical.columns().contains(&"cnt".to_string()));
3351    }
3352
3353    #[test]
3354    fn test_plan_aggregate_with_group_by() {
3355        let store = create_test_store();
3356        let planner = Planner::new(store);
3357
3358        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3359        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3360            group_by: vec![LogicalExpression::Property {
3361                variable: "n".to_string(),
3362                property: "city".to_string(),
3363            }],
3364            aggregates: vec![LogicalAggregateExpr {
3365                function: LogicalAggregateFunction::Count,
3366                expression: Some(LogicalExpression::Variable("n".to_string())),
3367                distinct: false,
3368                alias: Some("cnt".to_string()),
3369                percentile: None,
3370            }],
3371            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3372                variable: "n".to_string(),
3373                label: Some("Person".to_string()),
3374                input: None,
3375            })),
3376            having: None,
3377        }));
3378
3379        let physical = planner.plan(&logical).unwrap();
3380        assert_eq!(physical.columns().len(), 2);
3381    }
3382
3383    #[test]
3384    fn test_plan_aggregate_sum() {
3385        let store = create_test_store();
3386        let planner = Planner::new(store);
3387
3388        // SUM(n.value)
3389        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3390            group_by: vec![],
3391            aggregates: vec![LogicalAggregateExpr {
3392                function: LogicalAggregateFunction::Sum,
3393                expression: Some(LogicalExpression::Property {
3394                    variable: "n".to_string(),
3395                    property: "value".to_string(),
3396                }),
3397                distinct: false,
3398                alias: Some("total".to_string()),
3399                percentile: None,
3400            }],
3401            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3402                variable: "n".to_string(),
3403                label: None,
3404                input: None,
3405            })),
3406            having: None,
3407        }));
3408
3409        let physical = planner.plan(&logical).unwrap();
3410        assert!(physical.columns().contains(&"total".to_string()));
3411    }
3412
3413    #[test]
3414    fn test_plan_aggregate_avg() {
3415        let store = create_test_store();
3416        let planner = Planner::new(store);
3417
3418        // AVG(n.score)
3419        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3420            group_by: vec![],
3421            aggregates: vec![LogicalAggregateExpr {
3422                function: LogicalAggregateFunction::Avg,
3423                expression: Some(LogicalExpression::Property {
3424                    variable: "n".to_string(),
3425                    property: "score".to_string(),
3426                }),
3427                distinct: false,
3428                alias: Some("average".to_string()),
3429                percentile: None,
3430            }],
3431            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3432                variable: "n".to_string(),
3433                label: None,
3434                input: None,
3435            })),
3436            having: None,
3437        }));
3438
3439        let physical = planner.plan(&logical).unwrap();
3440        assert!(physical.columns().contains(&"average".to_string()));
3441    }
3442
3443    #[test]
3444    fn test_plan_aggregate_min_max() {
3445        let store = create_test_store();
3446        let planner = Planner::new(store);
3447
3448        // MIN(n.age), MAX(n.age)
3449        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3450            group_by: vec![],
3451            aggregates: vec![
3452                LogicalAggregateExpr {
3453                    function: LogicalAggregateFunction::Min,
3454                    expression: Some(LogicalExpression::Property {
3455                        variable: "n".to_string(),
3456                        property: "age".to_string(),
3457                    }),
3458                    distinct: false,
3459                    alias: Some("youngest".to_string()),
3460                    percentile: None,
3461                },
3462                LogicalAggregateExpr {
3463                    function: LogicalAggregateFunction::Max,
3464                    expression: Some(LogicalExpression::Property {
3465                        variable: "n".to_string(),
3466                        property: "age".to_string(),
3467                    }),
3468                    distinct: false,
3469                    alias: Some("oldest".to_string()),
3470                    percentile: None,
3471                },
3472            ],
3473            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3474                variable: "n".to_string(),
3475                label: None,
3476                input: None,
3477            })),
3478            having: None,
3479        }));
3480
3481        let physical = planner.plan(&logical).unwrap();
3482        assert!(physical.columns().contains(&"youngest".to_string()));
3483        assert!(physical.columns().contains(&"oldest".to_string()));
3484    }
3485
3486    // ==================== Join Tests ====================
3487
3488    #[test]
3489    fn test_plan_inner_join() {
3490        let store = create_test_store();
3491        let planner = Planner::new(store);
3492
3493        // Inner join between two scans
3494        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3495            items: vec![
3496                ReturnItem {
3497                    expression: LogicalExpression::Variable("a".to_string()),
3498                    alias: None,
3499                },
3500                ReturnItem {
3501                    expression: LogicalExpression::Variable("b".to_string()),
3502                    alias: None,
3503                },
3504            ],
3505            distinct: false,
3506            input: Box::new(LogicalOperator::Join(JoinOp {
3507                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3508                    variable: "a".to_string(),
3509                    label: Some("Person".to_string()),
3510                    input: None,
3511                })),
3512                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3513                    variable: "b".to_string(),
3514                    label: Some("Company".to_string()),
3515                    input: None,
3516                })),
3517                join_type: JoinType::Inner,
3518                conditions: vec![JoinCondition {
3519                    left: LogicalExpression::Variable("a".to_string()),
3520                    right: LogicalExpression::Variable("b".to_string()),
3521                }],
3522            })),
3523        }));
3524
3525        let physical = planner.plan(&logical).unwrap();
3526        assert!(physical.columns().contains(&"a".to_string()));
3527        assert!(physical.columns().contains(&"b".to_string()));
3528    }
3529
3530    #[test]
3531    fn test_plan_cross_join() {
3532        let store = create_test_store();
3533        let planner = Planner::new(store);
3534
3535        // Cross join (no conditions)
3536        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3537            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3538                variable: "a".to_string(),
3539                label: None,
3540                input: None,
3541            })),
3542            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3543                variable: "b".to_string(),
3544                label: None,
3545                input: None,
3546            })),
3547            join_type: JoinType::Cross,
3548            conditions: vec![],
3549        }));
3550
3551        let physical = planner.plan(&logical).unwrap();
3552        assert_eq!(physical.columns().len(), 2);
3553    }
3554
3555    #[test]
3556    fn test_plan_left_join() {
3557        let store = create_test_store();
3558        let planner = Planner::new(store);
3559
3560        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3561            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3562                variable: "a".to_string(),
3563                label: None,
3564                input: None,
3565            })),
3566            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3567                variable: "b".to_string(),
3568                label: None,
3569                input: None,
3570            })),
3571            join_type: JoinType::Left,
3572            conditions: vec![],
3573        }));
3574
3575        let physical = planner.plan(&logical).unwrap();
3576        assert_eq!(physical.columns().len(), 2);
3577    }
3578
3579    // ==================== Mutation Tests ====================
3580
3581    #[test]
3582    fn test_plan_create_node() {
3583        let store = create_test_store();
3584        let planner = Planner::new(store);
3585
3586        // CREATE (n:Person {name: 'Alice'})
3587        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3588            variable: "n".to_string(),
3589            labels: vec!["Person".to_string()],
3590            properties: vec![(
3591                "name".to_string(),
3592                LogicalExpression::Literal(Value::String("Alice".into())),
3593            )],
3594            input: None,
3595        }));
3596
3597        let physical = planner.plan(&logical).unwrap();
3598        assert!(physical.columns().contains(&"n".to_string()));
3599    }
3600
3601    #[test]
3602    fn test_plan_create_edge() {
3603        let store = create_test_store();
3604        let planner = Planner::new(store);
3605
3606        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
3607        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3608            variable: Some("r".to_string()),
3609            from_variable: "a".to_string(),
3610            to_variable: "b".to_string(),
3611            edge_type: "KNOWS".to_string(),
3612            properties: vec![],
3613            input: Box::new(LogicalOperator::Join(JoinOp {
3614                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3615                    variable: "a".to_string(),
3616                    label: None,
3617                    input: None,
3618                })),
3619                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3620                    variable: "b".to_string(),
3621                    label: None,
3622                    input: None,
3623                })),
3624                join_type: JoinType::Cross,
3625                conditions: vec![],
3626            })),
3627        }));
3628
3629        let physical = planner.plan(&logical).unwrap();
3630        assert!(physical.columns().contains(&"r".to_string()));
3631    }
3632
3633    #[test]
3634    fn test_plan_delete_node() {
3635        let store = create_test_store();
3636        let planner = Planner::new(store);
3637
3638        // MATCH (n) DELETE n
3639        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3640            variable: "n".to_string(),
3641            detach: false,
3642            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3643                variable: "n".to_string(),
3644                label: None,
3645                input: None,
3646            })),
3647        }));
3648
3649        let physical = planner.plan(&logical).unwrap();
3650        assert!(physical.columns().contains(&"deleted_count".to_string()));
3651    }
3652
3653    // ==================== Error Cases ====================
3654
3655    #[test]
3656    fn test_plan_empty_errors() {
3657        let store = create_test_store();
3658        let planner = Planner::new(store);
3659
3660        let logical = LogicalPlan::new(LogicalOperator::Empty);
3661        let result = planner.plan(&logical);
3662        assert!(result.is_err());
3663    }
3664
3665    #[test]
3666    fn test_plan_missing_variable_in_return() {
3667        let store = create_test_store();
3668        let planner = Planner::new(store);
3669
3670        // Return variable that doesn't exist in input
3671        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3672            items: vec![ReturnItem {
3673                expression: LogicalExpression::Variable("missing".to_string()),
3674                alias: None,
3675            }],
3676            distinct: false,
3677            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3678                variable: "n".to_string(),
3679                label: None,
3680                input: None,
3681            })),
3682        }));
3683
3684        let result = planner.plan(&logical);
3685        assert!(result.is_err());
3686    }
3687
3688    // ==================== Helper Function Tests ====================
3689
3690    #[test]
3691    fn test_convert_binary_ops() {
3692        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3693        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3694        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3695        assert!(convert_binary_op(BinaryOp::Le).is_ok());
3696        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3697        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3698        assert!(convert_binary_op(BinaryOp::And).is_ok());
3699        assert!(convert_binary_op(BinaryOp::Or).is_ok());
3700        assert!(convert_binary_op(BinaryOp::Add).is_ok());
3701        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3702        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3703        assert!(convert_binary_op(BinaryOp::Div).is_ok());
3704    }
3705
3706    #[test]
3707    fn test_convert_unary_ops() {
3708        assert!(convert_unary_op(UnaryOp::Not).is_ok());
3709        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3710        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3711        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3712    }
3713
3714    #[test]
3715    fn test_convert_aggregate_functions() {
3716        assert!(matches!(
3717            convert_aggregate_function(LogicalAggregateFunction::Count),
3718            PhysicalAggregateFunction::Count
3719        ));
3720        assert!(matches!(
3721            convert_aggregate_function(LogicalAggregateFunction::Sum),
3722            PhysicalAggregateFunction::Sum
3723        ));
3724        assert!(matches!(
3725            convert_aggregate_function(LogicalAggregateFunction::Avg),
3726            PhysicalAggregateFunction::Avg
3727        ));
3728        assert!(matches!(
3729            convert_aggregate_function(LogicalAggregateFunction::Min),
3730            PhysicalAggregateFunction::Min
3731        ));
3732        assert!(matches!(
3733            convert_aggregate_function(LogicalAggregateFunction::Max),
3734            PhysicalAggregateFunction::Max
3735        ));
3736    }
3737
3738    #[test]
3739    fn test_planner_accessors() {
3740        let store = create_test_store();
3741        let planner = Planner::new(Arc::clone(&store));
3742
3743        assert!(planner.tx_id().is_none());
3744        assert!(planner.tx_manager().is_none());
3745        let _ = planner.viewing_epoch(); // Just ensure it's accessible
3746    }
3747
3748    #[test]
3749    fn test_physical_plan_accessors() {
3750        let store = create_test_store();
3751        let planner = Planner::new(store);
3752
3753        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3754            variable: "n".to_string(),
3755            label: None,
3756            input: None,
3757        }));
3758
3759        let physical = planner.plan(&logical).unwrap();
3760        assert_eq!(physical.columns(), &["n"]);
3761
3762        // Test into_operator
3763        let _ = physical.into_operator();
3764    }
3765
3766    // ==================== Adaptive Planning Tests ====================
3767
3768    #[test]
3769    fn test_plan_adaptive_with_scan() {
3770        let store = create_test_store();
3771        let planner = Planner::new(store);
3772
3773        // MATCH (n:Person) RETURN n
3774        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3775            items: vec![ReturnItem {
3776                expression: LogicalExpression::Variable("n".to_string()),
3777                alias: None,
3778            }],
3779            distinct: false,
3780            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3781                variable: "n".to_string(),
3782                label: Some("Person".to_string()),
3783                input: None,
3784            })),
3785        }));
3786
3787        let physical = planner.plan_adaptive(&logical).unwrap();
3788        assert_eq!(physical.columns(), &["n"]);
3789        // Should have adaptive context with estimates
3790        assert!(physical.adaptive_context.is_some());
3791    }
3792
3793    #[test]
3794    fn test_plan_adaptive_with_filter() {
3795        let store = create_test_store();
3796        let planner = Planner::new(store);
3797
3798        // MATCH (n) WHERE n.age > 30 RETURN n
3799        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3800            items: vec![ReturnItem {
3801                expression: LogicalExpression::Variable("n".to_string()),
3802                alias: None,
3803            }],
3804            distinct: false,
3805            input: Box::new(LogicalOperator::Filter(FilterOp {
3806                predicate: LogicalExpression::Binary {
3807                    left: Box::new(LogicalExpression::Property {
3808                        variable: "n".to_string(),
3809                        property: "age".to_string(),
3810                    }),
3811                    op: BinaryOp::Gt,
3812                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3813                },
3814                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3815                    variable: "n".to_string(),
3816                    label: None,
3817                    input: None,
3818                })),
3819            })),
3820        }));
3821
3822        let physical = planner.plan_adaptive(&logical).unwrap();
3823        assert!(physical.adaptive_context.is_some());
3824    }
3825
3826    #[test]
3827    fn test_plan_adaptive_with_expand() {
3828        let store = create_test_store();
3829        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
3830
3831        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
3832        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3833            items: vec![
3834                ReturnItem {
3835                    expression: LogicalExpression::Variable("a".to_string()),
3836                    alias: None,
3837                },
3838                ReturnItem {
3839                    expression: LogicalExpression::Variable("b".to_string()),
3840                    alias: None,
3841                },
3842            ],
3843            distinct: false,
3844            input: Box::new(LogicalOperator::Expand(ExpandOp {
3845                from_variable: "a".to_string(),
3846                to_variable: "b".to_string(),
3847                edge_variable: None,
3848                direction: ExpandDirection::Outgoing,
3849                edge_type: Some("KNOWS".to_string()),
3850                min_hops: 1,
3851                max_hops: Some(1),
3852                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3853                    variable: "a".to_string(),
3854                    label: None,
3855                    input: None,
3856                })),
3857                path_alias: None,
3858            })),
3859        }));
3860
3861        let physical = planner.plan_adaptive(&logical).unwrap();
3862        assert!(physical.adaptive_context.is_some());
3863    }
3864
3865    #[test]
3866    fn test_plan_adaptive_with_join() {
3867        let store = create_test_store();
3868        let planner = Planner::new(store);
3869
3870        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3871            items: vec![
3872                ReturnItem {
3873                    expression: LogicalExpression::Variable("a".to_string()),
3874                    alias: None,
3875                },
3876                ReturnItem {
3877                    expression: LogicalExpression::Variable("b".to_string()),
3878                    alias: None,
3879                },
3880            ],
3881            distinct: false,
3882            input: Box::new(LogicalOperator::Join(JoinOp {
3883                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3884                    variable: "a".to_string(),
3885                    label: None,
3886                    input: None,
3887                })),
3888                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3889                    variable: "b".to_string(),
3890                    label: None,
3891                    input: None,
3892                })),
3893                join_type: JoinType::Cross,
3894                conditions: vec![],
3895            })),
3896        }));
3897
3898        let physical = planner.plan_adaptive(&logical).unwrap();
3899        assert!(physical.adaptive_context.is_some());
3900    }
3901
3902    #[test]
3903    fn test_plan_adaptive_with_aggregate() {
3904        let store = create_test_store();
3905        let planner = Planner::new(store);
3906
3907        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3908            group_by: vec![],
3909            aggregates: vec![LogicalAggregateExpr {
3910                function: LogicalAggregateFunction::Count,
3911                expression: Some(LogicalExpression::Variable("n".to_string())),
3912                distinct: false,
3913                alias: Some("cnt".to_string()),
3914                percentile: None,
3915            }],
3916            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3917                variable: "n".to_string(),
3918                label: None,
3919                input: None,
3920            })),
3921            having: None,
3922        }));
3923
3924        let physical = planner.plan_adaptive(&logical).unwrap();
3925        assert!(physical.adaptive_context.is_some());
3926    }
3927
3928    #[test]
3929    fn test_plan_adaptive_with_distinct() {
3930        let store = create_test_store();
3931        let planner = Planner::new(store);
3932
3933        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3934            items: vec![ReturnItem {
3935                expression: LogicalExpression::Variable("n".to_string()),
3936                alias: None,
3937            }],
3938            distinct: false,
3939            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3940                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3941                    variable: "n".to_string(),
3942                    label: None,
3943                    input: None,
3944                })),
3945                columns: None,
3946            })),
3947        }));
3948
3949        let physical = planner.plan_adaptive(&logical).unwrap();
3950        assert!(physical.adaptive_context.is_some());
3951    }
3952
3953    #[test]
3954    fn test_plan_adaptive_with_limit() {
3955        let store = create_test_store();
3956        let planner = Planner::new(store);
3957
3958        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3959            items: vec![ReturnItem {
3960                expression: LogicalExpression::Variable("n".to_string()),
3961                alias: None,
3962            }],
3963            distinct: false,
3964            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3965                count: 10,
3966                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3967                    variable: "n".to_string(),
3968                    label: None,
3969                    input: None,
3970                })),
3971            })),
3972        }));
3973
3974        let physical = planner.plan_adaptive(&logical).unwrap();
3975        assert!(physical.adaptive_context.is_some());
3976    }
3977
3978    #[test]
3979    fn test_plan_adaptive_with_skip() {
3980        let store = create_test_store();
3981        let planner = Planner::new(store);
3982
3983        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3984            items: vec![ReturnItem {
3985                expression: LogicalExpression::Variable("n".to_string()),
3986                alias: None,
3987            }],
3988            distinct: false,
3989            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3990                count: 5,
3991                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3992                    variable: "n".to_string(),
3993                    label: None,
3994                    input: None,
3995                })),
3996            })),
3997        }));
3998
3999        let physical = planner.plan_adaptive(&logical).unwrap();
4000        assert!(physical.adaptive_context.is_some());
4001    }
4002
4003    #[test]
4004    fn test_plan_adaptive_with_sort() {
4005        let store = create_test_store();
4006        let planner = Planner::new(store);
4007
4008        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4009            items: vec![ReturnItem {
4010                expression: LogicalExpression::Variable("n".to_string()),
4011                alias: None,
4012            }],
4013            distinct: false,
4014            input: Box::new(LogicalOperator::Sort(SortOp {
4015                keys: vec![SortKey {
4016                    expression: LogicalExpression::Variable("n".to_string()),
4017                    order: SortOrder::Ascending,
4018                }],
4019                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4020                    variable: "n".to_string(),
4021                    label: None,
4022                    input: None,
4023                })),
4024            })),
4025        }));
4026
4027        let physical = planner.plan_adaptive(&logical).unwrap();
4028        assert!(physical.adaptive_context.is_some());
4029    }
4030
4031    #[test]
4032    fn test_plan_adaptive_with_union() {
4033        let store = create_test_store();
4034        let planner = Planner::new(store);
4035
4036        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4037            items: vec![ReturnItem {
4038                expression: LogicalExpression::Variable("n".to_string()),
4039                alias: None,
4040            }],
4041            distinct: false,
4042            input: Box::new(LogicalOperator::Union(UnionOp {
4043                inputs: vec![
4044                    LogicalOperator::NodeScan(NodeScanOp {
4045                        variable: "n".to_string(),
4046                        label: Some("Person".to_string()),
4047                        input: None,
4048                    }),
4049                    LogicalOperator::NodeScan(NodeScanOp {
4050                        variable: "n".to_string(),
4051                        label: Some("Company".to_string()),
4052                        input: None,
4053                    }),
4054                ],
4055            })),
4056        }));
4057
4058        let physical = planner.plan_adaptive(&logical).unwrap();
4059        assert!(physical.adaptive_context.is_some());
4060    }
4061
4062    // ==================== Variable Length Path Tests ====================
4063
4064    #[test]
4065    fn test_plan_expand_variable_length() {
4066        let store = create_test_store();
4067        let planner = Planner::new(store);
4068
4069        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4070        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4071            items: vec![
4072                ReturnItem {
4073                    expression: LogicalExpression::Variable("a".to_string()),
4074                    alias: None,
4075                },
4076                ReturnItem {
4077                    expression: LogicalExpression::Variable("b".to_string()),
4078                    alias: None,
4079                },
4080            ],
4081            distinct: false,
4082            input: Box::new(LogicalOperator::Expand(ExpandOp {
4083                from_variable: "a".to_string(),
4084                to_variable: "b".to_string(),
4085                edge_variable: None,
4086                direction: ExpandDirection::Outgoing,
4087                edge_type: Some("KNOWS".to_string()),
4088                min_hops: 1,
4089                max_hops: Some(3),
4090                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4091                    variable: "a".to_string(),
4092                    label: None,
4093                    input: None,
4094                })),
4095                path_alias: None,
4096            })),
4097        }));
4098
4099        let physical = planner.plan(&logical).unwrap();
4100        assert!(physical.columns().contains(&"a".to_string()));
4101        assert!(physical.columns().contains(&"b".to_string()));
4102    }
4103
4104    #[test]
4105    fn test_plan_expand_with_path_alias() {
4106        let store = create_test_store();
4107        let planner = Planner::new(store);
4108
4109        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4110        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4111            items: vec![
4112                ReturnItem {
4113                    expression: LogicalExpression::Variable("a".to_string()),
4114                    alias: None,
4115                },
4116                ReturnItem {
4117                    expression: LogicalExpression::Variable("b".to_string()),
4118                    alias: None,
4119                },
4120            ],
4121            distinct: false,
4122            input: Box::new(LogicalOperator::Expand(ExpandOp {
4123                from_variable: "a".to_string(),
4124                to_variable: "b".to_string(),
4125                edge_variable: None,
4126                direction: ExpandDirection::Outgoing,
4127                edge_type: Some("KNOWS".to_string()),
4128                min_hops: 1,
4129                max_hops: Some(3),
4130                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4131                    variable: "a".to_string(),
4132                    label: None,
4133                    input: None,
4134                })),
4135                path_alias: Some("p".to_string()),
4136            })),
4137        }));
4138
4139        let physical = planner.plan(&logical).unwrap();
4140        // Verify plan was created successfully with expected output columns
4141        assert!(physical.columns().contains(&"a".to_string()));
4142        assert!(physical.columns().contains(&"b".to_string()));
4143    }
4144
4145    #[test]
4146    fn test_plan_expand_incoming() {
4147        let store = create_test_store();
4148        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4149
4150        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4151        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4152            items: vec![
4153                ReturnItem {
4154                    expression: LogicalExpression::Variable("a".to_string()),
4155                    alias: None,
4156                },
4157                ReturnItem {
4158                    expression: LogicalExpression::Variable("b".to_string()),
4159                    alias: None,
4160                },
4161            ],
4162            distinct: false,
4163            input: Box::new(LogicalOperator::Expand(ExpandOp {
4164                from_variable: "a".to_string(),
4165                to_variable: "b".to_string(),
4166                edge_variable: None,
4167                direction: ExpandDirection::Incoming,
4168                edge_type: Some("KNOWS".to_string()),
4169                min_hops: 1,
4170                max_hops: Some(1),
4171                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4172                    variable: "a".to_string(),
4173                    label: None,
4174                    input: None,
4175                })),
4176                path_alias: None,
4177            })),
4178        }));
4179
4180        let physical = planner.plan(&logical).unwrap();
4181        assert!(physical.columns().contains(&"a".to_string()));
4182        assert!(physical.columns().contains(&"b".to_string()));
4183    }
4184
4185    #[test]
4186    fn test_plan_expand_both_directions() {
4187        let store = create_test_store();
4188        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4189
4190        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4191        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4192            items: vec![
4193                ReturnItem {
4194                    expression: LogicalExpression::Variable("a".to_string()),
4195                    alias: None,
4196                },
4197                ReturnItem {
4198                    expression: LogicalExpression::Variable("b".to_string()),
4199                    alias: None,
4200                },
4201            ],
4202            distinct: false,
4203            input: Box::new(LogicalOperator::Expand(ExpandOp {
4204                from_variable: "a".to_string(),
4205                to_variable: "b".to_string(),
4206                edge_variable: None,
4207                direction: ExpandDirection::Both,
4208                edge_type: Some("KNOWS".to_string()),
4209                min_hops: 1,
4210                max_hops: Some(1),
4211                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4212                    variable: "a".to_string(),
4213                    label: None,
4214                    input: None,
4215                })),
4216                path_alias: None,
4217            })),
4218        }));
4219
4220        let physical = planner.plan(&logical).unwrap();
4221        assert!(physical.columns().contains(&"a".to_string()));
4222        assert!(physical.columns().contains(&"b".to_string()));
4223    }
4224
4225    // ==================== With Context Tests ====================
4226
4227    #[test]
4228    fn test_planner_with_context() {
4229        use crate::transaction::TransactionManager;
4230
4231        let store = create_test_store();
4232        let tx_manager = Arc::new(TransactionManager::new());
4233        let tx_id = tx_manager.begin();
4234        let epoch = tx_manager.current_epoch();
4235
4236        let planner = Planner::with_context(
4237            Arc::clone(&store),
4238            Arc::clone(&tx_manager),
4239            Some(tx_id),
4240            epoch,
4241        );
4242
4243        assert_eq!(planner.tx_id(), Some(tx_id));
4244        assert!(planner.tx_manager().is_some());
4245        assert_eq!(planner.viewing_epoch(), epoch);
4246    }
4247
4248    #[test]
4249    fn test_planner_with_factorized_execution_disabled() {
4250        let store = create_test_store();
4251        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4252
4253        // Two consecutive expands - should NOT use factorized execution
4254        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4255            items: vec![
4256                ReturnItem {
4257                    expression: LogicalExpression::Variable("a".to_string()),
4258                    alias: None,
4259                },
4260                ReturnItem {
4261                    expression: LogicalExpression::Variable("c".to_string()),
4262                    alias: None,
4263                },
4264            ],
4265            distinct: false,
4266            input: Box::new(LogicalOperator::Expand(ExpandOp {
4267                from_variable: "b".to_string(),
4268                to_variable: "c".to_string(),
4269                edge_variable: None,
4270                direction: ExpandDirection::Outgoing,
4271                edge_type: None,
4272                min_hops: 1,
4273                max_hops: Some(1),
4274                input: Box::new(LogicalOperator::Expand(ExpandOp {
4275                    from_variable: "a".to_string(),
4276                    to_variable: "b".to_string(),
4277                    edge_variable: None,
4278                    direction: ExpandDirection::Outgoing,
4279                    edge_type: None,
4280                    min_hops: 1,
4281                    max_hops: Some(1),
4282                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4283                        variable: "a".to_string(),
4284                        label: None,
4285                        input: None,
4286                    })),
4287                    path_alias: None,
4288                })),
4289                path_alias: None,
4290            })),
4291        }));
4292
4293        let physical = planner.plan(&logical).unwrap();
4294        assert!(physical.columns().contains(&"a".to_string()));
4295        assert!(physical.columns().contains(&"c".to_string()));
4296    }
4297
4298    // ==================== Sort with Property Tests ====================
4299
4300    #[test]
4301    fn test_plan_sort_by_property() {
4302        let store = create_test_store();
4303        let planner = Planner::new(store);
4304
4305        // MATCH (n) RETURN n ORDER BY n.name ASC
4306        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4307            items: vec![ReturnItem {
4308                expression: LogicalExpression::Variable("n".to_string()),
4309                alias: None,
4310            }],
4311            distinct: false,
4312            input: Box::new(LogicalOperator::Sort(SortOp {
4313                keys: vec![SortKey {
4314                    expression: LogicalExpression::Property {
4315                        variable: "n".to_string(),
4316                        property: "name".to_string(),
4317                    },
4318                    order: SortOrder::Ascending,
4319                }],
4320                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4321                    variable: "n".to_string(),
4322                    label: None,
4323                    input: None,
4324                })),
4325            })),
4326        }));
4327
4328        let physical = planner.plan(&logical).unwrap();
4329        // Should have the property column projected
4330        assert!(physical.columns().contains(&"n".to_string()));
4331    }
4332
4333    // ==================== Scan with Input Tests ====================
4334
4335    #[test]
4336    fn test_plan_scan_with_input() {
4337        let store = create_test_store();
4338        let planner = Planner::new(store);
4339
4340        // A scan with another scan as input (for chained patterns)
4341        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4342            items: vec![
4343                ReturnItem {
4344                    expression: LogicalExpression::Variable("a".to_string()),
4345                    alias: None,
4346                },
4347                ReturnItem {
4348                    expression: LogicalExpression::Variable("b".to_string()),
4349                    alias: None,
4350                },
4351            ],
4352            distinct: false,
4353            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4354                variable: "b".to_string(),
4355                label: Some("Company".to_string()),
4356                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4357                    variable: "a".to_string(),
4358                    label: Some("Person".to_string()),
4359                    input: None,
4360                }))),
4361            })),
4362        }));
4363
4364        let physical = planner.plan(&logical).unwrap();
4365        assert!(physical.columns().contains(&"a".to_string()));
4366        assert!(physical.columns().contains(&"b".to_string()));
4367    }
4368}