Skip to main content

grafeo_engine/query/planner/lpg/
mod.rs

//! LPG (Labeled Property Graph) planner.
//!
//! Converts logical plans with LPG operators (NodeScan, Expand, etc.) to
//! physical operators that execute against an LPG store.
5
6mod aggregate;
7mod expand;
8mod expression;
9mod filter;
10mod join;
11mod mutation;
12mod project;
13mod scan;
14
15#[cfg(feature = "algos")]
16use crate::query::plan::CallProcedureOp;
17use crate::query::plan::{
18    AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
19    BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
20    EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
21    HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
22    LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
23    NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
24    SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
25};
26use grafeo_common::grafeo_debug_span;
27use grafeo_common::types::{EpochId, TransactionId};
28use grafeo_common::types::{LogicalType, Value};
29use grafeo_common::utils::error::{Error, Result};
30use grafeo_core::execution::AdaptiveContext;
31use grafeo_core::execution::operators::{
32    AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
33    CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
34    DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
35    ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
36    FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
37    JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
38    LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
39    MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
40    ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
41    ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
42    SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
43    VariableLengthExpandOperator,
44};
45use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use crate::query::planner::common;
50use crate::query::planner::common::expression_to_string;
51use crate::query::planner::{
52    PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
53    convert_unary_op, value_to_logical_type,
54};
55use crate::transaction::TransactionManager;
56
57/// Range bounds for property-based range queries.
58struct RangeBounds<'a> {
59    min: Option<&'a Value>,
60    max: Option<&'a Value>,
61    min_inclusive: bool,
62    max_inclusive: bool,
63}
64
65/// Converts a logical plan to a physical operator tree for LPG stores.
66pub struct Planner {
67    /// The graph store (read-only operations).
68    pub(super) store: Arc<dyn GraphStore>,
69    /// Writable graph store (None for read-only databases).
70    pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
71    /// Transaction manager for MVCC operations.
72    pub(super) transaction_manager: Option<Arc<TransactionManager>>,
73    /// Current transaction ID (if in a transaction).
74    pub(super) transaction_id: Option<TransactionId>,
75    /// Epoch to use for visibility checks.
76    pub(super) viewing_epoch: EpochId,
77    /// Counter for generating unique anonymous edge column names.
78    pub(super) anon_edge_counter: std::cell::Cell<u32>,
79    /// Whether to use factorized execution for multi-hop queries.
80    pub(super) factorized_execution: bool,
81    /// Variables that hold scalar values (from UNWIND/FOR), not node/edge IDs.
82    /// Used by plan_return to assign `LogicalType::Any` instead of `Node`.
83    pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
84    /// Variables that hold edge IDs (from MATCH edge patterns).
85    /// Used by plan_return to emit `EdgeResolve` instead of `NodeResolve`.
86    pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
87    /// Optional constraint validator for schema enforcement during mutations.
88    pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
89    /// Catalog for user-defined procedure lookup.
90    pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
91    /// Shared parameter state for the currently planning correlated Apply.
92    /// Set by `plan_apply` before planning the inner operator, consumed by
93    /// `plan_operator` when encountering `ParameterScan`.
94    pub(super) correlated_param_state:
95        std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
96    /// Variables from variable-length expand patterns (group-list variables).
97    /// Used by the aggregate planner to detect horizontal aggregation (GE09).
98    pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
99    /// When true, each physical operator is wrapped in `ProfiledOperator`.
100    profiling: std::cell::Cell<bool>,
101    /// Profile entries collected during planning (post-order).
102    profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
103    /// Optional write tracker for recording writes during mutations.
104    write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
105    /// Session context for introspection functions (info, schema, current_schema, etc.).
106    pub(super) session_context: grafeo_core::execution::operators::SessionContext,
107    /// When true, expand operators use epoch-only visibility (no MVCC version
108    /// chain walks).  Set when the plan contains no mutations, so PENDING
109    /// writes are impossible to observe.
110    pub(super) read_only: bool,
111}
112
113impl Planner {
114    /// Creates a new planner with the given store.
115    ///
116    /// This creates a planner without transaction context, using the current
117    /// epoch from the store for visibility.
118    #[must_use]
119    pub fn new(store: Arc<dyn GraphStore>) -> Self {
120        let epoch = store.current_epoch();
121        Self {
122            store,
123            write_store: None,
124            transaction_manager: None,
125            transaction_id: None,
126            viewing_epoch: epoch,
127            anon_edge_counter: std::cell::Cell::new(0),
128            factorized_execution: true,
129            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
130            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
131            validator: None,
132            catalog: None,
133            correlated_param_state: std::cell::RefCell::new(None),
134            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
135            profiling: std::cell::Cell::new(false),
136            profile_entries: std::cell::RefCell::new(Vec::new()),
137            write_tracker: None,
138            session_context: grafeo_core::execution::operators::SessionContext::default(),
139            read_only: false,
140        }
141    }
142
143    /// Creates a new planner with transaction context for MVCC-aware planning.
144    #[must_use]
145    pub fn with_context(
146        store: Arc<dyn GraphStore>,
147        write_store: Option<Arc<dyn GraphStoreMut>>,
148        transaction_manager: Arc<TransactionManager>,
149        transaction_id: Option<TransactionId>,
150        viewing_epoch: EpochId,
151    ) -> Self {
152        use crate::transaction::TransactionWriteTracker;
153
154        // Create write tracker when there's an active transaction
155        let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
156            if transaction_id.is_some() {
157                Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
158                    &transaction_manager,
159                ))))
160            } else {
161                None
162            };
163
164        Self {
165            store,
166            write_store,
167            transaction_manager: Some(transaction_manager),
168            transaction_id,
169            viewing_epoch,
170            anon_edge_counter: std::cell::Cell::new(0),
171            factorized_execution: true,
172            scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
173            edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
174            validator: None,
175            catalog: None,
176            correlated_param_state: std::cell::RefCell::new(None),
177            group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
178            profiling: std::cell::Cell::new(false),
179            profile_entries: std::cell::RefCell::new(Vec::new()),
180            write_tracker,
181            session_context: grafeo_core::execution::operators::SessionContext::default(),
182            read_only: false,
183        }
184    }
185
186    /// Marks this planner as planning a read-only query (no mutations),
187    /// enabling fast-path visibility checks in expand operators.
188    #[must_use]
189    pub fn with_read_only(mut self, read_only: bool) -> Self {
190        self.read_only = read_only;
191        self
192    }
193
194    /// Returns the writable store, or `TransactionError::ReadOnly` if unavailable.
195    fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
196        self.write_store
197            .as_ref()
198            .map(Arc::clone)
199            .ok_or(Error::Transaction(
200                grafeo_common::utils::error::TransactionError::ReadOnly,
201            ))
202    }
203
204    /// Returns the viewing epoch for this planner.
205    #[must_use]
206    pub fn viewing_epoch(&self) -> EpochId {
207        self.viewing_epoch
208    }
209
210    /// Returns the transaction ID for this planner, if any.
211    #[must_use]
212    pub fn transaction_id(&self) -> Option<TransactionId> {
213        self.transaction_id
214    }
215
216    /// Returns a reference to the transaction manager, if available.
217    #[must_use]
218    pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
219        self.transaction_manager.as_ref()
220    }
221
222    /// Enables or disables factorized execution for multi-hop queries.
223    #[must_use]
224    pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
225        self.factorized_execution = enabled;
226        self
227    }
228
229    /// Sets the constraint validator for schema enforcement during mutations.
230    #[must_use]
231    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
232        self.validator = Some(validator);
233        self
234    }
235
236    /// Sets the catalog for user-defined procedure lookup.
237    #[must_use]
238    pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
239        self.catalog = Some(catalog);
240        self
241    }
242
243    /// Sets the session context for introspection functions.
244    #[must_use]
245    pub fn with_session_context(
246        mut self,
247        context: grafeo_core::execution::operators::SessionContext,
248    ) -> Self {
249        self.session_context = context;
250        self
251    }
252
253    /// Counts consecutive single-hop expand operations.
254    ///
255    /// Returns the count and the deepest non-expand operator (the base of the chain).
256    fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
257        match op {
258            LogicalOperator::Expand(expand) => {
259                let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
260
261                if is_single_hop {
262                    let (inner_count, base) = Self::count_expand_chain(&expand.input);
263                    (inner_count + 1, base)
264                } else {
265                    (0, op)
266                }
267            }
268            _ => (0, op),
269        }
270    }
271
272    /// Collects expand operations from the outermost down to the base.
273    ///
274    /// Returns expands in order from innermost (base) to outermost.
275    fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
276        let mut chain = Vec::new();
277        let mut current = op;
278
279        while let LogicalOperator::Expand(expand) = current {
280            let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
281            if !is_single_hop {
282                break;
283            }
284            chain.push(expand);
285            current = &expand.input;
286        }
287
288        chain.reverse();
289        chain
290    }
291
292    /// Plans a logical plan into a physical operator.
293    pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
294        let _span = grafeo_debug_span!("grafeo::query::plan");
295        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
296        Ok(PhysicalPlan {
297            operator,
298            columns,
299            adaptive_context: None,
300        })
301    }
302
303    /// Plans a logical plan with profiling: each physical operator is wrapped
304    /// in [`ProfiledOperator`](grafeo_core::execution::ProfiledOperator) to
305    /// collect row counts and timing. Returns the physical plan together with
306    /// the collected [`ProfileEntry`](crate::query::profile::ProfileEntry)
307    /// items in post-order (children before parents).
308    pub fn plan_profiled(
309        &self,
310        logical_plan: &LogicalPlan,
311    ) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
312        self.profiling.set(true);
313        self.profile_entries.borrow_mut().clear();
314
315        let result = self.plan_operator(&logical_plan.root);
316
317        self.profiling.set(false);
318        let (operator, columns) = result?;
319        let entries = self.profile_entries.borrow_mut().drain(..).collect();
320
321        Ok((
322            PhysicalPlan {
323                operator,
324                columns,
325                adaptive_context: None,
326            },
327            entries,
328        ))
329    }
330
331    /// Plans a logical plan with adaptive execution support.
332    pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
333        let (operator, columns) = self.plan_operator(&logical_plan.root)?;
334
335        let mut adaptive_context = AdaptiveContext::new();
336        self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
337
338        Ok(PhysicalPlan {
339            operator,
340            columns,
341            adaptive_context: Some(adaptive_context),
342        })
343    }
344
345    /// Collects cardinality estimates from the logical plan into an adaptive context.
346    fn collect_cardinality_estimates(
347        &self,
348        op: &LogicalOperator,
349        ctx: &mut AdaptiveContext,
350        depth: usize,
351    ) {
352        match op {
353            LogicalOperator::NodeScan(scan) => {
354                let estimate = if let Some(label) = &scan.label {
355                    self.store.nodes_by_label(label).len() as f64
356                } else {
357                    self.store.node_count() as f64
358                };
359                let id = format!("scan_{}", scan.variable);
360                ctx.set_estimate(&id, estimate);
361
362                if let Some(input) = &scan.input {
363                    self.collect_cardinality_estimates(input, ctx, depth + 1);
364                }
365            }
366            LogicalOperator::Filter(filter) => {
367                let input_estimate = self.estimate_cardinality(&filter.input);
368                let estimate = input_estimate * 0.3;
369                let id = format!("filter_{depth}");
370                ctx.set_estimate(&id, estimate);
371
372                self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
373            }
374            LogicalOperator::Expand(expand) => {
375                let input_estimate = self.estimate_cardinality(&expand.input);
376                let stats = self.store.statistics();
377                let avg_degree = self.estimate_expand_degree(&stats, expand);
378                let estimate = input_estimate * avg_degree;
379                let id = format!("expand_{}", expand.to_variable);
380                ctx.set_estimate(&id, estimate);
381
382                self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
383            }
384            LogicalOperator::Join(join) => {
385                let left_est = self.estimate_cardinality(&join.left);
386                let right_est = self.estimate_cardinality(&join.right);
387                let estimate = (left_est * right_est).sqrt();
388                let id = format!("join_{depth}");
389                ctx.set_estimate(&id, estimate);
390
391                self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
392                self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
393            }
394            LogicalOperator::Aggregate(agg) => {
395                let input_estimate = self.estimate_cardinality(&agg.input);
396                let estimate = if agg.group_by.is_empty() {
397                    1.0
398                } else {
399                    (input_estimate * 0.1).max(1.0)
400                };
401                let id = format!("aggregate_{depth}");
402                ctx.set_estimate(&id, estimate);
403
404                self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
405            }
406            LogicalOperator::Distinct(distinct) => {
407                let input_estimate = self.estimate_cardinality(&distinct.input);
408                let estimate = (input_estimate * 0.5).max(1.0);
409                let id = format!("distinct_{depth}");
410                ctx.set_estimate(&id, estimate);
411
412                self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
413            }
414            LogicalOperator::Return(ret) => {
415                self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
416            }
417            LogicalOperator::Limit(limit) => {
418                let input_estimate = self.estimate_cardinality(&limit.input);
419                let estimate = (input_estimate).min(limit.count.estimate());
420                let id = format!("limit_{depth}");
421                ctx.set_estimate(&id, estimate);
422
423                self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
424            }
425            LogicalOperator::Skip(skip) => {
426                let input_estimate = self.estimate_cardinality(&skip.input);
427                let estimate = (input_estimate - skip.count.estimate()).max(0.0);
428                let id = format!("skip_{depth}");
429                ctx.set_estimate(&id, estimate);
430
431                self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
432            }
433            LogicalOperator::Sort(sort) => {
434                self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
435            }
436            LogicalOperator::Union(union) => {
437                let estimate: f64 = union
438                    .inputs
439                    .iter()
440                    .map(|input| self.estimate_cardinality(input))
441                    .sum();
442                let id = format!("union_{depth}");
443                ctx.set_estimate(&id, estimate);
444
445                for input in &union.inputs {
446                    self.collect_cardinality_estimates(input, ctx, depth + 1);
447                }
448            }
449            _ => {
450                // For other operators, try to recurse into known input patterns
451            }
452        }
453    }
454
455    /// Estimates cardinality for a logical operator subtree.
456    fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
457        match op {
458            LogicalOperator::NodeScan(scan) => {
459                if let Some(label) = &scan.label {
460                    self.store.nodes_by_label(label).len() as f64
461                } else {
462                    self.store.node_count() as f64
463                }
464            }
465            LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
466            LogicalOperator::Expand(expand) => {
467                let stats = self.store.statistics();
468                let avg_degree = self.estimate_expand_degree(&stats, expand);
469                self.estimate_cardinality(&expand.input) * avg_degree
470            }
471            LogicalOperator::Join(join) => {
472                let left = self.estimate_cardinality(&join.left);
473                let right = self.estimate_cardinality(&join.right);
474                (left * right).sqrt()
475            }
476            LogicalOperator::Aggregate(agg) => {
477                if agg.group_by.is_empty() {
478                    1.0
479                } else {
480                    (self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
481                }
482            }
483            LogicalOperator::Distinct(distinct) => {
484                (self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
485            }
486            LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
487            LogicalOperator::Limit(limit) => self
488                .estimate_cardinality(&limit.input)
489                .min(limit.count.estimate()),
490            LogicalOperator::Skip(skip) => {
491                (self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
492            }
493            LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
494            LogicalOperator::Union(union) => union
495                .inputs
496                .iter()
497                .map(|input| self.estimate_cardinality(input))
498                .sum(),
499            LogicalOperator::Except(except) => {
500                let left = self.estimate_cardinality(&except.left);
501                let right = self.estimate_cardinality(&except.right);
502                (left - right).max(0.0)
503            }
504            LogicalOperator::Intersect(intersect) => {
505                let left = self.estimate_cardinality(&intersect.left);
506                let right = self.estimate_cardinality(&intersect.right);
507                left.min(right)
508            }
509            LogicalOperator::Otherwise(otherwise) => self
510                .estimate_cardinality(&otherwise.left)
511                .max(self.estimate_cardinality(&otherwise.right)),
512            _ => 1000.0,
513        }
514    }
515
516    /// Estimates the average edge degree for an expand operation using store statistics.
517    fn estimate_expand_degree(
518        &self,
519        stats: &grafeo_core::statistics::Statistics,
520        expand: &ExpandOp,
521    ) -> f64 {
522        let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
523        if expand.edge_types.len() == 1 {
524            stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
525        } else if stats.total_nodes > 0 {
526            (stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
527        } else {
528            10.0
529        }
530    }
531
532    /// If profiling is enabled, wraps a planned result in `ProfiledOperator`
533    /// and records a [`ProfileEntry`](crate::query::profile::ProfileEntry).
534    fn maybe_profile(
535        &self,
536        result: Result<(Box<dyn Operator>, Vec<String>)>,
537        op: &LogicalOperator,
538    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
539        if self.profiling.get() {
540            let (physical, columns) = result?;
541            let (entry, stats) =
542                crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
543            let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
544            self.profile_entries.borrow_mut().push(entry);
545            Ok((Box::new(profiled), columns))
546        } else {
547            result
548        }
549    }
550
551    /// Plans a single logical operator.
552    fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
553        let result = match op {
554            LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
555            LogicalOperator::Expand(expand) => {
556                if self.factorized_execution {
557                    let (chain_len, _base) = Self::count_expand_chain(op);
558                    if chain_len >= 2 {
559                        return self.maybe_profile(self.plan_expand_chain(op), op);
560                    }
561                }
562                self.plan_expand(expand)
563            }
564            LogicalOperator::Return(ret) => self.plan_return(ret),
565            LogicalOperator::Filter(filter) => self.plan_filter(filter),
566            LogicalOperator::Project(project) => self.plan_project(project),
567            LogicalOperator::Limit(limit) => self.plan_limit(limit),
568            LogicalOperator::Skip(skip) => self.plan_skip(skip),
569            LogicalOperator::Sort(sort) => self.plan_sort(sort),
570            LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
571            LogicalOperator::Join(join) => self.plan_join(join),
572            LogicalOperator::Union(union) => self.plan_union(union),
573            LogicalOperator::Except(except) => self.plan_except(except),
574            LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
575            LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
576            LogicalOperator::Apply(apply) => self.plan_apply(apply),
577            LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
578            LogicalOperator::CreateNode(create) => self.plan_create_node(create),
579            LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
580            LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
581            LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
582            LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
583            LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
584            LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
585            LogicalOperator::Merge(merge) => self.plan_merge(merge),
586            LogicalOperator::MergeRelationship(merge_rel) => {
587                self.plan_merge_relationship(merge_rel)
588            }
589            LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
590            LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
591            LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
592            LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
593            LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
594            #[cfg(feature = "algos")]
595            LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
596            #[cfg(not(feature = "algos"))]
597            LogicalOperator::CallProcedure(_) => Err(Error::Internal(
598                "CALL procedures require the 'algos' feature".to_string(),
599            )),
600            LogicalOperator::ParameterScan(_param_scan) => {
601                let state = self
602                    .correlated_param_state
603                    .borrow()
604                    .clone()
605                    .ok_or_else(|| {
606                        Error::Internal(
607                            "ParameterScan without correlated Apply context".to_string(),
608                        )
609                    })?;
610                // Use the actual column names from the ParameterState (which may
611                // have been expanded from "*" to real variable names in plan_apply)
612                let columns = state.columns.clone();
613                let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
614                Ok((operator, columns))
615            }
616            LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
617            LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
618            LogicalOperator::LoadData(load) => {
619                let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
620                    load.path.clone(),
621                    load.format,
622                    load.with_headers,
623                    load.field_terminator,
624                    load.variable.clone(),
625                ));
626                Ok((operator, vec![load.variable.clone()]))
627            }
628            LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
629            LogicalOperator::VectorScan(_) => Err(Error::Internal(
630                "VectorScan requires vector-index feature".to_string(),
631            )),
632            LogicalOperator::VectorJoin(_) => Err(Error::Internal(
633                "VectorJoin requires vector-index feature".to_string(),
634            )),
635            _ => Err(Error::Internal(format!(
636                "Unsupported operator: {:?}",
637                std::mem::discriminant(op)
638            ))),
639        };
640        self.maybe_profile(result, op)
641    }
642
643    /// Plans a horizontal aggregate operator (per-row aggregation over a list column).
644    fn plan_horizontal_aggregate(
645        &self,
646        ha: &HorizontalAggregateOp,
647    ) -> Result<(Box<dyn Operator>, Vec<String>)> {
648        let (child_op, child_columns) = self.plan_operator(&ha.input)?;
649
650        let list_col_idx = child_columns
651            .iter()
652            .position(|c| c == &ha.list_column)
653            .ok_or_else(|| {
654                Error::Internal(format!(
655                    "HorizontalAggregate list column '{}' not found in {:?}",
656                    ha.list_column, child_columns
657                ))
658            })?;
659
660        let entity_kind = match ha.entity_kind {
661            LogicalEntityKind::Edge => EntityKind::Edge,
662            LogicalEntityKind::Node => EntityKind::Node,
663        };
664
665        let function = convert_aggregate_function(ha.function);
666        let input_column_count = child_columns.len();
667
668        let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
669            child_op,
670            list_col_idx,
671            entity_kind,
672            function,
673            ha.property.clone(),
674            Arc::clone(&self.store) as Arc<dyn GraphStore>,
675            input_column_count,
676        ));
677
678        let mut columns = child_columns;
679        columns.push(ha.alias.clone());
680        // Mark the result as a scalar column
681        self.scalar_columns.borrow_mut().insert(ha.alias.clone());
682
683        Ok((operator, columns))
684    }
685
686    /// Plans a `MapCollect` operator that collapses grouped rows into a single Map value.
687    fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
688        let (child_op, child_columns) = self.plan_operator(&mc.input)?;
689        let key_idx = child_columns
690            .iter()
691            .position(|c| c == &mc.key_var)
692            .ok_or_else(|| {
693                Error::Internal(format!(
694                    "MapCollect key '{}' not in columns {:?}",
695                    mc.key_var, child_columns
696                ))
697            })?;
698        let value_idx = child_columns
699            .iter()
700            .position(|c| c == &mc.value_var)
701            .ok_or_else(|| {
702                Error::Internal(format!(
703                    "MapCollect value '{}' not in columns {:?}",
704                    mc.value_var, child_columns
705                ))
706            })?;
707        let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
708        self.scalar_columns.borrow_mut().insert(mc.alias.clone());
709        Ok((operator, vec![mc.alias.clone()]))
710    }
711}
712
/// An operator that yields a static set of rows (for `grafeo.procedures()` etc.).
///
/// Rows are emitted in order, in batches, and can be replayed after `reset`.
#[cfg(feature = "algos")]
struct StaticResultOperator {
    /// The full set of source rows to emit.
    rows: Vec<Vec<Value>>,
    /// For each output column, the position within a source row to read from.
    column_indices: Vec<usize>,
    /// Index of the next row in `rows` that has not been emitted yet.
    row_index: usize,
}
720
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
    /// Produces the next batch of at most 1024 rows, or `Ok(None)` once every
    /// row has been emitted.
    fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
        use grafeo_core::execution::DataChunk;

        let start = self.row_index;
        let total = self.rows.len();
        if start >= total {
            return Ok(None);
        }

        let batch_len = (total - start).min(1024);
        let end = start + batch_len;

        // Every output column is declared with the `Any` logical type.
        let col_types: Vec<LogicalType> = vec![LogicalType::Any; self.column_indices.len()];
        let mut chunk = DataChunk::with_capacity(&col_types, batch_len);

        for row in &self.rows[start..end] {
            for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
                // A source row that is too short contributes a null for this column.
                let value = match row.get(src_idx) {
                    Some(found) => found.clone(),
                    None => Value::Null,
                };
                if let Some(col) = chunk.column_mut(col_idx) {
                    col.push_value(value);
                }
            }
        }
        chunk.set_count(batch_len);

        self.row_index = end;
        Ok(Some(chunk))
    }

    /// Rewinds to the first row so the rows can be emitted again.
    fn reset(&mut self) {
        self.row_index = 0;
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "StaticResult"
    }
}
760
761#[cfg(test)]
762mod tests {
763    use super::*;
764    use crate::query::plan::{
765        AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
766        DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
767        LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
768        SkipOp as LogicalSkipOp, SortKey, SortOp,
769    };
770    use grafeo_common::types::Value;
771    use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
772    use grafeo_core::graph::GraphStoreMut;
773    use grafeo_core::graph::lpg::LpgStore;
774
775    fn create_test_store() -> Arc<LpgStore> {
776        let store = Arc::new(LpgStore::new().unwrap());
777        store.create_node(&["Person"]);
778        store.create_node(&["Person"]);
779        store.create_node(&["Company"]);
780        store
781    }
782
783    // ==================== Simple Scan Tests ====================
784
785    #[test]
786    fn test_plan_simple_scan() {
787        let store = create_test_store();
788        let planner = Planner::new(store);
789
790        // MATCH (n:Person) RETURN n
791        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
792            items: vec![ReturnItem {
793                expression: LogicalExpression::Variable("n".to_string()),
794                alias: None,
795            }],
796            distinct: false,
797            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
798                variable: "n".to_string(),
799                label: Some("Person".to_string()),
800                input: None,
801            })),
802        }));
803
804        let physical = planner.plan(&logical).unwrap();
805        assert_eq!(physical.columns(), &["n"]);
806    }
807
808    #[test]
809    fn test_plan_scan_without_label() {
810        let store = create_test_store();
811        let planner = Planner::new(store);
812
813        // MATCH (n) RETURN n
814        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
815            items: vec![ReturnItem {
816                expression: LogicalExpression::Variable("n".to_string()),
817                alias: None,
818            }],
819            distinct: false,
820            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
821                variable: "n".to_string(),
822                label: None,
823                input: None,
824            })),
825        }));
826
827        let physical = planner.plan(&logical).unwrap();
828        assert_eq!(physical.columns(), &["n"]);
829    }
830
831    #[test]
832    fn test_plan_return_with_alias() {
833        let store = create_test_store();
834        let planner = Planner::new(store);
835
836        // MATCH (n:Person) RETURN n AS person
837        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
838            items: vec![ReturnItem {
839                expression: LogicalExpression::Variable("n".to_string()),
840                alias: Some("person".to_string()),
841            }],
842            distinct: false,
843            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
844                variable: "n".to_string(),
845                label: Some("Person".to_string()),
846                input: None,
847            })),
848        }));
849
850        let physical = planner.plan(&logical).unwrap();
851        assert_eq!(physical.columns(), &["person"]);
852    }
853
854    #[test]
855    fn test_plan_return_property() {
856        let store = create_test_store();
857        let planner = Planner::new(store);
858
859        // MATCH (n:Person) RETURN n.name
860        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
861            items: vec![ReturnItem {
862                expression: LogicalExpression::Property {
863                    variable: "n".to_string(),
864                    property: "name".to_string(),
865                },
866                alias: None,
867            }],
868            distinct: false,
869            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
870                variable: "n".to_string(),
871                label: Some("Person".to_string()),
872                input: None,
873            })),
874        }));
875
876        let physical = planner.plan(&logical).unwrap();
877        assert_eq!(physical.columns(), &["n.name"]);
878    }
879
880    #[test]
881    fn test_plan_return_literal() {
882        let store = create_test_store();
883        let planner = Planner::new(store);
884
885        // MATCH (n) RETURN 42 AS answer
886        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
887            items: vec![ReturnItem {
888                expression: LogicalExpression::Literal(Value::Int64(42)),
889                alias: Some("answer".to_string()),
890            }],
891            distinct: false,
892            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
893                variable: "n".to_string(),
894                label: None,
895                input: None,
896            })),
897        }));
898
899        let physical = planner.plan(&logical).unwrap();
900        assert_eq!(physical.columns(), &["answer"]);
901    }
902
903    // ==================== Filter Tests ====================
904
905    #[test]
906    fn test_plan_filter_equality() {
907        let store = create_test_store();
908        let planner = Planner::new(store);
909
910        // MATCH (n:Person) WHERE n.age = 30 RETURN n
911        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
912            items: vec![ReturnItem {
913                expression: LogicalExpression::Variable("n".to_string()),
914                alias: None,
915            }],
916            distinct: false,
917            input: Box::new(LogicalOperator::Filter(FilterOp {
918                predicate: LogicalExpression::Binary {
919                    left: Box::new(LogicalExpression::Property {
920                        variable: "n".to_string(),
921                        property: "age".to_string(),
922                    }),
923                    op: BinaryOp::Eq,
924                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
925                },
926                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
927                    variable: "n".to_string(),
928                    label: Some("Person".to_string()),
929                    input: None,
930                })),
931                pushdown_hint: None,
932            })),
933        }));
934
935        let physical = planner.plan(&logical).unwrap();
936        assert_eq!(physical.columns(), &["n"]);
937    }
938
939    #[test]
940    fn test_plan_filter_compound_and() {
941        let store = create_test_store();
942        let planner = Planner::new(store);
943
944        // WHERE n.age > 20 AND n.age < 40
945        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
946            items: vec![ReturnItem {
947                expression: LogicalExpression::Variable("n".to_string()),
948                alias: None,
949            }],
950            distinct: false,
951            input: Box::new(LogicalOperator::Filter(FilterOp {
952                predicate: LogicalExpression::Binary {
953                    left: Box::new(LogicalExpression::Binary {
954                        left: Box::new(LogicalExpression::Property {
955                            variable: "n".to_string(),
956                            property: "age".to_string(),
957                        }),
958                        op: BinaryOp::Gt,
959                        right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
960                    }),
961                    op: BinaryOp::And,
962                    right: Box::new(LogicalExpression::Binary {
963                        left: Box::new(LogicalExpression::Property {
964                            variable: "n".to_string(),
965                            property: "age".to_string(),
966                        }),
967                        op: BinaryOp::Lt,
968                        right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
969                    }),
970                },
971                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
972                    variable: "n".to_string(),
973                    label: None,
974                    input: None,
975                })),
976                pushdown_hint: None,
977            })),
978        }));
979
980        let physical = planner.plan(&logical).unwrap();
981        assert_eq!(physical.columns(), &["n"]);
982    }
983
984    #[test]
985    fn test_plan_filter_unary_not() {
986        let store = create_test_store();
987        let planner = Planner::new(store);
988
989        // WHERE NOT n.active
990        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
991            items: vec![ReturnItem {
992                expression: LogicalExpression::Variable("n".to_string()),
993                alias: None,
994            }],
995            distinct: false,
996            input: Box::new(LogicalOperator::Filter(FilterOp {
997                predicate: LogicalExpression::Unary {
998                    op: UnaryOp::Not,
999                    operand: Box::new(LogicalExpression::Property {
1000                        variable: "n".to_string(),
1001                        property: "active".to_string(),
1002                    }),
1003                },
1004                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1005                    variable: "n".to_string(),
1006                    label: None,
1007                    input: None,
1008                })),
1009                pushdown_hint: None,
1010            })),
1011        }));
1012
1013        let physical = planner.plan(&logical).unwrap();
1014        assert_eq!(physical.columns(), &["n"]);
1015    }
1016
1017    #[test]
1018    fn test_plan_filter_is_null() {
1019        let store = create_test_store();
1020        let planner = Planner::new(store);
1021
1022        // WHERE n.email IS NULL
1023        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1024            items: vec![ReturnItem {
1025                expression: LogicalExpression::Variable("n".to_string()),
1026                alias: None,
1027            }],
1028            distinct: false,
1029            input: Box::new(LogicalOperator::Filter(FilterOp {
1030                predicate: LogicalExpression::Unary {
1031                    op: UnaryOp::IsNull,
1032                    operand: Box::new(LogicalExpression::Property {
1033                        variable: "n".to_string(),
1034                        property: "email".to_string(),
1035                    }),
1036                },
1037                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1038                    variable: "n".to_string(),
1039                    label: None,
1040                    input: None,
1041                })),
1042                pushdown_hint: None,
1043            })),
1044        }));
1045
1046        let physical = planner.plan(&logical).unwrap();
1047        assert_eq!(physical.columns(), &["n"]);
1048    }
1049
1050    #[test]
1051    fn test_plan_filter_function_call() {
1052        let store = create_test_store();
1053        let planner = Planner::new(store);
1054
1055        // WHERE size(n.friends) > 0
1056        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1057            items: vec![ReturnItem {
1058                expression: LogicalExpression::Variable("n".to_string()),
1059                alias: None,
1060            }],
1061            distinct: false,
1062            input: Box::new(LogicalOperator::Filter(FilterOp {
1063                predicate: LogicalExpression::Binary {
1064                    left: Box::new(LogicalExpression::FunctionCall {
1065                        name: "size".to_string(),
1066                        args: vec![LogicalExpression::Property {
1067                            variable: "n".to_string(),
1068                            property: "friends".to_string(),
1069                        }],
1070                        distinct: false,
1071                    }),
1072                    op: BinaryOp::Gt,
1073                    right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
1074                },
1075                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1076                    variable: "n".to_string(),
1077                    label: None,
1078                    input: None,
1079                })),
1080                pushdown_hint: None,
1081            })),
1082        }));
1083
1084        let physical = planner.plan(&logical).unwrap();
1085        assert_eq!(physical.columns(), &["n"]);
1086    }
1087
1088    // ==================== Expand Tests ====================
1089
1090    #[test]
1091    fn test_plan_expand_outgoing() {
1092        let store = create_test_store();
1093        let planner = Planner::new(store);
1094
1095        // MATCH (a:Person)-[:KNOWS]->(b) RETURN a, b
1096        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1097            items: vec![
1098                ReturnItem {
1099                    expression: LogicalExpression::Variable("a".to_string()),
1100                    alias: None,
1101                },
1102                ReturnItem {
1103                    expression: LogicalExpression::Variable("b".to_string()),
1104                    alias: None,
1105                },
1106            ],
1107            distinct: false,
1108            input: Box::new(LogicalOperator::Expand(ExpandOp {
1109                from_variable: "a".to_string(),
1110                to_variable: "b".to_string(),
1111                edge_variable: None,
1112                direction: ExpandDirection::Outgoing,
1113                edge_types: vec!["KNOWS".to_string()],
1114                min_hops: 1,
1115                max_hops: Some(1),
1116                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1117                    variable: "a".to_string(),
1118                    label: Some("Person".to_string()),
1119                    input: None,
1120                })),
1121                path_alias: None,
1122                path_mode: PathMode::Walk,
1123            })),
1124        }));
1125
1126        let physical = planner.plan(&logical).unwrap();
1127        // The return should have columns [a, b]
1128        assert!(physical.columns().contains(&"a".to_string()));
1129        assert!(physical.columns().contains(&"b".to_string()));
1130    }
1131
1132    #[test]
1133    fn test_plan_expand_with_edge_variable() {
1134        let store = create_test_store();
1135        let planner = Planner::new(store);
1136
1137        // MATCH (a)-[r:KNOWS]->(b) RETURN a, r, b
1138        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1139            items: vec![
1140                ReturnItem {
1141                    expression: LogicalExpression::Variable("a".to_string()),
1142                    alias: None,
1143                },
1144                ReturnItem {
1145                    expression: LogicalExpression::Variable("r".to_string()),
1146                    alias: None,
1147                },
1148                ReturnItem {
1149                    expression: LogicalExpression::Variable("b".to_string()),
1150                    alias: None,
1151                },
1152            ],
1153            distinct: false,
1154            input: Box::new(LogicalOperator::Expand(ExpandOp {
1155                from_variable: "a".to_string(),
1156                to_variable: "b".to_string(),
1157                edge_variable: Some("r".to_string()),
1158                direction: ExpandDirection::Outgoing,
1159                edge_types: vec!["KNOWS".to_string()],
1160                min_hops: 1,
1161                max_hops: Some(1),
1162                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1163                    variable: "a".to_string(),
1164                    label: None,
1165                    input: None,
1166                })),
1167                path_alias: None,
1168                path_mode: PathMode::Walk,
1169            })),
1170        }));
1171
1172        let physical = planner.plan(&logical).unwrap();
1173        assert!(physical.columns().contains(&"a".to_string()));
1174        assert!(physical.columns().contains(&"r".to_string()));
1175        assert!(physical.columns().contains(&"b".to_string()));
1176    }
1177
1178    // ==================== Limit/Skip/Sort Tests ====================
1179
1180    #[test]
1181    fn test_plan_limit() {
1182        let store = create_test_store();
1183        let planner = Planner::new(store);
1184
1185        // MATCH (n) RETURN n LIMIT 10
1186        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1187            items: vec![ReturnItem {
1188                expression: LogicalExpression::Variable("n".to_string()),
1189                alias: None,
1190            }],
1191            distinct: false,
1192            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
1193                count: 10.into(),
1194                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1195                    variable: "n".to_string(),
1196                    label: None,
1197                    input: None,
1198                })),
1199            })),
1200        }));
1201
1202        let physical = planner.plan(&logical).unwrap();
1203        assert_eq!(physical.columns(), &["n"]);
1204    }
1205
1206    #[test]
1207    fn test_plan_skip() {
1208        let store = create_test_store();
1209        let planner = Planner::new(store);
1210
1211        // MATCH (n) RETURN n SKIP 5
1212        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1213            items: vec![ReturnItem {
1214                expression: LogicalExpression::Variable("n".to_string()),
1215                alias: None,
1216            }],
1217            distinct: false,
1218            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
1219                count: 5.into(),
1220                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1221                    variable: "n".to_string(),
1222                    label: None,
1223                    input: None,
1224                })),
1225            })),
1226        }));
1227
1228        let physical = planner.plan(&logical).unwrap();
1229        assert_eq!(physical.columns(), &["n"]);
1230    }
1231
1232    #[test]
1233    fn test_plan_sort() {
1234        let store = create_test_store();
1235        let planner = Planner::new(store);
1236
1237        // MATCH (n) RETURN n ORDER BY n.name ASC
1238        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1239            items: vec![ReturnItem {
1240                expression: LogicalExpression::Variable("n".to_string()),
1241                alias: None,
1242            }],
1243            distinct: false,
1244            input: Box::new(LogicalOperator::Sort(SortOp {
1245                keys: vec![SortKey {
1246                    expression: LogicalExpression::Variable("n".to_string()),
1247                    order: SortOrder::Ascending,
1248                    nulls: None,
1249                }],
1250                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1251                    variable: "n".to_string(),
1252                    label: None,
1253                    input: None,
1254                })),
1255            })),
1256        }));
1257
1258        let physical = planner.plan(&logical).unwrap();
1259        assert_eq!(physical.columns(), &["n"]);
1260    }
1261
1262    #[test]
1263    fn test_plan_sort_descending() {
1264        let store = create_test_store();
1265        let planner = Planner::new(store);
1266
1267        // ORDER BY n DESC
1268        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1269            items: vec![ReturnItem {
1270                expression: LogicalExpression::Variable("n".to_string()),
1271                alias: None,
1272            }],
1273            distinct: false,
1274            input: Box::new(LogicalOperator::Sort(SortOp {
1275                keys: vec![SortKey {
1276                    expression: LogicalExpression::Variable("n".to_string()),
1277                    order: SortOrder::Descending,
1278                    nulls: None,
1279                }],
1280                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1281                    variable: "n".to_string(),
1282                    label: None,
1283                    input: None,
1284                })),
1285            })),
1286        }));
1287
1288        let physical = planner.plan(&logical).unwrap();
1289        assert_eq!(physical.columns(), &["n"]);
1290    }
1291
1292    #[test]
1293    fn test_plan_distinct() {
1294        let store = create_test_store();
1295        let planner = Planner::new(store);
1296
1297        // MATCH (n) RETURN DISTINCT n
1298        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1299            items: vec![ReturnItem {
1300                expression: LogicalExpression::Variable("n".to_string()),
1301                alias: None,
1302            }],
1303            distinct: false,
1304            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1305                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1306                    variable: "n".to_string(),
1307                    label: None,
1308                    input: None,
1309                })),
1310                columns: None,
1311            })),
1312        }));
1313
1314        let physical = planner.plan(&logical).unwrap();
1315        assert_eq!(physical.columns(), &["n"]);
1316    }
1317
1318    #[test]
1319    fn test_plan_distinct_with_columns() {
1320        let store = create_test_store();
1321        let planner = Planner::new(store);
1322
1323        // DISTINCT on specific columns (column-specific dedup)
1324        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1325            items: vec![ReturnItem {
1326                expression: LogicalExpression::Variable("n".to_string()),
1327                alias: None,
1328            }],
1329            distinct: false,
1330            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1331                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1332                    variable: "n".to_string(),
1333                    label: None,
1334                    input: None,
1335                })),
1336                columns: Some(vec!["n".to_string()]),
1337            })),
1338        }));
1339
1340        let physical = planner.plan(&logical).unwrap();
1341        assert_eq!(physical.columns(), &["n"]);
1342    }
1343
1344    #[test]
1345    fn test_plan_distinct_with_nonexistent_columns() {
1346        let store = create_test_store();
1347        let planner = Planner::new(store);
1348
1349        // When distinct columns don't match any output columns,
1350        // it falls back to full-row distinct.
1351        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1352            items: vec![ReturnItem {
1353                expression: LogicalExpression::Variable("n".to_string()),
1354                alias: None,
1355            }],
1356            distinct: false,
1357            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
1358                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1359                    variable: "n".to_string(),
1360                    label: None,
1361                    input: None,
1362                })),
1363                columns: Some(vec!["nonexistent".to_string()]),
1364            })),
1365        }));
1366
1367        let physical = planner.plan(&logical).unwrap();
1368        assert_eq!(physical.columns(), &["n"]);
1369    }
1370
1371    // ==================== Aggregate Tests ====================
1372
1373    #[test]
1374    fn test_plan_aggregate_count() {
1375        let store = create_test_store();
1376        let planner = Planner::new(store);
1377
1378        // MATCH (n) RETURN count(n)
1379        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1380            items: vec![ReturnItem {
1381                expression: LogicalExpression::Variable("cnt".to_string()),
1382                alias: None,
1383            }],
1384            distinct: false,
1385            input: Box::new(LogicalOperator::Aggregate(AggregateOp {
1386                group_by: vec![],
1387                aggregates: vec![LogicalAggregateExpr {
1388                    function: LogicalAggregateFunction::Count,
1389                    expression: Some(LogicalExpression::Variable("n".to_string())),
1390                    expression2: None,
1391                    distinct: false,
1392                    alias: Some("cnt".to_string()),
1393                    percentile: None,
1394                    separator: None,
1395                }],
1396                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1397                    variable: "n".to_string(),
1398                    label: None,
1399                    input: None,
1400                })),
1401                having: None,
1402            })),
1403        }));
1404
1405        let physical = planner.plan(&logical).unwrap();
1406        assert!(physical.columns().contains(&"cnt".to_string()));
1407    }
1408
1409    #[test]
1410    fn test_plan_aggregate_with_group_by() {
1411        let store = create_test_store();
1412        let planner = Planner::new(store);
1413
1414        // MATCH (n:Person) RETURN n.city, count(n) GROUP BY n.city
1415        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1416            group_by: vec![LogicalExpression::Property {
1417                variable: "n".to_string(),
1418                property: "city".to_string(),
1419            }],
1420            aggregates: vec![LogicalAggregateExpr {
1421                function: LogicalAggregateFunction::Count,
1422                expression: Some(LogicalExpression::Variable("n".to_string())),
1423                expression2: None,
1424                distinct: false,
1425                alias: Some("cnt".to_string()),
1426                percentile: None,
1427                separator: None,
1428            }],
1429            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1430                variable: "n".to_string(),
1431                label: Some("Person".to_string()),
1432                input: None,
1433            })),
1434            having: None,
1435        }));
1436
1437        let physical = planner.plan(&logical).unwrap();
1438        assert_eq!(physical.columns().len(), 2);
1439    }
1440
1441    #[test]
1442    fn test_plan_aggregate_sum() {
1443        let store = create_test_store();
1444        let planner = Planner::new(store);
1445
1446        // SUM(n.value)
1447        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1448            group_by: vec![],
1449            aggregates: vec![LogicalAggregateExpr {
1450                function: LogicalAggregateFunction::Sum,
1451                expression: Some(LogicalExpression::Property {
1452                    variable: "n".to_string(),
1453                    property: "value".to_string(),
1454                }),
1455                expression2: None,
1456                distinct: false,
1457                alias: Some("total".to_string()),
1458                percentile: None,
1459                separator: None,
1460            }],
1461            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1462                variable: "n".to_string(),
1463                label: None,
1464                input: None,
1465            })),
1466            having: None,
1467        }));
1468
1469        let physical = planner.plan(&logical).unwrap();
1470        assert!(physical.columns().contains(&"total".to_string()));
1471    }
1472
1473    #[test]
1474    fn test_plan_aggregate_avg() {
1475        let store = create_test_store();
1476        let planner = Planner::new(store);
1477
1478        // AVG(n.score)
1479        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1480            group_by: vec![],
1481            aggregates: vec![LogicalAggregateExpr {
1482                function: LogicalAggregateFunction::Avg,
1483                expression: Some(LogicalExpression::Property {
1484                    variable: "n".to_string(),
1485                    property: "score".to_string(),
1486                }),
1487                expression2: None,
1488                distinct: false,
1489                alias: Some("average".to_string()),
1490                percentile: None,
1491                separator: None,
1492            }],
1493            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1494                variable: "n".to_string(),
1495                label: None,
1496                input: None,
1497            })),
1498            having: None,
1499        }));
1500
1501        let physical = planner.plan(&logical).unwrap();
1502        assert!(physical.columns().contains(&"average".to_string()));
1503    }
1504
1505    #[test]
1506    fn test_plan_aggregate_min_max() {
1507        let store = create_test_store();
1508        let planner = Planner::new(store);
1509
1510        // MIN(n.age), MAX(n.age)
1511        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1512            group_by: vec![],
1513            aggregates: vec![
1514                LogicalAggregateExpr {
1515                    function: LogicalAggregateFunction::Min,
1516                    expression: Some(LogicalExpression::Property {
1517                        variable: "n".to_string(),
1518                        property: "age".to_string(),
1519                    }),
1520                    expression2: None,
1521                    distinct: false,
1522                    alias: Some("youngest".to_string()),
1523                    percentile: None,
1524                    separator: None,
1525                },
1526                LogicalAggregateExpr {
1527                    function: LogicalAggregateFunction::Max,
1528                    expression: Some(LogicalExpression::Property {
1529                        variable: "n".to_string(),
1530                        property: "age".to_string(),
1531                    }),
1532                    expression2: None,
1533                    distinct: false,
1534                    alias: Some("oldest".to_string()),
1535                    percentile: None,
1536                    separator: None,
1537                },
1538            ],
1539            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1540                variable: "n".to_string(),
1541                label: None,
1542                input: None,
1543            })),
1544            having: None,
1545        }));
1546
1547        let physical = planner.plan(&logical).unwrap();
1548        assert!(physical.columns().contains(&"youngest".to_string()));
1549        assert!(physical.columns().contains(&"oldest".to_string()));
1550    }
1551
1552    // ==================== Join Tests ====================
1553
1554    #[test]
1555    fn test_plan_inner_join() {
1556        let store = create_test_store();
1557        let planner = Planner::new(store);
1558
1559        // Inner join between two scans
1560        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1561            items: vec![
1562                ReturnItem {
1563                    expression: LogicalExpression::Variable("a".to_string()),
1564                    alias: None,
1565                },
1566                ReturnItem {
1567                    expression: LogicalExpression::Variable("b".to_string()),
1568                    alias: None,
1569                },
1570            ],
1571            distinct: false,
1572            input: Box::new(LogicalOperator::Join(JoinOp {
1573                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1574                    variable: "a".to_string(),
1575                    label: Some("Person".to_string()),
1576                    input: None,
1577                })),
1578                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1579                    variable: "b".to_string(),
1580                    label: Some("Company".to_string()),
1581                    input: None,
1582                })),
1583                join_type: JoinType::Inner,
1584                conditions: vec![JoinCondition {
1585                    left: LogicalExpression::Variable("a".to_string()),
1586                    right: LogicalExpression::Variable("b".to_string()),
1587                }],
1588            })),
1589        }));
1590
1591        let physical = planner.plan(&logical).unwrap();
1592        assert!(physical.columns().contains(&"a".to_string()));
1593        assert!(physical.columns().contains(&"b".to_string()));
1594    }
1595
1596    #[test]
1597    fn test_plan_cross_join() {
1598        let store = create_test_store();
1599        let planner = Planner::new(store);
1600
1601        // Cross join (no conditions)
1602        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1603            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1604                variable: "a".to_string(),
1605                label: None,
1606                input: None,
1607            })),
1608            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1609                variable: "b".to_string(),
1610                label: None,
1611                input: None,
1612            })),
1613            join_type: JoinType::Cross,
1614            conditions: vec![],
1615        }));
1616
1617        let physical = planner.plan(&logical).unwrap();
1618        assert_eq!(physical.columns().len(), 2);
1619    }
1620
1621    #[test]
1622    fn test_plan_left_join() {
1623        let store = create_test_store();
1624        let planner = Planner::new(store);
1625
1626        let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
1627            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1628                variable: "a".to_string(),
1629                label: None,
1630                input: None,
1631            })),
1632            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1633                variable: "b".to_string(),
1634                label: None,
1635                input: None,
1636            })),
1637            join_type: JoinType::Left,
1638            conditions: vec![],
1639        }));
1640
1641        let physical = planner.plan(&logical).unwrap();
1642        assert_eq!(physical.columns().len(), 2);
1643    }
1644
1645    // ==================== Mutation Tests ====================
1646
1647    fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
1648        let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
1649        p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
1650        p
1651    }
1652
1653    #[test]
1654    fn test_plan_create_node() {
1655        let store = create_test_store();
1656        let planner = create_writable_planner(&store);
1657
1658        // CREATE (n:Person {name: 'Alix'})
1659        let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
1660            variable: "n".to_string(),
1661            labels: vec!["Person".to_string()],
1662            properties: vec![(
1663                "name".to_string(),
1664                LogicalExpression::Literal(Value::String("Alix".into())),
1665            )],
1666            input: None,
1667        }));
1668
1669        let physical = planner.plan(&logical).unwrap();
1670        assert!(physical.columns().contains(&"n".to_string()));
1671    }
1672
1673    #[test]
1674    fn test_plan_create_edge() {
1675        let store = create_test_store();
1676        let planner = create_writable_planner(&store);
1677
1678        // MATCH (a), (b) CREATE (a)-[:KNOWS]->(b)
1679        let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
1680            variable: Some("r".to_string()),
1681            from_variable: "a".to_string(),
1682            to_variable: "b".to_string(),
1683            edge_type: "KNOWS".to_string(),
1684            properties: vec![],
1685            input: Box::new(LogicalOperator::Join(JoinOp {
1686                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1687                    variable: "a".to_string(),
1688                    label: None,
1689                    input: None,
1690                })),
1691                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1692                    variable: "b".to_string(),
1693                    label: None,
1694                    input: None,
1695                })),
1696                join_type: JoinType::Cross,
1697                conditions: vec![],
1698            })),
1699        }));
1700
1701        let physical = planner.plan(&logical).unwrap();
1702        assert!(physical.columns().contains(&"r".to_string()));
1703    }
1704
1705    #[test]
1706    fn test_plan_delete_node() {
1707        let store = create_test_store();
1708        let planner = create_writable_planner(&store);
1709
1710        // MATCH (n) DELETE n
1711        let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
1712            variable: "n".to_string(),
1713            detach: false,
1714            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1715                variable: "n".to_string(),
1716                label: None,
1717                input: None,
1718            })),
1719        }));
1720
1721        let physical = planner.plan(&logical).unwrap();
1722        assert!(physical.columns().contains(&"n".to_string()));
1723    }
1724
1725    // ==================== Error Cases ====================
1726
1727    #[test]
1728    fn test_plan_empty_errors() {
1729        let store = create_test_store();
1730        let planner = Planner::new(store);
1731
1732        let logical = LogicalPlan::new(LogicalOperator::Empty);
1733        let result = planner.plan(&logical);
1734        assert!(result.is_err());
1735    }
1736
1737    #[test]
1738    fn test_plan_missing_variable_in_return() {
1739        let store = create_test_store();
1740        let planner = Planner::new(store);
1741
1742        // Return variable that doesn't exist in input
1743        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1744            items: vec![ReturnItem {
1745                expression: LogicalExpression::Variable("missing".to_string()),
1746                alias: None,
1747            }],
1748            distinct: false,
1749            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1750                variable: "n".to_string(),
1751                label: None,
1752                input: None,
1753            })),
1754        }));
1755
1756        let result = planner.plan(&logical);
1757        assert!(result.is_err());
1758    }
1759
1760    // ==================== Helper Function Tests ====================
1761
1762    #[test]
1763    fn test_convert_binary_ops() {
1764        assert!(convert_binary_op(BinaryOp::Eq).is_ok());
1765        assert!(convert_binary_op(BinaryOp::Ne).is_ok());
1766        assert!(convert_binary_op(BinaryOp::Lt).is_ok());
1767        assert!(convert_binary_op(BinaryOp::Le).is_ok());
1768        assert!(convert_binary_op(BinaryOp::Gt).is_ok());
1769        assert!(convert_binary_op(BinaryOp::Ge).is_ok());
1770        assert!(convert_binary_op(BinaryOp::And).is_ok());
1771        assert!(convert_binary_op(BinaryOp::Or).is_ok());
1772        assert!(convert_binary_op(BinaryOp::Add).is_ok());
1773        assert!(convert_binary_op(BinaryOp::Sub).is_ok());
1774        assert!(convert_binary_op(BinaryOp::Mul).is_ok());
1775        assert!(convert_binary_op(BinaryOp::Div).is_ok());
1776    }
1777
1778    #[test]
1779    fn test_convert_unary_ops() {
1780        assert!(convert_unary_op(UnaryOp::Not).is_ok());
1781        assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
1782        assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
1783        assert!(convert_unary_op(UnaryOp::Neg).is_ok());
1784    }
1785
1786    #[test]
1787    fn test_convert_aggregate_functions() {
1788        assert!(matches!(
1789            convert_aggregate_function(LogicalAggregateFunction::Count),
1790            PhysicalAggregateFunction::Count
1791        ));
1792        assert!(matches!(
1793            convert_aggregate_function(LogicalAggregateFunction::Sum),
1794            PhysicalAggregateFunction::Sum
1795        ));
1796        assert!(matches!(
1797            convert_aggregate_function(LogicalAggregateFunction::Avg),
1798            PhysicalAggregateFunction::Avg
1799        ));
1800        assert!(matches!(
1801            convert_aggregate_function(LogicalAggregateFunction::Min),
1802            PhysicalAggregateFunction::Min
1803        ));
1804        assert!(matches!(
1805            convert_aggregate_function(LogicalAggregateFunction::Max),
1806            PhysicalAggregateFunction::Max
1807        ));
1808    }
1809
1810    #[test]
1811    fn test_planner_accessors() {
1812        let store = create_test_store();
1813        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);
1814
1815        assert!(planner.transaction_id().is_none());
1816        assert!(planner.transaction_manager().is_none());
1817        let _ = planner.viewing_epoch(); // Just ensure it's accessible
1818    }
1819
1820    #[test]
1821    fn test_physical_plan_accessors() {
1822        let store = create_test_store();
1823        let planner = Planner::new(store);
1824
1825        let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
1826            variable: "n".to_string(),
1827            label: None,
1828            input: None,
1829        }));
1830
1831        let physical = planner.plan(&logical).unwrap();
1832        assert_eq!(physical.columns(), &["n"]);
1833
1834        // Test into_operator
1835        let _ = physical.into_operator();
1836    }
1837
1838    // ==================== Adaptive Planning Tests ====================
1839
1840    #[test]
1841    fn test_plan_adaptive_with_scan() {
1842        let store = create_test_store();
1843        let planner = Planner::new(store);
1844
1845        // MATCH (n:Person) RETURN n
1846        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1847            items: vec![ReturnItem {
1848                expression: LogicalExpression::Variable("n".to_string()),
1849                alias: None,
1850            }],
1851            distinct: false,
1852            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1853                variable: "n".to_string(),
1854                label: Some("Person".to_string()),
1855                input: None,
1856            })),
1857        }));
1858
1859        let physical = planner.plan_adaptive(&logical).unwrap();
1860        assert_eq!(physical.columns(), &["n"]);
1861        // Should have adaptive context with estimates
1862        assert!(physical.adaptive_context.is_some());
1863    }
1864
1865    #[test]
1866    fn test_plan_adaptive_with_filter() {
1867        let store = create_test_store();
1868        let planner = Planner::new(store);
1869
1870        // MATCH (n) WHERE n.age > 30 RETURN n
1871        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1872            items: vec![ReturnItem {
1873                expression: LogicalExpression::Variable("n".to_string()),
1874                alias: None,
1875            }],
1876            distinct: false,
1877            input: Box::new(LogicalOperator::Filter(FilterOp {
1878                predicate: LogicalExpression::Binary {
1879                    left: Box::new(LogicalExpression::Property {
1880                        variable: "n".to_string(),
1881                        property: "age".to_string(),
1882                    }),
1883                    op: BinaryOp::Gt,
1884                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1885                },
1886                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1887                    variable: "n".to_string(),
1888                    label: None,
1889                    input: None,
1890                })),
1891                pushdown_hint: None,
1892            })),
1893        }));
1894
1895        let physical = planner.plan_adaptive(&logical).unwrap();
1896        assert!(physical.adaptive_context.is_some());
1897    }
1898
1899    #[test]
1900    fn test_plan_adaptive_with_expand() {
1901        let store = create_test_store();
1902        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
1903            .with_factorized_execution(false);
1904
1905        // MATCH (a)-[:KNOWS]->(b) RETURN a, b
1906        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1907            items: vec![
1908                ReturnItem {
1909                    expression: LogicalExpression::Variable("a".to_string()),
1910                    alias: None,
1911                },
1912                ReturnItem {
1913                    expression: LogicalExpression::Variable("b".to_string()),
1914                    alias: None,
1915                },
1916            ],
1917            distinct: false,
1918            input: Box::new(LogicalOperator::Expand(ExpandOp {
1919                from_variable: "a".to_string(),
1920                to_variable: "b".to_string(),
1921                edge_variable: None,
1922                direction: ExpandDirection::Outgoing,
1923                edge_types: vec!["KNOWS".to_string()],
1924                min_hops: 1,
1925                max_hops: Some(1),
1926                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1927                    variable: "a".to_string(),
1928                    label: None,
1929                    input: None,
1930                })),
1931                path_alias: None,
1932                path_mode: PathMode::Walk,
1933            })),
1934        }));
1935
1936        let physical = planner.plan_adaptive(&logical).unwrap();
1937        assert!(physical.adaptive_context.is_some());
1938    }
1939
1940    #[test]
1941    fn test_plan_adaptive_with_join() {
1942        let store = create_test_store();
1943        let planner = Planner::new(store);
1944
1945        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
1946            items: vec![
1947                ReturnItem {
1948                    expression: LogicalExpression::Variable("a".to_string()),
1949                    alias: None,
1950                },
1951                ReturnItem {
1952                    expression: LogicalExpression::Variable("b".to_string()),
1953                    alias: None,
1954                },
1955            ],
1956            distinct: false,
1957            input: Box::new(LogicalOperator::Join(JoinOp {
1958                left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1959                    variable: "a".to_string(),
1960                    label: None,
1961                    input: None,
1962                })),
1963                right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1964                    variable: "b".to_string(),
1965                    label: None,
1966                    input: None,
1967                })),
1968                join_type: JoinType::Cross,
1969                conditions: vec![],
1970            })),
1971        }));
1972
1973        let physical = planner.plan_adaptive(&logical).unwrap();
1974        assert!(physical.adaptive_context.is_some());
1975    }
1976
1977    #[test]
1978    fn test_plan_adaptive_with_aggregate() {
1979        let store = create_test_store();
1980        let planner = Planner::new(store);
1981
1982        let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
1983            group_by: vec![],
1984            aggregates: vec![LogicalAggregateExpr {
1985                function: LogicalAggregateFunction::Count,
1986                expression: Some(LogicalExpression::Variable("n".to_string())),
1987                expression2: None,
1988                distinct: false,
1989                alias: Some("cnt".to_string()),
1990                percentile: None,
1991                separator: None,
1992            }],
1993            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1994                variable: "n".to_string(),
1995                label: None,
1996                input: None,
1997            })),
1998            having: None,
1999        }));
2000
2001        let physical = planner.plan_adaptive(&logical).unwrap();
2002        assert!(physical.adaptive_context.is_some());
2003    }
2004
2005    #[test]
2006    fn test_plan_adaptive_with_distinct() {
2007        let store = create_test_store();
2008        let planner = Planner::new(store);
2009
2010        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2011            items: vec![ReturnItem {
2012                expression: LogicalExpression::Variable("n".to_string()),
2013                alias: None,
2014            }],
2015            distinct: false,
2016            input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
2017                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2018                    variable: "n".to_string(),
2019                    label: None,
2020                    input: None,
2021                })),
2022                columns: None,
2023            })),
2024        }));
2025
2026        let physical = planner.plan_adaptive(&logical).unwrap();
2027        assert!(physical.adaptive_context.is_some());
2028    }
2029
2030    #[test]
2031    fn test_plan_adaptive_with_limit() {
2032        let store = create_test_store();
2033        let planner = Planner::new(store);
2034
2035        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2036            items: vec![ReturnItem {
2037                expression: LogicalExpression::Variable("n".to_string()),
2038                alias: None,
2039            }],
2040            distinct: false,
2041            input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
2042                count: 10.into(),
2043                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2044                    variable: "n".to_string(),
2045                    label: None,
2046                    input: None,
2047                })),
2048            })),
2049        }));
2050
2051        let physical = planner.plan_adaptive(&logical).unwrap();
2052        assert!(physical.adaptive_context.is_some());
2053    }
2054
2055    #[test]
2056    fn test_plan_adaptive_with_skip() {
2057        let store = create_test_store();
2058        let planner = Planner::new(store);
2059
2060        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2061            items: vec![ReturnItem {
2062                expression: LogicalExpression::Variable("n".to_string()),
2063                alias: None,
2064            }],
2065            distinct: false,
2066            input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
2067                count: 5.into(),
2068                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2069                    variable: "n".to_string(),
2070                    label: None,
2071                    input: None,
2072                })),
2073            })),
2074        }));
2075
2076        let physical = planner.plan_adaptive(&logical).unwrap();
2077        assert!(physical.adaptive_context.is_some());
2078    }
2079
2080    #[test]
2081    fn test_plan_adaptive_with_sort() {
2082        let store = create_test_store();
2083        let planner = Planner::new(store);
2084
2085        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2086            items: vec![ReturnItem {
2087                expression: LogicalExpression::Variable("n".to_string()),
2088                alias: None,
2089            }],
2090            distinct: false,
2091            input: Box::new(LogicalOperator::Sort(SortOp {
2092                keys: vec![SortKey {
2093                    expression: LogicalExpression::Variable("n".to_string()),
2094                    order: SortOrder::Ascending,
2095                    nulls: None,
2096                }],
2097                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2098                    variable: "n".to_string(),
2099                    label: None,
2100                    input: None,
2101                })),
2102            })),
2103        }));
2104
2105        let physical = planner.plan_adaptive(&logical).unwrap();
2106        assert!(physical.adaptive_context.is_some());
2107    }
2108
2109    #[test]
2110    fn test_plan_adaptive_with_union() {
2111        let store = create_test_store();
2112        let planner = Planner::new(store);
2113
2114        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2115            items: vec![ReturnItem {
2116                expression: LogicalExpression::Variable("n".to_string()),
2117                alias: None,
2118            }],
2119            distinct: false,
2120            input: Box::new(LogicalOperator::Union(UnionOp {
2121                inputs: vec![
2122                    LogicalOperator::NodeScan(NodeScanOp {
2123                        variable: "n".to_string(),
2124                        label: Some("Person".to_string()),
2125                        input: None,
2126                    }),
2127                    LogicalOperator::NodeScan(NodeScanOp {
2128                        variable: "n".to_string(),
2129                        label: Some("Company".to_string()),
2130                        input: None,
2131                    }),
2132                ],
2133            })),
2134        }));
2135
2136        let physical = planner.plan_adaptive(&logical).unwrap();
2137        assert!(physical.adaptive_context.is_some());
2138    }
2139
2140    // ==================== Variable Length Path Tests ====================
2141
2142    #[test]
2143    fn test_plan_expand_variable_length() {
2144        let store = create_test_store();
2145        let planner = Planner::new(store);
2146
2147        // MATCH (a)-[:KNOWS*1..3]->(b) RETURN a, b
2148        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2149            items: vec![
2150                ReturnItem {
2151                    expression: LogicalExpression::Variable("a".to_string()),
2152                    alias: None,
2153                },
2154                ReturnItem {
2155                    expression: LogicalExpression::Variable("b".to_string()),
2156                    alias: None,
2157                },
2158            ],
2159            distinct: false,
2160            input: Box::new(LogicalOperator::Expand(ExpandOp {
2161                from_variable: "a".to_string(),
2162                to_variable: "b".to_string(),
2163                edge_variable: None,
2164                direction: ExpandDirection::Outgoing,
2165                edge_types: vec!["KNOWS".to_string()],
2166                min_hops: 1,
2167                max_hops: Some(3),
2168                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2169                    variable: "a".to_string(),
2170                    label: None,
2171                    input: None,
2172                })),
2173                path_alias: None,
2174                path_mode: PathMode::Walk,
2175            })),
2176        }));
2177
2178        let physical = planner.plan(&logical).unwrap();
2179        assert!(physical.columns().contains(&"a".to_string()));
2180        assert!(physical.columns().contains(&"b".to_string()));
2181    }
2182
2183    #[test]
2184    fn test_plan_expand_with_path_alias() {
2185        let store = create_test_store();
2186        let planner = Planner::new(store);
2187
2188        // MATCH p = (a)-[:KNOWS*1..3]->(b) RETURN a, b
2189        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2190            items: vec![
2191                ReturnItem {
2192                    expression: LogicalExpression::Variable("a".to_string()),
2193                    alias: None,
2194                },
2195                ReturnItem {
2196                    expression: LogicalExpression::Variable("b".to_string()),
2197                    alias: None,
2198                },
2199            ],
2200            distinct: false,
2201            input: Box::new(LogicalOperator::Expand(ExpandOp {
2202                from_variable: "a".to_string(),
2203                to_variable: "b".to_string(),
2204                edge_variable: None,
2205                direction: ExpandDirection::Outgoing,
2206                edge_types: vec!["KNOWS".to_string()],
2207                min_hops: 1,
2208                max_hops: Some(3),
2209                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2210                    variable: "a".to_string(),
2211                    label: None,
2212                    input: None,
2213                })),
2214                path_alias: Some("p".to_string()),
2215                path_mode: PathMode::Walk,
2216            })),
2217        }));
2218
2219        let physical = planner.plan(&logical).unwrap();
2220        // Verify plan was created successfully with expected output columns
2221        assert!(physical.columns().contains(&"a".to_string()));
2222        assert!(physical.columns().contains(&"b".to_string()));
2223    }
2224
2225    #[test]
2226    fn test_plan_expand_incoming() {
2227        let store = create_test_store();
2228        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2229            .with_factorized_execution(false);
2230
2231        // MATCH (a)<-[:KNOWS]-(b) RETURN a, b
2232        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2233            items: vec![
2234                ReturnItem {
2235                    expression: LogicalExpression::Variable("a".to_string()),
2236                    alias: None,
2237                },
2238                ReturnItem {
2239                    expression: LogicalExpression::Variable("b".to_string()),
2240                    alias: None,
2241                },
2242            ],
2243            distinct: false,
2244            input: Box::new(LogicalOperator::Expand(ExpandOp {
2245                from_variable: "a".to_string(),
2246                to_variable: "b".to_string(),
2247                edge_variable: None,
2248                direction: ExpandDirection::Incoming,
2249                edge_types: vec!["KNOWS".to_string()],
2250                min_hops: 1,
2251                max_hops: Some(1),
2252                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2253                    variable: "a".to_string(),
2254                    label: None,
2255                    input: None,
2256                })),
2257                path_alias: None,
2258                path_mode: PathMode::Walk,
2259            })),
2260        }));
2261
2262        let physical = planner.plan(&logical).unwrap();
2263        assert!(physical.columns().contains(&"a".to_string()));
2264        assert!(physical.columns().contains(&"b".to_string()));
2265    }
2266
2267    #[test]
2268    fn test_plan_expand_both_directions() {
2269        let store = create_test_store();
2270        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2271            .with_factorized_execution(false);
2272
2273        // MATCH (a)-[:KNOWS]-(b) RETURN a, b
2274        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2275            items: vec![
2276                ReturnItem {
2277                    expression: LogicalExpression::Variable("a".to_string()),
2278                    alias: None,
2279                },
2280                ReturnItem {
2281                    expression: LogicalExpression::Variable("b".to_string()),
2282                    alias: None,
2283                },
2284            ],
2285            distinct: false,
2286            input: Box::new(LogicalOperator::Expand(ExpandOp {
2287                from_variable: "a".to_string(),
2288                to_variable: "b".to_string(),
2289                edge_variable: None,
2290                direction: ExpandDirection::Both,
2291                edge_types: vec!["KNOWS".to_string()],
2292                min_hops: 1,
2293                max_hops: Some(1),
2294                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2295                    variable: "a".to_string(),
2296                    label: None,
2297                    input: None,
2298                })),
2299                path_alias: None,
2300                path_mode: PathMode::Walk,
2301            })),
2302        }));
2303
2304        let physical = planner.plan(&logical).unwrap();
2305        assert!(physical.columns().contains(&"a".to_string()));
2306        assert!(physical.columns().contains(&"b".to_string()));
2307    }
2308
2309    // ==================== With Context Tests ====================
2310
2311    #[test]
2312    fn test_planner_with_context() {
2313        use crate::transaction::TransactionManager;
2314
2315        let store = create_test_store();
2316        let transaction_manager = Arc::new(TransactionManager::new());
2317        let transaction_id = transaction_manager.begin();
2318        let epoch = transaction_manager.current_epoch();
2319
2320        let planner = Planner::with_context(
2321            Arc::clone(&store) as Arc<dyn GraphStore>,
2322            Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
2323            Arc::clone(&transaction_manager),
2324            Some(transaction_id),
2325            epoch,
2326        );
2327
2328        assert_eq!(planner.transaction_id(), Some(transaction_id));
2329        assert!(planner.transaction_manager().is_some());
2330        assert_eq!(planner.viewing_epoch(), epoch);
2331    }
2332
2333    #[test]
2334    fn test_planner_with_factorized_execution_disabled() {
2335        let store = create_test_store();
2336        let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
2337            .with_factorized_execution(false);
2338
2339        // Two consecutive expands - should NOT use factorized execution
2340        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2341            items: vec![
2342                ReturnItem {
2343                    expression: LogicalExpression::Variable("a".to_string()),
2344                    alias: None,
2345                },
2346                ReturnItem {
2347                    expression: LogicalExpression::Variable("c".to_string()),
2348                    alias: None,
2349                },
2350            ],
2351            distinct: false,
2352            input: Box::new(LogicalOperator::Expand(ExpandOp {
2353                from_variable: "b".to_string(),
2354                to_variable: "c".to_string(),
2355                edge_variable: None,
2356                direction: ExpandDirection::Outgoing,
2357                edge_types: vec![],
2358                min_hops: 1,
2359                max_hops: Some(1),
2360                input: Box::new(LogicalOperator::Expand(ExpandOp {
2361                    from_variable: "a".to_string(),
2362                    to_variable: "b".to_string(),
2363                    edge_variable: None,
2364                    direction: ExpandDirection::Outgoing,
2365                    edge_types: vec![],
2366                    min_hops: 1,
2367                    max_hops: Some(1),
2368                    input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2369                        variable: "a".to_string(),
2370                        label: None,
2371                        input: None,
2372                    })),
2373                    path_alias: None,
2374                    path_mode: PathMode::Walk,
2375                })),
2376                path_alias: None,
2377                path_mode: PathMode::Walk,
2378            })),
2379        }));
2380
2381        let physical = planner.plan(&logical).unwrap();
2382        assert!(physical.columns().contains(&"a".to_string()));
2383        assert!(physical.columns().contains(&"c".to_string()));
2384    }
2385
2386    // ==================== Sort with Property Tests ====================
2387
2388    #[test]
2389    fn test_plan_sort_by_property() {
2390        let store = create_test_store();
2391        let planner = Planner::new(store);
2392
2393        // MATCH (n) RETURN n ORDER BY n.name ASC
2394        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2395            items: vec![ReturnItem {
2396                expression: LogicalExpression::Variable("n".to_string()),
2397                alias: None,
2398            }],
2399            distinct: false,
2400            input: Box::new(LogicalOperator::Sort(SortOp {
2401                keys: vec![SortKey {
2402                    expression: LogicalExpression::Property {
2403                        variable: "n".to_string(),
2404                        property: "name".to_string(),
2405                    },
2406                    order: SortOrder::Ascending,
2407                    nulls: None,
2408                }],
2409                input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2410                    variable: "n".to_string(),
2411                    label: None,
2412                    input: None,
2413                })),
2414            })),
2415        }));
2416
2417        let physical = planner.plan(&logical).unwrap();
2418        // Should have the property column projected
2419        assert!(physical.columns().contains(&"n".to_string()));
2420    }
2421
2422    // ==================== Scan with Input Tests ====================
2423
2424    #[test]
2425    fn test_plan_scan_with_input() {
2426        let store = create_test_store();
2427        let planner = Planner::new(store);
2428
2429        // A scan with another scan as input (for chained patterns)
2430        let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
2431            items: vec![
2432                ReturnItem {
2433                    expression: LogicalExpression::Variable("a".to_string()),
2434                    alias: None,
2435                },
2436                ReturnItem {
2437                    expression: LogicalExpression::Variable("b".to_string()),
2438                    alias: None,
2439                },
2440            ],
2441            distinct: false,
2442            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2443                variable: "b".to_string(),
2444                label: Some("Company".to_string()),
2445                input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
2446                    variable: "a".to_string(),
2447                    label: Some("Person".to_string()),
2448                    input: None,
2449                }))),
2450            })),
2451        }));
2452
2453        let physical = planner.plan(&logical).unwrap();
2454        assert!(physical.columns().contains(&"a".to_string()));
2455        assert!(physical.columns().contains(&"b".to_string()));
2456    }
2457}