Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    LoadCsvOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42    VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52    convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
/// Range bounds for property-based range queries.
///
/// Both ends are optional and borrow their values for the lifetime `'a`.
/// NOTE(review): a `None` bound presumably means "unbounded on that side";
/// confirm against the code that consumes this struct.
struct RangeBounds<'a> {
    /// Lower bound value, if any.
    min: Option<&'a Value>,
    /// Upper bound value, if any.
    max: Option<&'a Value>,
    /// Whether the lower bound itself is part of the range.
    min_inclusive: bool,
    /// Whether the upper bound itself is part of the range.
    max_inclusive: bool,
}
63
/// Converts a logical plan to a physical operator tree for LPG stores.
///
/// Planning methods take `&self`; the bookkeeping that changes while a plan
/// is being built (counters, column sets, profiling state) therefore lives in
/// `Cell`/`RefCell` fields.
pub struct Planner {
    /// The graph store (supports both read and write operations).
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch to use for visibility checks.
    pub(super) viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    pub(super) factorized_execution: bool,
    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Variables that hold edge IDs (from MATCH edge patterns).
    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator for schema enforcement during mutations.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Catalog for user-defined procedure lookup.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state for the currently planning correlated Apply.
    /// Set by `plan_apply` before planning the inner operator, consumed by
    /// `plan_operator` when encountering `ParameterScan`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variables from variable-length expand patterns (group-list variables).
    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// When true, each physical operator is wrapped in `ProfiledOperator`.
    /// Toggled by `plan_profiled` for the duration of one planning pass.
    profiling: std::cell::Cell<bool>,
    /// Profile entries collected during planning (post-order).
    /// Filled by `maybe_profile` and handed out by `plan_profiled`.
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
}
101
102impl Planner {
103    /// Creates a new planner with the given store.
104    ///
105    /// This creates a planner without transaction context, using the current
106    /// epoch from the store for visibility.
107    #[must_use]
108    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
109        let epoch = store.current_epoch();
110        Self {
111            store,
112            transaction_manager: None,
113            transaction_id: None,
114            viewing_epoch: epoch,
115            anon_edge_counter: std::cell::Cell::new(0),
116            factorized_execution: true,
117            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
118            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
119            validator: None,
120            catalog: None,
121            correlated_param_state: std::cell::RefCell::new(None),
122            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
123            profiling: std::cell::Cell::new(false),
124            profile_entries: std::cell::RefCell::new(Vec::new()),
125        }
126    }
127
128    /// Creates a new planner with transaction context for MVCC-aware planning.
129    #[must_use]
130    pub fn with_context(
131        store: Arc<dyn GraphStoreMut>,
132        transaction_manager: Arc<TransactionManager>,
133        transaction_id: Option<TransactionId>,
134        viewing_epoch: EpochId,
135    ) -> Self {
136        Self {
137            store,
138            transaction_manager: Some(transaction_manager),
139            transaction_id,
140            viewing_epoch,
141            anon_edge_counter: std::cell::Cell::new(0),
142            factorized_execution: true,
143            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
144            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
145            validator: None,
146            catalog: None,
147            correlated_param_state: std::cell::RefCell::new(None),
148            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
149            profiling: std::cell::Cell::new(false),
150            profile_entries: std::cell::RefCell::new(Vec::new()),
151        }
152    }
153
    /// Returns the viewing epoch for this planner.
    ///
    /// This is the epoch stored at construction time: the store's current
    /// epoch for [`Planner::new`], or the caller-supplied value for
    /// [`Planner::with_context`].
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
159
    /// Returns the transaction ID for this planner, if any.
    ///
    /// Always `None` for planners built with [`Planner::new`].
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
165
    /// Returns a reference to the transaction manager, if available.
    ///
    /// The `Arc` is borrowed, not cloned; callers that need to keep the
    /// manager beyond the planner's lifetime must clone it themselves.
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
171
172    /// Enables or disables factorized execution for multi-hop queries.
173    #[must_use]
174    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
175        self.factorized_execution = enabled;
176        self
177    }
178
179    /// Sets the constraint validator for schema enforcement during mutations.
180    #[must_use]
181    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
182        self.validator = Some(validator);
183        self
184    }
185
186    /// Sets the catalog for user-defined procedure lookup.
187    #[must_use]
188    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
189        self.catalog = Some(catalog);
190        self
191    }
192
193    /// Counts consecutive single-hop expand operations.
194    ///
195    /// Returns the count and the deepest non-expand operator (the base of the chain).
196    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
197        match op {
198            LogicalOperator::Expand(expand) => {
199                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
200
201                if is_single_hop {
202                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
203                    (inner_count + 1, base)
204                } else {
205                    (0, op)
206                }
207            }
208            _ => (0, op),
209        }
210    }
211
212    /// Collects expand operations from the outermost down to the base.
213    ///
214    /// Returns expands in order from innermost (base) to outermost.
215    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
216        let mut chain = Vec::new();
217        let mut current = op;
218
219        while let LogicalOperator::Expand(expand) = current {
220            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
221            if !is_single_hop {
222                break;
223            }
224            chain.push(expand);
225            current = &expand.input;
226        }
227
228        chain.reverse();
229        chain
230    }
231
232    /// Plans a logical plan into a physical operator.
233    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
234        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
235        Ok(PhysicalPlan {
236            operator,
237            columns,
238            adaptive_context: None,
239        })
240    }
241
242    /// Plans a logical plan with profiling: each physical operator is wrapped
243    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
244    /// collect row counts and timing. Returns the physical plan together with
245    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
246    /// items in post-order (children before parents).
247    pub fn plan_profiled(
248        &self,
249        logical_plan: &LogicalPlan,
250    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
251        self.profiling.set(true);
252        self.profile_entries.borrow_mut().clear();
253
254        let result = self.plan_operator(&logical_plan.root);
255
256        self.profiling.set(false);
257        let (operator, columns) = result?;
258        let entries = self.profile_entries.borrow_mut().drain(..).collect();
259
260        Ok((
261            PhysicalPlan {
262                operator,
263                columns,
264                adaptive_context: None,
265            },
266            entries,
267        ))
268    }
269
270    /// Plans a logical plan with adaptive execution support.
271    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
272        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
273
274        let mut adaptive_context = AdaptiveContext::new();
275        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
276
277        Ok(PhysicalPlan {
278            operator,
279            columns,
280            adaptive_context: Some(adaptive_context),
281        })
282    }
283
284    /// Collects cardinality estimates from the logical plan into an adaptive context.
285    fn collect_cardinality_estimates(
286        &self,
287        op: &LogicalOperator,
288        ctx: &mut AdaptiveContext,
289        depth: usize,
290    ) {
291        match op {
292            LogicalOperator::NodeScan(scan) => {
293                let estimate = if let Some(label) = &scan.label {
294                    self.store.nodes_by_label(label).len() as f64
295                } else {
296                    self.store.node_count() as f64
297                };
298                let id = format!("scan_{}", scan.variable);
299                ctx.set_estimate(&id, estimate);
300
301                if let Some(input) = &scan.input {
302                    self.collect_cardinality_estimates(input, ctx, depth + 1);
303                }
304            }
305            LogicalOperator::Filter(filter) => {
306                let input_estimate = self.estimate_cardinality(&filter.input);
307                let estimate = input_estimate * 0.3;
308                let id = format!("filter_{depth}");
309                ctx.set_estimate(&id, estimate);
310
311                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
312            }
313            LogicalOperator::Expand(expand) => {
314                let input_estimate = self.estimate_cardinality(&expand.input);
315                let stats = self.store.statistics();
316                let avg_degree = self.estimate_expand_degree(&stats, expand);
317                let estimate = input_estimate * avg_degree;
318                let id = format!("expand_{}", expand.to_variable);
319                ctx.set_estimate(&id, estimate);
320
321                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
322            }
323            LogicalOperator::Join(join) => {
324                let left_est = self.estimate_cardinality(&join.left);
325                let right_est = self.estimate_cardinality(&join.right);
326                let estimate = (left_est * right_est).sqrt();
327                let id = format!("join_{depth}");
328                ctx.set_estimate(&id, estimate);
329
330                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
331                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
332            }
333            LogicalOperator::Aggregate(agg) => {
334                let input_estimate = self.estimate_cardinality(&agg.input);
335                let estimate = if agg.group_by.is_empty() {
336                    1.0
337                } else {
338                    (input_estimate * 0.1).max(1.0)
339                };
340                let id = format!("aggregate_{depth}");
341                ctx.set_estimate(&id, estimate);
342
343                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
344            }
345            LogicalOperator::Distinct(distinct) => {
346                let input_estimate = self.estimate_cardinality(&distinct.input);
347                let estimate = (input_estimate * 0.5).max(1.0);
348                let id = format!("distinct_{depth}");
349                ctx.set_estimate(&id, estimate);
350
351                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
352            }
353            LogicalOperator::Return(ret) => {
354                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
355            }
356            LogicalOperator::Limit(limit) => {
357                let input_estimate = self.estimate_cardinality(&limit.input);
358                let estimate = (input_estimate).min(limit.count.estimate());
359                let id = format!("limit_{depth}");
360                ctx.set_estimate(&id, estimate);
361
362                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
363            }
364            LogicalOperator::Skip(skip) => {
365                let input_estimate = self.estimate_cardinality(&skip.input);
366                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
367                let id = format!("skip_{depth}");
368                ctx.set_estimate(&id, estimate);
369
370                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
371            }
372            LogicalOperator::Sort(sort) => {
373                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
374            }
375            LogicalOperator::Union(union) => {
376                let estimate: f64 = union
377                    .inputs
378                    .iter()
379                    .map(|input| self.estimate_cardinality(input))
380                    .sum();
381                let id = format!("union_{depth}");
382                ctx.set_estimate(&id, estimate);
383
384                for input in &union.inputs {
385                    self.collect_cardinality_estimates(input, ctx, depth + 1);
386                }
387            }
388            _ => {
389                // For other operators, try to recurse into known input patterns
390            }
391        }
392    }
393
394    /// Estimates cardinality for a logical operator subtree.
395    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
396        match op {
397            LogicalOperator::NodeScan(scan) => {
398                if let Some(label) = &scan.label {
399                    self.store.nodes_by_label(label).len() as f64
400                } else {
401                    self.store.node_count() as f64
402                }
403            }
404            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
405            LogicalOperator::Expand(expand) => {
406                let stats = self.store.statistics();
407                let avg_degree = self.estimate_expand_degree(&stats, expand);
408                self.estimate_cardinality(&expand.input) * avg_degree
409            }
410            LogicalOperator::Join(join) => {
411                let left = self.estimate_cardinality(&join.left);
412                let right = self.estimate_cardinality(&join.right);
413                (left * right).sqrt()
414            }
415            LogicalOperator::Aggregate(agg) => {
416                if agg.group_by.is_empty() {
417                    1.0
418                } else {
419                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
420                }
421            }
422            LogicalOperator::Distinct(distinct) => {
423                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
424            }
425            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
426            LogicalOperator::Limit(limit) => self
427                .estimate_cardinality(&limit.input)
428                .min(limit.count.estimate()),
429            LogicalOperator::Skip(skip) => {
430                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
431            }
432            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
433            LogicalOperator::Union(union) => union
434                .inputs
435                .iter()
436                .map(|input| self.estimate_cardinality(input))
437                .sum(),
438            LogicalOperator::Except(except) => {
439                let left = self.estimate_cardinality(&except.left);
440                let right = self.estimate_cardinality(&except.right);
441                (left - right).max(0.0)
442            }
443            LogicalOperator::Intersect(intersect) => {
444                let left = self.estimate_cardinality(&intersect.left);
445                let right = self.estimate_cardinality(&intersect.right);
446                left.min(right)
447            }
448            LogicalOperator::Otherwise(otherwise) => self
449                .estimate_cardinality(&otherwise.left)
450                .max(self.estimate_cardinality(&otherwise.right)),
451            _ => 1000.0,
452        }
453    }
454
455    /// Estimates the average edge degree for an expand operation using store statistics.
456    fn estimate_expand_degree(
457        &self,
458        stats: &grafeo_core::statistics::Statistics,
459        expand: &ExpandOp,
460    ) -> f64 {
461        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
462        if expand.edge_types.len() == 1 {
463            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
464        } else if stats.total_nodes > 0 {
465            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
466        } else {
467            10.0
468        }
469    }
470
471    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
472    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
473    fn maybe_profile(
474        &self,
475        result: Result<(Box<dyn Operator>, Vec<String>)>,
476        op: &LogicalOperator,
477    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
478        if self.profiling.get() {
479            let (physical, columns) = result?;
480            let (entry, stats) =
481                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
482            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
483            self.profile_entries.borrow_mut().push(entry);
484            Ok((Box::new(profiled), columns))
485        } else {
486            result
487        }
488    }
489
490    /// Plans a single logical operator.
491    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
492        let result = match op {
493            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
494            LogicalOperator::Expand(expand) => {
495                if self.factorized_execution {
496                    let (chain_len, _base) = Self::count_expand_chain(op);
497                    if chain_len >= 2 {
498                        return self.maybe_profile(self.plan_expand_chain(op), op);
499                    }
500                }
501                self.plan_expand(expand)
502            }
503            LogicalOperator::Return(ret) => self.plan_return(ret),
504            LogicalOperator::Filter(filter) => self.plan_filter(filter),
505            LogicalOperator::Project(project) => self.plan_project(project),
506            LogicalOperator::Limit(limit) => self.plan_limit(limit),
507            LogicalOperator::Skip(skip) => self.plan_skip(skip),
508            LogicalOperator::Sort(sort) => self.plan_sort(sort),
509            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
510            LogicalOperator::Join(join) => self.plan_join(join),
511            LogicalOperator::Union(union) => self.plan_union(union),
512            LogicalOperator::Except(except) => self.plan_except(except),
513            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
514            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
515            LogicalOperator::Apply(apply) => self.plan_apply(apply),
516            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
517            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
518            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
519            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
520            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
521            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
522            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
523            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
524            LogicalOperator::Merge(merge) => self.plan_merge(merge),
525            LogicalOperator::MergeRelationship(merge_rel) => {
526                self.plan_merge_relationship(merge_rel)
527            }
528            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
529            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
530            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
531            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
532            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
533            #[cfg(feature = "algos")]
534            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
535            #[cfg(not(feature = "algos"))]
536            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
537                "CALL procedures require the 'algos' feature".to_string(),
538            )),
539            LogicalOperator::ParameterScan(param_scan) => {
540                let state = self
541                    .correlated_param_state
542                    .borrow()
543                    .clone()
544                    .ok_or_else(|| {
545                        Error::Internal(
546                            "ParameterScan without correlated Apply context".to_string(),
547                        )
548                    })?;
549                let columns = param_scan.columns.clone();
550                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
551                Ok((operator, columns))
552            }
553            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
554            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
555            LogicalOperator::LoadCsv(load) => {
556                let operator: Box<dyn Operator> = Box::new(LoadCsvOperator::new(
557                    load.path.clone(),
558                    load.with_headers,
559                    load.field_terminator,
560                    load.variable.clone(),
561                ));
562                Ok((operator, vec![load.variable.clone()]))
563            }
564            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
565            LogicalOperator::VectorScan(_) => Err(Error::Internal(
566                "VectorScan requires vector-index feature".to_string(),
567            )),
568            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
569                "VectorJoin requires vector-index feature".to_string(),
570            )),
571            _ => Err(Error::Internal(format!(
572                "Unsupported operator: {:?}",
573                std::mem::discriminant(op)
574            ))),
575        };
576        self.maybe_profile(result, op)
577    }
578
579    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
580    fn plan_horizontal_aggregate(
581        &self,
582        ha: &HorizontalAggregateOp,
583    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
584        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
585
586        let list_col_idx = child_columns
587            .iter()
588            .position(|c| c == &ha.list_column)
589            .ok_or_else(|| {
590                Error::Internal(format!(
591                    "HorizontalAggregate list column '{}' not found in {:?}",
592                    ha.list_column, child_columns
593                ))
594            })?;
595
596        let entity_kind = match ha.entity_kind {
597            LogicalEntityKind::Edge => EntityKind::Edge,
598            LogicalEntityKind::Node => EntityKind::Node,
599        };
600
601        let function = convert_aggregate_function(ha.function);
602        let input_column_count = child_columns.len();
603
604        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
605            child_op,
606            list_col_idx,
607            entity_kind,
608            function,
609            ha.property.clone(),
610            Arc::clone(&self.store) as Arc<dyn GraphStore>,
611            input_column_count,
612        ));
613
614        let mut columns = child_columns;
615        columns.push(ha.alias.clone());
616        // Mark the result as a scalar column
617        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
618
619        Ok((operator, columns))
620    }
621
622    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
623    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
624        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
625        let key_idx = child_columns
626            .iter()
627            .position(|c| c == &mc.key_var)
628            .ok_or_else(|| {
629                Error::Internal(format!(
630                    "MapCollect key '{}' not in columns {:?}",
631                    mc.key_var, child_columns
632                ))
633            })?;
634        let value_idx = child_columns
635            .iter()
636            .position(|c| c == &mc.value_var)
637            .ok_or_else(|| {
638                Error::Internal(format!(
639                    "MapCollect value '{}' not in columns {:?}",
640                    mc.value_var, child_columns
641                ))
642            })?;
643        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
644        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
645        Ok((operator, vec![mc.alias.clone()]))
646    }
647}
648
/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
///
/// Rows are held in memory and emitted in chunks; only the columns selected
/// by `column_indices` are copied into the output.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// All result rows, each row being a list of values.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a source row to read from.
    /// Indices past the end of a row produce `Value::Null`.
    column_indices: Vec<usize>,
    /// Index of the next row to emit; reset to 0 by `reset`.
    row_index: usize,
}
656
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next chunk of at most 1024 rows, or `None` once every row
    /// has been produced.
    ///
    /// Each output column is typed `LogicalType::Any`. A source row that is
    /// shorter than a selected column index contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        /// Upper bound on the number of rows placed in one chunk.
        const MAX_CHUNK_ROWS: usize = 1024;

        let start = self.row_index;
        if start >= self.rows.len() {
            return Ok(None);
        }

        let batch_len = (self.rows.len() - start).min(MAX_CHUNK_ROWS);
        let batch = &self.rows[start..start + batch_len];

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in batch {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = match row.get(src_idx) {
                    Some(found) => found.clone(),
                    None => Value::Null,
                };
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = start + batch_len;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Static operator name.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
696
697#[cfg(test)]
698mod tests {
699    use super::*;
700    use crate::query::plan::{
701        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
702        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
703        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
704        SkipOp as LogicalSkipOp, SortKey, SortOp,
705    };
706    use grafeo_common::types::Value;
707    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
708    use grafeo_core::graph::GraphStoreMut;
709    use grafeo_core::graph::lpg::LpgStore;
710
711    fn create_test_store() -> Arc<dyn GraphStoreMut> {
712        let store = Arc::new(LpgStore::new().unwrap());
713        store.create_node(&["Person"]);
714        store.create_node(&["Person"]);
715        store.create_node(&["Company"]);
716        store
717    }
718
719    // ==================== Simple Scan Tests ====================
720
/// Planning `MATCH (n:Person) RETURN n` yields the single output column `n`.
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // Leaf: scan of all nodes labelled `Person`, bound to `n`.
    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
743
/// Planning `MATCH (n) RETURN n` (no label filter) yields the single output column `n`.
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    // Leaf: unlabelled scan bound to `n`.
    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let returned = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![returned],
        distinct: false,
        input: Box::new(full_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
766
/// Planning `MATCH (n:Person) RETURN n AS person` names the output column after the alias.
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // `n` is returned under the alias `person`.
    let aliased = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: Some(String::from("person")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
789
/// Planning `MATCH (n:Person) RETURN n.name` yields a column named `n.name`.
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Unaliased property access `n.name`.
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(person_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
815
/// Planning `MATCH (n) RETURN 42 AS answer` yields the single output column `answer`.
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Integer literal returned under an explicit alias.
    let answer = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![answer],
        distinct: false,
        input: Box::new(full_scan),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
838
839    // ==================== Filter Tests ====================
840
/// Planning `MATCH (n:Person) WHERE n.age = 30 RETURN n` yields the single output column `n`.
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });

    // Predicate: n.age = 30
    let age = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("age"),
    };
    let thirty = LogicalExpression::Literal(Value::Int64(30));
    let age_is_thirty = LogicalExpression::Binary {
        left: Box::new(age),
        op: BinaryOp::Eq,
        right: Box::new(thirty),
    };

    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: age_is_thirty,
        input: Box::new(person_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
874
/// Planning a filter of the form `n.age > 20 AND n.age < 40` over an unlabelled scan
/// yields the single output column `n`.
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Builds `n.age <op> <bound>`.
    let age_cmp = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };

    let lower = age_cmp(BinaryOp::Gt, 20);
    let upper = age_cmp(BinaryOp::Lt, 40);
    let in_range = LogicalExpression::Binary {
        left: Box::new(lower),
        op: BinaryOp::And,
        right: Box::new(upper),
    };

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(full_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
919
/// Planning a `WHERE NOT n.active` filter over an unlabelled scan yields the single
/// output column `n`.
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    // Predicate: NOT n.active
    let active = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("active"),
    };
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(active),
    };

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(full_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
952
/// Planning a `WHERE n.email IS NULL` filter over an unlabelled scan yields the single
/// output column `n`.
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // Predicate: n.email IS NULL
    let email = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("email"),
    };
    let email_missing = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(email),
    };

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: email_missing,
        input: Box::new(full_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
985
/// Planning a `WHERE size(n.friends) > 0` filter over an unlabelled scan yields the
/// single output column `n`.
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // Left-hand side: size(n.friends)
    let friends = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("friends"),
    };
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![friends],
        distinct: false,
    };
    // Full predicate: size(n.friends) > 0
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filtered = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(full_scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filtered),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1023
1024    // ==================== Expand Tests ====================
1025
/// Planning `MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b` exposes both `a` and `b`
/// among the output columns.
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Shorthand for an unaliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Single-hop outgoing KNOWS expansion from `a` to `b`, without an edge binding.
    let knows = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(person_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("b")],
        distinct: false,
        input: Box::new(knows),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Both endpoints must be present in the result columns.
    assert!(physical.columns().contains(&"a".to_string()));
    assert!(physical.columns().contains(&"b".to_string()));
}
1067
/// Planning `MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b` exposes the edge binding `r`
/// alongside both endpoints in the output columns.
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Shorthand for an unaliased `RETURN <name>` item.
    let return_var = |name: &str| ReturnItem {
        expression: LogicalExpression::Variable(String::from(name)),
        alias: None,
    };

    let source_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    // Single-hop outgoing KNOWS expansion that also binds the traversed edge to `r`.
    let knows = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(source_scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![return_var("a"), return_var("r"), return_var("b")],
        distinct: false,
        input: Box::new(knows),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"a".to_string()));
    assert!(physical.columns().contains(&"r".to_string()));
    assert!(physical.columns().contains(&"b".to_string()));
}
1113
1114    // ==================== Limit/Skip/Sort Tests ====================
1115
/// Planning `MATCH (n) RETURN n LIMIT 10` yields the single output column `n`.
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // LIMIT 10 applied directly on top of the scan.
    let limited = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(full_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limited),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1141
