Skip to main content

grafeo_engine/query/
planner.rs

//! Converts logical plans into physical execution trees.
//!
//! The optimizer produces a logical plan (what data you want); the planner
//! converts it to a physical plan (how to actually get it). This means choosing
//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CallProcedureOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
10    ExpandDirection, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
11    LogicalOperator, LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp,
12    ShortestPathOp, SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
37/// Range bounds for property-based range queries.
38struct RangeBounds<'a> {
39    min: Option<&'a Value>,
40    max: Option<&'a Value>,
41    min_inclusive: bool,
42    max_inclusive: bool,
43}
44
45/// Converts a logical plan to a physical operator tree.
46pub struct Planner {
47    /// The graph store to scan from.
48    store: Arc<LpgStore>,
49    /// Transaction manager for MVCC operations.
50    tx_manager: Option<Arc<TransactionManager>>,
51    /// Current transaction ID (if in a transaction).
52    tx_id: Option<TxId>,
53    /// Epoch to use for visibility checks.
54    viewing_epoch: EpochId,
55    /// Counter for generating unique anonymous edge column names.
56    anon_edge_counter: std::cell::Cell<u32>,
57    /// Whether to use factorized execution for multi-hop queries.
58    factorized_execution: bool,
59}
60
61impl Planner {
62    /// Creates a new planner with the given store.
63    ///
64    /// This creates a planner without transaction context, using the current
65    /// epoch from the store for visibility.
66    #[must_use]
67    pub fn new(store: Arc<LpgStore>) -> Self {
68        let epoch = store.current_epoch();
69        Self {
70            store,
71            tx_manager: None,
72            tx_id: None,
73            viewing_epoch: epoch,
74            anon_edge_counter: std::cell::Cell::new(0),
75            factorized_execution: true,
76        }
77    }
78
79    /// Creates a new planner with transaction context for MVCC-aware planning.
80    ///
81    /// # Arguments
82    ///
83    /// * `store` - The graph store
84    /// * `tx_manager` - Transaction manager for recording reads/writes
85    /// * `tx_id` - Current transaction ID (None for auto-commit)
86    /// * `viewing_epoch` - Epoch to use for version visibility
87    #[must_use]
88    pub fn with_context(
89        store: Arc<LpgStore>,
90        tx_manager: Arc<TransactionManager>,
91        tx_id: Option<TxId>,
92        viewing_epoch: EpochId,
93    ) -> Self {
94        Self {
95            store,
96            tx_manager: Some(tx_manager),
97            tx_id,
98            viewing_epoch,
99            anon_edge_counter: std::cell::Cell::new(0),
100            factorized_execution: true,
101        }
102    }
103
104    /// Returns the viewing epoch for this planner.
105    #[must_use]
106    pub fn viewing_epoch(&self) -> EpochId {
107        self.viewing_epoch
108    }
109
110    /// Returns the transaction ID for this planner, if any.
111    #[must_use]
112    pub fn tx_id(&self) -> Option<TxId> {
113        self.tx_id
114    }
115
116    /// Returns a reference to the transaction manager, if available.
117    #[must_use]
118    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
119        self.tx_manager.as_ref()
120    }
121
122    /// Enables or disables factorized execution for multi-hop queries.
123    #[must_use]
124    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
125        self.factorized_execution = enabled;
126        self
127    }
128
129    /// Counts consecutive single-hop expand operations.
130    ///
131    /// Returns the count and the deepest non-expand operator (the base of the chain).
132    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
133        match op {
134            LogicalOperator::Expand(expand) => {
135                // Only count single-hop expands (factorization doesn't apply to variable-length)
136                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
137
138                if is_single_hop {
139                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
140                    (inner_count + 1, base)
141                } else {
142                    // Variable-length path breaks the chain
143                    (0, op)
144                }
145            }
146            _ => (0, op),
147        }
148    }
149
150    /// Collects expand operations from the outermost down to the base.
151    ///
152    /// Returns expands in order from innermost (base) to outermost.
153    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
154        let mut chain = Vec::new();
155        let mut current = op;
156
157        while let LogicalOperator::Expand(expand) = current {
158            // Only include single-hop expands
159            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
160            if !is_single_hop {
161                break;
162            }
163            chain.push(expand);
164            current = &expand.input;
165        }
166
167        // Reverse so we go from base to outer
168        chain.reverse();
169        chain
170    }
171
172    /// Plans a logical plan into a physical operator.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if planning fails.
177    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
178        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
179        Ok(PhysicalPlan {
180            operator,
181            columns,
182            adaptive_context: None,
183        })
184    }
185
186    /// Plans a logical plan with adaptive execution support.
187    ///
188    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
189    /// joins) that can be monitored during execution to detect estimate errors.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if planning fails.
194    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
195        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
196
197        // Build adaptive context with cardinality estimates
198        let mut adaptive_context = AdaptiveContext::new();
199        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
200
201        Ok(PhysicalPlan {
202            operator,
203            columns,
204            adaptive_context: Some(adaptive_context),
205        })
206    }
207
208    /// Collects cardinality estimates from the logical plan into an adaptive context.
209    fn collect_cardinality_estimates(
210        &self,
211        op: &LogicalOperator,
212        ctx: &mut AdaptiveContext,
213        depth: usize,
214    ) {
215        match op {
216            LogicalOperator::NodeScan(scan) => {
217                // Estimate based on label statistics
218                let estimate = if let Some(label) = &scan.label {
219                    self.store.nodes_by_label(label).len() as f64
220                } else {
221                    self.store.node_count() as f64
222                };
223                let id = format!("scan_{}", scan.variable);
224                ctx.set_estimate(&id, estimate);
225
226                // Recurse into input if present
227                if let Some(input) = &scan.input {
228                    self.collect_cardinality_estimates(input, ctx, depth + 1);
229                }
230            }
231            LogicalOperator::Filter(filter) => {
232                // Default selectivity estimate for filters (30%)
233                let input_estimate = self.estimate_cardinality(&filter.input);
234                let estimate = input_estimate * 0.3;
235                let id = format!("filter_{depth}");
236                ctx.set_estimate(&id, estimate);
237
238                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
239            }
240            LogicalOperator::Expand(expand) => {
241                // Estimate based on average degree from store statistics
242                let input_estimate = self.estimate_cardinality(&expand.input);
243                let stats = self.store.statistics();
244                let avg_degree = self.estimate_expand_degree(&stats, expand);
245                let estimate = input_estimate * avg_degree;
246                let id = format!("expand_{}", expand.to_variable);
247                ctx.set_estimate(&id, estimate);
248
249                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
250            }
251            LogicalOperator::Join(join) => {
252                // Estimate join output (product with selectivity)
253                let left_est = self.estimate_cardinality(&join.left);
254                let right_est = self.estimate_cardinality(&join.right);
255                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
256                let id = format!("join_{depth}");
257                ctx.set_estimate(&id, estimate);
258
259                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
260                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
261            }
262            LogicalOperator::Aggregate(agg) => {
263                // Aggregates typically reduce cardinality
264                let input_estimate = self.estimate_cardinality(&agg.input);
265                let estimate = if agg.group_by.is_empty() {
266                    1.0 // Scalar aggregate
267                } else {
268                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
269                };
270                let id = format!("aggregate_{depth}");
271                ctx.set_estimate(&id, estimate);
272
273                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
274            }
275            LogicalOperator::Distinct(distinct) => {
276                let input_estimate = self.estimate_cardinality(&distinct.input);
277                let estimate = (input_estimate * 0.5).max(1.0);
278                let id = format!("distinct_{depth}");
279                ctx.set_estimate(&id, estimate);
280
281                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
282            }
283            LogicalOperator::Return(ret) => {
284                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
285            }
286            LogicalOperator::Limit(limit) => {
287                let input_estimate = self.estimate_cardinality(&limit.input);
288                let estimate = (input_estimate).min(limit.count as f64);
289                let id = format!("limit_{depth}");
290                ctx.set_estimate(&id, estimate);
291
292                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
293            }
294            LogicalOperator::Skip(skip) => {
295                let input_estimate = self.estimate_cardinality(&skip.input);
296                let estimate = (input_estimate - skip.count as f64).max(0.0);
297                let id = format!("skip_{depth}");
298                ctx.set_estimate(&id, estimate);
299
300                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
301            }
302            LogicalOperator::Sort(sort) => {
303                // Sort doesn't change cardinality
304                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
305            }
306            LogicalOperator::Union(union) => {
307                let estimate: f64 = union
308                    .inputs
309                    .iter()
310                    .map(|input| self.estimate_cardinality(input))
311                    .sum();
312                let id = format!("union_{depth}");
313                ctx.set_estimate(&id, estimate);
314
315                for input in &union.inputs {
316                    self.collect_cardinality_estimates(input, ctx, depth + 1);
317                }
318            }
319            _ => {
320                // For other operators, try to recurse into known input patterns
321            }
322        }
323    }
324
325    /// Estimates cardinality for a logical operator subtree.
326    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
327        match op {
328            LogicalOperator::NodeScan(scan) => {
329                if let Some(label) = &scan.label {
330                    self.store.nodes_by_label(label).len() as f64
331                } else {
332                    self.store.node_count() as f64
333                }
334            }
335            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
336            LogicalOperator::Expand(expand) => {
337                let stats = self.store.statistics();
338                let avg_degree = self.estimate_expand_degree(&stats, expand);
339                self.estimate_cardinality(&expand.input) * avg_degree
340            }
341            LogicalOperator::Join(join) => {
342                let left = self.estimate_cardinality(&join.left);
343                let right = self.estimate_cardinality(&join.right);
344                (left * right).sqrt()
345            }
346            LogicalOperator::Aggregate(agg) => {
347                if agg.group_by.is_empty() {
348                    1.0
349                } else {
350                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
351                }
352            }
353            LogicalOperator::Distinct(distinct) => {
354                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
355            }
356            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
357            LogicalOperator::Limit(limit) => self
358                .estimate_cardinality(&limit.input)
359                .min(limit.count as f64),
360            LogicalOperator::Skip(skip) => {
361                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
362            }
363            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
364            LogicalOperator::Union(union) => union
365                .inputs
366                .iter()
367                .map(|input| self.estimate_cardinality(input))
368                .sum(),
369            _ => 1000.0, // Default estimate for unknown operators
370        }
371    }
372
373    /// Estimates the average edge degree for an expand operation using store statistics.
374    fn estimate_expand_degree(
375        &self,
376        stats: &grafeo_core::statistics::Statistics,
377        expand: &ExpandOp,
378    ) -> f64 {
379        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
380        if let Some(edge_type) = &expand.edge_type {
381            stats.estimate_avg_degree(edge_type, outgoing)
382        } else if stats.total_nodes > 0 {
383            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
384        } else {
385            10.0 // fallback for empty graph
386        }
387    }
388
389    /// Plans a single logical operator.
390    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
391        match op {
392            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
393            LogicalOperator::Expand(expand) => {
394                // Check for expand chains when factorized execution is enabled
395                if self.factorized_execution {
396                    let (chain_len, _base) = Self::count_expand_chain(op);
397                    if chain_len >= 2 {
398                        // Use factorized chain for 2+ consecutive single-hop expands
399                        return self.plan_expand_chain(op);
400                    }
401                }
402                self.plan_expand(expand)
403            }
404            LogicalOperator::Return(ret) => self.plan_return(ret),
405            LogicalOperator::Filter(filter) => self.plan_filter(filter),
406            LogicalOperator::Project(project) => self.plan_project(project),
407            LogicalOperator::Limit(limit) => self.plan_limit(limit),
408            LogicalOperator::Skip(skip) => self.plan_skip(skip),
409            LogicalOperator::Sort(sort) => self.plan_sort(sort),
410            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
411            LogicalOperator::Join(join) => self.plan_join(join),
412            LogicalOperator::Union(union) => self.plan_union(union),
413            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
414            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
415            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
416            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
417            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
418            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
419            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
420            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
421            LogicalOperator::Merge(merge) => self.plan_merge(merge),
422            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
423            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
424            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
425            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
426            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
427            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
428            LogicalOperator::VectorScan(_) => Err(Error::Internal(
429                "VectorScan requires vector-index feature".to_string(),
430            )),
431            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
432                "VectorJoin requires vector-index feature".to_string(),
433            )),
434            _ => Err(Error::Internal(format!(
435                "Unsupported operator: {:?}",
436                std::mem::discriminant(op)
437            ))),
438        }
439    }
440
441    /// Plans a node scan operator.
442    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
443        let scan_op = if let Some(label) = &scan.label {
444            ScanOperator::with_label(Arc::clone(&self.store), label)
445        } else {
446            ScanOperator::new(Arc::clone(&self.store))
447        };
448
449        // Apply MVCC context if available
450        let scan_operator: Box<dyn Operator> =
451            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
452
453        // If there's an input, chain operators with a nested loop join (cross join)
454        if let Some(input) = &scan.input {
455            let (input_op, mut input_columns) = self.plan_operator(input)?;
456
457            // Build output schema: input columns + scan column
458            let mut output_schema: Vec<LogicalType> =
459                input_columns.iter().map(|_| LogicalType::Any).collect();
460            output_schema.push(LogicalType::Node);
461
462            // Add scan column to input columns
463            input_columns.push(scan.variable.clone());
464
465            // Use nested loop join to combine input rows with scanned nodes
466            let join_op = Box::new(NestedLoopJoinOperator::new(
467                input_op,
468                scan_operator,
469                None, // No join condition (cross join)
470                PhysicalJoinType::Cross,
471                output_schema,
472            ));
473
474            Ok((join_op, input_columns))
475        } else {
476            let columns = vec![scan.variable.clone()];
477            Ok((scan_operator, columns))
478        }
479    }
480
481    /// Plans an expand operator.
482    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
483        // Plan the input operator first
484        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
485
486        // Find the source column index
487        let source_column = input_columns
488            .iter()
489            .position(|c| c == &expand.from_variable)
490            .ok_or_else(|| {
491                Error::Internal(format!(
492                    "Source variable '{}' not found in input columns",
493                    expand.from_variable
494                ))
495            })?;
496
497        // Convert expand direction
498        let direction = match expand.direction {
499            ExpandDirection::Outgoing => Direction::Outgoing,
500            ExpandDirection::Incoming => Direction::Incoming,
501            ExpandDirection::Both => Direction::Both,
502        };
503
504        // Check if this is a variable-length path
505        let is_variable_length =
506            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
507
508        let operator: Box<dyn Operator> = if is_variable_length {
509            // Use VariableLengthExpandOperator for multi-hop paths
510            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
511            let mut expand_op = VariableLengthExpandOperator::new(
512                Arc::clone(&self.store),
513                input_op,
514                source_column,
515                direction,
516                expand.edge_type.clone(),
517                expand.min_hops,
518                max_hops,
519            )
520            .with_tx_context(self.viewing_epoch, self.tx_id);
521
522            // If a path alias is set, enable path length and detail output
523            if expand.path_alias.is_some() {
524                expand_op = expand_op
525                    .with_path_length_output()
526                    .with_path_detail_output();
527            }
528
529            Box::new(expand_op)
530        } else {
531            // Use simple ExpandOperator for single-hop paths
532            let expand_op = ExpandOperator::new(
533                Arc::clone(&self.store),
534                input_op,
535                source_column,
536                direction,
537                expand.edge_type.clone(),
538            )
539            .with_tx_context(self.viewing_epoch, self.tx_id);
540            Box::new(expand_op)
541        };
542
543        // Build output columns: [input_columns..., edge, target, (path_length)?]
544        // Preserve all input columns and add edge + target to match ExpandOperator output
545        let mut columns = input_columns;
546
547        // Generate edge column name - use provided name or generate anonymous name
548        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
549            let count = self.anon_edge_counter.get();
550            self.anon_edge_counter.set(count + 1);
551            format!("_anon_edge_{}", count)
552        });
553        columns.push(edge_col_name);
554
555        columns.push(expand.to_variable.clone());
556
557        // If a path alias is set, add columns for path length, nodes, and edges
558        if let Some(ref path_alias) = expand.path_alias {
559            columns.push(format!("_path_length_{}", path_alias));
560            columns.push(format!("_path_nodes_{}", path_alias));
561            columns.push(format!("_path_edges_{}", path_alias));
562        }
563
564        Ok((operator, columns))
565    }
566
567    /// Plans a chain of consecutive expand operations using factorized execution.
568    ///
569    /// This avoids the Cartesian product explosion that occurs with separate expands.
570    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
571    ///
572    /// The chain is executed lazily at query time, not during planning. This ensures
573    /// that any filters applied above the expand chain are properly respected.
574    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
575        let expands = Self::collect_expand_chain(op);
576        if expands.is_empty() {
577            return Err(Error::Internal("Empty expand chain".to_string()));
578        }
579
580        // Get the base operator (before first expand)
581        let first_expand = expands[0];
582        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
583
584        let mut columns = base_columns.clone();
585        let mut steps = Vec::new();
586
587        // Track the level-local source column for each expand
588        // For the first expand, it's the column in the input (base_columns)
589        // For subsequent expands, the target from the previous level is always at index 1
590        // (each level adds [edge, target], so target is at index 1)
591        let mut is_first = true;
592
593        for expand in &expands {
594            // Find source column for this expand
595            let source_column = if is_first {
596                // For first expand, find in base columns
597                base_columns
598                    .iter()
599                    .position(|c| c == &expand.from_variable)
600                    .ok_or_else(|| {
601                        Error::Internal(format!(
602                            "Source variable '{}' not found in base columns",
603                            expand.from_variable
604                        ))
605                    })?
606            } else {
607                // For subsequent expands, the target from the previous level is at index 1
608                // (each level adds [edge, target], so target is the second column)
609                1
610            };
611
612            // Convert direction
613            let direction = match expand.direction {
614                ExpandDirection::Outgoing => Direction::Outgoing,
615                ExpandDirection::Incoming => Direction::Incoming,
616                ExpandDirection::Both => Direction::Both,
617            };
618
619            // Add expand step configuration
620            steps.push(ExpandStep {
621                source_column,
622                direction,
623                edge_type: expand.edge_type.clone(),
624            });
625
626            // Add edge and target columns
627            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
628                let count = self.anon_edge_counter.get();
629                self.anon_edge_counter.set(count + 1);
630                format!("_anon_edge_{}", count)
631            });
632            columns.push(edge_col_name);
633            columns.push(expand.to_variable.clone());
634
635            is_first = false;
636        }
637
638        // Create lazy operator that executes at query time, not planning time
639        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
640
641        if let Some(tx_id) = self.tx_id {
642            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
643        } else {
644            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
645        }
646
647        Ok((Box::new(lazy_op), columns))
648    }
649
650    /// Plans a RETURN clause.
651    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
652        // Plan the input operator
653        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
654
655        // Build variable to column index mapping
656        let variable_columns: HashMap<String, usize> = input_columns
657            .iter()
658            .enumerate()
659            .map(|(i, name)| (name.clone(), i))
660            .collect();
661
662        // Extract column names from return items
663        let columns: Vec<String> = ret
664            .items
665            .iter()
666            .map(|item| {
667                item.alias.clone().unwrap_or_else(|| {
668                    // Generate a default name from the expression
669                    expression_to_string(&item.expression)
670                })
671            })
672            .collect();
673
674        // Check if we need a project operator (for property access or expression evaluation)
675        let needs_project = ret
676            .items
677            .iter()
678            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
679
680        if needs_project {
681            // Build project expressions
682            let mut projections = Vec::with_capacity(ret.items.len());
683            let mut output_types = Vec::with_capacity(ret.items.len());
684
685            for item in &ret.items {
686                match &item.expression {
687                    LogicalExpression::Variable(name) => {
688                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
689                            Error::Internal(format!("Variable '{}' not found in input", name))
690                        })?;
691                        projections.push(ProjectExpr::Column(col_idx));
692                        // Path detail variables carry List values — use Any/Generic
693                        if name.starts_with("_path_nodes_")
694                            || name.starts_with("_path_edges_")
695                            || name.starts_with("_path_length_")
696                        {
697                            output_types.push(LogicalType::Any);
698                        } else {
699                            output_types.push(LogicalType::Node);
700                        }
701                    }
702                    LogicalExpression::Property { variable, property } => {
703                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
704                            Error::Internal(format!("Variable '{}' not found in input", variable))
705                        })?;
706                        projections.push(ProjectExpr::PropertyAccess {
707                            column: col_idx,
708                            property: property.clone(),
709                        });
710                        // Property could be any type - use Any/Generic to preserve type
711                        output_types.push(LogicalType::Any);
712                    }
713                    LogicalExpression::Literal(value) => {
714                        projections.push(ProjectExpr::Constant(value.clone()));
715                        output_types.push(value_to_logical_type(value));
716                    }
717                    LogicalExpression::FunctionCall { name, args, .. } => {
718                        // Handle built-in functions
719                        match name.to_lowercase().as_str() {
720                            "type" => {
721                                // type(r) returns the edge type string
722                                if args.len() != 1 {
723                                    return Err(Error::Internal(
724                                        "type() requires exactly one argument".to_string(),
725                                    ));
726                                }
727                                if let LogicalExpression::Variable(var_name) = &args[0] {
728                                    let col_idx =
729                                        *variable_columns.get(var_name).ok_or_else(|| {
730                                            Error::Internal(format!(
731                                                "Variable '{}' not found in input",
732                                                var_name
733                                            ))
734                                        })?;
735                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
736                                    output_types.push(LogicalType::String);
737                                } else {
738                                    return Err(Error::Internal(
739                                        "type() argument must be a variable".to_string(),
740                                    ));
741                                }
742                            }
743                            "length" => {
744                                // length(p) returns the path length
745                                // For shortestPath results, the path column already contains the length
746                                if args.len() != 1 {
747                                    return Err(Error::Internal(
748                                        "length() requires exactly one argument".to_string(),
749                                    ));
750                                }
751                                if let LogicalExpression::Variable(var_name) = &args[0] {
752                                    let col_idx =
753                                        *variable_columns.get(var_name).ok_or_else(|| {
754                                            Error::Internal(format!(
755                                                "Variable '{}' not found in input",
756                                                var_name
757                                            ))
758                                        })?;
759                                    // Pass through the column value directly
760                                    projections.push(ProjectExpr::Column(col_idx));
761                                    output_types.push(LogicalType::Int64);
762                                } else {
763                                    return Err(Error::Internal(
764                                        "length() argument must be a variable".to_string(),
765                                    ));
766                                }
767                            }
768                            "nodes" | "edges" => {
769                                // nodes(p) / edges(p) returns the list of nodes/edges in the path
770                                if args.len() != 1 {
771                                    return Err(Error::Internal(format!(
772                                        "{}() requires exactly one argument",
773                                        name
774                                    )));
775                                }
776                                if let LogicalExpression::Variable(var_name) = &args[0] {
777                                    let col_idx =
778                                        *variable_columns.get(var_name).ok_or_else(|| {
779                                            Error::Internal(format!(
780                                                "Variable '{var_name}' not found in input",
781                                            ))
782                                        })?;
783                                    projections.push(ProjectExpr::Column(col_idx));
784                                    output_types.push(LogicalType::Any);
785                                } else {
786                                    return Err(Error::Internal(format!(
787                                        "{}() argument must be a variable",
788                                        name
789                                    )));
790                                }
791                            }
792                            // For other functions (head, tail, size, etc.), use expression evaluation
793                            _ => {
794                                let filter_expr = self.convert_expression(&item.expression)?;
795                                projections.push(ProjectExpr::Expression {
796                                    expr: filter_expr,
797                                    variable_columns: variable_columns.clone(),
798                                });
799                                output_types.push(LogicalType::Any);
800                            }
801                        }
802                    }
803                    LogicalExpression::Case { .. } => {
804                        // Convert CASE expression to FilterExpression for evaluation
805                        let filter_expr = self.convert_expression(&item.expression)?;
806                        projections.push(ProjectExpr::Expression {
807                            expr: filter_expr,
808                            variable_columns: variable_columns.clone(),
809                        });
810                        // CASE can return any type - use Any
811                        output_types.push(LogicalType::Any);
812                    }
813                    _ => {
814                        return Err(Error::Internal(format!(
815                            "Unsupported RETURN expression: {:?}",
816                            item.expression
817                        )));
818                    }
819                }
820            }
821
822            let operator = Box::new(ProjectOperator::with_store(
823                input_op,
824                projections,
825                output_types,
826                Arc::clone(&self.store),
827            ));
828
829            Ok((operator, columns))
830        } else {
831            // Simple case: just return variables
832            // Re-order columns to match return items if needed
833            let mut projections = Vec::with_capacity(ret.items.len());
834            let mut output_types = Vec::with_capacity(ret.items.len());
835
836            for item in &ret.items {
837                if let LogicalExpression::Variable(name) = &item.expression {
838                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
839                        Error::Internal(format!("Variable '{}' not found in input", name))
840                    })?;
841                    projections.push(ProjectExpr::Column(col_idx));
842                    output_types.push(LogicalType::Node);
843                }
844            }
845
846            // Only add ProjectOperator if reordering is needed
847            if projections.len() == input_columns.len()
848                && projections
849                    .iter()
850                    .enumerate()
851                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
852            {
853                // No reordering needed
854                Ok((input_op, columns))
855            } else {
856                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
857                Ok((operator, columns))
858            }
859        }
860    }
861
862    /// Plans a project operator (for WITH clause).
863    fn plan_project(
864        &self,
865        project: &crate::query::plan::ProjectOp,
866    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
867        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
868        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
869            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
870                // Create a single-row operator for projecting literals
871                let single_row_op: Box<dyn Operator> = Box::new(
872                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
873                );
874                (single_row_op, Vec::new())
875            } else {
876                self.plan_operator(&project.input)?
877            };
878
879        // Build variable to column index mapping
880        let variable_columns: HashMap<String, usize> = input_columns
881            .iter()
882            .enumerate()
883            .map(|(i, name)| (name.clone(), i))
884            .collect();
885
886        // Build projections and new column names
887        let mut projections = Vec::with_capacity(project.projections.len());
888        let mut output_types = Vec::with_capacity(project.projections.len());
889        let mut output_columns = Vec::with_capacity(project.projections.len());
890
891        for projection in &project.projections {
892            // Determine the output column name (alias or expression string)
893            let col_name = projection
894                .alias
895                .clone()
896                .unwrap_or_else(|| expression_to_string(&projection.expression));
897            output_columns.push(col_name);
898
899            match &projection.expression {
900                LogicalExpression::Variable(name) => {
901                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
902                        Error::Internal(format!("Variable '{}' not found in input", name))
903                    })?;
904                    projections.push(ProjectExpr::Column(col_idx));
905                    output_types.push(LogicalType::Node);
906                }
907                LogicalExpression::Property { variable, property } => {
908                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
909                        Error::Internal(format!("Variable '{}' not found in input", variable))
910                    })?;
911                    projections.push(ProjectExpr::PropertyAccess {
912                        column: col_idx,
913                        property: property.clone(),
914                    });
915                    output_types.push(LogicalType::Any);
916                }
917                LogicalExpression::Literal(value) => {
918                    projections.push(ProjectExpr::Constant(value.clone()));
919                    output_types.push(value_to_logical_type(value));
920                }
921                _ => {
922                    // For complex expressions, use full expression evaluation
923                    let filter_expr = self.convert_expression(&projection.expression)?;
924                    projections.push(ProjectExpr::Expression {
925                        expr: filter_expr,
926                        variable_columns: variable_columns.clone(),
927                    });
928                    output_types.push(LogicalType::Any);
929                }
930            }
931        }
932
933        let operator = Box::new(ProjectOperator::with_store(
934            input_op,
935            projections,
936            output_types,
937            Arc::clone(&self.store),
938        ));
939
940        Ok((operator, output_columns))
941    }
942
943    /// Plans a filter operator.
944    ///
945    /// Uses zone map pre-filtering to potentially skip scans when predicates
946    /// definitely won't match any data. Also uses property indexes when available
947    /// for O(1) lookups instead of full scans.
948    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
949        // Check zone maps for simple property predicates before scanning
950        // If zone map says "definitely no matches", we can short-circuit
951        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
952            // Zone map says no matches possible - return empty result
953            let (_, columns) = self.plan_operator(&filter.input)?;
954            let schema = self.derive_schema_from_columns(&columns);
955            let empty_op = Box::new(EmptyOperator::new(schema));
956            return Ok((empty_op, columns));
957        }
958
959        // Try to use property index for equality predicates on indexed properties
960        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
961            return Ok(result);
962        }
963
964        // Try to use range optimization for range predicates (>, <, >=, <=)
965        if let Some(result) = self.try_plan_filter_with_range_index(filter)? {
966            return Ok(result);
967        }
968
969        // Plan the input operator first
970        let (input_op, columns) = self.plan_operator(&filter.input)?;
971
972        // Build variable to column index mapping
973        let variable_columns: HashMap<String, usize> = columns
974            .iter()
975            .enumerate()
976            .map(|(i, name)| (name.clone(), i))
977            .collect();
978
979        // Convert logical expression to filter expression
980        let filter_expr = self.convert_expression(&filter.predicate)?;
981
982        // Create the predicate
983        let predicate =
984            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
985
986        // Create the filter operator
987        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
988
989        Ok((operator, columns))
990    }
991
992    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
993    ///
994    /// Returns:
995    /// - `Some(false)` if zone map proves no matches possible (can skip)
996    /// - `Some(true)` if zone map says matches might exist
997    /// - `None` if zone map check not applicable
998    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
999        use grafeo_core::graph::lpg::CompareOp;
1000
1001        match predicate {
1002            LogicalExpression::Binary { left, op, right } => {
1003                // Check for AND/OR first (compound conditions)
1004                match op {
1005                    BinaryOp::And => {
1006                        let left_result = self.check_zone_map_for_predicate(left);
1007                        let right_result = self.check_zone_map_for_predicate(right);
1008
1009                        return match (left_result, right_result) {
1010                            // If either side definitely won't match, the AND won't match
1011                            (Some(false), _) | (_, Some(false)) => Some(false),
1012                            // If both might match, might match overall
1013                            (Some(true), Some(true)) => Some(true),
1014                            // Otherwise, can't determine
1015                            _ => None,
1016                        };
1017                    }
1018                    BinaryOp::Or => {
1019                        let left_result = self.check_zone_map_for_predicate(left);
1020                        let right_result = self.check_zone_map_for_predicate(right);
1021
1022                        return match (left_result, right_result) {
1023                            // Both sides definitely won't match
1024                            (Some(false), Some(false)) => Some(false),
1025                            // At least one side might match
1026                            (Some(true), _) | (_, Some(true)) => Some(true),
1027                            // Otherwise, can't determine
1028                            _ => None,
1029                        };
1030                    }
1031                    _ => {}
1032                }
1033
1034                // Simple property comparison: n.property op value
1035                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
1036                    (
1037                        LogicalExpression::Property { property, .. },
1038                        LogicalExpression::Literal(val),
1039                    ) => {
1040                        let cmp = match op {
1041                            BinaryOp::Eq => CompareOp::Eq,
1042                            BinaryOp::Ne => CompareOp::Ne,
1043                            BinaryOp::Lt => CompareOp::Lt,
1044                            BinaryOp::Le => CompareOp::Le,
1045                            BinaryOp::Gt => CompareOp::Gt,
1046                            BinaryOp::Ge => CompareOp::Ge,
1047                            _ => return None,
1048                        };
1049                        (property.clone(), cmp, val.clone())
1050                    }
1051                    (
1052                        LogicalExpression::Literal(val),
1053                        LogicalExpression::Property { property, .. },
1054                    ) => {
1055                        // Flip comparison for reversed operands
1056                        let cmp = match op {
1057                            BinaryOp::Eq => CompareOp::Eq,
1058                            BinaryOp::Ne => CompareOp::Ne,
1059                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
1060                            BinaryOp::Le => CompareOp::Ge,
1061                            BinaryOp::Gt => CompareOp::Lt,
1062                            BinaryOp::Ge => CompareOp::Le,
1063                            _ => return None,
1064                        };
1065                        (property.clone(), cmp, val.clone())
1066                    }
1067                    _ => return None,
1068                };
1069
1070                // Check zone map for node properties
1071                let might_match =
1072                    self.store
1073                        .node_property_might_match(&property.into(), compare_op, &value);
1074
1075                Some(might_match)
1076            }
1077
1078            _ => None,
1079        }
1080    }
1081
1082    /// Tries to use a property index for filter optimization.
1083    ///
1084    /// When a filter predicate is an equality check on an indexed property,
1085    /// and the input is a simple NodeScan, we can use the index to look up
1086    /// matching nodes directly instead of scanning all nodes.
1087    ///
1088    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1089    /// `Ok(None)` if not applicable, or `Err` on error.
1090    fn try_plan_filter_with_property_index(
1091        &self,
1092        filter: &FilterOp,
1093    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1094        // Only optimize if input is a simple NodeScan (not nested)
1095        let (scan_variable, scan_label) = match filter.input.as_ref() {
1096            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1097                (scan.variable.clone(), scan.label.clone())
1098            }
1099            _ => return Ok(None),
1100        };
1101
1102        // Extract property equality conditions from the predicate
1103        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1104        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1105
1106        if conditions.is_empty() {
1107            return Ok(None);
1108        }
1109
1110        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1111        let has_indexed_condition = conditions
1112            .iter()
1113            .any(|(prop, _)| self.store.has_property_index(prop));
1114
1115        if !has_indexed_condition {
1116            return Ok(None);
1117        }
1118
1119        // Use the optimized batch lookup for multiple conditions
1120        let conditions_ref: Vec<(&str, Value)> = conditions
1121            .iter()
1122            .map(|(p, v)| (p.as_str(), v.clone()))
1123            .collect();
1124        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1125
1126        // If there's a label filter, also filter by label
1127        if let Some(label) = &scan_label {
1128            let label_nodes: std::collections::HashSet<_> =
1129                self.store.nodes_by_label(label).into_iter().collect();
1130            matching_nodes.retain(|n| label_nodes.contains(n));
1131        }
1132
1133        // Create a NodeListOperator with the matching nodes
1134        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1135        let columns = vec![scan_variable];
1136
1137        Ok(Some((node_list_op, columns)))
1138    }
1139
1140    /// Extracts equality conditions (property = literal) from a predicate.
1141    ///
1142    /// Handles both simple predicates and AND chains:
1143    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1144    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1145    fn extract_equality_conditions(
1146        &self,
1147        predicate: &LogicalExpression,
1148        target_variable: &str,
1149    ) -> Vec<(String, Value)> {
1150        let mut conditions = Vec::new();
1151        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1152        conditions
1153    }
1154
1155    /// Recursively collects equality conditions from AND expressions.
1156    fn collect_equality_conditions(
1157        &self,
1158        expr: &LogicalExpression,
1159        target_variable: &str,
1160        conditions: &mut Vec<(String, Value)>,
1161    ) {
1162        match expr {
1163            // Handle AND: recurse into both sides
1164            LogicalExpression::Binary {
1165                left,
1166                op: BinaryOp::And,
1167                right,
1168            } => {
1169                self.collect_equality_conditions(left, target_variable, conditions);
1170                self.collect_equality_conditions(right, target_variable, conditions);
1171            }
1172
1173            // Handle equality: extract property and value
1174            LogicalExpression::Binary {
1175                left,
1176                op: BinaryOp::Eq,
1177                right,
1178            } => {
1179                if let Some((var, prop, val)) = self.extract_property_equality(left, right)
1180                    && var == target_variable
1181                {
1182                    conditions.push((prop, val));
1183                }
1184            }
1185
1186            _ => {}
1187        }
1188    }
1189
1190    /// Extracts (variable, property, value) from a property equality expression.
1191    fn extract_property_equality(
1192        &self,
1193        left: &LogicalExpression,
1194        right: &LogicalExpression,
1195    ) -> Option<(String, String, Value)> {
1196        match (left, right) {
1197            (
1198                LogicalExpression::Property { variable, property },
1199                LogicalExpression::Literal(val),
1200            ) => Some((variable.clone(), property.clone(), val.clone())),
1201            (
1202                LogicalExpression::Literal(val),
1203                LogicalExpression::Property { variable, property },
1204            ) => Some((variable.clone(), property.clone(), val.clone())),
1205            _ => None,
1206        }
1207    }
1208
1209    /// Tries to optimize a filter using range queries on properties.
1210    ///
1211    /// This optimization is applied when:
1212    /// - The input is a simple NodeScan (no nested operations)
1213    /// - The predicate contains range comparisons (>, <, >=, <=)
1214    /// - The same variable and property are being filtered
1215    ///
1216    /// Handles both simple range predicates (`n.age > 30`) and BETWEEN patterns
1217    /// (`n.age >= 30 AND n.age <= 50`).
1218    ///
1219    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1220    /// `Ok(None)` if not applicable, or `Err` on error.
1221    fn try_plan_filter_with_range_index(
1222        &self,
1223        filter: &FilterOp,
1224    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1225        // Only optimize if input is a simple NodeScan (not nested)
1226        let (scan_variable, scan_label) = match filter.input.as_ref() {
1227            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1228                (scan.variable.clone(), scan.label.clone())
1229            }
1230            _ => return Ok(None),
1231        };
1232
1233        // Try to extract BETWEEN pattern first (more efficient)
1234        if let Some((variable, property, min, max, min_inc, max_inc)) =
1235            self.extract_between_predicate(&filter.predicate)
1236            && variable == scan_variable
1237        {
1238            return self.plan_range_filter(
1239                &scan_variable,
1240                &scan_label,
1241                &property,
1242                RangeBounds {
1243                    min: Some(&min),
1244                    max: Some(&max),
1245                    min_inclusive: min_inc,
1246                    max_inclusive: max_inc,
1247                },
1248            );
1249        }
1250
1251        // Try to extract simple range predicate
1252        if let Some((variable, property, op, value)) =
1253            self.extract_range_predicate(&filter.predicate)
1254            && variable == scan_variable
1255        {
1256            let (min, max, min_inc, max_inc) = match op {
1257                BinaryOp::Lt => (None, Some(value), false, false),
1258                BinaryOp::Le => (None, Some(value), false, true),
1259                BinaryOp::Gt => (Some(value), None, false, false),
1260                BinaryOp::Ge => (Some(value), None, true, false),
1261                _ => return Ok(None),
1262            };
1263            return self.plan_range_filter(
1264                &scan_variable,
1265                &scan_label,
1266                &property,
1267                RangeBounds {
1268                    min: min.as_ref(),
1269                    max: max.as_ref(),
1270                    min_inclusive: min_inc,
1271                    max_inclusive: max_inc,
1272                },
1273            );
1274        }
1275
1276        Ok(None)
1277    }
1278
1279    /// Plans a range filter using `find_nodes_in_range`.
1280    fn plan_range_filter(
1281        &self,
1282        scan_variable: &str,
1283        scan_label: &Option<String>,
1284        property: &str,
1285        bounds: RangeBounds<'_>,
1286    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1287        // Use the store's range query method
1288        let mut matching_nodes = self.store.find_nodes_in_range(
1289            property,
1290            bounds.min,
1291            bounds.max,
1292            bounds.min_inclusive,
1293            bounds.max_inclusive,
1294        );
1295
1296        // If there's a label filter, also filter by label
1297        if let Some(label) = scan_label {
1298            let label_nodes: std::collections::HashSet<_> =
1299                self.store.nodes_by_label(label).into_iter().collect();
1300            matching_nodes.retain(|n| label_nodes.contains(n));
1301        }
1302
1303        // Create a NodeListOperator with the matching nodes
1304        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1305        let columns = vec![scan_variable.to_string()];
1306
1307        Ok(Some((node_list_op, columns)))
1308    }
1309
1310    /// Extracts a simple range predicate (>, <, >=, <=) from an expression.
1311    ///
1312    /// Returns `(variable, property, operator, value)` if found.
1313    fn extract_range_predicate(
1314        &self,
1315        predicate: &LogicalExpression,
1316    ) -> Option<(String, String, BinaryOp, Value)> {
1317        match predicate {
1318            LogicalExpression::Binary { left, op, right } => {
1319                match op {
1320                    BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1321                        // Try property on left: n.age > 30
1322                        if let (
1323                            LogicalExpression::Property { variable, property },
1324                            LogicalExpression::Literal(val),
1325                        ) = (left.as_ref(), right.as_ref())
1326                        {
1327                            return Some((variable.clone(), property.clone(), *op, val.clone()));
1328                        }
1329
1330                        // Try property on right: 30 < n.age (flip operator)
1331                        if let (
1332                            LogicalExpression::Literal(val),
1333                            LogicalExpression::Property { variable, property },
1334                        ) = (left.as_ref(), right.as_ref())
1335                        {
1336                            let flipped_op = match op {
1337                                BinaryOp::Lt => BinaryOp::Gt,
1338                                BinaryOp::Le => BinaryOp::Ge,
1339                                BinaryOp::Gt => BinaryOp::Lt,
1340                                BinaryOp::Ge => BinaryOp::Le,
1341                                _ => return None,
1342                            };
1343                            return Some((
1344                                variable.clone(),
1345                                property.clone(),
1346                                flipped_op,
1347                                val.clone(),
1348                            ));
1349                        }
1350                    }
1351                    _ => {}
1352                }
1353            }
1354            _ => {}
1355        }
1356        None
1357    }
1358
1359    /// Extracts a BETWEEN pattern from compound predicates.
1360    ///
1361    /// Recognizes patterns like:
1362    /// - `n.age >= 30 AND n.age <= 50`
1363    /// - `n.age > 30 AND n.age < 50`
1364    ///
1365    /// Returns `(variable, property, min_value, max_value, min_inclusive, max_inclusive)`.
1366    fn extract_between_predicate(
1367        &self,
1368        predicate: &LogicalExpression,
1369    ) -> Option<(String, String, Value, Value, bool, bool)> {
1370        // Must be an AND expression
1371        let (left, right) = match predicate {
1372            LogicalExpression::Binary {
1373                left,
1374                op: BinaryOp::And,
1375                right,
1376            } => (left.as_ref(), right.as_ref()),
1377            _ => return None,
1378        };
1379
1380        // Extract range predicates from both sides
1381        let left_range = self.extract_range_predicate(left);
1382        let right_range = self.extract_range_predicate(right);
1383
1384        let (left_var, left_prop, left_op, left_val) = left_range?;
1385        let (right_var, right_prop, right_op, right_val) = right_range?;
1386
1387        // Must be same variable and property
1388        if left_var != right_var || left_prop != right_prop {
1389            return None;
1390        }
1391
1392        // Determine which is lower bound and which is upper bound
1393        let (min_val, max_val, min_inc, max_inc) = match (left_op, right_op) {
1394            // n.x >= min AND n.x <= max
1395            (BinaryOp::Ge, BinaryOp::Le) => (left_val, right_val, true, true),
1396            // n.x >= min AND n.x < max
1397            (BinaryOp::Ge, BinaryOp::Lt) => (left_val, right_val, true, false),
1398            // n.x > min AND n.x <= max
1399            (BinaryOp::Gt, BinaryOp::Le) => (left_val, right_val, false, true),
1400            // n.x > min AND n.x < max
1401            (BinaryOp::Gt, BinaryOp::Lt) => (left_val, right_val, false, false),
1402            // Reversed order: n.x <= max AND n.x >= min
1403            (BinaryOp::Le, BinaryOp::Ge) => (right_val, left_val, true, true),
1404            // n.x < max AND n.x >= min
1405            (BinaryOp::Lt, BinaryOp::Ge) => (right_val, left_val, true, false),
1406            // n.x <= max AND n.x > min
1407            (BinaryOp::Le, BinaryOp::Gt) => (right_val, left_val, false, true),
1408            // n.x < max AND n.x > min
1409            (BinaryOp::Lt, BinaryOp::Gt) => (right_val, left_val, false, false),
1410            _ => return None,
1411        };
1412
1413        Some((left_var, left_prop, min_val, max_val, min_inc, max_inc))
1414    }
1415
1416    /// Plans a LIMIT operator.
1417    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1418        let (input_op, columns) = self.plan_operator(&limit.input)?;
1419        let output_schema = self.derive_schema_from_columns(&columns);
1420        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1421        Ok((operator, columns))
1422    }
1423
1424    /// Plans a SKIP operator.
1425    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1426        let (input_op, columns) = self.plan_operator(&skip.input)?;
1427        let output_schema = self.derive_schema_from_columns(&columns);
1428        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1429        Ok((operator, columns))
1430    }
1431
1432    /// Plans a SORT (ORDER BY) operator.
1433    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1434        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1435
1436        // Build variable to column index mapping
1437        let mut variable_columns: HashMap<String, usize> = input_columns
1438            .iter()
1439            .enumerate()
1440            .map(|(i, name)| (name.clone(), i))
1441            .collect();
1442
1443        // Collect property expressions that need to be projected before sorting
1444        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1445        let mut next_col_idx = input_columns.len();
1446
1447        for key in &sort.keys {
1448            if let LogicalExpression::Property { variable, property } = &key.expression {
1449                let col_name = format!("{}_{}", variable, property);
1450                if !variable_columns.contains_key(&col_name) {
1451                    property_projections.push((
1452                        variable.clone(),
1453                        property.clone(),
1454                        col_name.clone(),
1455                    ));
1456                    variable_columns.insert(col_name, next_col_idx);
1457                    next_col_idx += 1;
1458                }
1459            }
1460        }
1461
1462        // Track output columns
1463        let mut output_columns = input_columns.clone();
1464
1465        // If we have property expressions, add a projection to materialize them
1466        if !property_projections.is_empty() {
1467            let mut projections = Vec::new();
1468            let mut output_types = Vec::new();
1469
1470            // First, pass through all existing columns (use Node type to preserve node IDs
1471            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1472            for (i, _) in input_columns.iter().enumerate() {
1473                projections.push(ProjectExpr::Column(i));
1474                output_types.push(LogicalType::Node);
1475            }
1476
1477            // Then add property access projections
1478            for (variable, property, col_name) in &property_projections {
1479                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1480                    Error::Internal(format!(
1481                        "Variable '{}' not found for ORDER BY property projection",
1482                        variable
1483                    ))
1484                })?;
1485                projections.push(ProjectExpr::PropertyAccess {
1486                    column: source_col,
1487                    property: property.clone(),
1488                });
1489                output_types.push(LogicalType::Any);
1490                output_columns.push(col_name.clone());
1491            }
1492
1493            input_op = Box::new(ProjectOperator::with_store(
1494                input_op,
1495                projections,
1496                output_types,
1497                Arc::clone(&self.store),
1498            ));
1499        }
1500
1501        // Convert logical sort keys to physical sort keys
1502        let physical_keys: Vec<PhysicalSortKey> = sort
1503            .keys
1504            .iter()
1505            .map(|key| {
1506                let col_idx = self
1507                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1508                Ok(PhysicalSortKey {
1509                    column: col_idx,
1510                    direction: match key.order {
1511                        SortOrder::Ascending => SortDirection::Ascending,
1512                        SortOrder::Descending => SortDirection::Descending,
1513                    },
1514                    null_order: NullOrder::NullsLast,
1515                })
1516            })
1517            .collect::<Result<Vec<_>>>()?;
1518
1519        let output_schema = self.derive_schema_from_columns(&output_columns);
1520        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1521        Ok((operator, output_columns))
1522    }
1523
1524    /// Resolves a sort expression to a column index, using projected property columns.
1525    fn resolve_sort_expression_with_properties(
1526        &self,
1527        expr: &LogicalExpression,
1528        variable_columns: &HashMap<String, usize>,
1529    ) -> Result<usize> {
1530        match expr {
1531            LogicalExpression::Variable(name) => {
1532                variable_columns.get(name).copied().ok_or_else(|| {
1533                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1534                })
1535            }
1536            LogicalExpression::Property { variable, property } => {
1537                // Look up the projected property column (e.g., "p_age" for p.age)
1538                let col_name = format!("{}_{}", variable, property);
1539                variable_columns.get(&col_name).copied().ok_or_else(|| {
1540                    Error::Internal(format!(
1541                        "Property column '{}' not found for ORDER BY (from {}.{})",
1542                        col_name, variable, property
1543                    ))
1544                })
1545            }
1546            _ => Err(Error::Internal(format!(
1547                "Unsupported ORDER BY expression: {:?}",
1548                expr
1549            ))),
1550        }
1551    }
1552
1553    /// Derives a schema from column names (uses Any type to handle all value types).
1554    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1555        columns.iter().map(|_| LogicalType::Any).collect()
1556    }
1557
1558    /// Plans an AGGREGATE operator.
1559    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1560        // Check if we can use factorized aggregation for speedup
1561        // Conditions:
1562        // 1. Factorized execution is enabled
1563        // 2. Input is an expand chain (multi-hop)
1564        // 3. No GROUP BY
1565        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1566        if self.factorized_execution
1567            && agg.group_by.is_empty()
1568            && Self::count_expand_chain(&agg.input).0 >= 2
1569            && self.is_simple_aggregate(agg)
1570            && let Ok((op, cols)) = self.plan_factorized_aggregate(agg)
1571        {
1572            return Ok((op, cols));
1573        }
1574        // Fall through to regular aggregate if factorized planning fails
1575
1576        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1577
1578        // Build variable to column index mapping
1579        let mut variable_columns: HashMap<String, usize> = input_columns
1580            .iter()
1581            .enumerate()
1582            .map(|(i, name)| (name.clone(), i))
1583            .collect();
1584
1585        // Collect all property expressions that need to be projected before aggregation
1586        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1587        let mut next_col_idx = input_columns.len();
1588
1589        // Check group-by expressions for properties
1590        for expr in &agg.group_by {
1591            if let LogicalExpression::Property { variable, property } = expr {
1592                let col_name = format!("{}_{}", variable, property);
1593                if !variable_columns.contains_key(&col_name) {
1594                    property_projections.push((
1595                        variable.clone(),
1596                        property.clone(),
1597                        col_name.clone(),
1598                    ));
1599                    variable_columns.insert(col_name, next_col_idx);
1600                    next_col_idx += 1;
1601                }
1602            }
1603        }
1604
1605        // Check aggregate expressions for properties
1606        for agg_expr in &agg.aggregates {
1607            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1608                let col_name = format!("{}_{}", variable, property);
1609                if !variable_columns.contains_key(&col_name) {
1610                    property_projections.push((
1611                        variable.clone(),
1612                        property.clone(),
1613                        col_name.clone(),
1614                    ));
1615                    variable_columns.insert(col_name, next_col_idx);
1616                    next_col_idx += 1;
1617                }
1618            }
1619        }
1620
1621        // If we have property expressions, add a projection to materialize them
1622        if !property_projections.is_empty() {
1623            let mut projections = Vec::new();
1624            let mut output_types = Vec::new();
1625
1626            // First, pass through all existing columns (use Node type to preserve node IDs
1627            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1628            for (i, _) in input_columns.iter().enumerate() {
1629                projections.push(ProjectExpr::Column(i));
1630                output_types.push(LogicalType::Node);
1631            }
1632
1633            // Then add property access projections
1634            for (variable, property, _col_name) in &property_projections {
1635                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1636                    Error::Internal(format!(
1637                        "Variable '{}' not found for property projection",
1638                        variable
1639                    ))
1640                })?;
1641                projections.push(ProjectExpr::PropertyAccess {
1642                    column: source_col,
1643                    property: property.clone(),
1644                });
1645                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1646            }
1647
1648            input_op = Box::new(ProjectOperator::with_store(
1649                input_op,
1650                projections,
1651                output_types,
1652                Arc::clone(&self.store),
1653            ));
1654        }
1655
1656        // Convert group-by expressions to column indices
1657        let group_columns: Vec<usize> = agg
1658            .group_by
1659            .iter()
1660            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1661            .collect::<Result<Vec<_>>>()?;
1662
1663        // Convert aggregate expressions to physical form
1664        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1665            .aggregates
1666            .iter()
1667            .map(|agg_expr| {
1668                let column = agg_expr
1669                    .expression
1670                    .as_ref()
1671                    .map(|e| {
1672                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1673                    })
1674                    .transpose()?;
1675
1676                Ok(PhysicalAggregateExpr {
1677                    function: convert_aggregate_function(agg_expr.function),
1678                    column,
1679                    distinct: agg_expr.distinct,
1680                    alias: agg_expr.alias.clone(),
1681                    percentile: agg_expr.percentile,
1682                })
1683            })
1684            .collect::<Result<Vec<_>>>()?;
1685
1686        // Build output schema and column names
1687        let mut output_schema = Vec::new();
1688        let mut output_columns = Vec::new();
1689
1690        // Add group-by columns
1691        for expr in &agg.group_by {
1692            output_schema.push(LogicalType::Any); // Group-by values can be any type
1693            output_columns.push(expression_to_string(expr));
1694        }
1695
1696        // Add aggregate result columns
1697        for agg_expr in &agg.aggregates {
1698            let result_type = match agg_expr.function {
1699                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1700                    LogicalType::Int64
1701                }
1702                LogicalAggregateFunction::Sum => LogicalType::Int64,
1703                LogicalAggregateFunction::Avg => LogicalType::Float64,
1704                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1705                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1706                    // since the aggregate can return any Value type, but the most common case
1707                    // is numeric values from property expressions
1708                    LogicalType::Int64
1709                }
1710                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1711                // Statistical functions return Float64
1712                LogicalAggregateFunction::StdDev
1713                | LogicalAggregateFunction::StdDevPop
1714                | LogicalAggregateFunction::PercentileDisc
1715                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1716            };
1717            output_schema.push(result_type);
1718            output_columns.push(
1719                agg_expr
1720                    .alias
1721                    .clone()
1722                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1723            );
1724        }
1725
1726        // Choose operator based on whether there are group-by columns
1727        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1728            Box::new(SimpleAggregateOperator::new(
1729                input_op,
1730                physical_aggregates,
1731                output_schema,
1732            ))
1733        } else {
1734            Box::new(HashAggregateOperator::new(
1735                input_op,
1736                group_columns,
1737                physical_aggregates,
1738                output_schema,
1739            ))
1740        };
1741
1742        // Apply HAVING clause filter if present
1743        if let Some(having_expr) = &agg.having {
1744            // Build variable to column mapping for the aggregate output
1745            let having_var_columns: HashMap<String, usize> = output_columns
1746                .iter()
1747                .enumerate()
1748                .map(|(i, name)| (name.clone(), i))
1749                .collect();
1750
1751            let filter_expr = self.convert_expression(having_expr)?;
1752            let predicate =
1753                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1754            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1755        }
1756
1757        Ok((operator, output_columns))
1758    }
1759
1760    /// Checks if an aggregate is simple enough for factorized execution.
1761    ///
1762    /// Simple aggregates:
1763    /// - COUNT(*) or COUNT(variable)
1764    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1765    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1766        agg.aggregates.iter().all(|agg_expr| {
1767            match agg_expr.function {
1768                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1769                    // COUNT(*) is always OK, COUNT(var) is OK
1770                    agg_expr.expression.is_none()
1771                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1772                }
1773                LogicalAggregateFunction::Sum
1774                | LogicalAggregateFunction::Avg
1775                | LogicalAggregateFunction::Min
1776                | LogicalAggregateFunction::Max => {
1777                    // For now, only support when expression is a variable
1778                    // (property access would require flattening first)
1779                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1780                }
1781                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1782                _ => false,
1783            }
1784        })
1785    }
1786
1787    /// Plans a factorized aggregate that operates directly on factorized data.
1788    ///
1789    /// This avoids the O(n²) cost of flattening before aggregation.
1790    fn plan_factorized_aggregate(
1791        &self,
1792        agg: &AggregateOp,
1793    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1794        // Build the expand chain - this returns a LazyFactorizedChainOperator
1795        let expands = Self::collect_expand_chain(&agg.input);
1796        if expands.is_empty() {
1797            return Err(Error::Internal(
1798                "Expected expand chain for factorized aggregate".to_string(),
1799            ));
1800        }
1801
1802        // Get the base operator (before first expand)
1803        let first_expand = expands[0];
1804        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1805
1806        let mut columns = base_columns.clone();
1807        let mut steps = Vec::new();
1808        let mut is_first = true;
1809
1810        for expand in &expands {
1811            // Find source column for this expand
1812            let source_column = if is_first {
1813                base_columns
1814                    .iter()
1815                    .position(|c| c == &expand.from_variable)
1816                    .ok_or_else(|| {
1817                        Error::Internal(format!(
1818                            "Source variable '{}' not found in base columns",
1819                            expand.from_variable
1820                        ))
1821                    })?
1822            } else {
1823                1 // Target from previous level
1824            };
1825
1826            let direction = match expand.direction {
1827                ExpandDirection::Outgoing => Direction::Outgoing,
1828                ExpandDirection::Incoming => Direction::Incoming,
1829                ExpandDirection::Both => Direction::Both,
1830            };
1831
1832            steps.push(ExpandStep {
1833                source_column,
1834                direction,
1835                edge_type: expand.edge_type.clone(),
1836            });
1837
1838            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1839                let count = self.anon_edge_counter.get();
1840                self.anon_edge_counter.set(count + 1);
1841                format!("_anon_edge_{}", count)
1842            });
1843            columns.push(edge_col_name);
1844            columns.push(expand.to_variable.clone());
1845
1846            is_first = false;
1847        }
1848
1849        // Create the lazy factorized chain operator
1850        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1851
1852        if let Some(tx_id) = self.tx_id {
1853            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1854        } else {
1855            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1856        }
1857
1858        // Convert logical aggregates to factorized aggregates
1859        let factorized_aggs: Vec<FactorizedAggregate> = agg
1860            .aggregates
1861            .iter()
1862            .map(|agg_expr| {
1863                match agg_expr.function {
1864                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1865                        // COUNT(*) uses simple count, COUNT(col) uses column count
1866                        if agg_expr.expression.is_none() {
1867                            FactorizedAggregate::count()
1868                        } else {
1869                            // For COUNT(variable), we use the deepest level's target column
1870                            // which is the last column added to the schema
1871                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1872                        }
1873                    }
1874                    LogicalAggregateFunction::Sum => {
1875                        // SUM on deepest level target
1876                        FactorizedAggregate::sum(1)
1877                    }
1878                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1879                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1880                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1881                    _ => {
1882                        // Shouldn't reach here due to is_simple_aggregate check
1883                        FactorizedAggregate::count()
1884                    }
1885                }
1886            })
1887            .collect();
1888
1889        // Build output column names
1890        let output_columns: Vec<String> = agg
1891            .aggregates
1892            .iter()
1893            .map(|agg_expr| {
1894                agg_expr
1895                    .alias
1896                    .clone()
1897                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1898            })
1899            .collect();
1900
1901        // Create the factorized aggregate operator
1902        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1903
1904        Ok((Box::new(factorized_agg_op), output_columns))
1905    }
1906
1907    /// Resolves a logical expression to a column index.
1908    #[allow(dead_code)]
1909    fn resolve_expression_to_column(
1910        &self,
1911        expr: &LogicalExpression,
1912        variable_columns: &HashMap<String, usize>,
1913    ) -> Result<usize> {
1914        match expr {
1915            LogicalExpression::Variable(name) => variable_columns
1916                .get(name)
1917                .copied()
1918                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1919            LogicalExpression::Property { variable, .. } => variable_columns
1920                .get(variable)
1921                .copied()
1922                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1923            _ => Err(Error::Internal(format!(
1924                "Cannot resolve expression to column: {:?}",
1925                expr
1926            ))),
1927        }
1928    }
1929
1930    /// Resolves a logical expression to a column index, using projected property columns.
1931    ///
1932    /// This is used for aggregations where properties have been projected into their own columns.
1933    fn resolve_expression_to_column_with_properties(
1934        &self,
1935        expr: &LogicalExpression,
1936        variable_columns: &HashMap<String, usize>,
1937    ) -> Result<usize> {
1938        match expr {
1939            LogicalExpression::Variable(name) => variable_columns
1940                .get(name)
1941                .copied()
1942                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1943            LogicalExpression::Property { variable, property } => {
1944                // Look up the projected property column (e.g., "p_price" for p.price)
1945                let col_name = format!("{}_{}", variable, property);
1946                variable_columns.get(&col_name).copied().ok_or_else(|| {
1947                    Error::Internal(format!(
1948                        "Property column '{}' not found (from {}.{})",
1949                        col_name, variable, property
1950                    ))
1951                })
1952            }
1953            _ => Err(Error::Internal(format!(
1954                "Cannot resolve expression to column: {:?}",
1955                expr
1956            ))),
1957        }
1958    }
1959
1960    /// Converts a logical expression to a filter expression.
1961    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1962        match expr {
1963            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1964            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1965            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1966                variable: variable.clone(),
1967                property: property.clone(),
1968            }),
1969            LogicalExpression::Binary { left, op, right } => {
1970                let left_expr = self.convert_expression(left)?;
1971                let right_expr = self.convert_expression(right)?;
1972                let filter_op = convert_binary_op(*op)?;
1973                Ok(FilterExpression::Binary {
1974                    left: Box::new(left_expr),
1975                    op: filter_op,
1976                    right: Box::new(right_expr),
1977                })
1978            }
1979            LogicalExpression::Unary { op, operand } => {
1980                let operand_expr = self.convert_expression(operand)?;
1981                let filter_op = convert_unary_op(*op)?;
1982                Ok(FilterExpression::Unary {
1983                    op: filter_op,
1984                    operand: Box::new(operand_expr),
1985                })
1986            }
1987            LogicalExpression::FunctionCall { name, args, .. } => {
1988                let filter_args: Vec<FilterExpression> = args
1989                    .iter()
1990                    .map(|a| self.convert_expression(a))
1991                    .collect::<Result<Vec<_>>>()?;
1992                Ok(FilterExpression::FunctionCall {
1993                    name: name.clone(),
1994                    args: filter_args,
1995                })
1996            }
1997            LogicalExpression::Case {
1998                operand,
1999                when_clauses,
2000                else_clause,
2001            } => {
2002                let filter_operand = operand
2003                    .as_ref()
2004                    .map(|e| self.convert_expression(e))
2005                    .transpose()?
2006                    .map(Box::new);
2007                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2008                    .iter()
2009                    .map(|(cond, result)| {
2010                        Ok((
2011                            self.convert_expression(cond)?,
2012                            self.convert_expression(result)?,
2013                        ))
2014                    })
2015                    .collect::<Result<Vec<_>>>()?;
2016                let filter_else = else_clause
2017                    .as_ref()
2018                    .map(|e| self.convert_expression(e))
2019                    .transpose()?
2020                    .map(Box::new);
2021                Ok(FilterExpression::Case {
2022                    operand: filter_operand,
2023                    when_clauses: filter_when_clauses,
2024                    else_clause: filter_else,
2025                })
2026            }
2027            LogicalExpression::List(items) => {
2028                let filter_items: Vec<FilterExpression> = items
2029                    .iter()
2030                    .map(|item| self.convert_expression(item))
2031                    .collect::<Result<Vec<_>>>()?;
2032                Ok(FilterExpression::List(filter_items))
2033            }
2034            LogicalExpression::Map(pairs) => {
2035                let filter_pairs: Vec<(String, FilterExpression)> = pairs
2036                    .iter()
2037                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
2038                    .collect::<Result<Vec<_>>>()?;
2039                Ok(FilterExpression::Map(filter_pairs))
2040            }
2041            LogicalExpression::IndexAccess { base, index } => {
2042                let base_expr = self.convert_expression(base)?;
2043                let index_expr = self.convert_expression(index)?;
2044                Ok(FilterExpression::IndexAccess {
2045                    base: Box::new(base_expr),
2046                    index: Box::new(index_expr),
2047                })
2048            }
2049            LogicalExpression::SliceAccess { base, start, end } => {
2050                let base_expr = self.convert_expression(base)?;
2051                let start_expr = start
2052                    .as_ref()
2053                    .map(|s| self.convert_expression(s))
2054                    .transpose()?
2055                    .map(Box::new);
2056                let end_expr = end
2057                    .as_ref()
2058                    .map(|e| self.convert_expression(e))
2059                    .transpose()?
2060                    .map(Box::new);
2061                Ok(FilterExpression::SliceAccess {
2062                    base: Box::new(base_expr),
2063                    start: start_expr,
2064                    end: end_expr,
2065                })
2066            }
2067            LogicalExpression::Parameter(_) => Err(Error::Internal(
2068                "Parameters not yet supported in filters".to_string(),
2069            )),
2070            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2071            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2072            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2073            LogicalExpression::ListComprehension {
2074                variable,
2075                list_expr,
2076                filter_expr,
2077                map_expr,
2078            } => {
2079                let list = self.convert_expression(list_expr)?;
2080                let filter = filter_expr
2081                    .as_ref()
2082                    .map(|f| self.convert_expression(f))
2083                    .transpose()?
2084                    .map(Box::new);
2085                let map = self.convert_expression(map_expr)?;
2086                Ok(FilterExpression::ListComprehension {
2087                    variable: variable.clone(),
2088                    list_expr: Box::new(list),
2089                    filter_expr: filter,
2090                    map_expr: Box::new(map),
2091                })
2092            }
2093            LogicalExpression::ExistsSubquery(subplan) => {
2094                // Extract the pattern from the subplan
2095                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
2096                let (start_var, direction, edge_type, end_labels) =
2097                    self.extract_exists_pattern(subplan)?;
2098
2099                Ok(FilterExpression::ExistsSubquery {
2100                    start_var,
2101                    direction,
2102                    edge_type,
2103                    end_labels,
2104                    min_hops: None,
2105                    max_hops: None,
2106                })
2107            }
2108            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
2109                "COUNT subqueries not yet supported".to_string(),
2110            )),
2111        }
2112    }
2113
2114    /// Extracts the pattern from an EXISTS subplan.
2115    /// Returns (start_variable, direction, edge_type, end_labels).
2116    fn extract_exists_pattern(
2117        &self,
2118        subplan: &LogicalOperator,
2119    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
2120        match subplan {
2121            LogicalOperator::Expand(expand) => {
2122                // Get end node labels from the to_variable if there's a node scan input
2123                let end_labels = self.extract_end_labels_from_expand(expand);
2124                let direction = match expand.direction {
2125                    ExpandDirection::Outgoing => Direction::Outgoing,
2126                    ExpandDirection::Incoming => Direction::Incoming,
2127                    ExpandDirection::Both => Direction::Both,
2128                };
2129                Ok((
2130                    expand.from_variable.clone(),
2131                    direction,
2132                    expand.edge_type.clone(),
2133                    end_labels,
2134                ))
2135            }
2136            LogicalOperator::NodeScan(scan) => {
2137                if let Some(input) = &scan.input {
2138                    self.extract_exists_pattern(input)
2139                } else {
2140                    Err(Error::Internal(
2141                        "EXISTS subquery must contain an edge pattern".to_string(),
2142                    ))
2143                }
2144            }
2145            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
2146            _ => Err(Error::Internal(
2147                "Unsupported EXISTS subquery pattern".to_string(),
2148            )),
2149        }
2150    }
2151
2152    /// Extracts end node labels from an Expand operator if present.
2153    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
2154        // Check if the expand has a NodeScan input with a label filter
2155        match expand.input.as_ref() {
2156            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
2157            _ => None,
2158        }
2159    }
2160
2161    /// Plans a JOIN operator.
2162    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2163        let (left_op, left_columns) = self.plan_operator(&join.left)?;
2164        let (right_op, right_columns) = self.plan_operator(&join.right)?;
2165
2166        // Build combined output columns
2167        let mut columns = left_columns.clone();
2168        columns.extend(right_columns.clone());
2169
2170        // Convert join type
2171        let physical_join_type = match join.join_type {
2172            JoinType::Inner => PhysicalJoinType::Inner,
2173            JoinType::Left => PhysicalJoinType::Left,
2174            JoinType::Right => PhysicalJoinType::Right,
2175            JoinType::Full => PhysicalJoinType::Full,
2176            JoinType::Cross => PhysicalJoinType::Cross,
2177            JoinType::Semi => PhysicalJoinType::Semi,
2178            JoinType::Anti => PhysicalJoinType::Anti,
2179        };
2180
2181        // Build key columns from join conditions
2182        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
2183            // Cross join - no keys
2184            (vec![], vec![])
2185        } else {
2186            join.conditions
2187                .iter()
2188                .filter_map(|cond| {
2189                    // Try to extract column indices from expressions
2190                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
2191                    let right_idx = self
2192                        .expression_to_column(&cond.right, &right_columns)
2193                        .ok()?;
2194                    Some((left_idx, right_idx))
2195                })
2196                .unzip()
2197        };
2198
2199        let output_schema = self.derive_schema_from_columns(&columns);
2200
2201        // Check if we should use leapfrog join for cyclic patterns
2202        // Currently we use hash join by default; leapfrog is available but
2203        // requires explicit multi-way join detection which will be added
2204        // when we have proper cyclic pattern detection in the optimizer.
2205        // For now, LeapfrogJoinOperator is available for direct use.
2206        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
2207
2208        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2209            left_op,
2210            right_op,
2211            probe_keys,
2212            build_keys,
2213            physical_join_type,
2214            output_schema,
2215        ));
2216
2217        Ok((operator, columns))
2218    }
2219
2220    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
2221    ///
2222    /// A cyclic pattern occurs when join conditions reference variables
2223    /// that create a cycle in the join graph. For example, a triangle
2224    /// pattern (a)->(b)->(c)->(a) creates a cycle.
2225    ///
2226    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
2227    /// indicating potential for worst-case optimal join (WCOJ) optimization.
2228    #[allow(dead_code)]
2229    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
2230        // Build adjacency list for join variables
2231        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
2232        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
2233
2234        // Collect edges from join conditions
2235        Self::collect_join_edges(
2236            &LogicalOperator::Join(join.clone()),
2237            &mut edges,
2238            &mut all_vars,
2239        );
2240
2241        // Need at least 3 variables to form a cycle
2242        if all_vars.len() < 3 {
2243            return false;
2244        }
2245
2246        // Detect cycle using DFS with coloring
2247        Self::has_cycle(&edges, &all_vars)
2248    }
2249
2250    /// Collects edges from join conditions into an adjacency list.
2251    fn collect_join_edges(
2252        op: &LogicalOperator,
2253        edges: &mut HashMap<String, Vec<String>>,
2254        vars: &mut std::collections::HashSet<String>,
2255    ) {
2256        match op {
2257            LogicalOperator::Join(join) => {
2258                // Process join conditions
2259                for cond in &join.conditions {
2260                    if let (Some(left_var), Some(right_var)) = (
2261                        Self::extract_join_variable(&cond.left),
2262                        Self::extract_join_variable(&cond.right),
2263                    ) && left_var != right_var
2264                    {
2265                        vars.insert(left_var.clone());
2266                        vars.insert(right_var.clone());
2267
2268                        // Add bidirectional edge
2269                        edges
2270                            .entry(left_var.clone())
2271                            .or_default()
2272                            .push(right_var.clone());
2273                        edges.entry(right_var).or_default().push(left_var);
2274                    }
2275                }
2276
2277                // Recurse into children
2278                Self::collect_join_edges(&join.left, edges, vars);
2279                Self::collect_join_edges(&join.right, edges, vars);
2280            }
2281            LogicalOperator::Expand(expand) => {
2282                // Expand creates implicit join between from_variable and to_variable
2283                vars.insert(expand.from_variable.clone());
2284                vars.insert(expand.to_variable.clone());
2285
2286                edges
2287                    .entry(expand.from_variable.clone())
2288                    .or_default()
2289                    .push(expand.to_variable.clone());
2290                edges
2291                    .entry(expand.to_variable.clone())
2292                    .or_default()
2293                    .push(expand.from_variable.clone());
2294
2295                Self::collect_join_edges(&expand.input, edges, vars);
2296            }
2297            LogicalOperator::Filter(filter) => {
2298                Self::collect_join_edges(&filter.input, edges, vars);
2299            }
2300            LogicalOperator::NodeScan(scan) => {
2301                vars.insert(scan.variable.clone());
2302            }
2303            _ => {}
2304        }
2305    }
2306
2307    /// Extracts the variable name from a join expression.
2308    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2309        match expr {
2310            LogicalExpression::Variable(v) => Some(v.clone()),
2311            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2312            LogicalExpression::Id(v) => Some(v.clone()),
2313            _ => None,
2314        }
2315    }
2316
2317    /// Detects if the graph has a cycle using DFS coloring.
2318    ///
2319    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2320    fn has_cycle(
2321        edges: &HashMap<String, Vec<String>>,
2322        vars: &std::collections::HashSet<String>,
2323    ) -> bool {
2324        let mut color: HashMap<&String, u8> = HashMap::new();
2325
2326        for var in vars {
2327            color.insert(var, 0);
2328        }
2329
2330        for start in vars {
2331            if color[start] == 0 && Self::dfs_cycle(start, None, edges, &mut color) {
2332                return true;
2333            }
2334        }
2335
2336        false
2337    }
2338
2339    /// DFS helper for cycle detection.
2340    fn dfs_cycle(
2341        node: &String,
2342        parent: Option<&String>,
2343        edges: &HashMap<String, Vec<String>>,
2344        color: &mut HashMap<&String, u8>,
2345    ) -> bool {
2346        *color.get_mut(node).unwrap() = 1; // Gray
2347
2348        if let Some(neighbors) = edges.get(node) {
2349            for neighbor in neighbors {
2350                // Skip the edge back to parent (undirected graph)
2351                if parent == Some(neighbor) {
2352                    continue;
2353                }
2354
2355                if let Some(&c) = color.get(neighbor) {
2356                    if c == 1 {
2357                        // Found a back edge - cycle detected
2358                        return true;
2359                    }
2360                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2361                        return true;
2362                    }
2363                }
2364            }
2365        }
2366
2367        *color.get_mut(node).unwrap() = 2; // Black
2368        false
2369    }
2370
2371    /// Counts the number of base relations in a logical operator tree.
2372    #[allow(dead_code)]
2373    fn count_relations(op: &LogicalOperator) -> usize {
2374        match op {
2375            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2376            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2377            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2378            LogicalOperator::Join(j) => {
2379                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2380            }
2381            _ => 0,
2382        }
2383    }
2384
2385    /// Extracts a column index from an expression.
2386    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2387        match expr {
2388            LogicalExpression::Variable(name) => columns
2389                .iter()
2390                .position(|c| c == name)
2391                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2392            _ => Err(Error::Internal(
2393                "Only variables supported in join conditions".to_string(),
2394            )),
2395        }
2396    }
2397
2398    /// Plans a UNION operator.
2399    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2400        if union.inputs.is_empty() {
2401            return Err(Error::Internal(
2402                "Union requires at least one input".to_string(),
2403            ));
2404        }
2405
2406        let mut inputs = Vec::with_capacity(union.inputs.len());
2407        let mut columns = Vec::new();
2408
2409        for (i, input) in union.inputs.iter().enumerate() {
2410            let (op, cols) = self.plan_operator(input)?;
2411            if i == 0 {
2412                columns = cols;
2413            }
2414            inputs.push(op);
2415        }
2416
2417        let output_schema = self.derive_schema_from_columns(&columns);
2418        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2419
2420        Ok((operator, columns))
2421    }
2422
2423    /// Plans a DISTINCT operator.
2424    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2425        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2426        let output_schema = self.derive_schema_from_columns(&columns);
2427        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2428        Ok((operator, columns))
2429    }
2430
2431    /// Plans a CREATE NODE operator.
2432    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2433        // Plan input if present
2434        let (input_op, mut columns) = if let Some(ref input) = create.input {
2435            let (op, cols) = self.plan_operator(input)?;
2436            (Some(op), cols)
2437        } else {
2438            (None, vec![])
2439        };
2440
2441        // Output column for the created node
2442        let output_column = columns.len();
2443        columns.push(create.variable.clone());
2444
2445        // Convert properties (with constant-folding for lists and function calls)
2446        let properties: Vec<(String, PropertySource)> = create
2447            .properties
2448            .iter()
2449            .map(|(name, expr)| {
2450                let source = match Self::try_fold_expression(expr) {
2451                    Some(value) => PropertySource::Constant(value),
2452                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2453                };
2454                (name.clone(), source)
2455            })
2456            .collect();
2457
2458        let output_schema = self.derive_schema_from_columns(&columns);
2459
2460        let operator = Box::new(
2461            CreateNodeOperator::new(
2462                Arc::clone(&self.store),
2463                input_op,
2464                create.labels.clone(),
2465                properties,
2466                output_schema,
2467                output_column,
2468            )
2469            .with_tx_context(self.viewing_epoch, self.tx_id),
2470        );
2471
2472        Ok((operator, columns))
2473    }
2474
2475    /// Plans a CREATE EDGE operator.
2476    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2477        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2478
2479        // Find source and target columns
2480        let from_column = columns
2481            .iter()
2482            .position(|c| c == &create.from_variable)
2483            .ok_or_else(|| {
2484                Error::Internal(format!(
2485                    "Source variable '{}' not found",
2486                    create.from_variable
2487                ))
2488            })?;
2489
2490        let to_column = columns
2491            .iter()
2492            .position(|c| c == &create.to_variable)
2493            .ok_or_else(|| {
2494                Error::Internal(format!(
2495                    "Target variable '{}' not found",
2496                    create.to_variable
2497                ))
2498            })?;
2499
2500        // Output column for the created edge (if named)
2501        let output_column = create.variable.as_ref().map(|v| {
2502            let idx = columns.len();
2503            columns.push(v.clone());
2504            idx
2505        });
2506
2507        // Convert properties (with constant-folding for function calls like vector())
2508        let properties: Vec<(String, PropertySource)> = create
2509            .properties
2510            .iter()
2511            .map(|(name, expr)| {
2512                let source = match Self::try_fold_expression(expr) {
2513                    Some(value) => PropertySource::Constant(value),
2514                    None => PropertySource::Constant(grafeo_common::types::Value::Null),
2515                };
2516                (name.clone(), source)
2517            })
2518            .collect();
2519
2520        let output_schema = self.derive_schema_from_columns(&columns);
2521
2522        let mut operator = CreateEdgeOperator::new(
2523            Arc::clone(&self.store),
2524            input_op,
2525            from_column,
2526            to_column,
2527            create.edge_type.clone(),
2528            output_schema,
2529        )
2530        .with_properties(properties)
2531        .with_tx_context(self.viewing_epoch, self.tx_id);
2532
2533        if let Some(col) = output_column {
2534            operator = operator.with_output_column(col);
2535        }
2536
2537        let operator = Box::new(operator);
2538
2539        Ok((operator, columns))
2540    }
2541
2542    /// Plans a DELETE NODE operator.
2543    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2544        let (input_op, columns) = self.plan_operator(&delete.input)?;
2545
2546        let node_column = columns
2547            .iter()
2548            .position(|c| c == &delete.variable)
2549            .ok_or_else(|| {
2550                Error::Internal(format!(
2551                    "Variable '{}' not found for delete",
2552                    delete.variable
2553                ))
2554            })?;
2555
2556        // Output schema for delete count
2557        let output_schema = vec![LogicalType::Int64];
2558        let output_columns = vec!["deleted_count".to_string()];
2559
2560        let operator = Box::new(
2561            DeleteNodeOperator::new(
2562                Arc::clone(&self.store),
2563                input_op,
2564                node_column,
2565                output_schema,
2566                delete.detach, // DETACH DELETE deletes connected edges first
2567            )
2568            .with_tx_context(self.viewing_epoch, self.tx_id),
2569        );
2570
2571        Ok((operator, output_columns))
2572    }
2573
2574    /// Plans a DELETE EDGE operator.
2575    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2576        let (input_op, columns) = self.plan_operator(&delete.input)?;
2577
2578        let edge_column = columns
2579            .iter()
2580            .position(|c| c == &delete.variable)
2581            .ok_or_else(|| {
2582                Error::Internal(format!(
2583                    "Variable '{}' not found for delete",
2584                    delete.variable
2585                ))
2586            })?;
2587
2588        // Output schema for delete count
2589        let output_schema = vec![LogicalType::Int64];
2590        let output_columns = vec!["deleted_count".to_string()];
2591
2592        let operator = Box::new(
2593            DeleteEdgeOperator::new(
2594                Arc::clone(&self.store),
2595                input_op,
2596                edge_column,
2597                output_schema,
2598            )
2599            .with_tx_context(self.viewing_epoch, self.tx_id),
2600        );
2601
2602        Ok((operator, output_columns))
2603    }
2604
2605    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2606    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2607        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2608        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2609
2610        // Build combined output columns (left + right)
2611        let mut columns = left_columns.clone();
2612        columns.extend(right_columns.clone());
2613
2614        // Find common variables between left and right for join keys
2615        let mut probe_keys = Vec::new();
2616        let mut build_keys = Vec::new();
2617
2618        for (right_idx, right_col) in right_columns.iter().enumerate() {
2619            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2620                probe_keys.push(left_idx);
2621                build_keys.push(right_idx);
2622            }
2623        }
2624
2625        let output_schema = self.derive_schema_from_columns(&columns);
2626
2627        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2628            left_op,
2629            right_op,
2630            probe_keys,
2631            build_keys,
2632            PhysicalJoinType::Left,
2633            output_schema,
2634        ));
2635
2636        Ok((operator, columns))
2637    }
2638
2639    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2640    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2641        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2642        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2643
2644        // Anti-join only keeps left columns (filters out matching rows)
2645        let columns = left_columns.clone();
2646
2647        // Find common variables between left and right for join keys
2648        let mut probe_keys = Vec::new();
2649        let mut build_keys = Vec::new();
2650
2651        for (right_idx, right_col) in right_columns.iter().enumerate() {
2652            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2653                probe_keys.push(left_idx);
2654                build_keys.push(right_idx);
2655            }
2656        }
2657
2658        let output_schema = self.derive_schema_from_columns(&columns);
2659
2660        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2661            left_op,
2662            right_op,
2663            probe_keys,
2664            build_keys,
2665            PhysicalJoinType::Anti,
2666            output_schema,
2667        ));
2668
2669        Ok((operator, columns))
2670    }
2671
2672    /// Plans an unwind operator.
2673    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2674        // Plan the input operator first
2675        // Handle Empty specially - use a single-row operator
2676        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2677            if matches!(&*unwind.input, LogicalOperator::Empty) {
2678                // For UNWIND without prior MATCH, create a single-row input
2679                // We need an operator that produces one row with the list to unwind
2680                // For now, use EmptyScan which produces no rows - we'll handle the literal
2681                // list in the unwind operator itself
2682                let literal_list = self.convert_expression(&unwind.expression)?;
2683
2684                // Create a project operator that produces a single row with the list
2685                let single_row_op: Box<dyn Operator> = Box::new(
2686                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2687                );
2688                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2689                    single_row_op,
2690                    vec![ProjectExpr::Expression {
2691                        expr: literal_list,
2692                        variable_columns: HashMap::new(),
2693                    }],
2694                    vec![LogicalType::Any],
2695                    Arc::clone(&self.store),
2696                ));
2697
2698                (project_op, vec!["__list__".to_string()])
2699            } else {
2700                self.plan_operator(&unwind.input)?
2701            };
2702
2703        // The UNWIND expression should be a list - we need to find/evaluate it
2704        // For now, we handle the case where the expression references an existing column
2705        // or is a literal list
2706
2707        // Find if the expression references an existing column (like a list property)
2708        let list_col_idx = match &unwind.expression {
2709            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2710            LogicalExpression::Property { variable, .. } => {
2711                // Property access needs to be evaluated - for now we'll need the filter predicate
2712                // to evaluate this. For simple cases, we treat it as a list column.
2713                input_columns.iter().position(|c| c == variable)
2714            }
2715            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2716                // Literal list expression - we'll add it as a virtual column
2717                None
2718            }
2719            _ => None,
2720        };
2721
2722        // Build output columns: all input columns plus the new variable
2723        let mut columns = input_columns.clone();
2724        columns.push(unwind.variable.clone());
2725
2726        // Build output schema
2727        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2728        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2729
2730        // Use the list column index if found, otherwise default to 0
2731        // (in which case the first column should contain the list)
2732        let col_idx = list_col_idx.unwrap_or(0);
2733
2734        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2735            input_op,
2736            col_idx,
2737            unwind.variable.clone(),
2738            output_schema,
2739        ));
2740
2741        Ok((operator, columns))
2742    }
2743
2744    /// Plans a MERGE operator.
2745    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2746        // Plan the input operator if present (skip if Empty)
2747        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2748            Vec::new()
2749        } else {
2750            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2751            cols
2752        };
2753
2754        // Convert match properties from LogicalExpression to Value
2755        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2756            .match_properties
2757            .iter()
2758            .filter_map(|(name, expr)| {
2759                if let LogicalExpression::Literal(v) = expr {
2760                    Some((name.clone(), v.clone()))
2761                } else {
2762                    None // Skip non-literal expressions for now
2763                }
2764            })
2765            .collect();
2766
2767        // Convert ON CREATE properties
2768        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2769            .on_create
2770            .iter()
2771            .filter_map(|(name, expr)| {
2772                if let LogicalExpression::Literal(v) = expr {
2773                    Some((name.clone(), v.clone()))
2774                } else {
2775                    None
2776                }
2777            })
2778            .collect();
2779
2780        // Convert ON MATCH properties
2781        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2782            .on_match
2783            .iter()
2784            .filter_map(|(name, expr)| {
2785                if let LogicalExpression::Literal(v) = expr {
2786                    Some((name.clone(), v.clone()))
2787                } else {
2788                    None
2789                }
2790            })
2791            .collect();
2792
2793        // Add the merged node variable to output columns
2794        columns.push(merge.variable.clone());
2795
2796        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2797            Arc::clone(&self.store),
2798            merge.variable.clone(),
2799            merge.labels.clone(),
2800            match_properties,
2801            on_create_properties,
2802            on_match_properties,
2803        ));
2804
2805        Ok((operator, columns))
2806    }
2807
2808    /// Plans a SHORTEST PATH operator.
2809    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2810        // Plan the input operator
2811        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2812
2813        // Find source and target node columns
2814        let source_column = columns
2815            .iter()
2816            .position(|c| c == &sp.source_var)
2817            .ok_or_else(|| {
2818                Error::Internal(format!(
2819                    "Source variable '{}' not found for shortestPath",
2820                    sp.source_var
2821                ))
2822            })?;
2823
2824        let target_column = columns
2825            .iter()
2826            .position(|c| c == &sp.target_var)
2827            .ok_or_else(|| {
2828                Error::Internal(format!(
2829                    "Target variable '{}' not found for shortestPath",
2830                    sp.target_var
2831                ))
2832            })?;
2833
2834        // Convert direction
2835        let direction = match sp.direction {
2836            ExpandDirection::Outgoing => Direction::Outgoing,
2837            ExpandDirection::Incoming => Direction::Incoming,
2838            ExpandDirection::Both => Direction::Both,
2839        };
2840
2841        // Create the shortest path operator
2842        let operator: Box<dyn Operator> = Box::new(
2843            ShortestPathOperator::new(
2844                Arc::clone(&self.store),
2845                input_op,
2846                source_column,
2847                target_column,
2848                sp.edge_type.clone(),
2849                direction,
2850            )
2851            .with_all_paths(sp.all_paths),
2852        );
2853
2854        // Add path length column with the expected naming convention
2855        // The translator expects _path_length_{alias} format for length(p) calls
2856        columns.push(format!("_path_length_{}", sp.path_alias));
2857
2858        Ok((operator, columns))
2859    }
2860
2861    /// Plans a CALL procedure operator.
2862    fn plan_call_procedure(
2863        &self,
2864        call: &CallProcedureOp,
2865    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2866        use crate::procedures::{self, BuiltinProcedures};
2867
2868        static PROCEDURES: std::sync::OnceLock<BuiltinProcedures> = std::sync::OnceLock::new();
2869        let registry = PROCEDURES.get_or_init(BuiltinProcedures::new);
2870
2871        // Special case: grafeo.procedures() lists all procedures
2872        let resolved_name = call.name.join(".");
2873        if resolved_name == "grafeo.procedures" || resolved_name == "procedures" {
2874            let result = procedures::procedures_result(registry);
2875            return self.plan_static_result(result, &call.yield_items);
2876        }
2877
2878        // Look up the algorithm
2879        let algorithm = registry.get(&call.name).ok_or_else(|| {
2880            Error::Internal(format!(
2881                "Unknown procedure: '{}'. Use CALL grafeo.procedures() to list available procedures.",
2882                call.name.join(".")
2883            ))
2884        })?;
2885
2886        // Evaluate arguments to Parameters
2887        let params = procedures::evaluate_arguments(&call.arguments, algorithm.parameters());
2888
2889        // Canonical column names for this algorithm (user-facing names)
2890        let canonical_columns = procedures::output_columns_for_name(algorithm.as_ref());
2891
2892        // Determine output columns from YIELD or algorithm defaults
2893        let yield_columns = call.yield_items.as_ref().map(|items| {
2894            items
2895                .iter()
2896                .map(|item| (item.field_name.clone(), item.alias.clone()))
2897                .collect::<Vec<_>>()
2898        });
2899
2900        let output_columns = if let Some(yield_cols) = &yield_columns {
2901            yield_cols
2902                .iter()
2903                .map(|(name, alias)| alias.clone().unwrap_or_else(|| name.clone()))
2904                .collect()
2905        } else {
2906            canonical_columns.clone()
2907        };
2908
2909        let operator = Box::new(
2910            crate::query::executor::procedure_call::ProcedureCallOperator::new(
2911                Arc::clone(&self.store),
2912                algorithm,
2913                params,
2914                yield_columns,
2915                canonical_columns,
2916            ),
2917        );
2918
2919        Ok((operator, output_columns))
2920    }
2921
2922    /// Plans a static result set (e.g., from `grafeo.procedures()`).
2923    fn plan_static_result(
2924        &self,
2925        result: grafeo_adapters::plugins::AlgorithmResult,
2926        yield_items: &Option<Vec<crate::query::plan::ProcedureYield>>,
2927    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2928        // Determine output columns and column indices
2929        let (output_columns, column_indices) = if let Some(items) = yield_items {
2930            let mut cols = Vec::new();
2931            let mut indices = Vec::new();
2932            for item in items {
2933                let idx = result
2934                    .columns
2935                    .iter()
2936                    .position(|c| c == &item.field_name)
2937                    .ok_or_else(|| {
2938                        Error::Internal(format!(
2939                            "YIELD column '{}' not found (available: {})",
2940                            item.field_name,
2941                            result.columns.join(", ")
2942                        ))
2943                    })?;
2944                indices.push(idx);
2945                cols.push(
2946                    item.alias
2947                        .clone()
2948                        .unwrap_or_else(|| item.field_name.clone()),
2949                );
2950            }
2951            (cols, indices)
2952        } else {
2953            let indices: Vec<usize> = (0..result.columns.len()).collect();
2954            (result.columns.clone(), indices)
2955        };
2956
2957        let operator = Box::new(StaticResultOperator {
2958            rows: result.rows,
2959            column_indices,
2960            row_index: 0,
2961        });
2962
2963        Ok((operator, output_columns))
2964    }
2965
2966    /// Plans an ADD LABEL operator.
2967    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2968        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2969
2970        // Find the node column
2971        let node_column = columns
2972            .iter()
2973            .position(|c| c == &add_label.variable)
2974            .ok_or_else(|| {
2975                Error::Internal(format!(
2976                    "Variable '{}' not found for ADD LABEL",
2977                    add_label.variable
2978                ))
2979            })?;
2980
2981        // Output schema for update count
2982        let output_schema = vec![LogicalType::Int64];
2983        let output_columns = vec!["labels_added".to_string()];
2984
2985        let operator = Box::new(AddLabelOperator::new(
2986            Arc::clone(&self.store),
2987            input_op,
2988            node_column,
2989            add_label.labels.clone(),
2990            output_schema,
2991        ));
2992
2993        Ok((operator, output_columns))
2994    }
2995
2996    /// Plans a REMOVE LABEL operator.
2997    fn plan_remove_label(
2998        &self,
2999        remove_label: &RemoveLabelOp,
3000    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3001        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
3002
3003        // Find the node column
3004        let node_column = columns
3005            .iter()
3006            .position(|c| c == &remove_label.variable)
3007            .ok_or_else(|| {
3008                Error::Internal(format!(
3009                    "Variable '{}' not found for REMOVE LABEL",
3010                    remove_label.variable
3011                ))
3012            })?;
3013
3014        // Output schema for update count
3015        let output_schema = vec![LogicalType::Int64];
3016        let output_columns = vec!["labels_removed".to_string()];
3017
3018        let operator = Box::new(RemoveLabelOperator::new(
3019            Arc::clone(&self.store),
3020            input_op,
3021            node_column,
3022            remove_label.labels.clone(),
3023            output_schema,
3024        ));
3025
3026        Ok((operator, output_columns))
3027    }
3028
3029    /// Plans a SET PROPERTY operator.
3030    fn plan_set_property(
3031        &self,
3032        set_prop: &SetPropertyOp,
3033    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
3034        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
3035
3036        // Find the entity column (node or edge variable)
3037        let entity_column = columns
3038            .iter()
3039            .position(|c| c == &set_prop.variable)
3040            .ok_or_else(|| {
3041                Error::Internal(format!(
3042                    "Variable '{}' not found for SET",
3043                    set_prop.variable
3044                ))
3045            })?;
3046
3047        // Convert properties to PropertySource
3048        let properties: Vec<(String, PropertySource)> = set_prop
3049            .properties
3050            .iter()
3051            .map(|(name, expr)| {
3052                let source = self.expression_to_property_source(expr, &columns)?;
3053                Ok((name.clone(), source))
3054            })
3055            .collect::<Result<Vec<_>>>()?;
3056
3057        // Output schema preserves input schema (passes through)
3058        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
3059        let output_columns = columns.clone();
3060
3061        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
3062        let operator = Box::new(SetPropertyOperator::new_for_node(
3063            Arc::clone(&self.store),
3064            input_op,
3065            entity_column,
3066            properties,
3067            output_schema,
3068        ));
3069
3070        Ok((operator, output_columns))
3071    }
3072
3073    /// Converts a logical expression to a PropertySource.
3074    fn expression_to_property_source(
3075        &self,
3076        expr: &LogicalExpression,
3077        columns: &[String],
3078    ) -> Result<PropertySource> {
3079        match expr {
3080            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
3081            LogicalExpression::Variable(name) => {
3082                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
3083                    Error::Internal(format!("Variable '{}' not found for property source", name))
3084                })?;
3085                Ok(PropertySource::Column(col_idx))
3086            }
3087            LogicalExpression::Parameter(name) => {
3088                // Parameters should be resolved before planning
3089                // For now, treat as a placeholder
3090                Ok(PropertySource::Constant(
3091                    grafeo_common::types::Value::String(format!("${}", name).into()),
3092                ))
3093            }
3094            _ => {
3095                if let Some(value) = Self::try_fold_expression(expr) {
3096                    Ok(PropertySource::Constant(value))
3097                } else {
3098                    Err(Error::Internal(format!(
3099                        "Unsupported expression type for property source: {:?}",
3100                        expr
3101                    )))
3102                }
3103            }
3104        }
3105    }
3106
3107    /// Tries to evaluate a constant expression at plan time.
3108    ///
3109    /// Recursively folds literals, lists, and known function calls (like `vector()`)
3110    /// into concrete values. Returns `None` if the expression contains non-constant
3111    /// parts (variables, property accesses, etc.).
3112    fn try_fold_expression(expr: &LogicalExpression) -> Option<Value> {
3113        match expr {
3114            LogicalExpression::Literal(v) => Some(v.clone()),
3115            LogicalExpression::List(items) => {
3116                let values: Option<Vec<Value>> =
3117                    items.iter().map(Self::try_fold_expression).collect();
3118                let values = values?;
3119                // All-numeric lists become vectors (matches Python list[float] behavior)
3120                let all_numeric = !values.is_empty()
3121                    && values
3122                        .iter()
3123                        .all(|v| matches!(v, Value::Float64(_) | Value::Int64(_)));
3124                if all_numeric {
3125                    let floats: Vec<f32> = values
3126                        .iter()
3127                        .filter_map(|v| match v {
3128                            Value::Float64(f) => Some(*f as f32),
3129                            Value::Int64(i) => Some(*i as f32),
3130                            _ => None,
3131                        })
3132                        .collect();
3133                    Some(Value::Vector(floats.into()))
3134                } else {
3135                    Some(Value::List(values.into()))
3136                }
3137            }
3138            LogicalExpression::FunctionCall { name, args, .. } => {
3139                match name.to_lowercase().as_str() {
3140                    "vector" => {
3141                        if args.len() != 1 {
3142                            return None;
3143                        }
3144                        let val = Self::try_fold_expression(&args[0])?;
3145                        match val {
3146                            Value::List(items) => {
3147                                let floats: Vec<f32> = items
3148                                    .iter()
3149                                    .filter_map(|v| match v {
3150                                        Value::Float64(f) => Some(*f as f32),
3151                                        Value::Int64(i) => Some(*i as f32),
3152                                        _ => None,
3153                                    })
3154                                    .collect();
3155                                if floats.len() == items.len() {
3156                                    Some(Value::Vector(floats.into()))
3157                                } else {
3158                                    None
3159                                }
3160                            }
3161                            // Already a vector (from all-numeric list folding)
3162                            Value::Vector(v) => Some(Value::Vector(v)),
3163                            _ => None,
3164                        }
3165                    }
3166                    _ => None,
3167                }
3168            }
3169            _ => None,
3170        }
3171    }
3172}
3173
3174/// Converts a logical binary operator to a filter binary operator.
3175pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
3176    match op {
3177        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
3178        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
3179        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
3180        BinaryOp::Le => Ok(BinaryFilterOp::Le),
3181        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
3182        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
3183        BinaryOp::And => Ok(BinaryFilterOp::And),
3184        BinaryOp::Or => Ok(BinaryFilterOp::Or),
3185        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
3186        BinaryOp::Add => Ok(BinaryFilterOp::Add),
3187        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
3188        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
3189        BinaryOp::Div => Ok(BinaryFilterOp::Div),
3190        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
3191        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
3192        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
3193        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
3194        BinaryOp::In => Ok(BinaryFilterOp::In),
3195        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
3196        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
3197        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
3198            "Binary operator {:?} not yet supported in filters",
3199            op
3200        ))),
3201    }
3202}
3203
3204/// Converts a logical unary operator to a filter unary operator.
3205pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
3206    match op {
3207        UnaryOp::Not => Ok(UnaryFilterOp::Not),
3208        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
3209        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
3210        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
3211    }
3212}
3213
3214/// Converts a logical aggregate function to a physical aggregate function.
3215pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
3216    match func {
3217        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
3218        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
3219        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
3220        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
3221        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
3222        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
3223        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
3224        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
3225        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
3226        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
3227        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
3228    }
3229}
3230
3231/// Converts a logical expression to a filter expression.
3232///
3233/// This is a standalone function that can be used by both LPG and RDF planners.
3234pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
3235    match expr {
3236        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
3237        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
3238        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
3239            variable: variable.clone(),
3240            property: property.clone(),
3241        }),
3242        LogicalExpression::Binary { left, op, right } => {
3243            let left_expr = convert_filter_expression(left)?;
3244            let right_expr = convert_filter_expression(right)?;
3245            let filter_op = convert_binary_op(*op)?;
3246            Ok(FilterExpression::Binary {
3247                left: Box::new(left_expr),
3248                op: filter_op,
3249                right: Box::new(right_expr),
3250            })
3251        }
3252        LogicalExpression::Unary { op, operand } => {
3253            let operand_expr = convert_filter_expression(operand)?;
3254            let filter_op = convert_unary_op(*op)?;
3255            Ok(FilterExpression::Unary {
3256                op: filter_op,
3257                operand: Box::new(operand_expr),
3258            })
3259        }
3260        LogicalExpression::FunctionCall { name, args, .. } => {
3261            let filter_args: Vec<FilterExpression> = args
3262                .iter()
3263                .map(convert_filter_expression)
3264                .collect::<Result<Vec<_>>>()?;
3265            Ok(FilterExpression::FunctionCall {
3266                name: name.clone(),
3267                args: filter_args,
3268            })
3269        }
3270        LogicalExpression::Case {
3271            operand,
3272            when_clauses,
3273            else_clause,
3274        } => {
3275            let filter_operand = operand
3276                .as_ref()
3277                .map(|e| convert_filter_expression(e))
3278                .transpose()?
3279                .map(Box::new);
3280            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
3281                .iter()
3282                .map(|(cond, result)| {
3283                    Ok((
3284                        convert_filter_expression(cond)?,
3285                        convert_filter_expression(result)?,
3286                    ))
3287                })
3288                .collect::<Result<Vec<_>>>()?;
3289            let filter_else = else_clause
3290                .as_ref()
3291                .map(|e| convert_filter_expression(e))
3292                .transpose()?
3293                .map(Box::new);
3294            Ok(FilterExpression::Case {
3295                operand: filter_operand,
3296                when_clauses: filter_when_clauses,
3297                else_clause: filter_else,
3298            })
3299        }
3300        LogicalExpression::List(items) => {
3301            let filter_items: Vec<FilterExpression> = items
3302                .iter()
3303                .map(convert_filter_expression)
3304                .collect::<Result<Vec<_>>>()?;
3305            Ok(FilterExpression::List(filter_items))
3306        }
3307        LogicalExpression::Map(pairs) => {
3308            let filter_pairs: Vec<(String, FilterExpression)> = pairs
3309                .iter()
3310                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
3311                .collect::<Result<Vec<_>>>()?;
3312            Ok(FilterExpression::Map(filter_pairs))
3313        }
3314        LogicalExpression::IndexAccess { base, index } => {
3315            let base_expr = convert_filter_expression(base)?;
3316            let index_expr = convert_filter_expression(index)?;
3317            Ok(FilterExpression::IndexAccess {
3318                base: Box::new(base_expr),
3319                index: Box::new(index_expr),
3320            })
3321        }
3322        LogicalExpression::SliceAccess { base, start, end } => {
3323            let base_expr = convert_filter_expression(base)?;
3324            let start_expr = start
3325                .as_ref()
3326                .map(|s| convert_filter_expression(s))
3327                .transpose()?
3328                .map(Box::new);
3329            let end_expr = end
3330                .as_ref()
3331                .map(|e| convert_filter_expression(e))
3332                .transpose()?
3333                .map(Box::new);
3334            Ok(FilterExpression::SliceAccess {
3335                base: Box::new(base_expr),
3336                start: start_expr,
3337                end: end_expr,
3338            })
3339        }
3340        LogicalExpression::Parameter(_) => Err(Error::Internal(
3341            "Parameters not yet supported in filters".to_string(),
3342        )),
3343        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
3344        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
3345        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
3346        LogicalExpression::ListComprehension {
3347            variable,
3348            list_expr,
3349            filter_expr,
3350            map_expr,
3351        } => {
3352            let list = convert_filter_expression(list_expr)?;
3353            let filter = filter_expr
3354                .as_ref()
3355                .map(|f| convert_filter_expression(f))
3356                .transpose()?
3357                .map(Box::new);
3358            let map = convert_filter_expression(map_expr)?;
3359            Ok(FilterExpression::ListComprehension {
3360                variable: variable.clone(),
3361                list_expr: Box::new(list),
3362                filter_expr: filter,
3363                map_expr: Box::new(map),
3364            })
3365        }
3366        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
3367            Error::Internal("Subqueries not yet supported in filters".to_string()),
3368        ),
3369    }
3370}
3371
3372/// Infers the logical type from a value.
3373fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
3374    use grafeo_common::types::Value;
3375    match value {
3376        Value::Null => LogicalType::String, // Default type for null
3377        Value::Bool(_) => LogicalType::Bool,
3378        Value::Int64(_) => LogicalType::Int64,
3379        Value::Float64(_) => LogicalType::Float64,
3380        Value::String(_) => LogicalType::String,
3381        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
3382        Value::Timestamp(_) => LogicalType::Timestamp,
3383        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
3384        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
3385        Value::Vector(v) => LogicalType::Vector(v.len()),
3386    }
3387}
3388
3389/// Converts an expression to a string for column naming.
3390fn expression_to_string(expr: &LogicalExpression) -> String {
3391    match expr {
3392        LogicalExpression::Variable(name) => name.clone(),
3393        LogicalExpression::Property { variable, property } => {
3394            format!("{variable}.{property}")
3395        }
3396        LogicalExpression::Literal(value) => format!("{value:?}"),
3397        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
3398        _ => "expr".to_string(),
3399    }
3400}
3401
3402/// A physical plan ready for execution.
3403pub struct PhysicalPlan {
3404    /// The root physical operator.
3405    pub operator: Box<dyn Operator>,
3406    /// Column names for the result.
3407    pub columns: Vec<String>,
3408    /// Adaptive execution context with cardinality estimates.
3409    ///
3410    /// When adaptive execution is enabled, this context contains estimated
3411    /// cardinalities at various checkpoints in the plan. During execution,
3412    /// actual row counts are recorded and compared against estimates.
3413    pub adaptive_context: Option<AdaptiveContext>,
3414}
3415
3416impl PhysicalPlan {
3417    /// Returns the column names.
3418    #[must_use]
3419    pub fn columns(&self) -> &[String] {
3420        &self.columns
3421    }
3422
3423    /// Consumes the plan and returns the operator.
3424    pub fn into_operator(self) -> Box<dyn Operator> {
3425        self.operator
3426    }
3427
3428    /// Returns the adaptive context, if adaptive execution is enabled.
3429    #[must_use]
3430    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
3431        self.adaptive_context.as_ref()
3432    }
3433
3434    /// Takes ownership of the adaptive context.
3435    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
3436        self.adaptive_context.take()
3437    }
3438}
3439
3440/// Helper operator that returns a single result chunk once.
3441///
3442/// Used by the factorized expand chain to wrap the final result.
3443#[allow(dead_code)]
3444struct SingleResultOperator {
3445    result: Option<grafeo_core::execution::DataChunk>,
3446}
3447
3448impl SingleResultOperator {
3449    #[allow(dead_code)]
3450    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
3451        Self { result }
3452    }
3453}
3454
3455impl Operator for SingleResultOperator {
3456    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3457        Ok(self.result.take())
3458    }
3459
3460    fn reset(&mut self) {
3461        // Cannot reset - result is consumed
3462    }
3463
3464    fn name(&self) -> &'static str {
3465        "SingleResult"
3466    }
3467}
3468
3469/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
3470struct StaticResultOperator {
3471    rows: Vec<Vec<Value>>,
3472    column_indices: Vec<usize>,
3473    row_index: usize,
3474}
3475
3476impl Operator for StaticResultOperator {
3477    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
3478        use grafeo_core::execution::DataChunk;
3479
3480        if self.row_index >= self.rows.len() {
3481            return Ok(None);
3482        }
3483
3484        let remaining = self.rows.len() - self.row_index;
3485        let chunk_rows = remaining.min(1024);
3486        let col_count = self.column_indices.len();
3487
3488        let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
3489        let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
3490
3491        for row_offset in 0..chunk_rows {
3492            let row = &self.rows[self.row_index + row_offset];
3493            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
3494                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
3495                if let Some(col) = chunk.column_mut(col_idx) {
3496                    col.push_value(value);
3497                }
3498            }
3499        }
3500        chunk.set_count(chunk_rows);
3501
3502        self.row_index += chunk_rows;
3503        Ok(Some(chunk))
3504    }
3505
3506    fn reset(&mut self) {
3507        self.row_index = 0;
3508    }
3509
3510    fn name(&self) -> &'static str {
3511        "StaticResult"
3512    }
3513}
3514
3515#[cfg(test)]
3516mod tests {
3517    use super::*;
3518    use crate::query::plan::{
3519        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3520        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3521        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3522        SortKey, SortOp,
3523    };
3524    use grafeo_common::types::Value;
3525
3526    fn create_test_store() -> Arc<LpgStore> {
3527        let store = Arc::new(LpgStore::new());
3528        store.create_node(&["Person"]);
3529        store.create_node(&["Person"]);
3530        store.create_node(&["Company"]);
3531        store
3532    }
3533
3534    // ==================== Simple Scan Tests ====================
3535
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    }));

    // The unaliased variable keeps its own name as the column name.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3558
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: None,
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    }));

    // A scan without a label still plans and exposes the variable.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
