Skip to main content

grafeo_engine/query/
planner.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6
7use crate::query::plan::{
8    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BinaryOp,
9    CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp, ExpandDirection, ExpandOp,
10    FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator,
11    LogicalPlan, MergeOp, NodeScanOp, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
12    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
13};
14use grafeo_common::types::{EpochId, TxId};
15use grafeo_common::types::{LogicalType, Value};
16use grafeo_common::utils::error::{Error, Result};
17use grafeo_core::execution::AdaptiveContext;
18use grafeo_core::execution::operators::{
19    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr,
20    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, CreateEdgeOperator,
21    CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, DistinctOperator, EmptyOperator,
22    ExpandOperator, ExpandStep, ExpressionPredicate, FactorizedAggregate,
23    FactorizedAggregateOperator, FilterExpression, FilterOperator, HashAggregateOperator,
24    HashJoinOperator, JoinType as PhysicalJoinType, LazyFactorizedChainOperator,
25    LeapfrogJoinOperator, LimitOperator, MergeOperator, NestedLoopJoinOperator, NodeListOperator,
26    NullOrder, Operator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
27    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator, SkipOperator,
28    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnaryFilterOp, UnionOperator,
29    UnwindOperator, VariableLengthExpandOperator,
30};
31use grafeo_core::graph::{Direction, lpg::LpgStore};
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use crate::transaction::TransactionManager;
36
/// Converts a logical plan to a physical operator tree.
///
/// A planner holds the store it scans from together with the optional
/// transaction context (`tx_manager`, `tx_id`) and the `viewing_epoch` that is
/// handed to the scan and expand operators it builds.
pub struct Planner {
    /// The graph store to scan from.
    store: Arc<LpgStore>,
    /// Transaction manager for MVCC operations.
    ///
    /// `None` when the planner was built with [`Planner::new`].
    tx_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    tx_id: Option<TxId>,
    /// Epoch to use for visibility checks.
    viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    ///
    /// Kept in a `Cell` so it can be bumped from `&self` planning methods.
    anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    ///
    /// Both constructors enable it; see
    /// [`Planner::with_factorized_execution`] to change it.
    factorized_execution: bool,
}
52
53impl Planner {
54    /// Creates a new planner with the given store.
55    ///
56    /// This creates a planner without transaction context, using the current
57    /// epoch from the store for visibility.
58    #[must_use]
59    pub fn new(store: Arc<LpgStore>) -> Self {
60        let epoch = store.current_epoch();
61        Self {
62            store,
63            tx_manager: None,
64            tx_id: None,
65            viewing_epoch: epoch,
66            anon_edge_counter: std::cell::Cell::new(0),
67            factorized_execution: true,
68        }
69    }
70
71    /// Creates a new planner with transaction context for MVCC-aware planning.
72    ///
73    /// # Arguments
74    ///
75    /// * `store` - The graph store
76    /// * `tx_manager` - Transaction manager for recording reads/writes
77    /// * `tx_id` - Current transaction ID (None for auto-commit)
78    /// * `viewing_epoch` - Epoch to use for version visibility
79    #[must_use]
80    pub fn with_context(
81        store: Arc<LpgStore>,
82        tx_manager: Arc<TransactionManager>,
83        tx_id: Option<TxId>,
84        viewing_epoch: EpochId,
85    ) -> Self {
86        Self {
87            store,
88            tx_manager: Some(tx_manager),
89            tx_id,
90            viewing_epoch,
91            anon_edge_counter: std::cell::Cell::new(0),
92            factorized_execution: true,
93        }
94    }
95
96    /// Returns the viewing epoch for this planner.
97    #[must_use]
98    pub fn viewing_epoch(&self) -> EpochId {
99        self.viewing_epoch
100    }
101
102    /// Returns the transaction ID for this planner, if any.
103    #[must_use]
104    pub fn tx_id(&self) -> Option<TxId> {
105        self.tx_id
106    }
107
108    /// Returns a reference to the transaction manager, if available.
109    #[must_use]
110    pub fn tx_manager(&self) -> Option<&Arc<TransactionManager>> {
111        self.tx_manager.as_ref()
112    }
113
114    /// Enables or disables factorized execution for multi-hop queries.
115    #[must_use]
116    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
117        self.factorized_execution = enabled;
118        self
119    }
120
121    /// Counts consecutive single-hop expand operations.
122    ///
123    /// Returns the count and the deepest non-expand operator (the base of the chain).
124    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
125        match op {
126            LogicalOperator::Expand(expand) => {
127                // Only count single-hop expands (factorization doesn't apply to variable-length)
128                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
129
130                if is_single_hop {
131                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
132                    (inner_count + 1, base)
133                } else {
134                    // Variable-length path breaks the chain
135                    (0, op)
136                }
137            }
138            _ => (0, op),
139        }
140    }
141
142    /// Collects expand operations from the outermost down to the base.
143    ///
144    /// Returns expands in order from innermost (base) to outermost.
145    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
146        let mut chain = Vec::new();
147        let mut current = op;
148
149        while let LogicalOperator::Expand(expand) = current {
150            // Only include single-hop expands
151            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
152            if !is_single_hop {
153                break;
154            }
155            chain.push(expand);
156            current = &expand.input;
157        }
158
159        // Reverse so we go from base to outer
160        chain.reverse();
161        chain
162    }
163
164    /// Plans a logical plan into a physical operator.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if planning fails.
169    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
170        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
171        Ok(PhysicalPlan {
172            operator,
173            columns,
174            adaptive_context: None,
175        })
176    }
177
178    /// Plans a logical plan with adaptive execution support.
179    ///
180    /// Creates cardinality checkpoints at key points in the plan (scans, filters,
181    /// joins) that can be monitored during execution to detect estimate errors.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if planning fails.
186    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
187        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
188
189        // Build adaptive context with cardinality estimates
190        let mut adaptive_context = AdaptiveContext::new();
191        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
192
193        Ok(PhysicalPlan {
194            operator,
195            columns,
196            adaptive_context: Some(adaptive_context),
197        })
198    }
199
200    /// Collects cardinality estimates from the logical plan into an adaptive context.
201    fn collect_cardinality_estimates(
202        &self,
203        op: &LogicalOperator,
204        ctx: &mut AdaptiveContext,
205        depth: usize,
206    ) {
207        match op {
208            LogicalOperator::NodeScan(scan) => {
209                // Estimate based on label statistics
210                let estimate = if let Some(label) = &scan.label {
211                    self.store.nodes_by_label(label).len() as f64
212                } else {
213                    self.store.node_count() as f64
214                };
215                let id = format!("scan_{}", scan.variable);
216                ctx.set_estimate(&id, estimate);
217
218                // Recurse into input if present
219                if let Some(input) = &scan.input {
220                    self.collect_cardinality_estimates(input, ctx, depth + 1);
221                }
222            }
223            LogicalOperator::Filter(filter) => {
224                // Default selectivity estimate for filters (30%)
225                let input_estimate = self.estimate_cardinality(&filter.input);
226                let estimate = input_estimate * 0.3;
227                let id = format!("filter_{depth}");
228                ctx.set_estimate(&id, estimate);
229
230                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
231            }
232            LogicalOperator::Expand(expand) => {
233                // Estimate based on average degree
234                let input_estimate = self.estimate_cardinality(&expand.input);
235                let avg_degree = 10.0; // Default estimate
236                let estimate = input_estimate * avg_degree;
237                let id = format!("expand_{}", expand.to_variable);
238                ctx.set_estimate(&id, estimate);
239
240                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
241            }
242            LogicalOperator::Join(join) => {
243                // Estimate join output (product with selectivity)
244                let left_est = self.estimate_cardinality(&join.left);
245                let right_est = self.estimate_cardinality(&join.right);
246                let estimate = (left_est * right_est).sqrt(); // Geometric mean as rough estimate
247                let id = format!("join_{depth}");
248                ctx.set_estimate(&id, estimate);
249
250                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
251                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
252            }
253            LogicalOperator::Aggregate(agg) => {
254                // Aggregates typically reduce cardinality
255                let input_estimate = self.estimate_cardinality(&agg.input);
256                let estimate = if agg.group_by.is_empty() {
257                    1.0 // Scalar aggregate
258                } else {
259                    (input_estimate * 0.1).max(1.0) // 10% of input as group estimate
260                };
261                let id = format!("aggregate_{depth}");
262                ctx.set_estimate(&id, estimate);
263
264                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
265            }
266            LogicalOperator::Distinct(distinct) => {
267                let input_estimate = self.estimate_cardinality(&distinct.input);
268                let estimate = (input_estimate * 0.5).max(1.0);
269                let id = format!("distinct_{depth}");
270                ctx.set_estimate(&id, estimate);
271
272                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
273            }
274            LogicalOperator::Return(ret) => {
275                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
276            }
277            LogicalOperator::Limit(limit) => {
278                let input_estimate = self.estimate_cardinality(&limit.input);
279                let estimate = (input_estimate).min(limit.count as f64);
280                let id = format!("limit_{depth}");
281                ctx.set_estimate(&id, estimate);
282
283                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
284            }
285            LogicalOperator::Skip(skip) => {
286                let input_estimate = self.estimate_cardinality(&skip.input);
287                let estimate = (input_estimate - skip.count as f64).max(0.0);
288                let id = format!("skip_{depth}");
289                ctx.set_estimate(&id, estimate);
290
291                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
292            }
293            LogicalOperator::Sort(sort) => {
294                // Sort doesn't change cardinality
295                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
296            }
297            LogicalOperator::Union(union) => {
298                let estimate: f64 = union
299                    .inputs
300                    .iter()
301                    .map(|input| self.estimate_cardinality(input))
302                    .sum();
303                let id = format!("union_{depth}");
304                ctx.set_estimate(&id, estimate);
305
306                for input in &union.inputs {
307                    self.collect_cardinality_estimates(input, ctx, depth + 1);
308                }
309            }
310            _ => {
311                // For other operators, try to recurse into known input patterns
312            }
313        }
314    }
315
316    /// Estimates cardinality for a logical operator subtree.
317    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
318        match op {
319            LogicalOperator::NodeScan(scan) => {
320                if let Some(label) = &scan.label {
321                    self.store.nodes_by_label(label).len() as f64
322                } else {
323                    self.store.node_count() as f64
324                }
325            }
326            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
327            LogicalOperator::Expand(expand) => self.estimate_cardinality(&expand.input) * 10.0,
328            LogicalOperator::Join(join) => {
329                let left = self.estimate_cardinality(&join.left);
330                let right = self.estimate_cardinality(&join.right);
331                (left * right).sqrt()
332            }
333            LogicalOperator::Aggregate(agg) => {
334                if agg.group_by.is_empty() {
335                    1.0
336                } else {
337                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
338                }
339            }
340            LogicalOperator::Distinct(distinct) => {
341                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
342            }
343            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
344            LogicalOperator::Limit(limit) => self
345                .estimate_cardinality(&limit.input)
346                .min(limit.count as f64),
347            LogicalOperator::Skip(skip) => {
348                (self.estimate_cardinality(&skip.input) - skip.count as f64).max(0.0)
349            }
350            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
351            LogicalOperator::Union(union) => union
352                .inputs
353                .iter()
354                .map(|input| self.estimate_cardinality(input))
355                .sum(),
356            _ => 1000.0, // Default estimate for unknown operators
357        }
358    }
359
360    /// Plans a single logical operator.
361    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
362        match op {
363            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
364            LogicalOperator::Expand(expand) => {
365                // Check for expand chains when factorized execution is enabled
366                if self.factorized_execution {
367                    let (chain_len, _base) = Self::count_expand_chain(op);
368                    if chain_len >= 2 {
369                        // Use factorized chain for 2+ consecutive single-hop expands
370                        return self.plan_expand_chain(op);
371                    }
372                }
373                self.plan_expand(expand)
374            }
375            LogicalOperator::Return(ret) => self.plan_return(ret),
376            LogicalOperator::Filter(filter) => self.plan_filter(filter),
377            LogicalOperator::Project(project) => self.plan_project(project),
378            LogicalOperator::Limit(limit) => self.plan_limit(limit),
379            LogicalOperator::Skip(skip) => self.plan_skip(skip),
380            LogicalOperator::Sort(sort) => self.plan_sort(sort),
381            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
382            LogicalOperator::Join(join) => self.plan_join(join),
383            LogicalOperator::Union(union) => self.plan_union(union),
384            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
385            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
386            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
387            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
388            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
389            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
390            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
391            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
392            LogicalOperator::Merge(merge) => self.plan_merge(merge),
393            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
394            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
395            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
396            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
397            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
398            _ => Err(Error::Internal(format!(
399                "Unsupported operator: {:?}",
400                std::mem::discriminant(op)
401            ))),
402        }
403    }
404
405    /// Plans a node scan operator.
406    fn plan_node_scan(&self, scan: &NodeScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
407        let scan_op = if let Some(label) = &scan.label {
408            ScanOperator::with_label(Arc::clone(&self.store), label)
409        } else {
410            ScanOperator::new(Arc::clone(&self.store))
411        };
412
413        // Apply MVCC context if available
414        let scan_operator: Box<dyn Operator> =
415            Box::new(scan_op.with_tx_context(self.viewing_epoch, self.tx_id));
416
417        // If there's an input, chain operators with a nested loop join (cross join)
418        if let Some(input) = &scan.input {
419            let (input_op, mut input_columns) = self.plan_operator(input)?;
420
421            // Build output schema: input columns + scan column
422            let mut output_schema: Vec<LogicalType> =
423                input_columns.iter().map(|_| LogicalType::Any).collect();
424            output_schema.push(LogicalType::Node);
425
426            // Add scan column to input columns
427            input_columns.push(scan.variable.clone());
428
429            // Use nested loop join to combine input rows with scanned nodes
430            let join_op = Box::new(NestedLoopJoinOperator::new(
431                input_op,
432                scan_operator,
433                None, // No join condition (cross join)
434                PhysicalJoinType::Cross,
435                output_schema,
436            ));
437
438            Ok((join_op, input_columns))
439        } else {
440            let columns = vec![scan.variable.clone()];
441            Ok((scan_operator, columns))
442        }
443    }
444
445    /// Plans an expand operator.
446    fn plan_expand(&self, expand: &ExpandOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
447        // Plan the input operator first
448        let (input_op, input_columns) = self.plan_operator(&expand.input)?;
449
450        // Find the source column index
451        let source_column = input_columns
452            .iter()
453            .position(|c| c == &expand.from_variable)
454            .ok_or_else(|| {
455                Error::Internal(format!(
456                    "Source variable '{}' not found in input columns",
457                    expand.from_variable
458                ))
459            })?;
460
461        // Convert expand direction
462        let direction = match expand.direction {
463            ExpandDirection::Outgoing => Direction::Outgoing,
464            ExpandDirection::Incoming => Direction::Incoming,
465            ExpandDirection::Both => Direction::Both,
466        };
467
468        // Check if this is a variable-length path
469        let is_variable_length =
470            expand.min_hops != 1 || expand.max_hops.is_none() || expand.max_hops != Some(1);
471
472        let operator: Box<dyn Operator> = if is_variable_length {
473            // Use VariableLengthExpandOperator for multi-hop paths
474            let max_hops = expand.max_hops.unwrap_or(expand.min_hops + 10); // Default max if unlimited
475            let mut expand_op = VariableLengthExpandOperator::new(
476                Arc::clone(&self.store),
477                input_op,
478                source_column,
479                direction,
480                expand.edge_type.clone(),
481                expand.min_hops,
482                max_hops,
483            )
484            .with_tx_context(self.viewing_epoch, self.tx_id);
485
486            // If a path alias is set, enable path length output
487            if expand.path_alias.is_some() {
488                expand_op = expand_op.with_path_length_output();
489            }
490
491            Box::new(expand_op)
492        } else {
493            // Use simple ExpandOperator for single-hop paths
494            let expand_op = ExpandOperator::new(
495                Arc::clone(&self.store),
496                input_op,
497                source_column,
498                direction,
499                expand.edge_type.clone(),
500            )
501            .with_tx_context(self.viewing_epoch, self.tx_id);
502            Box::new(expand_op)
503        };
504
505        // Build output columns: [input_columns..., edge, target, (path_length)?]
506        // Preserve all input columns and add edge + target to match ExpandOperator output
507        let mut columns = input_columns;
508
509        // Generate edge column name - use provided name or generate anonymous name
510        let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
511            let count = self.anon_edge_counter.get();
512            self.anon_edge_counter.set(count + 1);
513            format!("_anon_edge_{}", count)
514        });
515        columns.push(edge_col_name);
516
517        columns.push(expand.to_variable.clone());
518
519        // If a path alias is set, add a column for the path length
520        if let Some(ref path_alias) = expand.path_alias {
521            columns.push(format!("_path_length_{}", path_alias));
522        }
523
524        Ok((operator, columns))
525    }
526
527    /// Plans a chain of consecutive expand operations using factorized execution.
528    ///
529    /// This avoids the Cartesian product explosion that occurs with separate expands.
530    /// For a 2-hop query with degree d, this uses O(d) memory instead of O(d^2).
531    ///
532    /// The chain is executed lazily at query time, not during planning. This ensures
533    /// that any filters applied above the expand chain are properly respected.
534    fn plan_expand_chain(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
535        let expands = Self::collect_expand_chain(op);
536        if expands.is_empty() {
537            return Err(Error::Internal("Empty expand chain".to_string()));
538        }
539
540        // Get the base operator (before first expand)
541        let first_expand = expands[0];
542        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
543
544        let mut columns = base_columns.clone();
545        let mut steps = Vec::new();
546
547        // Track the level-local source column for each expand
548        // For the first expand, it's the column in the input (base_columns)
549        // For subsequent expands, the target from the previous level is always at index 1
550        // (each level adds [edge, target], so target is at index 1)
551        let mut is_first = true;
552
553        for expand in &expands {
554            // Find source column for this expand
555            let source_column = if is_first {
556                // For first expand, find in base columns
557                base_columns
558                    .iter()
559                    .position(|c| c == &expand.from_variable)
560                    .ok_or_else(|| {
561                        Error::Internal(format!(
562                            "Source variable '{}' not found in base columns",
563                            expand.from_variable
564                        ))
565                    })?
566            } else {
567                // For subsequent expands, the target from the previous level is at index 1
568                // (each level adds [edge, target], so target is the second column)
569                1
570            };
571
572            // Convert direction
573            let direction = match expand.direction {
574                ExpandDirection::Outgoing => Direction::Outgoing,
575                ExpandDirection::Incoming => Direction::Incoming,
576                ExpandDirection::Both => Direction::Both,
577            };
578
579            // Add expand step configuration
580            steps.push(ExpandStep {
581                source_column,
582                direction,
583                edge_type: expand.edge_type.clone(),
584            });
585
586            // Add edge and target columns
587            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
588                let count = self.anon_edge_counter.get();
589                self.anon_edge_counter.set(count + 1);
590                format!("_anon_edge_{}", count)
591            });
592            columns.push(edge_col_name);
593            columns.push(expand.to_variable.clone());
594
595            is_first = false;
596        }
597
598        // Create lazy operator that executes at query time, not planning time
599        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
600
601        if let Some(tx_id) = self.tx_id {
602            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
603        } else {
604            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
605        }
606
607        Ok((Box::new(lazy_op), columns))
608    }
609
610    /// Plans a RETURN clause.
611    fn plan_return(&self, ret: &ReturnOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
612        // Plan the input operator
613        let (input_op, input_columns) = self.plan_operator(&ret.input)?;
614
615        // Build variable to column index mapping
616        let variable_columns: HashMap<String, usize> = input_columns
617            .iter()
618            .enumerate()
619            .map(|(i, name)| (name.clone(), i))
620            .collect();
621
622        // Extract column names from return items
623        let columns: Vec<String> = ret
624            .items
625            .iter()
626            .map(|item| {
627                item.alias.clone().unwrap_or_else(|| {
628                    // Generate a default name from the expression
629                    expression_to_string(&item.expression)
630                })
631            })
632            .collect();
633
634        // Check if we need a project operator (for property access or expression evaluation)
635        let needs_project = ret
636            .items
637            .iter()
638            .any(|item| !matches!(&item.expression, LogicalExpression::Variable(_)));
639
640        if needs_project {
641            // Build project expressions
642            let mut projections = Vec::with_capacity(ret.items.len());
643            let mut output_types = Vec::with_capacity(ret.items.len());
644
645            for item in &ret.items {
646                match &item.expression {
647                    LogicalExpression::Variable(name) => {
648                        let col_idx = *variable_columns.get(name).ok_or_else(|| {
649                            Error::Internal(format!("Variable '{}' not found in input", name))
650                        })?;
651                        projections.push(ProjectExpr::Column(col_idx));
652                        // Use Node type for variables (they could be nodes, edges, or values)
653                        output_types.push(LogicalType::Node);
654                    }
655                    LogicalExpression::Property { variable, property } => {
656                        let col_idx = *variable_columns.get(variable).ok_or_else(|| {
657                            Error::Internal(format!("Variable '{}' not found in input", variable))
658                        })?;
659                        projections.push(ProjectExpr::PropertyAccess {
660                            column: col_idx,
661                            property: property.clone(),
662                        });
663                        // Property could be any type - use Any/Generic to preserve type
664                        output_types.push(LogicalType::Any);
665                    }
666                    LogicalExpression::Literal(value) => {
667                        projections.push(ProjectExpr::Constant(value.clone()));
668                        output_types.push(value_to_logical_type(value));
669                    }
670                    LogicalExpression::FunctionCall { name, args, .. } => {
671                        // Handle built-in functions
672                        match name.to_lowercase().as_str() {
673                            "type" => {
674                                // type(r) returns the edge type string
675                                if args.len() != 1 {
676                                    return Err(Error::Internal(
677                                        "type() requires exactly one argument".to_string(),
678                                    ));
679                                }
680                                if let LogicalExpression::Variable(var_name) = &args[0] {
681                                    let col_idx =
682                                        *variable_columns.get(var_name).ok_or_else(|| {
683                                            Error::Internal(format!(
684                                                "Variable '{}' not found in input",
685                                                var_name
686                                            ))
687                                        })?;
688                                    projections.push(ProjectExpr::EdgeType { column: col_idx });
689                                    output_types.push(LogicalType::String);
690                                } else {
691                                    return Err(Error::Internal(
692                                        "type() argument must be a variable".to_string(),
693                                    ));
694                                }
695                            }
696                            "length" => {
697                                // length(p) returns the path length
698                                // For shortestPath results, the path column already contains the length
699                                if args.len() != 1 {
700                                    return Err(Error::Internal(
701                                        "length() requires exactly one argument".to_string(),
702                                    ));
703                                }
704                                if let LogicalExpression::Variable(var_name) = &args[0] {
705                                    let col_idx =
706                                        *variable_columns.get(var_name).ok_or_else(|| {
707                                            Error::Internal(format!(
708                                                "Variable '{}' not found in input",
709                                                var_name
710                                            ))
711                                        })?;
712                                    // Pass through the column value directly
713                                    projections.push(ProjectExpr::Column(col_idx));
714                                    output_types.push(LogicalType::Int64);
715                                } else {
716                                    return Err(Error::Internal(
717                                        "length() argument must be a variable".to_string(),
718                                    ));
719                                }
720                            }
721                            // For other functions (head, tail, size, etc.), use expression evaluation
722                            _ => {
723                                let filter_expr = self.convert_expression(&item.expression)?;
724                                projections.push(ProjectExpr::Expression {
725                                    expr: filter_expr,
726                                    variable_columns: variable_columns.clone(),
727                                });
728                                output_types.push(LogicalType::Any);
729                            }
730                        }
731                    }
732                    LogicalExpression::Case { .. } => {
733                        // Convert CASE expression to FilterExpression for evaluation
734                        let filter_expr = self.convert_expression(&item.expression)?;
735                        projections.push(ProjectExpr::Expression {
736                            expr: filter_expr,
737                            variable_columns: variable_columns.clone(),
738                        });
739                        // CASE can return any type - use Any
740                        output_types.push(LogicalType::Any);
741                    }
742                    _ => {
743                        return Err(Error::Internal(format!(
744                            "Unsupported RETURN expression: {:?}",
745                            item.expression
746                        )));
747                    }
748                }
749            }
750
751            let operator = Box::new(ProjectOperator::with_store(
752                input_op,
753                projections,
754                output_types,
755                Arc::clone(&self.store),
756            ));
757
758            Ok((operator, columns))
759        } else {
760            // Simple case: just return variables
761            // Re-order columns to match return items if needed
762            let mut projections = Vec::with_capacity(ret.items.len());
763            let mut output_types = Vec::with_capacity(ret.items.len());
764
765            for item in &ret.items {
766                if let LogicalExpression::Variable(name) = &item.expression {
767                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
768                        Error::Internal(format!("Variable '{}' not found in input", name))
769                    })?;
770                    projections.push(ProjectExpr::Column(col_idx));
771                    output_types.push(LogicalType::Node);
772                }
773            }
774
775            // Only add ProjectOperator if reordering is needed
776            if projections.len() == input_columns.len()
777                && projections
778                    .iter()
779                    .enumerate()
780                    .all(|(i, p)| matches!(p, ProjectExpr::Column(c) if *c == i))
781            {
782                // No reordering needed
783                Ok((input_op, columns))
784            } else {
785                let operator = Box::new(ProjectOperator::new(input_op, projections, output_types));
786                Ok((operator, columns))
787            }
788        }
789    }
790
791    /// Plans a project operator (for WITH clause).
792    fn plan_project(
793        &self,
794        project: &crate::query::plan::ProjectOp,
795    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
796        // Handle Empty input specially (standalone WITH like: WITH [1,2,3] AS nums)
797        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
798            if matches!(project.input.as_ref(), LogicalOperator::Empty) {
799                // Create a single-row operator for projecting literals
800                let single_row_op: Box<dyn Operator> = Box::new(
801                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
802                );
803                (single_row_op, Vec::new())
804            } else {
805                self.plan_operator(&project.input)?
806            };
807
808        // Build variable to column index mapping
809        let variable_columns: HashMap<String, usize> = input_columns
810            .iter()
811            .enumerate()
812            .map(|(i, name)| (name.clone(), i))
813            .collect();
814
815        // Build projections and new column names
816        let mut projections = Vec::with_capacity(project.projections.len());
817        let mut output_types = Vec::with_capacity(project.projections.len());
818        let mut output_columns = Vec::with_capacity(project.projections.len());
819
820        for projection in &project.projections {
821            // Determine the output column name (alias or expression string)
822            let col_name = projection
823                .alias
824                .clone()
825                .unwrap_or_else(|| expression_to_string(&projection.expression));
826            output_columns.push(col_name);
827
828            match &projection.expression {
829                LogicalExpression::Variable(name) => {
830                    let col_idx = *variable_columns.get(name).ok_or_else(|| {
831                        Error::Internal(format!("Variable '{}' not found in input", name))
832                    })?;
833                    projections.push(ProjectExpr::Column(col_idx));
834                    output_types.push(LogicalType::Node);
835                }
836                LogicalExpression::Property { variable, property } => {
837                    let col_idx = *variable_columns.get(variable).ok_or_else(|| {
838                        Error::Internal(format!("Variable '{}' not found in input", variable))
839                    })?;
840                    projections.push(ProjectExpr::PropertyAccess {
841                        column: col_idx,
842                        property: property.clone(),
843                    });
844                    output_types.push(LogicalType::Any);
845                }
846                LogicalExpression::Literal(value) => {
847                    projections.push(ProjectExpr::Constant(value.clone()));
848                    output_types.push(value_to_logical_type(value));
849                }
850                _ => {
851                    // For complex expressions, use full expression evaluation
852                    let filter_expr = self.convert_expression(&projection.expression)?;
853                    projections.push(ProjectExpr::Expression {
854                        expr: filter_expr,
855                        variable_columns: variable_columns.clone(),
856                    });
857                    output_types.push(LogicalType::Any);
858                }
859            }
860        }
861
862        let operator = Box::new(ProjectOperator::with_store(
863            input_op,
864            projections,
865            output_types,
866            Arc::clone(&self.store),
867        ));
868
869        Ok((operator, output_columns))
870    }
871
872    /// Plans a filter operator.
873    ///
874    /// Uses zone map pre-filtering to potentially skip scans when predicates
875    /// definitely won't match any data. Also uses property indexes when available
876    /// for O(1) lookups instead of full scans.
877    fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
878        // Check zone maps for simple property predicates before scanning
879        // If zone map says "definitely no matches", we can short-circuit
880        if let Some(false) = self.check_zone_map_for_predicate(&filter.predicate) {
881            // Zone map says no matches possible - return empty result
882            let (_, columns) = self.plan_operator(&filter.input)?;
883            let schema = self.derive_schema_from_columns(&columns);
884            let empty_op = Box::new(EmptyOperator::new(schema));
885            return Ok((empty_op, columns));
886        }
887
888        // Try to use property index for equality predicates on indexed properties
889        if let Some(result) = self.try_plan_filter_with_property_index(filter)? {
890            return Ok(result);
891        }
892
893        // Plan the input operator first
894        let (input_op, columns) = self.plan_operator(&filter.input)?;
895
896        // Build variable to column index mapping
897        let variable_columns: HashMap<String, usize> = columns
898            .iter()
899            .enumerate()
900            .map(|(i, name)| (name.clone(), i))
901            .collect();
902
903        // Convert logical expression to filter expression
904        let filter_expr = self.convert_expression(&filter.predicate)?;
905
906        // Create the predicate
907        let predicate =
908            ExpressionPredicate::new(filter_expr, variable_columns, Arc::clone(&self.store));
909
910        // Create the filter operator
911        let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
912
913        Ok((operator, columns))
914    }
915
916    /// Checks zone maps for a predicate to see if we can skip the scan entirely.
917    ///
918    /// Returns:
919    /// - `Some(false)` if zone map proves no matches possible (can skip)
920    /// - `Some(true)` if zone map says matches might exist
921    /// - `None` if zone map check not applicable
922    fn check_zone_map_for_predicate(&self, predicate: &LogicalExpression) -> Option<bool> {
923        use grafeo_core::graph::lpg::CompareOp;
924
925        match predicate {
926            LogicalExpression::Binary { left, op, right } => {
927                // Check for AND/OR first (compound conditions)
928                match op {
929                    BinaryOp::And => {
930                        let left_result = self.check_zone_map_for_predicate(left);
931                        let right_result = self.check_zone_map_for_predicate(right);
932
933                        return match (left_result, right_result) {
934                            // If either side definitely won't match, the AND won't match
935                            (Some(false), _) | (_, Some(false)) => Some(false),
936                            // If both might match, might match overall
937                            (Some(true), Some(true)) => Some(true),
938                            // Otherwise, can't determine
939                            _ => None,
940                        };
941                    }
942                    BinaryOp::Or => {
943                        let left_result = self.check_zone_map_for_predicate(left);
944                        let right_result = self.check_zone_map_for_predicate(right);
945
946                        return match (left_result, right_result) {
947                            // Both sides definitely won't match
948                            (Some(false), Some(false)) => Some(false),
949                            // At least one side might match
950                            (Some(true), _) | (_, Some(true)) => Some(true),
951                            // Otherwise, can't determine
952                            _ => None,
953                        };
954                    }
955                    _ => {}
956                }
957
958                // Simple property comparison: n.property op value
959                let (property, compare_op, value) = match (left.as_ref(), right.as_ref()) {
960                    (
961                        LogicalExpression::Property { property, .. },
962                        LogicalExpression::Literal(val),
963                    ) => {
964                        let cmp = match op {
965                            BinaryOp::Eq => CompareOp::Eq,
966                            BinaryOp::Ne => CompareOp::Ne,
967                            BinaryOp::Lt => CompareOp::Lt,
968                            BinaryOp::Le => CompareOp::Le,
969                            BinaryOp::Gt => CompareOp::Gt,
970                            BinaryOp::Ge => CompareOp::Ge,
971                            _ => return None,
972                        };
973                        (property.clone(), cmp, val.clone())
974                    }
975                    (
976                        LogicalExpression::Literal(val),
977                        LogicalExpression::Property { property, .. },
978                    ) => {
979                        // Flip comparison for reversed operands
980                        let cmp = match op {
981                            BinaryOp::Eq => CompareOp::Eq,
982                            BinaryOp::Ne => CompareOp::Ne,
983                            BinaryOp::Lt => CompareOp::Gt, // val < prop means prop > val
984                            BinaryOp::Le => CompareOp::Ge,
985                            BinaryOp::Gt => CompareOp::Lt,
986                            BinaryOp::Ge => CompareOp::Le,
987                            _ => return None,
988                        };
989                        (property.clone(), cmp, val.clone())
990                    }
991                    _ => return None,
992                };
993
994                // Check zone map for node properties
995                let might_match =
996                    self.store
997                        .node_property_might_match(&property.into(), compare_op, &value);
998
999                Some(might_match)
1000            }
1001
1002            _ => None,
1003        }
1004    }
1005
1006    /// Tries to use a property index for filter optimization.
1007    ///
1008    /// When a filter predicate is an equality check on an indexed property,
1009    /// and the input is a simple NodeScan, we can use the index to look up
1010    /// matching nodes directly instead of scanning all nodes.
1011    ///
1012    /// Returns `Ok(Some((operator, columns)))` if optimization was applied,
1013    /// `Ok(None)` if not applicable, or `Err` on error.
1014    fn try_plan_filter_with_property_index(
1015        &self,
1016        filter: &FilterOp,
1017    ) -> Result<Option<(Box<dyn Operator>, Vec<String>)>> {
1018        // Only optimize if input is a simple NodeScan (not nested)
1019        let (scan_variable, scan_label) = match filter.input.as_ref() {
1020            LogicalOperator::NodeScan(scan) if scan.input.is_none() => {
1021                (scan.variable.clone(), scan.label.clone())
1022            }
1023            _ => return Ok(None),
1024        };
1025
1026        // Extract property equality conditions from the predicate
1027        // Handles both simple (n.prop = val) and compound (n.a = 1 AND n.b = 2)
1028        let conditions = self.extract_equality_conditions(&filter.predicate, &scan_variable);
1029
1030        if conditions.is_empty() {
1031            return Ok(None);
1032        }
1033
1034        // Check if at least one condition has an index (otherwise full scan is needed anyway)
1035        let has_indexed_condition = conditions
1036            .iter()
1037            .any(|(prop, _)| self.store.has_property_index(prop));
1038
1039        if !has_indexed_condition {
1040            return Ok(None);
1041        }
1042
1043        // Use the optimized batch lookup for multiple conditions
1044        let conditions_ref: Vec<(&str, Value)> = conditions
1045            .iter()
1046            .map(|(p, v)| (p.as_str(), v.clone()))
1047            .collect();
1048        let mut matching_nodes = self.store.find_nodes_by_properties(&conditions_ref);
1049
1050        // If there's a label filter, also filter by label
1051        if let Some(label) = &scan_label {
1052            let label_nodes: std::collections::HashSet<_> =
1053                self.store.nodes_by_label(label).into_iter().collect();
1054            matching_nodes.retain(|n| label_nodes.contains(n));
1055        }
1056
1057        // Create a NodeListOperator with the matching nodes
1058        let node_list_op = Box::new(NodeListOperator::new(matching_nodes, 2048));
1059        let columns = vec![scan_variable];
1060
1061        Ok(Some((node_list_op, columns)))
1062    }
1063
1064    /// Extracts equality conditions (property = literal) from a predicate.
1065    ///
1066    /// Handles both simple predicates and AND chains:
1067    /// - `n.name = "Alice"` → `[("name", "Alice")]`
1068    /// - `n.name = "Alice" AND n.age = 30` → `[("name", "Alice"), ("age", 30)]`
1069    fn extract_equality_conditions(
1070        &self,
1071        predicate: &LogicalExpression,
1072        target_variable: &str,
1073    ) -> Vec<(String, Value)> {
1074        let mut conditions = Vec::new();
1075        self.collect_equality_conditions(predicate, target_variable, &mut conditions);
1076        conditions
1077    }
1078
1079    /// Recursively collects equality conditions from AND expressions.
1080    fn collect_equality_conditions(
1081        &self,
1082        expr: &LogicalExpression,
1083        target_variable: &str,
1084        conditions: &mut Vec<(String, Value)>,
1085    ) {
1086        match expr {
1087            // Handle AND: recurse into both sides
1088            LogicalExpression::Binary {
1089                left,
1090                op: BinaryOp::And,
1091                right,
1092            } => {
1093                self.collect_equality_conditions(left, target_variable, conditions);
1094                self.collect_equality_conditions(right, target_variable, conditions);
1095            }
1096
1097            // Handle equality: extract property and value
1098            LogicalExpression::Binary {
1099                left,
1100                op: BinaryOp::Eq,
1101                right,
1102            } => {
1103                if let Some((var, prop, val)) = self.extract_property_equality(left, right) {
1104                    if var == target_variable {
1105                        conditions.push((prop, val));
1106                    }
1107                }
1108            }
1109
1110            _ => {}
1111        }
1112    }
1113
1114    /// Extracts (variable, property, value) from a property equality expression.
1115    fn extract_property_equality(
1116        &self,
1117        left: &LogicalExpression,
1118        right: &LogicalExpression,
1119    ) -> Option<(String, String, Value)> {
1120        match (left, right) {
1121            (
1122                LogicalExpression::Property { variable, property },
1123                LogicalExpression::Literal(val),
1124            ) => Some((variable.clone(), property.clone(), val.clone())),
1125            (
1126                LogicalExpression::Literal(val),
1127                LogicalExpression::Property { variable, property },
1128            ) => Some((variable.clone(), property.clone(), val.clone())),
1129            _ => None,
1130        }
1131    }
1132
1133    /// Plans a LIMIT operator.
1134    fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1135        let (input_op, columns) = self.plan_operator(&limit.input)?;
1136        let output_schema = self.derive_schema_from_columns(&columns);
1137        let operator = Box::new(LimitOperator::new(input_op, limit.count, output_schema));
1138        Ok((operator, columns))
1139    }
1140
1141    /// Plans a SKIP operator.
1142    fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1143        let (input_op, columns) = self.plan_operator(&skip.input)?;
1144        let output_schema = self.derive_schema_from_columns(&columns);
1145        let operator = Box::new(SkipOperator::new(input_op, skip.count, output_schema));
1146        Ok((operator, columns))
1147    }
1148
1149    /// Plans a SORT (ORDER BY) operator.
1150    fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1151        let (mut input_op, input_columns) = self.plan_operator(&sort.input)?;
1152
1153        // Build variable to column index mapping
1154        let mut variable_columns: HashMap<String, usize> = input_columns
1155            .iter()
1156            .enumerate()
1157            .map(|(i, name)| (name.clone(), i))
1158            .collect();
1159
1160        // Collect property expressions that need to be projected before sorting
1161        let mut property_projections: Vec<(String, String, String)> = Vec::new();
1162        let mut next_col_idx = input_columns.len();
1163
1164        for key in &sort.keys {
1165            if let LogicalExpression::Property { variable, property } = &key.expression {
1166                let col_name = format!("{}_{}", variable, property);
1167                if !variable_columns.contains_key(&col_name) {
1168                    property_projections.push((
1169                        variable.clone(),
1170                        property.clone(),
1171                        col_name.clone(),
1172                    ));
1173                    variable_columns.insert(col_name, next_col_idx);
1174                    next_col_idx += 1;
1175                }
1176            }
1177        }
1178
1179        // Track output columns
1180        let mut output_columns = input_columns.clone();
1181
1182        // If we have property expressions, add a projection to materialize them
1183        if !property_projections.is_empty() {
1184            let mut projections = Vec::new();
1185            let mut output_types = Vec::new();
1186
1187            // First, pass through all existing columns (use Node type to preserve node IDs
1188            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1189            for (i, _) in input_columns.iter().enumerate() {
1190                projections.push(ProjectExpr::Column(i));
1191                output_types.push(LogicalType::Node);
1192            }
1193
1194            // Then add property access projections
1195            for (variable, property, col_name) in &property_projections {
1196                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1197                    Error::Internal(format!(
1198                        "Variable '{}' not found for ORDER BY property projection",
1199                        variable
1200                    ))
1201                })?;
1202                projections.push(ProjectExpr::PropertyAccess {
1203                    column: source_col,
1204                    property: property.clone(),
1205                });
1206                output_types.push(LogicalType::Any);
1207                output_columns.push(col_name.clone());
1208            }
1209
1210            input_op = Box::new(ProjectOperator::with_store(
1211                input_op,
1212                projections,
1213                output_types,
1214                Arc::clone(&self.store),
1215            ));
1216        }
1217
1218        // Convert logical sort keys to physical sort keys
1219        let physical_keys: Vec<PhysicalSortKey> = sort
1220            .keys
1221            .iter()
1222            .map(|key| {
1223                let col_idx = self
1224                    .resolve_sort_expression_with_properties(&key.expression, &variable_columns)?;
1225                Ok(PhysicalSortKey {
1226                    column: col_idx,
1227                    direction: match key.order {
1228                        SortOrder::Ascending => SortDirection::Ascending,
1229                        SortOrder::Descending => SortDirection::Descending,
1230                    },
1231                    null_order: NullOrder::NullsLast,
1232                })
1233            })
1234            .collect::<Result<Vec<_>>>()?;
1235
1236        let output_schema = self.derive_schema_from_columns(&output_columns);
1237        let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
1238        Ok((operator, output_columns))
1239    }
1240
1241    /// Resolves a sort expression to a column index, using projected property columns.
1242    fn resolve_sort_expression_with_properties(
1243        &self,
1244        expr: &LogicalExpression,
1245        variable_columns: &HashMap<String, usize>,
1246    ) -> Result<usize> {
1247        match expr {
1248            LogicalExpression::Variable(name) => {
1249                variable_columns.get(name).copied().ok_or_else(|| {
1250                    Error::Internal(format!("Variable '{}' not found for ORDER BY", name))
1251                })
1252            }
1253            LogicalExpression::Property { variable, property } => {
1254                // Look up the projected property column (e.g., "p_age" for p.age)
1255                let col_name = format!("{}_{}", variable, property);
1256                variable_columns.get(&col_name).copied().ok_or_else(|| {
1257                    Error::Internal(format!(
1258                        "Property column '{}' not found for ORDER BY (from {}.{})",
1259                        col_name, variable, property
1260                    ))
1261                })
1262            }
1263            _ => Err(Error::Internal(format!(
1264                "Unsupported ORDER BY expression: {:?}",
1265                expr
1266            ))),
1267        }
1268    }
1269
1270    /// Derives a schema from column names (uses Any type to handle all value types).
1271    fn derive_schema_from_columns(&self, columns: &[String]) -> Vec<LogicalType> {
1272        columns.iter().map(|_| LogicalType::Any).collect()
1273    }
1274
1275    /// Plans an AGGREGATE operator.
1276    fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1277        // Check if we can use factorized aggregation for speedup
1278        // Conditions:
1279        // 1. Factorized execution is enabled
1280        // 2. Input is an expand chain (multi-hop)
1281        // 3. No GROUP BY
1282        // 4. All aggregates are simple (COUNT, SUM, AVG, MIN, MAX)
1283        if self.factorized_execution
1284            && agg.group_by.is_empty()
1285            && Self::count_expand_chain(&agg.input).0 >= 2
1286            && self.is_simple_aggregate(agg)
1287        {
1288            if let Ok((op, cols)) = self.plan_factorized_aggregate(agg) {
1289                return Ok((op, cols));
1290            }
1291            // Fall through to regular aggregate if factorized planning fails
1292        }
1293
1294        let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
1295
1296        // Build variable to column index mapping
1297        let mut variable_columns: HashMap<String, usize> = input_columns
1298            .iter()
1299            .enumerate()
1300            .map(|(i, name)| (name.clone(), i))
1301            .collect();
1302
1303        // Collect all property expressions that need to be projected before aggregation
1304        let mut property_projections: Vec<(String, String, String)> = Vec::new(); // (variable, property, new_column_name)
1305        let mut next_col_idx = input_columns.len();
1306
1307        // Check group-by expressions for properties
1308        for expr in &agg.group_by {
1309            if let LogicalExpression::Property { variable, property } = expr {
1310                let col_name = format!("{}_{}", variable, property);
1311                if !variable_columns.contains_key(&col_name) {
1312                    property_projections.push((
1313                        variable.clone(),
1314                        property.clone(),
1315                        col_name.clone(),
1316                    ));
1317                    variable_columns.insert(col_name, next_col_idx);
1318                    next_col_idx += 1;
1319                }
1320            }
1321        }
1322
1323        // Check aggregate expressions for properties
1324        for agg_expr in &agg.aggregates {
1325            if let Some(LogicalExpression::Property { variable, property }) = &agg_expr.expression {
1326                let col_name = format!("{}_{}", variable, property);
1327                if !variable_columns.contains_key(&col_name) {
1328                    property_projections.push((
1329                        variable.clone(),
1330                        property.clone(),
1331                        col_name.clone(),
1332                    ));
1333                    variable_columns.insert(col_name, next_col_idx);
1334                    next_col_idx += 1;
1335                }
1336            }
1337        }
1338
1339        // If we have property expressions, add a projection to materialize them
1340        if !property_projections.is_empty() {
1341            let mut projections = Vec::new();
1342            let mut output_types = Vec::new();
1343
1344            // First, pass through all existing columns (use Node type to preserve node IDs
1345            // for subsequent property access - nodes need VectorData::NodeId for get_node_id())
1346            for (i, _) in input_columns.iter().enumerate() {
1347                projections.push(ProjectExpr::Column(i));
1348                output_types.push(LogicalType::Node);
1349            }
1350
1351            // Then add property access projections
1352            for (variable, property, _col_name) in &property_projections {
1353                let source_col = *variable_columns.get(variable).ok_or_else(|| {
1354                    Error::Internal(format!(
1355                        "Variable '{}' not found for property projection",
1356                        variable
1357                    ))
1358                })?;
1359                projections.push(ProjectExpr::PropertyAccess {
1360                    column: source_col,
1361                    property: property.clone(),
1362                });
1363                output_types.push(LogicalType::Any); // Properties can be any type (string, int, etc.)
1364            }
1365
1366            input_op = Box::new(ProjectOperator::with_store(
1367                input_op,
1368                projections,
1369                output_types,
1370                Arc::clone(&self.store),
1371            ));
1372        }
1373
1374        // Convert group-by expressions to column indices
1375        let group_columns: Vec<usize> = agg
1376            .group_by
1377            .iter()
1378            .map(|expr| self.resolve_expression_to_column_with_properties(expr, &variable_columns))
1379            .collect::<Result<Vec<_>>>()?;
1380
1381        // Convert aggregate expressions to physical form
1382        let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
1383            .aggregates
1384            .iter()
1385            .map(|agg_expr| {
1386                let column = agg_expr
1387                    .expression
1388                    .as_ref()
1389                    .map(|e| {
1390                        self.resolve_expression_to_column_with_properties(e, &variable_columns)
1391                    })
1392                    .transpose()?;
1393
1394                Ok(PhysicalAggregateExpr {
1395                    function: convert_aggregate_function(agg_expr.function),
1396                    column,
1397                    distinct: agg_expr.distinct,
1398                    alias: agg_expr.alias.clone(),
1399                    percentile: agg_expr.percentile,
1400                })
1401            })
1402            .collect::<Result<Vec<_>>>()?;
1403
1404        // Build output schema and column names
1405        let mut output_schema = Vec::new();
1406        let mut output_columns = Vec::new();
1407
1408        // Add group-by columns
1409        for expr in &agg.group_by {
1410            output_schema.push(LogicalType::Any); // Group-by values can be any type
1411            output_columns.push(expression_to_string(expr));
1412        }
1413
1414        // Add aggregate result columns
1415        for agg_expr in &agg.aggregates {
1416            let result_type = match agg_expr.function {
1417                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1418                    LogicalType::Int64
1419                }
1420                LogicalAggregateFunction::Sum => LogicalType::Int64,
1421                LogicalAggregateFunction::Avg => LogicalType::Float64,
1422                LogicalAggregateFunction::Min | LogicalAggregateFunction::Max => {
1423                    // MIN/MAX preserve input type; use Int64 as default for numeric comparisons
1424                    // since the aggregate can return any Value type, but the most common case
1425                    // is numeric values from property expressions
1426                    LogicalType::Int64
1427                }
1428                LogicalAggregateFunction::Collect => LogicalType::Any, // List type (using Any since List is a complex type)
1429                // Statistical functions return Float64
1430                LogicalAggregateFunction::StdDev
1431                | LogicalAggregateFunction::StdDevPop
1432                | LogicalAggregateFunction::PercentileDisc
1433                | LogicalAggregateFunction::PercentileCont => LogicalType::Float64,
1434            };
1435            output_schema.push(result_type);
1436            output_columns.push(
1437                agg_expr
1438                    .alias
1439                    .clone()
1440                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
1441            );
1442        }
1443
1444        // Choose operator based on whether there are group-by columns
1445        let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
1446            Box::new(SimpleAggregateOperator::new(
1447                input_op,
1448                physical_aggregates,
1449                output_schema,
1450            ))
1451        } else {
1452            Box::new(HashAggregateOperator::new(
1453                input_op,
1454                group_columns,
1455                physical_aggregates,
1456                output_schema,
1457            ))
1458        };
1459
1460        // Apply HAVING clause filter if present
1461        if let Some(having_expr) = &agg.having {
1462            // Build variable to column mapping for the aggregate output
1463            let having_var_columns: HashMap<String, usize> = output_columns
1464                .iter()
1465                .enumerate()
1466                .map(|(i, name)| (name.clone(), i))
1467                .collect();
1468
1469            let filter_expr = self.convert_expression(having_expr)?;
1470            let predicate =
1471                ExpressionPredicate::new(filter_expr, having_var_columns, Arc::clone(&self.store));
1472            operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
1473        }
1474
1475        Ok((operator, output_columns))
1476    }
1477
1478    /// Checks if an aggregate is simple enough for factorized execution.
1479    ///
1480    /// Simple aggregates:
1481    /// - COUNT(*) or COUNT(variable)
1482    /// - SUM, AVG, MIN, MAX on variables (not properties for now)
1483    fn is_simple_aggregate(&self, agg: &AggregateOp) -> bool {
1484        agg.aggregates.iter().all(|agg_expr| {
1485            match agg_expr.function {
1486                LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1487                    // COUNT(*) is always OK, COUNT(var) is OK
1488                    agg_expr.expression.is_none()
1489                        || matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1490                }
1491                LogicalAggregateFunction::Sum
1492                | LogicalAggregateFunction::Avg
1493                | LogicalAggregateFunction::Min
1494                | LogicalAggregateFunction::Max => {
1495                    // For now, only support when expression is a variable
1496                    // (property access would require flattening first)
1497                    matches!(&agg_expr.expression, Some(LogicalExpression::Variable(_)))
1498                }
1499                // Other aggregates (Collect, StdDev, Percentile) not supported in factorized form
1500                _ => false,
1501            }
1502        })
1503    }
1504
1505    /// Plans a factorized aggregate that operates directly on factorized data.
1506    ///
1507    /// This avoids the O(n²) cost of flattening before aggregation.
1508    fn plan_factorized_aggregate(
1509        &self,
1510        agg: &AggregateOp,
1511    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
1512        // Build the expand chain - this returns a LazyFactorizedChainOperator
1513        let expands = Self::collect_expand_chain(&agg.input);
1514        if expands.is_empty() {
1515            return Err(Error::Internal(
1516                "Expected expand chain for factorized aggregate".to_string(),
1517            ));
1518        }
1519
1520        // Get the base operator (before first expand)
1521        let first_expand = expands[0];
1522        let (base_op, base_columns) = self.plan_operator(&first_expand.input)?;
1523
1524        let mut columns = base_columns.clone();
1525        let mut steps = Vec::new();
1526        let mut is_first = true;
1527
1528        for expand in &expands {
1529            // Find source column for this expand
1530            let source_column = if is_first {
1531                base_columns
1532                    .iter()
1533                    .position(|c| c == &expand.from_variable)
1534                    .ok_or_else(|| {
1535                        Error::Internal(format!(
1536                            "Source variable '{}' not found in base columns",
1537                            expand.from_variable
1538                        ))
1539                    })?
1540            } else {
1541                1 // Target from previous level
1542            };
1543
1544            let direction = match expand.direction {
1545                ExpandDirection::Outgoing => Direction::Outgoing,
1546                ExpandDirection::Incoming => Direction::Incoming,
1547                ExpandDirection::Both => Direction::Both,
1548            };
1549
1550            steps.push(ExpandStep {
1551                source_column,
1552                direction,
1553                edge_type: expand.edge_type.clone(),
1554            });
1555
1556            let edge_col_name = expand.edge_variable.clone().unwrap_or_else(|| {
1557                let count = self.anon_edge_counter.get();
1558                self.anon_edge_counter.set(count + 1);
1559                format!("_anon_edge_{}", count)
1560            });
1561            columns.push(edge_col_name);
1562            columns.push(expand.to_variable.clone());
1563
1564            is_first = false;
1565        }
1566
1567        // Create the lazy factorized chain operator
1568        let mut lazy_op = LazyFactorizedChainOperator::new(Arc::clone(&self.store), base_op, steps);
1569
1570        if let Some(tx_id) = self.tx_id {
1571            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, Some(tx_id));
1572        } else {
1573            lazy_op = lazy_op.with_tx_context(self.viewing_epoch, None);
1574        }
1575
1576        // Convert logical aggregates to factorized aggregates
1577        let factorized_aggs: Vec<FactorizedAggregate> = agg
1578            .aggregates
1579            .iter()
1580            .map(|agg_expr| {
1581                match agg_expr.function {
1582                    LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
1583                        // COUNT(*) uses simple count, COUNT(col) uses column count
1584                        if agg_expr.expression.is_none() {
1585                            FactorizedAggregate::count()
1586                        } else {
1587                            // For COUNT(variable), we use the deepest level's target column
1588                            // which is the last column added to the schema
1589                            FactorizedAggregate::count_column(1) // Target is at index 1 in deepest level
1590                        }
1591                    }
1592                    LogicalAggregateFunction::Sum => {
1593                        // SUM on deepest level target
1594                        FactorizedAggregate::sum(1)
1595                    }
1596                    LogicalAggregateFunction::Avg => FactorizedAggregate::avg(1),
1597                    LogicalAggregateFunction::Min => FactorizedAggregate::min(1),
1598                    LogicalAggregateFunction::Max => FactorizedAggregate::max(1),
1599                    _ => {
1600                        // Shouldn't reach here due to is_simple_aggregate check
1601                        FactorizedAggregate::count()
1602                    }
1603                }
1604            })
1605            .collect();
1606
1607        // Build output column names
1608        let output_columns: Vec<String> = agg
1609            .aggregates
1610            .iter()
1611            .map(|agg_expr| {
1612                agg_expr
1613                    .alias
1614                    .clone()
1615                    .unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase())
1616            })
1617            .collect();
1618
1619        // Create the factorized aggregate operator
1620        let factorized_agg_op = FactorizedAggregateOperator::new(lazy_op, factorized_aggs);
1621
1622        Ok((Box::new(factorized_agg_op), output_columns))
1623    }
1624
1625    /// Resolves a logical expression to a column index.
1626    #[allow(dead_code)]
1627    fn resolve_expression_to_column(
1628        &self,
1629        expr: &LogicalExpression,
1630        variable_columns: &HashMap<String, usize>,
1631    ) -> Result<usize> {
1632        match expr {
1633            LogicalExpression::Variable(name) => variable_columns
1634                .get(name)
1635                .copied()
1636                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1637            LogicalExpression::Property { variable, .. } => variable_columns
1638                .get(variable)
1639                .copied()
1640                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", variable))),
1641            _ => Err(Error::Internal(format!(
1642                "Cannot resolve expression to column: {:?}",
1643                expr
1644            ))),
1645        }
1646    }
1647
1648    /// Resolves a logical expression to a column index, using projected property columns.
1649    ///
1650    /// This is used for aggregations where properties have been projected into their own columns.
1651    fn resolve_expression_to_column_with_properties(
1652        &self,
1653        expr: &LogicalExpression,
1654        variable_columns: &HashMap<String, usize>,
1655    ) -> Result<usize> {
1656        match expr {
1657            LogicalExpression::Variable(name) => variable_columns
1658                .get(name)
1659                .copied()
1660                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
1661            LogicalExpression::Property { variable, property } => {
1662                // Look up the projected property column (e.g., "p_price" for p.price)
1663                let col_name = format!("{}_{}", variable, property);
1664                variable_columns.get(&col_name).copied().ok_or_else(|| {
1665                    Error::Internal(format!(
1666                        "Property column '{}' not found (from {}.{})",
1667                        col_name, variable, property
1668                    ))
1669                })
1670            }
1671            _ => Err(Error::Internal(format!(
1672                "Cannot resolve expression to column: {:?}",
1673                expr
1674            ))),
1675        }
1676    }
1677
1678    /// Converts a logical expression to a filter expression.
1679    fn convert_expression(&self, expr: &LogicalExpression) -> Result<FilterExpression> {
1680        match expr {
1681            LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
1682            LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
1683            LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
1684                variable: variable.clone(),
1685                property: property.clone(),
1686            }),
1687            LogicalExpression::Binary { left, op, right } => {
1688                let left_expr = self.convert_expression(left)?;
1689                let right_expr = self.convert_expression(right)?;
1690                let filter_op = convert_binary_op(*op)?;
1691                Ok(FilterExpression::Binary {
1692                    left: Box::new(left_expr),
1693                    op: filter_op,
1694                    right: Box::new(right_expr),
1695                })
1696            }
1697            LogicalExpression::Unary { op, operand } => {
1698                let operand_expr = self.convert_expression(operand)?;
1699                let filter_op = convert_unary_op(*op)?;
1700                Ok(FilterExpression::Unary {
1701                    op: filter_op,
1702                    operand: Box::new(operand_expr),
1703                })
1704            }
1705            LogicalExpression::FunctionCall { name, args, .. } => {
1706                let filter_args: Vec<FilterExpression> = args
1707                    .iter()
1708                    .map(|a| self.convert_expression(a))
1709                    .collect::<Result<Vec<_>>>()?;
1710                Ok(FilterExpression::FunctionCall {
1711                    name: name.clone(),
1712                    args: filter_args,
1713                })
1714            }
1715            LogicalExpression::Case {
1716                operand,
1717                when_clauses,
1718                else_clause,
1719            } => {
1720                let filter_operand = operand
1721                    .as_ref()
1722                    .map(|e| self.convert_expression(e))
1723                    .transpose()?
1724                    .map(Box::new);
1725                let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
1726                    .iter()
1727                    .map(|(cond, result)| {
1728                        Ok((
1729                            self.convert_expression(cond)?,
1730                            self.convert_expression(result)?,
1731                        ))
1732                    })
1733                    .collect::<Result<Vec<_>>>()?;
1734                let filter_else = else_clause
1735                    .as_ref()
1736                    .map(|e| self.convert_expression(e))
1737                    .transpose()?
1738                    .map(Box::new);
1739                Ok(FilterExpression::Case {
1740                    operand: filter_operand,
1741                    when_clauses: filter_when_clauses,
1742                    else_clause: filter_else,
1743                })
1744            }
1745            LogicalExpression::List(items) => {
1746                let filter_items: Vec<FilterExpression> = items
1747                    .iter()
1748                    .map(|item| self.convert_expression(item))
1749                    .collect::<Result<Vec<_>>>()?;
1750                Ok(FilterExpression::List(filter_items))
1751            }
1752            LogicalExpression::Map(pairs) => {
1753                let filter_pairs: Vec<(String, FilterExpression)> = pairs
1754                    .iter()
1755                    .map(|(k, v)| Ok((k.clone(), self.convert_expression(v)?)))
1756                    .collect::<Result<Vec<_>>>()?;
1757                Ok(FilterExpression::Map(filter_pairs))
1758            }
1759            LogicalExpression::IndexAccess { base, index } => {
1760                let base_expr = self.convert_expression(base)?;
1761                let index_expr = self.convert_expression(index)?;
1762                Ok(FilterExpression::IndexAccess {
1763                    base: Box::new(base_expr),
1764                    index: Box::new(index_expr),
1765                })
1766            }
1767            LogicalExpression::SliceAccess { base, start, end } => {
1768                let base_expr = self.convert_expression(base)?;
1769                let start_expr = start
1770                    .as_ref()
1771                    .map(|s| self.convert_expression(s))
1772                    .transpose()?
1773                    .map(Box::new);
1774                let end_expr = end
1775                    .as_ref()
1776                    .map(|e| self.convert_expression(e))
1777                    .transpose()?
1778                    .map(Box::new);
1779                Ok(FilterExpression::SliceAccess {
1780                    base: Box::new(base_expr),
1781                    start: start_expr,
1782                    end: end_expr,
1783                })
1784            }
1785            LogicalExpression::Parameter(_) => Err(Error::Internal(
1786                "Parameters not yet supported in filters".to_string(),
1787            )),
1788            LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
1789            LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
1790            LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
1791            LogicalExpression::ListComprehension {
1792                variable,
1793                list_expr,
1794                filter_expr,
1795                map_expr,
1796            } => {
1797                let list = self.convert_expression(list_expr)?;
1798                let filter = filter_expr
1799                    .as_ref()
1800                    .map(|f| self.convert_expression(f))
1801                    .transpose()?
1802                    .map(Box::new);
1803                let map = self.convert_expression(map_expr)?;
1804                Ok(FilterExpression::ListComprehension {
1805                    variable: variable.clone(),
1806                    list_expr: Box::new(list),
1807                    filter_expr: filter,
1808                    map_expr: Box::new(map),
1809                })
1810            }
1811            LogicalExpression::ExistsSubquery(subplan) => {
1812                // Extract the pattern from the subplan
1813                // For EXISTS { MATCH (n)-[:TYPE]->() }, we extract start_var, direction, edge_type
1814                let (start_var, direction, edge_type, end_labels) =
1815                    self.extract_exists_pattern(subplan)?;
1816
1817                Ok(FilterExpression::ExistsSubquery {
1818                    start_var,
1819                    direction,
1820                    edge_type,
1821                    end_labels,
1822                    min_hops: None,
1823                    max_hops: None,
1824                })
1825            }
1826            LogicalExpression::CountSubquery(_) => Err(Error::Internal(
1827                "COUNT subqueries not yet supported".to_string(),
1828            )),
1829        }
1830    }
1831
1832    /// Extracts the pattern from an EXISTS subplan.
1833    /// Returns (start_variable, direction, edge_type, end_labels).
1834    fn extract_exists_pattern(
1835        &self,
1836        subplan: &LogicalOperator,
1837    ) -> Result<(String, Direction, Option<String>, Option<Vec<String>>)> {
1838        match subplan {
1839            LogicalOperator::Expand(expand) => {
1840                // Get end node labels from the to_variable if there's a node scan input
1841                let end_labels = self.extract_end_labels_from_expand(expand);
1842                let direction = match expand.direction {
1843                    ExpandDirection::Outgoing => Direction::Outgoing,
1844                    ExpandDirection::Incoming => Direction::Incoming,
1845                    ExpandDirection::Both => Direction::Both,
1846                };
1847                Ok((
1848                    expand.from_variable.clone(),
1849                    direction,
1850                    expand.edge_type.clone(),
1851                    end_labels,
1852                ))
1853            }
1854            LogicalOperator::NodeScan(scan) => {
1855                if let Some(input) = &scan.input {
1856                    self.extract_exists_pattern(input)
1857                } else {
1858                    Err(Error::Internal(
1859                        "EXISTS subquery must contain an edge pattern".to_string(),
1860                    ))
1861                }
1862            }
1863            LogicalOperator::Filter(filter) => self.extract_exists_pattern(&filter.input),
1864            _ => Err(Error::Internal(
1865                "Unsupported EXISTS subquery pattern".to_string(),
1866            )),
1867        }
1868    }
1869
1870    /// Extracts end node labels from an Expand operator if present.
1871    fn extract_end_labels_from_expand(&self, expand: &ExpandOp) -> Option<Vec<String>> {
1872        // Check if the expand has a NodeScan input with a label filter
1873        match expand.input.as_ref() {
1874            LogicalOperator::NodeScan(scan) => scan.label.clone().map(|l| vec![l]),
1875            _ => None,
1876        }
1877    }
1878
1879    /// Plans a JOIN operator.
1880    fn plan_join(&self, join: &JoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
1881        let (left_op, left_columns) = self.plan_operator(&join.left)?;
1882        let (right_op, right_columns) = self.plan_operator(&join.right)?;
1883
1884        // Build combined output columns
1885        let mut columns = left_columns.clone();
1886        columns.extend(right_columns.clone());
1887
1888        // Convert join type
1889        let physical_join_type = match join.join_type {
1890            JoinType::Inner => PhysicalJoinType::Inner,
1891            JoinType::Left => PhysicalJoinType::Left,
1892            JoinType::Right => PhysicalJoinType::Right,
1893            JoinType::Full => PhysicalJoinType::Full,
1894            JoinType::Cross => PhysicalJoinType::Cross,
1895            JoinType::Semi => PhysicalJoinType::Semi,
1896            JoinType::Anti => PhysicalJoinType::Anti,
1897        };
1898
1899        // Build key columns from join conditions
1900        let (probe_keys, build_keys): (Vec<usize>, Vec<usize>) = if join.conditions.is_empty() {
1901            // Cross join - no keys
1902            (vec![], vec![])
1903        } else {
1904            join.conditions
1905                .iter()
1906                .filter_map(|cond| {
1907                    // Try to extract column indices from expressions
1908                    let left_idx = self.expression_to_column(&cond.left, &left_columns).ok()?;
1909                    let right_idx = self
1910                        .expression_to_column(&cond.right, &right_columns)
1911                        .ok()?;
1912                    Some((left_idx, right_idx))
1913                })
1914                .unzip()
1915        };
1916
1917        let output_schema = self.derive_schema_from_columns(&columns);
1918
1919        // Check if we should use leapfrog join for cyclic patterns
1920        // Currently we use hash join by default; leapfrog is available but
1921        // requires explicit multi-way join detection which will be added
1922        // when we have proper cyclic pattern detection in the optimizer.
1923        // For now, LeapfrogJoinOperator is available for direct use.
1924        let _ = LeapfrogJoinOperator::new; // Suppress unused warning
1925
1926        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
1927            left_op,
1928            right_op,
1929            probe_keys,
1930            build_keys,
1931            physical_join_type,
1932            output_schema,
1933        ));
1934
1935        Ok((operator, columns))
1936    }
1937
1938    /// Checks if a join pattern is cyclic (e.g., triangle, clique).
1939    ///
1940    /// A cyclic pattern occurs when join conditions reference variables
1941    /// that create a cycle in the join graph. For example, a triangle
1942    /// pattern (a)->(b)->(c)->(a) creates a cycle.
1943    ///
1944    /// Returns true if the join graph contains at least one cycle with 3+ nodes,
1945    /// indicating potential for worst-case optimal join (WCOJ) optimization.
1946    #[allow(dead_code)]
1947    fn is_cyclic_join_pattern(&self, join: &JoinOp) -> bool {
1948        // Build adjacency list for join variables
1949        let mut edges: HashMap<String, Vec<String>> = HashMap::new();
1950        let mut all_vars: std::collections::HashSet<String> = std::collections::HashSet::new();
1951
1952        // Collect edges from join conditions
1953        Self::collect_join_edges(
1954            &LogicalOperator::Join(join.clone()),
1955            &mut edges,
1956            &mut all_vars,
1957        );
1958
1959        // Need at least 3 variables to form a cycle
1960        if all_vars.len() < 3 {
1961            return false;
1962        }
1963
1964        // Detect cycle using DFS with coloring
1965        Self::has_cycle(&edges, &all_vars)
1966    }
1967
1968    /// Collects edges from join conditions into an adjacency list.
1969    fn collect_join_edges(
1970        op: &LogicalOperator,
1971        edges: &mut HashMap<String, Vec<String>>,
1972        vars: &mut std::collections::HashSet<String>,
1973    ) {
1974        match op {
1975            LogicalOperator::Join(join) => {
1976                // Process join conditions
1977                for cond in &join.conditions {
1978                    if let (Some(left_var), Some(right_var)) = (
1979                        Self::extract_join_variable(&cond.left),
1980                        Self::extract_join_variable(&cond.right),
1981                    ) {
1982                        if left_var != right_var {
1983                            vars.insert(left_var.clone());
1984                            vars.insert(right_var.clone());
1985
1986                            // Add bidirectional edge
1987                            edges
1988                                .entry(left_var.clone())
1989                                .or_default()
1990                                .push(right_var.clone());
1991                            edges.entry(right_var).or_default().push(left_var);
1992                        }
1993                    }
1994                }
1995
1996                // Recurse into children
1997                Self::collect_join_edges(&join.left, edges, vars);
1998                Self::collect_join_edges(&join.right, edges, vars);
1999            }
2000            LogicalOperator::Expand(expand) => {
2001                // Expand creates implicit join between from_variable and to_variable
2002                vars.insert(expand.from_variable.clone());
2003                vars.insert(expand.to_variable.clone());
2004
2005                edges
2006                    .entry(expand.from_variable.clone())
2007                    .or_default()
2008                    .push(expand.to_variable.clone());
2009                edges
2010                    .entry(expand.to_variable.clone())
2011                    .or_default()
2012                    .push(expand.from_variable.clone());
2013
2014                Self::collect_join_edges(&expand.input, edges, vars);
2015            }
2016            LogicalOperator::Filter(filter) => {
2017                Self::collect_join_edges(&filter.input, edges, vars);
2018            }
2019            LogicalOperator::NodeScan(scan) => {
2020                vars.insert(scan.variable.clone());
2021            }
2022            _ => {}
2023        }
2024    }
2025
2026    /// Extracts the variable name from a join expression.
2027    fn extract_join_variable(expr: &LogicalExpression) -> Option<String> {
2028        match expr {
2029            LogicalExpression::Variable(v) => Some(v.clone()),
2030            LogicalExpression::Property { variable, .. } => Some(variable.clone()),
2031            LogicalExpression::Id(v) => Some(v.clone()),
2032            _ => None,
2033        }
2034    }
2035
2036    /// Detects if the graph has a cycle using DFS coloring.
2037    ///
2038    /// Colors: 0 = white (unvisited), 1 = gray (in progress), 2 = black (done)
2039    fn has_cycle(
2040        edges: &HashMap<String, Vec<String>>,
2041        vars: &std::collections::HashSet<String>,
2042    ) -> bool {
2043        let mut color: HashMap<&String, u8> = HashMap::new();
2044
2045        for var in vars {
2046            color.insert(var, 0);
2047        }
2048
2049        for start in vars {
2050            if color[start] == 0 {
2051                if Self::dfs_cycle(start, None, edges, &mut color) {
2052                    return true;
2053                }
2054            }
2055        }
2056
2057        false
2058    }
2059
2060    /// DFS helper for cycle detection.
2061    fn dfs_cycle(
2062        node: &String,
2063        parent: Option<&String>,
2064        edges: &HashMap<String, Vec<String>>,
2065        color: &mut HashMap<&String, u8>,
2066    ) -> bool {
2067        *color.get_mut(node).unwrap() = 1; // Gray
2068
2069        if let Some(neighbors) = edges.get(node) {
2070            for neighbor in neighbors {
2071                // Skip the edge back to parent (undirected graph)
2072                if parent == Some(neighbor) {
2073                    continue;
2074                }
2075
2076                if let Some(&c) = color.get(neighbor) {
2077                    if c == 1 {
2078                        // Found a back edge - cycle detected
2079                        return true;
2080                    }
2081                    if c == 0 && Self::dfs_cycle(neighbor, Some(node), edges, color) {
2082                        return true;
2083                    }
2084                }
2085            }
2086        }
2087
2088        *color.get_mut(node).unwrap() = 2; // Black
2089        false
2090    }
2091
2092    /// Counts the number of base relations in a logical operator tree.
2093    #[allow(dead_code)]
2094    fn count_relations(op: &LogicalOperator) -> usize {
2095        match op {
2096            LogicalOperator::NodeScan(_) | LogicalOperator::EdgeScan(_) => 1,
2097            LogicalOperator::Expand(e) => Self::count_relations(&e.input),
2098            LogicalOperator::Filter(f) => Self::count_relations(&f.input),
2099            LogicalOperator::Join(j) => {
2100                Self::count_relations(&j.left) + Self::count_relations(&j.right)
2101            }
2102            _ => 0,
2103        }
2104    }
2105
2106    /// Extracts a column index from an expression.
2107    fn expression_to_column(&self, expr: &LogicalExpression, columns: &[String]) -> Result<usize> {
2108        match expr {
2109            LogicalExpression::Variable(name) => columns
2110                .iter()
2111                .position(|c| c == name)
2112                .ok_or_else(|| Error::Internal(format!("Variable '{}' not found", name))),
2113            _ => Err(Error::Internal(
2114                "Only variables supported in join conditions".to_string(),
2115            )),
2116        }
2117    }
2118
2119    /// Plans a UNION operator.
2120    fn plan_union(&self, union: &UnionOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2121        if union.inputs.is_empty() {
2122            return Err(Error::Internal(
2123                "Union requires at least one input".to_string(),
2124            ));
2125        }
2126
2127        let mut inputs = Vec::with_capacity(union.inputs.len());
2128        let mut columns = Vec::new();
2129
2130        for (i, input) in union.inputs.iter().enumerate() {
2131            let (op, cols) = self.plan_operator(input)?;
2132            if i == 0 {
2133                columns = cols;
2134            }
2135            inputs.push(op);
2136        }
2137
2138        let output_schema = self.derive_schema_from_columns(&columns);
2139        let operator = Box::new(UnionOperator::new(inputs, output_schema));
2140
2141        Ok((operator, columns))
2142    }
2143
2144    /// Plans a DISTINCT operator.
2145    fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2146        let (input_op, columns) = self.plan_operator(&distinct.input)?;
2147        let output_schema = self.derive_schema_from_columns(&columns);
2148        let operator = Box::new(DistinctOperator::new(input_op, output_schema));
2149        Ok((operator, columns))
2150    }
2151
2152    /// Plans a CREATE NODE operator.
2153    fn plan_create_node(&self, create: &CreateNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2154        // Plan input if present
2155        let (input_op, mut columns) = if let Some(ref input) = create.input {
2156            let (op, cols) = self.plan_operator(input)?;
2157            (Some(op), cols)
2158        } else {
2159            (None, vec![])
2160        };
2161
2162        // Output column for the created node
2163        let output_column = columns.len();
2164        columns.push(create.variable.clone());
2165
2166        // Convert properties
2167        let properties: Vec<(String, PropertySource)> = create
2168            .properties
2169            .iter()
2170            .map(|(name, expr)| {
2171                let source = match expr {
2172                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2173                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2174                };
2175                (name.clone(), source)
2176            })
2177            .collect();
2178
2179        let output_schema = self.derive_schema_from_columns(&columns);
2180
2181        let operator = Box::new(
2182            CreateNodeOperator::new(
2183                Arc::clone(&self.store),
2184                input_op,
2185                create.labels.clone(),
2186                properties,
2187                output_schema,
2188                output_column,
2189            )
2190            .with_tx_context(self.viewing_epoch, self.tx_id),
2191        );
2192
2193        Ok((operator, columns))
2194    }
2195
2196    /// Plans a CREATE EDGE operator.
2197    fn plan_create_edge(&self, create: &CreateEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2198        let (input_op, mut columns) = self.plan_operator(&create.input)?;
2199
2200        // Find source and target columns
2201        let from_column = columns
2202            .iter()
2203            .position(|c| c == &create.from_variable)
2204            .ok_or_else(|| {
2205                Error::Internal(format!(
2206                    "Source variable '{}' not found",
2207                    create.from_variable
2208                ))
2209            })?;
2210
2211        let to_column = columns
2212            .iter()
2213            .position(|c| c == &create.to_variable)
2214            .ok_or_else(|| {
2215                Error::Internal(format!(
2216                    "Target variable '{}' not found",
2217                    create.to_variable
2218                ))
2219            })?;
2220
2221        // Output column for the created edge (if named)
2222        let output_column = create.variable.as_ref().map(|v| {
2223            let idx = columns.len();
2224            columns.push(v.clone());
2225            idx
2226        });
2227
2228        // Convert properties
2229        let properties: Vec<(String, PropertySource)> = create
2230            .properties
2231            .iter()
2232            .map(|(name, expr)| {
2233                let source = match expr {
2234                    LogicalExpression::Literal(v) => PropertySource::Constant(v.clone()),
2235                    _ => PropertySource::Constant(grafeo_common::types::Value::Null),
2236                };
2237                (name.clone(), source)
2238            })
2239            .collect();
2240
2241        let output_schema = self.derive_schema_from_columns(&columns);
2242
2243        let operator = Box::new(
2244            CreateEdgeOperator::new(
2245                Arc::clone(&self.store),
2246                input_op,
2247                from_column,
2248                to_column,
2249                create.edge_type.clone(),
2250                properties,
2251                output_schema,
2252                output_column,
2253            )
2254            .with_tx_context(self.viewing_epoch, self.tx_id),
2255        );
2256
2257        Ok((operator, columns))
2258    }
2259
2260    /// Plans a DELETE NODE operator.
2261    fn plan_delete_node(&self, delete: &DeleteNodeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2262        let (input_op, columns) = self.plan_operator(&delete.input)?;
2263
2264        let node_column = columns
2265            .iter()
2266            .position(|c| c == &delete.variable)
2267            .ok_or_else(|| {
2268                Error::Internal(format!(
2269                    "Variable '{}' not found for delete",
2270                    delete.variable
2271                ))
2272            })?;
2273
2274        // Output schema for delete count
2275        let output_schema = vec![LogicalType::Int64];
2276        let output_columns = vec!["deleted_count".to_string()];
2277
2278        let operator = Box::new(
2279            DeleteNodeOperator::new(
2280                Arc::clone(&self.store),
2281                input_op,
2282                node_column,
2283                output_schema,
2284                delete.detach, // DETACH DELETE deletes connected edges first
2285            )
2286            .with_tx_context(self.viewing_epoch, self.tx_id),
2287        );
2288
2289        Ok((operator, output_columns))
2290    }
2291
2292    /// Plans a DELETE EDGE operator.
2293    fn plan_delete_edge(&self, delete: &DeleteEdgeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2294        let (input_op, columns) = self.plan_operator(&delete.input)?;
2295
2296        let edge_column = columns
2297            .iter()
2298            .position(|c| c == &delete.variable)
2299            .ok_or_else(|| {
2300                Error::Internal(format!(
2301                    "Variable '{}' not found for delete",
2302                    delete.variable
2303                ))
2304            })?;
2305
2306        // Output schema for delete count
2307        let output_schema = vec![LogicalType::Int64];
2308        let output_columns = vec!["deleted_count".to_string()];
2309
2310        let operator = Box::new(
2311            DeleteEdgeOperator::new(
2312                Arc::clone(&self.store),
2313                input_op,
2314                edge_column,
2315                output_schema,
2316            )
2317            .with_tx_context(self.viewing_epoch, self.tx_id),
2318        );
2319
2320        Ok((operator, output_columns))
2321    }
2322
2323    /// Plans a LEFT JOIN operator (for OPTIONAL MATCH).
2324    fn plan_left_join(&self, left_join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2325        let (left_op, left_columns) = self.plan_operator(&left_join.left)?;
2326        let (right_op, right_columns) = self.plan_operator(&left_join.right)?;
2327
2328        // Build combined output columns (left + right)
2329        let mut columns = left_columns.clone();
2330        columns.extend(right_columns.clone());
2331
2332        // Find common variables between left and right for join keys
2333        let mut probe_keys = Vec::new();
2334        let mut build_keys = Vec::new();
2335
2336        for (right_idx, right_col) in right_columns.iter().enumerate() {
2337            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2338                probe_keys.push(left_idx);
2339                build_keys.push(right_idx);
2340            }
2341        }
2342
2343        let output_schema = self.derive_schema_from_columns(&columns);
2344
2345        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2346            left_op,
2347            right_op,
2348            probe_keys,
2349            build_keys,
2350            PhysicalJoinType::Left,
2351            output_schema,
2352        ));
2353
2354        Ok((operator, columns))
2355    }
2356
2357    /// Plans an ANTI JOIN operator (for WHERE NOT EXISTS patterns).
2358    fn plan_anti_join(&self, anti_join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2359        let (left_op, left_columns) = self.plan_operator(&anti_join.left)?;
2360        let (right_op, right_columns) = self.plan_operator(&anti_join.right)?;
2361
2362        // Anti-join only keeps left columns (filters out matching rows)
2363        let columns = left_columns.clone();
2364
2365        // Find common variables between left and right for join keys
2366        let mut probe_keys = Vec::new();
2367        let mut build_keys = Vec::new();
2368
2369        for (right_idx, right_col) in right_columns.iter().enumerate() {
2370            if let Some(left_idx) = left_columns.iter().position(|c| c == right_col) {
2371                probe_keys.push(left_idx);
2372                build_keys.push(right_idx);
2373            }
2374        }
2375
2376        let output_schema = self.derive_schema_from_columns(&columns);
2377
2378        let operator: Box<dyn Operator> = Box::new(HashJoinOperator::new(
2379            left_op,
2380            right_op,
2381            probe_keys,
2382            build_keys,
2383            PhysicalJoinType::Anti,
2384            output_schema,
2385        ));
2386
2387        Ok((operator, columns))
2388    }
2389
2390    /// Plans an unwind operator.
2391    fn plan_unwind(&self, unwind: &UnwindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2392        // Plan the input operator first
2393        // Handle Empty specially - use a single-row operator
2394        let (input_op, input_columns): (Box<dyn Operator>, Vec<String>) =
2395            if matches!(&*unwind.input, LogicalOperator::Empty) {
2396                // For UNWIND without prior MATCH, create a single-row input
2397                // We need an operator that produces one row with the list to unwind
2398                // For now, use EmptyScan which produces no rows - we'll handle the literal
2399                // list in the unwind operator itself
2400                let literal_list = self.convert_expression(&unwind.expression)?;
2401
2402                // Create a project operator that produces a single row with the list
2403                let single_row_op: Box<dyn Operator> = Box::new(
2404                    grafeo_core::execution::operators::single_row::SingleRowOperator::new(),
2405                );
2406                let project_op: Box<dyn Operator> = Box::new(ProjectOperator::with_store(
2407                    single_row_op,
2408                    vec![ProjectExpr::Expression {
2409                        expr: literal_list,
2410                        variable_columns: HashMap::new(),
2411                    }],
2412                    vec![LogicalType::Any],
2413                    Arc::clone(&self.store),
2414                ));
2415
2416                (project_op, vec!["__list__".to_string()])
2417            } else {
2418                self.plan_operator(&unwind.input)?
2419            };
2420
2421        // The UNWIND expression should be a list - we need to find/evaluate it
2422        // For now, we handle the case where the expression references an existing column
2423        // or is a literal list
2424
2425        // Find if the expression references an existing column (like a list property)
2426        let list_col_idx = match &unwind.expression {
2427            LogicalExpression::Variable(var) => input_columns.iter().position(|c| c == var),
2428            LogicalExpression::Property { variable, .. } => {
2429                // Property access needs to be evaluated - for now we'll need the filter predicate
2430                // to evaluate this. For simple cases, we treat it as a list column.
2431                input_columns.iter().position(|c| c == variable)
2432            }
2433            LogicalExpression::List(_) | LogicalExpression::Literal(_) => {
2434                // Literal list expression - we'll add it as a virtual column
2435                None
2436            }
2437            _ => None,
2438        };
2439
2440        // Build output columns: all input columns plus the new variable
2441        let mut columns = input_columns.clone();
2442        columns.push(unwind.variable.clone());
2443
2444        // Build output schema
2445        let mut output_schema = self.derive_schema_from_columns(&input_columns);
2446        output_schema.push(LogicalType::Any); // The unwound element type is dynamic
2447
2448        // Use the list column index if found, otherwise default to 0
2449        // (in which case the first column should contain the list)
2450        let col_idx = list_col_idx.unwrap_or(0);
2451
2452        let operator: Box<dyn Operator> = Box::new(UnwindOperator::new(
2453            input_op,
2454            col_idx,
2455            unwind.variable.clone(),
2456            output_schema,
2457        ));
2458
2459        Ok((operator, columns))
2460    }
2461
2462    /// Plans a MERGE operator.
2463    fn plan_merge(&self, merge: &MergeOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2464        // Plan the input operator if present (skip if Empty)
2465        let mut columns = if matches!(merge.input.as_ref(), LogicalOperator::Empty) {
2466            Vec::new()
2467        } else {
2468            let (_input_op, cols) = self.plan_operator(&merge.input)?;
2469            cols
2470        };
2471
2472        // Convert match properties from LogicalExpression to Value
2473        let match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2474            .match_properties
2475            .iter()
2476            .filter_map(|(name, expr)| {
2477                if let LogicalExpression::Literal(v) = expr {
2478                    Some((name.clone(), v.clone()))
2479                } else {
2480                    None // Skip non-literal expressions for now
2481                }
2482            })
2483            .collect();
2484
2485        // Convert ON CREATE properties
2486        let on_create_properties: Vec<(String, grafeo_common::types::Value)> = merge
2487            .on_create
2488            .iter()
2489            .filter_map(|(name, expr)| {
2490                if let LogicalExpression::Literal(v) = expr {
2491                    Some((name.clone(), v.clone()))
2492                } else {
2493                    None
2494                }
2495            })
2496            .collect();
2497
2498        // Convert ON MATCH properties
2499        let on_match_properties: Vec<(String, grafeo_common::types::Value)> = merge
2500            .on_match
2501            .iter()
2502            .filter_map(|(name, expr)| {
2503                if let LogicalExpression::Literal(v) = expr {
2504                    Some((name.clone(), v.clone()))
2505                } else {
2506                    None
2507                }
2508            })
2509            .collect();
2510
2511        // Add the merged node variable to output columns
2512        columns.push(merge.variable.clone());
2513
2514        let operator: Box<dyn Operator> = Box::new(MergeOperator::new(
2515            Arc::clone(&self.store),
2516            merge.variable.clone(),
2517            merge.labels.clone(),
2518            match_properties,
2519            on_create_properties,
2520            on_match_properties,
2521        ));
2522
2523        Ok((operator, columns))
2524    }
2525
2526    /// Plans a SHORTEST PATH operator.
2527    fn plan_shortest_path(&self, sp: &ShortestPathOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2528        // Plan the input operator
2529        let (input_op, mut columns) = self.plan_operator(&sp.input)?;
2530
2531        // Find source and target node columns
2532        let source_column = columns
2533            .iter()
2534            .position(|c| c == &sp.source_var)
2535            .ok_or_else(|| {
2536                Error::Internal(format!(
2537                    "Source variable '{}' not found for shortestPath",
2538                    sp.source_var
2539                ))
2540            })?;
2541
2542        let target_column = columns
2543            .iter()
2544            .position(|c| c == &sp.target_var)
2545            .ok_or_else(|| {
2546                Error::Internal(format!(
2547                    "Target variable '{}' not found for shortestPath",
2548                    sp.target_var
2549                ))
2550            })?;
2551
2552        // Convert direction
2553        let direction = match sp.direction {
2554            ExpandDirection::Outgoing => Direction::Outgoing,
2555            ExpandDirection::Incoming => Direction::Incoming,
2556            ExpandDirection::Both => Direction::Both,
2557        };
2558
2559        // Create the shortest path operator
2560        let operator: Box<dyn Operator> = Box::new(
2561            ShortestPathOperator::new(
2562                Arc::clone(&self.store),
2563                input_op,
2564                source_column,
2565                target_column,
2566                sp.edge_type.clone(),
2567                direction,
2568            )
2569            .with_all_paths(sp.all_paths),
2570        );
2571
2572        // Add path length column with the expected naming convention
2573        // The translator expects _path_length_{alias} format for length(p) calls
2574        columns.push(format!("_path_length_{}", sp.path_alias));
2575
2576        Ok((operator, columns))
2577    }
2578
2579    /// Plans an ADD LABEL operator.
2580    fn plan_add_label(&self, add_label: &AddLabelOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
2581        let (input_op, columns) = self.plan_operator(&add_label.input)?;
2582
2583        // Find the node column
2584        let node_column = columns
2585            .iter()
2586            .position(|c| c == &add_label.variable)
2587            .ok_or_else(|| {
2588                Error::Internal(format!(
2589                    "Variable '{}' not found for ADD LABEL",
2590                    add_label.variable
2591                ))
2592            })?;
2593
2594        // Output schema for update count
2595        let output_schema = vec![LogicalType::Int64];
2596        let output_columns = vec!["labels_added".to_string()];
2597
2598        let operator = Box::new(AddLabelOperator::new(
2599            Arc::clone(&self.store),
2600            input_op,
2601            node_column,
2602            add_label.labels.clone(),
2603            output_schema,
2604        ));
2605
2606        Ok((operator, output_columns))
2607    }
2608
2609    /// Plans a REMOVE LABEL operator.
2610    fn plan_remove_label(
2611        &self,
2612        remove_label: &RemoveLabelOp,
2613    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2614        let (input_op, columns) = self.plan_operator(&remove_label.input)?;
2615
2616        // Find the node column
2617        let node_column = columns
2618            .iter()
2619            .position(|c| c == &remove_label.variable)
2620            .ok_or_else(|| {
2621                Error::Internal(format!(
2622                    "Variable '{}' not found for REMOVE LABEL",
2623                    remove_label.variable
2624                ))
2625            })?;
2626
2627        // Output schema for update count
2628        let output_schema = vec![LogicalType::Int64];
2629        let output_columns = vec!["labels_removed".to_string()];
2630
2631        let operator = Box::new(RemoveLabelOperator::new(
2632            Arc::clone(&self.store),
2633            input_op,
2634            node_column,
2635            remove_label.labels.clone(),
2636            output_schema,
2637        ));
2638
2639        Ok((operator, output_columns))
2640    }
2641
2642    /// Plans a SET PROPERTY operator.
2643    fn plan_set_property(
2644        &self,
2645        set_prop: &SetPropertyOp,
2646    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
2647        let (input_op, columns) = self.plan_operator(&set_prop.input)?;
2648
2649        // Find the entity column (node or edge variable)
2650        let entity_column = columns
2651            .iter()
2652            .position(|c| c == &set_prop.variable)
2653            .ok_or_else(|| {
2654                Error::Internal(format!(
2655                    "Variable '{}' not found for SET",
2656                    set_prop.variable
2657                ))
2658            })?;
2659
2660        // Convert properties to PropertySource
2661        let properties: Vec<(String, PropertySource)> = set_prop
2662            .properties
2663            .iter()
2664            .map(|(name, expr)| {
2665                let source = self.expression_to_property_source(expr, &columns)?;
2666                Ok((name.clone(), source))
2667            })
2668            .collect::<Result<Vec<_>>>()?;
2669
2670        // Output schema preserves input schema (passes through)
2671        let output_schema: Vec<LogicalType> = columns.iter().map(|_| LogicalType::Node).collect();
2672        let output_columns = columns.clone();
2673
2674        // Determine if this is a node or edge (for now assume node, edge detection can be added later)
2675        let operator = Box::new(SetPropertyOperator::new_for_node(
2676            Arc::clone(&self.store),
2677            input_op,
2678            entity_column,
2679            properties,
2680            output_schema,
2681        ));
2682
2683        Ok((operator, output_columns))
2684    }
2685
2686    /// Converts a logical expression to a PropertySource.
2687    fn expression_to_property_source(
2688        &self,
2689        expr: &LogicalExpression,
2690        columns: &[String],
2691    ) -> Result<PropertySource> {
2692        match expr {
2693            LogicalExpression::Literal(value) => Ok(PropertySource::Constant(value.clone())),
2694            LogicalExpression::Variable(name) => {
2695                let col_idx = columns.iter().position(|c| c == name).ok_or_else(|| {
2696                    Error::Internal(format!("Variable '{}' not found for property source", name))
2697                })?;
2698                Ok(PropertySource::Column(col_idx))
2699            }
2700            LogicalExpression::Parameter(name) => {
2701                // Parameters should be resolved before planning
2702                // For now, treat as a placeholder
2703                Ok(PropertySource::Constant(
2704                    grafeo_common::types::Value::String(format!("${}", name).into()),
2705                ))
2706            }
2707            _ => Err(Error::Internal(format!(
2708                "Unsupported expression type for property source: {:?}",
2709                expr
2710            ))),
2711        }
2712    }
2713}
2714
2715/// Converts a logical binary operator to a filter binary operator.
2716pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
2717    match op {
2718        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
2719        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
2720        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
2721        BinaryOp::Le => Ok(BinaryFilterOp::Le),
2722        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
2723        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
2724        BinaryOp::And => Ok(BinaryFilterOp::And),
2725        BinaryOp::Or => Ok(BinaryFilterOp::Or),
2726        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
2727        BinaryOp::Add => Ok(BinaryFilterOp::Add),
2728        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
2729        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
2730        BinaryOp::Div => Ok(BinaryFilterOp::Div),
2731        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
2732        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
2733        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
2734        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
2735        BinaryOp::In => Ok(BinaryFilterOp::In),
2736        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
2737        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
2738        BinaryOp::Concat | BinaryOp::Like => Err(Error::Internal(format!(
2739            "Binary operator {:?} not yet supported in filters",
2740            op
2741        ))),
2742    }
2743}
2744
2745/// Converts a logical unary operator to a filter unary operator.
2746pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
2747    match op {
2748        UnaryOp::Not => Ok(UnaryFilterOp::Not),
2749        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
2750        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
2751        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
2752    }
2753}
2754
2755/// Converts a logical aggregate function to a physical aggregate function.
2756pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
2757    match func {
2758        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
2759        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
2760        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
2761        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
2762        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
2763        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
2764        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
2765        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
2766        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
2767        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
2768        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
2769    }
2770}
2771
2772/// Converts a logical expression to a filter expression.
2773///
2774/// This is a standalone function that can be used by both LPG and RDF planners.
2775pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
2776    match expr {
2777        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
2778        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
2779        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
2780            variable: variable.clone(),
2781            property: property.clone(),
2782        }),
2783        LogicalExpression::Binary { left, op, right } => {
2784            let left_expr = convert_filter_expression(left)?;
2785            let right_expr = convert_filter_expression(right)?;
2786            let filter_op = convert_binary_op(*op)?;
2787            Ok(FilterExpression::Binary {
2788                left: Box::new(left_expr),
2789                op: filter_op,
2790                right: Box::new(right_expr),
2791            })
2792        }
2793        LogicalExpression::Unary { op, operand } => {
2794            let operand_expr = convert_filter_expression(operand)?;
2795            let filter_op = convert_unary_op(*op)?;
2796            Ok(FilterExpression::Unary {
2797                op: filter_op,
2798                operand: Box::new(operand_expr),
2799            })
2800        }
2801        LogicalExpression::FunctionCall { name, args, .. } => {
2802            let filter_args: Vec<FilterExpression> = args
2803                .iter()
2804                .map(|a| convert_filter_expression(a))
2805                .collect::<Result<Vec<_>>>()?;
2806            Ok(FilterExpression::FunctionCall {
2807                name: name.clone(),
2808                args: filter_args,
2809            })
2810        }
2811        LogicalExpression::Case {
2812            operand,
2813            when_clauses,
2814            else_clause,
2815        } => {
2816            let filter_operand = operand
2817                .as_ref()
2818                .map(|e| convert_filter_expression(e))
2819                .transpose()?
2820                .map(Box::new);
2821            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
2822                .iter()
2823                .map(|(cond, result)| {
2824                    Ok((
2825                        convert_filter_expression(cond)?,
2826                        convert_filter_expression(result)?,
2827                    ))
2828                })
2829                .collect::<Result<Vec<_>>>()?;
2830            let filter_else = else_clause
2831                .as_ref()
2832                .map(|e| convert_filter_expression(e))
2833                .transpose()?
2834                .map(Box::new);
2835            Ok(FilterExpression::Case {
2836                operand: filter_operand,
2837                when_clauses: filter_when_clauses,
2838                else_clause: filter_else,
2839            })
2840        }
2841        LogicalExpression::List(items) => {
2842            let filter_items: Vec<FilterExpression> = items
2843                .iter()
2844                .map(|item| convert_filter_expression(item))
2845                .collect::<Result<Vec<_>>>()?;
2846            Ok(FilterExpression::List(filter_items))
2847        }
2848        LogicalExpression::Map(pairs) => {
2849            let filter_pairs: Vec<(String, FilterExpression)> = pairs
2850                .iter()
2851                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
2852                .collect::<Result<Vec<_>>>()?;
2853            Ok(FilterExpression::Map(filter_pairs))
2854        }
2855        LogicalExpression::IndexAccess { base, index } => {
2856            let base_expr = convert_filter_expression(base)?;
2857            let index_expr = convert_filter_expression(index)?;
2858            Ok(FilterExpression::IndexAccess {
2859                base: Box::new(base_expr),
2860                index: Box::new(index_expr),
2861            })
2862        }
2863        LogicalExpression::SliceAccess { base, start, end } => {
2864            let base_expr = convert_filter_expression(base)?;
2865            let start_expr = start
2866                .as_ref()
2867                .map(|s| convert_filter_expression(s))
2868                .transpose()?
2869                .map(Box::new);
2870            let end_expr = end
2871                .as_ref()
2872                .map(|e| convert_filter_expression(e))
2873                .transpose()?
2874                .map(Box::new);
2875            Ok(FilterExpression::SliceAccess {
2876                base: Box::new(base_expr),
2877                start: start_expr,
2878                end: end_expr,
2879            })
2880        }
2881        LogicalExpression::Parameter(_) => Err(Error::Internal(
2882            "Parameters not yet supported in filters".to_string(),
2883        )),
2884        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
2885        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
2886        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
2887        LogicalExpression::ListComprehension {
2888            variable,
2889            list_expr,
2890            filter_expr,
2891            map_expr,
2892        } => {
2893            let list = convert_filter_expression(list_expr)?;
2894            let filter = filter_expr
2895                .as_ref()
2896                .map(|f| convert_filter_expression(f))
2897                .transpose()?
2898                .map(Box::new);
2899            let map = convert_filter_expression(map_expr)?;
2900            Ok(FilterExpression::ListComprehension {
2901                variable: variable.clone(),
2902                list_expr: Box::new(list),
2903                filter_expr: filter,
2904                map_expr: Box::new(map),
2905            })
2906        }
2907        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => Err(
2908            Error::Internal("Subqueries not yet supported in filters".to_string()),
2909        ),
2910    }
2911}
2912
2913/// Infers the logical type from a value.
2914fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
2915    use grafeo_common::types::Value;
2916    match value {
2917        Value::Null => LogicalType::String, // Default type for null
2918        Value::Bool(_) => LogicalType::Bool,
2919        Value::Int64(_) => LogicalType::Int64,
2920        Value::Float64(_) => LogicalType::Float64,
2921        Value::String(_) => LogicalType::String,
2922        Value::Bytes(_) => LogicalType::String, // No Bytes logical type, use String
2923        Value::Timestamp(_) => LogicalType::Timestamp,
2924        Value::List(_) => LogicalType::String, // Lists not yet supported as logical type
2925        Value::Map(_) => LogicalType::String,  // Maps not yet supported as logical type
2926    }
2927}
2928
2929/// Converts an expression to a string for column naming.
2930fn expression_to_string(expr: &LogicalExpression) -> String {
2931    match expr {
2932        LogicalExpression::Variable(name) => name.clone(),
2933        LogicalExpression::Property { variable, property } => {
2934            format!("{variable}.{property}")
2935        }
2936        LogicalExpression::Literal(value) => format!("{value:?}"),
2937        LogicalExpression::FunctionCall { name, .. } => format!("{name}(...)"),
2938        _ => "expr".to_string(),
2939    }
2940}
2941
/// A physical plan ready for execution.
///
/// Bundles the root of the physical operator tree with the names of the
/// result columns and, optionally, an adaptive execution context.
pub struct PhysicalPlan {
    /// The root physical operator.
    pub operator: Box<dyn Operator>,
    /// Column names for the result.
    pub columns: Vec<String>,
    /// Adaptive execution context with cardinality estimates.
    ///
    /// When adaptive execution is enabled, this context contains estimated
    /// cardinalities at various checkpoints in the plan. During execution,
    /// actual row counts are recorded and compared against estimates.
    pub adaptive_context: Option<AdaptiveContext>,
}
2955
2956impl PhysicalPlan {
2957    /// Returns the column names.
2958    #[must_use]
2959    pub fn columns(&self) -> &[String] {
2960        &self.columns
2961    }
2962
2963    /// Consumes the plan and returns the operator.
2964    pub fn into_operator(self) -> Box<dyn Operator> {
2965        self.operator
2966    }
2967
2968    /// Returns the adaptive context, if adaptive execution is enabled.
2969    #[must_use]
2970    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
2971        self.adaptive_context.as_ref()
2972    }
2973
2974    /// Takes ownership of the adaptive context.
2975    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
2976        self.adaptive_context.take()
2977    }
2978}
2979
/// Helper operator that returns a single result chunk once.
///
/// Used by the factorized expand chain to wrap the final result.
// NOTE(review): `dead_code` is allowed here without explanation, which suggests
// nothing constructs this type at the moment — confirm whether it is still needed.
#[allow(dead_code)]
struct SingleResultOperator {
    /// The chunk still waiting to be emitted, if any.
    result: Option<grafeo_core::execution::DataChunk>,
}
2987
2988impl SingleResultOperator {
2989    #[allow(dead_code)]
2990    fn new(result: Option<grafeo_core::execution::DataChunk>) -> Self {
2991        Self { result }
2992    }
2993}
2994
2995impl Operator for SingleResultOperator {
2996    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
2997        Ok(self.result.take())
2998    }
2999
3000    fn reset(&mut self) {
3001        // Cannot reset - result is consumed
3002    }
3003
3004    fn name(&self) -> &'static str {
3005        "SingleResult"
3006    }
3007}
3008
3009#[cfg(test)]
3010mod tests {
3011    use super::*;
3012    use crate::query::plan::{
3013        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
3014        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
3015        LimitOp as LogicalLimitOp, NodeScanOp, ReturnItem, ReturnOp, SkipOp as LogicalSkipOp,
3016        SortKey, SortOp,
3017    };
3018    use grafeo_common::types::Value;
3019
3020    fn create_test_store() -> Arc<LpgStore> {
3021        let store = Arc::new(LpgStore::new());
3022        store.create_node(&["Person"]);
3023        store.create_node(&["Person"]);
3024        store.create_node(&["Company"]);
3025        store
3026    }
3027
3028    // ==================== Simple Scan Tests ====================
3029
3030    #[test]
3031    fn test_plan_simple_scan() {
3032        let store = create_test_store();
3033        let planner = Planner::new(store);
3034
3035        // MATCH (n:Person) RETURN n
3036        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3037            items: vec![ReturnItem {
3038                expression: LogicalExpression::Variable("n".to_string()),
3039                alias: None,
3040            }],
3041            distinct: false,
3042            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3043                variable: "n".to_string(),
3044                label: Some("Person".to_string()),
3045                input: None,
3046            })),
3047        }));
3048
3049        let physical = planner.plan(&logical).unwrap();
3050        assert_eq!(physical.columns(), &["n"]);
3051    }
3052
3053    #[test]
3054    fn test_plan_scan_without_label() {
3055        let store = create_test_store();
3056        let planner = Planner::new(store);
3057
3058        // MATCH (n) RETURN n
3059        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3060            items: vec![ReturnItem {
3061                expression: LogicalExpression::Variable("n".to_string()),
3062                alias: None,
3063            }],
3064            distinct: false,
3065            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3066                variable: "n".to_string(),
3067                label: None,
3068                input: None,
3069            })),
3070        }));
3071
3072        let physical = planner.plan(&logical).unwrap();
3073        assert_eq!(physical.columns(), &["n"]);
3074    }
3075
3076    #[test]
3077    fn test_plan_return_with_alias() {
3078        let store = create_test_store();
3079        let planner = Planner::new(store);
3080
3081        // MATCH (n:Person) RETURN n AS person
3082        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3083            items: vec![ReturnItem {
3084                expression: LogicalExpression::Variable("n".to_string()),
3085                alias: Some("person".to_string()),
3086            }],
3087            distinct: false,
3088            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3089                variable: "n".to_string(),
3090                label: Some("Person".to_string()),
3091                input: None,
3092            })),
3093        }));
3094
3095        let physical = planner.plan(&logical).unwrap();
3096        assert_eq!(physical.columns(), &["person"]);
3097    }
3098
3099    #[test]
3100    fn test_plan_return_property() {
3101        let store = create_test_store();
3102        let planner = Planner::new(store);
3103
3104        // MATCH (n:Person) RETURN n.name
3105        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3106            items: vec![ReturnItem {
3107                expression: LogicalExpression::Property {
3108                    variable: "n".to_string(),
3109                    property: "name".to_string(),
3110                },
3111                alias: None,
3112            }],
3113            distinct: false,
3114            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3115                variable: "n".to_string(),
3116                label: Some("Person".to_string()),
3117                input: None,
3118            })),
3119        }));
3120
3121        let physical = planner.plan(&logical).unwrap();
3122        assert_eq!(physical.columns(), &["n.name"]);
3123    }
3124
3125    #[test]
3126    fn test_plan_return_literal() {
3127        let store = create_test_store();
3128        let planner = Planner::new(store);
3129
3130        // MATCH (n) RETURN 42 AS answer
3131        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3132            items: vec![ReturnItem {
3133                expression: LogicalExpression::Literal(Value::Int64(42)),
3134                alias: Some("answer".to_string()),
3135            }],
3136            distinct: false,
3137            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3138                variable: "n".to_string(),
3139                label: None,
3140                input: None,
3141            })),
3142        }));
3143
3144        let physical = planner.plan(&logical).unwrap();
3145        assert_eq!(physical.columns(), &["answer"]);
3146    }
3147
3148    // ==================== Filter Tests ====================
3149
3150    #[test]
3151    fn test_plan_filter_equality() {
3152        let store = create_test_store();
3153        let planner = Planner::new(store);
3154
3155        // MATCH (n:Person) WHERE n.age = 30 RETURN n
3156        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3157            items: vec![ReturnItem {
3158                expression: LogicalExpression::Variable("n".to_string()),
3159                alias: None,
3160            }],
3161            distinct: false,
3162            input: Box::new(LogicalOperator::Filter(FilterOp {
3163                predicate: LogicalExpression::Binary {
3164                    left: Box::new(LogicalExpression::Property {
3165                        variable: "n".to_string(),
3166                        property: "age".to_string(),
3167                    }),
3168                    op: BinaryOp::Eq,
3169                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
3170                },
3171                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3172                    variable: "n".to_string(),
3173                    label: Some("Person".to_string()),
3174                    input: None,
3175                })),
3176            })),
3177        }));
3178
3179        let physical = planner.plan(&logical).unwrap();
3180        assert_eq!(physical.columns(), &["n"]);
3181    }
3182
3183    #[test]
3184    fn test_plan_filter_compound_and() {
3185        let store = create_test_store();
3186        let planner = Planner::new(store);
3187
3188        // WHERE n.age > 20 AND n.age < 40
3189        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3190            items: vec![ReturnItem {
3191                expression: LogicalExpression::Variable("n".to_string()),
3192                alias: None,
3193            }],
3194            distinct: false,
3195            input: Box::new(LogicalOperator::Filter(FilterOp {
3196                predicate: LogicalExpression::Binary {
3197                    left: Box::new(LogicalExpression::Binary {
3198                        left: Box::new(LogicalExpression::Property {
3199                            variable: "n".to_string(),
3200                            property: "age".to_string(),
3201                        }),
3202                        op: BinaryOp::Gt,
3203                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
3204                    }),
3205                    op: BinaryOp::And,
3206                    right: Box::new(LogicalExpression::Binary {
3207                        left: Box::new(LogicalExpression::Property {
3208                            variable: "n".to_string(),
3209                            property: "age".to_string(),
3210                        }),
3211                        op: BinaryOp::Lt,
3212                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
3213                    }),
3214                },
3215                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3216                    variable: "n".to_string(),
3217                    label: None,
3218                    input: None,
3219                })),
3220            })),
3221        }));
3222
3223        let physical = planner.plan(&logical).unwrap();
3224        assert_eq!(physical.columns(), &["n"]);
3225    }
3226
3227    #[test]
3228    fn test_plan_filter_unary_not() {
3229        let store = create_test_store();
3230        let planner = Planner::new(store);
3231
3232        // WHERE NOT n.active
3233        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3234            items: vec![ReturnItem {
3235                expression: LogicalExpression::Variable("n".to_string()),
3236                alias: None,
3237            }],
3238            distinct: false,
3239            input: Box::new(LogicalOperator::Filter(FilterOp {
3240                predicate: LogicalExpression::Unary {
3241                    op: UnaryOp::Not,
3242                    operand: Box::new(LogicalExpression::Property {
3243                        variable: "n".to_string(),
3244                        property: "active".to_string(),
3245                    }),
3246                },
3247                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3248                    variable: "n".to_string(),
3249                    label: None,
3250                    input: None,
3251                })),
3252            })),
3253        }));
3254
3255        let physical = planner.plan(&logical).unwrap();
3256        assert_eq!(physical.columns(), &["n"]);
3257    }
3258
3259    #[test]
3260    fn test_plan_filter_is_null() {
3261        let store = create_test_store();
3262        let planner = Planner::new(store);
3263
3264        // WHERE n.email IS NULL
3265        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3266            items: vec![ReturnItem {
3267                expression: LogicalExpression::Variable("n".to_string()),
3268                alias: None,
3269            }],
3270            distinct: false,
3271            input: Box::new(LogicalOperator::Filter(FilterOp {
3272                predicate: LogicalExpression::Unary {
3273                    op: UnaryOp::IsNull,
3274                    operand: Box::new(LogicalExpression::Property {
3275                        variable: "n".to_string(),
3276                        property: "email".to_string(),
3277                    }),
3278                },
3279                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3280                    variable: "n".to_string(),
3281                    label: None,
3282                    input: None,
3283                })),
3284            })),
3285        }));
3286
3287        let physical = planner.plan(&logical).unwrap();
3288        assert_eq!(physical.columns(), &["n"]);
3289    }
3290
3291    #[test]
3292    fn test_plan_filter_function_call() {
3293        let store = create_test_store();
3294        let planner = Planner::new(store);
3295
3296        // WHERE size(n.friends) > 0
3297        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3298            items: vec![ReturnItem {
3299                expression: LogicalExpression::Variable("n".to_string()),
3300                alias: None,
3301            }],
3302            distinct: false,
3303            input: Box::new(LogicalOperator::Filter(FilterOp {
3304                predicate: LogicalExpression::Binary {
3305                    left: Box::new(LogicalExpression::FunctionCall {
3306                        name: "size".to_string(),
3307                        args: vec![LogicalExpression::Property {
3308                            variable: "n".to_string(),
3309                            property: "friends".to_string(),
3310                        }],
3311                        distinct: false,
3312                    }),
3313                    op: BinaryOp::Gt,
3314                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
3315                },
3316                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3317                    variable: "n".to_string(),
3318                    label: None,
3319                    input: None,
3320                })),
3321            })),
3322        }));
3323
3324        let physical = planner.plan(&logical).unwrap();
3325        assert_eq!(physical.columns(), &["n"]);
3326    }
3327
3328    // ==================== Expand Tests ====================
3329
3330    #[test]
3331    fn test_plan_expand_outgoing() {
3332        let store = create_test_store();
3333        let planner = Planner::new(store);
3334
3335        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
3336        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3337            items: vec![
3338                ReturnItem {
3339                    expression: LogicalExpression::Variable("a".to_string()),
3340                    alias: None,
3341                },
3342                ReturnItem {
3343                    expression: LogicalExpression::Variable("b".to_string()),
3344                    alias: None,
3345                },
3346            ],
3347            distinct: false,
3348            input: Box::new(LogicalOperator::Expand(ExpandOp {
3349                from_variable: "a".to_string(),
3350                to_variable: "b".to_string(),
3351                edge_variable: None,
3352                direction: ExpandDirection::Outgoing,
3353                edge_type: Some("KNOWS".to_string()),
3354                min_hops: 1,
3355                max_hops: Some(1),
3356                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3357                    variable: "a".to_string(),
3358                    label: Some("Person".to_string()),
3359                    input: None,
3360                })),
3361                path_alias: None,
3362            })),
3363        }));
3364
3365        let physical = planner.plan(&logical).unwrap();
3366        // The return should have columns [a, b]
3367        assert!(physical.columns().contains(&"a".to_string()));
3368        assert!(physical.columns().contains(&"b".to_string()));
3369    }
3370
3371    #[test]
3372    fn test_plan_expand_with_edge_variable() {
3373        let store = create_test_store();
3374        let planner = Planner::new(store);
3375
3376        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
3377        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3378            items: vec![
3379                ReturnItem {
3380                    expression: LogicalExpression::Variable("a".to_string()),
3381                    alias: None,
3382                },
3383                ReturnItem {
3384                    expression: LogicalExpression::Variable("r".to_string()),
3385                    alias: None,
3386                },
3387                ReturnItem {
3388                    expression: LogicalExpression::Variable("b".to_string()),
3389                    alias: None,
3390                },
3391            ],
3392            distinct: false,
3393            input: Box::new(LogicalOperator::Expand(ExpandOp {
3394                from_variable: "a".to_string(),
3395                to_variable: "b".to_string(),
3396                edge_variable: Some("r".to_string()),
3397                direction: ExpandDirection::Outgoing,
3398                edge_type: Some("KNOWS".to_string()),
3399                min_hops: 1,
3400                max_hops: Some(1),
3401                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3402                    variable: "a".to_string(),
3403                    label: None,
3404                    input: None,
3405                })),
3406                path_alias: None,
3407            })),
3408        }));
3409
3410        let physical = planner.plan(&logical).unwrap();
3411        assert!(physical.columns().contains(&"a".to_string()));
3412        assert!(physical.columns().contains(&"r".to_string()));
3413        assert!(physical.columns().contains(&"b".to_string()));
3414    }
3415
3416    // ==================== Limit/Skip/Sort Tests ====================
3417
3418    #[test]
3419    fn test_plan_limit() {
3420        let store = create_test_store();
3421        let planner = Planner::new(store);
3422
3423        // MATCH (n) RETURN n LIMIT 10
3424        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3425            items: vec![ReturnItem {
3426                expression: LogicalExpression::Variable("n".to_string()),
3427                alias: None,
3428            }],
3429            distinct: false,
3430            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
3431                count: 10,
3432                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3433                    variable: "n".to_string(),
3434                    label: None,
3435                    input: None,
3436                })),
3437            })),
3438        }));
3439
3440        let physical = planner.plan(&logical).unwrap();
3441        assert_eq!(physical.columns(), &["n"]);
3442    }
3443
3444    #[test]
3445    fn test_plan_skip() {
3446        let store = create_test_store();
3447        let planner = Planner::new(store);
3448
3449        // MATCH (n) RETURN n SKIP 5
3450        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3451            items: vec![ReturnItem {
3452                expression: LogicalExpression::Variable("n".to_string()),
3453                alias: None,
3454            }],
3455            distinct: false,
3456            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
3457                count: 5,
3458                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3459                    variable: "n".to_string(),
3460                    label: None,
3461                    input: None,
3462                })),
3463            })),
3464        }));
3465
3466        let physical = planner.plan(&logical).unwrap();
3467        assert_eq!(physical.columns(), &["n"]);
3468    }
3469
3470    #[test]
3471    fn test_plan_sort() {
3472        let store = create_test_store();
3473        let planner = Planner::new(store);
3474
3475        // MATCH (n) RETURN n ORDER BY n.name ASC
3476        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3477            items: vec![ReturnItem {
3478                expression: LogicalExpression::Variable("n".to_string()),
3479                alias: None,
3480            }],
3481            distinct: false,
3482            input: Box::new(LogicalOperator::Sort(SortOp {
3483                keys: vec![SortKey {
3484                    expression: LogicalExpression::Variable("n".to_string()),
3485                    order: SortOrder::Ascending,
3486                }],
3487                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3488                    variable: "n".to_string(),
3489                    label: None,
3490                    input: None,
3491                })),
3492            })),
3493        }));
3494
3495        let physical = planner.plan(&logical).unwrap();
3496        assert_eq!(physical.columns(), &["n"]);
3497    }
3498
3499    #[test]
3500    fn test_plan_sort_descending() {
3501        let store = create_test_store();
3502        let planner = Planner::new(store);
3503
3504        // ORDER BY n DESC
3505        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3506            items: vec![ReturnItem {
3507                expression: LogicalExpression::Variable("n".to_string()),
3508                alias: None,
3509            }],
3510            distinct: false,
3511            input: Box::new(LogicalOperator::Sort(SortOp {
3512                keys: vec![SortKey {
3513                    expression: LogicalExpression::Variable("n".to_string()),
3514                    order: SortOrder::Descending,
3515                }],
3516                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3517                    variable: "n".to_string(),
3518                    label: None,
3519                    input: None,
3520                })),
3521            })),
3522        }));
3523
3524        let physical = planner.plan(&logical).unwrap();
3525        assert_eq!(physical.columns(), &["n"]);
3526    }
3527
3528    #[test]
3529    fn test_plan_distinct() {
3530        let store = create_test_store();
3531        let planner = Planner::new(store);
3532
3533        // MATCH (n) RETURN DISTINCT n
3534        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3535            items: vec![ReturnItem {
3536                expression: LogicalExpression::Variable("n".to_string()),
3537                alias: None,
3538            }],
3539            distinct: false,
3540            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
3541                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3542                    variable: "n".to_string(),
3543                    label: None,
3544                    input: None,
3545                })),
3546                columns: None,
3547            })),
3548        }));
3549
3550        let physical = planner.plan(&logical).unwrap();
3551        assert_eq!(physical.columns(), &["n"]);
3552    }
3553
3554    // ==================== Aggregate Tests ====================
3555
3556    #[test]
3557    fn test_plan_aggregate_count() {
3558        let store = create_test_store();
3559        let planner = Planner::new(store);
3560
3561        // MATCH (n) RETURN count(n)
3562        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3563            items: vec![ReturnItem {
3564                expression: LogicalExpression::Variable("cnt".to_string()),
3565                alias: None,
3566            }],
3567            distinct: false,
3568            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
3569                group_by: vec![],
3570                aggregates: vec![LogicalAggregateExpr {
3571                    function: LogicalAggregateFunction::Count,
3572                    expression: Some(LogicalExpression::Variable("n".to_string())),
3573                    distinct: false,
3574                    alias: Some("cnt".to_string()),
3575                    percentile: None,
3576                }],
3577                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3578                    variable: "n".to_string(),
3579                    label: None,
3580                    input: None,
3581                })),
3582                having: None,
3583            })),
3584        }));
3585
3586        let physical = planner.plan(&logical).unwrap();
3587        assert!(physical.columns().contains(&"cnt".to_string()));
3588    }
3589
3590    #[test]
3591    fn test_plan_aggregate_with_group_by() {
3592        let store = create_test_store();
3593        let planner = Planner::new(store);
3594
3595        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
3596        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3597            group_by: vec![LogicalExpression::Property {
3598                variable: "n".to_string(),
3599                property: "city".to_string(),
3600            }],
3601            aggregates: vec![LogicalAggregateExpr {
3602                function: LogicalAggregateFunction::Count,
3603                expression: Some(LogicalExpression::Variable("n".to_string())),
3604                distinct: false,
3605                alias: Some("cnt".to_string()),
3606                percentile: None,
3607            }],
3608            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3609                variable: "n".to_string(),
3610                label: Some("Person".to_string()),
3611                input: None,
3612            })),
3613            having: None,
3614        }));
3615
3616        let physical = planner.plan(&logical).unwrap();
3617        assert_eq!(physical.columns().len(), 2);
3618    }
3619
3620    #[test]
3621    fn test_plan_aggregate_sum() {
3622        let store = create_test_store();
3623        let planner = Planner::new(store);
3624
3625        // SUM(n.value)
3626        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3627            group_by: vec![],
3628            aggregates: vec![LogicalAggregateExpr {
3629                function: LogicalAggregateFunction::Sum,
3630                expression: Some(LogicalExpression::Property {
3631                    variable: "n".to_string(),
3632                    property: "value".to_string(),
3633                }),
3634                distinct: false,
3635                alias: Some("total".to_string()),
3636                percentile: None,
3637            }],
3638            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3639                variable: "n".to_string(),
3640                label: None,
3641                input: None,
3642            })),
3643            having: None,
3644        }));
3645
3646        let physical = planner.plan(&logical).unwrap();
3647        assert!(physical.columns().contains(&"total".to_string()));
3648    }
3649
3650    #[test]
3651    fn test_plan_aggregate_avg() {
3652        let store = create_test_store();
3653        let planner = Planner::new(store);
3654
3655        // AVG(n.score)
3656        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3657            group_by: vec![],
3658            aggregates: vec![LogicalAggregateExpr {
3659                function: LogicalAggregateFunction::Avg,
3660                expression: Some(LogicalExpression::Property {
3661                    variable: "n".to_string(),
3662                    property: "score".to_string(),
3663                }),
3664                distinct: false,
3665                alias: Some("average".to_string()),
3666                percentile: None,
3667            }],
3668            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3669                variable: "n".to_string(),
3670                label: None,
3671                input: None,
3672            })),
3673            having: None,
3674        }));
3675
3676        let physical = planner.plan(&logical).unwrap();
3677        assert!(physical.columns().contains(&"average".to_string()));
3678    }
3679
3680    #[test]
3681    fn test_plan_aggregate_min_max() {
3682        let store = create_test_store();
3683        let planner = Planner::new(store);
3684
3685        // MIN(n.age), MAX(n.age)
3686        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
3687            group_by: vec![],
3688            aggregates: vec![
3689                LogicalAggregateExpr {
3690                    function: LogicalAggregateFunction::Min,
3691                    expression: Some(LogicalExpression::Property {
3692                        variable: "n".to_string(),
3693                        property: "age".to_string(),
3694                    }),
3695                    distinct: false,
3696                    alias: Some("youngest".to_string()),
3697                    percentile: None,
3698                },
3699                LogicalAggregateExpr {
3700                    function: LogicalAggregateFunction::Max,
3701                    expression: Some(LogicalExpression::Property {
3702                        variable: "n".to_string(),
3703                        property: "age".to_string(),
3704                    }),
3705                    distinct: false,
3706                    alias: Some("oldest".to_string()),
3707                    percentile: None,
3708                },
3709            ],
3710            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3711                variable: "n".to_string(),
3712                label: None,
3713                input: None,
3714            })),
3715            having: None,
3716        }));
3717
3718        let physical = planner.plan(&logical).unwrap();
3719        assert!(physical.columns().contains(&"youngest".to_string()));
3720        assert!(physical.columns().contains(&"oldest".to_string()));
3721    }
3722
3723    // ==================== Join Tests ====================
3724
3725    #[test]
3726    fn test_plan_inner_join() {
3727        let store = create_test_store();
3728        let planner = Planner::new(store);
3729
3730        // Inner join between two scans
3731        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3732            items: vec![
3733                ReturnItem {
3734                    expression: LogicalExpression::Variable("a".to_string()),
3735                    alias: None,
3736                },
3737                ReturnItem {
3738                    expression: LogicalExpression::Variable("b".to_string()),
3739                    alias: None,
3740                },
3741            ],
3742            distinct: false,
3743            input: Box::new(LogicalOperator::Join(JoinOp {
3744                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3745                    variable: "a".to_string(),
3746                    label: Some("Person".to_string()),
3747                    input: None,
3748                })),
3749                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3750                    variable: "b".to_string(),
3751                    label: Some("Company".to_string()),
3752                    input: None,
3753                })),
3754                join_type: JoinType::Inner,
3755                conditions: vec![JoinCondition {
3756                    left: LogicalExpression::Variable("a".to_string()),
3757                    right: LogicalExpression::Variable("b".to_string()),
3758                }],
3759            })),
3760        }));
3761
3762        let physical = planner.plan(&logical).unwrap();
3763        assert!(physical.columns().contains(&"a".to_string()));
3764        assert!(physical.columns().contains(&"b".to_string()));
3765    }
3766
3767    #[test]
3768    fn test_plan_cross_join() {
3769        let store = create_test_store();
3770        let planner = Planner::new(store);
3771
3772        // Cross join (no conditions)
3773        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3774            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3775                variable: "a".to_string(),
3776                label: None,
3777                input: None,
3778            })),
3779            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3780                variable: "b".to_string(),
3781                label: None,
3782                input: None,
3783            })),
3784            join_type: JoinType::Cross,
3785            conditions: vec![],
3786        }));
3787
3788        let physical = planner.plan(&logical).unwrap();
3789        assert_eq!(physical.columns().len(), 2);
3790    }
3791
3792    #[test]
3793    fn test_plan_left_join() {
3794        let store = create_test_store();
3795        let planner = Planner::new(store);
3796
3797        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
3798            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3799                variable: "a".to_string(),
3800                label: None,
3801                input: None,
3802            })),
3803            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3804                variable: "b".to_string(),
3805                label: None,
3806                input: None,
3807            })),
3808            join_type: JoinType::Left,
3809            conditions: vec![],
3810        }));
3811
3812        let physical = planner.plan(&logical).unwrap();
3813        assert_eq!(physical.columns().len(), 2);
3814    }
3815
3816    // ==================== Mutation Tests ====================
3817
3818    #[test]
3819    fn test_plan_create_node() {
3820        let store = create_test_store();
3821        let planner = Planner::new(store);
3822
3823        // CREATE (n:Person {name: 'Alice'})
3824        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
3825            variable: "n".to_string(),
3826            labels: vec!["Person".to_string()],
3827            properties: vec![(
3828                "name".to_string(),
3829                LogicalExpression::Literal(Value::String("Alice".into())),
3830            )],
3831            input: None,
3832        }));
3833
3834        let physical = planner.plan(&logical).unwrap();
3835        assert!(physical.columns().contains(&"n".to_string()));
3836    }
3837
3838    #[test]
3839    fn test_plan_create_edge() {
3840        let store = create_test_store();
3841        let planner = Planner::new(store);
3842
3843        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
3844        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
3845            variable: Some("r".to_string()),
3846            from_variable: "a".to_string(),
3847            to_variable: "b".to_string(),
3848            edge_type: "KNOWS".to_string(),
3849            properties: vec![],
3850            input: Box::new(LogicalOperator::Join(JoinOp {
3851                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3852                    variable: "a".to_string(),
3853                    label: None,
3854                    input: None,
3855                })),
3856                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3857                    variable: "b".to_string(),
3858                    label: None,
3859                    input: None,
3860                })),
3861                join_type: JoinType::Cross,
3862                conditions: vec![],
3863            })),
3864        }));
3865
3866        let physical = planner.plan(&logical).unwrap();
3867        assert!(physical.columns().contains(&"r".to_string()));
3868    }
3869
3870    #[test]
3871    fn test_plan_delete_node() {
3872        let store = create_test_store();
3873        let planner = Planner::new(store);
3874
3875        // MATCH (n) DELETE n
3876        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
3877            variable: "n".to_string(),
3878            detach: false,
3879            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3880                variable: "n".to_string(),
3881                label: None,
3882                input: None,
3883            })),
3884        }));
3885
3886        let physical = planner.plan(&logical).unwrap();
3887        assert!(physical.columns().contains(&"deleted_count".to_string()));
3888    }
3889
3890    // ==================== Error Cases ====================
3891
3892    #[test]
3893    fn test_plan_empty_errors() {
3894        let store = create_test_store();
3895        let planner = Planner::new(store);
3896
3897        let logical = LogicalPlan::new(LogicalOperator::Empty);
3898        let result = planner.plan(&logical);
3899        assert!(result.is_err());
3900    }
3901
3902    #[test]
3903    fn test_plan_missing_variable_in_return() {
3904        let store = create_test_store();
3905        let planner = Planner::new(store);
3906
3907        // Return variable that doesn't exist in input
3908        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
3909            items: vec![ReturnItem {
3910                expression: LogicalExpression::Variable("missing".to_string()),
3911                alias: None,
3912            }],
3913            distinct: false,
3914            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
3915                variable: "n".to_string(),
3916                label: None,
3917                input: None,
3918            })),
3919        }));
3920
3921        let result = planner.plan(&logical);
3922        assert!(result.is_err());
3923    }
3924
3925    // ==================== Helper Function Tests ====================
3926
3927    #[test]
3928    fn test_convert_binary_ops() {
3929        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
3930        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
3931        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
3932        assert!(convert_binary_op(BinaryOp::Le).is_ok());
3933        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
3934        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
3935        assert!(convert_binary_op(BinaryOp::And).is_ok());
3936        assert!(convert_binary_op(BinaryOp::Or).is_ok());
3937        assert!(convert_binary_op(BinaryOp::Add).is_ok());
3938        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
3939        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
3940        assert!(convert_binary_op(BinaryOp::Div).is_ok());
3941    }
3942
3943    #[test]
3944    fn test_convert_unary_ops() {
3945        assert!(convert_unary_op(UnaryOp::Not).is_ok());
3946        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
3947        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
3948        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
3949    }
3950
3951    #[test]
3952    fn test_convert_aggregate_functions() {
3953        assert!(matches!(
3954            convert_aggregate_function(LogicalAggregateFunction::Count),
3955            PhysicalAggregateFunction::Count
3956        ));
3957        assert!(matches!(
3958            convert_aggregate_function(LogicalAggregateFunction::Sum),
3959            PhysicalAggregateFunction::Sum
3960        ));
3961        assert!(matches!(
3962            convert_aggregate_function(LogicalAggregateFunction::Avg),
3963            PhysicalAggregateFunction::Avg
3964        ));
3965        assert!(matches!(
3966            convert_aggregate_function(LogicalAggregateFunction::Min),
3967            PhysicalAggregateFunction::Min
3968        ));
3969        assert!(matches!(
3970            convert_aggregate_function(LogicalAggregateFunction::Max),
3971            PhysicalAggregateFunction::Max
3972        ));
3973    }
3974
3975    #[test]
3976    fn test_planner_accessors() {
3977        let store = create_test_store();
3978        let planner = Planner::new(Arc::clone(&store));
3979
3980        assert!(planner.tx_id().is_none());
3981        assert!(planner.tx_manager().is_none());
3982        let _ = planner.viewing_epoch(); // Just ensure it's accessible
3983    }
3984
3985    #[test]
3986    fn test_physical_plan_accessors() {
3987        let store = create_test_store();
3988        let planner = Planner::new(store);
3989
3990        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
3991            variable: "n".to_string(),
3992            label: None,
3993            input: None,
3994        }));
3995
3996        let physical = planner.plan(&logical).unwrap();
3997        assert_eq!(physical.columns(), &["n"]);
3998
3999        // Test into_operator
4000        let _ = physical.into_operator();
4001    }
4002
4003    // ==================== Adaptive Planning Tests ====================
4004
4005    #[test]
4006    fn test_plan_adaptive_with_scan() {
4007        let store = create_test_store();
4008        let planner = Planner::new(store);
4009
4010        // MATCH (n:Person) RETURN n
4011        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4012            items: vec![ReturnItem {
4013                expression: LogicalExpression::Variable("n".to_string()),
4014                alias: None,
4015            }],
4016            distinct: false,
4017            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4018                variable: "n".to_string(),
4019                label: Some("Person".to_string()),
4020                input: None,
4021            })),
4022        }));
4023
4024        let physical = planner.plan_adaptive(&logical).unwrap();
4025        assert_eq!(physical.columns(), &["n"]);
4026        // Should have adaptive context with estimates
4027        assert!(physical.adaptive_context.is_some());
4028    }
4029
4030    #[test]
4031    fn test_plan_adaptive_with_filter() {
4032        let store = create_test_store();
4033        let planner = Planner::new(store);
4034
4035        // MATCH (n) WHERE n.age > 30 RETURN n
4036        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4037            items: vec![ReturnItem {
4038                expression: LogicalExpression::Variable("n".to_string()),
4039                alias: None,
4040            }],
4041            distinct: false,
4042            input: Box::new(LogicalOperator::Filter(FilterOp {
4043                predicate: LogicalExpression::Binary {
4044                    left: Box::new(LogicalExpression::Property {
4045                        variable: "n".to_string(),
4046                        property: "age".to_string(),
4047                    }),
4048                    op: BinaryOp::Gt,
4049                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
4050                },
4051                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4052                    variable: "n".to_string(),
4053                    label: None,
4054                    input: None,
4055                })),
4056            })),
4057        }));
4058
4059        let physical = planner.plan_adaptive(&logical).unwrap();
4060        assert!(physical.adaptive_context.is_some());
4061    }
4062
4063    #[test]
4064    fn test_plan_adaptive_with_expand() {
4065        let store = create_test_store();
4066        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4067
4068        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
4069        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4070            items: vec![
4071                ReturnItem {
4072                    expression: LogicalExpression::Variable("a".to_string()),
4073                    alias: None,
4074                },
4075                ReturnItem {
4076                    expression: LogicalExpression::Variable("b".to_string()),
4077                    alias: None,
4078                },
4079            ],
4080            distinct: false,
4081            input: Box::new(LogicalOperator::Expand(ExpandOp {
4082                from_variable: "a".to_string(),
4083                to_variable: "b".to_string(),
4084                edge_variable: None,
4085                direction: ExpandDirection::Outgoing,
4086                edge_type: Some("KNOWS".to_string()),
4087                min_hops: 1,
4088                max_hops: Some(1),
4089                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4090                    variable: "a".to_string(),
4091                    label: None,
4092                    input: None,
4093                })),
4094                path_alias: None,
4095            })),
4096        }));
4097
4098        let physical = planner.plan_adaptive(&logical).unwrap();
4099        assert!(physical.adaptive_context.is_some());
4100    }
4101
4102    #[test]
4103    fn test_plan_adaptive_with_join() {
4104        let store = create_test_store();
4105        let planner = Planner::new(store);
4106
4107        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4108            items: vec![
4109                ReturnItem {
4110                    expression: LogicalExpression::Variable("a".to_string()),
4111                    alias: None,
4112                },
4113                ReturnItem {
4114                    expression: LogicalExpression::Variable("b".to_string()),
4115                    alias: None,
4116                },
4117            ],
4118            distinct: false,
4119            input: Box::new(LogicalOperator::Join(JoinOp {
4120                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4121                    variable: "a".to_string(),
4122                    label: None,
4123                    input: None,
4124                })),
4125                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4126                    variable: "b".to_string(),
4127                    label: None,
4128                    input: None,
4129                })),
4130                join_type: JoinType::Cross,
4131                conditions: vec![],
4132            })),
4133        }));
4134
4135        let physical = planner.plan_adaptive(&logical).unwrap();
4136        assert!(physical.adaptive_context.is_some());
4137    }
4138
4139    #[test]
4140    fn test_plan_adaptive_with_aggregate() {
4141        let store = create_test_store();
4142        let planner = Planner::new(store);
4143
4144        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
4145            group_by: vec![],
4146            aggregates: vec![LogicalAggregateExpr {
4147                function: LogicalAggregateFunction::Count,
4148                expression: Some(LogicalExpression::Variable("n".to_string())),
4149                distinct: false,
4150                alias: Some("cnt".to_string()),
4151                percentile: None,
4152            }],
4153            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4154                variable: "n".to_string(),
4155                label: None,
4156                input: None,
4157            })),
4158            having: None,
4159        }));
4160
4161        let physical = planner.plan_adaptive(&logical).unwrap();
4162        assert!(physical.adaptive_context.is_some());
4163    }
4164
4165    #[test]
4166    fn test_plan_adaptive_with_distinct() {
4167        let store = create_test_store();
4168        let planner = Planner::new(store);
4169
4170        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4171            items: vec![ReturnItem {
4172                expression: LogicalExpression::Variable("n".to_string()),
4173                alias: None,
4174            }],
4175            distinct: false,
4176            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
4177                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4178                    variable: "n".to_string(),
4179                    label: None,
4180                    input: None,
4181                })),
4182                columns: None,
4183            })),
4184        }));
4185
4186        let physical = planner.plan_adaptive(&logical).unwrap();
4187        assert!(physical.adaptive_context.is_some());
4188    }
4189
4190    #[test]
4191    fn test_plan_adaptive_with_limit() {
4192        let store = create_test_store();
4193        let planner = Planner::new(store);
4194
4195        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4196            items: vec![ReturnItem {
4197                expression: LogicalExpression::Variable("n".to_string()),
4198                alias: None,
4199            }],
4200            distinct: false,
4201            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
4202                count: 10,
4203                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4204                    variable: "n".to_string(),
4205                    label: None,
4206                    input: None,
4207                })),
4208            })),
4209        }));
4210
4211        let physical = planner.plan_adaptive(&logical).unwrap();
4212        assert!(physical.adaptive_context.is_some());
4213    }
4214
4215    #[test]
4216    fn test_plan_adaptive_with_skip() {
4217        let store = create_test_store();
4218        let planner = Planner::new(store);
4219
4220        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4221            items: vec![ReturnItem {
4222                expression: LogicalExpression::Variable("n".to_string()),
4223                alias: None,
4224            }],
4225            distinct: false,
4226            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
4227                count: 5,
4228                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4229                    variable: "n".to_string(),
4230                    label: None,
4231                    input: None,
4232                })),
4233            })),
4234        }));
4235
4236        let physical = planner.plan_adaptive(&logical).unwrap();
4237        assert!(physical.adaptive_context.is_some());
4238    }
4239
4240    #[test]
4241    fn test_plan_adaptive_with_sort() {
4242        let store = create_test_store();
4243        let planner = Planner::new(store);
4244
4245        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4246            items: vec![ReturnItem {
4247                expression: LogicalExpression::Variable("n".to_string()),
4248                alias: None,
4249            }],
4250            distinct: false,
4251            input: Box::new(LogicalOperator::Sort(SortOp {
4252                keys: vec![SortKey {
4253                    expression: LogicalExpression::Variable("n".to_string()),
4254                    order: SortOrder::Ascending,
4255                }],
4256                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4257                    variable: "n".to_string(),
4258                    label: None,
4259                    input: None,
4260                })),
4261            })),
4262        }));
4263
4264        let physical = planner.plan_adaptive(&logical).unwrap();
4265        assert!(physical.adaptive_context.is_some());
4266    }
4267
4268    #[test]
4269    fn test_plan_adaptive_with_union() {
4270        let store = create_test_store();
4271        let planner = Planner::new(store);
4272
4273        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4274            items: vec![ReturnItem {
4275                expression: LogicalExpression::Variable("n".to_string()),
4276                alias: None,
4277            }],
4278            distinct: false,
4279            input: Box::new(LogicalOperator::Union(UnionOp {
4280                inputs: vec![
4281                    LogicalOperator::NodeScan(NodeScanOp {
4282                        variable: "n".to_string(),
4283                        label: Some("Person".to_string()),
4284                        input: None,
4285                    }),
4286                    LogicalOperator::NodeScan(NodeScanOp {
4287                        variable: "n".to_string(),
4288                        label: Some("Company".to_string()),
4289                        input: None,
4290                    }),
4291                ],
4292            })),
4293        }));
4294
4295        let physical = planner.plan_adaptive(&logical).unwrap();
4296        assert!(physical.adaptive_context.is_some());
4297    }
4298
4299    // ==================== Variable Length Path Tests ====================
4300
4301    #[test]
4302    fn test_plan_expand_variable_length() {
4303        let store = create_test_store();
4304        let planner = Planner::new(store);
4305
4306        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
4307        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4308            items: vec![
4309                ReturnItem {
4310                    expression: LogicalExpression::Variable("a".to_string()),
4311                    alias: None,
4312                },
4313                ReturnItem {
4314                    expression: LogicalExpression::Variable("b".to_string()),
4315                    alias: None,
4316                },
4317            ],
4318            distinct: false,
4319            input: Box::new(LogicalOperator::Expand(ExpandOp {
4320                from_variable: "a".to_string(),
4321                to_variable: "b".to_string(),
4322                edge_variable: None,
4323                direction: ExpandDirection::Outgoing,
4324                edge_type: Some("KNOWS".to_string()),
4325                min_hops: 1,
4326                max_hops: Some(3),
4327                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4328                    variable: "a".to_string(),
4329                    label: None,
4330                    input: None,
4331                })),
4332                path_alias: None,
4333            })),
4334        }));
4335
4336        let physical = planner.plan(&logical).unwrap();
4337        assert!(physical.columns().contains(&"a".to_string()));
4338        assert!(physical.columns().contains(&"b".to_string()));
4339    }
4340
4341    #[test]
4342    fn test_plan_expand_with_path_alias() {
4343        let store = create_test_store();
4344        let planner = Planner::new(store);
4345
4346        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
4347        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4348            items: vec![
4349                ReturnItem {
4350                    expression: LogicalExpression::Variable("a".to_string()),
4351                    alias: None,
4352                },
4353                ReturnItem {
4354                    expression: LogicalExpression::Variable("b".to_string()),
4355                    alias: None,
4356                },
4357            ],
4358            distinct: false,
4359            input: Box::new(LogicalOperator::Expand(ExpandOp {
4360                from_variable: "a".to_string(),
4361                to_variable: "b".to_string(),
4362                edge_variable: None,
4363                direction: ExpandDirection::Outgoing,
4364                edge_type: Some("KNOWS".to_string()),
4365                min_hops: 1,
4366                max_hops: Some(3),
4367                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4368                    variable: "a".to_string(),
4369                    label: None,
4370                    input: None,
4371                })),
4372                path_alias: Some("p".to_string()),
4373            })),
4374        }));
4375
4376        let physical = planner.plan(&logical).unwrap();
4377        // Verify plan was created successfully with expected output columns
4378        assert!(physical.columns().contains(&"a".to_string()));
4379        assert!(physical.columns().contains(&"b".to_string()));
4380    }
4381
4382    #[test]
4383    fn test_plan_expand_incoming() {
4384        let store = create_test_store();
4385        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4386
4387        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
4388        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4389            items: vec![
4390                ReturnItem {
4391                    expression: LogicalExpression::Variable("a".to_string()),
4392                    alias: None,
4393                },
4394                ReturnItem {
4395                    expression: LogicalExpression::Variable("b".to_string()),
4396                    alias: None,
4397                },
4398            ],
4399            distinct: false,
4400            input: Box::new(LogicalOperator::Expand(ExpandOp {
4401                from_variable: "a".to_string(),
4402                to_variable: "b".to_string(),
4403                edge_variable: None,
4404                direction: ExpandDirection::Incoming,
4405                edge_type: Some("KNOWS".to_string()),
4406                min_hops: 1,
4407                max_hops: Some(1),
4408                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4409                    variable: "a".to_string(),
4410                    label: None,
4411                    input: None,
4412                })),
4413                path_alias: None,
4414            })),
4415        }));
4416
4417        let physical = planner.plan(&logical).unwrap();
4418        assert!(physical.columns().contains(&"a".to_string()));
4419        assert!(physical.columns().contains(&"b".to_string()));
4420    }
4421
4422    #[test]
4423    fn test_plan_expand_both_directions() {
4424        let store = create_test_store();
4425        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4426
4427        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
4428        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4429            items: vec![
4430                ReturnItem {
4431                    expression: LogicalExpression::Variable("a".to_string()),
4432                    alias: None,
4433                },
4434                ReturnItem {
4435                    expression: LogicalExpression::Variable("b".to_string()),
4436                    alias: None,
4437                },
4438            ],
4439            distinct: false,
4440            input: Box::new(LogicalOperator::Expand(ExpandOp {
4441                from_variable: "a".to_string(),
4442                to_variable: "b".to_string(),
4443                edge_variable: None,
4444                direction: ExpandDirection::Both,
4445                edge_type: Some("KNOWS".to_string()),
4446                min_hops: 1,
4447                max_hops: Some(1),
4448                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4449                    variable: "a".to_string(),
4450                    label: None,
4451                    input: None,
4452                })),
4453                path_alias: None,
4454            })),
4455        }));
4456
4457        let physical = planner.plan(&logical).unwrap();
4458        assert!(physical.columns().contains(&"a".to_string()));
4459        assert!(physical.columns().contains(&"b".to_string()));
4460    }
4461
4462    // ==================== With Context Tests ====================
4463
4464    #[test]
4465    fn test_planner_with_context() {
4466        use crate::transaction::TransactionManager;
4467
4468        let store = create_test_store();
4469        let tx_manager = Arc::new(TransactionManager::new());
4470        let tx_id = tx_manager.begin();
4471        let epoch = tx_manager.current_epoch();
4472
4473        let planner = Planner::with_context(
4474            Arc::clone(&store),
4475            Arc::clone(&tx_manager),
4476            Some(tx_id),
4477            epoch,
4478        );
4479
4480        assert_eq!(planner.tx_id(), Some(tx_id));
4481        assert!(planner.tx_manager().is_some());
4482        assert_eq!(planner.viewing_epoch(), epoch);
4483    }
4484
4485    #[test]
4486    fn test_planner_with_factorized_execution_disabled() {
4487        let store = create_test_store();
4488        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
4489
4490        // Two consecutive expands - should NOT use factorized execution
4491        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4492            items: vec![
4493                ReturnItem {
4494                    expression: LogicalExpression::Variable("a".to_string()),
4495                    alias: None,
4496                },
4497                ReturnItem {
4498                    expression: LogicalExpression::Variable("c".to_string()),
4499                    alias: None,
4500                },
4501            ],
4502            distinct: false,
4503            input: Box::new(LogicalOperator::Expand(ExpandOp {
4504                from_variable: "b".to_string(),
4505                to_variable: "c".to_string(),
4506                edge_variable: None,
4507                direction: ExpandDirection::Outgoing,
4508                edge_type: None,
4509                min_hops: 1,
4510                max_hops: Some(1),
4511                input: Box::new(LogicalOperator::Expand(ExpandOp {
4512                    from_variable: "a".to_string(),
4513                    to_variable: "b".to_string(),
4514                    edge_variable: None,
4515                    direction: ExpandDirection::Outgoing,
4516                    edge_type: None,
4517                    min_hops: 1,
4518                    max_hops: Some(1),
4519                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4520                        variable: "a".to_string(),
4521                        label: None,
4522                        input: None,
4523                    })),
4524                    path_alias: None,
4525                })),
4526                path_alias: None,
4527            })),
4528        }));
4529
4530        let physical = planner.plan(&logical).unwrap();
4531        assert!(physical.columns().contains(&"a".to_string()));
4532        assert!(physical.columns().contains(&"c".to_string()));
4533    }
4534
4535    // ==================== Sort with Property Tests ====================
4536
4537    #[test]
4538    fn test_plan_sort_by_property() {
4539        let store = create_test_store();
4540        let planner = Planner::new(store);
4541
4542        // MATCH (n) RETURN n ORDER BY n.name ASC
4543        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4544            items: vec![ReturnItem {
4545                expression: LogicalExpression::Variable("n".to_string()),
4546                alias: None,
4547            }],
4548            distinct: false,
4549            input: Box::new(LogicalOperator::Sort(SortOp {
4550                keys: vec![SortKey {
4551                    expression: LogicalExpression::Property {
4552                        variable: "n".to_string(),
4553                        property: "name".to_string(),
4554                    },
4555                    order: SortOrder::Ascending,
4556                }],
4557                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4558                    variable: "n".to_string(),
4559                    label: None,
4560                    input: None,
4561                })),
4562            })),
4563        }));
4564
4565        let physical = planner.plan(&logical).unwrap();
4566        // Should have the property column projected
4567        assert!(physical.columns().contains(&"n".to_string()));
4568    }
4569
4570    // ==================== Scan with Input Tests ====================
4571
4572    #[test]
4573    fn test_plan_scan_with_input() {
4574        let store = create_test_store();
4575        let planner = Planner::new(store);
4576
4577        // A scan with another scan as input (for chained patterns)
4578        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
4579            items: vec![
4580                ReturnItem {
4581                    expression: LogicalExpression::Variable("a".to_string()),
4582                    alias: None,
4583                },
4584                ReturnItem {
4585                    expression: LogicalExpression::Variable("b".to_string()),
4586                    alias: None,
4587                },
4588            ],
4589            distinct: false,
4590            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
4591                variable: "b".to_string(),
4592                label: Some("Company".to_string()),
4593                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
4594                    variable: "a".to_string(),
4595                    label: Some("Person".to_string()),
4596                    input: None,
4597                }))),
4598            })),
4599        }));
4600
4601        let physical = planner.plan(&logical).unwrap();
4602        assert!(physical.columns().contains(&"a".to_string()));
4603        assert!(physical.columns().contains(&"b".to_string()));
4604    }
4605}