Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

1//! LPG (Labeled Property Graph) planner.
2//!
3//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
4//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::types::{EpochId, TransactionId};
27use grafeo_common::types::{LogicalType, Value};
28use grafeo_common::utils::error::{Error, Result};
29use grafeo_core::execution::AdaptiveContext;
30use grafeo_core::execution::operators::{
31    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
32    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator, EmptyOperator,
33    EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep, ExpressionPredicate,
34    FactorizedAggregate, FactorizedAggregateOperator, FilterExpression, FilterOperator,
35    HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
36    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
37    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
38    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
39    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
40    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
41    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnwindOperator,
42    VariableLengthExpandOperator,
43};
44use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
45use std::collections::HashMap;
46use std::sync::Arc;
47
48use crate::query::planner::common;
49use crate::query::planner::common::expression_to_string;
50use crate::query::planner::{
51    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
52    convert_unary_op, value_to_logical_type,
53};
54use crate::transaction::TransactionManager;
55
56/// Range bounds for property-based range queries.
57struct RangeBounds<'a> {
58    min: Option<&'a Value>,
59    max: Option<&'a Value>,
60    min_inclusive: bool,
61    max_inclusive: bool,
62}
63
64/// Converts a logical plan to a physical operator tree for LPG stores.
65pub struct Planner {
66    /// The graph store (supports both read and write operations).
67    pub(super) store: Arc<dyn GraphStoreMut>,
68    /// Transaction manager for MVCC operations.
69    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
70    /// Current transaction ID (if in a transaction).
71    pub(super) transaction_id: Option<TransactionId>,
72    /// Epoch to use for visibility checks.
73    pub(super) viewing_epoch: EpochId,
74    /// Counter for generating unique anonymous edge column names.
75    pub(super) anon_edge_counter: std::cell::Cell<u32>,
76    /// Whether to use factorized execution for multi-hop queries.
77    pub(super) factorized_execution: bool,
78    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
79    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
80    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
81    /// Variables that hold edge IDs (from MATCH edge patterns).
82    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
83    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Optional constraint validator for schema enforcement during mutations.
85    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
86    /// Catalog for user-defined procedure lookup.
87    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
88    /// Shared parameter state for the currently planning correlated Apply.
89    /// Set by `plan_apply` before planning the inner operator, consumed by
90    /// `plan_operator` when encountering `ParameterScan`.
91    pub(super) correlated_param_state:
92        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
93    /// Variables from variable-length expand patterns (group-list variables).
94    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
95    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
96    /// When true, each physical operator is wrapped in `ProfiledOperator`.
97    profiling: std::cell::Cell<bool>,
98    /// Profile entries collected during planning (post-order).
99    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
100    /// Optional write tracker for recording writes during mutations.
101    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
102    /// Session context for introspection functions (info, schema, current_schema, etc.).
103    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
104}
105
106impl Planner {
107    /// Creates a new planner with the given store.
108    ///
109    /// This creates a planner without transaction context, using the current
110    /// epoch from the store for visibility.
111    #[must_use]
112    pub fn new(store: Arc<dyn GraphStoreMut>) -> Self {
113        let epoch = store.current_epoch();
114        Self {
115            store,
116            transaction_manager: None,
117            transaction_id: None,
118            viewing_epoch: epoch,
119            anon_edge_counter: std::cell::Cell::new(0),
120            factorized_execution: true,
121            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
122            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
123            validator: None,
124            catalog: None,
125            correlated_param_state: std::cell::RefCell::new(None),
126            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
127            profiling: std::cell::Cell::new(false),
128            profile_entries: std::cell::RefCell::new(Vec::new()),
129            write_tracker: None,
130            session_context: grafeo_core::execution::operators::SessionContext::default(),
131        }
132    }
133
134    /// Creates a new planner with transaction context for MVCC-aware planning.
135    #[must_use]
136    pub fn with_context(
137        store: Arc<dyn GraphStoreMut>,
138        transaction_manager: Arc<TransactionManager>,
139        transaction_id: Option<TransactionId>,
140        viewing_epoch: EpochId,
141    ) -> Self {
142        use crate::transaction::TransactionWriteTracker;
143
144        // Create write tracker when there's an active transaction
145        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
146            if transaction_id.is_some() {
147                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
148                    &transaction_manager,
149                ))))
150            } else {
151                None
152            };
153
154        Self {
155            store,
156            transaction_manager: Some(transaction_manager),
157            transaction_id,
158            viewing_epoch,
159            anon_edge_counter: std::cell::Cell::new(0),
160            factorized_execution: true,
161            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
162            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
163            validator: None,
164            catalog: None,
165            correlated_param_state: std::cell::RefCell::new(None),
166            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
167            profiling: std::cell::Cell::new(false),
168            profile_entries: std::cell::RefCell::new(Vec::new()),
169            write_tracker,
170            session_context: grafeo_core::execution::operators::SessionContext::default(),
171        }
172    }
173
174    /// Returns the viewing epoch for this planner.
175    #[must_use]
176    pub fn viewing_epoch(&self) -> EpochId {
177        self.viewing_epoch
178    }
179
180    /// Returns the transaction ID for this planner, if any.
181    #[must_use]
182    pub fn transaction_id(&self) -> Option<TransactionId> {
183        self.transaction_id
184    }
185
186    /// Returns a reference to the transaction manager, if available.
187    #[must_use]
188    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
189        self.transaction_manager.as_ref()
190    }
191
192    /// Enables or disables factorized execution for multi-hop queries.
193    #[must_use]
194    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
195        self.factorized_execution = enabled;
196        self
197    }
198
199    /// Sets the constraint validator for schema enforcement during mutations.
200    #[must_use]
201    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
202        self.validator = Some(validator);
203        self
204    }
205
206    /// Sets the catalog for user-defined procedure lookup.
207    #[must_use]
208    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
209        self.catalog = Some(catalog);
210        self
211    }
212
213    /// Sets the session context for introspection functions.
214    #[must_use]
215    pub fn with_session_context(
216        mut self,
217        context: grafeo_core::execution::operators::SessionContext,
218    ) -> Self {
219        self.session_context = context;
220        self
221    }
222
223    /// Counts consecutive single-hop expand operations.
224    ///
225    /// Returns the count and the deepest non-expand operator (the base of the chain).
226    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
227        match op {
228            LogicalOperator::Expand(expand) => {
229                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
230
231                if is_single_hop {
232                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
233                    (inner_count + 1, base)
234                } else {
235                    (0, op)
236                }
237            }
238            _ => (0, op),
239        }
240    }
241
242    /// Collects expand operations from the outermost down to the base.
243    ///
244    /// Returns expands in order from innermost (base) to outermost.
245    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
246        let mut chain = Vec::new();
247        let mut current = op;
248
249        while let LogicalOperator::Expand(expand) = current {
250            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
251            if !is_single_hop {
252                break;
253            }
254            chain.push(expand);
255            current = &expand.input;
256        }
257
258        chain.reverse();
259        chain
260    }
261
262    /// Plans a logical plan into a physical operator.
263    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
264        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
265        Ok(PhysicalPlan {
266            operator,
267            columns,
268            adaptive_context: None,
269        })
270    }
271
272    /// Plans a logical plan with profiling: each physical operator is wrapped
273    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
274    /// collect row counts and timing. Returns the physical plan together with
275    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
276    /// items in post-order (children before parents).
277    pub fn plan_profiled(
278        &self,
279        logical_plan: &LogicalPlan,
280    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
281        self.profiling.set(true);
282        self.profile_entries.borrow_mut().clear();
283
284        let result = self.plan_operator(&logical_plan.root);
285
286        self.profiling.set(false);
287        let (operator, columns) = result?;
288        let entries = self.profile_entries.borrow_mut().drain(..).collect();
289
290        Ok((
291            PhysicalPlan {
292                operator,
293                columns,
294                adaptive_context: None,
295            },
296            entries,
297        ))
298    }
299
300    /// Plans a logical plan with adaptive execution support.
301    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
302        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
303
304        let mut adaptive_context = AdaptiveContext::new();
305        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
306
307        Ok(PhysicalPlan {
308            operator,
309            columns,
310            adaptive_context: Some(adaptive_context),
311        })
312    }
313
314    /// Collects cardinality estimates from the logical plan into an adaptive context.
315    fn collect_cardinality_estimates(
316        &self,
317        op: &LogicalOperator,
318        ctx: &mut AdaptiveContext,
319        depth: usize,
320    ) {
321        match op {
322            LogicalOperator::NodeScan(scan) => {
323                let estimate = if let Some(label) = &scan.label {
324                    self.store.nodes_by_label(label).len() as f64
325                } else {
326                    self.store.node_count() as f64
327                };
328                let id = format!("scan_{}", scan.variable);
329                ctx.set_estimate(&id, estimate);
330
331                if let Some(input) = &scan.input {
332                    self.collect_cardinality_estimates(input, ctx, depth + 1);
333                }
334            }
335            LogicalOperator::Filter(filter) => {
336                let input_estimate = self.estimate_cardinality(&filter.input);
337                let estimate = input_estimate * 0.3;
338                let id = format!("filter_{depth}");
339                ctx.set_estimate(&id, estimate);
340
341                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
342            }
343            LogicalOperator::Expand(expand) => {
344                let input_estimate = self.estimate_cardinality(&expand.input);
345                let stats = self.store.statistics();
346                let avg_degree = self.estimate_expand_degree(&stats, expand);
347                let estimate = input_estimate * avg_degree;
348                let id = format!("expand_{}", expand.to_variable);
349                ctx.set_estimate(&id, estimate);
350
351                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
352            }
353            LogicalOperator::Join(join) => {
354                let left_est = self.estimate_cardinality(&join.left);
355                let right_est = self.estimate_cardinality(&join.right);
356                let estimate = (left_est * right_est).sqrt();
357                let id = format!("join_{depth}");
358                ctx.set_estimate(&id, estimate);
359
360                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
361                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
362            }
363            LogicalOperator::Aggregate(agg) => {
364                let input_estimate = self.estimate_cardinality(&agg.input);
365                let estimate = if agg.group_by.is_empty() {
366                    1.0
367                } else {
368                    (input_estimate * 0.1).max(1.0)
369                };
370                let id = format!("aggregate_{depth}");
371                ctx.set_estimate(&id, estimate);
372
373                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
374            }
375            LogicalOperator::Distinct(distinct) => {
376                let input_estimate = self.estimate_cardinality(&distinct.input);
377                let estimate = (input_estimate * 0.5).max(1.0);
378                let id = format!("distinct_{depth}");
379                ctx.set_estimate(&id, estimate);
380
381                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
382            }
383            LogicalOperator::Return(ret) => {
384                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
385            }
386            LogicalOperator::Limit(limit) => {
387                let input_estimate = self.estimate_cardinality(&limit.input);
388                let estimate = (input_estimate).min(limit.count.estimate());
389                let id = format!("limit_{depth}");
390                ctx.set_estimate(&id, estimate);
391
392                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
393            }
394            LogicalOperator::Skip(skip) => {
395                let input_estimate = self.estimate_cardinality(&skip.input);
396                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
397                let id = format!("skip_{depth}");
398                ctx.set_estimate(&id, estimate);
399
400                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
401            }
402            LogicalOperator::Sort(sort) => {
403                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
404            }
405            LogicalOperator::Union(union) => {
406                let estimate: f64 = union
407                    .inputs
408                    .iter()
409                    .map(|input| self.estimate_cardinality(input))
410                    .sum();
411                let id = format!("union_{depth}");
412                ctx.set_estimate(&id, estimate);
413
414                for input in &union.inputs {
415                    self.collect_cardinality_estimates(input, ctx, depth + 1);
416                }
417            }
418            _ => {
419                // For other operators, try to recurse into known input patterns
420            }
421        }
422    }
423
424    /// Estimates cardinality for a logical operator subtree.
425    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
426        match op {
427            LogicalOperator::NodeScan(scan) => {
428                if let Some(label) = &scan.label {
429                    self.store.nodes_by_label(label).len() as f64
430                } else {
431                    self.store.node_count() as f64
432                }
433            }
434            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
435            LogicalOperator::Expand(expand) => {
436                let stats = self.store.statistics();
437                let avg_degree = self.estimate_expand_degree(&stats, expand);
438                self.estimate_cardinality(&expand.input) * avg_degree
439            }
440            LogicalOperator::Join(join) => {
441                let left = self.estimate_cardinality(&join.left);
442                let right = self.estimate_cardinality(&join.right);
443                (left * right).sqrt()
444            }
445            LogicalOperator::Aggregate(agg) => {
446                if agg.group_by.is_empty() {
447                    1.0
448                } else {
449                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
450                }
451            }
452            LogicalOperator::Distinct(distinct) => {
453                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
454            }
455            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
456            LogicalOperator::Limit(limit) => self
457                .estimate_cardinality(&limit.input)
458                .min(limit.count.estimate()),
459            LogicalOperator::Skip(skip) => {
460                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
461            }
462            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
463            LogicalOperator::Union(union) => union
464                .inputs
465                .iter()
466                .map(|input| self.estimate_cardinality(input))
467                .sum(),
468            LogicalOperator::Except(except) => {
469                let left = self.estimate_cardinality(&except.left);
470                let right = self.estimate_cardinality(&except.right);
471                (left - right).max(0.0)
472            }
473            LogicalOperator::Intersect(intersect) => {
474                let left = self.estimate_cardinality(&intersect.left);
475                let right = self.estimate_cardinality(&intersect.right);
476                left.min(right)
477            }
478            LogicalOperator::Otherwise(otherwise) => self
479                .estimate_cardinality(&otherwise.left)
480                .max(self.estimate_cardinality(&otherwise.right)),
481            _ => 1000.0,
482        }
483    }
484
485    /// Estimates the average edge degree for an expand operation using store statistics.
486    fn estimate_expand_degree(
487        &self,
488        stats: &grafeo_core::statistics::Statistics,
489        expand: &ExpandOp,
490    ) -> f64 {
491        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
492        if expand.edge_types.len() == 1 {
493            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
494        } else if stats.total_nodes > 0 {
495            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
496        } else {
497            10.0
498        }
499    }
500
501    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
502    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
503    fn maybe_profile(
504        &self,
505        result: Result<(Box<dyn Operator>, Vec<String>)>,
506        op: &LogicalOperator,
507    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
508        if self.profiling.get() {
509            let (physical, columns) = result?;
510            let (entry, stats) =
511                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
512            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
513            self.profile_entries.borrow_mut().push(entry);
514            Ok((Box::new(profiled), columns))
515        } else {
516            result
517        }
518    }
519
520    /// Plans a single logical operator.
521    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
522        let result = match op {
523            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
524            LogicalOperator::Expand(expand) => {
525                if self.factorized_execution {
526                    let (chain_len, _base) = Self::count_expand_chain(op);
527                    if chain_len >= 2 {
528                        return self.maybe_profile(self.plan_expand_chain(op), op);
529                    }
530                }
531                self.plan_expand(expand)
532            }
533            LogicalOperator::Return(ret) => self.plan_return(ret),
534            LogicalOperator::Filter(filter) => self.plan_filter(filter),
535            LogicalOperator::Project(project) => self.plan_project(project),
536            LogicalOperator::Limit(limit) => self.plan_limit(limit),
537            LogicalOperator::Skip(skip) => self.plan_skip(skip),
538            LogicalOperator::Sort(sort) => self.plan_sort(sort),
539            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
540            LogicalOperator::Join(join) => self.plan_join(join),
541            LogicalOperator::Union(union) => self.plan_union(union),
542            LogicalOperator::Except(except) => self.plan_except(except),
543            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
544            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
545            LogicalOperator::Apply(apply) => self.plan_apply(apply),
546            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
547            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
548            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
549            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
550            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
551            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
552            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
553            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
554            LogicalOperator::Merge(merge) => self.plan_merge(merge),
555            LogicalOperator::MergeRelationship(merge_rel) => {
556                self.plan_merge_relationship(merge_rel)
557            }
558            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
559            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
560            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
561            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
562            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
563            #[cfg(feature = "algos")]
564            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
565            #[cfg(not(feature = "algos"))]
566            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
567                "CALL procedures require the 'algos' feature".to_string(),
568            )),
569            LogicalOperator::ParameterScan(_param_scan) => {
570                let state = self
571                    .correlated_param_state
572                    .borrow()
573                    .clone()
574                    .ok_or_else(|| {
575                        Error::Internal(
576                            "ParameterScan without correlated Apply context".to_string(),
577                        )
578                    })?;
579                // Use the actual column names from the ParameterState (which may
580                // have been expanded from "*" to real variable names in plan_apply)
581                let columns = state.columns.clone();
582                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
583                Ok((operator, columns))
584            }
585            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
586            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
587            LogicalOperator::LoadData(load) => {
588                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
589                    load.path.clone(),
590                    load.format,
591                    load.with_headers,
592                    load.field_terminator,
593                    load.variable.clone(),
594                ));
595                Ok((operator, vec![load.variable.clone()]))
596            }
597            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
598            LogicalOperator::VectorScan(_) => Err(Error::Internal(
599                "VectorScan requires vector-index feature".to_string(),
600            )),
601            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
602                "VectorJoin requires vector-index feature".to_string(),
603            )),
604            _ => Err(Error::Internal(format!(
605                "Unsupported operator: {:?}",
606                std::mem::discriminant(op)
607            ))),
608        };
609        self.maybe_profile(result, op)
610    }
611
612    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
613    fn plan_horizontal_aggregate(
614        &self,
615        ha: &HorizontalAggregateOp,
616    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
617        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
618
619        let list_col_idx = child_columns
620            .iter()
621            .position(|c| c == &ha.list_column)
622            .ok_or_else(|| {
623                Error::Internal(format!(
624                    "HorizontalAggregate list column '{}' not found in {:?}",
625                    ha.list_column, child_columns
626                ))
627            })?;
628
629        let entity_kind = match ha.entity_kind {
630            LogicalEntityKind::Edge => EntityKind::Edge,
631            LogicalEntityKind::Node => EntityKind::Node,
632        };
633
634        let function = convert_aggregate_function(ha.function);
635        let input_column_count = child_columns.len();
636
637        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
638            child_op,
639            list_col_idx,
640            entity_kind,
641            function,
642            ha.property.clone(),
643            Arc::clone(&self.store) as Arc<dyn GraphStore>,
644            input_column_count,
645        ));
646
647        let mut columns = child_columns;
648        columns.push(ha.alias.clone());
649        // Mark the result as a scalar column
650        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
651
652        Ok((operator, columns))
653    }
654
655    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
656    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
657        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
658        let key_idx = child_columns
659            .iter()
660            .position(|c| c == &mc.key_var)
661            .ok_or_else(|| {
662                Error::Internal(format!(
663                    "MapCollect key '{}' not in columns {:?}",
664                    mc.key_var, child_columns
665                ))
666            })?;
667        let value_idx = child_columns
668            .iter()
669            .position(|c| c == &mc.value_var)
670            .ok_or_else(|| {
671                Error::Internal(format!(
672                    "MapCollect value '{}' not in columns {:?}",
673                    mc.value_var, child_columns
674                ))
675            })?;
676        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
677        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
678        Ok((operator, vec![mc.alias.clone()]))
679    }
680}
681
/// Operator that replays a fixed, pre-computed set of rows (for
/// `grafeo.procedures()` etc.).
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The rows to emit, each a list of values.
    rows: Vec<Vec<Value>>,
    /// For every output column, the position within a row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row that has not been emitted yet.
    row_index: usize,
}
689
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Emits the next batch of at most 1024 rows, or `None` once every row
    /// has been handed out.
    ///
    /// All output columns are typed `LogicalType::Any`. A row that is too
    /// short for a requested source index contributes `Value::Null`.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        // Cap the batch at 1024 rows.
        let batch_len = (total - start).min(1024);
        let end = start + batch_len;
        let batch = &self.rows[start..end];

        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in batch {
            for (out_idx, &src_idx) in self.column_indices.iter().enumerate() {
                let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
                if let Some(column) = chunk.column_mut(out_idx) {
                    column.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Fixed operator name.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
729
730#[cfg(test)]
731mod tests {
732    use super::*;
733    use crate::query::plan::{
734        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
735        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
736        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
737        SkipOp as LogicalSkipOp, SortKey, SortOp,
738    };
739    use grafeo_common::types::Value;
740    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
741    use grafeo_core::graph::GraphStoreMut;
742    use grafeo_core::graph::lpg::LpgStore;
743
744    fn create_test_store() -> Arc<dyn GraphStoreMut> {
745        let store = Arc::new(LpgStore::new().unwrap());
746        store.create_node(&["Person"]);
747        store.create_node(&["Person"]);
748        store.create_node(&["Company"]);
749        store
750    }
751
752    // ==================== Simple Scan Tests ====================
753
754    #[test]
755    fn test_plan_simple_scan() {
756        let store = create_test_store();
757        let planner = Planner::new(store);
758
759        // MATCH (n:Person) RETURN n
760        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
761            items: vec![ReturnItem {
762                expression: LogicalExpression::Variable("n".to_string()),
763                alias: None,
764            }],
765            distinct: false,
766            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
767                variable: "n".to_string(),
768                label: Some("Person".to_string()),
769                input: None,
770            })),
771        }));
772
773        let physical = planner.plan(&logical).unwrap();
774        assert_eq!(physical.columns(), &["n"]);
775    }
776
777    #[test]
778    fn test_plan_scan_without_label() {
779        let store = create_test_store();
780        let planner = Planner::new(store);
781
782        // MATCH (n) RETURN n
783        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
784            items: vec![ReturnItem {
785                expression: LogicalExpression::Variable("n".to_string()),
786                alias: None,
787            }],
788            distinct: false,
789            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
790                variable: "n".to_string(),
791                label: None,
792                input: None,
793            })),
794        }));
795
796        let physical = planner.plan(&logical).unwrap();
797        assert_eq!(physical.columns(), &["n"]);
798    }
799
800    #[test]
801    fn test_plan_return_with_alias() {
802        let store = create_test_store();
803        let planner = Planner::new(store);
804
805        // MATCH (n:Person) RETURN n AS person
806        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
807            items: vec![ReturnItem {
808                expression: LogicalExpression::Variable("n".to_string()),
809                alias: Some("person".to_string()),
810            }],
811            distinct: false,
812            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
813                variable: "n".to_string(),
814                label: Some("Person".to_string()),
815                input: None,
816            })),
817        }));
818
819        let physical = planner.plan(&logical).unwrap();
820        assert_eq!(physical.columns(), &["person"]);
821    }
822
823    #[test]
824    fn test_plan_return_property() {
825        let store = create_test_store();
826        let planner = Planner::new(store);
827
828        // MATCH (n:Person) RETURN n.name
829        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
830            items: vec![ReturnItem {
831                expression: LogicalExpression::Property {
832                    variable: "n".to_string(),
833                    property: "name".to_string(),
834                },
835                alias: None,
836            }],
837            distinct: false,
838            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
839                variable: "n".to_string(),
840                label: Some("Person".to_string()),
841                input: None,
842            })),
843        }));
844
845        let physical = planner.plan(&logical).unwrap();
846        assert_eq!(physical.columns(), &["n.name"]);
847    }
848
849    #[test]
850    fn test_plan_return_literal() {
851        let store = create_test_store();
852        let planner = Planner::new(store);
853
854        // MATCH (n) RETURN 42 AS answer
855        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
856            items: vec![ReturnItem {
857                expression: LogicalExpression::Literal(Value::Int64(42)),
858                alias: Some("answer".to_string()),
859            }],
860            distinct: false,
861            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
862                variable: "n".to_string(),
863                label: None,
864                input: None,
865            })),
866        }));
867
868        let physical = planner.plan(&logical).unwrap();
869        assert_eq!(physical.columns(), &["answer"]);
870    }
871
872    // ==================== Filter Tests ====================
873
874    #[test]
875    fn test_plan_filter_equality() {
876        let store = create_test_store();
877        let planner = Planner::new(store);
878
879        // MATCH (n:Person) WHERE n.age = 30 RETURN n
880        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
881            items: vec![ReturnItem {
882                expression: LogicalExpression::Variable("n".to_string()),
883                alias: None,
884            }],
885            distinct: false,
886            input: Box::new(LogicalOperator::Filter(FilterOp {
887                predicate: LogicalExpression::Binary {
888                    left: Box::new(LogicalExpression::Property {
889                        variable: "n".to_string(),
890                        property: "age".to_string(),
891                    }),
892                    op: BinaryOp::Eq,
893                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
894                },
895                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
896                    variable: "n".to_string(),
897                    label: Some("Person".to_string()),
898                    input: None,
899                })),
900                pushdown_hint: None,
901            })),
902        }));
903
904        let physical = planner.plan(&logical).unwrap();
905        assert_eq!(physical.columns(), &["n"]);
906    }
907
908    #[test]
909    fn test_plan_filter_compound_and() {
910        let store = create_test_store();
911        let planner = Planner::new(store);
912
913        // WHERE n.age > 20 AND n.age < 40
914        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
915            items: vec![ReturnItem {
916                expression: LogicalExpression::Variable("n".to_string()),
917                alias: None,
918            }],
919            distinct: false,
920            input: Box::new(LogicalOperator::Filter(FilterOp {
921                predicate: LogicalExpression::Binary {
922                    left: Box::new(LogicalExpression::Binary {
923                        left: Box::new(LogicalExpression::Property {
924                            variable: "n".to_string(),
925                            property: "age".to_string(),
926                        }),
927                        op: BinaryOp::Gt,
928                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
929                    }),
930                    op: BinaryOp::And,
931                    right: Box::new(LogicalExpression::Binary {
932                        left: Box::new(LogicalExpression::Property {
933                            variable: "n".to_string(),
934                            property: "age".to_string(),
935                        }),
936                        op: BinaryOp::Lt,
937                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
938                    }),
939                },
940                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
941                    variable: "n".to_string(),
942                    label: None,
943                    input: None,
944                })),
945                pushdown_hint: None,
946            })),
947        }));
948
949        let physical = planner.plan(&logical).unwrap();
950        assert_eq!(physical.columns(), &["n"]);
951    }
952
953    #[test]
954    fn test_plan_filter_unary_not() {
955        let store = create_test_store();
956        let planner = Planner::new(store);
957
958        // WHERE NOT n.active
959        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
960            items: vec![ReturnItem {
961                expression: LogicalExpression::Variable("n".to_string()),
962                alias: None,
963            }],
964            distinct: false,
965            input: Box::new(LogicalOperator::Filter(FilterOp {
966                predicate: LogicalExpression::Unary {
967                    op: UnaryOp::Not,
968                    operand: Box::new(LogicalExpression::Property {
969                        variable: "n".to_string(),
970                        property: "active".to_string(),
971                    }),
972                },
973                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
974                    variable: "n".to_string(),
975                    label: None,
976                    input: None,
977                })),
978                pushdown_hint: None,
979            })),
980        }));
981
982        let physical = planner.plan(&logical).unwrap();
983        assert_eq!(physical.columns(), &["n"]);
984    }
985
986    #[test]
987    fn test_plan_filter_is_null() {
988        let store = create_test_store();
989        let planner = Planner::new(store);
990
991        // WHERE n.email IS NULL
992        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
993            items: vec![ReturnItem {
994                expression: LogicalExpression::Variable("n".to_string()),
995                alias: None,
996            }],
997            distinct: false,
998            input: Box::new(LogicalOperator::Filter(FilterOp {
999                predicate: LogicalExpression::Unary {
1000                    op: UnaryOp::IsNull,
1001                    operand: Box::new(LogicalExpression::Property {
1002                        variable: "n".to_string(),
1003                        property: "email".to_string(),
1004                    }),
1005                },
1006                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1007                    variable: "n".to_string(),
1008                    label: None,
1009                    input: None,
1010                })),
1011                pushdown_hint: None,
1012            })),
1013        }));
1014
1015        let physical = planner.plan(&logical).unwrap();
1016        assert_eq!(physical.columns(), &["n"]);
1017    }
1018
1019    #[test]
1020    fn test_plan_filter_function_call() {
1021        let store = create_test_store();
1022        let planner = Planner::new(store);
1023
1024        // WHERE size(n.friends) > 0
1025        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1026            items: vec![ReturnItem {
1027                expression: LogicalExpression::Variable("n".to_string()),
1028                alias: None,
1029            }],
1030            distinct: false,
1031            input: Box::new(LogicalOperator::Filter(FilterOp {
1032                predicate: LogicalExpression::Binary {
1033                    left: Box::new(LogicalExpression::FunctionCall {
1034                        name: "size".to_string(),
1035                        args: vec![LogicalExpression::Property {
1036                            variable: "n".to_string(),
1037                            property: "friends".to_string(),
1038                        }],
1039                        distinct: false,
1040                    }),
1041                    op: BinaryOp::Gt,
1042                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1043                },
1044                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1045                    variable: "n".to_string(),
1046                    label: None,
1047                    input: None,
1048                })),
1049                pushdown_hint: None,
1050            })),
1051        }));
1052
1053        let physical = planner.plan(&logical).unwrap();
1054        assert_eq!(physical.columns(), &["n"]);
1055    }
1056
1057    // ==================== Expand Tests ====================
1058
1059    #[test]
1060    fn test_plan_expand_outgoing() {
1061        let store = create_test_store();
1062        let planner = Planner::new(store);
1063
1064        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1065        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1066            items: vec![
1067                ReturnItem {
1068                    expression: LogicalExpression::Variable("a".to_string()),
1069                    alias: None,
1070                },
1071                ReturnItem {
1072                    expression: LogicalExpression::Variable("b".to_string()),
1073                    alias: None,
1074                },
1075            ],
1076            distinct: false,
1077            input: Box::new(LogicalOperator::Expand(ExpandOp {
1078                from_variable: "a".to_string(),
1079                to_variable: "b".to_string(),
1080                edge_variable: None,
1081                direction: ExpandDirection::Outgoing,
1082                edge_types: vec!["KNOWS".to_string()],
1083                min_hops: 1,
1084                max_hops: Some(1),
1085                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1086                    variable: "a".to_string(),
1087                    label: Some("Person".to_string()),
1088                    input: None,
1089                })),
1090                path_alias: None,
1091                path_mode: PathMode::Walk,
1092            })),
1093        }));
1094
1095        let physical = planner.plan(&logical).unwrap();
1096        // The return should have columns [a, b]
1097        assert!(physical.columns().contains(&"a".to_string()));
1098        assert!(physical.columns().contains(&"b".to_string()));
1099    }
1100
1101    #[test]
1102    fn test_plan_expand_with_edge_variable() {
1103        let store = create_test_store();
1104        let planner = Planner::new(store);
1105
1106        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1107        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1108            items: vec![
1109                ReturnItem {
1110                    expression: LogicalExpression::Variable("a".to_string()),
1111                    alias: None,
1112                },
1113                ReturnItem {
1114                    expression: LogicalExpression::Variable("r".to_string()),
1115                    alias: None,
1116                },
1117                ReturnItem {
1118                    expression: LogicalExpression::Variable("b".to_string()),
1119                    alias: None,
1120                },
1121            ],
1122            distinct: false,
1123            input: Box::new(LogicalOperator::Expand(ExpandOp {
1124                from_variable: "a".to_string(),
1125                to_variable: "b".to_string(),
1126                edge_variable: Some("r".to_string()),
1127                direction: ExpandDirection::Outgoing,
1128                edge_types: vec!["KNOWS".to_string()],
1129                min_hops: 1,
1130                max_hops: Some(1),
1131                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1132                    variable: "a".to_string(),
1133                    label: None,
1134                    input: None,
1135                })),
1136                path_alias: None,
1137                path_mode: PathMode::Walk,
1138            })),
1139        }));
1140
1141        let physical = planner.plan(&logical).unwrap();
1142        assert!(physical.columns().contains(&"a".to_string()));
1143        assert!(physical.columns().contains(&"r".to_string()));
1144        assert!(physical.columns().contains(&"b".to_string()));
1145    }
1146
1147    // ==================== Limit/Skip/Sort Tests ====================
1148
1149    #[test]
1150    fn test_plan_limit() {
1151        let store = create_test_store();
1152        let planner = Planner::new(store);
1153
1154        // MATCH (n) RETURN n LIMIT 10
1155        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1156            items: vec![ReturnItem {
1157                expression: LogicalExpression::Variable("n".to_string()),
1158                alias: None,
1159            }],
1160            distinct: false,
1161            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1162                count: 10.into(),
1163                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1164                    variable: "n".to_string(),
1165                    label: None,
1166                    input: None,
1167                })),
1168            })),
1169        }));
1170
1171        let physical = planner.plan(&logical).unwrap();
1172        assert_eq!(physical.columns(), &["n"]);
1173    }
1174
1175    #[test]
1176    fn test_plan_skip() {
1177        let store = create_test_store();
1178        let planner = Planner::new(store);
1179
1180        // MATCH (n) RETURN n SKIP 5
1181        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1182            items: vec![ReturnItem {
1183                expression: LogicalExpression::Variable("n".to_string()),
1184                alias: None,
1185            }],
1186            distinct: false,
1187            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1188                count: 5.into(),
1189                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1190                    variable: "n".to_string(),
1191                    label: None,
1192                    input: None,
1193                })),
1194            })),
1195        }));
1196
1197        let physical = planner.plan(&logical).unwrap();
1198        assert_eq!(physical.columns(), &["n"]);
1199    }
1200
1201    #[test]
1202    fn test_plan_sort() {
1203        let store = create_test_store();
1204        let planner = Planner::new(store);
1205
1206        // MATCH (n) RETURN n ORDER BY n.name ASC
1207        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1208            items: vec![ReturnItem {
1209                expression: LogicalExpression::Variable("n".to_string()),
1210                alias: None,
1211            }],
1212            distinct: false,
1213            input: Box::new(LogicalOperator::Sort(SortOp {
1214                keys: vec![SortKey {
1215                    expression: LogicalExpression::Variable("n".to_string()),
1216                    order: SortOrder::Ascending,
1217                    nulls: None,
1218                }],
1219                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1220                    variable: "n".to_string(),
1221                    label: None,
1222                    input: None,
1223                })),
1224            })),
1225        }));
1226
1227        let physical = planner.plan(&logical).unwrap();
1228        assert_eq!(physical.columns(), &["n"]);
1229    }
1230
1231    #[test]
1232    fn test_plan_sort_descending() {
1233        let store = create_test_store();
1234        let planner = Planner::new(store);
1235
1236        // ORDER BY n DESC
1237        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1238            items: vec![ReturnItem {
1239                expression: LogicalExpression::Variable("n".to_string()),
1240                alias: None,
1241            }],
1242            distinct: false,
1243            input: Box::new(LogicalOperator::Sort(SortOp {
1244                keys: vec![SortKey {
1245                    expression: LogicalExpression::Variable("n".to_string()),
1246                    order: SortOrder::Descending,
1247                    nulls: None,
1248                }],
1249                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1250                    variable: "n".to_string(),
1251                    label: None,
1252                    input: None,
1253                })),
1254            })),
1255        }));
1256
1257        let physical = planner.plan(&logical).unwrap();
1258        assert_eq!(physical.columns(), &["n"]);
1259    }
1260
1261    #[test]
1262    fn test_plan_distinct() {
1263        let store = create_test_store();
1264        let planner = Planner::new(store);
1265
1266        // MATCH (n) RETURN DISTINCT n
1267        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1268            items: vec![ReturnItem {
1269                expression: LogicalExpression::Variable("n".to_string()),
1270                alias: None,
1271            }],
1272            distinct: false,
1273            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1274                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1275                    variable: "n".to_string(),
1276                    label: None,
1277                    input: None,
1278                })),
1279                columns: None,
1280            })),
1281        }));
1282
1283        let physical = planner.plan(&logical).unwrap();
1284        assert_eq!(physical.columns(), &["n"]);
1285    }
1286
1287    #[test]
1288    fn test_plan_distinct_with_columns() {
1289        let store = create_test_store();
1290        let planner = Planner::new(store);
1291
1292        // DISTINCT on specific columns (column-specific dedup)
1293        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1294            items: vec![ReturnItem {
1295                expression: LogicalExpression::Variable("n".to_string()),
1296                alias: None,
1297            }],
1298            distinct: false,
1299            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1300                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1301                    variable: "n".to_string(),
1302                    label: None,
1303                    input: None,
1304                })),
1305                columns: Some(vec!["n".to_string()]),
1306            })),
1307        }));
1308
1309        let physical = planner.plan(&logical).unwrap();
1310        assert_eq!(physical.columns(), &["n"]);
1311    }
1312
1313    #[test]
1314    fn test_plan_distinct_with_nonexistent_columns() {
1315        let store = create_test_store();
1316        let planner = Planner::new(store);
1317
1318        // When distinct columns don't match any output columns,
1319        // it falls back to full-row distinct.
1320        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1321            items: vec![ReturnItem {
1322                expression: LogicalExpression::Variable("n".to_string()),
1323                alias: None,
1324            }],
1325            distinct: false,
1326            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1327                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1328                    variable: "n".to_string(),
1329                    label: None,
1330                    input: None,
1331                })),
1332                columns: Some(vec!["nonexistent".to_string()]),
1333            })),
1334        }));
1335
1336        let physical = planner.plan(&logical).unwrap();
1337        assert_eq!(physical.columns(), &["n"]);
1338    }
1339
1340    // ==================== Aggregate Tests ====================
1341
1342    #[test]
1343    fn test_plan_aggregate_count() {
1344        let store = create_test_store();
1345        let planner = Planner::new(store);
1346
1347        // MATCH (n) RETURN count(n)
1348        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1349            items: vec![ReturnItem {
1350                expression: LogicalExpression::Variable("cnt".to_string()),
1351                alias: None,
1352            }],
1353            distinct: false,
1354            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1355                group_by: vec![],
1356                aggregates: vec![LogicalAggregateExpr {
1357                    function: LogicalAggregateFunction::Count,
1358                    expression: Some(LogicalExpression::Variable("n".to_string())),
1359                    expression2: None,
1360                    distinct: false,
1361                    alias: Some("cnt".to_string()),
1362                    percentile: None,
1363                    separator: None,
1364                }],
1365                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1366                    variable: "n".to_string(),
1367                    label: None,
1368                    input: None,
1369                })),
1370                having: None,
1371            })),
1372        }));
1373
1374        let physical = planner.plan(&logical).unwrap();
1375        assert!(physical.columns().contains(&"cnt".to_string()));
1376    }
1377
1378    #[test]
1379    fn test_plan_aggregate_with_group_by() {
1380        let store = create_test_store();
1381        let planner = Planner::new(store);
1382
1383        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1384        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1385            group_by: vec![LogicalExpression::Property {
1386                variable: "n".to_string(),
1387                property: "city".to_string(),
1388            }],
1389            aggregates: vec![LogicalAggregateExpr {
1390                function: LogicalAggregateFunction::Count,
1391                expression: Some(LogicalExpression::Variable("n".to_string())),
1392                expression2: None,
1393                distinct: false,
1394                alias: Some("cnt".to_string()),
1395                percentile: None,
1396                separator: None,
1397            }],
1398            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1399                variable: "n".to_string(),
1400                label: Some("Person".to_string()),
1401                input: None,
1402            })),
1403            having: None,
1404        }));
1405
1406        let physical = planner.plan(&logical).unwrap();
1407        assert_eq!(physical.columns().len(), 2);
1408    }
1409
1410    #[test]
1411    fn test_plan_aggregate_sum() {
1412        let store = create_test_store();
1413        let planner = Planner::new(store);
1414
1415        // SUM(n.value)
1416        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1417            group_by: vec![],
1418            aggregates: vec![LogicalAggregateExpr {
1419                function: LogicalAggregateFunction::Sum,
1420                expression: Some(LogicalExpression::Property {
1421                    variable: "n".to_string(),
1422                    property: "value".to_string(),
1423                }),
1424                expression2: None,
1425                distinct: false,
1426                alias: Some("total".to_string()),
1427                percentile: None,
1428                separator: None,
1429            }],
1430            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1431                variable: "n".to_string(),
1432                label: None,
1433                input: None,
1434            })),
1435            having: None,
1436        }));
1437
1438        let physical = planner.plan(&logical).unwrap();
1439        assert!(physical.columns().contains(&"total".to_string()));
1440    }
1441
1442    #[test]
1443    fn test_plan_aggregate_avg() {
1444        let store = create_test_store();
1445        let planner = Planner::new(store);
1446
1447        // AVG(n.score)
1448        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1449            group_by: vec![],
1450            aggregates: vec![LogicalAggregateExpr {
1451                function: LogicalAggregateFunction::Avg,
1452                expression: Some(LogicalExpression::Property {
1453                    variable: "n".to_string(),
1454                    property: "score".to_string(),
1455                }),
1456                expression2: None,
1457                distinct: false,
1458                alias: Some("average".to_string()),
1459                percentile: None,
1460                separator: None,
1461            }],
1462            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1463                variable: "n".to_string(),
1464                label: None,
1465                input: None,
1466            })),
1467            having: None,
1468        }));
1469
1470        let physical = planner.plan(&logical).unwrap();
1471        assert!(physical.columns().contains(&"average".to_string()));
1472    }
1473
1474    #[test]
1475    fn test_plan_aggregate_min_max() {
1476        let store = create_test_store();
1477        let planner = Planner::new(store);
1478
1479        // MIN(n.age), MAX(n.age)
1480        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1481            group_by: vec![],
1482            aggregates: vec![
1483                LogicalAggregateExpr {
1484                    function: LogicalAggregateFunction::Min,
1485                    expression: Some(LogicalExpression::Property {
1486                        variable: "n".to_string(),
1487                        property: "age".to_string(),
1488                    }),
1489                    expression2: None,
1490                    distinct: false,
1491                    alias: Some("youngest".to_string()),
1492                    percentile: None,
1493                    separator: None,
1494                },
1495                LogicalAggregateExpr {
1496                    function: LogicalAggregateFunction::Max,
1497                    expression: Some(LogicalExpression::Property {
1498                        variable: "n".to_string(),
1499                        property: "age".to_string(),
1500                    }),
1501                    expression2: None,
1502                    distinct: false,
1503                    alias: Some("oldest".to_string()),
1504                    percentile: None,
1505                    separator: None,
1506                },
1507            ],
1508            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509                variable: "n".to_string(),
1510                label: None,
1511                input: None,
1512            })),
1513            having: None,
1514        }));
1515
1516        let physical = planner.plan(&logical).unwrap();
1517        assert!(physical.columns().contains(&"youngest".to_string()));
1518        assert!(physical.columns().contains(&"oldest".to_string()));
1519    }
1520
1521    // ==================== Join Tests ====================
1522
1523    #[test]
1524    fn test_plan_inner_join() {
1525        let store = create_test_store();
1526        let planner = Planner::new(store);
1527
1528        // Inner join between two scans
1529        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1530            items: vec![
1531                ReturnItem {
1532                    expression: LogicalExpression::Variable("a".to_string()),
1533                    alias: None,
1534                },
1535                ReturnItem {
1536                    expression: LogicalExpression::Variable("b".to_string()),
1537                    alias: None,
1538                },
1539            ],
1540            distinct: false,
1541            input: Box::new(LogicalOperator::Join(JoinOp {
1542                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1543                    variable: "a".to_string(),
1544                    label: Some("Person".to_string()),
1545                    input: None,
1546                })),
1547                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1548                    variable: "b".to_string(),
1549                    label: Some("Company".to_string()),
1550                    input: None,
1551                })),
1552                join_type: JoinType::Inner,
1553                conditions: vec![JoinCondition {
1554                    left: LogicalExpression::Variable("a".to_string()),
1555                    right: LogicalExpression::Variable("b".to_string()),
1556                }],
1557            })),
1558        }));
1559
1560        let physical = planner.plan(&logical).unwrap();
1561        assert!(physical.columns().contains(&"a".to_string()));
1562        assert!(physical.columns().contains(&"b".to_string()));
1563    }
1564
1565    #[test]
1566    fn test_plan_cross_join() {
1567        let store = create_test_store();
1568        let planner = Planner::new(store);
1569
1570        // Cross join (no conditions)
1571        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1572            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573                variable: "a".to_string(),
1574                label: None,
1575                input: None,
1576            })),
1577            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1578                variable: "b".to_string(),
1579                label: None,
1580                input: None,
1581            })),
1582            join_type: JoinType::Cross,
1583            conditions: vec![],
1584        }));
1585
1586        let physical = planner.plan(&logical).unwrap();
1587        assert_eq!(physical.columns().len(), 2);
1588    }
1589
1590    #[test]
1591    fn test_plan_left_join() {
1592        let store = create_test_store();
1593        let planner = Planner::new(store);
1594
1595        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1596            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1597                variable: "a".to_string(),
1598                label: None,
1599                input: None,
1600            })),
1601            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1602                variable: "b".to_string(),
1603                label: None,
1604                input: None,
1605            })),
1606            join_type: JoinType::Left,
1607            conditions: vec![],
1608        }));
1609
1610        let physical = planner.plan(&logical).unwrap();
1611        assert_eq!(physical.columns().len(), 2);
1612    }
1613
1614    // ==================== Mutation Tests ====================
1615
1616    #[test]
1617    fn test_plan_create_node() {
1618        let store = create_test_store();
1619        let planner = Planner::new(store);
1620
1621        // CREATE (n:Person {name: 'Alix'})
1622        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1623            variable: "n".to_string(),
1624            labels: vec!["Person".to_string()],
1625            properties: vec![(
1626                "name".to_string(),
1627                LogicalExpression::Literal(Value::String("Alix".into())),
1628            )],
1629            input: None,
1630        }));
1631
1632        let physical = planner.plan(&logical).unwrap();
1633        assert!(physical.columns().contains(&"n".to_string()));
1634    }
1635
1636    #[test]
1637    fn test_plan_create_edge() {
1638        let store = create_test_store();
1639        let planner = Planner::new(store);
1640
1641        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1642        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1643            variable: Some("r".to_string()),
1644            from_variable: "a".to_string(),
1645            to_variable: "b".to_string(),
1646            edge_type: "KNOWS".to_string(),
1647            properties: vec![],
1648            input: Box::new(LogicalOperator::Join(JoinOp {
1649                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1650                    variable: "a".to_string(),
1651                    label: None,
1652                    input: None,
1653                })),
1654                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1655                    variable: "b".to_string(),
1656                    label: None,
1657                    input: None,
1658                })),
1659                join_type: JoinType::Cross,
1660                conditions: vec![],
1661            })),
1662        }));
1663
1664        let physical = planner.plan(&logical).unwrap();
1665        assert!(physical.columns().contains(&"r".to_string()));
1666    }
1667
1668    #[test]
1669    fn test_plan_delete_node() {
1670        let store = create_test_store();
1671        let planner = Planner::new(store);
1672
1673        // MATCH (n) DELETE n
1674        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1675            variable: "n".to_string(),
1676            detach: false,
1677            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1678                variable: "n".to_string(),
1679                label: None,
1680                input: None,
1681            })),
1682        }));
1683
1684        let physical = planner.plan(&logical).unwrap();
1685        assert!(physical.columns().contains(&"n".to_string()));
1686    }
1687
1688    // ==================== Error Cases ====================
1689
1690    #[test]
1691    fn test_plan_empty_errors() {
1692        let store = create_test_store();
1693        let planner = Planner::new(store);
1694
1695        let logical = LogicalPlan::new(LogicalOperator::Empty);
1696        let result = planner.plan(&logical);
1697        assert!(result.is_err());
1698    }
1699
1700    #[test]
1701    fn test_plan_missing_variable_in_return() {
1702        let store = create_test_store();
1703        let planner = Planner::new(store);
1704
1705        // Return variable that doesn't exist in input
1706        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1707            items: vec![ReturnItem {
1708                expression: LogicalExpression::Variable("missing".to_string()),
1709                alias: None,
1710            }],
1711            distinct: false,
1712            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1713                variable: "n".to_string(),
1714                label: None,
1715                input: None,
1716            })),
1717        }));
1718
1719        let result = planner.plan(&logical);
1720        assert!(result.is_err());
1721    }
1722
1723    // ==================== Helper Function Tests ====================
1724
1725    #[test]
1726    fn test_convert_binary_ops() {
1727        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1728        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1729        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1730        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1731        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1732        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1733        assert!(convert_binary_op(BinaryOp::And).is_ok());
1734        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1735        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1736        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1737        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1738        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1739    }
1740
1741    #[test]
1742    fn test_convert_unary_ops() {
1743        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1744        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1745        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1746        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1747    }
1748
1749    #[test]
1750    fn test_convert_aggregate_functions() {
1751        assert!(matches!(
1752            convert_aggregate_function(LogicalAggregateFunction::Count),
1753            PhysicalAggregateFunction::Count
1754        ));
1755        assert!(matches!(
1756            convert_aggregate_function(LogicalAggregateFunction::Sum),
1757            PhysicalAggregateFunction::Sum
1758        ));
1759        assert!(matches!(
1760            convert_aggregate_function(LogicalAggregateFunction::Avg),
1761            PhysicalAggregateFunction::Avg
1762        ));
1763        assert!(matches!(
1764            convert_aggregate_function(LogicalAggregateFunction::Min),
1765            PhysicalAggregateFunction::Min
1766        ));
1767        assert!(matches!(
1768            convert_aggregate_function(LogicalAggregateFunction::Max),
1769            PhysicalAggregateFunction::Max
1770        ));
1771    }
1772
1773    #[test]
1774    fn test_planner_accessors() {
1775        let store = create_test_store();
1776        let planner = Planner::new(Arc::clone(&store));
1777
1778        assert!(planner.transaction_id().is_none());
1779        assert!(planner.transaction_manager().is_none());
1780        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1781    }
1782
1783    #[test]
1784    fn test_physical_plan_accessors() {
1785        let store = create_test_store();
1786        let planner = Planner::new(store);
1787
1788        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1789            variable: "n".to_string(),
1790            label: None,
1791            input: None,
1792        }));
1793
1794        let physical = planner.plan(&logical).unwrap();
1795        assert_eq!(physical.columns(), &["n"]);
1796
1797        // Test into_operator
1798        let _ = physical.into_operator();
1799    }
1800
1801    // ==================== Adaptive Planning Tests ====================
1802
1803    #[test]
1804    fn test_plan_adaptive_with_scan() {
1805        let store = create_test_store();
1806        let planner = Planner::new(store);
1807
1808        // MATCH (n:Person) RETURN n
1809        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1810            items: vec![ReturnItem {
1811                expression: LogicalExpression::Variable("n".to_string()),
1812                alias: None,
1813            }],
1814            distinct: false,
1815            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1816                variable: "n".to_string(),
1817                label: Some("Person".to_string()),
1818                input: None,
1819            })),
1820        }));
1821
1822        let physical = planner.plan_adaptive(&logical).unwrap();
1823        assert_eq!(physical.columns(), &["n"]);
1824        // Should have adaptive context with estimates
1825        assert!(physical.adaptive_context.is_some());
1826    }
1827
1828    #[test]
1829    fn test_plan_adaptive_with_filter() {
1830        let store = create_test_store();
1831        let planner = Planner::new(store);
1832
1833        // MATCH (n) WHERE n.age > 30 RETURN n
1834        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1835            items: vec![ReturnItem {
1836                expression: LogicalExpression::Variable("n".to_string()),
1837                alias: None,
1838            }],
1839            distinct: false,
1840            input: Box::new(LogicalOperator::Filter(FilterOp {
1841                predicate: LogicalExpression::Binary {
1842                    left: Box::new(LogicalExpression::Property {
1843                        variable: "n".to_string(),
1844                        property: "age".to_string(),
1845                    }),
1846                    op: BinaryOp::Gt,
1847                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1848                },
1849                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1850                    variable: "n".to_string(),
1851                    label: None,
1852                    input: None,
1853                })),
1854                pushdown_hint: None,
1855            })),
1856        }));
1857
1858        let physical = planner.plan_adaptive(&logical).unwrap();
1859        assert!(physical.adaptive_context.is_some());
1860    }
1861
1862    #[test]
1863    fn test_plan_adaptive_with_expand() {
1864        let store = create_test_store();
1865        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
1866
1867        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1868        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1869            items: vec![
1870                ReturnItem {
1871                    expression: LogicalExpression::Variable("a".to_string()),
1872                    alias: None,
1873                },
1874                ReturnItem {
1875                    expression: LogicalExpression::Variable("b".to_string()),
1876                    alias: None,
1877                },
1878            ],
1879            distinct: false,
1880            input: Box::new(LogicalOperator::Expand(ExpandOp {
1881                from_variable: "a".to_string(),
1882                to_variable: "b".to_string(),
1883                edge_variable: None,
1884                direction: ExpandDirection::Outgoing,
1885                edge_types: vec!["KNOWS".to_string()],
1886                min_hops: 1,
1887                max_hops: Some(1),
1888                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1889                    variable: "a".to_string(),
1890                    label: None,
1891                    input: None,
1892                })),
1893                path_alias: None,
1894                path_mode: PathMode::Walk,
1895            })),
1896        }));
1897
1898        let physical = planner.plan_adaptive(&logical).unwrap();
1899        assert!(physical.adaptive_context.is_some());
1900    }
1901
1902    #[test]
1903    fn test_plan_adaptive_with_join() {
1904        let store = create_test_store();
1905        let planner = Planner::new(store);
1906
1907        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1908            items: vec![
1909                ReturnItem {
1910                    expression: LogicalExpression::Variable("a".to_string()),
1911                    alias: None,
1912                },
1913                ReturnItem {
1914                    expression: LogicalExpression::Variable("b".to_string()),
1915                    alias: None,
1916                },
1917            ],
1918            distinct: false,
1919            input: Box::new(LogicalOperator::Join(JoinOp {
1920                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1921                    variable: "a".to_string(),
1922                    label: None,
1923                    input: None,
1924                })),
1925                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1926                    variable: "b".to_string(),
1927                    label: None,
1928                    input: None,
1929                })),
1930                join_type: JoinType::Cross,
1931                conditions: vec![],
1932            })),
1933        }));
1934
1935        let physical = planner.plan_adaptive(&logical).unwrap();
1936        assert!(physical.adaptive_context.is_some());
1937    }
1938
1939    #[test]
1940    fn test_plan_adaptive_with_aggregate() {
1941        let store = create_test_store();
1942        let planner = Planner::new(store);
1943
1944        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1945            group_by: vec![],
1946            aggregates: vec![LogicalAggregateExpr {
1947                function: LogicalAggregateFunction::Count,
1948                expression: Some(LogicalExpression::Variable("n".to_string())),
1949                expression2: None,
1950                distinct: false,
1951                alias: Some("cnt".to_string()),
1952                percentile: None,
1953                separator: None,
1954            }],
1955            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1956                variable: "n".to_string(),
1957                label: None,
1958                input: None,
1959            })),
1960            having: None,
1961        }));
1962
1963        let physical = planner.plan_adaptive(&logical).unwrap();
1964        assert!(physical.adaptive_context.is_some());
1965    }
1966
1967    #[test]
1968    fn test_plan_adaptive_with_distinct() {
1969        let store = create_test_store();
1970        let planner = Planner::new(store);
1971
1972        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1973            items: vec![ReturnItem {
1974                expression: LogicalExpression::Variable("n".to_string()),
1975                alias: None,
1976            }],
1977            distinct: false,
1978            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1979                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1980                    variable: "n".to_string(),
1981                    label: None,
1982                    input: None,
1983                })),
1984                columns: None,
1985            })),
1986        }));
1987
1988        let physical = planner.plan_adaptive(&logical).unwrap();
1989        assert!(physical.adaptive_context.is_some());
1990    }
1991
1992    #[test]
1993    fn test_plan_adaptive_with_limit() {
1994        let store = create_test_store();
1995        let planner = Planner::new(store);
1996
1997        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1998            items: vec![ReturnItem {
1999                expression: LogicalExpression::Variable("n".to_string()),
2000                alias: None,
2001            }],
2002            distinct: false,
2003            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2004                count: 10.into(),
2005                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2006                    variable: "n".to_string(),
2007                    label: None,
2008                    input: None,
2009                })),
2010            })),
2011        }));
2012
2013        let physical = planner.plan_adaptive(&logical).unwrap();
2014        assert!(physical.adaptive_context.is_some());
2015    }
2016
2017    #[test]
2018    fn test_plan_adaptive_with_skip() {
2019        let store = create_test_store();
2020        let planner = Planner::new(store);
2021
2022        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2023            items: vec![ReturnItem {
2024                expression: LogicalExpression::Variable("n".to_string()),
2025                alias: None,
2026            }],
2027            distinct: false,
2028            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2029                count: 5.into(),
2030                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2031                    variable: "n".to_string(),
2032                    label: None,
2033                    input: None,
2034                })),
2035            })),
2036        }));
2037
2038        let physical = planner.plan_adaptive(&logical).unwrap();
2039        assert!(physical.adaptive_context.is_some());
2040    }
2041
2042    #[test]
2043    fn test_plan_adaptive_with_sort() {
2044        let store = create_test_store();
2045        let planner = Planner::new(store);
2046
2047        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2048            items: vec![ReturnItem {
2049                expression: LogicalExpression::Variable("n".to_string()),
2050                alias: None,
2051            }],
2052            distinct: false,
2053            input: Box::new(LogicalOperator::Sort(SortOp {
2054                keys: vec![SortKey {
2055                    expression: LogicalExpression::Variable("n".to_string()),
2056                    order: SortOrder::Ascending,
2057                    nulls: None,
2058                }],
2059                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2060                    variable: "n".to_string(),
2061                    label: None,
2062                    input: None,
2063                })),
2064            })),
2065        }));
2066
2067        let physical = planner.plan_adaptive(&logical).unwrap();
2068        assert!(physical.adaptive_context.is_some());
2069    }
2070
2071    #[test]
2072    fn test_plan_adaptive_with_union() {
2073        let store = create_test_store();
2074        let planner = Planner::new(store);
2075
2076        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2077            items: vec![ReturnItem {
2078                expression: LogicalExpression::Variable("n".to_string()),
2079                alias: None,
2080            }],
2081            distinct: false,
2082            input: Box::new(LogicalOperator::Union(UnionOp {
2083                inputs: vec![
2084                    LogicalOperator::NodeScan(NodeScanOp {
2085                        variable: "n".to_string(),
2086                        label: Some("Person".to_string()),
2087                        input: None,
2088                    }),
2089                    LogicalOperator::NodeScan(NodeScanOp {
2090                        variable: "n".to_string(),
2091                        label: Some("Company".to_string()),
2092                        input: None,
2093                    }),
2094                ],
2095            })),
2096        }));
2097
2098        let physical = planner.plan_adaptive(&logical).unwrap();
2099        assert!(physical.adaptive_context.is_some());
2100    }
2101
2102    // ==================== Variable Length Path Tests ====================
2103
2104    #[test]
2105    fn test_plan_expand_variable_length() {
2106        let store = create_test_store();
2107        let planner = Planner::new(store);
2108
2109        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2110        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2111            items: vec![
2112                ReturnItem {
2113                    expression: LogicalExpression::Variable("a".to_string()),
2114                    alias: None,
2115                },
2116                ReturnItem {
2117                    expression: LogicalExpression::Variable("b".to_string()),
2118                    alias: None,
2119                },
2120            ],
2121            distinct: false,
2122            input: Box::new(LogicalOperator::Expand(ExpandOp {
2123                from_variable: "a".to_string(),
2124                to_variable: "b".to_string(),
2125                edge_variable: None,
2126                direction: ExpandDirection::Outgoing,
2127                edge_types: vec!["KNOWS".to_string()],
2128                min_hops: 1,
2129                max_hops: Some(3),
2130                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2131                    variable: "a".to_string(),
2132                    label: None,
2133                    input: None,
2134                })),
2135                path_alias: None,
2136                path_mode: PathMode::Walk,
2137            })),
2138        }));
2139
2140        let physical = planner.plan(&logical).unwrap();
2141        assert!(physical.columns().contains(&"a".to_string()));
2142        assert!(physical.columns().contains(&"b".to_string()));
2143    }
2144
2145    #[test]
2146    fn test_plan_expand_with_path_alias() {
2147        let store = create_test_store();
2148        let planner = Planner::new(store);
2149
2150        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2151        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2152            items: vec![
2153                ReturnItem {
2154                    expression: LogicalExpression::Variable("a".to_string()),
2155                    alias: None,
2156                },
2157                ReturnItem {
2158                    expression: LogicalExpression::Variable("b".to_string()),
2159                    alias: None,
2160                },
2161            ],
2162            distinct: false,
2163            input: Box::new(LogicalOperator::Expand(ExpandOp {
2164                from_variable: "a".to_string(),
2165                to_variable: "b".to_string(),
2166                edge_variable: None,
2167                direction: ExpandDirection::Outgoing,
2168                edge_types: vec!["KNOWS".to_string()],
2169                min_hops: 1,
2170                max_hops: Some(3),
2171                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2172                    variable: "a".to_string(),
2173                    label: None,
2174                    input: None,
2175                })),
2176                path_alias: Some("p".to_string()),
2177                path_mode: PathMode::Walk,
2178            })),
2179        }));
2180
2181        let physical = planner.plan(&logical).unwrap();
2182        // Verify plan was created successfully with expected output columns
2183        assert!(physical.columns().contains(&"a".to_string()));
2184        assert!(physical.columns().contains(&"b".to_string()));
2185    }
2186
2187    #[test]
2188    fn test_plan_expand_incoming() {
2189        let store = create_test_store();
2190        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2191
2192        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2193        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2194            items: vec![
2195                ReturnItem {
2196                    expression: LogicalExpression::Variable("a".to_string()),
2197                    alias: None,
2198                },
2199                ReturnItem {
2200                    expression: LogicalExpression::Variable("b".to_string()),
2201                    alias: None,
2202                },
2203            ],
2204            distinct: false,
2205            input: Box::new(LogicalOperator::Expand(ExpandOp {
2206                from_variable: "a".to_string(),
2207                to_variable: "b".to_string(),
2208                edge_variable: None,
2209                direction: ExpandDirection::Incoming,
2210                edge_types: vec!["KNOWS".to_string()],
2211                min_hops: 1,
2212                max_hops: Some(1),
2213                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2214                    variable: "a".to_string(),
2215                    label: None,
2216                    input: None,
2217                })),
2218                path_alias: None,
2219                path_mode: PathMode::Walk,
2220            })),
2221        }));
2222
2223        let physical = planner.plan(&logical).unwrap();
2224        assert!(physical.columns().contains(&"a".to_string()));
2225        assert!(physical.columns().contains(&"b".to_string()));
2226    }
2227
2228    #[test]
2229    fn test_plan_expand_both_directions() {
2230        let store = create_test_store();
2231        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2232
2233        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2234        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2235            items: vec![
2236                ReturnItem {
2237                    expression: LogicalExpression::Variable("a".to_string()),
2238                    alias: None,
2239                },
2240                ReturnItem {
2241                    expression: LogicalExpression::Variable("b".to_string()),
2242                    alias: None,
2243                },
2244            ],
2245            distinct: false,
2246            input: Box::new(LogicalOperator::Expand(ExpandOp {
2247                from_variable: "a".to_string(),
2248                to_variable: "b".to_string(),
2249                edge_variable: None,
2250                direction: ExpandDirection::Both,
2251                edge_types: vec!["KNOWS".to_string()],
2252                min_hops: 1,
2253                max_hops: Some(1),
2254                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2255                    variable: "a".to_string(),
2256                    label: None,
2257                    input: None,
2258                })),
2259                path_alias: None,
2260                path_mode: PathMode::Walk,
2261            })),
2262        }));
2263
2264        let physical = planner.plan(&logical).unwrap();
2265        assert!(physical.columns().contains(&"a".to_string()));
2266        assert!(physical.columns().contains(&"b".to_string()));
2267    }
2268
2269    // ==================== With Context Tests ====================
2270
2271    #[test]
2272    fn test_planner_with_context() {
2273        use crate::transaction::TransactionManager;
2274
2275        let store = create_test_store();
2276        let transaction_manager = Arc::new(TransactionManager::new());
2277        let transaction_id = transaction_manager.begin();
2278        let epoch = transaction_manager.current_epoch();
2279
2280        let planner = Planner::with_context(
2281            Arc::clone(&store),
2282            Arc::clone(&transaction_manager),
2283            Some(transaction_id),
2284            epoch,
2285        );
2286
2287        assert_eq!(planner.transaction_id(), Some(transaction_id));
2288        assert!(planner.transaction_manager().is_some());
2289        assert_eq!(planner.viewing_epoch(), epoch);
2290    }
2291
2292    #[test]
2293    fn test_planner_with_factorized_execution_disabled() {
2294        let store = create_test_store();
2295        let planner = Planner::new(Arc::clone(&store)).with_factorized_execution(false);
2296
2297        // Two consecutive expands - should NOT use factorized execution
2298        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2299            items: vec![
2300                ReturnItem {
2301                    expression: LogicalExpression::Variable("a".to_string()),
2302                    alias: None,
2303                },
2304                ReturnItem {
2305                    expression: LogicalExpression::Variable("c".to_string()),
2306                    alias: None,
2307                },
2308            ],
2309            distinct: false,
2310            input: Box::new(LogicalOperator::Expand(ExpandOp {
2311                from_variable: "b".to_string(),
2312                to_variable: "c".to_string(),
2313                edge_variable: None,
2314                direction: ExpandDirection::Outgoing,
2315                edge_types: vec![],
2316                min_hops: 1,
2317                max_hops: Some(1),
2318                input: Box::new(LogicalOperator::Expand(ExpandOp {
2319                    from_variable: "a".to_string(),
2320                    to_variable: "b".to_string(),
2321                    edge_variable: None,
2322                    direction: ExpandDirection::Outgoing,
2323                    edge_types: vec![],
2324                    min_hops: 1,
2325                    max_hops: Some(1),
2326                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2327                        variable: "a".to_string(),
2328                        label: None,
2329                        input: None,
2330                    })),
2331                    path_alias: None,
2332                    path_mode: PathMode::Walk,
2333                })),
2334                path_alias: None,
2335                path_mode: PathMode::Walk,
2336            })),
2337        }));
2338
2339        let physical = planner.plan(&logical).unwrap();
2340        assert!(physical.columns().contains(&"a".to_string()));
2341        assert!(physical.columns().contains(&"c".to_string()));
2342    }
2343
2344    // ==================== Sort with Property Tests ====================
2345
2346    #[test]
2347    fn test_plan_sort_by_property() {
2348        let store = create_test_store();
2349        let planner = Planner::new(store);
2350
2351        // MATCH (n) RETURN n ORDER BY n.name ASC
2352        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2353            items: vec![ReturnItem {
2354                expression: LogicalExpression::Variable("n".to_string()),
2355                alias: None,
2356            }],
2357            distinct: false,
2358            input: Box::new(LogicalOperator::Sort(SortOp {
2359                keys: vec![SortKey {
2360                    expression: LogicalExpression::Property {
2361                        variable: "n".to_string(),
2362                        property: "name".to_string(),
2363                    },
2364                    order: SortOrder::Ascending,
2365                    nulls: None,
2366                }],
2367                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2368                    variable: "n".to_string(),
2369                    label: None,
2370                    input: None,
2371                })),
2372            })),
2373        }));
2374
2375        let physical = planner.plan(&logical).unwrap();
2376        // Should have the property column projected
2377        assert!(physical.columns().contains(&"n".to_string()));
2378    }
2379
2380    // ==================== Scan with Input Tests ====================
2381
2382    #[test]
2383    fn test_plan_scan_with_input() {
2384        let store = create_test_store();
2385        let planner = Planner::new(store);
2386
2387        // A scan with another scan as input (for chained patterns)
2388        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2389            items: vec![
2390                ReturnItem {
2391                    expression: LogicalExpression::Variable("a".to_string()),
2392                    alias: None,
2393                },
2394                ReturnItem {
2395                    expression: LogicalExpression::Variable("b".to_string()),
2396                    alias: None,
2397                },
2398            ],
2399            distinct: false,
2400            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2401                variable: "b".to_string(),
2402                label: Some("Company".to_string()),
2403                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2404                    variable: "a".to_string(),
2405                    label: Some("Person".to_string()),
2406                    input: None,
2407                }))),
2408            })),
2409        }));
2410
2411        let physical = planner.plan(&logical).unwrap();
2412        assert!(physical.columns().contains(&"a".to_string()));
2413        assert!(physical.columns().contains(&"b".to_string()));
2414    }
2415}