3581
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n AS person
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable("n".to_string()),
        alias: Some("person".to_string()),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    }));

    // The alias replaces the variable name in the output columns.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
3604
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n.name
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: "n".to_string(),
        label: Some("Person".to_string()),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Property {
            variable: "n".to_string(),
            property: "name".to_string(),
        },
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(scan),
    }));

    // An unaliased property access is named `variable.property`.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
3630
3631    #[test]
3632    fn test_plan_return_literal() {
3633        let store = create_test_store();
3634        let planner = Planner::new(store);
3635
3636        // MATCH (n) RETURN 42 AS answer
3637        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3638            items: vec![ReturnItem {
3639                expression: LogicalExpression::Literal(Value::Int64(42)),
3640                alias: Some("answer".to_string()),
3641            }],
3642            distinct: false,
3643            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3644                variable: "n".to_string(),
3645                label: None,
3646                input: None,
3647            })),
3648        }));
3649
3650        let physical = planner.plan(&logical).unwrap();
3651        assert_eq!(physical.columns(), &["answer"]);
3652    }
3653
3654    // ==================== Filter Tests ====================
3655
3656    #[test]
3657    fn test_plan_filter_equality() {
3658        let store = create_test_store();
3659        let planner = Planner::new(store);
3660
3661        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3662        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3663            items: vec![ReturnItem {
3664                expression: LogicalExpression::Variable("n".to_string()),
3665                alias: None,
3666            }],
3667            distinct: false,
3668            input: Box::new(LogicalOperator::Filter(FilterOp {
3669                predicate: LogicalExpression::Binary {
3670                    left: Box::new(LogicalExpression::Property {
3671                        variable: "n".to_string(),
3672                        property: "age".to_string(),
3673                    }),
3674                    op: BinaryOp::Eq,
3675                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3676                },
3677                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3678                    variable: "n".to_string(),
3679                    label: Some("Person".to_string()),
3680                    input: None,
3681                })),
3682            })),
3683        }));
3684
3685        let physical = planner.plan(&logical).unwrap();
3686        assert_eq!(physical.columns(), &["n"]);
3687    }
3688
3689    #[test]
3690    fn test_plan_filter_compound_and() {
3691        let store = create_test_store();
3692        let planner = Planner::new(store);
3693
3694        // WHERE n.age > 20 AND n.age < 40
3695        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3696            items: vec![ReturnItem {
3697                expression: LogicalExpression::Variable("n".to_string()),
3698                alias: None,
3699            }],
3700            distinct: false,
3701            input: Box::new(LogicalOperator::Filter(FilterOp {
3702                predicate: LogicalExpression::Binary {
3703                    left: Box::new(LogicalExpression::Binary {
3704                        left: Box::new(LogicalExpression::Property {
3705                            variable: "n".to_string(),
3706                            property: "age".to_string(),
3707                        }),
3708                        op: BinaryOp::Gt,
3709                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3710                    }),
3711                    op: BinaryOp::And,
3712                    right: Box::new(LogicalExpression::Binary {
3713                        left: Box::new(LogicalExpression::Property {
3714                            variable: "n".to_string(),
3715                            property: "age".to_string(),
3716                        }),
3717                        op: BinaryOp::Lt,
3718                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3719                    }),
3720                },
3721                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3722                    variable: "n".to_string(),
3723                    label: None,
3724                    input: None,
3725                })),
3726            })),
3727        }));
3728
3729        let physical = planner.plan(&logical).unwrap();
3730        assert_eq!(physical.columns(), &["n"]);
3731    }
3732
3733    #[test]
3734    fn test_plan_filter_unary_not() {
3735        let store = create_test_store();
3736        let planner = Planner::new(store);
3737
3738        // WHERE NOT n.active
3739        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3740            items: vec![ReturnItem {
3741                expression: LogicalExpression::Variable("n".to_string()),
3742                alias: None,
3743            }],
3744            distinct: false,
3745            input: Box::new(LogicalOperator::Filter(FilterOp {
3746                predicate: LogicalExpression::Unary {
3747                    op: UnaryOp::Not,
3748                    operand: Box::new(LogicalExpression::Property {
3749                        variable: "n".to_string(),
3750                        property: "active".to_string(),
3751                    }),
3752                },
3753                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3754                    variable: "n".to_string(),
3755                    label: None,
3756                    input: None,
3757                })),
3758            })),
3759        }));
3760
3761        let physical = planner.plan(&logical).unwrap();
3762        assert_eq!(physical.columns(), &["n"]);
3763    }
3764
3765    #[test]
3766    fn test_plan_filter_is_null() {
3767        let store = create_test_store();
3768        let planner = Planner::new(store);
3769
3770        // WHERE n.email IS NULL
3771        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3772            items: vec![ReturnItem {
3773                expression: LogicalExpression::Variable("n".to_string()),
3774                alias: None,
3775            }],
3776            distinct: false,
3777            input: Box::new(LogicalOperator::Filter(FilterOp {
3778                predicate: LogicalExpression::Unary {
3779                    op: UnaryOp::IsNull,
3780                    operand: Box::new(LogicalExpression::Property {
3781                        variable: "n".to_string(),
3782                        property: "email".to_string(),
3783                    }),
3784                },
3785                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3786                    variable: "n".to_string(),
3787                    label: None,
3788                    input: None,
3789                })),
3790            })),
3791        }));
3792
3793        let physical = planner.plan(&logical).unwrap();
3794        assert_eq!(physical.columns(), &["n"]);
3795    }
3796
3797    #[test]
3798    fn test_plan_filter_function_call() {
3799        let store = create_test_store();
3800        let planner = Planner::new(store);
3801
3802        // WHERE size(n.friends) > 0
3803        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3804            items: vec![ReturnItem {
3805                expression: LogicalExpression::Variable("n".to_string()),
3806                alias: None,
3807            }],
3808            distinct: false,
3809            input: Box::new(LogicalOperator::Filter(FilterOp {
3810                predicate: LogicalExpression::Binary {
3811                    left: Box::new(LogicalExpression::FunctionCall {
3812                        name: "size".to_string(),
3813                        args: vec![LogicalExpression::Property {
3814                            variable: "n".to_string(),
3815                            property: "friends".to_string(),
3816                        }],
3817                        distinct: false,
3818                    }),
3819                    op: BinaryOp::Gt,
3820                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3821                },
3822                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3823                    variable: "n".to_string(),
3824                    label: None,
3825                    input: None,
3826                })),
3827            })),
3828        }));
3829
3830        let physical = planner.plan(&logical).unwrap();
3831        assert_eq!(physical.columns(), &["n"]);
3832    }
3833
3834    // ==================== Expand Tests ====================
3835
3836    #[test]
3837    fn test_plan_expand_outgoing() {
3838        let store = create_test_store();
3839        let planner = Planner::new(store);
3840
3841        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3842        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3843            items: vec![
3844                ReturnItem {
3845                    expression: LogicalExpression::Variable("a".to_string()),
3846                    alias: None,
3847                },
3848                ReturnItem {
3849                    expression: LogicalExpression::Variable("b".to_string()),
3850                    alias: None,
3851                },
3852            ],
3853            distinct: false,
3854            input: Box::new(LogicalOperator::Expand(ExpandOp {
3855                from_variable: "a".to_string(),
3856                to_variable: "b".to_string(),
3857                edge_variable: None,
3858                direction: ExpandDirection::Outgoing,
3859                edge_type: Some("KNOWS".to_string()),
3860                min_hops: 1,
3861                max_hops: Some(1),
3862                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3863                    variable: "a".to_string(),
3864                    label: Some("Person".to_string()),
3865                    input: None,
3866                })),
3867                path_alias: None,
3868            })),
3869        }));
3870
3871        let physical = planner.plan(&logical).unwrap();
3872        // The return should have columns [a, b]
3873        assert!(physical.columns().contains(&"a".to_string()));
3874        assert!(physical.columns().contains(&"b".to_string()));
3875    }
3876
3877    #[test]
3878    fn test_plan_expand_with_edge_variable() {
3879        let store = create_test_store();
3880        let planner = Planner::new(store);
3881
3882        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3883        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3884            items: vec![
3885                ReturnItem {
3886                    expression: LogicalExpression::Variable("a".to_string()),
3887                    alias: None,
3888                },
3889                ReturnItem {
3890                    expression: LogicalExpression::Variable("r".to_string()),
3891                    alias: None,
3892                },
3893                ReturnItem {
3894                    expression: LogicalExpression::Variable("b".to_string()),
3895                    alias: None,
3896                },
3897            ],
3898            distinct: false,
3899            input: Box::new(LogicalOperator::Expand(ExpandOp {
3900                from_variable: "a".to_string(),
3901                to_variable: "b".to_string(),
3902                edge_variable: Some("r".to_string()),
3903                direction: ExpandDirection::Outgoing,
3904                edge_type: Some("KNOWS".to_string()),
3905                min_hops: 1,
3906                max_hops: Some(1),
3907                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3908                    variable: "a".to_string(),
3909                    label: None,
3910                    input: None,
3911                })),
3912                path_alias: None,
3913            })),
3914        }));
3915
3916        let physical = planner.plan(&logical).unwrap();
3917        assert!(physical.columns().contains(&"a".to_string()));
3918        assert!(physical.columns().contains(&"r".to_string()));
3919        assert!(physical.columns().contains(&"b".to_string()));
3920    }
3921
3922    // ==================== Limit/Skip/Sort Tests ====================
3923
3924    #[test]
3925    fn test_plan_limit() {
3926        let store = create_test_store();
3927        let planner = Planner::new(store);
3928
3929        // MATCH (n) RETURN n LIMIT 10
3930        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3931            items: vec![ReturnItem {
3932                expression: LogicalExpression::Variable("n".to_string()),
3933                alias: None,
3934            }],
3935            distinct: false,
3936            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3937                count: 10,
3938                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3939                    variable: "n".to_string(),
3940                    label: None,
3941                    input: None,
3942                })),
3943            })),
3944        }));
3945
3946        let physical = planner.plan(&logical).unwrap();
3947        assert_eq!(physical.columns(), &["n"]);
3948    }
3949
3950    #[test]
3951    fn test_plan_skip() {
3952        let store = create_test_store();
3953        let planner = Planner::new(store);
3954
3955        // MATCH (n) RETURN n SKIP 5
3956        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3957            items: vec![ReturnItem {
3958                expression: LogicalExpression::Variable("n".to_string()),
3959                alias: None,
3960            }],
3961            distinct: false,
3962            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3963                count: 5,
3964                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3965                    variable: "n".to_string(),
3966                    label: None,
3967                    input: None,
3968                })),
3969            })),
3970        }));
3971
3972        let physical = planner.plan(&logical).unwrap();
3973        assert_eq!(physical.columns(), &["n"]);
3974    }
3975
3976    #[test]
3977    fn test_plan_sort() {
3978        let store = create_test_store();
3979        let planner = Planner::new(store);
3980
3981        // MATCH (n) RETURN n ORDER BY n.name ASC
3982        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3983            items: vec![ReturnItem {
3984                expression: LogicalExpression::Variable("n".to_string()),
3985                alias: None,
3986            }],
3987            distinct: false,
3988            input: Box::new(LogicalOperator::Sort(SortOp {
3989                keys: vec![SortKey {
3990                    expression: LogicalExpression::Variable("n".to_string()),
3991                    order: SortOrder::Ascending,
3992                }],
3993                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3994                    variable: "n".to_string(),
3995                    label: None,
3996                    input: None,
3997                })),
3998            })),
3999        }));
4000
4001        let physical = planner.plan(&logical).unwrap();
4002        assert_eq!(physical.columns(), &["n"]);
4003    }
4004
4005    #[test]
4006    fn test_plan_sort_descending() {
4007        let store = create_test_store();
4008        let planner = Planner::new(store);
4009
4010        // ORDER BY n DESC
4011        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4012            items: vec![ReturnItem {
4013                expression: LogicalExpression::Variable("n".to_string()),
4014                alias: None,
4015            }],
4016            distinct: false,
4017            input: Box::new(LogicalOperator::Sort(SortOp {
4018                keys: vec![SortKey {
4019                    expression: LogicalExpression::Variable("n".to_string()),
4020                    order: SortOrder::Descending,
4021                }],
4022                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4023                    variable: "n".to_string(),
4024                    label: None,
4025                    input: None,
4026                })),
4027            })),
4028        }));
4029
4030        let physical = planner.plan(&logical).unwrap();
4031        assert_eq!(physical.columns(), &["n"]);
4032    }
4033
4034    #[test]
4035    fn test_plan_distinct() {
4036        let store = create_test_store();
4037        let planner = Planner::new(store);
4038
4039        // MATCH (n) RETURN DISTINCT n
4040        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4041            items: vec![ReturnItem {
4042                expression: LogicalExpression::Variable("n".to_string()),
4043                alias: None,
4044            }],
4045            distinct: false,
4046            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4047                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4048                    variable: "n".to_string(),
4049                    label: None,
4050                    input: None,
4051                })),
4052                columns: None,
4053            })),
4054        }));
4055
4056        let physical = planner.plan(&logical).unwrap();
4057        assert_eq!(physical.columns(), &["n"]);
4058    }
4059
4060    // ==================== Aggregate Tests ====================
4061
4062    #[test]
4063    fn test_plan_aggregate_count() {
4064        let store = create_test_store();
4065        let planner = Planner::new(store);
4066
4067        // MATCH (n) RETURN count(n)
4068        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4069            items: vec![ReturnItem {
4070                expression: LogicalExpression::Variable("cnt".to_string()),
4071                alias: None,
4072            }],
4073            distinct: false,
4074            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
4075                group_by: vec![],
4076                aggregates: vec![LogicalAggregateExpr {
4077                    function: LogicalAggregateFunction::Count,
4078                    expression: Some(LogicalExpression::Variable("n".to_string())),
4079                    distinct: false,
4080                    alias: Some("cnt".to_string()),
4081                    percentile: None,
4082                }],
4083                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4084                    variable: "n".to_string(),
4085                    label: None,
4086                    input: None,
4087                })),
4088                having: None,
4089            })),
4090        }));
4091
4092        let physical = planner.plan(&logical).unwrap();
4093        assert!(physical.columns().contains(&"cnt".to_string()));
4094    }
4095
4096    #[test]
4097    fn test_plan_aggregate_with_group_by() {
4098        let store = create_test_store();
4099        let planner = Planner::new(store);
4100
4101        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
4102        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4103            group_by: vec![LogicalExpression::Property {
4104                variable: "n".to_string(),
4105                property: "city".to_string(),
4106            }],
4107            aggregates: vec![LogicalAggregateExpr {
4108                function: LogicalAggregateFunction::Count,
4109                expression: Some(LogicalExpression::Variable("n".to_string())),
4110                distinct: false,
4111                alias: Some("cnt".to_string()),
4112                percentile: None,
4113            }],
4114            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4115                variable: "n".to_string(),
4116                label: Some("Person".to_string()),
4117                input: None,
4118            })),
4119            having: None,
4120        }));
4121
4122        let physical = planner.plan(&logical).unwrap();
4123        assert_eq!(physical.columns().len(), 2);
4124    }
4125
4126    #[test]
4127    fn test_plan_aggregate_sum() {
4128        let store = create_test_store();
4129        let planner = Planner::new(store);
4130
4131        // SUM(n.value)
4132        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4133            group_by: vec![],
4134            aggregates: vec![LogicalAggregateExpr {
4135                function: LogicalAggregateFunction::Sum,
4136                expression: Some(LogicalExpression::Property {
4137                    variable: "n".to_string(),
4138                    property: "value".to_string(),
4139                }),
4140                distinct: false,
4141                alias: Some("total".to_string()),
4142                percentile: None,
4143            }],
4144            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4145                variable: "n".to_string(),
4146                label: None,
4147                input: None,
4148            })),
4149            having: None,
4150        }));
4151
4152        let physical = planner.plan(&logical).unwrap();
4153        assert!(physical.columns().contains(&"total".to_string()));
4154    }
4155
4156    #[test]
4157    fn test_plan_aggregate_avg() {
4158        let store = create_test_store();
4159        let planner = Planner::new(store);
4160
4161        // AVG(n.score)
4162        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4163            group_by: vec![],
4164            aggregates: vec![LogicalAggregateExpr {
4165                function: LogicalAggregateFunction::Avg,
4166                expression: Some(LogicalExpression::Property {
4167                    variable: "n".to_string(),
4168                    property: "score".to_string(),
4169                }),
4170                distinct: false,
4171                alias: Some("average".to_string()),
4172                percentile: None,
4173            }],
4174            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4175                variable: "n".to_string(),
4176                label: None,
4177                input: None,
4178            })),
4179            having: None,
4180        }));
4181
4182        let physical = planner.plan(&logical).unwrap();
4183        assert!(physical.columns().contains(&"average".to_string()));
4184    }
4185
4186    #[test]
4187    fn test_plan_aggregate_min_max() {
4188        let store = create_test_store();
4189        let planner = Planner::new(store);
4190
4191        // MIN(n.age), MAX(n.age)
4192        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4193            group_by: vec![],
4194            aggregates: vec![
4195                LogicalAggregateExpr {
4196                    function: LogicalAggregateFunction::Min,
4197                    expression: Some(LogicalExpression::Property {
4198                        variable: "n".to_string(),
4199                        property: "age".to_string(),
4200                    }),
4201                    distinct: false,
4202                    alias: Some("youngest".to_string()),
4203                    percentile: None,
4204                },
4205                LogicalAggregateExpr {
4206                    function: LogicalAggregateFunction::Max,
4207                    expression: Some(LogicalExpression::Property {
4208                        variable: "n".to_string(),
4209                        property: "age".to_string(),
4210                    }),
4211                    distinct: false,
4212                    alias: Some("oldest".to_string()),
4213                    percentile: None,
4214                },
4215            ],
4216            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4217                variable: "n".to_string(),
4218                label: None,
4219                input: None,
4220            })),
4221            having: None,
4222        }));
4223
4224        let physical = planner.plan(&logical).unwrap();
4225        assert!(physical.columns().contains(&"youngest".to_string()));
4226        assert!(physical.columns().contains(&"oldest".to_string()));
4227    }
4228
4229    // ==================== Join Tests ====================
4230
4231    #[test]
4232    fn test_plan_inner_join() {
4233        let store = create_test_store();
4234        let planner = Planner::new(store);
4235
4236        // Inner join between two scans
4237        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4238            items: vec![
4239                ReturnItem {
4240                    expression: LogicalExpression::Variable("a".to_string()),
4241                    alias: None,
4242                },
4243                ReturnItem {
4244                    expression: LogicalExpression::Variable("b".to_string()),
4245                    alias: None,
4246                },
4247            ],
4248            distinct: false,
4249            input: Box::new(LogicalOperator::Join(JoinOp {
4250                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4251                    variable: "a".to_string(),
4252                    label: Some("Person".to_string()),
4253                    input: None,
4254                })),
4255                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4256                    variable: "b".to_string(),
4257                    label: Some("Company".to_string()),
4258                    input: None,
4259                })),
4260                join_type: JoinType::Inner,
4261                conditions: vec![JoinCondition {
4262                    left: LogicalExpression::Variable("a".to_string()),
4263                    right: LogicalExpression::Variable("b".to_string()),
4264                }],
4265            })),
4266        }));
4267
4268        let physical = planner.plan(&logical).unwrap();
4269        assert!(physical.columns().contains(&"a".to_string()));
4270        assert!(physical.columns().contains(&"b".to_string()));
4271    }
4272
4273    #[test]
4274    fn test_plan_cross_join() {
4275        let store = create_test_store();
4276        let planner = Planner::new(store);
4277
4278        // Cross join (no conditions)
4279        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4280            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4281                variable: "a".to_string(),
4282                label: None,
4283                input: None,
4284            })),
4285            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4286                variable: "b".to_string(),
4287                label: None,
4288                input: None,
4289            })),
4290            join_type: JoinType::Cross,
4291            conditions: vec![],
4292        }));
4293
4294        let physical = planner.plan(&logical).unwrap();
4295        assert_eq!(physical.columns().len(), 2);
4296    }
4297
4298    #[test]
4299    fn test_plan_left_join() {
4300        let store = create_test_store();
4301        let planner = Planner::new(store);
4302
4303        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
4304            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4305                variable: "a".to_string(),
4306                label: None,
4307                input: None,
4308            })),
4309            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4310                variable: "b".to_string(),
4311                label: None,
4312                input: None,
4313            })),
4314            join_type: JoinType::Left,
4315            conditions: vec![],
4316        }));
4317
4318        let physical = planner.plan(&logical).unwrap();
4319        assert_eq!(physical.columns().len(), 2);
4320    }
4321
4322    // ==================== Mutation Tests ====================
4323
4324    #[test]
4325    fn test_plan_create_node() {
4326        let store = create_test_store();
4327        let planner = Planner::new(store);
4328
4329        // CREATE (n:Person {name: 'Alice'})
4330        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
4331            variable: "n".to_string(),
4332            labels: vec!["Person".to_string()],
4333            properties: vec![(
4334                "name".to_string(),
4335                LogicalExpression::Literal(Value::String("Alice".into())),
4336            )],
4337            input: None,
4338        }));
4339
4340        let physical = planner.plan(&logical).unwrap();
4341        assert!(physical.columns().contains(&"n".to_string()));
4342    }
4343
4344    #[test]
4345    fn test_plan_create_edge() {
4346        let store = create_test_store();
4347        let planner = Planner::new(store);
4348
4349        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
4350        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
4351            variable: Some("r".to_string()),
4352            from_variable: "a".to_string(),
4353            to_variable: "b".to_string(),
4354            edge_type: "KNOWS".to_string(),
4355            properties: vec![],
4356            input: Box::new(LogicalOperator::Join(JoinOp {
4357                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4358                    variable: "a".to_string(),
4359                    label: None,
4360                    input: None,
4361                })),
4362                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4363                    variable: "b".to_string(),
4364                    label: None,
4365                    input: None,
4366                })),
4367                join_type: JoinType::Cross,
4368                conditions: vec![],
4369            })),
4370        }));
4371
4372        let physical = planner.plan(&logical).unwrap();
4373        assert!(physical.columns().contains(&"r".to_string()));
4374    }
4375
4376    #[test]
4377    fn test_plan_delete_node() {
4378        let store = create_test_store();
4379        let planner = Planner::new(store);
4380
4381        // MATCH (n) DELETE n
4382        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
4383            variable: "n".to_string(),
4384            detach: false,
4385            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4386                variable: "n".to_string(),
4387                label: None,
4388                input: None,
4389            })),
4390        }));
4391
4392        let physical = planner.plan(&logical).unwrap();
4393        assert!(physical.columns().contains(&"deleted_count".to_string()));
4394    }
4395
4396    // ==================== Error Cases ====================
4397
4398    #[test]
4399    fn test_plan_empty_errors() {
4400        let store = create_test_store();
4401        let planner = Planner::new(store);
4402
4403        let logical = LogicalPlan::new(LogicalOperator::Empty);
4404        let result = planner.plan(&logical);
4405        assert!(result.is_err());
4406    }
4407
4408    #[test]
4409    fn test_plan_missing_variable_in_return() {
4410        let store = create_test_store();
4411        let planner = Planner::new(store);
4412
4413        // Return variable that doesn't exist in input
4414        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4415            items: vec![ReturnItem {
4416                expression: LogicalExpression::Variable("missing".to_string()),
4417                alias: None,
4418            }],
4419            distinct: false,
4420            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4421                variable: "n".to_string(),
4422                label: None,
4423                input: None,
4424            })),
4425        }));
4426
4427        let result = planner.plan(&logical);
4428        assert!(result.is_err());
4429    }
4430
4431    // ==================== Helper Function Tests ====================
4432
4433    #[test]
4434    fn test_convert_binary_ops() {
4435        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
4436        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
4437        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
4438        assert!(convert_binary_op(BinaryOp::Le).is_ok());
4439        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
4440        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
4441        assert!(convert_binary_op(BinaryOp::And).is_ok());
4442        assert!(convert_binary_op(BinaryOp::Or).is_ok());
4443        assert!(convert_binary_op(BinaryOp::Add).is_ok());
4444        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
4445        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
4446        assert!(convert_binary_op(BinaryOp::Div).is_ok());
4447    }
4448
4449    #[test]
4450    fn test_convert_unary_ops() {
4451        assert!(convert_unary_op(UnaryOp::Not).is_ok());
4452        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
4453        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
4454        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
4455    }
4456
4457    #[test]
4458    fn test_convert_aggregate_functions() {
4459        assert!(matches!(
4460            convert_aggregate_function(LogicalAggregateFunction::Count),
4461            PhysicalAggregateFunction::Count
4462        ));
4463        assert!(matches!(
4464            convert_aggregate_function(LogicalAggregateFunction::Sum),
4465            PhysicalAggregateFunction::Sum
4466        ));
4467        assert!(matches!(
4468            convert_aggregate_function(LogicalAggregateFunction::Avg),
4469            PhysicalAggregateFunction::Avg
4470        ));
4471        assert!(matches!(
4472            convert_aggregate_function(LogicalAggregateFunction::Min),
4473            PhysicalAggregateFunction::Min
4474        ));
4475        assert!(matches!(
4476            convert_aggregate_function(LogicalAggregateFunction::Max),
4477            PhysicalAggregateFunction::Max
4478        ));
4479    }
4480
4481    #[test]
4482    fn test_planner_accessors() {
4483        let store = create_test_store();
4484        let planner = Planner::new(Arc::clone(&store));
4485
4486        assert!(planner.tx_id().is_none());
4487        assert!(planner.tx_manager().is_none());
4488        let _ = planner.viewing_epoch(); // Just ensure it's accessible
4489    }
4490
4491    #[test]
4492    fn test_physical_plan_accessors() {
4493        let store = create_test_store();
4494        let planner = Planner::new(store);
4495
4496        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
4497            variable: "n".to_string(),
4498            label: None,
4499            input: None,
4500        }));
4501
4502        let physical = planner.plan(&logical).unwrap();
4503        assert_eq!(physical.columns(), &["n"]);
4504
4505        // Test into_operator
4506        let _ = physical.into_operator();
4507    }
4508
4509    // ==================== Adaptive Planning Tests ====================
4510
4511    #[test]
4512    fn test_plan_adaptive_with_scan() {
4513        let store = create_test_store();
4514        let planner = Planner::new(store);
4515
4516        // MATCH (n:Person) RETURN n
4517        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4518            items: vec![ReturnItem {
4519                expression: LogicalExpression::Variable("n".to_string()),
4520                alias: None,
4521            }],
4522            distinct: false,
4523            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4524                variable: "n".to_string(),
4525                label: Some("Person".to_string()),
4526                input: None,
4527            })),
4528        }));
4529
4530        let physical = planner.plan_adaptive(&logical).unwrap();
4531        assert_eq!(physical.columns(), &["n"]);
4532        // Should have adaptive context with estimates
4533        assert!(physical.adaptive_context.is_some());
4534    }
4535
4536    #[test]
4537    fn test_plan_adaptive_with_filter() {
4538        let store = create_test_store();
4539        let planner = Planner::new(store);
4540
4541        // MATCH (n) WHERE n.age > 30 RETURN n
4542        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4543            items: vec![ReturnItem {
4544                expression: LogicalExpression::Variable("n".to_string()),
4545                alias: None,
4546            }],
4547            distinct: false,
4548            input: Box::new(LogicalOperator::Filter(FilterOp {
4549                predicate: LogicalExpression::Binary {
4550                    left: Box::new(LogicalExpression::Property {
4551                        variable: "n".to_string(),
4552                        property: "age".to_string(),
4553                    }),
4554                    op: BinaryOp::Gt,
4555                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4556                },
4557                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4558                    variable: "n".to_string(),
4559                    label: None,
4560                    input: None,
4561                })),
4562            })),
4563        }));
4564
4565        let physical = planner.plan_adaptive(&logical).unwrap();
4566        assert!(physical.adaptive_context.is_some());
4567    }
4568
4569    #[test]
4570    fn test_plan_adaptive_with_expand() {
4571        let store = create_test_store();
4572        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4573
4574        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4575        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4576            items: vec![
4577                ReturnItem {
4578                    expression: LogicalExpression::Variable("a".to_string()),
4579                    alias: None,
4580                },
4581                ReturnItem {
4582                    expression: LogicalExpression::Variable("b".to_string()),
4583                    alias: None,
4584                },
4585            ],
4586            distinct: false,
4587            input: Box::new(LogicalOperator::Expand(ExpandOp {
4588                from_variable: "a".to_string(),
4589                to_variable: "b".to_string(),
4590                edge_variable: None,
4591                direction: ExpandDirection::Outgoing,
4592                edge_type: Some("KNOWS".to_string()),
4593                min_hops: 1,
4594                max_hops: Some(1),
4595                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4596                    variable: "a".to_string(),
4597                    label: None,
4598                    input: None,
4599                })),
4600                path_alias: None,
4601            })),
4602        }));
4603
4604        let physical = planner.plan_adaptive(&logical).unwrap();
4605        assert!(physical.adaptive_context.is_some());
4606    }
4607
4608    #[test]
4609    fn test_plan_adaptive_with_join() {
4610        let store = create_test_store();
4611        let planner = Planner::new(store);
4612
4613        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4614            items: vec![
4615                ReturnItem {
4616                    expression: LogicalExpression::Variable("a".to_string()),
4617                    alias: None,
4618                },
4619                ReturnItem {
4620                    expression: LogicalExpression::Variable("b".to_string()),
4621                    alias: None,
4622                },
4623            ],
4624            distinct: false,
4625            input: Box::new(LogicalOperator::Join(JoinOp {
4626                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4627                    variable: "a".to_string(),
4628                    label: None,
4629                    input: None,
4630                })),
4631                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4632                    variable: "b".to_string(),
4633                    label: None,
4634                    input: None,
4635                })),
4636                join_type: JoinType::Cross,
4637                conditions: vec![],
4638            })),
4639        }));
4640
4641        let physical = planner.plan_adaptive(&logical).unwrap();
4642        assert!(physical.adaptive_context.is_some());
4643    }
4644
4645    #[test]
4646    fn test_plan_adaptive_with_aggregate() {
4647        let store = create_test_store();
4648        let planner = Planner::new(store);
4649
4650        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4651            group_by: vec![],
4652            aggregates: vec![LogicalAggregateExpr {
4653                function: LogicalAggregateFunction::Count,
4654                expression: Some(LogicalExpression::Variable("n".to_string())),
4655                distinct: false,
4656                alias: Some("cnt".to_string()),
4657                percentile: None,
4658            }],
4659            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4660                variable: "n".to_string(),
4661                label: None,
4662                input: None,
4663            })),
4664            having: None,
4665        }));
4666
4667        let physical = planner.plan_adaptive(&logical).unwrap();
4668        assert!(physical.adaptive_context.is_some());
4669    }
4670
4671    #[test]
4672    fn test_plan_adaptive_with_distinct() {
4673        let store = create_test_store();
4674        let planner = Planner::new(store);
4675
4676        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4677            items: vec![ReturnItem {
4678                expression: LogicalExpression::Variable("n".to_string()),
4679                alias: None,
4680            }],
4681            distinct: false,
4682            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4683                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4684                    variable: "n".to_string(),
4685                    label: None,
4686                    input: None,
4687                })),
4688                columns: None,
4689            })),
4690        }));
4691
4692        let physical = planner.plan_adaptive(&logical).unwrap();
4693        assert!(physical.adaptive_context.is_some());
4694    }
4695
4696    #[test]
4697    fn test_plan_adaptive_with_limit() {
4698        let store = create_test_store();
4699        let planner = Planner::new(store);
4700
4701        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4702            items: vec![ReturnItem {
4703                expression: LogicalExpression::Variable("n".to_string()),
4704                alias: None,
4705            }],
4706            distinct: false,
4707            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4708                count: 10,
4709                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4710                    variable: "n".to_string(),
4711                    label: None,
4712                    input: None,
4713                })),
4714            })),
4715        }));
4716
4717        let physical = planner.plan_adaptive(&logical).unwrap();
4718        assert!(physical.adaptive_context.is_some());
4719    }
4720
4721    #[test]
4722    fn test_plan_adaptive_with_skip() {
4723        let store = create_test_store();
4724        let planner = Planner::new(store);
4725
4726        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4727            items: vec![ReturnItem {
4728                expression: LogicalExpression::Variable("n".to_string()),
4729                alias: None,
4730            }],
4731            distinct: false,
4732            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4733                count: 5,
4734                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4735                    variable: "n".to_string(),
4736                    label: None,
4737                    input: None,
4738                })),
4739            })),
4740        }));
4741
4742        let physical = planner.plan_adaptive(&logical).unwrap();
4743        assert!(physical.adaptive_context.is_some());
4744    }
4745
4746    #[test]
4747    fn test_plan_adaptive_with_sort() {
4748        let store = create_test_store();
4749        let planner = Planner::new(store);
4750
4751        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4752            items: vec![ReturnItem {
4753                expression: LogicalExpression::Variable("n".to_string()),
4754                alias: None,
4755            }],
4756            distinct: false,
4757            input: Box::new(LogicalOperator::Sort(SortOp {
4758                keys: vec![SortKey {
4759                    expression: LogicalExpression::Variable("n".to_string()),
4760                    order: SortOrder::Ascending,
4761                }],
4762                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4763                    variable: "n".to_string(),
4764                    label: None,
4765                    input: None,
4766                })),
4767            })),
4768        }));
4769
4770        let physical = planner.plan_adaptive(&logical).unwrap();
4771        assert!(physical.adaptive_context.is_some());
4772    }
4773
4774    #[test]
4775    fn test_plan_adaptive_with_union() {
4776        let store = create_test_store();
4777        let planner = Planner::new(store);
4778
4779        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4780            items: vec![ReturnItem {
4781                expression: LogicalExpression::Variable("n".to_string()),
4782                alias: None,
4783            }],
4784            distinct: false,
4785            input: Box::new(LogicalOperator::Union(UnionOp {
4786                inputs: vec![
4787                    LogicalOperator::NodeScan(NodeScanOp {
4788                        variable: "n".to_string(),
4789                        label: Some("Person".to_string()),
4790                        input: None,
4791                    }),
4792                    LogicalOperator::NodeScan(NodeScanOp {
4793                        variable: "n".to_string(),
4794                        label: Some("Company".to_string()),
4795                        input: None,
4796                    }),
4797                ],
4798            })),
4799        }));
4800
4801        let physical = planner.plan_adaptive(&logical).unwrap();
4802        assert!(physical.adaptive_context.is_some());
4803    }
4804
4805    // ==================== Variable Length Path Tests ====================
4806
4807    #[test]
4808    fn test_plan_expand_variable_length() {
4809        let store = create_test_store();
4810        let planner = Planner::new(store);
4811
4812        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4813        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4814            items: vec![
4815                ReturnItem {
4816                    expression: LogicalExpression::Variable("a".to_string()),
4817                    alias: None,
4818                },
4819                ReturnItem {
4820                    expression: LogicalExpression::Variable("b".to_string()),
4821                    alias: None,
4822                },
4823            ],
4824            distinct: false,
4825            input: Box::new(LogicalOperator::Expand(ExpandOp {
4826                from_variable: "a".to_string(),
4827                to_variable: "b".to_string(),
4828                edge_variable: None,
4829                direction: ExpandDirection::Outgoing,
4830                edge_type: Some("KNOWS".to_string()),
4831                min_hops: 1,
4832                max_hops: Some(3),
4833                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4834                    variable: "a".to_string(),
4835                    label: None,
4836                    input: None,
4837                })),
4838                path_alias: None,
4839            })),
4840        }));
4841
4842        let physical = planner.plan(&logical).unwrap();
4843        assert!(physical.columns().contains(&"a".to_string()));
4844        assert!(physical.columns().contains(&"b".to_string()));
4845    }
4846
4847    #[test]
4848    fn test_plan_expand_with_path_alias() {
4849        let store = create_test_store();
4850        let planner = Planner::new(store);
4851
4852        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4853        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4854            items: vec![
4855                ReturnItem {
4856                    expression: LogicalExpression::Variable("a".to_string()),
4857                    alias: None,
4858                },
4859                ReturnItem {
4860                    expression: LogicalExpression::Variable("b".to_string()),
4861                    alias: None,
4862                },
4863            ],
4864            distinct: false,
4865            input: Box::new(LogicalOperator::Expand(ExpandOp {
4866                from_variable: "a".to_string(),
4867                to_variable: "b".to_string(),
4868                edge_variable: None,
4869                direction: ExpandDirection::Outgoing,
4870                edge_type: Some("KNOWS".to_string()),
4871                min_hops: 1,
4872                max_hops: Some(3),
4873                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4874                    variable: "a".to_string(),
4875                    label: None,
4876                    input: None,
4877                })),
4878                path_alias: Some("p".to_string()),
4879            })),
4880        }));
4881
4882        let physical = planner.plan(&logical).unwrap();
4883        // Verify plan was created successfully with expected output columns
4884        assert!(physical.columns().contains(&"a".to_string()));
4885        assert!(physical.columns().contains(&"b".to_string()));
4886    }
4887
4888    #[test]
4889    fn test_plan_expand_incoming() {
4890        let store = create_test_store();
4891        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4892
4893        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4894        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4895            items: vec![
4896                ReturnItem {
4897                    expression: LogicalExpression::Variable("a".to_string()),
4898                    alias: None,
4899                },
4900                ReturnItem {
4901                    expression: LogicalExpression::Variable("b".to_string()),
4902                    alias: None,
4903                },
4904            ],
4905            distinct: false,
4906            input: Box::new(LogicalOperator::Expand(ExpandOp {
4907                from_variable: "a".to_string(),
4908                to_variable: "b".to_string(),
4909                edge_variable: None,
4910                direction: ExpandDirection::Incoming,
4911                edge_type: Some("KNOWS".to_string()),
4912                min_hops: 1,
4913                max_hops: Some(1),
4914                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4915                    variable: "a".to_string(),
4916                    label: None,
4917                    input: None,
4918                })),
4919                path_alias: None,
4920            })),
4921        }));
4922
4923        let physical = planner.plan(&logical).unwrap();
4924        assert!(physical.columns().contains(&"a".to_string()));
4925        assert!(physical.columns().contains(&"b".to_string()));
4926    }
4927
4928    #[test]
4929    fn test_plan_expand_both_directions() {
4930        let store = create_test_store();
4931        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4932
4933        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4934        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4935            items: vec![
4936                ReturnItem {
4937                    expression: LogicalExpression::Variable("a".to_string()),
4938                    alias: None,
4939                },
4940                ReturnItem {
4941                    expression: LogicalExpression::Variable("b".to_string()),
4942                    alias: None,
4943                },
4944            ],
4945            distinct: false,
4946            input: Box::new(LogicalOperator::Expand(ExpandOp {
4947                from_variable: "a".to_string(),
4948                to_variable: "b".to_string(),
4949                edge_variable: None,
4950                direction: ExpandDirection::Both,
4951                edge_type: Some("KNOWS".to_string()),
4952                min_hops: 1,
4953                max_hops: Some(1),
4954                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4955                    variable: "a".to_string(),
4956                    label: None,
4957                    input: None,
4958                })),
4959                path_alias: None,
4960            })),
4961        }));
4962
4963        let physical = planner.plan(&logical).unwrap();
4964        assert!(physical.columns().contains(&"a".to_string()));
4965        assert!(physical.columns().contains(&"b".to_string()));
4966    }
4967
4968    // ==================== With Context Tests ====================
4969
4970    #[test]
4971    fn test_planner_with_context() {
4972        use crate::transaction::TransactionManager;
4973
4974        let store = create_test_store();
4975        let tx_manager = Arc::new(TransactionManager::new());
4976        let tx_id = tx_manager.begin();
4977        let epoch = tx_manager.current_epoch();
4978
4979        let planner = Planner::with_context(
4980            Arc::clone(&store),
4981            Arc::clone(&tx_manager),
4982            Some(tx_id),
4983            epoch,
4984        );
4985
4986        assert_eq!(planner.tx_id(), Some(tx_id));
4987        assert!(planner.tx_manager().is_some());
4988        assert_eq!(planner.viewing_epoch(), epoch);
4989    }
4990
4991    #[test]
4992    fn test_planner_with_factorized_execution_disabled() {
4993        let store = create_test_store();
4994        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4995
4996        // Two consecutive expands - should NOT use factorized execution
4997        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4998            items: vec![
4999                ReturnItem {
5000                    expression: LogicalExpression::Variable("a".to_string()),
5001                    alias: None,
5002                },
5003                ReturnItem {
5004                    expression: LogicalExpression::Variable("c".to_string()),
5005                    alias: None,
5006                },
5007            ],
5008            distinct: false,
5009            input: Box::new(LogicalOperator::Expand(ExpandOp {
5010                from_variable: "b".to_string(),
5011                to_variable: "c".to_string(),
5012                edge_variable: None,
5013                direction: ExpandDirection::Outgoing,
5014                edge_type: None,
5015                min_hops: 1,
5016                max_hops: Some(1),
5017                input: Box::new(LogicalOperator::Expand(ExpandOp {
5018                    from_variable: "a".to_string(),
5019                    to_variable: "b".to_string(),
5020                    edge_variable: None,
5021                    direction: ExpandDirection::Outgoing,
5022                    edge_type: None,
5023                    min_hops: 1,
5024                    max_hops: Some(1),
5025                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5026                        variable: "a".to_string(),
5027                        label: None,
5028                        input: None,
5029                    })),
5030                    path_alias: None,
5031                })),
5032                path_alias: None,
5033            })),
5034        }));
5035
5036        let physical = planner.plan(&logical).unwrap();
5037        assert!(physical.columns().contains(&"a".to_string()));
5038        assert!(physical.columns().contains(&"c".to_string()));
5039    }
5040
5041    // ==================== Sort with Property Tests ====================
5042
5043    #[test]
5044    fn test_plan_sort_by_property() {
5045        let store = create_test_store();
5046        let planner = Planner::new(store);
5047
5048        // MATCH (n) RETURN n ORDER BY n.name ASC
5049        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5050            items: vec![ReturnItem {
5051                expression: LogicalExpression::Variable("n".to_string()),
5052                alias: None,
5053            }],
5054            distinct: false,
5055            input: Box::new(LogicalOperator::Sort(SortOp {
5056                keys: vec![SortKey {
5057                    expression: LogicalExpression::Property {
5058                        variable: "n".to_string(),
5059                        property: "name".to_string(),
5060                    },
5061                    order: SortOrder::Ascending,
5062                }],
5063                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5064                    variable: "n".to_string(),
5065                    label: None,
5066                    input: None,
5067                })),
5068            })),
5069        }));
5070
5071        let physical = planner.plan(&logical).unwrap();
5072        // Should have the property column projected
5073        assert!(physical.columns().contains(&"n".to_string()));
5074    }
5075
5076    // ==================== Scan with Input Tests ====================
5077
5078    #[test]
5079    fn test_plan_scan_with_input() {
5080        let store = create_test_store();
5081        let planner = Planner::new(store);
5082
5083        // A scan with another scan as input (for chained patterns)
5084        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
5085            items: vec![
5086                ReturnItem {
5087                    expression: LogicalExpression::Variable("a".to_string()),
5088                    alias: None,
5089                },
5090                ReturnItem {
5091                    expression: LogicalExpression::Variable("b".to_string()),
5092                    alias: None,
5093                },
5094            ],
5095            distinct: false,
5096            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
5097                variable: "b".to_string(),
5098                label: Some("Company".to_string()),
5099                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
5100                    variable: "a".to_string(),
5101                    label: Some("Person".to_string()),
5102                    input: None,
5103                }))),
5104            })),
5105        }));
5106
5107        let physical = planner.plan(&logical).unwrap();
5108        assert!(physical.columns().contains(&"a".to_string()));
5109        assert!(physical.columns().contains(&"b".to_string()));
5110    }
5111}