/// Planning `MATCH (n) RETURN n SKIP 5` yields the single output column `n`.
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // SKIP 5 applied directly on top of the scan.
    let skipped = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(full_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skipped),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1167
/// Planning an ascending sort over an unlabelled scan yields the single output column `n`.
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // MATCH (n) RETURN n ORDER BY n ASC: the sort key is the variable `n` itself,
    // with no explicit null ordering.
    let ascending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_by_n],
        input: Box::new(full_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1197
/// Planning a descending sort (`ORDER BY n DESC`) over an unlabelled scan yields the
/// single output column `n`.
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Sort key: the variable `n`, descending, with no explicit null ordering.
    let descending_by_n = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sorted = LogicalOperator::Sort(SortOp {
        keys: vec![descending_by_n],
        input: Box::new(full_scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sorted),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1227
/// Planning `MATCH (n) RETURN DISTINCT n`, expressed as a `Distinct` operator without an
/// explicit column list, yields the single output column `n`.
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Deduplication is carried by the Distinct operator; the Return itself is not distinct.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(full_scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1253
/// Planning a `Distinct` operator restricted to the column `n` (column-specific dedup)
/// yields the single output column `n`.
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // The distinct column list names the scanned variable.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(full_scan),
        columns: Some(vec![String::from("n")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1279
/// Planning a `Distinct` operator whose column list names no existing output column
/// still succeeds and yields the single output column `n`.
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // NOTE(review): the planner is expected to fall back to full-row distinct when no
    // listed column matches; this test only checks the resulting column names.
    let deduplicated = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(full_scan),
        columns: Some(vec![String::from("nonexistent")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(deduplicated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1306
1307    // ==================== Aggregate Tests ====================
1308
/// Planning `MATCH (n) RETURN count(n)`, with the count aliased as `cnt`, exposes `cnt`
/// among the output columns.
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // count(n) AS cnt, with no grouping keys.
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let aggregated = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(full_scan),
        having: None,
    });
    // The Return refers to the aggregate through its alias.
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregated),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"cnt".to_string()));
}
1344
/// Planning `count(n)` grouped by `n.city` over `Person` nodes produces exactly two
/// output columns.
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    let person_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    // Grouping key: n.city
    let city = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    // Aggregate: count(n) AS cnt
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    // The aggregate is the plan root; there is no Return operator on top.
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city],
        aggregates: vec![count_n],
        input: Box::new(person_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1376
/// Planning `SUM(n.value)` aliased as `total` exposes `total` among the output columns.
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: SUM(n.value) AS total
    let value = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("value"),
    };
    let sum_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(value),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_value],
        input: Box::new(full_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"total".to_string()));
}
1408
/// Planning `AVG(n.score)` aliased as `average` exposes `average` among the output columns.
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    // Aggregate: AVG(n.score) AS average
    let score = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("score"),
    };
    let avg_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(score),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_score],
        input: Box::new(full_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"average".to_string()));
}
1440
/// Planning `MIN(n.age)` and `MAX(n.age)` in one aggregate, aliased `youngest` and
/// `oldest`, exposes both aliases among the output columns.
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Builds `<function>(n.age) AS <alias>`.
    let over_age = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from(alias)),
        percentile: None,
        separator: None,
    };

    let youngest = over_age(LogicalAggregateFunction::Min, "youngest");
    let oldest = over_age(LogicalAggregateFunction::Max, "oldest");

    let full_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![youngest, oldest],
        input: Box::new(full_scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"youngest".to_string()));
    assert!(physical.columns().contains(&"oldest".to_string()));
}
1487
1488    // ==================== Join Tests ====================
1489
/// Planning an inner join between a `Person` scan (`a`) and a `Company` scan (`b`)
/// exposes both `a` and `b` among the output columns.
#[test]
fn test_plan_inner_join() {
    let planner = Planner::new(create_test_store());

    // Shorthand for a bare variable reference.
    let var = |name: &str| LogicalExpression::Variable(String::from(name));

    let people = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let companies = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("b"),
        label: Some(String::from("Company")),
        input: None,
    });
    // Inner join with the single condition pairing `a` with `b`.
    let joined = LogicalOperator::Join(JoinOp {
        left: Box::new(people),
        right: Box::new(companies),
        join_type: JoinType::Inner,
        conditions: vec![JoinCondition {
            left: var("a"),
            right: var("b"),
        }],
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![
            ReturnItem {
                expression: var("a"),
                alias: None,
            },
            ReturnItem {
                expression: var("b"),
                alias: None,
            },
        ],
        distinct: false,
        input: Box::new(joined),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&"a".to_string()));
    assert!(physical.columns().contains(&"b".to_string()));
}
1531
1532    #[test]
1533    fn test_plan_cross_join() {
1534        let store = create_test_store();
1535        let planner = Planner::new(store);
1536
1537        // Cross join (no conditions)
1538        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1539            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1540                variable: "a".to_string(),
1541                label: None,
1542                input: None,
1543            })),
1544            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1545                variable: "b".to_string(),
1546                label: None,
1547                input: None,
1548            })),
1549            join_type: JoinType::Cross,
1550            conditions: vec![],
1551        }));
1552
1553        let physical = planner.plan(&logical).unwrap();
1554        assert_eq!(physical.columns().len(), 2);
1555    }
1556
1557    #[test]
1558    fn test_plan_left_join() {
1559        let store = create_test_store();
1560        let planner = Planner::new(store);
1561
1562        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1563            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1564                variable: "a".to_string(),
1565                label: None,
1566                input: None,
1567            })),
1568            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1569                variable: "b".to_string(),
1570                label: None,
1571                input: None,
1572            })),
1573            join_type: JoinType::Left,
1574            conditions: vec![],
1575        }));
1576
1577        let physical = planner.plan(&logical).unwrap();
1578        assert_eq!(physical.columns().len(), 2);
1579    }
1580
1581    // ==================== Mutation Tests ====================
1582
1583    #[test]
1584    fn test_plan_create_node() {
1585        let store = create_test_store();
1586        let planner = Planner::new(store);
1587
1588        // CREATE (n:Person {name: 'Alix'})
1589        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1590            variable: "n".to_string(),
1591            labels: vec!["Person".to_string()],
1592            properties: vec![(
1593                "name".to_string(),
1594                LogicalExpression::Literal(Value::String("Alix".into())),
1595            )],
1596            input: None,
1597        }));
1598
1599        let physical = planner.plan(&logical).unwrap();
1600        assert!(physical.columns().contains(&"n".to_string()));
1601    }
1602
1603    #[test]
1604    fn test_plan_create_edge() {
1605        let store = create_test_store();
1606        let planner = Planner::new(store);
1607
1608        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1609        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1610            variable: Some("r".to_string()),
1611            from_variable: "a".to_string(),
1612            to_variable: "b".to_string(),
1613            edge_type: "KNOWS".to_string(),
1614            properties: vec![],
1615            input: Box::new(LogicalOperator::Join(JoinOp {
1616                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1617                    variable: "a".to_string(),
1618                    label: None,
1619                    input: None,
1620                })),
1621                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1622                    variable: "b".to_string(),
1623                    label: None,
1624                    input: None,
1625                })),
1626                join_type: JoinType::Cross,
1627                conditions: vec![],
1628            })),
1629        }));
1630
1631        let physical = planner.plan(&logical).unwrap();
1632        assert!(physical.columns().contains(&"r".to_string()));
1633    }
1634
1635    #[test]
1636    fn test_plan_delete_node() {
1637        let store = create_test_store();
1638        let planner = Planner::new(store);
1639
1640        // MATCH (n) DELETE n
1641        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1642            variable: "n".to_string(),
1643            detach: false,
1644            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1645                variable: "n".to_string(),
1646                label: None,
1647                input: None,
1648            })),
1649        }));
1650
1651        let physical = planner.plan(&logical).unwrap();
1652        assert!(physical.columns().contains(&"n".to_string()));
1653    }
1654
1655    // ==================== Error Cases ====================
1656
1657    #[test]
1658    fn test_plan_empty_errors() {
1659        let store = create_test_store();
1660        let planner = Planner::new(store);
1661
1662        let logical = LogicalPlan::new(LogicalOperator::Empty);
1663        let result = planner.plan(&logical);
1664        assert!(result.is_err());
1665    }
1666
1667    #[test]
1668    fn test_plan_missing_variable_in_return() {
1669        let store = create_test_store();
1670        let planner = Planner::new(store);
1671
1672        // Return variable that doesn't exist in input
1673        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1674            items: vec![ReturnItem {
1675                expression: LogicalExpression::Variable("missing".to_string()),
1676                alias: None,
1677            }],
1678            distinct: false,
1679            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1680                variable: "n".to_string(),
1681                label: None,
1682                input: None,
1683            })),
1684        }));
1685
1686        let result = planner.plan(&logical);
1687        assert!(result.is_err());
1688    }
1689
1690    // ==================== Helper Function Tests ====================
1691
1692    #[test]
1693    fn test_convert_binary_ops() {
1694        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1695        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1696        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1697        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1698        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1699        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1700        assert!(convert_binary_op(BinaryOp::And).is_ok());
1701        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1702        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1703        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1704        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1705        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1706    }
1707
1708    #[test]
1709    fn test_convert_unary_ops() {
1710        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1711        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1712        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1713        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1714    }
1715
1716    #[test]
1717    fn test_convert_aggregate_functions() {
1718        assert!(matches!(
1719            convert_aggregate_function(LogicalAggregateFunction::Count),
1720            PhysicalAggregateFunction::Count
1721        ));
1722        assert!(matches!(
1723            convert_aggregate_function(LogicalAggregateFunction::Sum),
1724            PhysicalAggregateFunction::Sum
1725        ));
1726        assert!(matches!(
1727            convert_aggregate_function(LogicalAggregateFunction::Avg),
1728            PhysicalAggregateFunction::Avg
1729        ));
1730        assert!(matches!(
1731            convert_aggregate_function(LogicalAggregateFunction::Min),
1732            PhysicalAggregateFunction::Min
1733        ));
1734        assert!(matches!(
1735            convert_aggregate_function(LogicalAggregateFunction::Max),
1736            PhysicalAggregateFunction::Max
1737        ));
1738    }
1739
1740    #[test]
1741    fn test_planner_accessors() {
1742        let store = create_test_store();
1743        let planner = Planner::new(Arc::clone(&store));
1744
1745        assert!(planner.transaction_id().is_none());
1746        assert!(planner.transaction_manager().is_none());
1747        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1748    }
1749
1750    #[test]
1751    fn test_physical_plan_accessors() {
1752        let store = create_test_store();
1753        let planner = Planner::new(store);
1754
1755        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1756            variable: "n".to_string(),
1757            label: None,
1758            input: None,
1759        }));
1760
1761        let physical = planner.plan(&logical).unwrap();
1762        assert_eq!(physical.columns(), &["n"]);
1763
1764        // Test into_operator
1765        let _ = physical.into_operator();
1766    }
1767
1768    // ==================== Adaptive Planning Tests ====================
1769
1770    #[test]
1771    fn test_plan_adaptive_with_scan() {
1772        let store = create_test_store();
1773        let planner = Planner::new(store);
1774
1775        // MATCH (n:Person) RETURN n
1776        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1777            items: vec![ReturnItem {
1778                expression: LogicalExpression::Variable("n".to_string()),
1779                alias: None,
1780            }],
1781            distinct: false,
1782            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1783                variable: "n".to_string(),
1784                label: Some("Person".to_string()),
1785                input: None,
1786            })),
1787        }));
1788
1789        let physical = planner.plan_adaptive(&logical).unwrap();
1790        assert_eq!(physical.columns(), &["n"]);
1791        // Should have adaptive context with estimates
1792        assert!(physical.adaptive_context.is_some());
1793    }
1794
1795    #[test]
1796    fn test_plan_adaptive_with_filter() {
1797        let store = create_test_store();
1798        let planner = Planner::new(store);
1799
1800        // MATCH (n) WHERE n.age > 30 RETURN n
1801        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1802            items: vec![ReturnItem {
1803                expression: LogicalExpression::Variable("n".to_string()),
1804                alias: None,
1805            }],
1806            distinct: false,
1807            input: Box::new(LogicalOperator::Filter(FilterOp {
1808                predicate: LogicalExpression::Binary {
1809                    left: Box::new(LogicalExpression::Property {
1810                        variable: "n".to_string(),
1811                        property: "age".to_string(),
1812                    }),
1813                    op: BinaryOp::Gt,
1814                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1815                },
1816                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1817                    variable: "n".to_string(),
1818                    label: None,
1819                    input: None,
1820                })),
1821                pushdown_hint: None,
1822            })),
1823        }));
1824
1825        let physical = planner.plan_adaptive(&logical).unwrap();
1826        assert!(physical.adaptive_context.is_some());
1827    }
1828
1829    #[test]
1830    fn test_plan_adaptive_with_expand() {
1831        let store = create_test_store();
1832        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1833
1834        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1835        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1836            items: vec![
1837                ReturnItem {
1838                    expression: LogicalExpression::Variable("a".to_string()),
1839                    alias: None,
1840                },
1841                ReturnItem {
1842                    expression: LogicalExpression::Variable("b".to_string()),
1843                    alias: None,
1844                },
1845            ],
1846            distinct: false,
1847            input: Box::new(LogicalOperator::Expand(ExpandOp {
1848                from_variable: "a".to_string(),
1849                to_variable: "b".to_string(),
1850                edge_variable: None,
1851                direction: ExpandDirection::Outgoing,
1852                edge_types: vec!["KNOWS".to_string()],
1853                min_hops: 1,
1854                max_hops: Some(1),
1855                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1856                    variable: "a".to_string(),
1857                    label: None,
1858                    input: None,
1859                })),
1860                path_alias: None,
1861                path_mode: PathMode::Walk,
1862            })),
1863        }));
1864
1865        let physical = planner.plan_adaptive(&logical).unwrap();
1866        assert!(physical.adaptive_context.is_some());
1867    }
1868
1869    #[test]
1870    fn test_plan_adaptive_with_join() {
1871        let store = create_test_store();
1872        let planner = Planner::new(store);
1873
1874        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1875            items: vec![
1876                ReturnItem {
1877                    expression: LogicalExpression::Variable("a".to_string()),
1878                    alias: None,
1879                },
1880                ReturnItem {
1881                    expression: LogicalExpression::Variable("b".to_string()),
1882                    alias: None,
1883                },
1884            ],
1885            distinct: false,
1886            input: Box::new(LogicalOperator::Join(JoinOp {
1887                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1888                    variable: "a".to_string(),
1889                    label: None,
1890                    input: None,
1891                })),
1892                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1893                    variable: "b".to_string(),
1894                    label: None,
1895                    input: None,
1896                })),
1897                join_type: JoinType::Cross,
1898                conditions: vec![],
1899            })),
1900        }));
1901
1902        let physical = planner.plan_adaptive(&logical).unwrap();
1903        assert!(physical.adaptive_context.is_some());
1904    }
1905
1906    #[test]
1907    fn test_plan_adaptive_with_aggregate() {
1908        let store = create_test_store();
1909        let planner = Planner::new(store);
1910
1911        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1912            group_by: vec![],
1913            aggregates: vec![LogicalAggregateExpr {
1914                function: LogicalAggregateFunction::Count,
1915                expression: Some(LogicalExpression::Variable("n".to_string())),
1916                expression2: None,
1917                distinct: false,
1918                alias: Some("cnt".to_string()),
1919                percentile: None,
1920                separator: None,
1921            }],
1922            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1923                variable: "n".to_string(),
1924                label: None,
1925                input: None,
1926            })),
1927            having: None,
1928        }));
1929
1930        let physical = planner.plan_adaptive(&logical).unwrap();
1931        assert!(physical.adaptive_context.is_some());
1932    }
1933
1934    #[test]
1935    fn test_plan_adaptive_with_distinct() {
1936        let store = create_test_store();
1937        let planner = Planner::new(store);
1938
1939        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1940            items: vec![ReturnItem {
1941                expression: LogicalExpression::Variable("n".to_string()),
1942                alias: None,
1943            }],
1944            distinct: false,
1945            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1946                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1947                    variable: "n".to_string(),
1948                    label: None,
1949                    input: None,
1950                })),
1951                columns: None,
1952            })),
1953        }));
1954
1955        let physical = planner.plan_adaptive(&logical).unwrap();
1956        assert!(physical.adaptive_context.is_some());
1957    }
1958
1959    #[test]
1960    fn test_plan_adaptive_with_limit() {
1961        let store = create_test_store();
1962        let planner = Planner::new(store);
1963
1964        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1965            items: vec![ReturnItem {
1966                expression: LogicalExpression::Variable("n".to_string()),
1967                alias: None,
1968            }],
1969            distinct: false,
1970            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1971                count: 10.into(),
1972                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1973                    variable: "n".to_string(),
1974                    label: None,
1975                    input: None,
1976                })),
1977            })),
1978        }));
1979
1980        let physical = planner.plan_adaptive(&logical).unwrap();
1981        assert!(physical.adaptive_context.is_some());
1982    }
1983
1984    #[test]
1985    fn test_plan_adaptive_with_skip() {
1986        let store = create_test_store();
1987        let planner = Planner::new(store);
1988
1989        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1990            items: vec![ReturnItem {
1991                expression: LogicalExpression::Variable("n".to_string()),
1992                alias: None,
1993            }],
1994            distinct: false,
1995            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1996                count: 5.into(),
1997                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1998                    variable: "n".to_string(),
1999                    label: None,
2000                    input: None,
2001                })),
2002            })),
2003        }));
2004
2005        let physical = planner.plan_adaptive(&logical).unwrap();
2006        assert!(physical.adaptive_context.is_some());
2007    }
2008
2009    #[test]
2010    fn test_plan_adaptive_with_sort() {
2011        let store = create_test_store();
2012        let planner = Planner::new(store);
2013
2014        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2015            items: vec![ReturnItem {
2016                expression: LogicalExpression::Variable("n".to_string()),
2017                alias: None,
2018            }],
2019            distinct: false,
2020            input: Box::new(LogicalOperator::Sort(SortOp {
2021                keys: vec![SortKey {
2022                    expression: LogicalExpression::Variable("n".to_string()),
2023                    order: SortOrder::Ascending,
2024                    nulls: None,
2025                }],
2026                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2027                    variable: "n".to_string(),
2028                    label: None,
2029                    input: None,
2030                })),
2031            })),
2032        }));
2033
2034        let physical = planner.plan_adaptive(&logical).unwrap();
2035        assert!(physical.adaptive_context.is_some());
2036    }
2037
2038    #[test]
2039    fn test_plan_adaptive_with_union() {
2040        let store = create_test_store();
2041        let planner = Planner::new(store);
2042
2043        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2044            items: vec![ReturnItem {
2045                expression: LogicalExpression::Variable("n".to_string()),
2046                alias: None,
2047            }],
2048            distinct: false,
2049            input: Box::new(LogicalOperator::Union(UnionOp {
2050                inputs: vec![
2051                    LogicalOperator::NodeScan(NodeScanOp {
2052                        variable: "n".to_string(),
2053                        label: Some("Person".to_string()),
2054                        input: None,
2055                    }),
2056                    LogicalOperator::NodeScan(NodeScanOp {
2057                        variable: "n".to_string(),
2058                        label: Some("Company".to_string()),
2059                        input: None,
2060                    }),
2061                ],
2062            })),
2063        }));
2064
2065        let physical = planner.plan_adaptive(&logical).unwrap();
2066        assert!(physical.adaptive_context.is_some());
2067    }
2068
2069    // ==================== Variable Length Path Tests ====================
2070
2071    #[test]
2072    fn test_plan_expand_variable_length() {
2073        let store = create_test_store();
2074        let planner = Planner::new(store);
2075
2076        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2077        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2078            items: vec![
2079                ReturnItem {
2080                    expression: LogicalExpression::Variable("a".to_string()),
2081                    alias: None,
2082                },
2083                ReturnItem {
2084                    expression: LogicalExpression::Variable("b".to_string()),
2085                    alias: None,
2086                },
2087            ],
2088            distinct: false,
2089            input: Box::new(LogicalOperator::Expand(ExpandOp {
2090                from_variable: "a".to_string(),
2091                to_variable: "b".to_string(),
2092                edge_variable: None,
2093                direction: ExpandDirection::Outgoing,
2094                edge_types: vec!["KNOWS".to_string()],
2095                min_hops: 1,
2096                max_hops: Some(3),
2097                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2098                    variable: "a".to_string(),
2099                    label: None,
2100                    input: None,
2101                })),
2102                path_alias: None,
2103                path_mode: PathMode::Walk,
2104            })),
2105        }));
2106
2107        let physical = planner.plan(&logical).unwrap();
2108        assert!(physical.columns().contains(&"a".to_string()));
2109        assert!(physical.columns().contains(&"b".to_string()));
2110    }
2111
2112    #[test]
2113    fn test_plan_expand_with_path_alias() {
2114        let store = create_test_store();
2115        let planner = Planner::new(store);
2116
2117        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2118        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2119            items: vec![
2120                ReturnItem {
2121                    expression: LogicalExpression::Variable("a".to_string()),
2122                    alias: None,
2123                },
2124                ReturnItem {
2125                    expression: LogicalExpression::Variable("b".to_string()),
2126                    alias: None,
2127                },
2128            ],
2129            distinct: false,
2130            input: Box::new(LogicalOperator::Expand(ExpandOp {
2131                from_variable: "a".to_string(),
2132                to_variable: "b".to_string(),
2133                edge_variable: None,
2134                direction: ExpandDirection::Outgoing,
2135                edge_types: vec!["KNOWS".to_string()],
2136                min_hops: 1,
2137                max_hops: Some(3),
2138                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2139                    variable: "a".to_string(),
2140                    label: None,
2141                    input: None,
2142                })),
2143                path_alias: Some("p".to_string()),
2144                path_mode: PathMode::Walk,
2145            })),
2146        }));
2147
2148        let physical = planner.plan(&logical).unwrap();
2149        // Verify plan was created successfully with expected output columns
2150        assert!(physical.columns().contains(&"a".to_string()));
2151        assert!(physical.columns().contains(&"b".to_string()));
2152    }
2153
2154    #[test]
2155    fn test_plan_expand_incoming() {
2156        let store = create_test_store();
2157        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2158
2159        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2160        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2161            items: vec![
2162                ReturnItem {
2163                    expression: LogicalExpression::Variable("a".to_string()),
2164                    alias: None,
2165                },
2166                ReturnItem {
2167                    expression: LogicalExpression::Variable("b".to_string()),
2168                    alias: None,
2169                },
2170            ],
2171            distinct: false,
2172            input: Box::new(LogicalOperator::Expand(ExpandOp {
2173                from_variable: "a".to_string(),
2174                to_variable: "b".to_string(),
2175                edge_variable: None,
2176                direction: ExpandDirection::Incoming,
2177                edge_types: vec!["KNOWS".to_string()],
2178                min_hops: 1,
2179                max_hops: Some(1),
2180                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2181                    variable: "a".to_string(),
2182                    label: None,
2183                    input: None,
2184                })),
2185                path_alias: None,
2186                path_mode: PathMode::Walk,
2187            })),
2188        }));
2189
2190        let physical = planner.plan(&logical).unwrap();
2191        assert!(physical.columns().contains(&"a".to_string()));
2192        assert!(physical.columns().contains(&"b".to_string()));
2193    }
2194
2195    #[test]
2196    fn test_plan_expand_both_directions() {
2197        let store = create_test_store();
2198        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2199
2200        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2201        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2202            items: vec![
2203                ReturnItem {
2204                    expression: LogicalExpression::Variable("a".to_string()),
2205                    alias: None,
2206                },
2207                ReturnItem {
2208                    expression: LogicalExpression::Variable("b".to_string()),
2209                    alias: None,
2210                },
2211            ],
2212            distinct: false,
2213            input: Box::new(LogicalOperator::Expand(ExpandOp {
2214                from_variable: "a".to_string(),
2215                to_variable: "b".to_string(),
2216                edge_variable: None,
2217                direction: ExpandDirection::Both,
2218                edge_types: vec!["KNOWS".to_string()],
2219                min_hops: 1,
2220                max_hops: Some(1),
2221                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2222                    variable: "a".to_string(),
2223                    label: None,
2224                    input: None,
2225                })),
2226                path_alias: None,
2227                path_mode: PathMode::Walk,
2228            })),
2229        }));
2230
2231        let physical = planner.plan(&logical).unwrap();
2232        assert!(physical.columns().contains(&"a".to_string()));
2233        assert!(physical.columns().contains(&"b".to_string()));
2234    }
2235
2236    // ==================== With Context Tests ====================
2237
2238    #[test]
2239    fn test_planner_with_context() {
2240        use crate::transaction::TransactionManager;
2241
2242        let store = create_test_store();
2243        let transaction_manager = Arc::new(TransactionManager::new());
2244        let transaction_id = transaction_manager.begin();
2245        let epoch = transaction_manager.current_epoch();
2246
2247        let planner = Planner::with_context(
2248            Arc::clone(&store),
2249            Arc::clone(&transaction_manager),
2250            Some(transaction_id),
2251            epoch,
2252        );
2253
2254        assert_eq!(planner.transaction_id(), Some(transaction_id));
2255        assert!(planner.transaction_manager().is_some());
2256        assert_eq!(planner.viewing_epoch(), epoch);
2257    }
2258
2259    #[test]
2260    fn test_planner_with_factorized_execution_disabled() {
2261        let store = create_test_store();
2262        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2263
2264        // Two consecutive expands - should NOT use factorized execution
2265        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2266            items: vec![
2267                ReturnItem {
2268                    expression: LogicalExpression::Variable("a".to_string()),
2269                    alias: None,
2270                },
2271                ReturnItem {
2272                    expression: LogicalExpression::Variable("c".to_string()),
2273                    alias: None,
2274                },
2275            ],
2276            distinct: false,
2277            input: Box::new(LogicalOperator::Expand(ExpandOp {
2278                from_variable: "b".to_string(),
2279                to_variable: "c".to_string(),
2280                edge_variable: None,
2281                direction: ExpandDirection::Outgoing,
2282                edge_types: vec![],
2283                min_hops: 1,
2284                max_hops: Some(1),
2285                input: Box::new(LogicalOperator::Expand(ExpandOp {
2286                    from_variable: "a".to_string(),
2287                    to_variable: "b".to_string(),
2288                    edge_variable: None,
2289                    direction: ExpandDirection::Outgoing,
2290                    edge_types: vec![],
2291                    min_hops: 1,
2292                    max_hops: Some(1),
2293                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2294                        variable: "a".to_string(),
2295                        label: None,
2296                        input: None,
2297                    })),
2298                    path_alias: None,
2299                    path_mode: PathMode::Walk,
2300                })),
2301                path_alias: None,
2302                path_mode: PathMode::Walk,
2303            })),
2304        }));
2305
2306        let physical = planner.plan(&logical).unwrap();
2307        assert!(physical.columns().contains(&"a".to_string()));
2308        assert!(physical.columns().contains(&"c".to_string()));
2309    }
2310
2311    // ==================== Sort with Property Tests ====================
2312
2313    #[test]
2314    fn test_plan_sort_by_property() {
2315        let store = create_test_store();
2316        let planner = Planner::new(store);
2317
2318        // MATCH (n) RETURN n ORDER BY n.name ASC
2319        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2320            items: vec![ReturnItem {
2321                expression: LogicalExpression::Variable("n".to_string()),
2322                alias: None,
2323            }],
2324            distinct: false,
2325            input: Box::new(LogicalOperator::Sort(SortOp {
2326                keys: vec![SortKey {
2327                    expression: LogicalExpression::Property {
2328                        variable: "n".to_string(),
2329                        property: "name".to_string(),
2330                    },
2331                    order: SortOrder::Ascending,
2332                    nulls: None,
2333                }],
2334                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2335                    variable: "n".to_string(),
2336                    label: None,
2337                    input: None,
2338                })),
2339            })),
2340        }));
2341
2342        let physical = planner.plan(&logical).unwrap();
2343        // Should have the property column projected
2344        assert!(physical.columns().contains(&"n".to_string()));
2345    }
2346
2347    // ==================== Scan with Input Tests ====================
2348
2349    #[test]
2350    fn test_plan_scan_with_input() {
2351        let store = create_test_store();
2352        let planner = Planner::new(store);
2353
2354        // A scan with another scan as input (for chained patterns)
2355        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2356            items: vec![
2357                ReturnItem {
2358                    expression: LogicalExpression::Variable("a".to_string()),
2359                    alias: None,
2360                },
2361                ReturnItem {
2362                    expression: LogicalExpression::Variable("b".to_string()),
2363                    alias: None,
2364                },
2365            ],
2366            distinct: false,
2367            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2368                variable: "b".to_string(),
2369                label: Some("Company".to_string()),
2370                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2371                    variable: "a".to_string(),
2372                    label: Some("Person".to_string()),
2373                    input: None,
2374                }))),
2375            })),
2376        }));
2377
2378        let physical = planner.plan(&logical).unwrap();
2379        assert!(physical.columns().contains(&"a".to_string()));
2380        assert!(physical.columns().contains(&"b".to_string()));
2381    }
2382}