Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43    VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53    convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57/// Range bounds for property-based range queries.
58struct RangeBounds<'a> {
59    min: Option<&'a Value>,
60    max: Option<&'a Value>,
61    min_inclusive: bool,
62    max_inclusive: bool,
63}
64
65/// Converts a logical plan to a physical operator tree for LPG stores.
66pub struct Planner {
67    /// The graph store (supports both read and write operations).
68    pub(super) store: Arc<dyn GraphStoreMut>,
69    /// Transaction manager for MVCC operations.
70    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
71    /// Current transaction ID (if in a transaction).
72    pub(super) transaction_id: Option<TransactionId>,
73    /// Epoch to use for visibility checks.
74    pub(super) viewing_epoch: EpochId,
75    /// Counter for generating unique anonymous edge column names.
76    pub(super) anon_edge_counter: std::cell::Cell<u32>,
77    /// Whether to use factorized execution for multi-hop queries.
78    pub(super) factorized_execution: bool,
79    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
80    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
81    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
82    /// Variables that hold edge IDs (from MATCH edge patterns).
83    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
84    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
85    /// Optional constraint validator for schema enforcement during mutations.
86    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
87    /// Catalog for user-defined procedure lookup.
88    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
89    /// Shared parameter state for the currently planning correlated Apply.
90    /// Set by `plan_apply` before planning the inner operator, consumed by
91    /// `plan_operator` when encountering `ParameterScan`.
92    pub(super) correlated_param_state:
93        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
94    /// Variables from variable-length expand patterns (group-list variables).
95    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
96    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
97    /// When true, each physical operator is wrapped in `ProfiledOperator`.
98    profiling: std::cell::Cell<bool>,
99    /// Profile entries collected during planning (post-order).
100    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
101    /// Optional write tracker for recording writes during mutations.
102    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
103    /// Session context for introspection functions (info, schema, current_schema, etc.).
104    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
105    /// When true, expand operators use epoch-only visibility (no MVCC version
106    /// chain walks).  Set when the plan contains no mutations, so PENDING
107    /// writes are impossible to observe.
108    pub(super) read_only: bool,
109}
110
111impl Planner {
112    /// Creates a new planner with the given store.
113    ///
114    /// This creates a planner without transaction context, using the current
115    /// epoch from the store for visibility.
116    #[must_use]
117    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
118        let epoch = store.current_epoch();
119        Self {
120            store,
121            transaction_manager: None,
122            transaction_id: None,
123            viewing_epoch: epoch,
124            anon_edge_counter: std::cell::Cell::new(0),
125            factorized_execution: true,
126            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
127            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
128            validator: None,
129            catalog: None,
130            correlated_param_state: std::cell::RefCell::new(None),
131            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
132            profiling: std::cell::Cell::new(false),
133            profile_entries: std::cell::RefCell::new(Vec::new()),
134            write_tracker: None,
135            session_context: grafeo_core::execution::operators::SessionContext::default(),
136            read_only: false,
137        }
138    }
139
140    /// Creates a new planner with transaction context for MVCC-aware planning.
141    #[must_use]
142    pub fn with_context(
143        store: Arc<dyn GraphStoreMut>,
144        transaction_manager: Arc<TransactionManager>,
145        transaction_id: Option<TransactionId>,
146        viewing_epoch: EpochId,
147    ) -> Self {
148        use crate::transaction::TransactionWriteTracker;
149
150        // Create write tracker when there's an active transaction
151        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
152            if transaction_id.is_some() {
153                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
154                    &transaction_manager,
155                ))))
156            } else {
157                None
158            };
159
160        Self {
161            store,
162            transaction_manager: Some(transaction_manager),
163            transaction_id,
164            viewing_epoch,
165            anon_edge_counter: std::cell::Cell::new(0),
166            factorized_execution: true,
167            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
168            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
169            validator: None,
170            catalog: None,
171            correlated_param_state: std::cell::RefCell::new(None),
172            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
173            profiling: std::cell::Cell::new(false),
174            profile_entries: std::cell::RefCell::new(Vec::new()),
175            write_tracker,
176            session_context: grafeo_core::execution::operators::SessionContext::default(),
177            read_only: false,
178        }
179    }
180
181    /// Marks this planner as planning a read-only query (no mutations),
182    /// enabling fast-path visibility checks in expand operators.
183    #[must_use]
184    pub fn with_read_only(mut self, read_only: bool) -> Self {
185        self.read_only = read_only;
186        self
187    }
188
189    /// Returns the viewing epoch for this planner.
190    #[must_use]
191    pub fn viewing_epoch(&self) -> EpochId {
192        self.viewing_epoch
193    }
194
195    /// Returns the transaction ID for this planner, if any.
196    #[must_use]
197    pub fn transaction_id(&self) -> Option<TransactionId> {
198        self.transaction_id
199    }
200
201    /// Returns a reference to the transaction manager, if available.
202    #[must_use]
203    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
204        self.transaction_manager.as_ref()
205    }
206
207    /// Enables or disables factorized execution for multi-hop queries.
208    #[must_use]
209    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
210        self.factorized_execution = enabled;
211        self
212    }
213
214    /// Sets the constraint validator for schema enforcement during mutations.
215    #[must_use]
216    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
217        self.validator = Some(validator);
218        self
219    }
220
221    /// Sets the catalog for user-defined procedure lookup.
222    #[must_use]
223    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
224        self.catalog = Some(catalog);
225        self
226    }
227
228    /// Sets the session context for introspection functions.
229    #[must_use]
230    pub fn with_session_context(
231        mut self,
232        context: grafeo_core::execution::operators::SessionContext,
233    ) -> Self {
234        self.session_context = context;
235        self
236    }
237
238    /// Counts consecutive single-hop expand operations.
239    ///
240    /// Returns the count and the deepest non-expand operator (the base of the chain).
241    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
242        match op {
243            LogicalOperator::Expand(expand) => {
244                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
245
246                if is_single_hop {
247                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
248                    (inner_count + 1, base)
249                } else {
250                    (0, op)
251                }
252            }
253            _ => (0, op),
254        }
255    }
256
257    /// Collects expand operations from the outermost down to the base.
258    ///
259    /// Returns expands in order from innermost (base) to outermost.
260    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
261        let mut chain = Vec::new();
262        let mut current = op;
263
264        while let LogicalOperator::Expand(expand) = current {
265            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
266            if !is_single_hop {
267                break;
268            }
269            chain.push(expand);
270            current = &expand.input;
271        }
272
273        chain.reverse();
274        chain
275    }
276
277    /// Plans a logical plan into a physical operator.
278    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
279        let _span = grafeo_debug_span!("grafeo::query::plan");
280        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
281        Ok(PhysicalPlan {
282            operator,
283            columns,
284            adaptive_context: None,
285        })
286    }
287
288    /// Plans a logical plan with profiling: each physical operator is wrapped
289    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
290    /// collect row counts and timing. Returns the physical plan together with
291    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
292    /// items in post-order (children before parents).
293    pub fn plan_profiled(
294        &self,
295        logical_plan: &LogicalPlan,
296    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
297        self.profiling.set(true);
298        self.profile_entries.borrow_mut().clear();
299
300        let result = self.plan_operator(&logical_plan.root);
301
302        self.profiling.set(false);
303        let (operator, columns) = result?;
304        let entries = self.profile_entries.borrow_mut().drain(..).collect();
305
306        Ok((
307            PhysicalPlan {
308                operator,
309                columns,
310                adaptive_context: None,
311            },
312            entries,
313        ))
314    }
315
316    /// Plans a logical plan with adaptive execution support.
317    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
318        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
319
320        let mut adaptive_context = AdaptiveContext::new();
321        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
322
323        Ok(PhysicalPlan {
324            operator,
325            columns,
326            adaptive_context: Some(adaptive_context),
327        })
328    }
329
330    /// Collects cardinality estimates from the logical plan into an adaptive context.
331    fn collect_cardinality_estimates(
332        &self,
333        op: &LogicalOperator,
334        ctx: &mut AdaptiveContext,
335        depth: usize,
336    ) {
337        match op {
338            LogicalOperator::NodeScan(scan) => {
339                let estimate = if let Some(label) = &scan.label {
340                    self.store.nodes_by_label(label).len() as f64
341                } else {
342                    self.store.node_count() as f64
343                };
344                let id = format!("scan_{}", scan.variable);
345                ctx.set_estimate(&id, estimate);
346
347                if let Some(input) = &scan.input {
348                    self.collect_cardinality_estimates(input, ctx, depth + 1);
349                }
350            }
351            LogicalOperator::Filter(filter) => {
352                let input_estimate = self.estimate_cardinality(&filter.input);
353                let estimate = input_estimate * 0.3;
354                let id = format!("filter_{depth}");
355                ctx.set_estimate(&id, estimate);
356
357                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
358            }
359            LogicalOperator::Expand(expand) => {
360                let input_estimate = self.estimate_cardinality(&expand.input);
361                let stats = self.store.statistics();
362                let avg_degree = self.estimate_expand_degree(&stats, expand);
363                let estimate = input_estimate * avg_degree;
364                let id = format!("expand_{}", expand.to_variable);
365                ctx.set_estimate(&id, estimate);
366
367                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
368            }
369            LogicalOperator::Join(join) => {
370                let left_est = self.estimate_cardinality(&join.left);
371                let right_est = self.estimate_cardinality(&join.right);
372                let estimate = (left_est * right_est).sqrt();
373                let id = format!("join_{depth}");
374                ctx.set_estimate(&id, estimate);
375
376                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
377                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
378            }
379            LogicalOperator::Aggregate(agg) => {
380                let input_estimate = self.estimate_cardinality(&agg.input);
381                let estimate = if agg.group_by.is_empty() {
382                    1.0
383                } else {
384                    (input_estimate * 0.1).max(1.0)
385                };
386                let id = format!("aggregate_{depth}");
387                ctx.set_estimate(&id, estimate);
388
389                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
390            }
391            LogicalOperator::Distinct(distinct) => {
392                let input_estimate = self.estimate_cardinality(&distinct.input);
393                let estimate = (input_estimate * 0.5).max(1.0);
394                let id = format!("distinct_{depth}");
395                ctx.set_estimate(&id, estimate);
396
397                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
398            }
399            LogicalOperator::Return(ret) => {
400                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
401            }
402            LogicalOperator::Limit(limit) => {
403                let input_estimate = self.estimate_cardinality(&limit.input);
404                let estimate = (input_estimate).min(limit.count.estimate());
405                let id = format!("limit_{depth}");
406                ctx.set_estimate(&id, estimate);
407
408                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
409            }
410            LogicalOperator::Skip(skip) => {
411                let input_estimate = self.estimate_cardinality(&skip.input);
412                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
413                let id = format!("skip_{depth}");
414                ctx.set_estimate(&id, estimate);
415
416                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
417            }
418            LogicalOperator::Sort(sort) => {
419                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
420            }
421            LogicalOperator::Union(union) => {
422                let estimate: f64 = union
423                    .inputs
424                    .iter()
425                    .map(|input| self.estimate_cardinality(input))
426                    .sum();
427                let id = format!("union_{depth}");
428                ctx.set_estimate(&id, estimate);
429
430                for input in &union.inputs {
431                    self.collect_cardinality_estimates(input, ctx, depth + 1);
432                }
433            }
434            _ => {
435                // For other operators, try to recurse into known input patterns
436            }
437        }
438    }
439
440    /// Estimates cardinality for a logical operator subtree.
441    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
442        match op {
443            LogicalOperator::NodeScan(scan) => {
444                if let Some(label) = &scan.label {
445                    self.store.nodes_by_label(label).len() as f64
446                } else {
447                    self.store.node_count() as f64
448                }
449            }
450            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
451            LogicalOperator::Expand(expand) => {
452                let stats = self.store.statistics();
453                let avg_degree = self.estimate_expand_degree(&stats, expand);
454                self.estimate_cardinality(&expand.input) * avg_degree
455            }
456            LogicalOperator::Join(join) => {
457                let left = self.estimate_cardinality(&join.left);
458                let right = self.estimate_cardinality(&join.right);
459                (left * right).sqrt()
460            }
461            LogicalOperator::Aggregate(agg) => {
462                if agg.group_by.is_empty() {
463                    1.0
464                } else {
465                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
466                }
467            }
468            LogicalOperator::Distinct(distinct) => {
469                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
470            }
471            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
472            LogicalOperator::Limit(limit) => self
473                .estimate_cardinality(&limit.input)
474                .min(limit.count.estimate()),
475            LogicalOperator::Skip(skip) => {
476                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
477            }
478            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
479            LogicalOperator::Union(union) => union
480                .inputs
481                .iter()
482                .map(|input| self.estimate_cardinality(input))
483                .sum(),
484            LogicalOperator::Except(except) => {
485                let left = self.estimate_cardinality(&except.left);
486                let right = self.estimate_cardinality(&except.right);
487                (left - right).max(0.0)
488            }
489            LogicalOperator::Intersect(intersect) => {
490                let left = self.estimate_cardinality(&intersect.left);
491                let right = self.estimate_cardinality(&intersect.right);
492                left.min(right)
493            }
494            LogicalOperator::Otherwise(otherwise) => self
495                .estimate_cardinality(&otherwise.left)
496                .max(self.estimate_cardinality(&otherwise.right)),
497            _ => 1000.0,
498        }
499    }
500
501    /// Estimates the average edge degree for an expand operation using store statistics.
502    fn estimate_expand_degree(
503        &self,
504        stats: &grafeo_core::statistics::Statistics,
505        expand: &ExpandOp,
506    ) -> f64 {
507        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
508        if expand.edge_types.len() == 1 {
509            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
510        } else if stats.total_nodes > 0 {
511            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
512        } else {
513            10.0
514        }
515    }
516
517    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
518    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
519    fn maybe_profile(
520        &self,
521        result: Result<(Box<dyn Operator>, Vec<String>)>,
522        op: &LogicalOperator,
523    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
524        if self.profiling.get() {
525            let (physical, columns) = result?;
526            let (entry, stats) =
527                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
528            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
529            self.profile_entries.borrow_mut().push(entry);
530            Ok((Box::new(profiled), columns))
531        } else {
532            result
533        }
534    }
535
536    /// Plans a single logical operator.
537    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
538        let result = match op {
539            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
540            LogicalOperator::Expand(expand) => {
541                if self.factorized_execution {
542                    let (chain_len, _base) = Self::count_expand_chain(op);
543                    if chain_len >= 2 {
544                        return self.maybe_profile(self.plan_expand_chain(op), op);
545                    }
546                }
547                self.plan_expand(expand)
548            }
549            LogicalOperator::Return(ret) => self.plan_return(ret),
550            LogicalOperator::Filter(filter) => self.plan_filter(filter),
551            LogicalOperator::Project(project) => self.plan_project(project),
552            LogicalOperator::Limit(limit) => self.plan_limit(limit),
553            LogicalOperator::Skip(skip) => self.plan_skip(skip),
554            LogicalOperator::Sort(sort) => self.plan_sort(sort),
555            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
556            LogicalOperator::Join(join) => self.plan_join(join),
557            LogicalOperator::Union(union) => self.plan_union(union),
558            LogicalOperator::Except(except) => self.plan_except(except),
559            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
560            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
561            LogicalOperator::Apply(apply) => self.plan_apply(apply),
562            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
563            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
564            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
565            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
566            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
567            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
568            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
569            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
570            LogicalOperator::Merge(merge) => self.plan_merge(merge),
571            LogicalOperator::MergeRelationship(merge_rel) => {
572                self.plan_merge_relationship(merge_rel)
573            }
574            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
575            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
576            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
577            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
578            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
579            #[cfg(feature = "algos")]
580            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
581            #[cfg(not(feature = "algos"))]
582            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
583                "CALL procedures require the 'algos' feature".to_string(),
584            )),
585            LogicalOperator::ParameterScan(_param_scan) => {
586                let state = self
587                    .correlated_param_state
588                    .borrow()
589                    .clone()
590                    .ok_or_else(|| {
591                        Error::Internal(
592                            "ParameterScan without correlated Apply context".to_string(),
593                        )
594                    })?;
595                // Use the actual column names from the ParameterState (which may
596                // have been expanded from "*" to real variable names in plan_apply)
597                let columns = state.columns.clone();
598                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
599                Ok((operator, columns))
600            }
601            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
602            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
603            LogicalOperator::LoadData(load) => {
604                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
605                    load.path.clone(),
606                    load.format,
607                    load.with_headers,
608                    load.field_terminator,
609                    load.variable.clone(),
610                ));
611                Ok((operator, vec![load.variable.clone()]))
612            }
613            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
614            LogicalOperator::VectorScan(_) => Err(Error::Internal(
615                "VectorScan requires vector-index feature".to_string(),
616            )),
617            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
618                "VectorJoin requires vector-index feature".to_string(),
619            )),
620            _ => Err(Error::Internal(format!(
621                "Unsupported operator: {:?}",
622                std::mem::discriminant(op)
623            ))),
624        };
625        self.maybe_profile(result, op)
626    }
627
628    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
629    fn plan_horizontal_aggregate(
630        &self,
631        ha: &HorizontalAggregateOp,
632    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
633        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
634
635        let list_col_idx = child_columns
636            .iter()
637            .position(|c| c == &ha.list_column)
638            .ok_or_else(|| {
639                Error::Internal(format!(
640                    "HorizontalAggregate list column '{}' not found in {:?}",
641                    ha.list_column, child_columns
642                ))
643            })?;
644
645        let entity_kind = match ha.entity_kind {
646            LogicalEntityKind::Edge => EntityKind::Edge,
647            LogicalEntityKind::Node => EntityKind::Node,
648        };
649
650        let function = convert_aggregate_function(ha.function);
651        let input_column_count = child_columns.len();
652
653        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
654            child_op,
655            list_col_idx,
656            entity_kind,
657            function,
658            ha.property.clone(),
659            Arc::clone(&self.store) as Arc<dyn GraphStore>,
660            input_column_count,
661        ));
662
663        let mut columns = child_columns;
664        columns.push(ha.alias.clone());
665        // Mark the result as a scalar column
666        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
667
668        Ok((operator, columns))
669    }
670
671    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
672    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
673        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
674        let key_idx = child_columns
675            .iter()
676            .position(|c| c == &mc.key_var)
677            .ok_or_else(|| {
678                Error::Internal(format!(
679                    "MapCollect key '{}' not in columns {:?}",
680                    mc.key_var, child_columns
681                ))
682            })?;
683        let value_idx = child_columns
684            .iter()
685            .position(|c| c == &mc.value_var)
686            .ok_or_else(|| {
687                Error::Internal(format!(
688                    "MapCollect value '{}' not in columns {:?}",
689                    mc.value_var, child_columns
690                ))
691            })?;
692        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
693        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
694        Ok((operator, vec![mc.alias.clone()]))
695    }
696}
697
/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
///
/// Rows are held in memory and handed out in batches by the `Operator::next`
/// implementation; `reset` rewinds to the first row.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The buffered rows to emit, one `Vec<Value>` per row.
    rows: Vec<Vec<Value>>,
    /// For each output column, the index into a source row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row in `rows` that has not been emitted yet.
    row_index: usize,
}
705
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of buffered rows (at most 1024 per call).
    ///
    /// Returns `Ok(None)` once every row has been handed out. Source rows that
    /// are too short for a requested column index contribute `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        // Cap the batch at 1024 rows.
        let batch_len = (total - start).min(1024);
        let end = start + batch_len;

        // Every output column is typed as `Any`.
        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in &self.rows[start..end] {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds the operator so the next call to `next` starts from the first row.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Returns the fixed display name of this operator.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
745
746#[cfg(test)]
747mod tests {
748    use super::*;
749    use crate::query::plan::{
750        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
751        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
752        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
753        SkipOp as LogicalSkipOp, SortKey, SortOp,
754    };
755    use grafeo_common::types::Value;
756    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
757    use grafeo_core::graph::GraphStoreMut;
758    use grafeo_core::graph::lpg::LpgStore;
759
760    fn create_test_store() -> Arc<dyn GraphStoreMut> {
761        let store = Arc::new(LpgStore::new().unwrap());
762        store.create_node(&["Person"]);
763        store.create_node(&["Person"]);
764        store.create_node(&["Company"]);
765        store
766    }
767
768    // ==================== Simple Scan Tests ====================
769
#[test]
fn test_plan_simple_scan() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n:Person) RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let return_op = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(return_op));

    // The labeled scan surfaces its variable as the only output column.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
