Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42    VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52    convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
/// Range bounds for property-based range queries.
///
/// Both ends are optional borrowed [`Value`]s; the `'a` lifetime ties the
/// struct to the values it borrows. Each end carries its own inclusivity flag.
struct RangeBounds<'a> {
    /// Lower end of the range. `None` presumably means "unbounded below" —
    /// confirm against the scan planner that consumes this struct.
    min: Option<&'a Value>,
    /// Upper end of the range. `None` presumably means "unbounded above" —
    /// confirm against the scan planner that consumes this struct.
    max: Option<&'a Value>,
    /// Whether a value equal to `min` is considered inside the range.
    min_inclusive: bool,
    /// Whether a value equal to `max` is considered inside the range.
    max_inclusive: bool,
}
63
/// Converts a logical plan to a physical operator tree for LPG stores.
///
/// Planning methods take `&self`; state that changes while a plan is being
/// built (counters, column bookkeeping, profiling flags) is therefore kept in
/// `Cell`/`RefCell` fields.
pub struct Planner {
    /// The graph store (supports both read and write operations).
    pub(super) store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
    /// Current transaction ID (if in a transaction).
    pub(super) transaction_id: Option<TransactionId>,
    /// Epoch to use for visibility checks.
    pub(super) viewing_epoch: EpochId,
    /// Counter for generating unique anonymous edge column names.
    pub(super) anon_edge_counter: std::cell::Cell<u32>,
    /// Whether to use factorized execution for multi-hop queries.
    /// When set, chains of two or more single-hop expands are planned as one
    /// expand chain instead of individual expand operators.
    pub(super) factorized_execution: bool,
    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Variables that hold edge IDs (from MATCH edge patterns).
    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
    /// Optional constraint validator for schema enforcement during mutations.
    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
    /// Catalog for user-defined procedure lookup.
    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
    /// Shared parameter state for the currently planning correlated Apply.
    /// Set by `plan_apply` before planning the inner operator, consumed by
    /// `plan_operator` when encountering `ParameterScan`.
    pub(super) correlated_param_state:
        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
    /// Variables from variable-length expand patterns (group-list variables).
    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
    /// When true, each physical operator is wrapped in `ProfiledOperator`.
    /// Toggled by `plan_profiled` for the duration of one planning run.
    profiling: std::cell::Cell<bool>,
    /// Profile entries collected during planning (post-order).
    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
    /// Optional write tracker for recording writes during mutations.
    /// Only created by `with_context` when a transaction ID is supplied.
    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
    /// Session context for introspection functions (info, schema, current_schema, etc.).
    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
    /// When true, expand operators use epoch-only visibility (no MVCC version
    /// chain walks).  Set when the plan contains no mutations, so PENDING
    /// writes are impossible to observe.
    pub(super) read_only: bool,
}
109
110impl Planner {
111    /// Creates a new planner with the given store.
112    ///
113    /// This creates a planner without transaction context, using the current
114    /// epoch from the store for visibility.
115    #[must_use]
116    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
117        let epoch = store.current_epoch();
118        Self {
119            store,
120            transaction_manager: None,
121            transaction_id: None,
122            viewing_epoch: epoch,
123            anon_edge_counter: std::cell::Cell::new(0),
124            factorized_execution: true,
125            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
126            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
127            validator: None,
128            catalog: None,
129            correlated_param_state: std::cell::RefCell::new(None),
130            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
131            profiling: std::cell::Cell::new(false),
132            profile_entries: std::cell::RefCell::new(Vec::new()),
133            write_tracker: None,
134            session_context: grafeo_core::execution::operators::SessionContext::default(),
135            read_only: false,
136        }
137    }
138
139    /// Creates a new planner with transaction context for MVCC-aware planning.
140    #[must_use]
141    pub fn with_context(
142        store: Arc<dyn GraphStoreMut>,
143        transaction_manager: Arc<TransactionManager>,
144        transaction_id: Option<TransactionId>,
145        viewing_epoch: EpochId,
146    ) -> Self {
147        use crate::transaction::TransactionWriteTracker;
148
149        // Create write tracker when there's an active transaction
150        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
151            if transaction_id.is_some() {
152                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
153                    &transaction_manager,
154                ))))
155            } else {
156                None
157            };
158
159        Self {
160            store,
161            transaction_manager: Some(transaction_manager),
162            transaction_id,
163            viewing_epoch,
164            anon_edge_counter: std::cell::Cell::new(0),
165            factorized_execution: true,
166            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
167            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
168            validator: None,
169            catalog: None,
170            correlated_param_state: std::cell::RefCell::new(None),
171            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
172            profiling: std::cell::Cell::new(false),
173            profile_entries: std::cell::RefCell::new(Vec::new()),
174            write_tracker,
175            session_context: grafeo_core::execution::operators::SessionContext::default(),
176            read_only: false,
177        }
178    }
179
180    /// Marks this planner as planning a read-only query (no mutations),
181    /// enabling fast-path visibility checks in expand operators.
182    #[must_use]
183    pub fn with_read_only(mut self, read_only: bool) -> Self {
184        self.read_only = read_only;
185        self
186    }
187
    /// Returns the epoch this planner uses for visibility checks.
    ///
    /// This is the store's current epoch at construction time for
    /// [`Planner::new`], or the caller-supplied epoch for
    /// [`Planner::with_context`].
    #[must_use]
    pub fn viewing_epoch(&self) -> EpochId {
        self.viewing_epoch
    }
193
    /// Returns the transaction ID this planner was created with, or `None`
    /// when planning outside a transaction.
    #[must_use]
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.transaction_id
    }
199
    /// Returns a borrowed handle to the transaction manager, or `None` when
    /// the planner was built without transaction context (see [`Planner::new`]).
    #[must_use]
    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
        self.transaction_manager.as_ref()
    }
205
206    /// Enables or disables factorized execution for multi-hop queries.
207    #[must_use]
208    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
209        self.factorized_execution = enabled;
210        self
211    }
212
213    /// Sets the constraint validator for schema enforcement during mutations.
214    #[must_use]
215    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
216        self.validator = Some(validator);
217        self
218    }
219
220    /// Sets the catalog for user-defined procedure lookup.
221    #[must_use]
222    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
223        self.catalog = Some(catalog);
224        self
225    }
226
227    /// Sets the session context for introspection functions.
228    #[must_use]
229    pub fn with_session_context(
230        mut self,
231        context: grafeo_core::execution::operators::SessionContext,
232    ) -> Self {
233        self.session_context = context;
234        self
235    }
236
237    /// Counts consecutive single-hop expand operations.
238    ///
239    /// Returns the count and the deepest non-expand operator (the base of the chain).
240    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
241        match op {
242            LogicalOperator::Expand(expand) => {
243                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
244
245                if is_single_hop {
246                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
247                    (inner_count + 1, base)
248                } else {
249                    (0, op)
250                }
251            }
252            _ => (0, op),
253        }
254    }
255
256    /// Collects expand operations from the outermost down to the base.
257    ///
258    /// Returns expands in order from innermost (base) to outermost.
259    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
260        let mut chain = Vec::new();
261        let mut current = op;
262
263        while let LogicalOperator::Expand(expand) = current {
264            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
265            if !is_single_hop {
266                break;
267            }
268            chain.push(expand);
269            current = &expand.input;
270        }
271
272        chain.reverse();
273        chain
274    }
275
276    /// Plans a logical plan into a physical operator.
277    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
278        let _span = tracing::debug_span!("grafeo::query::plan").entered();
279        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
280        Ok(PhysicalPlan {
281            operator,
282            columns,
283            adaptive_context: None,
284        })
285    }
286
287    /// Plans a logical plan with profiling: each physical operator is wrapped
288    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
289    /// collect row counts and timing. Returns the physical plan together with
290    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
291    /// items in post-order (children before parents).
292    pub fn plan_profiled(
293        &self,
294        logical_plan: &LogicalPlan,
295    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
296        self.profiling.set(true);
297        self.profile_entries.borrow_mut().clear();
298
299        let result = self.plan_operator(&logical_plan.root);
300
301        self.profiling.set(false);
302        let (operator, columns) = result?;
303        let entries = self.profile_entries.borrow_mut().drain(..).collect();
304
305        Ok((
306            PhysicalPlan {
307                operator,
308                columns,
309                adaptive_context: None,
310            },
311            entries,
312        ))
313    }
314
315    /// Plans a logical plan with adaptive execution support.
316    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
317        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
318
319        let mut adaptive_context = AdaptiveContext::new();
320        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
321
322        Ok(PhysicalPlan {
323            operator,
324            columns,
325            adaptive_context: Some(adaptive_context),
326        })
327    }
328
329    /// Collects cardinality estimates from the logical plan into an adaptive context.
330    fn collect_cardinality_estimates(
331        &self,
332        op: &LogicalOperator,
333        ctx: &mut AdaptiveContext,
334        depth: usize,
335    ) {
336        match op {
337            LogicalOperator::NodeScan(scan) => {
338                let estimate = if let Some(label) = &scan.label {
339                    self.store.nodes_by_label(label).len() as f64
340                } else {
341                    self.store.node_count() as f64
342                };
343                let id = format!("scan_{}", scan.variable);
344                ctx.set_estimate(&id, estimate);
345
346                if let Some(input) = &scan.input {
347                    self.collect_cardinality_estimates(input, ctx, depth + 1);
348                }
349            }
350            LogicalOperator::Filter(filter) => {
351                let input_estimate = self.estimate_cardinality(&filter.input);
352                let estimate = input_estimate * 0.3;
353                let id = format!("filter_{depth}");
354                ctx.set_estimate(&id, estimate);
355
356                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
357            }
358            LogicalOperator::Expand(expand) => {
359                let input_estimate = self.estimate_cardinality(&expand.input);
360                let stats = self.store.statistics();
361                let avg_degree = self.estimate_expand_degree(&stats, expand);
362                let estimate = input_estimate * avg_degree;
363                let id = format!("expand_{}", expand.to_variable);
364                ctx.set_estimate(&id, estimate);
365
366                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
367            }
368            LogicalOperator::Join(join) => {
369                let left_est = self.estimate_cardinality(&join.left);
370                let right_est = self.estimate_cardinality(&join.right);
371                let estimate = (left_est * right_est).sqrt();
372                let id = format!("join_{depth}");
373                ctx.set_estimate(&id, estimate);
374
375                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
376                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
377            }
378            LogicalOperator::Aggregate(agg) => {
379                let input_estimate = self.estimate_cardinality(&agg.input);
380                let estimate = if agg.group_by.is_empty() {
381                    1.0
382                } else {
383                    (input_estimate * 0.1).max(1.0)
384                };
385                let id = format!("aggregate_{depth}");
386                ctx.set_estimate(&id, estimate);
387
388                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
389            }
390            LogicalOperator::Distinct(distinct) => {
391                let input_estimate = self.estimate_cardinality(&distinct.input);
392                let estimate = (input_estimate * 0.5).max(1.0);
393                let id = format!("distinct_{depth}");
394                ctx.set_estimate(&id, estimate);
395
396                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
397            }
398            LogicalOperator::Return(ret) => {
399                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
400            }
401            LogicalOperator::Limit(limit) => {
402                let input_estimate = self.estimate_cardinality(&limit.input);
403                let estimate = (input_estimate).min(limit.count.estimate());
404                let id = format!("limit_{depth}");
405                ctx.set_estimate(&id, estimate);
406
407                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
408            }
409            LogicalOperator::Skip(skip) => {
410                let input_estimate = self.estimate_cardinality(&skip.input);
411                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
412                let id = format!("skip_{depth}");
413                ctx.set_estimate(&id, estimate);
414
415                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
416            }
417            LogicalOperator::Sort(sort) => {
418                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
419            }
420            LogicalOperator::Union(union) => {
421                let estimate: f64 = union
422                    .inputs
423                    .iter()
424                    .map(|input| self.estimate_cardinality(input))
425                    .sum();
426                let id = format!("union_{depth}");
427                ctx.set_estimate(&id, estimate);
428
429                for input in &union.inputs {
430                    self.collect_cardinality_estimates(input, ctx, depth + 1);
431                }
432            }
433            _ => {
434                // For other operators, try to recurse into known input patterns
435            }
436        }
437    }
438
439    /// Estimates cardinality for a logical operator subtree.
440    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
441        match op {
442            LogicalOperator::NodeScan(scan) => {
443                if let Some(label) = &scan.label {
444                    self.store.nodes_by_label(label).len() as f64
445                } else {
446                    self.store.node_count() as f64
447                }
448            }
449            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
450            LogicalOperator::Expand(expand) => {
451                let stats = self.store.statistics();
452                let avg_degree = self.estimate_expand_degree(&stats, expand);
453                self.estimate_cardinality(&expand.input) * avg_degree
454            }
455            LogicalOperator::Join(join) => {
456                let left = self.estimate_cardinality(&join.left);
457                let right = self.estimate_cardinality(&join.right);
458                (left * right).sqrt()
459            }
460            LogicalOperator::Aggregate(agg) => {
461                if agg.group_by.is_empty() {
462                    1.0
463                } else {
464                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
465                }
466            }
467            LogicalOperator::Distinct(distinct) => {
468                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
469            }
470            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
471            LogicalOperator::Limit(limit) => self
472                .estimate_cardinality(&limit.input)
473                .min(limit.count.estimate()),
474            LogicalOperator::Skip(skip) => {
475                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
476            }
477            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
478            LogicalOperator::Union(union) => union
479                .inputs
480                .iter()
481                .map(|input| self.estimate_cardinality(input))
482                .sum(),
483            LogicalOperator::Except(except) => {
484                let left = self.estimate_cardinality(&except.left);
485                let right = self.estimate_cardinality(&except.right);
486                (left - right).max(0.0)
487            }
488            LogicalOperator::Intersect(intersect) => {
489                let left = self.estimate_cardinality(&intersect.left);
490                let right = self.estimate_cardinality(&intersect.right);
491                left.min(right)
492            }
493            LogicalOperator::Otherwise(otherwise) => self
494                .estimate_cardinality(&otherwise.left)
495                .max(self.estimate_cardinality(&otherwise.right)),
496            _ => 1000.0,
497        }
498    }
499
500    /// Estimates the average edge degree for an expand operation using store statistics.
501    fn estimate_expand_degree(
502        &self,
503        stats: &grafeo_core::statistics::Statistics,
504        expand: &ExpandOp,
505    ) -> f64 {
506        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
507        if expand.edge_types.len() == 1 {
508            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
509        } else if stats.total_nodes > 0 {
510            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
511        } else {
512            10.0
513        }
514    }
515
516    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
517    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
518    fn maybe_profile(
519        &self,
520        result: Result<(Box<dyn Operator>, Vec<String>)>,
521        op: &LogicalOperator,
522    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
523        if self.profiling.get() {
524            let (physical, columns) = result?;
525            let (entry, stats) =
526                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
527            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
528            self.profile_entries.borrow_mut().push(entry);
529            Ok((Box::new(profiled), columns))
530        } else {
531            result
532        }
533    }
534
535    /// Plans a single logical operator.
536    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
537        let result = match op {
538            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
539            LogicalOperator::Expand(expand) => {
540                if self.factorized_execution {
541                    let (chain_len, _base) = Self::count_expand_chain(op);
542                    if chain_len >= 2 {
543                        return self.maybe_profile(self.plan_expand_chain(op), op);
544                    }
545                }
546                self.plan_expand(expand)
547            }
548            LogicalOperator::Return(ret) => self.plan_return(ret),
549            LogicalOperator::Filter(filter) => self.plan_filter(filter),
550            LogicalOperator::Project(project) => self.plan_project(project),
551            LogicalOperator::Limit(limit) => self.plan_limit(limit),
552            LogicalOperator::Skip(skip) => self.plan_skip(skip),
553            LogicalOperator::Sort(sort) => self.plan_sort(sort),
554            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
555            LogicalOperator::Join(join) => self.plan_join(join),
556            LogicalOperator::Union(union) => self.plan_union(union),
557            LogicalOperator::Except(except) => self.plan_except(except),
558            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
559            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
560            LogicalOperator::Apply(apply) => self.plan_apply(apply),
561            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
562            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
563            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
564            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
565            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
566            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
567            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
568            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
569            LogicalOperator::Merge(merge) => self.plan_merge(merge),
570            LogicalOperator::MergeRelationship(merge_rel) => {
571                self.plan_merge_relationship(merge_rel)
572            }
573            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
574            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
575            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
576            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
577            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
578            #[cfg(feature = "algos")]
579            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
580            #[cfg(not(feature = "algos"))]
581            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
582                "CALL procedures require the 'algos' feature".to_string(),
583            )),
584            LogicalOperator::ParameterScan(_param_scan) => {
585                let state = self
586                    .correlated_param_state
587                    .borrow()
588                    .clone()
589                    .ok_or_else(|| {
590                        Error::Internal(
591                            "ParameterScan without correlated Apply context".to_string(),
592                        )
593                    })?;
594                // Use the actual column names from the ParameterState (which may
595                // have been expanded from "*" to real variable names in plan_apply)
596                let columns = state.columns.clone();
597                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
598                Ok((operator, columns))
599            }
600            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
601            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
602            LogicalOperator::LoadData(load) => {
603                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
604                    load.path.clone(),
605                    load.format,
606                    load.with_headers,
607                    load.field_terminator,
608                    load.variable.clone(),
609                ));
610                Ok((operator, vec![load.variable.clone()]))
611            }
612            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
613            LogicalOperator::VectorScan(_) => Err(Error::Internal(
614                "VectorScan requires vector-index feature".to_string(),
615            )),
616            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
617                "VectorJoin requires vector-index feature".to_string(),
618            )),
619            _ => Err(Error::Internal(format!(
620                "Unsupported operator: {:?}",
621                std::mem::discriminant(op)
622            ))),
623        };
624        self.maybe_profile(result, op)
625    }
626
627    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
628    fn plan_horizontal_aggregate(
629        &self,
630        ha: &HorizontalAggregateOp,
631    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
632        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
633
634        let list_col_idx = child_columns
635            .iter()
636            .position(|c| c == &ha.list_column)
637            .ok_or_else(|| {
638                Error::Internal(format!(
639                    "HorizontalAggregate list column '{}' not found in {:?}",
640                    ha.list_column, child_columns
641                ))
642            })?;
643
644        let entity_kind = match ha.entity_kind {
645            LogicalEntityKind::Edge => EntityKind::Edge,
646            LogicalEntityKind::Node => EntityKind::Node,
647        };
648
649        let function = convert_aggregate_function(ha.function);
650        let input_column_count = child_columns.len();
651
652        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
653            child_op,
654            list_col_idx,
655            entity_kind,
656            function,
657            ha.property.clone(),
658            Arc::clone(&self.store) as Arc<dyn GraphStore>,
659            input_column_count,
660        ));
661
662        let mut columns = child_columns;
663        columns.push(ha.alias.clone());
664        // Mark the result as a scalar column
665        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
666
667        Ok((operator, columns))
668    }
669
670    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
671    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
672        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
673        let key_idx = child_columns
674            .iter()
675            .position(|c| c == &mc.key_var)
676            .ok_or_else(|| {
677                Error::Internal(format!(
678                    "MapCollect key '{}' not in columns {:?}",
679                    mc.key_var, child_columns
680                ))
681            })?;
682        let value_idx = child_columns
683            .iter()
684            .position(|c| c == &mc.value_var)
685            .ok_or_else(|| {
686                Error::Internal(format!(
687                    "MapCollect value '{}' not in columns {:?}",
688                    mc.value_var, child_columns
689                ))
690            })?;
691        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
692        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
693        Ok((operator, vec![mc.alias.clone()]))
694    }
695}
696
/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
///
/// Rows are held in memory and handed out in batches by the `Operator`
/// implementation; `row_index` tracks how far emission has progressed.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The full set of rows to emit; each inner vector is one source row.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position to read from within a source row.
    column_indices: Vec<usize>,
    /// Index of the next row in `rows` that has not been emitted yet.
    row_index: usize,
}
704
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of stored rows, or `None` once every row has been produced.
    ///
    /// A batch holds at most 1024 rows. Each output column is typed `LogicalType::Any`
    /// and filled from the source-row position listed in `column_indices`; positions
    /// missing from a source row yield `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        // Cap the batch at 1024 rows.
        let batch_len = (total - start).min(1024);
        let end = start + batch_len;

        let column_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&column_types, batch_len);

        for source_row in &self.rows[start..end] {
            for (out_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = source_row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(out_idx) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds the operator so the next call to `next` starts from the first row again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748    use crate::query::plan::{
749        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
750        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
751        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
752        SkipOp as LogicalSkipOp, SortKey, SortOp,
753    };
754    use grafeo_common::types::Value;
755    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
756    use grafeo_core::graph::GraphStoreMut;
757    use grafeo_core::graph::lpg::LpgStore;
758
759    fn create_test_store() -> Arc<dyn GraphStoreMut> {
760        let store = Arc::new(LpgStore::new().unwrap());
761        store.create_node(&["Person"]);
762        store.create_node(&["Person"]);
763        store.create_node(&["Company"]);
764        store
765    }
766
767    // ==================== Simple Scan Tests ====================
768
769    #[test]
770    fn test_plan_simple_scan() {
771        let store = create_test_store();
772        let planner = Planner::new(store);
773
774        // MATCH (n:Person) RETURN n
775        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
776            items: vec![ReturnItem {
777                expression: LogicalExpression::Variable("n".to_string()),
778                alias: None,
779            }],
780            distinct: false,
781            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
782                variable: "n".to_string(),
783                label: Some("Person".to_string()),
784                input: None,
785            })),
786        }));
787
788        let physical = planner.plan(&logical).unwrap();
789        assert_eq!(physical.columns(), &["n"]);
790    }
791
792    #[test]
793    fn test_plan_scan_without_label() {
794        let store = create_test_store();
795        let planner = Planner::new(store);
796
797        // MATCH (n) RETURN n
798        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
799            items: vec![ReturnItem {
800                expression: LogicalExpression::Variable("n".to_string()),
801                alias: None,
802            }],
803            distinct: false,
804            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
805                variable: "n".to_string(),
806                label: None,
807                input: None,
808            })),
809        }));
810
811        let physical = planner.plan(&logical).unwrap();
812        assert_eq!(physical.columns(), &["n"]);
813    }
814
815    #[test]
816    fn test_plan_return_with_alias() {
817        let store = create_test_store();
818        let planner = Planner::new(store);
819
820        // MATCH (n:Person) RETURN n AS person
821        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
822            items: vec![ReturnItem {
823                expression: LogicalExpression::Variable("n".to_string()),
824                alias: Some("person".to_string()),
825            }],
826            distinct: false,
827            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
828                variable: "n".to_string(),
829                label: Some("Person".to_string()),
830                input: None,
831            })),
832        }));
833
834        let physical = planner.plan(&logical).unwrap();
835        assert_eq!(physical.columns(), &["person"]);
836    }
837
838    #[test]
839    fn test_plan_return_property() {
840        let store = create_test_store();
841        let planner = Planner::new(store);
842
843        // MATCH (n:Person) RETURN n.name
844        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
845            items: vec![ReturnItem {
846                expression: LogicalExpression::Property {
847                    variable: "n".to_string(),
848                    property: "name".to_string(),
849                },
850                alias: None,
851            }],
852            distinct: false,
853            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
854                variable: "n".to_string(),
855                label: Some("Person".to_string()),
856                input: None,
857            })),
858        }));
859
860        let physical = planner.plan(&logical).unwrap();
861        assert_eq!(physical.columns(), &["n.name"]);
862    }
863
864    #[test]
865    fn test_plan_return_literal() {
866        let store = create_test_store();
867        let planner = Planner::new(store);
868
869        // MATCH (n) RETURN 42 AS answer
870        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
871            items: vec![ReturnItem {
872                expression: LogicalExpression::Literal(Value::Int64(42)),
873                alias: Some("answer".to_string()),
874            }],
875            distinct: false,
876            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
877                variable: "n".to_string(),
878                label: None,
879                input: None,
880            })),
881        }));
882
883        let physical = planner.plan(&logical).unwrap();
884        assert_eq!(physical.columns(), &["answer"]);
885    }
886
887    // ==================== Filter Tests ====================
888
889    #[test]
890    fn test_plan_filter_equality() {
891        let store = create_test_store();
892        let planner = Planner::new(store);
893
894        // MATCH (n:Person) WHERE n.age = 30 RETURN n
895        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
896            items: vec![ReturnItem {
897                expression: LogicalExpression::Variable("n".to_string()),
898                alias: None,
899            }],
900            distinct: false,
901            input: Box::new(LogicalOperator::Filter(FilterOp {
902                predicate: LogicalExpression::Binary {
903                    left: Box::new(LogicalExpression::Property {
904                        variable: "n".to_string(),
905                        property: "age".to_string(),
906                    }),
907                    op: BinaryOp::Eq,
908                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
909                },
910                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
911                    variable: "n".to_string(),
912                    label: Some("Person".to_string()),
913                    input: None,
914                })),
915                pushdown_hint: None,
916            })),
917        }));
918
919        let physical = planner.plan(&logical).unwrap();
920        assert_eq!(physical.columns(), &["n"]);
921    }
922
923    #[test]
924    fn test_plan_filter_compound_and() {
925        let store = create_test_store();
926        let planner = Planner::new(store);
927
928        // WHERE n.age > 20 AND n.age < 40
929        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
930            items: vec![ReturnItem {
931                expression: LogicalExpression::Variable("n".to_string()),
932                alias: None,
933            }],
934            distinct: false,
935            input: Box::new(LogicalOperator::Filter(FilterOp {
936                predicate: LogicalExpression::Binary {
937                    left: Box::new(LogicalExpression::Binary {
938                        left: Box::new(LogicalExpression::Property {
939                            variable: "n".to_string(),
940                            property: "age".to_string(),
941                        }),
942                        op: BinaryOp::Gt,
943                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
944                    }),
945                    op: BinaryOp::And,
946                    right: Box::new(LogicalExpression::Binary {
947                        left: Box::new(LogicalExpression::Property {
948                            variable: "n".to_string(),
949                            property: "age".to_string(),
950                        }),
951                        op: BinaryOp::Lt,
952                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
953                    }),
954                },
955                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
956                    variable: "n".to_string(),
957                    label: None,
958                    input: None,
959                })),
960                pushdown_hint: None,
961            })),
962        }));
963
964        let physical = planner.plan(&logical).unwrap();
965        assert_eq!(physical.columns(), &["n"]);
966    }
967
968    #[test]
969    fn test_plan_filter_unary_not() {
970        let store = create_test_store();
971        let planner = Planner::new(store);
972
973        // WHERE NOT n.active
974        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
975            items: vec![ReturnItem {
976                expression: LogicalExpression::Variable("n".to_string()),
977                alias: None,
978            }],
979            distinct: false,
980            input: Box::new(LogicalOperator::Filter(FilterOp {
981                predicate: LogicalExpression::Unary {
982                    op: UnaryOp::Not,
983                    operand: Box::new(LogicalExpression::Property {
984                        variable: "n".to_string(),
985                        property: "active".to_string(),
986                    }),
987                },
988                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
989                    variable: "n".to_string(),
990                    label: None,
991                    input: None,
992                })),
993                pushdown_hint: None,
994            })),
995        }));
996
997        let physical = planner.plan(&logical).unwrap();
998        assert_eq!(physical.columns(), &["n"]);
999    }
1000
1001    #[test]
1002    fn test_plan_filter_is_null() {
1003        let store = create_test_store();
1004        let planner = Planner::new(store);
1005
1006        // WHERE n.email IS NULL
1007        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1008            items: vec![ReturnItem {
1009                expression: LogicalExpression::Variable("n".to_string()),
1010                alias: None,
1011            }],
1012            distinct: false,
1013            input: Box::new(LogicalOperator::Filter(FilterOp {
1014                predicate: LogicalExpression::Unary {
1015                    op: UnaryOp::IsNull,
1016                    operand: Box::new(LogicalExpression::Property {
1017                        variable: "n".to_string(),
1018                        property: "email".to_string(),
1019                    }),
1020                },
1021                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1022                    variable: "n".to_string(),
1023                    label: None,
1024                    input: None,
1025                })),
1026                pushdown_hint: None,
1027            })),
1028        }));
1029
1030        let physical = planner.plan(&logical).unwrap();
1031        assert_eq!(physical.columns(), &["n"]);
1032    }
1033
1034    #[test]
1035    fn test_plan_filter_function_call() {
1036        let store = create_test_store();
1037        let planner = Planner::new(store);
1038
1039        // WHERE size(n.friends) > 0
1040        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1041            items: vec![ReturnItem {
1042                expression: LogicalExpression::Variable("n".to_string()),
1043                alias: None,
1044            }],
1045            distinct: false,
1046            input: Box::new(LogicalOperator::Filter(FilterOp {
1047                predicate: LogicalExpression::Binary {
1048                    left: Box::new(LogicalExpression::FunctionCall {
1049                        name: "size".to_string(),
1050                        args: vec![LogicalExpression::Property {
1051                            variable: "n".to_string(),
1052                            property: "friends".to_string(),
1053                        }],
1054                        distinct: false,
1055                    }),
1056                    op: BinaryOp::Gt,
1057                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1058                },
1059                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1060                    variable: "n".to_string(),
1061                    label: None,
1062                    input: None,
1063                })),
1064                pushdown_hint: None,
1065            })),
1066        }));
1067
1068        let physical = planner.plan(&logical).unwrap();
1069        assert_eq!(physical.columns(), &["n"]);
1070    }
1071
1072    // ==================== Expand Tests ====================
1073
1074    #[test]
1075    fn test_plan_expand_outgoing() {
1076        let store = create_test_store();
1077        let planner = Planner::new(store);
1078
1079        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1080        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1081            items: vec![
1082                ReturnItem {
1083                    expression: LogicalExpression::Variable("a".to_string()),
1084                    alias: None,
1085                },
1086                ReturnItem {
1087                    expression: LogicalExpression::Variable("b".to_string()),
1088                    alias: None,
1089                },
1090            ],
1091            distinct: false,
1092            input: Box::new(LogicalOperator::Expand(ExpandOp {
1093                from_variable: "a".to_string(),
1094                to_variable: "b".to_string(),
1095                edge_variable: None,
1096                direction: ExpandDirection::Outgoing,
1097                edge_types: vec!["KNOWS".to_string()],
1098                min_hops: 1,
1099                max_hops: Some(1),
1100                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1101                    variable: "a".to_string(),
1102                    label: Some("Person".to_string()),
1103                    input: None,
1104                })),
1105                path_alias: None,
1106                path_mode: PathMode::Walk,
1107            })),
1108        }));
1109
1110        let physical = planner.plan(&logical).unwrap();
1111        // The return should have columns [a, b]
1112        assert!(physical.columns().contains(&"a".to_string()));
1113        assert!(physical.columns().contains(&"b".to_string()));
1114    }
1115
1116    #[test]
1117    fn test_plan_expand_with_edge_variable() {
1118        let store = create_test_store();
1119        let planner = Planner::new(store);
1120
1121        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1122        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1123            items: vec![
1124                ReturnItem {
1125                    expression: LogicalExpression::Variable("a".to_string()),
1126                    alias: None,
1127                },
1128                ReturnItem {
1129                    expression: LogicalExpression::Variable("r".to_string()),
1130                    alias: None,
1131                },
1132                ReturnItem {
1133                    expression: LogicalExpression::Variable("b".to_string()),
1134                    alias: None,
1135                },
1136            ],
1137            distinct: false,
1138            input: Box::new(LogicalOperator::Expand(ExpandOp {
1139                from_variable: "a".to_string(),
1140                to_variable: "b".to_string(),
1141                edge_variable: Some("r".to_string()),
1142                direction: ExpandDirection::Outgoing,
1143                edge_types: vec!["KNOWS".to_string()],
1144                min_hops: 1,
1145                max_hops: Some(1),
1146                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1147                    variable: "a".to_string(),
1148                    label: None,
1149                    input: None,
1150                })),
1151                path_alias: None,
1152                path_mode: PathMode::Walk,
1153            })),
1154        }));
1155
1156        let physical = planner.plan(&logical).unwrap();
1157        assert!(physical.columns().contains(&"a".to_string()));
1158        assert!(physical.columns().contains(&"r".to_string()));
1159        assert!(physical.columns().contains(&"b".to_string()));
1160    }
1161
1162    // ==================== Limit/Skip/Sort Tests ====================
1163
1164    #[test]
1165    fn test_plan_limit() {
1166        let store = create_test_store();
1167        let planner = Planner::new(store);
1168
1169        // MATCH (n) RETURN n LIMIT 10
1170        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1171            items: vec![ReturnItem {
1172                expression: LogicalExpression::Variable("n".to_string()),
1173                alias: None,
1174            }],
1175            distinct: false,
1176            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1177                count: 10.into(),
1178                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1179                    variable: "n".to_string(),
1180                    label: None,
1181                    input: None,
1182                })),
1183            })),
1184        }));
1185
1186        let physical = planner.plan(&logical).unwrap();
1187        assert_eq!(physical.columns(), &["n"]);
1188    }
1189
1190    #[test]
1191    fn test_plan_skip() {
1192        let store = create_test_store();
1193        let planner = Planner::new(store);
1194
1195        // MATCH (n) RETURN n SKIP 5
1196        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1197            items: vec![ReturnItem {
1198                expression: LogicalExpression::Variable("n".to_string()),
1199                alias: None,
1200            }],
1201            distinct: false,
1202            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1203                count: 5.into(),
1204                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1205                    variable: "n".to_string(),
1206                    label: None,
1207                    input: None,
1208                })),
1209            })),
1210        }));
1211
1212        let physical = planner.plan(&logical).unwrap();
1213        assert_eq!(physical.columns(), &["n"]);
1214    }
1215
1216    #[test]
1217    fn test_plan_sort() {
1218        let store = create_test_store();
1219        let planner = Planner::new(store);
1220
1221        // MATCH (n) RETURN n ORDER BY n.name ASC
1222        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1223            items: vec![ReturnItem {
1224                expression: LogicalExpression::Variable("n".to_string()),
1225                alias: None,
1226            }],
1227            distinct: false,
1228            input: Box::new(LogicalOperator::Sort(SortOp {
1229                keys: vec![SortKey {
1230                    expression: LogicalExpression::Variable("n".to_string()),
1231                    order: SortOrder::Ascending,
1232                    nulls: None,
1233                }],
1234                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1235                    variable: "n".to_string(),
1236                    label: None,
1237                    input: None,
1238                })),
1239            })),
1240        }));
1241
1242        let physical = planner.plan(&logical).unwrap();
1243        assert_eq!(physical.columns(), &["n"]);
1244    }
1245
1246    #[test]
1247    fn test_plan_sort_descending() {
1248        let store = create_test_store();
1249        let planner = Planner::new(store);
1250
1251        // ORDER BY n DESC
1252        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1253            items: vec![ReturnItem {
1254                expression: LogicalExpression::Variable("n".to_string()),
1255                alias: None,
1256            }],
1257            distinct: false,
1258            input: Box::new(LogicalOperator::Sort(SortOp {
1259                keys: vec![SortKey {
1260                    expression: LogicalExpression::Variable("n".to_string()),
1261                    order: SortOrder::Descending,
1262                    nulls: None,
1263                }],
1264                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1265                    variable: "n".to_string(),
1266                    label: None,
1267                    input: None,
1268                })),
1269            })),
1270        }));
1271
1272        let physical = planner.plan(&logical).unwrap();
1273        assert_eq!(physical.columns(), &["n"]);
1274    }
1275
1276    #[test]
1277    fn test_plan_distinct() {
1278        let store = create_test_store();
1279        let planner = Planner::new(store);
1280
1281        // MATCH (n) RETURN DISTINCT n
1282        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1283            items: vec![ReturnItem {
1284                expression: LogicalExpression::Variable("n".to_string()),
1285                alias: None,
1286            }],
1287            distinct: false,
1288            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1289                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1290                    variable: "n".to_string(),
1291                    label: None,
1292                    input: None,
1293                })),
1294                columns: None,
1295            })),
1296        }));
1297
1298        let physical = planner.plan(&logical).unwrap();
1299        assert_eq!(physical.columns(), &["n"]);
1300    }
1301
1302    #[test]
1303    fn test_plan_distinct_with_columns() {
1304        let store = create_test_store();
1305        let planner = Planner::new(store);
1306
1307        // DISTINCT on specific columns (column-specific dedup)
1308        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1309            items: vec![ReturnItem {
1310                expression: LogicalExpression::Variable("n".to_string()),
1311                alias: None,
1312            }],
1313            distinct: false,
1314            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1315                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1316                    variable: "n".to_string(),
1317                    label: None,
1318                    input: None,
1319                })),
1320                columns: Some(vec!["n".to_string()]),
1321            })),
1322        }));
1323
1324        let physical = planner.plan(&logical).unwrap();
1325        assert_eq!(physical.columns(), &["n"]);
1326    }
1327
1328    #[test]
1329    fn test_plan_distinct_with_nonexistent_columns() {
1330        let store = create_test_store();
1331        let planner = Planner::new(store);
1332
1333        // When distinct columns don't match any output columns,
1334        // it falls back to full-row distinct.
1335        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1336            items: vec![ReturnItem {
1337                expression: LogicalExpression::Variable("n".to_string()),
1338                alias: None,
1339            }],
1340            distinct: false,
1341            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1342                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1343                    variable: "n".to_string(),
1344                    label: None,
1345                    input: None,
1346                })),
1347                columns: Some(vec!["nonexistent".to_string()]),
1348            })),
1349        }));
1350
1351        let physical = planner.plan(&logical).unwrap();
1352        assert_eq!(physical.columns(), &["n"]);
1353    }
1354
1355    // ==================== Aggregate Tests ====================
1356
1357    #[test]
1358    fn test_plan_aggregate_count() {
1359        let store = create_test_store();
1360        let planner = Planner::new(store);
1361
1362        // MATCH (n) RETURN count(n)
1363        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1364            items: vec![ReturnItem {
1365                expression: LogicalExpression::Variable("cnt".to_string()),
1366                alias: None,
1367            }],
1368            distinct: false,
1369            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1370                group_by: vec![],
1371                aggregates: vec![LogicalAggregateExpr {
1372                    function: LogicalAggregateFunction::Count,
1373                    expression: Some(LogicalExpression::Variable("n".to_string())),
1374                    expression2: None,
1375                    distinct: false,
1376                    alias: Some("cnt".to_string()),
1377                    percentile: None,
1378                    separator: None,
1379                }],
1380                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1381                    variable: "n".to_string(),
1382                    label: None,
1383                    input: None,
1384                })),
1385                having: None,
1386            })),
1387        }));
1388
1389        let physical = planner.plan(&logical).unwrap();
1390        assert!(physical.columns().contains(&"cnt".to_string()));
1391    }
1392
1393    #[test]
1394    fn test_plan_aggregate_with_group_by() {
1395        let store = create_test_store();
1396        let planner = Planner::new(store);
1397
1398        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1399        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1400            group_by: vec![LogicalExpression::Property {
1401                variable: "n".to_string(),
1402                property: "city".to_string(),
1403            }],
1404            aggregates: vec![LogicalAggregateExpr {
1405                function: LogicalAggregateFunction::Count,
1406                expression: Some(LogicalExpression::Variable("n".to_string())),
1407                expression2: None,
1408                distinct: false,
1409                alias: Some("cnt".to_string()),
1410                percentile: None,
1411                separator: None,
1412            }],
1413            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1414                variable: "n".to_string(),
1415                label: Some("Person".to_string()),
1416                input: None,
1417            })),
1418            having: None,
1419        }));
1420
1421        let physical = planner.plan(&logical).unwrap();
1422        assert_eq!(physical.columns().len(), 2);
1423    }
1424
1425    #[test]
1426    fn test_plan_aggregate_sum() {
1427        let store = create_test_store();
1428        let planner = Planner::new(store);
1429
1430        // SUM(n.value)
1431        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1432            group_by: vec![],
1433            aggregates: vec![LogicalAggregateExpr {
1434                function: LogicalAggregateFunction::Sum,
1435                expression: Some(LogicalExpression::Property {
1436                    variable: "n".to_string(),
1437                    property: "value".to_string(),
1438                }),
1439                expression2: None,
1440                distinct: false,
1441                alias: Some("total".to_string()),
1442                percentile: None,
1443                separator: None,
1444            }],
1445            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1446                variable: "n".to_string(),
1447                label: None,
1448                input: None,
1449            })),
1450            having: None,
1451        }));
1452
1453        let physical = planner.plan(&logical).unwrap();
1454        assert!(physical.columns().contains(&"total".to_string()));
1455    }
1456
1457    #[test]
1458    fn test_plan_aggregate_avg() {
1459        let store = create_test_store();
1460        let planner = Planner::new(store);
1461
1462        // AVG(n.score)
1463        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1464            group_by: vec![],
1465            aggregates: vec![LogicalAggregateExpr {
1466                function: LogicalAggregateFunction::Avg,
1467                expression: Some(LogicalExpression::Property {
1468                    variable: "n".to_string(),
1469                    property: "score".to_string(),
1470                }),
1471                expression2: None,
1472                distinct: false,
1473                alias: Some("average".to_string()),
1474                percentile: None,
1475                separator: None,
1476            }],
1477            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1478                variable: "n".to_string(),
1479                label: None,
1480                input: None,
1481            })),
1482            having: None,
1483        }));
1484
1485        let physical = planner.plan(&logical).unwrap();
1486        assert!(physical.columns().contains(&"average".to_string()));
1487    }
1488
1489    #[test]
1490    fn test_plan_aggregate_min_max() {
1491        let store = create_test_store();
1492        let planner = Planner::new(store);
1493
1494        // MIN(n.age), MAX(n.age)
1495        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1496            group_by: vec![],
1497            aggregates: vec![
1498                LogicalAggregateExpr {
1499                    function: LogicalAggregateFunction::Min,
1500                    expression: Some(LogicalExpression::Property {
1501                        variable: "n".to_string(),
1502                        property: "age".to_string(),
1503                    }),
1504                    expression2: None,
1505                    distinct: false,
1506                    alias: Some("youngest".to_string()),
1507                    percentile: None,
1508                    separator: None,
1509                },
1510                LogicalAggregateExpr {
1511                    function: LogicalAggregateFunction::Max,
1512                    expression: Some(LogicalExpression::Property {
1513                        variable: "n".to_string(),
1514                        property: "age".to_string(),
1515                    }),
1516                    expression2: None,
1517                    distinct: false,
1518                    alias: Some("oldest".to_string()),
1519                    percentile: None,
1520                    separator: None,
1521                },
1522            ],
1523            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1524                variable: "n".to_string(),
1525                label: None,
1526                input: None,
1527            })),
1528            having: None,
1529        }));
1530
1531        let physical = planner.plan(&logical).unwrap();
1532        assert!(physical.columns().contains(&"youngest".to_string()));
1533        assert!(physical.columns().contains(&"oldest".to_string()));
1534    }
1535
1536    // ==================== Join Tests ====================
1537
1538    #[test]
1539    fn test_plan_inner_join() {
1540        let store = create_test_store();
1541        let planner = Planner::new(store);
1542
1543        // Inner join between two scans
1544        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1545            items: vec![
1546                ReturnItem {
1547                    expression: LogicalExpression::Variable("a".to_string()),
1548                    alias: None,
1549                },
1550                ReturnItem {
1551                    expression: LogicalExpression::Variable("b".to_string()),
1552                    alias: None,
1553                },
1554            ],
1555            distinct: false,
1556            input: Box::new(LogicalOperator::Join(JoinOp {
1557                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1558                    variable: "a".to_string(),
1559                    label: Some("Person".to_string()),
1560                    input: None,
1561                })),
1562                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1563                    variable: "b".to_string(),
1564                    label: Some("Company".to_string()),
1565                    input: None,
1566                })),
1567                join_type: JoinType::Inner,
1568                conditions: vec![JoinCondition {
1569                    left: LogicalExpression::Variable("a".to_string()),
1570                    right: LogicalExpression::Variable("b".to_string()),
1571                }],
1572            })),
1573        }));
1574
1575        let physical = planner.plan(&logical).unwrap();
1576        assert!(physical.columns().contains(&"a".to_string()));
1577        assert!(physical.columns().contains(&"b".to_string()));
1578    }
1579
1580    #[test]
1581    fn test_plan_cross_join() {
1582        let store = create_test_store();
1583        let planner = Planner::new(store);
1584
1585        // Cross join (no conditions)
1586        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1587            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1588                variable: "a".to_string(),
1589                label: None,
1590                input: None,
1591            })),
1592            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1593                variable: "b".to_string(),
1594                label: None,
1595                input: None,
1596            })),
1597            join_type: JoinType::Cross,
1598            conditions: vec![],
1599        }));
1600
1601        let physical = planner.plan(&logical).unwrap();
1602        assert_eq!(physical.columns().len(), 2);
1603    }
1604
1605    #[test]
1606    fn test_plan_left_join() {
1607        let store = create_test_store();
1608        let planner = Planner::new(store);
1609
1610        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1611            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1612                variable: "a".to_string(),
1613                label: None,
1614                input: None,
1615            })),
1616            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1617                variable: "b".to_string(),
1618                label: None,
1619                input: None,
1620            })),
1621            join_type: JoinType::Left,
1622            conditions: vec![],
1623        }));
1624
1625        let physical = planner.plan(&logical).unwrap();
1626        assert_eq!(physical.columns().len(), 2);
1627    }
1628
1629    // ==================== Mutation Tests ====================
1630
1631    #[test]
1632    fn test_plan_create_node() {
1633        let store = create_test_store();
1634        let planner = Planner::new(store);
1635
1636        // CREATE (n:Person {name: 'Alix'})
1637        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1638            variable: "n".to_string(),
1639            labels: vec!["Person".to_string()],
1640            properties: vec![(
1641                "name".to_string(),
1642                LogicalExpression::Literal(Value::String("Alix".into())),
1643            )],
1644            input: None,
1645        }));
1646
1647        let physical = planner.plan(&logical).unwrap();
1648        assert!(physical.columns().contains(&"n".to_string()));
1649    }
1650
1651    #[test]
1652    fn test_plan_create_edge() {
1653        let store = create_test_store();
1654        let planner = Planner::new(store);
1655
1656        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1657        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1658            variable: Some("r".to_string()),
1659            from_variable: "a".to_string(),
1660            to_variable: "b".to_string(),
1661            edge_type: "KNOWS".to_string(),
1662            properties: vec![],
1663            input: Box::new(LogicalOperator::Join(JoinOp {
1664                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1665                    variable: "a".to_string(),
1666                    label: None,
1667                    input: None,
1668                })),
1669                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1670                    variable: "b".to_string(),
1671                    label: None,
1672                    input: None,
1673                })),
1674                join_type: JoinType::Cross,
1675                conditions: vec![],
1676            })),
1677        }));
1678
1679        let physical = planner.plan(&logical).unwrap();
1680        assert!(physical.columns().contains(&"r".to_string()));
1681    }
1682
1683    #[test]
1684    fn test_plan_delete_node() {
1685        let store = create_test_store();
1686        let planner = Planner::new(store);
1687
1688        // MATCH (n) DELETE n
1689        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1690            variable: "n".to_string(),
1691            detach: false,
1692            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1693                variable: "n".to_string(),
1694                label: None,
1695                input: None,
1696            })),
1697        }));
1698
1699        let physical = planner.plan(&logical).unwrap();
1700        assert!(physical.columns().contains(&"n".to_string()));
1701    }
1702
1703    // ==================== Error Cases ====================
1704
1705    #[test]
1706    fn test_plan_empty_errors() {
1707        let store = create_test_store();
1708        let planner = Planner::new(store);
1709
1710        let logical = LogicalPlan::new(LogicalOperator::Empty);
1711        let result = planner.plan(&logical);
1712        assert!(result.is_err());
1713    }
1714
1715    #[test]
1716    fn test_plan_missing_variable_in_return() {
1717        let store = create_test_store();
1718        let planner = Planner::new(store);
1719
1720        // Return variable that doesn't exist in input
1721        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1722            items: vec![ReturnItem {
1723                expression: LogicalExpression::Variable("missing".to_string()),
1724                alias: None,
1725            }],
1726            distinct: false,
1727            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1728                variable: "n".to_string(),
1729                label: None,
1730                input: None,
1731            })),
1732        }));
1733
1734        let result = planner.plan(&logical);
1735        assert!(result.is_err());
1736    }
1737
1738    // ==================== Helper Function Tests ====================
1739
1740    #[test]
1741    fn test_convert_binary_ops() {
1742        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1743        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1744        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1745        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1746        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1747        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1748        assert!(convert_binary_op(BinaryOp::And).is_ok());
1749        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1750        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1751        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1752        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1753        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1754    }
1755
1756    #[test]
1757    fn test_convert_unary_ops() {
1758        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1759        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1760        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1761        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1762    }
1763
1764    #[test]
1765    fn test_convert_aggregate_functions() {
1766        assert!(matches!(
1767            convert_aggregate_function(LogicalAggregateFunction::Count),
1768            PhysicalAggregateFunction::Count
1769        ));
1770        assert!(matches!(
1771            convert_aggregate_function(LogicalAggregateFunction::Sum),
1772            PhysicalAggregateFunction::Sum
1773        ));
1774        assert!(matches!(
1775            convert_aggregate_function(LogicalAggregateFunction::Avg),
1776            PhysicalAggregateFunction::Avg
1777        ));
1778        assert!(matches!(
1779            convert_aggregate_function(LogicalAggregateFunction::Min),
1780            PhysicalAggregateFunction::Min
1781        ));
1782        assert!(matches!(
1783            convert_aggregate_function(LogicalAggregateFunction::Max),
1784            PhysicalAggregateFunction::Max
1785        ));
1786    }
1787
1788    #[test]
1789    fn test_planner_accessors() {
1790        let store = create_test_store();
1791        let planner = Planner::new(Arc::clone(&store));
1792
1793        assert!(planner.transaction_id().is_none());
1794        assert!(planner.transaction_manager().is_none());
1795        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1796    }
1797
1798    #[test]
1799    fn test_physical_plan_accessors() {
1800        let store = create_test_store();
1801        let planner = Planner::new(store);
1802
1803        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1804            variable: "n".to_string(),
1805            label: None,
1806            input: None,
1807        }));
1808
1809        let physical = planner.plan(&logical).unwrap();
1810        assert_eq!(physical.columns(), &["n"]);
1811
1812        // Test into_operator
1813        let _ = physical.into_operator();
1814    }
1815
1816    // ==================== Adaptive Planning Tests ====================
1817
1818    #[test]
1819    fn test_plan_adaptive_with_scan() {
1820        let store = create_test_store();
1821        let planner = Planner::new(store);
1822
1823        // MATCH (n:Person) RETURN n
1824        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1825            items: vec![ReturnItem {
1826                expression: LogicalExpression::Variable("n".to_string()),
1827                alias: None,
1828            }],
1829            distinct: false,
1830            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1831                variable: "n".to_string(),
1832                label: Some("Person".to_string()),
1833                input: None,
1834            })),
1835        }));
1836
1837        let physical = planner.plan_adaptive(&logical).unwrap();
1838        assert_eq!(physical.columns(), &["n"]);
1839        // Should have adaptive context with estimates
1840        assert!(physical.adaptive_context.is_some());
1841    }
1842
1843    #[test]
1844    fn test_plan_adaptive_with_filter() {
1845        let store = create_test_store();
1846        let planner = Planner::new(store);
1847
1848        // MATCH (n) WHERE n.age > 30 RETURN n
1849        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1850            items: vec![ReturnItem {
1851                expression: LogicalExpression::Variable("n".to_string()),
1852                alias: None,
1853            }],
1854            distinct: false,
1855            input: Box::new(LogicalOperator::Filter(FilterOp {
1856                predicate: LogicalExpression::Binary {
1857                    left: Box::new(LogicalExpression::Property {
1858                        variable: "n".to_string(),
1859                        property: "age".to_string(),
1860                    }),
1861                    op: BinaryOp::Gt,
1862                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1863                },
1864                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1865                    variable: "n".to_string(),
1866                    label: None,
1867                    input: None,
1868                })),
1869                pushdown_hint: None,
1870            })),
1871        }));
1872
1873        let physical = planner.plan_adaptive(&logical).unwrap();
1874        assert!(physical.adaptive_context.is_some());
1875    }
1876
1877    #[test]
1878    fn test_plan_adaptive_with_expand() {
1879        let store = create_test_store();
1880        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1881
1882        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1883        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1884            items: vec![
1885                ReturnItem {
1886                    expression: LogicalExpression::Variable("a".to_string()),
1887                    alias: None,
1888                },
1889                ReturnItem {
1890                    expression: LogicalExpression::Variable("b".to_string()),
1891                    alias: None,
1892                },
1893            ],
1894            distinct: false,
1895            input: Box::new(LogicalOperator::Expand(ExpandOp {
1896                from_variable: "a".to_string(),
1897                to_variable: "b".to_string(),
1898                edge_variable: None,
1899                direction: ExpandDirection::Outgoing,
1900                edge_types: vec!["KNOWS".to_string()],
1901                min_hops: 1,
1902                max_hops: Some(1),
1903                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1904                    variable: "a".to_string(),
1905                    label: None,
1906                    input: None,
1907                })),
1908                path_alias: None,
1909                path_mode: PathMode::Walk,
1910            })),
1911        }));
1912
1913        let physical = planner.plan_adaptive(&logical).unwrap();
1914        assert!(physical.adaptive_context.is_some());
1915    }
1916
1917    #[test]
1918    fn test_plan_adaptive_with_join() {
1919        let store = create_test_store();
1920        let planner = Planner::new(store);
1921
1922        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1923            items: vec![
1924                ReturnItem {
1925                    expression: LogicalExpression::Variable("a".to_string()),
1926                    alias: None,
1927                },
1928                ReturnItem {
1929                    expression: LogicalExpression::Variable("b".to_string()),
1930                    alias: None,
1931                },
1932            ],
1933            distinct: false,
1934            input: Box::new(LogicalOperator::Join(JoinOp {
1935                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1936                    variable: "a".to_string(),
1937                    label: None,
1938                    input: None,
1939                })),
1940                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1941                    variable: "b".to_string(),
1942                    label: None,
1943                    input: None,
1944                })),
1945                join_type: JoinType::Cross,
1946                conditions: vec![],
1947            })),
1948        }));
1949
1950        let physical = planner.plan_adaptive(&logical).unwrap();
1951        assert!(physical.adaptive_context.is_some());
1952    }
1953
1954    #[test]
1955    fn test_plan_adaptive_with_aggregate() {
1956        let store = create_test_store();
1957        let planner = Planner::new(store);
1958
1959        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1960            group_by: vec![],
1961            aggregates: vec![LogicalAggregateExpr {
1962                function: LogicalAggregateFunction::Count,
1963                expression: Some(LogicalExpression::Variable("n".to_string())),
1964                expression2: None,
1965                distinct: false,
1966                alias: Some("cnt".to_string()),
1967                percentile: None,
1968                separator: None,
1969            }],
1970            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1971                variable: "n".to_string(),
1972                label: None,
1973                input: None,
1974            })),
1975            having: None,
1976        }));
1977
1978        let physical = planner.plan_adaptive(&logical).unwrap();
1979        assert!(physical.adaptive_context.is_some());
1980    }
1981
1982    #[test]
1983    fn test_plan_adaptive_with_distinct() {
1984        let store = create_test_store();
1985        let planner = Planner::new(store);
1986
1987        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1988            items: vec![ReturnItem {
1989                expression: LogicalExpression::Variable("n".to_string()),
1990                alias: None,
1991            }],
1992            distinct: false,
1993            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1994                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1995                    variable: "n".to_string(),
1996                    label: None,
1997                    input: None,
1998                })),
1999                columns: None,
2000            })),
2001        }));
2002
2003        let physical = planner.plan_adaptive(&logical).unwrap();
2004        assert!(physical.adaptive_context.is_some());
2005    }
2006
2007    #[test]
2008    fn test_plan_adaptive_with_limit() {
2009        let store = create_test_store();
2010        let planner = Planner::new(store);
2011
2012        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2013            items: vec![ReturnItem {
2014                expression: LogicalExpression::Variable("n".to_string()),
2015                alias: None,
2016            }],
2017            distinct: false,
2018            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2019                count: 10.into(),
2020                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2021                    variable: "n".to_string(),
2022                    label: None,
2023                    input: None,
2024                })),
2025            })),
2026        }));
2027
2028        let physical = planner.plan_adaptive(&logical).unwrap();
2029        assert!(physical.adaptive_context.is_some());
2030    }
2031
2032    #[test]
2033    fn test_plan_adaptive_with_skip() {
2034        let store = create_test_store();
2035        let planner = Planner::new(store);
2036
2037        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2038            items: vec![ReturnItem {
2039                expression: LogicalExpression::Variable("n".to_string()),
2040                alias: None,
2041            }],
2042            distinct: false,
2043            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2044                count: 5.into(),
2045                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2046                    variable: "n".to_string(),
2047                    label: None,
2048                    input: None,
2049                })),
2050            })),
2051        }));
2052
2053        let physical = planner.plan_adaptive(&logical).unwrap();
2054        assert!(physical.adaptive_context.is_some());
2055    }
2056
2057    #[test]
2058    fn test_plan_adaptive_with_sort() {
2059        let store = create_test_store();
2060        let planner = Planner::new(store);
2061
2062        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2063            items: vec![ReturnItem {
2064                expression: LogicalExpression::Variable("n".to_string()),
2065                alias: None,
2066            }],
2067            distinct: false,
2068            input: Box::new(LogicalOperator::Sort(SortOp {
2069                keys: vec![SortKey {
2070                    expression: LogicalExpression::Variable("n".to_string()),
2071                    order: SortOrder::Ascending,
2072                    nulls: None,
2073                }],
2074                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2075                    variable: "n".to_string(),
2076                    label: None,
2077                    input: None,
2078                })),
2079            })),
2080        }));
2081
2082        let physical = planner.plan_adaptive(&logical).unwrap();
2083        assert!(physical.adaptive_context.is_some());
2084    }
2085
2086    #[test]
2087    fn test_plan_adaptive_with_union() {
2088        let store = create_test_store();
2089        let planner = Planner::new(store);
2090
2091        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2092            items: vec![ReturnItem {
2093                expression: LogicalExpression::Variable("n".to_string()),
2094                alias: None,
2095            }],
2096            distinct: false,
2097            input: Box::new(LogicalOperator::Union(UnionOp {
2098                inputs: vec![
2099                    LogicalOperator::NodeScan(NodeScanOp {
2100                        variable: "n".to_string(),
2101                        label: Some("Person".to_string()),
2102                        input: None,
2103                    }),
2104                    LogicalOperator::NodeScan(NodeScanOp {
2105                        variable: "n".to_string(),
2106                        label: Some("Company".to_string()),
2107                        input: None,
2108                    }),
2109                ],
2110            })),
2111        }));
2112
2113        let physical = planner.plan_adaptive(&logical).unwrap();
2114        assert!(physical.adaptive_context.is_some());
2115    }
2116
2117    // ==================== Variable Length Path Tests ====================
2118
2119    #[test]
2120    fn test_plan_expand_variable_length() {
2121        let store = create_test_store();
2122        let planner = Planner::new(store);
2123
2124        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2125        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2126            items: vec![
2127                ReturnItem {
2128                    expression: LogicalExpression::Variable("a".to_string()),
2129                    alias: None,
2130                },
2131                ReturnItem {
2132                    expression: LogicalExpression::Variable("b".to_string()),
2133                    alias: None,
2134                },
2135            ],
2136            distinct: false,
2137            input: Box::new(LogicalOperator::Expand(ExpandOp {
2138                from_variable: "a".to_string(),
2139                to_variable: "b".to_string(),
2140                edge_variable: None,
2141                direction: ExpandDirection::Outgoing,
2142                edge_types: vec!["KNOWS".to_string()],
2143                min_hops: 1,
2144                max_hops: Some(3),
2145                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2146                    variable: "a".to_string(),
2147                    label: None,
2148                    input: None,
2149                })),
2150                path_alias: None,
2151                path_mode: PathMode::Walk,
2152            })),
2153        }));
2154
2155        let physical = planner.plan(&logical).unwrap();
2156        assert!(physical.columns().contains(&"a".to_string()));
2157        assert!(physical.columns().contains(&"b".to_string()));
2158    }
2159
2160    #[test]
2161    fn test_plan_expand_with_path_alias() {
2162        let store = create_test_store();
2163        let planner = Planner::new(store);
2164
2165        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2166        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2167            items: vec![
2168                ReturnItem {
2169                    expression: LogicalExpression::Variable("a".to_string()),
2170                    alias: None,
2171                },
2172                ReturnItem {
2173                    expression: LogicalExpression::Variable("b".to_string()),
2174                    alias: None,
2175                },
2176            ],
2177            distinct: false,
2178            input: Box::new(LogicalOperator::Expand(ExpandOp {
2179                from_variable: "a".to_string(),
2180                to_variable: "b".to_string(),
2181                edge_variable: None,
2182                direction: ExpandDirection::Outgoing,
2183                edge_types: vec!["KNOWS".to_string()],
2184                min_hops: 1,
2185                max_hops: Some(3),
2186                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2187                    variable: "a".to_string(),
2188                    label: None,
2189                    input: None,
2190                })),
2191                path_alias: Some("p".to_string()),
2192                path_mode: PathMode::Walk,
2193            })),
2194        }));
2195
2196        let physical = planner.plan(&logical).unwrap();
2197        // Verify plan was created successfully with expected output columns
2198        assert!(physical.columns().contains(&"a".to_string()));
2199        assert!(physical.columns().contains(&"b".to_string()));
2200    }
2201
2202    #[test]
2203    fn test_plan_expand_incoming() {
2204        let store = create_test_store();
2205        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2206
2207        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2208        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2209            items: vec![
2210                ReturnItem {
2211                    expression: LogicalExpression::Variable("a".to_string()),
2212                    alias: None,
2213                },
2214                ReturnItem {
2215                    expression: LogicalExpression::Variable("b".to_string()),
2216                    alias: None,
2217                },
2218            ],
2219            distinct: false,
2220            input: Box::new(LogicalOperator::Expand(ExpandOp {
2221                from_variable: "a".to_string(),
2222                to_variable: "b".to_string(),
2223                edge_variable: None,
2224                direction: ExpandDirection::Incoming,
2225                edge_types: vec!["KNOWS".to_string()],
2226                min_hops: 1,
2227                max_hops: Some(1),
2228                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2229                    variable: "a".to_string(),
2230                    label: None,
2231                    input: None,
2232                })),
2233                path_alias: None,
2234                path_mode: PathMode::Walk,
2235            })),
2236        }));
2237
2238        let physical = planner.plan(&logical).unwrap();
2239        assert!(physical.columns().contains(&"a".to_string()));
2240        assert!(physical.columns().contains(&"b".to_string()));
2241    }
2242
2243    #[test]
2244    fn test_plan_expand_both_directions() {
2245        let store = create_test_store();
2246        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2247
2248        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2249        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2250            items: vec![
2251                ReturnItem {
2252                    expression: LogicalExpression::Variable("a".to_string()),
2253                    alias: None,
2254                },
2255                ReturnItem {
2256                    expression: LogicalExpression::Variable("b".to_string()),
2257                    alias: None,
2258                },
2259            ],
2260            distinct: false,
2261            input: Box::new(LogicalOperator::Expand(ExpandOp {
2262                from_variable: "a".to_string(),
2263                to_variable: "b".to_string(),
2264                edge_variable: None,
2265                direction: ExpandDirection::Both,
2266                edge_types: vec!["KNOWS".to_string()],
2267                min_hops: 1,
2268                max_hops: Some(1),
2269                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2270                    variable: "a".to_string(),
2271                    label: None,
2272                    input: None,
2273                })),
2274                path_alias: None,
2275                path_mode: PathMode::Walk,
2276            })),
2277        }));
2278
2279        let physical = planner.plan(&logical).unwrap();
2280        assert!(physical.columns().contains(&"a".to_string()));
2281        assert!(physical.columns().contains(&"b".to_string()));
2282    }
2283
2284    // ==================== With Context Tests ====================
2285
2286    #[test]
2287    fn test_planner_with_context() {
2288        use crate::transaction::TransactionManager;
2289
2290        let store = create_test_store();
2291        let transaction_manager = Arc::new(TransactionManager::new());
2292        let transaction_id = transaction_manager.begin();
2293        let epoch = transaction_manager.current_epoch();
2294
2295        let planner = Planner::with_context(
2296            Arc::clone(&store),
2297            Arc::clone(&transaction_manager),
2298            Some(transaction_id),
2299            epoch,
2300        );
2301
2302        assert_eq!(planner.transaction_id(), Some(transaction_id));
2303        assert!(planner.transaction_manager().is_some());
2304        assert_eq!(planner.viewing_epoch(), epoch);
2305    }
2306
2307    #[test]
2308    fn test_planner_with_factorized_execution_disabled() {
2309        let store = create_test_store();
2310        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2311
2312        // Two consecutive expands - should NOT use factorized execution
2313        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2314            items: vec![
2315                ReturnItem {
2316                    expression: LogicalExpression::Variable("a".to_string()),
2317                    alias: None,
2318                },
2319                ReturnItem {
2320                    expression: LogicalExpression::Variable("c".to_string()),
2321                    alias: None,
2322                },
2323            ],
2324            distinct: false,
2325            input: Box::new(LogicalOperator::Expand(ExpandOp {
2326                from_variable: "b".to_string(),
2327                to_variable: "c".to_string(),
2328                edge_variable: None,
2329                direction: ExpandDirection::Outgoing,
2330                edge_types: vec![],
2331                min_hops: 1,
2332                max_hops: Some(1),
2333                input: Box::new(LogicalOperator::Expand(ExpandOp {
2334                    from_variable: "a".to_string(),
2335                    to_variable: "b".to_string(),
2336                    edge_variable: None,
2337                    direction: ExpandDirection::Outgoing,
2338                    edge_types: vec![],
2339                    min_hops: 1,
2340                    max_hops: Some(1),
2341                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2342                        variable: "a".to_string(),
2343                        label: None,
2344                        input: None,
2345                    })),
2346                    path_alias: None,
2347                    path_mode: PathMode::Walk,
2348                })),
2349                path_alias: None,
2350                path_mode: PathMode::Walk,
2351            })),
2352        }));
2353
2354        let physical = planner.plan(&logical).unwrap();
2355        assert!(physical.columns().contains(&"a".to_string()));
2356        assert!(physical.columns().contains(&"c".to_string()));
2357    }
2358
2359    // ==================== Sort with Property Tests ====================
2360
2361    #[test]
2362    fn test_plan_sort_by_property() {
2363        let store = create_test_store();
2364        let planner = Planner::new(store);
2365
2366        // MATCH (n) RETURN n ORDER BY n.name ASC
2367        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2368            items: vec![ReturnItem {
2369                expression: LogicalExpression::Variable("n".to_string()),
2370                alias: None,
2371            }],
2372            distinct: false,
2373            input: Box::new(LogicalOperator::Sort(SortOp {
2374                keys: vec![SortKey {
2375                    expression: LogicalExpression::Property {
2376                        variable: "n".to_string(),
2377                        property: "name".to_string(),
2378                    },
2379                    order: SortOrder::Ascending,
2380                    nulls: None,
2381                }],
2382                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2383                    variable: "n".to_string(),
2384                    label: None,
2385                    input: None,
2386                })),
2387            })),
2388        }));
2389
2390        let physical = planner.plan(&logical).unwrap();
2391        // Should have the property column projected
2392        assert!(physical.columns().contains(&"n".to_string()));
2393    }
2394
2395    // ==================== Scan with Input Tests ====================
2396
2397    #[test]
2398    fn test_plan_scan_with_input() {
2399        let store = create_test_store();
2400        let planner = Planner::new(store);
2401
2402        // A scan with another scan as input (for chained patterns)
2403        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2404            items: vec![
2405                ReturnItem {
2406                    expression: LogicalExpression::Variable("a".to_string()),
2407                    alias: None,
2408                },
2409                ReturnItem {
2410                    expression: LogicalExpression::Variable("b".to_string()),
2411                    alias: None,
2412                },
2413            ],
2414            distinct: false,
2415            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2416                variable: "b".to_string(),
2417                label: Some("Company".to_string()),
2418                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2419                    variable: "a".to_string(),
2420                    label: Some("Person".to_string()),
2421                    input: None,
2422                }))),
2423            })),
2424        }));
2425
2426        let physical = planner.plan(&logical).unwrap();
2427        assert!(physical.columns().contains(&"a".to_string()));
2428        assert!(physical.columns().contains(&"b".to_string()));
2429    }
2430}