792
#[test]
fn test_plan_scan_without_label() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n) RETURN n
    let unlabeled_scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let return_op = ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(unlabeled_scan),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(return_op));

    // A scan without a label filter still plans to a single `n` column.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
815
#[test]
fn test_plan_return_with_alias() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n:Person) RETURN n AS person
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let aliased_item = ReturnItem {
        expression: LogicalExpression::Variable(String::from("n")),
        alias: Some(String::from("person")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![aliased_item],
        distinct: false,
        input: Box::new(scan),
    }));

    // The alias, not the variable name, becomes the output column.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["person"]);
}
838
#[test]
fn test_plan_return_property() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n:Person) RETURN n.name
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let name_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("name"),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: name_of_n,
            alias: None,
        }],
        distinct: false,
        input: Box::new(scan),
    }));

    // An unaliased property access is named `<variable>.<property>`.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n.name"]);
}
864
#[test]
fn test_plan_return_literal() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n) RETURN 42 AS answer
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let literal_item = ReturnItem {
        expression: LogicalExpression::Literal(Value::Int64(42)),
        alias: Some(String::from("answer")),
    };
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![literal_item],
        distinct: false,
        input: Box::new(scan),
    }));

    // The literal is exposed under its alias.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["answer"]);
}
887
888    // ==================== Filter Tests ====================
889
#[test]
fn test_plan_filter_equality() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n:Person) WHERE n.age = 30 RETURN n
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let age_is_30 = LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op: BinaryOp::Eq,
        right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
    };
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: age_is_30,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    // Filtering does not change the projected columns.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
923
#[test]
fn test_plan_filter_compound_and() {
    let planner = Planner::new(create_test_store());

    // Builds `n.age <op> <bound>`.
    let age_compare = |op: BinaryOp, bound: i64| LogicalExpression::Binary {
        left: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        op,
        right: Box::new(LogicalExpression::Literal(Value::Int64(bound))),
    };

    // WHERE n.age > 20 AND n.age < 40
    let in_range = LogicalExpression::Binary {
        left: Box::new(age_compare(BinaryOp::Gt, 20)),
        op: BinaryOp::And,
        right: Box::new(age_compare(BinaryOp::Lt, 40)),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: in_range,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
968
#[test]
fn test_plan_filter_unary_not() {
    let planner = Planner::new(create_test_store());

    // WHERE NOT n.active
    let not_active = LogicalExpression::Unary {
        op: UnaryOp::Not,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("active"),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: not_active,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1001
#[test]
fn test_plan_filter_is_null() {
    let planner = Planner::new(create_test_store());

    // WHERE n.email IS NULL
    let email_is_null = LogicalExpression::Unary {
        op: UnaryOp::IsNull,
        operand: Box::new(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("email"),
        }),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: email_is_null,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1034
#[test]
fn test_plan_filter_function_call() {
    let planner = Planner::new(create_test_store());

    // WHERE size(n.friends) > 0
    let friend_count = LogicalExpression::FunctionCall {
        name: String::from("size"),
        args: vec![LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("friends"),
        }],
        distinct: false,
    };
    let has_friends = LogicalExpression::Binary {
        left: Box::new(friend_count),
        op: BinaryOp::Gt,
        right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
    };
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let filter = LogicalOperator::Filter(FilterOp {
        predicate: has_friends,
        input: Box::new(scan),
        pushdown_hint: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(filter),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1072
1073    // ==================== Expand Tests ====================
1074
#[test]
fn test_plan_expand_outgoing() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: Some(String::from("Person")),
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: None,
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Both endpoints of the expansion must be among the returned columns.
    for expected in ["a", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1116
#[test]
fn test_plan_expand_with_edge_variable() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("a"),
        label: None,
        input: None,
    });
    let expand = LogicalOperator::Expand(ExpandOp {
        from_variable: String::from("a"),
        to_variable: String::from("b"),
        edge_variable: Some(String::from("r")),
        direction: ExpandDirection::Outgoing,
        edge_types: vec![String::from("KNOWS")],
        min_hops: 1,
        max_hops: Some(1),
        input: Box::new(scan),
        path_alias: None,
        path_mode: PathMode::Walk,
    });
    let items: Vec<ReturnItem> = ["a", "r", "b"]
        .iter()
        .map(|name| ReturnItem {
            expression: LogicalExpression::Variable(name.to_string()),
            alias: None,
        })
        .collect();
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items,
        distinct: false,
        input: Box::new(expand),
    }));

    let physical = planner.plan(&logical).unwrap();
    // Source node, edge and target node must all be among the returned columns.
    for expected in ["a", "r", "b"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1162
1163    // ==================== Limit/Skip/Sort Tests ====================
1164
#[test]
fn test_plan_limit() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n) RETURN n LIMIT 10
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let limit = LogicalOperator::Limit(LogicalLimitOp {
        count: 10.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(limit),
    }));

    // Limiting rows leaves the column list untouched.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1190
#[test]
fn test_plan_skip() {
    let planner = Planner::new(create_test_store());

    // Logical plan for: MATCH (n) RETURN n SKIP 5
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let skip = LogicalOperator::Skip(LogicalSkipOp {
        count: 5.into(),
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(skip),
    }));

    // Skipping rows leaves the column list untouched.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1216
#[test]
fn test_plan_sort() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN n ORDER BY n.name ASC
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let ascending_key = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Ascending,
        nulls: None,
    };
    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![ascending_key],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1246
#[test]
fn test_plan_sort_descending() {
    let planner = Planner::new(create_test_store());

    // ORDER BY n DESC
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let descending_key = SortKey {
        expression: LogicalExpression::Variable(String::from("n")),
        order: SortOrder::Descending,
        nulls: None,
    };
    let sort = LogicalOperator::Sort(SortOp {
        keys: vec![descending_key],
        input: Box::new(scan),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(sort),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1276
#[test]
fn test_plan_distinct() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN DISTINCT n
    // The deduplication is expressed as a Distinct operator below the Return.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let distinct = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1302
#[test]
fn test_plan_distinct_with_columns() {
    let planner = Planner::new(create_test_store());

    // DISTINCT on specific columns (column-specific dedup)
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let distinct_on_n = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("n")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct_on_n),
    }));

    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1328
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
    let planner = Planner::new(create_test_store());

    // When distinct columns don't match any output columns,
    // it falls back to full-row distinct.
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let distinct_on_missing = LogicalOperator::Distinct(LogicalDistinctOp {
        input: Box::new(scan),
        columns: Some(vec![String::from("nonexistent")]),
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("n")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(distinct_on_missing),
    }));

    // Planning must still succeed and expose the scan column.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns(), &["n"]);
}
1355
1356    // ==================== Aggregate Tests ====================
1357
#[test]
fn test_plan_aggregate_count() {
    let planner = Planner::new(create_test_store());

    // MATCH (n) RETURN count(n)
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let aggregate = LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
        items: vec![ReturnItem {
            expression: LogicalExpression::Variable(String::from("cnt")),
            alias: None,
        }],
        distinct: false,
        input: Box::new(aggregate),
    }));

    // The aggregate alias must be visible in the final projection.
    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("cnt")));
}
1393
#[test]
fn test_plan_aggregate_with_group_by() {
    let planner = Planner::new(create_test_store());

    // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: Some(String::from("Person")),
        input: None,
    });
    let city_of_n = LogicalExpression::Property {
        variable: String::from("n"),
        property: String::from("city"),
    };
    let count_n = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Count,
        expression: Some(LogicalExpression::Variable(String::from("n"))),
        expression2: None,
        distinct: false,
        alias: Some(String::from("cnt")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![city_of_n],
        aggregates: vec![count_n],
        input: Box::new(scan),
        having: None,
    }));

    // One column for the grouping key plus one for the aggregate.
    let physical = planner.plan(&logical).unwrap();
    assert_eq!(physical.columns().len(), 2);
}
1425
#[test]
fn test_plan_aggregate_sum() {
    let planner = Planner::new(create_test_store());

    // SUM(n.value)
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let sum_of_value = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Sum,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("value"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("total")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![sum_of_value],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("total")));
}
1457
#[test]
fn test_plan_aggregate_avg() {
    let planner = Planner::new(create_test_store());

    // AVG(n.score)
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let avg_of_score = LogicalAggregateExpr {
        function: LogicalAggregateFunction::Avg,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("score"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(String::from("average")),
        percentile: None,
        separator: None,
    };
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![avg_of_score],
        input: Box::new(scan),
        having: None,
    }));

    let physical = planner.plan(&logical).unwrap();
    assert!(physical.columns().contains(&String::from("average")));
}
1489
#[test]
fn test_plan_aggregate_min_max() {
    let planner = Planner::new(create_test_store());

    // Builds `<function>(n.age) AS <alias>`.
    let age_aggregate = |function: LogicalAggregateFunction, alias: &str| LogicalAggregateExpr {
        function,
        expression: Some(LogicalExpression::Property {
            variable: String::from("n"),
            property: String::from("age"),
        }),
        expression2: None,
        distinct: false,
        alias: Some(alias.to_string()),
        percentile: None,
        separator: None,
    };

    // MIN(n.age), MAX(n.age)
    let scan = LogicalOperator::NodeScan(NodeScanOp {
        variable: String::from("n"),
        label: None,
        input: None,
    });
    let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
        group_by: vec![],
        aggregates: vec![
            age_aggregate(LogicalAggregateFunction::Min, "youngest"),
            age_aggregate(LogicalAggregateFunction::Max, "oldest"),
        ],
        input: Box::new(scan),
        having: None,
    }));

    // Both aggregate aliases must appear in the planned output.
    let physical = planner.plan(&logical).unwrap();
    for expected in ["youngest", "oldest"] {
        assert!(physical.columns().contains(&expected.to_string()));
    }
}
1536
1537    // ==================== Join Tests ====================
1538
1539    #[test]
1540    fn test_plan_inner_join() {
1541        let store = create_test_store();
1542        let planner = Planner::new(store);
1543
1544        // Inner join between two scans
1545        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1546            items: vec![
1547                ReturnItem {
1548                    expression: LogicalExpression::Variable("a".to_string()),
1549                    alias: None,
1550                },
1551                ReturnItem {
1552                    expression: LogicalExpression::Variable("b".to_string()),
1553                    alias: None,
1554                },
1555            ],
1556            distinct: false,
1557            input: Box::new(LogicalOperator::Join(JoinOp {
1558                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1559                    variable: "a".to_string(),
1560                    label: Some("Person".to_string()),
1561                    input: None,
1562                })),
1563                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1564                    variable: "b".to_string(),
1565                    label: Some("Company".to_string()),
1566                    input: None,
1567                })),
1568                join_type: JoinType::Inner,
1569                conditions: vec![JoinCondition {
1570                    left: LogicalExpression::Variable("a".to_string()),
1571                    right: LogicalExpression::Variable("b".to_string()),
1572                }],
1573            })),
1574        }));
1575
1576        let physical = planner.plan(&logical).unwrap();
1577        assert!(physical.columns().contains(&"a".to_string()));
1578        assert!(physical.columns().contains(&"b".to_string()));
1579    }
1580
1581    #[test]
1582    fn test_plan_cross_join() {
1583        let store = create_test_store();
1584        let planner = Planner::new(store);
1585
1586        // Cross join (no conditions)
1587        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1588            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1589                variable: "a".to_string(),
1590                label: None,
1591                input: None,
1592            })),
1593            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1594                variable: "b".to_string(),
1595                label: None,
1596                input: None,
1597            })),
1598            join_type: JoinType::Cross,
1599            conditions: vec![],
1600        }));
1601
1602        let physical = planner.plan(&logical).unwrap();
1603        assert_eq!(physical.columns().len(), 2);
1604    }
1605
1606    #[test]
1607    fn test_plan_left_join() {
1608        let store = create_test_store();
1609        let planner = Planner::new(store);
1610
1611        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1612            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1613                variable: "a".to_string(),
1614                label: None,
1615                input: None,
1616            })),
1617            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1618                variable: "b".to_string(),
1619                label: None,
1620                input: None,
1621            })),
1622            join_type: JoinType::Left,
1623            conditions: vec![],
1624        }));
1625
1626        let physical = planner.plan(&logical).unwrap();
1627        assert_eq!(physical.columns().len(), 2);
1628    }
1629
1630    // ==================== Mutation Tests ====================
1631
1632    #[test]
1633    fn test_plan_create_node() {
1634        let store = create_test_store();
1635        let planner = Planner::new(store);
1636
1637        // CREATE (n:Person {name: 'Alix'})
1638        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1639            variable: "n".to_string(),
1640            labels: vec!["Person".to_string()],
1641            properties: vec![(
1642                "name".to_string(),
1643                LogicalExpression::Literal(Value::String("Alix".into())),
1644            )],
1645            input: None,
1646        }));
1647
1648        let physical = planner.plan(&logical).unwrap();
1649        assert!(physical.columns().contains(&"n".to_string()));
1650    }
1651
1652    #[test]
1653    fn test_plan_create_edge() {
1654        let store = create_test_store();
1655        let planner = Planner::new(store);
1656
1657        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1658        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1659            variable: Some("r".to_string()),
1660            from_variable: "a".to_string(),
1661            to_variable: "b".to_string(),
1662            edge_type: "KNOWS".to_string(),
1663            properties: vec![],
1664            input: Box::new(LogicalOperator::Join(JoinOp {
1665                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1666                    variable: "a".to_string(),
1667                    label: None,
1668                    input: None,
1669                })),
1670                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1671                    variable: "b".to_string(),
1672                    label: None,
1673                    input: None,
1674                })),
1675                join_type: JoinType::Cross,
1676                conditions: vec![],
1677            })),
1678        }));
1679
1680        let physical = planner.plan(&logical).unwrap();
1681        assert!(physical.columns().contains(&"r".to_string()));
1682    }
1683
1684    #[test]
1685    fn test_plan_delete_node() {
1686        let store = create_test_store();
1687        let planner = Planner::new(store);
1688
1689        // MATCH (n) DELETE n
1690        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1691            variable: "n".to_string(),
1692            detach: false,
1693            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1694                variable: "n".to_string(),
1695                label: None,
1696                input: None,
1697            })),
1698        }));
1699
1700        let physical = planner.plan(&logical).unwrap();
1701        assert!(physical.columns().contains(&"n".to_string()));
1702    }
1703
1704    // ==================== Error Cases ====================
1705
1706    #[test]
1707    fn test_plan_empty_errors() {
1708        let store = create_test_store();
1709        let planner = Planner::new(store);
1710
1711        let logical = LogicalPlan::new(LogicalOperator::Empty);
1712        let result = planner.plan(&logical);
1713        assert!(result.is_err());
1714    }
1715
1716    #[test]
1717    fn test_plan_missing_variable_in_return() {
1718        let store = create_test_store();
1719        let planner = Planner::new(store);
1720
1721        // Return variable that doesn't exist in input
1722        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1723            items: vec![ReturnItem {
1724                expression: LogicalExpression::Variable("missing".to_string()),
1725                alias: None,
1726            }],
1727            distinct: false,
1728            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1729                variable: "n".to_string(),
1730                label: None,
1731                input: None,
1732            })),
1733        }));
1734
1735        let result = planner.plan(&logical);
1736        assert!(result.is_err());
1737    }
1738
1739    // ==================== Helper Function Tests ====================
1740
1741    #[test]
1742    fn test_convert_binary_ops() {
1743        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1744        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1745        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1746        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1747        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1748        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1749        assert!(convert_binary_op(BinaryOp::And).is_ok());
1750        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1751        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1752        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1753        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1754        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1755    }
1756
1757    #[test]
1758    fn test_convert_unary_ops() {
1759        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1760        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1761        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1762        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1763    }
1764
1765    #[test]
1766    fn test_convert_aggregate_functions() {
1767        assert!(matches!(
1768            convert_aggregate_function(LogicalAggregateFunction::Count),
1769            PhysicalAggregateFunction::Count
1770        ));
1771        assert!(matches!(
1772            convert_aggregate_function(LogicalAggregateFunction::Sum),
1773            PhysicalAggregateFunction::Sum
1774        ));
1775        assert!(matches!(
1776            convert_aggregate_function(LogicalAggregateFunction::Avg),
1777            PhysicalAggregateFunction::Avg
1778        ));
1779        assert!(matches!(
1780            convert_aggregate_function(LogicalAggregateFunction::Min),
1781            PhysicalAggregateFunction::Min
1782        ));
1783        assert!(matches!(
1784            convert_aggregate_function(LogicalAggregateFunction::Max),
1785            PhysicalAggregateFunction::Max
1786        ));
1787    }
1788
1789    #[test]
1790    fn test_planner_accessors() {
1791        let store = create_test_store();
1792        let planner = Planner::new(Arc::clone(&store));
1793
1794        assert!(planner.transaction_id().is_none());
1795        assert!(planner.transaction_manager().is_none());
1796        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1797    }
1798
1799    #[test]
1800    fn test_physical_plan_accessors() {
1801        let store = create_test_store();
1802        let planner = Planner::new(store);
1803
1804        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1805            variable: "n".to_string(),
1806            label: None,
1807            input: None,
1808        }));
1809
1810        let physical = planner.plan(&logical).unwrap();
1811        assert_eq!(physical.columns(), &["n"]);
1812
1813        // Test into_operator
1814        let _ = physical.into_operator();
1815    }
1816
1817    // ==================== Adaptive Planning Tests ====================
1818
1819    #[test]
1820    fn test_plan_adaptive_with_scan() {
1821        let store = create_test_store();
1822        let planner = Planner::new(store);
1823
1824        // MATCH (n:Person) RETURN n
1825        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1826            items: vec![ReturnItem {
1827                expression: LogicalExpression::Variable("n".to_string()),
1828                alias: None,
1829            }],
1830            distinct: false,
1831            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1832                variable: "n".to_string(),
1833                label: Some("Person".to_string()),
1834                input: None,
1835            })),
1836        }));
1837
1838        let physical = planner.plan_adaptive(&logical).unwrap();
1839        assert_eq!(physical.columns(), &["n"]);
1840        // Should have adaptive context with estimates
1841        assert!(physical.adaptive_context.is_some());
1842    }
1843
1844    #[test]
1845    fn test_plan_adaptive_with_filter() {
1846        let store = create_test_store();
1847        let planner = Planner::new(store);
1848
1849        // MATCH (n) WHERE n.age > 30 RETURN n
1850        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1851            items: vec![ReturnItem {
1852                expression: LogicalExpression::Variable("n".to_string()),
1853                alias: None,
1854            }],
1855            distinct: false,
1856            input: Box::new(LogicalOperator::Filter(FilterOp {
1857                predicate: LogicalExpression::Binary {
1858                    left: Box::new(LogicalExpression::Property {
1859                        variable: "n".to_string(),
1860                        property: "age".to_string(),
1861                    }),
1862                    op: BinaryOp::Gt,
1863                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1864                },
1865                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1866                    variable: "n".to_string(),
1867                    label: None,
1868                    input: None,
1869                })),
1870                pushdown_hint: None,
1871            })),
1872        }));
1873
1874        let physical = planner.plan_adaptive(&logical).unwrap();
1875        assert!(physical.adaptive_context.is_some());
1876    }
1877
1878    #[test]
1879    fn test_plan_adaptive_with_expand() {
1880        let store = create_test_store();
1881        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1882
1883        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1884        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1885            items: vec![
1886                ReturnItem {
1887                    expression: LogicalExpression::Variable("a".to_string()),
1888                    alias: None,
1889                },
1890                ReturnItem {
1891                    expression: LogicalExpression::Variable("b".to_string()),
1892                    alias: None,
1893                },
1894            ],
1895            distinct: false,
1896            input: Box::new(LogicalOperator::Expand(ExpandOp {
1897                from_variable: "a".to_string(),
1898                to_variable: "b".to_string(),
1899                edge_variable: None,
1900                direction: ExpandDirection::Outgoing,
1901                edge_types: vec!["KNOWS".to_string()],
1902                min_hops: 1,
1903                max_hops: Some(1),
1904                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1905                    variable: "a".to_string(),
1906                    label: None,
1907                    input: None,
1908                })),
1909                path_alias: None,
1910                path_mode: PathMode::Walk,
1911            })),
1912        }));
1913
1914        let physical = planner.plan_adaptive(&logical).unwrap();
1915        assert!(physical.adaptive_context.is_some());
1916    }
1917
1918    #[test]
1919    fn test_plan_adaptive_with_join() {
1920        let store = create_test_store();
1921        let planner = Planner::new(store);
1922
1923        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1924            items: vec![
1925                ReturnItem {
1926                    expression: LogicalExpression::Variable("a".to_string()),
1927                    alias: None,
1928                },
1929                ReturnItem {
1930                    expression: LogicalExpression::Variable("b".to_string()),
1931                    alias: None,
1932                },
1933            ],
1934            distinct: false,
1935            input: Box::new(LogicalOperator::Join(JoinOp {
1936                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1937                    variable: "a".to_string(),
1938                    label: None,
1939                    input: None,
1940                })),
1941                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1942                    variable: "b".to_string(),
1943                    label: None,
1944                    input: None,
1945                })),
1946                join_type: JoinType::Cross,
1947                conditions: vec![],
1948            })),
1949        }));
1950
1951        let physical = planner.plan_adaptive(&logical).unwrap();
1952        assert!(physical.adaptive_context.is_some());
1953    }
1954
1955    #[test]
1956    fn test_plan_adaptive_with_aggregate() {
1957        let store = create_test_store();
1958        let planner = Planner::new(store);
1959
1960        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1961            group_by: vec![],
1962            aggregates: vec![LogicalAggregateExpr {
1963                function: LogicalAggregateFunction::Count,
1964                expression: Some(LogicalExpression::Variable("n".to_string())),
1965                expression2: None,
1966                distinct: false,
1967                alias: Some("cnt".to_string()),
1968                percentile: None,
1969                separator: None,
1970            }],
1971            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1972                variable: "n".to_string(),
1973                label: None,
1974                input: None,
1975            })),
1976            having: None,
1977        }));
1978
1979        let physical = planner.plan_adaptive(&logical).unwrap();
1980        assert!(physical.adaptive_context.is_some());
1981    }
1982
1983    #[test]
1984    fn test_plan_adaptive_with_distinct() {
1985        let store = create_test_store();
1986        let planner = Planner::new(store);
1987
1988        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1989            items: vec![ReturnItem {
1990                expression: LogicalExpression::Variable("n".to_string()),
1991                alias: None,
1992            }],
1993            distinct: false,
1994            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1995                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1996                    variable: "n".to_string(),
1997                    label: None,
1998                    input: None,
1999                })),
2000                columns: None,
2001            })),
2002        }));
2003
2004        let physical = planner.plan_adaptive(&logical).unwrap();
2005        assert!(physical.adaptive_context.is_some());
2006    }
2007
2008    #[test]
2009    fn test_plan_adaptive_with_limit() {
2010        let store = create_test_store();
2011        let planner = Planner::new(store);
2012
2013        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2014            items: vec![ReturnItem {
2015                expression: LogicalExpression::Variable("n".to_string()),
2016                alias: None,
2017            }],
2018            distinct: false,
2019            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2020                count: 10.into(),
2021                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2022                    variable: "n".to_string(),
2023                    label: None,
2024                    input: None,
2025                })),
2026            })),
2027        }));
2028
2029        let physical = planner.plan_adaptive(&logical).unwrap();
2030        assert!(physical.adaptive_context.is_some());
2031    }
2032
2033    #[test]
2034    fn test_plan_adaptive_with_skip() {
2035        let store = create_test_store();
2036        let planner = Planner::new(store);
2037
2038        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2039            items: vec![ReturnItem {
2040                expression: LogicalExpression::Variable("n".to_string()),
2041                alias: None,
2042            }],
2043            distinct: false,
2044            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2045                count: 5.into(),
2046                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2047                    variable: "n".to_string(),
2048                    label: None,
2049                    input: None,
2050                })),
2051            })),
2052        }));
2053
2054        let physical = planner.plan_adaptive(&logical).unwrap();
2055        assert!(physical.adaptive_context.is_some());
2056    }
2057
2058    #[test]
2059    fn test_plan_adaptive_with_sort() {
2060        let store = create_test_store();
2061        let planner = Planner::new(store);
2062
2063        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2064            items: vec![ReturnItem {
2065                expression: LogicalExpression::Variable("n".to_string()),
2066                alias: None,
2067            }],
2068            distinct: false,
2069            input: Box::new(LogicalOperator::Sort(SortOp {
2070                keys: vec![SortKey {
2071                    expression: LogicalExpression::Variable("n".to_string()),
2072                    order: SortOrder::Ascending,
2073                    nulls: None,
2074                }],
2075                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2076                    variable: "n".to_string(),
2077                    label: None,
2078                    input: None,
2079                })),
2080            })),
2081        }));
2082
2083        let physical = planner.plan_adaptive(&logical).unwrap();
2084        assert!(physical.adaptive_context.is_some());
2085    }
2086
2087    #[test]
2088    fn test_plan_adaptive_with_union() {
2089        let store = create_test_store();
2090        let planner = Planner::new(store);
2091
2092        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2093            items: vec![ReturnItem {
2094                expression: LogicalExpression::Variable("n".to_string()),
2095                alias: None,
2096            }],
2097            distinct: false,
2098            input: Box::new(LogicalOperator::Union(UnionOp {
2099                inputs: vec![
2100                    LogicalOperator::NodeScan(NodeScanOp {
2101                        variable: "n".to_string(),
2102                        label: Some("Person".to_string()),
2103                        input: None,
2104                    }),
2105                    LogicalOperator::NodeScan(NodeScanOp {
2106                        variable: "n".to_string(),
2107                        label: Some("Company".to_string()),
2108                        input: None,
2109                    }),
2110                ],
2111            })),
2112        }));
2113
2114        let physical = planner.plan_adaptive(&logical).unwrap();
2115        assert!(physical.adaptive_context.is_some());
2116    }
2117
2118    // ==================== Variable Length Path Tests ====================
2119
2120    #[test]
2121    fn test_plan_expand_variable_length() {
2122        let store = create_test_store();
2123        let planner = Planner::new(store);
2124
2125        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2126        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2127            items: vec![
2128                ReturnItem {
2129                    expression: LogicalExpression::Variable("a".to_string()),
2130                    alias: None,
2131                },
2132                ReturnItem {
2133                    expression: LogicalExpression::Variable("b".to_string()),
2134                    alias: None,
2135                },
2136            ],
2137            distinct: false,
2138            input: Box::new(LogicalOperator::Expand(ExpandOp {
2139                from_variable: "a".to_string(),
2140                to_variable: "b".to_string(),
2141                edge_variable: None,
2142                direction: ExpandDirection::Outgoing,
2143                edge_types: vec!["KNOWS".to_string()],
2144                min_hops: 1,
2145                max_hops: Some(3),
2146                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2147                    variable: "a".to_string(),
2148                    label: None,
2149                    input: None,
2150                })),
2151                path_alias: None,
2152                path_mode: PathMode::Walk,
2153            })),
2154        }));
2155
2156        let physical = planner.plan(&logical).unwrap();
2157        assert!(physical.columns().contains(&"a".to_string()));
2158        assert!(physical.columns().contains(&"b".to_string()));
2159    }
2160
2161    #[test]
2162    fn test_plan_expand_with_path_alias() {
2163        let store = create_test_store();
2164        let planner = Planner::new(store);
2165
2166        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2167        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2168            items: vec![
2169                ReturnItem {
2170                    expression: LogicalExpression::Variable("a".to_string()),
2171                    alias: None,
2172                },
2173                ReturnItem {
2174                    expression: LogicalExpression::Variable("b".to_string()),
2175                    alias: None,
2176                },
2177            ],
2178            distinct: false,
2179            input: Box::new(LogicalOperator::Expand(ExpandOp {
2180                from_variable: "a".to_string(),
2181                to_variable: "b".to_string(),
2182                edge_variable: None,
2183                direction: ExpandDirection::Outgoing,
2184                edge_types: vec!["KNOWS".to_string()],
2185                min_hops: 1,
2186                max_hops: Some(3),
2187                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2188                    variable: "a".to_string(),
2189                    label: None,
2190                    input: None,
2191                })),
2192                path_alias: Some("p".to_string()),
2193                path_mode: PathMode::Walk,
2194            })),
2195        }));
2196
2197        let physical = planner.plan(&logical).unwrap();
2198        // Verify plan was created successfully with expected output columns
2199        assert!(physical.columns().contains(&"a".to_string()));
2200        assert!(physical.columns().contains(&"b".to_string()));
2201    }
2202
2203    #[test]
2204    fn test_plan_expand_incoming() {
2205        let store = create_test_store();
2206        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2207
2208        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2209        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2210            items: vec![
2211                ReturnItem {
2212                    expression: LogicalExpression::Variable("a".to_string()),
2213                    alias: None,
2214                },
2215                ReturnItem {
2216                    expression: LogicalExpression::Variable("b".to_string()),
2217                    alias: None,
2218                },
2219            ],
2220            distinct: false,
2221            input: Box::new(LogicalOperator::Expand(ExpandOp {
2222                from_variable: "a".to_string(),
2223                to_variable: "b".to_string(),
2224                edge_variable: None,
2225                direction: ExpandDirection::Incoming,
2226                edge_types: vec!["KNOWS".to_string()],
2227                min_hops: 1,
2228                max_hops: Some(1),
2229                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2230                    variable: "a".to_string(),
2231                    label: None,
2232                    input: None,
2233                })),
2234                path_alias: None,
2235                path_mode: PathMode::Walk,
2236            })),
2237        }));
2238
2239        let physical = planner.plan(&logical).unwrap();
2240        assert!(physical.columns().contains(&"a".to_string()));
2241        assert!(physical.columns().contains(&"b".to_string()));
2242    }
2243
2244    #[test]
2245    fn test_plan_expand_both_directions() {
2246        let store = create_test_store();
2247        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2248
2249        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2250        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2251            items: vec![
2252                ReturnItem {
2253                    expression: LogicalExpression::Variable("a".to_string()),
2254                    alias: None,
2255                },
2256                ReturnItem {
2257                    expression: LogicalExpression::Variable("b".to_string()),
2258                    alias: None,
2259                },
2260            ],
2261            distinct: false,
2262            input: Box::new(LogicalOperator::Expand(ExpandOp {
2263                from_variable: "a".to_string(),
2264                to_variable: "b".to_string(),
2265                edge_variable: None,
2266                direction: ExpandDirection::Both,
2267                edge_types: vec!["KNOWS".to_string()],
2268                min_hops: 1,
2269                max_hops: Some(1),
2270                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2271                    variable: "a".to_string(),
2272                    label: None,
2273                    input: None,
2274                })),
2275                path_alias: None,
2276                path_mode: PathMode::Walk,
2277            })),
2278        }));
2279
2280        let physical = planner.plan(&logical).unwrap();
2281        assert!(physical.columns().contains(&"a".to_string()));
2282        assert!(physical.columns().contains(&"b".to_string()));
2283    }
2284
2285    // ==================== With Context Tests ====================
2286
2287    #[test]
2288    fn test_planner_with_context() {
2289        use crate::transaction::TransactionManager;
2290
2291        let store = create_test_store();
2292        let transaction_manager = Arc::new(TransactionManager::new());
2293        let transaction_id = transaction_manager.begin();
2294        let epoch = transaction_manager.current_epoch();
2295
2296        let planner = Planner::with_context(
2297            Arc::clone(&store),
2298            Arc::clone(&transaction_manager),
2299            Some(transaction_id),
2300            epoch,
2301        );
2302
2303        assert_eq!(planner.transaction_id(), Some(transaction_id));
2304        assert!(planner.transaction_manager().is_some());
2305        assert_eq!(planner.viewing_epoch(), epoch);
2306    }
2307
2308    #[test]
2309    fn test_planner_with_factorized_execution_disabled() {
2310        let store = create_test_store();
2311        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2312
2313        // Two consecutive expands - should NOT use factorized execution
2314        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2315            items: vec![
2316                ReturnItem {
2317                    expression: LogicalExpression::Variable("a".to_string()),
2318                    alias: None,
2319                },
2320                ReturnItem {
2321                    expression: LogicalExpression::Variable("c".to_string()),
2322                    alias: None,
2323                },
2324            ],
2325            distinct: false,
2326            input: Box::new(LogicalOperator::Expand(ExpandOp {
2327                from_variable: "b".to_string(),
2328                to_variable: "c".to_string(),
2329                edge_variable: None,
2330                direction: ExpandDirection::Outgoing,
2331                edge_types: vec![],
2332                min_hops: 1,
2333                max_hops: Some(1),
2334                input: Box::new(LogicalOperator::Expand(ExpandOp {
2335                    from_variable: "a".to_string(),
2336                    to_variable: "b".to_string(),
2337                    edge_variable: None,
2338                    direction: ExpandDirection::Outgoing,
2339                    edge_types: vec![],
2340                    min_hops: 1,
2341                    max_hops: Some(1),
2342                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2343                        variable: "a".to_string(),
2344                        label: None,
2345                        input: None,
2346                    })),
2347                    path_alias: None,
2348                    path_mode: PathMode::Walk,
2349                })),
2350                path_alias: None,
2351                path_mode: PathMode::Walk,
2352            })),
2353        }));
2354
2355        let physical = planner.plan(&logical).unwrap();
2356        assert!(physical.columns().contains(&"a".to_string()));
2357        assert!(physical.columns().contains(&"c".to_string()));
2358    }
2359
2360    // ==================== Sort with Property Tests ====================
2361
2362    #[test]
2363    fn test_plan_sort_by_property() {
2364        let store = create_test_store();
2365        let planner = Planner::new(store);
2366
2367        // MATCH (n) RETURN n ORDER BY n.name ASC
2368        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2369            items: vec![ReturnItem {
2370                expression: LogicalExpression::Variable("n".to_string()),
2371                alias: None,
2372            }],
2373            distinct: false,
2374            input: Box::new(LogicalOperator::Sort(SortOp {
2375                keys: vec![SortKey {
2376                    expression: LogicalExpression::Property {
2377                        variable: "n".to_string(),
2378                        property: "name".to_string(),
2379                    },
2380                    order: SortOrder::Ascending,
2381                    nulls: None,
2382                }],
2383                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2384                    variable: "n".to_string(),
2385                    label: None,
2386                    input: None,
2387                })),
2388            })),
2389        }));
2390
2391        let physical = planner.plan(&logical).unwrap();
2392        // Should have the property column projected
2393        assert!(physical.columns().contains(&"n".to_string()));
2394    }
2395
2396    // ==================== Scan with Input Tests ====================
2397
2398    #[test]
2399    fn test_plan_scan_with_input() {
2400        let store = create_test_store();
2401        let planner = Planner::new(store);
2402
2403        // A scan with another scan as input (for chained patterns)
2404        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2405            items: vec![
2406                ReturnItem {
2407                    expression: LogicalExpression::Variable("a".to_string()),
2408                    alias: None,
2409                },
2410                ReturnItem {
2411                    expression: LogicalExpression::Variable("b".to_string()),
2412                    alias: None,
2413                },
2414            ],
2415            distinct: false,
2416            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2417                variable: "b".to_string(),
2418                label: Some("Company".to_string()),
2419                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2420                    variable: "a".to_string(),
2421                    label: Some("Person".to_string()),
2422                    input: None,
2423                }))),
2424            })),
2425        }));
2426
2427        let physical = planner.plan(&logical).unwrap();
2428        assert!(physical.columns().contains(&"a".to_string()));
2429        assert!(physical.columns().contains(&"b".to_string()));
2430    }
2431}